New import pipeline with a PBF conversion step

This commit is contained in:
Paul Bienkowski 2023-03-26 13:04:25 +02:00
parent 4c1c95e4ff
commit 59f074cb28
3 changed files with 327 additions and 0 deletions

View file

@ -12,3 +12,8 @@ asyncpg~=0.27.0
pyshp~=2.3.1
alembic~=1.9.4
stream-zip~=0.0.50
msgpack~=1.0.5
osmium~=3.6.0
psycopg~=3.1.8
shapely~=2.0.1
pyproj~=3.4.1

205
api/tools/convert_osm.py Executable file
View file

@ -0,0 +1,205 @@
#!/usr/bin/env python3
import sys
import re
import msgpack
import osmium
import shapely.wkb as wkb
from shapely.ops import transform
HIGHWAY_TYPES = {
"trunk",
"primary",
"secondary",
"tertiary",
"unclassified",
"residential",
"trunk_link",
"primary_link",
"secondary_link",
"tertiary_link",
"living_street",
"service",
"track",
"road",
}
ZONE_TYPES = {
"urban",
"rural",
"motorway",
}
URBAN_TYPES = {
"residential",
"living_street",
"road",
}
MOTORWAY_TYPES = {
"motorway",
"motorway_link",
}
ADMIN_LEVEL_MIN = 2
ADMIN_LEVEL_MAX = 8
MINSPEED_RURAL = 60
ONEWAY_YES = {"yes", "true", "1"}
ONEWAY_REVERSE = {"reverse", "-1"}
def parse_number(tag):
if not tag:
return None
match = re.search(r"[0-9]+", tag)
if not match:
return None
digits = match.group(0)
try:
return int(digits)
except ValueError:
return None
def determine_zone(tags):
highway = tags.get("highway")
zone = tags.get("zone:traffic")
if zone is not None:
if "rural" in zone:
return "rural"
if "motorway" in zone:
return "motorway"
return "urban"
# From here on we are guessing based on other tags
if highway in URBAN_TYPES:
return "urban"
if highway in MOTORWAY_TYPES:
return "motorway"
maxspeed_source = tags.get("source:maxspeed")
if maxspeed_source and "rural" in maxspeed_source:
return "rural"
if maxspeed_source and "urban" in maxspeed_source:
return "urban"
for key in ["maxspeed", "maxspeed:forward", "maxspeed:backward"]:
maxspeed = parse_number(tags.get(key))
if maxspeed is not None and maxspeed > MINSPEED_RURAL:
return "rural"
# default to urban if we have no idea
return "urban"
def determine_direction(tags, zone):
if (
tags.get("oneway") in ONEWAY_YES
or tags.get("junction") == "roundabout"
or zone == "motorway"
):
return 1, True
if tags.get("oneway") in ONEWAY_REVERSE:
return -1, True
return 0, False
class StreamPacker:
def __init__(self, stream, *args, **kwargs):
self.stream = stream
self.packer = msgpack.Packer(*args, autoreset=False, **kwargs)
def _write_out(self):
if hasattr(self.packer, "getbuffer"):
chunk = self.packer.getbuffer()
else:
chunk = self.packer.bytes()
self.stream.write(chunk)
self.packer.reset()
def pack(self, *args, **kwargs):
self.packer.pack(*args, **kwargs)
self._write_out()
def pack_array_header(self, *args, **kwargs):
self.packer.pack_array_header(*args, **kwargs)
self._write_out()
def pack_map_header(self, *args, **kwargs):
self.packer.pack_map_header(*args, **kwargs)
self._write_out()
def pack_map_pairs(self, *args, **kwargs):
self.packer.pack_map_pairs(*args, **kwargs)
self._write_out()
# A global factory that creates WKB from a osmium geometry
wkbfab = osmium.geom.WKBFactory()
from pyproj import Transformer
project = Transformer.from_crs("EPSG:4326", "EPSG:3857", always_xy=True).transform
class OSMHandler(osmium.SimpleHandler):
def __init__(self, packer):
self.packer = packer
super().__init__()
def way(self, way):
tags = way.tags
highway = tags.get("highway")
if not highway or highway not in HIGHWAY_TYPES:
return
zone = determine_zone(tags)
directionality, oneway = determine_direction(tags, zone)
name = tags.get("name")
geometry = wkb.loads(wkbfab.create_linestring(way), hex=True)
geometry = transform(project, geometry)
geometry = wkb.dumps(geometry)
self.packer.pack(b"\x01")
self.packer.pack([way.id, name, zone, directionality, oneway, geometry])
def area(self, area):
tags = area.tags
if tags.get("boundary") != "administrative":
return
admin_level = parse_number(tags.get("admin_level"))
if not admin_level:
return
if admin_level < ADMIN_LEVEL_MIN or admin_level > ADMIN_LEVEL_MAX:
return
name = tags.get("name")
geometry = bytes.fromhex(wkbfab.create_multipolygon(area))
self.packer.pack(b"\x02")
self.packer.pack(
[
area.id,
name,
admin_level,
geometry,
]
)
with open(sys.argv[2], "wb") as fout:
packer = StreamPacker(fout)
osmhandler = OSMHandler(packer)
osmhandler.apply_file(sys.argv[1], locations=True)

