#!/usr/bin/env python3 """ Script de scraping pour l'agenda du libre (https://www.agendadulibre.org/) Utilise le fichier iCal pour récupérer les événements et les envoyer à l'API OEDB """ import requests import json import os import sys import argparse import re import time from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple import icalendar from icalendar import Calendar, Event import logging # Configuration par défaut api_oedb = "https://api.openeventdatabase.org" # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('agendadulibre_scraper.log'), logging.StreamHandler(sys.stdout) ] ) logger = logging.getLogger(__name__) class AgendaDuLibreScraper: def __init__(self, api_base_url: str = api_oedb, batch_size: int = 1, max_events: int = None, dry_run: bool = True): self.api_base_url = api_base_url self.batch_size = batch_size self.max_events = max_events self.dry_run = dry_run self.data_file = "agendadulibre_events.json" self.cache_file = "agendadulibre_cache.json" self.ical_file = "agendadulibre_events.ics" self.ical_url = "https://www.agendadulibre.org/events.ics" self.cache_duration_hours = 1 # Durée de cache en heures # Charger les données existantes self.events_data = self.load_events_data() self.cache_data = self.load_cache_data() def load_events_data(self) -> Dict: """Charge les données d'événements depuis le fichier JSON local""" if os.path.exists(self.data_file): try: with open(self.data_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"Erreur lors du chargement du fichier {self.data_file}: {e}") return {"events": {}, "last_update": None} return {"events": {}, "last_update": None} def load_cache_data(self) -> Dict: """Charge les données de cache depuis le fichier JSON local""" if os.path.exists(self.cache_file): try: with open(self.cache_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"Erreur lors du chargement du fichier cache {self.cache_file}: {e}") return {"processed_events": {}, "last_ical_fetch": None, "ical_content_hash": None} return {"processed_events": {}, "last_ical_fetch": None, "ical_content_hash": None} def save_events_data(self): """Sauvegarde les données d'événements dans le fichier JSON local""" try: with open(self.data_file, 'w', encoding='utf-8') as f: json.dump(self.events_data, f, ensure_ascii=False, indent=2) except Exception as e: logger.error(f"Erreur lors de la sauvegarde du fichier {self.data_file}: {e}") def save_cache_data(self): """Sauvegarde les données de cache dans le fichier JSON local""" try: with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(self.cache_data, f, ensure_ascii=False, indent=2) except Exception as e: logger.error(f"Erreur lors de la sauvegarde du fichier cache {self.cache_file}: {e}") def is_ical_cache_valid(self) -> bool: """Vérifie si le cache iCal est encore valide (moins d'une heure)""" if not os.path.exists(self.ical_file): return False try: file_time = os.path.getmtime(self.ical_file) cache_age = datetime.now().timestamp() - file_time cache_age_hours = cache_age / 3600 logger.debug(f"Cache iCal âgé de {cache_age_hours:.2f} heures") return cache_age_hours < self.cache_duration_hours except Exception as e: logger.error(f"Erreur lors de la vérification du cache iCal: {e}") return False def get_content_hash(self, content: bytes) -> str: """Calcule le hash du contenu pour détecter les changements""" import hashlib return hashlib.md5(content).hexdigest() def is_ical_content_changed(self, new_content: bytes) -> bool: """Vérifie si le contenu iCal a changé depuis la dernière fois""" new_hash = self.get_content_hash(new_content) old_hash = self.cache_data.get("ical_content_hash") return new_hash != old_hash def save_ical_cache(self, ical_content: bytes): """Sauvegarde le contenu iCal en cache local""" try: with open(self.ical_file, 'wb') as f: f.write(ical_content) logger.info(f"Cache iCal sauvegardé dans {self.ical_file}") # Mettre à jour le cache JSON avec le hash du contenu self.cache_data["ical_content_hash"] = self.get_content_hash(ical_content) self.cache_data["last_ical_fetch"] = datetime.now().isoformat() self.save_cache_data() except Exception as e: logger.error(f"Erreur lors de la sauvegarde du cache iCal: {e}") def load_ical_cache(self) -> Optional[bytes]: """Charge le contenu iCal depuis le cache local""" try: with open(self.ical_file, 'rb') as f: content = f.read() logger.info(f"Cache iCal chargé depuis {self.ical_file}") return content except Exception as e: logger.error(f"Erreur lors du chargement du cache iCal: {e}") return None def fetch_ical_data(self, force_refresh: bool = False) -> Optional[Calendar]: """Récupère et parse le fichier iCal depuis l'agenda du libre ou depuis le cache""" ical_content = None # Vérifier si le cache est valide (sauf si on force le rechargement) if not force_refresh and self.is_ical_cache_valid(): logger.info("Utilisation du cache iCal local (moins d'une heure)") ical_content = self.load_ical_cache() else: if force_refresh: logger.info(f"Rechargement forcé du fichier iCal depuis {self.ical_url}") else: logger.info(f"Cache iCal expiré ou absent, téléchargement depuis {self.ical_url}") try: response = requests.get(self.ical_url, timeout=30) response.raise_for_status() ical_content = response.content # Vérifier si le contenu a changé if not self.is_ical_content_changed(ical_content): logger.info("Contenu iCal identique au précédent, utilisation du cache existant") ical_content = self.load_ical_cache() else: logger.info("Nouveau contenu iCal détecté, mise à jour du cache") # Sauvegarder en cache self.save_ical_cache(ical_content) except requests.RequestException as e: logger.error(f"Erreur lors de la récupération du fichier iCal: {e}") # Essayer de charger depuis le cache même s'il est expiré logger.info("Tentative de chargement depuis le cache expiré...") ical_content = self.load_ical_cache() if ical_content is None: logger.error("Impossible de récupérer le contenu iCal") return None try: calendar = Calendar.from_ical(ical_content) logger.info(f"Fichier iCal parsé avec succès") return calendar except Exception as e: logger.error(f"Erreur lors du parsing du fichier iCal: {e}") return None def parse_event(self, event: Event) -> Optional[Dict]: """Parse un événement iCal et le convertit au format OEDB""" try: # Récupérer les propriétés de base summary = str(event.get('summary', '')) description = str(event.get('description', '')) location = str(event.get('location', '')) url = str(event.get('url', '')) # Extraire les coordonnées GEO si disponibles geo_coords = self.extract_geo_coordinates(event) # Extraire les catégories si disponibles categories = self.extract_categories(event) # Extraire les propriétés supplémentaires organizer = self.extract_organizer(event) alt_description = self.extract_alt_description(event) short_description = self.extract_short_description(event) sequence = self.extract_sequence(event) repeat_rules = self.extract_repeat_rules(event) # Gestion des dates dtstart = event.get('dtstart') dtend = event.get('dtend') if not dtstart: logger.warning(f"Événement sans date de début: {summary}") return None # Convertir les dates start_date = dtstart.dt if isinstance(start_date, datetime): start_iso = start_date.isoformat() else: # Date seulement (sans heure) start_iso = f"{start_date}T00:00:00" end_date = None if dtend: end_dt = dtend.dt if isinstance(end_dt, datetime): end_iso = end_dt.isoformat() else: end_iso = f"{end_dt}T23:59:59" else: # Si pas de date de fin, ajouter 2 heures par défaut if isinstance(start_date, datetime): end_iso = (start_date + timedelta(hours=2)).isoformat() else: end_iso = f"{start_date}T02:00:00" # Créer l'événement au format OEDB oedb_event = { "properties": { "label": summary, "description": description, "type": "scheduled", "what": "culture.floss", # Type par défaut pour l'agenda du libre "where": location, "start": start_iso, "stop": end_iso, "url": url if url else None, "source:name": "Agenda du Libre", "source:url": "https://www.agendadulibre.org/", "last_modified_by": "agendadulibre_scraper", "tags": categories if categories else [], # Ajouter les catégories comme tags "organizer": organizer, # Organisateur de l'événement "alt_description": alt_description, # Description alternative HTML "short_description": short_description, # Description courte "sequence": sequence, # Numéro de séquence "repeat_rules": repeat_rules # Règles de répétition }, "geometry": { "type": "Point", "coordinates": geo_coords if geo_coords else [0, 0] # Utiliser GEO ou coordonnées par défaut } } # Créer un ID unique basé sur le contenu event_id = self.generate_event_id(summary, start_iso, location) return { "id": event_id, "event": oedb_event, } except Exception as e: logger.error(f"Erreur lors du parsing de l'événement: {e}") return None def extract_geo_coordinates(self, event: Event) -> Optional[List[float]]: """Extrait les coordonnées du champ GEO: de l'événement iCal""" try: geo = event.get('geo') if geo: # Le champ GEO peut être sous différentes formes if hasattr(geo, 'lat') and hasattr(geo, 'lon'): # Format avec attributs lat/lon lat = float(geo.lat) lon = float(geo.lon) logger.info(f"📍 Coordonnées GEO trouvées: {lat}, {lon}") return [lon, lat] # Format GeoJSON (longitude, latitude) else: # Format string "latitude;longitude" geo_str = str(geo) if ';' in geo_str: parts = geo_str.split(';') if len(parts) == 2: lat = float(parts[0].strip()) lon = float(parts[1].strip()) logger.info(f"📍 Coordonnées GEO trouvées: {lat}, {lon}") return [lon, lat] # Format GeoJSON (longitude, latitude) else: logger.debug(f"Format GEO non reconnu: {geo_str}") else: logger.debug("Aucun champ GEO trouvé") return None except (ValueError, AttributeError, TypeError) as e: logger.warning(f"Erreur lors de l'extraction des coordonnées GEO: {e}") return None except Exception as e: logger.error(f"Erreur inattendue lors de l'extraction GEO: {e}") return None def extract_categories(self, event: Event) -> List[str]: """Extrait les catégories du champ CATEGORIES: de l'événement iCal""" try: categories = [] # Le champ CATEGORIES peut apparaître plusieurs fois for category in event.get('categories', []): if category: # Extraire la valeur de l'objet vCategory if hasattr(category, 'cats'): # Si c'est un objet vCategory avec des catégories for cat in category.cats: cat_str = str(cat).strip() if cat_str: categories.append(cat_str) else: # Sinon, convertir directement en string cat_str = str(category).strip() if cat_str: categories.append(cat_str) if categories: logger.info(f"🏷️ Catégories trouvées: {', '.join(categories)}") else: logger.debug("Aucune catégorie trouvée") return categories except Exception as e: logger.warning(f"Erreur lors de l'extraction des catégories: {e}") return [] def extract_organizer(self, event: Event) -> Optional[str]: """Extrait l'organisateur du champ ORGANIZER: de l'événement iCal""" try: organizer = event.get('organizer') if organizer: organizer_str = str(organizer).strip() if organizer_str: logger.debug(f"👤 Organisateur trouvé: {organizer_str}") return organizer_str return None except Exception as e: logger.warning(f"Erreur lors de l'extraction de l'organisateur: {e}") return None def extract_alt_description(self, event: Event) -> Optional[str]: """Extrait la description alternative HTML du champ X-ALT-DESC;FMTTYPE=text/html: de l'événement iCal""" try: # Chercher le champ X-ALT-DESC avec FMTTYPE=text/html for prop in event.property_items(): if prop[0] == 'X-ALT-DESC' and hasattr(prop[1], 'params') and prop[1].params.get('FMTTYPE') == 'text/html': alt_desc = str(prop[1]).strip() if alt_desc: logger.debug(f"📄 Description alternative HTML trouvée: {len(alt_desc)} caractères") return alt_desc return None except Exception as e: logger.warning(f"Erreur lors de l'extraction de la description alternative: {e}") return None def extract_short_description(self, event: Event) -> Optional[str]: """Extrait la description courte du champ SUMMARY: de l'événement iCal""" try: summary = event.get('summary') if summary: summary_str = str(summary).strip() if summary_str: logger.debug(f"📝 Description courte trouvée: {summary_str}") return summary_str return None except Exception as e: logger.warning(f"Erreur lors de l'extraction de la description courte: {e}") return None def extract_sequence(self, event: Event) -> Optional[int]: """Extrait le numéro de séquence du champ SEQUENCE: de l'événement iCal""" try: sequence = event.get('sequence') if sequence is not None: seq_num = int(sequence) logger.debug(f"🔢 Séquence trouvée: {seq_num}") return seq_num return None except (ValueError, TypeError) as e: logger.warning(f"Erreur lors de l'extraction de la séquence: {e}") return None except Exception as e: logger.warning(f"Erreur inattendue lors de l'extraction de la séquence: {e}") return None def extract_repeat_rules(self, event: Event) -> Optional[str]: """Extrait les règles de répétition du champ RRULE: de l'événement iCal""" try: # Essayer différentes variantes de casse rrule = event.get('rrule') or event.get('RRULE') or event.get('Rrule') if rrule: rrule_str = str(rrule).strip() if rrule_str: logger.info(f"🔄 Règles de répétition trouvées: {rrule_str}") return rrule_str # Vérifier aussi dans les propriétés avec parcours manuel for prop in event.property_items(): if prop[0].upper() == 'RRULE': rrule_str = str(prop[1]).strip() if rrule_str: logger.info(f"🔄 Règles de répétition trouvées (parcours): {rrule_str}") return rrule_str # Note: Pas de log ici car c'est normal qu'il n'y ait pas de RRULE # dans tous les événements (seulement les événements récurrents en ont) return None except Exception as e: logger.warning(f"Erreur lors de l'extraction des règles de répétition: {e}") return None def generate_event_id(self, summary: str, start_date: str, location: str) -> str: """Génère un ID unique pour l'événement""" import hashlib content = f"{summary}_{start_date}_{location}" return hashlib.md5(content.encode('utf-8')).hexdigest() def clean_location_for_geocoding(self, location: str) -> Optional[str]: """Nettoie le lieu pour le géocodage en extrayant l'adresse après la première virgule""" if not location or location.strip() == "": return None # Diviser par la première virgule parts = location.split(',', 1) if len(parts) > 1: # Prendre la partie après la première virgule address_part = parts[1].strip() # Vérifier si on a un numéro et une adresse # Pattern pour détecter un numéro suivi d'une adresse address_pattern = r'^\s*\d+.*' if re.match(address_pattern, address_part): logger.info(f"📍 Adresse potentielle trouvée: {address_part}") return address_part # Si pas de virgule ou pas d'adresse valide, essayer le lieu complet logger.info(f"📍 Tentative de géocodage avec le lieu complet: {location}") return location.strip() def geocode_with_nominatim(self, location: str) -> Optional[Tuple[float, float]]: """Géocode un lieu avec Nominatim""" if not location: return None try: # URL de l'API Nominatim nominatim_url = "https://nominatim.openstreetmap.org/search" # Paramètres de la requête params = { 'q': location, 'format': 'json', 'limit': 1, 'countrycodes': 'fr', # Limiter à la France 'addressdetails': 1 } headers = { 'User-Agent': 'AgendaDuLibreScraper/1.0 (contact@example.com)' } logger.info(f"🌍 Géocodage avec Nominatim: {location}") # Faire la requête avec un timeout response = requests.get(nominatim_url, params=params, headers=headers, timeout=10) response.raise_for_status() # Parser la réponse results = response.json() if results and len(results) > 0: result = results[0] lat = float(result['lat']) lon = float(result['lon']) logger.info(f"✅ Géocodage réussi: {location} -> ({lat}, {lon})") logger.info(f" Adresse trouvée: {result.get('display_name', 'N/A')}") # Respecter la limite de 1 requête par seconde pour Nominatim time.sleep(1) return (lon, lat) # Retourner (longitude, latitude) pour GeoJSON else: logger.warning(f"⚠️ Aucun résultat de géocodage pour: {location}") return None except requests.RequestException as e: logger.error(f"❌ Erreur de connexion Nominatim: {e}") return None except (ValueError, KeyError) as e: logger.error(f"❌ Erreur de parsing Nominatim: {e}") return None except Exception as e: logger.error(f"❌ Erreur inattendue lors du géocodage: {e}") return None def improve_event_coordinates(self, event_data: Dict) -> Dict: """Améliore les coordonnées de l'événement si nécessaire""" coords = event_data["event"]["geometry"]["coordinates"] # Vérifier si les coordonnées sont par défaut (0, 0) if coords == [0, 0]: location = event_data["event"]["properties"].get("where", "") if location: # Nettoyer le lieu pour le géocodage clean_location = self.clean_location_for_geocoding(location) if clean_location: # Tenter le géocodage new_coords = self.geocode_with_nominatim(clean_location) if new_coords: # Mettre à jour les coordonnées event_data["event"]["geometry"]["coordinates"] = list(new_coords) logger.info(f"🎯 Coordonnées mises à jour par géocodage: {coords} -> {new_coords}") else: logger.warning(f"⚠️ Impossible de géocoder: {clean_location}") else: logger.info(f"ℹ️ Lieu non géocodable: {location}") else: logger.info("ℹ️ Aucun lieu spécifié pour le géocodage") else: # Vérifier si les coordonnées viennent du champ GEO geo_coords = event_data.get("raw_ical", {}).get("geo") if geo_coords: logger.info(f"✅ Coordonnées utilisées depuis le champ GEO: {coords}") else: logger.info(f"ℹ️ Coordonnées déjà définies: {coords}") return event_data def log_event_details(self, event_data: Dict): """Log détaillé de l'événement avant envoi""" props = event_data["event"]["properties"] geom = event_data["event"]["geometry"] logger.info("📝 Détails de l'événement à insérer:") # INSERT_YOUR_CODE # Affiche un dump lisible de l'événement avec json.dumps (indentation) try: logger.info(json.dumps(event_data, ensure_ascii=False, indent=2)) except Exception as e: logger.warning(f"Erreur lors de l'affichage lisible de l'événement: {e}") # logger.info(event_data) # logger.info(f" ID: {event_data['id']}") # logger.info(f" Titre: {props.get('label', 'N/A')}") # logger.info(f" Description: {props.get('description', 'N/A')[:100]}{'...' if len(props.get('description', '')) > 100 else ''}") # logger.info(f" Type: {props.get('type', 'N/A')}") # logger.info(f" Catégorie: {props.get('what', 'N/A')}") # logger.info(f" Lieu: {props.get('where', 'N/A')}") # logger.info(f" Début: {props.get('start', 'N/A')}") # logger.info(f" Fin: {props.get('stop', 'N/A')}") # logger.info(f" URL: {props.get('url', 'N/A')}") # logger.info(f" Source: {props.get('source:name', 'N/A')}") # logger.info(f" Coordonnées: {geom.get('coordinates', 'N/A')}") # logger.info(f" Tags: {', '.join(props.get('tags', [])) if props.get('tags') else 'N/A'}") # logger.info(f" Organisateur: {props.get('organizer', 'N/A')}") # logger.info(f" Description courte: {props.get('short_description', 'N/A')}") # logger.info(f" Séquence: {props.get('sequence', 'N/A')}") # logger.info(f" Règles de répétition: {props.get('repeat_rules', 'N/A')}") # logger.info(f" Description HTML: {'Oui' if props.get('alt_description') else 'N/A'}") # logger.info(f" Modifié par: {props.get('last_modified_by', 'N/A')}") def send_event_to_api(self, event_data: Dict, skip_geocoding: bool = False) -> Tuple[bool, str]: """Envoie un événement à l'API OEDB (ou simule en mode dry-run)""" # Améliorer les coordonnées si nécessaire (sauf si déjà traité) if not skip_geocoding: event_data = self.improve_event_coordinates(event_data) else: logger.info("ℹ️ Géocodage ignoré - événement déjà traité") # Log détaillé de l'événement self.log_event_details(event_data) if self.dry_run: logger.info(f"[DRY-RUN] Simulation d'envoi de l'événement: {event_data['event']['properties']['label']}") return True, "Simulé (dry-run)" try: url = f"{self.api_base_url}/event" headers = {"Content-Type": "application/json"} # Formater l'événement au format GeoJSON attendu par l'API geojson_event = { "type": "Feature", "geometry": event_data["event"]["geometry"], "properties": event_data["event"]["properties"] } logger.info(f"🌐 Envoi à l'API: {url}") response = requests.post(url, json=geojson_event, headers=headers, timeout=30) if response.status_code == 201: logger.info("✅ Événement créé avec succès dans l'API") return True, "Créé avec succès" elif response.status_code == 409: logger.warning("⚠️ Événement déjà existant dans l'API") return False, "Événement déjà existant" else: logger.error(f"❌ Erreur API: {response.status_code} - {response.text}") return False, f"Erreur API: {response.status_code} - {response.text}" except requests.RequestException as e: logger.error(f"❌ Erreur de connexion: {e}") return False, f"Erreur de connexion: {e}" except Exception as e: logger.error(f"❌ Erreur inattendue: {e}") return False, f"Erreur inattendue: {e}" def process_events(self, calendar: Calendar) -> Dict: """Traite tous les événements du calendrier""" stats = { "total_events": 0, "new_events": 0, "already_saved": 0, "api_errors": 0, "parse_errors": 0, "sent_this_run": 0, "skipped_due_to_limit": 0 } events_to_process = [] pending_events = [] # Événements en attente d'envoi processed_count = 0 # Parcourir tous les événements for component in calendar.walk(): if component.name == "VEVENT": stats["total_events"] += 1 # Parser l'événement parsed_event = self.parse_event(component) if not parsed_event: stats["parse_errors"] += 1 continue event_id = parsed_event["id"] event_label = parsed_event["event"]["properties"]["label"] # Vérifier le statut de l'événement event_status = None skip_reason = "" # Vérifier dans les données d'événements if event_id in self.events_data["events"]: event_status = self.events_data["events"][event_id].get("status", "unknown") if event_status in ["saved", "already_exists"]: stats["already_saved"] += 1 logger.info(f"⏭️ Événement ignoré: {event_label} - déjà traité (status: {event_status})") continue # Vérifier dans le cache des événements traités if event_id in self.cache_data["processed_events"]: cache_status = self.cache_data["processed_events"][event_id].get("status", "unknown") if cache_status in ["saved", "already_exists"]: stats["already_saved"] += 1 logger.info(f"⏭️ Événement ignoré: {event_label} - déjà dans le cache (status: {cache_status})") continue # Déterminer la priorité de l'événement priority = 0 # 0 = nouveau, 1 = en attente, 2 = échec précédent if event_status in ["pending", "failed", "api_error"]: priority = 1 # Priorité haute pour les événements en attente logger.info(f"🔄 Événement en attente prioritaire: {event_label} (status: {event_status})") elif event_id in self.cache_data["processed_events"]: cache_status = self.cache_data["processed_events"][event_id].get("status", "unknown") if cache_status in ["pending", "failed", "api_error"]: priority = 1 # Priorité haute pour les événements en attente dans le cache logger.info(f"🔄 Événement en attente du cache: {event_label} (status: {cache_status})") # Ajouter l'événement avec sa priorité event_with_priority = { "event": parsed_event, "priority": priority, "event_id": event_id, "event_label": event_label } if priority > 0: pending_events.append(event_with_priority) else: events_to_process.append(event_with_priority) # Trier les événements : d'abord les événements en attente, puis les nouveaux all_events = pending_events + events_to_process all_events.sort(key=lambda x: x["priority"], reverse=True) # Priorité décroissante # Appliquer la limite d'événements if self.max_events: all_events = all_events[:self.max_events] if len(pending_events) + len(events_to_process) > self.max_events: stats["skipped_due_to_limit"] = len(pending_events) + len(events_to_process) - self.max_events # Extraire les événements pour le traitement events_to_process = [item["event"] for item in all_events] # Traiter les événements par batch logger.info(f"Traitement de {len(events_to_process)} nouveaux événements par batch de {self.batch_size}") if self.max_events: logger.info(f"Limite d'événements: {self.max_events}") if self.dry_run: logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API") for i in range(0, len(events_to_process), self.batch_size): batch = events_to_process[i:i + self.batch_size] logger.info(f"Traitement du batch {i//self.batch_size + 1}/{(len(events_to_process) + self.batch_size - 1)//self.batch_size}") for event_data in batch: event_id = event_data["id"] event_label = event_data["event"]["properties"]["label"] logger.info(f"Envoi de l'événement: {event_label}") # Vérifier si l'événement a déjà été traité avec succès skip_geocoding = False if event_id in self.events_data["events"]: event_status = self.events_data["events"][event_id].get("status", "unknown") if event_status in ["saved", "already_exists"]: skip_geocoding = True logger.info(f"ℹ️ Géocodage ignoré pour {event_label} - déjà traité") # Envoyer à l'API success, message = self.send_event_to_api(event_data, skip_geocoding=skip_geocoding) # Mettre à jour les statistiques et les données locales if success: stats["new_events"] += 1 stats["sent_this_run"] += 1 self.events_data["events"][event_id] = { "status": "saved", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } # Ajouter au cache des événements traités self.cache_data["processed_events"][event_id] = { "processed_at": datetime.now().isoformat(), "status": "saved", "event_label": event_label } logger.info(f"✅ {event_label} - {message}") else: if "déjà existant" in message or "already exists" in message.lower(): stats["already_saved"] += 1 self.events_data["events"][event_id] = { "status": "already_exists", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } # Ajouter au cache même si déjà existant self.cache_data["processed_events"][event_id] = { "processed_at": datetime.now().isoformat(), "status": "already_exists", "event_label": event_label } logger.info(f"⚠️ {event_label} - {message}") else: stats["api_errors"] += 1 self.events_data["events"][event_id] = { "status": "error", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } logger.error(f"❌ {event_label} - {message}") # Mettre à jour la date de dernière mise à jour self.events_data["last_update"] = datetime.now().isoformat() # Sauvegarder le cache self.save_cache_data() return stats def run(self, force_refresh: bool = False): """Exécute le scraping complet""" logger.info("🚀 Démarrage du scraping de l'agenda du libre") logger.info(f"Configuration: batch_size={self.batch_size}, api_url={self.api_base_url}") logger.info(f"Mode dry-run: {'OUI' if self.dry_run else 'NON'}") if self.max_events: logger.info(f"Limite d'événements: {self.max_events}") logger.info(f"Cache iCal: {'ignoré' if force_refresh else f'valide pendant {self.cache_duration_hours}h'}") # Récupérer le fichier iCal calendar = self.fetch_ical_data(force_refresh=force_refresh) if not calendar: logger.error("❌ Impossible de récupérer le fichier iCal") return False # Traiter les événements stats = self.process_events(calendar) # Sauvegarder les données self.save_events_data() # Afficher les statistiques finales logger.info("📊 Statistiques finales:") logger.info(f" Total d'événements trouvés: {stats['total_events']}") logger.info(f" Nouveaux événements envoyés: {stats['new_events']}") logger.info(f" Événements déjà existants: {stats['already_saved']}") logger.info(f" Erreurs d'API: {stats['api_errors']}") logger.info(f" Erreurs de parsing: {stats['parse_errors']}") logger.info(f" Événements envoyés cette fois: {stats['sent_this_run']}") if stats['skipped_due_to_limit'] > 0: logger.info(f" Événements ignorés (limite atteinte): {stats['skipped_due_to_limit']}") logger.info("✅ Scraping terminé avec succès") return True def main(): parser = argparse.ArgumentParser(description="Scraper pour l'agenda du libre") parser.add_argument("--api-url", default=api_oedb, help=f"URL de base de l'API OEDB (défaut: {api_oedb})") parser.add_argument("--batch-size", type=int, default=1, help="Nombre d'événements à envoyer par batch (défaut: 1)") parser.add_argument("--max-events", type=int, default=None, help="Limiter le nombre d'événements à traiter (défaut: aucun)") parser.add_argument("--dry-run", action="store_true", default=True, help="Mode dry-run par défaut (simulation sans envoi à l'API)") parser.add_argument("--no-dry-run", action="store_true", help="Désactiver le mode dry-run (envoi réel à l'API)") parser.add_argument("--verbose", "-v", action="store_true", help="Mode verbeux") parser.add_argument("--force-refresh", "-f", action="store_true", help="Forcer le rechargement du fichier iCal (ignorer le cache)") parser.add_argument("--cache-duration", type=int, default=1, help="Durée de validité du cache en heures (défaut: 1)") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Déterminer le mode dry-run dry_run = args.dry_run and not args.no_dry_run # Créer et exécuter le scraper scraper = AgendaDuLibreScraper( api_base_url=args.api_url, batch_size=args.batch_size, max_events=args.max_events, dry_run=dry_run ) # Modifier la durée de cache si spécifiée scraper.cache_duration_hours = args.cache_duration # Exécuter avec ou sans rechargement forcé success = scraper.run(force_refresh=args.force_refresh) sys.exit(0 if success else 1) if __name__ == "__main__": main()