#!/usr/bin/env python3
"""
OSM Calendar Extractor for the OpenEventDatabase.

This script discovers events on the OpenStreetMap Calendar (osmcal.org)
and adds them to the OpenEventDatabase via its HTTP API.

Workflow:
    1. Scrape the osmcal.org main page for event links.
    2. Skip links already recorded as successfully inserted in the local
       JSON cache (osm_cal_cache.json, stored next to this script).
    3. For each remaining link, build a GeoJSON Feature from the matching
       RSS item or, if the RSS feed has no such item, from the iCal
       export of the event.
    4. POST the Feature to the API and record the outcome in the cache.

For events that don't have geographic coordinates in the RSS feed but have
a link to an OSM Calendar event (https://osmcal.org/event/...), the script
fetches the iCal version of the event and extracts the coordinates and
location from there.

RSS Feed URL: https://osmcal.org/events.rss
API Endpoint: https://api.openeventdatabase.org/event

Usage:
    python osm_cal.py [--max-events MAX_EVENTS] [--offset OFFSET]

Arguments:
    --max-events MAX_EVENTS  Maximum number of events to insert (default: 1)
    --offset OFFSET          Number of not-yet-inserted events to skip
                             (default: 0)

Examples:
    # Insert the first pending event
    python osm_cal.py

    # Insert up to 5 pending events
    python osm_cal.py --max-events 5

    # Skip the first 3 pending events and insert the next 2
    python osm_cal.py --offset 3 --max-events 2

Environment Variables:
    main() requires that load_env_from_file() succeeds (a .env file in the
    project root directory) and exits with status 1 otherwise.
"""

import html
import json
import os
import re
import sys
import traceback
import unicodedata
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from oedb.utils.db import db_connect, load_env_from_file
from oedb.utils.logging import logger

# RSS Feed URL for OSM Calendar
RSS_URL = "https://osmcal.org/events.rss"
# Base URL for OSM Calendar events
OSMCAL_EVENT_BASE_URL = "https://osmcal.org/event/"
# Main OSM Calendar page
OSMCAL_MAIN_URL = "https://osmcal.org"
# API endpoint of the OpenEventDatabase
API_URL = "https://api.openeventdatabase.org/event"
# Cache file for processed events
CACHE_FILE = os.path.join(os.path.dirname(__file__), 'osm_cal_cache.json')
# Timeout (seconds) applied to every HTTP request; without one, requests
# waits forever on an unresponsive server.
REQUEST_TIMEOUT = 30

# English month names (lower-cased) mapped to their month number.
_MONTH_NUMBERS = {
    'january': 1, 'february': 2, 'march': 3, 'april': 4,
    'may': 5, 'june': 6, 'july': 7, 'august': 8,
    'september': 9, 'october': 10, 'november': 11, 'december': 12,
}

# Date line of an RSS description. Groups: 1 = start day, 2 = start month,
# 3-6 = optional start/end time, 7 = optional text in parentheses,
# 8 = optional end day, 9 = optional end month.
_DATE_PATTERN = re.compile(
    r'(\d+)(?:st|nd|rd|th)\s+(\w+)(?:\s+(\d+):(\d+)(?:\s+–\s+(\d+):(\d+))?)?(?:\s+\(([^)]+)\))?(?:\s+–\s+(\d+)(?:st|nd|rd|th)\s+(\w+))?'
)


def fix_encoding(text):
    """
    Repair a common UTF-8 encoding problem (mojibake).

    Text that was UTF-8 but got decoded as Latin-1 typically contains the
    character 'Ã'. When that marker is present, the text is re-encoded as
    Latin-1 and decoded as UTF-8.

    Args:
        text (str): Possibly mis-encoded text.

    Returns:
        str: The repaired text. If the marker is absent (or the text is
        empty/None) the input is returned unchanged. If the repair fails,
        the NFKD-normalized form of the input is returned.
    """
    if not text or 'Ã' not in text:
        return text

    try:
        corrected = text.encode('latin-1').decode('utf-8')
    except (UnicodeEncodeError, UnicodeDecodeError):
        # The 'Ã' was legitimate (or the damage is of another kind):
        # fall back to Unicode normalization, as the script always did.
        return unicodedata.normalize('NFKD', text)

    logger.info(f"Encodage corrigé : '{text}' -> '{corrected}'")
    return corrected


