feat: allow passing bytesio object as json/yaml config (#13)
* fix: json parsing if index out of range

* feat: allow parsing of bytesio json/yaml config

* fix: allow parsing of bytesio json/yaml config

* build: add docker compose setup for running pytest + db

* feat: allow passing of BytesIO config to PostgresClient

* fix: incorrect handling of dbshell.close() if not exists

* fix: update underpass default url

* fix: improve handling for queryExec + reduce poll to 2s

* test: add test for PostgresClient queryRemote

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* test: force add extra test aoi, comment fgb test

* feat: update queryRemote polling interval logic

* refactor: add error log if max polling duration reached

* test: add logger to remoteQuery tests

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
spwoodcock and pre-commit-ci[bot] authored Feb 8, 2024
1 parent e76442f commit 2f1ea40
Showing 8 changed files with 311 additions and 53 deletions.
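
For orientation, a minimal sketch of what this commit enables: passing an in-memory BytesIO YAML config to PostgresClient instead of a file path. The connection URI and the YAML content below are placeholders, not taken from this commit.

from io import BytesIO

from osm_rawdata.postgres import PostgresClient

# Hypothetical YAML query config held in memory rather than on disk.
# Per the updated docstring, only YAML is accepted when a BytesIO is passed.
yaml_config = BytesIO(
    b"from:\n"
    b"  - ways_poly\n"
    b"select:\n"
    b"  ways_poly:\n"
    b"    - building\n"
)

# Placeholder database URI; the credentials mirror the docker-compose
# test database added in this commit.
pg = PostgresClient("fmtm:testpass@localhost/underpass", yaml_config)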
57 changes: 57 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,57 @@
# Copyright (c) 2022, 2023 Humanitarian OpenStreetMap Team
# This file is part of osm-rawdata.
#
# osm-rawdata is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# osm-rawdata is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with osm-rawdata. If not, see <https://www.gnu.org/licenses/>.
#

version: "3"

networks:
net:
name: osm-rawdata

services:
rawdata:
image: "ghcr.io/hotosm/osm-rawdata:ci"
build:
target: ci
container_name: osm-rawdata
volumes:
# Mount local package
- ./osm_rawdata:/root/.local/lib/python3.10/site-packages/osm_rawdata
# Mount local tests
- ./tests:/data/tests
depends_on:
db:
condition: service_healthy
networks:
- net
restart: "unless-stopped"
command: "pytest"

db:
image: "postgis/postgis:14-3.4-alpine"
environment:
- POSTGRES_USER=fmtm
- POSTGRES_PASSWORD=testpass
- POSTGRES_DB=underpass
networks:
- net
restart: "unless-stopped"
healthcheck:
test: pg_isready -U fmtm -d underpass
start_period: 5s
interval: 10s
timeout: 5s
retries: 3
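
For reference, a test running in the rawdata container above can reach the db service with these credentials; a minimal sketch (assuming psycopg2 is available, as it is a dependency of osm-rawdata):

import psycopg2

# "db" is the compose service name; credentials match the environment above.
conn = psycopg2.connect(
    host="db",
    user="fmtm",
    password="testpass",
    dbname="underpass",
)
with conn.cursor() as cur:
    cur.execute("SELECT postgis_version();")
    print(cur.fetchone())
conn.close()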
35 changes: 25 additions & 10 deletions osm_rawdata/config.py
@@ -23,10 +23,12 @@
import json
import logging
import sys
from io import BytesIO

# import time
from pathlib import Path
from sys import argv
from typing import Union

import flatdict
import yaml
@@ -68,16 +70,16 @@ def __init__(self, boundary: Polygon = None):
# for polygon extracts, sometimes we just want the center point
self.centroid = False

def parseYaml(self, filespec: str):
def parseYaml(self, config: Union[str, BytesIO]):
"""Parse the YAML config file format into the internal data structure.
Args:
filespec (str): The file to read.
config (str, BytesIO): the file or BytesIO object to read.
Returns:
config (dict): The config data.
"""
yaml_data = self.load_yaml(filespec)
yaml_data = self.load_yaml(config)

self._yaml_parse_tables(yaml_data)
self._yaml_parse_where(yaml_data)
@@ -87,7 +89,7 @@ def parseYaml(self, filespec: str):
return self.config

@staticmethod
def load_yaml(filespec: str):
def load_yaml(config: Union[str, BytesIO]):
"""Private method to load YAML data from a file.
Args:
@@ -96,8 +98,14 @@ def load_yaml(filespec: str):
Returns:
data (dict): The loaded YAML data.
"""
with open(filespec, "r") as file:
return yaml.safe_load(file)
if isinstance(config, str):
with open(config, "r") as file:
return yaml.safe_load(file)
elif isinstance(config, BytesIO):
return yaml.safe_load(config.getvalue())
else:
log.error(f"Unsupported config format: {config}")
raise ValueError(f"Invalid config {config}")

def _yaml_parse_tables(self, data):
"""Private method to parse 'from' data.
@@ -176,18 +184,25 @@ def _yaml_parse_select_and_keep(self, data):
for tag in data["keep"]:
self.config["select"][table].append({tag: []})

def parseJson(self, filespec: str):
def parseJson(self, config: Union[str, BytesIO]):
"""Parse the JSON format config file used by the raw-data-api
and export tool.
Args:
filespec (str): the file to read
config (str, BytesIO): the file or BytesIO object to read.
Returns:
config (dict): the config data
"""
file = open(filespec, "r")
data = json.load(file)
if isinstance(config, str):
with open(config, "r") as config_file:
data = json.load(config_file)
elif isinstance(config, BytesIO):
data = json.load(config)
else:
log.error(f"Unsupported config format: {config}")
raise ValueError(f"Invalid config {config}")

# Get the geometry
self.geometry = shape(data["geometry"])
for key, value in flatdict.FlatDict(data).items():
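
A short sketch of the two entry points above, which now accept either a filespec string or a BytesIO object (the filename and YAML content are hypothetical):

from io import BytesIO

from osm_rawdata.config import QueryConfig

# From a file path on disk.
qc_json = QueryConfig()
qc_json.parseJson("raw-data-api-config.json")

# From an in-memory buffer, as added in this commit.
qc_yaml = QueryConfig()
qc_yaml.parseYaml(BytesIO(b"from:\n  - ways_poly\nselect:\n  ways_poly:\n    - building\n"))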
141 changes: 102 additions & 39 deletions osm_rawdata/postgres.py
@@ -29,6 +29,7 @@
from io import BytesIO
from pathlib import Path
from sys import argv
from typing import Optional, Union

import geojson
import psycopg2
@@ -132,7 +133,7 @@ def __init__(

# Use a persistent connection, better for multiple requests
self.session = requests.Session()
self.url = os.getenv("UNDERPASS_API_URL", "https://raw-data-api0.hotosm.org/v1")
self.url = os.getenv("UNDERPASS_API_URL", "https://api-prod.raw-data.hotosm.org/v1")
self.headers = {"accept": "application/json", "Content-Type": "application/json"}
else:
log.info(f"Opening database connection to: {self.uri['dbname']}")
@@ -156,10 +157,9 @@ def __init__(
log.error(f"Couldn't connect to database: {e}")

def __del__(self):
"""
Close any open connections to Postgres.
"""
self.dbshell.close()
"""Close any open connections to Postgres."""
if self.dbshell:
self.dbshell.close()

def createJson(
self,
@@ -289,12 +289,14 @@ def createSQL(
jor = ""
for entry in join_or:
for k, v in entry.items():
if type(v[0]) == list:
# It's an array of values
value = str(v[0])
any = f"ANY(ARRAY{value})"
jor += f"tags->>'{k}'={any} OR "
continue
# Check if v is a non-empty list
if isinstance(v, list) and v:
if isinstance(v[0], list):
# It's an array of values
value = str(v[0])
any = f"ANY(ARRAY{value})"
jor += f"tags->>'{k}'={any} OR "
continue
if k == "op":
continue
if len(v) == 1:
@@ -438,7 +440,7 @@ def queryLocal(

def queryRemote(
self,
query: str = None,
query: str,
):
"""This queries a remote postgres database using the FastAPI
backend to the HOT Export Tool.
@@ -449,22 +451,63 @@
Returns:
(FeatureCollection): the results of the query
"""
# Send the request to raw data api
result = None

url = f"{self.url}/snapshot/"
result = self.session.post(url, data=query, headers=self.headers)
try:
result = self.session.post(url, data=query, headers=self.headers)
result.raise_for_status()
except requests.exceptions.HTTPError:
if result is not None:
error_dict = result.json()
error_dict["status_code"] = result.status_code
log.error(f"Failed to get extract from Raw Data API: {error_dict}")
return error_dict
else:
log.error("Failed to make request to raw data API")

if result is None:
log.error("Raw Data API did not return a response. Skipping.")
return None

if result.status_code != 200:
log.error(f"{result.json()['detail'][0]['msg']}")
error_message = result.json().get("detail")[0].get("msg")
log.error(f"{error_message}")
return None
task_id = result.json()["task_id"]
newurl = f"{self.url}/tasks/status/{task_id}"
while True:
result = self.session.get(newurl, headers=self.headers)
if result.json()["status"] == "PENDING":
log.debug("Retrying...")
time.sleep(1)
elif result.json()["status"] == "SUCCESS":

task_id = result.json().get("task_id")
task_query_url = f"{self.url}/tasks/status/{task_id}"
log.debug(f"Raw Data API Query URL: {task_query_url}")

polling_interval = 2 # Initial polling interval in seconds
max_polling_duration = 600 # Maximum duration for polling in seconds (10 minutes)
elapsed_time = 0

while elapsed_time < max_polling_duration:
result = self.session.get(task_query_url, headers=self.headers)
result_json = result.json()

if result_json.get("status") == "PENDING":
# Adjust polling frequency after the first minute
if elapsed_time > 60:
polling_interval = 10 # Poll every 10 seconds after the first minute

# Wait before polling again
log.debug(f"Waiting {polling_interval} seconds before polling API again...")
time.sleep(polling_interval)
elapsed_time += polling_interval

elif result_json.get("status") == "SUCCESS":
break
zip = result.json()["result"]["download_url"]
result = self.session.get(zip, headers=self.headers)

else:
# Maximum polling duration reached
log.error(f"{max_polling_duration} second elapsed. Aborting data extract.")
return None

zip_url = result_json["result"]["download_url"]
result = self.session.get(zip_url, headers=self.headers)
fp = BytesIO(result.content)
zfp = zipfile.ZipFile(fp, "r")
zfp.extract("Export.geojson", "/tmp/")
@@ -473,39 +516,59 @@ def queryRemote(
os.remove("/tmp/Export.geojson")
return json.loads(data)

# return zfp.read("Export.geojson")


class PostgresClient(DatabaseAccess):
"""Class to handle SQL queries for the categories."""

def __init__(
self,
uri: str,
config: str = None,
config: Optional[Union[str, BytesIO]] = None,
# output: str = None
):
"""This is a client for a postgres database.
Args:
uri (str): The URI string for the database connection
config (str): The filespec for the query config file
uri (str): The URI string for the database connection.
config (str, BytesIO): The query config file path or BytesIO object.
Currently only YAML format is accepted if BytesIO is passed.
Returns:
(bool): Whether the database connection was successful
"""
super().__init__(uri)
self.qc = QueryConfig()

if config:
# Load the config file for the SQL query
path = Path(config)
if path.suffix == ".json":
self.qc.parseJson(config)
elif path.suffix == ".yaml":
self.qc.parseYaml(config)
# filespec string passed
if isinstance(config, str):
path = Path(config)
if not path.exists():
raise FileNotFoundError(f"Config file does not exist {config}")
with open(config, "rb") as config_file:
config_data = BytesIO(config_file.read())
if path.suffix == ".json":
config_type = "json"
elif path.suffix == ".yaml":
config_type = "yaml"
else:
log.error(f"Unsupported file format: {config}")
raise ValueError(f"Invalid config {config}")

# BytesIO object passed
elif isinstance(config, BytesIO):
config_data = config
config_type = "yaml"

else:
log.error(f"{path} is an unsupported file format!")
quit()
log.warning(f"Config input is invalid for PostgresClient: {config}")
raise ValueError(f"Invalid config {config}")

# Parse the config
if config_type == "json":
self.qc.parseJson(config_data)
elif config_type == "yaml":
self.qc.parseYaml(config_data)

def createDB(self, dburi: uriParser):
"""Setup the postgres database connection.
@@ -568,8 +631,8 @@ def execQuery(
alldata += result["features"]
collection = FeatureCollection(alldata)
else:
request = self.createJson(self.qc, poly, allgeom)
collection = self.queryRemote(request)
json_config = self.createJson(self.qc, poly, allgeom)
collection = self.queryRemote(json_config)
return collection


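
A sketch of exercising the remote path end to end; the "underpass" URI selecting the raw-data-api backend and the exact execQuery signature are assumptions based on the code above, and the filenames are placeholders:

import geojson

from osm_rawdata.postgres import PostgresClient

pg = PostgresClient("underpass", "buildings.yaml")

with open("aoi.geojson", "r") as f:
    boundary = geojson.load(f)

# execQuery builds the JSON request and queryRemote polls the task
# status endpoint every 2s (10s after the first minute), up to 10 minutes.
collection = pg.execQuery(boundary)
if collection:
    print(len(collection["features"]))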
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -45,6 +45,8 @@ testpaths = [
"tests",
]
pythonpath = "osm_rawdata"
log_cli = true
log_cli_level = "DEBUG"

[tool.commitizen]
name = "cz_conventional_commits"