2016-04-14 17:46:30 +02:00
# backend.py
# openeventdatabase
2016-05-17 16:03:13 +02:00
from datetime import datetime
2016-05-13 21:11:33 +02:00
import json
2016-05-04 19:22:56 +02:00
import os
2016-06-05 20:03:19 +02:00
import re
2016-06-05 20:37:35 +02:00
import subprocess
2016-05-13 21:11:33 +02:00
2016-04-14 17:46:30 +02:00
import falcon
import psycopg2
2016-05-13 21:10:58 +02:00
import psycopg2 . extras
2016-05-13 20:56:04 +02:00
2016-04-14 17:46:30 +02:00
2016-05-04 19:22:56 +02:00
def db_connect ( ) :
2016-05-13 21:22:05 +02:00
return psycopg2 . connect (
dbname = os . getenv ( " DB_NAME " , " oedb " ) ,
host = os . getenv ( " DB_HOST " , None ) ,
password = os . getenv ( " POSTGRES_PASSWORD " , None ) ,
user = os . getenv ( " DB_USER " , None ) )
2016-05-04 19:22:56 +02:00
2016-05-13 20:56:04 +02:00
2016-05-17 16:03:13 +02:00
class EventEncoder ( json . JSONEncoder ) :
def default ( self , o ) :
if isinstance ( o , datetime ) :
return o . isoformat ( )
try :
return super ( ) . default ( o )
except TypeError :
return str ( o )
def dumps ( data ) :
2016-06-08 23:58:37 +02:00
return json . dumps ( data , cls = EventEncoder , sort_keys = True )
2016-05-17 16:03:13 +02:00
2016-05-13 21:25:33 +02:00
class HeaderMiddleware :
def process_response ( self , req , resp , resource ) :
resp . set_header ( ' X-Powered-By ' , ' OpenEventDatabase ' )
resp . set_header ( ' Access-Control-Allow-Origin ' , ' * ' )
resp . set_header ( ' Access-Control-Allow-Headers ' , ' X-Requested-With ' )
2016-05-23 14:56:10 +02:00
resp . set_header ( ' Access-Control-Allow-Headers ' , ' Content-Type ' )
2016-05-13 20:56:04 +02:00
2016-04-14 17:46:30 +02:00
class StatsResource ( object ) :
def on_get ( self , req , resp ) :
2016-05-04 19:22:56 +02:00
db = db_connect ( )
2016-05-13 21:10:58 +02:00
cur = db . cursor ( cursor_factory = psycopg2 . extras . DictCursor )
2016-06-05 20:24:47 +02:00
# estimated row count, way faster then count(*)
cur . execute ( " SELECT reltuples FROM pg_class r WHERE relname = ' events ' ; " )
count = cur . fetchone ( ) [ 0 ]
2016-06-05 20:37:35 +02:00
cur . execute ( " SELECT max(lastupdate) as last_updated, current_timestamp-pg_postmaster_start_time() from events; " )
pg_stats = cur . fetchone ( )
last = pg_stats [ 0 ]
pg_uptime = pg_stats [ 1 ]
uptime = subprocess . check_output ( [ " uptime " , " -p " ] ) . decode ( ' utf-8 ' ) [ 0 : - 1 ]
2016-04-14 17:46:30 +02:00
cur . close ( )
db . close ( )
2016-06-05 20:37:35 +02:00
resp . body = dumps ( dict ( events_count = count , last_updated = last , uptime = uptime , db_uptime = pg_uptime ) )
2016-05-05 22:31:54 +02:00
resp . status = falcon . HTTP_200
2016-05-13 20:56:04 +02:00
2016-05-17 09:45:11 +02:00
class BaseEvent :
2016-05-13 21:49:52 +02:00
def row_to_feature ( self , row ) :
2016-05-17 09:45:11 +02:00
properties = dict ( row [ ' events_tags ' ] )
properties . update ( {
2016-05-17 16:03:13 +02:00
' createdate ' : row [ ' createdate ' ] ,
2016-05-17 17:53:45 +02:00
' lastupdate ' : row [ ' lastupdate ' ] ,
" id " : row [ ' events_id ' ]
2016-05-17 09:45:11 +02:00
} )
2016-05-17 10:05:39 +02:00
if " distance " in row :
properties [ ' distance ' ] = row [ ' distance ' ]
2016-05-13 21:49:52 +02:00
return {
" type " : " Feature " ,
" geometry " : json . loads ( row [ ' geometry ' ] ) ,
2016-05-17 09:45:11 +02:00
" properties " : properties
2016-05-13 21:49:52 +02:00
}
2016-05-30 08:57:08 +02:00
def rows_to_collection ( self , rows ) :
2016-05-17 10:05:39 +02:00
return {
" type " : " FeatureCollection " ,
2016-05-30 08:03:17 +02:00
" features " : [ self . row_to_feature ( r ) for r in rows ] ,
2016-05-30 08:57:08 +02:00
" count " : len ( rows )
2016-05-17 10:05:39 +02:00
}
2016-05-17 09:45:11 +02:00
class EventsResource ( BaseEvent ) :
2016-05-13 20:56:04 +02:00
def on_get ( self , req , resp ) :
2016-05-05 22:31:54 +02:00
db = db_connect ( )
2016-05-13 21:49:52 +02:00
cur = db . cursor ( cursor_factory = psycopg2 . extras . DictCursor )
2016-05-17 09:45:11 +02:00
cur . execute ( " SELECT events_id, events_tags, createdate, lastupdate, st_asgeojson(geom) as geometry FROM events JOIN geo ON (hash=events_geo) " )
2016-05-30 08:57:08 +02:00
resp . body = dumps ( self . rows_to_collection ( cur . fetchall ( ) ) )
2016-04-14 17:46:30 +02:00
resp . status = falcon . HTTP_200
2016-05-13 20:56:04 +02:00
2016-05-17 09:45:11 +02:00
class EventResource ( BaseEvent ) :
2016-05-13 20:56:04 +02:00
def maybe_insert_geometry ( self , geometry , cur ) :
2016-05-07 10:59:57 +02:00
# insert into geo table if not existing
2016-05-17 09:45:11 +02:00
cur . execute ( """ INSERT INTO geo (hash, geom, geom_center) SELECT *, st_centroid(geom) FROM (SELECT md5(ewkt) as hash, st_setsrid(st_geomfromewkt(ewkt),4326) as geom FROM (SELECT st_asewkt(st_geomfromgeojson( %s )) as ewkt) as g) as i ON CONFLICT DO NOTHING RETURNING hash; """ , ( geometry , ) )
2016-05-07 10:59:57 +02:00
# get its id (md5 hash)
h = cur . fetchone ( )
if h is None :
2016-05-13 20:56:04 +02:00
cur . execute ( """ SELECT md5(st_asewkt(st_geomfromgeojson( %s ))); """ , ( geometry , ) )
2016-05-07 10:59:57 +02:00
h = cur . fetchone ( )
return h
2016-06-05 20:03:19 +02:00
def relative_time ( self , when , cur ) :
2016-06-08 23:38:49 +02:00
event_start = cur . mogrify ( " %s " , ( when , ) ) . decode ( " utf-8 " )
2016-06-09 00:21:17 +02:00
event_stop = cur . mogrify ( " %s " , ( when , ) ) . decode ( " utf-8 " )
2016-06-05 20:03:19 +02:00
if when == ' NOW ' :
event_start = " now() "
2016-06-09 00:21:17 +02:00
event_stop = " now() "
2016-06-08 23:38:49 +02:00
if when == ' TODAY ' :
2016-06-05 20:03:19 +02:00
event_start = " CURRENT_DATE "
2016-06-09 00:21:17 +02:00
event_stop = " CURRENT_DATE + INTERVAL ' 1 DAY ' "
2016-06-08 23:38:49 +02:00
if when == ' TOMORROW ' :
2016-06-05 20:03:19 +02:00
event_start = " CURRENT_DATE + INTERVAL ' 1 DAY ' "
2016-06-09 00:21:17 +02:00
event_stop = " CURRENT_DATE + INTERVAL ' 2 DAY ' "
2016-06-08 23:38:49 +02:00
if when == ' YESTERDAY ' :
2016-06-05 20:03:19 +02:00
event_start = " CURRENT_DATE - INTERVAL ' 1 DAY ' "
2016-06-09 00:21:17 +02:00
event_stop = " CURRENT_DATE "
m = re . match ( ' (LAST|NEXT)(MONTH|WEEK|DAY|HOUR|MINUTE) ' , when )
if m is not None :
2016-06-13 23:42:40 +02:00
when = m . group ( 1 ) + ' 1 ' + m . group ( 2 ) + ' S '
2016-06-09 00:21:17 +02:00
m = re . match ( ' (LAST|NEXT)([0-9]*)(MONTH|WEEK|MINUTE|HOUR|DAY)S ' , when )
if m is not None :
if m . group ( 1 ) == ' LAST ' :
2016-06-09 00:25:09 +02:00
event_start = " now() - INTERVAL ' %s %s ' " % m . group ( 2 , 3 )
2016-06-09 00:21:17 +02:00
event_stop = " now() "
else :
event_start = " now() "
2016-06-09 00:25:09 +02:00
event_stop = " now() + INTERVAL ' %s %s ' " % m . group ( 2 , 3 )
2016-06-08 23:38:49 +02:00
2016-06-05 20:03:19 +02:00
return event_start , event_stop
2016-06-08 23:38:49 +02:00
2016-05-13 20:56:04 +02:00
def on_get ( self , req , resp , id = None ) :
2016-05-04 19:22:56 +02:00
db = db_connect ( )
2016-05-17 09:45:11 +02:00
cur = db . cursor ( cursor_factory = psycopg2 . extras . DictCursor )
2016-05-06 15:08:32 +02:00
if id is None :
2016-05-06 20:51:46 +02:00
# get query search parameters
2016-05-06 18:06:08 +02:00
if ' bbox ' in req . params :
2016-05-06 20:51:46 +02:00
# limit search with bbox (E,S,W,N)
2016-05-08 14:47:20 +02:00
event_bbox = cur . mogrify ( " AND geom && ST_SetSRID(ST_MakeBox2D(ST_Point( %s , %s ),ST_Point( %s , %s )),4326) " , tuple ( req . params [ ' bbox ' ] ) ) . decode ( " utf-8 " )
2016-05-08 16:09:29 +02:00
event_dist = " "
2016-05-06 22:21:12 +02:00
elif ' near ' in req . params :
2016-05-13 20:56:04 +02:00
# Limit search with location+distance
# (long, lat, distance in meters)
if len ( req . params [ ' near ' ] ) < 3 :
2016-05-09 16:11:17 +02:00
dist = 1
else :
dist = req . params [ ' near ' ] [ 2 ]
2016-05-17 10:05:39 +02:00
event_bbox = cur . mogrify ( " AND ST_Intersects(geom, ST_Buffer(st_setsrid(st_makepoint( %s , %s ),4326)::geography, %s )::geometry) " , ( req . params [ ' near ' ] [ 0 ] , req . params [ ' near ' ] [ 1 ] , dist ) ) . decode ( " utf-8 " )
2016-06-04 23:05:06 +02:00
event_dist = cur . mogrify ( " ST_Length(ST_ShortestLine(geom, st_setsrid(st_makepoint( %s , %s ),4326))::geography)::integer as distance, " , ( req . params [ ' near ' ] [ 0 ] , req . params [ ' near ' ] [ 1 ] ) ) . decode ( " utf-8 " )
2016-05-06 18:01:55 +02:00
else :
2016-05-06 20:51:46 +02:00
event_bbox = " "
2016-05-08 16:09:29 +02:00
event_dist = " "
2016-05-06 20:51:46 +02:00
if ' when ' in req . params :
# limit search with fixed time
2016-05-30 07:51:16 +02:00
when = req . params [ ' when ' ] . upper ( )
2016-06-05 20:03:19 +02:00
event_when = " tstzrange( %s , %s , ' [] ' ) " % ( self . relative_time ( when , cur ) )
2016-05-06 22:21:12 +02:00
elif ' start ' in req . params and ' stop ' in req . params :
2016-05-08 14:43:41 +02:00
# limit search with fixed time (start to stop)
2016-06-05 20:03:19 +02:00
event_start , unused = self . relative_time ( req . params [ ' start ' ] , cur )
unused , event_stop = self . relative_time ( req . params [ ' stop ' ] , cur )
event_when = " tstzrange( %s , %s , ' [] ' ) " % ( event_start , event_stop )
2016-05-08 14:43:41 +02:00
elif ' start ' in req . params and ' stop ' not in req . params :
# limit search with fixed time (start to now)
2016-06-05 20:03:19 +02:00
event_start , unused = self . relative_time ( req . params [ ' start ' ] , cur )
event_when = " tstzrange( %s ,now(), ' [] ' ) " % event_start
2016-05-08 14:43:41 +02:00
elif ' start ' not in req . params and ' stop ' in req . params :
# limit search with fixed time (now to stop)
2016-06-05 20:03:19 +02:00
unused , event_stop = self . relative_time ( req . params [ ' stop ' ] , cur )
event_when = " tstzrange(now(), %s , ' [] ' ) " % event_stop
2016-05-06 20:51:46 +02:00
else :
2016-05-17 10:05:39 +02:00
event_when = " tstzrange(now(),now(), ' [] ' ) "
2016-05-06 20:51:46 +02:00
if ' what ' in req . params :
# limit search based on "what"
2016-05-13 20:56:04 +02:00
event_what = cur . mogrify ( " AND events_what LIKE %s " , ( req . params [ ' what ' ] + " % " , ) ) . decode ( " utf-8 " )
2016-05-06 20:51:46 +02:00
else :
event_what = " "
if ' type ' in req . params :
# limit search based on type (scheduled, forecast, unscheduled)
2016-05-13 20:56:04 +02:00
event_type = cur . mogrify ( " AND events_type = %s " , ( req . params [ ' type ' ] , ) ) . decode ( " utf-8 " )
2016-05-06 20:51:46 +02:00
else :
event_type = " "
2016-05-09 14:39:27 +02:00
event_geom = " geom_center "
if ' geom ' in req . params :
if req . params [ ' geom ' ] == ' full ' :
event_geom = " geom "
2016-05-30 09:06:25 +02:00
else :
event_geom = cur . mogrify ( " ST_SnapToGrid(geom, %s ) " , ( req . params [ ' geom ' ] , ) ) . decode ( " utf-8 " )
2016-05-09 14:39:27 +02:00
2016-05-13 20:56:04 +02:00
# Search recent active events.
2016-05-17 10:05:39 +02:00
sql = """ SELECT events_id, events_tags, createdate, lastupdate, {event_dist} st_asgeojson( {event_geom} ) as geometry FROM events JOIN geo ON (hash=events_geo) {event_bbox} WHERE events_when && {event_when} {event_what} {event_type} ORDER BY createdate DESC LIMIT 200 """
# No user generated content here, so format is safe.
sql = sql . format ( event_dist = event_dist , event_geom = event_geom ,
event_bbox = event_bbox , event_what = event_what ,
event_when = event_when , event_type = event_type )
cur . execute ( sql )
2016-05-30 08:57:08 +02:00
resp . body = dumps ( self . rows_to_collection ( cur . fetchall ( ) ) )
2016-05-06 18:01:55 +02:00
resp . status = falcon . HTTP_200
2016-05-06 15:08:32 +02:00
else :
2016-05-17 09:45:11 +02:00
# Get single event geojson Feature by id.
cur . execute ( " SELECT events_id, events_tags, createdate, lastupdate, st_asgeojson(geom) as geometry FROM events JOIN geo ON (hash=events_geo) WHERE events_id= %s " , [ id ] )
2016-05-06 15:08:32 +02:00
2016-05-06 18:01:55 +02:00
e = cur . fetchone ( )
if e is not None :
2016-05-17 16:03:13 +02:00
resp . body = dumps ( self . row_to_feature ( e ) )
2016-05-06 18:01:55 +02:00
resp . status = falcon . HTTP_200
else :
resp . status = falcon . HTTP_404
2016-04-20 07:48:58 +02:00
db . close ( )
2016-05-07 10:59:57 +02:00
def insert_or_update ( self , req , resp , id , query ) :
2016-05-03 17:11:43 +02:00
2016-04-30 23:50:28 +02:00
# get request body payload (geojson Feature)
2016-04-14 19:05:59 +02:00
body = req . stream . read ( ) . decode ( ' utf-8 ' )
2016-05-13 20:56:04 +02:00
j = json . loads ( body )
2016-05-03 17:11:43 +02:00
if " properties " not in j or " geometry " not in j :
resp . body = " missing ' geometry ' or ' properties ' elements "
resp . status = falcon . HTTP_400
if " start " not in j [ ' properties ' ] :
event_start = j [ ' properties ' ] [ ' when ' ]
else :
event_start = j [ ' properties ' ] [ ' start ' ]
if " stop " not in j [ ' properties ' ] :
event_stop = j [ ' properties ' ] [ ' when ' ]
else :
event_stop = j [ ' properties ' ] [ ' stop ' ]
if event_start == event_stop :
2016-05-06 17:27:01 +02:00
bounds = ' [] '
2016-05-03 17:11:43 +02:00
else :
2016-05-06 17:27:01 +02:00
bounds = ' [) '
2016-04-14 19:05:59 +02:00
# connect to db and insert
2016-05-04 19:22:56 +02:00
db = db_connect ( )
2016-04-14 19:05:59 +02:00
cur = db . cursor ( )
2016-04-30 23:50:28 +02:00
# get the geometry part
2016-05-17 16:03:13 +02:00
geometry = dumps ( j [ ' geometry ' ] )
2016-05-07 10:59:57 +02:00
h = self . maybe_insert_geometry ( geometry , cur )
2016-05-17 16:03:13 +02:00
params = ( j [ ' properties ' ] [ ' type ' ] , j [ ' properties ' ] [ ' what ' ] , event_start , event_stop , bounds , dumps ( j [ ' properties ' ] ) , h [ 0 ] )
2016-05-13 20:56:04 +02:00
if id :
2016-05-07 10:59:57 +02:00
params = params + ( id , )
2016-05-13 20:56:04 +02:00
cur . execute ( query , params )
2016-04-14 19:05:59 +02:00
# get newly created event id
e = cur . fetchone ( )
db . commit ( )
cur . close ( )
db . close ( )
# send back to client
2016-06-07 06:38:07 +02:00
if e is None :
resp . status = falcon . HTTP_409
else :
resp . body = """ { " id " : " %s " } """ % ( e [ 0 ] )
resp . status = falcon . HTTP_201
2016-04-14 19:05:59 +02:00
2016-05-07 10:59:57 +02:00
def on_post ( self , req , resp ) :
2016-06-07 06:38:48 +02:00
self . insert_or_update ( req , resp , None , """ INSERT INTO events ( events_type, events_what, events_when, events_tags, events_geo) VALUES ( %s , %s , tstzrange( %s , %s , %s ) , %s , %s ) ON CONFLICT DO NOTHING RETURNING events_id; """ )
2016-05-07 10:59:57 +02:00
def on_put ( self , req , resp , id ) :
self . insert_or_update ( req , resp , id , """ UPDATE events SET ( events_type, events_what, events_when, events_tags, events_geo) = ( %s , %s , tstzrange( %s , %s , %s ) , %s , %s ) WHERE events_id = %s RETURNING events_id; """ )
2016-05-08 17:06:45 +02:00
def on_delete ( self , req , resp , id ) :
db = db_connect ( )
cur = db . cursor ( )
2016-05-13 20:56:04 +02:00
cur . execute ( """ DELETE FROM events WHERE events_id = %s ; """ , ( id , ) ) ;
2016-05-08 17:06:45 +02:00
db . commit ( )
cur . close ( )
db . close ( )
if cur . rowcount :
resp . status = falcon . HTTP_204
else :
resp . status = falcon . HTTP_404
2016-05-13 21:25:33 +02:00
# Falcon.API instances are callable WSGI apps.
app = falcon . API ( middleware = [ HeaderMiddleware ( ) ] )
2016-04-14 17:46:30 +02:00
# Resources are represented by long-lived class instances
2016-05-05 22:31:54 +02:00
events = EventsResource ( )
2016-04-14 19:05:59 +02:00
event = EventResource ( )
2016-04-14 17:46:30 +02:00
stats = StatsResource ( )
# things will handle all requests to the matching URL path
2016-05-05 22:31:54 +02:00
app . add_route ( ' /events ' , events )
2016-04-30 23:50:28 +02:00
app . add_route ( ' /event/ {id} ' , event ) # handle single event requests
2016-04-14 19:05:59 +02:00
app . add_route ( ' /event ' , event ) # handle single event requests
2016-04-14 17:46:30 +02:00
app . add_route ( ' /stats ' , stats )