415 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			415 lines
		
	
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python3
 | |
| """
 | |
| Scraper pour l'agenda geek - Import des événements dans OEDB
 | |
| 
 | |
| Usage:
 | |
|     python3 agenda_geek.py --limit 10 --offset 0
 | |
| 
 | |
| Options:
 | |
|     --limit: Nombre d'événements à traiter (défaut: 5)
 | |
|     --offset: Nombre d'événements à ignorer (défaut: 0)
 | |
|     --api-url: URL de l'API OEDB (défaut: https://api.openeventdatabase.org)
 | |
|     --dry-run: Mode test sans envoi vers l'API
 | |
|     --verbose: Mode verbeux
 | |
| """
 | |
| 
 | |
| import requests
 | |
| import argparse
 | |
| import re
 | |
| import logging
 | |
| from bs4 import BeautifulSoup
 | |
| from icalendar import Calendar
 | |
| from datetime import datetime, timezone
 | |
| from urllib.parse import urljoin, urlparse
 | |
| from typing import Optional, Dict, List, Tuple
 | |
| import time
 | |
| import json
 | |
| 
 | |
| # Configuration du logging
 | |
| logging.basicConfig(
 | |
|     level=logging.INFO,
 | |
|     format='%(asctime)s - %(levelname)s - %(message)s',
 | |
|     handlers=[
 | |
|         logging.StreamHandler(),
 | |
|         logging.FileHandler('agenda_geek_scraper.log')
 | |
|     ]
 | |
| )
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| class AgendaGeekScraper:
 | |
|     def __init__(self, api_url: str = "https://api.openeventdatabase.org", dry_run: bool = False, page: int = 1):
 | |
|         self.api_url = api_url.rstrip('/')
 | |
|         self.dry_run = dry_run
 | |
|         self.page = page
 | |
|         self.session = requests.Session()
 | |
|         self.session.headers.update({
 | |
|             'User-Agent': 'OEDB-AgendaGeek-Scraper/1.0 (+https://github.com/cquest/oedb)'
 | |
|         })
 | |
| 
 | |
|     def get_events_list(self) -> List[str]:
 | |
|         """Récupère la liste des liens d'événements depuis la page principale"""
 | |
|         url = f"https://lagendageek.com/tevents/page/{self.page}"
 | |
|         logger.info(f"🔍 Récupération de la liste des événements depuis {url}")
 | |
| 
 | |
|         try:
 | |
|             response = self.session.get(url, timeout=30)
 | |
|             response.raise_for_status()
 | |
| 
 | |
|             soup = BeautifulSoup(response.content, 'html.parser')
 | |
|             event_links = []
 | |
| 
 | |
|             # Rechercher les liens des titres d'événements
 | |
|             title_links = soup.find_all('a', class_='tribe-events-calendar-list__event-title-link')
 | |
| 
 | |
|             for link in title_links:
 | |
|                 href = link.get('href')
 | |
|                 if href:
 | |
|                     full_url = urljoin(url, href)
 | |
|                     event_links.append(full_url)
 | |
|                     logger.debug(f"📅 Événement trouvé: {link.get_text(strip=True)} - {full_url}")
 | |
| 
 | |
|             logger.info(f"✅ {len(event_links)} événements trouvés sur la page")
 | |
|             return event_links
 | |
| 
 | |
|         except requests.RequestException as e:
 | |
|             logger.error(f"❌ Erreur lors de la récupération de la liste: {e}")
 | |
|             return []
 | |
| 
 | |
|     def get_ical_link(self, event_url: str) -> Optional[str]:
 | |
|         """Extrait le lien iCal depuis une page d'événement"""
 | |
|         logger.debug(f"🔗 Recherche du lien iCal pour {event_url}")
 | |
| 
 | |
|         try:
 | |
|             response = self.session.get(event_url, timeout=30)
 | |
|             response.raise_for_status()
 | |
| 
 | |
|             # Le lien iCal est généralement construit en ajoutant ?ical=1 à l'URL
 | |
|             ical_url = f"{event_url.rstrip('/')}/?ical=1"
 | |
| 
 | |
