add extractors, rate limit, demo submit form

This commit is contained in:
Tykayn 2025-09-16 00:46:09 +02:00 committed by tykayn
parent cc870323bf
commit 2157091778
12 changed files with 1612 additions and 14 deletions

View file

@ -0,0 +1,130 @@
# SNCF Travaux Extractor Implementation
## Overview
This document describes the implementation of the SNCF Travaux Extractor for the OpenEventDatabase. The extractor fetches railway work schedules from the SNCF open data API and adds them to the database as events.
## Implementation Details
The extractor is implemented in the file `extractors/sncf_travaux.py`. It consists of the following components:
1. **API Integration**: The extractor connects to the SNCF open data API to fetch railway work schedules.
2. **Date Conversion**: The extractor converts week numbers to dates, as the SNCF data provides the year and week number rather than explicit start and end dates.
3. **Event Creation**: The extractor creates event objects from the SNCF data, including all required properties for the OpenEventDatabase.
4. **Database Integration**: The extractor submits events to the OpenEventDatabase.
### Key Functions
#### `fetch_sncf_data()`
This function fetches railway work planning data from the SNCF open data API. It handles HTTP errors, JSON decoding errors, and checks if the response contains a 'results' field.
```python
def fetch_sncf_data():
"""
Fetch railway work planning data from the SNCF open data API.
Returns:
list: A list of railway work records.
"""
# Implementation details...
```
#### `week_to_date(year, week_number)`
This function converts a year and week number to a date range (start date and end date). It handles various input formats and edge cases.
```python
def week_to_date(year, week_number):
"""
Convert a year and week number to a date.
Args:
year (str or int): The year.
week_number (str or int): The week number (1-53).
Returns:
tuple: A tuple containing (start_date, end_date) as ISO format strings.
"""
# Implementation details...
```
#### `create_event(record)`
This function creates an event object from a SNCF record. It extracts relevant data from the record, converts the year and week number to start and end dates, and creates a GeoJSON Feature object with all the necessary properties.
```python
def create_event(record):
"""
Create an event object from a SNCF record.
Args:
record: A record from the SNCF API.
Returns:
dict: A GeoJSON Feature representing the event.
"""
# Implementation details...
```
#### `submit_event(event)`
This function submits an event to the OpenEventDatabase. It connects to the database, inserts the geometry and event data, and handles various error cases.
```python
def submit_event(event):
"""
Submit an event to the OpenEventDatabase.
Args:
event: A GeoJSON Feature representing the event.
Returns:
bool: True if the event was successfully submitted, False otherwise.
"""
# Implementation details...
```
### Event Properties
The events created by the extractor include the following properties:
- `type`: "scheduled" (as these are planned railway works)
- `what`: "transport.railway.maintenance" (a descriptive category)
- `what:series`: "SNCF Railway Maintenance" (to group related events)
- `where`: The line name
- `label`: A descriptive label
- `description`: A detailed description of the work
- `start` and `stop`: The start and end dates derived from the year and week number
Additional properties specific to railway works:
- `line_code`: The line code
- `work_type`: The type of work
- `interventions`: The number of interventions
- `start_point` and `end_point`: The start and end points (kilometer points)
- `structure`: The managing structure
- `source`: "SNCF Open Data"
## Testing
A test script (`test_sncf_travaux.py`) is provided to test the functionality of the extractor without actually submitting events to the database. It tests the `week_to_date()`, `create_event()`, and `fetch_sncf_data()` functions with various inputs.
To run the test script:
```bash
python3 extractors/test_sncf_travaux.py
```
## Usage
To run the extractor and add SNCF railway work events to the database:
```bash
./extractors/sncf_travaux.py
```
## Notes
- The extractor uses a placeholder location (center of France) for the event geometry. In a real implementation, you might want to geocode the line or use a predefined location.
- The extractor assumes that the SNCF API returns data in the format described in the comments. If the API changes, the extractor may need to be updated.
- The extractor handles various error cases, such as missing required fields, invalid week numbers, and database connection errors.

247
extractors/edf_schedules.py Executable file
View file

