oedb-backend/extractors/programmedesexpos_paris.py
#!/usr/bin/env python3
"""
Import exhibitions from https://programmedesexpos.paris/ into the OpenEventDatabase API.

How it works:
- Fetch the <script type="application/ld+json"> tags from one or more pages
- Keep only the JSON-LD objects of type Event/Exhibition
- Convert them to GeoJSON Features with properties.type="scheduled" and
  what="culture.exhibition.paris"
- Use a cache to avoid resending events already transmitted (HTTP 201 or 409)

Usage examples (dry-run is the default; pass --commit to actually send):
    python3 programmedesexpos_paris.py \
        --api-url https://api.openeventdatabase.org \
        --pages 1 --verbose
    python3 programmedesexpos_paris.py \
        --api-url https://api.openeventdatabase.org \
        --pages 5 --geocode-missing

Notes:
- The site sometimes embeds a WebPage object (ignored); only Event/Exhibition are kept.
- Source UID: the event URL is preferred, then @id, and as a last resort a hash of name+date.
"""
import argparse
import json
import logging
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from hashlib import md5
from typing import Dict, List, Optional, Tuple, Union
import requests
from bs4 import BeautifulSoup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
BASE_URL = "https://programmedesexpos.paris/"
CACHE_FILE = os.path.join(os.path.dirname(__file__), 'programmedesexpos_cache.json')

@dataclass
class ExpoEvent:
    uid: str
    url: Optional[str]
    name: Optional[str]
    description: Optional[str]
    start: Optional[str]
    stop: Optional[str]
    latitude: Optional[float]
    longitude: Optional[float]
    where_text: Optional[str]

def _is_event_type(obj_type: Union[str, List[str], None]) -> bool:
    if obj_type is None:
        return False
    if isinstance(obj_type, str):
        t = obj_type.lower()
        return 'event' in t or 'exhibition' in t
    if isinstance(obj_type, list):
        return any(_is_event_type(t) for t in obj_type)
    return False
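
# Note: matching is a substring test on the lowered @type, so "Event",
# "ExhibitionEvent", "Exhibition" and ["WebPage", "Event"] are all accepted,
# while "WebPage" alone is rejected.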

def _to_iso8601(value: Optional[str]) -> Optional[str]:
    if not value:
        return None
    try:
        # Accept values that are already valid ISO 8601
        dt = datetime.fromisoformat(value.replace('Z', '+00:00'))
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt.isoformat()
    except Exception:
        # Fall back to extracting a YYYY-MM-DD prefix
        m = re.match(r"(\d{4}-\d{2}-\d{2})", value)
        if m:
            try:
                dt = datetime.fromisoformat(m.group(1))
                return dt.isoformat()
            except Exception:
                return None
        return None
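
# Concrete input/output pairs for _to_iso8601 (naive datetimes are pinned to
# UTC on the main path; the regex fallback returns a naive timestamp):
#   "2025-03-01T18:00:00+01:00"  -> "2025-03-01T18:00:00+01:00"
#   "2025-03-01T18:00:00Z"       -> "2025-03-01T18:00:00+00:00"
#   "2025-03-01"                 -> "2025-03-01T00:00:00+00:00"
#   "2025-03-01 et après"        -> "2025-03-01T00:00:00"  (fallback path)
#   "not a date"                 -> None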

def load_cache() -> Dict:
    if os.path.exists(CACHE_FILE):
        try:
            with open(CACHE_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if isinstance(data, dict):
                for k in ["fetched", "sent", "pending", "events"]:
                    data.setdefault(k, {})
                logger.info(f"Cache loaded: fetched={len(data['fetched'])}, sent={len(data['sent'])}, pending={len(data['pending'])}, events={len(data['events'])}")
                return data
        except Exception as e:
            logger.warning(f"Failed to load cache: {e}")
    return {"fetched": {}, "sent": {}, "pending": {}, "events": {}}

def save_cache(cache: Dict) -> None:
    try:
        # Write to a temporary file, then replace atomically
        tmp = CACHE_FILE + ".tmp"
        with open(tmp, 'w', encoding='utf-8') as f:
            json.dump(cache, f, ensure_ascii=False, indent=2)
        os.replace(tmp, CACHE_FILE)
    except Exception as e:
        logger.warning(f"Failed to write cache: {e}")

def geocode_address(address: str) -> Optional[Tuple[float, float]]:
    if not address:
        return None
    try:
        geocode_url = "https://nominatim.openstreetmap.org/search"
        params = {
            'q': address,
            'format': 'json',
            'limit': 1,
            'addressdetails': 0,
        }
        s = requests.Session()
        s.headers.update({'User-Agent': 'OEDB-ProgrammedesExpos-Importer/1.0 (+https://github.com/cquest/oedb)'})
        r = s.get(geocode_url, params=params, timeout=20)
        r.raise_for_status()
        results = r.json()
        if isinstance(results, list) and results:
            lat = float(results[0]['lat'])
            lon = float(results[0]['lon'])
            return (lat, lon)
    except Exception as e:
        logger.warning(f"Geocoding failed for '{address}': {e}")
    return None
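
# Nominatim answers with a JSON array of matches; only 'lat' and 'lon' are
# used here (both arrive as strings, hence the float() casts), e.g.:
#   [{"lat": "48.8606", "lon": "2.3376", "display_name": "...", ...}]
# Its usage policy allows at most ~1 request/second; the per-event sleep in
# Importer.run() helps, but keep sleep_s >= 1 for large geocoded batches.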

def parse_jsonld_scripts(html_text: str, page_url: str) -> List[Dict]:
    soup = BeautifulSoup(html_text, 'html.parser')
    scripts = soup.find_all('script', attrs={'type': 'application/ld+json'})
    items: List[Dict] = []
    for sc in scripts:
        try:
            raw = sc.string or sc.get_text("", strip=True)
            if not raw:
                continue
            data = json.loads(raw)
            # A script tag sometimes holds an array of objects
            if isinstance(data, list):
                for obj in data:
                    if isinstance(obj, dict):
                        items.append(obj)
            elif isinstance(data, dict):
                items.append(data)
        except Exception:
            continue
    # Record the source page if not already present
    for it in items:
        it.setdefault('page:url', page_url)
    return items

def jsonld_to_expo_event(obj: Dict) -> Optional[ExpoEvent]:
    if not _is_event_type(obj.get('@type')):
        return None
    name = obj.get('name') or obj.get('headline')
    description = obj.get('description')
    start = _to_iso8601(obj.get('startDate'))
    stop = _to_iso8601(obj.get('endDate'))
    # Preferred URL: the url field, then @id, then the page URL
    url = obj.get('url') or obj.get('@id') or obj.get('page:url')
    # UID: the URL, otherwise a hash of name+start
    uid = url or md5(f"{name or ''}|{start or ''}".encode('utf-8')).hexdigest()
    # Location
    lat = None
    lon = None
    where_text_parts: List[str] = []
    loc = obj.get('location')
    if isinstance(loc, dict):
        # Venue name
        loc_name = loc.get('name')
        if loc_name:
            where_text_parts.append(loc_name)
        # Postal address
        addr = loc.get('address')
        if isinstance(addr, dict):
            for key in ['streetAddress', 'postalCode', 'addressLocality', 'addressRegion', 'addressCountry']:
                val = addr.get(key)
                if val:
                    where_text_parts.append(str(val))
        # Coordinates may live in location.geo
        geo = loc.get('geo')
        if isinstance(geo, dict):
            try:
                lat = float(geo.get('latitude')) if geo.get('latitude') is not None else None
                lon = float(geo.get('longitude')) if geo.get('longitude') is not None else None
            except Exception:
                lat = None
                lon = None
    # Fallback: geo directly on the object
    if (lat is None or lon is None) and isinstance(obj.get('geo'), dict):
        geo = obj['geo']
        try:
            lat = float(geo.get('latitude')) if geo.get('latitude') is not None else lat
            lon = float(geo.get('longitude')) if geo.get('longitude') is not None else lon
        except Exception:
            pass
    where_text = ", ".join([p for p in where_text_parts if p]) if where_text_parts else None
    return ExpoEvent(
        uid=uid,
        url=url,
        name=name,
        description=description,
        start=start,
        stop=stop,
        latitude=lat,
        longitude=lon,
        where_text=where_text,
    )

def to_oedb_feature(ev: ExpoEvent) -> Dict:
    properties = {
        "label": ev.name or "Exposition",
        "type": "scheduled",
        "what": "culture.exhibition.paris",
        "start": ev.start,
        "stop": ev.stop,
        "where": ev.where_text or "",
        "description": ev.description or "",
        "source:name": "Programme des Expos Paris",
        "source:url": ev.url or "",
        "source:uid": ev.uid,
        "url": ev.url or "",
    }
    # Geometry: without coordinates, emit a Point with null coordinates
    # (note: this is not strictly valid GeoJSON and the API may reject it)
    if ev.longitude is not None and ev.latitude is not None:
        geometry: Dict = {"type": "Point", "coordinates": [ev.longitude, ev.latitude]}
    else:
        geometry = {"type": "Point", "coordinates": None}
    feature = {
        "type": "Feature",
        "geometry": geometry,
        "properties": properties,
    }
    logger.debug(f"feature: {json.dumps(feature, indent=2, ensure_ascii=False)}")
    return feature
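
# For the sample JSON-LD sketched at the top of this file, the emitted feature
# would look roughly like (abridged):
#   {
#     "type": "Feature",
#     "geometry": {"type": "Point", "coordinates": [2.3376, 48.8606]},
#     "properties": {
#       "label": "Exposition exemple",
#       "type": "scheduled",
#       "what": "culture.exhibition.paris",
#       "start": "2025-01-15T00:00:00+00:00",
#       "stop": "2025-04-20T00:00:00+00:00",
#       "where": "Musée exemple, 1 rue Exemple, 75001, Paris",
#       ...
#     }
#   }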

class Importer:
    def __init__(self, api_url: str, dry_run: bool, geocode_missing: bool, pages: int) -> None:
        self.api_url = api_url.rstrip('/')
        self.dry_run = dry_run
        self.geocode_missing = geocode_missing
        self.pages = max(1, pages)
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': 'OEDB-ProgrammedesExpos-Importer/1.0 (+https://github.com/cquest/oedb)'})
        self.cache = load_cache()

    def _save(self) -> None:
        save_cache(self.cache)

    def fetch_pages(self) -> List[Tuple[str, str]]:
        pages: List[Tuple[str, str]] = []
        for i in range(1, self.pages + 1):
            url = BASE_URL if i == 1 else (BASE_URL.rstrip('/') + f"/page/{i}/")
            try:
                logger.info(f"Downloading page {i}: {url}")
                r = self.session.get(url, timeout=30)
                r.raise_for_status()
                pages.append((url, r.text))
            except requests.RequestException as e:
                logger.warning(f"Failed to download {url}: {e}")
                break
        return pages

    def send_to_oedb(self, feature: Dict) -> bool:
        if self.dry_run:
            logger.info("DRY RUN - event that would be sent:")
            logger.info(json.dumps(feature, indent=2, ensure_ascii=False))
            return True
        try:
            r = self.session.post(f"{self.api_url}/event", json=feature, timeout=30)
            if r.status_code in (200, 201):
                logger.info("Event created successfully")
                return True
            if r.status_code == 409:
                logger.info("Event already exists (409)")
                return True
            logger.error(f"OEDB API error {r.status_code}: {r.text}")
            return False
        except requests.RequestException as e:
            logger.error(f"OEDB call failed: {e}")
            return False

    def run(self, limit: int, sleep_s: float = 0.5) -> None:
        inserted = 0
        pages = self.fetch_pages()
        for page_url, html_text in pages:
            if inserted >= limit:
                break
            jsonld_items = parse_jsonld_scripts(html_text, page_url)
            for obj in jsonld_items:
                if inserted >= limit:
                    break
                ev = jsonld_to_expo_event(obj)
                if not ev:
                    continue
                # Cache-based filtering
                if ev.uid in self.cache['sent']:
                    logger.info(f"Skipped (already sent) uid={ev.uid}")
                    continue
                # Was the event already cached before this run?
                in_cache = ev.uid in self.cache['events']
                # Geocode only when requested, not already cached, coordinates
                # are missing, and an address text is available
                if (self.geocode_missing and not in_cache
                        and (ev.latitude is None or ev.longitude is None)
                        and ev.where_text):
                    coords = geocode_address(ev.where_text)
                    if coords:
                        ev.latitude, ev.longitude = coords
                # Mark as fetched and store/update the event in the cache
                self.cache['fetched'][ev.uid] = int(time.time())
                self.cache['events'][ev.uid] = {
                    'url': ev.url,
                    'name': ev.name,
                    'description': ev.description,
                    'start': ev.start,
                    'stop': ev.stop,
                    'latitude': ev.latitude,
                    'longitude': ev.longitude,
                    'where_text': ev.where_text,
                }
                # Mark as pending (moved to sent once actually delivered)
                self.cache['pending'][ev.uid] = int(time.time())
                self._save()
                feature = to_oedb_feature(ev)
                ok = self.send_to_oedb(feature)
                if ok:
                    # Dry-run keeps the event pending; commit moves it to sent
                    if not self.dry_run:
                        self.cache['sent'][ev.uid] = int(time.time())
                        self.cache['pending'].pop(ev.uid, None)
                        self._save()
                    inserted += 1
                time.sleep(sleep_s)
        logger.info(f"Done: {inserted} event(s) processed (requested limit: {limit})")

def main() -> None:
    parser = argparse.ArgumentParser(description="Import 'Programme des Expositions à Paris' -> OEDB")
    parser.add_argument('--api-url', default='https://api.openeventdatabase.org', help="OEDB API URL")
    parser.add_argument('--pages', type=int, default=1, help='Number of pages to scrape (1 = home page)')
    parser.add_argument('--limit', type=int, default=50, help='Maximum number of events to insert')
    # Dry-run by default; use --commit to actually send
    parser.add_argument('--commit', action='store_true', help='Actually send to OEDB (disables dry-run)')
    parser.add_argument('--geocode-missing', action='store_true', help='Geocode when JSON-LD has no coordinates')
    parser.add_argument('--verbose', action='store_true', help='Verbose logging')
    args = parser.parse_args()
    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)
    dry_run = not args.commit
    importer = Importer(api_url=args.api_url, dry_run=dry_run, geocode_missing=args.geocode_missing, pages=args.pages)
    importer.run(limit=args.limit)


if __name__ == '__main__':
    main()
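
# Illustrative cron entry for a daily committed import (path and schedule are
# placeholders, not part of this repository):
#   0 6 * * * /usr/bin/python3 /path/to/programmedesexpos_paris.py --pages 3 --commit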