#!/usr/bin/env python3
"""
OSM Calendar Extractor for the OpenEventDatabase.

This script fetches events from the OpenStreetMap Calendar RSS feed
and adds them to the OpenEventDatabase via the API.

For events that don't have geographic coordinates in the RSS feed but have a
link to an OSM Calendar event (https://osmcal.org/event/...), the script will
fetch the iCal version of the event and extract the coordinates and location
from there.

RSS Feed URL: https://osmcal.org/events.rss
API Endpoint: https://api.openeventdatabase.org/event

Usage:
    python osm_cal.py [--max-events MAX_EVENTS] [--offset OFFSET]

Arguments:
    --max-events MAX_EVENTS  Maximum number of events to insert (default: 1)
    --offset OFFSET          Number of events to skip from the beginning of
                             the RSS feed (default: 0)

Examples:
    # Insert the first event from the RSS feed
    python osm_cal.py

    # Insert up to 5 events from the RSS feed
    python osm_cal.py --max-events 5

    # Skip the first 3 events and insert the next 2
    python osm_cal.py --offset 3 --max-events 2

Environment Variables:
    These environment variables can be set in the system environment or in a
    .env file in the project root directory.
"""

import json
import requests
import sys
import os
import xml.etree.ElementTree as ET
import re
import html
from datetime import datetime, timedelta

# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from oedb.utils.db import db_connect, load_env_from_file
from oedb.utils.logging import logger

# RSS Feed URL for OSM Calendar
RSS_URL = "https://osmcal.org/events.rss"

# Base URL for OSM Calendar events
OSMCAL_EVENT_BASE_URL = "https://osmcal.org/event/"

# Seconds to wait for any HTTP request. Without an explicit timeout,
# `requests` may block forever on an unresponsive server.
REQUEST_TIMEOUT = 30

# Lower-cased English month names -> month number (lookup is case-insensitive).
_MONTH_NUMBERS = {
    'january': 1, 'february': 2, 'march': 3, 'april': 4,
    'may': 5, 'june': 6, 'july': 7, 'august': 8,
    'september': 9, 'october': 10, 'november': 11, 'december': 12,
}

# Date expression as it appears in the RSS description.
# Groups: 1 = start day, 2 = start month name, 3/4 = start HH/MM,
# 5/6 = end HH/MM, 7 = text in parentheses, 8 = end day, 9 = end month name.
_DATE_PATTERN = re.compile(
    r'(\d+)(?:st|nd|rd|th)\s+(\w+)(?:\s+(\d+):(\d+)(?:\s+–\s+(\d+):(\d+))?)?(?:\s+\(([^)]+)\))?(?:\s+–\s+(\d+)(?:st|nd|rd|th)\s+(\w+))?'
)