@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""
EDF Schedules Extractor for the OpenEventDatabase.
This script fetches nuclear power plant maintenance schedules from the EDF open data API
and adds them to the OpenEventDatabase.
API URL: https://opendata.edf.fr/api/explore/v2.1/catalog/datasets/disponibilite-du-parc-nucleaire-d-edf-sa-present-passe-et-previsionnel/records
"""
import json
import requests
import datetime
import logging
import sys
import os
# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from oedb.utils.db import db_connect
from oedb.utils.logging import logger
# API URL for EDF open data
API_URL = "https://opendata.edf.fr/api/explore/v2.1/catalog/datasets/disponibilite-du-parc-nucleaire-d-edf-sa-present-passe-et-previsionnel/records"
def fetch_edf_data():
"""
Fetch maintenance planning data from the EDF open data API.
Returns:
list: A list of maintenance events.
"""
logger.info("Fetching data from EDF open data API")
try:
response = requests.get(API_URL)
response.raise_for_status() # Raise an exception for HTTP errors
data = response.json()
if 'results' not in data:
logger.error("No results found in API response")
return []
logger.success(f"Successfully fetched {len(data['results'])} records from EDF API")
return data['results']
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching data from EDF API: {e}")
return []
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON response: {e}")
return []
def create_event(record):
"""
Create an event object from an EDF record.
Args:
record: A record from the EDF API.
Returns:
dict: A GeoJSON Feature representing the event.
"""
# Extract data from the record
try:
# Extract the nuclear power plant name and unit
site_name = record.get('site', 'Unknown Site')
unit = record.get('unite', 'Unknown Unit')
# Extract start and end dates
start_date = record.get('date_et_heure_fuseau_horaire_europe_paris')
# end_date = record.get('date_fin')
if not start_date or not end_date:
logger.warning(f"Missing start or end date for {site_name} {unit}, skipping")
return None
# Extract power values
power_available = record.get('puissance_disponible')
power_max = record.get('puissance_maximale_possible')
# Extract coordinates (if available)
# Note: The API might not provide coordinates, so we'd need to geocode the site names
# For now, we'll use a placeholder location in France
coordinates = [2.2137, 46.2276] # Center of France
# Create the event object
event = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": coordinates
},
"properties": {
"type": "scheduled",
"what": "energy.maintenance.nuclear",
"what:series": "EDF Nuclear Maintenance",
"where": f"{site_name} - {unit}",
"label": f"Nuclear Maintenance: {site_name} - {unit}",
"start": start_date,
"stop": end_date,
"power_available": power_available,
"power_max": power_max,
"source": "EDF Open Data"
}
}
return event
except Exception as e:
logger.error(f"Error creating event from record: {e}")
return None
def submit_event(event):
"""
Submit an event to the OpenEventDatabase.
Args:
event: A GeoJSON Feature representing the event.
Returns:
bool: True if the event was successfully submitted, False otherwise.
"""
try:
# Connect to the database
db = db_connect()
cur = db.cursor()
# Extract event properties
properties = event['properties']
geometry = json.dumps(event['geometry'])
# Insert the geometry into the geo table
cur.execute("""
INSERT INTO geo
SELECT geom, md5(st_astext(geom)) as hash, st_centroid(geom) as geom_center FROM
(SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g
WHERE ST_IsValid(geom)
ON CONFLICT DO NOTHING RETURNING hash;
""", (geometry,))
# Get the geometry hash
hash_result = cur.fetchone()
if hash_result is None:
# If the hash is None, get it from the database
cur.execute("""
SELECT md5(st_asewkt(geom)),
ST_IsValid(geom),
ST_IsValidReason(geom) from (SELECT st_geomfromgeojson(%s) as geom) as g;
""", (geometry,))
hash_result = cur.fetchone()
if hash_result is None or (len(hash_result) > 1 and not hash_result[1]):
logger.error(f"Invalid geometry for event: {properties.get('label')}")
db.close()
return False
geo_hash = hash_result[0]
# Determine the bounds for the time range
bounds = '[]' if properties['start'] == properties['stop'] else '[)'
# Insert the event into the database
cur.execute("""
INSERT INTO events (events_type, events_what, events_when, events_tags, events_geo)
VALUES (%s, %s, tstzrange(%s, %s, %s), %s, %s)
ON CONFLICT DO NOTHING RETURNING events_id;
""", (
properties['type'],
properties['what'],
properties['start'],
properties['stop'],
bounds,
json.dumps(properties),
geo_hash
))
# Get the event ID
event_id = cur.fetchone()
if event_id:
logger.success(f"Event created with ID: {event_id[0]}")
db.commit()
db.close()
return True
else:
# Check if the event already exists
cur.execute("""
SELECT events_id FROM events
WHERE events_what = %s
AND events_when = tstzrange(%s, %s, %s)
AND events_geo = %s;
""", (
properties['what'],
properties['start'],
properties['stop'],
bounds,
geo_hash
))
existing_id = cur.fetchone()
if existing_id:
logger.info(f"Event already exists with ID: {existing_id[0]}")
else:
logger.warning(f"Failed to create event: {properties.get('label')}")
db.close()
return False
except Exception as e:
logger.error(f"Error submitting event: {e}")
return False
def main():
"""
Main function to fetch EDF data and add events to the database.
"""
logger.info("Starting EDF schedules extractor")
# Fetch data from the EDF API
records = fetch_edf_data()
if not records:
logger.warning("No records found, exiting")
return
# Process each record
success_count = 0
for record in records:
# Create an event from the record
event = create_event(record)
if not event:
continue
# Submit the event to the database
if submit_event(event):
success_count += 1
logger.success(f"Successfully added {success_count} out of {len(records)} events to the database")
if __name__ == "__main__":
main()

337
extractors/sncf_travaux.py Executable file
View file

@ -0,0 +1,337 @@
#!/usr/bin/env python3
"""
SNCF Travaux Extractor for the OpenEventDatabase.
This script fetches railway work schedules from the SNCF open data API
and adds them to the OpenEventDatabase.
API URL: https://data.sncf.com/api/explore/v2.1/catalog/datasets/interceptions-programmees-sur-ligne/records?limit=200
Example data format:
{
"total_count":482,
"results":[
{
"lib_structdem":"Siège INFRAPOLE PACA",
"cod_ligne":"830000",
"lib_ligne":"Ligne de Paris-Lyon à Marseille-St-Charles",
"pk_debm":"687000",
"pk_finm":"862100",
"familletravaux":"renouvellement de la signalisation",
"nb_interventions":1,
"num_semaine":"31",
"annee":"2023"
}
]
}
To get a start date, we combine the "annee" (year) and "num_semaine" (week number) fields.
"""
import json
import requests
import datetime
import sys
import os
from datetime import datetime, timedelta
# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from oedb.utils.db import db_connect
from oedb.utils.logging import logger
# API URL for SNCF open data
API_URL = "https://data.sncf.com/api/explore/v2.1/catalog/datasets/interceptions-programmees-sur-ligne/records?limit=200"
def fetch_sncf_data():
"""
Fetch railway work planning data from the SNCF open data API.
Returns:
list: A list of railway work records.
"""
logger.info("Fetching data from SNCF open data API")
try:
response = requests.get(API_URL)
response.raise_for_status() # Raise an exception for HTTP errors
data = response.json()
if 'results' not in data:
logger.error("No results found in API response")
return []
logger.success(f"Successfully fetched {len(data['results'])} records from SNCF API")
return data['results']
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching data from SNCF API: {e}")
return []
except json.JSONDecodeError as e:
logger.error(f"Error decoding JSON response: {e}")
return []
def week_to_date(year, week_number):
"""
Convert a year and week number to a date.
Args:
year (str or int): The year.
week_number (str or int): The week number (1-53).
Returns:
tuple: A tuple containing (start_date, end_date) as ISO format strings.
"""
try:
# Convert inputs to integers
year = int(year)
week_number = int(week_number)
# Validate inputs
if week_number < 1 or week_number > 53:
logger.warning(f"Invalid week number: {week_number}, using week 1 instead")
week_number = 1
# Calculate the date of the first day of the week (Monday)
# The %G and %V format codes are used for ISO week date
# %G is the ISO year number and %V is the ISO week number
start_date = datetime.strptime(f'{year}-{week_number}-1', '%Y-%W-%w')
# If the week number is 0, it means the first partial week of the year
if week_number == 0:
start_date = datetime(year, 1, 1)
# Calculate the end date (Sunday of the same week)
end_date = start_date + timedelta(days=6)
# Format dates as ISO strings
start_iso = start_date.isoformat()
end_iso = end_date.isoformat()
return (start_iso, end_iso)
except ValueError as e:
logger.error(f"Error converting week to date: {e}")
# Return default dates (first week of the year)
default_start = datetime(int(year), 1, 1).isoformat()
default_end = (datetime(int(year), 1, 1) + timedelta(days=6)).isoformat()
return (default_start, default_end)
except Exception as e:
logger.error(f"Unexpected error converting week to date: {e}")
# Return default dates (current date)
now = datetime.now()
return (now.isoformat(), (now + timedelta(days=7)).isoformat())
def create_event(record):
"""
Create an event object from a SNCF record.
Args:
record: A record from the SNCF API.
Returns:
dict: A GeoJSON Feature representing the event.
"""
try:
# Extract data from the record
structure = record.get('lib_structdem', 'Unknown Structure')
line_code = record.get('cod_ligne', 'Unknown Line Code')
line_name = record.get('lib_ligne', 'Unknown Line')
start_point = record.get('pk_debm', '')
end_point = record.get('pk_finm', '')
work_type = record.get('familletravaux', 'Unknown Work Type')
interventions = record.get('nb_interventions', 1)
# Extract year and week number
year = record.get('annee')
week_number = record.get('num_semaine')
if not year or not week_number:
logger.warning(f"Missing year or week number for {line_name}, skipping")
return None
# Convert week number to start and end dates
start_date, end_date = week_to_date(year, week_number)
# Create a descriptive label
label = f"Railway Work: {line_name} - {work_type}"
# Create a description with more details
description = (
f"Railway maintenance work on line {line_code} ({line_name}) "
f"from kilometer point {start_point} to {end_point}. "
f"Type of work: {work_type}. "
f"Number of interventions: {interventions}. "
f"Managed by: {structure}."
)
# Use a placeholder location in France
# In a real implementation, you might want to geocode the line or use a predefined location
coordinates = [2.2137, 46.2276] # Center of France
# Create the event object
event = {
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": coordinates
},
"properties": {
"type": "scheduled",
"what": "transport.railway.maintenance",
"what:series": "SNCF Railway Maintenance",
"where": line_name,
"label": label,
"description": description,
"start": start_date,
"stop": end_date,
"line_code": line_code,
"work_type": work_type,
"interventions": interventions,
"start_point": start_point,
"end_point": end_point,
"structure": structure,
"source": "SNCF Open Data"
}
}
return event
except Exception as e:
logger.error(f"Error creating event from record: {e}")
return None
def submit_event(event):
"""
Submit an event to the OpenEventDatabase.
Args:
event: A GeoJSON Feature representing the event.
Returns:
bool: True if the event was successfully submitted, False otherwise.
"""
try:
# Connect to the database
db = db_connect()
cur = db.cursor()
# Extract event properties
properties = event['properties']
geometry = json.dumps(event['geometry'])
# Insert the geometry into the geo table
cur.execute("""
INSERT INTO geo
SELECT geom, md5(st_astext(geom)) as hash, st_centroid(geom) as geom_center FROM
(SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g
WHERE ST_IsValid(geom)
ON CONFLICT DO NOTHING RETURNING hash;
""", (geometry,))
# Get the geometry hash
hash_result = cur.fetchone()
if hash_result is None:
# If the hash is None, get it from the database
cur.execute("""
SELECT md5(st_asewkt(geom)),
ST_IsValid(geom),
ST_IsValidReason(geom) from (SELECT st_geomfromgeojson(%s) as geom) as g;
""", (geometry,))
hash_result = cur.fetchone()
if hash_result is None or (len(hash_result) > 1 and not hash_result[1]):
logger.error(f"Invalid geometry for event: {properties.get('label')}")
db.close()
return False
geo_hash = hash_result[0]
# Determine the bounds for the time range
bounds = '[]' if properties['start'] == properties['stop'] else '[)'
# Insert the event into the database
cur.execute("""
INSERT INTO events (events_type, events_what, events_when, events_tags, events_geo)
VALUES (%s, %s, tstzrange(%s, %s, %s), %s, %s)
ON CONFLICT DO NOTHING RETURNING events_id;
""", (
properties['type'],
properties['what'],
properties['start'],
properties['stop'],
bounds,
json.dumps(properties),
geo_hash
))
# Get the event ID
event_id = cur.fetchone()
if event_id:
logger.success(f"Event created with ID: {event_id[0]}")
db.commit()
db.close()
return True
else:
# Check if the event already exists
cur.execute("""
SELECT events_id FROM events
WHERE events_what = %s
AND events_when = tstzrange(%s, %s, %s)
AND events_geo = %s;
""", (
properties['what'],
properties['start'],
properties['stop'],
bounds,
geo_hash
))
existing_id = cur.fetchone()
if existing_id:
logger.info(f"Event already exists with ID: {existing_id[0]}")
else:
logger.warning(f"Failed to create event: {properties.get('label')}")
db.close()
return False
except Exception as e:
logger.error(f"Error submitting event: {e}")
return False
def main():
"""
Main function to fetch SNCF data and add events to the database.
"""
logger.info("Starting SNCF travaux extractor")
# Fetch data from the SNCF API
records = fetch_sncf_data()
if not records:
logger.warning("No records found, exiting")
return
# Process each record
success_count = 0
for record in records:
# Create an event from the record
event = create_event(record)
if not event:
continue
# Submit the event to the database
if submit_event(event):
success_count += 1
logger.success(f"Successfully added {success_count} out of {len(records)} events to the database")
if __name__ == "__main__":
main()

127
extractors/test_sncf_travaux.py Executable file
View file

@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""
Test script for the SNCF travaux extractor.
This script tests the functionality of the SNCF travaux extractor without actually
submitting events to the database.
"""
import sys
import os
import json
from datetime import datetime
# Add the parent directory to the path so we can import from the extractor
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Import functions from the extractor
from extractors.sncf_travaux import fetch_sncf_data, week_to_date, create_event
def test_week_to_date():
"""Test the week_to_date function."""
print("Testing week_to_date function...")
# Test with valid inputs
year = 2023
week = 31
start_date, end_date = week_to_date(year, week)
print(f"Week {week} of {year} starts on {start_date} and ends on {end_date}")
# Test with string inputs
year = "2023"
week = "31"
start_date, end_date = week_to_date(year, week)
print(f"Week {week} of {year} starts on {start_date} and ends on {end_date}")
# Test with invalid week number
year = 2023
week = 60 # Invalid week number
start_date, end_date = week_to_date(year, week)
print(f"Invalid week {week} of {year} returns {start_date} to {end_date}")
print("week_to_date function test completed.\n")
def test_create_event():
"""Test the create_event function."""
print("Testing create_event function...")
# Create a sample record
record = {
"lib_structdem": "Siège INFRAPOLE PACA",
"cod_ligne": "830000",
"lib_ligne": "Ligne de Paris-Lyon à Marseille-St-Charles",
"pk_debm": "687000",
"pk_finm": "862100",
"familletravaux": "renouvellement de la signalisation",
"nb_interventions": 1,
"num_semaine": "31",
"annee": "2023"
}
# Create an event from the record
event = create_event(record)
if event:
print("Event created successfully:")
print(f"Label: {event['properties']['label']}")
print(f"Start date: {event['properties']['start']}")
print(f"End date: {event['properties']['stop']}")
print(f"Type: {event['properties']['type']}")
print(f"What: {event['properties']['what']}")
print(f"Where: {event['properties']['where']}")
print(f"Description: {event['properties']['description'][:100]}...")
else:
print("Failed to create event from record.")
# Test with missing required fields
record_missing = {
"lib_structdem": "Siège INFRAPOLE PACA",
"cod_ligne": "830000",
"lib_ligne": "Ligne de Paris-Lyon à Marseille-St-Charles",
# Missing year and week number
}
event_missing = create_event(record_missing)
if event_missing is None:
print("Correctly handled record with missing required fields.")
else:
print("Failed to handle record with missing required fields.")
print("create_event function test completed.\n")
def test_fetch_sncf_data():
"""Test the fetch_sncf_data function."""
print("Testing fetch_sncf_data function...")
print("Note: This test will make an actual API request to the SNCF API.")
print("If you're not connected to the internet, this test will fail.")
# Fetch data from the SNCF API
records = fetch_sncf_data()
if records:
print(f"Successfully fetched {len(records)} records from the SNCF API.")
print("First record:")
print(json.dumps(records[0], indent=2))
else:
print("Failed to fetch data from the SNCF API.")
print("fetch_sncf_data function test completed.\n")
def main():
"""Run all tests."""
print("Starting tests for SNCF travaux extractor...")
# Test the week_to_date function
test_week_to_date()
# Test the create_event function
test_create_event()
# Test the fetch_sncf_data function
# Uncomment the following line to test the API request
# test_fetch_sncf_data()
print("All tests completed.")
if __name__ == "__main__":
main()