#!/usr/bin/env python3
"""Import road and region records from msgpack files into the database.

Each file named on the command line is read as a stream of
(type tag, field list) msgpack object pairs. For every file:

1. Rows previously imported under the same import group are deleted. So are
   rows whose way/relation ID also appears in the file.
2. The file's records are bulk-loaded into the ``road`` and ``region`` tables
   with COPY.
"""
from dataclasses import asdict, dataclass
import asyncio
from os.path import basename, splitext
import sys
import logging

import msgpack
import psycopg
from psycopg.types import TypeInfo
from psycopg.types.shapely import register_shapely
from shapely.wkb import loads

from obs.api.app import app
from obs.api.db import ZoneType, connect_db, make_session
from obs.api.utils import chunk

log = logging.getLogger(__name__)

ROAD_BUFFER = 1000
AREA_BUFFER = 100

# One-byte type tags that precede each record in the input stream.
ROAD_TYPE = b"\x01"
REGION_TYPE = b"\x02"

# Number of IDs sent per "= ANY(%s)" DELETE statement.
DELETE_CHUNK_SIZE = 10000


@dataclass
class Region:
    """A region record; field order matches the field list stored in the file."""

    relation_id: int
    name: str
    admin_level: int
    # Binary geometry, written to COPY as a hex string (presumably WKB -- confirm).
    geometry: bytes


@dataclass
class Road:
    """A road record; field order matches the field list stored in the file."""

    way_id: int
    name: str
    zone: str
    directionality: int
    oneway: int
    # Binary geometry, written to COPY as a hex string (presumably WKB -- confirm).
    geometry: bytes


# Maps each type tag to the dataclass used to build records of that type.
data_types = {ROAD_TYPE: Road, REGION_TYPE: Region}


def read_file(filename, only_type: bytes):
    """Iterate over the records of one type in a msgpack import file.

    The file is a sequence of (type tag, field list) msgpack object pairs.
    Road and region records may be interleaved. Only pairs whose tag equals
    ``only_type`` are turned into objects (via ``data_types``) and yielded.

    A type tag at the end of the file with no field list after it means the
    file was cut short. A warning is logged and iteration stops.
    """
    with open(filename, "rb") as f:
        unpacker = msgpack.Unpacker(f)
        while True:
            try:
                type_id = unpacker.unpack()
            except msgpack.OutOfData:
                return  # clean end of file
            try:
                data = unpacker.unpack()
            except msgpack.OutOfData:
                # Previously swallowed silently; surface it so a partial
                # import does not go unnoticed.
                log.warning(
                    "%s: type tag %r at end of file has no record data; "
                    "the file may be truncated",
                    filename,
                    type_id,
                )
                return
            if type_id == only_type:
                yield data_types[only_type](*data)


async def import_osm(connection, filename, import_group=None):
    """Replace the roads and regions contained in one file.

    Args:
        connection: An open psycopg async connection. This function does not
            commit; that is left to the caller.
        filename: Path of the msgpack file to import.
        import_group: Label stored with every imported row. Defaults to the
            file name without directory and extension.
    """
    if import_group is None:
        import_group = splitext(basename(filename))[0]

    # Pass 1: collect the IDs contained in the file.
    road_ids = [item.way_id for item in read_file(filename, only_type=ROAD_TYPE)]
    region_ids = [
        item.relation_id for item in read_file(filename, only_type=REGION_TYPE)
    ]

    async with connection.cursor() as cursor:
        print("Pass 1: Delete previous")
        print(f"Deleting import group {import_group}")
        await cursor.execute(
            "DELETE FROM road WHERE import_group = %s", (import_group,)
        )
        await cursor.execute(
            "DELETE FROM region WHERE import_group = %s", (import_group,)
        )

        # Also delete rows with the same IDs that may belong to other import
        # groups, so the COPY below does not add a second row for an ID.
        print("Deleting roads by ID")
        for ids in chunk(road_ids, DELETE_CHUNK_SIZE):
            await cursor.execute("DELETE FROM road WHERE way_id = ANY(%s)", (ids,))

        print("Deleting regions by ID")
        for ids in chunk(region_ids, DELETE_CHUNK_SIZE):
            await cursor.execute(
                "DELETE FROM region WHERE relation_id = ANY(%s)", (ids,)
            )

        # Pass 2: bulk-load the records; geometries are sent as hex strings.
        print("Pass 2: Import Roads")
        async with cursor.copy(
            "COPY road (way_id, name, zone, directionality, oneway, geometry, import_group) FROM STDIN"
        ) as copy:
            for item in read_file(filename, ROAD_TYPE):
                await copy.write_row(
                    (
                        item.way_id,
                        item.name,
                        item.zone,
                        item.directionality,
                        item.oneway,
                        item.geometry.hex(),
                        import_group,
                    )
                )

        print("Pass 2: Import Regions")
        async with cursor.copy(
            "COPY region (relation_id, name, geometry, admin_level, import_group) FROM STDIN"
        ) as copy:
            for item in read_file(filename, REGION_TYPE):
                await copy.write_row(
                    (
                        item.relation_id,
                        item.name,
                        item.geometry.hex(),
                        item.admin_level,
                        import_group,
                    )
                )


async def main():
    """Import every file named on the command line over a single connection."""
    filenames = sys.argv[1:]
    if not filenames:
        # Previously this connected to the database and silently did nothing.
        raise SystemExit(f"usage: {sys.argv[0]} FILE [FILE ...]")

    # Drop the "+asyncpg" driver suffix from the configured URL so that
    # psycopg receives a plain PostgreSQL connection URL.
    url = app.config.POSTGRES_URL.replace("+asyncpg", "")

    async with await psycopg.AsyncConnection.connect(url) as connection:
        for filename in filenames:
            print("Loading file", filename)
            await import_osm(connection, filename)


if __name__ == "__main__":
    asyncio.run(main())