oedb-backend/extractors/utils_extractor_common.py
2025-10-10 17:45:23 +02:00

152 lines
5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import hashlib
import json
import logging as logger
import os
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import requests
@dataclass
class CacheConfig:
    """Location and freshness policy for a JSON file cache."""

    # Filesystem path of the cache file; an empty path disables caching.
    path: str
    # Maximum cache age in seconds before entries are considered stale
    # (default: one day).
    ttl_seconds: int = 86400
def load_cache(cfg: CacheConfig) -> Dict[str, Any]:
    """Read the JSON cache at ``cfg.path``, honoring the TTL.

    Returns an empty dict when the file is missing, unreadable, not a
    JSON object, or older than ``cfg.ttl_seconds`` (per the embedded
    ``__fetched_at__`` timestamp written by ``save_cache``).
    """
    if not cfg.path or not os.path.exists(cfg.path):
        return {}
    try:
        with open(cfg.path, 'r', encoding='utf-8') as fh:
            cached = json.load(fh)
    except Exception:
        # Best effort: any read/parse problem counts as a cache miss.
        return {}
    if not isinstance(cached, dict):
        return {}
    fetched_at = cached.get("__fetched_at__")
    expired = (
        cfg.ttl_seconds > 0
        and isinstance(fetched_at, (int, float))
        and time.time() - fetched_at > cfg.ttl_seconds
    )
    return {} if expired else cached
def save_cache(cfg: CacheConfig, data: Dict[str, Any]) -> None:
    """Write *data* to ``cfg.path`` as pretty-printed JSON.

    A ``__fetched_at__`` epoch timestamp is stamped into the payload so
    that ``load_cache`` can apply the TTL. No-op when ``cfg.path`` is
    empty. The caller's dict is not mutated (shallow copy).
    """
    if not cfg.path:
        return
    parent = os.path.dirname(cfg.path)
    # Bug fix: os.makedirs("") raises FileNotFoundError when the cache
    # path has no directory component (e.g. plain "cache.json").
    if parent:
        os.makedirs(parent, exist_ok=True)
    payload = dict(data)
    payload["__fetched_at__"] = int(time.time())
    with open(cfg.path, 'w', encoding='utf-8') as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
def oedb_feature(label: str, what: str, start: str, stop: Optional[str] = None, description: str = "", where: str = "", online: Optional[bool] = None) -> Dict[str, Any]:
    """Build an OEDB GeoJSON Feature dict for an event.

    Args:
        label: event name (stored under the ``name`` property).
        what: event category.
        start: start date/time string, passed through as-is.
        stop: optional end date/time; omitted from properties when falsy.
        description: free-text description (always present, may be empty).
        where: optional location label; omitted when falsy.
        online: optional flag, serialized as ``"yes"``/``"no"`` when not None.

    Returns:
        A GeoJSON Feature with a placeholder Point geometry at (0, 0).
    """
    props: Dict[str, Any] = {
        "name": label,
        "what": what,
        "start": start,
        "description": description,
    }
    if stop:
        props["stop"] = stop
    if where:
        props["where"] = where
    if online is not None:
        props["online"] = "yes" if online else "no"
    # Lazy %-formatting: json.dumps only runs when INFO logging is enabled.
    logger.info("props: %s", json.dumps(props, ensure_ascii=False, indent=2))
    return {
        "type": "Feature",
        "properties": props,
        # Not geolocated by default (0, 0 placeholder).
        "geometry": {"type": "Point", "coordinates": [0, 0]},
    }
