Skip to content

Commit

Permalink
PR #200 finetune VectorCube.apply_dimension
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Aug 3, 2023
1 parent acd2e7a commit eb03aa8
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 33 deletions.
73 changes: 40 additions & 33 deletions openeo_driver/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

from openeo.metadata import CollectionMetadata
from openeo.util import ensure_dir, str_truncate
import openeo.udf
from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
from openeo_driver.errors import FeatureUnsupportedException, InternalException
from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates
from openeo_driver.util.ioformats import IOFORMATS
from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph
from openeo_driver.util.utm import area_in_square_meters
from openeo_driver.utils import EvalEnv
from openeogeotrellis.backend import SingleNodeUDFProcessGraphVisitor

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -248,38 +249,6 @@ def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX
geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix
)

def apply_dimension(
    self,
    process: dict,
    *,
    dimension: str,
    target_dimension: Optional[str] = None,
    context: Optional[dict] = None,
    env: EvalEnv,
) -> "DriverVectorCube":
    """
    Apply a process along a dimension of this vector cube.

    Only a single-node ``run_udf`` callback along the "bands" dimension
    (with no ``target_dimension``) is supported.

    :param process: callback process graph (must be a single `run_udf` node).
    :param dimension: dimension to apply along (only "bands" is supported).
    :param target_dimension: not supported; must be None.
    :param context: optional user context to pass to the UDF.
    :param env: evaluation environment providing the backend implementation.
    :return: new vector cube built from the UDF's feature collection result.
    :raises FeatureUnsupportedException: for unsupported process/dimension combos.
    :raises ValueError: when the UDF result can not be handled.
    """
    # Single-node "run_udf" callback detection.
    is_single_run_udf = len(process) == 1 and next(iter(process.values())).get("process_id") == "run_udf"
    # Note: use `is None` (identity check) instead of `== None` (PEP 8).
    if dimension == "bands" and target_dimension is None and is_single_run_udf:
        visitor = SingleNodeUDFProcessGraphVisitor().accept_process_graph(process)
        udf = visitor.udf_args.get("udf", None)

        from openeo.udf import FeatureCollection, UdfData

        collection = FeatureCollection(id="VectorCollection", data=self._as_geopandas_df())
        data = UdfData(
            proj={"EPSG": self._geometries.crs.to_epsg()},
            feature_collection_list=[collection],
            user_context=context,
        )

        log.info(f"[run_udf] Running UDF {str_truncate(udf, width=256)!r} on {data!r}")
        result_data = env.backend_implementation.processing.run_udf(udf, data)
        log.info(f"[run_udf] UDF resulted in {result_data!r}")

        if isinstance(result_data, UdfData):
            feature_collections = result_data.get_feature_collection_list()
            # Only a single feature collection can be mapped back to a vector cube.
            if feature_collections is not None and len(feature_collections) == 1:
                return DriverVectorCube(geometries=feature_collections[0].data)

        raise ValueError(f"Could not handle UDF result: {result_data}")

    else:
        raise FeatureUnsupportedException()

@classmethod
def from_fiona(
cls,
Expand Down Expand Up @@ -537,6 +506,44 @@ def buffer_points(self, distance: float = 10) -> "DriverVectorCube":
]
)

def apply_dimension(
    self,
    process: dict,
    *,
    dimension: str,
    target_dimension: Optional[str] = None,
    context: Optional[dict] = None,
    env: EvalEnv,
) -> "DriverVectorCube":
    """
    Apply a process along a dimension of this vector cube.

    Only supported for a callback process graph consisting of a single
    `run_udf` node, applied along the geometries dimension of a vector cube
    without "cube" data.

    :param process: callback process graph (single `run_udf` node).
    :param dimension: dimension to apply along.
    :param target_dimension: not supported; must be None.
    :param context: optional user context to pass to the UDF.
    :param env: evaluation environment providing the backend implementation.
    :return: new vector cube built from the UDF's feature collection result.
    :raises FeatureUnsupportedException: for unsupported process/dimension combos.
    :raises ValueError: when the UDF result can not be handled.
    """
    single_run_udf = SingleRunUDFProcessGraph.parse_or_none(process)

    # Guard clauses: anything but the supported single-"run_udf" special case is rejected.
    if not single_run_udf:
        raise FeatureUnsupportedException()
    supported = self._cube is None and dimension == self.DIM_GEOMETRIES and target_dimension is None
    if not supported:
        raise FeatureUnsupportedException()

    log.warning(
        f"Using experimental feature: DriverVectorCube.apply_dimension along dim {dimension} and empty cube"
    )
    # TODO: this is non-standard special case: vector cube with only geometries, but no "cube" data
    dataframe = self._as_geopandas_df()
    udf_data = openeo.udf.UdfData(
        proj={"EPSG": self._geometries.crs.to_epsg()},
        feature_collection_list=[openeo.udf.FeatureCollection(id="_", data=dataframe)],
        user_context=context,
    )
    log.info(f"[run_udf] Running UDF {str_truncate(single_run_udf.udf, width=256)!r} on {udf_data!r}")
    result_data = env.backend_implementation.processing.run_udf(udf=single_run_udf.udf, data=udf_data)
    log.info(f"[run_udf] UDF resulted in {result_data!r}")

    if not isinstance(result_data, openeo.udf.UdfData):
        raise ValueError(f"Could not handle UDF result: {result_data}")
    result_features = result_data.get_feature_collection_list()
    if not (result_features and len(result_features) == 1):
        raise ValueError(f"Could not handle UDF result: {result_data}")
    return DriverVectorCube(geometries=result_features[0].data)



