oedb-backend/extractors/agendadulibre.py
2025-10-09 23:35:12 +02:00

967 lines
No EOL
45 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Script de scraping pour l'agenda du libre (https://www.agendadulibre.org/)
Utilise le fichier iCal pour récupérer les événements et les envoyer à l'API OEDB
"""
import requests
import json
import os
import sys
import argparse
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import icalendar
from icalendar import Calendar, Event
import logging
# Configuration par défaut
api_oedb = "https://api.openeventdatabase.org"
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('agendadulibre_scraper.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class AgendaDuLibreScraper:
def __init__(self, api_base_url: str = api_oedb, batch_size: int = 1, max_events: int = None, dry_run: bool = True,
parallel: bool = False, max_workers: int = 4):
self.api_base_url = api_base_url
self.batch_size = batch_size
self.max_events = max_events
self.dry_run = dry_run
self.parallel = parallel
self.max_workers = max_workers
self.data_file = "agendadulibre_events.json"
self.cache_file = "agendadulibre_cache.json"
self.ical_file = "agendadulibre_events.ics"
self.ical_url = "https://www.agendadulibre.org/events.ics"
self.cache_duration_hours = 1 # Durée de cache en heures
# Charger les données existantes
self.events_data = self.load_events_data()
self.cache_data = self.load_cache_data()
def load_events_data(self) -> Dict:
"""Charge les données d'événements depuis le fichier JSON local"""
if os.path.exists(self.data_file):
try:
with open(self.data_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Erreur lors du chargement du fichier {self.data_file}: {e}")
return {"events": {}, "last_update": None}
return {"events": {}, "last_update": None}
def load_cache_data(self) -> Dict:
"""Charge les données de cache depuis le fichier JSON local"""
if os.path.exists(self.cache_file):
try:
with open(self.cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Erreur lors du chargement du fichier cache {self.cache_file}: {e}")
return {"processed_events": {}, "last_ical_fetch": None, "ical_content_hash": None}
return {"processed_events": {}, "last_ical_fetch": None, "ical_content_hash": None}
def save_events_data(self):
"""Sauvegarde les données d'événements dans le fichier JSON local"""
try:
with open(self.data_file, 'w', encoding='utf-8') as f:
json.dump(self.events_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du fichier {self.data_file}: {e}")
def save_cache_data(self):
"""Sauvegarde les données de cache dans le fichier JSON local"""
try:
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self.cache_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du fichier cache {self.cache_file}: {e}")
def is_ical_cache_valid(self) -> bool:
"""Vérifie si le cache iCal est encore valide (moins d'une heure)"""
if not os.path.exists(self.ical_file):
return False
try:
file_time = os.path.getmtime(self.ical_file)
cache_age = datetime.now().timestamp() - file_time
cache_age_hours = cache_age / 3600
logger.debug(f"Cache iCal âgé de {cache_age_hours:.2f} heures")
return cache_age_hours < self.cache_duration_hours
except Exception as e:
logger.error(f"Erreur lors de la vérification du cache iCal: {e}")
return False
def get_content_hash(self, content: bytes) -> str:
"""Calcule le hash du contenu pour détecter les changements"""
import hashlib
return hashlib.md5(content).hexdigest()
def is_ical_content_changed(self, new_content: bytes) -> bool:
"""Vérifie si le contenu iCal a changé depuis la dernière fois"""
new_hash = self.get_content_hash(new_content)
old_hash = self.cache_data.get("ical_content_hash")
return new_hash != old_hash
def save_ical_cache(self, ical_content: bytes):
"""Sauvegarde le contenu iCal en cache local"""
try:
with open(self.ical_file, 'wb') as f:
f.write(ical_content)
logger.info(f"Cache iCal sauvegardé dans {self.ical_file}")
# Mettre à jour le cache JSON avec le hash du contenu
self.cache_data["ical_content_hash"] = self.get_content_hash(ical_content)
self.cache_data["last_ical_fetch"] = datetime.now().isoformat()
self.save_cache_data()
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du cache iCal: {e}")
def load_ical_cache(self) -> Optional[bytes]:
"""Charge le contenu iCal depuis le cache local"""
try:
with open(self.ical_file, 'rb') as f:
content = f.read()
logger.info(f"Cache iCal chargé depuis {self.ical_file}")
return content
except Exception as e:
logger.error(f"Erreur lors du chargement du cache iCal: {e}")
return None
def fetch_ical_data(self, force_refresh: bool = False) -> Optional[Calendar]:
"""Récupère et parse le fichier iCal depuis l'agenda du libre ou depuis le cache"""
ical_content = None
# Vérifier si le cache est valide (sauf si on force le rechargement)
if not force_refresh and self.is_ical_cache_valid():
logger.info("Utilisation du cache iCal local (moins d'une heure)")
ical_content = self.load_ical_cache()
else:
if force_refresh:
logger.info(f"Rechargement forcé du fichier iCal depuis {self.ical_url}")
else:
logger.info(f"Cache iCal expiré ou absent, téléchargement depuis {self.ical_url}")
try:
response = requests.get(self.ical_url, timeout=30)
response.raise_for_status()
ical_content = response.content
# Vérifier si le contenu a changé
if not self.is_ical_content_changed(ical_content):
logger.info("Contenu iCal identique au précédent, utilisation du cache existant")
ical_content = self.load_ical_cache()
else:
logger.info("Nouveau contenu iCal détecté, mise à jour du cache")
# Sauvegarder en cache
self.save_ical_cache(ical_content)
except requests.RequestException as e:
logger.error(f"Erreur lors de la récupération du fichier iCal: {e}")
# Essayer de charger depuis le cache même s'il est expiré
logger.info("Tentative de chargement depuis le cache expiré...")
ical_content = self.load_ical_cache()
if ical_content is None:
logger.error("Impossible de récupérer le contenu iCal")
return None
try:
calendar = Calendar.from_ical(ical_content)
logger.info(f"Fichier iCal parsé avec succès")
return calendar
except Exception as e:
logger.error(f"Erreur lors du parsing du fichier iCal: {e}")
return None
def parse_event(self, event: Event) -> Optional[Dict]:
"""Parse un événement iCal et le convertit au format OEDB"""
try:
# Récupérer les propriétés de base
summary = str(event.get('summary', ''))
description = str(event.get('description', ''))
location = str(event.get('location', ''))
url = str(event.get('url', ''))
# Extraire les coordonnées GEO si disponibles
geo_coords = self.extract_geo_coordinates(event)
# Extraire les catégories si disponibles
categories = self.extract_categories(event)
# Extraire les propriétés supplémentaires
organizer = self.extract_organizer(event)
alt_description = self.extract_alt_description(event)
short_description = self.extract_short_description(event)
sequence = self.extract_sequence(event)
repeat_rules = self.extract_repeat_rules(event)
# Gestion des dates
dtstart = event.get('dtstart')
dtend = event.get('dtend')
if not dtstart:
logger.warning(f"Événement sans date de début: {summary}")
return None
# Convertir les dates
start_date = dtstart.dt
if isinstance(start_date, datetime):
start_iso = start_date.isoformat()
else:
# Date seulement (sans heure)
start_iso = f"{start_date}T00:00:00"
end_date = None
if dtend:
end_dt = dtend.dt
if isinstance(end_dt, datetime):
end_iso = end_dt.isoformat()
else:
end_iso = f"{end_dt}T23:59:59"
else:
# Si pas de date de fin, ajouter 2 heures par défaut
if isinstance(start_date, datetime):
end_iso = (start_date + timedelta(hours=2)).isoformat()
else:
end_iso = f"{start_date}T02:00:00"
# Créer l'événement au format OEDB
oedb_event = {
"properties": {
"label": summary,
"description": description,
"type": "scheduled",
"what": "culture.floss", # Type par défaut pour l'agenda du libre
"where": location,
"start": start_iso,
"stop": end_iso,
"url": url if url else None,
"source:name": "Agenda du Libre",
"source:url": "https://www.agendadulibre.org/",
"last_modified_by": "agendadulibre_scraper",
"tags": categories if categories else [], # Ajouter les catégories comme tags
"organizer": organizer, # Organisateur de l'événement
"alt_description": alt_description, # Description alternative HTML
"short_description": short_description, # Description courte
"sequence": sequence, # Numéro de séquence
"repeat_rules": repeat_rules # Règles de répétition
},
"geometry": {
"type": "Point",
"coordinates": geo_coords if geo_coords else [0, 0] # Utiliser GEO ou coordonnées par défaut
}
}
# Créer un ID unique basé sur le contenu
event_id = self.generate_event_id(summary, start_iso, location)
return {
"id": event_id,
"event": oedb_event,
}
except Exception as e:
logger.error(f"Erreur lors du parsing de l'événement: {e}")
return None
def extract_geo_coordinates(self, event: Event) -> Optional[List[float]]:
"""Extrait les coordonnées du champ GEO: de l'événement iCal"""
try:
geo = event.get('geo')
if geo:
# Le champ GEO peut être sous différentes formes
if hasattr(geo, 'lat') and hasattr(geo, 'lon'):
# Format avec attributs lat/lon
lat = float(geo.lat)
lon = float(geo.lon)
logger.info(f"📍 Coordonnées GEO trouvées: {lat}, {lon}")
return [lon, lat] # Format GeoJSON (longitude, latitude)
else:
# Format string "latitude;longitude"
geo_str = str(geo)
if ';' in geo_str:
parts = geo_str.split(';')
if len(parts) == 2:
lat = float(parts[0].strip())
lon = float(parts[1].strip())
logger.info(f"📍 Coordonnées GEO trouvées: {lat}, {lon}")
return [lon, lat] # Format GeoJSON (longitude, latitude)
else:
logger.debug(f"Format GEO non reconnu: {geo_str}")
else:
logger.debug("Aucun champ GEO trouvé")
return None
except (ValueError, AttributeError, TypeError) as e:
logger.warning(f"Erreur lors de l'extraction des coordonnées GEO: {e}")
return None
except Exception as e:
logger.error(f"Erreur inattendue lors de l'extraction GEO: {e}")
return None
def extract_categories(self, event: Event) -> List[str]:
"""Extrait les catégories du champ CATEGORIES: de l'événement iCal"""
try:
categories = []
# Le champ CATEGORIES peut apparaître plusieurs fois
for category in event.get('categories', []):
if category:
# Extraire la valeur de l'objet vCategory
if hasattr(category, 'cats'):
# Si c'est un objet vCategory avec des catégories
for cat in category.cats:
cat_str = str(cat).strip()
if cat_str:
categories.append(cat_str)
else:
# Sinon, convertir directement en string
cat_str = str(category).strip()
if cat_str:
categories.append(cat_str)
if categories:
logger.info(f"🏷️ Catégories trouvées: {', '.join(categories)}")
else:
logger.debug("Aucune catégorie trouvée")
return categories
except Exception as e:
logger.warning(f"Erreur lors de l'extraction des catégories: {e}")
return []
def extract_organizer(self, event: Event) -> Optional[str]:
"""Extrait l'organisateur du champ ORGANIZER: de l'événement iCal"""
try:
organizer = event.get('organizer')
if organizer:
organizer_str = str(organizer).strip()
if organizer_str:
logger.debug(f"👤 Organisateur trouvé: {organizer_str}")
return organizer_str
return None
except Exception as e:
logger.warning(f"Erreur lors de l'extraction de l'organisateur: {e}")
return None
def extract_alt_description(self, event: Event) -> Optional[str]:
"""Extrait la description alternative HTML du champ X-ALT-DESC;FMTTYPE=text/html: de l'événement iCal"""
try:
# Chercher le champ X-ALT-DESC avec FMTTYPE=text/html
for prop in event.property_items():
if prop[0] == 'X-ALT-DESC' and hasattr(prop[1], 'params') and prop[1].params.get('FMTTYPE') == 'text/html':
alt_desc = str(prop[1]).strip()
if alt_desc:
logger.debug(f"📄 Description alternative HTML trouvée: {len(alt_desc)} caractères")
return alt_desc
return None
except Exception as e:
logger.warning(f"Erreur lors de l'extraction de la description alternative: {e}")
return None
def extract_short_description(self, event: Event) -> Optional[str]:
"""Extrait la description courte du champ SUMMARY: de l'événement iCal"""
try:
summary = event.get('summary')
if summary:
summary_str = str(summary).strip()
if summary_str:
logger.debug(f"📝 Description courte trouvée: {summary_str}")
return summary_str
return None
except Exception as e:
logger.warning(f"Erreur lors de l'extraction de la description courte: {e}")
return None
def extract_sequence(self, event: Event) -> Optional[int]:
"""Extrait le numéro de séquence du champ SEQUENCE: de l'événement iCal"""
try:
sequence = event.get('sequence')
if sequence is not None:
seq_num = int(sequence)
logger.debug(f"🔢 Séquence trouvée: {seq_num}")
return seq_num
return None
except (ValueError, TypeError) as e:
logger.warning(f"Erreur lors de l'extraction de la séquence: {e}")
return None
except Exception as e:
logger.warning(f"Erreur inattendue lors de l'extraction de la séquence: {e}")
return None
def extract_repeat_rules(self, event: Event) -> Optional[str]:
"""Extrait les règles de répétition du champ RRULE: de l'événement iCal"""
try:
# Essayer différentes variantes de casse
rrule = event.get('rrule') or event.get('RRULE') or event.get('Rrule')
if rrule:
rrule_str = str(rrule).strip()
if rrule_str:
logger.info(f"🔄 Règles de répétition trouvées: {rrule_str}")
return rrule_str
# Vérifier aussi dans les propriétés avec parcours manuel
for prop in event.property_items():
if prop[0].upper() == 'RRULE':
rrule_str = str(prop[1]).strip()
if rrule_str:
logger.info(f"🔄 Règles de répétition trouvées (parcours): {rrule_str}")
return rrule_str
# Note: Pas de log ici car c'est normal qu'il n'y ait pas de RRULE
# dans tous les événements (seulement les événements récurrents en ont)
return None
except Exception as e:
logger.warning(f"Erreur lors de l'extraction des règles de répétition: {e}")
return None
def generate_event_id(self, summary: str, start_date: str, location: str) -> str:
"""Génère un ID unique pour l'événement"""
import hashlib
content = f"{summary}_{start_date}_{location}"
return hashlib.md5(content.encode('utf-8')).hexdigest()
def clean_location_for_geocoding(self, location: str) -> Optional[str]:
"""Nettoie le lieu pour le géocodage en extrayant l'adresse après la première virgule"""
if not location or location.strip() == "":
return None
# Diviser par la première virgule
parts = location.split(',', 1)
if len(parts) > 1:
# Prendre la partie après la première virgule
address_part = parts[1].strip()
# Vérifier si on a un numéro et une adresse
# Pattern pour détecter un numéro suivi d'une adresse
address_pattern = r'^\s*\d+.*'
if re.match(address_pattern, address_part):
logger.info(f"📍 Adresse potentielle trouvée: {address_part}")
return address_part
# Si pas de virgule ou pas d'adresse valide, essayer le lieu complet
logger.info(f"📍 Tentative de géocodage avec le lieu complet: {location}")
return location.strip()
def geocode_with_nominatim(self, location: str) -> Optional[Tuple[float, float]]:
"""Géocode un lieu avec Nominatim"""
if not location:
return None
try:
# URL de l'API Nominatim
nominatim_url = "https://nominatim.openstreetmap.org/search"
# Paramètres de la requête
params = {
'q': location,
'format': 'json',
'limit': 1,
'countrycodes': 'fr', # Limiter à la France
'addressdetails': 1
}
headers = {
'User-Agent': 'AgendaDuLibreScraper/1.0 (contact@example.com)'
}
logger.info(f"🌍 Géocodage avec Nominatim: {location}")
# Faire la requête avec un timeout
response = requests.get(nominatim_url, params=params, headers=headers, timeout=10)
response.raise_for_status()
# Parser la réponse
results = response.json()
if results and len(results) > 0:
result = results[0]
lat = float(result['lat'])
lon = float(result['lon'])
logger.info(f"✅ Géocodage réussi: {location} -> ({lat}, {lon})")
logger.info(f" Adresse trouvée: {result.get('display_name', 'N/A')}")
# Respecter la limite de 1 requête par seconde pour Nominatim
time.sleep(1)
return (lon, lat) # Retourner (longitude, latitude) pour GeoJSON
else:
logger.warning(f"⚠️ Aucun résultat de géocodage pour: {location}")
return None
except requests.RequestException as e:
logger.error(f"❌ Erreur de connexion Nominatim: {e}")
return None
except (ValueError, KeyError) as e:
logger.error(f"❌ Erreur de parsing Nominatim: {e}")
return None
except Exception as e:
logger.error(f"❌ Erreur inattendue lors du géocodage: {e}")
return None
def improve_event_coordinates(self, event_data: Dict) -> Dict:
"""Améliore les coordonnées de l'événement si nécessaire"""
coords = event_data["event"]["geometry"]["coordinates"]
# Vérifier si les coordonnées sont par défaut (0, 0)
if coords == [0, 0]:
location = event_data["event"]["properties"].get("where", "")
if location:
# Nettoyer le lieu pour le géocodage
clean_location = self.clean_location_for_geocoding(location)
if clean_location:
# Tenter le géocodage
new_coords = self.geocode_with_nominatim(clean_location)
if new_coords:
# Mettre à jour les coordonnées
event_data["event"]["geometry"]["coordinates"] = list(new_coords)
logger.info(f"🎯 Coordonnées mises à jour par géocodage: {coords} -> {new_coords}")
else:
logger.warning(f"⚠️ Impossible de géocoder: {clean_location}")
else:
logger.info(f" Lieu non géocodable: {location}")
else:
logger.info(" Aucun lieu spécifié pour le géocodage")
else:
# Vérifier si les coordonnées viennent du champ GEO
geo_coords = event_data.get("raw_ical", {}).get("geo")
if geo_coords:
logger.info(f"✅ Coordonnées utilisées depuis le champ GEO: {coords}")
else:
logger.info(f" Coordonnées déjà définies: {coords}")
return event_data
def log_event_details(self, event_data: Dict):
"""Log détaillé de l'événement avant envoi"""
props = event_data["event"]["properties"]
geom = event_data["event"]["geometry"]
logger.info("📝 Détails de l'événement à insérer:")
# INSERT_YOUR_CODE
# Affiche un dump lisible de l'événement avec json.dumps (indentation)
try:
logger.info(json.dumps(event_data, ensure_ascii=False, indent=2))
except Exception as e:
logger.warning(f"Erreur lors de l'affichage lisible de l'événement: {e}")
# logger.info(event_data)
# logger.info(f" ID: {event_data['id']}")
# logger.info(f" Titre: {props.get('label', 'N/A')}")
# logger.info(f" Description: {props.get('description', 'N/A')[:100]}{'...' if len(props.get('description', '')) > 100 else ''}")
# logger.info(f" Type: {props.get('type', 'N/A')}")
# logger.info(f" Catégorie: {props.get('what', 'N/A')}")
# logger.info(f" Lieu: {props.get('where', 'N/A')}")
# logger.info(f" Début: {props.get('start', 'N/A')}")
# logger.info(f" Fin: {props.get('stop', 'N/A')}")
# logger.info(f" URL: {props.get('url', 'N/A')}")
# logger.info(f" Source: {props.get('source:name', 'N/A')}")
# logger.info(f" Coordonnées: {geom.get('coordinates', 'N/A')}")
# logger.info(f" Tags: {', '.join(props.get('tags', [])) if props.get('tags') else 'N/A'}")
# logger.info(f" Organisateur: {props.get('organizer', 'N/A')}")
# logger.info(f" Description courte: {props.get('short_description', 'N/A')}")
# logger.info(f" Séquence: {props.get('sequence', 'N/A')}")
# logger.info(f" Règles de répétition: {props.get('repeat_rules', 'N/A')}")
# logger.info(f" Description HTML: {'Oui' if props.get('alt_description') else 'N/A'}")
# logger.info(f" Modifié par: {props.get('last_modified_by', 'N/A')}")
def send_event_to_api(self, event_data: Dict, skip_geocoding: bool = False) -> Tuple[bool, str]:
"""Envoie un événement à l'API OEDB (ou simule en mode dry-run)"""
# Améliorer les coordonnées si nécessaire (sauf si déjà traité)
if not skip_geocoding:
event_data = self.improve_event_coordinates(event_data)
else:
logger.info(" Géocodage ignoré - événement déjà traité")
# Log détaillé de l'événement
self.log_event_details(event_data)
if self.dry_run:
logger.info(f"[DRY-RUN] Simulation d'envoi de l'événement: {event_data['event']['properties']['label']}")
return True, "Simulé (dry-run)"
try:
url = f"{self.api_base_url}/event"
headers = {"Content-Type": "application/json"}
# Formater l'événement au format GeoJSON attendu par l'API
geojson_event = {
"type": "Feature",
"geometry": event_data["event"]["geometry"],
"properties": event_data["event"]["properties"]
}
logger.info(f"🌐 Envoi à l'API: {url}")
response = requests.post(url, json=geojson_event, headers=headers, timeout=30)
if response.status_code == 201:
logger.info("✅ Événement créé avec succès dans l'API")
return True, "Créé avec succès"
elif response.status_code == 409:
logger.warning("⚠️ Événement déjà existant dans l'API")
return False, "Événement déjà existant"
else:
logger.error(f"❌ Erreur API: {response.status_code} - {response.text}")
return False, f"Erreur API: {response.status_code} - {response.text}"
except requests.RequestException as e:
logger.error(f"❌ Erreur de connexion: {e}")
return False, f"Erreur de connexion: {e}"
except Exception as e:
logger.error(f"❌ Erreur inattendue: {e}")
return False, f"Erreur inattendue: {e}"
def process_single_event(self, event_data: Dict) -> Tuple[str, bool, str]:
"""Traite un événement individuellement (thread-safe)"""
event_id = event_data["id"]
event_label = event_data["event"]["properties"]["label"]
try:
# Vérifier si l'événement a déjà été traité avec succès
skip_geocoding = False
if event_id in self.events_data["events"]:
event_status = self.events_data["events"][event_id].get("status", "unknown")
if event_status in ["saved", "already_exists"]:
skip_geocoding = True
logger.info(f" Géocodage ignoré pour {event_label} - déjà traité")
# Envoyer à l'API
success, message = self.send_event_to_api(event_data, skip_geocoding=skip_geocoding)
return event_id, success, message
except Exception as e:
logger.error(f"❌ Erreur lors du traitement de {event_label}: {e}")
return event_id, False, f"Erreur: {e}"
def process_events(self, calendar: Calendar) -> Dict:
"""Traite tous les événements du calendrier"""
stats = {
"total_events": 0,
"new_events": 0,
"already_saved": 0,
"api_errors": 0,
"parse_errors": 0,
"sent_this_run": 0,
"skipped_due_to_limit": 0
}
events_to_process = []
pending_events = [] # Événements en attente d'envoi
processed_count = 0
# Parcourir tous les événements
for component in calendar.walk():
if component.name == "VEVENT":
stats["total_events"] += 1
# Parser l'événement
parsed_event = self.parse_event(component)
if not parsed_event:
stats["parse_errors"] += 1
continue
event_id = parsed_event["id"]
event_label = parsed_event["event"]["properties"]["label"]
# Vérifier le statut de l'événement
event_status = None
skip_reason = ""
# Vérifier dans les données d'événements
if event_id in self.events_data["events"]:
event_status = self.events_data["events"][event_id].get("status", "unknown")
if event_status in ["saved", "already_exists"]:
stats["already_saved"] += 1
logger.info(f"⏭️ Événement ignoré: {event_label} - déjà traité (status: {event_status})")
continue
# Vérifier dans le cache des événements traités
if event_id in self.cache_data["processed_events"]:
cache_status = self.cache_data["processed_events"][event_id].get("status", "unknown")
if cache_status in ["saved", "already_exists"]:
stats["already_saved"] += 1
logger.info(f"⏭️ Événement ignoré: {event_label} - déjà dans le cache (status: {cache_status})")
continue
# Déterminer la priorité de l'événement
priority = 0 # 0 = nouveau, 1 = en attente, 2 = échec précédent
if event_status in ["pending", "failed", "api_error"]:
priority = 1 # Priorité haute pour les événements en attente
logger.info(f"🔄 Événement en attente prioritaire: {event_label} (status: {event_status})")
elif event_id in self.cache_data["processed_events"]:
cache_status = self.cache_data["processed_events"][event_id].get("status", "unknown")
if cache_status in ["pending", "failed", "api_error"]:
priority = 1 # Priorité haute pour les événements en attente dans le cache
logger.info(f"🔄 Événement en attente du cache: {event_label} (status: {cache_status})")
# Ajouter l'événement avec sa priorité
event_with_priority = {
"event": parsed_event,
"priority": priority,
"event_id": event_id,
"event_label": event_label
}
if priority > 0:
pending_events.append(event_with_priority)
else:
events_to_process.append(event_with_priority)
# Trier les événements : d'abord les événements en attente, puis les nouveaux
all_events = pending_events + events_to_process
all_events.sort(key=lambda x: x["priority"], reverse=True) # Priorité décroissante
# Appliquer la limite d'événements
if self.max_events:
all_events = all_events[:self.max_events]
if len(pending_events) + len(events_to_process) > self.max_events:
stats["skipped_due_to_limit"] = len(pending_events) + len(events_to_process) - self.max_events
# Extraire les événements pour le traitement
events_to_process = [item["event"] for item in all_events]
# Traiter les événements
if self.parallel and len(events_to_process) > 10:
logger.info(f"🚀 Traitement parallèle de {len(events_to_process)} événements avec {self.max_workers} workers")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
if self.dry_run:
logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API")
# Traitement parallèle
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Soumettre tous les événements
future_to_event = {
executor.submit(self.process_single_event, event_data): event_data
for event_data in events_to_process
}
# Traiter les résultats au fur et à mesure
for future in as_completed(future_to_event):
event_data = future_to_event[future]
event_id, success, message = future.result()
event_label = event_data["event"]["properties"]["label"]
# Mettre à jour les statistiques et les données locales
if success:
stats["new_events"] += 1
stats["sent_this_run"] += 1
self.events_data["events"][event_id] = {
"status": "saved",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache des événements traités
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "saved",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
if "déjà existant" in message or "already exists" in message.lower():
stats["already_saved"] += 1
self.events_data["events"][event_id] = {
"status": "already_exists",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache même si déjà existant
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "already_exists",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
stats["api_errors"] += 1
self.events_data["events"][event_id] = {
"status": "api_error",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.error(f"{event_label} - {message}")
# Sauvegarder les données après chaque événement
self.save_events_data()
self.save_cache_data()
else:
# Traitement séquentiel (mode original)
logger.info(f"Traitement séquentiel de {len(events_to_process)} événements par batch de {self.batch_size}")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
if self.dry_run:
logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API")
for i in range(0, len(events_to_process), self.batch_size):
batch = events_to_process[i:i + self.batch_size]
logger.info(f"Traitement du batch {i//self.batch_size + 1}/{(len(events_to_process) + self.batch_size - 1)//self.batch_size}")
for event_data in batch:
event_id, success, message = self.process_single_event(event_data)
event_label = event_data["event"]["properties"]["label"]
# Mettre à jour les statistiques et les données locales
if success:
stats["new_events"] += 1
stats["sent_this_run"] += 1
self.events_data["events"][event_id] = {
"status": "saved",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache des événements traités
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "saved",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
if "déjà existant" in message or "already exists" in message.lower():
stats["already_saved"] += 1
self.events_data["events"][event_id] = {
"status": "already_exists",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache même si déjà existant
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "already_exists",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
stats["api_errors"] += 1
self.events_data["events"][event_id] = {
"status": "api_error",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.error(f"{event_label} - {message}")
# Sauvegarder les données après chaque événement
self.save_events_data()
self.save_cache_data()
# Mettre à jour la date de dernière mise à jour
self.events_data["last_update"] = datetime.now().isoformat()
# Sauvegarder le cache
self.save_cache_data()
return stats
def run(self, force_refresh: bool = False):
"""Exécute le scraping complet"""
logger.info("🚀 Démarrage du scraping de l'agenda du libre")
logger.info(f"Configuration: batch_size={self.batch_size}, api_url={self.api_base_url}")
logger.info(f"Mode dry-run: {'OUI' if self.dry_run else 'NON'}")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
logger.info(f"Cache iCal: {'ignoré' if force_refresh else f'valide pendant {self.cache_duration_hours}h'}")
# Récupérer le fichier iCal
calendar = self.fetch_ical_data(force_refresh=force_refresh)
if not calendar:
logger.error("❌ Impossible de récupérer le fichier iCal")
return False
# Traiter les événements
stats = self.process_events(calendar)
# Sauvegarder les données
self.save_events_data()
# Afficher les statistiques finales
logger.info("📊 Statistiques finales:")
logger.info(f" Total d'événements trouvés: {stats['total_events']}")
logger.info(f" Nouveaux événements envoyés: {stats['new_events']}")
logger.info(f" Événements déjà existants: {stats['already_saved']}")
logger.info(f" Erreurs d'API: {stats['api_errors']}")
logger.info(f" Erreurs de parsing: {stats['parse_errors']}")
logger.info(f" Événements envoyés cette fois: {stats['sent_this_run']}")
if stats['skipped_due_to_limit'] > 0:
logger.info(f" Événements ignorés (limite atteinte): {stats['skipped_due_to_limit']}")
logger.info("✅ Scraping terminé avec succès")
return True
def main():
parser = argparse.ArgumentParser(description="Scraper pour l'agenda du libre")
parser.add_argument("--api-url", default=api_oedb,
help=f"URL de base de l'API OEDB (défaut: {api_oedb})")
parser.add_argument("--batch-size", type=int, default=1,
help="Nombre d'événements à envoyer par batch (défaut: 1)")
parser.add_argument("--max-events", type=int, default=None,
help="Limiter le nombre d'événements à traiter (défaut: aucun)")
parser.add_argument("--dry-run", action="store_true", default=True,
help="Mode dry-run par défaut (simulation sans envoi à l'API)")
parser.add_argument("--no-dry-run", action="store_true",
help="Désactiver le mode dry-run (envoi réel à l'API)")
parser.add_argument("--verbose", "-v", action="store_true",
help="Mode verbeux")
parser.add_argument("--force-refresh", "-f", action="store_true",
help="Forcer le rechargement du fichier iCal (ignorer le cache)")
parser.add_argument("--cache-duration", type=int, default=1,
help="Durée de validité du cache en heures (défaut: 1)")
parser.add_argument("--parallel", action="store_true",
help="Activer le traitement parallèle pour plus de 10 événements")
parser.add_argument("--max-workers", type=int, default=4,
help="Nombre maximum de workers pour le traitement parallèle (défaut: 4)")
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Déterminer le mode dry-run
dry_run = args.dry_run and not args.no_dry_run
# Créer et exécuter le scraper
scraper = AgendaDuLibreScraper(
api_base_url=args.api_url,
batch_size=args.batch_size,
max_events=args.max_events,
dry_run=dry_run,
parallel=args.parallel,
max_workers=args.max_workers
)
# Modifier la durée de cache si spécifiée
scraper.cache_duration_hours = args.cache_duration
# Exécuter avec ou sans rechargement forcé
success = scraper.run(force_refresh=args.force_refresh)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()