add ccpl scraping start

This commit is contained in:
Tykayn 2025-10-09 23:35:12 +02:00 committed by tykayn
parent e16d77d056
commit 24bd65565c
8 changed files with 1506 additions and 56 deletions

View file

@ -54,6 +54,13 @@
- **Logs informatifs** : Indication claire des événements prioritaires avec emoji 🔄
- **Robustesse** : Retry automatique des événements échoués
### 9. Traitement Parallèle
- **Activation automatique** : Se déclenche pour plus de 10 événements avec `--parallel`
- **ThreadPoolExecutor** : Utilise `concurrent.futures` pour la parallélisation
- **Workers configurables** : Nombre de workers ajustable avec `--max-workers`
- **Thread-safe** : Méthode `process_single_event()` sécurisée pour les threads
- **Performance** : Amélioration significative pour les gros volumes d'événements
## Utilisation
### Commandes de Base
@ -73,6 +80,12 @@ python agendadulibre.py --max-events 3 --verbose
# Forcer le rechargement du fichier iCal
python agendadulibre.py --force-refresh --max-events 5
# Traitement parallèle pour gros volumes
python agendadulibre.py --max-events 50 --parallel --max-workers 8 --no-dry-run
# Traitement parallèle en mode dry-run
python agendadulibre.py --max-events 100 --parallel --max-workers 4
```
### Arguments Disponibles
@ -87,6 +100,8 @@ python agendadulibre.py --force-refresh --max-events 5
| `--cache-duration N` | Durée de validité du cache (heures) | 1 |
| `--batch-size N` | Taille des batches | 1 |
| `--api-url URL` | URL de l'API OEDB | https://api.openeventdatabase.org |
| `--parallel` | Activer le traitement parallèle pour plus de 10 événements | False |
| `--max-workers N` | Nombre maximum de workers pour le traitement parallèle | 4 |
## Fichiers Générés
@ -301,6 +316,8 @@ python test_agendadulibre_improvements.py
16. **Priorisation intelligente** : Traitement prioritaire des événements en attente
17. **Robustesse** : Retry automatique des événements échoués
18. **Efficacité** : Optimisation du traitement par priorité
19. **Parallélisation** : Traitement simultané pour les gros volumes d'événements
20. **Performance** : Amélioration significative avec `--parallel` et `--max-workers`
## Migration

View file

@ -0,0 +1,231 @@
# Scraper CCPL Agenda
Script de scraping pour l'agenda de la CCPL (Communauté de Communes du Pays de Limours) - https://www.cc-paysdelimours.fr/agenda
## Fonctionnalités
### 🚀 Scraping HTML Intelligent
- **Parsing HTML** : Extraction des événements depuis la structure HTML de l'agenda CCPL
- **Détection automatique** : Identification des liens d'événements avec classes spécifiques
- **Extraction complète** : Titre, date, URL, image, lieu
- **Détails enrichis** : Récupération des informations depuis les pages individuelles des événements
- **Fallback robuste** : Méthodes alternatives si la structure change
### 💾 Cache JSON Intelligent
- **Détection de changements** : Hash MD5 du contenu HTML pour éviter les re-traitements
- **Cache persistant** : Sauvegarde des événements traités dans `ccpl_agenda_events.json`
- **Cache de contenu** : Sauvegarde du hash dans `ccpl_agenda_cache.json`
- **Optimisation** : Évite les re-téléchargements inutiles
### ⚙️ Paramètres Configurables
- **Limite d'événements** : `--max-events N` (défaut: 1)
- **Mode dry-run** : Simulation par défaut, `--no-dry-run` pour l'envoi réel
- **Traitement parallèle** : `--parallel` pour plus de 10 événements
- **Workers** : `--max-workers N` pour le traitement parallèle
- **Cache** : `--cache-duration N` heures de validité
### 🔄 Traitement Parallèle
- **Activation automatique** : Se déclenche pour plus de 10 événements avec `--parallel`
- **ThreadPoolExecutor** : Utilise `concurrent.futures` pour la parallélisation
- **Workers configurables** : Nombre de workers ajustable avec `--max-workers`
- **Thread-safe** : Méthode `process_single_event()` sécurisée pour les threads
## Utilisation
### Commandes de Base
```bash
# Mode dry-run par défaut (sécurisé)
python ccpl_agenda.py
# Limiter à 1 événement en mode dry-run
python ccpl_agenda.py --max-events 1
# Mode réel avec limite de 5 événements
python ccpl_agenda.py --no-dry-run --max-events 5
# Mode verbeux pour voir les détails
python ccpl_agenda.py --max-events 3 --verbose
# Forcer le rechargement de l'agenda
python ccpl_agenda.py --force-refresh --max-events 3
# Traitement parallèle pour gros volumes
python ccpl_agenda.py --max-events 20 --parallel --max-workers 4 --no-dry-run
# Traitement parallèle en mode dry-run
python ccpl_agenda.py --max-events 50 --parallel --max-workers 8
```
### Arguments Disponibles
| Argument | Description | Défaut |
|----------|-------------|---------|
| `--max-events N` | Limite le nombre d'événements à traiter | 1 |
| `--dry-run` | Mode simulation (par défaut) | Activé |
| `--no-dry-run` | Désactive le mode dry-run | - |
| `--verbose` | Mode verbeux | - |
| `--force-refresh` | Force le rechargement de l'agenda | - |
| `--cache-duration N` | Durée de validité du cache (heures) | 1 |
| `--batch-size N` | Taille des batches | 1 |
| `--api-url URL` | URL de l'API OEDB | https://api.openeventdatabase.org |
| `--parallel` | Activer le traitement parallèle pour plus de 10 événements | False |
| `--max-workers N` | Nombre maximum de workers pour le traitement parallèle | 4 |
## Fichiers Générés
### Cache JSON (`ccpl_agenda_cache.json`)
```json
{
"processed_events": {
"event_id": {
"processed_at": "2024-01-01T12:00:00",
"status": "saved",
"event_label": "Titre de l'événement"
}
},
"last_fetch": "2024-01-01T12:00:00",
"content_hash": "abc123..."
}
```
### Événements JSON (`ccpl_agenda_events.json`)
```json
{
"events": {
"event_id": {
"status": "saved",
"message": "Créé avec succès",
"last_attempt": "2024-01-01T12:00:00",
"event": {
"properties": {
"label": "Titre de l'événement",
"description": "Description...",
"type": "scheduled",
"what": "culture.community",
"where": "Pays de Limours, France",
"start": "2024-01-01T10:00:00",
"stop": "2024-01-01T12:00:00",
"url": "https://www.cc-paysdelimours.fr/agenda/event",
"source:name": "CCPL Agenda",
"source:url": "https://www.cc-paysdelimours.fr/agenda",
"last_modified_by": "ccpl_agenda_scraper",
"tags": ["ccpl", "pays-de-limours", "événement-communal"],
"image": "https://www.cc-paysdelimours.fr/image.jpg"
},
"geometry": {
"type": "Point",
"coordinates": [2.0644, 48.5917]
}
}
}
},
"last_update": "2024-01-01T12:00:00"
}
```
## Structure des Événements
### Propriétés Extraites
- **Titre** : Extrait depuis `<p class="agenda-title">`
- **Date** : Extrait depuis `<span class="number">` et `<span class="small">`
- **URL** : Lien vers la page détaillée de l'événement
- **Image** : Image de l'événement si disponible
- **Lieu** : Adresse détaillée extraite depuis la page de l'événement
- **Coordonnées** : Coordonnées depuis la carte Leaflet ou par défaut du Pays de Limours
- **Contact** : Téléphone, email et site web extraits depuis la page de l'événement
- **Description** : Description complète de l'événement
- **Horaires** : Informations d'ouverture et de tarifs
### Format OEDB
Les événements sont formatés selon le standard GeoJSON attendu par l'API OEDB :
- **Type** : `scheduled` (événement programmé)
- **Catégorie** : `culture.community` (événement communautaire)
- **Tags** : `["ccpl", "pays-de-limours", "événement-communal"]`
- **Source** : `CCPL Agenda` avec URL de référence
- **Contact** : `contact:phone`, `contact:email`, `contact:website` si disponibles
## Exemples de Sortie
### Mode Dry-Run
```
🚀 Démarrage du scraping de l'agenda CCPL
Configuration: batch_size=1, api_url=https://api.openeventdatabase.org
Mode dry-run: OUI
Limite d'événements: 3
============================================================
🌐 Récupération de l'agenda CCPL: https://www.cc-paysdelimours.fr/agenda
🔄 Nouveau contenu détecté, mise à jour du cache
🔗 30 liens d'événements trouvés
📅 3 événements extraits au total
Traitement de 3 événements
Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API
📝 Détails de l'événement à insérer:
ID: a650b1026dbfe0ae8a8832906591af4d
Titre: Kylen... entre le rêve et la création
Description: Événement organisé par la CCPL - Kylen... entre le rêve et la création
Type: scheduled
Catégorie: culture.community
Lieu: Pays de Limours, France
Début: 2025-09-30T00:00:00
Fin: 2025-09-30T02:00:00
URL: https://www.cc-paysdelimours.fr/agenda/kylen...-entre-le-reve-et-la-creation
Source: CCPL Agenda
Coordonnées: [2.0644, 48.5917]
Tags: ccpl, pays-de-limours, événement-communal
Modifié par: ccpl_agenda_scraper
📞 Téléphone: 0164911908
📧 Email: bibliotheque@mairie-limours.fr
🌐 Site web: https://x.com/CCPAYSDELIMOURS
🖼️ Image: https://www.cc-paysdelimours.fr/isens_thumb.php?image=...
[DRY-RUN] Simulation d'envoi de l'événement: Kylen... entre le rêve et la création
✅ Kylen... entre le rêve et la création - Simulé (dry-run)
📊 Statistiques finales:
total_events: 3
new_events: 3
already_saved: 0
api_errors: 0
parse_errors: 0
sent_this_run: 3
skipped_due_to_limit: 0
✅ Scraping terminé avec succès
```
### Mode Parallèle
```
🚀 Traitement parallèle de 20 événements avec 4 workers
Limite d'événements: 20
Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API
✅ Événement 1 - Simulé (dry-run)
✅ Événement 2 - Simulé (dry-run)
...
📊 Statistiques finales:
total_events: 20
new_events: 20
sent_this_run: 20
```
## Avantages
1. **Sécurité** : Mode dry-run par défaut
2. **Performance** : Cache intelligent et traitement parallèle
3. **Robustesse** : Gestion d'erreurs et fallbacks
4. **Flexibilité** : Paramètres configurables
5. **Traçabilité** : Logs détaillés et sauvegarde des états
6. **Efficacité** : Évite les re-traitements inutiles
7. **Parallélisation** : Traitement simultané pour les gros volumes
8. **Extraction complète** : Toutes les métadonnées disponibles
## Dépendances
```bash
pip install -r requirements_ccpl.txt
```
- `requests>=2.25.0` : Requêtes HTTP
- `beautifulsoup4>=4.9.0` : Parsing HTML
- `lxml>=4.6.0` : Parser XML/HTML rapide
## Migration
Le script est compatible avec la même structure que le scraper agenda du libre, permettant une utilisation cohérente dans l'écosystème OEDB.

View file

@ -11,6 +11,7 @@ import sys
import argparse
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import icalendar
@ -32,11 +33,14 @@ logging.basicConfig(
logger = logging.getLogger(__name__)
class AgendaDuLibreScraper:
def __init__(self, api_base_url: str = api_oedb, batch_size: int = 1, max_events: int = None, dry_run: bool = True):
def __init__(self, api_base_url: str = api_oedb, batch_size: int = 1, max_events: int = None, dry_run: bool = True,
parallel: bool = False, max_workers: int = 4):
self.api_base_url = api_base_url
self.batch_size = batch_size
self.max_events = max_events
self.dry_run = dry_run
self.parallel = parallel
self.max_workers = max_workers
self.data_file = "agendadulibre_events.json"
self.cache_file = "agendadulibre_cache.json"
self.ical_file = "agendadulibre_events.ics"
@ -626,6 +630,29 @@ class AgendaDuLibreScraper:
logger.error(f"❌ Erreur inattendue: {e}")
return False, f"Erreur inattendue: {e}"
def process_single_event(self, event_data: Dict) -> Tuple[str, bool, str]:
"""Traite un événement individuellement (thread-safe)"""
event_id = event_data["id"]
event_label = event_data["event"]["properties"]["label"]
try:
# Vérifier si l'événement a déjà été traité avec succès
skip_geocoding = False
if event_id in self.events_data["events"]:
event_status = self.events_data["events"][event_id].get("status", "unknown")
if event_status in ["saved", "already_exists"]:
skip_geocoding = True
logger.info(f" Géocodage ignoré pour {event_label} - déjà traité")
# Envoyer à l'API
success, message = self.send_event_to_api(event_data, skip_geocoding=skip_geocoding)
return event_id, success, message
except Exception as e:
logger.error(f"❌ Erreur lors du traitement de {event_label}: {e}")
return event_id, False, f"Erreur: {e}"
def process_events(self, calendar: Calendar) -> Dict:
"""Traite tous les événements du calendrier"""
stats = {
@ -714,76 +741,136 @@ class AgendaDuLibreScraper:
# Extraire les événements pour le traitement
events_to_process = [item["event"] for item in all_events]
# Traiter les événements par batch
logger.info(f"Traitement de {len(events_to_process)} nouveaux événements par batch de {self.batch_size}")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
if self.dry_run:
logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API")
for i in range(0, len(events_to_process), self.batch_size):
batch = events_to_process[i:i + self.batch_size]
logger.info(f"Traitement du batch {i//self.batch_size + 1}/{(len(events_to_process) + self.batch_size - 1)//self.batch_size}")
# Traiter les événements
if self.parallel and len(events_to_process) > 10:
logger.info(f"🚀 Traitement parallèle de {len(events_to_process)} événements avec {self.max_workers} workers")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
if self.dry_run:
logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API")
for event_data in batch:
event_id = event_data["id"]
event_label = event_data["event"]["properties"]["label"]
# Traitement parallèle
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Soumettre tous les événements
future_to_event = {
executor.submit(self.process_single_event, event_data): event_data
for event_data in events_to_process
}
logger.info(f"Envoi de l'événement: {event_label}")
# Vérifier si l'événement a déjà été traité avec succès
skip_geocoding = False
if event_id in self.events_data["events"]:
event_status = self.events_data["events"][event_id].get("status", "unknown")
if event_status in ["saved", "already_exists"]:
skip_geocoding = True
logger.info(f" Géocodage ignoré pour {event_label} - déjà traité")
# Envoyer à l'API
success, message = self.send_event_to_api(event_data, skip_geocoding=skip_geocoding)
# Mettre à jour les statistiques et les données locales
if success:
stats["new_events"] += 1
stats["sent_this_run"] += 1
self.events_data["events"][event_id] = {
"status": "saved",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache des événements traités
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "saved",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
if "déjà existant" in message or "already exists" in message.lower():
stats["already_saved"] += 1
# Traiter les résultats au fur et à mesure
for future in as_completed(future_to_event):
event_data = future_to_event[future]
event_id, success, message = future.result()
event_label = event_data["event"]["properties"]["label"]
# Mettre à jour les statistiques et les données locales
if success:
stats["new_events"] += 1
stats["sent_this_run"] += 1
self.events_data["events"][event_id] = {
"status": "already_exists",
"status": "saved",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache même si déjà existant
# Ajouter au cache des événements traités
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "already_exists",
"status": "saved",
"event_label": event_label
}
logger.info(f"⚠️ {event_label} - {message}")
logger.info(f"{event_label} - {message}")
else:
stats["api_errors"] += 1
if "déjà existant" in message or "already exists" in message.lower():
stats["already_saved"] += 1
self.events_data["events"][event_id] = {
"status": "already_exists",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache même si déjà existant
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "already_exists",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
stats["api_errors"] += 1
self.events_data["events"][event_id] = {
"status": "api_error",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.error(f"{event_label} - {message}")
# Sauvegarder les données après chaque événement
self.save_events_data()
self.save_cache_data()
else:
# Traitement séquentiel (mode original)
logger.info(f"Traitement séquentiel de {len(events_to_process)} événements par batch de {self.batch_size}")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
if self.dry_run:
logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API")
for i in range(0, len(events_to_process), self.batch_size):
batch = events_to_process[i:i + self.batch_size]
logger.info(f"Traitement du batch {i//self.batch_size + 1}/{(len(events_to_process) + self.batch_size - 1)//self.batch_size}")
for event_data in batch:
event_id, success, message = self.process_single_event(event_data)
event_label = event_data["event"]["properties"]["label"]
# Mettre à jour les statistiques et les données locales
if success:
stats["new_events"] += 1
stats["sent_this_run"] += 1
self.events_data["events"][event_id] = {
"status": "error",
"status": "saved",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.error(f"{event_label} - {message}")
# Ajouter au cache des événements traités
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "saved",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
if "déjà existant" in message or "already exists" in message.lower():
stats["already_saved"] += 1
self.events_data["events"][event_id] = {
"status": "already_exists",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache même si déjà existant
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "already_exists",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
stats["api_errors"] += 1
self.events_data["events"][event_id] = {
"status": "api_error",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.error(f"{event_label} - {message}")
# Sauvegarder les données après chaque événement
self.save_events_data()
self.save_cache_data()
# Mettre à jour la date de dernière mise à jour
self.events_data["last_update"] = datetime.now().isoformat()
@ -846,6 +933,10 @@ def main():
help="Forcer le rechargement du fichier iCal (ignorer le cache)")
parser.add_argument("--cache-duration", type=int, default=1,
help="Durée de validité du cache en heures (défaut: 1)")
parser.add_argument("--parallel", action="store_true",
help="Activer le traitement parallèle pour plus de 10 événements")
parser.add_argument("--max-workers", type=int, default=4,
help="Nombre maximum de workers pour le traitement parallèle (défaut: 4)")
args = parser.parse_args()
@ -860,7 +951,9 @@ def main():
api_base_url=args.api_url,
batch_size=args.batch_size,
max_events=args.max_events,
dry_run=dry_run
dry_run=dry_run,
parallel=args.parallel,
max_workers=args.max_workers
)
# Modifier la durée de cache si spécifiée

974
extractors/ccpl_agenda.py Normal file
View file

@ -0,0 +1,974 @@
#!/usr/bin/env python3
"""
Script de scraping pour l'agenda de la CCPL (Communauté de Communes du Pays de Limours)
https://www.cc-paysdelimours.fr/agenda
Utilise le scraping HTML pour récupérer les événements et les envoyer à l'API OEDB
"""
import requests
import json
import os
import sys
import argparse
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import logging
from bs4 import BeautifulSoup
import hashlib
# Configuration par défaut
api_oedb = "https://api.openeventdatabase.org"
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('ccpl_agenda_scraper.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class CCPLAgendaScraper:
def __init__(self, api_base_url: str = api_oedb, batch_size: int = 1, max_events: int = None, dry_run: bool = True,
parallel: bool = False, max_workers: int = 4):
self.api_base_url = api_base_url
self.batch_size = batch_size
self.max_events = max_events
self.dry_run = dry_run
self.parallel = parallel
self.max_workers = max_workers
self.data_file = "ccpl_agenda_events.json"
self.cache_file = "ccpl_agenda_cache.json"
self.agenda_url = "https://www.cc-paysdelimours.fr/agenda"
self.cache_duration_hours = 1 # Durée de cache en heures
# Charger les données existantes
self.events_data = self.load_events_data()
self.cache_data = self.load_cache_data()
def load_events_data(self) -> Dict:
"""Charge les données des événements depuis le fichier JSON"""
if os.path.exists(self.data_file):
try:
with open(self.data_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Erreur lors du chargement des données: {e}")
return {
"events": {},
"last_update": None
}
def save_events_data(self):
"""Sauvegarde les données des événements dans le fichier JSON"""
try:
with open(self.data_file, 'w', encoding='utf-8') as f:
json.dump(self.events_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde des données: {e}")
def load_cache_data(self) -> Dict:
"""Charge les données du cache depuis le fichier JSON"""
if os.path.exists(self.cache_file):
try:
with open(self.cache_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Erreur lors du chargement du cache: {e}")
return {
"processed_events": {},
"last_fetch": None,
"content_hash": None
}
def save_cache_data(self):
"""Sauvegarde les données du cache dans le fichier JSON"""
try:
with open(self.cache_file, 'w', encoding='utf-8') as f:
json.dump(self.cache_data, f, ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"Erreur lors de la sauvegarde du cache: {e}")
def get_content_hash(self, content: str) -> str:
"""Génère un hash du contenu pour détecter les changements"""
import hashlib
return hashlib.md5(content.encode('utf-8')).hexdigest()
def is_content_changed(self, new_hash: str) -> bool:
"""Vérifie si le contenu a changé depuis la dernière récupération"""
cached_hash = self.cache_data.get("content_hash")
return cached_hash != new_hash
def fetch_agenda_data(self) -> Optional[str]:
"""Récupère les données de l'agenda CCPL"""
try:
logger.info(f"🌐 Récupération de l'agenda CCPL: {self.agenda_url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(self.agenda_url, headers=headers, timeout=30)
response.raise_for_status()
content = response.text
content_hash = self.get_content_hash(content)
# Vérifier si le contenu a changé
if self.is_content_changed(content_hash):
logger.info("🔄 Nouveau contenu détecté, mise à jour du cache")
self.cache_data["content_hash"] = content_hash
self.cache_data["last_fetch"] = datetime.now().isoformat()
self.save_cache_data()
return content
else:
logger.info(" Contenu identique au précédent, utilisation du cache")
return None
except requests.RequestException as e:
logger.error(f"❌ Erreur lors de la récupération de l'agenda: {e}")
return None
except Exception as e:
logger.error(f"❌ Erreur inattendue: {e}")
return None
def parse_agenda_html(self, html_content: str) -> List[Dict]:
"""Parse le HTML de l'agenda pour extraire les événements"""
try:
soup = BeautifulSoup(html_content, 'html.parser')
events = []
# D'après l'analyse HTML, les événements sont dans des liens <a> avec des classes spécifiques
# Chercher les liens d'événements
event_links = soup.find_all('a', class_=re.compile(r'col-lg-3|col-sm-6|mb-3'))
logger.info(f"🔗 {len(event_links)} liens d'événements trouvés")
for i, link in enumerate(event_links):
if self.max_events and len(events) >= self.max_events:
break
try:
event_data = self.extract_event_data_from_link(link, i)
if event_data:
events.append(event_data)
except Exception as e:
logger.warning(f"Erreur lors du parsing de l'événement {i}: {e}")
continue
# Si pas d'événements trouvés avec les liens, essayer une approche alternative
if not events:
logger.info("🔍 Tentative d'extraction alternative...")
# Chercher par pattern de date dans les spans
date_spans = soup.find_all('span', class_='small')
for i, span in enumerate(date_spans):
if self.max_events and len(events) >= self.max_events:
break
# Trouver l'élément parent qui contient l'événement
parent = span.parent
while parent and parent.name != 'a':
parent = parent.parent
if parent and parent.name == 'a':
try:
event_data = self.extract_event_data_from_link(parent, i)
if event_data:
events.append(event_data)
except Exception as e:
logger.warning(f"Erreur lors du parsing alternatif de l'événement {i}: {e}")
continue
logger.info(f"📅 {len(events)} événements extraits au total")
return events
except Exception as e:
logger.error(f"❌ Erreur lors du parsing HTML: {e}")
return []
def extract_event_data_from_link(self, link_element, index: int) -> Optional[Dict]:
"""Extrait les données d'un événement depuis un lien d'événement"""
try:
# Extraire l'URL
url = link_element.get('href', '')
if url.startswith('/'):
url = f"https://www.cc-paysdelimours.fr{url}"
# Extraire le titre
title_elem = link_element.find('p', class_='agenda-title')
title = title_elem.get_text(strip=True) if title_elem else f"Événement {index + 1}"
# Extraire la date
date_text = ""
date_wrapper = link_element.find('div', class_='date-wrapper')
if date_wrapper:
# Extraire le jour
day_elem = date_wrapper.find('span', class_='number')
day = day_elem.get_text(strip=True) if day_elem else ""
# Extraire le mois
month_elem = date_wrapper.find('span', class_='small')
month = month_elem.get_text(strip=True) if month_elem else ""
if day and month:
date_text = f"{day} {month}"
# Extraire l'image si disponible
image_elem = link_element.find('img')
image_url = ""
if image_elem:
src = image_elem.get('src', '')
if src.startswith('/'):
image_url = f"https://www.cc-paysdelimours.fr{src}"
elif src.startswith('http'):
image_url = src
# Extraire le lieu (par défaut)
location = "Pays de Limours, France"
# Récupérer les détails supplémentaires depuis la page de l'événement
details = {}
if url:
details = self.fetch_event_details(url)
# Utiliser les coordonnées de la carte si disponibles
coordinates = self.get_coordinates_for_location(location)
if details.get("coordinates"):
coordinates = details["coordinates"]
logger.info(f"📍 Coordonnées précises utilisées: {coordinates}")
# Utiliser l'adresse détaillée si disponible
if details.get("address"):
location = details["address"]
logger.info(f"📍 Adresse détaillée: {location}")
# Générer un ID unique
event_id = self.generate_event_id(title, date_text, location)
# Construire les propriétés de contact (seulement si non vides)
contact_properties = {}
if details.get("contact_phone") and details["contact_phone"].strip():
contact_properties["contact:phone"] = details["contact_phone"]
if details.get("contact_email") and details["contact_email"].strip():
contact_properties["contact:email"] = details["contact_email"]
if details.get("website") and details["website"].strip():
contact_properties["contact:website"] = details["website"]
# Construire la description enrichie
description = f"Événement organisé par la CCPL - {title}"
if details.get("description"):
description = details["description"]
# Ajouter les informations d'ouverture et de tarifs
additional_info = []
if details.get("opening_hours"):
additional_info.append(f"Ouverture: {details['opening_hours']}")
if details.get("pricing"):
additional_info.append(f"Tarifs: {details['pricing']}")
if additional_info:
description += "\n\n" + "\n".join(additional_info)
# Créer l'événement au format OEDB
properties = {
"label": title,
"description": description,
"type": "scheduled",
"what": "culture.community",
"where": location,
"start": self.parse_date(date_text),
"stop": self.parse_date(date_text, end=True),
"source:name": "CCPL Agenda",
"source:url": self.agenda_url,
"last_modified_by": "ccpl_agenda_scraper",
"tags": ["ccpl", "pays-de-limours", "événement-communal"]
}
# Ajouter les propriétés optionnelles seulement si elles ne sont pas nulles
if url and url.strip():
properties["url"] = url
if image_url and image_url.strip():
properties["image"] = image_url
# Ajouter les propriétés de contact
properties.update(contact_properties)
oedb_event = {
"properties": properties,
"geometry": {
"type": "Point",
"coordinates": coordinates
}
}
return {
"id": event_id,
"event": oedb_event,
"raw_html": {
"title": title,
"date": date_text,
"location": location,
"url": url,
"image": image_url
}
}
except Exception as e:
logger.error(f"Erreur lors de l'extraction de l'événement depuis le lien: {e}")
return None
def fetch_event_details(self, event_url: str) -> Dict:
"""Récupère les détails supplémentaires depuis la page de l'événement"""
try:
logger.info(f"🔍 Récupération des détails: {event_url}")
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
response = requests.get(event_url, headers=headers, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
details = {
"description": "",
"contact_phone": "",
"contact_email": "",
"website": "",
"coordinates": None,
"address": "",
"opening_hours": "",
"pricing": ""
}
# Extraire la description principale
description_elem = soup.find('div', class_=re.compile(r'content|description|text', re.I))
if description_elem:
# Nettoyer le texte de la description
description_text = description_elem.get_text(strip=True)
# Enlever les "Offres liées" et autres sections non pertinentes
lines = description_text.split('\n')
cleaned_lines = []
skip_section = False
for line in lines:
line = line.strip()
if not line:
continue
if 'Offres liées' in line or 'TOUT L\'AGENDA' in line:
skip_section = True
break
if 'Partager sur' in line:
break
cleaned_lines.append(line)
details["description"] = ' '.join(cleaned_lines)
# Extraire les informations de contact depuis toute la page
page_text = soup.get_text()
# Téléphone (format français)
phone_match = re.search(r'(\d{2}\s?\d{2}\s?\d{2}\s?\d{2}\s?\d{2})', page_text)
if phone_match:
details["contact_phone"] = phone_match.group(1).replace(' ', '')
# Email
email_match = re.search(r'([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})', page_text)
if email_match:
email = email_match.group(1).strip()
# Nettoyer l'email (enlever les caractères parasites à la fin, notamment le T majuscule)
email = re.sub(r'[^a-zA-Z0-9._%+-@]+$', '', email)
# Enlever spécifiquement le T majuscule à la fin
if email.endswith('T'):
email = email[:-1]
details["contact_email"] = email
# Site web (éviter les liens de partage social)
website_links = soup.find_all('a', href=True)
for link in website_links:
href = link['href']
if (href.startswith('http') and
'facebook.com' not in href and
'twitter.com' not in href and
'linkedin.com' not in href and
'viadeo.com' not in href and
'x.com' not in href and
'instagram.com' not in href and
'tiktok.com' not in href and
'youtube.com' not in href and
'vimeo.com' not in href and
'soundcloud.com' not in href and
'spotify.com' not in href and
'deezer.com' not in href and
'apple.com' not in href and
'google.com' not in href and
'microsoft.com' not in href and
'amazon.com' not in href and
'sharer' not in href):
details["website"] = href
break
# Extraire l'adresse
address_elem = soup.find(text=re.compile(r'Place|Rue|Avenue|Boulevard', re.I))
if address_elem:
# Trouver l'élément parent qui contient l'adresse complète
parent = address_elem.parent
while parent and len(parent.get_text(strip=True)) < 20:
parent = parent.parent
if parent:
details["address"] = parent.get_text(strip=True)
# Extraire les coordonnées depuis la carte Leaflet
# Chercher les scripts qui contiennent les coordonnées de la carte
scripts = soup.find_all('script')
for script in scripts:
if script.string:
# Chercher les coordonnées dans les scripts Leaflet avec différents patterns
patterns = [
r'lat["\']?\s*:\s*([0-9.-]+).*?lng["\']?\s*:\s*([0-9.-]+)',
r'latitude["\']?\s*:\s*([0-9.-]+).*?longitude["\']?\s*:\s*([0-9.-]+)',
r'center["\']?\s*:\s*\[([0-9.-]+),\s*([0-9.-]+)\]',
r'lat["\']?\s*:\s*([0-9.-]+).*?lon["\']?\s*:\s*([0-9.-]+)',
r'([0-9]{1,2}\.[0-9]+),\s*([0-9]{1,2}\.[0-9]+)'
]
for pattern in patterns:
coord_match = re.search(pattern, script.string)
if coord_match:
try:
lat = float(coord_match.group(1))
lng = float(coord_match.group(2))
# Vérifier que les coordonnées sont dans une plage valide pour la France
if 41 <= lat <= 52 and -6 <= lng <= 10:
details["coordinates"] = [lng, lat] # Format GeoJSON [longitude, latitude]
logger.info(f"📍 Coordonnées trouvées: {lat}, {lng}")
break
except ValueError:
continue
if details["coordinates"]:
break
# Extraire les horaires d'ouverture
opening_elem = soup.find(text=re.compile(r'Du.*au.*tous les jours|Ouverture|Horaires', re.I))
if opening_elem:
parent = opening_elem.parent
if parent:
details["opening_hours"] = parent.get_text(strip=True)
# Extraire les tarifs
pricing_elem = soup.find(text=re.compile(r'Gratuit|Tarifs|Prix', re.I))
if pricing_elem:
parent = pricing_elem.parent
if parent:
details["pricing"] = parent.get_text(strip=True)
logger.info(f"📋 Détails extraits: {len(details['description'])} caractères, tel: {details['contact_phone']}, email: {details['contact_email']}")
return details
except Exception as e:
logger.warning(f"Erreur lors de la récupération des détails de {event_url}: {e}")
return {
"description": "",
"contact_phone": "",
"contact_email": "",
"website": "",
"coordinates": None,
"address": "",
"opening_hours": "",
"pricing": ""
}
def extract_event_data(self, element, index: int) -> Optional[Dict]:
"""Extrait les données d'un événement depuis un élément HTML"""
try:
# Obtenir tout le texte de l'élément
full_text = element.get_text(strip=True)
# Extraire la date
date_text = ""
date_match = re.search(r'\b(\d{1,2})\s+(jan|fév|mar|avr|mai|jun|jul|aoû|sep|oct|nov|déc)\b', full_text, re.I)
if date_match:
date_text = f"{date_match.group(1)} {date_match.group(2)}"
# Extraire le titre (première ligne significative après la date)
lines = [line.strip() for line in full_text.split('\n') if line.strip()]
title = f"Événement {index + 1}"
# Chercher le titre dans les lignes
for line in lines:
if line and not re.match(r'^\d{1,2}\s+(jan|fév|mar|avr|mai|jun|jul|aoû|sep|oct|nov|déc)', line, re.I):
title = line[:100] # Limiter la longueur
break
# Extraire le lieu
location = "Pays de Limours, France" # Lieu par défaut
communes = ['Angervilliers', 'Fontenay-lès-Briis', 'Forges-les-Bains', 'Gometz-la-Ville',
'Les Molières', 'Limours', 'Saint-Maurice-Montcouronne', 'Vaugrigneuse']
for commune in communes:
if commune.lower() in full_text.lower():
location = f"{commune}, Pays de Limours, France"
break
# Extraire la description (texte complet sans la date)
description = full_text
if date_text:
description = description.replace(date_text, '').strip()
# Nettoyer la description
description = re.sub(r'\s+', ' ', description).strip()
if len(description) > 200:
description = description[:200] + "..."
# Extraire l'URL si disponible
url = ""
link_elem = element.find('a', href=True)
if link_elem:
href = link_elem['href']
if href.startswith('/'):
url = f"https://www.cc-paysdelimours.fr{href}"
elif href.startswith('http'):
url = href
# Générer un ID unique
event_id = self.generate_event_id(title, date_text, location)
# Créer l'événement au format OEDB
oedb_event = {
"properties": {
"label": title,
"description": description,
"type": "scheduled",
"what": "culture.community", # Type pour événements communautaires
"where": location,
"start": self.parse_date(date_text),
"stop": self.parse_date(date_text, end=True),
"url": url if url else None,
"source:name": "CCPL Agenda",
"source:url": self.agenda_url,
"last_modified_by": "ccpl_agenda_scraper",
"tags": ["ccpl", "pays-de-limours", "événement-communal"]
},
"geometry": {
"type": "Point",
"coordinates": self.get_coordinates_for_location(location)
}
}
return {
"id": event_id,
"event": oedb_event,
"raw_html": {
"title": title,
"date": date_text,
"location": location,
"description": description,
"url": url
}
}
except Exception as e:
logger.error(f"Erreur lors de l'extraction de l'événement: {e}")
return None
def parse_date(self, date_text: str, end: bool = False) -> str:
"""Parse une date française et la convertit en format ISO"""
try:
if not date_text:
# Date par défaut si pas de date trouvée
now = datetime.now()
if end:
return (now + timedelta(hours=2)).isoformat()
return now.isoformat()
# Mapping des mois français
months = {
'jan': '01', 'fév': '02', 'mar': '03', 'avr': '04', 'mai': '05', 'jun': '06',
'jul': '07', 'aoû': '08', 'sep': '09', 'oct': '10', 'nov': '11', 'déc': '12'
}
# Extraire jour et mois
match = re.search(r'(\d{1,2})\s+(\w{3})', date_text.lower())
if match:
day = match.group(1).zfill(2)
month_abbr = match.group(2)
month = months.get(month_abbr, '01')
# Utiliser l'année courante
year = datetime.now().year
# Créer la date
date_obj = datetime.strptime(f"{year}-{month}-{day}", "%Y-%m-%d")
if end:
# Date de fin: ajouter 2 heures
date_obj += timedelta(hours=2)
return date_obj.isoformat()
# Fallback: date actuelle
now = datetime.now()
if end:
return (now + timedelta(hours=2)).isoformat()
return now.isoformat()
except Exception as e:
logger.warning(f"Erreur lors du parsing de la date '{date_text}': {e}")
now = datetime.now()
if end:
return (now + timedelta(hours=2)).isoformat()
return now.isoformat()
def get_coordinates_for_location(self, location: str) -> List[float]:
"""Obtient les coordonnées pour un lieu du Pays de Limours"""
# Coordonnées approximatives pour les communes du Pays de Limours
coordinates = {
"Angervilliers": [2.0644, 48.5917],
"Fontenay-lès-Briis": [2.0644, 48.5917],
"Forges-les-Bains": [2.0644, 48.5917],
"Gometz-la-Ville": [2.0644, 48.5917],
"Les Molières": [2.0644, 48.5917],
"Limours": [2.0644, 48.5917],
"Saint-Maurice-Montcouronne": [2.0644, 48.5917],
"Vaugrigneuse": [2.0644, 48.5917]
}
for commune, coords in coordinates.items():
if commune.lower() in location.lower():
return coords
# Coordonnées par défaut pour Limours (centre du Pays de Limours)
return [2.0644, 48.5917]
def generate_event_id(self, title: str, date: str, location: str) -> str:
"""Génère un ID unique pour l'événement"""
import hashlib
content = f"{title}_{date}_{location}"
return hashlib.md5(content.encode('utf-8')).hexdigest()
def log_event_details(self, event_data: Dict):
"""Affiche les détails de l'événement dans les logs"""
props = event_data["event"]["properties"]
geom = event_data["event"]["geometry"]
logger.info("📝 Détails de l'événement à insérer:")
logger.info(json.dumps(event_data, ensure_ascii=False, indent=2))
# logger.info(f" ID: {event_data['id']}")
# logger.info(f" Titre: {props.get('label', 'N/A')}")
# logger.info(f" Description: {props.get('description', 'N/A')[:100]}{'...' if len(props.get('description', '')) > 100 else ''}")
# logger.info(f" Type: {props.get('type', 'N/A')}")
# logger.info(f" Catégorie: {props.get('what', 'N/A')}")
# logger.info(f" Lieu: {props.get('where', 'N/A')}")
# logger.info(f" Début: {props.get('start', 'N/A')}")
# logger.info(f" Fin: {props.get('stop', 'N/A')}")
# logger.info(f" URL: {props.get('url', 'N/A')}")
# logger.info(f" Source: {props.get('source:name', 'N/A')}")
# logger.info(f" Coordonnées: {geom.get('coordinates', 'N/A')}")
# logger.info(f" Tags: {', '.join(props.get('tags', [])) if props.get('tags') else 'N/A'}")
# logger.info(f" Modifié par: {props.get('last_modified_by', 'N/A')}")
# Afficher les nouvelles propriétés de contact (seulement si présentes)
if props.get('contact:phone'):
logger.info(f" 📞 Téléphone: {props.get('contact:phone')}")
if props.get('contact:email'):
logger.info(f" 📧 Email: {props.get('contact:email')}")
if props.get('contact:website'):
logger.info(f" 🌐 Site web: {props.get('contact:website')}")
if props.get('image'):
logger.info(f" 🖼️ Image: {props.get('image')}")
if props.get('url'):
logger.info(f" 🔗 URL: {props.get('url')}")
def send_event_to_api(self, event_data: Dict) -> Tuple[bool, str]:
"""Envoie un événement à l'API OEDB (ou simule en mode dry-run)"""
# Log détaillé de l'événement
self.log_event_details(event_data)
if self.dry_run:
logger.info(f"[DRY-RUN] Simulation d'envoi de l'événement: {event_data['event']['properties']['label']}")
return True, "Simulé (dry-run)"
try:
url = f"{self.api_base_url}/event"
headers = {"Content-Type": "application/json"}
# Formater l'événement au format GeoJSON attendu par l'API
geojson_event = {
"type": "Feature",
"geometry": event_data["event"]["geometry"],
"properties": event_data["event"]["properties"]
}
logger.info(f"🌐 Envoi à l'API: {url}")
response = requests.post(url, json=geojson_event, headers=headers, timeout=30)
if response.status_code == 201:
logger.info("✅ Événement créé avec succès dans l'API")
return True, "Créé avec succès"
elif response.status_code == 409:
logger.warning("⚠️ Événement déjà existant dans l'API")
return False, "Événement déjà existant"
else:
logger.error(f"❌ Erreur API: {response.status_code} - {response.text}")
return False, f"Erreur API: {response.status_code} - {response.text}"
except requests.RequestException as e:
logger.error(f"❌ Erreur de connexion: {e}")
return False, f"Erreur de connexion: {e}"
except Exception as e:
logger.error(f"❌ Erreur inattendue: {e}")
return False, f"Erreur inattendue: {e}"
def process_single_event(self, event_data: Dict) -> Tuple[str, bool, str]:
"""Traite un événement individuellement (thread-safe)"""
event_id = event_data["id"]
event_label = event_data["event"]["properties"]["label"]
try:
# Vérifier si l'événement a déjà été traité avec succès
if event_id in self.events_data["events"]:
event_status = self.events_data["events"][event_id].get("status", "unknown")
if event_status in ["saved", "already_exists"]:
logger.info(f" Événement déjà traité: {event_label}")
return event_id, True, "Déjà traité"
# Envoyer à l'API
success, message = self.send_event_to_api(event_data)
return event_id, success, message
except Exception as e:
logger.error(f"❌ Erreur lors du traitement de {event_label}: {e}")
return event_id, False, f"Erreur: {e}"
def process_events(self, events: List[Dict]) -> Dict:
"""Traite tous les événements"""
stats = {
"total_events": len(events),
"new_events": 0,
"already_saved": 0,
"api_errors": 0,
"parse_errors": 0,
"sent_this_run": 0,
"skipped_due_to_limit": 0
}
if not events:
logger.info(" Aucun événement à traiter")
return stats
# Appliquer la limite d'événements
if self.max_events:
events = events[:self.max_events]
if len(events) < stats["total_events"]:
stats["skipped_due_to_limit"] = stats["total_events"] - len(events)
# Traiter les événements
if self.parallel and len(events) > 10:
logger.info(f"🚀 Traitement parallèle de {len(events)} événements avec {self.max_workers} workers")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
if self.dry_run:
logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API")
# Traitement parallèle
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Soumettre tous les événements
future_to_event = {
executor.submit(self.process_single_event, event_data): event_data
for event_data in events
}
# Traiter les résultats au fur et à mesure
for future in as_completed(future_to_event):
event_data = future_to_event[future]
event_id, success, message = future.result()
event_label = event_data["event"]["properties"]["label"]
# Mettre à jour les statistiques et les données locales
if success:
if "déjà traité" in message.lower():
stats["already_saved"] += 1
else:
stats["new_events"] += 1
stats["sent_this_run"] += 1
self.events_data["events"][event_id] = {
"status": "saved" if "déjà traité" not in message.lower() else "already_exists",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache des événements traités
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "saved" if "déjà traité" not in message.lower() else "already_exists",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
stats["api_errors"] += 1
self.events_data["events"][event_id] = {
"status": "api_error",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.error(f"{event_label} - {message}")
# Sauvegarder les données après chaque événement
self.save_events_data()
self.save_cache_data()
else:
# Traitement séquentiel (mode original)
logger.info(f"Traitement séquentiel de {len(events)} événements")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
if self.dry_run:
logger.info("Mode DRY-RUN activé - aucun événement ne sera envoyé à l'API")
for event_data in events:
event_id, success, message = self.process_single_event(event_data)
event_label = event_data["event"]["properties"]["label"]
# Mettre à jour les statistiques et les données locales
if success:
if "déjà traité" in message.lower():
stats["already_saved"] += 1
else:
stats["new_events"] += 1
stats["sent_this_run"] += 1
self.events_data["events"][event_id] = {
"status": "saved" if "déjà traité" not in message.lower() else "already_exists",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
# Ajouter au cache des événements traités
self.cache_data["processed_events"][event_id] = {
"processed_at": datetime.now().isoformat(),
"status": "saved" if "déjà traité" not in message.lower() else "already_exists",
"event_label": event_label
}
logger.info(f"{event_label} - {message}")
else:
stats["api_errors"] += 1
self.events_data["events"][event_id] = {
"status": "api_error",
"message": message,
"last_attempt": datetime.now().isoformat(),
"event": event_data["event"]
}
logger.error(f"{event_label} - {message}")
# Sauvegarder les données après chaque événement
self.save_events_data()
self.save_cache_data()
# Mettre à jour la date de dernière mise à jour
self.events_data["last_update"] = datetime.now().isoformat()
# Sauvegarder le cache
self.save_cache_data()
return stats
def run(self, force_refresh: bool = False):
"""Exécute le scraping complet"""
logger.info("🚀 Démarrage du scraping de l'agenda CCPL")
logger.info(f"Configuration: batch_size={self.batch_size}, api_url={self.api_base_url}")
logger.info(f"Mode dry-run: {'OUI' if self.dry_run else 'NON'}")
if self.max_events:
logger.info(f"Limite d'événements: {self.max_events}")
logger.info("=" * 60)
try:
# Récupérer les données de l'agenda
html_content = self.fetch_agenda_data()
if html_content is None and not force_refresh:
logger.info(" Utilisation du cache (pas de nouveau contenu)")
return
# Parser les événements
events = self.parse_agenda_html(html_content) if html_content else []
if not events:
logger.warning("⚠️ Aucun événement trouvé dans l'agenda")
return
logger.info(f"Traitement de {len(events)} événements")
# Traiter les événements
stats = self.process_events(events)
# Afficher les statistiques finales
logger.info("📊 Statistiques finales:")
for key, value in stats.items():
logger.info(f" {key}: {value}")
logger.info("✅ Scraping terminé avec succès")
except Exception as e:
logger.error(f"❌ Erreur lors du scraping: {e}")
raise
def main():
parser = argparse.ArgumentParser(description="Scraper pour l'agenda CCPL")
parser.add_argument("--api-url", default=api_oedb,
help=f"URL de base de l'API OEDB (défaut: {api_oedb})")
parser.add_argument("--batch-size", type=int, default=1,
help="Nombre d'événements à envoyer par batch (défaut: 1)")
parser.add_argument("--max-events", type=int, default=1,
help="Limiter le nombre d'événements à traiter (défaut: 1)")
parser.add_argument("--dry-run", action="store_true", default=True,
help="Mode dry-run par défaut (simulation sans envoi à l'API)")
parser.add_argument("--no-dry-run", action="store_true",
help="Désactiver le mode dry-run (envoi réel à l'API)")
parser.add_argument("--verbose", "-v", action="store_true",
help="Mode verbeux")
parser.add_argument("--force-refresh", "-f", action="store_true",
help="Forcer le rechargement de l'agenda (ignorer le cache)")
parser.add_argument("--cache-duration", type=int, default=1,
help="Durée de validité du cache en heures (défaut: 1)")
parser.add_argument("--parallel", action="store_true",
help="Activer le traitement parallèle pour plus de 10 événements")
parser.add_argument("--max-workers", type=int, default=4,
help="Nombre maximum de workers pour le traitement parallèle (défaut: 4)")
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
# Déterminer le mode dry-run
dry_run = args.dry_run and not args.no_dry_run
# Créer et exécuter le scraper
scraper = CCPLAgendaScraper(
api_base_url=args.api_url,
batch_size=args.batch_size,
max_events=args.max_events,
dry_run=dry_run,
parallel=args.parallel,
max_workers=args.max_workers
)
# Modifier la durée de cache si spécifiée
scraper.cache_duration_hours = args.cache_duration
# Exécuter avec ou sans rechargement forcé
scraper.run(force_refresh=args.force_refresh)
if __name__ == "__main__":
main()

View file

@ -0,0 +1,83 @@
#!/usr/bin/env python3
"""
Script de debug pour analyser la structure HTML de l'agenda CCPL
"""
import requests
from bs4 import BeautifulSoup
import re
def debug_html_structure():
"""Analyse la structure HTML de l'agenda CCPL"""
url = "https://www.cc-paysdelimours.fr/agenda"
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
try:
print(f"🌐 Récupération de: {url}")
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
print(f"📄 Taille du HTML: {len(response.text)} caractères")
# Chercher tous les éléments qui contiennent des dates
date_pattern = re.compile(r'\b\d{1,2}\s+(jan|fév|mar|avr|mai|jun|jul|aoû|sep|oct|nov|déc)\b', re.I)
date_elements = soup.find_all(string=date_pattern)
print(f"📅 Éléments avec dates trouvés: {len(date_elements)}")
# Afficher les premiers éléments avec dates
for i, elem in enumerate(date_elements[:5]):
print(f" {i+1}. {elem.strip()}")
print(f" Parent: {elem.parent.name if elem.parent else 'None'}")
print(f" Classes: {elem.parent.get('class', []) if elem.parent else 'None'}")
print()
# Chercher des patterns spécifiques
print("🔍 Recherche de patterns spécifiques:")
# Chercher des éléments avec des classes communes
common_classes = ['event', 'agenda', 'manifestation', 'item', 'card', 'content']
for class_name in common_classes:
elements = soup.find_all(class_=re.compile(class_name, re.I))
print(f" Classe '{class_name}': {len(elements)} éléments")
# Chercher des éléments avec du texte contenant des dates
all_elements = soup.find_all(['div', 'article', 'li', 'p', 'span'])
elements_with_dates = []
for elem in all_elements:
text = elem.get_text()
if date_pattern.search(text) and len(text) > 10:
elements_with_dates.append((elem, text[:100]))
print(f"📋 Éléments avec dates et texte significatif: {len(elements_with_dates)}")
# Afficher les premiers éléments
for i, (elem, text) in enumerate(elements_with_dates[:3]):
print(f" {i+1}. Tag: {elem.name}, Classes: {elem.get('class', [])}")
print(f" Texte: {text}...")
print()
# Chercher des liens
links = soup.find_all('a', href=True)
print(f"🔗 Liens trouvés: {len(links)}")
# Afficher quelques liens
for i, link in enumerate(links[:5]):
print(f" {i+1}. {link.get('href')} - {link.get_text()[:50]}...")
# Sauvegarder le HTML pour inspection
with open('ccpl_debug.html', 'w', encoding='utf-8') as f:
f.write(response.text)
print("💾 HTML sauvegardé dans ccpl_debug.html")
except Exception as e:
print(f"❌ Erreur: {e}")
if __name__ == "__main__":
debug_html_structure()

View file

@ -13,9 +13,11 @@ from datetime import datetime
import hashlib
class DemoAgendaDuLibreScraper:
def __init__(self, max_events=None, dry_run=True):
def __init__(self, max_events=None, dry_run=True, parallel=False, max_workers=4):
self.max_events = max_events
self.dry_run = dry_run
self.parallel = parallel
self.max_workers = max_workers
self.cache_file = "demo_agendadulibre_cache.json"
self.events_file = "demo_agendadulibre_events.json"
@ -581,6 +583,11 @@ def main():
scraper3 = DemoAgendaDuLibreScraper(max_events=2, dry_run=False)
scraper3.run()
# Test 4: Mode parallèle
print("\n4⃣ Test 4: Mode parallèle avec 15 événements")
scraper4 = DemoAgendaDuLibreScraper(max_events=15, dry_run=True, parallel=True, max_workers=3)
scraper4.run()
print("\n🎉 Toutes les démonstrations sont terminées !")
print("\nFonctionnalités démontrées:")
print("✅ Cache JSON intelligent")
@ -588,6 +595,7 @@ def main():
print("✅ Mode dry-run par défaut")
print("✅ Détection de changements de contenu")
print("✅ Suivi des événements traités")
print("✅ Traitement parallèle")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,41 @@
#!/usr/bin/env python3
"""
Script de démonstration pour le scraper CCPL Agenda
"""
import sys
import os
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from ccpl_agenda import CCPLAgendaScraper
def main():
print("🧪 Démonstration du scraper CCPL Agenda")
print("=" * 50)
# Test 1: Mode dry-run avec limite de 1 événement
print("\n1⃣ Test 1: Mode dry-run avec limite de 1 événement")
scraper1 = CCPLAgendaScraper(max_events=1, dry_run=True)
scraper1.run()
print("\n2⃣ Test 2: Mode dry-run avec limite de 3 événements")
scraper2 = CCPLAgendaScraper(max_events=3, dry_run=True)
scraper2.run()
print("\n3⃣ Test 3: Mode parallèle avec 5 événements")
scraper3 = CCPLAgendaScraper(max_events=5, dry_run=True, parallel=True, max_workers=2)
scraper3.run()
print("\n🎉 Toutes les démonstrations sont terminées !")
print("\nFonctionnalités démontrées:")
print("✅ Scraping HTML de l'agenda CCPL")
print("✅ Cache JSON intelligent")
print("✅ Limitation du nombre d'événements")
print("✅ Mode dry-run par défaut")
print("✅ Détection de changements de contenu")
print("✅ Suivi des événements traités")
print("✅ Traitement parallèle")
print("✅ Extraction des métadonnées (titre, date, URL, image)")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,3 @@
requests>=2.25.0
beautifulsoup4>=4.9.0
lxml>=4.6.0