class DriverMlModel:
"""Base class for driver-side 'ml-model' data structures"""
Expand Down
45 changes: 45 additions & 0 deletions openeo_driver/util/pgparsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import dataclasses
from typing import Optional


class NotASingleRunUDFProcessGraph(ValueError):
    """Raised when a process graph can not be parsed as a single `run_udf` node."""

    pass


@dataclasses.dataclass(frozen=True)
class SingleRunUDFProcessGraph:
    """
    Container (and parser) for a callback process graph containing only a single `run_udf` node.
    """

    # The `run_udf` node's arguments.
    data: dict
    udf: str
    runtime: str
    version: Optional[str] = None
    # Note: `parse` normalizes a missing/None context to an empty dict.
    context: Optional[dict] = None

    @classmethod
    def parse(cls, process_graph: dict) -> "SingleRunUDFProcessGraph":
        """
        Parse the given process graph as a single `run_udf` (result) node.

        :raises NotASingleRunUDFProcessGraph: when the graph contains more than one
            node, the node is not a `run_udf` result node, or required arguments
            (`data`, `udf`, `runtime`) are missing.
        """
        try:
            (node,) = process_graph.values()
            # Explicit raises instead of `assert`:
            # asserts are stripped under `python -O` and would silently accept invalid graphs.
            if node["process_id"] != "run_udf":
                raise ValueError(f"Not a 'run_udf' process: {node['process_id']!r}")
            if node["result"] is not True:
                raise ValueError("Not a result node")
            arguments = node["arguments"]
            missing = {"data", "udf", "runtime"}.difference(arguments.keys())
            if missing:
                raise ValueError(f"Missing 'run_udf' arguments: {missing}")

            return cls(
                data=arguments["data"],
                udf=arguments["udf"],
                runtime=arguments["runtime"],
                version=arguments.get("version"),
                context=arguments.get("context") or {},
            )
        except Exception as e:
            raise NotASingleRunUDFProcessGraph(str(e)) from e

    @classmethod
    def parse_or_none(cls, process_graph: dict) -> Optional["SingleRunUDFProcessGraph"]:
        """Like :py:meth:`parse`, but return None (instead of raising) on parse failure."""
        # Fixed typo in original return annotation ("SingleNodeRunUDFProcessGraph").
        try:
            return cls.parse(process_graph=process_graph)
        except NotASingleRunUDFProcessGraph:
            return None
54 changes: 54 additions & 0 deletions tests/test_vectorcube.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import textwrap

import geopandas as gpd
import numpy.testing
import pyproj
Expand All @@ -9,6 +11,7 @@
from openeo_driver.datacube import DriverVectorCube
from openeo_driver.testing import DictSubSet, ApproxGeometry
from openeo_driver.util.geometry import as_geojson_feature_collection
from openeo_driver.utils import EvalEnv

from .data import get_path

Expand All @@ -22,6 +25,10 @@ def gdf(self) -> gpd.GeoDataFrame:
df = gpd.read_file(path)
return df

@pytest.fixture
def vc(self, gdf) -> DriverVectorCube:
    # Vector cube with only geometries (no "cube" data), built from the `gdf` fixture.
    return DriverVectorCube(geometries=gdf)

