#!/usr/bin/env python3
|
|
"""
|
|
Import d'événements depuis l'API GraphQL de Mobilizon vers OEDB
|
|
|
|
Usage:
|
|
python3 mobilizon.py --limit 25 --page-size 10 --instance-url https://mobilizon.fr \
|
|
--api-url https://api.openeventdatabase.org --dry-run --verbose
|
|
|
|
Notes:
|
|
- S'inspire de extractors/agenda_geek.py pour la structure générale (CLI, dry-run,
|
|
session HTTP, envoi vers /event) et évite de scraper les pages web en
|
|
utilisant l'API GraphQL officielle.
|
|
- Ajoute un paramètre --limit pour borner le nombre d'événements à insérer.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import logging
|
|
import time
|
|
import os
|
|
import math
|
|
import os
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import Dict, Iterable, List, Optional, Tuple
|
|
|
|
import requests
|
|
|
|
|
|
# Logging configuration (aligned with agenda_geek.py): timestamped
# "time - LEVEL - message" records emitted through a StreamHandler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
    ]
)
# Module-level logger named after this module, per the stdlib convention.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class MobilizonEvent:
    """Normalized view of one Mobilizon event as returned by the GraphQL API.

    Every field is Optional: the GraphQL schema varies between Mobilizon
    versions and individual events may omit any of these values.
    """

    uuid: Optional[str]          # event UUID (upstream falls back to `id`)
    url: Optional[str]           # canonical event page URL
    title: Optional[str]
    description: Optional[str]
    begins_on: Optional[str]     # ISO-8601 start timestamp (GraphQL `beginsOn`)
    ends_on: Optional[str]       # ISO-8601 end timestamp (GraphQL `endsOn`)
    status: Optional[str]
    latitude: Optional[float]    # parsed from physicalAddress.geom, if any
    longitude: Optional[float]
    address_text: Optional[str]  # human-readable address, comma-joined parts
    tags: Optional[List[str]]    # tag titles/slugs; None when no usable tags
    organizer_name: Optional[str]
    organizer_url: Optional[str]
    category: Optional[str]
    website: Optional[str]       # onlineAddress, falling back to event URL
|
|
|
|
|
|
class MobilizonClient:
    """Thin GraphQL client for a Mobilizon instance's public event listing."""

    def __init__(self, instance_url: str = "https://mobilizon.fr") -> None:
        self.base = instance_url.rstrip('/')
        # The public GraphQL endpoint of a Mobilizon instance is typically /api
        self.endpoint = f"{self.base}/api"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'OEDB-Mobilizon-Importer/1.0 (+https://github.com/cquest/oedb)',
            'Content-Type': 'application/json'
        })

    @staticmethod
    def _parse_geom(geom) -> Optional[Tuple[float, float]]:
        """Extract (lat, lon) from `physicalAddress.geom`, or None.

        Depending on the Mobilizon version, `geom` may be an already-decoded
        GeoJSON object, a JSON string, or a WKT "POINT(lon lat)" string.
        GeoJSON stores coordinates as [lon, lat]; the returned tuple is
        (lat, lon).
        """
        import re  # local import: only needed for the WKT fallback

        if not geom:
            return None

        def from_geojson(g) -> Optional[Tuple[float, float]]:
            # GeoJSON convention: coordinates = [lon, lat]
            if isinstance(g, dict) and isinstance(g.get('coordinates'), (list, tuple)):
                coords = g.get('coordinates')
                if len(coords) >= 2:
                    try:
                        return (float(coords[1]), float(coords[0]))
                    except (TypeError, ValueError):
                        return None
            return None

        if isinstance(geom, (dict, list)):
            return from_geojson(geom)

        # String: try JSON first, then fall back to WKT "POINT(lon lat)".
        try:
            return from_geojson(json.loads(geom))
        except (TypeError, ValueError):
            m = re.search(
                r"POINT\s*\(\s*([+-]?[0-9]*\.?[0-9]+)\s+([+-]?[0-9]*\.?[0-9]+)\s*\)",
                str(geom),
            )
            if m:
                try:
                    return (float(m.group(2)), float(m.group(1)))
                except ValueError:
                    return None
        return None

    @classmethod
    def _parse_event(cls, ev: dict) -> "MobilizonEvent":
        """Map one raw GraphQL event dict onto a MobilizonEvent."""
        # Address text: join whatever non-empty parts the API provided.
        addr = ev.get('physicalAddress') or {}
        address_text = None
        if addr:
            parts = [
                addr.get('description'),
                addr.get('street'),
                addr.get('postalCode'),
                addr.get('locality'),
                addr.get('region'),
                addr.get('country'),
            ]
            address_text = ", ".join([p for p in parts if p]) or None

        # Coordinates (lat, lon) from the geom field, when parseable.
        lat = None
        lon = None
        coords = cls._parse_geom(addr.get('geom') if isinstance(addr, dict) else None)
        if coords:
            lat, lon = coords

        # Tags may be dicts ({title, slug, ...}) or bare strings.
        tags_field = ev.get('tags')
        tags_list: Optional[List[str]] = None
        if isinstance(tags_field, list):
            tags_list = []
            for t in tags_field:
                if isinstance(t, dict):
                    val = t.get('title') or t.get('slug') or t.get('name')
                    if val:
                        tags_list.append(val)
                elif isinstance(t, str):
                    tags_list.append(t)
            if not tags_list:
                tags_list = None

        organizer = ev.get('organizerActor') or {}
        organizer_name = organizer.get('name') if isinstance(organizer, dict) else None
        organizer_url = organizer.get('url') if isinstance(organizer, dict) else None

        return MobilizonEvent(
            uuid=ev.get('uuid') or ev.get('id'),
            url=ev.get('url') or ev.get('onlineAddress'),
            title=ev.get('title'),
            description=ev.get('description'),
            begins_on=ev.get('beginsOn'),
            ends_on=ev.get('endsOn'),
            status=ev.get('status'),
            latitude=lat,
            longitude=lon,
            address_text=address_text,
            tags=tags_list,
            organizer_name=organizer_name,
            organizer_url=organizer_url,
            category=ev.get('category'),
            website=ev.get('onlineAddress') or ev.get('url'),
        )

    def fetch_events_page(self, page: int, page_size: int) -> "Tuple[List[MobilizonEvent], int]":
        """Fetch one page of public events via GraphQL.

        Returns (events, total) where total is the API-known total when
        exposed, 0 otherwise. On any transport or GraphQL error an empty
        page is returned instead of raising.
        """
        # Several schemas exist depending on the Mobilizon version; this is a
        # generic query — `events` returns elements[] + total in many versions.
        query = """
        query Events($page: Int!, $limit: Int!) {
          events(page: $page, limit: $limit) {
            total
            elements {
              uuid
              url
              title
              description
              beginsOn
              endsOn
              status
              physicalAddress {
                description
                locality
                geom
                street
                postalCode
                region
                country
              }
              onlineAddress
              tags { title slug }
              organizerActor { name url }
              category
            }
          }
        }
        """
        variables = {"page": page, "limit": page_size}

        try:
            logger.info("Fetching events page %s with size %s", page, page_size)
            # The query text is constant and large: log it at DEBUG only,
            # instead of dumping it at INFO for every page.
            logger.debug("Endpoint: %s", self.endpoint)
            logger.debug("Query: %s", query)
            logger.debug("Variables: %s", variables)
            resp = self.session.post(
                self.endpoint,
                json={"query": query, "variables": variables},
                timeout=30,
            )
            resp.raise_for_status()
            data = resp.json()
        except requests.RequestException as e:
            logger.error("Erreur HTTP GraphQL: %s", e)
            return ([], 0)
        except ValueError:
            # resp.json() raised: body was not JSON
            logger.error("Réponse GraphQL non JSON")
            return ([], 0)

        if 'errors' in data:
            logger.error("Erreurs GraphQL: %s", data['errors'])
            return ([], 0)

        events_node = ((data.get('data') or {}).get('events')) or {}
        events_raw = events_node.get('elements') or []
        total = events_node.get('total') or 0

        parsed: List[MobilizonEvent] = [self._parse_event(ev) for ev in events_raw]
        return (parsed, total)
