Feature - basic data fusion support #59

Merged · 45 commits · Nov 23, 2023

Commits

1377c10
parse multiple load collections and mark what still needs to be fixed
zansinergise Nov 3, 2023
708e93f
run linting
zansinergise Nov 3, 2023
be64e94
add error when data fusion is not possible
zansinergise Nov 9, 2023
c7f093c
add support for multiple collections
zansinergise Nov 9, 2023
006a6b4
add first version of data fusion support, probably quite some bugs still
zansinergise Nov 9, 2023
0780be7
Add new packages
zansinergise Nov 9, 2023
4392422
run linting
zansinergise Nov 9, 2023
c141ace
Merge branch 'master' of https://github.com/Open-EO/openeo-sentinelhu…
zansinergise Nov 9, 2023
e5310b0
Merge branch 'master' of https://github.com/Open-EO/openeo-sentinelhu…
zansinergise Nov 10, 2023
e46d11d
remove some comments and add small fix
zansinergise Nov 10, 2023
0db963d
fix temporal interval
zansinergise Nov 10, 2023
8b44729
add comment what to fix still
zansinergise Nov 10, 2023
11129c1
remove comment
zansinergise Nov 13, 2023
3b8e829
remove comment what to fix
zansinergise Nov 13, 2023
5bcc281
fix get temporal interval function
zansinergise Nov 14, 2023
a184fa4
fix batch job estimate for multiple collections
zansinergise Nov 14, 2023
5be9ae8
update and find byoc load collection node by collection id
zansinergise Nov 14, 2023
4105b85
run linting
zansinergise Nov 14, 2023
3a0faef
temporarily comment out post processing
zansinergise Nov 15, 2023
a8b413a
fix bug
zansinergise Nov 15, 2023
27ac142
update version of pg converter to release candidate
zansinergise Nov 16, 2023
9dfa6d2
add comment on what to maybe fix
zansinergise Nov 16, 2023
3f902ac
correct format of unit test
zansinergise Nov 16, 2023
e8555e7
add missing argument
zansinergise Nov 16, 2023
9003d22
fix failing test for bands metadata
zansinergise Nov 16, 2023
86c80e3
fix format of temporal extent test
zansinergise Nov 16, 2023
2b1395d
fix get maximum temporal extent test
zansinergise Nov 16, 2023
150738a
fix get collection test format
zansinergise Nov 16, 2023
2571fb0
fix sh access token test
zansinergise Nov 16, 2023
8750b3b
run linting
zansinergise Nov 16, 2023
705e344
run linting again
zansinergise Nov 16, 2023
b774b66
remove print statement
zansinergise Nov 16, 2023
9794e3e
add check for different SH deployments for process requests also
zansinergise Nov 17, 2023
10eb416
run linting
zansinergise Nov 17, 2023
cb19ec9
Merge branch 'master' of https://github.com/Open-EO/openeo-sentinelhu…
zansinergise Nov 17, 2023
132bd3e
change variable names from _date to _time
zansinergise Nov 22, 2023
b1b7d0b
Fix not setting correct input bands if any of the load collection nod…
zansinergise Nov 22, 2023
faa1f57
rename to plural function name
zansinergise Nov 22, 2023
2360f7b
add comment for data fusion spatial extent and raise error if spatial…
zansinergise Nov 22, 2023
caebd00
make code more readable when constructing request dictionary
zansinergise Nov 22, 2023
9346e24
run linting
zansinergise Nov 22, 2023
66f97f1
use latest version of pg-to-evalscript converter
zansinergise Nov 23, 2023
8b340c4
Change wording for error for different spatial extents
zansinergise Nov 23, 2023
b98f625
Merge branch 'master' of https://github.com/Open-EO/openeo-sentinelhu…
zansinergise Nov 23, 2023
edf7e8f
uncomment post processing
zansinergise Nov 23, 2023
Files changed

rest/Pipfile (3 changes: 2 additions & 1 deletion)

```diff
@@ -24,7 +24,7 @@ openeo-pg-parser = {editable = true,git = "https://github.com/Ardweaden/openeo-p
 requests = "==2.27.1"
 flask-cors = "==3.0.10"
 honeycomb-beeline = "==3.2.0"
-pg-to-evalscript = "==0.2.8"
+pg-to-evalscript = "==0.2.9"
 sentinelhub = "==3.9.2"
 isodate = "==0.6.1"
 pyjwt = {extras = ["crypto"], version = "==2.3.0"}
@@ -33,4 +33,5 @@ placebo = "==0.9.0"
 rioxarray = "*"
 zarr = "*"
 netcdf4 = "*"
+pip-tools = "==6.1.0"
 werkzeug = "==2.3.7"
```

rest/Pipfile.lock (601 changes: 303 additions & 298 deletions; large diffs are not rendered by default)

rest/openeoerrors.py (12 changes: 12 additions & 0 deletions)

```diff
@@ -159,3 +159,15 @@ class InsufficientCredits(SHOpenEOError):
     error_code = "InsufficientCredits"
     http_code = 402
     message = "You do not have enough credits to perform this request. Please visit https://openeo.vito.be/portal/pages/pricing for more information on how to purchase additional credits."
+
+
+class DataFusionNotPossibleDifferentSHDeployments(SHOpenEOError):
+    error_code = "DataFusionNotPossible"
+    http_code = 400
+    message = "Data fusion not available with selected collections because they are not all available on the same SH deployment."
+
+
+class DataFusionNotPossibleDifferentSpatialExtents(SHOpenEOError):
+    error_code = "DataFusionNotPossibleDifferentSpatialExtents"
+    http_code = 400
+    message = "Data fusion is possible only if all load_collection processes have the same spatial extent."
```
rest/processing/process.py (219 changes: 147 additions & 72 deletions; large diffs are not rendered by default)

rest/processing/processing.py (14 changes: 10 additions & 4 deletions)

```diff
@@ -187,12 +187,18 @@ def get_batch_job_estimate(batch_request_id, process, deployment_endpoint):
         user_defined_processes=user_defined_processes_graphs,
         request_type=ProcessingRequestTypes.BATCH,
     )
-    temporal_interval = p.get_temporal_interval(in_days=True)
+    temporal_intervals = p.get_temporal_intervals(in_days=True)
+    average_temporal_interval = 0
+    for node_id, temporal_interval in temporal_intervals.items():
+        if temporal_interval is None:
+            temporal_interval = default_temporal_interval
 
-    if temporal_interval is None:
-        temporal_interval = default_temporal_interval
+        average_temporal_interval += temporal_interval
 
-    estimated_pu = estimate_secure_factor * batch_request.value_estimate * default_temporal_interval / temporal_interval
+    average_temporal_interval = average_temporal_interval / len(temporal_intervals)
+    estimated_pu = (
+        estimate_secure_factor * batch_request.value_estimate * default_temporal_interval / average_temporal_interval
+    )
 
     n_pixels = batch_request.tile_count * batch_request.tile_width_px * batch_request.tile_height_px
     estimated_file_size = p.estimate_file_size(n_pixels=n_pixels)
```
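
The estimate previously used the temporal interval of the single load_collection node; with data fusion it averages the per-node intervals, falling back to the default where a node has none. A quick sketch of the arithmetic with made-up numbers (two nodes, one interval unknown, and a `default_temporal_interval` of 3 days assumed purely for illustration):

```python
default_temporal_interval = 3  # days; illustrative value only

# per-node intervals as returned by the new get_temporal_intervals();
# None means the node had no usable interval and falls back to the default
temporal_intervals = {"loadco1": 5, "loadco2": None}

total = sum(
    interval if interval is not None else default_temporal_interval
    for interval in temporal_intervals.values()
)
average_temporal_interval = total / len(temporal_intervals)  # (5 + 3) / 2 = 4.0

# estimated_pu then scales with default_temporal_interval / average_temporal_interval,
# i.e. 3 / 4.0 = 0.75 of the base estimate in this example
```
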
rest/processing/sentinel_hub.py (59 changes: 27 additions & 32 deletions)

```diff
@@ -28,10 +28,8 @@ def create_processing_request(
         bbox=None,
         geometry=None,
         epsg_code=None,
-        collection=None,
+        collections=None,
         evalscript=None,
-        from_date=None,
-        to_date=None,
         width=None,
         height=None,
         mimetype=None,
@@ -44,51 +42,53 @@
             bbox=bbox,
             geometry=geometry,
             epsg_code=epsg_code,
-            collection=collection,
+            collections=collections,
             evalscript=evalscript,
-            from_date=from_date,
-            to_date=to_date,
             width=width,
             height=height,
             mimetype=mimetype,
             resampling_method=resampling_method,
         )
 
         return ProcessingAPIRequest(
-            f"{collection.service_url}/api/v1/process", request_raw_dict, user=self.user
-        ).fetch()
+            f"{list(collections.values())[0]['data_collection'].service_url}/api/v1/process",
+            request_raw_dict,
+            user=self.user,
+        ).fetch()  # fix this - should this always be SentinelhubDeployments.MAIN as it will then also work for cross-deployment data fusion?
 
     def get_request_dictionary(
         self,
         bbox=None,
         geometry=None,
         epsg_code=None,
-        collection=None,
+        collections=None,
         evalscript=None,
-        from_date=None,
-        to_date=None,
         width=None,
         height=None,
         mimetype=None,
         resampling_method=None,
         preview_mode="EXTENDED_PREVIEW",
     ):
+        request_data_items = []
+        for node_id, collection in collections.items():
+            request_data_items.append(
+                {
+                    "id": node_id,
+                    "type": collection["data_collection"].api_id,
+                    "dataFilter": {
+                        "timeRange": {
+                            "from": collection["from_time"].isoformat(),
+                            "to": collection["to_time"].isoformat(),
+                        },
+                        "previewMode": preview_mode,
+                    },
+                    "processing": self.construct_data_processing(resampling_method),
+                }
+            )
 
         return {
             "input": {
                 "bounds": self.construct_input_bounds(bbox, epsg_code, geometry),
-                "data": [
-                    {
-                        "type": collection.api_id,
-                        "dataFilter": {
-                            "timeRange": {
-                                "from": from_date.isoformat(),
-                                "to": to_date.isoformat(),
-                            },
-                            "previewMode": preview_mode,
-                        },
-                        "processing": self.construct_data_processing(resampling_method),
-                    }
-                ],
+                "data": request_data_items,
             },
             "output": self.construct_output(width, height, mimetype),
             "evalscript": evalscript,
```
```diff
@@ -128,10 +128,8 @@ def create_batch_job(
         bbox=None,
         geometry=None,
         epsg_code=None,
-        collection=None,
+        collections=None,
         evalscript=None,
-        from_date=None,
-        to_date=None,
         tiling_grid_id=None,
         tiling_grid_resolution=None,
         mimetype=None,
@@ -141,15 +139,12 @@
             bbox=bbox,
             geometry=geometry,
             epsg_code=epsg_code,
-            collection=collection,
+            collections=collections,
             evalscript=evalscript,
-            from_date=from_date,
-            to_date=to_date,
             mimetype=mimetype,
             resampling_method=resampling_method,
             preview_mode="DETAIL",
         )
 
         batch_request = self.batch.create(
             request_raw_dict,
             tiling_grid=SentinelHubBatch.tiling_grid(
```

rest/processing/utils.py (9 changes: 9 additions & 0 deletions)

```diff
@@ -295,6 +295,15 @@ def get_node_by_process_id(process_graph, process_id):
             return node
 
 
+def get_all_load_collection_nodes(process_graph):
+    nodes = {}
+    for node_id, node in process_graph.items():
+        if node["process_id"] == "load_collection":
+            nodes[node_id] = node
+
+    return nodes
+
+
 def overwrite_spatial_extent_without_parameters(process_graph):
     # https://github.com/Open-EO/openeo-web-editor/issues/277#issuecomment-1246989125
     load_collection_node = get_node_by_process_id(process_graph, "load_collection")
```
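
A short usage sketch of the new helper; the import path and the two-node process graph are made up for the example:

```python
from processing.utils import get_all_load_collection_nodes  # assumed import path

# minimal made-up process graph with two load_collection nodes
process_graph = {
    "loadco1": {"process_id": "load_collection", "arguments": {"id": "sentinel-2-l2a"}},
    "loadco2": {"process_id": "load_collection", "arguments": {"id": "sentinel-1-grd"}},
    "reduce1": {"process_id": "reduce_dimension", "arguments": {}},
}

nodes = get_all_load_collection_nodes(process_graph)
print(list(nodes.keys()))  # ['loadco1', 'loadco2']
```

Unlike get_node_by_process_id, which returns only the first match, the result keeps every load_collection node keyed by its node id, which is the basis for the per-node collections dictionaries used throughout this PR.
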
tests/test_units.py (51 changes: 32 additions & 19 deletions)

```diff
@@ -36,14 +36,14 @@ def test_collections(get_process_graph, collection_id):
         {"process_graph": get_process_graph(collection_id=collection_id, bands=None)},
         request_type=ProcessingRequestTypes.SYNC,
     )
-    assert process.evalscript.input_bands == all_bands
+    assert process.evalscript.input_bands[0]["bands"] == all_bands
 
     example_bands = ["B01", "B02"]
     process = Process(
         {"process_graph": get_process_graph(collection_id=collection_id, bands=example_bands)},
         request_type=ProcessingRequestTypes.SYNC,
     )
-    assert process.evalscript.input_bands == example_bands
+    assert process.evalscript.input_bands[0]["bands"] == example_bands
 
 
 @responses.activate
@@ -672,7 +672,9 @@ def test_get_collection(
         {"process_graph": get_process_graph(collection_id=collection_id, featureflags=featureflags)},
         request_type=ProcessingRequestTypes.SYNC,
     )
-    assert process.collection.api_id == expected_datacollection_api_id
+    all_collections = process.get_collections()
+    collection = all_collections["node_loadco1"]["data_collection"]
+    assert collection.api_id == expected_datacollection_api_id
 
 
 @responses.activate
@@ -720,20 +722,28 @@ def test_sentinel_hub_access_token(access_token):
     sh = SentinelHub(user=user)
     sh.create_processing_request(
         bbox=BBox((1, 2, 3, 4), crs=CRS.WGS84),
-        collection=DataCollection.SENTINEL2_L2A,
+        collections={
+            "node_loadco1": {
+                "data_collection": DataCollection.SENTINEL2_L2A,
+                "from_time": datetime.now(),
+                "to_time": datetime.now(),
+            }
+        },
         evalscript="",
-        from_date=datetime.now(),
-        to_date=datetime.now(),
         width=1,
         height=1,
         mimetype=MimeType.PNG,
     )
     sh = SentinelHub(user=user)
     sh.create_batch_job(
-        collection=DataCollection.SENTINEL2_L2A,
+        collections={
+            "node_loadco1": {
+                "data_collection": DataCollection.SENTINEL2_L2A,
+                "from_time": datetime.now(),
+                "to_time": datetime.now(),
+            }
+        },
         evalscript="",
-        from_date=datetime.now(),
-        to_date=datetime.now(),
         tiling_grid_id=1,
         tiling_grid_resolution=20,
         mimetype=MimeType.PNG,
```
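
The updated test passes a single-node collections dictionary; for actual data fusion the same argument would carry one entry per load_collection node. A hedged example of such a dictionary (node ids, collections, and dates are illustrative):

```python
from datetime import datetime, timezone

from sentinelhub import DataCollection

# one entry per load_collection node, keyed by node id
collections = {
    "loadco1": {
        "data_collection": DataCollection.SENTINEL2_L2A,
        "from_time": datetime(2022, 6, 1, tzinfo=timezone.utc),
        "to_time": datetime(2022, 6, 30, tzinfo=timezone.utc),
    },
    "loadco2": {
        "data_collection": DataCollection.SENTINEL1_IW,
        "from_time": datetime(2022, 6, 1, tzinfo=timezone.utc),
        "to_time": datetime(2022, 6, 30, tzinfo=timezone.utc),
    },
}
```
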
```diff
@@ -765,7 +775,8 @@ def test_get_maximum_temporal_extent(get_process_graph, collection_id, expected_
     process = Process(
         {"process_graph": get_process_graph(collection_id=collection_id)}, request_type=ProcessingRequestTypes.SYNC
     )
-    from_time, to_time = process.get_maximum_temporal_extent_for_collection()
+    load_collection_nodes = process.get_all_load_collection_nodes()
+    from_time, to_time = process.get_maximum_temporal_extent_for_collection(list(load_collection_nodes.values())[0])
 
     assert expected_from_time == from_time
     assert expected_to_time == to_time
@@ -892,8 +903,8 @@ def test_geojson_parsing(fixture, expected_result):
         (
             {"params": {"collection_id": "sentinel-2-l1c", "temporal_extent": ["2019-01-01", None]}},
             {
-                "from_date": datetime(2019, 1, 1, tzinfo=timezone.utc),
-                "to_date": datetime(
+                "from_time": datetime(2019, 1, 1, tzinfo=timezone.utc),
+                "to_time": datetime(
                     current_date.year,
                     current_date.month,
                     current_date.day,
@@ -917,15 +928,15 @@ def test_geojson_parsing(fixture, expected_result):
                 }
             },
             {
-                "from_date": datetime(2018, 10, 1, tzinfo=timezone.utc),
-                "to_date": datetime(2018, 10, 1, hour=9, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc),
+                "from_time": datetime(2018, 10, 1, tzinfo=timezone.utc),
+                "to_time": datetime(2018, 10, 1, hour=9, minute=59, second=59, microsecond=999999, tzinfo=timezone.utc),
             },
         ),
         (
             {"params": {"collection_id": "mapzen-dem", "temporal_extent": None}},
             {
-                "from_date": datetime(current_date.year, current_date.month, current_date.day, tzinfo=timezone.utc),
-                "to_date": datetime(
+                "from_time": datetime(current_date.year, current_date.month, current_date.day, tzinfo=timezone.utc),
+                "to_time": datetime(
                     current_date.year,
                     current_date.month,
                     current_date.day,
@@ -963,8 +974,10 @@ def test_temporal_extent(get_process_graph, fixture, expected_result):
         },
         request_type=ProcessingRequestTypes.SYNC,
     )
-    assert process.from_date == expected_result["from_date"]
-    assert process.to_date == expected_result["to_date"]
+    all_collections = process.get_collections()
+    collection = all_collections["node_loadco1"]
+    assert collection["from_time"] == expected_result["from_time"]
+    assert collection["to_time"] == expected_result["to_time"]
 
 
 @pytest.mark.parametrize(
@@ -1853,4 +1866,4 @@ def test_bands_metadata(process_graph):
     collection_id = node["arguments"]["id"]
     bands_metadata = collections.get_collection(collection_id)["summaries"]["eo:bands"]
     process = Process({"process_graph": process_graph}, request_type=ProcessingRequestTypes.SYNC)
-    assert process.evalscript.bands_metadata == bands_metadata
+    assert process.evalscript.bands_metadata["node_1"] == bands_metadata
```