#!/usr/bin/env python3
"""
SNCF Travaux Extractor for the OpenEventDatabase.

This script fetches railway work schedules from the SNCF open data API
and adds them to the OpenEventDatabase.

API URL: https://data.sncf.com/api/explore/v2.1/catalog/datasets/interceptions-programmees-sur-ligne/records?limit=100

Example data format:
{
    "total_count":482,
    "results":[
        {
            "lib_structdem":"Siège INFRAPOLE PACA",
            "cod_ligne":"830000",
            "lib_ligne":"Ligne de Paris-Lyon à Marseille-St-Charles",
            "pk_debm":"687000",
            "pk_finm":"862100",
            "familletravaux":"renouvellement de la signalisation",
            "nb_interventions":1,
            "num_semaine":"31",
            "annee":"2023"
        }
    ]
}

To get a start date, we combine the "annee" (year) and "num_semaine"
(week number) fields.
"""

# NOTE: the module name ``datetime`` is rebound to the class by the
# from-import below; the plain import is kept so no existing import is dropped.
import datetime
import json
import os
import sys
from datetime import datetime, timedelta

import requests

# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from oedb.utils.db import db_connect  # noqa: E402
from oedb.utils.logging import logger  # noqa: E402

# API URL for SNCF open data
API_URL = "https://data.sncf.com/api/explore/v2.1/catalog/datasets/interceptions-programmees-sur-ligne/records?limit=100"

# Seconds to wait for the API before giving up, so a stalled connection
# cannot block the importer forever.
REQUEST_TIMEOUT = 30

# Placeholder location used for every event (longitude, latitude).
# In a real implementation, you might want to geocode the line instead.
PLACEHOLDER_COORDINATES = (2.2137, 46.2276)  # Center of France


def fetch_sncf_data():
    """
    Fetch railway work planning data from the SNCF open data API.

    The endpoint is paginated (``API_URL`` asks for 100 records per page), so
    pages are requested with an increasing ``offset`` until the number of
    collected records reaches the ``total_count`` reported by the API, or a
    page comes back empty.

    Returns:
        list: A list of railway work records. Empty if the first request
        fails; if a later page fails, the records collected so far.
    """
    logger.info("Fetching data from SNCF open data API")

    records = []
    complete = False
    try:
        while True:
            response = requests.get(
                API_URL,
                params={"offset": len(records)},
                timeout=REQUEST_TIMEOUT,
            )
            response.raise_for_status()  # Raise an exception for HTTP errors
            data = response.json()

            if not isinstance(data, dict) or 'results' not in data:
                logger.error("No results found in API response")
                break

            page = data['results']
            records.extend(page)

            # Stop on an empty page as well, so an unexpected or missing
            # total_count can never cause an endless loop.
            total_count = data.get('total_count')
            if (
                not page
                or not isinstance(total_count, int)
                or len(records) >= total_count
            ):
                complete = True
                break
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data from SNCF API: {e}")
    except ValueError as e:
        # json.JSONDecodeError is a ValueError subclass.
        logger.error(f"Error decoding JSON response: {e}")

    if complete:
        logger.success(f"Successfully fetched {len(records)} records from SNCF API")
    elif records:
        logger.warning(f"Returning {len(records)} records fetched before the error")

    return records


def _date_window(start, days):
    """Return ``(start, start + days)`` as a pair of ISO format strings."""
    return (start.isoformat(), (start + timedelta(days=days)).isoformat())


def _current_date_window(error):
    """Log an unexpected conversion error and return a 7-day window from now."""
    logger.error(f"Unexpected error converting week to date: {error}")
    return _date_window(datetime.now(), 7)


def week_to_date(year, week_number):
    """
    Convert a year and ISO week number to the dates bounding that week.

    Weeks follow ISO 8601: they start on Monday and week 1 is the week
    containing 4 January. Week 53 of a year that only has 52 ISO weeks
    resolves to the week that follows week 52.

    Args:
        year (str or int): The year.
        week_number (str or int): The week number (1-53). Out-of-range
            values are replaced by week 1 with a warning.

    Returns:
        tuple: A tuple containing (start_date, end_date) as ISO format
        strings, where start_date is the Monday of the week at midnight and
        end_date is six days later. If the week number is not numeric, the
        window starts on 1 January of the year. On an unexpected error (for
        example a ``None`` argument) a 7-day window starting now is returned.

    Raises:
        ValueError: If the year is not numeric or is outside the range
            supported by ``datetime``.
    """
    try:
        year = int(year)
        # 4 January always belongs to ISO week 1, so the Monday on or before
        # it is the first day of that week. Building it here also validates
        # the year range.
        jan_fourth = datetime(year, 1, 4)
    except ValueError as e:
        logger.error(f"Error converting week to date: {e}")
        raise
    except Exception as e:
        return _current_date_window(e)

    try:
        week_number = int(week_number)
    except ValueError as e:
        logger.error(f"Error converting week to date: {e}")
        # Return default dates (first week of the year)
        return _date_window(datetime(year, 1, 1), 6)
    except Exception as e:
        return _current_date_window(e)

    # Validate inputs
    if week_number < 1 or week_number > 53:
        logger.warning(f"Invalid week number: {week_number}, using week 1 instead")
        week_number = 1

    try:
        week_one_monday = jan_fourth - timedelta(days=jan_fourth.weekday())
        start_date = week_one_monday + timedelta(weeks=week_number - 1)
        # The end date is the Sunday of the same week.
        return _date_window(start_date, 6)
    except OverflowError as e:
        # Only reachable at the very end of the supported date range.
        return _current_date_window(e)


