Merge pull request #337 from openbikesensor/next-semaphore
Semaphore for tile requests
This commit is contained in:
commit
d8e8d9aec1
|
@ -29,5 +29,7 @@ ADDITIONAL_CORS_ORIGINS = [
|
|||
"http://localhost:8880/", # for maputnik on 8880
|
||||
"http://localhost:8888/", # for maputnik on 8888
|
||||
]
|
||||
TILE_SEMAPHORE_SIZE = 4
|
||||
EXPORT_SEMAPHORE_SIZE = 4
|
||||
|
||||
# vim: set ft=python :
|
||||
|
|
|
@ -61,4 +61,13 @@ TILES_FILE = None
|
|||
# default. Python list, or whitespace separated string.
|
||||
ADDITIONAL_CORS_ORIGINS = None
|
||||
|
||||
# How many asynchronous requests may be sent to the database to generate tile
|
||||
# information. Should be less than POSTGRES_POOL_SIZE to leave some connections
|
||||
# to the other features of the API ;)
|
||||
TILE_SEMAPHORE_SIZE = 4
|
||||
|
||||
# How many asynchronous requests may generate exported data simultaneously.
|
||||
# Keep this small.
|
||||
EXPORT_SEMAPHORE_SIZE = 1
|
||||
|
||||
# vim: set ft=python :
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
|
||||
|
@ -23,7 +24,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
|||
from obs.api.db import User, make_session, connect_db
|
||||
from obs.api.cors import setup_options, add_cors_headers
|
||||
from obs.api.utils import get_single_arg
|
||||
from sqlalchemy.util import asyncio
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -58,6 +58,25 @@ app = Sanic(
|
|||
)
|
||||
configure_sanic_logging()
|
||||
|
||||
app.config.update(
|
||||
dict(
|
||||
DEBUG=False,
|
||||
VERBOSE=False,
|
||||
AUTO_RELOAD=False,
|
||||
POSTGRES_POOL_SIZE=20,
|
||||
POSTGRES_MAX_OVERFLOW=40,
|
||||
DEDICATED_WORKER=True,
|
||||
FRONTEND_URL=None,
|
||||
FRONTEND_HTTPS=True,
|
||||
TILES_FILE=None,
|
||||
TILE_SEMAPHORE_SIZE=4,
|
||||
EXPORT_SEMAPHORE_SIZE=1,
|
||||
)
|
||||
)
|
||||
|
||||
# overwrite from defaults again
|
||||
app.config.load_environment_vars("OBS_")
|
||||
|
||||
if isfile("./config.py"):
|
||||
app.update_config("./config.py")
|
||||
|
||||
|
@ -168,6 +187,12 @@ async def app_connect_db(app, loop):
|
|||
)
|
||||
app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__()
|
||||
|
||||
if app.config.TILE_SEMAPHORE_SIZE:
|
||||
app.ctx.tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE)
|
||||
|
||||
if app.config.EXPORT_SEMAPHORE_SIZE:
|
||||
app.ctx.export_semaphore = asyncio.Semaphore(app.config.EXPORT_SEMAPHORE_SIZE)
|
||||
|
||||
|
||||
@app.after_server_stop
|
||||
async def app_disconnect_db(app, loop):
|
||||
|
|
|
@ -12,6 +12,7 @@ from sanic.response import raw
|
|||
from sanic.exceptions import InvalidUsage
|
||||
|
||||
from obs.api.app import api, json as json_response
|
||||
from obs.api.utils import use_request_semaphore
|
||||
|
||||
|
||||
class ExportFormat(str, Enum):
|
||||
|
@ -26,7 +27,7 @@ def parse_bounding_box(input_string):
|
|||
func.ST_Point(left, bottom),
|
||||
func.ST_Point(right, top),
|
||||
),
|
||||
3857,
|
||||
4326,
|
||||
)
|
||||
|
||||
|
||||
|
@ -60,58 +61,61 @@ def shapefile_zip():
|
|||
|
||||
@api.get(r"/export/events")
|
||||
async def export_events(req):
|
||||
bbox = req.ctx.get_single_arg(
|
||||
"bbox", default="-180,-90,180,90", convert=parse_bounding_box
|
||||
)
|
||||
fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat)
|
||||
async with use_request_semaphore(req, "export_semaphore", timeout=30):
|
||||
bbox = req.ctx.get_single_arg(
|
||||
"bbox", default="-180,-90,180,90", convert=parse_bounding_box
|
||||
)
|
||||
fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat)
|
||||
|
||||
events = await req.ctx.db.stream_scalars(
|
||||
select(OvertakingEvent).where(OvertakingEvent.geometry.bool_op("&&")(bbox))
|
||||
)
|
||||
events = await req.ctx.db.stream_scalars(
|
||||
select(OvertakingEvent).where(
|
||||
OvertakingEvent.geometry.bool_op("&&")(func.ST_Transform(bbox, 3857))
|
||||
)
|
||||
)
|
||||
|
||||
if fmt == ExportFormat.SHAPEFILE:
|
||||
with shapefile_zip() as (writer, zip_buffer):
|
||||
writer.field("distance_overtaker", "N", decimal=4)
|
||||
writer.field("distance_stationary", "N", decimal=4)
|
||||
writer.field("way_id", "N", decimal=0)
|
||||
writer.field("direction", "N", decimal=0)
|
||||
writer.field("course", "N", decimal=4)
|
||||
writer.field("speed", "N", decimal=4)
|
||||
if fmt == ExportFormat.SHAPEFILE:
|
||||
with shapefile_zip() as (writer, zip_buffer):
|
||||
writer.field("distance_overtaker", "N", decimal=4)
|
||||
writer.field("distance_stationary", "N", decimal=4)
|
||||
writer.field("way_id", "N", decimal=0)
|
||||
writer.field("direction", "N", decimal=0)
|
||||
writer.field("course", "N", decimal=4)
|
||||
writer.field("speed", "N", decimal=4)
|
||||
|
||||
async for event in events:
|
||||
writer.point(event.longitude, event.latitude)
|
||||
writer.record(
|
||||
distance_overtaker=event.distance_overtaker,
|
||||
distance_stationary=event.distance_stationary,
|
||||
direction=-1 if event.direction_reversed else 1,
|
||||
way_id=event.way_id,
|
||||
course=event.course,
|
||||
speed=event.speed,
|
||||
# "time"=event.time,
|
||||
)
|
||||
|
||||
return raw(zip_buffer.getbuffer())
|
||||
|
||||
if fmt == ExportFormat.GEOJSON:
|
||||
features = []
|
||||
async for event in events:
|
||||
writer.point(event.longitude, event.latitude)
|
||||
writer.record(
|
||||
distance_overtaker=event.distance_overtaker,
|
||||
distance_stationary=event.distance_stationary,
|
||||
direction=-1 if event.direction_reversed else 1,
|
||||
way_id=event.way_id,
|
||||
course=event.course,
|
||||
speed=event.speed,
|
||||
# "time"=event.time,
|
||||
features.append(
|
||||
{
|
||||
"type": "Feature",
|
||||
"geometry": json.loads(event.geometry),
|
||||
"properties": {
|
||||
"distance_overtaker": event.distance_overtaker,
|
||||
"distance_stationary": event.distance_stationary,
|
||||
"direction": -1 if event.direction_reversed else 1,
|
||||
"way_id": event.way_id,
|
||||
"course": event.course,
|
||||
"speed": event.speed,
|
||||
"time": event.time,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
return raw(zip_buffer.getbuffer())
|
||||
geojson = {"type": "FeatureCollection", "features": features}
|
||||
return json_response(geojson)
|
||||
|
||||
if fmt == ExportFormat.GEOJSON:
|
||||
features = []
|
||||
async for event in events:
|
||||
features.append(
|
||||
{
|
||||
"type": "Feature",
|
||||
"geometry": json.loads(event.geometry),
|
||||
"properties": {
|
||||
"distance_overtaker": event.distance_overtaker,
|
||||
"distance_stationary": event.distance_stationary,
|
||||
"direction": -1 if event.direction_reversed else 1,
|
||||
"way_id": event.way_id,
|
||||
"course": event.course,
|
||||
"speed": event.speed,
|
||||
"time": event.time,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
geojson = {"type": "FeatureCollection", "features": features}
|
||||
return json_response(geojson)
|
||||
|
||||
raise InvalidUsage("unknown export format")
|
||||
raise InvalidUsage("unknown export format")
|
||||
|
|
|
@ -7,10 +7,10 @@ import dateutil.parser
|
|||
from sanic.exceptions import Forbidden, InvalidUsage
|
||||
from sanic.response import raw
|
||||
|
||||
from sqlalchemy import select, text
|
||||
from sqlalchemy.sql.expression import table, column
|
||||
from sqlalchemy import text
|
||||
|
||||
from obs.api.app import app
|
||||
from obs.api.utils import use_request_semaphore
|
||||
|
||||
|
||||
def get_tile(filename, zoom, x, y):
|
||||
|
@ -87,24 +87,25 @@ def get_filter_options(
|
|||
|
||||
@app.route(r"/tiles/<zoom:int>/<x:int>/<y:(\d+)\.pbf>")
|
||||
async def tiles(req, zoom: int, x: int, y: str):
|
||||
if app.config.get("TILES_FILE"):
|
||||
tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y))
|
||||
async with use_request_semaphore(req, "tile_semaphore"):
|
||||
if app.config.get("TILES_FILE"):
|
||||
tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y))
|
||||
|
||||
else:
|
||||
user_id, start, end = get_filter_options(req)
|
||||
else:
|
||||
user_id, start, end = get_filter_options(req)
|
||||
|
||||
tile = await req.ctx.db.scalar(
|
||||
text(
|
||||
f"select data from getmvt(:zoom, :x, :y, :user_id, :min_time, :max_time) as b(data, key);"
|
||||
).bindparams(
|
||||
zoom=int(zoom),
|
||||
x=int(x),
|
||||
y=int(y),
|
||||
user_id=user_id,
|
||||
min_time=start,
|
||||
max_time=end,
|
||||
tile = await req.ctx.db.scalar(
|
||||
text(
|
||||
f"select data from getmvt(:zoom, :x, :y, :user_id, :min_time, :max_time) as b(data, key);"
|
||||
).bindparams(
|
||||
zoom=int(zoom),
|
||||
x=int(x),
|
||||
y=int(y),
|
||||
user_id=user_id,
|
||||
min_time=start,
|
||||
max_time=end,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
gzip = "gzip" in req.headers["accept-encoding"]
|
||||
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from os.path import commonpath, join, relpath
|
||||
import queue
|
||||
import tarfile
|
||||
from os.path import commonpath, relpath, join
|
||||
|
||||
import dateutil.parser
|
||||
|
||||
from sanic.exceptions import InvalidUsage
|
||||
from sanic.exceptions import InvalidUsage, ServiceUnavailable
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
@ -127,3 +128,35 @@ class StreamerHelper:
|
|||
await self.response.send(tosend)
|
||||
except queue.Empty:
|
||||
break
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
async def use_request_semaphore(req, semaphore_name, timeout=10):
|
||||
"""
|
||||
If configured, acquire a semaphore for the map tile request and release it
|
||||
after the context has finished.
|
||||
|
||||
If the semaphore cannot be acquired within the timeout, issue a 503 Service
|
||||
Unavailable error response that describes that the database is overloaded,
|
||||
so users know what the problem is.
|
||||
|
||||
Operates as a noop when the tile semaphore is not enabled.
|
||||
"""
|
||||
semaphore = getattr(req.app.ctx, semaphore_name, None)
|
||||
|
||||
if semaphore is None:
|
||||
yield
|
||||
return
|
||||
|
||||
try:
|
||||
await asyncio.wait_for(semaphore.acquire(), timeout)
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
semaphore.release()
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
raise ServiceUnavailable(
|
||||
"Too many requests, database overloaded. Please retry later."
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue