diff --git a/osm_rawdata/importer.py b/osm_rawdata/importer.py
index 3c72c83..f50395d 100755
--- a/osm_rawdata/importer.py
+++ b/osm_rawdata/importer.py
@@ -20,54 +20,48 @@
 #
 import argparse
+import concurrent.futures
 import logging
 import subprocess
 import sys
-import os
-import concurrent.futures
-import geojson
-from geojson import Feature, FeatureCollection
-from sys import argv
 from pathlib import Path
-from cpuinfo import get_cpu_info
-from shapely.geometry import shape
+from sys import argv
+
 # from geoalchemy2 import shape
 import geoalchemy2
-import shapely
-
-from pandas import DataFrame
-import pyarrow.parquet as pq
+import geojson
 from codetiming import Timer
-from osm_rawdata.postgres import uriParser
-from progress.spinner import PixelSpinner
+from cpuinfo import get_cpu_info
+from pandas import DataFrame
 from shapely import wkb
+from shapely.geometry import shape
 from sqlalchemy import MetaData, cast, column, create_engine, select, table, text
 from sqlalchemy.dialects.postgresql import JSONB, insert
+from sqlalchemy.engine.base import Connection
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy_utils import create_database, database_exists
-from sqlalchemy.engine.base import Connection
-from shapely.geometry import Point, LineString, Polygon
-from shapely import wkt, wkb
 
 # Find the other files for this project
 import osm_rawdata as rw
 import osm_rawdata.db_models
 from osm_rawdata.db_models import Base
 from osm_rawdata.overture import Overture
+from osm_rawdata.postgres import uriParser
 
 rootdir = rw.__path__[0]
 
 # Instantiate logger
-log = logging.getLogger('osm-rawdata')
+log = logging.getLogger("osm-rawdata")
 
 # The number of threads is based on the CPU cores
 info = get_cpu_info()
-cores = info['count']
+cores = info["count"]
+
 
 def importThread(
-        data: list,
-        db: Connection,
-    ):
+    data: list,
+    db: Connection,
+):
     """Thread to handle importing
 
     Args:
@@ -75,15 +69,15 @@ def importThread(
         db (Connection): A database connection
     """
     # log.debug(f"In importThread()")
-    #timer = Timer(text="importThread() took {seconds:.0f}s")
-    #timer.start()
+    # timer = Timer(text="importThread() took {seconds:.0f}s")
+    # timer.start()
     ways = table(
         "ways_poly",
         column("id"),
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )
 
     nodes = table(
         "nodes",
@@ -91,7 +85,7 @@
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )
 
     nodes = table(
         "ways_line",
@@ -99,7 +93,7 @@
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )
 
     index = 0
 
@@ -107,34 +101,35 @@
         # log.debug(feature)
         index -= 1
         entry = dict()
-        tags = feature['properties']
-        tags['building'] = 'yes'
-        entry['id'] = index
+        tags = feature["properties"]
+        tags["building"] = "yes"
+        entry["id"] = index
         ewkt = shape(feature["geometry"])
         geom = wkb.dumps(ewkt)
         type = ewkt.geom_type
         scalar = select(cast(tags, JSONB))
-        if type == 'Polygon':
+        if type == "Polygon":
             sql = insert(ways).values(
                 # id = entry['id'],
                 geom=geom,
                 tags=scalar,
-                )
-        elif type == 'Point':
+            )
+        elif type == "Point":
             sql = insert(nodes).values(
                 # id = entry['id'],
                 geom=geom,
                 tags=scalar,
-                )
+            )
         db.execute(sql)
         # db.commit()
 
+
 def parquetThread(
     data: DataFrame,
     db: Connection,
-    ):
+):
     """Thread to handle importing
 
     Args:
@@ -149,7 +144,7 @@
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )
 
     nodes = table(
         "nodes",
@@ -157,7 +152,7 @@
         column("user"),
         column("geom"),
         column("tags"),
-        )
+    )
 
     lines = table(
         "ways_line",
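Note on the handlers above: importThread() and parquetThread() build lightweight table()/column() constructs rather than reflecting the real schema, then compose plain INSERT statements against them. A minimal standalone sketch of that pattern, not part of the patch (the feature and table name are illustrative, and cast(..., JSONB) is used directly where the code above goes through select(cast(...)); assumes SQLAlchemy 1.4+):

    from shapely import wkb
    from shapely.geometry import shape
    from sqlalchemy import cast, column, insert, table
    from sqlalchemy.dialects.postgresql import JSONB

    # Illustrative GeoJSON feature; real ones come from the loaded FeatureCollection.
    feature = {
        "geometry": {"type": "Polygon", "coordinates": [[(0, 0), (0, 1), (1, 1), (0, 0)]]},
        "properties": {"building": "yes"},
    }

    # Lightweight table construct, same shape as the ways_poly definition above.
    ways = table("ways_poly", column("id"), column("user"), column("geom"), column("tags"))

    geom = wkb.dumps(shape(feature["geometry"]))  # shapely geometry -> WKB bytes
    sql = insert(ways).values(
        geom=geom,
        tags=cast(feature["properties"], JSONB),  # bind the tags dict as a JSONB parameter
    )
    print(sql)  # roughly: INSERT INTO ways_poly (geom, tags) VALUES (:geom, CAST(:param_1 AS JSONB))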
column("user"), column("geom"), column("tags"), - ) + ) index = -1 log.debug(f"There are {len(data)} entries in the data") - if len(data) == 0: + if len(data) == 0: return overture = Overture() for index in data.index: feature = data.loc[index] - dataset = feature['sources'][0]['dataset'] - if dataset == 'OpenStreetMap' or dataset == 'Microsoft ML Buildings': + dataset = feature["sources"][0]["dataset"] + if dataset == "OpenStreetMap" or dataset == "Microsoft ML Buildings": continue tags = overture.parse(feature) - geom = feature['geometry'] + geom = feature["geometry"] hex = wkb.loads(geom, hex=True) gdata = geoalchemy2.shape.from_shape(hex, srid=4326, extended=True) # geom_type = wkb.loads(geom).geom_type - scalar = select(cast(tags['properties'], JSONB)) + scalar = select(cast(tags["properties"], JSONB)) sql = None - if hex.geom_type == 'Polygon': + if hex.geom_type == "Polygon": sql = insert(ways).values( # osm_id = entry['osm_id'], geom=bytes(gdata.data), tags=scalar, ) - elif hex.geom_type == 'MultiPolygon': + elif hex.geom_type == "MultiPolygon": gdata = geoalchemy2.shape.from_shape(hex.convex_hull, srid=4326, extended=True) sql = insert(ways).values( geom=bytes(gdata.data), tags=scalar, ) - elif hex.geom_type == 'Point': + elif hex.geom_type == "Point": sql = insert(nodes).values( # osm_id = entry['osm_id'], geom=bytes(gdata.data), tags=scalar, ) - elif hex.geom_type == 'LineString': + elif hex.geom_type == "LineString": sql = insert(lines).values( # osm_id = entry['osm_id'], geom=bytes(gdata.data), @@ -219,6 +214,7 @@ def parquetThread( # print(f"FIXME2: {entry}") timer.stop() + class MapImporter(object): def __init__( self, @@ -246,14 +242,14 @@ def __init__( meta = MetaData() meta.create_all(engine) - # if dburi: - # self.uri = uriParser(dburi) - # engine = create_engine(f"postgresql://{self.dburi}", echo=True) - # if not database_exists(engine.url): - # create_database(engine.url) - # self.db = engine.connect() + # if dburi: + # self.uri = uriParser(dburi) + # engine = create_engine(f"postgresql://{self.dburi}", echo=True) + # if not database_exists(engine.url): + # create_database(engine.url) + # self.db = engine.connect() - # Add the extension we need to process the data + # Add the extension we need to process the data sql = text( "CREATE EXTENSION IF NOT EXISTS postgis; CREATE EXTENSION IF NOT EXISTS hstore;CREATE EXTENSION IF NOT EXISTS dblink;" ) @@ -364,8 +360,8 @@ def importGeoJson( """ # load the GeoJson file file = open(infile, "r") - #size = os.path.getsize(infile) - #for line in file.readlines(): + # size = os.path.getsize(infile) + # for line in file.readlines(): # print(line) data = geojson.load(file) @@ -378,12 +374,12 @@ def importGeoJson( timer.start() # A chunk is a group of threads - entries = len(data['features']) + entries = len(data["features"]) chunk = round(entries / cores) # For small files we only need one thread if entries <= chunk: - result = importThread(data['features'], self.connections[0]) + result = importThread(data["features"], self.connections[0]) timer.stop() return True @@ -391,7 +387,7 @@ def importGeoJson( block = 0 while block <= entries: log.debug("Dispatching Block %d:%d" % (block, block + chunk)) - result = executor.submit(importThread, data['features'][block : block + chunk], self.connections[index]) + result = executor.submit(importThread, data["features"][block : block + chunk], self.connections[index]) block += chunk index += 1 executor.shutdown() @@ -399,6 +395,7 @@ def importGeoJson( return True + def main(): """This main 
@@ -399,6 +395,7 @@
 
         return True
 
+
 def main():
     """This main function lets this class be run standalone by a bash script."""
     parser = argparse.ArgumentParser(
@@ -445,6 +442,7 @@ def main():
     mi.importParquet(args.infile)
     log.info(f"Imported {args.infile} into {args.uri}")
 
+
 if __name__ == "__main__":
    """This is just a hook so this file can be run standalone during development."""
     main()
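How the pieces of this first file fit together: MapImporter owns the database setup and the per-thread connections, and main() drives it from the command line. A hypothetical programmatic use, with placeholder URI and file names (only the class and method names are taken from the patch):

    from osm_rawdata.importer import MapImporter

    mi = MapImporter("localhost/osm_data")  # sets up the tables and Postgres extensions
    mi.importParquet("buildings.parquet")   # Overture data, handled by parquetThread()
    mi.importGeoJson("buildings.geojson")   # GeoJSON data, handled by importThread()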
diff --git a/osm_rawdata/overture.py b/osm_rawdata/overture.py
index 0bf955b..38148e7 100755
--- a/osm_rawdata/overture.py
+++ b/osm_rawdata/overture.py
@@ -14,39 +14,38 @@
 #
 # You should have received a copy of the GNU Affero General Public License
 # along with this program. If not, see .
-
+
 import argparse
 import logging
+import math
 import sys
-import os
-from sys import argv
-from geojson import Point, Feature, FeatureCollection, dump, Polygon
+
 import geojson
-from shapely.geometry import shape, Polygon, mapping
-import shapely
-from shapely import wkt, wkb
-import pyarrow.parquet as pq
-# import pyarrow as pa
-from pandas import Series
 import pandas as pd
-import math
+from codetiming import Timer
+from geojson import Feature, FeatureCollection
 from numpy import ndarray
+
+# import pyarrow as pa
+from pandas import Series
 from progress.spinner import PixelSpinner
-from codetiming import Timer
+from shapely import wkb
 
 # Instantiate logger
-log = logging.getLogger('osm-rawdata')
+log = logging.getLogger("osm-rawdata")
+
 
 class Overture(object):
-    def __init__(self,
-                 filespec: str = None,
-                 ):
+    def __init__(
+        self,
+        filespec: str = None,
+    ):
         """A class for parsing Overture V2 files.
 
         Args:
             data (list): The list of features
         """
-        #pfile = pq.ParquetFile(filespec)
+        # pfile = pq.ParquetFile(filespec)
         # self.data = pfile.read()
         if filespec:
             try:
@@ -56,19 +55,20 @@ def __init__(self,
                 log.error(f"Couldn't read data from {filespec}!")
         self.filespec = filespec
 
-    def parse(self,
-              data: Series,
-              ):
+    def parse(
+        self,
+        data: Series,
+    ):
         # log.debug(data)
         entry = dict()
         # timer = Timer(text="importParquet() took {seconds:.0f}s")
         # timer.start()
-        for key,value in data.to_dict().items():
+        for key, value in data.to_dict().items():
             if value is None:
                 continue
             if type(value) == float and math.isnan(value):
                 continue
-            if key == 'geometry':
+            if key == "geometry":
                 geom = wkb.loads(value)
             if type(value) == ndarray:
                 # print(f"LIST: {key} = {value}")
@@ -76,30 +76,34 @@
                 for k1, v1 in value[0].items():
                     if v1 is not None:
                         if type(v1) == ndarray:
-                            import epdb; epdb.st()
+                            import epdb
+
+                            epdb.st()
                             entry[k1] = v1
                 else:
                     # FIXME: for now the data only has one entry in the array,
                     # but this could change.
                     if type(value[0]) == ndarray:
-                        import epdb; epdb.st()
+                        import epdb
+
+                        epdb.st()
                         entry[key] = value[0]
                 continue
-            if key == 'sources' and type(value) == list:
-                if 'dataset' in value[0]:
-                    entry['source'] = value[0]['dataset']
-                if 'recordId' in valve[0] and ['recordId'] is not None:
-                    entry['record'] = value[0]['recordId']
-                if value[0]['confidence'] is not None:
-                    entry['confidence'] = value[0]['confidence']
+            if key == "sources" and type(value) == list:
+                if "dataset" in value[0]:
+                    entry["source"] = value[0]["dataset"]
+                if "recordId" in value[0] and value[0]["recordId"] is not None:
+                    entry["record"] = value[0]["recordId"]
+                if value[0]["confidence"] is not None:
+                    entry["confidence"] = value[0]["confidence"]
             else:
-                entry['source'] = value['dataset']
-                if value[0]['recordId'] is not None:
-                    entry['record'] = value[0]['recordId']
-                if value[0]['confidence'] is not None:
-                    entry['confidence'] = value[0]['confidence']
+                entry["source"] = value["dataset"]
+                if value[0]["recordId"] is not None:
+                    entry["record"] = value[0]["recordId"]
+                if value[0]["confidence"] is not None:
+                    entry["confidence"] = value[0]["confidence"]
             if type(value) == dict:
-                if key == 'bbox':
+                if key == "bbox":
                     continue
                 for k1, v1 in value.items():
                     if v1 is None:
@@ -110,7 +114,7 @@
                         if v2 is None:
                            continue
                         if type(v2) == ndarray:
-                            for k3,v3 in v2.tolist()[0].items():
+                            for k3, v3 in v2.tolist()[0].items():
                                 if v3 is not None:
                                     entry[k3] = v3
                         elif type(v2) == str:
@@ -118,15 +122,16 @@
                    continue
             # FIXME: we should use the language to adjust the name tag
             # lang = v1[0]['language']
-        #timer.stop()
+        # timer.stop()
         return Feature(geometry=geom, properties=entry)
 
+
 def main():
     """This main function lets this class be run standalone by a bash script,
     primarily to assist in code development or debugging. This should really
     be a test case.
     """
-    categories = ('buildings', 'places', 'highways', 'admin', 'localities')
+    categories = ("buildings", "places", "highways", "admin", "localities")
     parser = argparse.ArgumentParser(
         prog="conflateDB",
         formatter_class=argparse.RawDescriptionHelpFormatter,
@@ -134,7 +139,7 @@
     )
     parser.add_argument("-v", "--verbose", action="store_true", help="verbose output")
     parser.add_argument("-i", "--infile", required=True, help="Input file")
-    parser.add_argument("-o", "--outfile", default='overture.geojson', help="Output file")
+    parser.add_argument("-o", "--outfile", default="overture.geojson", help="Output file")
 
     args = parser.parse_args()
 
@@ -143,9 +148,7 @@
         log.setLevel(logging.DEBUG)
         ch = logging.StreamHandler(sys.stdout)
         ch.setLevel(logging.DEBUG)
-        formatter = logging.Formatter(
-            "%(threadName)10s - %(name)s - %(levelname)s - %(message)s"
-        )
+        formatter = logging.Formatter("%(threadName)10s - %(name)s - %(levelname)s - %(message)s")
         ch.setFormatter(formatter)
         log.addHandler(ch)
 
@@ -159,11 +162,11 @@
         spin.next()
         feature = overture.data.loc[index]
         entry = overture.parse(feature)
-        if entry['properties']['dataset'] != 'OpenStreetMap':
+        if entry["properties"]["source"] != "OpenStreetMap":
             features.append(entry)
 
     if len(features) > 0:
-        file = open(args.outfile, 'w')
+        file = open(args.outfile, "w")
         geojson.dump(FeatureCollection(features), file)
         timer.stop()
         log.info(f"Wrote {args.outfile}")
@@ -172,6 +175,7 @@
 
     spin.finish()
 
+
 if __name__ == "__main__":
     """This is just a hook so this file can be run standlone during development."""
     main()
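Finally, a standalone sketch of the attribution flattening Overture.parse() performs on the sources column, since that is the trickiest branch above (the record below is made up; real values come from the parquet rows, and .get() stands in for the explicit key checks in the patch):

    # Made-up Overture-style "sources" value; real ones are read from the parquet file.
    sources = [{"dataset": "Esri Community Maps", "recordId": "abc123", "confidence": 0.92}]

    entry = {}
    if "dataset" in sources[0]:
        entry["source"] = sources[0]["dataset"]
    if sources[0].get("recordId") is not None:
        entry["record"] = sources[0]["recordId"]
    if sources[0].get("confidence") is not None:
        entry["confidence"] = sources[0]["confidence"]

    print(entry)
    # {'source': 'Esri Community Maps', 'record': 'abc123', 'confidence': 0.92}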