From eb03aa8b7c0fc848c6638a90a561c81b03a078d7 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Tue, 1 Aug 2023 07:47:25 +0200 Subject: [PATCH] PR #200 finetune VectorCube.apply_dimension ref: #197, Open-EO/openeo-geopyspark-driver#437 --- openeo_driver/datacube.py | 73 ++++++++++++++++-------------- openeo_driver/util/pgparsing.py | 45 +++++++++++++++++++ tests/test_vectorcube.py | 54 +++++++++++++++++++++++ tests/util/test_pgparsing.py | 78 +++++++++++++++++++++++++++++++++ 4 files changed, 217 insertions(+), 33 deletions(-) create mode 100644 openeo_driver/util/pgparsing.py create mode 100644 tests/util/test_pgparsing.py diff --git a/openeo_driver/datacube.py b/openeo_driver/datacube.py index 1bd3b441..04c43916 100644 --- a/openeo_driver/datacube.py +++ b/openeo_driver/datacube.py @@ -18,13 +18,14 @@ from openeo.metadata import CollectionMetadata from openeo.util import ensure_dir, str_truncate +import openeo.udf from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset from openeo_driver.errors import FeatureUnsupportedException, InternalException from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates from openeo_driver.util.ioformats import IOFORMATS +from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph from openeo_driver.util.utm import area_in_square_meters from openeo_driver.utils import EvalEnv -from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor log = logging.getLogger(__name__) @@ -248,38 +249,6 @@ def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix ) - def apply_dimension( - self, - process: dict, - *, - dimension: str, - target_dimension: Optional[str] = None, - context: Optional[dict] = None, - env: EvalEnv, - ) -> "DriverVectorCube": - if dimension == "bands" and target_dimension == None and len(process) == 1 and next(iter(process.values())).get('process_id') == 'run_udf': - visitor = SingleNodeUDFProcessGraphVisitor().accept_process_graph(process) - udf = visitor.udf_args.get('udf', None) - - from openeo.udf import FeatureCollection, UdfData - collection = FeatureCollection(id='VectorCollection', data=self._as_geopandas_df()) - data = UdfData( - proj={"EPSG": self._geometries.crs.to_epsg()}, feature_collection_list=[collection], user_context=context - ) - - log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}") - result_data = env.backend_implementation.processing.run_udf(udf, data) - log.info(f"[run_udf] UDF resulted in {result_data!r}") - - if isinstance(result_data, UdfData): - if(result_data.get_feature_collection_list() is not None and len(result_data.get_feature_collection_list()) == 1): - return DriverVectorCube(geometries=result_data.get_feature_collection_list()[0].data) - - raise ValueError(f"Could not handle UDF result: {result_data}") - - else: - raise FeatureUnsupportedException() - @classmethod def from_fiona( cls, @@ -537,6 +506,44 @@ def buffer_points(self, distance: float = 10) -> "DriverVectorCube": ] ) + def apply_dimension( + self, + process: dict, + *, + dimension: str, + target_dimension: Optional[str] = None, + context: Optional[dict] = None, + env: EvalEnv, + ) -> "DriverVectorCube": + single_run_udf = SingleRunUDFProcessGraph.parse_or_none(process) + + if single_run_udf: + # Process with single "run_udf" node + if self._cube is None and dimension == self.DIM_GEOMETRIES and target_dimension is None: + log.warning( + f"Using experimental feature: DriverVectorCube.apply_dimension along dim {dimension} and empty cube" + ) + # TODO: this is non-standard special case: vector cube with only geometries, but no "cube" data + gdf = self._as_geopandas_df() + feature_collection = openeo.udf.FeatureCollection(id="_", data=gdf) + udf_data = openeo.udf.UdfData( + proj={"EPSG": self._geometries.crs.to_epsg()}, + feature_collection_list=[feature_collection], + user_context=context, + ) + log.info(f"[run_udf] Running UDF {str_truncate(single_run_udf.udf, width=256)!r} on {udf_data!r}") + result_data = env.backend_implementation.processing.run_udf(udf=single_run_udf.udf, data=udf_data) + log.info(f"[run_udf] UDF resulted in {result_data!r}") + + if isinstance(result_data, openeo.udf.UdfData): + result_features = result_data.get_feature_collection_list() + if result_features and len(result_features) == 1: + return DriverVectorCube(geometries=result_features[0].data) + raise ValueError(f"Could not handle UDF result: {result_data}") + + raise FeatureUnsupportedException() + + class DriverMlModel: """Base class for driver-side 'ml-model' data structures""" diff --git a/openeo_driver/util/pgparsing.py b/openeo_driver/util/pgparsing.py new file mode 100644 index 00000000..dcca5b7a --- /dev/null +++ b/openeo_driver/util/pgparsing.py @@ -0,0 +1,45 @@ +import dataclasses +from typing import Optional + + +class NotASingleRunUDFProcessGraph(ValueError): + pass + + +@dataclasses.dataclass(frozen=True) +class SingleRunUDFProcessGraph: + """ + Container (and parser) for a callback process graph containing only a single `run_udf` node. + """ + + data: dict + udf: str + runtime: str + version: Optional[str] = None + context: Optional[dict] = None + + @classmethod + def parse(cls, process_graph: dict) -> "SingleRunUDFProcessGraph": + try: + (node,) = process_graph.values() + assert node["process_id"] == "run_udf" + assert node["result"] is True + arguments = node["arguments"] + assert {"data", "udf", "runtime"}.issubset(arguments.keys()) + + return cls( + data=arguments["data"], + udf=arguments["udf"], + runtime=arguments["runtime"], + version=arguments.get("version"), + context=arguments.get("context") or {}, + ) + except Exception as e: + raise NotASingleRunUDFProcessGraph(str(e)) from e + + @classmethod + def parse_or_none(cls, process_graph: dict) -> Optional["SingleNodeRunUDFProcessGraph"]: + try: + return cls.parse(process_graph=process_graph) + except NotASingleRunUDFProcessGraph: + return None diff --git a/tests/test_vectorcube.py b/tests/test_vectorcube.py index 7c61a0d9..cc2437a8 100644 --- a/tests/test_vectorcube.py +++ b/tests/test_vectorcube.py @@ -1,3 +1,5 @@ +import textwrap + import geopandas as gpd import numpy.testing import pyproj @@ -9,6 +11,7 @@ from openeo_driver.datacube import DriverVectorCube from openeo_driver.testing import DictSubSet, ApproxGeometry from openeo_driver.util.geometry import as_geojson_feature_collection +from openeo_driver.utils import EvalEnv from .data import get_path @@ -22,6 +25,10 @@ def gdf(self) -> gpd.GeoDataFrame: df = gpd.read_file(path) return df + @pytest.fixture + def vc(self, gdf) -> DriverVectorCube: + return DriverVectorCube(geometries=gdf) + def test_basic(self, gdf): vc = DriverVectorCube(gdf) assert vc.get_bounding_box() == (1, 1, 5, 4) @@ -446,3 +453,50 @@ def test_buffer_points(self): ], } ) + + def test_apply_dimension_run_udf(self, vc, backend_implementation): + udf = textwrap.dedent( + """ + from openeo.udf import UdfData, FeatureCollection + def process_geometries(udf_data: UdfData) -> UdfData: + [feature_collection] = udf_data.get_feature_collection_list() + gdf = feature_collection.data + gdf["geometry"] = gdf["geometry"].buffer(distance=1, resolution=2) + udf_data.set_feature_collection_list([ + FeatureCollection(id="_", data=gdf), + ]) + """ + ) + callback = { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": udf, "runtime": "Python"}, + "result": True, + } + } + env = EvalEnv({"backend_implementation": backend_implementation}) + result = vc.apply_dimension(process=callback, dimension="geometries", env=env) + assert isinstance(result, DriverVectorCube) + feature_collection = result.to_geojson() + assert feature_collection == DictSubSet( + { + "type": "FeatureCollection", + "bbox": pytest.approx((0, 0, 6, 5), abs=0.1), + "features": [ + { + "type": "Feature", + "bbox": pytest.approx((0, 0, 4, 4), abs=0.1), + "geometry": DictSubSet({"type": "Polygon"}), + "id": "0", + "properties": {"id": "first", "pop": 1234}, + }, + { + "type": "Feature", + "bbox": pytest.approx((2, 1, 6, 5), abs=0.1), + "geometry": DictSubSet({"type": "Polygon"}), + "id": "1", + "properties": {"id": "second", "pop": 5678}, + }, + ], + } + ) diff --git a/tests/util/test_pgparsing.py b/tests/util/test_pgparsing.py new file mode 100644 index 00000000..a11f80c7 --- /dev/null +++ b/tests/util/test_pgparsing.py @@ -0,0 +1,78 @@ +import pytest + +from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph, NotASingleRunUDFProcessGraph + + +class TestSingleRunUDFProcessGraph: + def test_parse_basic(self): + pg = { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": {"from_parameter": "data"}, + "udf": "print('Hello world')", + "runtime": "Python", + }, + "result": True, + } + } + run_udf = SingleRunUDFProcessGraph.parse(pg) + assert run_udf.data == {"from_parameter": "data"} + assert run_udf.udf == "print('Hello world')" + assert run_udf.runtime == "Python" + assert run_udf.version is None + assert run_udf.context == {} + + @pytest.mark.parametrize( + "pg", + [ + { + "runudf1": { + "process_id": "run_udffffffffffffffff", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"}, + "result": True, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"udf": "x = 4", "runtime": "Python"}, + "result": True, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "runtime": "Python"}, + "result": True, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4"}, + "result": True, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"}, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"}, + "result": True, + }, + "runudf2": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"}, + }, + }, + ], + ) + def test_parse_invalid(self, pg): + with pytest.raises(NotASingleRunUDFProcessGraph): + _ = SingleRunUDFProcessGraph.parse(pg)