|             # Vérifier que le lien iCal existe
 | |
|             ical_response = self.session.head(ical_url, timeout=10)
 | |
|             if ical_response.status_code == 200:
 | |
|                 logger.debug(f"✅ Lien iCal trouvé: {ical_url}")
 | |
|                 return ical_url
 | |
|             else:
 | |
|                 logger.warning(f"⚠️ Lien iCal non accessible: {ical_url} (status: {ical_response.status_code})")
 | |
|                 return None
 | |
| 
 | |
|         except requests.RequestException as e:
 | |
|             logger.error(f"❌ Erreur lors de la récupération du lien iCal: {e}")
 | |
|             return None
 | |
| 
 | |
|     def parse_ical(self, ical_url: str) -> Optional[Dict]:
 | |
|         """Parse un fichier iCal et extrait les données de l'événement"""
 | |
|         # Convertir webcal:// en https://
 | |
|         if ical_url.startswith('webcal://'):
 | |
|             ical_url = ical_url.replace('webcal://', 'https://')
 | |
| 
 | |
|         logger.debug(f"📖 Parse du fichier iCal: {ical_url}")
 | |
| 
 | |
|         try:
 | |
|             response = self.session.get(ical_url, timeout=30)
 | |
|             response.raise_for_status()
 | |
| 
 | |
|             # Parser le contenu iCal
 | |
|             cal = Calendar.from_ical(response.content)
 | |
| 
 | |
|             for component in cal.walk():
 | |
|                 if component.name == "VEVENT":
 | |
|                     event_data = {
 | |
|                         'summary': str(component.get('SUMMARY', '')),
 | |
|                         'description': str(component.get('DESCRIPTION', '')),
 | |
|                         'location': str(component.get('LOCATION', '')),
 | |
|                         'dtstart': component.get('DTSTART'),
 | |
|                         'dtend': component.get('DTEND'),
 | |
|                         'geo': component.get('GEO'),
 | |
|                         'url': str(component.get('URL', '')),
 | |
|                         'uid': str(component.get('UID', ''))
 | |
|                     }
 | |
| 
 | |
|                     logger.debug(f"📅 Événement parsé: {event_data['summary']}")
 | |
|                     return event_data
 | |
| 
 | |
|             logger.warning("⚠️ Aucun événement VEVENT trouvé dans le fichier iCal")
 | |
|             return None
 | |
| 
 | |
|         except Exception as e:
 | |
|             logger.error(f"❌ Erreur lors du parsing iCal: {e}")
 | |
|             return None
 | |
| 
 | |
|     def geocode_address(self, address: str) -> Optional[Tuple[float, float]]:
 | |
|         """Géocode une adresse en utilisant Nominatim"""
 | |
|         if not address or address.strip() == '':
 | |
|             return None
 | |
| 
 | |
|         logger.debug(f"🌍 Géocodage de l'adresse: {address}")
 | |
| 
 | |
|         try:
 | |
|             # Utiliser Nominatim pour le géocodage
 | |
|             geocode_url = "https://nominatim.openstreetmap.org/search"
 | |
|             params = {
 | |
|                 'q': address,
 | |
|                 'format': 'json',
 | |
|                 'limit': 1,
 | |
|                 'countrycodes': 'fr',  # Limiter à la France
 | |
|                 'addressdetails': 1
 | |
|             }
 | |
| 
 | |
|             response = self.session.get(geocode_url, params=params, timeout=10)
 | |
|             response.raise_for_status()
 | |
| 
 | |
|             results = response.json()
 | |
|             if results:
 | |
|                 result = results[0]
 | |
|                 lat = float(result['lat'])
 | |
|                 lon = float(result['lon'])
 | |
|                 logger.debug(f"✅ Géocodage réussi: {lat}, {lon}")
 | |
|                 return (lat, lon)
 | |
|             else:
 | |
|                 logger.warning(f"⚠️ Aucun résultat de géocodage pour: {address}")
 | |
|                 return None
 | |
| 
 | |
|         except Exception as e:
 | |
|             logger.error(f"❌ Erreur lors du géocodage: {e}")
 | |
