This commit is contained in:
Tykayn 2025-09-18 23:43:06 +02:00 committed by tykayn
parent 153e9bd8f9
commit 6d755ee8dc
8 changed files with 1058 additions and 17 deletions

View file

@ -284,6 +284,7 @@ def event_exists(db, properties):
Returns:
bool: True if the event exists, False otherwise.
"""
print('event: ', properties)
try:
cur = db.cursor()
@ -348,6 +349,7 @@ def submit_event(event):
cur = db.cursor()
geometry = json.dumps(event['geometry'])
print('event: ', event)
# Insert the geometry into the geo table
cur.execute("""
INSERT INTO geo
@ -361,20 +363,56 @@ def submit_event(event):
hash_result = cur.fetchone()
if hash_result is None:
# If the hash is None, get it from the database
# If the hash is None, check if the geometry already exists in the database
cur.execute("""
SELECT md5(st_asewkt(geom)),
ST_IsValid(geom),
ST_IsValidReason(geom) from (SELECT st_geomfromgeojson(%s) as geom) as g;
SELECT hash FROM geo
WHERE hash = md5(st_astext(st_setsrid(st_geomfromgeojson(%s),4326)));
""", (geometry,))
hash_result = cur.fetchone()
if hash_result is None or (len(hash_result) > 1 and not hash_result[1]):
logger.error(f"Invalid geometry for event: {properties.get('label')}")
db.close()
return False
geo_hash = hash_result[0]
existing_hash = cur.fetchone()
if existing_hash:
# Geometry already exists in the database, use its hash
geo_hash = existing_hash[0]
logger.info(f"Using existing geometry with hash: {geo_hash}")
else:
# Geometry doesn't exist, try to insert it directly
cur.execute("""
SELECT md5(st_astext(geom)) as hash,
ST_IsValid(geom),
ST_IsValidReason(geom) from (SELECT st_setsrid(st_geomfromgeojson(%s),4326) as geom) as g;
""", (geometry,))
hash_result = cur.fetchone()
if hash_result is None or not hash_result[1]:
logger.error(f"Invalid geometry for event: {properties.get('label')}")
if hash_result and len(hash_result) > 2:
logger.error(f"Reason: {hash_result[2]}")
db.close()
return False
geo_hash = hash_result[0]
# Now insert the geometry explicitly
cur.execute("""
INSERT INTO geo (geom, hash, geom_center)
VALUES (
st_setsrid(st_geomfromgeojson(%s),4326),
%s,
st_centroid(st_setsrid(st_geomfromgeojson(%s),4326))
)
ON CONFLICT (hash) DO NOTHING;
""", (geometry, geo_hash, geometry))
# Verify the geometry was inserted
cur.execute("SELECT 1 FROM geo WHERE hash = %s", (geo_hash,))
if cur.fetchone() is None:
logger.error(f"Failed to insert geometry with hash: {geo_hash}")
db.close()
return False
logger.info(f"Inserted new geometry with hash: {geo_hash}")
else:
geo_hash = hash_result[0]
# Determine the bounds for the time range
bounds = '[]' if properties['start'] == properties['stop'] else '[)'

View file