def fetch_osm_calendar_data():
    """
    Fetch events from the OSM Calendar RSS feed.

    Returns:
        list: The ``<item>`` elements of the feed's ``<channel>``. An empty
        list is returned on any network, HTTP or XML parsing error, or when
        the feed has no channel or no items.
    """
    logger.info("Fetching data from OSM Calendar RSS feed")

    try:
        response = requests.get(RSS_URL, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()  # Raise an exception for HTTP errors

        # Parse the XML response
        root = ET.fromstring(response.content)

        # Find all item elements (events)
        channel = root.find('channel')
        if channel is None:
            logger.error("No channel element found in RSS feed")
            return []

        items = channel.findall('item')
        if not items:
            logger.error("No items found in RSS feed")
            return []

        logger.success(f"Successfully fetched {len(items)} events from OSM Calendar RSS feed")
        return items

    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data from OSM Calendar RSS feed: {e}")
        return []
    except ET.ParseError as e:
        logger.error(f"Error parsing XML response: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching OSM Calendar data: {e}")
        return []


def parse_event_dates(description):
    """
    Parse event dates from the description.

    Only the day and month are used; the year is taken from the current date.
    Times of day captured by the pattern are not applied to the result.

    Args:
        description (str): The event description HTML.

    Returns:
        tuple: ``(start_date, end_date)`` as ISO format strings. The end is
        the day after the last event day, so the last day is fully included.
        When nothing can be parsed, "now" and "now + 1 day" are returned.
    """
    try:
        date_match = _DATE_PATTERN.search(description)

        if not date_match:
            # If no date pattern found, use current date as fallback
            now = datetime.now()
            start_iso = now.isoformat()
            end_iso = (now + timedelta(days=1)).isoformat()
            logger.warning(f"Could not parse date from description, using current date: {start_iso} to {end_iso}")
            return (start_iso, end_iso)

        # Extract date components
        day = int(date_match.group(1))
        month_name = date_match.group(2)

        month = _MONTH_NUMBERS.get(month_name.lower())
        if month is None:
            # If month name not found, use current month
            month = datetime.now().month
            logger.warning(f"Could not parse month name: {month_name}, using current month")

        # Get current year (assuming events are current or future)
        current_year = datetime.now().year

        # Create start date
        start_is_parsed = True
        try:
            start_date = datetime(current_year, month, day)
        except ValueError:
            # Handle invalid dates (e.g., February 30)
            logger.warning(f"Invalid date: {day} {month_name} {current_year}, using current date")
            start_date = datetime.now()
            start_is_parsed = False

        # Check if there's an end date
        if date_match.group(8):
            end_day = int(date_match.group(8))
            end_month_name = date_match.group(9)

            end_month = _MONTH_NUMBERS.get(end_month_name.lower())
            if end_month is None:
                # If end month name not found, use start month
                end_month = month
                logger.warning(f"Could not parse end month name: {end_month_name}, using start month")

            try:
                end_date = datetime(current_year, end_month, end_day)
                if start_is_parsed and end_date < start_date:
                    # The range crosses New Year (e.g. 30th December –
                    # 2nd January): the end belongs to the following year.
                    end_date = datetime(current_year + 1, end_month, end_day)
                # Add a day to include the full end day
                end_date = end_date + timedelta(days=1)
            except ValueError:
                # Handle invalid dates
                logger.warning(f"Invalid end date: {end_day} {end_month_name} {current_year}, using start date + 1 day")
                end_date = start_date + timedelta(days=1)
        else:
            # If no end date, use start date + 1 day as default
            end_date = start_date + timedelta(days=1)

        # Format dates as ISO strings
        return (start_date.isoformat(), end_date.isoformat())

    except Exception as e:
        logger.error(f"Error parsing event dates: {e}")
        # Return default dates (current date)
        now = datetime.now()
        return (now.isoformat(), (now + timedelta(days=1)).isoformat())


def fetch_ical_data(event_url):
    """
    Fetch and parse iCal data for an OSM Calendar event.

    Args:
        event_url (str): The URL of the OSM Calendar event.

    Returns:
        tuple: ``(location_name, coordinates)`` where ``coordinates`` is a
        GeoJSON-ordered ``[longitude, latitude]`` list. Missing pieces are
        reported as ``"Unknown Location"`` and ``[0, 0]`` respectively.
    """
    try:
        # Check if the URL is an OSM Calendar event URL
        if not event_url.startswith(OSMCAL_EVENT_BASE_URL):
            logger.warning(f"Not an OSM Calendar event URL: {event_url}")
            return ("Unknown Location", [0, 0])

        # Extract the event ID from the URL
        event_id_match = re.search(r'event/(\d+)', event_url)
        if not event_id_match:
            logger.warning(f"Could not extract event ID from URL: {event_url}")
            return ("Unknown Location", [0, 0])

        event_id = event_id_match.group(1)

        # Construct the iCal URL
        ical_url = f"{OSMCAL_EVENT_BASE_URL}{event_id}.ics"

        # Fetch the iCal content
        logger.info(f"Fetching iCal data from: {ical_url}")
        response = requests.get(ical_url, timeout=REQUEST_TIMEOUT)

        if not response.ok:
            logger.warning(f"Failed to fetch iCal data: {response.status_code}")
            return ("Unknown Location", [0, 0])

        # Parse the iCal content
        ical_content = response.text

        # Extract GEO information (the fractional part is optional)
        geo_match = re.search(r'GEO:([-+]?\d+(?:\.\d+)?);([-+]?\d+(?:\.\d+)?)', ical_content)
        if geo_match:
            # GEO format is latitude;longitude (RFC 5545)
            latitude = float(geo_match.group(1))
            longitude = float(geo_match.group(2))
            coordinates = [longitude, latitude]  # GeoJSON uses [longitude, latitude]
            logger.info(f"Extracted coordinates from iCal: {coordinates}")
        else:
            logger.warning(f"No GEO information found in iCal data for event: {event_id}")
            coordinates = [0, 0]

        # Extract LOCATION information
        location_match = re.search(r'LOCATION:(.+?)(?:\r\n|\n|\r)', ical_content)
        if location_match:
            location_name = location_match.group(1).strip()
            # Unescape backslash-escaped characters (e.g., \, becomes ,)
            location_name = re.sub(r'\\(.)', r'\1', location_name)
            logger.info(f"Extracted location from iCal: {location_name}")
        else:
            logger.warning(f"No LOCATION information found in iCal data for event: {event_id}")
            location_name = "Unknown Location"

        return (location_name, coordinates)

    except Exception as e:
        logger.error(f"Error fetching or parsing iCal data: {e}")
        return ("Unknown Location", [0, 0])


def extract_location(description):
    """
    Extract location information from the event description.

    Args:
        description (str): The event description HTML.

    Returns:
        tuple: ``(location_name, coordinates)``. The description carries no
        coordinates, so ``coordinates`` is always the placeholder ``[0, 0]``;
        ``location_name`` is ``"Unknown Location"`` when nothing suitable is
        found.
    """
    try:
        # Default coordinates (center of the world)
        coordinates = [0, 0]
        location_name = "Unknown Location"

        # Text-only paragraphs of the description
        # NOTE(review): pattern reconstructed from a garbled source; confirm
        # against the feed's actual markup.
        location_pattern = r'<p>([^<]+)</p>'
        location_matches = re.findall(location_pattern, description)

        if len(location_matches) > 1:
            # The second paragraph often contains the location
            location_candidate = location_matches[1].strip()
            # Only accept it when it looks like "place, city" style text.
            # No coordinates are derived here; geocoding would be needed.
            if location_candidate and "," in location_candidate:
                location_name = location_candidate

        return (location_name, coordinates)

    except Exception as e:
        logger.error(f"Error extracting location: {e}")
        return ("Unknown Location", [0, 0])


def create_event(item):
    """
    Create an event object from an RSS item.

    Args:
        item: An item element from the RSS feed.

    Returns:
        dict: A GeoJSON Feature representing the event, or ``None`` when the
        item cannot be converted (e.g. a required child element is missing).
    """
    try:
        # Extract data from the item
        title = item.find('title').text
        link = item.find('link').text
        # An absent or empty description is treated as empty text instead of
        # discarding the whole event.
        description = item.findtext('description') or ''
        guid = item.find('guid').text

        # Clean up the description (remove HTML tags for text extraction)
        clean_description = re.sub(r'<[^>]+>', ' ', description)
        clean_description = html.unescape(clean_description)
        clean_description = re.sub(r'\s+', ' ', clean_description).strip()

        # Parse dates from the description
        start_date, end_date = parse_event_dates(description)

        # Extract location information from the description
        location_name, coordinates = extract_location(description)

        # If we don't have coordinates and the link is to an OSM Calendar event,
        # try to get coordinates and location from the iCal file
        if coordinates == [0, 0] and link and link.startswith(OSMCAL_EVENT_BASE_URL):
            logger.info(f"No coordinates found in description, trying to get from iCal: {link}")
            ical_location_name, ical_coordinates = fetch_ical_data(link)

            # Use iCal coordinates if available
            if ical_coordinates != [0, 0]:
                coordinates = ical_coordinates
                logger.info(f"Using coordinates from iCal: {coordinates}")

            # Use iCal location name if available and better than what we have
            if ical_location_name != "Unknown Location":
                location_name = ical_location_name
                logger.info(f"Using location name from iCal: {location_name}")

        # Create a descriptive label
        label = title

        # Create the event object
        event = {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": coordinates
            },
            "properties": {
                "type": "scheduled",
                "what": "community.osm.event",
                "what:series": "OpenStreetMap Calendar",
                "where": location_name,
                "label": label,
                "description": clean_description,
                "start": start_date,
                "stop": end_date,
                "url": link,
                "external_id": guid,
                "source": "OSM Calendar"
            }
        }

        return event

    except Exception as e:
        logger.error(f"Error creating event from item: {e}")
        return None


