Merge pull request #59 from Open-EO/feature/data_fusion_support
Feature - basic data fusion support
zansinergise authored Nov 23, 2023
2 parents 2b2b224 + edf7e8f commit 3d643dc
Showing 8 changed files with 542 additions and 426 deletions.
3 changes: 2 additions & 1 deletion rest/Pipfile
@@ -24,7 +24,7 @@ openeo-pg-parser = {editable = true,git = "https://github.com/Ardweaden/openeo-p
requests = "==2.27.1"
flask-cors = "==3.0.10"
honeycomb-beeline = "==3.2.0"
pg-to-evalscript = "==0.2.8"
pg-to-evalscript = "==0.2.9"
sentinelhub = "==3.9.2"
isodate = "==0.6.1"
pyjwt = {extras = ["crypto"], version = "==2.3.0"}
@@ -33,4 +33,5 @@ placebo = "==0.9.0"
rioxarray = "*"
zarr = "*"
netcdf4 = "*"
pip-tools = "==6.1.0"
werkzeug = "==2.3.7"
601 changes: 303 additions & 298 deletions rest/Pipfile.lock

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions rest/openeoerrors.py
@@ -159,3 +159,15 @@ class InsufficientCredits(SHOpenEOError):
    error_code = "InsufficientCredits"
    http_code = 402
    message = "You do not have enough credits to perform this request. Please visit https://openeo.vito.be/portal/pages/pricing for more information on how to purchase additional credits."


+class DataFusionNotPossibleDifferentSHDeployments(SHOpenEOError):
+    error_code = "DataFusionNotPossible"
+    http_code = 400
+    message = "Data fusion not available with selected collections because they are not all available on the same SH deployment."
+
+
+class DataFusionNotPossibleDifferentSpatialExtents(SHOpenEOError):
+    error_code = "DataFusionNotPossibleDifferentSpatialExtents"
+    http_code = 400
+    message = "Data fusion is possible only if all load_collection processes have the same spatial extent."
219 changes: 147 additions & 72 deletions rest/processing/process.py

Large diffs are not rendered by default.

14 changes: 10 additions & 4 deletions rest/processing/processing.py
@@ -187,12 +187,18 @@ def get_batch_job_estimate(batch_request_id, process, deployment_endpoint):
        user_defined_processes=user_defined_processes_graphs,
        request_type=ProcessingRequestTypes.BATCH,
    )
-    temporal_interval = p.get_temporal_interval(in_days=True)
+    temporal_intervals = p.get_temporal_intervals(in_days=True)
+    average_temporal_interval = 0
+    for node_id, temporal_interval in temporal_intervals.items():
+        if temporal_interval is None:
+            temporal_interval = default_temporal_interval

-    if temporal_interval is None:
-        temporal_interval = default_temporal_interval
+        average_temporal_interval += temporal_interval

-    estimated_pu = estimate_secure_factor * batch_request.value_estimate * default_temporal_interval / temporal_interval
+    average_temporal_interval = average_temporal_interval / len(temporal_intervals)
+    estimated_pu = (
+        estimate_secure_factor * batch_request.value_estimate * default_temporal_interval / average_temporal_interval
+    )

    n_pixels = batch_request.tile_count * batch_request.tile_width_px * batch_request.tile_height_px
    estimated_file_size = p.estimate_file_size(n_pixels=n_pixels)
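A worked example of the new estimate with made-up numbers: two load_collection nodes, one with a 10-day temporal interval and one with none, which falls back to the default.

# Worked example of the averaging logic above; all values are illustrative.
default_temporal_interval = 3
estimate_secure_factor = 2
value_estimate = 100  # stand-in for batch_request.value_estimate

temporal_intervals = {"node_loadco1": 10, "node_loadco2": None}

average_temporal_interval = 0
for node_id, temporal_interval in temporal_intervals.items():
    if temporal_interval is None:
        temporal_interval = default_temporal_interval
    average_temporal_interval += temporal_interval

average_temporal_interval /= len(temporal_intervals)  # (10 + 3) / 2 = 6.5
estimated_pu = estimate_secure_factor * value_estimate * default_temporal_interval / average_temporal_interval
print(estimated_pu)  # 2 * 100 * 3 / 6.5 ≈ 92.31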
59 changes: 27 additions & 32 deletions rest/processing/sentinel_hub.py
@@ -28,10 +28,8 @@ def create_processing_request(
        bbox=None,
        geometry=None,
        epsg_code=None,
-        collection=None,
+        collections=None,
        evalscript=None,
-        from_date=None,
-        to_date=None,
        width=None,
        height=None,
        mimetype=None,
@@ -44,51 +44,53 @@
            bbox=bbox,
            geometry=geometry,
            epsg_code=epsg_code,
-            collection=collection,
+            collections=collections,
            evalscript=evalscript,
-            from_date=from_date,
-            to_date=to_date,
            width=width,
            height=height,
            mimetype=mimetype,
            resampling_method=resampling_method,
        )

        return ProcessingAPIRequest(
-            f"{collection.service_url}/api/v1/process", request_raw_dict, user=self.user
-        ).fetch()
+            f"{list(collections.values())[0]['data_collection'].service_url}/api/v1/process",
+            request_raw_dict,
+            user=self.user,
+        ).fetch()  # fix this - should this always be SentinelhubDeployments.MAIN as it will then also work for cross-deployment data fusion?

    def get_request_dictionary(
        self,
        bbox=None,
        geometry=None,
        epsg_code=None,
-        collection=None,
+        collections=None,
        evalscript=None,
-        from_date=None,
-        to_date=None,
        width=None,
        height=None,
        mimetype=None,
        resampling_method=None,
        preview_mode="EXTENDED_PREVIEW",
    ):
+        request_data_items = []
+        for node_id, collection in collections.items():
+            request_data_items.append(
+                {
+                    "id": node_id,
+                    "type": collection["data_collection"].api_id,
+                    "dataFilter": {
+                        "timeRange": {
+                            "from": collection["from_time"].isoformat(),
+                            "to": collection["to_time"].isoformat(),
+                        },
+                        "previewMode": preview_mode,
+                    },
+                    "processing": self.construct_data_processing(resampling_method),
+                }
+            )

        return {
            "input": {
                "bounds": self.construct_input_bounds(bbox, epsg_code, geometry),
-                "data": [
-                    {
-                        "type": collection.api_id,
-                        "dataFilter": {
-                            "timeRange": {
-                                "from": from_date.isoformat(),
-                                "to": to_date.isoformat(),
-                            },
-                            "previewMode": preview_mode,
-                        },
-                        "processing": self.construct_data_processing(resampling_method),
-                    }
-                ],
+                "data": request_data_items,
            },
            "output": self.construct_output(width, height, mimetype),
            "evalscript": evalscript,
@@ -128,10 +128,8 @@ def create_batch_job(
        bbox=None,
        geometry=None,
        epsg_code=None,
-        collection=None,
+        collections=None,
        evalscript=None,
-        from_date=None,
-        to_date=None,
        tiling_grid_id=None,
        tiling_grid_resolution=None,
        mimetype=None,
@@ -141,15 +139,12 @@
            bbox=bbox,
            geometry=geometry,
            epsg_code=epsg_code,
-            collection=collection,
+            collections=collections,
            evalscript=evalscript,
-            from_date=from_date,
-            to_date=to_date,
            mimetype=mimetype,
            resampling_method=resampling_method,
            preview_mode="DETAIL",
        )

        batch_request = self.batch.create(
            request_raw_dict,
            tiling_grid=SentinelHubBatch.tiling_grid(
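Taken together, these changes make the Process API request carry one "data" entry per load_collection node, each tagged with its node id so the evalscript can address it as a separate input. A sketch of the body get_request_dictionary would now produce for two fused collections; node ids, types, and time ranges are illustrative, and bounds, output, and processing details are elided.

# Illustrative request body for two load_collection nodes (data fusion).
request_body = {
    "input": {
        "bounds": "...",  # construct_input_bounds(bbox, epsg_code, geometry)
        "data": [
            {
                "id": "node_loadco1",
                "type": "sentinel-2-l2a",
                "dataFilter": {
                    "timeRange": {"from": "2022-06-01T00:00:00+00:00", "to": "2022-06-30T23:59:59+00:00"},
                    "previewMode": "EXTENDED_PREVIEW",
                },
            },
            {
                "id": "node_loadco2",
                "type": "sentinel-1-grd",
                "dataFilter": {
                    "timeRange": {"from": "2022-06-01T00:00:00+00:00", "to": "2022-06-30T23:59:59+00:00"},
                    "previewMode": "EXTENDED_PREVIEW",
                },
            },
        ],
    },
    "output": "...",  # construct_output(width, height, mimetype)
    "evalscript": "// ...",
}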
9 changes: 9 additions & 0 deletions rest/processing/utils.py
@@ -295,6 +295,15 @@ def get_node_by_process_id(process_graph, process_id):
            return node


+def get_all_load_collection_nodes(process_graph):
+    nodes = {}
+    for node_id, node in process_graph.items():
+        if node["process_id"] == "load_collection":
+            nodes[node_id] = node
+
+    return nodes


def overwrite_spatial_extent_without_parameters(process_graph):
    # https://github.com/Open-EO/openeo-web-editor/issues/277#issuecomment-1246989125
    load_collection_node = get_node_by_process_id(process_graph, "load_collection")
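A quick usage sketch of the new helper on a toy process graph; node ids and collection ids are illustrative.

# Toy process graph with two load_collection nodes and one other node.
process_graph = {
    "node_loadco1": {"process_id": "load_collection", "arguments": {"id": "sentinel-2-l2a"}},
    "node_loadco2": {"process_id": "load_collection", "arguments": {"id": "sentinel-1-grd"}},
    "node_reduce1": {"process_id": "reduce_dimension", "arguments": {}},
}

nodes = get_all_load_collection_nodes(process_graph)
assert set(nodes) == {"node_loadco1", "node_loadco2"}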
51 changes: 32 additions & 19 deletions tests/test_units.py
@@ -36,14 +36,14 @@ def test_collections(get_process_graph, collection_id):
        {"process_graph": get_process_graph(collection_id=collection_id, bands=None)},
        request_type=ProcessingRequestTypes.SYNC,
    )
-    assert process.evalscript.input_bands == all_bands
+    assert process.evalscript.input_bands[0]["bands"] == all_bands

    example_bands = ["B01", "B02"]
    process = Process(
        {"process_graph": get_process_graph(collection_id=collection_id, bands=example_bands)},
        request_type=ProcessingRequestTypes.SYNC,
    )
-    assert process.evalscript.input_bands == example_bands
+    assert process.evalscript.input_bands[0]["bands"] == example_bands


@responses.activate
@@ -672,7 +672,9 @@ def test_get_collection(
        {"process_graph": get_process_graph(collection_id=collection_id, featureflags=featureflags)},
        request_type=ProcessingRequestTypes.SYNC,
    )
-    assert process.collection.api_id == expected_datacollection_api_id
+    all_collections = process.get_collections()
+    collection = all_collections["node_loadco1"]["data_collection"]
+    assert collection.api_id == expected_datacollection_api_id


@responses.activate
@@ -720,20 +722,28 @@ def test_sentinel_hub_access_token(access_token):
    sh = SentinelHub(user=user)
    sh.create_processing_request(
        bbox=BBox((1, 2, 3, 4), crs=CRS.WGS84),
-        collection=DataCollection.SENTINEL2_L2A,
+        collections={
+            "node_loadco1": {
+                "data_collection": DataCollection.SENTINEL2_L2A,
+                "from_time": datetime.now(),
+                "to_time": datetime.now(),
+            }
+        },
        evalscript="",
-        from_date=datetime.now(),
-        to_date=datetime.now(),
        width=1,
        height=1,
        mimetype=MimeType.PNG,
    )
    sh = SentinelHub(user=user)
    sh.create_batch_job(
-        collection=DataCollection.SENTINEL2_L2A,
+        collections={
+            "node_loadco1": {
+                "data_collection": DataCollection.SENTINEL2_L2A,
+                "from_time": datetime.now(),
+                "to_time": datetime.now(),
+            }
+        },
        evalscript="",
-        from_date=datetime.now(),
-        to_date=datetime.now(),
        tiling_grid_id=1,
        tiling_grid_resolution=20,
        mimetype=MimeType.PNG,
@@ -765,7 +775,8 @@ def test_get_maximum_temporal_extent(get_process_graph, collection_id, expected_
    process = Process(
        {"process_graph": get_process_graph(collection_id=collection_id)}, request_type=ProcessingRequestTypes.SYNC
    )
-    from_time, to_time = process.get_maximum_temporal_extent_for_collection()
+    load_collection_nodes = process.get_all_load_collection_nodes()
+    from_time, to_time = process.get_maximum_temporal_extent_for_collection(list(load_collection_nodes.values())[0])

    assert expected_from_time == from_time
    assert expected_to_time == to_time
@@ -892,8 +903,8 @@ def test_geojson_parsing(fixture, expected_result):
        (
            {"params": {"collection_id": "sentinel-2-l1c", "temporal_extent": ["2019-01-01", None]}},
            {
-                "from_date": datetime(2019, 1, 1, tzinfo=timezone.utc),
-                "to_date": datetime(
+                "from_time": datetime(2019, 1, 1, tzinfo=timezone.utc),
+                "to_time": datetime(
                    current_date.year,
                    current_date.month,
                    current_date.day,
@@ -917,15 +928,15 @@ def test_geojson_parsing(fixture, expected_result):
                }
            },
            {
-                "from_date": datetime(2018, 10, 1, tzinfo=timezone.utc),
-                "to_date": datetime(2018, 10, 1, hour=9, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc),
+                "from_time": datetime(2018, 10, 1, tzinfo=timezone.utc),
+                "to_time": datetime(2018, 10, 1, hour=9, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc),
            },
        ),
        (
            {"params": {"collection_id": "mapzen-dem", "temporal_extent": None}},
            {
-                "from_date": datetime(current_date.year, current_date.month, current_date.day, tzinfo=timezone.utc),
-                "to_date": datetime(
+                "from_time": datetime(current_date.year, current_date.month, current_date.day, tzinfo=timezone.utc),
+                "to_time": datetime(
                    current_date.year,
                    current_date.month,
                    current_date.day,
@@ -963,8 +974,10 @@ def test_temporal_extent(get_process_graph, fixture, expected_result):
        },
        request_type=ProcessingRequestTypes.SYNC,
    )
-    assert process.from_date == expected_result["from_date"]
-    assert process.to_date == expected_result["to_date"]
+    all_collections = process.get_collections()
+    collection = all_collections["node_loadco1"]
+    assert collection["from_time"] == expected_result["from_time"]
+    assert collection["to_time"] == expected_result["to_time"]


@pytest.mark.parametrize(
@@ -1853,4 +1866,4 @@ def test_bands_metadata(process_graph):
        collection_id = node["arguments"]["id"]
        bands_metadata = collections.get_collection(collection_id)["summaries"]["eo:bands"]
    process = Process({"process_graph": process_graph}, request_type=ProcessingRequestTypes.SYNC)
-    assert process.evalscript.bands_metadata == bands_metadata
+    assert process.evalscript.bands_metadata["node_1"] == bands_metadata
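The updated assertions capture the shape change data fusion introduces: evalscript inputs and per-collection metadata are now keyed by load_collection node instead of being a single flat value. For example (only the "bands" key is shown; any other keys in each entry are omitted here):

# Before this commit: input_bands was one flat list for the single collection.
old_input_bands = ["B01", "B02"]

# After: one entry per load_collection node, with the names under "bands".
new_input_bands = [{"bands": ["B01", "B02"]}]
assert new_input_bands[0]["bands"] == old_input_bands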
