oedb-backend/extractors/edf_schedules.py

247 lines
No EOL
7.8 KiB
Python
Executable file

#!/usr/bin/env python3
"""
EDF Schedules Extractor for the OpenEventDatabase.
This script fetches nuclear power plant maintenance schedules from the EDF open data API
and adds them to the OpenEventDatabase.
API URL: https://opendata.edf.fr/api/explore/v2.1/catalog/datasets/disponibilite-du-parc-nucleaire-d-edf-sa-present-passe-et-previsionnel/records
"""
import json
import requests
import datetime
import logging
import sys
import os
# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from oedb.utils.db import db_connect
from oedb.utils.logging import logger
# Records endpoint of the EDF open-data dataset describing the availability of
# EDF SA's nuclear fleet (present, past and forecast).
API_URL = (
    "https://opendata.edf.fr/api/explore/v2.1/catalog/datasets/"
    "disponibilite-du-parc-nucleaire-d-edf-sa-present-passe-et-previsionnel"
    "/records"
)
def fetch_edf_data(timeout=30, page_size=100, max_records=9900):
    """
    Fetch maintenance planning data from the EDF open data API.

    The Opendatasoft Explore v2.1 ``/records`` endpoint only returns ``limit``
    records per call (10 when ``limit`` is omitted, at most 100). A single
    un-parameterised request would therefore silently truncate the dataset.
    Records are instead requested page by page with ``limit``/``offset`` until
    the dataset (``total_count``) or ``max_records`` is exhausted.

    Args:
        timeout: Seconds to wait for each HTTP request. Without it, a stalled
            server would hang the extractor indefinitely.
        page_size: Records requested per page. It is clamped to 1..100 because
            the API refuses larger pages.
        max_records: Upper bound on the number of records fetched. The API
            rejects requests whose ``offset + limit`` goes past about 10000,
            so the default stays below that.

    Returns:
        list: A list of maintenance events (raw API records). The list is
        empty if any request fails, the body is not valid JSON, or a response
        has no ``results`` key.
    """
    logger.info("Fetching data from EDF open data API")
    page_size = max(1, min(page_size, 100))
    records = []
    try:
        while len(records) < max_records:
            limit = min(page_size, max_records - len(records))
            response = requests.get(
                API_URL,
                params={"limit": limit, "offset": len(records)},
                timeout=timeout,
            )
            response.raise_for_status()  # Raise an exception for HTTP errors
            data = response.json()
            if not isinstance(data, dict) or 'results' not in data:
                logger.error("No results found in API response")
                return []
            page = data['results']
            records.extend(page)
            total = data.get('total_count')
            # Stop on a short/empty page, or once the whole dataset has been read.
            if len(page) < limit or (total is not None and len(records) >= total):
                break
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data from EDF API: {e}")
        return []
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON response: {e}")
        return []
    logger.success(f"Successfully fetched {len(records)} records from EDF API")
    return records
def create_event(record):
    """
    Create an event object from an EDF record.

    Args:
        record: A record (dict) from the EDF API.

    Returns:
        dict: A GeoJSON Feature representing the event, or None if the record
        has no start or end date or could not be converted.
    """
    try:
        # Extract the nuclear power plant name and unit
        site_name = record.get('site', 'Unknown Site')
        unit = record.get('unite', 'Unknown Unit')
        # Extract start and end dates.
        # `end_date` used to be read below while its assignment was commented
        # out, so every record raised NameError and was silently dropped.
        # NOTE(review): 'date_fin' is the field the original author intended;
        # confirm it exists in the dataset schema. Records lacking it are
        # skipped with a warning.
        start_date = record.get('date_et_heure_fuseau_horaire_europe_paris')
        end_date = record.get('date_fin')
        if not start_date or not end_date:
            logger.warning(f"Missing start or end date for {site_name} {unit}, skipping")
            return None
        # Extract power values
        power_available = record.get('puissance_disponible')
        power_max = record.get('puissance_maximale_possible')
        # The API may not provide coordinates, so every event is placed at a
        # placeholder location (approximate centre of France) until the site
        # names are geocoded.
        coordinates = [2.2137, 46.2276]
        return {
            "type": "Feature",
            "geometry": {
                "type": "Point",
                "coordinates": coordinates,
            },
            "properties": {
                "type": "scheduled",
                "what": "energy.maintenance.nuclear",
                "what:series": "EDF Nuclear Maintenance",
                "where": f"{site_name} - {unit}",
                "label": f"Nuclear Maintenance: {site_name} - {unit}",
                "start": start_date,
                "stop": end_date,
                "power_available": power_available,
                "power_max": power_max,
                "source": "EDF Open Data",
            },
        }
    except Exception as e:
        # Defensive: skip records with an unexpected shape (e.g. not a dict).
        logger.error(f"Error creating event from record: {e}")
        return None