def test_basic(self, gdf):
vc = DriverVectorCube(gdf)
assert vc.get_bounding_box() == (1, 1, 5, 4)
Expand Down Expand Up @@ -446,3 +453,50 @@ def test_buffer_points(self):
],
}
)

def test_apply_dimension_run_udf(self, vc, backend_implementation):
    # UDF that buffers each geometry (distance=1), so the features' bounding boxes
    # grow by roughly 1 in each direction compared to the input fixture.
    udf = textwrap.dedent(
        """
        from openeo.udf import UdfData, FeatureCollection
        def process_geometries(udf_data: UdfData) -> UdfData:
            [feature_collection] = udf_data.get_feature_collection_list()
            gdf = feature_collection.data
            gdf["geometry"] = gdf["geometry"].buffer(distance=1, resolution=2)
            udf_data.set_feature_collection_list([
                FeatureCollection(id="_", data=gdf),
            ])
        """
    )
    # Callback process graph: single "run_udf" node (the supported special case).
    callback = {
        "runudf1": {
            "process_id": "run_udf",
            "arguments": {"data": {"from_parameter": "data"}, "udf": udf, "runtime": "Python"},
            "result": True,
        }
    }
    env = EvalEnv({"backend_implementation": backend_implementation})
    result = vc.apply_dimension(process=callback, dimension="geometries", env=env)
    assert isinstance(result, DriverVectorCube)
    feature_collection = result.to_geojson()
    # Buffered geometries: bboxes expanded by ~1; properties and ids preserved.
    assert feature_collection == DictSubSet(
        {
            "type": "FeatureCollection",
            "bbox": pytest.approx((0, 0, 6, 5), abs=0.1),
            "features": [
                {
                    "type": "Feature",
                    "bbox": pytest.approx((0, 0, 4, 4), abs=0.1),
                    "geometry": DictSubSet({"type": "Polygon"}),
                    "id": "0",
                    "properties": {"id": "first", "pop": 1234},
                },
                {
                    "type": "Feature",
                    "bbox": pytest.approx((2, 1, 6, 5), abs=0.1),
                    "geometry": DictSubSet({"type": "Polygon"}),
                    "id": "1",
                    "properties": {"id": "second", "pop": 5678},
                },
            ],
        }
    )
78 changes: 78 additions & 0 deletions tests/util/test_pgparsing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import pytest

from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph, NotASingleRunUDFProcessGraph


class TestSingleRunUDFProcessGraph:
    """Tests for parsing callback process graphs as single `run_udf` nodes."""

    def test_parse_basic(self):
        # Minimal valid single-node "run_udf" process graph.
        process_graph = {
            "runudf1": {
                "process_id": "run_udf",
                "arguments": {
                    "data": {"from_parameter": "data"},
                    "udf": "print('Hello world')",
                    "runtime": "Python",
                },
                "result": True,
            }
        }
        parsed = SingleRunUDFProcessGraph.parse(process_graph)
        assert (parsed.data, parsed.udf, parsed.runtime) == (
            {"from_parameter": "data"},
            "print('Hello world')",
            "Python",
        )
        assert parsed.version is None
        assert parsed.context == {}

    @pytest.mark.parametrize(
        "process_graph",
        [
            # Wrong process id.
            {
                "runudf1": {
                    "process_id": "run_udffffffffffffffff",
                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
                    "result": True,
                }
            },
            # Missing "data" argument.
            {
                "runudf1": {
                    "process_id": "run_udf",
                    "arguments": {"udf": "x = 4", "runtime": "Python"},
                    "result": True,
                }
            },
            # Missing "udf" argument.
            {
                "runudf1": {
                    "process_id": "run_udf",
                    "arguments": {"data": {"from_parameter": "data"}, "runtime": "Python"},
                    "result": True,
                }
            },
            # Missing "runtime" argument.
            {
                "runudf1": {
                    "process_id": "run_udf",
                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4"},
                    "result": True,
                }
            },
            # Missing "result" flag.
            {
                "runudf1": {
                    "process_id": "run_udf",
                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
                }
            },
            # More than one node in the graph.
            {
                "runudf1": {
                    "process_id": "run_udf",
                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
                    "result": True,
                },
                "runudf2": {
                    "process_id": "run_udf",
                    "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"},
                },
            },
        ],
    )
    def test_parse_invalid(self, process_graph):
        with pytest.raises(NotASingleRunUDFProcessGraph):
            _ = SingleRunUDFProcessGraph.parse(process_graph)

0 comments on commit eb03aa8

Please sign in to comment.