Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support merging into DiskWorkspace #329

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ and start a new "In Progress" section above it.
<!-- start-of-changelog -->

## In progress
- `export_workspace`: support STAC merge ([Open-EO/openeo-geopyspark-driver#677](https://github.com/Open-EO/openeo-geopyspark-driver/issues/677))

- NDVI process: correctly handle band dimension as part of dry run

Expand Down
156 changes: 154 additions & 2 deletions openeo_driver/workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@
import logging
import os.path
import shutil
from pathlib import Path
from typing import Union
from pathlib import Path, PurePath
from typing import Optional, Union
from urllib.parse import urlparse

from openeo_driver.utils import remove_slash_prefix
import pystac
from pystac import Collection, STACObject, SpatialExtent, TemporalExtent
from pystac.catalog import CatalogType
from pystac.layout import HrefLayoutStrategy, CustomLayoutStrategy

_log = logging.getLogger(__name__)

Expand All @@ -19,6 +24,12 @@ def import_file(self, common_path: str, file: Path, merge: str, remove_original:
def import_object(self, common_path: str, s3_uri: str, merge: str, remove_original: bool = False) -> str:
raise NotImplementedError

@abc.abstractmethod
def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject:
    """
    Merge a STAC resource into this workspace at the given target path.

    :param stac_resource: STAC object to merge (concrete workspaces may only support a subset, e.g. Collection)
    :param target: path within the workspace to merge into
    :param remove_original: move the underlying asset files instead of copying them
    :return: the merged STAC object as stored in the workspace
    """
    # FIXME: replicate subdirectory behavior (#877)
    # TODO: is a PurePath object fine as an abstraction?
    raise NotImplementedError


class DiskWorkspace(Workspace):

Expand All @@ -42,3 +53,144 @@ def import_file(self, common_path: Union[str, Path], file: Path, merge: str, rem

def import_object(self, common_path: str, s3_uri: str, merge: str, remove_original: bool = False):
raise NotImplementedError(f"importing objects is not supported yet")

def merge(self, stac_resource: STACObject, target: PurePath, remove_original: bool = False) -> STACObject:
    """
    Merge a STAC Collection into this disk workspace at ``target``.

    If no collection exists yet at ``target``, the new collection is written there;
    otherwise the existing collection's extent metadata is widened and the new items
    are added to it. Asset files are copied (or moved, if ``remove_original``) next
    to their item's JSON file, and each asset gets an ``alternate`` "file" URI.

    :param stac_resource: only :py:class:`pystac.Collection` is supported
    :param target: path of the collection file, interpreted relative to the workspace
        root (a leading "/" is treated as the workspace root, not the filesystem root)
    :param remove_original: move asset files instead of copying them
    :return: the merged collection, with asset ``alternate`` hrefs pointing into the workspace
    :raises NotImplementedError: for non-Collection resources, nested collections,
        or assets with a non-local (non ""/"file" scheme) href
    """
    # Work on a deep copy so the caller's STAC object is not mutated.
    stac_resource = stac_resource.full_copy()

    # Resolve $target against the workspace root directory.
    target = os.path.normpath(target)
    target = Path(target[1:] if target.startswith("/") else target)
    target = self.root_directory / target
    # Raises ValueError if e.g. a ".." segment escapes the workspace root.
    target.relative_to(self.root_directory)  # assert target_directory is in root_directory

    # Single choice of copy vs. move, applied to every asset file below.
    file_operation = shutil.move if remove_original else shutil.copy

    if isinstance(stac_resource, Collection):
        new_collection = stac_resource

        # Detect whether there is already a collection at $target to merge into.
        existing_collection = None
        try:
            existing_collection = pystac.Collection.from_file(str(target))
        except FileNotFoundError:
            pass  # nothing to merge into

        def href_layout_strategy() -> HrefLayoutStrategy:
            # Layout that writes the (root) collection file at $target itself.
            def collection_func(_: Collection, parent_dir: str, is_root: bool) -> str:
                if not is_root:
                    raise NotImplementedError("nested collections")
                # make the collection file end up at $target, not at $target/collection.json
                return str(Path(parent_dir) / target.name)

            return CustomLayoutStrategy(collection_func=collection_func)

        def replace_asset_href(asset_key: str, asset: pystac.Asset) -> pystac.Asset:
            # Only local file assets can be merged into a disk workspace.
            if urlparse(asset.href).scheme not in ["", "file"]:  # TODO: convenient place; move elsewhere?
                raise NotImplementedError(f"importing objects is not supported yet")

            # TODO: crummy way to export assets after STAC Collection has been written to disk with new asset hrefs;
            #  it ends up in the asset metadata on disk
            # Stash the source location so the copy/move loops below can find the file
            # after the href has been rewritten to a workspace-relative path.
            asset.extra_fields["_original_absolute_href"] = asset.get_absolute_href()
            asset.href = asset_key  # asset key matches the asset filename, becomes the relative path
            return asset

        if not existing_collection:
            # Fresh export: lay out hrefs, rewrite asset hrefs, write JSON, then copy/move the files.
            # TODO: write to a tempdir, then copy/move everything to $merge?
            new_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
            new_collection = new_collection.map_assets(replace_asset_href)
            new_collection.save(CatalogType.SELF_CONTAINED)

            # Put each asset file next to its item's JSON file.
            for new_item in new_collection.get_items():
                for asset in new_item.get_assets().values():
                    file_operation(
                        asset.extra_fields["_original_absolute_href"], str(Path(new_item.get_self_href()).parent)
                    )

            merged_collection = new_collection
        else:
            # Merge: widen extents of the existing collection, then absorb the new items.
            merged_collection = _merge_collection_metadata(existing_collection, new_collection)
            new_collection = new_collection.map_assets(replace_asset_href)

            for new_item in new_collection.get_items():
                new_item.clear_links()  # sever ties with previous collection
                merged_collection.add_item(new_item)

            merged_collection.normalize_hrefs(root_href=str(target.parent), strategy=href_layout_strategy())
            merged_collection.save(CatalogType.SELF_CONTAINED)

            # Only the *new* items' asset files need to be brought into the workspace.
            for new_item in new_collection.get_items():
                for asset in new_item.get_assets().values():
                    file_operation(
                        asset.extra_fields["_original_absolute_href"], Path(new_item.get_self_href()).parent
                    )

        # Advertise the on-disk workspace location of every asset via the "alternate" field.
        for item in merged_collection.get_items():
            for asset in item.assets.values():
                workspace_uri = f"file:{Path(item.get_self_href()).parent / Path(asset.href).name}"
                asset.extra_fields["alternate"] = {"file": workspace_uri}

        return merged_collection
    else:
        raise NotImplementedError(stac_resource)


def _merge_collection_metadata(existing_collection: Collection, new_collection: Collection) -> Collection:
    """
    Fold the extent metadata of ``new_collection`` into ``existing_collection``.

    Mutates ``existing_collection`` in place (its spatial and temporal extents are
    widened to cover both collections) and returns it.
    """
    existing_extent = existing_collection.extent
    new_extent = new_collection.extent

    existing_extent.spatial = _merge_spatial_extents(existing_extent.spatial, new_extent.spatial)
    existing_extent.temporal = _merge_temporal_extents(existing_extent.temporal, new_extent.temporal)

    # TODO: merge additional metadata?

    return existing_collection


def _merge_spatial_extents(a: SpatialExtent, b: SpatialExtent) -> SpatialExtent:
    """
    Merge two STAC spatial extents.

    The overall (first) bboxes of both extents are combined into a single bbox
    that covers both; any additional sub-extent bboxes from either side are
    carried over unchanged.

    :return: a new :py:class:`SpatialExtent` (inputs are not mutated)
    """
    overall_bbox_a, *sub_bboxes_a = a.bboxes
    overall_bbox_b, *sub_bboxes_b = b.bboxes

    # Assumes [west, south, east, north] 2D bboxes — TODO confirm no 3D (6-element) bboxes occur.
    merged_overall_bbox = [
        min(overall_bbox_a[0], overall_bbox_b[0]),
        min(overall_bbox_a[1], overall_bbox_b[1]),
        max(overall_bbox_a[2], overall_bbox_b[2]),
        max(overall_bbox_a[3], overall_bbox_b[3]),
    ]

    merged_spatial_extent = SpatialExtent([merged_overall_bbox])
    # Bug fix: "bboxes" must stay a flat list of bboxes (overall first, then
    # sub-extents); the previous append() nested the whole sub-bbox list as a
    # single malformed element.
    merged_spatial_extent.bboxes.extend(sub_bboxes_a + sub_bboxes_b)

    return merged_spatial_extent


def _merge_temporal_extents(a: TemporalExtent, b: TemporalExtent) -> TemporalExtent:
    """
    Merge two STAC temporal extents.

    The overall (first) intervals of both extents are combined into a single
    interval that covers both; ``None`` on either side means an open (unbounded)
    end and wins the comparison. Any additional sub-intervals from either side
    are carried over unchanged.

    :return: a new :py:class:`TemporalExtent` (inputs are not mutated)
    """
    overall_interval_a, *sub_intervals_a = a.intervals
    overall_interval_b, *sub_intervals_b = b.intervals

    def min_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]:
        # None = open-ended start: the merged start is open too.
        if t1 is None or t2 is None:
            return None

        return min(t1, t2)

    def max_time(t1: Optional[str], t2: Optional[str]) -> Optional[str]:
        # None = open-ended end: the merged end is open too.
        if t1 is None or t2 is None:
            return None

        return max(t1, t2)

    merged_overall_interval = [
        min_time(overall_interval_a[0], overall_interval_b[0]),
        max_time(overall_interval_a[1], overall_interval_b[1]),
    ]

    merged_temporal_extent = TemporalExtent([merged_overall_interval])
    # Bug fix: "intervals" must stay a flat list of intervals (overall first,
    # then sub-intervals); the previous append() nested the whole sub-interval
    # list as a single malformed element.
    merged_temporal_extent.intervals.extend(sub_intervals_a + sub_intervals_b)

    return merged_temporal_extent
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"reretry~=0.11.8",
"markdown>3.4",
"traceback-with-variables==2.0.4",
"pystac~=1.8.0",
],
extras_require={
"dev": tests_require,
Expand Down
Loading