#!/usr/bin/env python3 """ Scraper pour l'agenda geek - Import des événements dans OEDB Usage: python3 agenda_geek.py --limit 10 --offset 0 Options: --limit: Nombre d'événements à traiter (défaut: 5) --offset: Nombre d'événements à ignorer (défaut: 0) --api-url: URL de l'API OEDB (défaut: https://api.openeventdatabase.org) --dry-run: Mode test sans envoi vers l'API --verbose: Mode verbeux """ import requests import argparse import re import logging from bs4 import BeautifulSoup from icalendar import Calendar from datetime import datetime, timezone from urllib.parse import urljoin, urlparse from typing import Optional, Dict, List, Tuple import time import json # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('agenda_geek_scraper.log') ] ) logger = logging.getLogger(__name__) class AgendaGeekScraper: def __init__(self, api_url: str = "https://api.openeventdatabase.org", dry_run: bool = False, page: int = 1): self.api_url = api_url.rstrip('/') self.dry_run = dry_run self.page = page self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'OEDB-AgendaGeek-Scraper/1.0 (+https://github.com/cquest/oedb)' }) def get_events_list(self) -> List[str]: """Récupère la liste des liens d'événements depuis la page principale""" url = f"https://lagendageek.com/tevents/page/{self.page}" logger.info(f"🔍 Récupération de la liste des événements depuis {url}") try: response = self.session.get(url, timeout=30) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') event_links = [] # Rechercher les liens des titres d'événements title_links = soup.find_all('a', class_='tribe-events-calendar-list__event-title-link') for link in title_links: href = link.get('href') if href: full_url = urljoin(url, href) event_links.append(full_url) logger.debug(f"📅 Événement trouvé: {link.get_text(strip=True)} - {full_url}") logger.info(f"✅ {len(event_links)} événements trouvés sur la page") return event_links except requests.RequestException as e: logger.error(f"❌ Erreur lors de la récupération de la liste: {e}") return [] def get_ical_link(self, event_url: str) -> Optional[str]: """Extrait le lien iCal depuis une page d'événement""" logger.debug(f"🔗 Recherche du lien iCal pour {event_url}") try: response = self.session.get(event_url, timeout=30) response.raise_for_status() # Le lien iCal est généralement construit en ajoutant ?ical=1 à l'URL ical_url = f"{event_url.rstrip('/')}/?ical=1" # Vérifier que le lien iCal existe ical_response = self.session.head(ical_url, timeout=10) if ical_response.status_code == 200: logger.debug(f"✅ Lien iCal trouvé: {ical_url}") return ical_url else: logger.warning(f"⚠️ Lien iCal non accessible: {ical_url} (status: {ical_response.status_code})") return None except requests.RequestException as e: logger.error(f"❌ Erreur lors de la récupération du lien iCal: {e}") return None def parse_ical(self, ical_url: str) -> Optional[Dict]: """Parse un fichier iCal et extrait les données de l'événement""" # Convertir webcal:// en https:// if ical_url.startswith('webcal://'): ical_url = ical_url.replace('webcal://', 'https://') logger.debug(f"📖 Parse du fichier iCal: {ical_url}") try: response = self.session.get(ical_url, timeout=30) response.raise_for_status() # Parser le contenu iCal cal = Calendar.from_ical(response.content) for component in cal.walk(): if component.name == "VEVENT": event_data = { 'summary': str(component.get('SUMMARY', '')), 'description': str(component.get('DESCRIPTION', '')), 'location': str(component.get('LOCATION', '')), 'dtstart': component.get('DTSTART'), 'dtend': component.get('DTEND'), 'geo': component.get('GEO'), 'url': str(component.get('URL', '')), 'uid': str(component.get('UID', '')) } logger.debug(f"📅 Événement parsé: {event_data['summary']}") return event_data logger.warning("⚠️ Aucun événement VEVENT trouvé dans le fichier iCal") return None except Exception as e: logger.error(f"❌ Erreur lors du parsing iCal: {e}") return None def geocode_address(self, address: str) -> Optional[Tuple[float, float]]: """Géocode une adresse en utilisant Nominatim""" if not address or address.strip() == '': return None logger.debug(f"🌍 Géocodage de l'adresse: {address}") try: # Utiliser Nominatim pour le géocodage geocode_url = "https://nominatim.openstreetmap.org/search" params = { 'q': address, 'format': 'json', 'limit': 1, 'countrycodes': 'fr', # Limiter à la France 'addressdetails': 1 } response = self.session.get(geocode_url, params=params, timeout=10) response.raise_for_status() results = response.json() if results: result = results[0] lat = float(result['lat']) lon = float(result['lon']) logger.debug(f"✅ Géocodage réussi: {lat}, {lon}") return (lat, lon) else: logger.warning(f"⚠️ Aucun résultat de géocodage pour: {address}") return None except Exception as e: logger.error(f"❌ Erreur lors du géocodage: {e}") return None def extract_coordinates(self, event_data: Dict) -> Optional[Tuple[float, float]]: """Extrait les coordonnées depuis les données de l'événement""" # D'abord essayer la propriété GEO if event_data.get('geo'): try: geo = event_data['geo'] logger.debug(f"🔍 Type GEO trouvé: {type(geo)} - Valeur: {geo}") # Cas 1: GEO avec paramètres latitude/longitude if hasattr(geo, 'params') and 'latitude' in geo.params and 'longitude' in geo.params: lat = float(geo.params['latitude']) lon = float(geo.params['longitude']) logger.debug(f"📍 Coordonnées GEO (params) trouvées: {lat}, {lon}") return (lat, lon) # Cas 2: GEO avec méthode to_ical elif hasattr(geo, 'to_ical'): # Format GEO standard: "latitude;longitude" geo_bytes = geo.to_ical() # Gérer le cas où c'est déjà une string ou des bytes if isinstance(geo_bytes, bytes): geo_str = geo_bytes.decode('utf-8') else: geo_str = str(geo_bytes) logger.debug(f"🔍 GEO string extrait: '{geo_str}'") parts = geo_str.split(';') if len(parts) == 2: lat = float(parts[0]) lon = float(parts[1]) logger.debug(f"📍 Coordonnées GEO parsées: {lat}, {lon}") return (lat, lon) # Cas 3: GEO est directement une string elif isinstance(geo, str): logger.debug(f"🔍 GEO est une string directe: '{geo}'") parts = geo.split(';') if len(parts) == 2: lat = float(parts[0]) lon = float(parts[1]) logger.debug(f"📍 Coordonnées GEO (string) parsées: {lat}, {lon}") return (lat, lon) # Cas 4: Autres formats possibles else: logger.debug(f"🔍 Format GEO non reconnu, tentative de conversion en string: {str(geo)}") geo_str = str(geo) if ';' in geo_str: parts = geo_str.split(';') if len(parts) == 2: lat = float(parts[0]) lon = float(parts[1]) logger.debug(f"📍 Coordonnées GEO (fallback) parsées: {lat}, {lon}") return (lat, lon) except (ValueError, AttributeError) as e: logger.warning(f"⚠️ Erreur parsing GEO: {e}") # Si pas de GEO, essayer de géocoder la location location = event_data.get('location', '').strip() if location: return self.geocode_address(location) return None def format_for_oedb(self, event_data: Dict, coordinates: Tuple[float, float], source_url: str) -> Dict: """Formate les données de l'événement pour l'API OEDB""" lat, lon = coordinates # Convertir les dates dtstart = event_data.get('dtstart') dtend = event_data.get('dtend') start_iso = None end_iso = None if dtstart: if hasattr(dtstart, 'dt'): dt = dtstart.dt if not isinstance(dt, datetime): # Si c'est juste une date, créer un datetime dt = datetime.combine(dt, datetime.min.time()) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) start_iso = dt.isoformat() if dtend: if hasattr(dtend, 'dt'): dt = dtend.dt if not isinstance(dt, datetime): dt = datetime.combine(dt, datetime.min.time()) if dt.tzinfo is None: dt = dt.replace(tzinfo=timezone.utc) end_iso = dt.isoformat() # Si pas de date de fin, définir à +2h de la date de début if start_iso and not end_iso: start_dt = datetime.fromisoformat(start_iso.replace('Z', '+00:00')) end_dt = start_dt.replace(hour=start_dt.hour + 2) end_iso = end_dt.isoformat() # Construire l'objet pour l'API OEDB oedb_event = { "type": "Feature", "geometry": { "type": "Point", "coordinates": [lon, lat] }, "properties": { "label": event_data.get('summary', 'Événement Agenda Geek'), "type": "scheduled", "what": "culture.geek", "start": start_iso, "stop": end_iso, "where": event_data.get('location', ''), "description": event_data.get('description', ''), "source:name": "L'Agenda Geek", "source:url": source_url, "source:uid": event_data.get('uid', ''), "url": event_data.get('url', source_url) } } return oedb_event def send_to_oedb(self, event: Dict) -> bool: """Envoie un événement vers l'API OEDB""" if self.dry_run: logger.info(f"🏃‍♂️ DRY RUN - Événement qui serait envoyé:") logger.info(json.dumps(event, indent=2, ensure_ascii=False)) return True try: response = self.session.post( f"{self.api_url}/event", json=event, timeout=30 ) if response.status_code == 201: result = response.json() event_id = result.get('id', 'unknown') logger.info(f"✅ Événement créé avec succès: ID {event_id}") return True elif response.status_code == 409: logger.info("⚠️ Événement déjà existant (conflit)") return True # Considérer comme un succès else: logger.error(f"❌ Erreur API ({response.status_code}): {response.text}") return False except requests.RequestException as e: logger.error(f"❌ Erreur lors de l'envoi vers l'API: {e}") return False def process_events(self, limit: int = 5, offset: int = 0) -> None: """Traite les événements avec pagination""" logger.info(f"🚀 Début du traitement - Limite: {limit}, Offset: {offset}") # Récupérer la liste des événements event_links = self.get_events_list() if not event_links: logger.error("❌ Aucun événement trouvé") return # Appliquer l'offset et la limite total_events = len(event_links) start_idx = min(offset, total_events) end_idx = min(offset + limit, total_events) events_to_process = event_links[start_idx:end_idx] logger.info(f"📊 Traitement de {len(events_to_process)} événements ({start_idx+1} à {end_idx} sur {total_events})") success_count = 0 error_count = 0 for i, event_url in enumerate(events_to_process, 1): logger.info(f"🔄 [{i}/{len(events_to_process)}] Traitement de {event_url}") try: # Obtenir le lien iCal ical_url = self.get_ical_link(event_url) if not ical_url: logger.warning(f"⚠️ Pas de lien iCal trouvé pour {event_url}") error_count += 1 continue # Parser le fichier iCal event_data = self.parse_ical(ical_url) if not event_data: logger.warning(f"⚠️ Impossible de parser l'iCal pour {event_url}") error_count += 1 continue # Extraire les coordonnées coordinates = self.extract_coordinates(event_data) if not coordinates: logger.warning(f"⚠️ Pas de coordonnées trouvées pour {event_data.get('summary', 'événement sans titre')}") error_count += 1 continue # Formater pour OEDB oedb_event = self.format_for_oedb(event_data, coordinates, event_url) # Envoyer vers l'API if self.send_to_oedb(oedb_event): success_count += 1 else: error_count += 1 # Pause entre les requêtes pour éviter la surcharge time.sleep(1) except Exception as e: logger.error(f"❌ Erreur lors du traitement de {event_url}: {e}") error_count += 1 logger.info(f"🏁 Traitement terminé - Succès: {success_count}, Erreurs: {error_count}") def main(): parser = argparse.ArgumentParser(description='Scraper Agenda Geek vers OEDB') parser.add_argument('--limit', type=int, default=20, help='Nombre d\'événements à traiter') parser.add_argument('--page', type=int, default=1, help='Numéro de page du site') parser.add_argument('--offset', type=int, default=0, help='Nombre d\'événements à ignorer') parser.add_argument('--api-url', default='https://api.openeventdatabase.org', help='URL de l\'API OEDB') parser.add_argument('--dry-run', action='store_true', help='Mode test sans envoi vers l\'API') parser.add_argument('--verbose', action='store_true', help='Mode verbeux') args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) scraper = AgendaGeekScraper(api_url=args.api_url, dry_run=args.dry_run, page=args.page) scraper.process_events(limit=args.limit, offset=args.offset) if __name__ == "__main__": main()