#!/usr/bin/env python3
"""
OSM Calendar Extractor for the OpenEventDatabase.

This script fetches events from the OpenStreetMap Calendar RSS feed
and adds them to the OpenEventDatabase if they don't already exist.

RSS Feed URL: https://osmcal.org/events.rss

Environment Variables:
    DB_NAME: The name of the database (default: "oedb")
    DB_HOST: The hostname of the database server (default: "localhost")
    DB_USER: The username to connect to the database (default: "")
    POSTGRES_PASSWORD: The password to connect to the database (default: None)

These environment variables can be set in the system environment or in a .env file
in the project root directory.
"""

import json
import requests
import sys
import os
import xml.etree.ElementTree as ET
import re
import html
from datetime import datetime, timedelta

# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from oedb.utils.db import db_connect, load_env_from_file
from oedb.utils.logging import logger

# RSS Feed URL for OSM Calendar
RSS_URL = "https://osmcal.org/events.rss"

# Seconds to wait for the feed before giving up; without a timeout a stalled
# server would block the extractor indefinitely.
REQUEST_TIMEOUT = 30

# Lower-cased English month names mapped to their month number.
_MONTH_NUMBERS = {
    'january': 1, 'february': 2, 'march': 3, 'april': 4,
    'may': 5, 'june': 6, 'july': 7, 'august': 8,
    'september': 9, 'october': 10, 'november': 11, 'december': 12,
}

# Date line of an event description. Capture groups:
#   1 start day, 2 start month name, 3/4 start hour/minute,
#   5/6 end hour/minute, 7 timezone label, 8 end day, 9 end month name.
# Every part after the start day and month is optional.
_DATE_PATTERN = re.compile(
    r'(\d+)(?:st|nd|rd|th)\s+(\w+)'
    r'(?:\s+(\d+):(\d+)(?:\s+–\s+(\d+):(\d+))?)?'
    r'(?:\s+\(([^)]+)\))?'
    r'(?:\s+–\s+(\d+)(?:st|nd|rd|th)\s+(\w+))?'
)

# Text content of an HTML paragraph that contains no nested tags.
# NOTE(review): the tags of this pattern were lost in the copy of the file this
# was restored from; "<p>...</p>" is reconstructed from the surrounding
# "second paragraph" logic -- confirm against the feed's actual markup.
_PARAGRAPH_PATTERN = re.compile(r'<p>([^<]+)</p>')


