oedb-backend/extractors/agenda_geek.py

416 lines
16 KiB
Python
Raw Normal View History

2025-09-26 17:16:29 +02:00
#!/usr/bin/env python3
"""
Scraper pour l'agenda geek - Import des événements dans OEDB
Usage:
python3 agenda_geek.py --limit 10 --offset 0
Options:
--limit: Nombre d'événements à traiter (défaut: 5)
--offset: Nombre d'événements à ignorer (défaut: 0)
--api-url: URL de l'API OEDB (défaut: https://api.openeventdatabase.org)
--dry-run: Mode test sans envoi vers l'API
--verbose: Mode verbeux
"""
import requests
import argparse
import re
import logging
from bs4 import BeautifulSoup
from icalendar import Calendar
from datetime import datetime, timezone
from urllib.parse import urljoin, urlparse
from typing import Optional, Dict, List, Tuple
import time
import json
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('agenda_geek_scraper.log')
]
)
logger = logging.getLogger(__name__)
class AgendaGeekScraper:
2025-09-26 17:38:30 +02:00
def __init__(self, api_url: str = "https://api.openeventdatabase.org", dry_run: bool = False, page: int = 1):
2025-09-26 17:16:29 +02:00
self.api_url = api_url.rstrip('/')
self.dry_run = dry_run
2025-09-26 17:38:30 +02:00
self.page = page
2025-09-26 17:16:29 +02:00
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'OEDB-AgendaGeek-Scraper/1.0 (+https://github.com/cquest/oedb)'
})
def get_events_list(self) -> List[str]:
"""Récupère la liste des liens d'événements depuis la page principale"""
2025-09-26 17:38:30 +02:00
url = f"https://lagendageek.com/tevents/page/{self.page}"
2025-09-26 17:16:29 +02:00
logger.info(f"🔍 Récupération de la liste des événements depuis {url}")
try:
response = self.session.get(url, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
event_links = []
# Rechercher les liens des titres d'événements
title_links = soup.find_all('a', class_='tribe-events-calendar-list__event-title-link')
for link in title_links:
href = link.get('href')
if href:
full_url = urljoin(url, href)
event_links.append(full_url)
logger.debug(f"📅 Événement trouvé: {link.get_text(strip=True)} - {full_url}")
logger.info(f"{len(event_links)} événements trouvés sur la page")
return event_links
except requests.RequestException as e:
logger.error(f"❌ Erreur lors de la récupération de la liste: {e}")
return []
def get_ical_link(self, event_url: str) -> Optional[str]:
"""Extrait le lien iCal depuis une page d'événement"""
logger.debug(f"🔗 Recherche du lien iCal pour {event_url}")
try:
response = self.session.get(event_url, timeout=30)
response.raise_for_status()
# Le lien iCal est généralement construit en ajoutant ?ical=1 à l'URL
ical_url = f"{event_url.rstrip('/')}/?ical=1"
# Vérifier que le lien iCal existe
ical_response = self.session.head(ical_url, timeout=10)
if ical_response.status_code == 200:
logger.debug(f"✅ Lien iCal trouvé: {ical_url}")
return ical_url
else:
logger.warning(f"⚠️ Lien iCal non accessible: {ical_url} (status: {ical_response.status_code})")
return None
except requests.RequestException as e:
logger.error(f"❌ Erreur lors de la récupération du lien iCal: {e}")
return None
def parse_ical(self, ical_url: str) -> Optional[Dict]:
"""Parse un fichier iCal et extrait les données de l'événement"""
# Convertir webcal:// en https://
if ical_url.startswith('webcal://'):
ical_url = ical_url.replace('webcal://', 'https://')
logger.debug(f"📖 Parse du fichier iCal: {ical_url}")
try:
response = self.session.get(ical_url, timeout=30)
response.raise_for_status()
# Parser le contenu iCal
cal = Calendar.from_ical(response.content)
for component in cal.walk():
if component.name == "VEVENT":
event_data = {
'summary': str(component.get('SUMMARY', '')),
'description': str(component.get('DESCRIPTION', '')),
'location': str(component.get('LOCATION', '')),
'dtstart': component.get('DTSTART'),
'dtend': component.get('DTEND'),
'geo': component.get('GEO'),
'url': str(component.get('URL', '')),
'uid': str(component.get('UID', ''))
}
logger.debug(f"📅 Événement parsé: {event_data['summary']}")
return event_data
logger.warning("⚠️ Aucun événement VEVENT trouvé dans le fichier iCal")
return None
except Exception as e:
logger.error(f"❌ Erreur lors du parsing iCal: {e}")
return None
def geocode_address(self, address: str) -> Optional[Tuple[float, float]]:
"""Géocode une adresse en utilisant Nominatim"""
if not address or address.strip() == '':
return None
logger.debug(f"🌍 Géocodage de l'adresse: {address}")
try:
# Utiliser Nominatim pour le géocodage
geocode_url = "https://nominatim.openstreetmap.org/search"
params = {
'q': address,
'format': 'json',
'limit': 1,
'countrycodes': 'fr', # Limiter à la France
'addressdetails': 1
}
response = self.session.get(geocode_url, params=params, timeout=10)
response.raise_for_status()
results = response.json()
if results:
result = results[0]
lat = float(result['lat'])
lon = float(result['lon'])
logger.debug(f"✅ Géocodage réussi: {lat}, {lon}")
return (lat, lon)
else:
logger.warning(f"⚠️ Aucun résultat de géocodage pour: {address}")
return None
except Exception as e:
logger.error(f"❌ Erreur lors du géocodage: {e}")
return None
def extract_coordinates(self, event_data: Dict) -> Optional[Tuple[float, float]]:
"""Extrait les coordonnées depuis les données de l'événement"""
# D'abord essayer la propriété GEO
if event_data.get('geo'):
try:
geo = event_data['geo']
logger.debug(f"🔍 Type GEO trouvé: {type(geo)} - Valeur: {geo}")
# Cas 1: GEO avec paramètres latitude/longitude
if hasattr(geo, 'params') and 'latitude' in geo.params and 'longitude' in geo.params:
lat = float(geo.params['latitude'])
lon = float(geo.params['longitude'])
logger.debug(f"📍 Coordonnées GEO (params) trouvées: {lat}, {lon}")
return (lat, lon)
# Cas 2: GEO avec méthode to_ical
elif hasattr(geo, 'to_ical'):
# Format GEO standard: "latitude;longitude"
geo_bytes = geo.to_ical()
# Gérer le cas où c'est déjà une string ou des bytes
if isinstance(geo_bytes, bytes):
geo_str = geo_bytes.decode('utf-8')
else:
geo_str = str(geo_bytes)
logger.debug(f"🔍 GEO string extrait: '{geo_str}'")
parts = geo_str.split(';')
if len(parts) == 2:
lat = float(parts[0])
lon = float(parts[1])
logger.debug(f"📍 Coordonnées GEO parsées: {lat}, {lon}")
return (lat, lon)
# Cas 3: GEO est directement une string
elif isinstance(geo, str):
logger.debug(f"🔍 GEO est une string directe: '{geo}'")
parts = geo.split(';')
if len(parts) == 2:
lat = float(parts[0])
lon = float(parts[1])
logger.debug(f"📍 Coordonnées GEO (string) parsées: {lat}, {lon}")
return (lat, lon)
# Cas 4: Autres formats possibles
else:
logger.debug(f"🔍 Format GEO non reconnu, tentative de conversion en string: {str(geo)}")
geo_str = str(geo)
if ';' in geo_str:
parts = geo_str.split(';')
if len(parts) == 2:
lat = float(parts[0])
lon = float(parts[1])
logger.debug(f"📍 Coordonnées GEO (fallback) parsées: {lat}, {lon}")
return (lat, lon)
except (ValueError, AttributeError) as e:
logger.warning(f"⚠️ Erreur parsing GEO: {e}")
# Si pas de GEO, essayer de géocoder la location
location = event_data.get('location', '').strip()
if location:
return self.geocode_address(location)
return None
def format_for_oedb(self, event_data: Dict, coordinates: Tuple[float, float], source_url: str) -> Dict:
"""Formate les données de l'événement pour l'API OEDB"""
lat, lon = coordinates
# Convertir les dates
dtstart = event_data.get('dtstart')
dtend = event_data.get('dtend')
start_iso = None
end_iso = None
if dtstart:
if hasattr(dtstart, 'dt'):
dt = dtstart.dt
if not isinstance(dt, datetime):
# Si c'est juste une date, créer un datetime
dt = datetime.combine(dt, datetime.min.time())
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
start_iso = dt.isoformat()
if dtend:
if hasattr(dtend, 'dt'):
dt = dtend.dt
if not isinstance(dt, datetime):
dt = datetime.combine(dt, datetime.min.time())
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
end_iso = dt.isoformat()
# Si pas de date de fin, définir à +2h de la date de début
if start_iso and not end_iso:
start_dt = datetime.fromisoformat(start_iso.replace('Z', '+00:00'))
end_dt = start_dt.replace(hour=start_dt.hour + 2)
end_iso = end_dt.isoformat()
# Construire l'objet pour l'API OEDB
oedb_event = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [lon, lat]
},
"properties": {
"label": event_data.get('summary', 'Événement Agenda Geek'),
"type": "scheduled",
"what": "culture.geek",
"start": start_iso,
"stop": end_iso,
"where": event_data.get('location', ''),
"description": event_data.get('description', ''),
"source:name": "L'Agenda Geek",
"source:url": source_url,
"source:uid": event_data.get('uid', ''),
"url": event_data.get('url', source_url)
}
}
return oedb_event
def send_to_oedb(self, event: Dict) -> bool:
"""Envoie un événement vers l'API OEDB"""
if self.dry_run:
logger.info(f"🏃‍♂️ DRY RUN - Événement qui serait envoyé:")
logger.info(json.dumps(event, indent=2, ensure_ascii=False))
return True
try:
response = self.session.post(
f"{self.api_url}/event",
json=event,
timeout=30
)
if response.status_code == 201:
result = response.json()
event_id = result.get('id', 'unknown')
logger.info(f"✅ Événement créé avec succès: ID {event_id}")
return True
elif response.status_code == 409:
logger.info("⚠️ Événement déjà existant (conflit)")
return True # Considérer comme un succès
else:
logger.error(f"❌ Erreur API ({response.status_code}): {response.text}")
return False
except requests.RequestException as e:
logger.error(f"❌ Erreur lors de l'envoi vers l'API: {e}")
return False
def process_events(self, limit: int = 5, offset: int = 0) -> None:
"""Traite les événements avec pagination"""
logger.info(f"🚀 Début du traitement - Limite: {limit}, Offset: {offset}")
# Récupérer la liste des événements
event_links = self.get_events_list()
if not event_links:
logger.error("❌ Aucun événement trouvé")
return
# Appliquer l'offset et la limite
total_events = len(event_links)
start_idx = min(offset, total_events)
end_idx = min(offset + limit, total_events)
events_to_process = event_links[start_idx:end_idx]
logger.info(f"📊 Traitement de {len(events_to_process)} événements ({start_idx+1} à {end_idx} sur {total_events})")
success_count = 0
error_count = 0
for i, event_url in enumerate(events_to_process, 1):
logger.info(f"🔄 [{i}/{len(events_to_process)}] Traitement de {event_url}")
try:
# Obtenir le lien iCal
ical_url = self.get_ical_link(event_url)
if not ical_url:
logger.warning(f"⚠️ Pas de lien iCal trouvé pour {event_url}")
error_count += 1
continue
# Parser le fichier iCal
event_data = self.parse_ical(ical_url)
if not event_data:
logger.warning(f"⚠️ Impossible de parser l'iCal pour {event_url}")
error_count += 1
continue
# Extraire les coordonnées
coordinates = self.extract_coordinates(event_data)
if not coordinates:
logger.warning(f"⚠️ Pas de coordonnées trouvées pour {event_data.get('summary', 'événement sans titre')}")
error_count += 1
continue
# Formater pour OEDB
oedb_event = self.format_for_oedb(event_data, coordinates, event_url)
# Envoyer vers l'API
if self.send_to_oedb(oedb_event):
success_count += 1
else:
error_count += 1
# Pause entre les requêtes pour éviter la surcharge
time.sleep(1)
except Exception as e:
logger.error(f"❌ Erreur lors du traitement de {event_url}: {e}")
error_count += 1
logger.info(f"🏁 Traitement terminé - Succès: {success_count}, Erreurs: {error_count}")
def main():
parser = argparse.ArgumentParser(description='Scraper Agenda Geek vers OEDB')
2025-09-26 17:38:30 +02:00
parser.add_argument('--limit', type=int, default=20, help='Nombre d\'événements à traiter')
parser.add_argument('--page', type=int, default=1, help='Numéro de page du site')
2025-09-26 17:16:29 +02:00
parser.add_argument('--offset', type=int, default=0, help='Nombre d\'événements à ignorer')
parser.add_argument('--api-url', default='https://api.openeventdatabase.org', help='URL de l\'API OEDB')
parser.add_argument('--dry-run', action='store_true', help='Mode test sans envoi vers l\'API')
parser.add_argument('--verbose', action='store_true', help='Mode verbeux')
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
2025-09-26 17:38:30 +02:00
scraper = AgendaGeekScraper(api_url=args.api_url, dry_run=args.dry_run, page=args.page)
2025-09-26 17:16:29 +02:00
scraper.process_events(limit=args.limit, offset=args.offset)
if __name__ == "__main__":
main()