117
api/tools/import_osm.py Executable file
View file

@ -0,0 +1,117 @@
#!/usr/bin/env python3
from dataclasses import asdict, dataclass
import asyncio
from os.path import basename, splitext
import sys
import logging
import msgpack
import psycopg
from psycopg.types import TypeInfo
from psycopg.types.shapely import register_shapely
from shapely.wkb import loads
from obs.api.app import app
from obs.api.db import ZoneType, connect_db, make_session
from obs.api.utils import chunk
log = logging.getLogger(__name__)
ROAD_BUFFER = 1000
AREA_BUFFER = 100
@dataclass
class Region:
relation_id: int
name: str
admin_level: int
geometry: bytes
@dataclass
class Road:
way_id: int
name: str
zone: str
directionality: int
oneway: int
geometry: bytes
def read_file(filename, type_only=False):
"""
Reads a file iteratively, yielding Road and Region objects as they
appear. Those may be mixed.
"""
with open(filename, "rb") as f:
unpacker = msgpack.Unpacker(f)
try:
while True:
type_id = unpacker.unpack()
data = unpacker.unpack()
if type_id == b"\x01":
yield Road(*data)
# elif type_id == b"\x02":
# yield Region(*data)
except msgpack.OutOfData:
pass
async def import_osm(connection, filename, import_group=None):
if import_group is None:
import_group = splitext(basename(filename))[0]
# Pass 1: Find IDs only
road_ids = []
for item in read_file(filename):
road_ids.append(item.way_id)
async with connection.cursor() as cursor:
print(f"Pass 1: Delete previous")
print(f"Deleting import group {import_group}")
await cursor.execute(
"DELETE FROM road WHERE import_group = %s", (import_group,)
)
print(f"Deleting by ID")
for ids in chunk(road_ids, 10000):
await cursor.execute("DELETE FROM road WHERE way_id = ANY(%s)", (ids,))
# Pass 2: Import
print(f"Pass 2: Import")
async with cursor.copy(
"COPY road (way_id, name, zone, directionality, oneway, geometry, import_group) FROM STDIN"
) as copy:
for item in read_file(filename):
await copy.write_row(
(
item.way_id,
item.name,
item.zone,
item.directionality,
item.oneway,
bytes.hex(item.geometry),
import_group,
)
)
async def main():
url = app.config.POSTGRES_URL
url = url.replace("+asyncpg", "")
async with await psycopg.AsyncConnection.connect(url) as connection:
for filename in sys.argv[1:]:
print("Loading file", filename)
await import_osm(connection, filename)
if __name__ == "__main__":
asyncio.run(main())