oedb-backend/extractors/agendadulibre.py
2025-10-04 19:26:00 +02:00

402 lines
No EOL
17 KiB
Python

#!/usr/bin/env python3
"""
Script de scraping pour l'agenda du libre (https://www.agendadulibre.org/)
Utilise le fichier iCal pour récupérer les événements et les envoyer à l'API OEDB
"""
import requests
import json
import os
import sys
import argparse
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import icalendar
from icalendar import Calendar, Event
import logging
# Configuration par défaut
api_oedb = "https://api.openeventdatabase.org"
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('agendadulibre_scraper.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class AgendaDuLibreScraper:
def __init__(self, api_base_url: str = api_oedb, batch_size: int = 1):
self.api_base_url = api_base_url
self.batch_size = batch_size
self.data_file = "agendadulibre_events.json"
self.ical_file = "agendadulibre_events.ics"
self.ical_url = "https://www.agendadulibre.org/events.ics"
self.cache_duration_hours = 1 # Durée de cache en heures
# Charger les données existantes
self.events_data = self.load_events_data()
def load_events_data(self) -> Dict:
"""Charge les données d'événements depuis le fichier JSON local"""
if os.path.exists(self.data_file):
try:
with open(self.data_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Erreur lors du chargement du fichier {self.data_file}: {e}")
return {"events": {}, "last_update": None}
return {"events": {}, "last_update": None}
def save_events_data(self):
"""Sauvegarde les données d'événements dans le fichier JSON local"""
try:
with open(self.data_file, 'w', encoding='utf-8') as f:
json.dump(self.events_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du fichier {self.data_file}: {e}")
def is_ical_cache_valid(self) -> bool:
"""Vérifie si le cache iCal est encore valide (moins d'une heure)"""
if not os.path.exists(self.ical_file):
return False
try:
file_time = os.path.getmtime(self.ical_file)
cache_age = datetime.now().timestamp() - file_time
cache_age_hours = cache_age / 3600
logger.debug(f"Cache iCal âgé de {cache_age_hours:.2f} heures")
return cache_age_hours < self.cache_duration_hours
except Exception as e:
logger.error(f"Erreur lors de la vérification du cache iCal: {e}")
return False
def save_ical_cache(self, ical_content: bytes):
"""Sauvegarde le contenu iCal en cache local"""
try:
with open(self.ical_file, 'wb') as f:
f.write(ical_content)
logger.info(f"Cache iCal sauvegardé dans {self.ical_file}")
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du cache iCal: {e}")
def load_ical_cache(self) -> Optional[bytes]:
"""Charge le contenu iCal depuis le cache local"""
try:
with open(self.ical_file, 'rb') as f:
content = f.read()
logger.info(f"Cache iCal chargé depuis {self.ical_file}")
return content
except Exception as e:
logger.error(f"Erreur lors du chargement du cache iCal: {e}")
return None
def fetch_ical_data(self, force_refresh: bool = False) -> Optional[Calendar]:
"""Récupère et parse le fichier iCal depuis l'agenda du libre ou depuis le cache"""
ical_content = None
# Vérifier si le cache est valide (sauf si on force le rechargement)
if not force_refresh and self.is_ical_cache_valid():
logger.info("Utilisation du cache iCal local (moins d'une heure)")
ical_content = self.load_ical_cache()
else:
if force_refresh:
logger.info(f"Rechargement forcé du fichier iCal depuis {self.ical_url}")
else:
logger.info(f"Cache iCal expiré ou absent, téléchargement depuis {self.ical_url}")
try:
response = requests.get(self.ical_url, timeout=30)
response.raise_for_status()
ical_content = response.content
# Sauvegarder en cache
self.save_ical_cache(ical_content)
except requests.RequestException as e:
logger.error(f"Erreur lors de la récupération du fichier iCal: {e}")
# Essayer de charger depuis le cache même s'il est expiré
logger.info("Tentative de chargement depuis le cache expiré...")
ical_content = self.load_ical_cache()
if ical_content is None:
logger.error("Impossible de récupérer le contenu iCal")
return None
try:
calendar = Calendar.from_ical(ical_content)
logger.info(f"Fichier iCal parsé avec succès")
return calendar
except Exception as e:
logger.error(f"Erreur lors du parsing du fichier iCal: {e}")
return None
def parse_event(self, event: Event) -> Optional[Dict]:
"""Parse un événement iCal et le convertit au format OEDB"""
try:
# Récupérer les propriétés de base
summary = str(event.get('summary', ''))
description = str(event.get('description', ''))
location = str(event.get('location', ''))
url = str(event.get('url', ''))
# Gestion des dates
dtstart = event.get('dtstart')
dtend = event.get('dtend')
if not dtstart:
logger.warning(f"Événement sans date de début: {summary}")
return None
# Convertir les dates
start_date = dtstart.dt
if isinstance(start_date, datetime):
start_iso = start_date.isoformat()
else:
# Date seulement (sans heure)
start_iso = f"{start_date}T00:00:00"
end_date = None
if dtend:
end_dt = dtend.dt
if isinstance(end_dt, datetime):
end_iso = end_dt.isoformat()
else:
end_iso = f"{end_dt}T23:59:59"
else:
# Si pas de date de fin, ajouter 2 heures par défaut
if isinstance(start_date, datetime):
end_iso = (start_date + timedelta(hours=2)).isoformat()
else:
end_iso = f"{start_date}T02:00:00"
# Créer l'événement au format OEDB
oedb_event = {
"properties": {
"label": summary,
"description": description,
"type": "scheduled",
"what": "culture.floss", # Type par défaut pour l'agenda du libre
"where": location,
"start": start_iso,
"stop": end_iso,
"url": url if url else None,
"source:name": "Agenda du Libre",
"source:url": "https://www.agendadulibre.org/",
"last_modified_by": "agendadulibre_scraper"
},
"geometry": {
"type": "Point",
"coordinates": [0, 0] # Coordonnées par défaut, à géocoder si nécessaire
}
}
# Créer un ID unique basé sur le contenu
event_id = self.generate_event_id(summary, start_iso, location)
return {
"id": event_id,
"event": oedb_event,
"raw_ical": {
"summary": summary,
"description": description,
"location": location,
"url": url,
"dtstart": start_iso,
"dtend": end_iso
}
}
except Exception as e:
logger.error(f"Erreur lors du parsing de l'événement: {e}")
return None
def generate_event_id(self, summary: str, start_date: str, location: str) -> str:
"""Génère un ID unique pour l'événement"""
import hashlib
content = f"{summary}_{start_date}_{location}"
return hashlib.md5(content.encode('utf-8')).hexdigest()
def send_event_to_api(self, event_data: Dict) -> Tuple[bool, str]:
"""Envoie un événement à l'API OEDB"""
try:
url = f"{self.api_base_url}/event"
headers = {"Content-Type": "application/json"}
# Formater l'événement au format GeoJSON attendu par l'API
geojson_event = {
"type": "Feature",
"geometry": event_data["event"]["geometry"],
"properties": event_data["event"]["properties"]
}
response = requests.post(url, json=geojson_event, headers=headers, timeout=30)
if response.status_code == 201:
return True, "Créé avec succès"
elif response.status_code == 409:
return False, "Événement déjà existant"
else:
return False, f"Erreur API: {response.status_code} - {response.text}"
except requests.RequestException as e:
return False, f"Erreur de connexion: {e}"
except Exception as e:
return False, f"Erreur inattendue: {e}"
def process_events(self, calendar: Calendar) -> Dict:
"""Traite tous les événements du calendrier"""
stats = {
"total_events": 0,
"new_events": 0,
"already_saved": 0,
"api_errors": 0,
"parse_errors": 0,
"sent_this_run": 0
}
events_to_process = []
# Parcourir tous les événements
for component in calendar.walk():
if component.name == "VEVENT":
stats["total_events"] += 1
# Parser l'événement
parsed_event = self.parse_event(component)
if not parsed_event:
stats["parse_errors"] += 1
continue
event_id = parsed_event["id"]
# Vérifier si l'événement existe déjà dans nos données
if event_id in self.events_data["events"]:
event_status = self.events_data["events"][event_id].get("status", "unknown")
if event_status in ["saved", "already_exists"]:
stats["already_saved"] += 1
logger.debug(f"Événement déjà traité: {parsed_event['event']['properties']['label']}")
continue
events_to_process.append(parsed_event)
# Traiter les événements par batch
logger.info(f"Traitement de {len(events_to_process)} nouveaux événements par batch de {self.batch_size}")
for i in range(0, len(events_to_process), self.batch_size):
batch = events_to_process[i:i + self.batch_size]
logger.info(f"Traitement du batch {i//self.batch_size + 1}/{(len(events_to_process) + self.batch_size - 1)//self.batch_size}")
for event_data in batch:
event_id = event_data["id"]
event_label = event_data["event"]["properties"]["label"]
logger.info(f"Envoi de l'événement: {event_label}")
# Envoyer à l'API
success, message = self.send_event_to_api(event_data)
# Mettre à jour les statistiques et les données locales
if success:
stats["new_events"] += 1
stats["sent_this_run"] += 1
self.events_data["events"][event_id] = {
"status": "saved",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.info(f"{event_label} - {message}")
else:
if "déjà existant" in message or "already exists" in message.lower():
stats["already_saved"] += 1
self.events_data["events"][event_id] = {
"status": "already_exists",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.info(f"⚠️ {event_label} - {message}")
else:
stats["api_errors"] += 1
self.events_data["events"][event_id] = {
"status": "error",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.error(f"{event_label} - {message}")
# Mettre à jour la date de dernière mise à jour
self.events_data["last_update"] = datetime.now().isoformat()
return stats
def run(self, force_refresh: bool = False):
"""Exécute le scraping complet"""
logger.info("🚀 Démarrage du scraping de l'agenda du libre")
logger.info(f"Configuration: batch_size={self.batch_size}, api_url={self.api_base_url}")
logger.info(f"Cache iCal: {'ignoré' if force_refresh else f'valide pendant {self.cache_duration_hours}h'}")
# Récupérer le fichier iCal
calendar = self.fetch_ical_data(force_refresh=force_refresh)
if not calendar:
logger.error("❌ Impossible de récupérer le fichier iCal")
return False
# Traiter les événements
stats = self.process_events(calendar)
# Sauvegarder les données
self.save_events_data()
# Afficher les statistiques finales
logger.info("📊 Statistiques finales:")
logger.info(f" Total d'événements trouvés: {stats['total_events']}")
logger.info(f" Nouveaux événements envoyés: {stats['new_events']}")
logger.info(f" Événements déjà existants: {stats['already_saved']}")
logger.info(f" Erreurs d'API: {stats['api_errors']}")
logger.info(f" Erreurs de parsing: {stats['parse_errors']}")
logger.info(f" Événements envoyés cette fois: {stats['sent_this_run']}")
logger.info("✅ Scraping terminé avec succès")
return True
def main():
parser = argparse.ArgumentParser(description="Scraper pour l'agenda du libre")
parser.add_argument("--api-url", default=api_oedb,
help=f"URL de base de l'API OEDB (défaut: {api_oedb})")
parser.add_argument("--batch-size", type=int, default=1,
help="Nombre d'événements à envoyer par batch (défaut: 1)")
parser.add_argument("--verbose", "-v", action="store_true",
help="Mode verbeux")
parser.add_argument("--force-refresh", "-f", action="store_true",
help="Forcer le rechargement du fichier iCal (ignorer le cache)")
parser.add_argument("--cache-duration", type=int, default=1,
help="Durée de validité du cache en heures (défaut: 1)")
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Créer et exécuter le scraper
scraper = AgendaDuLibreScraper(
api_base_url=args.api_url,
batch_size=args.batch_size
)
# Modifier la durée de cache si spécifiée
scraper.cache_duration_hours = args.cache_duration
# Exécuter avec ou sans rechargement forcé
success = scraper.run(force_refresh=args.force_refresh)
sys.exit(0 if success else 1)
if __name__ == "__main__":
main()