Add semaphore to limit simultaneous requests to tile data
This commit is contained in:
parent
dd2e995720
commit
c249b1638e
|
@ -29,5 +29,6 @@ ADDITIONAL_CORS_ORIGINS = [
|
||||||
"http://localhost:8880/", # for maputnik on 8880
|
"http://localhost:8880/", # for maputnik on 8880
|
||||||
"http://localhost:8888/", # for maputnik on 8888
|
"http://localhost:8888/", # for maputnik on 8888
|
||||||
]
|
]
|
||||||
|
TILE_SEMAPHORE_SIZE = 4
|
||||||
|
|
||||||
# vim: set ft=python :
|
# vim: set ft=python :
|
||||||
|
|
|
@ -61,4 +61,9 @@ TILES_FILE = None
|
||||||
# default. Python list, or whitespace separated string.
|
# default. Python list, or whitespace separated string.
|
||||||
ADDITIONAL_CORS_ORIGINS = None
|
ADDITIONAL_CORS_ORIGINS = None
|
||||||
|
|
||||||
|
# How many asynchronous requests may be sent to the database to generate tile
|
||||||
|
# information. Should be less than POSTGRES_POOL_SIZE to leave some connections
|
||||||
|
# to the other features of the API ;)
|
||||||
|
TILE_SEMAPHORE_SIZE = 4
|
||||||
|
|
||||||
# vim: set ft=python :
|
# vim: set ft=python :
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
@ -23,7 +24,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
from obs.api.db import User, make_session, connect_db
|
from obs.api.db import User, make_session, connect_db
|
||||||
from obs.api.cors import setup_options, add_cors_headers
|
from obs.api.cors import setup_options, add_cors_headers
|
||||||
from obs.api.utils import get_single_arg
|
from obs.api.utils import get_single_arg
|
||||||
from sqlalchemy.util import asyncio
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -58,6 +58,24 @@ app = Sanic(
|
||||||
)
|
)
|
||||||
configure_sanic_logging()
|
configure_sanic_logging()
|
||||||
|
|
||||||
|
app.config.update(
|
||||||
|
dict(
|
||||||
|
DEBUG=False,
|
||||||
|
VERBOSE=False,
|
||||||
|
AUTO_RELOAD=False,
|
||||||
|
POSTGRES_POOL_SIZE=20,
|
||||||
|
POSTGRES_MAX_OVERFLOW=40,
|
||||||
|
DEDICATED_WORKER=True,
|
||||||
|
FRONTEND_URL=None,
|
||||||
|
FRONTEND_HTTPS=True,
|
||||||
|
TILES_FILE=None,
|
||||||
|
TILE_SEMAPHORE_SIZE=4,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# overwrite from defaults again
|
||||||
|
app.config.load_environment_vars("OBS_")
|
||||||
|
|
||||||
if isfile("./config.py"):
|
if isfile("./config.py"):
|
||||||
app.update_config("./config.py")
|
app.update_config("./config.py")
|
||||||
|
|
||||||
|
@ -168,6 +186,9 @@ async def app_connect_db(app, loop):
|
||||||
)
|
)
|
||||||
app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__()
|
app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__()
|
||||||
|
|
||||||
|
if app.config.TILE_SEMAPHORE_SIZE:
|
||||||
|
app.ctx._tile_semaphore = asyncio.Semaphore(app.config.TILE_SEMAPHORE_SIZE)
|
||||||
|
|
||||||
|
|
||||||
@app.after_server_stop
|
@app.after_server_stop
|
||||||
async def app_disconnect_db(app, loop):
|
async def app_disconnect_db(app, loop):
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
|
import asyncio
|
||||||
|
from contextlib import asynccontextmanager
|
||||||
from gzip import decompress
|
from gzip import decompress
|
||||||
from sqlite3 import connect
|
from sqlite3 import connect
|
||||||
from datetime import datetime, time, timedelta
|
from datetime import datetime, time, timedelta
|
||||||
from typing import Optional, Tuple
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
import dateutil.parser
|
import dateutil.parser
|
||||||
from sanic.exceptions import Forbidden, InvalidUsage
|
from sanic.exceptions import Forbidden, InvalidUsage, ServiceUnavailable
|
||||||
from sanic.response import raw
|
from sanic.response import raw
|
||||||
|
|
||||||
from sqlalchemy import select, text
|
from sqlalchemy import select, text
|
||||||
|
@ -85,26 +87,59 @@ def get_filter_options(
|
||||||
return user_id, start, end
|
return user_id, start, end
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def use_tile_semaphore(req, timeout=10):
|
||||||
|
"""
|
||||||
|
If configured, acquire a semaphore for the map tile request and release it
|
||||||
|
after the context has finished.
|
||||||
|
|
||||||
|
If the semaphore cannot be acquired within the timeout, issue a 503 Service
|
||||||
|
Unavailable error response that describes that the map tile database is
|
||||||
|
overloaded, so users know what the problem is.
|
||||||
|
|
||||||
|
Operates as a noop when the tile semaphore is not enabled.
|
||||||
|
"""
|
||||||
|
sem = getattr(req.app.ctx, "_tile_semaphore", None)
|
||||||
|
|
||||||
|
if sem is None:
|
||||||
|
yield
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await asyncio.wait_for(sem.acquire(), timeout)
|
||||||
|
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
sem.release()
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise ServiceUnavailable(
|
||||||
|
"Too many map content requests, database overloaded. Please retry later."
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@app.route(r"/tiles/<zoom:int>/<x:int>/<y:(\d+)\.pbf>")
|
@app.route(r"/tiles/<zoom:int>/<x:int>/<y:(\d+)\.pbf>")
|
||||||
async def tiles(req, zoom: int, x: int, y: str):
|
async def tiles(req, zoom: int, x: int, y: str):
|
||||||
if app.config.get("TILES_FILE"):
|
async with use_tile_semaphore(req):
|
||||||
tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y))
|
if app.config.get("TILES_FILE"):
|
||||||
|
tile = get_tile(req.app.config.TILES_FILE, int(zoom), int(x), int(y))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
user_id, start, end = get_filter_options(req)
|
user_id, start, end = get_filter_options(req)
|
||||||
|
|
||||||
tile = await req.ctx.db.scalar(
|
tile = await req.ctx.db.scalar(
|
||||||
text(
|
text(
|
||||||
f"select data from getmvt(:zoom, :x, :y, :user_id, :min_time, :max_time) as b(data, key);"
|
f"select data from getmvt(:zoom, :x, :y, :user_id, :min_time, :max_time) as b(data, key);"
|
||||||
).bindparams(
|
).bindparams(
|
||||||
zoom=int(zoom),
|
zoom=int(zoom),
|
||||||
x=int(x),
|
x=int(x),
|
||||||
y=int(y),
|
y=int(y),
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
min_time=start,
|
min_time=start,
|
||||||
max_time=end,
|
max_time=end,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
gzip = "gzip" in req.headers["accept-encoding"]
|
gzip = "gzip" in req.headers["accept-encoding"]
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue