From a217a303662839b75e1a72c017d5f8d3a9f8ba5d Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 12 Nov 2024 08:46:09 +0100 Subject: [PATCH 01/19] merge points to an actual STAC resource e.g. a Collection file https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 4 +++- tests/test_workspace.py | 21 +++++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index e0f0082c..06f213d7 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -25,7 +25,9 @@ def __init__(self, root_directory: Path): def import_file(self, file: Path, merge: str, remove_original: bool = False) -> str: merge = os.path.normpath(merge) - subdirectory = merge[1:] if merge.startswith("/") else merge + + # merge points to a file with the STAC Collection; all STAC documents and assets end up in its parent directory + subdirectory = Path(merge[1:] if merge.startswith("/") else merge).parent target_directory = self.root_directory / subdirectory target_directory.relative_to(self.root_directory) # assert target_directory is in root_directory diff --git a/tests/test_workspace.py b/tests/test_workspace.py index ee06dcb6..56a711dc 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -1,22 +1,27 @@ +from pathlib import Path + import pytest from openeo_driver.workspace import DiskWorkspace -@pytest.mark.parametrize("merge", [ - "subdirectory", - "/subdirectory", - "path/to/subdirectory", - "/path/to/subdirectory", - ".", -]) +@pytest.mark.parametrize( + "merge", + [ + "subdirectory/collection.json", + "/subdirectory/collection.json", + "path/to/subdirectory/collection.json", + "/path/to/subdirectory/collection.json", + "collection.json", + ], +) def test_disk_workspace(tmp_path, merge): source_directory = tmp_path / "src" source_directory.mkdir() source_file = source_directory / "file" source_file.touch() - subdirectory = merge[1:] if merge.startswith("/") else merge + subdirectory = Path(merge[1:] if merge.startswith("/") else merge).parent target_directory = tmp_path / subdirectory workspace = DiskWorkspace(root_directory=tmp_path) From 8fdada45769cc5ddd7a9882b98253387b827f9d1 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Wed, 13 Nov 2024 16:50:33 +0100 Subject: [PATCH 02/19] merging into an existing collection requires STAC-aware workspace https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 38 +++++++++++++++++++++++++++- setup.py | 1 + tests/test_workspace.py | 52 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 06f213d7..267aaf7a 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -2,8 +2,11 @@ import logging import os.path import shutil -from pathlib import Path +from pathlib import Path, PurePath +from pystac import Collection, STACObject +from pystac.catalog import CatalogType +from pystac.layout import TemplateLayoutStrategy _log = logging.getLogger(__name__) @@ -17,6 +20,12 @@ def import_file(self, file: Path, merge: str, remove_original: bool = False) -> def import_object(self, s3_uri: str, merge: str, remove_original: bool = False): raise NotImplementedError + @abc.abstractmethod + def merge_files(self, stac_resource: Collection, target: PurePath, remove_original: bool = False): + # TODO: use an abstraction like a dict instead? + # TODO: is a PurePath object fine as an abstraction? + raise NotImplementedError + class DiskWorkspace(Workspace): @@ -41,3 +50,30 @@ def import_file(self, file: Path, merge: str, remove_original: bool = False) -> def import_object(self, s3_uri: str, merge: str, remove_original: bool = False): raise NotImplementedError(f"importing objects is not supported yet") + + def merge_files(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False): + # FIXME: merge new $stac_resource with the one in $this_workspace at $target + # FIXME: export STAC resources and assets underneath as well + # FIXME: support remove_original and return equivalent workspace URIs (pass alternate_key and put workspace URIs in "alternate"?) + target = os.path.normpath(target) + target = Path(target[1:] if target.startswith("/") else target) + target = self.root_directory / target + target.relative_to(self.root_directory) # assert target_directory is in root_directory + + target.parent.mkdir(parents=True, exist_ok=True) + + if isinstance(stac_resource, Collection): + # collection ends up in file $target + # items and assets and up in directory $target.parent + for item in stac_resource.get_items(recursive=True): + for asset in item.assets.values(): + shutil.copy(asset.href, target.parent) + asset.href = Path(asset.href).name + + stac_resource.normalize_and_save( + root_href=str(target.parent), + catalog_type=CatalogType.SELF_CONTAINED, + strategy=TemplateLayoutStrategy(collection_template=target.name, item_template="${id}.json"), + ) + else: + raise NotImplementedError(stac_resource) diff --git a/setup.py b/setup.py index 7aafb612..babc98b6 100644 --- a/setup.py +++ b/setup.py @@ -73,6 +73,7 @@ "reretry~=0.11.8", "markdown>3.4", "traceback-with-variables==2.0.4", + "pystac~=1.8.0", ], extras_require={ "dev": tests_require, diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 56a711dc..f9c453a1 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -1,5 +1,7 @@ +import datetime as dt from pathlib import Path +from pystac import Asset, Collection, Extent, Item, SpatialExtent, TemporalExtent import pytest from openeo_driver.workspace import DiskWorkspace @@ -46,3 +48,53 @@ def test_disk_workspace_remove_original(tmp_path, remove_original): assert (target_directory / source_file.name).exists() assert source_file.exists() != remove_original + + +def test_merge_from_disk_new(tmp_path): + source_directory = tmp_path / "src" + source_directory.mkdir() + source_asset_file = source_directory / "asset.tif" + source_asset_file.touch() + + new_stac_collection = _collection(source_asset_file) + + target = Path("path") / "to" / "collection.json" + + workspace = DiskWorkspace(root_directory=tmp_path) + workspace.merge_files(stac_resource=new_stac_collection, target=target) + + workspace_dir = (workspace.root_directory / target).parent + + merged_stac_collection = Collection.from_file(str(workspace_dir / "collection.json")) + assert merged_stac_collection.validate_all() == 1 + # TODO: check Collection + + assets = [ + asset for item in merged_stac_collection.get_items(recursive=True) for asset in item.get_assets().values() + ] + + assert len(assets) == 1 + + for asset in assets: + asset.copy(str(tmp_path / Path(asset.href).name)) # downloads the asset file + + +def _collection(asset_file: Path) -> Collection: + collection = Collection( + id="somecollection", + description="some description", + extent=Extent(spatial=SpatialExtent([[-180, -90, 180, 90]]), temporal=TemporalExtent([[None, None]])), + ) + + item_id = asset_key = asset_file.name + + item = Item(id=item_id, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={}) + item.add_asset(key=asset_key, asset=Asset(href=str(asset_file))) + + collection.add_item(item) + + return collection + + +def skip_merge_into_existing(): + raise NotImplementedError From d6145108edfaae694229d0c7bdb83d76ec9ac5aa Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 14 Nov 2024 13:55:58 +0100 Subject: [PATCH 03/19] merge collections https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 93 +++++++++++++++++++++++++++++++++++--- tests/test_workspace.py | 87 +++++++++++++++++++++++++++-------- 2 files changed, 154 insertions(+), 26 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 267aaf7a..949f8d19 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -3,8 +3,10 @@ import os.path import shutil from pathlib import Path, PurePath +from typing import Optional -from pystac import Collection, STACObject +import pystac +from pystac import Collection, STACObject, SpatialExtent, TemporalExtent from pystac.catalog import CatalogType from pystac.layout import TemplateLayoutStrategy @@ -52,8 +54,6 @@ def import_object(self, s3_uri: str, merge: str, remove_original: bool = False): raise NotImplementedError(f"importing objects is not supported yet") def merge_files(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False): - # FIXME: merge new $stac_resource with the one in $this_workspace at $target - # FIXME: export STAC resources and assets underneath as well # FIXME: support remove_original and return equivalent workspace URIs (pass alternate_key and put workspace URIs in "alternate"?) target = os.path.normpath(target) target = Path(target[1:] if target.startswith("/") else target) @@ -63,17 +63,96 @@ def merge_files(self, stac_resource: STACObject, target: PurePath, remove_origin target.parent.mkdir(parents=True, exist_ok=True) if isinstance(stac_resource, Collection): + new_collection = stac_resource + + existing_collection = None + try: + existing_collection = pystac.Collection.from_file(str(target)) + except FileNotFoundError: + pass # nothing to merge into + # collection ends up in file $target # items and assets and up in directory $target.parent - for item in stac_resource.get_items(recursive=True): + for item in new_collection.get_items(recursive=True): for asset in item.assets.values(): shutil.copy(asset.href, target.parent) asset.href = Path(asset.href).name - stac_resource.normalize_and_save( + (self + ._merge_collections(existing_collection, new_collection) + .normalize_and_save( root_href=str(target.parent), catalog_type=CatalogType.SELF_CONTAINED, - strategy=TemplateLayoutStrategy(collection_template=target.name, item_template="${id}.json"), - ) + strategy=TemplateLayoutStrategy(collection_template=target.name, item_template="${id}.json") + )) else: raise NotImplementedError(stac_resource) + + def _merge_collections(self, existing_collection: Optional[Collection], new_collection: Collection) -> Collection: + if existing_collection: + existing_collection.extent.spatial = self._merge_spatial_extents( + existing_collection.extent.spatial, + new_collection.extent.spatial + ) + + existing_collection.extent.temporal = self._merge_temporal_extents( + existing_collection.extent.temporal, + new_collection.extent.temporal + ) + + for new_item in new_collection.get_items(recursive=True): + if existing_collection.get_item(new_item.id, recursive=True): + raise ValueError(f"item {new_item.id} is already in collection {existing_collection.id}") + + existing_collection.add_item(new_item, strategy=TemplateLayoutStrategy(item_template="${id}.json")) + return existing_collection + + return new_collection + + def _merge_spatial_extents(self, a: SpatialExtent, b: SpatialExtent) -> SpatialExtent: + overall_bbox_a, *sub_bboxes_a = a.bboxes + overall_bbox_b, *sub_bboxes_b = b.bboxes + + merged_overall_bbox = [ + min(overall_bbox_a[0], overall_bbox_b[0]), + min(overall_bbox_a[1], overall_bbox_b[1]), + max(overall_bbox_a[2], overall_bbox_b[2]), + max(overall_bbox_a[3], overall_bbox_b[3]) + ] + + merged_sub_bboxes = sub_bboxes_a + sub_bboxes_b + + merged_spatial_extent = SpatialExtent([merged_overall_bbox]) + if merged_sub_bboxes: + merged_spatial_extent.bboxes.append(merged_sub_bboxes) + + return merged_spatial_extent + + def _merge_temporal_extents(self, a: TemporalExtent, b: TemporalExtent) -> TemporalExtent: + overall_interval_a, *sub_intervals_a = a.intervals + overall_interval_b, *sub_intervals_b = b.intervals + + def min_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]: + if t1 is None or t2 is None: + return None + + return min(t1, t2) + + def max_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]: + if t1 is None or t2 is None: + return None + + return max(t1, t2) + + merged_overall_interval = [ + min_time(overall_interval_a[0], overall_interval_b[0]), + max_time(overall_interval_a[1], overall_interval_b[1]) + ] + + merged_sub_intervals = sub_intervals_a + sub_intervals_b + + merged_temporal_extent = TemporalExtent([merged_overall_interval]) + if merged_sub_intervals: + merged_temporal_extent.intervals.append(merged_sub_intervals) + + return merged_temporal_extent diff --git a/tests/test_workspace.py b/tests/test_workspace.py index f9c453a1..4ab89eec 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -1,5 +1,6 @@ import datetime as dt from pathlib import Path +from typing import List from pystac import Asset, Collection, Extent, Item, SpatialExtent, TemporalExtent import pytest @@ -53,10 +54,11 @@ def test_disk_workspace_remove_original(tmp_path, remove_original): def test_merge_from_disk_new(tmp_path): source_directory = tmp_path / "src" source_directory.mkdir() - source_asset_file = source_directory / "asset.tif" - source_asset_file.touch() - new_stac_collection = _collection(source_asset_file) + asset_file = source_directory / "asset.tif" + asset_file.touch() + + new_stac_collection = _collection("collection", [asset_file]) target = Path("path") / "to" / "collection.json" @@ -69,32 +71,79 @@ def test_merge_from_disk_new(tmp_path): assert merged_stac_collection.validate_all() == 1 # TODO: check Collection - assets = [ - asset for item in merged_stac_collection.get_items(recursive=True) for asset in item.get_assets().values() - ] + assert _download_assets(merged_stac_collection, target_dir=tmp_path) == 1 - assert len(assets) == 1 - for asset in assets: - asset.copy(str(tmp_path / Path(asset.href).name)) # downloads the asset file +def test_merge_from_disk_into_existing(tmp_path): + source_directory = tmp_path / "src" + source_directory.mkdir() + + asset_file1 = source_directory / "asset1.tif" + asset_file1.touch() + asset_file2 = source_directory / "asset2.tif" + asset_file2.touch() -def _collection(asset_file: Path) -> Collection: + existing_stac_collection = _collection( + "existing_collection", + [asset_file1], + spatial_extent=SpatialExtent([[0, 50, 2, 52]]), + temporal_extent=TemporalExtent([[dt.datetime.fromisoformat("2024-11-01T00:00:00+00:00"), + dt.datetime.fromisoformat("2024-11-03T00:00:00+00:00")]]) + ) + new_stac_collection = _collection( + "new_collection", + [asset_file2], + spatial_extent=SpatialExtent([[1, 51, 3, 53]]), + temporal_extent=TemporalExtent([[dt.datetime.fromisoformat("2024-11-02T00:00:00+00:00"), + dt.datetime.fromisoformat("2024-11-04T00:00:00+00:00")]]) + ) + + target = Path("path") / "to" / "collection.json" + + workspace = DiskWorkspace(root_directory=tmp_path) + workspace.merge_files(stac_resource=existing_stac_collection, target=target) + workspace.merge_files(stac_resource=new_stac_collection, target=target) + + workspace_dir = (workspace.root_directory / target).parent + + merged_stac_collection = Collection.from_file(str(workspace_dir / "collection.json")) + assert merged_stac_collection.validate_all() == 2 + + assert _download_assets(merged_stac_collection, target_dir=tmp_path) == 2 + + assert merged_stac_collection.extent.spatial.bboxes == [[0, 50, 3, 53]] + assert merged_stac_collection.extent.temporal.intervals == [ + [dt.datetime.fromisoformat("2024-11-01T00:00:00+00:00"), + dt.datetime.fromisoformat("2024-11-04T00:00:00+00:00")] + ] + + +def _collection(collection_id: str, + asset_files: List[Path], + spatial_extent: SpatialExtent = SpatialExtent([[-180, -90, 180, 90]]), + temporal_extent: TemporalExtent = TemporalExtent([[None, None]])) -> Collection: collection = Collection( - id="somecollection", - description="some description", - extent=Extent(spatial=SpatialExtent([[-180, -90, 180, 90]]), temporal=TemporalExtent([[None, None]])), + id=collection_id, + description=collection_id, + extent=Extent(spatial=spatial_extent, temporal=temporal_extent), ) - item_id = asset_key = asset_file.name + for asset_file in asset_files: + item_id = asset_key = asset_file.name - item = Item(id=item_id, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={}) - item.add_asset(key=asset_key, asset=Asset(href=str(asset_file))) + item = Item(id=item_id, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={}) + item.add_asset(key=asset_key, asset=Asset(href=str(asset_file))) - collection.add_item(item) + collection.add_item(item) return collection -def skip_merge_into_existing(): - raise NotImplementedError +def _download_assets(collection: Collection, target_dir: Path) -> int: + assets = [asset for item in collection.get_items(recursive=True) for asset in item.get_assets().values()] + + for asset in assets: + asset.copy(str(target_dir / Path(asset.href).name)) # downloads the asset file + + return len(assets) From 866a73d993fec06604edd5e315106917403fb3f6 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Tue, 19 Nov 2024 11:08:47 +0100 Subject: [PATCH 04/19] support remove_original https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 54 +++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 18 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 949f8d19..5622ad61 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -8,7 +8,7 @@ import pystac from pystac import Collection, STACObject, SpatialExtent, TemporalExtent from pystac.catalog import CatalogType -from pystac.layout import TemplateLayoutStrategy +from pystac.layout import TemplateLayoutStrategy, HrefLayoutStrategy, CustomLayoutStrategy _log = logging.getLogger(__name__) @@ -23,8 +23,7 @@ def import_object(self, s3_uri: str, merge: str, remove_original: bool = False): raise NotImplementedError @abc.abstractmethod - def merge_files(self, stac_resource: Collection, target: PurePath, remove_original: bool = False): - # TODO: use an abstraction like a dict instead? + def merge_files(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject: # TODO: is a PurePath object fine as an abstraction? raise NotImplementedError @@ -53,14 +52,16 @@ def import_file(self, file: Path, merge: str, remove_original: bool = False) -> def import_object(self, s3_uri: str, merge: str, remove_original: bool = False): raise NotImplementedError(f"importing objects is not supported yet") - def merge_files(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False): - # FIXME: support remove_original and return equivalent workspace URIs (pass alternate_key and put workspace URIs in "alternate"?) + def merge_files(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject: + stac_resource = stac_resource.clone() + target = os.path.normpath(target) target = Path(target[1:] if target.startswith("/") else target) target = self.root_directory / target target.relative_to(self.root_directory) # assert target_directory is in root_directory target.parent.mkdir(parents=True, exist_ok=True) + file_operation = shutil.move if remove_original else shutil.copy if isinstance(stac_resource, Collection): new_collection = stac_resource @@ -71,20 +72,37 @@ def merge_files(self, stac_resource: STACObject, target: PurePath, remove_origin except FileNotFoundError: pass # nothing to merge into - # collection ends up in file $target - # items and assets and up in directory $target.parent - for item in new_collection.get_items(recursive=True): + merged_collection = self._merge_collections(existing_collection, new_collection) + + def layout_strategy() -> HrefLayoutStrategy: + def collection_file_at_merge(col: Collection, parent_dir: str, is_root: bool) -> str: + if not is_root: + raise ValueError("nested collections are not supported") + return str(target) + + return CustomLayoutStrategy(collection_func=collection_file_at_merge) + + # TODO: write to a tempdir, then copy/move everything to $merge? + merged_collection.normalize_hrefs(root_href=str(target), strategy=layout_strategy()) + + def with_href_relative_to_item(asset: pystac.Asset): + # TODO: is crummy way to export assets after STAC Collection has been written to disk with new asset hrefs + asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href() + filename = Path(asset.href).name + asset.href = filename + return asset + + # save STAC with proper asset hrefs + merged_collection = merged_collection.map_assets(lambda _, asset: with_href_relative_to_item(asset)) + merged_collection.save(catalog_type=CatalogType.SELF_CONTAINED) + + # copy assets to workspace on disk + for item in merged_collection.get_items(recursive=True): for asset in item.assets.values(): - shutil.copy(asset.href, target.parent) - asset.href = Path(asset.href).name - - (self - ._merge_collections(existing_collection, new_collection) - .normalize_and_save( - root_href=str(target.parent), - catalog_type=CatalogType.SELF_CONTAINED, - strategy=TemplateLayoutStrategy(collection_template=target.name, item_template="${id}.json") - )) + file_operation(asset.extra_fields["_original_absolute_href"], Path(item.get_self_href()).parent) + workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}" + asset.extra_fields["alternate"] = {"file": workspace_uri} + return merged_collection else: raise NotImplementedError(stac_resource) From 7afedcf8f1d24671dd0885151ac8e6edadec58f1 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Wed, 20 Nov 2024 11:56:12 +0100 Subject: [PATCH 05/19] fix tests https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- tests/test_workspace.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 4ab89eec..47b15907 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -2,7 +2,7 @@ from pathlib import Path from typing import List -from pystac import Asset, Collection, Extent, Item, SpatialExtent, TemporalExtent +from pystac import Asset, Collection, Extent, Item, SpatialExtent, TemporalExtent, CatalogType import pytest from openeo_driver.workspace import DiskWorkspace @@ -127,6 +127,7 @@ def _collection(collection_id: str, id=collection_id, description=collection_id, extent=Extent(spatial=spatial_extent, temporal=temporal_extent), + catalog_type=CatalogType.ABSOLUTE_PUBLISHED, ) for asset_file in asset_files: @@ -137,6 +138,9 @@ def _collection(collection_id: str, collection.add_item(item) + collection.normalize_hrefs(".") + assert collection.validate_all() == 1 + return collection From 101d8becf0ee7e512e56ca5918e50c9ee07b75df Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 21 Nov 2024 09:01:51 +0100 Subject: [PATCH 06/19] fix implementation https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- tests/test_workspace.py | 60 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 47b15907..324977d2 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -4,6 +4,7 @@ from pystac import Asset, Collection, Extent, Item, SpatialExtent, TemporalExtent, CatalogType import pytest +from pystac.layout import CustomLayoutStrategy from openeo_driver.workspace import DiskWorkspace @@ -151,3 +152,62 @@ def _download_assets(collection: Collection, target_dir: Path) -> int: asset.copy(str(target_dir / Path(asset.href).name)) # downloads the asset file return len(assets) + + +def test_create_and_export_collection(tmp_path): + def create_collection(root_href: str, collection_id: str, item_id: str) -> Collection: + collection = Collection( + id=collection_id, + description=collection_id, + extent=Extent(SpatialExtent([[-180, -90, 180, 90]]), TemporalExtent([[None, None]])), + ) + + collection.add_item(Item(id=item_id, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={})) + + collection.normalize_hrefs(root_href=root_href) + # collection.save(CatalogType.SELF_CONTAINED) + assert collection.validate_all() == 1 + + return collection + + # write collection1 + collection1 = create_collection( + root_href="/tmp/test_create_and_export_collection/src/collection1", collection_id="collection1", item_id="item1" + ) + + # export collection1 + exported_collection = collection1.full_copy() + merge = Path("/tmp/test_create_and_export_collection/dst/merged-collection.json") + + def collection_func(col: Collection, parent_dir: str, is_root: bool) -> str: + assert is_root + return str(Path(parent_dir) / merge.name) + + layout_strategy = CustomLayoutStrategy(collection_func=collection_func) + exported_collection.normalize_hrefs(root_href=str(merge.parent), strategy=layout_strategy) + exported_collection.save(CatalogType.SELF_CONTAINED) + assert exported_collection.validate_all() == 1 + + # write collection2 + collection2 = create_collection( + root_href="/tmp/test_create_and_export_collection/src/collection2", collection_id="collection2", item_id="item2" + ) + + # merge collection2 + existing_collection = Collection.from_file(str(merge)) + assert existing_collection.validate_all() == 1 + + new_collection = collection2.full_copy() + existing_collection.extent = new_collection.extent.clone() # "merge" existing with new extent + existing_collection.description = f"{existing_collection.description} + {new_collection.description}" + for new_item in new_collection.get_items(): # add new items to existing + existing_collection.add_item(new_item) + + existing_collection.normalize_hrefs(root_href=str(merge.parent), strategy=layout_strategy) + existing_collection.save(CatalogType.SELF_CONTAINED) + assert existing_collection.validate_all() == 2 + + merged_collection = Collection.from_file(str(merge)) + assert merged_collection.validate_all() == 2 + assert merged_collection.id == "collection1" + assert merged_collection.description == "collection1 + collection2" From 76b4faa064fd05c33093fab6320d7ab85e0bb447 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 21 Nov 2024 09:07:35 +0100 Subject: [PATCH 07/19] use tmp_path https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- tests/test_workspace.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 324977d2..26e7443d 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -155,7 +155,9 @@ def _download_assets(collection: Collection, target_dir: Path) -> int: def test_create_and_export_collection(tmp_path): - def create_collection(root_href: str, collection_id: str, item_id: str) -> Collection: + # tmp_path = Path("/tmp/test_create_and_export_collection") + + def create_collection(root_href: Path, collection_id: str, item_id: str) -> Collection: collection = Collection( id=collection_id, description=collection_id, @@ -164,7 +166,7 @@ def create_collection(root_href: str, collection_id: str, item_id: str) -> Colle collection.add_item(Item(id=item_id, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={})) - collection.normalize_hrefs(root_href=root_href) + collection.normalize_hrefs(root_href=str(root_href)) # collection.save(CatalogType.SELF_CONTAINED) assert collection.validate_all() == 1 @@ -172,12 +174,12 @@ def create_collection(root_href: str, collection_id: str, item_id: str) -> Colle # write collection1 collection1 = create_collection( - root_href="/tmp/test_create_and_export_collection/src/collection1", collection_id="collection1", item_id="item1" + root_href=tmp_path / "src" / "collection1", collection_id="collection1", item_id="item1" ) # export collection1 exported_collection = collection1.full_copy() - merge = Path("/tmp/test_create_and_export_collection/dst/merged-collection.json") + merge = tmp_path / "dst" / "merged-collection.json" def collection_func(col: Collection, parent_dir: str, is_root: bool) -> str: assert is_root @@ -190,7 +192,7 @@ def collection_func(col: Collection, parent_dir: str, is_root: bool) -> str: # write collection2 collection2 = create_collection( - root_href="/tmp/test_create_and_export_collection/src/collection2", collection_id="collection2", item_id="item2" + root_href=tmp_path / "src" / "collection2", collection_id="collection2", item_id="item2" ) # merge collection2 From afee6b5f589911e09b221e43cffe6707f3413cb7 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 21 Nov 2024 09:46:50 +0100 Subject: [PATCH 08/19] add asset with absolute href https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- tests/test_workspace.py | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 26e7443d..aedc1f70 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -157,24 +157,41 @@ def _download_assets(collection: Collection, target_dir: Path) -> int: def test_create_and_export_collection(tmp_path): # tmp_path = Path("/tmp/test_create_and_export_collection") - def create_collection(root_href: Path, collection_id: str, item_id: str) -> Collection: + tmp_dir = tmp_path / "tmp" + tmp_dir.mkdir() + + def create_collection(root_path: Path, collection_id: str, asset_filename: str) -> Collection: collection = Collection( id=collection_id, description=collection_id, extent=Extent(SpatialExtent([[-180, -90, 180, 90]]), TemporalExtent([[None, None]])), ) - collection.add_item(Item(id=item_id, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={})) + item = Item(id=asset_filename, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={}) + asset = Asset(href=str(root_path / item.id / asset_filename)) + + item.add_asset(key=asset_filename, asset=asset) + collection.add_item(item) + + collection.normalize_hrefs(root_href=str(root_path)) + collection.save(CatalogType.SELF_CONTAINED) + + with open(asset.href, "w") as f: + f.write(f"{asset_filename}\n") - collection.normalize_hrefs(root_href=str(root_href)) - # collection.save(CatalogType.SELF_CONTAINED) assert collection.validate_all() == 1 + assets = {asset_key: asset for item in collection.get_items() for asset_key, asset in item.get_assets().items()} + assert assets + + for asset_key, asset in assets.items(): + asset.clone().copy(str(tmp_dir / asset_key)) + return collection # write collection1 collection1 = create_collection( - root_href=tmp_path / "src" / "collection1", collection_id="collection1", item_id="item1" + root_path=tmp_path / "src" / "collection1", collection_id="collection1", asset_filename="asset1.tif" ) # export collection1 @@ -192,7 +209,7 @@ def collection_func(col: Collection, parent_dir: str, is_root: bool) -> str: # write collection2 collection2 = create_collection( - root_href=tmp_path / "src" / "collection2", collection_id="collection2", item_id="item2" + root_path=tmp_path / "src" / "collection2", collection_id="collection2", asset_filename="asset2.tif" ) # merge collection2 From 982f159b22c561a0a76c93403229221cfbb48d8c Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 21 Nov 2024 10:04:33 +0100 Subject: [PATCH 09/19] relative href for asset https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- tests/test_workspace.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index aedc1f70..f5ad1d9a 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -1,4 +1,5 @@ import datetime as dt +import shutil from pathlib import Path from typing import List @@ -168,7 +169,9 @@ def create_collection(root_path: Path, collection_id: str, asset_filename: str) ) item = Item(id=asset_filename, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={}) - asset = Asset(href=str(root_path / item.id / asset_filename)) + + asset_path = root_path / item.id / asset_filename + asset = Asset(href=asset_path.name) # relative to item item.add_asset(key=asset_filename, asset=asset) collection.add_item(item) @@ -176,7 +179,7 @@ def create_collection(root_path: Path, collection_id: str, asset_filename: str) collection.normalize_hrefs(root_href=str(root_path)) collection.save(CatalogType.SELF_CONTAINED) - with open(asset.href, "w") as f: + with open(asset_path, "w") as f: f.write(f"{asset_filename}\n") assert collection.validate_all() == 1 @@ -185,7 +188,8 @@ def create_collection(root_path: Path, collection_id: str, asset_filename: str) assert assets for asset_key, asset in assets.items(): - asset.clone().copy(str(tmp_dir / asset_key)) + # "download" the asset without altering its href + shutil.copy(asset.get_absolute_href(), tmp_dir / asset_filename) return collection From 074cd98572c0df32d04112d8ea78a0e67cbe2120 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 21 Nov 2024 10:29:56 +0100 Subject: [PATCH 10/19] copy exported assets to workspace https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- tests/test_workspace.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index f5ad1d9a..a319a6f2 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -211,6 +211,11 @@ def collection_func(col: Collection, parent_dir: str, is_root: bool) -> str: exported_collection.save(CatalogType.SELF_CONTAINED) assert exported_collection.validate_all() == 1 + # TODO: adapt hrefs (should point to file in item directory) + for item in exported_collection.get_items(): + for asset in item.get_assets().values(): + shutil.copy(asset.get_absolute_href(), Path(item.get_self_href()).parent) # next to the item + # write collection2 collection2 = create_collection( root_path=tmp_path / "src" / "collection2", collection_id="collection2", asset_filename="asset2.tif" @@ -230,7 +235,14 @@ def collection_func(col: Collection, parent_dir: str, is_root: bool) -> str: existing_collection.save(CatalogType.SELF_CONTAINED) assert existing_collection.validate_all() == 2 + # TODO: adapt hrefs (should point to file in item directory) + for item in new_collection.get_items(): + for asset in item.get_assets().values(): + shutil.copy(asset.get_absolute_href(), Path(item.get_self_href()).parent) # next to the item + merged_collection = Collection.from_file(str(merge)) assert merged_collection.validate_all() == 2 assert merged_collection.id == "collection1" assert merged_collection.description == "collection1 + collection2" + + # TODO: check asset hrefs are relative to items and download them From d47c10e37f9e266fba28b231681a048a68ed7a72 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 21 Nov 2024 14:28:04 +0100 Subject: [PATCH 11/19] point asset hrefs to workspace https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- tests/test_workspace.py | 53 ++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index a319a6f2..1428449f 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -1,5 +1,6 @@ import datetime as dt import shutil +import uuid from pathlib import Path from typing import List @@ -73,7 +74,7 @@ def test_merge_from_disk_new(tmp_path): assert merged_stac_collection.validate_all() == 1 # TODO: check Collection - assert _download_assets(merged_stac_collection, target_dir=tmp_path) == 1 + assert _downloadable_assets(merged_stac_collection, target_dir=tmp_path) == 1 def test_merge_from_disk_into_existing(tmp_path): @@ -112,7 +113,7 @@ def test_merge_from_disk_into_existing(tmp_path): merged_stac_collection = Collection.from_file(str(workspace_dir / "collection.json")) assert merged_stac_collection.validate_all() == 2 - assert _download_assets(merged_stac_collection, target_dir=tmp_path) == 2 + assert _downloadable_assets(merged_stac_collection, target_dir=tmp_path) == 2 assert merged_stac_collection.extent.spatial.bboxes == [[0, 50, 3, 53]] assert merged_stac_collection.extent.temporal.intervals == [ @@ -146,11 +147,13 @@ def _collection(collection_id: str, return collection -def _download_assets(collection: Collection, target_dir: Path) -> int: +def _downloadable_assets(collection: Collection, target_dir: Path) -> int: assets = [asset for item in collection.get_items(recursive=True) for asset in item.get_assets().values()] for asset in assets: - asset.copy(str(target_dir / Path(asset.href).name)) # downloads the asset file + target_path = target_dir / str(uuid.uuid4()) + shutil.copy(asset.get_absolute_href(), target_path) # "download" the asset without altering its href + target_path.unlink() return len(assets) @@ -183,13 +186,7 @@ def create_collection(root_path: Path, collection_id: str, asset_filename: str) f.write(f"{asset_filename}\n") assert collection.validate_all() == 1 - - assets = {asset_key: asset for item in collection.get_items() for asset_key, asset in item.get_assets().items()} - assert assets - - for asset_key, asset in assets.items(): - # "download" the asset without altering its href - shutil.copy(asset.get_absolute_href(), tmp_dir / asset_filename) + assert _downloadable_assets(collection, tmp_dir) == 1 return collection @@ -208,41 +205,59 @@ def collection_func(col: Collection, parent_dir: str, is_root: bool) -> str: layout_strategy = CustomLayoutStrategy(collection_func=collection_func) exported_collection.normalize_hrefs(root_href=str(merge.parent), strategy=layout_strategy) + + def replace_asset_href(asset_key: str, asset: Asset) -> Asset: + asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href() + asset.href = asset_key # asset key matches the asset filename, becomes the relative path + return asset + + exported_collection = exported_collection.map_assets(replace_asset_href) + exported_collection.save(CatalogType.SELF_CONTAINED) assert exported_collection.validate_all() == 1 - # TODO: adapt hrefs (should point to file in item directory) for item in exported_collection.get_items(): for asset in item.get_assets().values(): - shutil.copy(asset.get_absolute_href(), Path(item.get_self_href()).parent) # next to the item + shutil.copy(asset.extra_fields["_original_absolute_href"], Path(item.get_self_href()).parent) # write collection2 collection2 = create_collection( root_path=tmp_path / "src" / "collection2", collection_id="collection2", asset_filename="asset2.tif" ) - # merge collection2 + # merge collection2 into existing existing_collection = Collection.from_file(str(merge)) assert existing_collection.validate_all() == 1 new_collection = collection2.full_copy() - existing_collection.extent = new_collection.extent.clone() # "merge" existing with new extent + + # "merge" some properties + existing_collection.extent = new_collection.extent.clone() existing_collection.description = f"{existing_collection.description} + {new_collection.description}" - for new_item in new_collection.get_items(): # add new items to existing + + # new_collection.make_all_asset_hrefs_absolute() + new_collection = new_collection.map_assets(replace_asset_href) + + # add new items to existing + for new_item in new_collection.get_items(): + new_item.clear_links() # sever ties with previous collection existing_collection.add_item(new_item) existing_collection.normalize_hrefs(root_href=str(merge.parent), strategy=layout_strategy) existing_collection.save(CatalogType.SELF_CONTAINED) assert existing_collection.validate_all() == 2 - # TODO: adapt hrefs (should point to file in item directory) for item in new_collection.get_items(): for asset in item.get_assets().values(): - shutil.copy(asset.get_absolute_href(), Path(item.get_self_href()).parent) # next to the item + shutil.copy(asset.extra_fields["_original_absolute_href"], Path(item.get_self_href()).parent) merged_collection = Collection.from_file(str(merge)) assert merged_collection.validate_all() == 2 assert merged_collection.id == "collection1" assert merged_collection.description == "collection1 + collection2" - # TODO: check asset hrefs are relative to items and download them + for item in merged_collection.get_items(): + for asset in item.get_assets().values(): + assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent + + assert _downloadable_assets(merged_collection, tmp_dir) == 2 From 9e93496320c74f4cef19ae23de29e940420cd124 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Fri, 22 Nov 2024 12:30:28 +0100 Subject: [PATCH 12/19] incorporate new implementation https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 99 ++++++++++++++++-------------- tests/test_workspace.py | 121 ++++++++++++++----------------------- 2 files changed, 102 insertions(+), 118 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 5622ad61..82253ff8 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -53,14 +53,13 @@ def import_object(self, s3_uri: str, merge: str, remove_original: bool = False): raise NotImplementedError(f"importing objects is not supported yet") def merge_files(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject: - stac_resource = stac_resource.clone() + stac_resource = stac_resource.full_copy() target = os.path.normpath(target) target = Path(target[1:] if target.startswith("/") else target) target = self.root_directory / target target.relative_to(self.root_directory) # assert target_directory is in root_directory - target.parent.mkdir(parents=True, exist_ok=True) file_operation = shutil.move if remove_original else shutil.copy if isinstance(stac_resource, Collection): @@ -72,60 +71,72 @@ def merge_files(self, stac_resource: STACObject, target: PurePath, remove_origin except FileNotFoundError: pass # nothing to merge into - merged_collection = self._merge_collections(existing_collection, new_collection) - - def layout_strategy() -> HrefLayoutStrategy: - def collection_file_at_merge(col: Collection, parent_dir: str, is_root: bool) -> str: + def href_layout_strategy() -> HrefLayoutStrategy: + def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str: if not is_root: - raise ValueError("nested collections are not supported") - return str(target) - - return CustomLayoutStrategy(collection_func=collection_file_at_merge) + raise NotImplementedError("nested collections") + # make the collection file end up at $target, not at $target/collection.json + return str(Path(parent_dir) / target.name) - # TODO: write to a tempdir, then copy/move everything to $merge? - merged_collection.normalize_hrefs(root_href=str(target), strategy=layout_strategy()) + return CustomLayoutStrategy(collection_func=collection_func) - def with_href_relative_to_item(asset: pystac.Asset): - # TODO: is crummy way to export assets after STAC Collection has been written to disk with new asset hrefs + def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset: + # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs; + # it ends up in the asset metadata on disk asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href() - filename = Path(asset.href).name - asset.href = filename + asset.href = asset_key # asset key matches the asset filename, becomes the relative path return asset - # save STAC with proper asset hrefs - merged_collection = merged_collection.map_assets(lambda _, asset: with_href_relative_to_item(asset)) - merged_collection.save(catalog_type=CatalogType.SELF_CONTAINED) - - # copy assets to workspace on disk - for item in merged_collection.get_items(recursive=True): - for asset in item.assets.values(): - file_operation(asset.extra_fields["_original_absolute_href"], Path(item.get_self_href()).parent) - workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}" - asset.extra_fields["alternate"] = {"file": workspace_uri} - return merged_collection + if not existing_collection: + # TODO: write to a tempdir, then copy/move everything to $merge? + new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy()) + new_collection = new_collection.map_assets(replace_asset_href) + new_collection.save(CatalogType.SELF_CONTAINED) + + for item in new_collection.get_items(): + for asset in item.get_assets().values(): + file_operation( + asset.extra_fields["_original_absolute_href"], str(Path(item.get_self_href()).parent) + ) + + return new_collection + else: + merged_collection = self._merge_collection_metadata(existing_collection, new_collection) + new_collection = new_collection.map_assets(replace_asset_href) + + for new_item in new_collection.get_items(): + if existing_collection.get_item(new_item.id): + # TODO: how to treat duplicate items? + raise ValueError(f"item {new_item.id} is already in collection {existing_collection.id}") + + new_item.clear_links() # sever ties with previous collection + merged_collection.add_item(new_item) + + merged_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy()) + merged_collection.save(CatalogType.SELF_CONTAINED) + + for new_item in new_collection.get_items(): + for asset in new_item.get_assets().values(): + file_operation( + asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent + ) + + return merged_collection else: raise NotImplementedError(stac_resource) - def _merge_collections(self, existing_collection: Optional[Collection], new_collection: Collection) -> Collection: - if existing_collection: - existing_collection.extent.spatial = self._merge_spatial_extents( - existing_collection.extent.spatial, - new_collection.extent.spatial - ) - - existing_collection.extent.temporal = self._merge_temporal_extents( - existing_collection.extent.temporal, - new_collection.extent.temporal - ) + def _merge_collection_metadata(self, existing_collection: Collection, new_collection: Collection) -> Collection: + existing_collection.extent.spatial = self._merge_spatial_extents( + existing_collection.extent.spatial, new_collection.extent.spatial + ) - for new_item in new_collection.get_items(recursive=True): - if existing_collection.get_item(new_item.id, recursive=True): - raise ValueError(f"item {new_item.id} is already in collection {existing_collection.id}") + existing_collection.extent.temporal = self._merge_temporal_extents( + existing_collection.extent.temporal, new_collection.extent.temporal + ) - existing_collection.add_item(new_item, strategy=TemplateLayoutStrategy(item_template="${id}.json")) - return existing_collection + # TODO: merge additional metadata? - return new_collection + return existing_collection def _merge_spatial_extents(self, a: SpatialExtent, b: SpatialExtent) -> SpatialExtent: overall_bbox_a, *sub_bboxes_a = a.bboxes diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 1428449f..f349e224 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -1,8 +1,7 @@ import datetime as dt import shutil -import uuid +import tempfile from pathlib import Path -from typing import List from pystac import Asset, Collection, Extent, Item, SpatialExtent, TemporalExtent, CatalogType import pytest @@ -55,13 +54,9 @@ def test_disk_workspace_remove_original(tmp_path, remove_original): def test_merge_from_disk_new(tmp_path): - source_directory = tmp_path / "src" - source_directory.mkdir() - - asset_file = source_directory / "asset.tif" - asset_file.touch() - - new_stac_collection = _collection("collection", [asset_file]) + new_stac_collection = _collection( + root_path=tmp_path / "src" / "collection", collection_id="collection", asset_filename="asset.tif" + ) target = Path("path") / "to" / "collection.json" @@ -72,31 +67,28 @@ def test_merge_from_disk_new(tmp_path): merged_stac_collection = Collection.from_file(str(workspace_dir / "collection.json")) assert merged_stac_collection.validate_all() == 1 + assert _downloadable_assets(merged_stac_collection) == 1 + # TODO: check Collection - assert _downloadable_assets(merged_stac_collection, target_dir=tmp_path) == 1 + for item in merged_stac_collection.get_items(): + for asset in item.get_assets().values(): + assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent def test_merge_from_disk_into_existing(tmp_path): - source_directory = tmp_path / "src" - source_directory.mkdir() - - asset_file1 = source_directory / "asset1.tif" - asset_file1.touch() - - asset_file2 = source_directory / "asset2.tif" - asset_file2.touch() - existing_stac_collection = _collection( - "existing_collection", - [asset_file1], + root_path=tmp_path / "src" / "existing_collection", + collection_id="existing_collection", + asset_filename="asset1.tif", spatial_extent=SpatialExtent([[0, 50, 2, 52]]), temporal_extent=TemporalExtent([[dt.datetime.fromisoformat("2024-11-01T00:00:00+00:00"), dt.datetime.fromisoformat("2024-11-03T00:00:00+00:00")]]) ) new_stac_collection = _collection( - "new_collection", - [asset_file2], + root_path=tmp_path / "src" / "new_collection", + collection_id="new_collection", + asset_filename="asset2.tif", spatial_extent=SpatialExtent([[1, 51, 3, 53]]), temporal_extent=TemporalExtent([[dt.datetime.fromisoformat("2024-11-02T00:00:00+00:00"), dt.datetime.fromisoformat("2024-11-04T00:00:00+00:00")]]) @@ -112,8 +104,7 @@ def test_merge_from_disk_into_existing(tmp_path): merged_stac_collection = Collection.from_file(str(workspace_dir / "collection.json")) assert merged_stac_collection.validate_all() == 2 - - assert _downloadable_assets(merged_stac_collection, target_dir=tmp_path) == 2 + assert _downloadable_assets(merged_stac_collection) == 2 assert merged_stac_collection.extent.spatial.bboxes == [[0, 50, 3, 53]] assert merged_stac_collection.extent.temporal.intervals == [ @@ -121,39 +112,50 @@ def test_merge_from_disk_into_existing(tmp_path): dt.datetime.fromisoformat("2024-11-04T00:00:00+00:00")] ] + for item in merged_stac_collection.get_items(): + for asset in item.get_assets().values(): + assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent -def _collection(collection_id: str, - asset_files: List[Path], - spatial_extent: SpatialExtent = SpatialExtent([[-180, -90, 180, 90]]), - temporal_extent: TemporalExtent = TemporalExtent([[None, None]])) -> Collection: + +def _collection( + root_path: Path, + collection_id: str, + asset_filename: str, + spatial_extent: SpatialExtent = SpatialExtent([[-180, -90, 180, 90]]), + temporal_extent: TemporalExtent = TemporalExtent([[None, None]]), +) -> Collection: collection = Collection( id=collection_id, description=collection_id, - extent=Extent(spatial=spatial_extent, temporal=temporal_extent), - catalog_type=CatalogType.ABSOLUTE_PUBLISHED, + extent=Extent(spatial_extent, temporal_extent), ) - for asset_file in asset_files: - item_id = asset_key = asset_file.name + item = Item(id=asset_filename, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={}) + + asset_path = root_path / item.id / asset_filename + asset = Asset(href=asset_path.name) # relative to item - item = Item(id=item_id, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={}) - item.add_asset(key=asset_key, asset=Asset(href=str(asset_file))) + item.add_asset(key=asset_filename, asset=asset) + collection.add_item(item) - collection.add_item(item) + collection.normalize_hrefs(root_href=str(root_path)) + collection.save(CatalogType.SELF_CONTAINED) + + with open(asset_path, "w") as f: + f.write(f"{asset_filename}\n") - collection.normalize_hrefs(".") assert collection.validate_all() == 1 + assert _downloadable_assets(collection) == 1 return collection -def _downloadable_assets(collection: Collection, target_dir: Path) -> int: +def _downloadable_assets(collection: Collection) -> int: assets = [asset for item in collection.get_items(recursive=True) for asset in item.get_assets().values()] for asset in assets: - target_path = target_dir / str(uuid.uuid4()) - shutil.copy(asset.get_absolute_href(), target_path) # "download" the asset without altering its href - target_path.unlink() + with tempfile.NamedTemporaryFile(mode="wb") as temp_file: + shutil.copy(asset.get_absolute_href(), temp_file.name) # "download" the asset without altering its href return len(assets) @@ -161,37 +163,8 @@ def _downloadable_assets(collection: Collection, target_dir: Path) -> int: def test_create_and_export_collection(tmp_path): # tmp_path = Path("/tmp/test_create_and_export_collection") - tmp_dir = tmp_path / "tmp" - tmp_dir.mkdir() - - def create_collection(root_path: Path, collection_id: str, asset_filename: str) -> Collection: - collection = Collection( - id=collection_id, - description=collection_id, - extent=Extent(SpatialExtent([[-180, -90, 180, 90]]), TemporalExtent([[None, None]])), - ) - - item = Item(id=asset_filename, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={}) - - asset_path = root_path / item.id / asset_filename - asset = Asset(href=asset_path.name) # relative to item - - item.add_asset(key=asset_filename, asset=asset) - collection.add_item(item) - - collection.normalize_hrefs(root_href=str(root_path)) - collection.save(CatalogType.SELF_CONTAINED) - - with open(asset_path, "w") as f: - f.write(f"{asset_filename}\n") - - assert collection.validate_all() == 1 - assert _downloadable_assets(collection, tmp_dir) == 1 - - return collection - # write collection1 - collection1 = create_collection( + collection1 = _collection( root_path=tmp_path / "src" / "collection1", collection_id="collection1", asset_filename="asset1.tif" ) @@ -199,7 +172,7 @@ def create_collection(root_path: Path, collection_id: str, asset_filename: str) exported_collection = collection1.full_copy() merge = tmp_path / "dst" / "merged-collection.json" - def collection_func(col: Collection, parent_dir: str, is_root: bool) -> str: + def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str: assert is_root return str(Path(parent_dir) / merge.name) @@ -221,7 +194,7 @@ def replace_asset_href(asset_key: str, asset: Asset) -> Asset: shutil.copy(asset.extra_fields["_original_absolute_href"], Path(item.get_self_href()).parent) # write collection2 - collection2 = create_collection( + collection2 = _collection( root_path=tmp_path / "src" / "collection2", collection_id="collection2", asset_filename="asset2.tif" ) @@ -260,4 +233,4 @@ def replace_asset_href(asset_key: str, asset: Asset) -> Asset: for asset in item.get_assets().values(): assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent - assert _downloadable_assets(merged_collection, tmp_dir) == 2 + assert _downloadable_assets(merged_collection) == 2 From e90d279415200074b98c2bdfc053e9082bac0701 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Fri, 22 Nov 2024 14:32:24 +0100 Subject: [PATCH 13/19] restore alternate workspace URIs https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 9 ++++-- tests/test_workspace.py | 61 +++++++++++++++++++++++++------------- 2 files changed, 48 insertions(+), 22 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 82253ff8..31bb1bac 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -99,7 +99,7 @@ def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset: asset.extra_fields["_original_absolute_href"], str(Path(item.get_self_href()).parent) ) - return new_collection + merged_collection = new_collection else: merged_collection = self._merge_collection_metadata(existing_collection, new_collection) new_collection = new_collection.map_assets(replace_asset_href) @@ -121,7 +121,12 @@ def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset: asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent ) - return merged_collection + for item in merged_collection.get_items(): + for asset in item.assets.values(): + workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}" + asset.extra_fields["alternate"] = {"file": workspace_uri} + + return merged_collection else: raise NotImplementedError(stac_resource) diff --git a/tests/test_workspace.py b/tests/test_workspace.py index f349e224..0b6a7c18 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -54,30 +54,40 @@ def test_disk_workspace_remove_original(tmp_path, remove_original): def test_merge_from_disk_new(tmp_path): - new_stac_collection = _collection( + new_collection = _collection( root_path=tmp_path / "src" / "collection", collection_id="collection", asset_filename="asset.tif" ) target = Path("path") / "to" / "collection.json" workspace = DiskWorkspace(root_directory=tmp_path) - workspace.merge_files(stac_resource=new_stac_collection, target=target) - + merged_collection = workspace.merge_files(stac_resource=new_collection, target=target) + + assert isinstance(merged_collection, Collection) + asset_workspace_uris = { + asset_key: asset.extra_fields["alternate"]["file"] + for item in merged_collection.get_items() + for asset_key, asset in item.get_assets().items() + } + assert asset_workspace_uris == { + "asset.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'asset.tif' / 'asset.tif'}" + } + + # load it again workspace_dir = (workspace.root_directory / target).parent - - merged_stac_collection = Collection.from_file(str(workspace_dir / "collection.json")) - assert merged_stac_collection.validate_all() == 1 - assert _downloadable_assets(merged_stac_collection) == 1 + exported_collection = Collection.from_file(str(workspace_dir / "collection.json")) + assert exported_collection.validate_all() == 1 + assert _downloadable_assets(exported_collection) == 1 # TODO: check Collection - for item in merged_stac_collection.get_items(): + for item in exported_collection.get_items(): for asset in item.get_assets().values(): assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent def test_merge_from_disk_into_existing(tmp_path): - existing_stac_collection = _collection( + existing_collection = _collection( root_path=tmp_path / "src" / "existing_collection", collection_id="existing_collection", asset_filename="asset1.tif", @@ -85,7 +95,7 @@ def test_merge_from_disk_into_existing(tmp_path): temporal_extent=TemporalExtent([[dt.datetime.fromisoformat("2024-11-01T00:00:00+00:00"), dt.datetime.fromisoformat("2024-11-03T00:00:00+00:00")]]) ) - new_stac_collection = _collection( + new_collection = _collection( root_path=tmp_path / "src" / "new_collection", collection_id="new_collection", asset_filename="asset2.tif", @@ -97,22 +107,33 @@ def test_merge_from_disk_into_existing(tmp_path): target = Path("path") / "to" / "collection.json" workspace = DiskWorkspace(root_directory=tmp_path) - workspace.merge_files(stac_resource=existing_stac_collection, target=target) - workspace.merge_files(stac_resource=new_stac_collection, target=target) - + workspace.merge_files(stac_resource=existing_collection, target=target) + merged_collection = workspace.merge_files(stac_resource=new_collection, target=target) + + assert isinstance(merged_collection, Collection) + asset_workspace_uris = { + asset_key: asset.extra_fields["alternate"]["file"] + for item in merged_collection.get_items() + for asset_key, asset in item.get_assets().items() + } + assert asset_workspace_uris == { + "asset1.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'asset1.tif' / 'asset1.tif'}", + "asset2.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'asset2.tif' / 'asset2.tif'}", + } + + # load it again workspace_dir = (workspace.root_directory / target).parent + exported_collection = Collection.from_file(str(workspace_dir / "collection.json")) + assert exported_collection.validate_all() == 2 + assert _downloadable_assets(exported_collection) == 2 - merged_stac_collection = Collection.from_file(str(workspace_dir / "collection.json")) - assert merged_stac_collection.validate_all() == 2 - assert _downloadable_assets(merged_stac_collection) == 2 - - assert merged_stac_collection.extent.spatial.bboxes == [[0, 50, 3, 53]] - assert merged_stac_collection.extent.temporal.intervals == [ + assert exported_collection.extent.spatial.bboxes == [[0, 50, 3, 53]] + assert exported_collection.extent.temporal.intervals == [ [dt.datetime.fromisoformat("2024-11-01T00:00:00+00:00"), dt.datetime.fromisoformat("2024-11-04T00:00:00+00:00")] ] - for item in merged_stac_collection.get_items(): + for item in exported_collection.get_items(): for asset in item.get_assets().values(): assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent From dd9c3544cb1529a1532cc8e22dd1a78f462fb792 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Fri, 22 Nov 2024 15:16:00 +0100 Subject: [PATCH 14/19] merges a STAC object, not individual files/objects https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 10 +++++++--- tests/test_workspace.py | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 31bb1bac..dbb32cff 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -4,11 +4,12 @@ import shutil from pathlib import Path, PurePath from typing import Optional +from urllib.parse import urlparse import pystac from pystac import Collection, STACObject, SpatialExtent, TemporalExtent from pystac.catalog import CatalogType -from pystac.layout import TemplateLayoutStrategy, HrefLayoutStrategy, CustomLayoutStrategy +from pystac.layout import HrefLayoutStrategy, CustomLayoutStrategy _log = logging.getLogger(__name__) @@ -23,7 +24,7 @@ def import_object(self, s3_uri: str, merge: str, remove_original: bool = False): raise NotImplementedError @abc.abstractmethod - def merge_files(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject: + def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject: # TODO: is a PurePath object fine as an abstraction? raise NotImplementedError @@ -52,7 +53,7 @@ def import_file(self, file: Path, merge: str, remove_original: bool = False) -> def import_object(self, s3_uri: str, merge: str, remove_original: bool = False): raise NotImplementedError(f"importing objects is not supported yet") - def merge_files(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject: + def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject: stac_resource = stac_resource.full_copy() target = os.path.normpath(target) @@ -81,6 +82,9 @@ def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str: return CustomLayoutStrategy(collection_func=collection_func) def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset: + if urlparse(asset.href).scheme not in ["", "file"]: # TODO: convenient place; move elsewhere? + raise NotImplementedError(f"importing objects is not supported yet") + # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs; # it ends up in the asset metadata on disk asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href() diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 0b6a7c18..6af65809 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -61,7 +61,7 @@ def test_merge_from_disk_new(tmp_path): target = Path("path") / "to" / "collection.json" workspace = DiskWorkspace(root_directory=tmp_path) - merged_collection = workspace.merge_files(stac_resource=new_collection, target=target) + merged_collection = workspace.merge(stac_resource=new_collection, target=target) assert isinstance(merged_collection, Collection) asset_workspace_uris = { @@ -107,8 +107,8 @@ def test_merge_from_disk_into_existing(tmp_path): target = Path("path") / "to" / "collection.json" workspace = DiskWorkspace(root_directory=tmp_path) - workspace.merge_files(stac_resource=existing_collection, target=target) - merged_collection = workspace.merge_files(stac_resource=new_collection, target=target) + workspace.merge(stac_resource=existing_collection, target=target) + merged_collection = workspace.merge(stac_resource=new_collection, target=target) assert isinstance(merged_collection, Collection) asset_workspace_uris = { From 73d5094967dbebdbddb7c2e61fc6e81c47ce7287 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 28 Nov 2024 09:13:01 +0100 Subject: [PATCH 15/19] overwriting items is the default https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index dbb32cff..2485b8fd 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -109,10 +109,6 @@ def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset: new_collection = new_collection.map_assets(replace_asset_href) for new_item in new_collection.get_items(): - if existing_collection.get_item(new_item.id): - # TODO: how to treat duplicate items? - raise ValueError(f"item {new_item.id} is already in collection {existing_collection.id}") - new_item.clear_links() # sever ties with previous collection merged_collection.add_item(new_item) From 7768eb286d6a9fa1eed031d6bb907e8e666b8b7a Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 28 Nov 2024 09:18:18 +0100 Subject: [PATCH 16/19] cleanup https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 6 +++--- tests/test_workspace.py | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 2485b8fd..d33d736d 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -97,10 +97,10 @@ def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset: new_collection = new_collection.map_assets(replace_asset_href) new_collection.save(CatalogType.SELF_CONTAINED) - for item in new_collection.get_items(): - for asset in item.get_assets().values(): + for new_item in new_collection.get_items(): + for asset in new_item.get_assets().values(): file_operation( - asset.extra_fields["_original_absolute_href"], str(Path(item.get_self_href()).parent) + asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent) ) merged_collection = new_collection diff --git a/tests/test_workspace.py b/tests/test_workspace.py index 6af65809..6117d90b 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -159,8 +159,7 @@ def _collection( item.add_asset(key=asset_filename, asset=asset) collection.add_item(item) - collection.normalize_hrefs(root_href=str(root_path)) - collection.save(CatalogType.SELF_CONTAINED) + collection.normalize_and_save(root_href=str(root_path), catalog_type=CatalogType.SELF_CONTAINED) with open(asset_path, "w") as f: f.write(f"{asset_filename}\n") From 6982cab59d236a26bc41a07f1d7b0ee9c04b8bbd Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 28 Nov 2024 10:03:46 +0100 Subject: [PATCH 17/19] allow for reuse by ObjectStorageWorkspace#merge https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 91 ++++++++++++++++++++------------------ 1 file changed, 47 insertions(+), 44 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index d33d736d..a267ad2f 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -105,7 +105,7 @@ def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset: merged_collection = new_collection else: - merged_collection = self._merge_collection_metadata(existing_collection, new_collection) + merged_collection = _merge_collection_metadata(existing_collection, new_collection) new_collection = new_collection.map_assets(replace_asset_href) for new_item in new_collection.get_items(): @@ -130,63 +130,66 @@ def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset: else: raise NotImplementedError(stac_resource) - def _merge_collection_metadata(self, existing_collection: Collection, new_collection: Collection) -> Collection: - existing_collection.extent.spatial = self._merge_spatial_extents( - existing_collection.extent.spatial, new_collection.extent.spatial - ) - existing_collection.extent.temporal = self._merge_temporal_extents( - existing_collection.extent.temporal, new_collection.extent.temporal - ) +def _merge_collection_metadata(existing_collection: Collection, new_collection: Collection) -> Collection: + existing_collection.extent.spatial = _merge_spatial_extents( + existing_collection.extent.spatial, new_collection.extent.spatial + ) - # TODO: merge additional metadata? + existing_collection.extent.temporal = _merge_temporal_extents( + existing_collection.extent.temporal, new_collection.extent.temporal + ) - return existing_collection + # TODO: merge additional metadata? - def _merge_spatial_extents(self, a: SpatialExtent, b: SpatialExtent) -> SpatialExtent: - overall_bbox_a, *sub_bboxes_a = a.bboxes - overall_bbox_b, *sub_bboxes_b = b.bboxes + return existing_collection - merged_overall_bbox = [ - min(overall_bbox_a[0], overall_bbox_b[0]), - min(overall_bbox_a[1], overall_bbox_b[1]), - max(overall_bbox_a[2], overall_bbox_b[2]), - max(overall_bbox_a[3], overall_bbox_b[3]) - ] - merged_sub_bboxes = sub_bboxes_a + sub_bboxes_b +def _merge_spatial_extents(a: SpatialExtent, b: SpatialExtent) -> SpatialExtent: + overall_bbox_a, *sub_bboxes_a = a.bboxes + overall_bbox_b, *sub_bboxes_b = b.bboxes - merged_spatial_extent = SpatialExtent([merged_overall_bbox]) - if merged_sub_bboxes: - merged_spatial_extent.bboxes.append(merged_sub_bboxes) + merged_overall_bbox = [ + min(overall_bbox_a[0], overall_bbox_b[0]), + min(overall_bbox_a[1], overall_bbox_b[1]), + max(overall_bbox_a[2], overall_bbox_b[2]), + max(overall_bbox_a[3], overall_bbox_b[3]) + ] - return merged_spatial_extent + merged_sub_bboxes = sub_bboxes_a + sub_bboxes_b - def _merge_temporal_extents(self, a: TemporalExtent, b: TemporalExtent) -> TemporalExtent: - overall_interval_a, *sub_intervals_a = a.intervals - overall_interval_b, *sub_intervals_b = b.intervals + merged_spatial_extent = SpatialExtent([merged_overall_bbox]) + if merged_sub_bboxes: + merged_spatial_extent.bboxes.append(merged_sub_bboxes) - def min_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]: - if t1 is None or t2 is None: - return None + return merged_spatial_extent - return min(t1, t2) - def max_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]: - if t1 is None or t2 is None: - return None +def _merge_temporal_extents(a: TemporalExtent, b: TemporalExtent) -> TemporalExtent: + overall_interval_a, *sub_intervals_a = a.intervals + overall_interval_b, *sub_intervals_b = b.intervals - return max(t1, t2) + def min_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]: + if t1 is None or t2 is None: + return None - merged_overall_interval = [ - min_time(overall_interval_a[0], overall_interval_b[0]), - max_time(overall_interval_a[1], overall_interval_b[1]) - ] + return min(t1, t2) - merged_sub_intervals = sub_intervals_a + sub_intervals_b + def max_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]: + if t1 is None or t2 is None: + return None - merged_temporal_extent = TemporalExtent([merged_overall_interval]) - if merged_sub_intervals: - merged_temporal_extent.intervals.append(merged_sub_intervals) + return max(t1, t2) - return merged_temporal_extent + merged_overall_interval = [ + min_time(overall_interval_a[0], overall_interval_b[0]), + max_time(overall_interval_a[1], overall_interval_b[1]) + ] + + merged_sub_intervals = sub_intervals_a + sub_intervals_b + + merged_temporal_extent = TemporalExtent([merged_overall_interval]) + if merged_sub_intervals: + merged_temporal_extent.intervals.append(merged_sub_intervals) + + return merged_temporal_extent From 5f875c443450f8bf385c23ee436ef1c990d19ee2 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 28 Nov 2024 10:08:51 +0100 Subject: [PATCH 18/19] adapt CHANGELOG https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d05366e..2d373fca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ and start a new "In Progress" section above it. ## In progress +- `export_workspace`: support STAC merge ([Open-EO/openeo-geopyspark-driver#677](https://github.com/Open-EO/openeo-geopyspark-driver/issues/677)) # 0.116.0 - Propagate alternate `href`s of job result assets ([Open-EO/openeo-geopyspark-driver#883](https://github.com/Open-EO/openeo-geopyspark-driver/issues/883)) From 08ebd0f04149de6598f362e39fbc46cca85b2a32 Mon Sep 17 00:00:00 2001 From: Jan Van den bosch Date: Thu, 28 Nov 2024 16:56:39 +0100 Subject: [PATCH 19/19] retain old merge behavior for existing methods https://github.com/Open-EO/openeo-geopyspark-driver/issues/677 --- openeo_driver/workspace.py | 1 + tests/test_workspace.py | 19 ++++++++----------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py index 92a08338..30d82c78 100644 --- a/openeo_driver/workspace.py +++ b/openeo_driver/workspace.py @@ -26,6 +26,7 @@ def import_object(self, common_path: str, s3_uri: str, merge: str, remove_origin @abc.abstractmethod def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject: + # FIXME: replicate subdirectory behavior (#877) # TODO: is a PurePath object fine as an abstraction? raise NotImplementedError diff --git a/tests/test_workspace.py b/tests/test_workspace.py index aeb1ca45..ac4609d7 100644 --- a/tests/test_workspace.py +++ b/tests/test_workspace.py @@ -10,23 +10,20 @@ from openeo_driver.workspace import DiskWorkspace -@pytest.mark.parametrize( - "merge", - [ - "subdirectory/collection.json", - "/subdirectory/collection.json", - "path/to/subdirectory/collection.json", - "/path/to/subdirectory/collection.json", - "collection.json", - ], -) +@pytest.mark.parametrize("merge", [ + "subdirectory", + "/subdirectory", + "path/to/subdirectory", + "/path/to/subdirectory", + ".", +]) def test_disk_workspace(tmp_path, merge): source_directory = tmp_path / "src" source_directory.mkdir() source_file = source_directory / "file" source_file.touch() - subdirectory = Path(merge[1:] if merge.startswith("/") else merge).parent + subdirectory = merge[1:] if merge.startswith("/") else merge target_directory = tmp_path / subdirectory workspace = DiskWorkspace(root_directory=tmp_path)