#!/usr/bin/env python3 """ Script de scraping pour l'agenda de la CCPL (Communauté de Communes du Pays de Limours) https://www.cc-paysdelimours.fr/agenda Utilise le scraping HTML pour récupérer les événements et les envoyer à l'API OEDB """ import requests import json import os import sys import argparse import re import time from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timedelta from typing import Dict, List, Optional, Tuple import logging from bs4 import BeautifulSoup import hashlib # Configuration par défaut api_oedb = "https://api.openeventdatabase.org" # Configuration du logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('ccpl_agenda_scraper.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class CCPLAgendaScraper: def __init__(self, api_base_url: str = api_oedb, batch_size: int = 1, max_events: int = None, dry_run: bool = True, parallel: bool = False, max_workers: int = 4): self.api_base_url = api_base_url self.batch_size = batch_size self.max_events = max_events self.dry_run = dry_run self.parallel = parallel self.max_workers = max_workers self.data_file = "ccpl_agenda_events.json" self.cache_file = "ccpl_agenda_cache.json" self.agenda_url = "https://www.cc-paysdelimours.fr/agenda" self.cache_duration_hours = 1 # Durée de cache en heures # Charger les données existantes self.events_data = self.load_events_data() self.cache_data = self.load_cache_data() def load_events_data(self) -> Dict: """Charge les données des événements depuis le fichier JSON""" if os.path.exists(self.data_file): try: with open(self.data_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.warning(f"Erreur lors du chargement des données: {e}") return { "events": {}, "last_update": None } def save_events_data(self): """Sauvegarde les données des événements dans le fichier JSON""" try: with open(self.data_file, 'w', encoding='utf-8') as f: json.dump(self.events_data, f, ensure_ascii=False, indent=2) except Exception as e: logger.error(f"Erreur lors de la sauvegarde des données: {e}") def load_cache_data(self) -> Dict: """Charge les données du cache depuis le fichier JSON""" if os.path.exists(self.cache_file): try: with open(self.cache_file, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.warning(f"Erreur lors du chargement du cache: {e}") return { "processed_events": {}, "last_fetch": None, "content_hash": None } def save_cache_data(self): """Sauvegarde les données du cache dans le fichier JSON""" try: with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(self.cache_data, f, ensure_ascii=False, indent=2) except Exception as e: logger.error(f"Erreur lors de la sauvegarde du cache: {e}") def get_content_hash(self, content: str) -> str: """Génère un hash du contenu pour détecter les changements""" import hashlib return hashlib.md5(content.encode('utf-8')).hexdigest() def is_content_changed(self, new_hash: str) -> bool: """Vérifie si le contenu a changé depuis la dernière récupération""" cached_hash = self.cache_data.get("content_hash") return cached_hash != new_hash def fetch_agenda_data(self, force_refresh: bool = False) -> Optional[str]: """Récupère les données de l'agenda CCPL""" try: logger.info(f"🌐 Récupération de l'agenda CCPL: {self.agenda_url}") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(self.agenda_url, headers=headers, timeout=30) response.raise_for_status() content = response.text content_hash = self.get_content_hash(content) # Vérifier si le contenu a changé ou si on force le rafraîchissement if self.is_content_changed(content_hash) or force_refresh: if force_refresh: logger.info("🔄 Rafraîchissement forcé, mise à jour du cache") else: logger.info("🔄 Nouveau contenu détecté, mise à jour du cache") self.cache_data["content_hash"] = content_hash self.cache_data["last_fetch"] = datetime.now().isoformat() self.save_cache_data() return content else: logger.info("ℹ️ Contenu identique au précédent, utilisation du cache") return None except requests.RequestException as e: logger.error(f"❌ Erreur lors de la récupération de l'agenda: {e}") return None except Exception as e: logger.error(f"❌ Erreur inattendue: {e}") return None def parse_agenda_html(self, html_content: str) -> List[Dict]: """Parse le HTML de l'agenda pour extraire les événements""" try: soup = BeautifulSoup(html_content, 'html.parser') events = [] # D'après l'analyse HTML, les événements sont dans des liens avec des classes spécifiques # Chercher les liens d'événements event_links = soup.find_all('a', class_=re.compile(r'col-lg-3|col-sm-6|mb-3')) logger.info(f"🔗 {len(event_links)} liens d'événements trouvés") for i, link in enumerate(event_links): if self.max_events and len(events) >= self.max_events: break try: event_data = self.extract_event_data_from_link(link, i) if event_data: events.append(event_data) except Exception as e: logger.warning(f"Erreur lors du parsing de l'événement {i}: {e}") continue # Si pas d'événements trouvés avec les liens, essayer une approche alternative if not events: logger.info("🔍 Tentative d'extraction alternative...") # Chercher par pattern de date dans les spans date_spans = soup.find_all('span', class_='small') for i, span in enumerate(date_spans): if self.max_events and len(events) >= self.max_events: break # Trouver l'élément parent qui contient l'événement parent = span.parent while parent and parent.name != 'a': parent = parent.parent if parent and parent.name == 'a': try: event_data = self.extract_event_data_from_link(parent, i) if event_data: events.append(event_data) except Exception as e: logger.warning(f"Erreur lors du parsing alternatif de l'événement {i}: {e}") continue logger.info(f"📅 {len(events)} événements extraits au total") return events except Exception as e: logger.error(f"❌ Erreur lors du parsing HTML: {e}") return [] def extract_event_data_from_link(self, link_element, index: int) -> Optional[Dict]: """Extrait les données d'un événement depuis un lien d'événement""" try: # Extraire l'URL url = link_element.get('href', '') if url.startswith('/'): url = f"https://www.cc-paysdelimours.fr{url}" # Extraire le titre title_elem = link_element.find('p', class_='agenda-title') title = title_elem.get_text(strip=True) if title_elem else f"Événement {index + 1}" # Extraire la date date_text = "" date_wrapper = link_element.find('div', class_='date-wrapper') if date_wrapper: # Extraire le jour day_elem = date_wrapper.find('span', class_='number') day = day_elem.get_text(strip=True) if day_elem else "" # Extraire le mois month_elem = date_wrapper.find('span', class_='small') month = month_elem.get_text(strip=True) if month_elem else "" if day and month: date_text = f"{day} {month}" # Extraire l'image si disponible image_elem = link_element.find('img') image_url = "" if image_elem: src = image_elem.get('src', '') if src.startswith('/'): image_url = f"https://www.cc-paysdelimours.fr{src}" elif src.startswith('http'): image_url = src # Extraire le lieu (par défaut) location = "Pays de Limours, France" # Récupérer les détails supplémentaires depuis la page de l'événement details = {} if url: details = self.fetch_event_details(url) # Utiliser les coordonnées de la carte si disponibles coordinates = self.get_coordinates_for_location(location) if details.get("coordinates"): coordinates = details["coordinates"] logger.info(f"📍 Coordonnées précises utilisées: {coordinates}") # Utiliser l'adresse détaillée si disponible if details.get("address"): location = details["address"] logger.info(f"📍 Adresse détaillée: {location}") # Générer un ID unique event_id = self.generate_event_id(title, date_text, location) # Construire les propriétés de contact (seulement si non vides) contact_properties = {} if details.get("contact_phone") and details["contact_phone"].strip(): contact_properties["contact:phone"] = details["contact_phone"] if details.get("contact_email") and details["contact_email"].strip(): contact_properties["contact:email"] = details["contact_email"] if details.get("website") and details["website"].strip(): contact_properties["contact:website"] = details["website"] # Construire la description enrichie description = f"Événement organisé par la CCPL - {title}" if details.get("description"): description = details["description"] # Ajouter les informations d'ouverture et de tarifs additional_info = [] if details.get("opening_hours"): additional_info.append(f"Ouverture: {details['opening_hours']}") if details.get("pricing"): additional_info.append(f"Tarifs: {details['pricing']}") if additional_info: description += "\n\n" + "\n".join(additional_info) # Créer l'événement au format OEDB properties = { "label": title, "description": description, "type": "scheduled", "what": "culture.community.ccpl", "where": location, "start": self.parse_date(date_text), "stop": self.parse_date(date_text, end=True), "source:name": "CCPL Agenda", "source:url": self.agenda_url, "last_modified_by": "ccpl_agenda_scraper", "tags": ["ccpl", "pays-de-limours", "événement-communal"] } # Ajouter les propriétés optionnelles seulement si elles ne sont pas nulles if url and url.strip(): properties["url"] = url if image_url and image_url.strip(): properties["image"] = image_url # Ajouter les propriétés de contact properties.update(contact_properties) oedb_event = { "properties": properties, "geometry": { "type": "Point", "coordinates": coordinates } } return { "id": event_id, "event": oedb_event, "raw_html": { "title": title, "date": date_text, "location": location, "url": url, "image": image_url } } except Exception as e: logger.error(f"Erreur lors de l'extraction de l'événement depuis le lien: {e}") return None def fetch_event_details(self, event_url: str) -> Dict: """Récupère les détails supplémentaires depuis la page de l'événement""" try: logger.info(f"🔍 Récupération des détails: {event_url}") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(event_url, headers=headers, timeout=30) response.raise_for_status() soup = BeautifulSoup(response.text, 'html.parser') details = { "description": "", "contact_phone": "", "contact_email": "", "website": "", "coordinates": None, "address": "", "opening_hours": "", "pricing": "" } # Extraire la description principale description_elem = soup.find('div', class_=re.compile(r'content|description|text', re.I)) if description_elem: # Nettoyer le texte de la description description_text = description_elem.get_text(strip=True) # Enlever les "Offres liées" et autres sections non pertinentes lines = description_text.split('\n') cleaned_lines = [] skip_section = False for line in lines: line = line.strip() if not line: continue if 'Offres liées' in line or 'TOUT L\'AGENDA' in line: skip_section = True break if 'Partager sur' in line: break cleaned_lines.append(line) details["description"] = ' '.join(cleaned_lines) # Extraire les informations de contact depuis toute la page page_text = soup.get_text() # Téléphone (format français) phone_match = re.search(r'(\d{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?\d{2})', page_text) if phone_match: details["contact_phone"] = phone_match.group(1).replace(' ', '') # Email email_match = re.search(r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', page_text) if email_match: email = email_match.group(1).strip() # Nettoyer l'email (enlever les caractères parasites à la fin, notamment le T majuscule) email = re.sub(r'[^a-zA-Z0-9._%+-@]+$', '', email) # Enlever spécifiquement le T majuscule à la fin if email.endswith('T'): email = email[:-1] details["contact_email"] = email # Site web (éviter les liens de partage social) website_links = soup.find_all('a', href=True) for link in website_links: href = link['href'] if (href.startswith('http') and 'facebook.com' not in href and 'twitter.com' not in href and 'linkedin.com' not in href and 'viadeo.com' not in href and 'x.com' not in href and 'instagram.com' not in href and 'tiktok.com' not in href and 'youtube.com' not in href and 'vimeo.com' not in href and 'soundcloud.com' not in href and 'spotify.com' not in href and 'deezer.com' not in href and 'apple.com' not in href and 'google.com' not in href and 'microsoft.com' not in href and 'amazon.com' not in href and 'sharer' not in href): details["website"] = href break # Extraire l'adresse address_elem = soup.find(text=re.compile(r'Place|Rue|Avenue|Boulevard', re.I)) if address_elem: # Trouver l'élément parent qui contient l'adresse complète parent = address_elem.parent while parent and len(parent.get_text(strip=True)) < 20: parent = parent.parent if parent: details["address"] = parent.get_text(strip=True) # Extraire les coordonnées depuis la carte Leaflet # Chercher les scripts qui contiennent les coordonnées de la carte scripts = soup.find_all('script') for script in scripts: if script.string: # Chercher les coordonnées dans les scripts Leaflet avec différents patterns patterns = [ r'lat["\']?\s*:\s*([0-9.-]+).*?lng["\']?\s*:\s*([0-9.-]+)', r'latitude["\']?\s*:\s*([0-9.-]+).*?longitude["\']?\s*:\s*([0-9.-]+)', r'center["\']?\s*:\s*\[([0-9.-]+),\s*([0-9.-]+)\]', r'lat["\']?\s*:\s*([0-9.-]+).*?lon["\']?\s*:\s*([0-9.-]+)', r'([0-9]{1,2}\.[0-9]+),\s*([0-9]{1,2}\.[0-9]+)' ] for pattern in patterns: coord_match = re.search(pattern, script.string) if coord_match: try: lat = float(coord_match.group(1)) lng = float(coord_match.group(2)) # Vérifier que les coordonnées sont dans une plage valide pour la France if 41 <= lat <= 52 and -6 <= lng <= 10: details["coordinates"] = [lng, lat] # Format GeoJSON [longitude, latitude] logger.info(f"📍 Coordonnées trouvées: {lat}, {lng}") break except ValueError: continue if details["coordinates"]: break # Extraire les horaires d'ouverture opening_elem = soup.find(text=re.compile(r'Du.*au.*tous les jours|Ouverture|Horaires', re.I)) if opening_elem: parent = opening_elem.parent if parent: details["opening_hours"] = parent.get_text(strip=True) # Extraire les tarifs pricing_elem = soup.find(text=re.compile(r'Gratuit|Tarifs|Prix', re.I)) if pricing_elem: parent = pricing_elem.parent if parent: details["pricing"] = parent.get_text(strip=True) logger.info(f"📋 Détails extraits: {len(details['description'])} caractères, tel: {details['contact_phone']}, email: {details['contact_email']}") return details except Exception as e: logger.warning(f"Erreur lors de la récupération des détails de {event_url}: {e}") return { "description": "", "contact_phone": "", "contact_email": "", "website": "", "coordinates": None, "address": "", "opening_hours": "", "pricing": "" } def extract_event_data(self, element, index: int) -> Optional[Dict]: """Extrait les données d'un événement depuis un élément HTML""" try: # Obtenir tout le texte de l'élément full_text = element.get_text(strip=True) # Extraire la date date_text = "" date_match = re.search(r'\b(\d{1,2})\s+(jan|fév|mar|avr|mai|jun|jul|aoû|sep|oct|nov|déc)\b', full_text, re.I) if date_match: date_text = f"{date_match.group(1)} {date_match.group(2)}" # Extraire le titre (première ligne significative après la date) lines = [line.strip() for line in full_text.split('\n') if line.strip()] title = f"Événement {index + 1}" # Chercher le titre dans les lignes for line in lines: if line and not re.match(r'^\d{1,2}\s+(jan|fév|mar|avr|mai|jun|jul|aoû|sep|oct|nov|déc)', line, re.I): title = line[:100] # Limiter la longueur break # Extraire le lieu location = "Pays de Limours, France" # Lieu par défaut communes = ['Angervilliers', 'Fontenay-lès-Briis', 'Forges-les-Bains', 'Gometz-la-Ville', 'Les Molières', 'Limours', 'Saint-Maurice-Montcouronne', 'Vaugrigneuse'] for commune in communes: if commune.lower() in full_text.lower(): location = f"{commune}, Pays de Limours, France" break # Extraire la description (texte complet sans la date) description = full_text if date_text: description = description.replace(date_text, '').strip() # Nettoyer la description description = re.sub(r'\s+', ' ', description).strip() if len(description) > 200: description = description[:200] + "..." # Extraire l'URL si disponible url = "" link_elem = element.find('a', href=True) if link_elem: href = link_elem['href'] if href.startswith('/'): url = f"https://www.cc-paysdelimours.fr{href}" elif href.startswith('http'): url = href # Générer un ID unique event_id = self.generate_event_id(title, date_text, location) # Créer l'événement au format OEDB oedb_event = { "properties": { "label": title, "description": description, "type": "scheduled", "what": "culture.community", # Type pour événements communautaires "where": location, "start": self.parse_date(date_text), "stop": self.parse_date(date_text, end=True), "url": url if url else None, "source:name": "CCPL Agenda", "source:url": self.agenda_url, "last_modified_by": "ccpl_agenda_scraper", "tags": ["ccpl", "pays-de-limours", "événement-communal"] }, "geometry": { "type": "Point", "coordinates": self.get_coordinates_for_location(location) } } return { "id": event_id, "event": oedb_event, "raw_html": { "title": title, "date": date_text, "location": location, "description": description, "url": url } } except Exception as e: logger.error(f"Erreur lors de l'extraction de l'événement: {e}") return None def parse_date(self, date_text: str, end: bool = False) -> str: """Parse une date française et la convertit en format ISO""" try: if not date_text: # Date par défaut si pas de date trouvée now = datetime.now() if end: return (now + timedelta(hours=2)).isoformat() return now.isoformat() # Mapping des mois français months = { 'jan': '01', 'fév': '02', 'mar': '03', 'avr': '04', 'mai': '05', 'jun': '06', 'jul': '07', 'aoû': '08', 'sep': '09', 'oct': '10', 'nov': '11', 'déc': '12' } # Extraire jour et mois match = re.search(r'(\d{1,2})\s+(\w{3})', date_text.lower()) if match: day = match.group(1).zfill(2) month_abbr = match.group(2) month = months.get(month_abbr, '01') # Utiliser l'année courante year = datetime.now().year # Créer la date date_obj = datetime.strptime(f"{year}-{month}-{day}", "%Y-%m-%d") if end: # Date de fin: ajouter 2 heures date_obj += timedelta(hours=2) return date_obj.isoformat() # Fallback: date actuelle now = datetime.now() if end: return (now + timedelta(hours=2)).isoformat() return now.isoformat() except Exception as e: logger.warning(f"Erreur lors du parsing de la date '{date_text}': {e}") now = datetime.now() if end: return (now + timedelta(hours=2)).isoformat() return now.isoformat() def get_coordinates_for_location(self, location: str) -> List[float]: """Obtient les coordonnées pour un lieu du Pays de Limours""" # Coordonnées approximatives pour les communes du Pays de Limours coordinates = { "Angervilliers": [2.0644, 48.5917], "Fontenay-lès-Briis": [2.0644, 48.5917], "Forges-les-Bains": [2.0644, 48.5917], "Gometz-la-Ville": [2.0644, 48.5917], "Les Molières": [2.0644, 48.5917], "Limours": [2.0644, 48.5917], "Saint-Maurice-Montcouronne": [2.0644, 48.5917], "Vaugrigneuse": [2.0644, 48.5917] } for commune, coords in coordinates.items(): if commune.lower() in location.lower(): return coords # Coordonnées par défaut pour Limours (centre du Pays de Limours) return [2.0644, 48.5917] def generate_event_id(self, title: str, date: str, location: str) -> str: """Génère un ID unique pour l'événement""" import hashlib content = f"{title}_{date}_{location}" return hashlib.md5(content.encode('utf-8')).hexdigest() def log_event_details(self, event_data: Dict): """Affiche les détails de l'événement dans les logs""" props = event_data["event"]["properties"] geom = event_data["event"]["geometry"] logger.info("📝 Détails de l'événement à insérer:") logger.info(json.dumps(event_data, ensure_ascii=False, indent=2)) # logger.info(f" ID: {event_data['id']}") # logger.info(f" Titre: {props.get('label', 'N/A')}") # logger.info(f" Description: {props.get('description', 'N/A')[:100]}{'...' if len(props.get('description', '')) > 100 else ''}") # logger.info(f" Type: {props.get('type', 'N/A')}") # logger.info(f" Catégorie: {props.get('what', 'N/A')}") # logger.info(f" Lieu: {props.get('where', 'N/A')}") # logger.info(f" Début: {props.get('start', 'N/A')}") # logger.info(f" Fin: {props.get('stop', 'N/A')}") # logger.info(f" URL: {props.get('url', 'N/A')}") # logger.info(f" Source: {props.get('source:name', 'N/A')}") # logger.info(f" Coordonnées: {geom.get('coordinates', 'N/A')}") # logger.info(f" Tags: {', '.join(props.get('tags', [])) if props.get('tags') else 'N/A'}") # logger.info(f" Modifié par: {props.get('last_modified_by', 'N/A')}") # Afficher les nouvelles propriétés de contact (seulement si présentes) if props.get('contact:phone'): logger.info(f" 📞 Téléphone: {props.get('contact:phone')}") if props.get('contact:email'): logger.info(f" 📧 Email: {props.get('contact:email')}") if props.get('contact:website'): logger.info(f" 🌐 Site web: {props.get('contact:website')}") if props.get('image'): logger.info(f" 🖼️ Image: {props.get('image')}") if props.get('url'): logger.info(f" 🔗 URL: {props.get('url')}") def send_event_to_api(self, event_data: Dict) -> Tuple[bool, str]: """Envoie un événement à l'API OEDB (ou simule en mode dry-run)""" # Log détaillé de l'événement self.log_event_details(event_data) if self.dry_run: logger.info(f"[DRY-RUN] Simulation d'envoi de l'événement: {event_data['event']['properties']['label']}") return True, "Simulé (dry-run)" try: url = f"{self.api_base_url}/event" headers = {"Content-Type": "application/json"} # Formater l'événement au format GeoJSON attendu par l'API geojson_event = { "type": "Feature", "geometry": event_data["event"]["geometry"], "properties": event_data["event"]["properties"] } logger.info(f"🌐 Envoi à l'API: {url}") response = requests.post(url, json=geojson_event, headers=headers, timeout=30) if response.status_code == 201: logger.info("✅ Événement créé avec succès dans l'API") return True, "Créé avec succès" elif response.status_code == 409: logger.warning("⚠️ Événement déjà existant dans l'API") return False, "Événement déjà existant" else: logger.error(f"❌ Erreur API: {response.status_code} - {response.text}") return False, f"Erreur API: {response.status_code} - {response.text}" except requests.RequestException as e: logger.error(f"❌ Erreur de connexion: {e}") return False, f"Erreur de connexion: {e}" except Exception as e: logger.error(f"❌ Erreur inattendue: {e}") return False, f"Erreur inattendue: {e}" def process_single_event(self, event_data: Dict) -> Tuple[str, bool, str]: """Traite un événement individuellement (thread-safe)""" event_id = event_data["id"] event_label = event_data["event"]["properties"]["label"] try: # Vérifier si l'événement a déjà été traité avec succès if event_id in self.events_data["events"]: event_status = self.events_data["events"][event_id].get("status", "unknown") if event_status in ["saved", "already_exists"]: logger.info(f"ℹ️ Événement déjà traité: {event_label}") return event_id, True, "Déjà traité" # Envoyer à l'API success, message = self.send_event_to_api(event_data) return event_id, success, message except Exception as e: logger.error(f"❌ Erreur lors du traitement de {event_label}: {e}") return event_id, False, f"Erreur: {e}" def process_events(self, events: List[Dict]) -> Dict: """Traite tous les événements""" stats = { "total_events": len(events), "new_events": 0, "already_saved": 0, "api_errors": 0, "parse_errors": 0, "sent_this_run": 0, "skipped_due_to_limit": 0 } if not events: logger.info("ℹ️ Aucun événement à traiter") return stats # Appliquer la limite d'événements if self.max_events: events = events[:self.max_events] if len(events) < stats["total_events"]: stats["skipped_due_to_limit"] = stats["total_events"] - len(events) # Traiter les événements if self.parallel and len(events) > 10: logger.info(f"🚀 Traitement parallèle de {len(events)} événements avec {self.max_workers} workers") if self.max_events: logger.info(f"Limite d'événements: {self.max_events}") if self.dry_run: logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API") # Traitement parallèle with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # Soumettre tous les événements future_to_event = { executor.submit(self.process_single_event, event_data): event_data for event_data in events } # Traiter les résultats au fur et à mesure for future in as_completed(future_to_event): event_data = future_to_event[future] event_id, success, message = future.result() event_label = event_data["event"]["properties"]["label"] # Mettre à jour les statistiques et les données locales if success: if "déjà traité" in message.lower(): stats["already_saved"] += 1 else: stats["new_events"] += 1 stats["sent_this_run"] += 1 self.events_data["events"][event_id] = { "status": "saved" if "déjà traité" not in message.lower() else "already_exists", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } # Ajouter au cache des événements traités self.cache_data["processed_events"][event_id] = { "processed_at": datetime.now().isoformat(), "status": "saved" if "déjà traité" not in message.lower() else "already_exists", "event_label": event_label } logger.info(f"✅ {event_label} - {message}") else: stats["api_errors"] += 1 self.events_data["events"][event_id] = { "status": "api_error", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } logger.error(f"❌ {event_label} - {message}") # Sauvegarder les données après chaque événement self.save_events_data() self.save_cache_data() else: # Traitement séquentiel (mode original) logger.info(f"Traitement séquentiel de {len(events)} événements") if self.max_events: logger.info(f"Limite d'événements: {self.max_events}") if self.dry_run: logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API") for event_data in events: event_id, success, message = self.process_single_event(event_data) event_label = event_data["event"]["properties"]["label"] # Mettre à jour les statistiques et les données locales if success: if "déjà traité" in message.lower(): stats["already_saved"] += 1 else: stats["new_events"] += 1 stats["sent_this_run"] += 1 self.events_data["events"][event_id] = { "status": "saved" if "déjà traité" not in message.lower() else "already_exists", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } # Ajouter au cache des événements traités self.cache_data["processed_events"][event_id] = { "processed_at": datetime.now().isoformat(), "status": "saved" if "déjà traité" not in message.lower() else "already_exists", "event_label": event_label } logger.info(f"✅ {event_label} - {message}") else: stats["api_errors"] += 1 self.events_data["events"][event_id] = { "status": "api_error", "message": message, "last_attempt": datetime.now().isoformat(), "event": event_data["event"] } logger.error(f"❌ {event_label} - {message}") # Sauvegarder les données après chaque événement self.save_events_data() self.save_cache_data() # Mettre à jour la date de dernière mise à jour self.events_data["last_update"] = datetime.now().isoformat() # Sauvegarder le cache self.save_cache_data() return stats def run(self, force_refresh: bool = False): """Exécute le scraping complet""" logger.info("🚀 Démarrage du scraping de l'agenda CCPL") logger.info(f"Configuration: batch_size={self.batch_size}, api_url={self.api_base_url}") logger.info(f"Mode dry-run: {'OUI' if self.dry_run else 'NON'}") if self.max_events: logger.info(f"Limite d'événements: {self.max_events}") logger.info("=" * 60) try: # Récupérer les données de l'agenda html_content = self.fetch_agenda_data(force_refresh) if html_content is None and not force_refresh: logger.info("ℹ️ Utilisation du cache (pas de nouveau contenu)") return # Parser les événements events = self.parse_agenda_html(html_content) if html_content else [] if not events: logger.warning("⚠️ Aucun événement trouvé dans l'agenda") return logger.info(f"Traitement de {len(events)} événements") # Traiter les événements stats = self.process_events(events) # Afficher les statistiques finales logger.info("📊 Statistiques finales:") for key, value in stats.items(): logger.info(f" {key}: {value}") logger.info("✅ Scraping terminé avec succès") except Exception as e: logger.error(f"❌ Erreur lors du scraping: {e}") raise def main(): parser = argparse.ArgumentParser(description="Scraper pour l'agenda CCPL") parser.add_argument("--api-url", default=api_oedb, help=f"URL de base de l'API OEDB (défaut: {api_oedb})") parser.add_argument("--batch-size", type=int, default=1, help="Nombre d'événements à envoyer par batch (défaut: 1)") parser.add_argument("--max-events", type=int, default=1, help="Limiter le nombre d'événements à traiter (défaut: 1)") parser.add_argument("--dry-run", action="store_true", default=True, help="Mode dry-run par défaut (simulation sans envoi à l'API)") parser.add_argument("--no-dry-run", action="store_true", help="Désactiver le mode dry-run (envoi réel à l'API)") parser.add_argument("--verbose", "-v", action="store_true", help="Mode verbeux") parser.add_argument("--force-refresh", "-f", action="store_true", help="Forcer le rechargement de l'agenda (ignorer le cache)") parser.add_argument("--cache-duration", type=int, default=1, help="Durée de validité du cache en heures (défaut: 1)") parser.add_argument("--parallel", action="store_true", help="Activer le traitement parallèle pour plus de 10 événements") parser.add_argument("--max-workers", type=int, default=4, help="Nombre maximum de workers pour le traitement parallèle (défaut: 4)") args = parser.parse_args() if args.verbose: logging.getLogger().setLevel(logging.DEBUG) # Déterminer le mode dry-run dry_run = args.dry_run and not args.no_dry_run # Créer et exécuter le scraper scraper = CCPLAgendaScraper( api_base_url=args.api_url, batch_size=args.batch_size, max_events=args.max_events, dry_run=dry_run, parallel=args.parallel, max_workers=args.max_workers ) # Modifier la durée de cache si spécifiée scraper.cache_duration_hours = args.cache_duration # Exécuter avec ou sans rechargement forcé scraper.run(force_refresh=args.force_refresh) if __name__ == "__main__": main()