"""
Quality Assurance resource for the OpenEventDatabase.
"""
import json
import falcon
import urllib.request
import urllib.error
from oedb.models.event import BaseEvent
from oedb.utils.logging import logger
class QualityAssuranceResource(BaseEvent):
"""
Resource for quality assurance checks on events.
Handles the /quality_assurance endpoint.
"""
def on_get(self, req, resp):
    """
    Handle GET requests to the /quality_assurance endpoint.

    Collects the problematic events through ``get_problematic_events`` and
    renders them as an HTML page through ``create_qa_html``.

    Args:
        req: The request object (not read by this handler).
        resp: The response object; its content type, body text and status
            are set here. On failure the status is HTTP 500 and the body
            carries the (HTML-escaped) error message.
    """
    # Local import: only this handler needs it. Imported by name because
    # ``html`` is also the natural name for page content.
    from html import escape

    logger.info("Processing GET request to /quality_assurance")
    try:
        # The interface is an HTML page.
        resp.content_type = 'text/html'
        problematic_events = self.get_problematic_events()
        page = self.create_qa_html(problematic_events)
        resp.text = page
        resp.status = falcon.HTTP_200
        logger.success("Successfully processed GET request to /quality_assurance")
    except Exception as e:
        logger.error(f"Error processing GET request to /quality_assurance: {e}")
        resp.status = falcon.HTTP_500
        # The body is served as text/html, so the exception text (which may
        # embed data taken from the processed events) must be escaped to
        # avoid injecting markup into the page.
        resp.text = f"Erreur: {escape(str(e))}"
def get_problematic_events(self):
    """
    Fetch events from the OEDB API and identify problematic ones.

    Every feature of the returned collection is normalised into a dict with
    the keys ``id``, ``properties``, ``geometry``, ``coordinates``,
    ``createdate``, ``lastupdate``, ``longitude`` and ``latitude``, then
    passed to ``check_coordinate_issues``, ``check_geometry_issues`` and
    ``check_property_issues``.

    Returns:
        list: One dict per event that has at least one issue; the issues
        are stored under the ``issues`` key. An empty list is returned when
        the response has no ``features`` or when fetching/decoding fails
        (the error is logged, not raised).
    """
    logger.info("Fetching events from OEDB API for quality assurance")
    try:
        api_url = "https://api.openeventdatabase.org/event?"
        # A timeout keeps a stalled upstream from blocking this request
        # handler forever.
        with urllib.request.urlopen(api_url, timeout=30) as response:
            data = json.loads(response.read().decode('utf-8'))

        if not data or 'features' not in data:
            logger.warning("No features found in API response")
            return []

        events = data['features']
        logger.info(f"Retrieved {len(events)} events from API")

        problematic_events = []
        for feature in events:
            # ``"properties": null`` / ``"geometry": null`` are valid JSON;
            # ``.get(key, {})`` would return None for them and the chained
            # ``.get`` would abort the whole analysis.
            properties = feature.get('properties') or {}
            geometry = feature.get('geometry')
            if isinstance(geometry, dict):
                coordinates = geometry.get('coordinates', [])
            else:
                coordinates = []

            event_data = {
                'id': properties.get('id'),
                'properties': properties,
                'geometry': geometry,
                'coordinates': coordinates,
                'createdate': properties.get('createdate'),
                'lastupdate': properties.get('lastupdate'),
                'longitude': None,
                'latitude': None,
            }

            # Non-Point geometries nest their positions in arrays; descend
            # to the first position so that float() is only ever applied to
            # numbers, not to lists.
            position = coordinates
            while (isinstance(position, (list, tuple)) and position
                   and isinstance(position[0], (list, tuple))):
                position = position[0]

            if isinstance(position, (list, tuple)) and len(position) >= 2:
                try:
                    longitude = float(position[0])
                    latitude = float(position[1])
                except (TypeError, ValueError):
                    # Unparseable values are reported as missing coordinates
                    # by the coordinate check instead of aborting the run.
                    pass
                else:
                    event_data['longitude'] = longitude
                    event_data['latitude'] = latitude

            issues = []
            issues.extend(self.check_coordinate_issues(event_data))
            issues.extend(self.check_geometry_issues(event_data))
            issues.extend(self.check_property_issues(event_data))

            # Only events with at least one issue are reported.
            if issues:
                event_data['issues'] = issues
                problematic_events.append(event_data)

        logger.info(f"Found {len(problematic_events)} problematic events out of {len(events)} total")
        return problematic_events
    except urllib.error.URLError as e:
        logger.error(f"Error fetching events from API: {e}")
        return []
    except json.JSONDecodeError as e:
        logger.error(f"Error decoding JSON response from API: {e}")
        return []
    except Exception as e:
        # Last-resort boundary: the page should still render (empty) rather
        # than fail on an unexpected payload.
        logger.error(f"Unexpected error fetching events: {e}")
        return []
