From c249b1638e0ac0a52fcf3cd45cce3ea7d019f98a Mon Sep 17 00:00:00 2001 From: Paul Bienkowski Date: Sat, 13 May 2023 20:42:22 +0200 Subject: [PATCH 1/3] Add semaphore to limit simultaneous requests to tile data --- api/config.dev.py | 1 + api/config.py.example | 5 +++ api/obs/api/app.py | 23 ++++++++++++- api/obs/api/routes/tiles.py | 67 ++++++++++++++++++++++++++++--------- 4 files changed, 79 insertions(+), 17 deletions(-) diff --git a/api/config.dev.py b/api/config.dev.py index d5a402f..7e7bb44 100644 --- a/api/config.dev.py +++ b/api/config.dev.py @@ -29,5 +29,6 @@ ADDITIONAL_CORS_ORIGINS = [ "http://localhost:8880/", # for maputnik on 8880 "http://localhost:8888/", # for maputnik on 8888 ] +TILE_SEMAPHORE_SIZE = 4 # vim: set ft=python : diff --git a/api/config.py.example b/api/config.py.example index e2ff283..060d3c3 100644 --- a/api/config.py.example +++ b/api/config.py.example @@ -61,4 +61,9 @@ TILES_FILE = None # default. Python list, or whitespace separated string. ADDITIONAL_CORS_ORIGINS = None +# How many asynchronous requests may be sent to the database to generate tile +# information. 
Should be less than POSTGRES_POOL_SIZE to leave some connections +# to the other features of the API ;) +TILE_SEMAPHORE_SIZE = 4 + # vim: set ft=python : diff --git a/api/obs/api/app.py b/api/obs/api/app.py index b785512..baa6e13 100644 --- a/api/obs/api/app.py +++ b/api/obs/api/app.py @@ -1,3 +1,4 @@ +import asyncio import logging import re @@ -23,7 +24,6 @@ from sqlalchemy.ext.asyncio import AsyncSession from obs.api.db import User, make_session, connect_db from obs.api.cors import setup_options, add_cors_headers from obs.api.utils import get_single_arg -from sqlalchemy.util import asyncio log = logging.getLogger(__name__) @@ -58,6 +58,24 @@ app = Sanic( ) configure_sanic_logging() +app.config.update( + dict( + DEBUG=False, + VERBOSE=False, + AUTO_RELOAD=False, + POSTGRES_POOL_SIZE=20, + POSTGRES_MAX_OVERFLOW=40, + DEDICATED_WORKER=True, + FRONTEND_URL=None, + FRONTEND_HTTPS=True, + TILES_FILE=None, + TILE_SEMAPHORE_SIZE=4, + ) +) + +# overwrite from defaults again +app.config.load_environment_vars("OBS_") + if isfile("./config.py"): app.update_config("./config.py") @@ -168,6 +186,9 @@ async def app_connect_db(app, loop): ) app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__() + if app.config.TILE_SEMAPHORE_SIZE: + app.ctx._tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE) + @app.after_server_stop async def app_disconnect_db(app, loop): diff --git a/api/obs/api/routes/tiles.py b/api/obs/api/routes/tiles.py index 9b6b652..66b6e0d 100644 --- a/api/obs/api/routes/tiles.py +++ b/api/obs/api/routes/tiles.py @@ -1,10 +1,12 @@ +import asyncio +from contextlib import asynccontextmanager from gzip import decompress from sqlite3 import connect from datetime import datetime, time, timedelta from typing import Optional, Tuple import dateutil.parser -from sanic.exceptions import Forbidden, InvalidUsage +from sanic.exceptions import Forbidden, InvalidUsage, ServiceUnavailable from sanic.response import raw from sqlalchemy import select, text @@ -85,26 
+87,59 @@ def get_filter_options( return user_id, start, end +@asynccontextmanager +async def use_tile_semaphore(req, timeout=10): + """ + If configured, acquire a semaphore for the map tile request and release it + after the context has finished. + + If the semaphore cannot be acquired within the timeout, issue a 503 Service + Unavailable error response that describes that the map tile database is + overloaded, so users know what the problem is. + + Operates as a noop when the tile semaphore is not enabled. + """ + sem = getattr(req.app.ctx, "_tile_semaphore", None) + + if sem is None: + yield + return + + try: + await asyncio.wait_for(sem.acquire(), timeout) + + try: + yield + finally: + sem.release() + + except asyncio.TimeoutError: + raise ServiceUnavailable( + "Too many map content requests, database overloaded. Please retry later." + ) + + @app.route(r"/tiles///") async def tiles(req, zoom: int, x: int, y: str): - if app.config.get("TILES_FILE"): - tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y)) + async with use_tile_semaphore(req): + if app.config.get("TILES_FILE"): + tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y)) - else: - user_id, start, end = get_filter_options(req) + else: + user_id, start, end = get_filter_options(req) - tile = await req.ctx.db.scalar( - text( - f"select data from getmvt(:zoom, :x, :y, :user_id, :min_time, :max_time) as b(data, key);" - ).bindparams( - zoom=int(zoom), - x=int(x), - y=int(y), - user_id=user_id, - min_time=start, - max_time=end, + tile = await req.ctx.db.scalar( + text( + f"select data from getmvt(:zoom, :x, :y, :user_id, :min_time, :max_time) as b(data, key);" + ).bindparams( + zoom=int(zoom), + x=int(x), + y=int(y), + user_id=user_id, + min_time=start, + max_time=end, + ) ) - ) gzip = "gzip" in req.headers["accept-encoding"] From d3fbb113f38e09ef09473d4235c6883a7aebcd68 Mon Sep 17 00:00:00 2001 From: Paul Bienkowski Date: Sat, 13 May 2023 20:52:04 +0200 Subject: [PATCH 2/3] 
Fix export bounding box EPSG id --- api/obs/api/routes/exports.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/api/obs/api/routes/exports.py b/api/obs/api/routes/exports.py index 90218fd..4e84a72 100644 --- a/api/obs/api/routes/exports.py +++ b/api/obs/api/routes/exports.py @@ -26,7 +26,7 @@ def parse_bounding_box(input_string): func.ST_Point(left, bottom), func.ST_Point(right, top), ), - 3857, + 4326, ) @@ -66,7 +66,9 @@ async def export_events(req): fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat) events = await req.ctx.db.stream_scalars( - select(OvertakingEvent).where(OvertakingEvent.geometry.bool_op("&&")(bbox)) + select(OvertakingEvent).where( + OvertakingEvent.geometry.bool_op("&&")(func.ST_Transform(bbox, 3857)) + ) ) if fmt == ExportFormat.SHAPEFILE: From a6811d4ba21f2cb67a940de04e3a048446a8dce8 Mon Sep 17 00:00:00 2001 From: Paul Bienkowski Date: Sat, 13 May 2023 20:52:45 +0200 Subject: [PATCH 3/3] Also add semaphore for exports --- api/config.dev.py | 1 + api/config.py.example | 4 ++ api/obs/api/app.py | 6 +- api/obs/api/routes/exports.py | 102 +++++++++++++++++----------------- api/obs/api/routes/tiles.py | 42 ++------------ api/obs/api/utils.py | 39 ++++++++++++- 6 files changed, 102 insertions(+), 92 deletions(-) diff --git a/api/config.dev.py b/api/config.dev.py index 7e7bb44..43a9ec7 100644 --- a/api/config.dev.py +++ b/api/config.dev.py @@ -30,5 +30,6 @@ ADDITIONAL_CORS_ORIGINS = [ "http://localhost:8888/", # for maputnik on 8888 ] TILE_SEMAPHORE_SIZE = 4 +EXPORT_SEMAPHORE_SIZE = 4 # vim: set ft=python : diff --git a/api/config.py.example b/api/config.py.example index 060d3c3..93be821 100644 --- a/api/config.py.example +++ b/api/config.py.example @@ -66,4 +66,8 @@ ADDITIONAL_CORS_ORIGINS = None # to the other features of the API ;) TILE_SEMAPHORE_SIZE = 4 +# How many asynchronous requests may generate exported data simultaneously. +# Keep this small.
+EXPORT_SEMAPHORE_SIZE = 1 + # vim: set ft=python : diff --git a/api/obs/api/app.py b/api/obs/api/app.py index baa6e13..9693256 100644 --- a/api/obs/api/app.py +++ b/api/obs/api/app.py @@ -70,6 +70,7 @@ app.config.update( FRONTEND_HTTPS=True, TILES_FILE=None, TILE_SEMAPHORE_SIZE=4, + EXPORT_SEMAPHORE_SIZE=1, ) ) @@ -187,7 +188,10 @@ async def app_connect_db(app, loop): app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__() if app.config.TILE_SEMAPHORE_SIZE: - app.ctx._tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE) + app.ctx.tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE) + + if app.config.EXPORT_SEMAPHORE_SIZE: + app.ctx.export_semaphore = asyncio.Semaphore(app.config.EXPORT_SEMAPHORE_SIZE) @app.after_server_stop diff --git a/api/obs/api/routes/exports.py b/api/obs/api/routes/exports.py index 4e84a72..13083c3 100644 --- a/api/obs/api/routes/exports.py +++ b/api/obs/api/routes/exports.py @@ -12,6 +12,7 @@ from sanic.response import raw from sanic.exceptions import InvalidUsage from obs.api.app import api, json as json_response +from obs.api.utils import use_request_semaphore class ExportFormat(str, Enum): @@ -60,60 +61,61 @@ def shapefile_zip(): @api.get(r"/export/events") async def export_events(req): - bbox = req.ctx.get_single_arg( - "bbox", default="-180,-90,180,90", convert=parse_bounding_box - ) - fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat) - - events = await req.ctx.db.stream_scalars( - select(OvertakingEvent).where( - OvertakingEvent.geometry.bool_op("&&")(func.ST_Transform(bbox, 3857)) + async with use_request_semaphore(req, "export_semaphore", timeout=30): + bbox = req.ctx.get_single_arg( + "bbox", default="-180,-90,180,90", convert=parse_bounding_box ) - ) + fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat) - if fmt == ExportFormat.SHAPEFILE: - with shapefile_zip() as (writer, zip_buffer): - writer.field("distance_overtaker", "N", decimal=4) - writer.field("distance_stationary", "N", 
decimal=4) - writer.field("way_id", "N", decimal=0) - writer.field("direction", "N", decimal=0) - writer.field("course", "N", decimal=4) - writer.field("speed", "N", decimal=4) + events = await req.ctx.db.stream_scalars( + select(OvertakingEvent).where( + OvertakingEvent.geometry.bool_op("&&")(func.ST_Transform(bbox, 3857)) + ) + ) + if fmt == ExportFormat.SHAPEFILE: + with shapefile_zip() as (writer, zip_buffer): + writer.field("distance_overtaker", "N", decimal=4) + writer.field("distance_stationary", "N", decimal=4) + writer.field("way_id", "N", decimal=0) + writer.field("direction", "N", decimal=0) + writer.field("course", "N", decimal=4) + writer.field("speed", "N", decimal=4) + + async for event in events: + writer.point(event.longitude, event.latitude) + writer.record( + distance_overtaker=event.distance_overtaker, + distance_stationary=event.distance_stationary, + direction=-1 if event.direction_reversed else 1, + way_id=event.way_id, + course=event.course, + speed=event.speed, + # "time"=event.time, + ) + + return raw(zip_buffer.getbuffer()) + + if fmt == ExportFormat.GEOJSON: + features = [] async for event in events: - writer.point(event.longitude, event.latitude) - writer.record( - distance_overtaker=event.distance_overtaker, - distance_stationary=event.distance_stationary, - direction=-1 if event.direction_reversed else 1, - way_id=event.way_id, - course=event.course, - speed=event.speed, - # "time"=event.time, + features.append( + { + "type": "Feature", + "geometry": json.loads(event.geometry), + "properties": { + "distance_overtaker": event.distance_overtaker, + "distance_stationary": event.distance_stationary, + "direction": -1 if event.direction_reversed else 1, + "way_id": event.way_id, + "course": event.course, + "speed": event.speed, + "time": event.time, + }, + } ) - return raw(zip_buffer.getbuffer()) + geojson = {"type": "FeatureCollection", "features": features} + return json_response(geojson) - if fmt == ExportFormat.GEOJSON: - features = [] 
- async for event in events: - features.append( - { - "type": "Feature", - "geometry": json.loads(event.geometry), - "properties": { - "distance_overtaker": event.distance_overtaker, - "distance_stationary": event.distance_stationary, - "direction": -1 if event.direction_reversed else 1, - "way_id": event.way_id, - "course": event.course, - "speed": event.speed, - "time": event.time, - }, - } - ) - - geojson = {"type": "FeatureCollection", "features": features} - return json_response(geojson) - - raise InvalidUsage("unknown export format") + raise InvalidUsage("unknown export format") diff --git a/api/obs/api/routes/tiles.py b/api/obs/api/routes/tiles.py index 66b6e0d..f0452a5 100644 --- a/api/obs/api/routes/tiles.py +++ b/api/obs/api/routes/tiles.py @@ -1,18 +1,16 @@ -import asyncio -from contextlib import asynccontextmanager from gzip import decompress from sqlite3 import connect from datetime import datetime, time, timedelta from typing import Optional, Tuple import dateutil.parser -from sanic.exceptions import Forbidden, InvalidUsage, ServiceUnavailable +from sanic.exceptions import Forbidden, InvalidUsage from sanic.response import raw -from sqlalchemy import select, text -from sqlalchemy.sql.expression import table, column +from sqlalchemy import text from obs.api.app import app +from obs.api.utils import use_request_semaphore def get_tile(filename, zoom, x, y): @@ -87,41 +85,9 @@ def get_filter_options( return user_id, start, end -@asynccontextmanager -async def use_tile_semaphore(req, timeout=10): - """ - If configured, acquire a semaphore for the map tile request and release it - after the context has finished. - - If the semaphore cannot be acquired within the timeout, issue a 503 Service - Unavailable error response that describes that the map tile database is - overloaded, so users know what the problem is. - - Operates as a noop when the tile semaphore is not enabled. 
- """ - sem = getattr(req.app.ctx, "_tile_semaphore", None) - - if sem is None: - yield - return - - try: - await asyncio.wait_for(sem.acquire(), timeout) - - try: - yield - finally: - sem.release() - - except asyncio.TimeoutError: - raise ServiceUnavailable( - "Too many map content requests, database overloaded. Please retry later." - ) - - @app.route(r"/tiles///") async def tiles(req, zoom: int, x: int, y: str): - async with use_tile_semaphore(req): + async with use_request_semaphore(req, "tile_semaphore"): if app.config.get("TILES_FILE"): tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y)) diff --git a/api/obs/api/utils.py b/api/obs/api/utils.py index 0ddd0f2..7197d43 100644 --- a/api/obs/api/utils.py +++ b/api/obs/api/utils.py @@ -1,12 +1,13 @@ +import asyncio +from contextlib import asynccontextmanager from datetime import datetime import logging +from os.path import commonpath, join, relpath import queue import tarfile -from os.path import commonpath, relpath, join import dateutil.parser - -from sanic.exceptions import InvalidUsage +from sanic.exceptions import InvalidUsage, ServiceUnavailable log = logging.getLogger(__name__) @@ -127,3 +128,35 @@ class StreamerHelper: await self.response.send(tosend) except queue.Empty: break + + +@asynccontextmanager +async def use_request_semaphore(req, semaphore_name, timeout=10): + """ + If configured, acquire the semaphore named by `semaphore_name` and release it + after the context has finished. + + If the semaphore cannot be acquired within the timeout, issue a 503 Service + Unavailable error response that describes that the database is overloaded, + so users know what the problem is. + + Operates as a noop when the named semaphore is not enabled.
+ """ + semaphore = getattr(req.app.ctx, semaphore_name, None) + + if semaphore is None: + yield + return + + try: + await asyncio.wait_for(semaphore.acquire(), timeout) + + try: + yield + finally: + semaphore.release() + + except asyncio.TimeoutError: + raise ServiceUnavailable( + "Too many requests, database overloaded. Please retry later." + )