#!/usr/bin/env python3
"""Import events from a Mobilizon instance's GraphQL API into OEDB.

Usage:
    python3 mobilizon.py --limit 25 --page-size 10 --instance-url https://mobilizon.fr \
        --api-url https://api.openeventdatabase.org --dry-run --verbose

Notes:
- Follows the general structure of extractors/agenda_geek.py (CLI, dry-run,
  HTTP session, POST to /event) and avoids scraping web pages by using the
  official GraphQL API instead.
- A --limit parameter bounds the number of events inserted.
"""

import argparse
import json
import logging
import math
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Optional, Tuple

import requests

# Logging configuration (aligned with agenda_geek.py).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
    ]
)
logger = logging.getLogger(__name__)

# Shared User-Agent for every outgoing HTTP request (Mobilizon, OEDB, Nominatim).
USER_AGENT = 'OEDB-Mobilizon-Importer/1.0 (+https://github.com/cquest/oedb)'

# Events spanning more than a week are skipped (likely recurring/permanent listings).
MAX_EVENT_DURATION_S = 7 * 24 * 3600

# Fallback parser for WKT "POINT(lon lat)" geometry strings; compiled once.
_WKT_POINT_RE = re.compile(
    r"POINT\s*\(\s*([+-]?[0-9]*\.?[0-9]+)\s+([+-]?[0-9]*\.?[0-9]+)\s*\)"
)


@dataclass
class MobilizonEvent:
    """Normalized subset of a Mobilizon event as returned by the GraphQL API."""
    uuid: Optional[str]
    url: Optional[str]
    title: Optional[str]
    description: Optional[str]
    begins_on: Optional[str]      # ISO 8601 string as returned by the API
    ends_on: Optional[str]        # ISO 8601 string as returned by the API
    status: Optional[str]
    latitude: Optional[float]
    longitude: Optional[float]
    address_text: Optional[str]   # human-readable comma-joined address
    tags: Optional[List[str]]
    organizer_name: Optional[str]
    organizer_url: Optional[str]
    category: Optional[str]
    website: Optional[str]


class MobilizonClient:
    """Thin GraphQL client for a Mobilizon instance's public events feed."""

    # Several schemas exist depending on the Mobilizon version; this generic
    # query (events -> total + elements[]) is accepted by many of them.
    EVENTS_QUERY = """
    query Events($page: Int!, $limit: Int!) {
      events(page: $page, limit: $limit) {
        total
        elements {
          uuid
          url
          title
          description
          beginsOn
          endsOn
          status
          physicalAddress {
            description
            locality
            geom
            street
            postalCode
            region
            country
          }
          onlineAddress
          tags { title slug }
          organizerActor { name url }
          category
        }
      }
    }
    """

    def __init__(self, instance_url: str = "https://mobilizon.fr") -> None:
        self.base = instance_url.rstrip('/')
        # The public GraphQL endpoint of a Mobilizon instance is typically /api.
        self.endpoint = f"{self.base}/api"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': USER_AGENT,
            'Content-Type': 'application/json'
        })

    @staticmethod
    def _parse_geom(geom) -> Tuple[Optional[float], Optional[float]]:
        """Extract (lat, lon) from a Mobilizon geom field.

        Depending on the instance version, `geom` may be a GeoJSON-like dict,
        a JSON string, or a WKT "POINT(lon lat)" string. Returns (None, None)
        when nothing parseable is found.
        """
        if not geom:
            return (None, None)
        candidate = None
        if isinstance(geom, (dict, list)):
            candidate = geom
        else:
            # String: try JSON first, fall back to WKT on failure.
            try:
                candidate = json.loads(geom)
            except Exception:
                m = _WKT_POINT_RE.search(str(geom))
                if m:
                    try:
                        # WKT order is (lon lat).
                        return (float(m.group(2)), float(m.group(1)))
                    except Exception:
                        return (None, None)
                return (None, None)
        if isinstance(candidate, dict) and isinstance(candidate.get('coordinates'), (list, tuple)):
            coords = candidate['coordinates']
            if len(coords) >= 2:
                try:
                    # GeoJSON order is [lon, lat].
                    return (float(coords[1]), float(coords[0]))
                except Exception:
                    return (None, None)
        return (None, None)

    def fetch_events_page(self, page: int, page_size: int) -> Tuple[List[MobilizonEvent], int]:
        """Fetch one page of public events via GraphQL.

        Returns (events, total) where total is the API-side total when
        exposed, else 0. Any HTTP/JSON/GraphQL error yields ([], 0).
        """
        variables = {"page": page, "limit": page_size}
        try:
            logger.info(f"Fetching events page {page} with size {page_size}")
            # Full query/variables/endpoint are only useful when debugging;
            # keep them out of INFO to avoid spamming every page fetch.
            logger.debug(f"Query: {self.EVENTS_QUERY}")
            logger.debug(f"Variables: {variables}")
            logger.debug(f"Endpoint: {self.endpoint}")
            resp = self.session.post(
                self.endpoint,
                json={"query": self.EVENTS_QUERY, "variables": variables},
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()
        except requests.RequestException as e:
            logger.error(f"Erreur HTTP GraphQL: {e}")
            return ([], 0)
        except ValueError:
            logger.error("Réponse GraphQL non JSON")
            return ([], 0)

        if 'errors' in data:
            logger.error(f"Erreurs GraphQL: {data['errors']}")
            return ([], 0)

        events_node = (data.get('data') or {}).get('events') or {}
        events_raw = events_node.get('elements') or []
        total = events_node.get('total') or 0

        parsed: List[MobilizonEvent] = []
        for ev in events_raw:
            # Address / coordinates.
            addr = ev.get('physicalAddress') or {}
            address_text = None
            if addr:
                parts = [
                    addr.get('description'),
                    addr.get('street'),
                    addr.get('postalCode'),
                    addr.get('locality'),
                    addr.get('region'),
                    addr.get('country'),
                ]
                address_text = ", ".join([p for p in parts if p]) or None

            geom = addr.get('geom') if isinstance(addr, dict) else None
            lat, lon = self._parse_geom(geom)

            # Tags come as dicts ({title, slug}) or plain strings.
            tags_field = ev.get('tags')
            tags_list: Optional[List[str]] = None
            if isinstance(tags_field, list):
                tags_list = []
                for t in tags_field:
                    if isinstance(t, dict):
                        val = t.get('title') or t.get('slug') or t.get('name')
                        if val:
                            tags_list.append(val)
                    elif isinstance(t, str):
                        tags_list.append(t)
                if not tags_list:
                    tags_list = None

            # Organizer.
            organizer = ev.get('organizerActor') or {}
            organizer_name = organizer.get('name') if isinstance(organizer, dict) else None
            organizer_url = organizer.get('url') if isinstance(organizer, dict) else None

            parsed.append(MobilizonEvent(
                uuid=ev.get('uuid') or ev.get('id'),
                url=ev.get('url') or ev.get('onlineAddress'),
                title=ev.get('title'),
                description=ev.get('description'),
                begins_on=ev.get('beginsOn'),
                ends_on=ev.get('endsOn'),
                status=ev.get('status'),
                latitude=lat,
                longitude=lon,
                address_text=address_text,
                tags=tags_list,
                organizer_name=organizer_name,
                organizer_url=organizer_url,
                category=ev.get('category'),
                website=ev.get('onlineAddress') or ev.get('url'),
            ))
        return (parsed, total)


class MobilizonImporter:
    """Pulls events from Mobilizon and pushes them to the OEDB /event API."""

    def __init__(self, api_url: str, instance_url: str, dry_run: bool = False,
                 geocode_missing: bool = False, cache_file: Optional[str] = None) -> None:
        self.api_url = api_url.rstrip('/')
        self.client = MobilizonClient(instance_url)
        self.dry_run = dry_run
        self.geocode_missing = geocode_missing
        self.cache_file = cache_file
        # fetched/sent: uid -> unix timestamp; events: uid -> serialized event dict.
        self.cache = {"fetched": {}, "sent": {}, "events": {}}
        if self.cache_file:
            self._load_cache()
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': USER_AGENT})

    def _load_cache(self) -> None:
        """Load the JSON cache file if present (best effort, never raises)."""
        try:
            if self.cache_file and os.path.exists(self.cache_file):
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    self.cache["fetched"] = data.get("fetched", {})
                    self.cache["sent"] = data.get("sent", {})
                    self.cache["events"] = data.get("events", {})
                logger.info(f"Cache chargé: fetched={len(self.cache['fetched'])}, sent={len(self.cache['sent'])}, events={len(self.cache['events'])}")
        except Exception as e:
            logger.warning(f"Chargement du cache échoué: {e}")

    def _save_cache(self) -> None:
        """Write the cache atomically (temp file + os.replace); best effort."""
        if not self.cache_file:
            return
        try:
            tmp = self.cache_file + ".tmp"
            with open(tmp, 'w', encoding='utf-8') as f:
                json.dump(self.cache, f, ensure_ascii=False, indent=2)
            os.replace(tmp, self.cache_file)
        except Exception as e:
            logger.warning(f"Écriture du cache échouée: {e}")

    def geocode_address(self, address: str) -> Optional[Tuple[float, float]]:
        """Geocode a free-text address via Nominatim.

        Returns (lat, lon) or None on empty input, no result, or any error.
        """
        if not address or address.strip() == '':
            return None
        try:
            geocode_url = "https://nominatim.openstreetmap.org/search"
            params = {
                'q': address,
                'format': 'json',
                'limit': 1,
                'addressdetails': 0,
            }
            # Use a separate session so Nominatim-specific headers/policies
            # stay independent from the Mobilizon/OEDB sessions.
            s = requests.Session()
            s.headers.update({'User-Agent': USER_AGENT})
            r = s.get(geocode_url, params=params, timeout=15)
            r.raise_for_status()
            results = r.json()
            if isinstance(results, list) and results:
                lat = float(results[0]['lat'])
                lon = float(results[0]['lon'])
                return (lat, lon)
        except Exception as e:
            logger.warning(f"Géocodage échoué pour '{address}': {e}")
        return None

    @staticmethod
    def _parse_dt(dt_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO 8601 string into an aware datetime (UTC when naive)."""
        if not dt_str:
            return None
        try:
            # Mobilizon usually returns valid ISO 8601; map the 'Z' suffix to
            # an explicit offset for fromisoformat.
            dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except Exception:
            return None

    @staticmethod
    def _iso_or_none(dt_str: Optional[str]) -> Optional[str]:
        """Normalize an ISO 8601 string; None when absent or unparseable."""
        dt = MobilizonImporter._parse_dt(dt_str)
        return dt.isoformat() if dt else None

    @staticmethod
    def _too_long(ev: MobilizonEvent) -> bool:
        """True when both dates parse and the event spans more than a week."""
        start_dt = MobilizonImporter._parse_dt(ev.begins_on)
        end_dt = MobilizonImporter._parse_dt(ev.ends_on)
        if start_dt and end_dt:
            return (end_dt - start_dt).total_seconds() > MAX_EVENT_DURATION_S
        return False

    @staticmethod
    def _oedb_feature(ev: MobilizonEvent) -> Optional[Dict]:
        """Build a GeoJSON Feature for the OEDB API, or None without coords.

        Coordinates are required; events without geometry are left to the
        caller (which may geocode) to avoid aggressive geocoding here.
        """
        if ev.latitude is None or ev.longitude is None:
            return None
        properties = {
            "label": ev.title or "Événement Mobilizon",
            "type": "scheduled",
            "what": "culture.meetup",
            "start": MobilizonImporter._iso_or_none(ev.begins_on),
            "stop": MobilizonImporter._iso_or_none(ev.ends_on),
            "where": ev.address_text or "",
            "description": ev.description or "",
            "source:name": "Mobilizon",
            "source:url": ev.url or "",
            "source:uid": ev.uuid or "",
            "url": ev.url or "",
        }
        # Optional enrichments only when present.
        if ev.tags:
            properties["tags"] = ev.tags
        if ev.organizer_name:
            properties["organizer:name"] = ev.organizer_name
        if ev.organizer_url:
            properties["organizer:url"] = ev.organizer_url
        if ev.category:
            properties["category"] = ev.category
        if ev.website:
            properties["website"] = ev.website
        feature = {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": [ev.longitude, ev.latitude],
            },
            "properties": properties,
        }
        # send_to_oedb already dumps the feature at INFO; keep this copy at
        # DEBUG so each feature is not printed twice per run.
        logger.debug(json.dumps(feature, indent=2, ensure_ascii=False))
        return feature

    def _mark_sent(self, feature: Dict) -> None:
        """Record the feature's source:uid as sent in the cache (best effort)."""
        try:
            uid = feature.get('properties', {}).get('source:uid')
            if uid:
                self.cache['sent'][uid] = int(time.time())
                self._save_cache()
        except Exception:
            pass

    def send_to_oedb(self, feature: Dict) -> bool:
        """POST a feature to OEDB /event; True on 201, 409 (duplicate), or dry-run."""
        # Always log the JSON that is (or would be) sent.
        if self.dry_run:
            logger.info("DRY RUN - Événement qui serait envoyé:")
        else:
            logger.info("Envoi de l'événement vers OEDB:")
        logger.info(json.dumps(feature, indent=2, ensure_ascii=False))
        if self.dry_run:
            return True
        try:
            r = self.session.post(f"{self.api_url}/event", json=feature, timeout=30)
            if r.status_code == 201:
                logger.info("Événement créé avec succès")
                self._mark_sent(feature)
                return True
            if r.status_code == 409:
                logger.info("Événement déjà existant (409)")
                self._mark_sent(feature)
                return True
            logger.error(f"Erreur API OEDB {r.status_code}: {r.text}")
            return False
        except requests.RequestException as e:
            logger.error(f"Erreur d'appel OEDB: {e}")
            return False

    def _replay_from_cache(self, budget: int) -> int:
        """Process cached, not-yet-sent events when the API returns nothing.

        Returns the number of events successfully sent (bounded by budget).
        """
        if not self.cache.get('events'):
            return 0
        logger.info("Utilisation du cache pour traiter les événements non envoyés")
        sent_count = 0
        for uid, ev_data in list(self.cache['events'].items()):
            if sent_count >= budget:
                break
            if uid in self.cache['sent']:
                continue
            ev = MobilizonEvent(
                uuid=uid,
                url=ev_data.get('url'),
                title=ev_data.get('title'),
                description=ev_data.get('description'),
                begins_on=ev_data.get('begins_on'),
                ends_on=ev_data.get('ends_on'),
                status=ev_data.get('status'),
                latitude=ev_data.get('latitude'),
                longitude=ev_data.get('longitude'),
                address_text=ev_data.get('address_text'),
                tags=ev_data.get('tags'),
                organizer_name=ev_data.get('organizer_name'),
                organizer_url=ev_data.get('organizer_url'),
                category=ev_data.get('category'),
                website=ev_data.get('website'),
            )
            # Skip events spanning more than a week.
            if self._too_long(ev):
                continue
            feature = self._oedb_feature(ev)
            if feature is None and self.geocode_missing and ev.address_text:
                coords = self.geocode_address(ev.address_text)
                if coords:
                    ev.latitude, ev.longitude = coords
                    # Persist the geocoded coordinates back into the cache.
                    ev_data['latitude'], ev_data['longitude'] = coords
                    self.cache['events'][uid] = ev_data
                    self._save_cache()
                    feature = self._oedb_feature(ev)
            if feature is None:
                continue
            if self.send_to_oedb(feature):
                sent_count += 1
        return sent_count

    def import_events(self, limit: int, page_size: int, start_page: int = 1,
                      sleep_s: float = 0.5) -> None:
        """Walk event pages and send up to `limit` events to OEDB.

        Pagination stops after ceil(limit / page_size) pages so no more pages
        are fetched than the requested limit requires. When the API returns
        nothing, previously cached events are replayed instead.
        """
        inserted = 0
        fetched = 0  # raw count of events retrieved from the API
        page = start_page
        pages_fetched = 0
        # Loop-invariant page budget (e.g. limit=1, page-size=10 => 1 page max).
        max_pages = max(1, math.ceil(limit / page_size))

        while inserted < limit:
            if pages_fetched >= max_pages:
                logger.info("Limite de pages atteinte selon --limit et --page-size, arrêt de la pagination")
                break
            # inserted < limit guarantees limit - inserted >= 1.
            remaining_fetch = max(1, min(page_size, limit - inserted))
            events, total = self.client.fetch_events_page(page=page, page_size=remaining_fetch)
            if not events:
                logger.info("Aucun événement supplémentaire retourné par l'API")
                # Fall back to unsent cached events if any are available.
                inserted += self._replay_from_cache(limit - inserted)
                break

            # Mark fetched and filter out already-sent events.
            new_in_page = 0
            filtered: List[MobilizonEvent] = []
            for ev in events:
                uid = ev.uuid or ev.url
                if uid:
                    if uid in self.cache['sent']:
                        logger.info("Ignoré (déjà envoyé) uid=%s" % uid)
                        continue
                    if uid not in self.cache['fetched']:
                        new_in_page += 1
                        self.cache['fetched'][uid] = int(time.time())
                    # Cache the event so dry-run / re-runs can skip refetching.
                    self.cache['events'][uid] = {
                        'url': ev.url,
                        'title': ev.title,
                        'description': ev.description,
                        'begins_on': ev.begins_on,
                        'ends_on': ev.ends_on,
                        'status': ev.status,
                        'latitude': ev.latitude,
                        'longitude': ev.longitude,
                        'address_text': ev.address_text,
                        'tags': ev.tags,
                        'organizer_name': ev.organizer_name,
                        'organizer_url': ev.organizer_url,
                        'category': ev.category,
                        'website': ev.website,
                    }
                filtered.append(ev)
            self._save_cache()
            fetched += len(events)
            pages_fetched += 1

            for ev in filtered:
                if inserted >= limit:
                    break
                # Skip events spanning more than a week.
                if self._too_long(ev):
                    logger.info("Ignoré (durée > 7 jours)")
                    continue
                feature = self._oedb_feature(ev)
                if feature is None:
                    # No geometry -> log the properties that would have been
                    # sent, for visibility, then optionally try geocoding.
                    properties = {
                        "label": ev.title or "Événement Mobilizon",
                        "type": "scheduled",
                        "what": "culture.meetup",
                        "start": MobilizonImporter._iso_or_none(ev.begins_on),
                        "stop": MobilizonImporter._iso_or_none(ev.ends_on),
                        "where": ev.address_text or "",
                        "description": ev.description or "",
                        "source:name": "Mobilizon",
                        "source:url": ev.url or "",
                        "source:uid": ev.uuid or "",
                        "url": ev.url or "",
                    }
                    pseudo_feature = {"type": "Feature", "geometry": None, "properties": properties}
                    logger.info("Ignoré (pas de géométrie) - Événement qui aurait été envoyé:")
                    logger.info(ev)
                    logger.info(json.dumps(pseudo_feature, indent=2, ensure_ascii=False))
                    if self.geocode_missing and ev.address_text:
                        logger.info("Tentative de géocodage pour compléter les coordonnées...")
                        coords = self.geocode_address(ev.address_text)
                        if coords:
                            ev.latitude, ev.longitude = coords
                            feature = self._oedb_feature(ev)
                    if feature is None:
                        # Bug fix: previously a None feature could fall
                        # through to send_to_oedb (posting null) when
                        # geocoding was disabled or the address was missing.
                        continue
                ok = self.send_to_oedb(feature)
                if ok:
                    inserted += 1
                time.sleep(sleep_s)
            page += 1

        logger.info(f"Terminé: {inserted} événement(s) traité(s) (limite demandée: {limit})")


def main() -> None:
    """CLI entry point: parse arguments and run the importer."""
    parser = argparse.ArgumentParser(description='Import Mobilizon -> OEDB (via GraphQL)')
    parser.add_argument('--limit', type=int, default=20,
                        help="Nombre maximal d'événements à insérer")
    parser.add_argument('--page-size', type=int, default=10,
                        help='Taille des pages GraphQL')
    parser.add_argument('--start-page', type=int, default=1,
                        help='Page de départ (1-indexée)')
    parser.add_argument('--instance-url', default='https://mobilizon.fr',
                        help="URL de l'instance Mobilizon (ex: https://mobilizon.fr)")
    parser.add_argument('--api-url', default='https://api.openeventdatabase.org',
                        help="URL de l'API OEDB")
    parser.add_argument('--dry-run', action='store_true',
                        help='Mode test sans envoi vers OEDB')
    # Bug fix: store_true combined with default=True made --geocode-missing a
    # no-op. Keep the default (True) for backward compatibility but provide an
    # explicit opt-out flag.
    parser.add_argument('--geocode-missing', dest='geocode_missing',
                        action='store_true', default=True,
                        help="Tenter un géocodage si pas de géométrie fournie")
    parser.add_argument('--no-geocode-missing', dest='geocode_missing',
                        action='store_false',
                        help="Désactiver le géocodage des événements sans géométrie")
    parser.add_argument('--cache-file', default='mobilizon_cache.json',
                        help='Fichier JSON de cache pour éviter les doublons')
    parser.add_argument('--verbose', action='store_true', help='Mode verbeux')
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    importer = MobilizonImporter(api_url=args.api_url,
                                 instance_url=args.instance_url,
                                 dry_run=args.dry_run,
                                 geocode_missing=args.geocode_missing,
                                 cache_file=args.cache_file)
    importer.import_events(limit=args.limit, page_size=args.page_size,
                           start_page=args.start_page)


if __name__ == '__main__':
    main()

# extractors/mobilizon.py