From fd7eead2dd5e0393986f7e36b9acec8bbf4be811 Mon Sep 17 00:00:00 2001 From: Jeroen Dries Date: Thu, 1 Jun 2023 19:59:04 +0200 Subject: [PATCH] propose run udf https://github.com/Open-EO/openeo-geopyspark-driver/issues/437 --- openeo_driver/datacube.py | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/openeo_driver/datacube.py b/openeo_driver/datacube.py index d526451e..d3c5b86d 100644 --- a/openeo_driver/datacube.py +++ b/openeo_driver/datacube.py @@ -17,13 +17,14 @@ import requests from openeo.metadata import CollectionMetadata -from openeo.util import ensure_dir +from openeo.util import ensure_dir, str_truncate from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset from openeo_driver.errors import FeatureUnsupportedException, InternalException from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates from openeo_driver.util.ioformats import IOFORMATS from openeo_driver.util.utm import area_in_square_meters from openeo_driver.utils import EvalEnv +from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor log = logging.getLogger(__name__) @@ -224,6 +225,38 @@ def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix ) + def apply_dimension( + self, + process: dict, + *, + dimension: str, + target_dimension: Optional[str] = None, + context: Optional[dict] = None, + env: EvalEnv, + ) -> "DriverVectorCube": + if dimension == "bands" and target_dimension == None and len(process) == 1 and next(iter(process.values())).get('process_id') == 'run_udf': + visitor = SingleNodeUDFProcessGraphVisitor().accept_process_graph(process) + udf = visitor.udf_args.get('udf', None) + + from openeo.udf import FeatureCollection, UdfData + collection = FeatureCollection(id='VectorCollection', data=self._as_geopandas_df()) + data = UdfData( + proj={"EPSG": self._geometries.crs.to_epsg()}, feature_collection_list=[collection], user_context=context + ) + + log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}") + result_data = env.backend_implementation.processing.run_udf(udf, data) + log.info(f"[run_udf] UDF resulted in {result_data!r}") + + if isinstance(result_data, UdfData): + if(result_data.get_feature_collection_list() is not None and len(result_data.get_feature_collection_list()) == 1): + return DriverVectorCube(geometries=result_data.get_feature_collection_list()[0].data) + + raise ValueError(f"Could not handle UDF result: {result_data}") + + else: + raise FeatureUnsupportedException() + @classmethod def from_fiona( cls,