diff --git a/CHANGELOG.md b/CHANGELOG.md
index 17027b00..fa4d637f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,9 @@ and start a new "In Progress" section above it.
 
 ## In progress
 
+## 0.121.0
+
+- `export_workspace`: experimental support for merging STAC Collections ([Open-EO/openeo-geopyspark-driver#677](https://github.com/Open-EO/openeo-geopyspark-driver/issues/677))
 
 ## 0.120.0
 
diff --git a/openeo_driver/workspace.py b/openeo_driver/workspace.py
index 95749e54..1558f2db 100644
--- a/openeo_driver/workspace.py
+++ b/openeo_driver/workspace.py
@@ -2,21 +2,38 @@
 import logging
 import os.path
 import shutil
-from pathlib import Path
-from typing import Union
+from pathlib import Path, PurePath
+from typing import Optional, Union
+from urllib.parse import urlparse
 
 from openeo_driver.utils import remove_slash_prefix
+from pystac import Asset, Collection, STACObject, SpatialExtent, TemporalExtent, Item
+from pystac.catalog import CatalogType
+from pystac.layout import HrefLayoutStrategy, CustomLayoutStrategy
 
 _log = logging.getLogger(__name__)
 
 
 class Workspace(abc.ABC):
     @abc.abstractmethod
-    def import_file(self, common_path: str, file: Path, merge: str, remove_original: bool = False) -> str:
+    def import_file(self, common_path: Union[str, Path], file: Path, merge: str, remove_original: bool = False) -> str:
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def import_object(
+        self, common_path: Union[str, Path], s3_uri: str, merge: str, remove_original: bool = False
+    ) -> str:
         raise NotImplementedError
 
     @abc.abstractmethod
-    def import_object(self, common_path: str, s3_uri: str, merge: str, remove_original: bool = False) -> str:
+    def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject:
+        """
+        Merges a STAC resource, its children and their assets into this workspace at the given path,
+        possibly removing the original assets.
+        :param stac_resource: a STAC resource, typically a Collection
+        :param target: a path identifier to a STAC resource to merge the given STAC resource into
+        :param remove_original: whether to remove the original assets
+        """
         raise NotImplementedError
 
@@ -26,8 +43,7 @@ def __init__(self, root_directory: Path):
         self.root_directory = root_directory
 
     def import_file(self, common_path: Union[str, Path], file: Path, merge: str, remove_original: bool = False) -> str:
-        merge = os.path.normpath(merge)
-        subdirectory = remove_slash_prefix(merge)
+        subdirectory = remove_slash_prefix(os.path.normpath(merge))
         file_relative = file.relative_to(common_path)
         target_directory = self.root_directory / subdirectory / file_relative.parent
         target_directory.relative_to(self.root_directory)  # assert target_directory is in root_directory
@@ -42,3 +58,146 @@ def import_file(self, common_path: Union[str, Path], file: Path, merge: str, rem
 
     def import_object(self, common_path: str, s3_uri: str, merge: str, remove_original: bool = False):
         raise NotImplementedError(f"importing objects is not supported yet")
+
+    def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject:
+        stac_resource = stac_resource.full_copy()
+
+        target = self.root_directory / os.path.normpath(target).lstrip("/")
+        target.relative_to(self.root_directory)  # assert target is in root_directory
+
+        file_operation = shutil.move if remove_original else shutil.copy
+
+        if isinstance(stac_resource, Collection):
+            new_collection = stac_resource
+
+            existing_collection = None
+            try:
+                existing_collection = Collection.from_file(str(target))
+            except FileNotFoundError:
+                pass  # nothing to merge into
+
+            def href_layout_strategy() -> HrefLayoutStrategy:
+                def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str:
+                    if not is_root:
+                        raise NotImplementedError("nested collections")
+                    # make the collection file end up at $target, not at $target/collection.json
+                    return str(Path(parent_dir) / target.name)
+
+                def item_func(item: Item, parent_dir: str) -> str:
+                    # prevent items/assets of 2 adjacent Collection documents from interfering with each other:
+                    # unlike an object storage object, a Collection file cannot act as a parent "directory" as well
+                    return f"{parent_dir}/{target.name}_items/{item.id}.json"
+
+                return CustomLayoutStrategy(collection_func=collection_func, item_func=item_func)
+
+            def replace_asset_href(asset_key: str, asset: Asset) -> Asset:
+                if urlparse(asset.href).scheme not in ["", "file"]:  # TODO: convenient place; move elsewhere?
+                    raise NotImplementedError(f"only importing files on disk is supported, found: {asset.href}")
+
+                # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs;
+                #  it ends up in the asset metadata on disk
+                asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href()
+                asset.href = Path(asset_key).name  # asset key matches the asset filename, becomes the relative path
+                return asset
+
+            if not existing_collection:
+                new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
+                new_collection = new_collection.map_assets(replace_asset_href)
+                new_collection.save(CatalogType.SELF_CONTAINED)
+
+                for new_item in new_collection.get_items():
+                    for asset in new_item.get_assets().values():
+                        file_operation(
+                            asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent)
+                        )
+
+                merged_collection = new_collection
+            else:
+                merged_collection = _merge_collection_metadata(existing_collection, new_collection)
+                new_collection = new_collection.map_assets(replace_asset_href)
+
+                for new_item in new_collection.get_items():
+                    new_item.clear_links()  # sever ties with previous collection
+                    merged_collection.add_item(new_item, strategy=href_layout_strategy())
+
+                merged_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
+                merged_collection.save(CatalogType.SELF_CONTAINED)
+
+                for new_item in new_collection.get_items():
+                    for asset in new_item.get_assets().values():
+                        file_operation(
+                            asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent)
+                        )
+
+            for item in merged_collection.get_items():
+                for asset in item.assets.values():
+                    workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}"
+                    asset.extra_fields["alternate"] = {"file": workspace_uri}
+
+            return merged_collection
+        else:
+            raise NotImplementedError(stac_resource)
+
+
+def _merge_collection_metadata(existing_collection: Collection, new_collection: Collection) -> Collection:
+    existing_collection.extent.spatial = _merge_spatial_extents(
+        existing_collection.extent.spatial, new_collection.extent.spatial
+    )
+
+    existing_collection.extent.temporal = _merge_temporal_extents(
+        existing_collection.extent.temporal, new_collection.extent.temporal
+    )
+
+    # TODO: merge additional metadata?
+
+    return existing_collection
+
+
+def _merge_spatial_extents(a: SpatialExtent, b: SpatialExtent) -> SpatialExtent:
+    overall_bbox_a, *sub_bboxes_a = a.bboxes
+    overall_bbox_b, *sub_bboxes_b = b.bboxes
+
+    merged_overall_bbox = [
+        min(overall_bbox_a[0], overall_bbox_b[0]),
+        min(overall_bbox_a[1], overall_bbox_b[1]),
+        max(overall_bbox_a[2], overall_bbox_b[2]),
+        max(overall_bbox_a[3], overall_bbox_b[3])
+    ]
+
+    merged_sub_bboxes = sub_bboxes_a + sub_bboxes_b
+
+    merged_spatial_extent = SpatialExtent([merged_overall_bbox])
+    if merged_sub_bboxes:
+        merged_spatial_extent.bboxes.extend(merged_sub_bboxes)  # bboxes is a flat list: the overall bbox first, sub-bboxes as siblings
+
+    return merged_spatial_extent
+
+
+def _merge_temporal_extents(a: TemporalExtent, b: TemporalExtent) -> TemporalExtent:
+    overall_interval_a, *sub_intervals_a = a.intervals
+    overall_interval_b, *sub_intervals_b = b.intervals
+
+    def min_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]:
+        if t1 is None or t2 is None:
+            return None
+
+        return min(t1, t2)
+
+    def max_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]:
+        if t1 is None or t2 is None:
+            return None
+
+        return max(t1, t2)
+
+    merged_overall_interval = [
+        min_time(overall_interval_a[0], overall_interval_b[0]),
+        max_time(overall_interval_a[1], overall_interval_b[1])
+    ]
+
+    merged_sub_intervals = sub_intervals_a + sub_intervals_b
+
+    merged_temporal_extent = TemporalExtent([merged_overall_interval])
+    if merged_sub_intervals:
+        merged_temporal_extent.intervals.extend(merged_sub_intervals)  # intervals is a flat list: the overall interval first, sub-intervals as siblings
+
+    return merged_temporal_extent
diff --git a/setup.py b/setup.py
index 8ee0ed64..e500ec4a 100644
--- a/setup.py
+++ b/setup.py
@@ -73,6 +73,7 @@
         "reretry~=0.11.8",
         "markdown>3.4",
         "traceback-with-variables==2.0.4",
+        "pystac~=1.8.0",
     ],
     extras_require={
         "dev": tests_require,
diff --git a/tests/test_workspace.py b/tests/test_workspace.py
index 20980804..0e8fd868 100644
--- a/tests/test_workspace.py
+++ b/tests/test_workspace.py
@@ -1,4 +1,11 @@
+import datetime as dt
+import shutil
+import tempfile
+from pathlib import Path
+
+from pystac import Asset, Collection, Extent, Item, SpatialExtent, TemporalExtent, CatalogType
 import pytest
+from pystac.layout import CustomLayoutStrategy
 
 from openeo_driver.workspace import DiskWorkspace
@@ -41,3 +48,173 @@ def test_disk_workspace_remove_original(tmp_path, remove_original):
 
     assert (target_directory / source_file.name).exists()
     assert source_file.exists() != remove_original
+
+
+def test_merge_from_disk_new(tmp_path):
+    new_collection = _collection(
+        root_path=tmp_path / "src" / "collection", collection_id="collection", asset_filename="asset.tif"
+    )
+
+    target = Path("path") / "to" / "collection.json"
+
+    workspace = DiskWorkspace(root_directory=tmp_path)
+    merged_collection = workspace.merge(stac_resource=new_collection, target=target)
+
+    assert isinstance(merged_collection, Collection)
+    asset_workspace_uris = {
+        asset_key: asset.extra_fields["alternate"]["file"]
+        for item in merged_collection.get_items()
+        for asset_key, asset in item.get_assets().items()
+    }
+    assert asset_workspace_uris == {
+        "asset.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset.tif'}"
+    }
+
+    # load it again
+    workspace_dir = (workspace.root_directory / target).parent
+    exported_collection = Collection.from_file(str(workspace_dir / "collection.json"))
+    assert exported_collection.validate_all() == 1
+    assert _downloadable_assets(exported_collection) == 1
+
+    assert exported_collection.extent.spatial.bboxes == [[-180, -90, 180, 90]]
+    assert exported_collection.extent.temporal.intervals == [[None, None]]
+
+    for item in exported_collection.get_items():
+        for asset in item.get_assets().values():
+            assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent
+
+
+def test_merge_from_disk_into_existing(tmp_path):
+    existing_collection = _collection(
+        root_path=tmp_path / "src" / "existing_collection",
+        collection_id="existing_collection",
+        asset_filename="asset1.tif",
+        spatial_extent=SpatialExtent([[0, 50, 2, 52]]),
+        temporal_extent=TemporalExtent([[dt.datetime.fromisoformat("2024-11-01T00:00:00+00:00"),
+                                         dt.datetime.fromisoformat("2024-11-03T00:00:00+00:00")]])
+    )
+    new_collection = _collection(
+        root_path=tmp_path / "src" / "new_collection",
+        collection_id="new_collection",
+        asset_filename="asset2.tif",
+        spatial_extent=SpatialExtent([[1, 51, 3, 53]]),
+        temporal_extent=TemporalExtent([[dt.datetime.fromisoformat("2024-11-02T00:00:00+00:00"),
+                                         dt.datetime.fromisoformat("2024-11-04T00:00:00+00:00")]])
+    )
+
+    target = Path("path") / "to" / "collection.json"
+
+    workspace = DiskWorkspace(root_directory=tmp_path)
+    workspace.merge(stac_resource=existing_collection, target=target)
+    merged_collection = workspace.merge(stac_resource=new_collection, target=target)
+
+    assert isinstance(merged_collection, Collection)
+    asset_workspace_uris = {
+        asset_key: asset.extra_fields["alternate"]["file"]
+        for item in merged_collection.get_items()
+        for asset_key, asset in item.get_assets().items()
+    }
+    assert asset_workspace_uris == {
+        "asset1.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset1.tif'}",
+        "asset2.tif": f"file:{workspace.root_directory / 'path' / 'to' / 'collection.json_items' / 'asset2.tif'}",
+    }
+
+    # load it again
+    workspace_dir = (workspace.root_directory / target).parent
+    exported_collection = Collection.from_file(str(workspace_dir / "collection.json"))
+    assert exported_collection.validate_all() == 2
+    assert _downloadable_assets(exported_collection) == 2
+
+    assert exported_collection.extent.spatial.bboxes == [[0, 50, 3, 53]]
+    assert exported_collection.extent.temporal.intervals == [
+        [dt.datetime.fromisoformat("2024-11-01T00:00:00+00:00"),
+         dt.datetime.fromisoformat("2024-11-04T00:00:00+00:00")]
+    ]
+
+    for item in exported_collection.get_items():
+        for asset in item.get_assets().values():
+            assert Path(item.get_self_href()).parent == Path(asset.get_absolute_href()).parent
+
+
+def test_adjacent_collections_do_not_have_interfering_items_and_assets(tmp_path):
+    workspace = DiskWorkspace(root_directory=tmp_path)
+
+    collection1 = _collection(
+        root_path=tmp_path / "src" / "collection1",
+        collection_id="collection1",
+        asset_filename="asset1.tif",
+    )
+
+    collection2 = _collection(
+        root_path=tmp_path / "src" / "collection2",
+        collection_id="collection2",
+        asset_filename="asset1.tif",  # 2 distinct collections can have the same item IDs and assets
+    )
+
+    def asset_contents(collection_filename: str):
+        assets = [
+            asset
+            for item in Collection.from_file(str(workspace.root_directory / collection_filename)).get_items()
+            for asset in item.get_assets().values()
+        ]
+
+        assert len(assets) == 1
+
+        with open(assets[0].get_absolute_href()) as f:
+            return f.read()
+
+    workspace.merge(collection1, target=Path("collection1.json"))
+    assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n"
+
+    # put collection2 next to collection1
+    workspace.merge(collection2, target=Path("collection2.json"))
+    assert asset_contents(collection_filename="collection2.json") == "collection2-asset1.tif-asset1.tif\n"
+
+    # separate collection files
+    assert (workspace.root_directory / "collection1.json").exists()
+    assert (workspace.root_directory / "collection2.json").exists()
+
+    # collection2 should not overwrite collection1's items/assets
+    assert asset_contents(collection_filename="collection1.json") == "collection1-asset1.tif-asset1.tif\n"
+
+
+def _collection(
+    root_path: Path,
+    collection_id: str,
+    asset_filename: str,
+    spatial_extent: SpatialExtent = SpatialExtent([[-180, -90, 180, 90]]),
+    temporal_extent: TemporalExtent = TemporalExtent([[None, None]]),
+) -> Collection:
+    collection = Collection(
+        id=collection_id,
+        description=collection_id,
+        extent=Extent(spatial_extent, temporal_extent),
+    )
+
+    item = Item(id=asset_filename, geometry=None, bbox=None, datetime=dt.datetime.utcnow(), properties={})
+
+    asset_path = root_path / item.id / asset_filename
+    asset = Asset(href=asset_path.name)  # relative to item
+
+    item.add_asset(key=asset_filename, asset=asset)
+    collection.add_item(item)
+
+    collection.normalize_and_save(root_href=str(root_path), catalog_type=CatalogType.SELF_CONTAINED)
+
+    with open(asset_path, "w") as f:
+        f.write(f"{collection_id}-{item.id}-{asset_filename}\n")
+
+    assert collection.validate_all() == 1
+    assert _downloadable_assets(collection) == 1
+
+    return collection
+
+
+def _downloadable_assets(collection: Collection) -> int:
+    assets = [asset for item in collection.get_items(recursive=True) for asset in item.get_assets().values()]
+
+    for asset in assets:
+        with tempfile.NamedTemporaryFile(mode="wb") as temp_file:
+            shutil.copy(asset.get_absolute_href(), temp_file.name)  # "download" the asset without altering its href
+
+    return len(assets)
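For context, a minimal usage sketch of the new `merge` API, written against the code in this patch. The workspace root and the job result paths (`/data/workspaces/alice`, `/tmp/job-123/collection.json`, `/tmp/job-456/collection.json`) are hypothetical placeholders, not part of the change:

```python
from pathlib import Path

from pystac import Collection

from openeo_driver.workspace import DiskWorkspace

# a user's workspace on local disk (hypothetical root directory)
workspace = DiskWorkspace(root_directory=Path("/data/workspaces/alice"))

# first export: writes <root>/path/to/collection.json plus a sibling
# collection.json_items/ directory holding the items and their assets
job1_result = Collection.from_file("/tmp/job-123/collection.json")  # hypothetical job output
workspace.merge(job1_result, target=Path("path") / "to" / "collection.json")

# second export into the same target: spatial/temporal extents are unioned and
# the new items/assets are added next to the existing ones; remove_original
# moves the assets instead of copying them
job2_result = Collection.from_file("/tmp/job-456/collection.json")  # hypothetical job output
merged = workspace.merge(job2_result, target=Path("path") / "to" / "collection.json", remove_original=True)

# merge() records each asset's workspace location under the "alternate" field
for item in merged.get_items():
    for asset in item.get_assets().values():
        print(asset.extra_fields["alternate"]["file"])  # a file: URI inside the workspace
```

Note that `merge` returns the merged Collection with each asset's workspace location recorded under `alternate`, so a caller can report `file:` URIs without re-reading the collection from disk.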