Merge pull request #52 from Open-EO/18-generate-stac

18 generate stac

GriffinBabe authored Mar 7, 2024
2 parents 3dab8b4 + cff7e1f, commit d3fb0c6

Showing 7 changed files with 440 additions and 1,278 deletions.
1,373 changes: 107 additions & 1,266 deletions examples/extraction_pipelines/S2_extraction_example.ipynb

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions src/openeo_gfmap/backend.py
@@ -19,6 +19,7 @@ class Backend(Enum):
     TERRASCOPE = "terrascope"
     EODC = "eodc"  # Dask implementation. Do not test on this yet.
     CDSE = "cdse"  # Terrascope implementation (pyspark) #URL: openeo.dataspace.copernicus.eu (need to register)
+    CDSE_STAGING = "cdse-staging"
     LOCAL = "local"  # Based on the same components of EODc


@@ -86,6 +87,14 @@ def cdse_connection() -> openeo.Connection:
     )


+def cdse_staging_connection() -> openeo.Connection:
+    """Connects to the CDSE staging backend using OIDC authentication."""
+    return _create_connection(
+        url="openeo-staging.dataspace.copernicus.eu",
+        env_var_suffix="CDSE_STAGING",
+    )
+
+
 def eodc_connection() -> openeo.Connection:
     """Connects to the EODC backend using OIDC authentication."""
     return _create_connection(
@@ -97,4 +106,5 @@ def eodc_connection() -> openeo.Connection:

 BACKEND_CONNECTIONS: Dict[Backend, Callable] = {
     Backend.TERRASCOPE: vito_connection,
     Backend.CDSE: cdse_connection,
+    Backend.CDSE_STAGING: cdse_staging_connection,
 }
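
For context, a minimal sketch of how the extended dispatch table is meant to be used; Backend and BACKEND_CONNECTIONS come from this diff, while the surrounding script is illustrative only:

    from openeo_gfmap.backend import BACKEND_CONNECTIONS, Backend

    # Resolve the connection factory registered for the staging backend and
    # open an authenticated connection (OIDC, as in cdse_staging_connection).
    connect = BACKEND_CONNECTIONS[Backend.CDSE_STAGING]
    connection = connect()
    print(connection.root_url)  # expected to point at openeo-staging.dataspace.copernicus.eu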
6 changes: 6 additions & 0 deletions src/openeo_gfmap/fetching/s1.py
@@ -136,6 +136,12 @@ def s1_grd_default_processor(cube: openeo.DataCube, **params):
             get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"
         ),
     },
+    Backend.CDSE_STAGING: {
+        "default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
+        "preprocessor": partial(
+            get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"
+        ),
+    },
 }


6 changes: 6 additions & 0 deletions src/openeo_gfmap/fetching/s2.py
@@ -194,6 +194,12 @@ def s2_l2a_default_processor(cube: openeo.DataCube, **params):
             get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"
         ),
     },
+    Backend.CDSE_STAGING: {
+        "fetch": partial(get_s2_l2a_default_fetcher, collection_name="SENTINEL2_L2A"),
+        "preprocessor": partial(
+            get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"
+        ),
+    },
 }


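Both fetching registries rely on the same functools.partial pattern: the backend-specific collection name is bound once per registry entry, so a lookup yields a callable that only needs runtime arguments. A self-contained sketch of that mechanism, with a hypothetical stand-in for the real fetcher functions:

    from functools import partial


    # Hypothetical stand-in for get_s1_grd_default_fetcher / get_s2_l2a_default_fetcher.
    def fetch(collection_name: str, temporal_extent: list) -> None:
        print(f"Would fetch {collection_name} for {temporal_extent}")


    # As in the registries above, freeze the collection name for the backend entry.
    staging_fetch = partial(fetch, collection_name="SENTINEL2_L2A")

    # Callers later supply only the runtime parameters.
    staging_fetch(temporal_extent=["2024-01-01", "2024-02-01"])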
83 changes: 71 additions & 12 deletions src/openeo_gfmap/manager/job_manager.py
@@ -8,10 +8,13 @@
 from typing import Callable, Optional, Union

 import pandas as pd
+import pystac
 from openeo.extra.job_management import MultiBackendJobManager
 from openeo.rest.job import BatchJob
+from pystac import CatalogType

 from openeo_gfmap.manager import _log
+from openeo_gfmap.stac import constants


 class PostJobStatus(Enum):
@@ -28,15 +31,13 @@ def __init__(
         self,
         output_dir: Path,
         output_path_generator: Callable,
-        post_job_action: Optional[Callable],
+        post_job_action: Optional[Callable] = None,
         poll_sleep: int = 5,
         n_threads: int = 1,
         post_job_params: dict = {},
     ):
         self._output_dir = output_dir

-        self._downloaded_products = []
-
         # Setup the threads to work on the on_job_done and on_job_error methods
         self._finished_job_queue = queue.Queue()
         self._n_threads = n_threads
@@ -52,6 +53,13 @@ def __init__(
         MultiBackendJobManager._normalize_df = self._normalize_df
         super().__init__(poll_sleep)

+        # Generate the root STAC collection
+        self._root_collection = pystac.Collection(
+            id=constants.ID,
+            description=constants.DESCRIPTION,
+            extent=None,
+        )
+
     def _post_job_worker(self):
         """Checks which jobs are finished or failed and calls the `on_job_done` or `on_job_error`
         methods."""
@@ -149,7 +157,7 @@ def on_job_done(self, job: BatchJob, row: pd.Series):
         """Method called when a job finishes successfully. It will first download the results of
         the job and then call the `post_job_action` method.
         """
-        job_products = []
+        job_products = {}
         for idx, asset in enumerate(job.get_results().get_assets()):
             temp_file = NamedTemporaryFile(delete=False)
             try:
@@ -171,7 +179,7 @@ def on_job_done(self, job: BatchJob, row: pd.Series):
                 # Move the temporary file to the final location
                 shutil.move(temp_file.name, output_path)
                 # Add to the downloaded products
-                job_products.append(output_path)
+                job_products[f"{job.job_id}_{asset.name}"] = [output_path]
                 _log.info(
                     f"Downloaded asset {asset.name} from job {job.job_id} -> {output_path}"
                 )
@@ -183,16 +191,42 @@ def on_job_done(self, job: BatchJob, row: pd.Series):
             finally:
                 shutil.rmtree(temp_file.name, ignore_errors=True)

-        # Call the post job action
+        # First update the STAC collection with the assets directly resulting from the OpenEO batch job
+        job_metadata = pystac.Collection.from_dict(job.get_results().get_metadata())
+        job_items = []
+
+        for item_metadata in job_metadata.get_all_items():
+            try:
+                item = pystac.read_file(item_metadata.get_self_href())
+                asset_path = job_products[f"{job.job_id}_{item.id}"][0]
+
+                assert (
+                    len(item.assets.values()) == 1
+                ), "Each item should only contain one asset"
+                for asset in item.assets.values():
+                    asset.href = str(
+                        asset_path
+                    )  # Update the asset href to the output location set by the output_path_generator
+                item.id = f"{job.job_id}_{item.id}"
+                # Add the item to the current job items.
+                job_items.append(item)
+                _log.info(f"Parsed item {item.id} from job {job.job_id}")
+            except Exception as e:
+                _log.exception(
+                    f"Failed to add item {item.id} from job {job.job_id} to the STAC collection"
+                )
+                raise e
+
+        # _post_job_action returns an updated list of STAC items. The post job action can
+        # therefore update the STAC items and access their products through the HREF. It is
+        # also responsible for adding the appropriate metadata/assets to the items.
         if self._post_job_action is not None:
             _log.debug(f"Calling post job action for job {job.job_id}...")
-            job_products = self._post_job_action(
-                job_products, row, self._post_job_params
-            )
-
-        self._downloaded_products.extend(job_products)
+            job_items = self._post_job_action(job_items, row, self._post_job_params)

-        # TODO STAC metadata
+        self._root_collection.add_items(job_items)
+        _log.info(f"Added {len(job_items)} items to the STAC collection.")

         _log.info(f"Job {job.job_id} and post job action finished successfully.")
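
With this change, post_job_action receives and returns a list of STAC items rather than file paths. A hedged sketch of a compatible callback — only the (items, row, params) signature is implied by the call above; the body and the "sample_id" property are illustrative assumptions:

    from typing import List

    import pandas as pd
    import pystac


    def post_job_action(
        job_items: List[pystac.Item], row: pd.Series, params: dict
    ) -> List[pystac.Item]:
        """Illustrative callback: enrich each item, then hand the list back."""
        for item in job_items:
            # Asset hrefs already point at the local paths chosen by the
            # output_path_generator, so side-car files can be added next to them.
            item.properties["sample_id"] = params.get("sample_id", "unknown")  # assumed field
        return job_items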

@@ -263,3 +297,28 @@ def run_jobs(

         _log.info("Workers started, creating and running jobs.")
         super().run_jobs(df, start_job, output_file)

+    def create_stac(self, output_path: Optional[Union[str, Path]] = None):
+        """Method to be called after run_jobs to create a STAC catalog
+        and write it to self._output_dir.
+        """
+        if output_path is None:
+            output_path = self._output_dir / "stac"
+
+        self._root_collection.license = constants.LICENSE
+        self._root_collection.add_link(constants.LICENSE_LINK)
+        self._root_collection.stac_extensions = constants.STAC_EXTENSIONS
+
+        datacube_extension = pystac.extensions.datacube.DatacubeExtension.ext(
+            self._root_collection, add_if_missing=True
+        )
+        datacube_extension.apply(constants.CUBE_DIMENSIONS)
+
+        item_asset_extension = pystac.extensions.item_assets.ItemAssetsExtension.ext(
+            self._root_collection, add_if_missing=True
+        )
+        item_asset_extension.item_assets = constants.ITEM_ASSETS
+
+        self._root_collection.update_extent_from_items()
+        self._root_collection.normalize_hrefs(str(output_path))
+        self._root_collection.save(catalog_type=CatalogType.SELF_CONTAINED)
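
Putting the pieces together, a sketch of the intended calling sequence, assuming the manager class defined in this file is GFMAPJobManager; add_backend and run_jobs come from openeo's MultiBackendJobManager, while the dataframe, start_job helper and path generator below are placeholders:

    from pathlib import Path

    from openeo_gfmap.backend import cdse_staging_connection
    from openeo_gfmap.manager.job_manager import GFMAPJobManager

    manager = GFMAPJobManager(
        output_dir=Path("out"),
        output_path_generator=my_output_path_generator,  # hypothetical helper
        post_job_action=post_job_action,  # e.g. the callback sketched earlier
        n_threads=2,
    )
    manager.add_backend("cdse-staging", connection=cdse_staging_connection(), parallel_jobs=2)

    # run_jobs polls the backend; on_job_done collects items into the root collection.
    manager.run_jobs(job_df, start_job=my_start_job, output_file=Path("out/jobs.csv"))

    # Afterwards, write the aggregated STAC collection to out/stac.
    manager.create_stac()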
6 changes: 6 additions & 0 deletions src/openeo_gfmap/stac/__init__.py
@@ -0,0 +1,6 @@
+"""Definitions of the constants in the STAC collection
+"""
+
+from openeo_gfmap.stac.constants import AUXILIARY
+
+__all__ = ["AUXILIARY"]
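
job_manager.py also references constants.ID, DESCRIPTION, LICENSE, LICENSE_LINK, STAC_EXTENSIONS, CUBE_DIMENSIONS and ITEM_ASSETS, so openeo_gfmap/stac/constants.py presumably defines those alongside AUXILIARY. A hypothetical sketch of the module's shape — the names come from the code above, every value below is a placeholder:

    import pystac
    from pystac.extensions.item_assets import AssetDefinition

    ID = "..."  # collection id (placeholder)
    DESCRIPTION = "..."  # collection description (placeholder)
    LICENSE = "proprietary"  # placeholder
    LICENSE_LINK = pystac.Link(rel="license", target="https://example.com/license")  # placeholder
    STAC_EXTENSIONS = []  # schema URIs of the STAC extensions used (placeholder)
    CUBE_DIMENSIONS = {}  # name -> datacube dimension objects (placeholder)
    AUXILIARY = AssetDefinition({"title": "Auxiliary file", "roles": ["metadata"]})  # placeholder
    ITEM_ASSETS = {"auxiliary": AUXILIARY}  # placeholder mapping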