def fetch_osm_calendar_data():
    """
    Fetch events from the OSM Calendar RSS feed.

    Returns:
        list: The <item> elements of the feed's <channel>. An empty list is
            returned (and the problem logged) when the request fails, the XML
            cannot be parsed, or the feed has no channel or no items.
    """
    logger.info("Fetching data from OSM Calendar RSS feed")

    try:
        response = requests.get(RSS_URL, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()  # Raise an exception for HTTP errors

        # Parse the XML response
        root = ET.fromstring(response.content)

        # Find all item elements (events)
        channel = root.find('channel')
        if channel is None:
            logger.error("No channel element found in RSS feed")
            return []

        items = channel.findall('item')
        if not items:
            logger.error("No items found in RSS feed")
            return []

        logger.success(f"Successfully fetched {len(items)} events from OSM Calendar RSS feed")
        return items

    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data from OSM Calendar RSS feed: {e}")
        return []
    except ET.ParseError as e:
        logger.error(f"Error parsing XML response: {e}")
        return []
    except Exception as e:
        # Top-level boundary: the extractor should report and carry on, not crash.
        logger.error(f"Unexpected error fetching OSM Calendar data: {e}")
        return []


def _apply_time(day, hour, minute):
    """Return *day* with the captured clock time applied, or *day* unchanged
    when no time was captured or the values are out of range."""
    if hour is None or minute is None:
        return day
    try:
        return day.replace(hour=int(hour), minute=int(minute))
    except ValueError:
        logger.warning(f"Invalid time: {hour}:{minute}, ignoring it")
        return day


def parse_event_dates(description):
    """
    Parse event dates from the description.

    The description carries no year, so the current year is assumed. Clock
    times found next to the start date are applied to the start (and, for a
    same-day "HH:MM – HH:MM" range, to the end). When an explicit end date is
    given the whole end day is included. Without any end information the event
    is assumed to last until the end of its start day.

    The returned datetimes are naive; the timezone label in the description is
    matched but not applied.
    # NOTE(review): the database will interpret these values in its own
    # session timezone -- confirm that this is acceptable.

    Args:
        description (str): The event description HTML.

    Returns:
        tuple: A tuple containing (start_date, end_date) as ISO format strings,
            with end_date never earlier than start_date. Falls back to
            "now .. now + 1 day" when nothing can be parsed.
    """
    try:
        date_match = _DATE_PATTERN.search(description)

        if not date_match:
            # If no date pattern found, use current date as fallback
            now = datetime.now()
            start_iso = now.isoformat()
            end_iso = (now + timedelta(days=1)).isoformat()
            logger.warning(f"Could not parse date from description, using current date: {start_iso} to {end_iso}")
            return (start_iso, end_iso)

        (day_text, month_name, start_hour, start_minute, end_hour, end_minute,
         _timezone, end_day_text, end_month_name) = date_match.groups()
        day = int(day_text)

        # Month names are matched case-insensitively.
        month = _MONTH_NUMBERS.get(month_name.lower())
        if month is None:
            # If month name not found, use current month
            month = datetime.now().month
            logger.warning(f"Could not parse month name: {month_name}, using current month")

        # Get current year (assuming events are current or future)
        current_year = datetime.now().year

        # day_start is midnight of the start day; start_date additionally
        # carries the clock time when one was given.
        try:
            day_start = datetime(current_year, month, day)
            start_parsed = True
        except ValueError:
            # Handle invalid dates (e.g., February 30)
            logger.warning(f"Invalid date: {day} {month_name} {current_year}, using current date")
            day_start = datetime.now()
            start_parsed = False

        start_date = _apply_time(day_start, start_hour, start_minute) if start_parsed else day_start

        end_date = None
        if end_day_text:
            end_day = int(end_day_text)
            end_month = _MONTH_NUMBERS.get(end_month_name.lower())
            if end_month is None:
                # If end month name not found, use start month
                end_month = month
                logger.warning(f"Could not parse end month name: {end_month_name}, using start month")

            try:
                end_date = datetime(current_year, end_month, end_day)
                if start_parsed and end_date < day_start:
                    # The range wraps around New Year (e.g. 30th December –
                    # 2nd January), so the end belongs to the following year.
                    end_date = end_date.replace(year=current_year + 1)
                # Add a day to include the full end day
                end_date = end_date + timedelta(days=1)
            except ValueError:
                # Handle invalid dates
                logger.warning(f"Invalid end date: {end_day} {end_month_name} {current_year}, using start date + 1 day")
                end_date = None
        elif start_parsed and end_hour is not None:
            # Same-day time range; an end earlier than the start means the
            # event runs past midnight.
            end_date = _apply_time(day_start, end_hour, end_minute)
            if end_date < start_date:
                end_date = end_date + timedelta(days=1)

        if end_date is None or end_date < start_date:
            # No usable end information: the event lasts until the end of its
            # start day. This also guarantees stop >= start, which a database
            # time range requires.
            end_date = day_start + timedelta(days=1)

        return (start_date.isoformat(), end_date.isoformat())

    except Exception as e:
        logger.error(f"Error parsing event dates: {e}")
        # Return default dates (current date)
        now = datetime.now()
        return (now.isoformat(), (now + timedelta(days=1)).isoformat())


def extract_location(description):
    """
    Extract location information from the event description.

    The second tag-free paragraph of the description is taken as the location
    when it contains a comma. No geocoding is performed, so the coordinates
    are always the placeholder [0, 0].

    Args:
        description (str): The event description HTML.

    Returns:
        tuple: A tuple containing (location_name, coordinates), where
            location_name is "Unknown Location" if none was found.
    """
    # Placeholder coordinates: the feed text carries no position.
    # In a real implementation, you might want to geocode the location.
    coordinates = [0, 0]
    location_name = "Unknown Location"

    try:
        paragraphs = _PARAGRAPH_PATTERN.findall(description)

        if len(paragraphs) > 1:
            # The second paragraph often contains the location
            candidate = paragraphs[1].strip()
            if candidate and "," in candidate:
                location_name = candidate

        return (location_name, coordinates)

    except Exception as e:
        logger.error(f"Error extracting location: {e}")
        return ("Unknown Location", [0, 0])


def create_event(item):
    """
    Create an event object from an RSS item.

    Args:
        item: An item element from the RSS feed. Its <title> and <description>
            children are required; <link> and <guid> are used when present.

    Returns:
        dict: A GeoJSON Feature representing the event, or None (with the
            problem logged) if the item cannot be converted.
    """
    try:
        # Extract data from the item
        title = item.findtext('title')
        description = item.findtext('description')
        if title is None or description is None:
            logger.error("Error creating event from item: missing <title> or <description>")
            return None
        link = item.findtext('link')
        guid = item.findtext('guid')

        # Clean up the description (remove HTML tags for text extraction)
        clean_description = re.sub(r'<[^>]+>', ' ', description)
        clean_description = html.unescape(clean_description)
        clean_description = re.sub(r'\s+', ' ', clean_description).strip()

        # Dates and location are parsed from the raw HTML, not the cleaned text.
        start_date, end_date = parse_event_dates(description)
        location_name, coordinates = extract_location(description)

        return {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": coordinates
            },
            "properties": {
                "type": "scheduled",
                "what": "community.osm.event",
                "what:series": "OpenStreetMap Calendar",
                "where": location_name,
                "label": title,
                "description": clean_description,
                "start": start_date,
                "stop": end_date,
                "url": link,
                "external_id": guid,
                "source": "OSM Calendar"
            }
        }

    except Exception as e:
        logger.error(f"Error creating event from item: {e}")
        return None