def event_exists(db, properties):
    """
    Check if an event with the same properties already exists in the database.

    An event counts as existing when a row has the same ``external_id`` or,
    failing that, the same ``label``, ``start`` and ``stop``.

    Args:
        db: Database connection.
        properties: Event properties.

    Returns:
        bool: True if the event exists, False otherwise (including when the
        lookup itself fails).
    """
    logger.info(f"Checking whether event already exists: {properties}")
    try:
        cur = db.cursor()
        try:
            # Check if an event with the same external_id exists
            if 'external_id' in properties:
                cur.execute("""
                    SELECT events_id FROM events
                    WHERE events_tags->>'external_id' = %s;
                """, (properties['external_id'],))
                result = cur.fetchone()
                if result:
                    logger.info(f"Event with external_id {properties['external_id']} already exists")
                    return True

            # Check if an event with the same label, start, and stop exists
            cur.execute("""
                SELECT events_id FROM events
                WHERE events_tags->>'label' = %s
                AND events_tags->>'start' = %s
                AND events_tags->>'stop' = %s;
            """, (
                properties.get('label', ''),
                properties.get('start', ''),
                properties.get('stop', '')
            ))
            result = cur.fetchone()
            if result:
                logger.info(f"Event with label '{properties.get('label')}' and same dates already exists")
                return True

            return False
        finally:
            # Always release the cursor, whichever path returned.
            cur.close()

    except Exception as e:
        logger.error(f"Error checking if event exists: {e}")
        return False