def submit_event(event):
    """
    Submit an event to the OpenEventDatabase.

    The geometry is inserted into the ``geo`` table, keyed by the md5 of its
    WKT text. The event is then inserted into ``events`` and committed. If the
    insert hits a conflict, the function looks for an identical existing event
    (same what/when/geometry) and logs whether it was found.

    Args:
        event: A GeoJSON Feature representing the event.

    Returns:
        bool: True if a new event row was created and committed. False if it
        already existed, its geometry was invalid, or any error occurred.
    """
    db = None
    try:
        # Connect to the database
        db = db_connect()
        cur = db.cursor()
        # Extract event properties
        properties = event['properties']
        geometry = json.dumps(event['geometry'])
        # Insert the geometry into the geo table
        cur.execute("""
            INSERT INTO geo
            SELECT geom, md5(st_astext(geom)) as hash, st_centroid(geom) as geom_center FROM
            (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g
            WHERE ST_IsValid(geom)
            ON CONFLICT DO NOTHING RETURNING hash;
        """, (geometry,))
        hash_result = cur.fetchone()
        if hash_result is None:
            # Nothing was inserted: the geometry either already exists or is
            # invalid. Recompute the hash exactly like the INSERT above
            # (md5 of st_astext). st_asewkt would prefix "SRID=4326;" whenever
            # the parsed geometry carries an SRID (the ST_GeomFromGeoJSON
            # default since PostGIS 3), yielding a hash that matches no geo row.
            cur.execute("""
                SELECT md5(st_astext(geom)),
                       ST_IsValid(geom),
                       ST_IsValidReason(geom)
                FROM (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g;
            """, (geometry,))
            hash_result = cur.fetchone()
            if hash_result is None or not hash_result[1]:
                reason = hash_result[2] if hash_result is not None else None
                logger.error(f"Invalid geometry for event: {properties.get('label')} ({reason})")
                return False
        geo_hash = hash_result[0]
        # An instantaneous event (start == stop) needs a closed range to be non-empty
        bounds = '[]' if properties['start'] == properties['stop'] else '[)'
        # Insert the event into the database
        cur.execute("""
            INSERT INTO events (events_type, events_what, events_when, events_tags, events_geo)
            VALUES (%s, %s, tstzrange(%s, %s, %s), %s, %s)
            ON CONFLICT DO NOTHING RETURNING events_id;
        """, (
            properties['type'],
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            json.dumps(properties),
            geo_hash,
        ))
        event_id = cur.fetchone()
        if event_id:
            db.commit()
            logger.success(f"Event created with ID: {event_id[0]}")
            return True
        # Conflict: check whether the same event is already stored
        cur.execute("""
            SELECT events_id FROM events
            WHERE events_what = %s
            AND events_when = tstzrange(%s, %s, %s)
            AND events_geo = %s;
        """, (
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            geo_hash,
        ))
        existing_id = cur.fetchone()
        if existing_id:
            logger.info(f"Event already exists with ID: {existing_id[0]}")
        else:
            logger.warning(f"Failed to create event: {properties.get('label')}")
        return False
    except Exception as e:
        logger.error(f"Error submitting event: {e}")
        return False
    finally:
        # Always release the connection, including on errors. Closing without
        # a commit discards anything uncommitted in this transaction.
        if db is not None:
            db.close()
def main():
    """
    Run the extractor: download EDF records, convert each one and store it.

    Records that cannot be converted are skipped. The number of newly stored
    events is logged at the end; if nothing was fetched, a warning is logged
    instead.
    """
    logger.info("Starting EDF schedules extractor")

    records = fetch_edf_data()
    if records:
        # Lazily build events so each record is converted and then submitted in turn.
        events = (create_event(record) for record in records)
        success_count = sum(1 for event in events if event and submit_event(event))
        logger.success(f"Successfully added {success_count} out of {len(records)} events to the database")
    else:
        logger.warning("No records found, exiting")


if __name__ == "__main__":
    main()