oedb-backend/extractors/osm_cal.py
2025-09-18 22:14:41 +02:00

434 lines
No EOL
14 KiB
Python
Executable file
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
OSM Calendar Extractor for the OpenEventDatabase.
This script fetches events from the OpenStreetMap Calendar RSS feed
and adds them to the OpenEventDatabase if they don't already exist.
RSS Feed URL: https://osmcal.org/events.rss
"""
import json
import requests
import sys
import os
import xml.etree.ElementTree as ET
import re
import html
from datetime import datetime, timedelta
# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from oedb.utils.db import db_connect
from oedb.utils.logging import logger
# RSS Feed URL for OSM Calendar
RSS_URL = "https://osmcal.org/events.rss"
def fetch_osm_calendar_data():
"""
Fetch events from the OSM Calendar RSS feed.
Returns:
list: A list of event items from the RSS feed.
"""
logger.info("Fetching data from OSM Calendar RSS feed")
try:
response = requests.get(RSS_URL)
response.raise_for_status() # Raise an exception for HTTP errors
# Parse the XML response
root = ET.fromstring(response.content)
# Find all item elements (events)
channel = root.find('channel')
if channel is None:
logger.error("No channel element found in RSS feed")
return []
items = channel.findall('item')
if not items:
logger.error("No items found in RSS feed")
return []
logger.success(f"Successfully fetched {len(items)} events from OSM Calendar RSS feed")
return items
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching data from OSM Calendar RSS feed: {e}")
return []
except ET.ParseError as e:
logger.error(f"Error parsing XML response: {e}")
return []
except Exception as e:
logger.error(f"Unexpected error fetching OSM Calendar data: {e}")
return []
def parse_event_dates(description):
"""
Parse event dates from the description.
Args:
description (str): The event description HTML.
Returns:
tuple: A tuple containing (start_date, end_date) as ISO format strings.
"""
try:
# Extract the date information from the description
date_pattern = r'(\d+)(?:st|nd|rd|th)\s+(\w+)(?:\s+(\d+):(\d+)(?:\s+\s+(\d+):(\d+))?)?(?:\s+\(([^)]+)\))?(?:\s+\s+(\d+)(?:st|nd|rd|th)\s+(\w+))?'
date_match = re.search(date_pattern, description)
if not date_match:
# Try alternative pattern for single day with time range
date_pattern = r'(\d+)(?:st|nd|rd|th)\s+(\w+)\s+(\d+):(\d+)\s+\s+(\d+):(\d+)'
date_match = re.search(date_pattern, description)
if date_match:
# Extract date components
day = int(date_match.group(1))
month_name = date_match.group(2)
# Convert month name to month number
month_map = {
'January': 1, 'February': 2, 'March': 3, 'April': 4,
'May': 5, 'June': 6, 'July': 7, 'August': 8,
'September': 9, 'October': 10, 'November': 11, 'December': 12
}
# Try to match the month name (case insensitive)
month = None
for name, num in month_map.items():
if month_name.lower() == name.lower():
month = num
break
if month is None:
# If month name not found, use current month
month = datetime.now().month
logger.warning(f"Could not parse month name: {month_name}, using current month")
# Get current year (assuming events are current or future)
current_year = datetime.now().year
# Create start date
try:
start_date = datetime(current_year, month, day)
except ValueError:
# Handle invalid dates (e.g., February 30)
logger.warning(f"Invalid date: {day} {month_name} {current_year}, using current date")
start_date = datetime.now()
# Check if there's an end date
if len(date_match.groups()) >= 8 and date_match.group(8):
end_day = int(date_match.group(8))
end_month_name = date_match.group(9)
# Convert end month name to month number
end_month = None
for name, num in month_map.items():
if end_month_name.lower() == name.lower():
end_month = num
break
if end_month is None:
# If end month name not found, use start month
end_month = month
logger.warning(f"Could not parse end month name: {end_month_name}, using start month")
try:
end_date = datetime(current_year, end_month, end_day)
# Add a day to include the full end day
end_date = end_date + timedelta(days=1)
except ValueError:
# Handle invalid dates
logger.warning(f"Invalid end date: {end_day} {end_month_name} {current_year}, using start date + 1 day")
end_date = start_date + timedelta(days=1)
else:
# If no end date, use start date + 1 day as default
end_date = start_date + timedelta(days=1)
# Format dates as ISO strings
start_iso = start_date.isoformat()
end_iso = end_date.isoformat()
return (start_iso, end_iso)
else:
# If no date pattern found, use current date as fallback
now = datetime.now()
start_iso = now.isoformat()
end_iso = (now + timedelta(days=1)).isoformat()
logger.warning(f"Could not parse date from description, using current date: {start_iso} to {end_iso}")
return (start_iso, end_iso)
except Exception as e:
logger.error(f"Error parsing event dates: {e}")
# Return default dates (current date)
now = datetime.now()
return (now.isoformat(), (now + timedelta(days=1)).isoformat())
def extract_location(description):
"""
Extract location information from the event description.
Args:
description (str): The event description HTML.
Returns:
tuple: A tuple containing (location_name, coordinates).
"""
try:
# Default coordinates (center of the world)
coordinates = [0, 0]
location_name = "Unknown Location"
# Try to find location in the description
location_pattern = r'<p>([^<]+)</p>'
location_matches = re.findall(location_pattern, description)
if location_matches and len(location_matches) > 1:
# The second paragraph often contains the location
location_candidate = location_matches[1].strip()
if location_candidate and "," in location_candidate and not location_candidate.startswith('<'):
location_name = location_candidate
# For now, we don't have exact coordinates, so we'll use a placeholder
# In a real implementation, you might want to geocode the location
coordinates = [0, 0]
return (location_name, coordinates)
except Exception as e:
logger.error(f"Error extracting location: {e}")
return ("Unknown Location", [0, 0])
def create_event(item):
"""
Create an event object from an RSS item.
Args:
item: An item element from the RSS feed.
Returns:
dict: A GeoJSON Feature representing the event.
"""
try:
# Extract data from the item
title = item.find('title').text
link = item.find('link').text
description = item.find('description').text
guid = item.find('guid').text
# Clean up the description (remove HTML tags for text extraction)
clean_description = re.sub(r'<[^>]+>', ' ', description)
clean_description = html.unescape(clean_description)
clean_description = re.sub(r'\s+', ' ', clean_description).strip()
# Parse dates from the description
start_date, end_date = parse_event_dates(description)
# Extract location information
location_name, coordinates = extract_location(description)
# Create a descriptive label
label = title
# Create the event object
event = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": coordinates
},
"properties": {
"type": "scheduled",
"what": "community.osm.event",
"what:series": "OpenStreetMap Calendar",
"where": location_name,
"label": label,
"description": clean_description,
"start": start_date,
"stop": end_date,
"url": link,
"external_id": guid,
"source": "OSM Calendar"
}
}
return event
except Exception as e:
logger.error(f"Error creating event from item: {e}")
return None
def event_exists(db, properties):
"""
Check if an event with the same properties already exists in the database.
Args:
db: Database connection.
properties: Event properties.
Returns:
bool: True if the event exists, False otherwise.
"""
try:
cur = db.cursor()
# Check if an event with the same external_id exists
if 'external_id' in properties:
cur.execute("""
SELECT events_id FROM events
WHERE events_tags->>'external_id' = %s;
""", (properties['external_id'],))
result = cur.fetchone()
if result:
logger.info(f"Event with external_id {properties['external_id']} already exists")
return True
# Check if an event with the same label, start, and stop exists
cur.execute("""
SELECT events_id FROM events
WHERE events_tags->>'label' = %s
AND events_tags->>'start' = %s
AND events_tags->>'stop' = %s;
""", (
properties.get('label', ''),
properties.get('start', ''),
properties.get('stop', '')
))
result = cur.fetchone()
if result:
logger.info(f"Event with label '{properties.get('label')}' and same dates already exists")
return True
return False
except Exception as e:
logger.error(f"Error checking if event exists: {e}")
return False
def submit_event(event):
"""
Submit an event to the OpenEventDatabase.
Args:
event: A GeoJSON Feature representing the event.
Returns:
bool: True if the event was successfully submitted, False otherwise.
"""
try:
# Connect to the database
db = db_connect()
# Extract event properties
properties = event['properties']
# Check if the event already exists
if event_exists(db, properties):
logger.info(f"Skipping event '{properties.get('label')}' as it already exists")
db.close()
return False
cur = db.cursor()
geometry = json.dumps(event['geometry'])
# Insert the geometry into the geo table
cur.execute("""
INSERT INTO geo
SELECT geom, md5(st_astext(geom)) as hash, st_centroid(geom) as geom_center FROM
(SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g
WHERE ST_IsValid(geom)
ON CONFLICT DO NOTHING RETURNING hash;
""", (geometry,))
# Get the geometry hash
hash_result = cur.fetchone()
if hash_result is None:
# If the hash is None, get it from the database
cur.execute("""
SELECT md5(st_asewkt(geom)),
ST_IsValid(geom),
ST_IsValidReason(geom) from (SELECT st_geomfromgeojson(%s) as geom) as g;
""", (geometry,))
hash_result = cur.fetchone()
if hash_result is None or (len(hash_result) > 1 and not hash_result[1]):
logger.error(f"Invalid geometry for event: {properties.get('label')}")
db.close()
return False
geo_hash = hash_result[0]
# Determine the bounds for the time range
bounds = '[]' if properties['start'] == properties['stop'] else '[)'
# Insert the event into the database
cur.execute("""
INSERT INTO events (events_type, events_what, events_when, events_tags, events_geo)
VALUES (%s, %s, tstzrange(%s, %s, %s), %s, %s)
ON CONFLICT DO NOTHING RETURNING events_id;
""", (
properties['type'],
properties['what'],
properties['start'],
properties['stop'],
bounds,
json.dumps(properties),
geo_hash
))
# Get the event ID
event_id = cur.fetchone()
if event_id:
logger.success(f"Event created with ID: {event_id[0]}")
db.commit()
db.close()
return True
else:
logger.warning(f"Failed to create event: {properties.get('label')}")
db.close()
return False
except Exception as e:
logger.error(f"Error submitting event: {e}")
return False
def main():
"""
Main function to fetch OSM Calendar events and add them to the database.
"""
logger.info("Starting OSM Calendar extractor")
# Fetch events from the OSM Calendar RSS feed
items = fetch_osm_calendar_data()
if not items:
logger.warning("No events found, exiting")
return
# Process each item
success_count = 0
for item in items:
# Create an event from the item
event = create_event(item)
if not event:
continue
# Submit the event to the database
if submit_event(event):
success_count += 1
logger.success(f"Successfully added {success_count} out of {len(items)} events to the database")
if __name__ == "__main__":
main()