def submit_event(event):
    """
    Submit an event to the OpenEventDatabase using the API.

    Args:
        event: A GeoJSON Feature representing the event.

    Returns:
        bool: True if the API answered 200 or 201, False otherwise
        (including network errors).
    """
    try:
        # Extract event properties for logging
        properties = event['properties']

        # API endpoint for OpenEventDatabase
        api_url = "https://api.openeventdatabase.org/event"

        # Make the API request
        logger.info(f"Submitting event '{properties.get('label')}' to API")
        response = requests.post(
            api_url,
            headers={"Content-Type": "application/json"},
            data=json.dumps(event),
            timeout=REQUEST_TIMEOUT
        )

        # Check if the request was successful
        if response.status_code not in (200, 201):
            logger.warning(f"Failed to create event: {properties.get('label')}. Status code: {response.status_code}")
            logger.warning(f"Response: {response.text}")
            return False

        # The event was accepted; the ID is informational only, so an
        # unparsable or unexpected body must not turn success into failure.
        try:
            response_data = response.json()
        except ValueError:
            response_data = None

        event_id = response_data.get('id') if isinstance(response_data, dict) else None
        if event_id:
            logger.success(f"Event created with ID: {event_id}")
        else:
            logger.warning("Event created but no ID returned in response")
        return True

    except Exception as e:
        logger.error(f"Error submitting event: {e}")
        return False


def main(max_events=1, offset=0):
    """
    Main function to fetch OSM Calendar events and add them to the
    OpenEventDatabase API.

    Args:
        max_events (int): Maximum number of events to insert (default: 1)
        offset (int): Number of events to skip from the beginning of the RSS
            feed (default: 0)

    The function will exit if the .env file doesn't exist, as it's required
    for environment variables.
    """
    logger.info(f"Starting OSM Calendar extractor (max_events={max_events}, offset={offset})")

    # Negative values would silently select an unintended slice of the feed.
    if max_events < 0 or offset < 0:
        logger.error(f"max_events and offset must not be negative (max_events={max_events}, offset={offset})")
        return

    # Load environment variables from .env file and check if it exists
    if not load_env_from_file():
        logger.error("Required .env file not found. Exiting.")
        sys.exit(1)

    logger.info("Environment variables loaded successfully from .env file")

    # Fetch events from the OSM Calendar RSS feed
    items = fetch_osm_calendar_data()

    if not items:
        logger.warning("No events found, exiting")
        return

    # Apply offset and limit
    if offset >= len(items):
        logger.warning(f"Offset {offset} is greater than or equal to the number of events {len(items)}, no events to process")
        return

    # Slice the items list according to offset and max_events
    items_to_process = items[offset:offset + max_events]
    logger.info(f"Processing {len(items_to_process)} events (offset={offset}, max_events={max_events})")

    # Process each item
    success_count = 0
    for item in items_to_process:
        # Create an event from the item
        event = create_event(item)
        if not event:
            continue

        # Submit the event to the API
        if submit_event(event):
            success_count += 1

    logger.success(f"Successfully added {success_count} out of {len(items_to_process)} events to the OpenEventDatabase")


if __name__ == "__main__":
    import argparse

    # Set up command line argument parsing
    parser = argparse.ArgumentParser(description='OSM Calendar Extractor for the OpenEventDatabase')
    parser.add_argument('--max-events', type=int, default=1,
                        help='Maximum number of events to insert (default: 1)')
    parser.add_argument('--offset', type=int, default=0,
                        help='Number of events to skip from the beginning of the RSS feed (default: 0)')

    # Parse arguments
    args = parser.parse_args()

    # Run the main function with the provided arguments
    main(max_events=args.max_events, offset=args.offset)