diff --git a/api/config.dev.py b/api/config.dev.py index 7e7bb44..43a9ec7 100644 --- a/api/config.dev.py +++ b/api/config.dev.py @@ -30,5 +30,6 @@ ADDITIONAL_CORS_ORIGINS = [ "http://localhost:8888/", # for maputnik on 8888 ] TILE_SEMAPHORE_SIZE = 4 +EXPORT_SEMAPHORE_SIZE = 4 # vim: set ft=python : diff --git a/api/config.py.example b/api/config.py.example index 060d3c3..93be821 100644 --- a/api/config.py.example +++ b/api/config.py.example @@ -66,4 +66,8 @@ ADDITIONAL_CORS_ORIGINS = None # to the other features of the API ;) TILE_SEMAPHORE_SIZE = 4 +# How many asynchronous requests may generate exported data simultaneously. +# Keep this small. +EXPORT_SEMAPHORE_SIZE = 1 + # vim: set ft=python : diff --git a/api/obs/api/app.py b/api/obs/api/app.py index baa6e13..9693256 100644 --- a/api/obs/api/app.py +++ b/api/obs/api/app.py @@ -70,6 +70,7 @@ app.config.update( FRONTEND_HTTPS=True, TILES_FILE=None, TILE_SEMAPHORE_SIZE=4, + EXPORT_SEMAPHORE_SIZE=1, ) ) @@ -187,7 +188,10 @@ async def app_connect_db(app, loop): app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__() if app.config.TILE_SEMAPHORE_SIZE: - app.ctx._tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE) + app.ctx.tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE) + + if app.config.EXPORT_SEMAPHORE_SIZE: + app.ctx.export_semaphore = asyncio.Semaphore(app.config.EXPORT_SEMAPHORE_SIZE) @app.after_server_stop diff --git a/api/obs/api/routes/exports.py b/api/obs/api/routes/exports.py index 4e84a72..13083c3 100644 --- a/api/obs/api/routes/exports.py +++ b/api/obs/api/routes/exports.py @@ -12,6 +12,7 @@ from sanic.response import raw from sanic.exceptions import InvalidUsage from obs.api.app import api, json as json_response +from obs.api.utils import use_request_semaphore class ExportFormat(str, Enum): @@ -60,60 +61,61 @@ def shapefile_zip(): @api.get(r"/export/events") async def export_events(req): - bbox = req.ctx.get_single_arg( - "bbox", default="-180,-90,180,90", convert=parse_bounding_box - ) - fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat) - - events = await req.ctx.db.stream_scalars( - select(OvertakingEvent).where( - OvertakingEvent.geometry.bool_op("&&")(func.ST_Transform(bbox, 3857)) + async with use_request_semaphore(req, "export_semaphore", timeout=30): + bbox = req.ctx.get_single_arg( + "bbox", default="-180,-90,180,90", convert=parse_bounding_box ) - ) + fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat) - if fmt == ExportFormat.SHAPEFILE: - with shapefile_zip() as (writer, zip_buffer): - writer.field("distance_overtaker", "N", decimal=4) - writer.field("distance_stationary", "N", decimal=4) - writer.field("way_id", "N", decimal=0) - writer.field("direction", "N", decimal=0) - writer.field("course", "N", decimal=4) - writer.field("speed", "N", decimal=4) + events = await req.ctx.db.stream_scalars( + select(OvertakingEvent).where( + OvertakingEvent.geometry.bool_op("&&")(func.ST_Transform(bbox, 3857)) + ) + ) + if fmt == ExportFormat.SHAPEFILE: + with shapefile_zip() as (writer, zip_buffer): + writer.field("distance_overtaker", "N", decimal=4) + writer.field("distance_stationary", "N", decimal=4) + writer.field("way_id", "N", decimal=0) + writer.field("direction", "N", decimal=0) + writer.field("course", "N", decimal=4) + writer.field("speed", "N", decimal=4) + + async for event in events: + writer.point(event.longitude, event.latitude) + writer.record( + distance_overtaker=event.distance_overtaker, + distance_stationary=event.distance_stationary, + direction=-1 if event.direction_reversed else 1, + way_id=event.way_id, + course=event.course, + speed=event.speed, + # "time"=event.time, + ) + + return raw(zip_buffer.getbuffer()) + + if fmt == ExportFormat.GEOJSON: + features = [] async for event in events: - writer.point(event.longitude, event.latitude) - writer.record( - distance_overtaker=event.distance_overtaker, - distance_stationary=event.distance_stationary, - direction=-1 if event.direction_reversed else 1, - way_id=event.way_id, - course=event.course, - speed=event.speed, - # "time"=event.time, + features.append( + { + "type": "Feature", + "geometry": json.loads(event.geometry), + "properties": { + "distance_overtaker": event.distance_overtaker, + "distance_stationary": event.distance_stationary, + "direction": -1 if event.direction_reversed else 1, + "way_id": event.way_id, + "course": event.course, + "speed": event.speed, + "time": event.time, + }, + } ) - return raw(zip_buffer.getbuffer()) + geojson = {"type": "FeatureCollection", "features": features} + return json_response(geojson) - if fmt == ExportFormat.GEOJSON: - features = [] - async for event in events: - features.append( - { - "type": "Feature", - "geometry": json.loads(event.geometry), - "properties": { - "distance_overtaker": event.distance_overtaker, - "distance_stationary": event.distance_stationary, - "direction": -1 if event.direction_reversed else 1, - "way_id": event.way_id, - "course": event.course, - "speed": event.speed, - "time": event.time, - }, - } - ) - - geojson = {"type": "FeatureCollection", "features": features} - return json_response(geojson) - - raise InvalidUsage("unknown export format") + raise InvalidUsage("unknown export format") diff --git a/api/obs/api/routes/tiles.py b/api/obs/api/routes/tiles.py index 66b6e0d..f0452a5 100644 --- a/api/obs/api/routes/tiles.py +++ b/api/obs/api/routes/tiles.py @@ -1,18 +1,16 @@ -import asyncio -from contextlib import asynccontextmanager from gzip import decompress from sqlite3 import connect from datetime import datetime, time, timedelta from typing import Optional, Tuple import dateutil.parser -from sanic.exceptions import Forbidden, InvalidUsage, ServiceUnavailable +from sanic.exceptions import Forbidden, InvalidUsage from sanic.response import raw -from sqlalchemy import select, text -from sqlalchemy.sql.expression import table, column +from sqlalchemy import text from obs.api.app import app +from obs.api.utils import use_request_semaphore def get_tile(filename, zoom, x, y): @@ -87,41 +85,9 @@ def get_filter_options( return user_id, start, end -@asynccontextmanager -async def use_tile_semaphore(req, timeout=10): - """ - If configured, acquire a semaphore for the map tile request and release it - after the context has finished. - - If the semaphore cannot be acquired within the timeout, issue a 503 Service - Unavailable error response that describes that the map tile database is - overloaded, so users know what the problem is. - - Operates as a noop when the tile semaphore is not enabled. - """ - sem = getattr(req.app.ctx, "_tile_semaphore", None) - - if sem is None: - yield - return - - try: - await asyncio.wait_for(sem.acquire(), timeout) - - try: - yield - finally: - sem.release() - - except asyncio.TimeoutError: - raise ServiceUnavailable( - "Too many map content requests, database overloaded. Please retry later." - ) - - @app.route(r"/tiles///") async def tiles(req, zoom: int, x: int, y: str): - async with use_tile_semaphore(req): + async with use_request_semaphore(req, "tile_semaphore"): if app.config.get("TILES_FILE"): tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y)) diff --git a/api/obs/api/utils.py b/api/obs/api/utils.py index 0ddd0f2..7197d43 100644 --- a/api/obs/api/utils.py +++ b/api/obs/api/utils.py @@ -1,12 +1,13 @@ +import asyncio +from contextlib import asynccontextmanager from datetime import datetime import logging +from os.path import commonpath, join, relpath import queue import tarfile -from os.path import commonpath, relpath, join import dateutil.parser - -from sanic.exceptions import InvalidUsage +from sanic.exceptions import InvalidUsage, ServiceUnavailable log = logging.getLogger(__name__) @@ -127,3 +128,35 @@ class StreamerHelper: await self.response.send(tosend) except queue.Empty: break + + +@asynccontextmanager +async def use_request_semaphore(req, semaphore_name, timeout=10): + """ + If configured, acquire a semaphore for the map tile request and release it + after the context has finished. + + If the semaphore cannot be acquired within the timeout, issue a 503 Service + Unavailable error response that describes that the database is overloaded, + so users know what the problem is. + + Operates as a noop when the tile semaphore is not enabled. + """ + semaphore = getattr(req.app.ctx, semaphore_name, None) + + if semaphore is None: + yield + return + + try: + await asyncio.wait_for(semaphore.acquire(), timeout) + + try: + yield + finally: + semaphore.release() + + except asyncio.TimeoutError: + raise ServiceUnavailable( + "Too many requests, database overloaded. Please retry later." + )