diff --git a/extractors/agenda_geek.py b/extractors/agenda_geek.py new file mode 100644 index 0000000..5721990 --- /dev/null +++ b/extractors/agenda_geek.py @@ -0,0 +1,413 @@ +#!/usr/bin/env python3 +""" +Scraper pour l'agenda geek - Import des événements dans OEDB + +Usage: + python3 agenda_geek.py --limit 10 --offset 0 + +Options: + --limit: Nombre d'événements à traiter (défaut: 5) + --offset: Nombre d'événements à ignorer (défaut: 0) + --api-url: URL de l'API OEDB (défaut: https://api.openeventdatabase.org) + --dry-run: Mode test sans envoi vers l'API + --verbose: Mode verbeux +""" + +import requests +import argparse +import re +import logging +from bs4 import BeautifulSoup +from icalendar import Calendar +from datetime import datetime, timezone +from urllib.parse import urljoin, urlparse +from typing import Optional, Dict, List, Tuple +import time +import json + +# Configuration du logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler(), + logging.FileHandler('agenda_geek_scraper.log') + ] +) +logger = logging.getLogger(__name__) + +class AgendaGeekScraper: + def __init__(self, api_url: str = "https://api.openeventdatabase.org", dry_run: bool = False): + self.api_url = api_url.rstrip('/') + self.dry_run = dry_run + self.session = requests.Session() + self.session.headers.update({ + 'User-Agent': 'OEDB-AgendaGeek-Scraper/1.0 (+https://github.com/cquest/oedb)' + }) + + def get_events_list(self) -> List[str]: + """Récupère la liste des liens d'événements depuis la page principale""" + url = "https://lagendageek.com/tevents/page/10" + logger.info(f"🔍 Récupération de la liste des événements depuis {url}") + + try: + response = self.session.get(url, timeout=30) + response.raise_for_status() + + soup = BeautifulSoup(response.content, 'html.parser') + event_links = [] + + # Rechercher les liens des titres d'événements + title_links = soup.find_all('a', class_='tribe-events-calendar-list__event-title-link') + + for link in title_links: + href = link.get('href') + if href: + full_url = urljoin(url, href) + event_links.append(full_url) + logger.debug(f"📅 Événement trouvé: {link.get_text(strip=True)} - {full_url}") + + logger.info(f"✅ {len(event_links)} événements trouvés sur la page") + return event_links + + except requests.RequestException as e: + logger.error(f"❌ Erreur lors de la récupération de la liste: {e}") + return [] + + def get_ical_link(self, event_url: str) -> Optional[str]: + """Extrait le lien iCal depuis une page d'événement""" + logger.debug(f"🔗 Recherche du lien iCal pour {event_url}") + + try: + response = self.session.get(event_url, timeout=30) + response.raise_for_status() + + # Le lien iCal est généralement construit en ajoutant ?ical=1 à l'URL + ical_url = f"{event_url.rstrip('/')}/?ical=1" + + # Vérifier que le lien iCal existe + ical_response = self.session.head(ical_url, timeout=10) + if ical_response.status_code == 200: + logger.debug(f"✅ Lien iCal trouvé: {ical_url}") + return ical_url + else: + logger.warning(f"⚠️ Lien iCal non accessible: {ical_url} (status: {ical_response.status_code})") + return None + + except requests.RequestException as e: + logger.error(f"❌ Erreur lors de la récupération du lien iCal: {e}") + return None + + def parse_ical(self, ical_url: str) -> Optional[Dict]: + """Parse un fichier iCal et extrait les données de l'événement""" + # Convertir webcal:// en https:// + if ical_url.startswith('webcal://'): + ical_url = ical_url.replace('webcal://', 'https://') + + logger.debug(f"📖 Parse du fichier iCal: {ical_url}") + + try: + response = self.session.get(ical_url, timeout=30) + response.raise_for_status() + + # Parser le contenu iCal + cal = Calendar.from_ical(response.content) + + for component in cal.walk(): + if component.name == "VEVENT": + event_data = { + 'summary': str(component.get('SUMMARY', '')), + 'description': str(component.get('DESCRIPTION', '')), + 'location': str(component.get('LOCATION', '')), + 'dtstart': component.get('DTSTART'), + 'dtend': component.get('DTEND'), + 'geo': component.get('GEO'), + 'url': str(component.get('URL', '')), + 'uid': str(component.get('UID', '')) + } + + logger.debug(f"📅 Événement parsé: {event_data['summary']}") + return event_data + + logger.warning("⚠️ Aucun événement VEVENT trouvé dans le fichier iCal") + return None + + except Exception as e: + logger.error(f"❌ Erreur lors du parsing iCal: {e}") + return None + + def geocode_address(self, address: str) -> Optional[Tuple[float, float]]: + """Géocode une adresse en utilisant Nominatim""" + if not address or address.strip() == '': + return None + + logger.debug(f"🌍 Géocodage de l'adresse: {address}") + + try: + # Utiliser Nominatim pour le géocodage + geocode_url = "https://nominatim.openstreetmap.org/search" + params = { + 'q': address, + 'format': 'json', + 'limit': 1, + 'countrycodes': 'fr', # Limiter à la France + 'addressdetails': 1 + } + + response = self.session.get(geocode_url, params=params, timeout=10) + response.raise_for_status() + + results = response.json() + if results: + result = results[0] + lat = float(result['lat']) + lon = float(result['lon']) + logger.debug(f"✅ Géocodage réussi: {lat}, {lon}") + return (lat, lon) + else: + logger.warning(f"⚠️ Aucun résultat de géocodage pour: {address}") + return None + + except Exception as e: + logger.error(f"❌ Erreur lors du géocodage: {e}") + return None + + def extract_coordinates(self, event_data: Dict) -> Optional[Tuple[float, float]]: + """Extrait les coordonnées depuis les données de l'événement""" + # D'abord essayer la propriété GEO + if event_data.get('geo'): + try: + geo = event_data['geo'] + logger.debug(f"🔍 Type GEO trouvé: {type(geo)} - Valeur: {geo}") + + # Cas 1: GEO avec paramètres latitude/longitude + if hasattr(geo, 'params') and 'latitude' in geo.params and 'longitude' in geo.params: + lat = float(geo.params['latitude']) + lon = float(geo.params['longitude']) + logger.debug(f"📍 Coordonnées GEO (params) trouvées: {lat}, {lon}") + return (lat, lon) + + # Cas 2: GEO avec méthode to_ical + elif hasattr(geo, 'to_ical'): + # Format GEO standard: "latitude;longitude" + geo_bytes = geo.to_ical() + # Gérer le cas où c'est déjà une string ou des bytes + if isinstance(geo_bytes, bytes): + geo_str = geo_bytes.decode('utf-8') + else: + geo_str = str(geo_bytes) + + logger.debug(f"🔍 GEO string extrait: '{geo_str}'") + parts = geo_str.split(';') + if len(parts) == 2: + lat = float(parts[0]) + lon = float(parts[1]) + logger.debug(f"📍 Coordonnées GEO parsées: {lat}, {lon}") + return (lat, lon) + + # Cas 3: GEO est directement une string + elif isinstance(geo, str): + logger.debug(f"🔍 GEO est une string directe: '{geo}'") + parts = geo.split(';') + if len(parts) == 2: + lat = float(parts[0]) + lon = float(parts[1]) + logger.debug(f"📍 Coordonnées GEO (string) parsées: {lat}, {lon}") + return (lat, lon) + + # Cas 4: Autres formats possibles + else: + logger.debug(f"🔍 Format GEO non reconnu, tentative de conversion en string: {str(geo)}") + geo_str = str(geo) + if ';' in geo_str: + parts = geo_str.split(';') + if len(parts) == 2: + lat = float(parts[0]) + lon = float(parts[1]) + logger.debug(f"📍 Coordonnées GEO (fallback) parsées: {lat}, {lon}") + return (lat, lon) + + except (ValueError, AttributeError) as e: + logger.warning(f"⚠️ Erreur parsing GEO: {e}") + + # Si pas de GEO, essayer de géocoder la location + location = event_data.get('location', '').strip() + if location: + return self.geocode_address(location) + + return None + + def format_for_oedb(self, event_data: Dict, coordinates: Tuple[float, float], source_url: str) -> Dict: + """Formate les données de l'événement pour l'API OEDB""" + lat, lon = coordinates + + # Convertir les dates + dtstart = event_data.get('dtstart') + dtend = event_data.get('dtend') + + start_iso = None + end_iso = None + + if dtstart: + if hasattr(dtstart, 'dt'): + dt = dtstart.dt + if not isinstance(dt, datetime): + # Si c'est juste une date, créer un datetime + dt = datetime.combine(dt, datetime.min.time()) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + start_iso = dt.isoformat() + + if dtend: + if hasattr(dtend, 'dt'): + dt = dtend.dt + if not isinstance(dt, datetime): + dt = datetime.combine(dt, datetime.min.time()) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + end_iso = dt.isoformat() + + # Si pas de date de fin, définir à +2h de la date de début + if start_iso and not end_iso: + start_dt = datetime.fromisoformat(start_iso.replace('Z', '+00:00')) + end_dt = start_dt.replace(hour=start_dt.hour + 2) + end_iso = end_dt.isoformat() + + # Construire l'objet pour l'API OEDB + oedb_event = { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [lon, lat] + }, + "properties": { + "label": event_data.get('summary', 'Événement Agenda Geek'), + "type": "scheduled", + "what": "culture.geek", + "start": start_iso, + "stop": end_iso, + "where": event_data.get('location', ''), + "description": event_data.get('description', ''), + "source:name": "L'Agenda Geek", + "source:url": source_url, + "source:uid": event_data.get('uid', ''), + "url": event_data.get('url', source_url) + } + } + + return oedb_event + + def send_to_oedb(self, event: Dict) -> bool: + """Envoie un événement vers l'API OEDB""" + if self.dry_run: + logger.info(f"🏃‍♂️ DRY RUN - Événement qui serait envoyé:") + logger.info(json.dumps(event, indent=2, ensure_ascii=False)) + return True + + try: + response = self.session.post( + f"{self.api_url}/event", + json=event, + timeout=30 + ) + + if response.status_code == 201: + result = response.json() + event_id = result.get('id', 'unknown') + logger.info(f"✅ Événement créé avec succès: ID {event_id}") + return True + elif response.status_code == 409: + logger.info("⚠️ Événement déjà existant (conflit)") + return True # Considérer comme un succès + else: + logger.error(f"❌ Erreur API ({response.status_code}): {response.text}") + return False + + except requests.RequestException as e: + logger.error(f"❌ Erreur lors de l'envoi vers l'API: {e}") + return False + + def process_events(self, limit: int = 5, offset: int = 0) -> None: + """Traite les événements avec pagination""" + logger.info(f"🚀 Début du traitement - Limite: {limit}, Offset: {offset}") + + # Récupérer la liste des événements + event_links = self.get_events_list() + + if not event_links: + logger.error("❌ Aucun événement trouvé") + return + + # Appliquer l'offset et la limite + total_events = len(event_links) + start_idx = min(offset, total_events) + end_idx = min(offset + limit, total_events) + + events_to_process = event_links[start_idx:end_idx] + + logger.info(f"📊 Traitement de {len(events_to_process)} événements ({start_idx+1} à {end_idx} sur {total_events})") + + success_count = 0 + error_count = 0 + + for i, event_url in enumerate(events_to_process, 1): + logger.info(f"🔄 [{i}/{len(events_to_process)}] Traitement de {event_url}") + + try: + # Obtenir le lien iCal + ical_url = self.get_ical_link(event_url) + if not ical_url: + logger.warning(f"⚠️ Pas de lien iCal trouvé pour {event_url}") + error_count += 1 + continue + + # Parser le fichier iCal + event_data = self.parse_ical(ical_url) + if not event_data: + logger.warning(f"⚠️ Impossible de parser l'iCal pour {event_url}") + error_count += 1 + continue + + # Extraire les coordonnées + coordinates = self.extract_coordinates(event_data) + if not coordinates: + logger.warning(f"⚠️ Pas de coordonnées trouvées pour {event_data.get('summary', 'événement sans titre')}") + error_count += 1 + continue + + # Formater pour OEDB + oedb_event = self.format_for_oedb(event_data, coordinates, event_url) + + # Envoyer vers l'API + if self.send_to_oedb(oedb_event): + success_count += 1 + else: + error_count += 1 + + # Pause entre les requêtes pour éviter la surcharge + time.sleep(1) + + except Exception as e: + logger.error(f"❌ Erreur lors du traitement de {event_url}: {e}") + error_count += 1 + + logger.info(f"🏁 Traitement terminé - Succès: {success_count}, Erreurs: {error_count}") + +def main(): + parser = argparse.ArgumentParser(description='Scraper Agenda Geek vers OEDB') + parser.add_argument('--limit', type=int, default=5, help='Nombre d\'événements à traiter') + parser.add_argument('--offset', type=int, default=0, help='Nombre d\'événements à ignorer') + parser.add_argument('--api-url', default='https://api.openeventdatabase.org', help='URL de l\'API OEDB') + parser.add_argument('--dry-run', action='store_true', help='Mode test sans envoi vers l\'API') + parser.add_argument('--verbose', action='store_true', help='Mode verbeux') + + args = parser.parse_args() + + if args.verbose: + logging.getLogger().setLevel(logging.DEBUG) + + scraper = AgendaGeekScraper(api_url=args.api_url, dry_run=args.dry_run) + scraper.process_events(limit=args.limit, offset=args.offset) + +if __name__ == "__main__": + main()