def event_exists(db, properties):
    """
    Check if an event with the same properties already exists in the database.

    An event counts as existing when a stored event has the same external_id,
    or the same label, start and stop.

    Args:
        db: Database connection.
        properties: Event properties.

    Returns:
        bool: True if the event exists, False otherwise. False is also
            returned (and the error logged) when the lookup itself fails.
    """
    try:
        cur = db.cursor()

        # Check if an event with the same external_id exists
        external_id = properties.get('external_id')
        if external_id:
            cur.execute("""
                SELECT events_id FROM events
                WHERE events_tags->>'external_id' = %s;
            """, (external_id,))
            if cur.fetchone():
                logger.info(f"Event with external_id {properties['external_id']} already exists")
                return True

        # Check if an event with the same label, start, and stop exists
        cur.execute("""
            SELECT events_id FROM events
            WHERE events_tags->>'label' = %s
              AND events_tags->>'start' = %s
              AND events_tags->>'stop' = %s;
        """, (
            properties.get('label', ''),
            properties.get('start', ''),
            properties.get('stop', '')
        ))
        if cur.fetchone():
            logger.info(f"Event with label '{properties.get('label')}' and same dates already exists")
            return True

        return False

    except Exception as e:
        logger.error(f"Error checking if event exists: {e}")
        return False