|             return None
 | |
| 
 | |
|     def extract_coordinates(self, event_data: Dict) -> Optional[Tuple[float, float]]:
 | |
|         """Extrait les coordonnées depuis les données de l'événement"""
 | |
|         # D'abord essayer la propriété GEO
 | |
|         if event_data.get('geo'):
 | |
|             try:
 | |
|                 geo = event_data['geo']
 | |
|                 logger.debug(f"🔍 Type GEO trouvé: {type(geo)} - Valeur: {geo}")
 | |
| 
 | |
|                 # Cas 1: GEO avec paramètres latitude/longitude
 | |
|                 if hasattr(geo, 'params') and 'latitude' in geo.params and 'longitude' in geo.params:
 | |
|                     lat = float(geo.params['latitude'])
 | |
|                     lon = float(geo.params['longitude'])
 | |
|                     logger.debug(f"📍 Coordonnées GEO (params) trouvées: {lat}, {lon}")
 | |
|                     return (lat, lon)
 | |
| 
 | |
|                 # Cas 2: GEO avec méthode to_ical
 | |
|                 elif hasattr(geo, 'to_ical'):
 | |
|                     # Format GEO standard: "latitude;longitude"
 | |
|                     geo_bytes = geo.to_ical()
 | |
|                     # Gérer le cas où c'est déjà une string ou des bytes
 | |
|                     if isinstance(geo_bytes, bytes):
 | |
|                         geo_str = geo_bytes.decode('utf-8')
 | |
|                     else:
 | |
|                         geo_str = str(geo_bytes)
 | |
| 
 | |
|                     logger.debug(f"🔍 GEO string extrait: '{geo_str}'")
 | |
|                     parts = geo_str.split(';')
 | |
|                     if len(parts) == 2:
 | |
|                         lat = float(parts[0])
 | |
|                         lon = float(parts[1])
 | |
|                         logger.debug(f"📍 Coordonnées GEO parsées: {lat}, {lon}")
 | |
|                         return (lat, lon)
 | |
| 
 | |
|                 # Cas 3: GEO est directement une string
 | |
|                 elif isinstance(geo, str):
 | |
|                     logger.debug(f"🔍 GEO est une string directe: '{geo}'")
 | |
|                     parts = geo.split(';')
 | |
|                     if len(parts) == 2:
 | |
|                         lat = float(parts[0])
 | |
|                         lon = float(parts[1])
 | |
|                         logger.debug(f"📍 Coordonnées GEO (string) parsées: {lat}, {lon}")
 | |
|                         return (lat, lon)
 | |
| 
 | |
|                 # Cas 4: Autres formats possibles
 | |
|                 else:
 | |
|                     logger.debug(f"🔍 Format GEO non reconnu, tentative de conversion en string: {str(geo)}")
 | |
|                     geo_str = str(geo)
 | |
|                     if ';' in geo_str:
 | |
|                         parts = geo_str.split(';')
 | |
|                         if len(parts) == 2:
 | |
|                             lat = float(parts[0])
 | |
|                             lon = float(parts[1])
 | |
|                             logger.debug(f"📍 Coordonnées GEO (fallback) parsées: {lat}, {lon}")
 | |
|                             return (lat, lon)
 | |
| 
 | |
|             except (ValueError, AttributeError) as e:
 | |
|                 logger.warning(f"⚠️ Erreur parsing GEO: {e}")
 | |
| 
 | |
|         # Si pas de GEO, essayer de géocoder la location
 | |
|         location = event_data.get('location', '').strip()
 | |
|         if location:
 | |
|             return self.geocode_address(location)
 | |
| 
 | |
|         return None
 | |
| 
 | |
|     def format_for_oedb(self, event_data: Dict, coordinates: Tuple[float, float], source_url: str) -> Dict:
 | |
|         """Formate les données de l'événement pour l'API OEDB"""
 | |
|         lat, lon = coordinates
 | |
| 
 | |
|         # Convertir les dates
 | |
|         dtstart = event_data.get('dtstart')
 | |
|         dtend = event_data.get('dtend')
 | |