@ -505,6 +505,7 @@ class DemoResource:
</ul>
<h3>Demo Pages:</h3>
<ul>
<li><a href="/demo/search" target="_blank">/demo/search - Advanced Search</a></li>
<li><a href="/demo/by-what" target="_blank">/demo/by-what - Events by Type</a></li>
<li><a href="/demo/map-by-what" target="_blank">/demo/map-by-what - Map by Event Type</a></li>
<li><a href="/event?what=music" target="_blank">Search Music Events</a></li>
@ -958,6 +959,771 @@ class DemoResource:
resp.status = falcon.HTTP_500
resp.text = f"Error: {str(e)}"
def on_get_search(self, req, resp):
"""
Handle GET requests to the /demo/search endpoint.
Returns an HTML page with a form for searching events and displaying results.
Args:
req: The request object.
resp: The response object.
"""
logger.info("Processing GET request to /demo/search")
try:
# Set content type to HTML
resp.content_type = 'text/html'
# Create HTML response with search form
html = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Search Events - OpenEventDatabase</title>
<script src="https://unpkg.com/maplibre-gl@3.0.0/dist/maplibre-gl.js"></script>
<link href="https://unpkg.com/maplibre-gl@3.0.0/dist/maplibre-gl.css" rel="stylesheet" />
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bulma@0.9.4/css/bulma.min.css">
<script defer src="https://use.fontawesome.com/releases/v5.15.4/js/all.js"></script>
<style>
body {
margin: 0;
padding: 20px;
font-family: Arial, sans-serif;
background-color: #f5f5f5;
}
.container {
max-width: 1200px;
margin: 0 auto;
background-color: white;
padding: 20px;
border-radius: 5px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
}
h1 {
margin-top: 0;
color: #333;
}
.nav-links {
margin-bottom: 20px;
}
.nav-links a {
color: #0078ff;
text-decoration: none;
margin-right: 15px;
}
.nav-links a:hover {
text-decoration: underline;
}
.tabs-container {
margin-top: 20px;
}
.tab-content {
display: none;
padding: 20px;
border: 1px solid #ddd;
border-top: none;
}
.tab-content.active {
display: block;
}
.tab-buttons {
display: flex;
border-bottom: 1px solid #ddd;
}
.tab-button {
padding: 10px 20px;
background-color: #f1f1f1;
border: 1px solid #ddd;
border-bottom: none;
cursor: pointer;
margin-right: 5px;
}
.tab-button.active {
background-color: white;
border-bottom: 1px solid white;
margin-bottom: -1px;
}
#map {
width: 100%;
height: 500px;
margin-top: 20px;
border-radius: 4px;
}
.results-table {
width: 100%;
border-collapse: collapse;
margin-top: 20px;
}
.results-table th, .results-table td {
padding: 8px;
text-align: left;
border-bottom: 1px solid #ddd;
}
.results-table th {
background-color: #f2f2f2;
}
.download-buttons {
margin-top: 20px;
text-align: right;
}
.download-button {
display: inline-block;
padding: 8px 16px;
background-color: #0078ff;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
margin-left: 10px;
text-decoration: none;
}
.download-button:hover {
background-color: #0056b3;
}
.form-row {
display: flex;
gap: 15px;
margin-bottom: 15px;
}
.form-group {
flex: 1;
}
label {
display: block;
margin-bottom: 5px;
font-weight: bold;
}
input[type="text"],
input[type="datetime-local"],
select,
textarea {
width: 100%;
padding: 8px;
border: 1px solid #ddd;
border-radius: 4px;
box-sizing: border-box;
font-size: 14px;
}
.note {
font-size: 12px;
color: #666;
margin-top: 5px;
}
button {
background-color: #0078ff;
color: white;
border: none;
padding: 10px 15px;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
}
button:hover {
background-color: #0056b3;
}
#result {
margin-top: 20px;
padding: 10px;
border-radius: 4px;
display: none;
}
#result.success {
background-color: #d4edda;
border: 1px solid #c3e6cb;
color: #155724;
}
#result.error {
background-color: #f8d7da;
border: 1px solid #f5c6cb;
color: #721c24;
}
</style>
</head>
<body>
<div class="container">
<div class="nav-links">
<a href="/demo">&larr; Back to Map</a>
<a href="/">API Information</a>
<a href="/event">View Events</a>
</div>
<h1>Search Events</h1>
<form id="searchForm">
<div class="form-row">
<div class="form-group">
<label for="what">Event Type</label>
<input type="text" id="what" name="what" placeholder="e.g., sport.match.football">
<div class="note">Category of the event (e.g., sport.match.football, culture.festival)</div>
</div>
<div class="form-group">
<label for="type">Event Type</label>
<select id="type" name="type">
<option value="">Any</option>
<option value="scheduled">Scheduled</option>
<option value="forecast">Forecast</option>
<option value="unscheduled">Unscheduled</option>
</select>
</div>
</div>
<div class="form-row">
<div class="form-group">
<label for="when">When</label>
<select id="when" name="when">
<option value="now">Now</option>
<option value="today">Today</option>
<option value="yesterday">Yesterday</option>
<option value="tomorrow">Tomorrow</option>
<option value="lasthour">Last Hour</option>
<option value="nexthour">Next Hour</option>
<option value="last7days">Last 7 Days</option>
<option value="next7days">Next 7 Days</option>
<option value="last30days">Last 30 Days</option>
<option value="next30days">Next 30 Days</option>
<option value="custom">Custom Range</option>
</select>
</div>
<div class="form-group" id="customDateGroup" style="display: none;">
<label for="start">Start Date</label>
<input type="datetime-local" id="start" name="start">
</div>
<div class="form-group" id="customDateEndGroup" style="display: none;">
<label for="stop">End Date</label>
<input type="datetime-local" id="stop" name="stop">
</div>
</div>
<div class="form-row">
<div class="form-group">
<label for="near">Near (Longitude, Latitude, Distance in meters)</label>
<input type="text" id="near" name="near" placeholder="e.g., 2.3522,48.8566,10000">
<div class="note">Search for events near a specific location (e.g., 2.3522,48.8566,10000 for events within 10km of Paris)</div>
</div>
<div class="form-group">
<label for="bbox">Bounding Box (East, South, West, North)</label>
<input type="text" id="bbox" name="bbox" placeholder="e.g., -5.0,41.0,10.0,52.0">
<div class="note">Search for events within a geographic bounding box</div>
</div>
</div>
<div class="form-row">
<div class="form-group">
<label for="where_osm">OpenStreetMap ID</label>
<input type="text" id="where_osm" name="where:osm" placeholder="e.g., R12345">
<div class="note">Search for events associated with a specific OpenStreetMap ID</div>
</div>
<div class="form-group">
<label for="where_wikidata">Wikidata ID</label>
<input type="text" id="where_wikidata" name="where:wikidata" placeholder="e.g., Q90">
<div class="note">Search for events associated with a specific Wikidata ID</div>
</div>
</div>
<div class="form-row">
<div class="form-group">
<label for="limit">Result Limit</label>
<input type="number" id="limit" name="limit" value="200" min="1" max="1000">
<div class="note">Maximum number of results to return (default: 200)</div>
</div>
<div class="form-group">
<label for="geom">Geometry Detail</label>
<select id="geom" name="geom">
<option value="">Default (Centroid)</option>
<option value="full">Full Geometry</option>
<option value="only">Geometry Only</option>
<option value="0.01">Simplified (0.01)</option>
</select>
<div class="note">Controls the level of detail in the geometry portion of the response</div>
</div>
</div>
<div class="form-group">
<label>Search Area (Draw on Map)</label>
<div id="map"></div>
<div class="note">Draw a polygon on the map to define the search area, or use the form fields above</div>
</div>
<button type="submit">Search Events</button>
</form>
<div id="result"></div>
<div id="resultsContainer" style="display: none;">
<h2>Search Results</h2>
<div class="tabs-container">
<div class="tab-buttons">
<div class="tab-button active" data-tab="map-tab">Map View</div>
<div class="tab-button" data-tab="table-tab">Table View</div>
</div>
<div id="map-tab" class="tab-content active">
<div id="resultsMap" style="width: 100%; height: 500px;"></div>
</div>
<div id="table-tab" class="tab-content">
<table class="results-table" id="resultsTable">
<thead>
<tr>
<th>ID</th>
<th>Label</th>
<th>Type</th>
<th>What</th>
<th>Where</th>
<th>Start</th>
<th>Stop</th>
</tr>
</thead>
<tbody>
<!-- Results will be added here dynamically -->
</tbody>
</table>
</div>
</div>
<div class="download-buttons">
<button id="downloadCsv" class="download-button">Download CSV</button>
<button id="downloadJson" class="download-button">Download JSON</button>
</div>
</div>
</div>
<script>
// Initialize the map
const map = new maplibregl.Map({
container: 'map',
style: 'https://demotiles.maplibre.org/style.json',
center: [2.3522, 48.8566], // Default center (Paris)
zoom: 4
});
// Add navigation controls
map.addControl(new maplibregl.NavigationControl());
// Add draw controls for polygon
let drawnPolygon = null;
let drawingMode = false;
let points = [];
let lineString = null;
let polygonFill = null;
// Add a button to toggle drawing mode
const drawButton = document.createElement('button');
drawButton.textContent = 'Draw Polygon';
drawButton.style.position = 'absolute';
drawButton.style.top = '10px';
drawButton.style.right = '10px';
drawButton.style.zIndex = '1';
drawButton.style.padding = '5px 10px';
drawButton.style.backgroundColor = '#0078ff';
drawButton.style.color = 'white';
drawButton.style.border = 'none';
drawButton.style.borderRadius = '3px';
drawButton.style.cursor = 'pointer';
document.getElementById('map').appendChild(drawButton);
drawButton.addEventListener('click', () => {
drawingMode = !drawingMode;
drawButton.textContent = drawingMode ? 'Cancel Drawing' : 'Draw Polygon';
if (!drawingMode) {
// Clear the drawing
points = [];
if (lineString) {
map.removeLayer('line-string');
map.removeSource('line-string');
lineString = null;
}
if (polygonFill) {
map.removeLayer('polygon-fill');
map.removeSource('polygon-fill');
polygonFill = null;
}
}
});
// Handle map click events for drawing
map.on('click', (e) => {
if (!drawingMode) return;
const coords = [e.lngLat.lng, e.lngLat.lat];
points.push(coords);
// If we have at least 3 points, create a polygon
if (points.length >= 3) {
const polygonCoords = [...points, points[0]]; // Close the polygon
// Create or update the line string
if (lineString) {
map.removeLayer('line-string');
map.removeSource('line-string');
}
lineString = {
type: 'Feature',
geometry: {
type: 'LineString',
coordinates: polygonCoords
}
};
map.addSource('line-string', {
type: 'geojson',
data: lineString
});
map.addLayer({
id: 'line-string',
type: 'line',
source: 'line-string',
paint: {
'line-color': '#0078ff',
'line-width': 2
}
});
// Create or update the polygon fill
if (polygonFill) {
map.removeLayer('polygon-fill');
map.removeSource('polygon-fill');
}
polygonFill = {
type: 'Feature',
geometry: {
type: 'Polygon',
coordinates: [polygonCoords]
}
};
map.addSource('polygon-fill', {
type: 'geojson',
data: polygonFill
});
map.addLayer({
id: 'polygon-fill',
type: 'fill',
source: 'polygon-fill',
paint: {
'fill-color': '#0078ff',
'fill-opacity': 0.2
}
});
// Store the drawn polygon for search
drawnPolygon = {
type: 'Polygon',
coordinates: [polygonCoords]
};
}
});
// Handle custom date range selection
document.getElementById('when').addEventListener('change', function() {
const customDateGroup = document.getElementById('customDateGroup');
const customDateEndGroup = document.getElementById('customDateEndGroup');
if (this.value === 'custom') {
customDateGroup.style.display = 'block';
customDateEndGroup.style.display = 'block';
} else {
customDateGroup.style.display = 'none';
customDateEndGroup.style.display = 'none';
}
});
// Handle form submission
document.getElementById('searchForm').addEventListener('submit', function(e) {
e.preventDefault();
// Show loading message
const resultElement = document.getElementById('result');
resultElement.textContent = 'Searching...';
resultElement.className = '';
resultElement.style.display = 'block';
// Get form values
const formData = new FormData(this);
const params = new URLSearchParams();
// Add form fields to params
for (const [key, value] of formData.entries()) {
if (value) {
params.append(key, value);
}
}
// Handle custom date range
if (formData.get('when') === 'custom') {
params.delete('when');
} else {
params.delete('start');
params.delete('stop');
}
// Prepare the request
let url = '/event/search';
let method = 'POST';
let body = null;
// If we have a drawn polygon, use it for the search
if (drawnPolygon) {
body = JSON.stringify({
geometry: drawnPolygon
});
} else if (formData.get('near') || formData.get('bbox')) {
// If we have near or bbox parameters, use GET request
url = '/event?' + params.toString();
method = 'GET';
} else {
// Default to a simple point search in Paris if no spatial filter is provided
body = JSON.stringify({
geometry: {
type: 'Point',
coordinates: [2.3522, 48.8566]
}
});
}
// Make the request
fetch(url + (method === 'GET' ? '' : '?' + params.toString()), {
method: method,
headers: {
'Content-Type': 'application/json'
},
body: method === 'POST' ? body : null
})
.then(response => {
if (response.ok) {
return response.json();
} else {
return response.text().then(text => {
throw new Error(text || response.statusText);
});
}
})
.then(data => {
// Show success message
resultElement.textContent = `Found ${data.features ? data.features.length : 0} events`;
resultElement.className = 'success';
// Display results
displayResults(data);
})
.catch(error => {
// Show error message
resultElement.textContent = `Error: ${error.message}`;
resultElement.className = 'error';
// Hide results container
document.getElementById('resultsContainer').style.display = 'none';
});
});
// Function to display search results
function displayResults(data) {
// Show results container
document.getElementById('resultsContainer').style.display = 'block';
// Initialize results map
const resultsMap = new maplibregl.Map({
container: 'resultsMap',
style: 'https://demotiles.maplibre.org/style.json',
center: [2.3522, 48.8566], // Default center (Paris)
zoom: 4
});
// Add navigation controls to results map
resultsMap.addControl(new maplibregl.NavigationControl());
// Add events to the map
resultsMap.on('load', function() {
// Add events as a source
resultsMap.addSource('events', {
type: 'geojson',
data: data
});
// Add a circle layer for events
resultsMap.addLayer({
id: 'events-circle',
type: 'circle',
source: 'events',
paint: {
'circle-radius': 8,
'circle-color': '#FF5722',
'circle-stroke-width': 2,
'circle-stroke-color': '#FFFFFF'
}
});
// Add popups for events
if (data.features) {
data.features.forEach(feature => {
const coordinates = feature.geometry.coordinates.slice();
const properties = feature.properties;
// Create popup content
let popupContent = '<div class="event-popup">';
popupContent += `<h3>${properties.label || 'Event'}</h3>`;
// Display key properties
if (properties.what) {
popupContent += `<p><strong>Type:</strong> ${properties.what}</p>`;
}
if (properties.where) {
popupContent += `<p><strong>Where:</strong> ${properties.where}</p>`;
}
if (properties.start) {
popupContent += `<p><strong>Start:</strong> ${properties.start}</p>`;
}
if (properties.stop) {
popupContent += `<p><strong>End:</strong> ${properties.stop}</p>`;
}
// Add link to view full event
popupContent += `<p><a href="/event/${properties.id}" target="_blank">View Event</a></p>`;
popupContent += '</div>';
// Create popup
const popup = new maplibregl.Popup({
closeButton: true,
closeOnClick: true
}).setHTML(popupContent);
// Add marker with popup
new maplibregl.Marker({
color: '#FF5722'
})
.setLngLat(coordinates)
.setPopup(popup)
.addTo(resultsMap);
});
// Fit map to events bounds
if (data.features.length > 0) {
const bounds = new maplibregl.LngLatBounds();
data.features.forEach(feature => {
bounds.extend(feature.geometry.coordinates);
});
resultsMap.fitBounds(bounds, {
padding: 50,
maxZoom: 12
});
}
}
});
// Populate table with results
const tableBody = document.getElementById('resultsTable').getElementsByTagName('tbody')[0];
tableBody.innerHTML = '';
if (data.features) {
data.features.forEach(feature => {
const properties = feature.properties;
const row = tableBody.insertRow();
row.insertCell(0).textContent = properties.id || '';
row.insertCell(1).textContent = properties.label || '';
row.insertCell(2).textContent = properties.type || '';
row.insertCell(3).textContent = properties.what || '';
row.insertCell(4).textContent = properties.where || '';
row.insertCell(5).textContent = properties.start || '';
row.insertCell(6).textContent = properties.stop || '';
});
}
// Store the data for download
window.searchResults = data;
}
// Handle tab switching
document.querySelectorAll('.tab-button').forEach(button => {
button.addEventListener('click', () => {
// Remove active class from all buttons and content
document.querySelectorAll('.tab-button').forEach(btn => btn.classList.remove('active'));
document.querySelectorAll('.tab-content').forEach(content => content.classList.remove('active'));
// Add active class to clicked button and corresponding content
button.classList.add('active');
document.getElementById(button.dataset.tab).classList.add('active');
});
});
// Handle CSV download
document.getElementById('downloadCsv').addEventListener('click', () => {
if (!window.searchResults || !window.searchResults.features) {
alert('No search results to download');
return;
}
// Convert GeoJSON to CSV
let csv = 'id,label,type,what,where,start,stop,longitude,latitude\\n';
window.searchResults.features.forEach(feature => {
const p = feature.properties;
const coords = feature.geometry.coordinates;
csv += `"${p.id || ''}","${p.label || ''}","${p.type || ''}","${p.what || ''}","${p.where || ''}","${p.start || ''}","${p.stop || ''}",${coords[0]},${coords[1]}\\n`;
});
// Create download link
const blob = new Blob([csv], { type: 'text/csv' });
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'search_results.csv';
document.body.appendChild(a);
a.click();
document.body.removeChild(a);
URL.revokeObjectURL(url);
});
// Handle JSON download
document.getElementById('downloadJson').addEventListener('click', () => {
if (!window.searchResults) {
alert('No search results to download');
return;
}
// Create download link
const blob = new Blob([JSON.stringify(window.searchResults, null, 2)], { type: 'application/json' });
const url = URL.createObjectURL(blob);
const a = document.createElement('a');
a.href = url;
a.download = 'search_results.json';
document.body.appendChild(a);
a.click();
document.body.removeChild(a);
URL.revokeObjectURL(url);
});
</script>
</body>
</html>
"""
# Set the response body and status
resp.text = html
resp.status = falcon.HTTP_200
logger.success("Successfully processed GET request to /demo/search")
except Exception as e:
logger.error(f"Error processing GET request to /demo/search: {e}")
resp.status = falcon.HTTP_500
resp.text = f"Error: {str(e)}"
def on_get_map_by_what(self, req, resp):
"""
Handle GET requests to the /demo/map-by-what endpoint.

