import argparse
import json
import logging
import os
import re
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from hashlib import md5
from typing import Dict, List, Optional, Tuple

import requests
from bs4 import BeautifulSoup

LOGGER_NAME = "viparis_scraper"
logger = logging.getLogger(LOGGER_NAME)


class ViparisScraper:
    BASE_URL = "https://www.viparis.com"
    LIST_URL = "https://www.viparis.com/actualites-evenements/evenements"

    # Cache files
    CACHE_FILE = os.path.join(os.path.dirname(__file__), "viparis_cache.json")
    EVENTS_FILE = os.path.join(os.path.dirname(__file__), "viparis_events.json")

    # Approximate coordinates for Viparis venues
    VENUE_COORDINATES = {
        "Paris Expo Porte de Versailles": (2.2871, 48.8323),
        "Paris Nord Villepinte": (2.5156, 48.9725),
        "Paris Le Bourget": (2.4419, 48.9493),
        "Palais des Congrès de Paris": (2.2852, 48.8784),
        "Palais des Congrès d’Issy": (2.2718, 48.8247),
        "CNIT Forest": (2.2389, 48.8920),
        "Paris Convention Centre": (2.2866, 48.8329),
        "Espace Champerret": (2.2938, 48.8859),
        "Les Salles du Carrousel": (2.3349, 48.8625),
        "Cité de l'Histoire": (2.2367, 48.8926),
        "Hôtel Salomon de Rothschild": (2.3009, 48.8765),
        "La Serre": (2.2871, 48.8323),
    }

    def __init__(
        self,
        max_events: Optional[int] = None,
        max_pages: int = 10,
        dry_run: bool = False,
        force_refresh: bool = False,
        cache_duration: int = 6 * 60 * 60,
        verbose: bool = False,
        parallel: bool = False,
        max_workers: int = 4,
    ) -> None:
        self.max_events = max_events
        self.max_pages = max_pages
        self.dry_run = dry_run
        self.force_refresh = force_refresh
        self.cache_duration = cache_duration
        self.verbose = verbose
        self.parallel = parallel
        self.max_workers = max_workers

        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": (
                    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                    "AppleWebKit/537.36 (KHTML, like Gecko) "
                    "Chrome/120.0.0.0 Safari/537.36"
                )
            }
        )

        self.cache = self._load_json(self.CACHE_FILE, default={"pages": {}, "processed_events": {}})
        self.events_store = self._load_json(self.EVENTS_FILE, default={})

        self.stats = {
            "total_detected": 0,
            "processed": 0,
            "skipped_cached": 0,
            "sent": 0,
            "errors": 0,
        }

    # --------------- Utils ---------------

    @staticmethod
    def _load_json(path: str, default):
        if os.path.exists(path):
            try:
                with open(path, "r", encoding="utf-8") as f:
                    return json.load(f)
            except Exception:
                return default
        return default

    @staticmethod
    def _save_json(path: str, data) -> None:
        tmp = path + ".tmp"
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        os.replace(tmp, path)

    @staticmethod
    def _now_iso() -> str:
        return datetime.now().isoformat()

    @staticmethod
    def _hash_text(text: str) -> str:
        return md5(text.encode("utf-8")).hexdigest()

    # --------------- Fetch & parse ---------------

    def _should_use_cached_page(self, page: int, html: str) -> bool:
        page_key = str(page)
        page_hash = self._hash_text(html)
        cached = self.cache.get("pages", {}).get(page_key)
        if self.force_refresh:
            return False
        if not cached:
            return False
        if cached.get("hash") != page_hash:
            return False
        last_fetch = cached.get("last_fetch")
        if not last_fetch:
            return False
        try:
            last_dt = datetime.fromisoformat(last_fetch)
            return (datetime.now() - last_dt).total_seconds() < self.cache_duration
        except Exception:
            return False

    def _store_page_cache(self, page: int, html: str) -> None:
        page_key = str(page)
        self.cache.setdefault("pages", {})[page_key] = {
            "hash": self._hash_text(html),
            "last_fetch": self._now_iso(),
        }
        self._save_json(self.CACHE_FILE, self.cache)

    def fetch_list_page(self, page: int = 1) -> Optional[str]:
        url = self.LIST_URL
        params = {}
        if page > 1:
            params["page"] = page
        try:
            resp = self.session.get(url, params=params, timeout=30)
            resp.raise_for_status()
            html = resp.text
            # Store cache for page
            self._store_page_cache(page, html)
            return html
        except Exception as e:
            logger.error(f"Erreur de récupération page {page}: {e}")
            return None

    def fetch_url(self, url: str) -> Optional[str]:
        try:
            resp = self.session.get(url, timeout=30)
            resp.raise_for_status()
            return resp.text
        except Exception as e:
            logger.error(f"Erreur de récupération URL {url}: {e}")
            return None

    def parse_list(self, html: str) -> List[Dict]:
        soup = BeautifulSoup(html, "html.parser")
        cards = []
        # Event cards are usually elements with an h3 and a link;
        # target blocks listing a title, dates and a venue
        for card in soup.find_all(["article", "div"], class_=re.compile(r"card|event|listing|col|tile|result|grid", re.I)):
            title_el = card.find(["h3", "h2"])
            if not title_el:
                continue
            title = title_el.get_text(strip=True)
            # Filter out UI pseudo-titles
            if title.lower() in {"filtres", "trier par sites", "filtres 0", "trier par sites 0"}:
                continue
            link_el = card.find("a", href=True)
            url = None
            if link_el:
                href = link_el.get("href", "").strip()
                if href and not href.startswith("http"):
                    url = self.BASE_URL + href
                else:
                    url = href
            # Dates formatted like "du 11/10/2025 au 12/10/2025" or "du 01/01/2025 au 31/12/2025"
            date_text = None
            date_container = card.find(string=re.compile(r"\d{2}/\d{2}/\d{4}"))
            if date_container:
                date_text = date_container.strip()
            # Venue / site
            venue = None
            for v in self.VENUE_COORDINATES.keys():
                if v.lower() in card.get_text(separator=" ", strip=True).lower():
                    venue = v
                    break
            if not title or not url:
                continue
            cards.append(
                {
                    "title": title,
                    "url": url,
                    "date_text": date_text,
                    "venue": venue,
                }
            )

        # Fallback if no card was found via the generic classes
        if not cards:
            for a in soup.find_all("a", href=True):
                h = a.get_text(strip=True)
                if h and re.search(r"\d{2}/\d{2}/\d{4}", a.get_text(" ", strip=True)):
                    href = a["href"]
                    url = href if href.startswith("http") else self.BASE_URL + href
                    cards.append({"title": h, "url": url, "date_text": h, "venue": None})

        # Keep only cards with a valid date and URL
        filtered = []
        for c in cards:
            if not c.get("url"):
                continue
            # Title is required
            t = (c.get("title") or "").strip()
            if not t or t.lower() in {"filtres", "trier par sites"}:
                continue
            # A date is strongly recommended to avoid noise
            if not c.get("date_text"):
                continue
            filtered.append(c)
        return filtered

    # --------------- Event processing ---------------

    @staticmethod
    def _parse_date_range(date_text: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
        if not date_text:
            return None, None
        # E.g. "du 11/10/2025 au 12/10/2025", "01/01/2025 au 31/12/2025", or single dates.
        # Capture the first and, if present, the second date.
        dates = re.findall(r"(\d{2}/\d{2}/\d{4})", date_text)
        if not dates:
            return None, None

        def to_iso(d: str) -> Optional[str]:
            try:
                dt = datetime.strptime(d, "%d/%m/%Y")
                return dt.strftime("%Y-%m-%dT00:00:00")
            except Exception:
                return None

        start_iso = to_iso(dates[0]) if len(dates) >= 1 else None
        stop_iso = to_iso(dates[1]) if len(dates) >= 2 else None
        return start_iso, stop_iso
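    # Illustrative check (not part of the original source): given a card text such as
    # "du 11/10/2025 au 12/10/2025", _parse_date_range above is expected to return
    # ("2025-10-11T00:00:00", "2025-10-12T00:00:00"); a single date yields (start, None).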
f"viparis::{title}::{url}" return md5(base.encode("utf-8")).hexdigest() def _build_oedb_event(self, card: Dict) -> Dict: title = self._clean_text(card.get("title")) url = card.get("url") date_text = self._clean_text(card.get("date_text")) venue = self._clean_text(card.get("venue")) start, stop = self._parse_date_range(date_text) properties: Dict[str, object] = { "label": title if title else "Événement Viparis", "type": "scheduled", "what": "culture.viparis", "source:name": "Viparis", "source:url": self.LIST_URL, "last_modified_by": "viparis_scraper", "tags": ["viparis", "paris", "events"], } # Facultatifs conditionnels if date_text: properties["short_description"] = date_text if url: properties["url"] = url if start: properties["start"] = start if stop: properties["stop"] = stop if venue: properties["where"] = venue # Géométrie depuis venue connue geometry = None if venue and venue in self.VENUE_COORDINATES: lon, lat = self.VENUE_COORDINATES[venue] geometry = {"type": "Point", "coordinates": [lon, lat]} event = {"properties": properties} if geometry: event["geometry"] = geometry oedb_event = { "id": self._event_id(title or "", url or ""), "event": event, "raw_html": { "title": title, "date_text": date_text, "venue": venue, "url": url, }, } return oedb_event # --------------- API simulation --------------- def _send_to_api(self, event_data: Dict) -> Tuple[bool, str]: if self.dry_run: logger.info(f"[DRY-RUN] Simulation d'envoi: {event_data['event']['properties'].get('label')}") return True, "simulated" # Ici, on enverrait vers l'API OEDB (non requis pour cette implémentation) return True, "skipped-no-api" # --------------- Processing loop --------------- def process_events(self, cards: List[Dict]) -> None: to_process = cards[: self.max_events] if self.max_events else cards self.stats["total_detected"] = len(cards) logger.info(f"Traitement de {len(to_process)} événements (sur {len(cards)})") def handle(card: Dict) -> Tuple[str, bool, str]: event_data = self._build_oedb_event(card) ev_id = event_data["id"] if ev_id in self.cache.get("processed_events", {}): self.stats["skipped_cached"] += 1 return ev_id, True, "cached" ok, status = self._send_to_api(event_data) if ok: # Log détaillé JSON logger.info("📝 Événement:") logger.info(json.dumps(event_data, ensure_ascii=False, indent=2)) # Marquer comme traité self.cache.setdefault("processed_events", {})[ev_id] = { "processed_at": self._now_iso(), "status": "saved", "event_label": event_data["event"]["properties"].get("label"), } self._save_json(self.CACHE_FILE, self.cache) # Stocker l'event brut self.events_store[ev_id] = event_data self._save_json(self.EVENTS_FILE, self.events_store) self.stats["sent"] += 1 else: self.stats["errors"] += 1 self.stats["processed"] += 1 return ev_id, ok, status if self.parallel and len(to_process) > 10: with ThreadPoolExecutor(max_workers=self.max_workers) as ex: futures = [ex.submit(handle, card) for card in to_process] for fut in as_completed(futures): try: fut.result() except Exception as e: logger.error(f"Erreur traitement parallèle: {e}") self.stats["errors"] += 1 else: for card in to_process: try: handle(card) except Exception as e: logger.error(f"Erreur traitement séquentiel: {e}") self.stats["errors"] += 1 def run(self) -> None: logger.info("🚀 Démarrage du scraping Viparis") logger.info(f"Dry-run: {'OUI' if self.dry_run else 'NON'} | Max events: {self.max_events or '∞'} | Parallel: {self.parallel} ({self.max_workers})") all_cards: List[Dict] = [] seen_urls = set() visited_pages = set() # Démarrer avec la page 
        # Collect all pagination links, then visit them
        seed_html = self.fetch_url(self.LIST_URL)
        if seed_html:
            pages_to_visit = []
            soup = BeautifulSoup(seed_html, "html.parser")
            for a in soup.find_all('a', href=True):
                href = a['href']
                text_num = a.get_text(strip=True)
                # Candidates: links containing the events route and a digit (page number) in the href or text
                if ('/actualites-evenements/evenements' in href) and (re.search(r"[0-9]", href) or text_num.isdigit()):
                    if not href.startswith('http'):
                        if href.startswith('/'):
                            full = self.BASE_URL + href
                        else:
                            full = self.BASE_URL + '/' + href
                    else:
                        full = href
                    pages_to_visit.append(full)
            # Deduplicate and sort for stability
            pages_to_visit = sorted(list({u for u in pages_to_visit}))
        else:
            pages_to_visit = []

        # Always include the seed page first
        ordered_pages = [self.LIST_URL] + [u for u in pages_to_visit if u != self.LIST_URL]

        for idx, page_url in enumerate(ordered_pages, start=1):
            if idx > getattr(self, 'max_pages', 10):
                logger.info(f"Arrêt pagination: max-pages atteint ({self.max_pages})")
                break
            if page_url in visited_pages:
                continue
            visited_pages.add(page_url)
            html = self.fetch_url(page_url)
            if not html:
                continue
            cards = self.parse_list(html)
            new_cards = []
            for c in cards:
                u = c.get("url")
                if not u or u in seen_urls:
                    continue
                seen_urls.add(u)
                new_cards.append(c)
            all_cards.extend(new_cards)
            logger.info(f"Page {idx}: {len(new_cards)} cartes (cumul {len(all_cards)}) [{page_url}]")
            if self.max_events and len(all_cards) >= self.max_events:
                break
            time.sleep(0.4)

        if not all_cards:
            logger.warning("Aucun événement détecté sur Viparis.")
            return

        logger.info(f"Cartes détectées: {len(all_cards)}")
        self.process_events(all_cards)

        logger.info("📊 Statistiques:")
        for k, v in self.stats.items():
            logger.info(f" - {k}: {v}")


def setup_logging(verbose: bool) -> None:
    handler = logging.StreamHandler(sys.stdout)
    fmt = "%(asctime)s - %(levelname)s - %(message)s"
    handler.setFormatter(logging.Formatter(fmt))
    logger.setLevel(logging.DEBUG if verbose else logging.INFO)
    if not logger.handlers:
        logger.addHandler(handler)


def main() -> None:
    parser = argparse.ArgumentParser(description="Scraper Viparis -> OEDB (dry-run par défaut)")
    parser.add_argument("--max-events", type=int, default=5, help="Nombre max d'événements à traiter")
    parser.add_argument("--max-pages", type=int, default=10, help="Nombre max de pages à parcourir")
    parser.add_argument("--dry-run", dest="dry_run", action="store_true", default=True, help="Activer le dry-run (défaut)")
    parser.add_argument("--no-dry-run", dest="dry_run", action="store_false", help="Désactiver le dry-run (envoi réel)")
    parser.add_argument("--force-refresh", action="store_true", help="Forcer le rafraîchissement (ignorer cache de pages)")
    parser.add_argument("--cache-duration", type=int, default=6*60*60, help="Durée de validité du cache des pages (secondes)")
    parser.add_argument("--verbose", action="store_true", help="Logs verbeux")
    parser.add_argument("--parallel", action="store_true", help="Activer le traitement parallèle (>10)")
    parser.add_argument("--max-workers", type=int, default=4, help="Workers pour le traitement parallèle")
    args = parser.parse_args()

    setup_logging(args.verbose)
    scraper = ViparisScraper(
        max_events=args.max_events,
        max_pages=args.max_pages,
        dry_run=args.dry_run,
        force_refresh=args.force_refresh,
        cache_duration=args.cache_duration,
        verbose=args.verbose,
        parallel=args.parallel,
        max_workers=args.max_workers,
    )
    scraper.run()
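# Example invocations of this legacy CLI (illustrative; assumes the entry point below is
# re-enabled and a filename such as viparis_scraper.py, which is not given in the source):
#   python viparis_scraper.py --max-events 10 --verbose
#   python viparis_scraper.py --no-dry-run --parallel --max-workers 8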
# The old entry point is disabled to avoid CLI conflicts
# if __name__ == "__main__":
#     main()


#!/usr/bin/env python3
"""
Scraper for Viparis events
https://www.viparis.com/actualites-evenements/evenements

Uses the same cache system and parameters as the other OEDB scrapers
"""

import requests
import json
import hashlib
import logging
import argparse
import re
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# Logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('viparis_events_scraper.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class ViparisEventsScraper:
    """Scraper for Viparis events with a JSON cache and OEDB conversion"""

    def __init__(self, api_base_url: str = "https://api.openeventdatabase.org",
                 batch_size: int = 1,
                 dry_run: bool = True,
                 max_events: Optional[int] = 1,
                 max_pages: int = 5,
                 cache_duration: int = 3600,
                 parallel: bool = False,
                 max_workers: int = 4,
                 use_selenium: bool = False,
                 venue_ids: Optional[List[int]] = None):
        """
        Initialise the Viparis scraper

        Args:
            api_base_url: Base URL of the OEDB API
            batch_size: Batch size for sending
            dry_run: Simulation mode (no real sending)
            max_events: Maximum number of events to process
            cache_duration: Cache validity duration in seconds
            parallel: Enable parallel processing
            max_workers: Number of workers for parallel processing
        """
        self.api_base_url = api_base_url
        self.batch_size = batch_size
        self.dry_run = dry_run
        self.max_events = max_events
        self.cache_duration = cache_duration
        self.max_pages = max_pages
        self.parallel = parallel
        self.max_workers = max_workers
        self.use_selenium = use_selenium
        self.venue_ids = venue_ids or []

        # URLs
        self.events_url = "https://www.viparis.com/actualites-evenements/evenements"
        self.cms_base_url = "https://cms.viparis.com/api/e-events"
        self.cms_online_base_url = "https://cms.viparis.com/api/events"

        # Cache files (absolute paths based on this directory)
        base_dir = os.path.dirname(__file__)
        self.cache_file = os.path.join(base_dir, "viparis_events_cache.json")
        self.events_file = os.path.join(base_dir, "viparis_events.json")

        # Load the cache
        self.cache_data = self.load_cache_data()
        # Load the aggregated events store
        self.events_store: Dict[str, Dict] = self.load_events_store()

        # Statistics
        self.stats = {
            "total_events": 0,
            "new_events": 0,
            "already_saved": 0,
            "api_errors": 0,
            "parse_errors": 0,
            "sent_this_run": 0,
            "skipped_due_to_limit": 0
        }

    def _to_feature(self, event_payload: Dict) -> Dict:
        """Converts our internal structure into the GeoJSON Feature expected by OEDB.

        Accepts either an existing Feature or { id, event: {properties, geometry}, ... }.
""" if isinstance(event_payload, dict) and event_payload.get("type") == "Feature": return event_payload ev = (event_payload or {}).get("event") or {} properties = (ev or {}).get("properties") or {} geometry = (ev or {}).get("geometry") feature: Dict = {"type": "Feature", "properties": properties} if geometry: feature["geometry"] = geometry # Propager un id lisible si disponible if isinstance(event_payload, dict) and event_payload.get("id"): feature["id"] = event_payload["id"] return feature def _prune_empty_values(self, obj: Dict) -> Dict: """Supprime récursivement les clés dont la valeur est vide: None, '', [], {}.""" if not isinstance(obj, dict): return obj cleaned: Dict = {} for k, v in obj.items(): if isinstance(v, dict): sub = self._prune_empty_values(v) if sub: cleaned[k] = sub elif isinstance(v, list): sub_list = [] for it in v: if isinstance(it, dict): pr = self._prune_empty_values(it) if pr: sub_list.append(pr) else: if it not in (None, ""): sub_list.append(it) if sub_list: cleaned[k] = sub_list else: if v not in (None, ""): cleaned[k] = v return cleaned def _clean_event_payload(self, event_payload: Dict) -> Dict: """Nettoie l'événement avant envoi: trim strings, retire champs vides.""" try: if not isinstance(event_payload, dict): return event_payload ev = event_payload.get("event") if isinstance(ev, dict): props = ev.get("properties") if isinstance(props, dict): # Trim de base pour les strings for key, val in list(props.items()): if isinstance(val, str): props[key] = val.strip() # Nettoyer tags vides tags = props.get("tags") if isinstance(tags, list): props["tags"] = [t for t in tags if isinstance(t, str) and t.strip()] # Prune des vides ev["properties"] = self._prune_empty_values(props) geom = ev.get("geometry") if isinstance(geom, dict): ev["geometry"] = self._prune_empty_values(geom) # Re-prune le bloc event event_payload["event"] = self._prune_empty_values(ev) # Retirer raw_data si vide raw = event_payload.get("raw_data") if isinstance(raw, dict): event_payload["raw_data"] = self._prune_empty_values(raw) return self._prune_empty_values(event_payload) except Exception: return event_payload def _compute_event_id_from_like(self, name: str, start_date: str, venue_name: str) -> str: """Calcule l'ID d'événement de la même façon que extract_event_data pour filtrage anticipé.""" safe_name = name or "" safe_start = start_date or "" safe_venue = venue_name or "" return hashlib.md5(f"{safe_name}_{safe_start}_{safe_venue}".encode()).hexdigest() def _html_to_text(self, html: Optional[str]) -> Optional[str]: """Convertit une chaîne HTML en texte brut nettoyé (espaces normalisés).""" if not html: return None try: soup = BeautifulSoup(html, 'html.parser') text = soup.get_text(" ", strip=True) text = re.sub(r"\s+", " ", text).strip() return text or None except Exception: return None def fetch_events_from_api(self, page_size: int = 100) -> List[Dict]: """Récupère la liste des événements via l'API CMS Viparis (prioritaire). Retourne une liste d'objets "event-like" alignés avec extract_event_data. 
""" try: today_iso = datetime.now().strftime("%Y-%m-%d") events_like: List[Dict] = [] current_page = 1 max_pages_cfg = self.max_pages if isinstance(self.max_pages, int) and self.max_pages > 0 else 1 page_count_from_api: Optional[int] = None while True: # Respecter la limite restante pour le pageSize effective_page_size = page_size if isinstance(self.max_events, int) and self.max_events > 0: remaining = self.max_events - len(events_like) if remaining <= 0: break effective_page_size = max(1, min(page_size, remaining)) params = { # Événements à venir ou en cours "filters[end_date][$gte]": today_iso, "locale": "fr", "populate[0]": "cover", "populate[1]": "venues", "populate[2]": "activity_area", "populate[3]": "event_type", "pagination[page]": current_page, "pagination[pageSize]": effective_page_size, "sort[0]": "start_date:asc", } logger.info(f"🔎 charger la page {current_page} de l'API CMS Viparis (e-events)") resp = requests.get(self.cms_base_url, params=params, timeout=30) resp.raise_for_status() payload = resp.json() or {} data = payload.get("data") or [] pagination_meta = ((payload.get("meta") or {}).get("pagination") or {}) if isinstance(pagination_meta, dict): try: page_count_from_api = int(pagination_meta.get("pageCount") or 1) except Exception: page_count_from_api = None if not isinstance(data, list) or not data: break kept_this_page = 0 skipped_cached = 0 for item in data: try: converted = self._convert_cms_item_to_event_like(item) if not converted: continue # Filtrer en amont les événements déjà en cache name = converted.get("name") or "" start_date = converted.get("start_date") or "" venue_name = (converted.get("venue") or {}).get("name") or "" ev_like_id = self._compute_event_id_from_like(name, start_date, venue_name) if self.is_event_processed(ev_like_id): self.stats["already_saved"] += 1 skipped_cached += 1 continue events_like.append(converted) kept_this_page += 1 if isinstance(self.max_events, int) and self.max_events > 0 and len(events_like) >= self.max_events: break except Exception: continue logger.info(f"API CMS (e-events) page {current_page}: bruts={len(data)} gardés={kept_this_page} ignorés_cache={skipped_cached}") # Deuxième endpoint: événements en ligne (api/events) kept_online = 0 skipped_online_cached = 0 try: online_params = dict(params) logger.info(f"🔎 charger la page {current_page} de l'API CMS Viparis (events en ligne)") resp_online = requests.get(self.cms_online_base_url, params=online_params, timeout=30) resp_online.raise_for_status() payload_online = resp_online.json() or {} data_online = payload_online.get("data") or [] except Exception as e: data_online = [] logger.info(f"API CMS (events en ligne) indisponible page {current_page}: {e}") for item in (data_online if isinstance(data_online, list) else []): try: converted = self._convert_cms_item_to_event_like(item) if not converted: continue # Marquer 'online=yes' pour ces événements (on ajoutera ce flag dans extract_event_data) converted["online"] = True name = converted.get("name") or "" start_date = converted.get("start_date") or "" venue_name = (converted.get("venue") or {}).get("name") or "" ev_like_id = self._compute_event_id_from_like(name, start_date, venue_name) if self.is_event_processed(ev_like_id): self.stats["already_saved"] += 1 skipped_online_cached += 1 continue events_like.append(converted) kept_online += 1 if isinstance(self.max_events, int) and self.max_events > 0 and len(events_like) >= self.max_events: break except Exception: continue logger.info(f"API CMS (events en ligne) page 
                # Variant filtered by venue identifier (venues[id][$in]=...)
                if self.venue_ids:
                    for vid in self.venue_ids:
                        if isinstance(self.max_events, int) and self.max_events > 0 and len(events_like) >= self.max_events:
                            break
                        try:
                            venue_params = dict(params)
                            # Inject the Strapi filter: filters[venues][id][$in][0]=
                            # Start again from the online endpoint (events)
                            venue_params.pop("filters[end_date][$gte]", None)
                            venue_params["filters[end_date][$gte]"] = today_iso
                            venue_params["filters[venues][id][$in][0]"] = int(vid)
                            logger.info(f"🔎 charger la page {current_page} (events en ligne, venue={vid})")
                            resp_v = requests.get(self.cms_online_base_url, params=venue_params, timeout=30)
                            resp_v.raise_for_status()
                            payload_v = resp_v.json() or {}
                            data_v = payload_v.get("data") or []
                        except Exception as e:
                            data_v = []
                            logger.info(f"API CMS (events en ligne, venue={vid}) indisponible page {current_page}: {e}")
                        kept_v = 0
                        skipped_v = 0
                        for item in (data_v if isinstance(data_v, list) else []):
                            try:
                                converted = self._convert_cms_item_to_event_like(item)
                                if not converted:
                                    continue
                                converted["online"] = True
                                name = converted.get("name") or ""
                                start_date = converted.get("start_date") or ""
                                venue_name = (converted.get("venue") or {}).get("name") or ""
                                ev_like_id = self._compute_event_id_from_like(name, start_date, venue_name)
                                if self.is_event_processed(ev_like_id):
                                    self.stats["already_saved"] += 1
                                    skipped_v += 1
                                    continue
                                events_like.append(converted)
                                kept_v += 1
                                if isinstance(self.max_events, int) and self.max_events > 0 and len(events_like) >= self.max_events:
                                    break
                            except Exception:
                                continue
                        logger.info(f"API CMS (events en ligne, venue={vid}) page {current_page}: bruts={len(data_v)} gardés={kept_v} ignorés_cache={skipped_v}")

                if isinstance(self.max_events, int) and self.max_events > 0 and len(events_like) >= self.max_events:
                    break

                # Move to the next page and evaluate the stop condition
                current_page += 1
                # Bound via meta.pageCount if provided, otherwise via the config
                if page_count_from_api is not None:
                    if current_page > min(page_count_from_api, max_pages_cfg):
                        break
                else:
                    if current_page > max_pages_cfg:
                        break

            return events_like
        except Exception as e:
            logger.warning(f"⚠️ API CMS indisponible ou vide: {e}")
            return []
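    # Illustrative shape of a CMS (Strapi) item as handled by the converter below
    # (field names follow the expected structure described in its docstring; real
    # payloads from cms.viparis.com may differ):
    #   {
    #     "id": 123,
    #     "attributes": {
    #       "name": "...", "description": "<p>...</p>",
    #       "start_date": "2025-10-11", "end_date": "2025-10-12", "uid": "some-slug",
    #       "cover": {"data": {"attributes": {"url": "https://..."}}},
    #       "venues": {"data": [{"attributes": {"name": "...", "coordinates": {"lat": 48.83, "lng": 2.29}}}]}
    #     }
    #   }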
""" try: attrs = (item or {}).get("attributes") or {} name = attrs.get("name") or attrs.get("title") or "Événement sans titre" # description peut être HTML dans Strapi raw_description = attrs.get("description") or "" description = self._html_to_text(raw_description) or raw_description # Gérer différents noms de clés potentiels start_date = attrs.get("start_date") or attrs.get("startDate") or attrs.get("start") end_date = attrs.get("end_date") or attrs.get("endDate") or attrs.get("end") slug = attrs.get("uid") or attrs.get("slug") or "" viparis_ref = attrs.get("uid") or attrs.get("wetix_id") or slug or None # Champs additionnels booking_url = attrs.get("booking_url") or attrs.get("bookingUrl") website_url = attrs.get("website_url") or attrs.get("websiteUrl") short_description = attrs.get("short_description") short_description = self._html_to_text(short_description) if short_description else None code_affaire = attrs.get("code_affaire") # Cover cover_attr = attrs.get("cover") cover_url = None if isinstance(cover_attr, dict): # Strapi peut mettre l'URL directement ou sous data->attributes->url if "url" in cover_attr: cover_url = cover_attr.get("url") elif isinstance(cover_attr.get("data"), dict): cover_url = ((cover_attr.get("data") or {}).get("attributes") or {}).get("url") # Venue (prendre le premier) venue_obj: Optional[Dict] = None venues = attrs.get("venues") if isinstance(venues, dict) and isinstance(venues.get("data"), list) and venues["data"]: v_attr = (venues["data"][0] or {}).get("attributes") or {} v_name = v_attr.get("name") or v_attr.get("title") coords = v_attr.get("coordinates") or {} lat = coords.get("lat") or coords.get("latitude") lng = coords.get("lng") or coords.get("longitude") venue_obj = {"name": v_name} if lat is not None and lng is not None: venue_obj["coordinates"] = {"lat": float(lat), "lng": float(lng)} # Tags depuis event_type (liste) et activity_area (unique) tags: List[str] = [] activity = attrs.get("activity_area") if isinstance(activity, dict) and isinstance(activity.get("data"), dict): act_name = ((activity.get("data") or {}).get("attributes") or {}).get("name") if act_name: tags.append(str(act_name)) ev_types = attrs.get("event_type") # peut être list data ou single if isinstance(ev_types, dict): data = ev_types.get("data") if isinstance(data, list): for it in data: tname = ((it or {}).get("attributes") or {}).get("name") if tname: tags.append(str(tname)) elif isinstance(data, dict): tname = ((data or {}).get("attributes") or {}).get("name") if tname: tags.append(str(tname)) event_like: Dict = { "name": name, "description": description, "start_date": start_date, "end_date": end_date, "slug": slug, "venue": venue_obj or {}, } if viparis_ref: event_like["viparis_ref"] = str(viparis_ref) if cover_url: event_like["cover"] = {"url": cover_url} # Ajouts bruts pour enrichissement dans extract_event_data if booking_url: event_like["booking_url"] = booking_url if website_url: event_like["website_url"] = website_url if short_description: event_like["short_description"] = short_description if code_affaire: event_like["code_affaire"] = code_affaire if tags: event_like["tags"] = tags return event_like except Exception: return None def load_cache_data(self) -> Dict: """Charge les données de cache depuis le fichier JSON""" try: with open(self.cache_file, 'r', encoding='utf-8') as f: return json.load(f) except FileNotFoundError: return { "processed_events": {}, "last_fetch": None, "content_hash": None } def save_cache_data(self): """Sauvegarde les données de cache dans le 
fichier JSON""" with open(self.cache_file, 'w', encoding='utf-8') as f: json.dump(self.cache_data, f, ensure_ascii=False, indent=2) def load_events_store(self) -> Dict[str, Dict]: """Charge le fichier d'événements agrégés, sinon dictionnaire vide.""" try: with open(self.events_file, 'r', encoding='utf-8') as f: data = json.load(f) return data if isinstance(data, dict) else {} except FileNotFoundError: return {} except Exception: return {} def save_events_store(self) -> None: """Sauvegarde atomiquement le store d'événements agrégés.""" tmp = self.events_file + ".tmp" with open(tmp, 'w', encoding='utf-8') as f: json.dump(self.events_store, f, ensure_ascii=False, indent=2) os.replace(tmp, self.events_file) def get_content_hash(self, content: str) -> str: """Génère un hash du contenu pour détecter les changements""" return hashlib.md5(content.encode('utf-8')).hexdigest() def is_content_changed(self, new_hash: str) -> bool: """Vérifie si le contenu a changé depuis la dernière récupération""" cached_hash = self.cache_data.get("content_hash") return cached_hash != new_hash def fetch_events_data(self, force_refresh: bool = False) -> Optional[str]: """Récupère les données des événements Viparis""" try: logger.info(f"🌐 Récupération des événements Viparis: {self.events_url}") headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } response = requests.get(self.events_url, headers=headers, timeout=30) response.raise_for_status() content = response.text content_hash = self.get_content_hash(content) # Vérifier si le contenu a changé ou si on force le rafraîchissement if self.is_content_changed(content_hash) or force_refresh: if force_refresh: logger.info("🔄 Rafraîchissement forcé, mise à jour du cache") else: logger.info("🔄 Nouveau contenu détecté, mise à jour du cache") self.cache_data["content_hash"] = content_hash self.cache_data["last_fetch"] = datetime.now().isoformat() self.save_cache_data() return content else: logger.info("ℹ️ Contenu identique au précédent, utilisation du cache") return None except requests.RequestException as e: logger.error(f"❌ Erreur lors de la récupération des événements: {e}") return None except Exception as e: logger.error(f"❌ Erreur inattendue: {e}") return None def parse_events_html(self, html_content: str) -> List[Dict]: """Parse le HTML des événements pour extraire les données""" try: events: List[Dict] = [] # 1) Extraction robuste de window.__NUXT__ dans l'ensemble du HTML nuxt_json = self._extract_nuxt_json(html_content) raw_items: List[Dict] = [] if nuxt_json is not None: # 2) Recherche récursive d'objets événement raw_items = self._find_event_like_objects(nuxt_json) if not raw_items: logger.warning("⚠️ Aucune donnée d'événements (NUXT) trouvée dans le HTML") return [] for event_data in raw_items: try: event = self.extract_event_data(event_data) if event: events.append(event) except Exception as e: logger.error(f"❌ Erreur lors de l'extraction d'un événement: {e}") self.stats["parse_errors"] += 1 continue logger.info(f"📅 {len(events)} événements extraits au total (depuis NUXT)") return events except Exception as e: logger.error(f"❌ Erreur lors du parsing HTML: {e}") return [] def extract_event_data(self, event_data: Dict, *, skip_detail_if_cached: bool = True) -> Optional[Dict]: """Extrait les données d'un événement depuis la structure JSON""" try: # Extraire les informations de base title = event_data.get('name', 'Événement sans titre') description = 
            start_date = event_data.get('start_date', '')
            end_date = event_data.get('end_date', '')
            venue = event_data.get('venue', {})
            venue_name = venue.get('name', 'Lieu non spécifié') if venue else 'Lieu non spécifié'

            # Create the unique ID as early as possible (used to avoid fetching the detail page)
            event_id = hashlib.md5(f"{title}_{start_date}_{venue_name}".encode()).hexdigest()

            # Extract the event URL (prefer website_url/booking_url if provided)
            slug = event_data.get('slug', '')
            event_url = f"https://www.viparis.com/actualites-evenements/evenements/{slug}" if slug else None
            website_url = event_data.get('website_url')
            booking_url = event_data.get('booking_url')
            preferred_url = website_url or booking_url or event_url

            # Extract the image
            image_url = ""
            if 'cover' in event_data and event_data['cover']:
                cover_data = event_data['cover']
                if isinstance(cover_data, dict) and 'url' in cover_data:
                    image_url = cover_data['url']

            # Extract the venue coordinates
            coordinates = None
            if venue and 'coordinates' in venue:
                coords = venue['coordinates']
                if 'lat' in coords and 'lng' in coords:
                    coordinates = [float(coords['lng']), float(coords['lat'])]

            # Build the event in OEDB format
            oedb_event = {
                "properties": {
                    "label": title,
                    "description": description,
                    "type": "scheduled",
                    "what": "culture.viparis",
                    "where": venue_name,
                    "start": self.parse_date(start_date),
                    "stop": self.parse_date(end_date) if end_date else self.parse_date(start_date, end=True),
                    "source:name": "Viparis Events",
                    "source:url": self.events_url,
                    "last_modified_by": "viparis_events_scraper",
                    "tags": ["viparis", "paris", "événement-professionnel"]
                },
                "geometry": {
                    "type": "Point",
                    "coordinates": coordinates or [2.3522, 48.8566]  # Paris by default
                }
            }

            # Add the Viparis reference
            viparis_ref = event_data.get('viparis_ref') or event_data.get('slug')
            if viparis_ref:
                oedb_event["properties"]["ref:FR:viparis"] = str(viparis_ref)

            # Online flag if the item comes from 'api/events'
            if event_data.get('online') is True:
                oedb_event["properties"]["online"] = "yes"

            # Add the event URL if available (preferred one)
            if preferred_url:
                oedb_event["properties"]["url"] = preferred_url
            # Add booking_url separately if distinct
            if booking_url and booking_url != preferred_url:
                oedb_event["properties"]["tickets:url"] = booking_url

            # Business code if present
            code_affaire = event_data.get('code_affaire')
            if code_affaire:
                oedb_event["properties"]["ref:viparis:code_affaire"] = str(code_affaire).strip()

            # Short description if available
            short_desc = event_data.get('short_description')
            if isinstance(short_desc, str) and short_desc.strip():
                oedb_event["properties"]["short_description"] = short_desc.strip()

            # Prefer a detailed description from the event page when available;
            # avoid extra network calls in dry-run
            detailed_desc = None
            if not self.dry_run and event_url:
                # If already cached and meant to be skipped, do not scrape the detail page
                if skip_detail_if_cached:
                    if self.is_event_processed(event_id):
                        detailed_desc = None
                    else:
                        detailed_desc = self.fetch_event_detail_description(event_url)
                else:
                    detailed_desc = self.fetch_event_detail_description(event_url)
            if detailed_desc and detailed_desc.strip():
                oedb_event["properties"]["description"] = detailed_desc.strip()

            # Add the image if available
            if image_url:
                oedb_event["properties"]["image"] = image_url

            # The unique ID was already computed above
            logger.info("📝 Événement:")
            # logger.info(json.dumps(oedb_event, ensure_ascii=False, indent=2))

            enriched_raw: Dict = {"nuxt": event_data}
{"nuxt": event_data} # Enrichir avec les attributs complets du CMS si possible (via uid/slug) slug_for_cms = event_data.get('uid') or event_data.get('slug') or slug cms_attrs = self.fetch_cms_attributes_by_slug(slug_for_cms) if slug_for_cms else None if cms_attrs: enriched_raw["cms"] = cms_attrs # Fusionner des tags éventuels issus du CMS extra_tags = event_data.get('tags') if isinstance(extra_tags, list) and extra_tags: base_tags = oedb_event["properties"].get("tags", []) merged = base_tags + [t for t in extra_tags if t not in base_tags] oedb_event["properties"]["tags"] = merged return { "id": event_id, "event": oedb_event, "raw_data": enriched_raw } except Exception as e: logger.error(f"❌ Erreur lors de l'extraction des données d'événement: {e}") return None def fetch_cms_attributes_by_slug(self, slug: str) -> Optional[Dict]: """Interroge le CMS Viparis pour récupérer les attributs complets d'un événement par uid/slug.""" try: if not slug: return None base = "https://cms.viparis.com/api/e-events" params = { "filters[uid][$eq]": slug, "locale": "fr", "pagination[page]": 1, "pagination[pageSize]": 1, "populate[0]": "activity_area", "populate[1]": "cover", "populate[2]": "event_type", "populate[3]": "venues", } resp = requests.get(base, params=params, timeout=20) resp.raise_for_status() payload = resp.json() data = payload.get("data") if not data: return None item = data[0] attrs = item.get("attributes") return attrs or None except Exception: return None def fetch_event_detail_description(self, url: str) -> Optional[str]: """Récupère la description détaillée depuis .c-ticket-content__desc sur la page d'un événement.""" try: headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' } resp = requests.get(url, headers=headers, timeout=30) resp.raise_for_status() soup = BeautifulSoup(resp.text, 'html.parser') el = soup.select_one('.c-ticket-content__desc') if not el: return None text = el.get_text(" ", strip=True) text = re.sub(r"\s+", " ", text).strip() return text or None except Exception: return None def _extract_nuxt_json(self, html: str) -> Optional[Dict]: """Extrait l'objet window.__NUXT__ via comptage d'accolades.""" try: idx = html.find('window.__NUXT__') if idx == -1: return None # Chercher le premier '{' après l'assignation start = html.find('{', idx) if start == -1: return None brace = 0 end = start for i in range(start, len(html)): ch = html[i] if ch == '{': brace += 1 elif ch == '}': brace -= 1 if brace == 0: end = i break if brace != 0: return None json_text = html[start:end + 1] return json.loads(json_text) except Exception: return None def _find_event_like_objects(self, obj) -> List[Dict]: """Parcourt récursivement un objet JSON et retourne des dicts ressemblant à des événements Viparis.""" found: List[Dict] = [] def visit(node): try: if isinstance(node, dict): # Heuristique: un événement a souvent 'name' et 'slug' if 'name' in node and 'slug' in node: found.append(node) for v in node.values(): visit(v) elif isinstance(node, list): for it in node: visit(it) except Exception: pass visit(obj) # Dédupliquer par slug unique = {} for it in found: slug = it.get('slug') if slug and slug not in unique: unique[slug] = it return list(unique.values()) def parse_date(self, date_str: str, end: bool = False) -> str: """Parse une date au format ISO""" try: if not date_str: return datetime.now().isoformat() # Essayer de parser la date if 'T' in date_str: # Format ISO déjà correct dt = 
    def is_event_processed(self, event_id: str) -> bool:
        """Checks whether an event has already been processed"""
        return event_id in self.cache_data.get("processed_events", {})

    def mark_event_processed(self, event_id: str, status: str, event_label: str):
        """Marks an event as processed"""
        self.cache_data.setdefault("processed_events", {})[event_id] = {
            "processed_at": datetime.now().isoformat(),
            "status": status,
            "event_label": event_label
        }
        self.save_cache_data()

    def mark_event_processed_with_payload(self, event_id: str, event_payload: Dict, status: str, response_code: Optional[int] = None):
        """Marks an event as processed and saves the full payload exactly as sent to the API."""
        label = ((event_payload or {}).get("event") or {}).get("properties", {}).get("label")
        record = {
            "processed_at": datetime.now().isoformat(),
            "status": status,
            "event_label": label,
            "payload": event_payload,
            "response_code": response_code,
        }
        self.cache_data.setdefault("processed_events", {})[event_id] = record
        self.save_cache_data()

    def get_cached_status(self, event_id: str) -> Optional[str]:
        try:
            return ((self.cache_data or {}).get("processed_events") or {}).get(event_id, {}).get("status")
        except Exception:
            return None

    def send_event_to_api(self, event_data: Dict) -> Tuple[bool, str, Optional[int]]:
        """Sends an event to the OEDB API"""
        # Cleanup: strip empty properties/fields before sending
        clean_payload = self._clean_event_payload(dict(event_data))
        feature_payload = self._to_feature(clean_payload)
        if self.dry_run:
            label = (feature_payload.get('properties') or {}).get('label')
            logger.info(f"[DRY-RUN] Simulation d'envoi de l'événement: {label}")
            return True, "Simulé (dry-run)", None

        try:
            url = f"{self.api_base_url}/event"
            headers = {
                'Content-Type': 'application/json',
                'User-Agent': 'viparis_events_scraper/1.0'
            }
            response = requests.post(url, json=feature_payload, headers=headers, timeout=30)
            if response.status_code in (200, 201):
                return True, "Envoyé avec succès", response.status_code
            elif response.status_code == 409:
                return True, "Déjà existant", response.status_code
            else:
                return False, f"❌ ------------ Erreur API: {response.status_code} - {response.text}", response.status_code
        except requests.RequestException as e:
            return False, f"❌ ------------ Erreur de connexion: {e}", None
        except Exception as e:
            return False, f"❌ ------------ Erreur inattendue: {e}", None

    def process_single_event(self, event_data: Dict) -> Tuple[str, bool, str]:
        """Processes a single event (thread-safe)"""
        event_id = event_data['id']
        event_label = event_data['event']['properties']['label']

        # Check whether it has already been processed
        if self.is_event_processed(event_id):
            # Outside dry-run, do not resend if the cached status is already 'saved'
            cached_status = self.get_cached_status(event_id)
            if not self.dry_run and cached_status == "saved":
                self.stats["already_saved"] += 1
                logger.info(f"ℹ️ Événement déjà sauvegardé (cache): {event_label}")
                return event_id, True, "Déjà sauvegardé"
            logger.info(f"ℹ️ Événement déjà traité: {event_label}")
            return event_id, True, "Déjà traité"

        # Send to the API
        # Clean before sending/caching
        clean_event = self._clean_event_payload(dict(event_data))
        success, message, status_code = self.send_event_to_api(clean_event)
        logger.info(f"Envoi OEDB → success={success} status={status_code} msg={message}")

        # Mark as processed and save the payload exactly as sent
        # Save the Feature that was sent (final payload)
        try:
            feature_sent = self._to_feature(clean_event)
        except Exception:
            feature_sent = clean_event
        self.mark_event_processed_with_payload(event_id, feature_sent, "saved" if success else "failed", status_code)
        # Record the aggregated event locally
        try:
            self.events_store[event_id] = feature_sent
            self.save_events_store()
        except Exception:
            pass

        return event_id, success, message

    def process_events(self, events: List[Dict]):
        """Processes the list of events"""
        if not events:
            logger.warning("⚠️ Aucun événement à traiter")
            return

        # Apply the limit strictly
        original_count = len(events)
        if isinstance(self.max_events, int) and self.max_events > 0:
            events = events[: self.max_events]
            skipped = max(0, original_count - len(events))
            if skipped:
                self.stats["skipped_due_to_limit"] = skipped

        logger.info(f"Traitement de {len(events)} événements")

        # Parallel or sequential processing
        if self.parallel and len(events) > 10:
            logger.info(f"Traitement parallèle de {len(events)} événements")
            self.process_events_parallel(events)
        else:
            logger.info(f"Traitement séquentiel de {len(events)} événements")
            self.process_events_sequential(events)

    def process_events_sequential(self, events: List[Dict]):
        """Sequential processing of the events"""
        for i, event_data in enumerate(events):
            if self.max_events and i >= self.max_events:
                break
            event_id, success, message = self.process_single_event(event_data)

            # Update the statistics
            if success:
                # Do not count the 'Déjà ...' cases as sent
                if not (isinstance(message, str) and message.startswith("Déjà")):
                    self.stats["sent_this_run"] += 1
                logger.info(f"✅ {event_data['event']['properties']['label']} - {message}")
            else:
                self.stats["api_errors"] += 1
                logger.error(f"❌ {event_data['event']['properties']['label']} - {message}")

    def process_events_parallel(self, events: List[Dict]):
        """Parallel processing of the events"""
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all the events
            future_to_event = {
                executor.submit(self.process_single_event, event_data): event_data
                for event_data in events
            }

            # Handle the results
            for future in as_completed(future_to_event):
                event_data = future_to_event[future]
                try:
                    event_id, success, message = future.result()

                    # Update the statistics
                    if success:
                        if not (isinstance(message, str) and message.startswith("Déjà")):
                            self.stats["sent_this_run"] += 1
                        logger.info(f"✅ {event_data['event']['properties']['label']} - {message}")
                    else:
                        self.stats["api_errors"] += 1
                        logger.error(f"❌ {event_data['event']['properties']['label']} - {message}")
                except Exception as e:
                    logger.error(f"❌ Erreur lors du traitement parallèle: {e}")
                    self.stats["api_errors"] += 1

    def run(self, force_refresh: bool = False):
        """Runs the full scraping"""
        logger.info("🚀 Démarrage du scraping des événements Viparis")
        logger.info(f"Configuration: batch_size={self.batch_size}, api_url={self.api_base_url}")
        logger.info(f"Mode dry-run: {'OUI' if self.dry_run else 'NON'}")
        if self.max_events:
            logger.info(f"Limite d'événements: {self.max_events}")
        logger.info("=" * 60)

        # Cache state at startup
        try:
            cached_count = len(self.cache_data.get("processed_events", {}))
            logger.info(f"🗃️ Événements déjà en cache (processed_events): {cached_count}")
        except Exception:
            logger.info("🗃️ Événements déjà en cache (processed_events): inconnu")

        try:
            events: List[Dict] = []

            # 1) Try the CMS API first (preferred)
            logger.info("🔎 Tentative via API CMS Viparis (prioritaire)")
            api_events = self.fetch_events_from_api()
            if api_events:
                logger.info(f"✅ API CMS: {len(api_events)} événements récupérés")
                # Transform directly via extract_event_data
                for raw in api_events:
                    ev = self.extract_event_data(raw, skip_detail_if_cached=True)
                    if not ev:
                        continue
                    # Skip events already in the cache so max-events is used meaningfully
                    if self.is_event_processed(ev["id"]):
                        self.stats["already_saved"] += 1
                        logger.info(f"ℹ️ Ignoré (déjà en cache): {ev['event']['properties'].get('label')}")
                        continue
                    events.append(ev)
                    # Stop early once the limit is reached
                    if isinstance(self.max_events, int) and self.max_events > 0 and len(events) >= self.max_events:
                        break

            # 2) Fallback if the API is empty or failed
            if not events:
                if self.use_selenium:
                    events = self.fetch_events_with_selenium()
                else:
                    # Fetch the events data (HTML)
                    html_content = self.fetch_events_data(force_refresh)
                    if html_content is None and not force_refresh:
                        logger.info("ℹ️ Utilisation du cache (pas de nouveau contenu)")
                        return

                    # Parse the events
                    parsed = self.parse_events_html(html_content) if html_content else []
                    # Strict respect of max_events
                    if isinstance(self.max_events, int) and self.max_events > 0:
                        events = parsed[: self.max_events]
                    else:
                        events = parsed

            if not events:
                logger.warning("⚠️ Aucun événement trouvé")
                return

            # Update the statistics
            self.stats["total_events"] = len(events)

            # Process the events
            self.process_events(events)

            # Display the final statistics
            logger.info("📊 Statistiques finales:")
            for key, value in self.stats.items():
logger.info(f" {key}: {value}") # Compter les événements écrits dans le fichier d'events agrégés try: with open(self.events_file, 'r', encoding='utf-8') as f: data = json.load(f) logger.info(f"🗂️ Total d'événements enregistrés dans {self.events_file}: {len(data)}") except FileNotFoundError: logger.info(f"🗂️ Fichier {self.events_file} absent (0 évènement enregistré)") except Exception as e: logger.info(f"🗂️ Impossible de lire {self.events_file}: {e}") logger.info("✅ Scraping terminé avec succès") except Exception as e: logger.error(f"❌ Erreur lors du scraping: {e}") raise def fetch_events_with_selenium(self) -> List[Dict]: """Charge les pages via Firefox headless (Selenium) pour suivre la pagination dynamique et extraire les événements.""" try: from selenium import webdriver from selenium.webdriver.firefox.options import Options as FirefoxOptions from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from selenium.common.exceptions import TimeoutException, NoSuchElementException except Exception as e: logger.error(f"Selenium non disponible: {e}") return [] options = FirefoxOptions() options.add_argument("-headless") driver = None all_events: List[Dict] = [] seen_ids = set() try: driver = webdriver.Firefox(options=options) driver.set_page_load_timeout(30) driver.get(self.events_url) def collect_current_page_events(): try: WebDriverWait(driver, 20).until( EC.presence_of_element_located((By.TAG_NAME, "body")) ) except TimeoutException: return [] html = driver.page_source soup = BeautifulSoup(html, 'html.parser') # tenter d'extraire via JSON NUXT si présent events = [] # 1) NUXT nuxt_json = self._extract_nuxt_json(html) if nuxt_json is not None: raw_items = self._find_event_like_objects(nuxt_json) for raw in raw_items: ev = self.extract_event_data(raw) if ev: eid = ev['id'] if eid not in seen_ids: seen_ids.add(eid) events.append(ev) # 2) Sinon, heuristique sur cartes rendues (fallback minimal) if not events: cards = soup.select('a[href*="/actualites-evenements/evenements/"] h3, a[href*="/actualites-evenements/evenements/"] h2') for h in cards: a = h.find_parent('a') if not a: continue url = a.get('href') if url and not url.startswith('http'): url = f"https://www.viparis.com{url}" title = h.get_text(strip=True) if not url or not title: continue # Construire un minimum d'event si JSON indisponible eid = md5(f"{title}::{url}".encode('utf-8')).hexdigest() if eid in seen_ids: continue seen_ids.add(eid) all_props = { "label": title, "type": "scheduled", "what": "culture.community.viparis", "source:name": "Viparis Events", "source:url": self.events_url, "last_modified_by": "viparis_events_scraper", "url": url, "tags": ["viparis", "paris", "événement-professionnel"], } events.append({ "id": eid, "event": {"properties": all_props}, "raw_data": {"slug": url.rsplit('/', 1)[-1], "name": title}, }) return events # Collect page 1 all_events.extend(collect_current_page_events()) # Iterate pagination up to max_pages for page_idx in range(2, self.max_pages + 1): moved = False # Try rel=next try: next_link = driver.find_element(By.CSS_SELECTOR, 'a[rel="next"]') driver.execute_script("arguments[0].scrollIntoView(true);", next_link) next_link.click() moved = True except NoSuchElementException: pass if not moved: # Try link text 'Suivant' or 'Next' or numeric page candidates = [ (By.LINK_TEXT, "Suivant"), (By.PARTIAL_LINK_TEXT, "Suiv"), (By.LINK_TEXT, "Next"), (By.LINK_TEXT, str(page_idx)), ] for by, sel in 
                        try:
                            el = driver.find_element(by, sel)
                            driver.execute_script("arguments[0].scrollIntoView(true);", el)
                            el.click()
                            moved = True
                            break
                        except NoSuchElementException:
                            continue
                if not moved:
                    logger.info("Pagination Selenium: fin atteinte (pas de lien suivant)")
                    break
                # Wait for the content to update
                time.sleep(1.0)
                all_events.extend(collect_current_page_events())
                if self.max_events and len(all_events) >= self.max_events:
                    break

            logger.info(f"Selenium: total événements collectés: {len(all_events)}")
            return all_events[: self.max_events] if self.max_events else all_events
        except Exception as e:
            logger.error(f"Erreur Selenium: {e}")
            return []
        finally:
            if driver is not None:
                try:
                    driver.quit()
                except Exception:
                    pass


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(description="Scraper d'événements Viparis pour OEDB")
    parser.add_argument("--api-url", default="https://api.openeventdatabase.org", help="URL de l'API OEDB")
    parser.add_argument("--batch-size", type=int, default=1, help="Taille des lots pour l'envoi")
    parser.add_argument("--dry-run", action="store_true", default=False, help="Activer le mode simulation (désactivé par défaut)")
    parser.add_argument("--no-dry-run", action="store_true", help="Forcer la désactivation du mode simulation")
    parser.add_argument("--max-events", type=int, default=1, help="Nombre maximum d'événements à traiter")
    parser.add_argument("--max-pages", type=int, default=10, help="Nombre maximum de pages à parcourir (limite dure)")
    parser.add_argument("--force-refresh", action="store_true", help="Forcer le rafraîchissement du cache")
    parser.add_argument("--cache-duration", type=int, default=3600, help="Durée de validité du cache en secondes")
    parser.add_argument("--verbose", action="store_true", help="Mode verbeux")
    parser.add_argument("--parallel", action="store_true", help="Activer le traitement parallèle")
    parser.add_argument("--max-workers", type=int, default=4, help="Nombre de workers pour le traitement parallèle")
    parser.add_argument("--use-selenium", action="store_true", help="Utiliser Selenium Firefox headless pour la pagination dynamique")

    args = parser.parse_args()

    # Logging configuration
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Create the scraper
    # dry_run selection logic: --no-dry-run takes priority
    effective_dry_run = False if args.no_dry_run else bool(args.dry_run)
    logger.info(f"Mode dry-run effectif: {'OUI' if effective_dry_run else 'NON'}")
    scraper = ViparisEventsScraper(
        api_base_url=args.api_url,
        batch_size=args.batch_size,
        dry_run=effective_dry_run,
        max_events=args.max_events,
        max_pages=args.max_pages,
        cache_duration=args.cache_duration,
        parallel=args.parallel,
        max_workers=args.max_workers,
        use_selenium=args.use_selenium
    )

    # Run the scraping
    scraper.run(force_refresh=args.force_refresh)


if __name__ == "__main__":
    main()
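# Example invocations of this CLI (illustrative; the actual filename is not given in the source):
#   python viparis_events_scraper.py --dry-run --max-events 5 --verbose
#   python viparis_events_scraper.py --no-dry-run --api-url https://api.openeventdatabase.org --max-pages 3
#   python viparis_events_scraper.py --use-selenium --max-events 20 --parallel --max-workers 8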