def create_event(record):
    """
    Create an event object from a SNCF record.

    Args:
        record: A record from the SNCF API (a dict with the keys shown in
            the module docstring).

    Returns:
        dict: A GeoJSON Feature representing the event, or None if the
        record lacks a year or week number, or if building the event fails.
    """
    try:
        # Extract data from the record
        structure = record.get('lib_structdem', 'Unknown Structure')
        line_code = record.get('cod_ligne', 'Unknown Line Code')
        line_name = record.get('lib_ligne', 'Unknown Line')
        start_point = record.get('pk_debm', '')
        end_point = record.get('pk_finm', '')
        work_type = record.get('familletravaux', 'Unknown Work Type')
        interventions = record.get('nb_interventions', 1)

        # Extract year and week number
        year = record.get('annee')
        week_number = record.get('num_semaine')

        if not year or not week_number:
            logger.warning(f"Missing year or week number for {line_name}, skipping")
            return None

        # Convert week number to start and end dates
        start_date, end_date = week_to_date(year, week_number)

        # Create a descriptive label
        label = f"Railway Work: {line_name} - {work_type}"

        # Create a description with more details
        description = (
            f"Railway maintenance work on line {line_code} ({line_name}) "
            f"from kilometer point {start_point} to {end_point}. "
            f"Type of work: {work_type}. "
            f"Number of interventions: {interventions}. "
            f"Managed by: {structure}."
        )

        # Create the event object
        return {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                # A fresh list per event so callers can mutate it safely.
                "coordinates": list(PLACEHOLDER_COORDINATES),
            },
            "properties": {
                "type": "scheduled",
                "what": "transport.railway.maintenance",
                "what:series": "SNCF Railway Maintenance",
                "where": line_name,
                "label": label,
                "description": description,
                "start": start_date,
                "stop": end_date,
                "line_code": line_code,
                "work_type": work_type,
                "interventions": interventions,
                "start_point": start_point,
                "end_point": end_point,
                "structure": structure,
                "source": "SNCF Open Data",
            },
        }
    except Exception as e:
        # Per-record boundary: one malformed record must not stop the import.
        logger.error(f"Error creating event from record: {e}")
        return None


def submit_event(event):
    """
    Submit an event to the OpenEventDatabase.

    The geometry is inserted into the ``geo`` table (or looked up when it is
    already there) and the event is then inserted into ``events``, keyed to
    the geometry hash. The connection obtained from ``db_connect()`` is
    always closed, including when an error occurs.

    Args:
        event: A GeoJSON Feature representing the event.

    Returns:
        bool: True if the event was successfully submitted, False otherwise
        (invalid geometry, event already present, or any error).
    """
    db = None
    try:
        # Connect to the database
        db = db_connect()
        cur = db.cursor()

        # Extract event properties
        properties = event['properties']
        geometry = json.dumps(event['geometry'])

        # Insert the geometry into the geo table
        cur.execute("""
            INSERT INTO geo
            SELECT geom, md5(st_astext(geom)) as hash, st_centroid(geom) as geom_center
            FROM (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g
            WHERE ST_IsValid(geom)
            ON CONFLICT DO NOTHING
            RETURNING hash;
        """, (geometry,))

        # Get the geometry hash
        hash_result = cur.fetchone()

        if hash_result is None:
            # Nothing was returned: the geometry already exists or is invalid.
            # The hash must be computed with exactly the same expression as
            # the INSERT above; hashing ST_AsEWKT output can differ because
            # EWKT carries an SRID prefix, which would yield a hash that is
            # not present in the geo table.
            cur.execute("""
                SELECT md5(st_astext(geom)),
                       ST_IsValid(geom),
                       ST_IsValidReason(geom)
                FROM (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g;
            """, (geometry,))
            hash_result = cur.fetchone()

        if hash_result is None or (len(hash_result) > 1 and not hash_result[1]):
            logger.error(f"Invalid geometry for event: {properties.get('label')}")
            return False

        geo_hash = hash_result[0]

        # Determine the bounds for the time range: a zero-length range must
        # be inclusive on both ends, otherwise it would be empty.
        bounds = '[]' if properties['start'] == properties['stop'] else '[)'

        # Insert the event into the database
        cur.execute("""
            INSERT INTO events (events_type, events_what, events_when, events_tags, events_geo)
            VALUES (%s, %s, tstzrange(%s, %s, %s), %s, %s)
            ON CONFLICT DO NOTHING
            RETURNING events_id;
        """, (
            properties['type'],
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            json.dumps(properties),
            geo_hash,
        ))

        # Get the event ID
        event_id = cur.fetchone()

        if event_id:
            logger.success(f"Event created with ID: {event_id[0]}")
            db.commit()
            return True

        # Check if the event already exists
        cur.execute("""
            SELECT events_id FROM events
            WHERE events_what = %s
            AND events_when = tstzrange(%s, %s, %s)
            AND events_geo = %s;
        """, (
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            geo_hash,
        ))

        existing_id = cur.fetchone()

        if existing_id:
            logger.info(f"Event already exists with ID: {existing_id[0]}")
        else:
            logger.warning(f"Failed to create event: {properties.get('label')}")

        return False

    except Exception as e:
        logger.error(f"Error submitting event: {e}")
        return False
    finally:
        # Always release the connection, including on errors.
        if db is not None:
            db.close()


def main():
    """
    Main function to fetch SNCF data and add events to the database.
    """
    logger.info("Starting SNCF travaux extractor")

    # Fetch data from the SNCF API
    records = fetch_sncf_data()

    if not records:
        logger.warning("No records found, exiting")
        return

    # Process each record
    success_count = 0

    for record in records:
        # Create an event from the record
        event = create_event(record)

        if not event:
            continue

        # Submit the event to the database
        if submit_event(event):
            success_count += 1

    logger.success(f"Successfully added {success_count} out of {len(records)} events to the database")


if __name__ == "__main__":
    main()