|
|
|
|
|
|
class MobilizonImporter:
    """Pulls events from a Mobilizon instance and pushes them to OEDB /event.

    Keeps a small JSON cache (fetched/sent UIDs + serialized events) so that
    re-runs and dry-runs avoid duplicates and can work offline from cache.
    """

    def __init__(self, api_url: str, instance_url: str, dry_run: bool = False, geocode_missing: bool = False, cache_file: Optional[str] = None) -> None:
        self.api_url = api_url.rstrip('/')
        self.client = MobilizonClient(instance_url)
        self.dry_run = dry_run
        self.geocode_missing = geocode_missing
        self.cache_file = cache_file
        # fetched/sent: uid -> unix timestamp; events: uid -> serialized event dict
        self.cache = {"fetched": {}, "sent": {}, "events": {}}
        if self.cache_file:
            self._load_cache()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'OEDB-Mobilizon-Importer/1.0 (+https://github.com/cquest/oedb)'
        })

    def _load_cache(self) -> None:
        """Load the dedup cache from disk; a failure only logs a warning."""
        try:
            if self.cache_file and os.path.exists(self.cache_file):
                with open(self.cache_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                if isinstance(data, dict):
                    self.cache["fetched"] = data.get("fetched", {})
                    self.cache["sent"] = data.get("sent", {})
                    self.cache["events"] = data.get("events", {})
                    logger.info(f"Cache chargé: fetched={len(self.cache['fetched'])}, sent={len(self.cache['sent'])}, events={len(self.cache['events'])}")
        except Exception as e:
            logger.warning(f"Chargement du cache échoué: {e}")

    def _save_cache(self) -> None:
        """Atomically persist the cache (write tmp file, then os.replace)."""
        if not self.cache_file:
            return
        try:
            tmp = self.cache_file + ".tmp"
            with open(tmp, 'w', encoding='utf-8') as f:
                json.dump(self.cache, f, ensure_ascii=False, indent=2)
            os.replace(tmp, self.cache_file)
        except Exception as e:
            logger.warning(f"Écriture du cache échouée: {e}")

    def geocode_address(self, address: str) -> Optional[Tuple[float, float]]:
        """Geocode a free-text address via Nominatim; returns (lat, lon) or None.

        NOTE(review): Nominatim's usage policy requires an identifying
        User-Agent (set below) and at most 1 request/second — callers should
        not invoke this in a tight loop.
        """
        if not address or address.strip() == '':
            return None
        try:
            geocode_url = "https://nominatim.openstreetmap.org/search"
            params = {
                'q': address,
                'format': 'json',
                'limit': 1,
                'addressdetails': 0,
            }
            # Separate session so the geocoder's headers/policy stay isolated
            # from the OEDB session.
            s = requests.Session()
            s.headers.update({'User-Agent': 'OEDB-Mobilizon-Importer/1.0 (+https://github.com/cquest/oedb)'})
            r = s.get(geocode_url, params=params, timeout=15)
            r.raise_for_status()
            results = r.json()
            if isinstance(results, list) and results:
                lat = float(results[0]['lat'])
                lon = float(results[0]['lon'])
                return (lat, lon)
        except Exception as e:
            logger.warning(f"Géocodage échoué pour '{address}': {e}")
        return None

    @staticmethod
    def _iso_or_none(dt_str: Optional[str]) -> Optional[str]:
        """Normalize an ISO-8601 string to a tz-aware ISO string, else None.

        Mobilizon usually emits valid ISO 8601; naive datetimes are assumed
        to be UTC.
        """
        if not dt_str:
            return None
        try:
            dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt.isoformat()
        except Exception:
            return None

    @staticmethod
    def _parse_dt(dt_str: Optional[str]) -> Optional[datetime]:
        """Parse an ISO-8601 string to a tz-aware datetime (UTC if naive)."""
        if not dt_str:
            return None
        try:
            dt = datetime.fromisoformat(dt_str.replace('Z', '+00:00'))
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt
        except Exception:
            return None

    @staticmethod
    def _oedb_feature(ev: "MobilizonEvent") -> Optional[Dict]:
        """Build the GeoJSON Feature for OEDB, or None when coords are missing.

        Coordinates are required; events without them are skipped here
        (avoids aggressive geocoding — callers may geocode and retry).
        """
        if ev.latitude is None or ev.longitude is None:
            return None

        start_iso = MobilizonImporter._iso_or_none(ev.begins_on)
        end_iso = MobilizonImporter._iso_or_none(ev.ends_on)

        properties = {
            "label": ev.title or "Événement Mobilizon",
            "type": "scheduled",
            "what": "culture.meetup",
            "start": start_iso,
            "stop": end_iso,
            "where": ev.address_text or "",
            "description": ev.description or "",
            "source:name": "Mobilizon",
            "source:url": ev.url or "",
            "source:uid": ev.uuid or "",
            "url": ev.url or "",
        }
        if ev.tags:
            properties["tags"] = ev.tags
        if ev.organizer_name:
            properties["organizer:name"] = ev.organizer_name
        if ev.organizer_url:
            properties["organizer:url"] = ev.organizer_url
        if ev.category:
            properties["category"] = ev.category
        if ev.website:
            properties["website"] = ev.website

        # FIX: do not log the feature here — send_to_oedb() already dumps the
        # full JSON, so logging here printed every event twice.
        return {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": [ev.longitude, ev.latitude],
            },
            "properties": properties,
        }

    def _mark_sent(self, feature: Dict) -> None:
        """Best-effort: record the feature's source UID in the 'sent' cache."""
        try:
            uid = feature.get('properties', {}).get('source:uid')
            if uid:
                self.cache['sent'][uid] = int(time.time())
                self._save_cache()
        except Exception:
            pass

    def send_to_oedb(self, feature: Dict) -> bool:
        """POST one feature to OEDB /event; 201 and 409 both count as success."""
        # Always log the JSON that is (or would be) sent.
        if self.dry_run:
            logger.info("DRY RUN - Événement qui serait envoyé:")
        else:
            logger.info("Envoi de l'événement vers OEDB:")
        logger.info(json.dumps(feature, indent=2, ensure_ascii=False))
        if self.dry_run:
            return True
        try:
            r = self.session.post(f"{self.api_url}/event", json=feature, timeout=30)
            if r.status_code == 201:
                logger.info("Événement créé avec succès")
                self._mark_sent(feature)  # shared with the 409 path (was duplicated)
                return True
            if r.status_code == 409:
                # Already present server-side: mark sent so we stop retrying.
                logger.info("Événement déjà existant (409)")
                self._mark_sent(feature)
                return True
            logger.error(f"Erreur API OEDB {r.status_code}: {r.text}")
            return False
        except requests.RequestException as e:
            logger.error(f"Erreur d'appel OEDB: {e}")
            return False

    def _event_from_cache(self, uid: str, ev_data: Dict) -> "MobilizonEvent":
        """Rehydrate a MobilizonEvent from its serialized cache entry."""
        return MobilizonEvent(
            uuid=uid,
            url=ev_data.get('url'),
            title=ev_data.get('title'),
            description=ev_data.get('description'),
            begins_on=ev_data.get('begins_on'),
            ends_on=ev_data.get('ends_on'),
            status=ev_data.get('status'),
            latitude=ev_data.get('latitude'),
            longitude=ev_data.get('longitude'),
            address_text=ev_data.get('address_text'),
            tags=ev_data.get('tags'),
            organizer_name=ev_data.get('organizer_name'),
            organizer_url=ev_data.get('organizer_url'),
            category=ev_data.get('category'),
            website=ev_data.get('website'),
        )

    @staticmethod
    def _too_long(ev: "MobilizonEvent") -> bool:
        """True when the event spans more than 7 days (those are skipped)."""
        start_dt = MobilizonImporter._parse_dt(ev.begins_on)
        end_dt = MobilizonImporter._parse_dt(ev.ends_on)
        if start_dt and end_dt:
            return (end_dt - start_dt).total_seconds() > 7 * 24 * 3600
        return False

    def _drain_cache(self, limit: int, inserted: int) -> int:
        """Send cached, not-yet-sent events; returns the updated inserted count.

        Used as a fallback when the API stops returning events (e.g. offline
        re-run after a dry-run populated the cache).
        """
        if not self.cache.get('events'):
            return inserted
        logger.info("Utilisation du cache pour traiter les événements non envoyés")
        for uid, ev_data in list(self.cache['events'].items()):
            if inserted >= limit:
                break
            if uid in self.cache['sent']:
                continue
            ev = self._event_from_cache(uid, ev_data)
            if self._too_long(ev):
                continue
            feature = self._oedb_feature(ev)
            if feature is None and self.geocode_missing and ev.address_text:
                coords = self.geocode_address(ev.address_text)
                if coords:
                    ev.latitude, ev.longitude = coords
                    # Persist the resolved coordinates back into the cache.
                    ev_data['latitude'], ev_data['longitude'] = coords
                    self.cache['events'][uid] = ev_data
                    self._save_cache()
                    feature = self._oedb_feature(ev)
            if feature is None:
                continue
            if self.send_to_oedb(feature):
                inserted += 1
        return inserted

    def _register_page(self, events: "List[MobilizonEvent]") -> "List[MobilizonEvent]":
        """Record fetched events in the cache and drop already-sent ones."""
        filtered: List[MobilizonEvent] = []
        for ev in events:
            uid = ev.uuid or ev.url
            if uid:
                if uid in self.cache['sent']:
                    logger.info("Ignoré (déjà envoyé) uid=%s" % uid)
                    continue
                if uid not in self.cache['fetched']:
                    self.cache['fetched'][uid] = int(time.time())
                # Serialize the event so dry-runs / re-runs can work without
                # refetching from the API.
                self.cache['events'][uid] = {
                    'url': ev.url,
                    'title': ev.title,
                    'description': ev.description,
                    'begins_on': ev.begins_on,
                    'ends_on': ev.ends_on,
                    'status': ev.status,
                    'latitude': ev.latitude,
                    'longitude': ev.longitude,
                    'address_text': ev.address_text,
                    'tags': ev.tags,
                    'organizer_name': ev.organizer_name,
                    'organizer_url': ev.organizer_url,
                    'category': ev.category,
                    'website': ev.website,
                }
            filtered.append(ev)
        self._save_cache()
        return filtered

    def _handle_missing_geometry(self, ev: "MobilizonEvent") -> Optional[Dict]:
        """Log an event lacking coordinates; optionally geocode and rebuild.

        Returns a complete feature when geocoding succeeded, else None.
        """
        # Log the would-be feature for visibility even though it is skipped.
        properties = {
            "label": ev.title or "Événement Mobilizon",
            "type": "scheduled",
            "what": "culture.meetup",
            "start": MobilizonImporter._iso_or_none(ev.begins_on),
            "stop": MobilizonImporter._iso_or_none(ev.ends_on),
            "where": ev.address_text or "",
            "description": ev.description or "",
            "source:name": "Mobilizon",
            "source:url": ev.url or "",
            "source:uid": ev.uuid or "",
            "url": ev.url or "",
        }
        pseudo_feature = {"type": "Feature", "geometry": None, "properties": properties}
        logger.info("Ignoré (pas de géométrie) - Événement qui aurait été envoyé:")
        logger.info(json.dumps(pseudo_feature, indent=2, ensure_ascii=False))
        if self.geocode_missing and ev.address_text:
            logger.info("Tentative de géocodage pour compléter les coordonnées...")
            coords = self.geocode_address(ev.address_text)
            if coords:
                ev.latitude, ev.longitude = coords
                return self._oedb_feature(ev)
        return None

    def import_events(self, limit: int, page_size: int, start_page: int = 1, sleep_s: float = 0.5) -> None:
        """Paginate the Mobilizon API and push up to `limit` events to OEDB.

        Falls back to the local cache when the API returns no more events.
        `sleep_s` throttles successive OEDB posts.
        """
        page_size = max(1, page_size)  # guard against ZeroDivisionError below
        inserted = 0
        fetched = 0  # raw count of events retrieved from the API
        page = start_page
        pages_fetched = 0
        # Loop-invariant (was recomputed every iteration): never walk more
        # pages than --limit / --page-size requires (e.g. limit=1, size=10
        # => 1 page max).
        max_pages = max(1, math.ceil(limit / page_size))

        while inserted < limit:
            if pages_fetched >= max_pages:
                logger.info("Limite de pages atteinte selon --limit et --page-size, arrêt de la pagination")
                break
            remaining_fetch = max(1, min(page_size, max(1, limit - inserted)))
            events, _total = self.client.fetch_events_page(page=page, page_size=remaining_fetch)
            if not events:
                logger.info("Aucun événement supplémentaire retourné par l'API")
                # Fall back to cached, unsent events, then stop paginating.
                inserted = self._drain_cache(limit, inserted)
                break

            filtered = self._register_page(events)
            fetched += len(events)
            pages_fetched += 1

            for ev in filtered:
                if inserted >= limit:
                    break
                if self._too_long(ev):
                    logger.info("Ignoré (durée > 7 jours)")
                    continue
                feature = self._oedb_feature(ev)
                if feature is None:
                    # No geometry: log, optionally geocode; skip if still none.
                    feature = self._handle_missing_geometry(ev)
                    if feature is None:
                        continue
                if self.send_to_oedb(feature):
                    inserted += 1
                time.sleep(sleep_s)

            page += 1

        logger.info(f"Terminé: {inserted} événement(s) traité(s) (limite demandée: {limit})")
