Single-container API+Frontend+Worker

This commit is contained in:
Paul Bienkowski 2021-11-14 23:42:02 +01:00
parent c353d2afc1
commit c85f261292
26 changed files with 532 additions and 364 deletions

10
.dockerignore Normal file
View file

@ -0,0 +1,10 @@
local
*.user
frontend/node_modules
api/.pyenv
.git
cache
data
tile-generator/cache
tile-generator/data
tile-generator/build

43
Dockerfile Normal file
View file

@ -0,0 +1,43 @@
# This dockerfile is for the API + Frontend production image
#############################################
# Build the frontend AS builder
#############################################
FROM node:14 as frontend-builder
WORKDIR /opt/obs/frontend
ADD frontend/package.json frontend/package-lock.json /opt/obs/frontend/
RUN echo update-notifier=false >> ~/.npmrc
RUN npm ci
ADD frontend/tsconfig.json frontend/craco.config.js /opt/obs/frontend/
ADD frontend/src /opt/obs/frontend/src/
ADD frontend/public /opt/obs/frontend/public/
RUN npm run build
#############################################
# Build the API and add the built frontend to it
#############################################
FROM python:3.9.7-bullseye
WORKDIR /opt/obs/api
ADD api/requirements.txt /opt/obs/api/
RUN pip install -r requirements.txt
ADD api/scripts /opt/obs/scripts
RUN pip install -e /opt/obs/scripts
ADD api/setup.py /opt/obs/api/
ADD api/obs /opt/obs/api/obs/
RUN pip install -e /opt/obs/api/
COPY --from=frontend-builder /opt/obs/frontend/build /opt/obs/frontend/build
EXPOSE 8000
CMD ["openbikesensor-api"]

View file

@ -10,4 +10,4 @@ RUN pip install -e .
EXPOSE 8000 EXPOSE 8000
CMD ["obs-api"] CMD ["openbikesensor-api"]

View file

@ -19,16 +19,21 @@ KEYCLOAK_URL = "http://keycloak:8080/auth/realms/OBS%20Dev/"
KEYCLOAK_CLIENT_ID = "portal" KEYCLOAK_CLIENT_ID = "portal"
KEYCLOAK_CLIENT_SECRET = "76b84224-dc24-4824-bb98-9e1ba15bd58f" KEYCLOAK_CLIENT_SECRET = "76b84224-dc24-4824-bb98-9e1ba15bd58f"
# Whether the API should run the worker loop, or a dedicated worker is used
DEDICATED_WORKER = True
# The root of the frontend. Needed for redirecting after login, and for CORS. # The root of the frontend. Needed for redirecting after login, and for CORS.
MAIN_FRONTEND_URL = "http://localhost:3001/" # Set to None if frontend is served by the API.
FRONTEND_URL = "http://localhost:3000/"
# Mail settings # Where to find the compiled frontend assets (must include index.html), or None
MAIL_ENABLED = False # to disable serving the frontend.
FRONTEND_DIR = None
# Urls to important documents, hosted elsewhere # Can be an object or a JSON string
IMPRINT_URL = "https://example.com/imprint" FRONTEND_CONFIG = None
PRIVACY_POLICY_URL = "https://example.com/privacy"
# Path overrides:
# API_ROOT_DIR = "??" # default: api/ inside repository # API_ROOT_DIR = "??" # default: api/ inside repository
DATA_DIR = "/data" DATA_DIR = "/data"
# PROCESSING_DIR = "??" # default: DATA_DIR/processing # PROCESSING_DIR = "??" # default: DATA_DIR/processing

54
api/config.prod-test.py Normal file
View file

@ -0,0 +1,54 @@
# Bind address of the server
HOST = "0.0.0.0"
PORT = 3000
# Extended log output, but slower
DEBUG = True
# Required to encrypt or sign sessions, cookies, tokens, etc.
SECRET = "CHANGEME!!!!!!!!!!@##@!!$$$$$$$$$$$$$!!"
# Connection to the database
POSTGRES_URL = "postgresql+asyncpg://obs:obs@postgres/obs"
# URL to the keycloak realm, as reachable by the API service. This is not
# necessarily its publicly reachable URL, keycloak advertises that iself.
KEYCLOAK_URL = "http://keycloak:8080/auth/realms/OBS%20Dev/"
# Auth client credentials
KEYCLOAK_CLIENT_ID = "portal"
KEYCLOAK_CLIENT_SECRET = "76b84224-dc24-4824-bb98-9e1ba15bd58f"
# Whether the API should run the worker loop, or a dedicated worker is used
DEDICATED_WORKER = False
# The root of the frontend. Needed for redirecting after login, and for CORS.
# Set to None if frontend is served by the API.
FRONTEND_URL = None
# Where to find the compiled frontend assets (must include index.html), or None
# to disable serving the frontend.
FRONTEND_DIR = "../frontend/build/"
# Can be an object or a JSON string
FRONTEND_CONFIG = {
"imprintUrl": "https://example.com/imprint",
"privacyPolicyUrl": "https://example.com/privacy",
"mapTileset": {
"url": "https://tiles.wmflabs.org/bw-mapnik/{z}/{x}/{y}.png",
"minZoom": 0,
"maxZoom": 18,
},
"mapHome": {"zoom": 15, "longitude": 7.8302, "latitude": 47.9755},
"obsMapSource": "http://localhost:3002/data/v3.json",
}
# Path overrides:
# API_ROOT_DIR = "??" # default: api/ inside repository
DATA_DIR = "/data"
# PROCESSING_DIR = "??" # default: DATA_DIR/processing
# PROCESSING_OUTPUT_DIR = "??" # default: DATA_DIR/processing-output
# TRACKS_DIR = "??" # default: DATA_DIR/tracks
# OBS_FACE_CACHE_DIR = "??" # default: DATA_DIR/obs-face-cache
# vim: set ft=python :

View file