| 
 | |
|         start_iso = None
 | |
|         end_iso = None
 | |
| 
 | |
|         if dtstart:
 | |
|             if hasattr(dtstart, 'dt'):
 | |
|                 dt = dtstart.dt
 | |
|                 if not isinstance(dt, datetime):
 | |
|                     # Si c'est juste une date, créer un datetime
 | |
|                     dt = datetime.combine(dt, datetime.min.time())
 | |
|                 if dt.tzinfo is None:
 | |
|                     dt = dt.replace(tzinfo=timezone.utc)
 | |
|                 start_iso = dt.isoformat()
 | |
| 
 | |
|         if dtend:
 | |
|             if hasattr(dtend, 'dt'):
 | |
|                 dt = dtend.dt
 | |
|                 if not isinstance(dt, datetime):
 | |
|                     dt = datetime.combine(dt, datetime.min.time())
 | |
|                 if dt.tzinfo is None:
 | |
|                     dt = dt.replace(tzinfo=timezone.utc)
 | |
|                 end_iso = dt.isoformat()
 | |
| 
 | |
|         # Si pas de date de fin, définir à +2h de la date de début
 | |
|         if start_iso and not end_iso:
 | |
|             start_dt = datetime.fromisoformat(start_iso.replace('Z', '+00:00'))
 | |
|             end_dt = start_dt.replace(hour=start_dt.hour + 2)
 | |
|             end_iso = end_dt.isoformat()
 | |
| 
 | |
|         # Construire l'objet pour l'API OEDB
 | |
