#!/usr/bin/env python3
"""
EDF Schedules Extractor for the OpenEventDatabase.

This script fetches nuclear power plant maintenance schedules from the EDF
open data API and adds them to the OpenEventDatabase.

API URL: https://opendata.edf.fr/api/explore/v2.1/catalog/datasets/disponibilite-du-parc-nucleaire-d-edf-sa-present-passe-et-previsionnel/records
"""

import json
import requests
import datetime
import logging
import sys
import os

# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from oedb.utils.db import db_connect
from oedb.utils.logging import logger

# API URL for EDF open data
API_URL = "https://opendata.edf.fr/api/explore/v2.1/catalog/datasets/disponibilite-du-parc-nucleaire-d-edf-sa-present-passe-et-previsionnel/records"

# Seconds to wait for the EDF API before giving up; without a timeout
# requests.get() can block forever on a stalled connection.
REQUEST_TIMEOUT = 30


def fetch_edf_data():
    """
    Fetch maintenance planning data from the EDF open data API.

    A single GET request is issued against API_URL. Network errors, HTTP
    error statuses, timeouts and malformed JSON are logged and reported as
    an empty result instead of being raised.

    Returns:
        list: The value of the 'results' key of the API response, or an
        empty list when the request fails or the key is absent.
    """
    logger.info("Fetching data from EDF open data API")

    # NOTE(review): only the records returned by this single request are
    # processed; confirm whether the API paginates and more pages are needed.
    try:
        response = requests.get(API_URL, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()  # Raise an exception for HTTP errors
        data = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching data from EDF API: {e}")
        return []
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON response: {e}")
        return []

    # A valid JSON body is not necessarily an object; guard before the lookup.
    if not isinstance(data, dict) or 'results' not in data:
        logger.error("No results found in API response")
        return []

    logger.success(f"Successfully fetched {len(data['results'])} records from EDF API")
    return data['results']


def create_event(record):
    """
    Create an event object from an EDF record.

    Args:
        record: A record (dict) from the EDF API.

    Returns:
        dict: A GeoJSON Feature representing the event, or None when the
        record is not a dict or carries no start date.
    """
    if not isinstance(record, dict):
        logger.error(
            f"Error creating event from record: expected a dict, got {type(record).__name__}"
        )
        return None

    # Extract the nuclear power plant name and unit
    site_name = record.get('site', 'Unknown Site')
    unit = record.get('unite', 'Unknown Unit')

    # Extract start and end dates.
    start_date = record.get('date_et_heure_fuseau_horaire_europe_paris')
    if not start_date:
        logger.warning(f"Missing start date for {site_name} {unit}, skipping")
        return None

    # NOTE(review): the records appear to be point-in-time samples without a
    # dedicated end field. 'date_fin' is used when present; otherwise the
    # event is treated as instantaneous (stop == start), which submit_event
    # stores as a closed '[]' range. Confirm against the dataset schema.
    end_date = record.get('date_fin') or start_date

    # Extract power values
    power_available = record.get('puissance_disponible')
    power_max = record.get('puissance_maximale_possible')

    # The record is not read for coordinates here, so a fixed placeholder is
    # used until site names are geocoded.
    coordinates = [2.2137, 46.2276]  # Center of France

    return {
        "type": "Feature",
        "geometry": {
            "type": "Point",
            "coordinates": coordinates,
        },
        "properties": {
            "type": "scheduled",
            "what": "energy.maintenance.nuclear",
            "what:series": "EDF Nuclear Maintenance",
            "where": f"{site_name} - {unit}",
            "label": f"Nuclear Maintenance: {site_name} - {unit}",
            "start": start_date,
            "stop": end_date,
            "power_available": power_available,
            "power_max": power_max,
            "source": "EDF Open Data",
        },
    }


def _resolve_geometry_hash(cur, geometry):
    """
    Store a GeoJSON geometry in the geo table and return its hash.

    Args:
        cur: An open database cursor.
        geometry: The geometry serialized as a GeoJSON string.

    Returns:
        The md5 hash identifying the geometry, or None when the database
        reports the geometry as invalid.
    """
    cur.execute("""
        INSERT INTO geo
        SELECT geom, md5(st_astext(geom)) as hash, st_centroid(geom) as geom_center
        FROM (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g
        WHERE ST_IsValid(geom)
        ON CONFLICT DO NOTHING
        RETURNING hash;
    """, (geometry,))
    inserted = cur.fetchone()
    if inserted is not None:
        return inserted[0]

    # Nothing was returned: either the geometry already exists (conflict) or
    # it is invalid. Compute the hash directly and check validity.
    cur.execute("""
        SELECT md5(st_asewkt(geom)),
               ST_IsValid(geom),
               ST_IsValidReason(geom) from (SELECT st_geomfromgeojson(%s) as geom) as g;
    """, (geometry,))
    computed = cur.fetchone()
    if computed is None or not computed[1]:
        return None
    return computed[0]


def submit_event(event):
    """
    Submit an event to the OpenEventDatabase.

    The connection is always closed before returning, including when an
    error occurs part-way through. The transaction is committed only when a
    new event row was inserted.

    Args:
        event: A GeoJSON Feature representing the event.

    Returns:
        bool: True if a new event was inserted, False otherwise (invalid
        geometry, event already present, or any error).
    """
    db = None
    try:
        # Connect to the database
        db = db_connect()
        cur = db.cursor()

        # Extract event properties
        properties = event['properties']
        geometry = json.dumps(event['geometry'])

        geo_hash = _resolve_geometry_hash(cur, geometry)
        if geo_hash is None:
            logger.error(f"Invalid geometry for event: {properties.get('label')}")
            return False

        # An instantaneous event needs an inclusive upper bound, otherwise
        # the resulting range would be empty.
        bounds = '[]' if properties['start'] == properties['stop'] else '[)'

        # Insert the event into the database
        cur.execute("""
            INSERT INTO events (events_type, events_what, events_when, events_tags, events_geo)
            VALUES (%s, %s, tstzrange(%s, %s, %s), %s, %s)
            ON CONFLICT DO NOTHING
            RETURNING events_id;
        """, (
            properties['type'],
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            json.dumps(properties),
            geo_hash,
        ))
        event_id = cur.fetchone()
        if event_id:
            logger.success(f"Event created with ID: {event_id[0]}")
            db.commit()
            return True

        # No row was inserted: check if the event already exists
        cur.execute("""
            SELECT events_id FROM events
            WHERE events_what = %s
            AND events_when = tstzrange(%s, %s, %s)
            AND events_geo = %s;
        """, (
            properties['what'],
            properties['start'],
            properties['stop'],
            bounds,
            geo_hash,
        ))
        existing_id = cur.fetchone()
        if existing_id:
            logger.info(f"Event already exists with ID: {existing_id[0]}")
        else:
            logger.warning(f"Failed to create event: {properties.get('label')}")
        return False
    except Exception as e:
        # Top-level boundary for one event: log and report failure so the
        # remaining records can still be processed by the caller.
        logger.error(f"Error submitting event: {e}")
        return False
    finally:
        # Release the connection on every path; previously it leaked when an
        # exception was raised after connecting.
        if db is not None:
            try:
                db.close()
            except Exception as e:
                logger.warning(f"Error closing database connection: {e}")


def main():
    """
    Main function to fetch EDF data and add events to the database.

    Records that cannot be converted into an event are skipped; the final
    log line reports how many events were newly inserted.
    """
    logger.info("Starting EDF schedules extractor")

    # Fetch data from the EDF API
    records = fetch_edf_data()
    if not records:
        logger.warning("No records found, exiting")
        return

    # Process each record
    success_count = 0
    for record in records:
        # Create an event from the record
        event = create_event(record)
        if not event:
            continue

        # Submit the event to the database
        if submit_event(event):
            success_count += 1

    logger.success(f"Successfully added {success_count} out of {len(records)} events to the database")


if __name__ == "__main__":
    main()