@ -19,20 +19,29 @@ KEYCLOAK_URL = "http://localhost:1234/auth/realms/obs/"
KEYCLOAK_CLIENT_ID = "portal" KEYCLOAK_CLIENT_ID = "portal"
KEYCLOAK_CLIENT_SECRET = "00000000-0000-0000-0000-000000000000" KEYCLOAK_CLIENT_SECRET = "00000000-0000-0000-0000-000000000000"
# Whether the API should run the worker loop, or a dedicated worker is used
DEDICATED_WORKER = True
# The root of the frontend. Needed for redirecting after login, and for CORS. # The root of the frontend. Needed for redirecting after login, and for CORS.
MAIN_FRONTEND_URL = "https://portal.example.com/" # Set to None if frontend is served by the API.
FRONTEND_URL = None
# Mail settings # Where to find the compiled frontend assets (must include index.html), or None
MAIL_ENABLED = False # to disable serving the frontend.
MAIL_FROM = "Sender Name <sender@example.com>" FRONTEND_DIR = "../frontend/build/"
MAIL_SMTP_HOST = "mail.example.com"
MAIL_SMTP_PORT = 465
MAIL_STARTTLS = False
MAIL_SMTP_USERNAME = "sender@example.com"
# Urls to important documents, hosted elsewhere # Can be an object or a JSON string
IMPRINT_URL = "https://example.com/imprint" FRONTEND_CONFIG = {
PRIVACY_POLICY_URL = "https://example.com/privacy" "imprintUrl": "https://example.com/imprint",
"privacyPolicyUrl": "https://example.com/privacy",
"mapTileset": {
"url": "https://tiles.wmflabs.org/bw-mapnik/{z}/{x}/{y}.png",
"minZoom": 0,
"maxZoom": 18,
},
"mapHome": {"zoom": 15, "longitude": 7.8302, "latitude": 47.9755},
"obsMapSource": "http://localhost:3002/data/v3.json",
}
# Path overrides: # Path overrides:
# API_ROOT_DIR = "??" # default: api/ inside repository # API_ROOT_DIR = "??" # default: api/ inside repository

View file

@ -3,20 +3,19 @@ import os
from json import JSONEncoder, dumps from json import JSONEncoder, dumps
from functools import wraps, partial from functools import wraps, partial
from urllib.parse import urlparse from urllib.parse import urlparse
from os.path import dirname, join, normpath, abspath from os.path import dirname, join, normpath, abspath, exists, isfile
from datetime import datetime, date from datetime import datetime, date
from sanic import Sanic from sanic import Sanic, Blueprint
from sanic.response import text, json as json_response from sanic.response import text, json as json_response, file as file_response
from sanic.exceptions import Unauthorized from sanic.exceptions import Unauthorized, NotFound
from sanic_session import Session, InMemorySessionInterface from sanic_session import Session, InMemorySessionInterface
from sanic_cors import CORS
from sqlalchemy import select from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from obs.api.db import User, async_session from obs.api.db import User, make_session, connect_db
from sanic_session.base import BaseSessionInterface from sanic_session.base import BaseSessionInterface
from sanic_session.utils import ExpiringDict from sanic_session.utils import ExpiringDict
@ -27,6 +26,9 @@ app = Sanic("OpenBikeSensor Portal API")
app.update_config("./config.py") app.update_config("./config.py")
c = app.config c = app.config
api = Blueprint("api", url_prefix="/api")
auth = Blueprint("auth", url_prefix="")
# Configure paths # Configure paths
c.API_ROOT_DIR = c.get("API_ROOT_DIR") or abspath(join(dirname(__file__), "..", "..")) c.API_ROOT_DIR = c.get("API_ROOT_DIR") or abspath(join(dirname(__file__), "..", ".."))
c.DATA_DIR = c.get("DATA_DIR") or normpath(join(c.API_ROOT_DIR, "../data")) c.DATA_DIR = c.get("DATA_DIR") or normpath(join(c.API_ROOT_DIR, "../data"))
@ -36,11 +38,15 @@ c.PROCESSING_OUTPUT_DIR = c.get("PROCESSING_OUTPUT_DIR") or join(
) )
c.TRACKS_DIR = c.get("TRACKS_DIR") or join(c.DATA_DIR, "tracks") c.TRACKS_DIR = c.get("TRACKS_DIR") or join(c.DATA_DIR, "tracks")
c.OBS_FACE_CACHE_DIR = c.get("OBS_FACE_CACHE_DIR") or join(c.DATA_DIR, "obs-face-cache") c.OBS_FACE_CACHE_DIR = c.get("OBS_FACE_CACHE_DIR") or join(c.DATA_DIR, "obs-face-cache")
c.FRONTEND_DIR = c.get("FRONTEND_DIR")
main_frontend_url = urlparse(c.MAIN_FRONTEND_URL) if c.FRONTEND_URL:
from sanic_cors import CORS
frontend_url = urlparse(c.FRONTEND_URL)
CORS( CORS(
app, app,
origins=[f"{main_frontend_url.scheme}://{main_frontend_url.netloc}"], origins=[f"{frontend_url.scheme}://{frontend_url.netloc}"],
supports_credentials=True, supports_credentials=True,
) )
@ -51,28 +57,28 @@ Session(app, interface=InMemorySessionInterface())
@app.before_server_start @app.before_server_start
async def app_connect_db(app, loop): async def app_connect_db(app, loop):
app.ctx._db_engine = create_async_engine(c.POSTGRES_URL, echo=c.DEBUG) app.ctx._db_engine_ctx = connect_db(c.POSTGRES_URL)
app.ctx._db_engine = await app.ctx._db_engine_ctx.__aenter__()
@app.after_server_stop @app.after_server_stop
async def app_disconnect_db(app, loop): async def app_disconnect_db(app, loop):
if app.ctx._db_engine: if hasattr(app.ctx, "_db_engine_ctx"):
await app.ctx._db_engine.dispose() await app.ctx._db_engine_ctx.__aexit__(None, None, None)
@app.middleware("request") @app.middleware("request")
async def inject_session(req): async def inject_session(req):
req.ctx.db = sessionmaker( req.ctx._session_ctx = make_session()
req.app.ctx._db_engine, class_=AsyncSession, expire_on_commit=False req.ctx.db = await req.ctx._session_ctx.__aenter__()
)() sessionmaker(req.app.ctx._db_engine, class_=AsyncSession, expire_on_commit=False)()
req.ctx._db_session_ctx_token = async_session.set(req.ctx.db)
@app.middleware("response") @app.middleware("response")
async def close_session(req, response): async def close_session(req, response):
if hasattr(req.ctx, "_db_session_ctx_token"): if hasattr(req.ctx, "_session_ctx"):
async_session.reset(req.ctx._db_session_ctx_token)
await req.ctx.db.close() await req.ctx.db.close()
await req.ctx._session_ctx.__aexit__(None, None, None)
@app.middleware("request") @app.middleware("request")
@ -87,11 +93,6 @@ async def load_user(req):
req.ctx.user = user req.ctx.user = user
@app.route("/")
def index(req):
return text("Hello, %s!" % (req.ctx.user.username if req.ctx.user else "World"))
def require_auth(fn): def require_auth(fn):
@wraps(fn) @wraps(fn)
def wrapper(req, *args, **kwargs): def wrapper(req, *args, **kwargs):
@ -116,3 +117,44 @@ def json(*args, **kwargs):
from . import routes from . import routes
INDEX_HTML = join(c.FRONTEND_DIR, "index.html")
if exists(INDEX_HTML):
@app.get("/config.json")
def get_frontend_config(req):
base_path = req.server_path.replace("config.json", "")
return json_response(
{
**req.app.config.FRONTEND_CONFIG,
"apiUrl": f"{req.scheme}://{req.host}{base_path}api",
"loginUrl": f"{req.scheme}://{req.host}{base_path}login",
}
)
@app.get("/<path:path>")
def get_frontend_static(req, path):
if path.startswith("api/"):
raise NotFound()
file = join(c.FRONTEND_DIR, path)
if not exists(file) or not path or not isfile(file):
file = INDEX_HTML
return file_response(file)
app.blueprint(api)
app.blueprint(auth)
if not app.config.DEDICATED_WORKER:
async def worker():
from obs.api.process import process_tracks_loop
from obs.face.osm import DataSource, DatabaseTileSource
data_source = DataSource(DatabaseTileSource())
# run forever
await process_tracks_loop(data_source, 10)
app.add_task(worker())

View file

@ -1,17 +0,0 @@
# Configure paths
config.API_ROOT_DIR = config.get("API_ROOT_DIR") or abspath(
join(dirname(__file__), "..", "..")
)
config.DATA_DIR = config.get("DATA_DIR") or normpath(
join(config.API_ROOT_DIR, "../data")
)
config.PROCESSING_DIR = config.get("PROCESSING_DIR") or join(
config.DATA_DIR, "processing"
)
config.PROCESSING_OUTPUT_DIR = config.get("PROCESSING_OUTPUT_DIR") or join(
config.DATA_DIR, "processing-output"
)
config.TRACKS_DIR = config.get("TRACKS_DIR") or join(config.DATA_DIR, "tracks")
config.OBS_FACE_CACHE_DIR = config.get("OBS_FACE_CACHE_DIR") or join(
config.DATA_DIR, "obs-face-cache"
)

View file

@ -14,7 +14,7 @@ from slugify import slugify
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy.orm import sessionmaker as SessionMaker, relationship
from sqlalchemy.types import UserDefinedType, BIGINT, TEXT from sqlalchemy.types import UserDefinedType, BIGINT, TEXT
from sqlalchemy import ( from sqlalchemy import (
Boolean, Boolean,
@ -38,18 +38,18 @@ from sqlalchemy.dialects.postgresql import HSTORE, UUID
Base = declarative_base() Base = declarative_base()
engine = ContextVar("engine") engine = None
async_session = ContextVar("async_session") sessionmaker = None
@asynccontextmanager @asynccontextmanager
async def make_session(): async def make_session():
async with async_session.get()() as session: async with sessionmaker() as session:
yield session yield session
async def init_models(): async def init_models():
async with engine.get().begin() as conn: async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all) await conn.run_sync(Base.metadata.drop_all)
await conn.execute(text('CREATE EXTENSION IF NOT EXISTS "hstore";')) await conn.execute(text('CREATE EXTENSION IF NOT EXISTS "hstore";'))
await conn.execute(text('CREATE EXTENSION IF NOT EXISTS "postgis";')) await conn.execute(text('CREATE EXTENSION IF NOT EXISTS "postgis";'))
@ -64,19 +64,19 @@ def random_string(length):
@asynccontextmanager @asynccontextmanager
async def connect_db(url): async def connect_db(url):
engine_ = create_async_engine(url, echo=False) global engine, sessionmaker
t1 = engine.set(engine_)
async_session_ = sessionmaker(engine_, class_=AsyncSession, expire_on_commit=False) engine = create_async_engine(url, echo=False)
t2 = async_session.set(async_session_) sessionmaker = SessionMaker(engine, class_=AsyncSession, expire_on_commit=False)
yield yield
# for AsyncEngine created in function scope, close and # for AsyncEngine created in function scope, close and
# clean-up pooled connections # clean-up pooled connections
await engine_.dispose() await engine.dispose()
engine.reset(t1)
async_session.reset(t2) engine = None
sessionmaker = None
ZoneType = SqlEnum("rural", "urban", "motorway", name="zone_type") ZoneType = SqlEnum("rural", "urban", "motorway", name="zone_type")

230
api/obs/api/process.py Normal file
View file

@ -0,0 +1,230 @@
import logging
import os
import json
import asyncio
import hashlib
import struct
import pytz
from os.path import join
from datetime import datetime
from sqlalchemy import delete, select
from sqlalchemy.orm import joinedload
from obs.face.importer import ImportMeasurementsCsv
from obs.face.geojson import ExportMeasurements
from obs.face.annotate import AnnotateMeasurements
from obs.face.filter import (
AnonymizationMode,
ChainFilter,
ConfirmedFilter,
DistanceMeasuredFilter,
PrivacyFilter,
PrivacyZone,
PrivacyZonesFilter,
RequiredFieldsFilter,
)
from obs.api.db import OvertakingEvent, Track, make_session
from obs.api.app import app
log = logging.getLogger(__name__)
async def process_tracks_loop(data_source, delay):
while True:
async with make_session() as session:
track = (
await session.execute(
select(Track)
.where(Track.processing_status == "queued")
.order_by(Track.processing_queued_at)
.options(joinedload(Track.author))
)
).scalar()
if track is None:
await asyncio.sleep(delay)
continue
try:
await process_track(session, track, data_source)
except:
log.exception("Failed to process track %s. Will continue.", track.slug)
await asyncio.sleep(1)
continue
async def process_tracks(data_source, tracks):
"""
Processes the tracks and writes event data to the database.
:param tracks: A list of strings which
"""
with make_session() as session:
for track_id_or_slug in tracks:
track = (
await session.execute(
select(Track)
.where(
Track.id == track_id_or_slug
if isinstance(track_id_or_slug, int)
else Track.slug == track_id_or_slug
)
.options(joinedload(Track.author))
)
).scalar()
if not track:
raise ValueError(f"Track {track_id_or_slug!r} not found.")
await process_track(session, track, data_source)
def to_naive_utc(t):
if t is None:
return None
return t.astimezone(pytz.UTC).replace(tzinfo=None)
async def process_track(session, track, data_source):
try:
track.processing_status = "complete"
track.processed_at = datetime.utcnow()
await session.commit()
original_file_path = track.get_original_file_path(app.config)
output_dir = join(
app.config.PROCESSING_OUTPUT_DIR, track.author.username, track.slug
)
os.makedirs(output_dir, exist_ok=True)
log.info("Annotating and filtering CSV file")
imported_data, statistics = ImportMeasurementsCsv().read(
original_file_path,
user_id="dummy", # TODO: user username or id or nothing?
dataset_id=Track.slug, # TODO: use track id or slug or nothing?
)
annotator = AnnotateMeasurements(
data_source, cache_dir=app.config.OBS_FACE_CACHE_DIR
)
input_data = await annotator.annotate(imported_data)
track_filter = ChainFilter(
RequiredFieldsFilter(),
PrivacyFilter(
user_id_mode=AnonymizationMode.REMOVE,
measurement_id_mode=AnonymizationMode.REMOVE,
),
# TODO: load user privacy zones and create a PrivacyZonesFilter() from them
)
measurements_filter = DistanceMeasuredFilter()
overtaking_events_filter = ConfirmedFilter()
track_points = track_filter.filter(input_data, log=log)
measurements = measurements_filter.filter(track_points, log=log)
overtaking_events = overtaking_events_filter.filter(measurements, log=log)
exporter = ExportMeasurements("measurements.dummy")
await exporter.add_measurements(measurements)
measurements_json = exporter.get_data()
del exporter
exporter = ExportMeasurements("overtaking_events.dummy")
await exporter.add_measurements(overtaking_events)
overtaking_events_json = exporter.get_data()
del exporter
track_json = {
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [[m["latitude"], m["longitude"]] for m in track_points],
},
}
for output_filename, data in [
("measurements.json", measurements_json),
("overtakingEvents.json", overtaking_events_json),
("track.json", track_json),
]:
target = join(output_dir, output_filename)
log.debug("Writing file %s", target)
with open(target, "w") as fp:
json.dump(data, fp, indent=4)
log.info("Import events into database...")
await clear_track_data(session, track)
await import_overtaking_events(session, track, overtaking_events)
log.info("Write track statistics and update status...")
track.recorded_at = to_naive_utc(statistics["t_min"])
track.recorded_until = to_naive_utc(statistics["t_max"])
track.duration = statistics["t"]
track.length = statistics["d"]
track.segments = statistics["n_segments"]
track.num_events = statistics["n_confirmed"]
track.num_measurements = statistics["n_measurements"]
track.num_valid = statistics["n_valid"]
track.processing_status = "complete"
track.processed_at = datetime.utcnow()
await session.commit()
log.info("Track %s imported.", track.slug)
except BaseException as e:
await clear_track_data(session, track)
track.processing_status = "error"
track.processing_log = str(e)
track.processed_at = datetime.utcnow()
await session.commit()
raise
async def clear_track_data(session, track):
track.recorded_at = None
track.recorded_until = None
track.duration = None
track.length = None
track.segments = None
track.num_events = None
track.num_measurements = None
track.num_valid = None
await session.execute(
delete(OvertakingEvent).where(OvertakingEvent.track_id == track.id)
)
async def import_overtaking_events(session, track, overtaking_events):
event_models = []
for m in overtaking_events:
hex_hash = hashlib.sha256(
struct.pack("QQ", track.id, int(m["time"].timestamp()))
).hexdigest()
event_models.append(
OvertakingEvent(
track_id=track.id,
hex_hash=hex_hash,
way_id=m.get("OSM_way_id"),
direction_reversed=m.get("OSM_way_orientation", 0) < 0,
geometry=json.dumps(
{
"type": "Point",
"coordinates": [m["longitude"], m["latitude"]],
}
),
latitude=m["latitude"],
longitude=m["longitude"],
time=m["time"].astimezone(pytz.utc).replace(tzinfo=None),
distance_overtaker=m["distance_overtaker"],
distance_stationary=m["distance_stationary"],
course=m["course"],
speed=m["speed"],
)
)
session.add_all(event_models)

