From 357d3cc6bae549784a2860ffaafa18288f1a9ac6 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Mon, 7 Aug 2023 17:05:11 +0200
Subject: [PATCH] Issue #424 added initial support for load_geojson

---
 CHANGELOG.md                           |   2 +
 openeo/api/process.py                  |  13 ++
 openeo/rest/_testing.py                |  98 +++++++++++++++
 openeo/rest/connection.py              |  36 +++++-
 openeo/rest/vectorcube.py              |  59 ++++++++-
 openeo/udf/udf_data.py                 |   2 +-
 tests/rest/conftest.py                 |  13 ++
 tests/rest/datacube/test_vectorcube.py | 167 ++++++++++---------------
 tests/rest/test_connection.py          |  28 ++++-
 9 files changed, 306 insertions(+), 112 deletions(-)
 create mode 100644 openeo/rest/_testing.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3474dcec8..ec642f337 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
+- Initial `load_geojson` support with `Connection.load_geojson()` ([#424](https://github.com/Open-EO/openeo-python-client/issues/424))
+
 ### Changed
 
 - `Connection` based requests: always use finite timeouts by default (20 minutes in general, 30 minutes for synchronous execute requests)
diff --git a/openeo/api/process.py b/openeo/api/process.py
index f8ae0bf66..405828f24 100644
--- a/openeo/api/process.py
+++ b/openeo/api/process.py
@@ -45,6 +45,19 @@ def raster_cube(cls, name: str = "data", description: str = "A data cube.") -> '
         """
         return cls(name=name, description=description, schema={"type": "object", "subtype": "raster-cube"})
 
+    @classmethod
+    def datacube(cls, name: str = "data", description: str = "A data cube.") -> "Parameter":
+        """
+        Helper to easily create a 'datacube' parameter.
+
+        :param name: name of the parameter.
+        :param description: description of the parameter
+        :return: Parameter
+
+        .. versionadded:: 0.22.0
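+
+        Example (a minimal usage sketch; the parameter name ``cube`` is just
+        an illustration):
+
+        .. code-block:: python
+
+            cube_param = Parameter.datacube(name="cube", description="Input data cube.")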
+        """
+        return cls(name=name, description=description, schema={"type": "object", "subtype": "datacube"})
+
     @classmethod
     def string(cls, name: str, description: str = None, default=_DEFAULT_UNDEFINED, values=None) -> 'Parameter':
         """Helper to create a 'string' type parameter."""
diff --git a/openeo/rest/_testing.py b/openeo/rest/_testing.py
new file mode 100644
index 000000000..73458f82a
--- /dev/null
+++ b/openeo/rest/_testing.py
@@ -0,0 +1,98 @@
+import re
+
+from openeo import Connection
+
+
+class DummyBackend:
+    """
+    Dummy backend that handles sync/batch execution requests
+    and allows inspection of posted process graphs
+    """
+
+    # Default result (can serve both as JSON or binary data)
+    DEFAULT_RESULT = b'{"what?": "Result data"}'
+
+    def __init__(self, requests_mock, connection: Connection):
+        self.connection = connection
+        self.sync_requests = []
+        self.batch_jobs = {}
+        self.next_result = self.DEFAULT_RESULT
+        requests_mock.post(connection.build_url("/result"), content=self._handle_post_result)
+        requests_mock.post(connection.build_url("/jobs"), content=self._handle_post_jobs)
+        requests_mock.post(
+            re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), content=self._handle_post_job_results
+        )
+        requests_mock.get(re.compile(connection.build_url(r"/jobs/(job-\d+)$")), json=self._handle_get_job)
+        requests_mock.get(
+            re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results
+        )
+        requests_mock.get(
+            re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
+            content=self._handle_get_job_result_asset,
+        )
+
+    def _handle_post_result(self, request, context):
+        """handler of `POST /result` (synchronous execute)"""
+        pg = request.json()["process"]["process_graph"]
+        self.sync_requests.append(pg)
+        return self.next_result
+
+    def _handle_post_jobs(self, request, context):
+        """handler of `POST /jobs` (create batch job)"""
+        pg = request.json()["process"]["process_graph"]
+        job_id = f"job-{len(self.batch_jobs):03d}"
+        self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"}
+        context.status_code = 201
+        context.headers["openeo-identifier"] = job_id
+
+    def _get_job_id(self, request) -> str:
+        match = re.match(r"^/jobs/(job-\d+)(/|$)", request.path)
+        if not match:
+            raise ValueError(f"Failed to extract job_id from {request.path}")
+        job_id = match.group(1)
+        assert job_id in self.batch_jobs
+        return job_id
+
+    def _handle_post_job_results(self, request, context):
+        """Handler of `POST /job/{job_id}/results` (start batch job)."""
+        job_id = self._get_job_id(request)
+        assert self.batch_jobs[job_id]["status"] == "created"
+        # TODO: support custom status sequence (instead of directly going to status "finished")?
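+        # For now, a started job goes straight to "finished",
+        # which is all the current tests need.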
+        self.batch_jobs[job_id]["status"] = "finished"
+        context.status_code = 202
+
+    def _handle_get_job(self, request, context):
+        """Handler of `GET /job/{job_id}` (get batch job status and metadata)."""
+        job_id = self._get_job_id(request)
+        return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}
+
+    def _handle_get_job_results(self, request, context):
+        """Handler of `GET /job/{job_id}/results` (list batch job results)."""
+        job_id = self._get_job_id(request)
+        assert self.batch_jobs[job_id]["status"] == "finished"
+        return {
+            "id": job_id,
+            "assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}},
+        }
+
+    def _handle_get_job_result_asset(self, request, context):
+        """Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset)."""
+        job_id = self._get_job_id(request)
+        assert self.batch_jobs[job_id]["status"] == "finished"
+        return self.next_result
+
+    def get_sync_pg(self) -> dict:
+        """Get one and only synchronous process graph"""
+        assert len(self.sync_requests) == 1
+        return self.sync_requests[0]
+
+    def get_batch_pg(self) -> dict:
+        """Get one and only batch process graph"""
+        assert len(self.batch_jobs) == 1
+        return self.batch_jobs[max(self.batch_jobs.keys())]["pg"]
+
+    def get_pg(self) -> dict:
+        """Get one and only process graph (sync or batch)"""
+        pgs = self.sync_requests + [b["pg"] for b in self.batch_jobs.values()]
+        assert len(pgs) == 1
+        return pgs[0]
diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py
index 17765b042..bcdceb569 100644
--- a/openeo/rest/connection.py
+++ b/openeo/rest/connection.py
@@ -15,10 +15,12 @@
 import requests
 from requests import Response
 from requests.auth import HTTPBasicAuth, AuthBase
+import shapely.geometry.base
 
 import openeo
 from openeo.capabilities import ApiVersionException, ComparableVersion
 from openeo.config import get_config_option, config_log
+from openeo.internal.documentation import openeo_process
 from openeo.internal.graph_building import PGNode, as_flat_graph, FlatGraphableMixin
 from openeo.internal.jupyter import VisualDict, VisualList
 from openeo.internal.processes.builder import ProcessBuilderBase
@@ -1095,12 +1097,13 @@ def datacube_from_json(self, src: Union[str, Path], parameters: Optional[dict] =
         """
         return self.datacube_from_flat_graph(load_json_resource(src), parameters=parameters)
 
+    @openeo_process
     def load_collection(
         self,
         collection_id: str,
         spatial_extent: Optional[Dict[str, float]] = None,
-        temporal_extent: Optional[List[Union[str, datetime.datetime, datetime.date]]] = None,
-        bands: Optional[List[str]] = None,
+        temporal_extent: Optional[List[Union[str, datetime.datetime, datetime.date]]] = None,
+        bands: Optional[List[str]] = None,
         properties: Optional[Dict[str, Union[str, PGNode, Callable]]] = None,
         max_cloud_cover: Optional[float] = None,
         fetch_metadata=True,
@@ -1131,6 +1134,7 @@ def load_collection(
         load_collection, name="imagecollection", since="0.4.10"
     )
 
+    @openeo_process
     def load_result(
         self,
         id: str,
@@ -1168,6 +1172,7 @@ def load_result(
         cube.metadata = metadata
         return cube
 
+    @openeo_process
     def load_stac(
         self,
         url: str,
@@ -1305,6 +1310,33 @@ def load_ml_model(self, id: Union[str, BatchJob]) -> "MlModel":
         """
         return MlModel.load_ml_model(connection=self, id=id)
 
+    @openeo_process
+    def load_geojson(
+        self,
+        data: Union[dict, str, Path, shapely.geometry.base.BaseGeometry, Parameter],
+        properties: Optional[List[str]] = None,
+    ) -> VectorCube:
+        """
+        Converts GeoJSON data as defined by RFC 7946 into a vector data cube.
+
+        :param data: the geometry to load. One of:
+
+            - GeoJSON-style data structure: e.g. a dictionary with ``"type": "Polygon"`` and ``"coordinates"`` fields
+            - a path to a local GeoJSON file
+            - a GeoJSON string
+            - a shapely geometry object
+
+        :param properties: A list of properties from the GeoJSON file to construct an additional dimension from.
+        :return: new VectorCube instance
+
+        .. warning:: EXPERIMENTAL: this process is experimental with the potential for major things to change.
+
+        .. versionadded:: 0.22.0
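+
+        Example (a minimal usage sketch, assuming an established ``connection``;
+        the inline polygon is just an illustration):
+
+        .. code-block:: python
+
+            polygon = {"type": "Polygon", "coordinates": [[[3, 51], [4, 51], [4, 52], [3, 51]]]}
+            vector_cube = connection.load_geojson(polygon)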
+        """
+        return VectorCube.load_geojson(connection=self, data=data, properties=properties)
+
     def create_service(self, graph: dict, type: str, **kwargs) -> Service:
         # TODO: type hint for graph: is it a nested or a flat one?
         req = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs)
diff --git a/openeo/rest/vectorcube.py b/openeo/rest/vectorcube.py
index c9e63ff30..32fa5c5f2 100644
--- a/openeo/rest/vectorcube.py
+++ b/openeo/rest/vectorcube.py
@@ -1,14 +1,18 @@
+import json
 import pathlib
 import typing
-from typing import Union, Optional
+from typing import List, Optional, Union
 
+import shapely.geometry.base
+
+from openeo.api.process import Parameter
 from openeo.internal.documentation import openeo_process
 from openeo.internal.graph_building import PGNode
 from openeo.internal.warnings import legacy_alias
 from openeo.metadata import CollectionMetadata
-from openeo.rest._datacube import _ProcessGraphAbstraction, UDF
-from openeo.rest.mlmodel import MlModel
+from openeo.rest._datacube import UDF, _ProcessGraphAbstraction
 from openeo.rest.job import BatchJob
+from openeo.rest.mlmodel import MlModel
 from openeo.util import dict_no_none, guess_format
 
 if typing.TYPE_CHECKING:
@@ -42,11 +46,58 @@ def process(
         :param process_id: process id of the process.
         :param args: argument dictionary for the process.
-        :return: new DataCube instance
+        :return: new VectorCube instance
         """
         pg = self._build_pgnode(process_id=process_id, arguments=arguments, namespace=namespace, **kwargs)
         return VectorCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)
 
+    @classmethod
+    @openeo_process
+    def load_geojson(
+        cls,
+        connection: "openeo.Connection",
+        data: Union[dict, str, pathlib.Path, shapely.geometry.base.BaseGeometry, Parameter],
+        properties: Optional[List[str]] = None,
+    ) -> "VectorCube":
+        """
+        Converts GeoJSON data as defined by RFC 7946 into a vector data cube.
+
+        :param connection: the connection to use to connect with the openEO back-end.
+        :param data: the geometry to load. One of:
+
+            - GeoJSON-style data structure: e.g. a dictionary with ``"type": "Polygon"`` and ``"coordinates"`` fields
+            - a path to a local GeoJSON file
+            - a GeoJSON string
+            - a shapely geometry object
+
+        :param properties: A list of properties from the GeoJSON file to construct an additional dimension from.
+        :return: new VectorCube instance
+
+        .. warning:: EXPERIMENTAL: this process is experimental with the potential for major things to change.
+
+        .. versionadded:: 0.22.0
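+
+        Example (a minimal usage sketch, assuming an established ``connection``;
+        the inline polygon is just an illustration):
+
+        .. code-block:: python
+
+            polygon = {"type": "Polygon", "coordinates": [[[3, 51], [4, 51], [4, 52], [3, 51]]]}
+            vector_cube = VectorCube.load_geojson(connection=connection, data=polygon)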
+        """
+        # TODO: unify with `DataCube._get_geometry_argument`
+        if isinstance(data, str) and data.strip().startswith("{"):
+            # Assume JSON dump
+            geometry = json.loads(data)
+        elif isinstance(data, (str, pathlib.Path)):
+            # Assume local file
+            with pathlib.Path(data).open(mode="r", encoding="utf-8") as f:
+                geometry = json.load(f)
+            assert isinstance(geometry, dict)
+        elif isinstance(data, shapely.geometry.base.BaseGeometry):
+            geometry = shapely.geometry.mapping(data)
+        elif isinstance(data, Parameter):
+            geometry = data
+        elif isinstance(data, dict):
+            geometry = data
+        else:
+            raise ValueError(data)
+
+        pg = PGNode(process_id="load_geojson", data=geometry, properties=properties or [])
+        return cls(graph=pg, connection=connection)
+
     @openeo_process
     def run_udf(
         self,
diff --git a/openeo/udf/udf_data.py b/openeo/udf/udf_data.py
index b26cd16e8..8ea798665 100644
--- a/openeo/udf/udf_data.py
+++ b/openeo/udf/udf_data.py
@@ -32,7 +32,7 @@ def __init__(
         The constructor of the UDF argument class that stores all data required by the
         user defined function.
 
-        :param proj: A dictionary of form {"proj type string": "projection description"} i. e. {"EPSG":4326}
+        :param proj: A dictionary of form {"proj type string": "projection description"} e.g. {"EPSG": 4326}
         :param datacube_list: A list of data cube objects
         :param feature_collection_list: A list of VectorTile objects
         :param structured_data_list: A list of structured data objects
diff --git a/tests/rest/conftest.py b/tests/rest/conftest.py
index ffe27ba20..7e3df403f 100644
--- a/tests/rest/conftest.py
+++ b/tests/rest/conftest.py
@@ -8,6 +8,7 @@
 import pytest
 import time_machine
 
+from openeo.rest._testing import DummyBackend
 from openeo.rest.connection import Connection
 
 API_URL = "https://oeo.test/"
@@ -71,8 +72,20 @@ def assert_oidc_device_code_flow(url: str = "https://oidc.test/dc", elapsed: flo
     return assert_oidc_device_code_flow
 
 
+@pytest.fixture
+def con100(requests_mock):
+    requests_mock.get(API_URL, json={"api_version": "1.0.0"})
+    con = Connection(API_URL)
+    return con
+
+
 @pytest.fixture
 def con120(requests_mock):
     requests_mock.get(API_URL, json={"api_version": "1.2.0"})
     con = Connection(API_URL)
     return con
+
+
+@pytest.fixture
+def dummy_backend(requests_mock, con100) -> DummyBackend:
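+    """Fixture for a DummyBackend that intercepts the requests of the `con100` connection."""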
+    yield DummyBackend(requests_mock=requests_mock, connection=con100)
diff --git a/tests/rest/datacube/test_vectorcube.py b/tests/rest/datacube/test_vectorcube.py
index c89ec09da..995f66af0 100644
--- a/tests/rest/datacube/test_vectorcube.py
+++ b/tests/rest/datacube/test_vectorcube.py
@@ -1,10 +1,11 @@
-import re
 from pathlib import Path
 
 import pytest
+import shapely.geometry
 
-from openeo import Connection
+from openeo.api.process import Parameter
 from openeo.internal.graph_building import PGNode
+from openeo.rest._testing import DummyBackend
 from openeo.rest.vectorcube import VectorCube
 
 
@@ -14,103 +15,6 @@ def vector_cube(con100) -> VectorCube:
     return VectorCube(graph=pgnode, connection=con100)
 
 
-class DummyBackend:
-    """
-    Dummy backend that handles sync/batch execution requests
-    and allows inspection of posted process graphs
-    """
-
-    def __init__(self, requests_mock, connection: Connection):
-        self.connection = connection
-        self.sync_requests = []
-        self.batch_jobs = {}
-        self.next_result = b"Result data"
-        requests_mock.post(connection.build_url("/result"), content=self._handle_post_result)
-        requests_mock.post(connection.build_url("/jobs"), content=self._handle_post_jobs)
-        requests_mock.post(
-            re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), content=self._handle_post_job_results
-        )
-        requests_mock.get(re.compile(connection.build_url(r"/jobs/(job-\d+)$")), json=self._handle_get_job)
-        requests_mock.get(
-            re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results
-        )
-        requests_mock.get(
-            re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
-            content=self._handle_get_job_result_asset,
-        )
-
-    def _handle_post_result(self, request, context):
-        """handler of `POST /result` (synchronous execute)"""
-        pg = request.json()["process"]["process_graph"]
-        self.sync_requests.append(pg)
-        return self.next_result
-
-    def _handle_post_jobs(self, request, context):
-        """handler of `POST /jobs` (create batch job)"""
-        pg = request.json()["process"]["process_graph"]
-        job_id = f"job-{len(self.batch_jobs):03d}"
-        self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"}
-        context.status_code = 201
-        context.headers["openeo-identifier"] = job_id
-
-    def _get_job_id(self, request) -> str:
-        match = re.match(r"^/jobs/(job-\d+)(/|$)", request.path)
-        if not match:
-            raise ValueError(f"Failed to extract job_id from {request.path}")
-        job_id = match.group(1)
-        assert job_id in self.batch_jobs
-        return job_id
-
-    def _handle_post_job_results(self, request, context):
-        """Handler of `POST /job/{job_id}/results` (start batch job)."""
-        job_id = self._get_job_id(request)
-        assert self.batch_jobs[job_id]["status"] == "created"
-        # TODO: support custom status sequence (instead of directly going to status "finished")?
-        self.batch_jobs[job_id]["status"] = "finished"
-        context.status_code = 202
-
-    def _handle_get_job(self, request, context):
-        """Handler of `GET /job/{job_id}` (get batch job status and metadata)."""
-        job_id = self._get_job_id(request)
-        return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}
-
-    def _handle_get_job_results(self, request, context):
-        """Handler of `GET /job/{job_id}/results` (list batch job results)."""
-        job_id = self._get_job_id(request)
-        assert self.batch_jobs[job_id]["status"] == "finished"
-        return {
-            "id": job_id,
-            "assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}},
-        }
-
-    def _handle_get_job_result_asset(self, request, context):
-        """Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset)."""
-        job_id = self._get_job_id(request)
-        assert self.batch_jobs[job_id]["status"] == "finished"
-        return self.next_result
-
-    def get_sync_pg(self) -> dict:
-        """Get one and only synchronous process graph"""
-        assert len(self.sync_requests) == 1
-        return self.sync_requests[0]
-
-    def get_batch_pg(self) -> dict:
-        """Get one and only batch process graph"""
-        assert len(self.batch_jobs) == 1
-        return self.batch_jobs[max(self.batch_jobs.keys())]["pg"]
-
-    def get_pg(self) -> dict:
-        """Get one and only batch process graph (sync or batch)"""
-        pgs = self.sync_requests + [b["pg"] for b in self.batch_jobs.values()]
-        assert len(pgs) == 1
-        return pgs[0]
-
-
-@pytest.fixture
-def dummy_backend(requests_mock, con100) -> DummyBackend:
-    yield DummyBackend(requests_mock=requests_mock, connection=con100)
-
-
 def test_raster_to_vector(con100):
     img = con100.load_collection("S2")
     vector_cube = img.raster_to_vector()
@@ -175,7 +79,8 @@ def test_download_auto_save_result_only_file(
             "result": True,
         },
     }
-    assert output_path.read_bytes() == b"Result data"
+    assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT
+
 
 
 @pytest.mark.parametrize(
@@ -216,7 +121,7 @@ def test_download_auto_save_result_with_format(
             "result": True,
         },
     }
-    assert output_path.read_bytes() == b"Result data"
+    assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT
 
 
 @pytest.mark.parametrize("exec_mode", ["sync", "batch"])
@@ -244,7 +149,7 @@ def test_download_auto_save_result_with_options(vector_cube, dummy_backend, tmp_
             "result": True,
         },
     }
-    assert output_path.read_bytes() == b"Result data"
+    assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT
 
 
 @pytest.mark.parametrize(
@@ -278,4 +183,60 @@ def test_save_result_and_download(
             "result": True,
         },
     }
-    assert output_path.read_bytes() == b"Result data"
+    assert output_path.read_bytes() == DummyBackend.DEFAULT_RESULT
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]},
+        """{"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}""",
+        shapely.geometry.Polygon([[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]),
+    ],
+)
+def test_load_geojson_basic(con100, data, dummy_backend):
+    vc = VectorCube.load_geojson(connection=con100, data=data)
+    assert isinstance(vc, VectorCube)
+    vc.execute()
+    assert dummy_backend.get_pg() == {
+        "loadgeojson1": {
+            "process_id": "load_geojson",
+            "arguments": {
+                "data": {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]},
+                "properties": [],
+            },
+            "result": True,
+        }
+    }
+
+
+@pytest.mark.parametrize("path_type", [str, Path])
+def test_load_geojson_path(con100, dummy_backend, tmp_path, path_type):
+    path = tmp_path / "geometry.json"
+    path.write_text("""{"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}""")
+    vc = VectorCube.load_geojson(connection=con100, data=path_type(path))
+    assert isinstance(vc, VectorCube)
+    vc.execute()
+    assert dummy_backend.get_pg() == {
+        "loadgeojson1": {
+            "process_id": "load_geojson",
+            "arguments": {
+                "data": {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]},
+                "properties": [],
+            },
+            "result": True,
+        }
+    }
+
+
+def test_load_geojson_parameter(con100, dummy_backend):
+    vc = VectorCube.load_geojson(connection=con100, data=Parameter.datacube())
+    assert isinstance(vc, VectorCube)
+    vc.execute()
+    assert dummy_backend.get_pg() == {
+        "loadgeojson1": {
+            "process_id": "load_geojson",
+            "arguments": {"data": {"from_parameter": "data"}, "properties": []},
+            "result": True,
+        }
+    }
diff --git a/tests/rest/test_connection.py b/tests/rest/test_connection.py
index 1b6a0393c..d6dfed25e 100644
--- a/tests/rest/test_connection.py
+++ b/tests/rest/test_connection.py
@@ -12,6 +12,7 @@
 import pytest
 import requests.auth
 import requests_mock
+import shapely.geometry
 
 import openeo
 from openeo.capabilities import ApiVersionException, ComparableVersion
@@ -22,12 +23,12 @@
 from openeo.rest.auth.oidc import OidcException
 from openeo.rest.auth.testing import ABSENT, OidcMock
 from openeo.rest.connection import (
+    DEFAULT_TIMEOUT,
+    DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE,
     Connection,
     RestApiConnection,
     connect,
     paginate,
-    DEFAULT_TIMEOUT,
-    DEFAULT_TIMEOUT_SYNCHRONOUS_EXECUTE,
 )
 from openeo.util import ContextTimer
 
@@ -2359,6 +2360,29 @@ def test_extents(self, con120):
         }
 
 
+@pytest.mark.parametrize(
+    "data",
+    [
+        {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]},
+        """{"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]}""",
+        shapely.geometry.Polygon([[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]),
+    ],
+)
+def test_load_geojson(con100, data, dummy_backend):
+    vc = con100.load_geojson(data)
+    vc.execute()
+    assert dummy_backend.get_pg() == {
+        "loadgeojson1": {
+            "process_id": "load_geojson",
+            "arguments": {
+                "data": {"type": "Polygon", "coordinates": [[[1, 2], [3, 2], [3, 4], [1, 4], [1, 2]]]},
+                "properties": [],
+            },
+            "result": True,
+        }
+    }
+
+
 def test_list_file_formats(requests_mock):
     requests_mock.get(API_URL, json={"api_version": "1.0.0"})
     conn = Connection(API_URL)