def post_oedb_features(base_url: str, features: List[Dict[str, Any]], dry_run: bool = True, timeout: int = 20, sent_cache_path: Optional[str] = None) -> Tuple[int, int, int]:
    """POST each GeoJSON feature to ``{base_url}/event``.

    Features whose content-derived ID is already in the sent-events cache
    are skipped; an HTTP 409 (duplicate on the server) counts as success.
    Newly sent IDs are persisted back to ``sent_cache_path``.

    Args:
        base_url: OEDB API base URL (trailing slash tolerated).
        features: GeoJSON Feature dicts to submit.
        dry_run: when True, count every feature as sent without any I/O.
        timeout: per-request timeout in seconds.
        sent_cache_path: optional JSON file tracking already-sent event IDs.

    Returns:
        ``(ok, failed, neterr)`` — successful posts (incl. 409 duplicates
        and dry-run passes), HTTP failures, and network errors.
    """
    ok = 0
    failed = 0
    neterr = 0
    # Load the set of already-sent event IDs (best effort).
    sent_events = set()
    if sent_cache_path and os.path.exists(sent_cache_path):
        try:
            with open(sent_cache_path, 'r', encoding='utf-8') as f:
                sent_events = set(json.load(f))
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt and
            # SystemExit still propagate; a corrupt cache just means
            # "nothing sent yet".
            pass
    new_sent_events = set()
    for feat in features:
        if dry_run:
            ok += 1
            continue
        # Stable, content-derived ID used for deduplication.
        event_id = generate_event_id(feat)
        if event_id in sent_events:
            print(f"Événement déjà envoyé, ignoré: {feat.get('properties', {}).get('name', 'Sans nom')}")
            continue
        try:
            r = requests.post(f"{base_url.rstrip('/')}/event", json=feat, timeout=timeout)
            if 200 <= r.status_code < 300:
                ok += 1
                new_sent_events.add(event_id)
            elif r.status_code == 409:
                # Server-side duplicate: treat as already sent.
                print(f"Événement déjà existant (doublon), ignoré: {feat.get('properties', {}).get('name', 'Sans nom')}")
                ok += 1
                new_sent_events.add(event_id)
            else:
                print(f"Erreur HTTP {r.status_code}: {r.text}")
                failed += 1
        except requests.RequestException as e:
            print(f"Erreur réseau: {e}")
            neterr += 1
    # Persist the union of previously and newly sent IDs.
    if sent_cache_path and new_sent_events:
        all_sent_events = sent_events | new_sent_events
        try:
            parent = os.path.dirname(sent_cache_path)
            # Bug fix: os.makedirs("") raises when the path has no
            # directory component.
            if parent:
                os.makedirs(parent, exist_ok=True)
            with open(sent_cache_path, 'w', encoding='utf-8') as f:
                # sorted() keeps the cache file deterministic.
                json.dump(sorted(all_sent_events), f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"Erreur lors de la sauvegarde du cache: {e}")
    return ok, failed, neterr
def generate_event_id(feature: Dict[str, Any]) -> str:
    """Return a stable 32-char hex ID derived from an event's key properties.

    Only ``name``/``what``/``start``/``where`` contribute, so cosmetic
    changes (description, geometry, extra properties) do not produce a
    new ID. MD5 is used purely as a content fingerprint, not for security.
    """
    props = feature.get('properties', {})
    key_props = {
        'name': props.get('name', ''),
        'what': props.get('what', ''),
        'start': props.get('start', ''),
        'where': props.get('where', '')
    }
    # sort_keys makes the serialization — and thus the hash — deterministic.
    # hashlib is imported at module level (hoisted from the function body).
    content = json.dumps(key_props, sort_keys=True, ensure_ascii=False)
    return hashlib.md5(content.encode('utf-8')).hexdigest()
def http_get_json(url: str, timeout: int = 20, headers: Optional[Dict[str, str]] = None) -> Any:
    """GET *url* and return the decoded JSON body.

    Raises ``requests.HTTPError`` on non-2xx responses. Falls back to
    parsing the raw text when the server does not advertise a JSON
    content type.
    """
    r = requests.get(url, timeout=timeout, headers=headers)
    r.raise_for_status()
    ct = r.headers.get("content-type", "")
    # Bug fix: case-insensitive match — servers may send
    # "Application/JSON; charset=utf-8".
    if "json" in ct.lower():
        return r.json()
    return json.loads(r.text)