|         oedb_event = {
 | |
|             "type": "Feature",
 | |
|             "geometry": {
 | |
|                 "type": "Point",
 | |
|                 "coordinates": [lon, lat]
 | |
|             },
 | |
|             "properties": {
 | |
|                 "label": event_data.get('summary', 'Événement Agenda Geek'),
 | |
|                 "type": "scheduled",
 | |
|                 "what": "culture.geek",
 | |
|                 "start": start_iso,
 | |
|                 "stop": end_iso,
 | |
|                 "where": event_data.get('location', ''),
 | |
|                 "description": event_data.get('description', ''),
 | |
|                 "source:name": "L'Agenda Geek",
 | |
|                 "source:url": source_url,
 | |
|                 "source:uid": event_data.get('uid', ''),
 | |
|                 "url": event_data.get('url', source_url)
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         return oedb_event
 | |
| 
 | |
|     def send_to_oedb(self, event: Dict) -> bool:
 | |
|         """Envoie un événement vers l'API OEDB"""
 | |
|         if self.dry_run:
 | |
|             logger.info(f"🏃♂️ DRY RUN - Événement qui serait envoyé:")
 | |
|             logger.info(json.dumps(event, indent=2, ensure_ascii=False))
 | |
|             return True
 | |
| 
 | |
|         try:
 | |
|             response = self.session.post(
 | |
|                 f"{self.api_url}/event",
 | |
|                 json=event,
 | |
|                 timeout=30
 | |
|             )
 | |
| 
 | |
|             if response.status_code == 201:
 | |
|                 result = response.json()
 | |
|                 event_id = result.get('id', 'unknown')
 | |
|                 logger.info(f"✅ Événement créé avec succès: ID {event_id}")
 | |
|                 return True
 | |
|             elif response.status_code == 409:
 | |
|                 logger.info("⚠️ Événement déjà existant (conflit)")
 | |
|                 return True  # Considérer comme un succès
 | |
|             else:
 | |
|                 logger.error(f"❌ Erreur API ({response.status_code}): {response.text}")
 | |
|                 return False
 | |
| 
 | |
|         except requests.RequestException as e:
 | |
|             logger.error(f"❌ Erreur lors de l'envoi vers l'API: {e}")
 | |
|             return False
 | |
| 
 | |
|     def process_events(self, limit: int = 5, offset: int = 0) -> None:
 | |
|         """Traite les événements avec pagination"""
 | |
|         logger.info(f"🚀 Début du traitement - Limite: {limit}, Offset: {offset}")
 | |
| 
 | |
|         # Récupérer la liste des événements
 | |
|         event_links = self.get_events_list()
 | |
| 
 | |
|         if not event_links:
 | |
|             logger.error("❌ Aucun événement trouvé")
 | |
|             return
 | |
| 
 | |
|         # Appliquer l'offset et la limite
 | |
|         total_events = len(event_links)
 | |
|         start_idx = min(offset, total_events)
 | |
|         end_idx = min(offset + limit, total_events)
 | |
| 
 | |
|         events_to_process = event_links[start_idx:end_idx]
 | |
| 
 | |
|         logger.info(f"📊 Traitement de {len(events_to_process)} événements ({start_idx+1} à {end_idx} sur {total_events})")
 | |
| 
 | |
|         success_count = 0
 | |
|         error_count = 0
 | |
| 
 | |
|         for i, event_url in enumerate(events_to_process, 1):
 | |
|             logger.info(f"🔄 [{i}/{len(events_to_process)}] Traitement de {event_url}")
 | |
| 
 | |
|             try:
 | |
|                 # Obtenir le lien iCal
 | |
|                 ical_url = self.get_ical_link(event_url)
 | |
|                 if not ical_url:
 | |
|                     logger.warning(f"⚠️ Pas de lien iCal trouvé pour {event_url}")
 | |
|                     error_count += 1
 | |
|                     continue
 | |
| 
 | |
|                 # Parser le fichier iCal
 | |
|                 event_data = self.parse_ical(ical_url)
 | |
|                 if not event_data:
 | |
|                     logger.warning(f"⚠️ Impossible de parser l'iCal pour {event_url}")
 | |
|                     error_count += 1
 | |
|                     continue
 | |
| 
 | |
|                 # Extraire les coordonnées
 | |
|                 coordinates = self.extract_coordinates(event_data)
 | |
|                 if not coordinates:
 | |
|                     logger.warning(f"⚠️ Pas de coordonnées trouvées pour {event_data.get('summary', 'événement sans titre')}")
 | |
|                     error_count += 1
 | |
|                     continue
 | |
| 
 | |
|                 # Formater pour OEDB
 | |
|                 oedb_event = self.format_for_oedb(event_data, coordinates, event_url)
 | |
| 
 | |
|                 # Envoyer vers l'API
 | |
|                 if self.send_to_oedb(oedb_event):
 | |
|                     success_count += 1
 | |
|                 else:
 | |
|                     error_count += 1
 | |
| 
 | |
|                 # Pause entre les requêtes pour éviter la surcharge
 | |
|                 time.sleep(1)
 | |
| 
 | |
|             except Exception as e:
 | |
|                 logger.error(f"❌ Erreur lors du traitement de {event_url}: {e}")
 | |
|                 error_count += 1
 | |
| 
 | |
|         logger.info(f"🏁 Traitement terminé - Succès: {success_count}, Erreurs: {error_count}")
 | |
| 
 | |
| def main():
 | |
|     parser = argparse.ArgumentParser(description='Scraper Agenda Geek vers OEDB')
 | |
|     parser.add_argument('--limit', type=int, default=20, help='Nombre d\'événements à traiter')
 | |
|     parser.add_argument('--page', type=int, default=1, help='Numéro de page du site')
 | |
|     parser.add_argument('--offset', type=int, default=0, help='Nombre d\'événements à ignorer')
 | |
|     parser.add_argument('--api-url', default='https://api.openeventdatabase.org', help='URL de l\'API OEDB')
 | |
|     parser.add_argument('--dry-run', action='store_true', help='Mode test sans envoi vers l\'API')
 | |
|     parser.add_argument('--verbose', action='store_true', help='Mode verbeux')
 | |
| 
 | |
|     args = parser.parse_args()
 | |
| 
 | |
|     if args.verbose:
 | |
|         logging.getLogger().setLevel(logging.DEBUG)
 | |
| 
 | |
|     scraper = AgendaGeekScraper(api_url=args.api_url, dry_run=args.dry_run, page=args.page)
 | |
|     scraper.process_events(limit=args.limit, offset=args.offset)
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     main()
 | 
