Skip to content

Commit

Permalink
Issue #424 added initial support for load_geojson
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Aug 7, 2023
1 parent cfdb3e3 commit 357d3cc
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 112 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Initial `load_geojson` support with `Connection.load_geojson()` ([#424](https://github.com/Open-EO/openeo-python-client/issues/424))

### Changed

- `Connection` based requests: always use finite timeouts by default (20 minutes in general, 30 minutes for synchronous execute requests)
Expand Down
13 changes: 13 additions & 0 deletions openeo/api/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ def raster_cube(cls, name: str = "data", description: str = "A data cube.") -> '
"""
return cls(name=name, description=description, schema={"type": "object", "subtype": "raster-cube"})

@classmethod
def datacube(cls, name: str = "data", description: str = "A data cube.") -> "Parameter":
"""
Helper to easily create a 'datacube' parameter.
:param name: name of the parameter.
:param description: description of the parameter
:return: Parameter
.. versionadded:: 0.22.0
"""
return cls(name=name, description=description, schema={"type": "object", "subtype": "datacube"})

@classmethod
def string(cls, name: str, description: str = None, default=_DEFAULT_UNDEFINED, values=None) -> 'Parameter':
"""Helper to create a 'string' type parameter."""
Expand Down
98 changes: 98 additions & 0 deletions openeo/rest/_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import re

from openeo import Connection


class DummyBackend:
"""
Dummy backend that handles sync/batch execution requests
and allows inspection of posted process graphs
"""

# Default result (can serve both as JSON or binary data)
DEFAULT_RESULT = b'{"what?": "Result data"}'

def __init__(self, requests_mock, connection: Connection):
self.connection = connection
self.sync_requests = []
self.batch_jobs = {}
self.next_result = self.DEFAULT_RESULT
requests_mock.post(connection.build_url("/result"), content=self._handle_post_result)
requests_mock.post(connection.build_url("/jobs"), content=self._handle_post_jobs)
requests_mock.post(
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), content=self._handle_post_job_results
)
requests_mock.get(re.compile(connection.build_url(r"/jobs/(job-\d+)$")), json=self._handle_get_job)
requests_mock.get(
re.compile(connection.build_url(r"/jobs/(job-\d+)/results$")), json=self._handle_get_job_results
)
requests_mock.get(
re.compile(connection.build_url("/jobs/(.*?)/results/result.data$")),
content=self._handle_get_job_result_asset,
)

def _handle_post_result(self, request, context):
"""handler of `POST /result` (synchronous execute)"""
pg = request.json()["process"]["process_graph"]
self.sync_requests.append(pg)
return self.next_result

def _handle_post_jobs(self, request, context):
"""handler of `POST /jobs` (create batch job)"""
pg = request.json()["process"]["process_graph"]
job_id = f"job-{len(self.batch_jobs):03d}"
self.batch_jobs[job_id] = {"job_id": job_id, "pg": pg, "status": "created"}
context.status_code = 201
context.headers["openeo-identifier"] = job_id

def _get_job_id(self, request) -> str:
match = re.match(r"^/jobs/(job-\d+)(/|$)", request.path)
if not match:
raise ValueError(f"Failed to extract job_id from {request.path}")
job_id = match.group(1)
assert job_id in self.batch_jobs
return job_id

def _handle_post_job_results(self, request, context):
"""Handler of `POST /job/{job_id}/results` (start batch job)."""
job_id = self._get_job_id(request)
assert self.batch_jobs[job_id]["status"] == "created"
# TODO: support custom status sequence (instead of directly going to status "finished")?
self.batch_jobs[job_id]["status"] = "finished"
context.status_code = 202

def _handle_get_job(self, request, context):
"""Handler of `GET /job/{job_id}` (get batch job status and metadata)."""
job_id = self._get_job_id(request)
return {"id": job_id, "status": self.batch_jobs[job_id]["status"]}

def _handle_get_job_results(self, request, context):
"""Handler of `GET /job/{job_id}/results` (list batch job results)."""
job_id = self._get_job_id(request)
assert self.batch_jobs[job_id]["status"] == "finished"
return {
"id": job_id,
"assets": {"result.data": {"href": self.connection.build_url(f"/jobs/{job_id}/results/result.data")}},
}

def _handle_get_job_result_asset(self, request, context):
"""Handler of `GET /job/{job_id}/results/result.data` (get batch job result asset)."""
job_id = self._get_job_id(request)
assert self.batch_jobs[job_id]["status"] == "finished"
return self.next_result

def get_sync_pg(self) -> dict:
"""Get one and only synchronous process graph"""
assert len(self.sync_requests) == 1
return self.sync_requests[0]

def get_batch_pg(self) -> dict:
"""Get one and only batch process graph"""
assert len(self.batch_jobs) == 1
return self.batch_jobs[max(self.batch_jobs.keys())]["pg"]

def get_pg(self) -> dict:
"""Get one and only batch process graph (sync or batch)"""
pgs = self.sync_requests + [b["pg"] for b in self.batch_jobs.values()]
assert len(pgs) == 1
return pgs[0]
36 changes: 34 additions & 2 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import requests
from requests import Response
from requests.auth import HTTPBasicAuth, AuthBase
import shapely.geometry.base

import openeo
from openeo.capabilities import ApiVersionException, ComparableVersion
from openeo.config import get_config_option, config_log
from openeo.internal.documentation import openeo_process
from openeo.internal.graph_building import PGNode, as_flat_graph, FlatGraphableMixin
from openeo.internal.jupyter import VisualDict, VisualList
from openeo.internal.processes.builder import ProcessBuilderBase
Expand Down Expand Up @@ -1095,12 +1097,13 @@ def datacube_from_json(self, src: Union[str, Path], parameters: Optional[dict] =
"""
return self.datacube_from_flat_graph(load_json_resource(src), parameters=parameters)

@openeo_process
def load_collection(
self,
collection_id: str,
spatial_extent: Optional[Dict[str, float]] = None,
temporal_extent: Optional[List[Union[str, datetime.datetime, datetime.date]]] = None,
bands: Optional[List[str]] = None,
temporal_extent: Optional[List[Union[str, datetime.datetime, datetime.date]]] = None,
bands: Optional[List[str]] = None,
properties: Optional[Dict[str, Union[str, PGNode, Callable]]] = None,
max_cloud_cover: Optional[float] = None,
fetch_metadata=True,
Expand Down Expand Up @@ -1131,6 +1134,7 @@ def load_collection(
load_collection, name="imagecollection", since="0.4.10"
)

@openeo_process
def load_result(
self,
id: str,
Expand Down Expand Up @@ -1168,6 +1172,7 @@ def load_result(
cube.metadata = metadata
return cube

@openeo_process
def load_stac(
self,
url: str,
Expand Down Expand Up @@ -1305,6 +1310,33 @@ def load_ml_model(self, id: Union[str, BatchJob]) -> "MlModel":
"""
return MlModel.load_ml_model(connection=self, id=id)

@openeo_process
def load_geojson(
self,
data: Union[dict, str, Path, shapely.geometry.base.BaseGeometry, Parameter],
properties: Optional[List[str]] = None,
):
"""
Converts GeoJSON data as defined by RFC 7946 into a vector data cube.
:param connection: the connection to use to connect with the openEO back-end.
:param data: the geometry to load. One of:
- GeoJSON-style data structure: e.g. a dictionary with ``"type": "Polygon"`` and ``"coordinates"`` fields
- a path to a local GeoJSON file
- a GeoJSON string
- a shapely geometry object
:param properties: A list of properties from the GeoJSON file to construct an additional dimension from.
:return: new VectorCube instance
.. warning:: EXPERIMENTAL: this process is experimental with the potential for major things to change.
.. versionadded:: 0.22.0
"""

return VectorCube.load_geojson(connection=self, data=data, properties=properties)

def create_service(self, graph: dict, type: str, **kwargs) -> Service:
# TODO: type hint for graph: is it a nested or a flat one?
req = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs)
Expand Down
59 changes: 55 additions & 4 deletions openeo/rest/vectorcube.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import json
import pathlib
import typing
from typing import Union, Optional
from typing import List, Optional, Union

import shapely.geometry.base

from openeo.api.process import Parameter
from openeo.internal.documentation import openeo_process
from openeo.internal.graph_building import PGNode
from openeo.internal.warnings import legacy_alias
from openeo.metadata import CollectionMetadata
from openeo.rest._datacube import _ProcessGraphAbstraction, UDF
from openeo.rest.mlmodel import MlModel
from openeo.rest._datacube import UDF, _ProcessGraphAbstraction
from openeo.rest.job import BatchJob
from openeo.rest.mlmodel import MlModel
from openeo.util import dict_no_none, guess_format

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -42,11 +46,58 @@ def process(
:param process_id: process id of the process.
:param args: argument dictionary for the process.
:return: new DataCube instance
:return: new VectorCube instance
"""
pg = self._build_pgnode(process_id=process_id, arguments=arguments, namespace=namespace, **kwargs)
return VectorCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)

@classmethod
@openeo_process
def load_geojson(
cls,
connection: "openeo.Connection",
data: Union[dict, str, pathlib.Path, shapely.geometry.base.BaseGeometry, Parameter],
properties: Optional[List[str]] = None,
):
"""
Converts GeoJSON data as defined by RFC 7946 into a vector data cube.
:param connection: the connection to use to connect with the openEO back-end.
:param data: the geometry to load. One of:
- GeoJSON-style data structure: e.g. a dictionary with ``"type": "Polygon"`` and ``"coordinates"`` fields
- a path to a local GeoJSON file
- a GeoJSON string
- a shapely geometry object
:param properties: A list of properties from the GeoJSON file to construct an additional dimension from.
:return: new VectorCube instance
.. warning:: EXPERIMENTAL: this process is experimental with the potential for major things to change.
.. versionadded:: 0.22.0
"""
# TODO: unify with `DataCube._get_geometry_argument`
if isinstance(data, str) and data.strip().startswith("{"):
# Assume JSON dump
geometry = json.loads(data)
elif isinstance(data, (str, pathlib.Path)):
# Assume local file
with pathlib.Path(data).open(mode="r", encoding="utf-8") as f:
geometry = json.load(f)
assert isinstance(geometry, dict)
elif isinstance(data, shapely.geometry.base.BaseGeometry):
geometry = shapely.geometry.mapping(data)
elif isinstance(data, Parameter):
geometry = data
elif isinstance(data, dict):
geometry = data
else:
raise ValueError(data)

pg = PGNode(process_id="load_geojson", data=geometry, properties=properties or [])
return cls(graph=pg, connection=connection)

@openeo_process
def run_udf(
self,
Expand Down
2 changes: 1 addition & 1 deletion openeo/udf/udf_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(
The constructor of the UDF argument class that stores all data required by the
user defined function.
:param proj: A dictionary of form {"proj type string": "projection description"} i. e. {"EPSG":4326}
:param proj: A dictionary of form {"proj type string": "projection description"} e.g. {"EPSG": 4326}
:param datacube_list: A list of data cube objects
:param feature_collection_list: A list of VectorTile objects
:param structured_data_list: A list of structured data objects
Expand Down
13 changes: 13 additions & 0 deletions tests/rest/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pytest
import time_machine

from openeo.rest._testing import DummyBackend
from openeo.rest.connection import Connection

API_URL = "https://oeo.test/"
Expand Down Expand Up @@ -71,8 +72,20 @@ def assert_oidc_device_code_flow(url: str = "https://oidc.test/dc", elapsed: flo
return assert_oidc_device_code_flow


@pytest.fixture
def con100(requests_mock):
requests_mock.get(API_URL, json={"api_version": "1.0.0"})
con = Connection(API_URL)
return con


@pytest.fixture
def con120(requests_mock):
requests_mock.get(API_URL, json={"api_version": "1.2.0"})
con = Connection(API_URL)
return con


@pytest.fixture
def dummy_backend(requests_mock, con100) -> DummyBackend:
yield DummyBackend(requests_mock=requests_mock, connection=con100)
Loading

0 comments on commit 357d3cc

Please sign in to comment.