def _ensure_geometry(cur, geometry, label):
    """Make sure *geometry* (a GeoJSON string) has a row in the geo table and
    return its hash, or None (with the reason logged) if that is not possible."""
    # Insert the geometry into the geo table
    cur.execute("""
        INSERT INTO geo
        SELECT geom, md5(st_astext(geom)) as hash, st_centroid(geom) as geom_center
        FROM (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g
        WHERE ST_IsValid(geom)
        ON CONFLICT DO NOTHING
        RETURNING hash;
    """, (geometry,))
    inserted = cur.fetchone()
    if inserted is not None:
        return inserted[0]

    # Nothing was returned: either the geometry is already stored (conflict)
    # or it was filtered out as invalid. Look for an existing row first.
    cur.execute("""
        SELECT hash FROM geo
        WHERE hash = md5(st_astext(st_setsrid(st_geomfromgeojson(%s),4326)));
    """, (geometry,))
    existing = cur.fetchone()
    if existing:
        logger.info(f"Using existing geometry with hash: {existing[0]}")
        return existing[0]

    # Not stored yet: compute the hash and find out whether the geometry is valid.
    cur.execute("""
        SELECT md5(st_astext(geom)) as hash, ST_IsValid(geom), ST_IsValidReason(geom)
        from (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g;
    """, (geometry,))
    validity = cur.fetchone()
    if validity is None or not validity[1]:
        logger.error(f"Invalid geometry for event: {label}")
        if validity and len(validity) > 2:
            logger.error(f"Reason: {validity[2]}")
        return None

    geo_hash = validity[0]

    # Now insert the geometry explicitly
    cur.execute("""
        INSERT INTO geo (geom, hash, geom_center)
        VALUES (
            st_setsrid(st_geomfromgeojson(%s),4326),
            %s,
            st_centroid(st_setsrid(st_geomfromgeojson(%s),4326))
        )
        ON CONFLICT (hash) DO NOTHING;
    """, (geometry, geo_hash, geometry))

    # Verify the geometry was inserted
    cur.execute("SELECT 1 FROM geo WHERE hash = %s", (geo_hash,))
    if cur.fetchone() is None:
        logger.error(f"Failed to insert geometry with hash: {geo_hash}")
        return None

    logger.info(f"Inserted new geometry with hash: {geo_hash}")
    return geo_hash


def submit_event(event):
    """
    Submit an event to the OpenEventDatabase.

    A new database connection is opened for the call and always closed again.
    Changes are committed only when the event row was actually created.

    Args:
        event: A GeoJSON Feature representing the event.

    Returns:
        bool: True if the event was successfully submitted, False otherwise
            (already present, invalid geometry, insert conflict, or any error;
            errors are logged, not raised).
    """
    db = None
    try:
        # Connect to the database
        db = db_connect()

        # Extract event properties
        properties = event['properties']

        # Check if the event already exists
        if event_exists(db, properties):
            logger.info(f"Skipping event '{properties.get('label')}' as it already exists")
            return False

        cur = db.cursor()
        geometry = json.dumps(event['geometry'])

        geo_hash = _ensure_geometry(cur, geometry, properties.get('label'))
        if geo_hash is None:
            return False

        # Determine the bounds for the time range: a zero-length range must be
        # inclusive on both ends, otherwise it would be empty.
        bounds = '[]' if properties['start'] == properties['stop'] else '[)'

        # Insert the event into the database
        cur.execute("""
            INSERT INTO events (events_type, events_what, events_when, events_tags, events_geo)
            VALUES (%s, %s, tstzrange(%s, %s, %s), %s, %s)
            ON CONFLICT DO NOTHING
            RETURNING events_id;
        """, (
            properties['type'],
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            json.dumps(properties),
            geo_hash
        ))

        # Get the event ID
        event_id = cur.fetchone()
        if not event_id:
            logger.warning(f"Failed to create event: {properties.get('label')}")
            return False

        # Commit before reporting success so a failed commit is never logged
        # as a created event.
        db.commit()
        logger.success(f"Event created with ID: {event_id[0]}")
        return True

    except Exception as e:
        logger.error(f"Error submitting event: {e}")
        return False

    finally:
        # Release the connection on every path, including errors.
        if db is not None:
            try:
                db.close()
            except Exception as e:
                logger.error(f"Error closing database connection: {e}")


def main():
    """
    Main function to fetch OSM Calendar events and add them to the database.

    The function will exit if the .env file doesn't exist, as it's required
    for database connection parameters.
    """
    logger.info("Starting OSM Calendar extractor")

    # Load environment variables from .env file and check if it exists
    if not load_env_from_file():
        logger.error("Required .env file not found. Exiting.")
        sys.exit(1)

    logger.info("Environment variables loaded successfully from .env file")

    # Fetch events from the OSM Calendar RSS feed
    items = fetch_osm_calendar_data()

    if not items:
        logger.warning("No events found, exiting")
        return

    # Process each item
    success_count = 0
    for item in items:
        # Create an event from the item
        event = create_event(item)
        if not event:
            continue

        # Submit the event to the database
        if submit_event(event):
            success_count += 1

    logger.success(f"Successfully added {success_count} out of {len(items)} events to the database")


if __name__ == "__main__":
    main()