""" Traffic jam reporting resource for the OpenEventDatabase. """ import falcon import os import requests from oedb.utils.logging import logger from oedb.utils.db import load_env_from_file class DemoTrafficResource: """ Resource for the traffic jam reporting page. Handles the /demo/traffic endpoint. """ def on_get(self, req, resp): """ Handle GET requests to the /demo/traffic endpoint. Returns an HTML page with a form for reporting traffic jams. Args: req: The request object. resp: The response object. """ logger.info("Processing GET request to /demo/traffic") try: # Set content type to HTML resp.content_type = 'text/html' # Load environment variables from .env file load_env_from_file() # Get OAuth2 configuration parameters client_id = os.getenv("CLIENT_ID", "") client_secret = os.getenv("CLIENT_SECRET", "") client_authorizations = os.getenv("CLIENT_AUTORIZATIONS", "read_prefs") client_redirect = os.getenv("CLIENT_REDIRECT", "") # Check if we have an authorization code in the query parameters auth_code = req.params.get('code', None) auth_state = req.params.get('state', None) # Variables to track authentication state is_authenticated = False osm_username = "" osm_user_id = "" # If we have an authorization code, exchange it for an access token if auth_code: logger.info(f"Received authorization code: {auth_code}") try: # Exchange authorization code for access token token_url = "https://www.openstreetmap.org/oauth2/token" token_data = { "grant_type": "authorization_code", "code": auth_code, "redirect_uri": client_redirect, "client_id": client_id, "client_secret": client_secret } token_response = requests.post(token_url, data=token_data) token_response.raise_for_status() token_info = token_response.json() access_token = token_info.get("access_token") if access_token: # Use access token to get user information user_url = "https://api.openstreetmap.org/api/0.6/user/details.json" headers = {"Authorization": f"Bearer {access_token}"} user_response = requests.get(user_url, headers=headers) user_response.raise_for_status() user_info = user_response.json() # Extract user information user = user_info.get("user", {}) osm_username = user.get("display_name", "") osm_user_id = user.get("id", "") if osm_username: is_authenticated = True logger.info(f"User authenticated: {osm_username} (ID: {osm_user_id})") else: logger.error("Failed to get OSM username from user details") else: logger.error("Failed to get access token from token response") except Exception as e: logger.error(f"Error during OAuth2 token exchange: {e}") # Create HTML response with form # Start with the common HTML header html_header = f""" Report Traffic Jam - OpenEventDatabase

Report Road Issue

OpenStreetMap Authentication

""" # Add authentication section based on authentication status if is_authenticated: auth_section = f"""

Logged in as {osm_username}

View OSM Profile

""" else: auth_section = f"""

Authenticate with your OpenStreetMap account to include your username in the traffic report.

""" # Add the rest of the HTML template html_footer = """

Select Issue Type

Pothole
Obstacle
Vehicle on Side
Danger
Click on the map to set the issue location or use the "Get My Current Location" button
View All Saved Events on Map
""" # Concatenate the HTML parts to form the complete template html = html_header + auth_section + html_footer # Set the response body and status resp.text = html resp.status = falcon.HTTP_200 logger.success("Successfully processed GET request to /demo/traffic") except Exception as e: logger.error(f"Error processing GET request to /demo/traffic: {e}") resp.status = falcon.HTTP_500 resp.text = f"Error: {str(e)}" # Create a global instance of DemoTrafficResource demo_traffic = DemoTrafficResource()