View file

@ -2,7 +2,7 @@ import logging
# from sqlalchemy import select # from sqlalchemy import select
from obs.api.app import app from obs.api.app import api
from sanic.response import json from sanic.response import json
@ -11,7 +11,7 @@ log = logging.getLogger(__name__)
from obs.api import __version__ as version from obs.api import __version__ as version
@app.route("/info") @api.route("/info")
async def info(req): async def info(req):
return json( return json(
{ {

View file

@ -8,7 +8,7 @@ from oic.oic import Client
from oic.oic.message import AuthorizationResponse, RegistrationResponse from oic.oic.message import AuthorizationResponse, RegistrationResponse
from oic.utils.authn.client import CLIENT_AUTHN_METHOD from oic.utils.authn.client import CLIENT_AUTHN_METHOD
from obs.api.app import app from obs.api.app import auth
from obs.api.db import User from obs.api.db import User
from sanic.response import json, redirect from sanic.response import json, redirect
@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
client = Client(client_authn_method=CLIENT_AUTHN_METHOD) client = Client(client_authn_method=CLIENT_AUTHN_METHOD)
@app.before_server_start @auth.before_server_start
async def connect_auth_client(app, loop): async def connect_auth_client(app, loop):
client.allow["issuer_mismatch"] = True client.allow["issuer_mismatch"] = True
pc = client.provider_config(app.config.KEYCLOAK_URL) pc = client.provider_config(app.config.KEYCLOAK_URL)
@ -31,7 +31,7 @@ async def connect_auth_client(app, loop):
) )
@app.route("/login") @auth.route("/login")
@parse_parameters @parse_parameters
async def login(req, next: str = None): async def login(req, next: str = None):
session = req.ctx.session session = req.ctx.session
@ -53,7 +53,7 @@ async def login(req, next: str = None):
return redirect(login_url) return redirect(login_url)
@app.route("/login/redirect") @auth.route("/login/redirect")
async def login_redirect(req): async def login_redirect(req):
session = req.ctx.session session = req.ctx.session
@ -84,7 +84,11 @@ async def login_redirect(req):
if user is None: if user is None:
user = ( user = (
await req.ctx.db.execute( await req.ctx.db.execute(
select(User).where(User.email == email and User.username = preferred_username and User.match_by_username_email) select(User).where(
User.email == email
and User.username == preferred_username
and User.match_by_username_email
)
) )
).scalar() ).scalar()

View file

@ -9,7 +9,7 @@ from sqlalchemy import select, func
from sanic.response import json from sanic.response import json
from sanicargs import parse_parameters from sanicargs import parse_parameters
from obs.api.app import app from obs.api.app import api
from obs.api.db import Track, OvertakingEvent, User from obs.api.db import Track, OvertakingEvent, User
@ -30,7 +30,7 @@ def round_to(value: float, multiples: float) -> float:
return round(value / multiples) * multiples return round(value / multiples) * multiples
@app.route("/stats") @api.route("/stats")
@parse_parameters @parse_parameters
async def stats(req, user: str = None, start: datetime = None, end: datetime = None): async def stats(req, user: str = None, start: datetime = None, end: datetime = None):
conditions = [ conditions = [
@ -51,7 +51,6 @@ async def stats(req, user: str = None, start: datetime = None, end: datetime = N
if by_user: if by_user:
conditions.append(Track.author_id == req.ctx.user.id) conditions.append(Track.author_id == req.ctx.user.id)
print(conditions)
track_condition = reduce(and_, conditions) track_condition = reduce(and_, conditions)
public_track_condition = Track.public and track_condition public_track_condition = Track.public and track_condition

View file

@ -7,7 +7,7 @@ from sqlalchemy import select, func
from sqlalchemy.orm import joinedload from sqlalchemy.orm import joinedload
from obs.api.db import Track, User, Comment from obs.api.db import Track, User, Comment
from obs.api.app import app, require_auth, json from obs.api.app import api, require_auth, json
from sanic.response import file_stream, empty from sanic.response import file_stream, empty
from sanic.exceptions import InvalidUsage, NotFound, Forbidden from sanic.exceptions import InvalidUsage, NotFound, Forbidden
@ -60,7 +60,7 @@ async def _return_tracks(req, extend_query, limit, offset):
) )
@app.get("/tracks") @api.get("/tracks")
@parse_parameters @parse_parameters
async def get_tracks(req, limit: int = 20, offset: int = 0, author: str = None): async def get_tracks(req, limit: int = 20, offset: int = 0, author: str = None):
def extend_query(q): def extend_query(q):
@ -74,7 +74,7 @@ async def get_tracks(req, limit: int = 20, offset: int = 0, author: str = None):
return await _return_tracks(req, extend_query, limit, offset) return await _return_tracks(req, extend_query, limit, offset)
@app.get("/tracks/feed") @api.get("/tracks/feed")
@require_auth @require_auth
@parse_parameters @parse_parameters
async def get_feed(req, limit: int = 20, offset: int = 0): async def get_feed(req, limit: int = 20, offset: int = 0):
@ -84,7 +84,7 @@ async def get_feed(req, limit: int = 20, offset: int = 0):
return await _return_tracks(req, extend_query, limit, offset) return await _return_tracks(req, extend_query, limit, offset)
@app.post("/tracks") @api.post("/tracks")
@require_auth @require_auth
async def post_track(req): async def post_track(req):
try: try:
@ -143,7 +143,7 @@ async def _load_track(req, slug, raise_not_found=True):
return track return track
@app.get("/tracks/<slug:str>") @api.get("/tracks/<slug:str>")
async def get_track(req, slug: str): async def get_track(req, slug: str):
track = await _load_track(req, slug) track = await _load_track(req, slug)
return json( return json(
@ -151,7 +151,7 @@ async def get_track(req, slug: str):
) )
@app.delete("/tracks/<slug:str>") @api.delete("/tracks/<slug:str>")
@require_auth @require_auth
async def delete_track(req, slug: str): async def delete_track(req, slug: str):
track = await _load_track(req, slug) track = await _load_track(req, slug)
@ -164,7 +164,7 @@ async def delete_track(req, slug: str):
return empty() return empty()
@app.get("/tracks/<slug:str>/data") @api.get("/tracks/<slug:str>/data")
async def get_track_data(req, slug: str): async def get_track_data(req, slug: str):
track = await _load_track(req, slug) track = await _load_track(req, slug)
@ -191,17 +191,17 @@ async def get_track_data(req, slug: str):
) )
@app.get("/tracks/<slug:str>/download/original.csv") @api.get("/tracks/<slug:str>/download/original.csv")
async def download_original_file(req, slug: str): async def download_original_file(req, slug: str):
track = await _load_track(req, slug) track = await _load_track(req, slug)
if not track.is_visible_to_private(req.ctx.user): if not track.is_visible_to_private(req.ctx.user):
raise Forbidden() raise Forbidden()
return await file_stream(track.get_original_file_path(app.config)) return await file_stream(track.get_original_file_path(req.app.config))
@app.put("/tracks/<slug:str>") @api.put("/tracks/<slug:str>")
@require_auth @require_auth
async def put_track(req, slug: str): async def put_track(req, slug: str):
track = await _load_track(req, slug) track = await _load_track(req, slug)
@ -254,7 +254,7 @@ async def put_track(req, slug: str):
) )
@app.get("/tracks/<slug:str>/comments") @api.get("/tracks/<slug:str>/comments")
@parse_parameters @parse_parameters
async def get_track_comments(req, slug: str, limit: int = 20, offset: int = 0): async def get_track_comments(req, slug: str, limit: int = 20, offset: int = 0):
track = await _load_track(req, slug) track = await _load_track(req, slug)
@ -289,7 +289,7 @@ async def get_track_comments(req, slug: str, limit: int = 20, offset: int = 0):
) )
@app.post("/tracks/<slug:str>/comments") @api.post("/tracks/<slug:str>/comments")
@require_auth @require_auth
async def post_track_comment(req, slug: str): async def post_track_comment(req, slug: str):
track = await _load_track(req, slug) track = await _load_track(req, slug)
@ -326,21 +326,11 @@ async def post_track_comment(req, slug: str):
return json({"comment": comment.to_dict(for_user_id=req.ctx.user.id)}) return json({"comment": comment.to_dict(for_user_id=req.ctx.user.id)})
@app.delete("/tracks/<slug:str>/comments/<uid:str>") @api.delete("/tracks/<slug:str>/comments/<uid:str>")
@require_auth @require_auth
async def delete_track_comment(req, slug: str, uid: str): async def delete_track_comment(req, slug: str, uid: str):
print("XXXXXXXXXXXXXX")
print("XXXXXXXXXXXXXX")
print("XXXXXXXXXXXXXX")
print("XXXXXXXXXXXXXX")
print("XXXXXXXXXXXXXX")
print("XXXXXXXXXXXXXX")
print("XXXXXXXXXXXXXX")
print("XXXXXXXXXXXXXX")
print("XXXXXXXXXXXXXX")
track = await _load_track(req, slug) track = await _load_track(req, slug)
print("trackid", track.id, " uid", uid)
comment = ( comment = (
await req.ctx.db.execute( await req.ctx.db.execute(
select(Comment) select(Comment)

View file

@ -2,7 +2,7 @@ import logging
from sanic.response import json from sanic.response import json
from obs.api.app import app, require_auth from obs.api.app import api, require_auth
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -20,13 +20,13 @@ def user_to_json(user):
} }
@app.get("/user") @api.get("/user")
@require_auth @require_auth
async def get_user(req): async def get_user(req):
return json(user_to_json(req.ctx.user)) return json(user_to_json(req.ctx.user))
@app.put("/user") @api.put("/user")
@require_auth @require_auth
async def put_user(req): async def put_user(req):
user = req.ctx.user user = req.ctx.user

@ -1 +1 @@
Subproject commit 145b06a80d4607ff2c4a8dcf80e2f5fb0e1c8f1a Subproject commit 94e183d7024742fcedc2c79985f0ec42f90ccc69

View file

@ -4,7 +4,7 @@ with open("requirements.txt") as f:
requires = list(f.readlines()) requires = list(f.readlines())
setup( setup(
name="obs-api", name="openbikesensor-api",
version="0.0.1", version="0.0.1",
author="OpenBikeSensor Contributors", author="OpenBikeSensor Contributors",
license="LGPL-3.0", license="LGPL-3.0",
@ -15,7 +15,7 @@ setup(
install_requires=requires, install_requires=requires,
entry_points={ entry_points={
"console_scripts": [ "console_scripts": [
"obs-api=obs.bin.obs_api:main", "openbikesensor-api=obs.bin.openbikesensor_api:main",
] ]
}, },
) )

View file

@ -9,7 +9,7 @@ from sqlalchemy import select
from motor.motor_asyncio import AsyncIOMotorClient from motor.motor_asyncio import AsyncIOMotorClient
from obs.api.db import make_session, connect_db, User, Track, async_session, Comment from obs.api.db import make_session, connect_db, User, Track, Comment
from obs.api.app import app from obs.api.app import app
log = logging.getLogger(__name__) log = logging.getLogger(__name__)

View file

@ -1,37 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import logging import logging
import os
import sys
import json
import shutil
import asyncio import asyncio
import hashlib
import struct
import pytz
from os.path import join, dirname, abspath
from datetime import datetime
from sqlalchemy import delete, select
from sqlalchemy.orm import joinedload
from obs.face.importer import ImportMeasurementsCsv
from obs.face.geojson import ExportMeasurements
from obs.face.annotate import AnnotateMeasurements
from obs.face.filter import (
AnonymizationMode,
ChainFilter,
ConfirmedFilter,
DistanceMeasuredFilter,
PrivacyFilter,
PrivacyZone,
PrivacyZonesFilter,
RequiredFieldsFilter,
)
from obs.face.osm import DataSource, DatabaseTileSource, OverpassTileSource from obs.face.osm import DataSource, DatabaseTileSource, OverpassTileSource
from obs.api.db import make_session, connect_db, OvertakingEvent, async_session, Track from obs.api.db import make_session, connect_db, make_session
from obs.api.app import app from obs.api.app import app
from obs.api.process import process_tracks, process_tracks_loop
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
@ -62,210 +38,16 @@ async def main():
args = parser.parse_args() args = parser.parse_args()
async with connect_db(app.config.POSTGRES_URL): async with connect_db(app.config.POSTGRES_URL):
async with make_session() as session:
log.info("Loading OpenStreetMap data") log.info("Loading OpenStreetMap data")
tile_source = DatabaseTileSource(async_session.get()) tile_source = DatabaseTileSource()
# tile_source = OverpassTileSource(app.config.OBS_FACE_CACHE_DIR) # tile_source = OverpassTileSource(app.config.OBS_FACE_CACHE_DIR)
data_source = DataSource(tile_source) data_source = DataSource(tile_source)
if args.tracks: if args.tracks:
async with make_session() as session:
await process_tracks(session, data_source, args.tracks) await process_tracks(session, data_source, args.tracks)
else: else:
await process_tracks_loop(session, data_source, args.loop_delay) await process_tracks_loop(data_source, args.loop_delay)
async def process_tracks_loop(session, data_source, delay):
while True:
track = (
await session.execute(
select(Track)
.where(Track.processing_status == "queued")
.order_by(Track.processing_queued_at)
.options(joinedload(Track.author))
)
).scalar()
if track is None:
await asyncio.sleep(delay)
else:
try:
await process_track(session, track, data_source)
except:
log.exception("Failed to process track %s. Will continue.", track.slug)
async def process_tracks(session, data_source, tracks):
"""
Processes the tracks and writes event data to the database.
:param tracks: A list of strings which
"""
for track_id_or_slug in tracks:
track = (
await session.execute(
select(Track)
.where(
Track.id == track_id_or_slug
if isinstance(track_id_or_slug, int)
else Track.slug == track_id_or_slug
)
.options(joinedload(Track.author))
)
).scalar()
if not track:
raise ValueError(f"Track {track_id_or_slug!r} not found.")
await process_track(session, track, data_source)
def to_naive_utc(t):
if t is None:
return None
return t.astimezone(pytz.UTC).replace(tzinfo=None)
async def process_track(session, track, data_source):
try:
track.processing_status = "complete"
track.processed_at = datetime.utcnow()
await session.commit()
original_file_path = track.get_original_file_path(app.config)
output_dir = join(
app.config.PROCESSING_OUTPUT_DIR, track.author.username, track.slug
)
os.makedirs(output_dir, exist_ok=True)
log.info("Annotating and filtering CSV file")
imported_data, statistics = ImportMeasurementsCsv().read(
original_file_path,
user_id="dummy", # TODO: user username or id or nothing?
dataset_id=Track.slug, # TODO: use track id or slug or nothing?
)
annotator = AnnotateMeasurements(
data_source, cache_dir=app.config.OBS_FACE_CACHE_DIR
)
input_data = await annotator.annotate(imported_data)
track_filter = ChainFilter(
RequiredFieldsFilter(),
PrivacyFilter(
user_id_mode=AnonymizationMode.REMOVE,
measurement_id_mode=AnonymizationMode.REMOVE,
),
# TODO: load user privacy zones and create a PrivacyZonesFilter() from them
)
measurements_filter = DistanceMeasuredFilter()
overtaking_events_filter = ConfirmedFilter()
track_points = track_filter.filter(input_data, log=log)
measurements = measurements_filter.filter(track_points, log=log)
overtaking_events = overtaking_events_filter.filter(measurements, log=log)
exporter = ExportMeasurements("measurements.dummy")
await exporter.add_measurements(measurements)
measurements_json = exporter.get_data()
del exporter
exporter = ExportMeasurements("overtaking_events.dummy")
await exporter.add_measurements(overtaking_events)
overtaking_events_json = exporter.get_data()
del exporter
track_json = {
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [[m["latitude"], m["longitude"]] for m in track_points],
},
}
for output_filename, data in [
("measurements.json", measurements_json),
("overtakingEvents.json", overtaking_events_json),
("track.json", track_json),
]:
target = join(output_dir, output_filename)
log.debug("Writing file %s", target)
with open(target, "w") as fp:
json.dump(data, fp, indent=4)
log.info("Import events into database...")
await clear_track_data(session, track)
await import_overtaking_events(session, track, overtaking_events)
log.info("Write track statistics and update status...")
track.recorded_at = to_naive_utc(statistics["t_min"])
track.recorded_until = to_naive_utc(statistics["t_max"])
track.duration = statistics["t"]
track.length = statistics["d"]
track.segments = statistics["n_segments"]
track.num_events = statistics["n_confirmed"]
track.num_measurements = statistics["n_measurements"]
track.num_valid = statistics["n_valid"]
track.processing_status = "complete"
track.processed_at = datetime.utcnow()
await session.commit()
log.info("Track %s imported.", track.slug)
except BaseException as e:
await clear_track_data(session, track)
track.processing_status = "error"
track.processing_log = str(e)
track.processed_at = datetime.utcnow()
await session.commit()
raise
async def clear_track_data(session, track):
track.recorded_at = None
track.recorded_until = None
track.duration = None
track.length = None
track.segments = None
track.num_events = None
track.num_measurements = None
track.num_valid = None
await session.execute(
delete(OvertakingEvent).where(OvertakingEvent.track_id == track.id)
)
async def import_overtaking_events(session, track, overtaking_events):
event_models = []
for m in overtaking_events:
hex_hash = hashlib.sha256(
struct.pack("QQ", track.id, int(m["time"].timestamp()))
).hexdigest()
event_models.append(
OvertakingEvent(
track_id=track.id,
hex_hash=hex_hash,
way_id=m["OSM_way_id"],
direction_reversed=m["OSM_way_orientation"] < 0,
geometry=json.dumps(
{
"type": "Point",
"coordinates": [m["longitude"], m["latitude"]],
}
),
latitude=m["latitude"],
longitude=m["longitude"],
time=m["time"].astimezone(pytz.utc).replace(tzinfo=None),
distance_overtaker=m["distance_overtaker"],
distance_stationary=m["distance_stationary"],
course=m["course"],
speed=m["speed"],
)
)
session.add_all(event_models)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -27,7 +27,7 @@ services:
- backend - backend
api: api:
image: obs-api image: openbikesensor-api
build: build:
context: ./source/api context: ./source/api
volumes: volumes:
@ -51,7 +51,7 @@ services:
- backend - backend
worker: worker:
image: obs-api image: openbikesensor-api
build: build:
context: ./source/api context: ./source/api
volumes: volumes:
@ -72,11 +72,11 @@ services:
frontend: frontend:
image: obs-frontend image: obs-frontend
volumes:
- ./config/frontend.json:/usr/local/apache2/htdocs/config.json
build: build:
context: ./source/frontend context: ./source/frontend
dockerfile: Dockerfile-prod dockerfile: Dockerfile-prod
volumes:
- ./config/frontend.json:/usr/local/apache2/htdocs/config.json
links: links:
- api - api
restart: on-failure restart: on-failure

View file

@ -28,7 +28,7 @@ services:
- ./local/postgres/data:/var/lib/postgresql/data - ./local/postgres/data:/var/lib/postgresql/data
api: api:
image: obs-api image: openbikesensor-api
build: build:
context: ./api/ context: ./api/
dockerfile: Dockerfile dockerfile: Dockerfile
@ -37,6 +37,7 @@ services:
- ./api/scripts/obs:/opt/obs/scripts/obs - ./api/scripts/obs:/opt/obs/scripts/obs
- ./api/tools:/opt/obs/api/tools - ./api/tools:/opt/obs/api/tools
- ./api/config.dev.py:/opt/obs/api/config.py - ./api/config.dev.py:/opt/obs/api/config.py
- ./frontend/build:/opt/obs/frontend/build
- ./local/api-data:/data - ./local/api-data:/data
links: links:
- postgres - postgres
@ -45,10 +46,10 @@ services:
- '3000:3000' - '3000:3000'
restart: on-failure restart: on-failure
command: command:
- obs-api - openbikesensor-api
worker: worker:
image: obs-api image: openbikesensor-api
build: build:
context: ./api/ context: ./api/
dockerfile: Dockerfile dockerfile: Dockerfile
@ -67,7 +68,7 @@ services:
- tools/process_track.py - tools/process_track.py
frontend: frontend:
image: obs-frontend image: openbikesensor-frontend
build: build:
context: ./frontend context: ./frontend
volumes: volumes:
@ -166,3 +167,18 @@ services:
DB_USER: obs DB_USER: obs
DB_PASSWORD: obs DB_PASSWORD: obs
# DB_SCHEMA: keycloak # DB_SCHEMA: keycloak
prod-test:
image: openbikesensor-portal
build:
context: ./
dockerfile: Dockerfile
volumes:
- ./api/config.prod-test.py:/opt/obs/api/config.py
- ./local/api-data:/data
links:
- postgres
- keycloak
ports:
- '3000:3000'
restart: on-failure

View file

@ -1,5 +1,6 @@
{ {
"apiUrl": "http://localhost:3000", "apiUrl": "http://localhost:3000/api",
"loginUrl": "http://localhost:3000/login",
"imprintUrl": "https://example.com/imprint", "imprintUrl": "https://example.com/imprint",
"privacyPolicyUrl": "https://example.com/privacy", "privacyPolicyUrl": "https://example.com/privacy",
"mapTileset": { "mapTileset": {

View file

@ -1,11 +1,6 @@
{ {
"apiUrl": "https://portal.example.com/api", "apiUrl": "https://portal.example.com/api",
"auth": { "loginUrl": "https://portal.example.com/login",
"server": "https://portal.example.com/api",
"clientId": "CHANGEME",
"scope": "*",
"redirectUri": "https://portal.example.com/redirect"
},
"imprintUrl": "https://example.com/imprint", "imprintUrl": "https://example.com/imprint",
"privacyPolicyUrl": "https://example.com/privacy", "privacyPolicyUrl": "https://example.com/privacy",
"mapTileset": { "mapTileset": {
@ -18,5 +13,5 @@
"longitude": 9.1797, "longitude": 9.1797,
"latitude": 48.7784 "latitude": 48.7784
}, },
"obsMapSource": "http://api.example.com/tileserver/data/v3.json" "obsMapSource": "http://portal.example.com/tileserver/data/v3.json"
} }

View file

@ -1,11 +1,16 @@
{ {
"apiUrl": "https://api.example.com", "apiUrl": "https://api.example.com",
"auth": {
"server": "https://api.example.com",
"clientId": "!!!<<<CHANGEME>>>!!!",
"scope": "*",
"redirectUri": "https://portal.example.com/redirect"
},
"imprintUrl": "https://portal.example.com/imprint", "imprintUrl": "https://portal.example.com/imprint",
"privacyPolicyUrl": "https://portal.example.com/privacy" "privacyPolicyUrl": "https://portal.example.com/privacy",
"mapTileset": {
"url": "https://tiles.wmflabs.org/bw-mapnik/{z}/{x}/{y}.png",
"minZoom": 0,
"maxZoom": 18
},
"mapHome": {
"zoom": 15,
"longitude": 7.8302,
"latitude": 47.9755
},
"obsMapSource": "https://portal.example.com/tileset/data/v3.json"
} }

View file

@ -47,7 +47,7 @@ class API {
async makeLoginUrl() { async makeLoginUrl() {
const config = await configPromise const config = await configPromise
const url = new URL(config.apiUrl + '/login') const url = new URL(config.loginUrl || (config.apiUrl + '/login'))
url.searchParams.append('next', window.location.href) // bring us back to the current page url.searchParams.append('next', window.location.href) // bring us back to the current page
return url.toString() return url.toString()
} }