Also add semaphore for exports
This commit is contained in:
parent
d3fbb113f3
commit
a6811d4ba2
|
@ -30,5 +30,6 @@ ADDITIONAL_CORS_ORIGINS = [
|
||||||
"http://localhost:8888/", # for maputnik on 8888
|
"http://localhost:8888/", # for maputnik on 8888
|
||||||
]
|
]
|
||||||
TILE_SEMAPHORE_SIZE = 4
|
TILE_SEMAPHORE_SIZE = 4
|
||||||
|
EXPORT_SEMAPHORE_SIZE = 4
|
||||||
|
|
||||||
# vim: set ft=python :
|
# vim: set ft=python :
|
||||||
|
|
|
@ -66,4 +66,8 @@ ADDITIONAL_CORS_ORIGINS = None
|
||||||
# to the other features of the API ;)
|
# to the other features of the API ;)
|
||||||
TILE_SEMAPHORE_SIZE = 4
|
TILE_SEMAPHORE_SIZE = 4
|
||||||
|
|
||||||
|
# How many asynchronous requests may generate exported data simultaneously.
|
||||||
|
# Keep this small.
|
||||||
|
EXPORT_SEMAPHORE_SIZE = 1
|
||||||
|
|
||||||
# vim: set ft=python :
|
# vim: set ft=python :
|
||||||
|
|
|
@ -70,6 +70,7 @@ app.config.update(
|
||||||
FRONTEND_HTTPS=True,
|
FRONTEND_HTTPS=True,
|
||||||
TILES_FILE=None,
|
TILES_FILE=None,
|
||||||
TILE_SEMAPHORE_SIZE=4,
|
TILE_SEMAPHORE_SIZE=4,
|
||||||
|
EXPORT_SEMAPHORE_SIZE=1,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -187,7 +188,10 @@ async def app_connect_db(app, loop):
|
||||||
app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__()
|
app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__()
|
||||||
|
|
||||||
if app.config.TILE_SEMAPHORE_SIZE:
|
if app.config.TILE_SEMAPHORE_SIZE:
|
||||||
app.ctx._tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE)
|
app.ctx.tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE)
|
||||||
|
|
||||||
|
if app.config.EXPORT_SEMAPHORE_SIZE:
|
||||||
|
app.ctx.export_semaphore = asyncio.Semaphore(app.config.EXPORT_SEMAPHORE_SIZE)
|
||||||
|
|
||||||
|
|
||||||
@app.after_server_stop
|
@app.after_server_stop
|
||||||
|
|
|
@ -12,6 +12,7 @@ from sanic.response import raw
|
||||||
from sanic.exceptions import InvalidUsage
|
from sanic.exceptions import InvalidUsage
|
||||||
|
|
||||||
from obs.api.app import api, json as json_response
|
from obs.api.app import api, json as json_response
|
||||||
|
from obs.api.utils import use_request_semaphore
|
||||||
|
|
||||||
|
|
||||||
class ExportFormat(str, Enum):
|
class ExportFormat(str, Enum):
|
||||||
|
@ -60,60 +61,61 @@ def shapefile_zip():
|
||||||
|
|
||||||
@api.get(r"/export/events")
|
@api.get(r"/export/events")
|
||||||
async def export_events(req):
|
async def export_events(req):
|
||||||
bbox = req.ctx.get_single_arg(
|
async with use_request_semaphore(req, "export_semaphore", timeout=30):
|
||||||
"bbox", default="-180,-90,180,90", convert=parse_bounding_box
|
bbox = req.ctx.get_single_arg(
|
||||||
)
|
"bbox", default="-180,-90,180,90", convert=parse_bounding_box
|
||||||
fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat)
|
|
||||||
|
|
||||||
events = await req.ctx.db.stream_scalars(
|
|
||||||
select(OvertakingEvent).where(
|
|
||||||
OvertakingEvent.geometry.bool_op("&&")(func.ST_Transform(bbox, 3857))
|
|
||||||
)
|
)
|
||||||
)
|
fmt = req.ctx.get_single_arg("fmt", convert=ExportFormat)
|
||||||
|
|
||||||
if fmt == ExportFormat.SHAPEFILE:
|
events = await req.ctx.db.stream_scalars(
|
||||||
with shapefile_zip() as (writer, zip_buffer):
|
select(OvertakingEvent).where(
|
||||||
writer.field("distance_overtaker", "N", decimal=4)
|
OvertakingEvent.geometry.bool_op("&&")(func.ST_Transform(bbox, 3857))
|
||||||
writer.field("distance_stationary", "N", decimal=4)
|
)
|
||||||
writer.field("way_id", "N", decimal=0)
|
)
|
||||||
writer.field("direction", "N", decimal=0)
|
|
||||||
writer.field("course", "N", decimal=4)
|
|
||||||
writer.field("speed", "N", decimal=4)
|
|
||||||
|
|
||||||
|
if fmt == ExportFormat.SHAPEFILE:
|
||||||
|
with shapefile_zip() as (writer, zip_buffer):
|
||||||
|
writer.field("distance_overtaker", "N", decimal=4)
|
||||||
|
writer.field("distance_stationary", "N", decimal=4)
|
||||||
|
writer.field("way_id", "N", decimal=0)
|
||||||
|
writer.field("direction", "N", decimal=0)
|
||||||
|
writer.field("course", "N", decimal=4)
|
||||||
|
writer.field("speed", "N", decimal=4)
|
||||||
|
|
||||||
|
async for event in events:
|
||||||
|
writer.point(event.longitude, event.latitude)
|
||||||
|
writer.record(
|
||||||
|
distance_overtaker=event.distance_overtaker,
|
||||||
|
distance_stationary=event.distance_stationary,
|
||||||
|
direction=-1 if event.direction_reversed else 1,
|
||||||
|
way_id=event.way_id,
|
||||||
|
course=event.course,
|
||||||
|
speed=event.speed,
|
||||||
|
# "time"=event.time,
|
||||||
|
)
|
||||||
|
|
||||||
|
return raw(zip_buffer.getbuffer())
|
||||||
|
|
||||||
|
if fmt == ExportFormat.GEOJSON:
|
||||||
|
features = []
|
||||||
async for event in events:
|
async for event in events:
|
||||||
writer.point(event.longitude, event.latitude)
|
features.append(
|
||||||
writer.record(
|
{
|
||||||
distance_overtaker=event.distance_overtaker,
|
"type": "Feature",
|
||||||
distance_stationary=event.distance_stationary,
|
"geometry": json.loads(event.geometry),
|
||||||
direction=-1 if event.direction_reversed else 1,
|
"properties": {
|
||||||
way_id=event.way_id,
|
"distance_overtaker": event.distance_overtaker,
|
||||||
course=event.course,
|
"distance_stationary": event.distance_stationary,
|
||||||
speed=event.speed,
|
"direction": -1 if event.direction_reversed else 1,
|
||||||
# "time"=event.time,
|
"way_id": event.way_id,
|
||||||
|
"course": event.course,
|
||||||
|
"speed": event.speed,
|
||||||
|
"time": event.time,
|
||||||
|
},
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
return raw(zip_buffer.getbuffer())
|
geojson = {"type": "FeatureCollection", "features": features}
|
||||||
|
return json_response(geojson)
|
||||||
|
|
||||||
if fmt == ExportFormat.GEOJSON:
|
raise InvalidUsage("unknown export format")
|
||||||
features = []
|
|
||||||
async for event in events:
|
|
||||||
features.append(
|
|
||||||
{
|
|
||||||
"type": "Feature",
|
|
||||||
"geometry": json.loads(event.geometry),
|
|
||||||
"properties": {
|
|
||||||
"distance_overtaker": event.distance_overtaker,
|
|
||||||
"distance_stationary": event.distance_stationary,
|
|
||||||
"direction": -1 if event.direction_reversed else 1,
|
|
||||||
"way_id": event.way_id,
|
|
||||||
"course": event.course,
|
|
||||||
"speed": event.speed,
|
|
||||||
"time": event.time,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
geojson = {"type": "FeatureCollection", "features": features}
|
|
||||||
return json_response(geojson)
|
|
||||||
|
|
||||||
raise InvalidUsage("unknown export format")
|
|
||||||
|
|
|
@ -1,18 +1,16 @@
|
||||||
import asyncio
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from gzip import decompress
|
from gzip import decompress
|
||||||
from sqlite3 import connect
|
from sqlite3 import connect
|
||||||
from datetime import datetime, time, timedelta
|
from datetime import datetime, time, timedelta
|
||||||
from typing import Optional, Tuple
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
import dateutil.parser
|
import dateutil.parser
|
||||||
from sanic.exceptions import Forbidden, InvalidUsage, ServiceUnavailable
|
from sanic.exceptions import Forbidden, InvalidUsage
|
||||||
from sanic.response import raw
|
from sanic.response import raw
|
||||||
|
|
||||||
from sqlalchemy import select, text
|
from sqlalchemy import text
|
||||||
from sqlalchemy.sql.expression import table, column
|
|
||||||
|
|
||||||
from obs.api.app import app
|
from obs.api.app import app
|
||||||
|
from obs.api.utils import use_request_semaphore
|
||||||
|
|
||||||
|
|
||||||
def get_tile(filename, zoom, x, y):
|
def get_tile(filename, zoom, x, y):
|
||||||
|
@ -87,41 +85,9 @@ def get_filter_options(
|
||||||
return user_id, start, end
|
return user_id, start, end
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def use_tile_semaphore(req, timeout=10):
|
|
||||||
"""
|
|
||||||
If configured, acquire a semaphore for the map tile request and release it
|
|
||||||
after the context has finished.
|
|
||||||
|
|
||||||
If the semaphore cannot be acquired within the timeout, issue a 503 Service
|
|
||||||
Unavailable error response that describes that the map tile database is
|
|
||||||
overloaded, so users know what the problem is.
|
|
||||||
|
|
||||||
Operates as a noop when the tile semaphore is not enabled.
|
|
||||||
"""
|
|
||||||
sem = getattr(req.app.ctx, "_tile_semaphore", None)
|
|
||||||
|
|
||||||
if sem is None:
|
|
||||||
yield
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
await asyncio.wait_for(sem.acquire(), timeout)
|
|
||||||
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
finally:
|
|
||||||
sem.release()
|
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
|
||||||
raise ServiceUnavailable(
|
|
||||||
"Too many map content requests, database overloaded. Please retry later."
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@app.route(r"/tiles/<zoom:int>/<x:int>/<y:(\d+)\.pbf>")
|
@app.route(r"/tiles/<zoom:int>/<x:int>/<y:(\d+)\.pbf>")
|
||||||
async def tiles(req, zoom: int, x: int, y: str):
|
async def tiles(req, zoom: int, x: int, y: str):
|
||||||
async with use_tile_semaphore(req):
|
async with use_request_semaphore(req, "tile_semaphore"):
|
||||||
if app.config.get("TILES_FILE"):
|
if app.config.get("TILES_FILE"):
|
||||||
tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y))
|
tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y))
|
||||||
|
|
||||||
|
|
|
@ -1,12 +1,13 @@
|
||||||
|
import asyncio
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import logging
|
import logging
|
||||||
|
from os.path import commonpath, join, relpath
|
||||||
import queue
|
import queue
|
||||||
import tarfile
|
import tarfile
|
||||||
from os.path import commonpath, relpath, join
|
|
||||||
|
|
||||||
import dateutil.parser
|
import dateutil.parser
|
||||||
|
from sanic.exceptions import InvalidUsage, ServiceUnavailable
|
||||||
from sanic.exceptions import InvalidUsage
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -127,3 +128,35 @@ class StreamerHelper:
|
||||||
await self.response.send(tosend)
|
await self.response.send(tosend)
|
||||||
except queue.Empty:
|
except queue.Empty:
|
||||||
break
|
break
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def use_request_semaphore(req, semaphore_name, timeout=10):
|
||||||
|
"""
|
||||||
|
If configured, acquire a semaphore for the map tile request and release it
|
||||||
|
after the context has finished.
|
||||||
|
|
||||||
|
If the semaphore cannot be acquired within the timeout, issue a 503 Service
|
||||||
|
Unavailable error response that describes that the database is overloaded,
|
||||||
|
so users know what the problem is.
|
||||||
|
|
||||||
|
Operates as a noop when the tile semaphore is not enabled.
|
||||||
|
"""
|
||||||
|
semaphore = getattr(req.app.ctx, semaphore_name, None)
|
||||||
|
|
||||||
|
if semaphore is None:
|
||||||
|
yield
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(semaphore.acquire(), timeout)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
semaphore.release()
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise ServiceUnavailable(
|
||||||
|
"Too many requests, database overloaded. Please retry later."
|
||||||
|
)
|
||||||
|
|
Loading…
Reference in a new issue