|
|
|
|
|
|
def main() -> None:
    """CLI entry point: parse arguments and run the Mobilizon importer."""
    parser = argparse.ArgumentParser(description='Import Mobilizon -> OEDB (via GraphQL)')
    parser.add_argument('--limit', type=int, default=20, help="Nombre maximal d'événements à insérer")
    parser.add_argument('--page-size', type=int, default=10, help='Taille des pages GraphQL')
    parser.add_argument('--start-page', type=int, default=1, help='Page de départ (1-indexée)')
    parser.add_argument('--instance-url', default='https://mobilizon.fr', help="URL de l'instance Mobilizon (ex: https://mobilizon.fr)")
    parser.add_argument('--api-url', default='https://api.openeventdatabase.org', help="URL de l'API OEDB")
    parser.add_argument('--dry-run', action='store_true', help='Mode test sans envoi vers OEDB')
    # FIX: action='store_true' combined with default=True made --geocode-missing
    # a no-op (always True, impossible to turn off). Keep the default and the
    # original flag, and add --no-geocode-missing to disable it.
    parser.add_argument('--geocode-missing', dest='geocode_missing', action='store_true', default=True, help="Tenter un géocodage si pas de géométrie fournie")
    parser.add_argument('--no-geocode-missing', dest='geocode_missing', action='store_false', help="Désactiver le géocodage automatique")
    parser.add_argument('--cache-file', default='mobilizon_cache.json', help='Fichier JSON de cache pour éviter les doublons')
    parser.add_argument('--verbose', action='store_true', help='Mode verbeux')

    args = parser.parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    importer = MobilizonImporter(api_url=args.api_url, instance_url=args.instance_url, dry_run=args.dry_run, geocode_missing=args.geocode_missing, cache_file=args.cache_file)
    importer.import_events(limit=args.limit, page_size=args.page_size, start_page=args.start_page)


if __name__ == '__main__':
    main()
|
|
|
|
# extractors/mobilizon.py |