def load_event_cache():
    """
    Load the cache of processed events from the JSON cache file.

    Returns:
        dict: Mapping of event URL to its processing record. An empty dict
        is returned when the file is missing, unreadable, not valid JSON,
        or does not contain a JSON object.
    """
    if not os.path.exists(CACHE_FILE):
        logger.info("Aucun cache trouvé, création d'un nouveau cache")
        return {}

    try:
        with open(CACHE_FILE, 'r', encoding='utf-8') as f:
            cache = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        logger.error(f"Erreur lors du chargement du cache : {e}")
        return {}

    if not isinstance(cache, dict):
        # The rest of the script calls dict methods on the cache; any other
        # JSON payload would crash later with an obscure error.
        logger.error("Erreur lors du chargement du cache : le contenu n'est pas un objet JSON")
        return {}

    logger.info(f"Cache chargé : {len(cache)} événements en cache")
    return cache


def save_event_cache(cache):
    """
    Save the event cache to the JSON cache file.

    The data is written to a temporary file first and then moved over the
    real cache file, so an interruption during the write cannot leave a
    truncated cache behind. Failures are logged, not raised.

    Args:
        cache (dict): Mapping of event URL to its processing record.
    """
    tmp_file = f"{CACHE_FILE}.tmp"
    try:
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(cache, f, indent=2, ensure_ascii=False)
        os.replace(tmp_file, CACHE_FILE)
    except (OSError, TypeError, ValueError) as e:
        logger.error(f"Erreur lors de la sauvegarde du cache : {e}")
        return

    logger.info(f"Cache sauvegardé : {len(cache)} événements")


def _save_debug_html(page_html):
    """Dump the scraped page next to this script for manual inspection."""
    debug_file = os.path.join(os.path.dirname(__file__), 'osmcal_debug.html')
    try:
        with open(debug_file, 'w', encoding='utf-8') as f:
            f.write(page_html)
    except OSError as e:
        # The dump is only a debugging aid: failing to write it must not
        # abort the scraping itself.
        logger.warning(f"Impossible de sauvegarder le HTML de débogage : {e}")
        return
    logger.info(f"HTML de débogage sauvegardé dans : {debug_file}")


def _extract_href(element):
    """Return the href of *element*, or of its first nested <a>, or None."""
    if element.name == 'a':
        return element.get('href') or None
    link_element = element.find('a')
    if link_element is not None:
        return link_element.get('href') or None
    return None


def _log_links_for_debugging(soup):
    """Log a sample of the links found on the page when no event link matched."""
    logger.warning("Aucun lien d'événement trouvé. Listing de tous les liens pour débogage :")
    all_links = soup.find_all('a', href=True)
    logger.info(f"Total de liens trouvés sur la page : {len(all_links)}")

    # Show the first 10 links
    for i, link in enumerate(all_links[:10]):
        logger.info(f"Lien {i+1}: {link.get('href')} (classes: {link.get('class', [])})")

    # Look specifically for links containing "event"
    event_related_links = [link for link in all_links if 'event' in link.get('href', '').lower()]
    logger.info(f"Liens contenant 'event' : {len(event_related_links)}")
    for link in event_related_links[:5]:
        logger.info(f"Lien event: {link.get('href')}")


