oedb-backend/extractors/sncf_travaux.py

338 lines
11 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""
SNCF Travaux Extractor for the OpenEventDatabase.
This script fetches railway work schedules from the SNCF open data API
and adds them to the OpenEventDatabase.

API URL: https://data.sncf.com/api/explore/v2.1/catalog/datasets/interceptions-programmees-sur-ligne/records?limit=100
Example data format:
{
"total_count":482,
"results":[
{
"lib_structdem":"Siège INFRAPOLE PACA",
"cod_ligne":"830000",
"lib_ligne":"Ligne de Paris-Lyon à Marseille-St-Charles",
"pk_debm":"687000",
"pk_finm":"862100",
"familletravaux":"renouvellement de la signalisation",
"nb_interventions":1,
"num_semaine":"31",
"annee":"2023"
}
]
}
To get a start date, we combine the "annee" (year) and "num_semaine" (week number) fields.
"""
import json
import requests
import datetime
import sys
import os
from datetime import datetime, timedelta
# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from oedb.utils.db import db_connect
from oedb.utils.logging import logger
# Endpoint of the SNCF open data "interceptions-programmees-sur-ligne" dataset.
# The query string asks for 100 records in a single response.
API_URL = "https://data.sncf.com/api/explore/v2.1/catalog/datasets/interceptions-programmees-sur-ligne/records?limit=100"
def fetch_sncf_data(timeout=30):
    """
    Fetch railway work planning data from the SNCF open data API.

    Issues a single GET request on ``API_URL`` and returns the ``results``
    array of the decoded JSON payload.

    Args:
        timeout (float): Seconds to wait for the server before giving up.

    Returns:
        list: The railway work records, or an empty list when the request
        fails, the body is not valid JSON, or the payload carries no
        ``results`` entry.
    """
    logger.info("Fetching data from SNCF open data API")

    try:
        # requests never times out on its own; without an explicit timeout a
        # stalled connection would block the extractor forever.
        response = requests.get(API_URL, timeout=timeout)
        response.raise_for_status()  # Raise an exception for HTTP errors
        data = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data from SNCF API: {e}")
        return []
    except ValueError as e:
        # json.JSONDecodeError is a ValueError subclass; catching the base
        # class also covers decoders that raise their own ValueError subclass.
        logger.error(f"Error decoding JSON response: {e}")
        return []

    # Guard against payloads that are valid JSON but not an object (a bare
    # number or null would make the membership test raise TypeError).
    if not isinstance(data, dict) or 'results' not in data:
        logger.error("No results found in API response")
        return []

    records = data['results']
    logger.success(f"Successfully fetched {len(records)} records from SNCF API")
    return records
def _current_week_window():
    """Return (now, now + 7 days) as ISO strings, the last-resort fallback."""
    now = datetime.now()
    return (now.isoformat(), (now + timedelta(days=7)).isoformat())


def week_to_date(year, week_number):
    """
    Convert an ISO year and ISO week number to the dates bounding that week.

    Args:
        year (str or int): The ISO year.
        week_number (str or int): The ISO week number (1-53). A value outside
            that range is replaced by week 1 and a warning is logged.

    Returns:
        tuple: (start_date, end_date) as ISO 8601 strings. start_date is the
        Monday of the requested week at midnight and end_date is six days
        later. If the conversion fails, the first seven days of the year are
        returned instead; if the year itself cannot be used, a window
        starting at the current time is returned.
    """
    try:
        # Convert inputs to integers
        year = int(year)
        week_number = int(week_number)

        # Validate inputs
        if week_number < 1 or week_number > 53:
            logger.warning(f"Invalid week number: {week_number}, using week 1 instead")
            week_number = 1

        # %G is the ISO year, %V the ISO week and %u the ISO weekday
        # (1 = Monday). The non-ISO %Y/%W/%w directives count weeks from the
        # first Monday of the calendar year, which is one week late whenever
        # January 1st falls on Tuesday through Thursday (e.g. 2025).
        start_date = datetime.strptime(f'{year}-{week_number}-1', '%G-%V-%u')

        # End of the range is six days after the Monday.
        end_date = start_date + timedelta(days=6)

        return (start_date.isoformat(), end_date.isoformat())
    except ValueError as e:
        logger.error(f"Error converting week to date: {e}")
        # Fall back to the first days of the year. The year may be the very
        # thing that failed to parse, so this conversion is guarded too
        # instead of raising from inside the handler.
        try:
            default_start = datetime(int(year), 1, 1)
        except (TypeError, ValueError, OverflowError):
            return _current_week_window()
        return (default_start.isoformat(),
                (default_start + timedelta(days=6)).isoformat())
    except Exception as e:
        logger.error(f"Unexpected error converting week to date: {e}")
        # Return default dates (current date)
        return _current_week_window()
def create_event(record):
    """
    Build a GeoJSON event from one SNCF work record.

    Args:
        record: A record from the SNCF API (accessed through ``.get``).

    Returns:
        dict: A GeoJSON Feature describing the railway work, or None when
        the record has no year or week number, or when building the event
        raises.
    """
    try:
        # Fields copied from the record, with the defaults used when absent.
        line_name = record.get('lib_ligne', 'Unknown Line')
        line_code = record.get('cod_ligne', 'Unknown Line Code')
        work_type = record.get('familletravaux', 'Unknown Work Type')
        structure = record.get('lib_structdem', 'Unknown Structure')
        interventions = record.get('nb_interventions', 1)
        start_point = record.get('pk_debm', '')
        end_point = record.get('pk_finm', '')

        # The time range is derived from the year and week number, so a
        # record lacking either one cannot be turned into an event.
        year = record.get('annee')
        week_number = record.get('num_semaine')
        if not (year and week_number):
            logger.warning(f"Missing year or week number for {line_name}, skipping")
            return None

        start_date, end_date = week_to_date(year, week_number)

        # Human-readable summary of the work.
        description = (
            f"Railway maintenance work on line {line_code} ({line_name}) "
            f"from kilometer point {start_point} to {end_point}. "
            f"Type of work: {work_type}. "
            f"Number of interventions: {interventions}. "
            f"Managed by: {structure}."
        )

        properties = {
            "type": "scheduled",
            "what": "transport.railway.maintenance",
            "what:series": "SNCF Railway Maintenance",
            "where": line_name,
            "label": f"Railway Work: {line_name} - {work_type}",
            "description": description,
            "start": start_date,
            "stop": end_date,
            "line_code": line_code,
            "work_type": work_type,
            "interventions": interventions,
            "start_point": start_point,
            "end_point": end_point,
            "structure": structure,
            "source": "SNCF Open Data"
        }

        # Every event gets the same placeholder point (center of France);
        # the line itself is not geocoded.
        geometry = {
            "type": "Point",
            "coordinates": [2.2137, 46.2276]
        }

        return {
            "type": "Feature",
            "geometry": geometry,
            "properties": properties
        }
    except Exception as e:
        logger.error(f"Error creating event from record: {e}")
        return None
def submit_event(event):
    """
    Submit an event to the OpenEventDatabase.

    Inserts the event geometry into the ``geo`` table (or looks up its hash
    when it is already there), then inserts the event into ``events``.

    Args:
        event: A GeoJSON Feature representing the event.

    Returns:
        bool: True if a new event row was created and committed. False if the
        geometry is invalid, the event already exists, or any error occurs.
    """
    db = None
    try:
        # Connect to the database
        db = db_connect()
        cur = db.cursor()

        # Extract event properties
        properties = event['properties']
        geometry = json.dumps(event['geometry'])

        # Insert the geometry into the geo table
        cur.execute("""
            INSERT INTO geo
            SELECT geom, md5(st_astext(geom)) as hash, st_centroid(geom) as geom_center FROM
            (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g
            WHERE ST_IsValid(geom)
            ON CONFLICT DO NOTHING RETURNING hash;
        """, (geometry,))

        # Get the geometry hash
        hash_result = cur.fetchone()

        if hash_result is None:
            # Nothing was returned: the geometry either already exists or is
            # invalid. Recompute the hash with the same expression as the
            # insert above (md5 of st_astext). st_asewkt prefixes the text
            # with the SRID when one is set, which would yield a hash that
            # matches no row in geo.
            cur.execute("""
                SELECT md5(st_astext(geom)),
                ST_IsValid(geom),
                ST_IsValidReason(geom) from (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g;
            """, (geometry,))
            hash_result = cur.fetchone()

        # The fallback query also reports validity in its second column.
        if hash_result is None or (len(hash_result) > 1 and not hash_result[1]):
            logger.error(f"Invalid geometry for event: {properties.get('label')}")
            return False

        geo_hash = hash_result[0]

        # A zero-length range must include both ends, otherwise it is empty.
        bounds = '[]' if properties['start'] == properties['stop'] else '[)'

        # Insert the event into the database
        cur.execute("""
            INSERT INTO events (events_type, events_what, events_when, events_tags, events_geo)
            VALUES (%s, %s, tstzrange(%s, %s, %s), %s, %s)
            ON CONFLICT DO NOTHING RETURNING events_id;
        """, (
            properties['type'],
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            json.dumps(properties),
            geo_hash
        ))

        # Get the event ID
        event_id = cur.fetchone()

        if event_id:
            # Commit first so success is only reported once it is durable.
            db.commit()
            logger.success(f"Event created with ID: {event_id[0]}")
            return True

        # No row returned: check if the event already exists
        cur.execute("""
            SELECT events_id FROM events
            WHERE events_what = %s
            AND events_when = tstzrange(%s, %s, %s)
            AND events_geo = %s;
        """, (
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            geo_hash
        ))

        existing_id = cur.fetchone()
        if existing_id:
            logger.info(f"Event already exists with ID: {existing_id[0]}")
        else:
            logger.warning(f"Failed to create event: {properties.get('label')}")
        return False
    except Exception as e:
        logger.error(f"Error submitting event: {e}")
        return False
    finally:
        # Close the connection on every path, including errors, so a failing
        # event does not leak a connection for each record processed.
        if db is not None:
            try:
                db.close()
            except Exception as e:
                logger.error(f"Error submitting event: {e}")
def main():
    """
    Run the extractor: fetch the SNCF records, turn each one into an event
    and submit it, then log how many were added.
    """
    logger.info("Starting SNCF travaux extractor")

    records = fetch_sncf_data()
    if not records:
        logger.warning("No records found, exiting")
        return

    # Events are built lazily so each record is converted and submitted in
    # turn; records that yield no event are skipped.
    events = (create_event(record) for record in records)
    success_count = sum(1 for event in events if event and submit_event(event))

    logger.success(f"Successfully added {success_count} out of {len(records)} events to the database")


if __name__ == "__main__":
    main()