support merging into DiskWorkspace #329

Merged on Dec 6, 2024 (24 commits)
Changes from 23 commits

Commits (24)
a217a30  merge points to an actual STAC resource e.g. a Collection file (bossie, Nov 12, 2024)
8fdada4  merging into an existing collection requires STAC-aware workspace (bossie, Nov 13, 2024)
d614510  merge collections (bossie, Nov 14, 2024)
866a73d  support remove_original (bossie, Nov 19, 2024)
7afedcf  fix tests (bossie, Nov 20, 2024)
101d8be  fix implementation (bossie, Nov 21, 2024)
76b4faa  use tmp_path (bossie, Nov 21, 2024)
afee6b5  add asset with absolute href (bossie, Nov 21, 2024)
982f159  relative href for asset (bossie, Nov 21, 2024)
074cd98  copy exported assets to workspace (bossie, Nov 21, 2024)
d47c10e  point asset hrefs to workspace (bossie, Nov 21, 2024)
9e93496  incorporate new implementation (bossie, Nov 22, 2024)
e90d279  restore alternate workspace URIs (bossie, Nov 22, 2024)
dd9c354  merges a STAC object, not individual files/objects (bossie, Nov 22, 2024)
73d5094  overwriting items is the default (bossie, Nov 28, 2024)
7768eb2  cleanup (bossie, Nov 28, 2024)
6982cab  allow for reuse by ObjectStorageWorkspace#merge (bossie, Nov 28, 2024)
5f875c4  adapt CHANGELOG (bossie, Nov 28, 2024)
57fb83c  Merge branch 'master' into 677-export_workspace-support-stac-merge (bossie, Nov 28, 2024)
08ebd0f  retain old merge behavior for existing methods (bossie, Nov 28, 2024)
22fcd35  add (disabled) test for interfering collections (bossie, Dec 4, 2024)
0993c6b  support filepath_per_band for DiskWorkspace (bossie, Dec 6, 2024)
2049e7e  Merge branch 'master' into 677-export_workspace-support-stac-merge (bossie, Dec 6, 2024)
be577e1  address review remarks (bossie, Dec 6, 2024)

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ and start a new "In Progress" section above it.
<!-- start-of-changelog -->

## In progress
- `export_workspace`: support STAC merge ([Open-EO/openeo-geopyspark-driver#677](https://github.com/Open-EO/openeo-geopyspark-driver/issues/677))


## 0.120.0
170 changes: 166 additions & 4 deletions openeo_driver/workspace.py
@@ -2,21 +2,39 @@
import logging
import os.path
import shutil
from pathlib import Path
from typing import Union
from pathlib import Path, PurePath
from typing import Optional, Union
from urllib.parse import urlparse

from openeo_driver.utils import remove_slash_prefix
import pystac
from pystac import Collection, STACObject, SpatialExtent, TemporalExtent, Item
from pystac.catalog import CatalogType
from pystac.layout import HrefLayoutStrategy, CustomLayoutStrategy

_log = logging.getLogger(__name__)


class Workspace(abc.ABC):
@abc.abstractmethod
def import_file(self, common_path: str, file: Path, merge: str, remove_original: bool = False) -> str:
def import_file(self, common_path: Union[str, Path], file: Path, merge: str, remove_original: bool = False) -> str:
raise NotImplementedError

@abc.abstractmethod
def import_object(
self, common_path: Union[str, Path], s3_uri: str, merge: str, remove_original: bool = False
) -> str:
raise NotImplementedError

@abc.abstractmethod
def import_object(self, common_path: str, s3_uri: str, merge: str, remove_original: bool = False) -> str:
def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject:
"""
Merges a STAC resource, its children and their assets into this workspace at the given path,
possibly removing the original assets.
:param stac_resource: a STAC resource, typically a Collection
:param target: a path identifier to a STAC resource to merge the given STAC resource into
:param remove_original: remove the original assets?
"""
raise NotImplementedError
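
A minimal usage sketch of this new abstract method (editorial, not part of this diff; the function and target path are hypothetical):

```python
from pathlib import PurePath

import pystac

from openeo_driver.workspace import Workspace


def export_to_workspace(workspace: Workspace, results: pystac.Collection) -> pystac.STACObject:
    # Merge the result collection and its assets into the workspace at the given target path;
    # remove_original=True would move the assets instead of copying them.
    return workspace.merge(results, target=PurePath("path/to/collection.json"), remove_original=False)
```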


@@ -42,3 +60,147 @@ def import_file(self, common_path: Union[str, Path], file: Path, merge: str, rem

def import_object(self, common_path: str, s3_uri: str, merge: str, remove_original: bool = False):
raise NotImplementedError(f"importing objects is not supported yet")

def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject:
stac_resource = stac_resource.full_copy()

target = os.path.normpath(target)
target = Path(target[1:] if target.startswith("/") else target)
target = self.root_directory / target
target.relative_to(self.root_directory) # assert target_directory is in root_directory

file_operation = shutil.move if remove_original else shutil.copy

if isinstance(stac_resource, Collection):
new_collection = stac_resource

existing_collection = None
try:
existing_collection = pystac.Collection.from_file(str(target))
except FileNotFoundError:
pass # nothing to merge into

def href_layout_strategy() -> HrefLayoutStrategy:
def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str:
if not is_root:
raise NotImplementedError("nested collections")
# make the collection file end up at $target, not at $target/collection.json
return str(Path(parent_dir) / target.name)

def item_func(item: Item, parent_dir: str) -> str:
return f"{parent_dir}/{target.name}_items/{item.id}.json"

return CustomLayoutStrategy(collection_func=collection_func, item_func=item_func)

def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset:
if urlparse(asset.href).scheme not in ["", "file"]: # TODO: convenient place; move elsewhere?
raise NotImplementedError(f"importing objects is not supported yet")

# TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs;
# it ends up in the asset metadata on disk
asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href()
asset.href = Path(asset_key).name # asset key matches the asset filename, becomes the relative path
return asset

if not existing_collection:
# TODO: write to a tempdir, then copy/move everything to $merge?
new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
new_collection = new_collection.map_assets(replace_asset_href)
new_collection.save(CatalogType.SELF_CONTAINED)

for new_item in new_collection.get_items():
for asset in new_item.get_assets().values():
file_operation(
asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent)
)

merged_collection = new_collection
else:
merged_collection = _merge_collection_metadata(existing_collection, new_collection)
new_collection = new_collection.map_assets(replace_asset_href)

for new_item in new_collection.get_items():
new_item.clear_links() # sever ties with previous collection
merged_collection.add_item(new_item, strategy=href_layout_strategy())

merged_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
merged_collection.save(CatalogType.SELF_CONTAINED)

for new_item in new_collection.get_items():
for asset in new_item.get_assets().values():
file_operation(
asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent
)

for item in merged_collection.get_items():
for asset in item.assets.values():
workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}"
asset.extra_fields["alternate"] = {"file": workspace_uri}

return merged_collection
else:
raise NotImplementedError(stac_resource)
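
An end-to-end sketch of `DiskWorkspace.merge` as implemented above (editorial, not part of this diff). The constructor parameter name, paths and collection contents are assumptions for illustration only, and the asset file has to exist on disk for the copy to succeed:

```python
import datetime as dt
from pathlib import Path, PurePath

import pystac

from openeo_driver.workspace import DiskWorkspace

workspace = DiskWorkspace(root_directory=Path("/data/workspaces/alice"))  # assumed parameter name

item = pystac.Item(
    id="result_item",
    geometry={"type": "Polygon", "coordinates": [[[2, 50], [3, 50], [3, 51], [2, 51], [2, 50]]]},
    bbox=[2, 50, 3, 51],
    datetime=dt.datetime(2024, 11, 12, tzinfo=dt.timezone.utc),
    properties={},
)
# The asset key matches the file name, as the implementation above relies on.
item.add_asset("openEO.tif", pystac.Asset(href="/tmp/job-123/openEO.tif"))  # file assumed to exist

collection = pystac.Collection(
    id="result_collection",
    description="batch job results",
    extent=pystac.Extent(
        spatial=pystac.SpatialExtent([[2, 50, 3, 51]]),
        temporal=pystac.TemporalExtent([[None, None]]),
    ),
)
collection.add_item(item)

merged = workspace.merge(collection, target=PurePath("path/to/collection.json"))

# Expected layout, as far as the CustomLayoutStrategy above goes:
#   /data/workspaces/alice/path/to/collection.json                          (the merged Collection)
#   /data/workspaces/alice/path/to/collection.json_items/result_item.json   (the item)
#   /data/workspaces/alice/path/to/collection.json_items/openEO.tif         (the copied asset)
for merged_item in merged.get_items():
    for asset in merged_item.assets.values():
        # alternate workspace URI, e.g. file:/data/workspaces/alice/path/to/collection.json_items/openEO.tif
        print(asset.extra_fields["alternate"]["file"])
```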


def _merge_collection_metadata(existing_collection: Collection, new_collection: Collection) -> Collection:
existing_collection.extent.spatial = _merge_spatial_extents(
existing_collection.extent.spatial, new_collection.extent.spatial
)

existing_collection.extent.temporal = _merge_temporal_extents(
existing_collection.extent.temporal, new_collection.extent.temporal
)

# TODO: merge additional metadata?

return existing_collection


def _merge_spatial_extents(a: SpatialExtent, b: SpatialExtent) -> SpatialExtent:
overall_bbox_a, *sub_bboxes_a = a.bboxes
overall_bbox_b, *sub_bboxes_b = b.bboxes

merged_overall_bbox = [
min(overall_bbox_a[0], overall_bbox_b[0]),
min(overall_bbox_a[1], overall_bbox_b[1]),
max(overall_bbox_a[2], overall_bbox_b[2]),
max(overall_bbox_a[3], overall_bbox_b[3])
]

merged_sub_bboxes = sub_bboxes_a + sub_bboxes_b

merged_spatial_extent = SpatialExtent([merged_overall_bbox])
if merged_sub_bboxes:
merged_spatial_extent.bboxes.extend(merged_sub_bboxes)

return merged_spatial_extent
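
A small sketch of the bbox union computed above (editorial; values are illustrative, and the module-level helper is assumed importable from `openeo_driver.workspace`):

```python
from pystac import SpatialExtent

from openeo_driver.workspace import _merge_spatial_extents

a = SpatialExtent([[2.0, 50.0, 3.0, 51.0]])
b = SpatialExtent([[4.0, 52.0, 5.0, 53.0]])

merged = _merge_spatial_extents(a, b)
assert merged.bboxes == [[2.0, 50.0, 5.0, 53.0]]  # overall bbox covers both inputs
```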


def _merge_temporal_extents(a: TemporalExtent, b: TemporalExtent) -> TemporalExtent:
overall_interval_a, *sub_intervals_a = a.intervals
overall_interval_b, *sub_intervals_b = b.intervals

def min_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]:
if t1 is None or t2 is None:
return None

return min(t1, t2)

def max_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]:
if t1 is None or t2 is None:
return None

return max(t1, t2)

merged_overall_interval = [
min_time(overall_interval_a[0], overall_interval_b[0]),
max_time(overall_interval_a[1], overall_interval_b[1])
]

merged_sub_intervals = sub_intervals_a + sub_intervals_b

merged_temporal_extent = TemporalExtent([merged_overall_interval])
if merged_sub_intervals:
merged_temporal_extent.intervals.extend(merged_sub_intervals)

return merged_temporal_extent
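
A small sketch of the interval merge above (editorial; values are illustrative). `None` acts as an open-ended bound, so an open end wins when merging:

```python
import datetime as dt

from pystac import TemporalExtent

from openeo_driver.workspace import _merge_temporal_extents

a = TemporalExtent([[dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc), dt.datetime(2024, 6, 30, tzinfo=dt.timezone.utc)]])
b = TemporalExtent([[dt.datetime(2024, 3, 1, tzinfo=dt.timezone.utc), None]])

merged = _merge_temporal_extents(a, b)
assert merged.intervals == [[dt.datetime(2024, 1, 1, tzinfo=dt.timezone.utc), None]]  # earliest start, open end
```
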
1 change: 1 addition & 0 deletions setup.py
@@ -73,6 +73,7 @@
"reretry~=0.11.8",
"markdown>3.4",
"traceback-with-variables==2.0.4",
"pystac~=1.8.0",
],
extras_require={
"dev": tests_require,