def scrape_osmcal_event_links():
    """
    Scrape the osmcal.org main page and extract all event links.

    Several CSS selectors are tried in order; the first one that yields at
    least one event link wins. As a side effect the fetched HTML is saved
    to osmcal_debug.html next to this script.

    Returns:
        list: Unique absolute event URLs in page order; an empty list on
        any error.
    """
    logger.info(f"Scraping de la page principale : {OSMCAL_MAIN_URL}")

    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        }
        response = requests.get(OSMCAL_MAIN_URL, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()

        soup = BeautifulSoup(response.content, 'html.parser')
        _save_debug_html(response.text)

        selectors_to_try = [
            'a.event-list-entry-box',   # Main selector, based on the sample HTML
            'li.event-list-entry a',    # Alternative based on the list structure
            '.event-list-entry a',      # Same, without requiring the li tag
            'a[href*="/event/"]',       # Any link containing "/event/"
            '.event-list-entry-box',    # In case it is only the class
        ]

        event_links = []
        seen = set()
        for selector in selectors_to_try:
            logger.info(f"Essai du sélecteur : {selector}")
            elements = soup.select(selector)
            logger.info(f"Trouvé {len(elements)} éléments avec le sélecteur {selector}")

            for element in elements:
                href = _extract_href(element)
                # Keep only links that point to an event
                if not href or '/event/' not in href:
                    continue

                # Resolve relative links against the site root
                full_url = urljoin(OSMCAL_MAIN_URL, href)
                if full_url in seen:
                    continue
                seen.add(full_url)
                event_links.append(full_url)
                logger.info(f"Lien d'événement trouvé : {full_url}")

            # Stop at the first selector that produced links
            if event_links:
                break

        if not event_links:
            _log_links_for_debugging(soup)

        logger.success(f"Trouvé {len(event_links)} liens d'événements uniques sur la page principale")
        return event_links

    except requests.exceptions.RequestException as e:
        logger.error(f"Erreur lors du scraping de osmcal.org : {e}")
        return []
    except Exception as e:
        logger.error(f"Erreur inattendue lors du scraping : {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return []


def fetch_osm_calendar_data():
    """
    Fetch events from the OSM Calendar RSS feed.

    Returns:
        list: The <item> elements of the RSS channel; an empty list when
        the request fails, the XML cannot be parsed, or no item is found.
    """
    logger.info("Fetching data from OSM Calendar RSS feed")

    try:
        response = requests.get(RSS_URL, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()  # Raise an exception for HTTP errors

        # Parse the XML response
        root = ET.fromstring(response.content)

        # Find all item elements (events)
        channel = root.find('channel')
        if channel is None:
            logger.error("No channel element found in RSS feed")
            return []

        items = channel.findall('item')
        if not items:
            logger.error("No items found in RSS feed")
            return []

        logger.success(f"Successfully fetched {len(items)} events from OSM Calendar RSS feed")
        return items

    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data from OSM Calendar RSS feed: {e}")
        return []
    except ET.ParseError as e:
        logger.error(f"Error parsing XML response: {e}")
        return []
    except Exception as e:
        logger.error(f"Unexpected error fetching OSM Calendar data: {e}")
        return []


def _default_date_range():
    """Return (now, now + 1 day) as ISO strings, the fallback date range."""
    now = datetime.now()
    return (now.isoformat(), (now + timedelta(days=1)).isoformat())


def parse_event_dates(description):
    """
    Parse event dates from the description.

    Only the day and month are used: the year is assumed to be the current
    year and any time of day in the description is ignored, so the start
    is midnight of the start day. The stop is midnight after the end day
    (or after the start day when no end day is given).

    NOTE(review): an event dated in January while the script runs in
    December is placed in the current (past) year; confirm whether a year
    rollover is wanted.

    Args:
        description (str): The event description HTML.

    Returns:
        tuple: (start_date, end_date) as ISO format strings. This function
        never raises: when nothing can be parsed it returns the current
        date and the current date + 1 day.
    """
    try:
        # A second, narrower "single day with time range" pattern used to be
        # tried here as a fallback; it could never match when this one did
        # not, so it was removed.
        date_match = _DATE_PATTERN.search(description)
        if not date_match:
            start_iso, end_iso = _default_date_range()
            logger.warning(f"Could not parse date from description, using current date: {start_iso} to {end_iso}")
            return (start_iso, end_iso)

        day = int(date_match.group(1))
        month_name = date_match.group(2)

        # Month names are matched case-insensitively
        month = _MONTH_NUMBERS.get(month_name.lower())
        if month is None:
            month = datetime.now().month
            logger.warning(f"Could not parse month name: {month_name}, using current month")

        # Assume events are in the current year
        current_year = datetime.now().year

        try:
            start_date = datetime(current_year, month, day)
        except (ValueError, OverflowError):
            # Invalid dates (e.g. February 30) or absurdly large day numbers
            logger.warning(f"Invalid date: {day} {month_name} {current_year}, using current date")
            start_date = datetime.now()

        end_day_text = date_match.group(8)
        if end_day_text:
            end_day = int(end_day_text)
            end_month_name = date_match.group(9)

            end_month = _MONTH_NUMBERS.get(end_month_name.lower())
            if end_month is None:
                end_month = month
                logger.warning(f"Could not parse end month name: {end_month_name}, using start month")

            try:
                # Add a day to include the full end day
                end_date = datetime(current_year, end_month, end_day) + timedelta(days=1)
            except (ValueError, OverflowError):
                logger.warning(f"Invalid end date: {end_day} {end_month_name} {current_year}, using start date + 1 day")
                end_date = start_date + timedelta(days=1)
        else:
            # No end date: the event lasts one day
            end_date = start_date + timedelta(days=1)

        return (start_date.isoformat(), end_date.isoformat())

    except Exception as e:
        # Deliberate best-effort: callers rely on always getting a date
        # range back (e.g. when description is None).
        logger.error(f"Error parsing event dates: {e}")
        return _default_date_range()


def _unescape_ical_text(value):
    """Undo iCal TEXT escaping: '\\,' -> ',', '\\;' -> ';', '\\n' -> space."""
    def _replace(match):
        char = match.group(1)
        # "\n" / "\N" is an escaped line break, not the letter n
        return ' ' if char in 'nN' else char

    return re.sub(r'\\(.)', _replace, value)


def fetch_ical_data(event_url):
    """
    Fetch and parse iCal data for an OSM Calendar event.

    Args:
        event_url (str): The URL of the OSM Calendar event.

    Returns:
        tuple: (location_name, coordinates) where coordinates is a GeoJSON
        position [longitude, latitude]. ("Unknown Location", [0, 0]) is
        returned when the URL is not an osmcal event URL or the iCal file
        cannot be fetched; each element individually falls back to its
        default when the corresponding property is absent.
    """
    try:
        # Check if the URL is an OSM Calendar event URL
        if not event_url.startswith(OSMCAL_EVENT_BASE_URL):
            logger.warning(f"Not an OSM Calendar event URL: {event_url}")
            return ("Unknown Location", [0, 0])

        # Extract the event ID from the URL
        event_id_match = re.search(r'event/(\d+)', event_url)
        if not event_id_match:
            logger.warning(f"Could not extract event ID from URL: {event_url}")
            return ("Unknown Location", [0, 0])
        event_id = event_id_match.group(1)

        # Construct the iCal URL
        ical_url = f"{OSMCAL_EVENT_BASE_URL}{event_id}.ics"

        # Fetch the iCal content
        logger.info(f"Fetching iCal data from: {ical_url}")
        response = requests.get(ical_url, timeout=REQUEST_TIMEOUT)
        if not response.ok:
            logger.warning(f"Failed to fetch iCal data: {response.status_code}")
            return ("Unknown Location", [0, 0])

        # iCalendar is UTF-8 by default; only guess the encoding when the
        # payload is not valid UTF-8 (guessing first can produce mojibake).
        try:
            ical_content = response.content.decode('utf-8')
        except UnicodeDecodeError:
            response.encoding = response.apparent_encoding or 'utf-8'
            ical_content = response.text

        # Unfold continuation lines (a line break followed by a space or
        # tab continues the previous line), then normalize line endings so
        # that ^ and $ work on every property line.
        ical_content = re.sub(r'(?:\r\n|\n|\r)[ \t]', '', ical_content)
        ical_content = ical_content.replace('\r\n', '\n').replace('\r', '\n')

        # Extract GEO information
        geo_match = re.search(
            r'^GEO:([-+]?\d+(?:\.\d+)?);([-+]?\d+(?:\.\d+)?)',
            ical_content,
            re.MULTILINE,
        )
        if geo_match:
            # GEO format is latitude;longitude
            latitude = float(geo_match.group(1))
            longitude = float(geo_match.group(2))
            coordinates = [longitude, latitude]  # GeoJSON uses [longitude, latitude]
            logger.info(f"Extracted coordinates from iCal: {coordinates}")
        else:
            logger.warning(f"No GEO information found in iCal data for event: {event_id}")
            coordinates = [0, 0]

        # Extract LOCATION information
        location_match = re.search(r'^LOCATION:(.+)$', ical_content, re.MULTILINE)
        if location_match:
            location_name = _unescape_ical_text(location_match.group(1).strip())
            location_name = fix_encoding(location_name)
            logger.info(f"Extracted location from iCal: {location_name}")
        else:
            logger.warning(f"No LOCATION information found in iCal data for event: {event_id}")
            location_name = "Unknown Location"

        return (location_name, coordinates)

    except Exception as e:
        logger.error(f"Error fetching or parsing iCal data: {e}")
        return ("Unknown Location", [0, 0])


def extract_location(description):
    """
    Extract location information from the event description.

    Args:
        description (str): The event description HTML.

    Returns:
        tuple: (location_name, coordinates). The coordinates are always
        the placeholder [0, 0]: the description carries no coordinates and
        no geocoding is performed here.
    """
    try:
        # Placeholder coordinates: nothing in the description provides them
        coordinates = [0, 0]
        location_name = "Unknown Location"

        # NOTE(review): the pattern literal had lost its markup in the
        # source; it is reconstructed as "text of a <p> element" from the
        # accompanying "second paragraph" comment - confirm against the
        # actual RSS payload.
        location_pattern = r'<p>([^<]+)</p>'
        location_matches = re.findall(location_pattern, description)

        if len(location_matches) > 1:
            # The second paragraph often contains the location
            location_candidate = location_matches[1].strip()
            if location_candidate and "," in location_candidate:
                location_name = fix_encoding(location_candidate)

        return (location_name, coordinates)

    except Exception as e:
        logger.error(f"Error extracting location: {e}")
        return ("Unknown Location", [0, 0])


def create_event(item):
    """
    Create an event object from an RSS item.

    Args:
        item: An item element from the RSS feed.

    Returns:
        dict: A GeoJSON Feature representing the event, or None when the
        item has no title or link, or when an unexpected error occurs.
    """
    try:
        # Extract data from the item
        title = item.findtext('title')
        link = item.findtext('link')
        if not title or not link:
            logger.error("Error creating event from item: missing title or link")
            return None

        # A missing description or guid must not drop the whole event
        description = item.findtext('description') or ''
        guid = item.findtext('guid') or link

        # Clean up the description (remove HTML tags for text extraction)
        clean_description = re.sub(r'<[^>]+>', ' ', description)
        clean_description = html.unescape(clean_description)
        clean_description = re.sub(r'\s+', ' ', clean_description).strip()

        # Repair the encoding of the title and the description
        title = fix_encoding(title)
        clean_description = fix_encoding(clean_description)

        # Parse dates from the description
        start_date, end_date = parse_event_dates(description)

        # Extract location information from the description
        location_name, coordinates = extract_location(description)

        # If we don't have coordinates and the link is to an OSM Calendar event,
        # try to get coordinates and location from the iCal file
        if coordinates == [0, 0] and link.startswith(OSMCAL_EVENT_BASE_URL):
            logger.info(f"No coordinates found in description, trying to get from iCal: {link}")
            ical_location_name, ical_coordinates = fetch_ical_data(link)

            # Use iCal coordinates if available
            if ical_coordinates != [0, 0]:
                coordinates = ical_coordinates
                logger.info(f"Using coordinates from iCal: {coordinates}")

            # Use iCal location name if available and better than what we have
            if ical_location_name != "Unknown Location":
                location_name = ical_location_name
                logger.info(f"Using location name from iCal: {location_name}")

        # Create the event object
        return {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": coordinates
            },
            "properties": {
                "type": "scheduled",
                "what": "community.osm.event",
                "what:series": "OpenStreetMap Calendar",
                "where": location_name,
                "label": title,
                "description": clean_description,
                "start": start_date,
                "stop": end_date,
                "url": link,
                "external_id": guid,
                "source": "OSM Calendar"
            }
        }

    except Exception as e:
        logger.error(f"Error creating event from item: {e}")
        return None


def event_exists(db, properties):
    """
    Check if an event with the same properties already exists in the database.

    An event is considered a duplicate when a row has the same external_id
    or the same label, start and stop.

    Args:
        db: Database connection (must provide a DB-API style cursor()).
        properties: Event properties.

    Returns:
        bool: True if the event exists, False otherwise (including when
        the check itself fails).
    """
    logger.info(f"Checking for existing event: {properties.get('label')}")
    try:
        cur = db.cursor()
        try:
            # Check if an event with the same external_id exists
            if 'external_id' in properties:
                cur.execute("""
                    SELECT events_id FROM events
                    WHERE events_tags->>'external_id' = %s;
                """, (properties['external_id'],))
                if cur.fetchone():
                    logger.info(f"Event with external_id {properties['external_id']} already exists")
                    return True

            # Check if an event with the same label, start, and stop exists
            cur.execute("""
                SELECT events_id FROM events
                WHERE events_tags->>'label' = %s
                AND events_tags->>'start' = %s
                AND events_tags->>'stop' = %s;
            """, (
                properties.get('label', ''),
                properties.get('start', ''),
                properties.get('stop', '')
            ))
            if cur.fetchone():
                logger.info(f"Event with label '{properties.get('label')}' and same dates already exists")
                return True

            return False
        finally:
            # Release the cursor on every path
            cur.close()

    except Exception as e:
        logger.error(f"Error checking if event exists: {e}")
        return False


def submit_event(event):
    """
    Submit an event to the OpenEventDatabase using the API.

    HTTP 200/201 (created) and 409 (already exists) both count as success.

    Args:
        event: A GeoJSON Feature representing the event.

    Returns:
        bool: True if the event was successfully submitted, False otherwise.
    """
    try:
        # Extract event properties for logging
        properties = event['properties']

        # Make the API request
        logger.info(f"Submitting event '{properties.get('label')}' to API")
        response = requests.post(
            API_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(event),
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code in (200, 201):
            # The event is created at this point; an unparsable body must
            # not turn the success into a failure (that would make the
            # caller re-submit the event on the next run).
            try:
                response_data = response.json()
            except ValueError:
                response_data = None

            event_id = response_data.get('id') if isinstance(response_data, dict) else None
            if event_id:
                logger.success(f"Event created with ID: {event_id}")
                logger.info(f" https://api.openeventdatabase.org/event/{event_id}")
            else:
                logger.warning("Event created but no ID returned in response")
            return True

        if response.status_code == 409:
            # 409 Conflict - the event already exists, treated as a success
            logger.success(f"Event already exists in database: {properties.get('label')} (HTTP 409)")
            return True

        logger.warning(f"Failed to create event: {properties.get('label')}. Status code: {response.status_code}")
        logger.warning(f"Response: {response.text}")
        return False

    except Exception as e:
        logger.error(f"Error submitting event: {e}")
        return False


def _normalize_link(link):
    """Return *link* stripped of surrounding whitespace and trailing slashes."""
    return link.strip().rstrip('/') if link else ''


def main(max_events=1, offset=0):
    """
    Main function to fetch OSM Calendar events and add them to the
    OpenEventDatabase API.

    Event links are scraped from the osmcal.org main page; links already
    marked 'success' in the cache are skipped. The remaining links are
    sliced with offset/max_events, turned into events (from the RSS feed
    when the link is listed there, from the iCal export otherwise) and
    submitted. The cache is saved even if processing is interrupted.

    Args:
        max_events (int): Maximum number of events to insert (default: 1)
        offset (int): Number of not-yet-inserted events to skip (default: 0)

    The function exits with status 1 if load_env_from_file() reports that
    the .env file could not be loaded.
    """
    logger.info(f"Starting OSM Calendar extractor (max_events={max_events}, offset={offset})")

    # Load environment variables from .env file and check if it exists
    if not load_env_from_file():
        logger.error("Required .env file not found. Exiting.")
        sys.exit(1)
    logger.info("Environment variables loaded successfully from .env file")

    # Load the cache of processed events
    event_cache = load_event_cache()

    # Scrape the main page to get every event link
    event_links = scrape_osmcal_event_links()
    if not event_links:
        logger.warning("Aucun lien d'événement trouvé sur la page principale")
        return

    # Split links into already-inserted ones and ones still to process
    new_events = []
    success_events = []
    for link in event_links:
        if link in event_cache and event_cache[link].get('status') == 'success':
            success_events.append(link)
            logger.info(f"Événement déjà traité avec succès, ignoré : {link}")
            continue

        new_events.append(link)
        if link not in event_cache:
            # First time this link is seen: create its cache record
            event_cache[link] = {
                'discovered_at': datetime.now().isoformat(),
                'status': 'pending',
                'attempts': 0
            }
        else:
            # Log the current status of links that will be retried
            current_status = event_cache[link].get('status', 'unknown')
            attempts = event_cache[link].get('attempts', 0)
            logger.info(f"Événement à retraiter (statut: {current_status}, tentatives: {attempts}) : {link}")

    logger.info(f"Liens d'événements trouvés : {len(event_links)}")
    logger.info(f"Événements déjà traités avec succès : {len(success_events)}")
    logger.info(f"Nouveaux événements à traiter : {len(new_events)}")

    if not new_events:
        logger.success("Aucun nouvel événement à traiter. Tous les événements ont déjà été insérés avec succès.")
        return

    # Apply the offset and the limit to the events still to process
    if offset >= len(new_events):
        logger.warning(f"Offset {offset} est supérieur ou égal au nombre de nouveaux événements {len(new_events)}")
        return

    events_to_process = new_events[offset:offset + max_events]
    logger.info(f"Traitement de {len(events_to_process)} nouveaux événements")

    # Fetch events from the OSM Calendar RSS feed to get their details
    rss_items = fetch_osm_calendar_data()
    if not rss_items:
        logger.warning("Aucun événement trouvé dans le flux RSS, mais continuons avec les liens scrapés")

    # Map RSS links to their items. Links are normalized so that a mere
    # trailing-slash difference between the feed and the scraped page does
    # not prevent the match.
    rss_link_to_item = {}
    for item in rss_items:
        rss_link = _normalize_link(item.findtext('link'))
        if rss_link:
            rss_link_to_item[rss_link] = item

    # Process each new event; the cache is saved even if the loop is
    # interrupted, so recorded outcomes are not lost.
    success_count = 0
    try:
        for event_link in events_to_process:
            try:
                # Extra safety: skip anything already marked as success
                if event_cache.get(event_link, {}).get('status') == 'success':
                    logger.info(f"Événement déjà en succès, passage au suivant : {event_link}")
                    success_count += 1  # Already inserted, so count it as a success
                    continue

                record = event_cache[event_link]
                record['attempts'] = record.get('attempts', 0) + 1
                record['last_attempt'] = datetime.now().isoformat()

                # Look for the matching item in the RSS feed
                rss_item = rss_link_to_item.get(_normalize_link(event_link))
                if rss_item is not None:
                    event = create_event(rss_item)
                else:
                    # Not in the RSS feed: build a minimal event from the link
                    logger.warning(f"Événement {event_link} non trouvé dans le flux RSS, tentative de création depuis le lien")
                    event = create_event_from_link(event_link)

                if not event:
                    record['status'] = 'failed'
                    logger.error(f"Impossible de créer l'événement depuis : {event_link}")
                elif submit_event(event):
                    success_count += 1
                    record['status'] = 'success'
                    record['inserted_at'] = datetime.now().isoformat()
                    # Drop the error text of an earlier failed attempt
                    record.pop('error', None)
                    logger.success(f"Événement inséré avec succès : {event_link}")
                else:
                    record['status'] = 'failed'
                    logger.warning(f"Échec de l'insertion de l'événement : {event_link}")

            except Exception as e:
                logger.error(f"Erreur lors du traitement de l'événement {event_link} : {e}")
                event_cache[event_link]['status'] = 'error'
                event_cache[event_link]['error'] = str(e)
    finally:
        # Save the updated cache
        save_event_cache(event_cache)

    # Compute the final cache statistics
    cache_stats = {
        'success': 0,
        'pending': 0,
        'failed': 0,
        'error': 0,
        'total': len(event_cache)
    }
    for data in event_cache.values():
        status = data.get('status', 'pending')
        if status in cache_stats:
            cache_stats[status] += 1

    # Events still awaiting insertion (everything except success)
    events_awaiting_insertion = cache_stats['pending'] + cache_stats['failed'] + cache_stats['error']

    logger.success(f"Traitement terminé : {success_count} événements insérés avec succès sur {len(events_to_process)} traités")
    logger.info("=== STATISTIQUES GLOBALES DU CACHE ===")
    logger.info(f"Total d'événements dans le cache : {cache_stats['total']}")
    logger.info(f"Événements traités avec succès : {cache_stats['success']}")
    logger.info(f"Événements en attente d'insertion : {events_awaiting_insertion}")
    logger.info(f" - Statut 'pending' : {cache_stats['pending']}")
    logger.info(f" - Statut 'failed' : {cache_stats['failed']}")
    logger.info(f" - Statut 'error' : {cache_stats['error']}")

    if events_awaiting_insertion > 0:
        logger.info(f"🔄 Il reste {events_awaiting_insertion} événements à traiter lors de la prochaine exécution")
    else:
        logger.success("✅ Tous les événements découverts ont été traités avec succès")


def create_event_from_link(event_link):
    """
    Create a minimal event from an osmcal.org link when the event is not
    available in the RSS feed.

    Location and coordinates come from the event's iCal export. The label
    and description are generic, and start/stop are set to the current
    time and the current time + 1 day.

    Args:
        event_link (str): URL of the osmcal.org event.

    Returns:
        dict: A GeoJSON Feature representing the event, or None when the
        link is not an osmcal event URL or an error occurs.
    """
    try:
        logger.info(f"Tentative de création d'événement depuis le lien : {event_link}")

        if not event_link.startswith(OSMCAL_EVENT_BASE_URL):
            logger.warning(f"Lien non reconnu comme un événement OSM Calendar : {event_link}")
            return None

        location_name, coordinates = fetch_ical_data(event_link)

        # Derive the external id from the numeric event id when there is one
        event_id_match = re.search(r'event/(\d+)', event_link)
        event_id = event_id_match.group(1) if event_id_match else None
        external_id = f"osmcal_{event_id}" if event_id else event_link

        # Build an event from the minimal information available
        now = datetime.now()
        return {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": coordinates
            },
            "properties": {
                "type": "scheduled",
                "what": "community.osm.event",
                "what:series": "OpenStreetMap Calendar",
                "where": location_name,
                "label": f"Événement OSM Calendar {event_id if event_id else 'inconnu'}",
                "description": f"Événement trouvé sur osmcal.org : {event_link}",
                "start": now.isoformat(),
                "stop": (now + timedelta(days=1)).isoformat(),
                "url": event_link,
                "external_id": external_id,
                "source": "OSM Calendar (scraped)"
            }
        }

    except Exception as e:
        logger.error(f"Erreur lors de la création d'événement depuis le lien {event_link} : {e}")
        return None


if __name__ == "__main__":
    import argparse

    # Set up command line argument parsing
    parser = argparse.ArgumentParser(description='OSM Calendar Extractor for the OpenEventDatabase')
    parser.add_argument('--max-events', type=int, default=1,
                        help='Maximum number of events to insert (default: 1)')
    parser.add_argument('--offset', type=int, default=0,
                        help='Number of events to skip from the beginning of the RSS feed (default: 0)')

    # Parse arguments
    args = parser.parse_args()

    # Run the main function with the provided arguments
    main(max_events=args.max_events, offset=args.offset)