Commit

propose run udf
jdries committed Jun 1, 2023
1 parent fdf43b0 commit fd7eead
Showing 1 changed file with 34 additions and 1 deletion.
35 changes: 34 additions & 1 deletion openeo_driver/datacube.py
@@ -17,13 +17,14 @@
import requests

from openeo.metadata import CollectionMetadata
from openeo.util import ensure_dir
from openeo.util import ensure_dir, str_truncate
from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
from openeo_driver.errors import FeatureUnsupportedException, InternalException
from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates
from openeo_driver.util.ioformats import IOFORMATS
from openeo_driver.util.utm import area_in_square_meters
from openeo_driver.utils import EvalEnv
from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor

log = logging.getLogger(__name__)

@@ -224,6 +225,38 @@ def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX
            geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix
        )

    def apply_dimension(
        self,
        process: dict,
        *,
        dimension: str,
        target_dimension: Optional[str] = None,
        context: Optional[dict] = None,
        env: EvalEnv,
    ) -> "DriverVectorCube":
        # For now, only support a single `run_udf` node applied along the "bands" dimension.
        if (
            dimension == "bands"
            and target_dimension is None
            and len(process) == 1
            and next(iter(process.values())).get("process_id") == "run_udf"
        ):
            visitor = SingleNodeUDFProcessGraphVisitor().accept_process_graph(process)
            udf = visitor.udf_args.get("udf", None)

            from openeo.udf import FeatureCollection, UdfData

            # Wrap the vector cube (as a GeoDataFrame) in a single UDF FeatureCollection.
            collection = FeatureCollection(id="VectorCollection", data=self._as_geopandas_df())
            data = UdfData(
                proj={"EPSG": self._geometries.crs.to_epsg()},
                feature_collection_list=[collection],
                user_context=context,
            )

            log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}")
            result_data = env.backend_implementation.processing.run_udf(udf, data)
            log.info(f"[run_udf] UDF resulted in {result_data!r}")

            if isinstance(result_data, UdfData):
                feature_collections = result_data.get_feature_collection_list()
                if feature_collections is not None and len(feature_collections) == 1:
                    return DriverVectorCube(geometries=feature_collections[0].data)

            raise ValueError(f"Could not handle UDF result: {result_data}")
        else:
            raise FeatureUnsupportedException()
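
For context, a minimal sketch of a process graph that would take this new run_udf branch. Everything below is illustrative and not part of the commit: the node id "runudf1", the UDF body, and the apply_udf_data entry point are assumptions based on common openeo UDF conventions.

# Hypothetical example (not from this commit): a single-node process graph
# that apply_dimension(dimension="bands") would dispatch to run_udf.
process = {
    "runudf1": {
        "process_id": "run_udf",
        "arguments": {
            "data": {"from_parameter": "data"},
            "runtime": "Python",
            "udf": '''
from openeo.udf import UdfData

def apply_udf_data(udf_data: UdfData) -> UdfData:
    # The driver passes exactly one FeatureCollection wrapping the GeoDataFrame.
    [feature_collection] = udf_data.get_feature_collection_list()
    # ... transform feature_collection.data (a GeoDataFrame) here ...
    return udf_data
''',
        },
        "result": True,
    }
}

With such a graph, the branch builds a UdfData holding one FeatureCollection, hands it to env.backend_implementation.processing.run_udf, and rebuilds a DriverVectorCube from the single FeatureCollection in the result; anything else raises.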

    @classmethod
    def from_fiona(
        cls,