View file

@ -10,15 +10,19 @@ from oedb.utils.logging import logger
def load_env_from_file():
"""
Load environment variables from .env file.
Load environment variables from .env file at the project root directory.
This ensures that database connection parameters are properly set.
Returns:
bool: True if the .env file exists and was loaded, False otherwise.
"""
if os.path.exists('.env'):
logger.info("Loading environment variables from .env file...")
with open('.env', 'r') as f:
# Determine the project root directory (parent directory of the oedb package)
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))
env_file_path = os.path.join(project_root, '.env')
if os.path.exists(env_file_path):
logger.info(f"Loading environment variables from {env_file_path}...")
with open(env_file_path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
@ -26,7 +30,7 @@ def load_env_from_file():
os.environ[key] = value
return True
else:
logger.warning(".env file not found")
logger.warning(f".env file not found at {env_file_path}")
return False
def db_connect():

153
test_env_path.py Executable file
View file

@ -0,0 +1,153 @@
#!/usr/bin/env python3
"""
Test script to verify that the load_env_from_file function correctly finds
the .env file at the project root directory, regardless of the current working directory.
"""
import os
import sys
import shutil
import tempfile
import subprocess
# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
from oedb.utils.db import load_env_from_file
from oedb.utils.logging import logger
def test_load_env_from_file_in_root():
"""
Test the load_env_from_file function when run from the project root directory.
"""
print("\n=== Testing load_env_from_file from project root ===")
# Backup the .env file if it exists
env_exists = os.path.exists('.env')
if env_exists:
print("Backing up existing .env file...")
shutil.copy('.env', '.env.backup')
# Create a test .env file
print("Creating test .env file at project root...")
with open('.env', 'w') as f:
f.write("TEST_VAR_ROOT=test_value_root\n")
# Test the function
print("Testing load_env_from_file()...")
result = load_env_from_file()
print(f"load_env_from_file() returned: {result}")
print(f"TEST_VAR_ROOT environment variable: {os.getenv('TEST_VAR_ROOT')}")
# Clean up
os.remove('.env')
# Restore the original .env file if it existed
if env_exists:
print("Restoring original .env file...")
shutil.move('.env.backup', '.env')
def test_load_env_from_file_in_subdir():
"""
Test the load_env_from_file function when run from a subdirectory.
"""
print("\n=== Testing load_env_from_file from a subdirectory ===")
# Create a temporary subdirectory
with tempfile.TemporaryDirectory(dir='.') as temp_dir:
print(f"Created temporary subdirectory: {temp_dir}")
# Backup the .env file if it exists
env_exists = os.path.exists('.env')
if env_exists:
print("Backing up existing .env file...")
shutil.copy('.env', '.env.backup')
# Create a test .env file at the project root
print("Creating test .env file at project root...")
with open('.env', 'w') as f:
f.write("TEST_VAR_SUBDIR=test_value_subdir\n")
# Create a test script in the subdirectory
test_script_path = os.path.join(temp_dir, 'test_script.py')
print(f"Creating test script at {test_script_path}...")
with open(test_script_path, 'w') as f:
f.write("""#!/usr/bin/env python3
import os
import sys
# Add the project root to the path
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from oedb.utils.db import load_env_from_file
# Test the function
result = load_env_from_file()
print(f"load_env_from_file() returned: {result}")
print(f"TEST_VAR_SUBDIR environment variable: {os.getenv('TEST_VAR_SUBDIR')}")
""")
# Make the test script executable
os.chmod(test_script_path, 0o755)
# Run the test script from the subdirectory
print(f"Running test script from {temp_dir}...")
subprocess.run([sys.executable, test_script_path], cwd=temp_dir)
# Clean up
os.remove('.env')
# Restore the original .env file if it existed
if env_exists:
print("Restoring original .env file...")
shutil.move('.env.backup', '.env')
def test_osm_cal_script():
"""
Test the osm_cal.py script to ensure it can find the .env file.
"""
print("\n=== Testing osm_cal.py script ===")
# Backup the .env file if it exists
env_exists = os.path.exists('.env')
if env_exists:
print("Backing up existing .env file...")
shutil.copy('.env', '.env.backup')
# Create a test .env file with minimal required variables
print("Creating test .env file at project root...")
with open('.env', 'w') as f:
f.write("DB_NAME=test_db\n")
f.write("DB_HOST=localhost\n")
f.write("DB_USER=test_user\n")
f.write("POSTGRES_PASSWORD=test_password\n")
# Test the osm_cal.py script
print("Testing osm_cal.py script...")
try:
# We'll use a very short timeout since we expect it to try to connect to the database
# and fail, but we just want to verify it gets past the .env check
result = subprocess.run(
[sys.executable, 'extractors/osm_cal.py'],
capture_output=True,
text=True,
timeout=2 # Short timeout
)
print(f"Exit code: {result.returncode}")
print(f"Output: {result.stdout}")
print(f"Error: {result.stderr}")
except subprocess.TimeoutExpired:
print("Process timed out - this is expected as it's trying to connect to the database")
# Clean up
os.remove('.env')
# Restore the original .env file if it existed
if env_exists:
print("Restoring original .env file...")
shutil.move('.env.backup', '.env')
if __name__ == "__main__":
test_load_env_from_file_in_root()
test_load_env_from_file_in_subdir()
test_osm_cal_script()

80
test_osm_cal.py Executable file
View file

@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
Test script to verify that the osm_cal.py script can successfully submit events to the database.
"""
import os
import sys
import json
from datetime import datetime, timedelta
# Add the parent directory to the path so we can import from oedb
sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
from extractors.osm_cal import submit_event, load_env_from_file
from oedb.utils.logging import logger
def create_test_event():
"""
Create a test event with a simple point geometry.
"""
# Create a point geometry (Paris coordinates)
geometry = {
"type": "Point",
"coordinates": [2.3522, 48.8566]
}
# Create event properties
now = datetime.now()
tomorrow = now + timedelta(days=1)
properties = {
"type": "scheduled",
"what": "test.event",
"where": "Paris, France",
"label": "Test Event",
"description": "This is a test event created by the test_osm_cal.py script",
"start": now.isoformat(),
"stop": tomorrow.isoformat(),
"url": "https://example.com",
"external_id": f"test-event-{now.timestamp()}",
"source": "Test Script"
}
# Create the event object
event = {
"type": "Feature",
"geometry": geometry,
"properties": properties
}
return event
def test_submit_event():
"""
Test the submit_event function with a test event.
"""
logger.info("Starting test_submit_event")
# Load environment variables from .env file
if not load_env_from_file():
logger.error("Required .env file not found. Exiting.")
sys.exit(1)
# Create a test event
event = create_test_event()
logger.info(f"Created test event: {event['properties']['label']}")
# Submit the event
logger.info("Submitting test event to the database...")
result = submit_event(event)
if result:
logger.success("Successfully submitted test event to the database")
else:
logger.error("Failed to submit test event to the database")
return result
if __name__ == "__main__":
test_submit_event()