def check_coordinate_issues(self, event_data):
    """
    Check for coordinate-related issues.

    Args:
        event_data: Dict holding ``longitude`` and ``latitude`` (numbers,
            or None when unavailable).

    Returns:
        list: Issue dicts (``type``, ``severity``, ``description``). Empty
        when both coordinates are present, not (0, 0) and within range.
    """
    issues = []
    longitude = event_data.get('longitude')
    latitude = event_data.get('latitude')

    if longitude is None or latitude is None:
        issues.append({
            'type': 'missing_coordinates',
            'severity': 'high',
            'description': 'Coordonnées manquantes (longitude ou latitude null)'
        })
        # Range checks need both values.
        return issues

    if longitude == 0 and latitude == 0:
        issues.append({
            'type': 'zero_coordinates',
            'severity': 'high',
            'description': 'Coordonnées nulles (0,0) - probablement invalides'
        })

    # Explicit ``is not None`` gating (above) instead of truthiness: a
    # coordinate of exactly 0 is valid and must not disable the range check
    # of the other axis. ``not (lo <= x <= hi)`` also flags NaN, which
    # ``abs(x) > hi`` would let through.
    if not (-180 <= longitude <= 180):
        issues.append({
            'type': 'invalid_longitude',
            'severity': 'high',
            'description': f'Longitude invalide: {longitude} (doit être entre -180 et 180)'
        })
    if not (-90 <= latitude <= 90):
        issues.append({
            'type': 'invalid_latitude',
            'severity': 'high',
            'description': f'Latitude invalide: {latitude} (doit être entre -90 et 90)'
        })
    return issues
def check_geometry_issues(self, event_data):
    """
    Check for geometry-related issues.

    At most one issue is reported, in this order of precedence: missing
    geometry, unknown geometry type, Point without coordinates.

    Args:
        event_data: Dict whose ``geometry`` entry is the event geometry.

    Returns:
        list: Zero or one issue dict (``type``, ``severity``,
        ``description``).
    """
    known_types = (
        'Point', 'LineString', 'Polygon',
        'MultiPoint', 'MultiLineString', 'MultiPolygon',
    )

    geom = event_data.get('geometry')
    if not geom:
        return [{
            'type': 'missing_geometry',
            'severity': 'high',
            'description': 'Géométrie manquante'
        }]

    geom_type = geom.get('type')
    if geom_type not in known_types:
        return [{
            'type': 'invalid_geometry_type',
            'severity': 'medium',
            'description': f'Type de géométrie invalide: {geom_type}'
        }]

    # Only Point geometries are checked for empty coordinates.
    if geom_type == 'Point' and not geom.get('coordinates'):
        return [{
            'type': 'empty_point_coordinates',
            'severity': 'high',
            'description': 'Point sans coordonnées'
        }]

    return []
def check_property_issues(self, event_data):
    """
    Check for property-related issues.

    Args:
        event_data: Dict whose ``properties`` entry is the event's property
            mapping; a missing or None entry is treated as empty.

    Returns:
        list: Issue dicts (``type``, ``severity``, ``description``) for a
        missing ``what``, a missing ``label``/``name``, and a description
        that is absent, null, or shorter than 5 characters once stripped.
    """
    issues = []
    # ``"properties": null`` must behave like an empty mapping rather than
    # raise on the ``.get`` calls below.
    properties = event_data.get('properties') or {}

    if not properties.get('what'):
        issues.append({
            'type': 'missing_what',
            'severity': 'medium',
            'description': 'Propriété "what" manquante (type d\'événement)'
        })
    if not properties.get('label') and not properties.get('name'):
        issues.append({
            'type': 'missing_label',
            'severity': 'low',
            'description': 'Aucun libellé ou nom pour l\'événement'
        })

    # A null description is as empty as a missing one, so both are flagged.
    # Non-string, non-null values are left alone.
    description = properties.get('description')
    is_blank = description is None or (
        isinstance(description, str) and len(description.strip()) < 5
    )
    if is_blank:
        issues.append({
            'type': 'short_description',
            'severity': 'low',
            'description': 'Description trop courte ou vide'
        })
    return issues
def create_qa_html(self, problematic_events):
"""Create HTML interface for quality assurance."""
# Count issues by type
issue_counts = {}
for event in problematic_events:
for issue in event['issues']:
issue_type = issue['type']
if issue_type not in issue_counts:
issue_counts[issue_type] = 0
issue_counts[issue_type] += 1
html = """
Contrôle Qualité - OpenEventDatabase
Contrôle Qualité des Événements
Analyse des 1000 derniers événements pour détecter les problèmes divers
Filtres
Total événements problématiques: """ + str(len(problematic_events)) + """
"""
for issue_type, count in sorted(issue_counts.items(), key=lambda x: x[1], reverse=True):
issue_label = issue_type.replace('_', ' ').title()
html += f"
{issue_label}: {count}
"
html += """
"""
# Add events
for event in problematic_events:
properties = event.get('properties', {})
event_title = properties.get('label', properties.get('name', f'Événement #{event["id"]}'))
event_what = properties.get('what', 'Non spécifié')
# Get severity classes for the card
max_severity = 'low'
for issue in event['issues']:
if issue['severity'] == 'high':
max_severity = 'high'
break
elif issue['severity'] == 'medium' and max_severity == 'low':
max_severity = 'medium'
html += f"""