#!/usr/bin/env python3 """ Script de scraping pour l'agenda du libre (https://www.agendadulibre.org/) Utilise le fichier iCal pour récupérer les événements et les envoyer à l'API OEDB """ import requests import json import os import sys import argparse from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple import icalendar from icalendar import Calendar, Event import logging # Configuration par défaut api_oedb = "https://api.openeventdatabase.org" # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('agendadulibre_scraper.log'), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) class AgendaDuLibreScraper: def __init__(self, api_base_url: str = api_oedb, batch_size: int = 1): self.api_base_url = api_base_url self.batch_size = batch_size self.data_file = "agendadulibre_events.json" self.ical_file = "agendadulibre_events.ics" self.ical_url = "https://www.agendadulibre.org/events.ics" self.cache_duration_hours = 1 # Durée de cache en heures # Charger les données existantes self.events_data = self.load_events_data() def load_events_data(self) -> Dict: """Charge les données d'événements depuis le fichier JSON local""" if os.path.exists(self.data_file): try: with open(self.data_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"Erreur lors du chargement du fichier {self.data_file}: {e}") return {"events": {}, "last_update": None} return {"events": {}, "last_update": None} def save_events_data(self): """Sauvegarde les données d'événements dans le fichier JSON local""" try: with open(self.data_file, 'w', encoding='utf-8') as f: json.dump(self.events_data, f, ensure_ascii=False, indent=2) except Exception as e: logger.error(f"Erreur lors de la sauvegarde du fichier {self.data_file}: {e}") def is_ical_cache_valid(self) -> bool: """Vérifie si le cache iCal est encore valide (moins d'une heure)""" if not os.path.exists(self.ical_file): return False try: file_time = os.path.getmtime(self.ical_file) cache_age = datetime.now().timestamp() - file_time cache_age_hours = cache_age / 3600 logger.debug(f"Cache iCal âgé de {cache_age_hours:.2f} heures") return cache_age_hours < self.cache_duration_hours except Exception as e: logger.error(f"Erreur lors de la vérification du cache iCal: {e}") return False def save_ical_cache(self, ical_content: bytes): """Sauvegarde le contenu iCal en cache local""" try: with open(self.ical_file, 'wb') as f: f.write(ical_content) logger.info(f"Cache iCal sauvegardé dans {self.ical_file}") except Exception as e: logger.error(f"Erreur lors de la sauvegarde du cache iCal: {e}") def load_ical_cache(self) -> Optional[bytes]: """Charge le contenu iCal depuis le cache local""" try: with open(self.ical_file, 'rb') as f: content = f.read() logger.info(f"Cache iCal chargé depuis {self.ical_file}") return content except Exception as e: logger.error(f"Erreur lors du chargement du cache iCal: {e}") return None def fetch_ical_data(self, force_refresh: bool = False) -> Optional[Calendar]: """Récupère et parse le fichier iCal depuis l'agenda du libre ou depuis le cache""" ical_content = None # Vérifier si le cache est valide (sauf si on force le rechargement) if not force_refresh and self.is_ical_cache_valid(): logger.info("Utilisation du cache iCal local (moins d'une heure)") ical_content = self.load_ical_cache() else: if force_refresh: logger.info(f"Rechargement forcé du fichier iCal depuis {self.ical_url}") else: logger.info(f"Cache iCal expiré ou absent, téléchargement depuis {self.ical_url}") try: response = requests.get(self.ical_url, timeout=30) response.raise_for_status() ical_content = response.content # Sauvegarder en cache self.save_ical_cache(ical_content) except requests.RequestException as e: logger.error(f"Erreur lors de la récupération du fichier iCal: {e}") # Essayer de charger depuis le cache même s'il est expiré logger.info("Tentative de chargement depuis le cache expiré...") ical_content = self.load_ical_cache() if ical_content is None: logger.error("Impossible de récupérer le contenu iCal") return None try: calendar = Calendar.from_ical(ical_content) logger.info(f"Fichier iCal parsé avec succès") return calendar except Exception as e: logger.error(f"Erreur lors du parsing du fichier iCal: {e}") return None def parse_event(self, event: Event) -> Optional[Dict]: """Parse un événement iCal et le convertit au format OEDB""" try: # Récupérer les propriétés de base summary = str(event.get('summary', '')) description = str(event.get('description', '')) location = str(event.get('location', '')) url = str(event.get('url', '')) # Gestion des dates dtstart = event.get('dtstart') dtend = event.get('dtend') if not dtstart: logger.warning(f"Événement sans date de début: {summary}") return None # Convertir les dates start_date = dtstart.dt if isinstance(start_date, datetime): start_iso = start_date.isoformat() else: # Date seulement (sans heure) start_iso = f"{start_date}T00:00:00" end_date = None if dtend: end_dt = dtend.dt if isinstance(end_dt, datetime): end_iso = end_dt.isoformat() else: end_iso = f"{end_dt}T23:59:59" else: # Si pas de date de fin, ajouter 2 heures par défaut if isinstance(start_date, datetime): end_iso = (start_date + timedelta(hours=2)).isoformat() else: end_iso = f"{start_date}T02:00:00" # Créer l'événement au format OEDB oedb_event = { "properties": { "label": summary, "description": description, "type": "scheduled", "what": "culture.floss", # Type par défaut pour l'agenda du libre "where": location, "start": start_iso, "stop": end_iso, "url": url if url else None, "source:name": "Agenda du Libre", "source:url": "https://www.agendadulibre.org/", "last_modified_by": "agendadulibre_scraper" }, "geometry": { "type": "Point", "coordinates": [0, 0] # Coordonnées par défaut, à géocoder si nécessaire } } # Créer un ID unique basé sur le contenu event_id = self.generate_event_id(summary, start_iso, location) return { "id": event_id, "event": oedb_event, "raw_ical": { "summary": summary, "description": description, "location": location, "url": url, "dtstart": start_iso, "dtend": end_iso } } except Exception as e: logger.error(f"Erreur lors du parsing de l'événement: {e}") return None def generate_event_id(self, summary: str, start_date: str, location: str) -> str: """Génère un ID unique pour l'événement""" import hashlib content = f"{summary}_{start_date}_{location}" return hashlib.md5(content.encode('utf-8')).hexdigest() def send_event_to_api(self, event_data: Dict) -> Tuple[bool, str]: """Envoie un événement à l'API OEDB""" try: url = f"{self.api_base_url}/event" headers = {"Content-Type": "application/json"} # Formater l'événement au format GeoJSON attendu par l'API geojson_event = { "type": "Feature", "geometry": event_data["event"]["geometry"], "properties": event_data["event"]["properties"] } response = requests.post(url, json=geojson_event, headers=headers, timeout=30) if response.status_code == 201: return True, "Créé avec succès" elif response.status_code == 409: return False, "Événement déjà existant" else: return False, f"Erreur API: {response.status_code} - {response.text}" except requests.RequestException as e: return False, f"Erreur de connexion: {e}" except Exception as e: return False, f"Erreur inattendue: {e}" def process_events(self, calendar: Calendar) -> Dict: """Traite tous les événements du calendrier""" stats = { "total_events": 0, "new_events": 0, "already_saved": 0, "api_errors": 0, "parse_errors": 0, "sent_this_run": 0 } events_to_process = [] # Parcourir tous les événements for component in calendar.walk(): if component.name == "VEVENT": stats["total_events"] += 1 # Parser l'événement parsed_event = self.parse_event(component) if not parsed_event: stats["parse_errors"] += 1 continue event_id = parsed_event["id"] # Vérifier si l'événement existe déjà dans nos données if event_id in self.events_data["events"]: event_status = self.events_data["events"][event_id].get("status", "unknown") if event_status in ["saved", "already_exists"]: stats["already_saved"] += 1 logger.debug(f"Événement déjà traité: {parsed_event['event']['properties']['label']}") continue events_to_process.append(parsed_event) # Traiter les événements par batch logger.info(f"Traitement de {len(events_to_process)} nouveaux événements par batch de {self.batch_size}") for i in range(0, len(events_to_process), self.batch_size): batch = events_to_process[i:i + self.batch_size] logger.info(f"Traitement du batch {i//self.batch_size + 1}/{(len(events_to_process) + self.batch_size - 1)//self.batch_size}") for event_data in batch: event_id = event_data["id"] event_label = event_data["event"]["properties"]["label"] logger.info(f"Envoi de l'événement: {event_label}") # Envoyer à l'API success, message = self.send_event_to_api(event_data) # Mettre à jour les statistiques et les données locales if success: stats["new_events"] += 1 stats["sent_this_run"] += 1 self.events_data["events"][event_id] = { "status": "saved", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } logger.info(f"✅ {event_label} - {message}") else: if "déjà existant" in message or "already exists" in message.lower(): stats["already_saved"] += 1 self.events_data["events"][event_id] = { "status": "already_exists", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } logger.info(f"⚠️ {event_label} - {message}") else: stats["api_errors"] += 1 self.events_data["events"][event_id] = { "status": "error", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } logger.error(f"❌ {event_label} - {message}") # Mettre à jour la date de dernière mise à jour self.events_data["last_update"] = datetime.now().isoformat() return stats def run(self, force_refresh: bool = False): """Exécute le scraping complet""" logger.info("🚀 Démarrage du scraping de l'agenda du libre") logger.info(f"Configuration: batch_size={self.batch_size}, api_url={self.api_base_url}") logger.info(f"Cache iCal: {'ignoré' if force_refresh else f'valide pendant {self.cache_duration_hours}h'}") # Récupérer le fichier iCal calendar = self.fetch_ical_data(force_refresh=force_refresh) if not calendar: logger.error("❌ Impossible de récupérer le fichier iCal") return False # Traiter les événements stats = self.process_events(calendar) # Sauvegarder les données self.save_events_data() # Afficher les statistiques finales logger.info("📊 Statistiques finales:") logger.info(f" Total d'événements trouvés: {stats['total_events']}") logger.info(f" Nouveaux événements envoyés: {stats['new_events']}") logger.info(f" Événements déjà existants: {stats['already_saved']}") logger.info(f" Erreurs d'API: {stats['api_errors']}") logger.info(f" Erreurs de parsing: {stats['parse_errors']}") logger.info(f" Événements envoyés cette fois: {stats['sent_this_run']}") logger.info("✅ Scraping terminé avec succès") return True def main(): parser = argparse.ArgumentParser(description="Scraper pour l'agenda du libre") parser.add_argument("--api-url", default=api_oedb, help=f"URL de base de l'API OEDB (défaut: {api_oedb})") parser.add_argument("--batch-size", type=int, default=1, help="Nombre d'événements à envoyer par batch (défaut: 1)") parser.add_argument("--verbose", "-v", action="store_true", help="Mode verbeux") parser.add_argument("--force-refresh", "-f", action="store_true", help="Forcer le rechargement du fichier iCal (ignorer le cache)") parser.add_argument("--cache-duration", type=int, default=1, help="Durée de validité du cache en heures (défaut: 1)") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Créer et exécuter le scraper scraper = AgendaDuLibreScraper( api_base_url=args.api_url, batch_size=args.batch_size ) # Modifier la durée de cache si spécifiée scraper.cache_duration_hours = args.cache_duration # Exécuter avec ou sans rechargement forcé success = scraper.run(force_refresh=args.force_refresh) sys.exit(0 if success else 1) if __name__ == "__main__": main()