""" Quality Assurance resource for the OpenEventDatabase. """ import json import falcon import urllib.request import urllib.error from oedb.models.event import BaseEvent from oedb.utils.logging import logger class QualityAssuranceResource(BaseEvent): """ Resource for quality assurance checks on events. Handles the /quality_assurance endpoint. """ def on_get(self, req, resp): """ Handle GET requests to the /quality_assurance endpoint. Lists problematic events from the last 1000 events. Args: req: The request object. resp: The response object. """ logger.info("Processing GET request to /quality_assurance") try: # Set content type to HTML for the interface resp.content_type = 'text/html' # Get problematic events problematic_events = self.get_problematic_events() # Create HTML response with filtering capabilities html = self.create_qa_html(problematic_events) resp.text = html resp.status = falcon.HTTP_200 logger.success("Successfully processed GET request to /quality_assurance") except Exception as e: logger.error(f"Error processing GET request to /quality_assurance: {e}") resp.status = falcon.HTTP_500 resp.text = f"Erreur: {str(e)}" def get_problematic_events(self): """ Get events from the OEDB API and identify problematic ones. Returns: list: List of problematic events with their issues. """ logger.info("Fetching events from OEDB API for quality assurance") try: # Fetch events from the OEDB API api_url = "https://api.openeventdatabase.org/event?" with urllib.request.urlopen(api_url) as response: data = json.loads(response.read().decode('utf-8')) if not data or 'features' not in data: logger.warning("No features found in API response") return [] events = data['features'] logger.info(f"Retrieved {len(events)} events from API") # Analyze events for problems problematic_events = [] for feature in events: issues = [] # Extract event data event_data = { 'id': feature.get('properties', {}).get('id'), 'properties': feature.get('properties', {}), 'geometry': feature.get('geometry'), 'coordinates': feature.get('geometry', {}).get('coordinates', []), 'createdate': feature.get('properties', {}).get('createdate'), 'lastupdate': feature.get('properties', {}).get('lastupdate') } # Extract coordinates if event_data['coordinates'] and len(event_data['coordinates']) >= 2: event_data['longitude'] = float(event_data['coordinates'][0]) event_data['latitude'] = float(event_data['coordinates'][1]) else: event_data['longitude'] = None event_data['latitude'] = None # Check for various issues issues.extend(self.check_coordinate_issues(event_data)) issues.extend(self.check_geometry_issues(event_data)) issues.extend(self.check_property_issues(event_data)) # Only add to list if there are issues if issues: event_data['issues'] = issues problematic_events.append(event_data) logger.info(f"Found {len(problematic_events)} problematic events out of {len(events)} total") return problematic_events except urllib.error.URLError as e: logger.error(f"Error fetching events from API: {e}") return [] except json.JSONDecodeError as e: logger.error(f"Error decoding JSON response from API: {e}") return [] except Exception as e: logger.error(f"Unexpected error fetching events: {e}") return [] def check_coordinate_issues(self, event_data): """Check for coordinate-related issues.""" issues = [] # Check for null or zero coordinates if event_data['longitude'] is None or event_data['latitude'] is None: issues.append({ 'type': 'missing_coordinates', 'severity': 'high', 'description': 'Coordonnées manquantes (longitude ou latitude null)' }) elif event_data['longitude'] == 0 and event_data['latitude'] == 0: issues.append({ 'type': 'zero_coordinates', 'severity': 'high', 'description': 'Coordonnées nulles (0,0) - probablement invalides' }) # Check for unrealistic coordinates if event_data['longitude'] and event_data['latitude']: if abs(event_data['longitude']) > 180: issues.append({ 'type': 'invalid_longitude', 'severity': 'high', 'description': f'Longitude invalide: {event_data["longitude"]} (doit être entre -180 et 180)' }) if abs(event_data['latitude']) > 90: issues.append({ 'type': 'invalid_latitude', 'severity': 'high', 'description': f'Latitude invalide: {event_data["latitude"]} (doit être entre -90 et 90)' }) return issues def check_geometry_issues(self, event_data): """Check for geometry-related issues.""" issues = [] geometry = event_data.get('geometry') if not geometry: issues.append({ 'type': 'missing_geometry', 'severity': 'high', 'description': 'Géométrie manquante' }) elif geometry.get('type') not in ['Point', 'LineString', 'Polygon', 'MultiPoint', 'MultiLineString', 'MultiPolygon']: issues.append({ 'type': 'invalid_geometry_type', 'severity': 'medium', 'description': f'Type de géométrie invalide: {geometry.get("type")}' }) elif geometry.get('type') == 'Point' and not geometry.get('coordinates'): issues.append({ 'type': 'empty_point_coordinates', 'severity': 'high', 'description': 'Point sans coordonnées' }) return issues def check_property_issues(self, event_data): """Check for property-related issues.""" issues = [] properties = event_data.get('properties', {}) # Check for missing essential properties if not properties.get('what'): issues.append({ 'type': 'missing_what', 'severity': 'medium', 'description': 'Propriété "what" manquante (type d\'événement)' }) if not properties.get('label') and not properties.get('name'): issues.append({ 'type': 'missing_label', 'severity': 'low', 'description': 'Aucun libellé ou nom pour l\'événement' }) # Check for very short or empty descriptions description = properties.get('description', '') if isinstance(description, str) and len(description.strip()) < 5: issues.append({ 'type': 'short_description', 'severity': 'low', 'description': 'Description trop courte ou vide' }) return issues def create_qa_html(self, problematic_events): """Create HTML interface for quality assurance.""" # Count issues by type issue_counts = {} for event in problematic_events: for issue in event['issues']: issue_type = issue['type'] if issue_type not in issue_counts: issue_counts[issue_type] = 0 issue_counts[issue_type] += 1 html = """ Contrôle Qualité - OpenEventDatabase

Contrôle Qualité des Événements

Analyse des 1000 derniers événements pour détecter les problèmes divers

Filtres

Total événements problématiques: """ + str(len(problematic_events)) + """

""" for issue_type, count in sorted(issue_counts.items(), key=lambda x: x[1], reverse=True): issue_label = issue_type.replace('_', ' ').title() html += f"

{issue_label}: {count}

" html += """
""" # Add events for event in problematic_events: properties = event.get('properties', {}) event_title = properties.get('label', properties.get('name', f'Événement #{event["id"]}')) event_what = properties.get('what', 'Non spécifié') # Get severity classes for the card max_severity = 'low' for issue in event['issues']: if issue['severity'] == 'high': max_severity = 'high' break elif issue['severity'] == 'medium' and max_severity == 'low': max_severity = 'medium' html += f"""

{event_title}

Type: {event_what} | Coordonnées: {event.get('latitude', 'N/A')}, {event.get('longitude', 'N/A')}

Problèmes détectés:
    """ for issue in event['issues']: severity_icon = { 'high': '🔴', 'medium': '🟡', 'low': '🟢' }.get(issue['severity'], '⚪') html += f"""
  • {severity_icon} {issue['type'].replace('_', ' ').title()}: {issue['description']}
  • """ html += """
Créé: """ + str(event.get('createdate', 'N/A')) + """ | Modifié: """ + str(event.get('lastupdate', 'N/A')) + """
""" if not problematic_events: html += """
Aucun problème détecté dans les 1000 derniers événements ! 🎉
""" html += """
""" return html # Create a global instance quality_assurance = QualityAssuranceResource()