diff --git a/examples/extraction_pipelines/S2_extraction_example.ipynb b/examples/extraction_pipelines/S2_extraction_example.ipynb
new file mode 100644
index 0000000..35e05ab
--- /dev/null
+++ b/examples/extraction_pipelines/S2_extraction_example.ipynb
@@ -0,0 +1,908 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Example of a GFMAP full-extraction pipeline\n",
+ "\n",
+ "Designing a full-extraction pipeline using the openeo-gfmap GFMAPJobManager.\n",
+ "\n",
+ "The pipeline consits of the following elements:\n",
+ "* An input dataframe where each row corresponds to each executed job.\n",
+ "* An user function defined to create OpenEO BatchJob from input rows of the beforementioned dataframe.\n",
+ "* An user function defined to generate an output path of each of the Job products\n",
+ "* An user function executed after the assets are downloaded and saved from a finished job (optional). This **post job action** can do anything and will be executed locally inside the GFMAPJobManager.\n"
+ ]
+ },
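+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "As a minimal sketch (not meant to be run as-is), the elements above plug into the manager as follows; the function and variable names are the ones defined later in this notebook:\n",
+ "\n",
+ "```python\n",
+ "manager = GFMAPJobManager(\n",
+ " output_dir=base_output_dir, # root folder for the job products\n",
+ " output_path_generator=generate_output_path_s2, # fourth step\n",
+ " post_job_action=post_job_action, # optional, fifth step\n",
+ " # ... plus STAC-related parameters, see the last step\n",
+ ")\n",
+ "# job_df is the input dataframe, create_datacube_s2 the BatchJob creation function (third step)\n",
+ "manager.run_jobs(job_df, create_datacube_s2, tracking_job_csv)\n",
+ "```\n"
+ ]
+ },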
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Setting up the logging module "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Configuring the logging for the openeo_gfmap package\n",
+ "from openeo_gfmap.manager import _log\n",
+ "import logging\n",
+ "\n",
+ "_log.setLevel(logging.DEBUG)\n",
+ "\n",
+ "stream_handler = logging.StreamHandler()\n",
+ "_log.addHandler(stream_handler)\n",
+ "\n",
+ "formatter = logging.Formatter('%(asctime)s|%(name)s|%(levelname)s: %(message)s')\n",
+ "stream_handler.setFormatter(formatter)\n",
+ "\n",
+ "# Exclude the other loggers from other libraries\n",
+ "class MyLoggerFilter(logging.Filter):\n",
+ " def filter(self, record):\n",
+ " return record.name == _log.name\n",
+ "\n",
+ "stream_handler.addFilter(MyLoggerFilter())\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### First step: splitting the job\n",
+ "\n",
+ "We load the initial crop-type dataset that will be the base of our extractions.\n",
+ "\n",
+ "Splitting the dataset of extraction in multiple job based on position is necessary to respect OpenEO limitations.\n",
+ "\n",
+ "This script performs a split with the H3 hexagonal grid, yielding a list of sub-geodataframes.\n",
+ "\n",
+ "A subtility here is that some polygons are not directly extracted (field with `extract=False`), but should be kept for post-job actions. This requirement is filled by removing sub-dataframes that do not contain any extractable polyons."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/data/users/Private/couchard/openeo-gfmap/src/openeo_gfmap/manager/job_splitters.py:51: UserWarning: Geometry is in a geographic CRS. Results from 'centroid' are likely incorrect. Use 'GeoSeries.to_crs()' to re-project geometries to a projected CRS before this operation.\n",
+ "\n",
+ " polygons[\"h3index\"] = polygons.geometry.centroid.apply(\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "334 sub-datasets.\n",
+ "236 sub-datasets after filtering sub-datasets with no point to extract.\n"
+ ]
+ }
+ ],
+ "source": [
+ "from pathlib import Path\n",
+ "import geopandas as gpd\n",
+ "from openeo_gfmap.manager.job_splitters import split_job_hex\n",
+ "\n",
+ "base_df_path = \"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/DEMO_CROPTYPE.gpkg\"\n",
+ "base_df = gpd.read_file(base_df_path)\n",
+ "# Splits the job using GFMAP\n",
+ "split_jobs = split_job_hex(\n",
+ " base_df, max_points=60, grid_resolution=4\n",
+ ")\n",
+ "\n",
+ "print(f'{len(split_jobs)} sub-datasets.')\n",
+ "\n",
+ "# Remove the geometry where there are no points with the \"extract\" flag\n",
+ "split_jobs = [\n",
+ " job for job in split_jobs if job.extract.any()\n",
+ "]\n",
+ "print(f'{len(split_jobs)} sub-datasets after filtering sub-datasets with no point to extract.')"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Second step: creating a dataframe for the GFMAP Job Manager\n",
+ "\n",
+ "Implementing a function that yields a `pandas.DataFrame` where each row correponds to a job.\n",
+ "\n",
+ "The dataframe should contain the informations required by the GFMAP Job Manager, as well as additional information used by the datacube creation function and the post-job action function.\n",
+ "\n",
+ "The output dataframe should be savable as a .csv file.\n",
+ "\n",
+ "Note: the full information of a sub-geodataframe of polygons can be saved into a row of a `pandas.DataFrame` by storing it in a row as string implementing the `geojson.FeatureCollection` interface. To convert the `geopandas.GeoDataFrame` into a stirng, simply use the `.to_json()` function."
+ ]
+ },
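+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "For illustration, a small sketch of this round trip (serializing a sub-geodataframe into a dataframe cell and reading it back), using the same calls as the cells below:\n",
+ "\n",
+ "```python\n",
+ "import geojson\n",
+ "\n",
+ "serialized = split_jobs[0].to_json() # GeoDataFrame -> GeoJSON string\n",
+ "features = geojson.loads(serialized) # string -> geojson.FeatureCollection\n",
+ "restored = gpd.GeoDataFrame.from_features(features).set_crs(epsg=4326)\n",
+ "```\n"
+ ]
+ },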
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "
\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " | \n",
+ " backend_name | \n",
+ " out_prefix | \n",
+ " out_extension | \n",
+ " start_date | \n",
+ " end_date | \n",
+ " geometry | \n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " 0 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " 1 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " 2 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " 3 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " 4 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ " ... | \n",
+ "
\n",
+ " \n",
+ " 231 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " 232 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " 233 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " 234 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ " 235 | \n",
+ " cdse | \n",
+ " S2_10m | \n",
+ " .nc | \n",
+ " 2020-08-30 | \n",
+ " 2022-03-03 | \n",
+ " {\"type\": \"FeatureCollection\", \"features\": [{\"i... | \n",
+ "
\n",
+ " \n",
+ "
\n",
+ "
236 rows × 6 columns
\n",
+ "
"
+ ],
+ "text/plain": [
+ " backend_name out_prefix out_extension start_date end_date \\\n",
+ "0 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "1 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "2 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "3 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "4 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ ".. ... ... ... ... ... \n",
+ "231 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "232 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "233 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "234 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "235 cdse S2_10m .nc 2020-08-30 2022-03-03 \n",
+ "\n",
+ " geometry \n",
+ "0 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "1 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "2 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "3 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "4 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ ".. ... \n",
+ "231 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "232 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "233 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "234 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "235 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n",
+ "\n",
+ "[236 rows x 6 columns]"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "from openeo_gfmap import Backend\n",
+ "from typing import List\n",
+ "import pandas as pd\n",
+ "\n",
+ "def create_job_dataframe_s2(backend: Backend, split_jobs: List[gpd.GeoDataFrame]) -> pd.DataFrame:\n",
+ " \"\"\"Create a dataframe from the split jobs, containg all the necessary information to run the job.\"\"\"\n",
+ " columns = ['backend_name', 'out_prefix', 'out_extension', 'start_date', 'end_date', 'geometry']\n",
+ " rows = []\n",
+ " for job in split_jobs:\n",
+ " # Compute the average in the valid date and make a buffer of 1.5 year around\n",
+ " median_time = pd.to_datetime(job.valid_date).mean()\n",
+ " start_date = median_time - pd.Timedelta(days=275) # A bit more than 9 months\n",
+ " end_date = median_time + pd.Timedelta(days=275) # A bit more than 9 months\n",
+ " \n",
+ " rows.append(\n",
+ " pd.Series(\n",
+ " dict(zip(columns, [backend.value, 'S2_10m', '.nc', start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'), job.to_json()]))\n",
+ " )\n",
+ " )\n",
+ "\n",
+ " return pd.DataFrame(rows)\n",
+ "\n",
+ "job_df = create_job_dataframe_s2(Backend.CDSE, split_jobs)\n",
+ "\n",
+ "job_df"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Sub-sampling job dataframe to reduce execution time"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Run a subset of the jobs to test the manager, the selected jobs have a fair amount of geometries to extract\n",
+ "job_df = job_df.iloc[[0, 2, 3, -6]].reset_index(drop=True)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "array([3])"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import geojson\n",
+ "\n",
+ "def get_job_nb_polygons(row: pd.Series) -> int:\n",
+ " \"\"\"Get the number of polygons in the geometry.\"\"\"\n",
+ " return len(list(filter(lambda feat: feat.properties.get(\"extract\"), geojson.loads(row.geometry)['features'])))\n",
+ "\n",
+ "job_df['nb_polygons'] = job_df.apply(get_job_nb_polygons, axis=1)\n",
+ "job_df.nb_polygons.values"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Third step: implement the datacube creator function.\n",
+ "\n",
+ "Implement a function to create, from the additional rows provided before, an `openeo.BatchJob` that will be used to run the job.\n",
+ "\n",
+ "In this case we extract Sentinel-2 data around a 64x64 pixel square of polygons which have the field `extract=True` (although we keep them in the row for the post-job action.)\n",
+ "\n",
+ "Note:\n",
+ "Because the polygons to extract are specified in UTM dimensions (required to have a specific size), the dataset of polygon cannot be send directly through the openeo process graph (GeoJSON only support lat/lon coordinates). The sub-datasets of polygons are therefore uploaded to a publicly accessible URL so they can be used later by openeo during the execution of the job."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import openeo\n",
+ "\n",
+ "import requests\n",
+ "from tempfile import NamedTemporaryFile\n",
+ "import os\n",
+ "import pandas as pd\n",
+ "import geojson\n",
+ "from shapely.geometry import Point\n",
+ "\n",
+ "from openeo_gfmap import TemporalContext, Backend, BackendContext, FetchType, SpatialContext\n",
+ "from openeo_gfmap.fetching import build_sentinel2_l2a_extractor\n",
+ "\n",
+ "def upload_geoparquet_artifactory(gdf: gpd.GeoDataFrame, row_id: int) -> str:\n",
+ " # Save the dataframe as geoparquet to upload it to artifactory\n",
+ " temporary_file = NamedTemporaryFile()\n",
+ " gdf.to_parquet(temporary_file.name)\n",
+ " \n",
+ " artifactory_username = os.getenv('ARTIFACTORY_USERNAME')\n",
+ " artifactory_password = os.getenv('ARTIFACTORY_PASSWORD')\n",
+ "\n",
+ " headers = {\n",
+ " \"Content-Type\": \"application/octet-stream\"\n",
+ " }\n",
+ "\n",
+ " upload_url = f\"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap-temp/openeogfmap_dataframe_{row_id}.parquet\"\n",
+ "\n",
+ " with open(temporary_file.name, 'rb') as f:\n",
+ " response = requests.put(upload_url, headers=headers, data=f, auth=(artifactory_username, artifactory_password))\n",
+ "\n",
+ " assert response.status_code == 201, f\"Error uploading the dataframe to artifactory: {response.text}\"\n",
+ "\n",
+ " return upload_url\n",
+ "\n",
+ "\n",
+ "def create_datacube_s2(row: pd.Series, connection: openeo.DataCube, provider=None, connection_provider=None) -> openeo.BatchJob:\n",
+ "\n",
+ " def buffer_geometry(geometry: geojson.FeatureCollection, buffer: int) -> gpd.GeoDataFrame:\n",
+ " gdf = gpd.GeoDataFrame.from_features(geometry).set_crs(epsg=4326)\n",
+ " utm = gdf.estimate_utm_crs()\n",
+ " gdf = gdf.to_crs(utm)\n",
+ "\n",
+ " gdf['geometry'] = gdf.centroid.apply(\n",
+ " # Clips the point to the closest 20m from the S2 grid\n",
+ " lambda point: Point(round(point.x / 20.0) * 20.0, round(point.y / 20.0) * 20.0)\n",
+ " ).buffer(distance=buffer, cap_style=3)\n",
+ "\n",
+ " return gdf\n",
+ "\n",
+ " def filter_extractonly_geometries(collection: geojson.FeatureCollection):\n",
+ " # Filter out geometries that do not have the field extract=True\n",
+ " features = [f for f in collection.features if f.properties.get('extract', False)]\n",
+ " return geojson.FeatureCollection(features)\n",
+ "\n",
+ " start_date = row.start_date\n",
+ " end_date = row.end_date\n",
+ " temporal_context = TemporalContext(start_date, end_date)\n",
+ "\n",
+ " # Get the feature collection containing the geometry to the job\n",
+ " geometry = geojson.loads(row.geometry)\n",
+ " assert isinstance(geometry, geojson.FeatureCollection)\n",
+ "\n",
+ " # Filter the geometry to the rows with the extract only flag\n",
+ " geometry = filter_extractonly_geometries(geometry)\n",
+ " assert len(geometry.features) > 0, \"No geometries with the extract flag found\"\n",
+ "\n",
+ " # Performs a buffer of 64 px around the geometry\n",
+ " geometry_df = buffer_geometry(geometry, 320)\n",
+ " spatial_extent_url = upload_geoparquet_artifactory(geometry_df, row.name)\n",
+ "\n",
+ " # Backend name and fetching type\n",
+ " backend = Backend(row.backend_name)\n",
+ " backend_context = BackendContext(backend)\n",
+ "\n",
+ " fetch_type = FetchType.POLYGON\n",
+ " bands_to_download = ['S2-L2A-B01', 'S2-L2A-B02', 'S2-L2A-B03', 'S2-L2A-B04', 'S2-L2A-B05', 'S2-L2A-B06', 'S2-L2A-B07', 'S2-L2A-B08', 'S2-L2A-B8A', 'S2-L2A-B09', 'S2-L2A-B11', 'S2-L2A-B12', 'S2-L2A-SCL']\n",
+ "\n",
+ " # Create the job to extract S2\n",
+ " extraction_parameters = {\n",
+ " \"target_resolution\": 10,\n",
+ " \"load_collection\": {\n",
+ " \"eo:cloud_cover\": lambda val: val <= 95.0,\n",
+ " },\n",
+ " }\n",
+ " extractor = build_sentinel2_l2a_extractor(\n",
+ " backend_context, bands=bands_to_download, fetch_type=fetch_type.POLYGON, **extraction_parameters \n",
+ " )\n",
+ "\n",
+ " cube = extractor.get_cube(connection, spatial_extent_url, temporal_context)\n",
+ "\n",
+ " # Compute the SCL dilation and add it to the cube\n",
+ " scl_dilated_mask = cube.process(\n",
+ " \"to_scl_dilation_mask\",\n",
+ " data=cube,\n",
+ " scl_band_name=\"S2-L2A-SCL\",\n",
+ " kernel1_size=17, # 17px dilation on a 20m layer\n",
+ " kernel2_size=77, # 77px dilation on a 20m layer\n",
+ " mask1_values=[2, 4, 5, 6, 7],\n",
+ " mask2_values=[3, 8, 9, 10, 11],\n",
+ " erosion_kernel_size=3\n",
+ " ).rename_labels(\"bands\", [\"S2-L2A-SCL_DILATED_MASK\"])\n",
+ "\n",
+ " cube = cube.merge_cubes(scl_dilated_mask)\n",
+ " cube = cube.linear_scale_range(0, 65534, 0, 65534)\n",
+ "\n",
+ " # Get the h3index to use in the tile\n",
+ " h3index = geometry.features[0].properties['h3index']\n",
+ " valid_date = geometry.features[0].properties['valid_date']\n",
+ "\n",
+ " # Increase the memory of the jobs depending on the number of polygons to extract\n",
+ " number_polygons = get_job_nb_polygons(row)\n",
+ " _log.debug(f\"Number of polygons to extract: {number_polygons}\")\n",
+ "\n",
+ " job_options = {\n",
+ " \"executor-memory\": \"5G\",\n",
+ " \"executor-memoryOverhead\": \"2G\",\n",
+ " }\n",
+ "\n",
+ " return cube.create_job(\n",
+ " out_format=\"NetCDF\",\n",
+ " title=f\"GFMAP_Extraction_S2_{h3index}_{valid_date}\",\n",
+ " sample_by_feature=True,\n",
+ " job_options=job_options\n",
+ " )\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Fourth step: create output paths\n",
+ "\n",
+ "Implement a function that from a temporary path containing a job result, from the job dataframe row and the root folder will choose the output path where to save that job result."
+ ]
+ },
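+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "For this notebook the generated paths follow the layout below (a sketch derived from the implementation two cells further; products that cannot be processed fall back to an `unsortable` folder):\n",
+ "\n",
+ "```text\n",
+ "<root_folder>/<ref_id>/<epsg>/<s2_tile_id>/<sample_id>/<out_prefix>_<sample_id>_<epsg>_<start_date>_<end_date><out_extension>\n",
+ "```\n"
+ ]
+ },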
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "CPU times: user 4.62 s, sys: 85.2 ms, total: 4.71 s\n",
+ "Wall time: 5.03 s\n"
+ ]
+ }
+ ],
+ "source": [
+ "%%time\n",
+ "\n",
+ "# Load the S2 grid\n",
+ "s2_grid = gpd.read_file(\"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/s2grid_bounds.gpkg\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from pathlib import Path\n",
+ "import xarray as xr\n",
+ "from pyproj import Transformer, CRS\n",
+ "from shapely.geometry import box, Point\n",
+ "\n",
+ "def generate_output_path_s2(root_folder: Path, tmp_path: Path, geometry_index: int, row: pd.Series):\n",
+ " features = geojson.loads(row.geometry)\n",
+ " sample_id = features[geometry_index].properties['sample_id']\n",
+ " ref_id = features[geometry_index].properties['ref_id']\n",
+ " \n",
+ " # Loads the array lazily in-memory\n",
+ " try:\n",
+ " inds = xr.open_dataset(tmp_path, chunks='auto')\n",
+ " \n",
+ " source_crs = CRS.from_wkt(inds.crs.attrs['crs_wkt'])\n",
+ " dst_crs = CRS.from_epsg(4326)\n",
+ " \n",
+ " transformer = Transformer.from_crs(source_crs, dst_crs, always_xy=True)\n",
+ "\n",
+ " # Get the center point of the tile\n",
+ " centroid_utm = box(\n",
+ " inds.x.min().item(), inds.y.min().item(), inds.x.max().item(), inds.y.max().item()\n",
+ " ).centroid\n",
+ " centroid_latlon = Point(*transformer.transform(centroid_utm.x, centroid_utm.y))\n",
+ "\n",
+ " # Intersecting with the s2 grid\n",
+ " intersecting = s2_grid.geometry.intersects(centroid_latlon)\n",
+ "\n",
+ " # Select the intersecting cell that has a centroid the closest from the point\n",
+ " intersecting_cells = s2_grid[intersecting]\n",
+ " intersecting_cells['distance'] = intersecting_cells.distance(centroid_latlon)\n",
+ " intersecting_cells.sort_values('distance', inplace=True)\n",
+ " s2_tile = intersecting_cells.iloc[0]\n",
+ "\n",
+ " s2_tile_id = s2_tile.tile\n",
+ "\n",
+ " subfolder = root_folder / ref_id / str(source_crs.to_epsg()) / s2_tile_id / sample_id\n",
+ " except Exception:\n",
+ " subfolder = root_folder / 'unsortable'\n",
+ "\n",
+ " return subfolder / f'{row.out_prefix}_{sample_id}_{source_crs.to_epsg()}_{row.start_date}_{row.end_date}{row.out_extension}'\n",
+ " "
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Fifth step: Define the post-job action\n",
+ "\n",
+ "The post-job action will be called once the job resut was downloaded and saved to a specific path.\n",
+ "\n",
+ "A post-job action function must receive 3 parameters:\n",
+ "* `job_items`: STAC items containing the currently extracted data for the job.\n",
+ "* `row`: The current job dataframe row.\n",
+ "* `parameters`: User-defined parameters set in the `GFMAPJobManager` constructor.\n",
+ "\n",
+ "The post-job action must return back the list of job items. The user is responsible for updating,\n",
+ "adding and removing the items. For example, in this case the user creates a raster file with\n",
+ "ground truth data, the user then adds an asset using the predefined `openeo_gfmap.stac.AUXILIARY`\n",
+ "AssetDefintion to the related item, pointing to the generated NetCDF file."
+ ]
+ },
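+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "A minimal skeleton of a post-job action respecting this contract could look as follows (a sketch only; the auxiliary file path is illustrative, the real implementation is in the next cell):\n",
+ "\n",
+ "```python\n",
+ "def minimal_post_job_action(job_items: List[pystac.Item], row: pd.Series, parameters: dict = {}) -> list:\n",
+ " for item in job_items:\n",
+ " item_asset_path = Path(list(item.assets.values())[0].href)\n",
+ " # ... create an auxiliary product next to the downloaded asset ...\n",
+ " aux_path = item_asset_path.parent / 'auxiliary.nc' # illustrative path\n",
+ " item.add_asset('auxiliary', AUXILIARY.create_asset(href=aux_path.as_posix()))\n",
+ " return job_items\n",
+ "```\n"
+ ]
+ },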
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ ],
+ "source": [
+ "from importlib.metadata import version\n",
+ "from datetime import datetime\n",
+ "\n",
+ "from openeo_gfmap.stac import AUXILIARY\n",
+ "from rasterio.features import rasterize\n",
+ "from rasterio.transform import from_bounds\n",
+ "import pystac\n",
+ "import json\n",
+ "\n",
+ "def add_item_asset(related_item: pystac.Item, path: Path):\n",
+ " asset = AUXILIARY.create_asset(\n",
+ " href=path.as_posix()\n",
+ " )\n",
+ " related_item.add_asset('auxiliary', asset)\n",
+ "\n",
+ "def post_job_action(job_items: List[pystac.Item], row: pd.Series, parameters: dict = {}) -> list:\n",
+ " base_gpd = gpd.GeoDataFrame.from_features(json.loads(row.geometry)).set_crs(epsg=4326)\n",
+ " assert (\n",
+ " len(base_gpd[base_gpd.extract == True]) == len(job_items),\n",
+ " \"The number of result paths should be the same as the number of geometries\"\n",
+ " )\n",
+ " extracted_gpd = base_gpd[base_gpd.extract == True].reset_index(drop=True)\n",
+ " # In this case we want to burn the metadata in a new file in the same folder as the S2 product\n",
+ " for idx, item in enumerate(job_items):\n",
+ " sample_id = extracted_gpd.iloc[idx].sample_id\n",
+ " ref_id = extracted_gpd.iloc[idx].ref_id\n",
+ " confidence = extracted_gpd.iloc[idx].confidence\n",
+ " valid_date = extracted_gpd.iloc[idx].valid_date\n",
+ "\n",
+ " item_asset_path = Path(\n",
+ " list(item.assets.values())[0].href\n",
+ " )\n",
+ " # Read information from the item file (could also read it from the item object metadata)\n",
+ " result_ds = xr.open_dataset(item_asset_path, chunks='auto')\n",
+ "\n",
+ " # Add some metadata to the result_df netcdf file\n",
+ " result_ds.attrs.update({\n",
+ " 'start_date': row.start_date,\n",
+ " 'end_date': row.end_date,\n",
+ " 'valid_date': valid_date,\n",
+ " 'GFMAP_version': version('openeo_gfmap'),\n",
+ " 'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),\n",
+ " 'description': f'Sentinel2 L2A observations for sample: {sample_id}, unprocessed.',\n",
+ " 'title': f'Sentinel2 L2A - {sample_id}',\n",
+ " 'sample_id': sample_id,\n",
+ " 'spatial_resolution': '10m'\n",
+ " })\n",
+ " result_ds.to_netcdf(item_asset_path, format='NETCDF4', engine='h5netcdf')\n",
+ "\n",
+ " target_crs = CRS.from_wkt(result_ds.crs.attrs['crs_wkt'])\n",
+ "\n",
+ " # Get the surrounding polygons around our extracted center geometry to rastetize them\n",
+ " bounds = (\n",
+ " result_ds.x.min().item(),\n",
+ " result_ds.y.min().item(),\n",
+ " result_ds.x.max().item(),\n",
+ " result_ds.y.max().item()\n",
+ " )\n",
+ " bbox = box(*bounds)\n",
+ " surround_gpd = base_gpd.to_crs(target_crs).clip(bbox)\n",
+ "\n",
+ " # Burn the polygon croptypes\n",
+ " transform = from_bounds(*bounds, result_ds.x.size, result_ds.y.size)\n",
+ " croptype_shapes = list(zip(surround_gpd.geometry, surround_gpd.croptype_label))\n",
+ "\n",
+ " fill_value = 0\n",
+ " croptype = rasterize(\n",
+ " croptype_shapes,\n",
+ " out_shape=(result_ds.y.size, result_ds.x.size),\n",
+ " transform=transform,\n",
+ " all_touched=False,\n",
+ " fill=fill_value,\n",
+ " default_value=0,\n",
+ " dtype='int64'\n",
+ " )\n",
+ "\n",
+ " # Create the attributes to add to the metadata\n",
+ " attributes = {\n",
+ " 'ref_id': ref_id,\n",
+ " 'sample_id': sample_id,\n",
+ " 'confidence': str(confidence),\n",
+ " 'valid_date': valid_date,\n",
+ " '_FillValue': fill_value,\n",
+ " 'Conventions': 'CF-1.9',\n",
+ " 'GFMAP_version': version('openeo_gfmap'),\n",
+ " 'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),\n",
+ " 'description': f'Contains rasterized WorldCereal labels for sample: {sample_id}.',\n",
+ " 'title': f'WORLDCEREAL Auxiliary file for sample: {sample_id}',\n",
+ " 'spatial_resolution': '10m'\n",
+ " }\n",
+ "\n",
+ " aux_dataset = xr.Dataset(\n",
+ " {'LABEL': (('y', 'x'), croptype), 'crs': result_ds['crs']},\n",
+ " coords={'y': result_ds.y, 'x': result_ds.x},\n",
+ " attrs=attributes\n",
+ " )\n",
+ " # Required to map the 'crs' layer as geo-reference for the 'LABEL' layer.\n",
+ " aux_dataset['LABEL'].attrs['grid_mapping'] = 'crs'\n",
+ "\n",
+ " # Save the metadata in the same folder as the S2 product\n",
+ " metadata_path = (\n",
+ " item_asset_path.parent / \n",
+ " f'WORLDCEREAL_10m_{sample_id}_{target_crs.to_epsg()}_{valid_date}.nc'\n",
+ " )\n",
+ " aux_dataset.to_netcdf(metadata_path, format='NETCDF4', engine='h5netcdf')\n",
+ " aux_dataset.close()\n",
+ "\n",
+ " # Adds this metadata as a new asset\n",
+ " add_item_asset(item, metadata_path)\n",
+ " \n",
+ " return job_items\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "### Sixth and last step: Running the manager\n",
+ "\n",
+ "Let's initialize and execute the Job Manager as defined the GFMAP, and then run it using the functions defined previously\n",
+ "\n",
+ "STAC related parameters such as `collection_id` and `collection_description` are also required."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from openeo_gfmap.manager.job_manager import GFMAPJobManager\n",
+ "from openeo_gfmap.backend import cdse_staging_connection\n",
+ "\n",
+ "\n",
+ "base_output_dir = Path('/data/users/Public/couchard/world_cereal/extractions_5/')\n",
+ "tracking_job_csv = base_output_dir / 'job_tracker.csv'\n",
+ "\n",
+ "manager = GFMAPJobManager(\n",
+ " output_dir=base_output_dir,\n",
+ " output_path_generator=generate_output_path_s2,\n",
+ " collection_id='SENTINEL-EXTRACTION',\n",
+ " collection_description=(\n",
+ " \"Sentinel-2 and Auxiliary data extraction example.\"\n",
+ " ),\n",
+ " post_job_action=post_job_action,\n",
+ " poll_sleep=60,\n",
+ " n_threads=2,\n",
+ " post_job_params={}\n",
+ ")\n",
+ "\n",
+ "manager.add_backend(\n",
+ " Backend.CDSE_STAGING.value, cdse_staging_connection, parallel_jobs=6\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-03-07 14:32:38,392|openeo_gfmap.manager|INFO: Starting job manager using 2 worker threads.\n",
+ "2024-03-07 14:32:38,395|openeo_gfmap.manager|INFO: Workers started, creating and running jobs.\n",
+ "2024-03-07 14:32:38,592|openeo_gfmap.manager|DEBUG: Normalizing dataframe. Columns: Index(['backend_name', 'out_prefix', 'out_extension', 'start_date', 'end_date',\n",
+ " 'geometry', 'nb_polygons', 'status', 'id', 'start_time', 'cpu',\n",
+ " 'memory', 'duration', 'description', 'costs'],\n",
+ " dtype='object')\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Authenticated using refresh token.\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-03-07 14:32:40,375|openeo_gfmap.manager|DEBUG: Number of polygons to extract: 3\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "DataCube()\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-03-07 14:34:05,989|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:35:06,713|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:36:07,149|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:37:08,393|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:38:08,991|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:39:09,497|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:40:11,296|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:41:12,353|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:42:13,100|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:43:13,535|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:44:14,023|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:45:14,444|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:46:14,884|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:47:15,338|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:48:15,855|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:49:18,136|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:50:19,075|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:51:19,412|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:52:19,838|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:53:21,069|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:54:21,621|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:55:22,007|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:56:22,363|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n",
+ "2024-03-07 14:57:23,108|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is finished (on backend cdse-staging).\n",
+ "2024-03-07 14:57:23,110|openeo_gfmap.manager|INFO: Job j-240307f2186344ee9bed9c649679dc92 finished successfully, queueing on_job_done...\n",
+ "2024-03-07 14:57:23,115|openeo_gfmap.manager|DEBUG: Worker thread Thread-5: polled finished job with status PostJobStatus.FINISHED.\n",
+ "2024-03-07 14:57:24,650|openeo_gfmap.manager|DEBUG: Downloading asset openEO.nc from job j-240307f2186344ee9bed9c649679dc92 -> /tmp/tmpyu7z9urd\n",
+ "2024-03-07 14:57:43,859|openeo_gfmap.manager|DEBUG: Generating output path for asset openEO.nc from job j-240307f2186344ee9bed9c649679dc92...\n",
+ "/tmp/ipykernel_4054/3116825532.py:31: UserWarning: Geometry is in a geographic CRS. Results from 'distance' are likely incorrect. Use 'GeoSeries.to_crs()' to re-project geometries to a projected CRS before this operation.\n",
+ "\n",
+ " intersecting_cells['distance'] = intersecting_cells.distance(centroid_latlon)\n",
+ "/home/couchard/miniconda3/envs/gfmap/lib/python3.9/site-packages/geopandas/geodataframe.py:1543: SettingWithCopyWarning: \n",
+ "A value is trying to be set on a copy of a slice from a DataFrame.\n",
+ "Try using .loc[row_indexer,col_indexer] = value instead\n",
+ "\n",
+ "See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy\n",
+ " super().__setitem__(key, value)\n",
+ "/tmp/ipykernel_4054/3116825532.py:32: SettingWithCopyWarning: \n",
+ "A value is trying to be set on a copy of a slice from a DataFrame\n",
+ "\n",
+ "See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy\n",
+ " intersecting_cells.sort_values('distance', inplace=True)\n",
+ "2024-03-07 14:57:44,390|openeo_gfmap.manager|DEBUG: Generated path for asset openEO.nc from job j-240307f2186344ee9bed9c649679dc92 -> /data/users/Public/couchard/world_cereal/extractions_5/2021_EUR_DEMO_POLY_110/32635/35VMD/2021_LV_LPIS_POLY_110-12880341/S2_10m_2021_LV_LPIS_POLY_110-12880341_32635_2020-08-30_2022-03-03.nc\n",
+ "2024-03-07 14:57:48,864|openeo_gfmap.manager|INFO: Downloaded asset openEO.nc from job j-240307f2186344ee9bed9c649679dc92 -> /data/users/Public/couchard/world_cereal/extractions_5/2021_EUR_DEMO_POLY_110/32635/35VMD/2021_LV_LPIS_POLY_110-12880341/S2_10m_2021_LV_LPIS_POLY_110-12880341_32635_2020-08-30_2022-03-03.nc\n",
+ "2024-03-07 14:57:54,824|openeo_gfmap.manager|INFO: Parsed item j-240307f2186344ee9bed9c649679dc92_openEO.nc from job j-240307f2186344ee9bed9c649679dc92\n",
+ "2024-03-07 14:57:54,826|openeo_gfmap.manager|DEBUG: Calling post job action for job j-240307f2186344ee9bed9c649679dc92...\n",
+ "2024-03-07 14:57:55,644|openeo_gfmap.manager|INFO: Added 1 items to the STAC collection.\n",
+ "2024-03-07 14:57:55,646|openeo_gfmap.manager|INFO: Job j-240307f2186344ee9bed9c649679dc92 and post job action finished successfully.\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Run the jobs and create the STAC catalogue\n",
+ "manager.run_jobs(job_df, create_datacube_s2, tracking_job_csv)\n",
+ "manager.create_stac() # By default the STAC catalogue will be saved in the stac subfolder of the base_output_dir folder."
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "gfmap",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.9.18"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/examples/feature_extractors/quantile_feature_extractor.py b/examples/feature_extractors/quantile_feature_extractor.py
index aafd8b1..8e2f9e3 100644
--- a/examples/feature_extractors/quantile_feature_extractor.py
+++ b/examples/feature_extractors/quantile_feature_extractor.py
@@ -42,13 +42,13 @@ def output_labels(self) -> list:
def execute(self, inarr: xr.DataArray) -> xr.DataArray:
# compute indices
- b03 = inarr.sel(bands="S2-B03")
- b04 = inarr.sel(bands="S2-B04")
- b05 = inarr.sel(bands="S2-B05")
- b06 = inarr.sel(bands="S2-B06")
- b08 = inarr.sel(bands="S2-B08")
- b11 = inarr.sel(bands="S2-B11")
- b12 = inarr.sel(bands="S2-B12")
+ b03 = inarr.sel(bands="S2-L2A-B03")
+ b04 = inarr.sel(bands="S2-L2A-B04")
+ b05 = inarr.sel(bands="S2-L2A-B05")
+ b06 = inarr.sel(bands="S2-L2A-B06")
+ b08 = inarr.sel(bands="S2-L2A-B08")
+ b11 = inarr.sel(bands="S2-L2A-B11")
+ b12 = inarr.sel(bands="S2-L2A-B12")
ndvi = (b08 - b04) / (b08 + b04)
ndwi = (b03 - b08) / (b03 + b08)
@@ -100,14 +100,14 @@ def execute(self, inarr: xr.DataArray) -> xr.DataArray:
# The bands that you can extract are defined in the code openeo_gfmap.fetching.s2.BASE_SENTINEL2_L2A_MAPPING
bands = [
- "S2-B03",
- "S2-B04",
- "S2-B05",
- "S2-B06",
- "S2-B08",
- "S2-B11",
- "S2-B12",
- "S2-SCL",
+ "S2-L2A-B03",
+ "S2-L2A-B04",
+ "S2-L2A-B05",
+ "S2-L2A-B06",
+ "S2-L2A-B08",
+ "S2-L2A-B11",
+ "S2-L2A-B12",
+ "S2-L2A-SCL",
]
# Use the base feching
diff --git a/pyproject.toml b/pyproject.toml
index 98b6556..bd7c450 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -34,7 +34,8 @@ dependencies = [
"openeo[localprocessing]",
"cftime",
"pyarrow",
- "fastparquet"
+ "fastparquet",
+ "h3",
]
[project.urls]
diff --git a/src/openeo_gfmap/backend.py b/src/openeo_gfmap/backend.py
index ba8cdb8..b932ea8 100644
--- a/src/openeo_gfmap/backend.py
+++ b/src/openeo_gfmap/backend.py
@@ -19,6 +19,7 @@ class Backend(Enum):
TERRASCOPE = "terrascope"
EODC = "eodc" # Dask implementation. Do not test on this yet.
CDSE = "cdse" # Terrascope implementation (pyspark) #URL: openeo.dataspace.copernicus.eu (need to register)
+ CDSE_STAGING = "cdse-staging"
LOCAL = "local" # Based on the same components of EODc
@@ -82,6 +83,14 @@ def cdse_connection() -> openeo.Connection:
)
+def cdse_staging_connection() -> openeo.Connection:
+ """Performs a connection to the CDSE backend using oidc authentication."""
+ return _create_connection(
+ url="openeo-staging.dataspace.copernicus.eu",
+ env_var_suffix="CDSE_STAGING",
+ )
+
+
def eodc_connection() -> openeo.Connection:
"""Perfroms a connection to the EODC backend using the oidc authentication."""
return _create_connection(
@@ -93,4 +102,5 @@ def eodc_connection() -> openeo.Connection:
BACKEND_CONNECTIONS: Dict[Backend, Callable] = {
Backend.TERRASCOPE: vito_connection,
Backend.CDSE: cdse_connection,
+ Backend.CDSE_STAGING: cdse_staging_connection,
}
diff --git a/src/openeo_gfmap/extractions/__init__.py b/src/openeo_gfmap/extractions/__init__.py
deleted file mode 100644
index 543009c..0000000
--- a/src/openeo_gfmap/extractions/__init__.py
+++ /dev/null
@@ -1,11 +0,0 @@
-"""Extraction sub-module.
-
-Logic behind the extraction of training or inference data. Different backends
-are supported in order to obtain a very similar result at the end of this
-component.
-"""
-
-from .extraction import CollectionExtractor
-from .s2 import build_sentinel2_l2a_extractor
-
-__all__ = ["build_sentinel2_l2a_extractor", "CollectionExtractor"]
diff --git a/src/openeo_gfmap/extractions/commons.py b/src/openeo_gfmap/extractions/commons.py
deleted file mode 100644
index d43d523..0000000
--- a/src/openeo_gfmap/extractions/commons.py
+++ /dev/null
@@ -1,41 +0,0 @@
-""" Common operations within collection extraction logic, such as reprojection.
-"""
-from typing import Optional, Union
-
-import openeo
-from rasterio import CRS
-from rasterio.errors import CRSError
-
-
-def resample_reproject(
- datacube: openeo.DataCube, resolution: float, epsg_code: Optional[Union[str, int]]
-) -> openeo.DataCube:
- """Reprojects the given datacube to the target epsg code, if the provided
- epsg code is not None. Also performs checks on the give code to check
- its validity.
- """
- if epsg_code is not None:
- # Checks that the code is valid
- try:
- CRS.from_epsg(int(epsg_code))
- except (CRSError, ValueError) as exc:
- raise ValueError(
- f"Specified target_crs: {epsg_code} is not a valid " "EPSG code."
- ) from exc
- return datacube.resample_spatial(resolution=resolution, projection=epsg_code)
- return datacube.resample_spatial(resolution=resolution)
-
-
-def rename_bands(datacube: openeo.DataCube, mapping: dict) -> openeo.DataCube:
- """Rename the bands from the given mapping scheme"""
- # Filter out bands that are not part of the datacube
- print(datacube.dimension_labels("bands"))
-
- def filter_condition(band_name, _):
- return band_name in datacube.dimension_labels("bands")
-
- mapping = {k: v for k, v in mapping.items() if filter_condition(k, v)}
-
- return datacube.rename_labels(
- dimension="bands", target=list(mapping.values()), source=list(mapping.keys())
- )
diff --git a/src/openeo_gfmap/extractions/extraction.py b/src/openeo_gfmap/extractions/extraction.py
deleted file mode 100644
index 902dc57..0000000
--- a/src/openeo_gfmap/extractions/extraction.py
+++ /dev/null
@@ -1,78 +0,0 @@
-""" Main file for extractions and pre-processing of data through OpenEO
-"""
-
-from typing import Callable
-
-import openeo
-
-from openeo_gfmap import BackendContext
-from openeo_gfmap.spatial import SpatialContext
-from openeo_gfmap.temporal import TemporalContext
-
-
-class CollectionExtractor:
- """Base class to extract a particolar collection.
-
- Parameters
- ----------
- backend_context: BackendContext
- Information about the backend in use, useful in certain cases.
- bands: list
- List of band names to load from that collection.
- collection_fetch: Callable
- Function defining how to fetch a collection for a specific backend,
- the function accepts the following parameters: connection,
- spatial extent, temporal extent, bands and additional parameters.
- collection_preprocessing: Callable
- Function defining how to harmonize the data of a collection in a
- backend. For example, this function could rename the bands as they
- can be different for every backend/collection (SENTINEL2_L2A or
- SENTINEL2_L2A_SENTINELHUB). Accepts the following parameters:
- datacube (of pre-fetched collection) and additional parameters.
- colection_params: dict
- Additional parameters encoded within a dictionnary that will be
- passed in the fetch and preprocessing function.
- """
-
- def __init__(
- self,
- backend_context: BackendContext,
- bands: list,
- collection_fetch: Callable,
- collection_preprocessing: Callable,
- **collection_params,
- ):
- self.backend_contect = backend_context
- self.bands = bands
- self.fetcher = collection_fetch
- self.processing = collection_preprocessing
- self.params = collection_params
-
- def get_cube(
- self,
- connection: openeo.Connection,
- spatial_context: SpatialContext,
- temporal_context: TemporalContext,
- ) -> openeo.DataCube:
- """Retrieve a data cube from the given spatial and temporal context.
-
- Parameters
- ----------
- connection: openeo.Connection
- A connection to an OpenEO backend. The backend provided must be the
- same as the one this extractor class is configured for.
- spatial_extent: SpatialContext
- Either a GeoJSON collection on which spatial filtering will be
- applied or a bounding box with an EPSG code. If a bounding box is
- provided, no filtering is applied and the entirety of the data is
- fetched for that region.
- temporal_extent: TemporalContext
- The begin and end date of the extraction.
- """
- collection_data = self.fetcher(
- connection, spatial_context, temporal_context, self.bands, **self.params
- )
-
- preprocessed_data = self.processing(collection_data, **self.params)
-
- return preprocessed_data
diff --git a/src/openeo_gfmap/extractions/s2.py b/src/openeo_gfmap/extractions/s2.py
deleted file mode 100644
index 342f963..0000000
--- a/src/openeo_gfmap/extractions/s2.py
+++ /dev/null
@@ -1,129 +0,0 @@
-""" Extraction of S2 features, depending on the backend.
-"""
-from typing import Callable
-
-import openeo
-from geojson import GeoJSON
-
-from openeo_gfmap.backend import Backend, BackendContext
-from openeo_gfmap.spatial import BoundingBoxExtent, SpatialContext
-from openeo_gfmap.temporal import TemporalContext
-
-from .commons import rename_bands, resample_reproject
-from .extraction import CollectionExtractor
-
-BASE_SENTINEL2_L2A_MAPPING = {
- "B01": "S2-B01",
- "B02": "S2-B02",
- "B03": "S2-B03",
- "B04": "S2-B04",
- "B05": "S2-B05",
- "B06": "S2-B06",
- "B07": "S2-B07",
- "B08": "S2-B08",
- "B8A": "S2-B8A",
- "B09": "S2-B09",
- "B11": "S2-B11",
- "B12": "S2-B12",
- "AOT": "S2-AOT",
- "SCL": "S2-SCL",
- "SNW": "S2-SNW",
- "CLD": "S2-CLD",
- "CLP": "s2cloudless-CLP",
- "CLM": "s2clodless-CLM",
-}
-
-
-BAND_MAPPINGS = {
- "SENTINEL2_L2A": BASE_SENTINEL2_L2A_MAPPING,
-}
-
-
-def get_s2_l2a_default_fetcher(collection_name: str) -> Callable:
- """Builds the fetch function from the collection name as it stored in the
- target backend.
- """
-
- def s2_l2a_fetch_default(
- connection: openeo.Connection,
- spatial_extent: SpatialContext,
- temporal_extent: TemporalContext,
- bands: list,
- **params,
- ) -> openeo.DataCube:
- """Default collection fetcher for Sentinel_L2A collections.
- Parameters
- ----------
- connection: openeo.Connection
- Connection to a general backend.
- spatial_extent: SpatialContext
- Either a GeoJSON collection or a bounding box of locations.
- Performs spatial filtering if the spatial context is a GeoJSON
- collection, as it implies sparse data.
- temporal_extent: TemporalContexct
- A time range, defined by a start and end date.
- bands: list
- The name of the bands to load from that collection
- Returns
- -------
- openeo.DataCube: a datacube containing the collection raw products.
- """
- if isinstance(spatial_extent, BoundingBoxExtent):
- spatial_extent = dict(spatial_extent)
- elif isinstance(spatial_extent, GeoJSON):
- assert (
- spatial_extent.get("crs", None) is not None
- ), "CRS not defined within GeoJSON collection."
- spatial_extent = dict(spatial_extent)
-
- cube = connection.load_collection(collection_name, spatial_extent, temporal_extent, bands)
-
- # Apply if the collection is a GeoJSON Feature collection
- if isinstance(spatial_extent, GeoJSON):
- cube = cube.filter_spatial(spatial_extent)
-
- return cube
-
- return s2_l2a_fetch_default
-
-
-def get_s2_l2a_default_processor(collection_name: str) -> Callable:
- """Builds the preprocessing function from the collection name as it stored
- in the target backend.
- """
-
- def s2_l2a_default_processor(cube: openeo.DataCube, **params):
- """Default collection preprocessing method.
- This method performs reprojection if specified, upsampling of bands
- at 10m resolution as well as band reprojection.
- """
- # Reproject collection data to target CRS, if specified so
- cube = resample_reproject(cube, 10.0, params.get("target_crs", None))
-
- cube = rename_bands(cube, BAND_MAPPINGS.get(collection_name))
-
- return cube
-
- return s2_l2a_default_processor
-
-
-SENTINEL2_L2A_BACKEND_MAP = {
- Backend.TERRASCOPE: {
- "fetch": get_s2_l2a_default_fetcher("SENTINEL2_L2A"),
- "preprocessor": get_s2_l2a_default_processor("SENTINEL2_L2A"),
- }
-}
-
-
-def build_sentinel2_l2a_extractor(
- backend_context: BackendContext, bands: list, **params
-) -> CollectionExtractor:
- """Creates a S2 L2A extractor adapted to the given backend."""
- backend_functions = SENTINEL2_L2A_BACKEND_MAP.get(backend_context.backend)
-
- fetcher, preprocessor = (
- backend_functions["fetch"],
- backend_functions["preprocessor"],
- )
-
- return CollectionExtractor(backend_context, bands, fetcher, preprocessor, **params)
diff --git a/src/openeo_gfmap/features/feature_extractor.py b/src/openeo_gfmap/features/feature_extractor.py
index 28f6b79..b52ffa0 100644
--- a/src/openeo_gfmap/features/feature_extractor.py
+++ b/src/openeo_gfmap/features/feature_extractor.py
@@ -2,6 +2,8 @@
implementation of feature extractors of a UDF.
"""
+import inspect
+import re
from abc import ABC, abstractmethod
import numpy as np
@@ -13,59 +15,10 @@
from pyproj import Transformer
from pyproj.crs import CRS
-REQUIRED_IMPORTS = """
-from abc import ABC, abstractmethod
-
-import openeo
-from openeo.udf import XarrayDataCube, inspect
-from openeo.udf.udf_data import UdfData
-
-import xarray as xr
-import numpy as np
-from pyproj import Transformer
-from pyproj.crs import CRS
-
-from typing import Union
-"""
-
-
LAT_HARMONIZED_NAME = "GEO-LAT"
LON_HARMONIZED_NAME = "GEO-LON"
EPSG_HARMONIZED_NAME = "GEO-EPSG"
-# To fill in: EPSG_HARMONIZED_NAME, Is it pixel based and Feature Extractor class
-APPLY_DATACUBE_SOURCE_CODE = """
-LAT_HARMONIZED_NAME = "{lat_harmonized_name}"
-LON_HARMONIZED_NAME = "{lon_harmonized_name}"
-EPSG_HARMONIZED_NAME = "{epsg_harmonized_name}"
-
-from openeo.udf import XarrayDataCube
-from openeo.udf.udf_data import UdfData
-
-IS_PIXEL_BASED = {is_pixel_based}
-
-def apply_udf_data(udf_data: UdfData) -> XarrayDataCube:
- feature_extractor = {feature_extractor_class}() # User-defined, feature extractor class initialized here
-
- if not IS_PIXEL_BASED:
- assert len(udf_data.datacube_list) == 1, "OpenEO GFMAP Feature extractor pipeline only supports single input cubes for the tile."
-
- cube = udf_data.datacube_list[0]
- parameters = udf_data.user_context
-
- proj = udf_data.proj
- if proj is not None:
- proj = proj["EPSG"]
-
- parameters[EPSG_HARMONIZED_NAME] = proj
-
- cube = feature_extractor._execute(cube, parameters=parameters)
-
- udf_data.datacube_list = [cube]
-
- return udf_data
-"""
-
class FeatureExtractor(ABC):
"""Base class for all feature extractor UDFs. It provides some common
@@ -184,12 +137,65 @@ def execute(self, inarr: xr.DataArray) -> xr.DataArray:
pass
+def apply_udf_data(udf_data: UdfData) -> XarrayDataCube:
+ feature_extractor_class = ""
+
+ # User-defined, feature extractor class initialized here
+ feature_extractor = feature_extractor_class()
+
+ is_pixel_based = issubclass(feature_extractor_class, PointFeatureExtractor)
+
+ if not is_pixel_based:
+ assert (
+ len(udf_data.datacube_list) == 1
+ ), "OpenEO GFMAP Feature extractor pipeline only supports single input cubes for the tile."
+
+ cube = udf_data.datacube_list[0]
+ parameters = udf_data.user_context
+
+ proj = udf_data.proj
+ if proj is not None:
+ proj = proj["EPSG"]
+
+ parameters[EPSG_HARMONIZED_NAME] = proj
+
+ cube = feature_extractor._execute(cube, parameters=parameters)
+
+ udf_data.datacube_list = [cube]
+
+ return udf_data
+
+
+def _get_imports() -> str:
+ with open(__file__, "r", encoding="UTF-8") as f:
+ script_source = f.read()
+
+ lines = script_source.split("\n")
+
+ imports = []
+ static_globals = []
+
+ for line in lines:
+ if line.strip().startswith(("import ", "from ")):
+ imports.append(line)
+ elif re.match(r"^[A-Z_0-9]+\s*=.*$", line):
+ static_globals.append(line)
+
+ return "\n".join(imports) + "\n\n" + "\n".join(static_globals)
+
+
+def _get_apply_udf_data(feature_extractor: FeatureExtractor) -> str:
+ source_lines = inspect.getsource(apply_udf_data)
+ source = "".join(source_lines)
+ # Replace the empty placeholder string with the user-defined feature extractor class name
+ return source.replace('""', feature_extractor.__name__)
+
+
def generate_udf_code(feature_extractor_class: FeatureExtractor) -> openeo.UDF:
"""Generates the udf code by packing imports of this file, the necessary
superclass and subclasses as well as the user defined feature extractor
class and the apply_datacube function.
"""
- import inspect
# UDF code that will be built here
udf_code = ""
@@ -198,36 +204,12 @@ class and the apply_datacube function.
feature_extractor_class, FeatureExtractor
), "The feature extractor class must be a subclass of FeatureExtractor."
- if issubclass(feature_extractor_class, PatchFeatureExtractor):
- udf_code += f"{REQUIRED_IMPORTS}\n\n"
- udf_code += f"{inspect.getsource(FeatureExtractor)}\n\n"
- udf_code += f"{inspect.getsource(PatchFeatureExtractor)}\n\n"
- udf_code += f"{inspect.getsource(feature_extractor_class)}\n\n"
- udf_code += APPLY_DATACUBE_SOURCE_CODE.format(
- lat_harmonized_name=LAT_HARMONIZED_NAME,
- lon_harmonized_name=LON_HARMONIZED_NAME,
- epsg_harmonized_name=EPSG_HARMONIZED_NAME,
- is_pixel_based=False,
- feature_extractor_class=feature_extractor_class.__name__,
- )
- elif issubclass(feature_extractor_class, PointFeatureExtractor):
- udf_code += f"{REQUIRED_IMPORTS}\n\n"
- udf_code += f"{inspect.getsource(FeatureExtractor)}\n\n"
- udf_code += f"{inspect.getsource(PointFeatureExtractor)}\n\n"
- udf_code += f"{inspect.getsource(feature_extractor_class)}\n\n"
- udf_code += APPLY_DATACUBE_SOURCE_CODE.format(
- lat_harmonized_name=LAT_HARMONIZED_NAME,
- lon_harmonized_name=LON_HARMONIZED_NAME,
- epsg_harmonized_name=EPSG_HARMONIZED_NAME,
- is_pixel_based=True,
- feature_extractor_class=feature_extractor_class.__name__,
- )
- else:
- raise NotImplementedError(
- "The feature extractor must be a subclass of either "
- "PatchFeatureExtractor or PointFeatureExtractor."
- )
-
+ udf_code += _get_imports() + "\n\n"
+ udf_code += f"{inspect.getsource(FeatureExtractor)}\n\n"
+ udf_code += f"{inspect.getsource(PatchFeatureExtractor)}\n\n"
+ udf_code += f"{inspect.getsource(PointFeatureExtractor)}\n\n"
+ udf_code += f"{inspect.getsource(feature_extractor_class)}\n\n"
+ udf_code += _get_apply_udf_data(feature_extractor_class)
return udf_code
@@ -247,13 +229,16 @@ def apply_feature_extractor(
default, the feature extractor expects to receive S1 and S2 data stored in
uint16 with the harmonized naming as implemented in the fetching module.
"""
+ feature_extractor = feature_extractor_class()
+ feature_extractor._parameters = parameters
+ output_labels = feature_extractor.output_labels()
udf_code = generate_udf_code(feature_extractor_class)
udf = openeo.UDF(code=udf_code, context=parameters)
cube = cube.apply_neighborhood(process=udf, size=size, overlap=overlap)
- return cube.rename_labels(dimension="bands", target=feature_extractor_class().output_labels())
+ return cube.rename_labels(dimension="bands", target=output_labels)
def apply_feature_extractor_local(
@@ -264,6 +249,10 @@ def apply_feature_extractor_local(
excepts for the cube parameter which expects a `xarray.DataArray` instead of
a `openeo.rest.datacube.DataCube` object.
"""
+ feature_extractor = feature_extractor_class()
+ feature_extractor._parameters = parameters
+ output_labels = feature_extractor.output_labels()
+
udf_code = generate_udf_code(feature_extractor_class)
udf = openeo.UDF(code=udf_code, context=parameters)
@@ -276,8 +265,4 @@ def apply_feature_extractor_local(
assert len(output_cubes) == 1, "UDF should have only a single output cube."
- return (
- output_cubes[0]
- .get_array()
- .assign_coords({"bands": feature_extractor_class().output_labels()})
- )
+ return output_cubes[0].get_array().assign_coords({"bands": output_labels})
diff --git a/src/openeo_gfmap/fetching/commons.py b/src/openeo_gfmap/fetching/commons.py
index 898c86d..69dc65f 100644
--- a/src/openeo_gfmap/fetching/commons.py
+++ b/src/openeo_gfmap/fetching/commons.py
@@ -111,12 +111,16 @@ def load_collection(
properties=load_collection_parameters,
)
elif fetch_type == FetchType.POLYGON:
- assert isinstance(
- spatial_extent, GeoJSON
- ), "Please provide only a GeoJSON FeatureCollection for point based fetching."
- assert (
- spatial_extent["type"] == "FeatureCollection"
- ), "Please provide a FeatureCollection type of GeoJSON"
+ if isinstance(spatial_extent, GeoJSON):
+ assert (
+ spatial_extent["type"] == "FeatureCollection"
+ ), "Please provide a FeatureCollection type of GeoJSON"
+ elif isinstance(spatial_extent, str):
+ assert spatial_extent.startswith("https://") or spatial_extent.startswith(
+ "http://"
+ ), "Please provide a valid URL or a path to a GeoJSON file."
+ else:
+ raise ValueError("Please provide a valid URL to a GeoParquet or GeoJSON file.")
cube = connection.load_collection(
collection_id=collection_name,
temporal_extent=[temporal_extent.start_date, temporal_extent.end_date],
@@ -133,6 +137,13 @@ def load_collection(
cube = cube.mask(pre_mask.resample_cube_spatial(cube))
if fetch_type == FetchType.POLYGON:
- cube = cube.filter_spatial(spatial_extent)
+ if isinstance(spatial_extent, str):
+ geometry = connection.load_url(
+ spatial_extent,
+ format="Parquet" if ".parquet" in spatial_extent else "GeoJSON",
+ )
+ cube = cube.filter_spatial(geometry)
+ else:
+ cube = cube.filter_spatial(spatial_extent)
return cube
diff --git a/src/openeo_gfmap/fetching/s1.py b/src/openeo_gfmap/fetching/s1.py
index cf061fb..17d8454 100644
--- a/src/openeo_gfmap/fetching/s1.py
+++ b/src/openeo_gfmap/fetching/s1.py
@@ -19,10 +19,10 @@
from .fetching import CollectionFetcher, FetchType
BASE_SENTINEL1_GRD_MAPPING = {
- "VH": "S1-VH",
- "HH": "S1-HH",
- "HV": "S1-HV",
- "VV": "S1-VV",
+ "VH": "S1-SIGMA0-VH",
+ "HH": "S1-SIGMA0-HH",
+ "HV": "S1-SIGMA0-HV",
+ "VV": "S1-SIGMA0-VV",
}
@@ -130,6 +130,10 @@ def s1_grd_default_processor(cube: openeo.DataCube, **params):
"default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
"preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"),
},
+ Backend.CDSE_STAGING: {
+ "default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
+ "preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"),
+ },
}
diff --git a/src/openeo_gfmap/fetching/s2.py b/src/openeo_gfmap/fetching/s2.py
index 0f7b3b3..d513fba 100644
--- a/src/openeo_gfmap/fetching/s2.py
+++ b/src/openeo_gfmap/fetching/s2.py
@@ -20,39 +20,39 @@
from .fetching import CollectionFetcher, FetchType
BASE_SENTINEL2_L2A_MAPPING = {
- "B01": "S2-B01",
- "B02": "S2-B02",
- "B03": "S2-B03",
- "B04": "S2-B04",
- "B05": "S2-B05",
- "B06": "S2-B06",
- "B07": "S2-B07",
- "B08": "S2-B08",
- "B8A": "S2-B8A",
- "B09": "S2-B09",
- "B11": "S2-B11",
- "B12": "S2-B12",
- "AOT": "S2-AOT",
- "SCL": "S2-SCL",
- "SNW": "S2-SNW",
+ "B01": "S2-L2A-B01",
+ "B02": "S2-L2A-B02",
+ "B03": "S2-L2A-B03",
+ "B04": "S2-L2A-B04",
+ "B05": "S2-L2A-B05",
+ "B06": "S2-L2A-B06",
+ "B07": "S2-L2A-B07",
+ "B08": "S2-L2A-B08",
+ "B8A": "S2-L2A-B8A",
+ "B09": "S2-L2A-B09",
+ "B11": "S2-L2A-B11",
+ "B12": "S2-L2A-B12",
+ "AOT": "S2-L2A-AOT",
+ "SCL": "S2-L2A-SCL",
+ "SNW": "S2-L2A-SNW",
}
ELEMENT84_SENTINEL2_L2A_MAPPING = {
- "coastal": "S2-B01",
- "blue": "S2-B02",
- "green": "S2-B03",
- "red": "S2-B04",
- "rededge1": "S2-B05",
- "rededge2": "S2-B06",
- "rededge3": "S2-B07",
- "nir": "S2-B08",
- "nir08": "S2-B8A",
- "nir09": "S2-B09",
- "cirrus": "S2-B10",
- "swir16": "S2-B11",
- "swir22": "S2-B12",
- "scl": "S2-SCL",
- "aot": "S2-AOT",
+ "coastal": "S2-L2A-B01",
+ "blue": "S2-L2A-B02",
+ "green": "S2-L2A-B03",
+ "red": "S2-L2A-B04",
+ "rededge1": "S2-L2A-B05",
+ "rededge2": "S2-L2A-B06",
+ "rededge3": "S2-L2A-B07",
+ "nir": "S2-L2A-B08",
+ "nir08": "S2-L2A-B8A",
+ "nir09": "S2-L2A-B09",
+ "cirrus": "S2-L2A-B10",
+ "swir16": "S2-L2A-B11",
+ "swir22": "S2-L2A-B12",
+ "scl": "S2-L2A-SCL",
+ "aot": "S2-L2A-AOT",
}
@@ -105,10 +105,6 @@ def s2_l2a_fetch_default(
**params,
)
- # Apply if the collection is a GeoJSON Feature collection
- if isinstance(spatial_extent, GeoJSON):
- cube = cube.filter_spatial(spatial_extent)
-
return cube
return s2_l2a_fetch_default
@@ -190,6 +186,10 @@ def s2_l2a_default_processor(cube: openeo.DataCube, **params):
"fetch": partial(get_s2_l2a_default_fetcher, collection_name="SENTINEL2_L2A"),
"preprocessor": partial(get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"),
},
+ Backend.CDSE_STAGING: {
+ "fetch": partial(get_s2_l2a_default_fetcher, collection_name="SENTINEL2_L2A"),
+ "preprocessor": partial(get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"),
+ },
}
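
After the renaming, Sentinel-2 bands are requested through the harmonized `S2-L2A-*` names, and the new `Backend.CDSE_STAGING` entry resolves to the same fetcher and preprocessor as the regular CDSE backend. A small sketch of how an extractor could be built against the staging backend; the import paths and the exact band list are assumptions, not part of this diff:

```python
# Import paths are assumptions based on the package layout shown in this diff.
from openeo_gfmap.backend import Backend, BackendContext
from openeo_gfmap.fetching import FetchType, build_sentinel2_l2a_extractor

# Band names follow the new harmonized "S2-L2A-*" convention.
backend_context = BackendContext(Backend.CDSE_STAGING)
extractor = build_sentinel2_l2a_extractor(
    backend_context=backend_context,
    bands=["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02", "S2-L2A-SCL"],
    fetch_type=FetchType.TILE,
)
```
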
diff --git a/src/openeo_gfmap/manager/__init__.py b/src/openeo_gfmap/manager/__init__.py
new file mode 100644
index 0000000..4e6c070
--- /dev/null
+++ b/src/openeo_gfmap/manager/__init__.py
@@ -0,0 +1,6 @@
+"""OpenEO GFMAP Manager submodule. Implements the logic of splitting the jobs into subjobs and
+managing the subjobs.
+"""
+import logging
+
+_log = logging.getLogger(__name__)
diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py
new file mode 100644
index 0000000..6021cb2
--- /dev/null
+++ b/src/openeo_gfmap/manager/job_manager.py
@@ -0,0 +1,304 @@
+import json
+import queue
+import shutil
+import threading
+from enum import Enum
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from typing import Callable, Optional, Union
+
+import pandas as pd
+import pystac
+from openeo.extra.job_management import MultiBackendJobManager
+from openeo.rest.job import BatchJob
+from pystac import CatalogType
+
+from openeo_gfmap.manager import _log
+from openeo_gfmap.stac import constants
+
+
+class PostJobStatus(Enum):
+ """Indicates the workers if the job finished as sucessful or with an error."""
+
+ FINISHED = "finished"
+ ERROR = "error"
+
+
+class GFMAPJobManager(MultiBackendJobManager):
+ """A job manager for the GFMAP backend."""
+
+ def __init__(
+ self,
+ output_dir: Path,
+ output_path_generator: Callable,
+ collection_id: str,
+ collection_description: str = "",
+ post_job_action: Optional[Callable] = None,
+ poll_sleep: int = 5,
+ n_threads: int = 1,
+ post_job_params: dict = {},
+ ):
+ self._output_dir = output_dir
+
+ # Setup the threads to work on the on_job_done and on_job_error methods
+ self._finished_job_queue = queue.Queue()
+ self._n_threads = n_threads
+
+ self._threads = []
+
+ self._output_path_gen = output_path_generator
+ self._post_job_action = post_job_action
+ self._post_job_params = post_job_params
+
+ # Monkey-patch the _normalize_df method to ensure the geometry column is not
+ # modified by the parent job manager
+ MultiBackendJobManager._normalize_df = self._normalize_df
+ super().__init__(poll_sleep)
+
+ # Generate the root STAC collection
+ self._root_collection = pystac.Collection(
+ id=collection_id,
+ description=collection_description,
+ extent=None,
+ )
+
+ def _post_job_worker(self):
+ """Checks which jobs are finished or failed and calls the `on_job_done` or `on_job_error`
+ methods."""
+ while True:
+ try:
+ status, job, row = self._finished_job_queue.get(timeout=1)
+ _log.debug(
+ f"Worker thread {threading.current_thread().name}: polled finished job with status {status}."
+ )
+ if status == PostJobStatus.ERROR:
+ self.on_job_error(job, row)
+ elif status == PostJobStatus.FINISHED:
+ self.on_job_done(job, row)
+ else:
+ raise ValueError(f"Unknown status: {status}")
+ self._finished_job_queue.task_done()
+ except queue.Empty:
+ continue
+ except KeyboardInterrupt:
+ _log.debug(f"Worker thread {threading.current_thread().name} interrupted.")
+ return
+
+ def _update_statuses(self, df: pd.DataFrame):
+ """Updates the statues of the jobs in the dataframe from the backend. If a job is finished
+ or failed, it will be queued to the `on_job_done` or `on_job_error` methods.
+
+ The method is executed every `poll_sleep` seconds.
+ """
+ active = df[df.status.isin(["created", "queued", "running"])]
+ for idx, row in active.iterrows():
+ # Parses the backend from the csv
+ connection = self._get_connection(row.backend_name)
+ job = connection.job(row.id)
+ job_metadata = job.describe_job()
+ job_status = job_metadata["status"]
+ _log.debug(
+ msg=f"Status of job {job.job_id} is {job_status} (on backend {row.backend_name}).",
+ )
+
+ # Update the status if the job finished since last check
+ # Case in which it finished successfully
+ if (df.loc[idx, "status"] in ["created", "queued", "running"]) and (
+ job_metadata["status"] == "finished"
+ ):
+ _log.info(f"Job {job.job_id} finished successfully, queueing on_job_done...")
+ self._finished_job_queue.put((PostJobStatus.FINISHED, job, row))
+ df.loc[idx, "costs"] = job_metadata["costs"]
+
+ # Case in which it failed
+ if (df.loc[idx, "status"] != "error") and (job_metadata["status"] == "error"):
+ _log.info(f"Job {job.job_id} finished with error, queueing on_job_error...")
+ self._finished_job_queue.put((PostJobStatus.ERROR, job, row))
+ df.loc[idx, "costs"] = job_metadata["costs"]
+
+ df.loc[idx, "status"] = job_status
+
+ def on_job_error(self, job: BatchJob, row: pd.Series):
+ """Method called when a job finishes with an error.
+
+ Parameters
+ ----------
+ job: BatchJob
+ The job that finished with an error.
+ row: pd.Series
+ The row in the dataframe that contains the job relative information.
+ """
+ logs = job.logs()
+ error_logs = [log for log in logs if log.level.lower() == "error"]
+
+ job_metadata = job.describe_job()
+ title = job_metadata["title"]
+ job_id = job_metadata["id"]
+
+ output_log_path = Path(self._output_dir) / "failed_jobs" / f"{title}_{job_id}.log"
+ output_log_path.parent.mkdir(parents=True, exist_ok=True)
+
+ if len(error_logs) > 0:
+ output_log_path.write_text(json.dumps(error_logs, indent=2))
+ else:
+ output_log_path.write_text(
+ f"Couldn't find any error logs. Please check the error manually on job ID: {job.job_id}."
+ )
+
+ def on_job_done(self, job: BatchJob, row: pd.Series):
+ """Method called when a job finishes successfully. It will first download the results of
+ the job and then call the `post_job_action` method.
+ """
+ job_products = {}
+ for idx, asset in enumerate(job.get_results().get_assets()):
+ temp_file = NamedTemporaryFile(delete=False)
+ try:
+ _log.debug(
+ f"Downloading asset {asset.name} from job {job.job_id} -> {temp_file.name}"
+ )
+ asset.download(temp_file.name)
+ _log.debug(
+ f"Generating output path for asset {asset.name} from job {job.job_id}..."
+ )
+ output_path = self._output_path_gen(self._output_dir, temp_file.name, idx, row)
+ _log.debug(
+ f"Generated path for asset {asset.name} from job {job.job_id} -> {output_path}"
+ )
+ # Make the output path
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+ # Move the temporary file to the final location
+ shutil.move(temp_file.name, output_path)
+ # Add to the list of downloaded products
+ job_products[f"{job.job_id}_{asset.name}"] = [output_path]
+ _log.info(f"Downloaded asset {asset.name} from job {job.job_id} -> {output_path}")
+ except Exception as e:
+ # logging.exception already records the active traceback; passing the exception
+ # as an extra argument would be treated as a spurious %-format argument.
+ _log.exception(f"Error downloading asset {asset.name} from job {job.job_id}")
+ raise e
+ finally:
+ # The temporary file is either moved to its final location or left behind on
+ # failure; remove it if it still exists (it is a file, not a directory).
+ Path(temp_file.name).unlink(missing_ok=True)
+
+ # First update the STAC collection with the assets directly resulting from the OpenEO batch job
+ job_metadata = pystac.Collection.from_dict(job.get_results().get_metadata())
+ job_items = []
+
+ for item_metadata in job_metadata.get_all_items():
+ try:
+ item = pystac.read_file(item_metadata.get_self_href())
+ asset_path = job_products[f"{job.job_id}_{item.id}"][0]
+
+ assert len(item.assets.values()) == 1, "Each item should only contain one asset"
+ for asset in item.assets.values():
+ asset.href = str(
+ asset_path
+ ) # Update the asset href to the output location set by the output_path_generator
+ item.id = f"{job.job_id}_{item.id}"
+ # Add the item to the current job items.
+ job_items.append(item)
+ _log.info(f"Parsed item {item.id} from job {job.job_id}")
+ except Exception as e:
+ # Same caveat as above: let logging.exception capture the traceback itself.
+ _log.exception(
+ f"Failed to add item {item.id} from job {job.job_id} to the STAC collection"
+ )
+ raise e
+
+ # _post_job_action returns an updated list of STAC items. The post-job action can
+ # therefore update the STAC items and access their products through the HREF. It is
+ # also responsible for adding the appropriate metadata/assets to the items.
+ if self._post_job_action is not None:
+ _log.debug(f"Calling post job action for job {job.job_id}...")
+ job_items = self._post_job_action(job_items, row, self._post_job_params)
+
+ self._root_collection.add_items(job_items)
+ _log.info(f"Added {len(job_items)} items to the STAC collection.")
+
+ _log.info(f"Job {job.job_id} and post job action finished successfully.")
+
+ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
+ """Ensure we have the required columns and the expected type for the geometry column.
+
+ :param df: The dataframe to normalize.
+ :return: a new dataframe that is normalized.
+ """
+
+ # check for some required columns.
+ required_with_default = [
+ ("status", "not_started"),
+ ("id", None),
+ ("start_time", None),
+ ("cpu", None),
+ ("memory", None),
+ ("duration", None),
+ ("backend_name", None),
+ ("description", None),
+ ("costs", None),
+ ]
+ new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
+ df = df.assign(**new_columns)
+
+ _log.debug(f"Normalizing dataframe. Columns: {df.columns}")
+
+ return df
+
+ def run_jobs(self, df: pd.DataFrame, start_job: Callable, output_file: Union[str, Path]):
+ """Starts the jobs defined in the dataframe and runs the `start_job` function on each job.
+
+ Parameters
+ ----------
+ df: pd.DataFrame
+ The dataframe containing the jobs to be started. The dataframe expects the following columns:
+
+ * `backend_name`: Name of the backend to use.
+ * Additional fields that will be used in your custom job creation function `start_job`
+ as well as in post-job actions and path generator.
+
+ The following column names are RESERVED for the management of the jobs, please do
+ not provide them in the input df:
+
+ * `status`: Current status of the job.
+ * `id`: Job ID, used to access job information from the backend.
+ * `start_time`: The time at which the job was started.
+ * `cpu`: The amount of CPU used by the job.
+ * `memory`: The amount of memory used by the job.
+ * `duration`: The duration of the job.
+
+ start_job: Callable
+ Callable that takes the dataframe row of each job as an argument and creates the
+ corresponding openEO batch job.
+ output_file: Union[str, Path]
+ The file to track the results of the jobs.
+ """
+ _log.info(f"Starting job manager using {self._n_threads} worker threads.")
+ # Starts the thread pool to work on the on_job_done and on_job_error methods
+ for _ in range(self._n_threads):
+ thread = threading.Thread(target=self._post_job_worker)
+ thread.start()
+ self._threads.append(thread)
+
+ _log.info("Workers started, creating and running jobs.")
+ super().run_jobs(df, start_job, output_file)
+
+ def create_stac(self, output_path: Optional[Union[str, Path]] = None):
+ """Method to be called after run_jobs to create a STAC catalog
+ and write it to self._output_dir
+ """
+ if output_path is None:
+ output_path = self._output_dir / "stac"
+
+ self._root_collection.license = constants.LICENSE
+ self._root_collection.add_link(constants.LICENSE_LINK)
+ self._root_collection.stac_extensions = constants.STAC_EXTENSIONS
+
+ datacube_extension = pystac.extensions.datacube.DatacubeExtension.ext(
+ self._root_collection, add_if_missing=True
+ )
+ datacube_extension.apply(constants.CUBE_DIMENSIONS)
+
+ item_asset_extension = pystac.extensions.item_assets.ItemAssetsExtension.ext(
+ self._root_collection, add_if_missing=True
+ )
+ item_asset_extension.item_assets = constants.ITEM_ASSETS
+
+ self._root_collection.update_extent_from_items()
+ self._root_collection.normalize_hrefs(str(output_path))
+ self._root_collection.save(catalog_type=CatalogType.SELF_CONTAINED)
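
A condensed usage sketch of the new manager. The dataframe columns, the path layout, the backend name and the `start_job` body are purely illustrative; only the constructor, `run_jobs` and `create_stac` signatures come from the code above:

```python
from pathlib import Path

import openeo
import pandas as pd

from openeo_gfmap.manager.job_manager import GFMAPJobManager


# Hypothetical path generator; the signature mirrors the call
# output_path_generator(output_dir, temp_file_name, asset_idx, row) made in on_job_done.
def output_path_generator(output_dir: Path, temp_path: str, asset_idx: int, row: pd.Series) -> Path:
    return Path(output_dir) / str(row.name) / f"asset_{asset_idx}.nc"


# Hypothetical job creation function; it must return the created openEO batch job.
def start_job(row: pd.Series, connection: openeo.Connection, **kwargs):
    cube = connection.load_collection("SENTINEL2_L2A", bands=["B04", "B03", "B02"])
    return cube.create_job(title=f"extraction_{row.name}")


def connection_provider() -> openeo.Connection:
    return openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc()


manager = GFMAPJobManager(
    output_dir=Path("/tmp/extractions"),
    output_path_generator=output_path_generator,
    collection_id="DEMO-EXTRACTION",
    collection_description="Demo extraction collection",
    post_job_action=None,  # optional: callable(job_items, row, params) -> job_items
    poll_sleep=60,
    n_threads=2,
)
# add_backend is inherited from MultiBackendJobManager.
manager.add_backend("cdse", connection=connection_provider, parallel_jobs=2)

# One row per job; `backend_name` is required, other columns are free-form and are
# passed through to start_job, the path generator and the post-job action.
job_df = pd.DataFrame({"backend_name": ["cdse"], "year": [2021]})
manager.run_jobs(job_df, start_job, Path("/tmp/extractions/job_tracker.csv"))
manager.create_stac()
```
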
diff --git a/src/openeo_gfmap/manager/job_splitters.py b/src/openeo_gfmap/manager/job_splitters.py
new file mode 100644
index 0000000..a696b5c
--- /dev/null
+++ b/src/openeo_gfmap/manager/job_splitters.py
@@ -0,0 +1,63 @@
+"""Job splitter functionalities, except input points/polygons to extract in the
+form of a GeoDataFrames.
+"""
+from typing import List
+
+import geopandas as gpd
+import h3
+
+
+def _resplit_group(polygons: gpd.GeoDataFrame, max_points: int) -> List[gpd.GeoDataFrame]:
+ """Performs re-splitting of a dataset of polygons in a list of datasets"""
+ for i in range(0, len(polygons), max_points):
+ yield polygons.iloc[i : i + max_points].reset_index(drop=True)
+
+
+def split_job_hex(
+ polygons: gpd.GeoDataFrame, max_points: int = 500, grid_resolution: int = 3
+) -> List[gpd.GeoDataFrame]:
+ """Split a job into multiple jobs from the position of the polygons/points. The centroid of
+ the geometries to extract are used to select a hexagon in the H3 grid. Using the H3 grid
+ allows to split jobs in equal areas, which is useful for parallel processing while taking into
+ account OpenEO's limitations.
+
+ Parameters
+ ----------
+ polygons: gpd.GeoDataFrame
+ Dataset containing the polygons to split the job by, with a `geometry` column.
+ max_points: int
+ The maximum number of points to be included in each job.
+ grid_resolution: int
+ The resolution of the H3 hexagonal grid used to split the jobs, default is 3. Changing
+ the grid resolution will drastically increase/decrease the area on which each job will
+ work. More information on the H3 grid can be found at
+ https://h3geo.org/docs/core-library/restable
+
+ Returns
+ -------
+ split_polygons: list
+ List of sub-GeoDataFrames, one per job.
+ """
+
+ if "geometry" not in polygons.columns:
+ raise ValueError("The GeoDataFrame must contain a 'geometry' column.")
+
+ if polygons.crs is None:
+ raise ValueError("The GeoDataFrame must contain a CRS")
+
+ # Project to lat/lon positions
+ polygons = polygons.to_crs(epsg=4326)
+
+ # Split the polygons into multiple jobs
+ polygons["h3index"] = polygons.geometry.centroid.apply(
+ lambda pt: h3.geo_to_h3(pt.y, pt.x, grid_resolution)
+ )
+
+ split_datasets = []
+ for _, sub_gdf in polygons.groupby("h3index"):
+ if len(sub_gdf) > max_points:
+ # Performs another split
+ split_datasets.extend(_resplit_group(sub_gdf, max_points))
+ else:
+ split_datasets.append(sub_gdf.reset_index(drop=True))
+
+ return split_datasets
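
A short sketch of the splitter in isolation (the file name is a placeholder); H3 cells holding more geometries than `max_points` are further chunked by `_resplit_group`:

```python
import geopandas as gpd

from openeo_gfmap.manager.job_splitters import split_job_hex

# Hypothetical input file: any GeoDataFrame with a `geometry` column and a CRS works.
polygons = gpd.read_file("extraction_polygons.gpkg")

sub_jobs = split_job_hex(polygons, max_points=100, grid_resolution=3)

# Every resulting sub-dataset stays within the max_points limit.
assert all(len(sub) <= 100 for sub in sub_jobs)
```
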
diff --git a/src/openeo_gfmap/preprocessing/cloudmasking.py b/src/openeo_gfmap/preprocessing/cloudmasking.py
index e605689..93289d3 100644
--- a/src/openeo_gfmap/preprocessing/cloudmasking.py
+++ b/src/openeo_gfmap/preprocessing/cloudmasking.py
@@ -6,8 +6,8 @@
import openeo
from openeo.processes import if_, is_nan
-SCL_HARMONIZED_NAME: str = "S2-SCL"
-BAPSCORE_HARMONIZED_NAME: str = "S2-BAPSCORE"
+SCL_HARMONIZED_NAME: str = "S2-L2A-SCL"
+BAPSCORE_HARMONIZED_NAME: str = "S2-L2A-BAPSCORE"
def mask_scl_dilation(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
@@ -91,7 +91,7 @@ def get_bap_score(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
Returns
-------
openeo.DataCube
- A 4D datacube containing the BAP score as name 'S2-BAPSCORE'.
+ A 4D datacube containing the BAP score as name 'S2-L2A-BAPSCORE'.
"""
udf_path = Path(__file__).parent / "udf_score.py"
@@ -159,7 +159,7 @@ def get_bap_mask(cube: openeo.DataCube, period: Union[str, list], **params: dict
openeo.DataCube
The datacube with the BAP mask applied.
"""
- # Checks if the S2-SCL band is present in the datacube
+ # Checks if the S2-L2A-SCL band is present in the datacube
assert (
SCL_HARMONIZED_NAME in cube.metadata.band_names
), f"The {SCL_HARMONIZED_NAME} band is not present in the datacube."
@@ -196,7 +196,7 @@ def max_score_selection(score):
f"'period' must be a string or a list of dates (in YYYY-mm-dd format), got {period}."
)
- return rank_mask.rename_labels("bands", ["S2-BAPMASK"])
+ return rank_mask.rename_labels("bands", ["S2-L2A-BAPMASK"])
def bap_masking(cube: openeo.DataCube, period: Union[str, list], **params: dict):
diff --git a/src/openeo_gfmap/preprocessing/udf_rank.py b/src/openeo_gfmap/preprocessing/udf_rank.py
index 7271b61..043712d 100644
--- a/src/openeo_gfmap/preprocessing/udf_rank.py
+++ b/src/openeo_gfmap/preprocessing/udf_rank.py
@@ -16,7 +16,7 @@ def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
intervals = context.get("intervals", None)
array = cube.get_array().transpose("t", "bands", "y", "x")
- bap_score = array.sel(bands="S2-BAPSCORE")
+ bap_score = array.sel(bands="S2-L2A-BAPSCORE")
def select_maximum(score: xr.DataArray):
max_score = score.max(dim="t")
diff --git a/src/openeo_gfmap/spatial.py b/src/openeo_gfmap/spatial.py
index 6625120..f4d419f 100644
--- a/src/openeo_gfmap/spatial.py
+++ b/src/openeo_gfmap/spatial.py
@@ -44,4 +44,4 @@ def to_geometry(self) -> Polygon:
return box(self.west, self.south, self.east, self.north)
-SpatialContext = Union[GeoJSON, BoundingBoxExtent]
+SpatialContext = Union[GeoJSON, BoundingBoxExtent, str]
diff --git a/src/openeo_gfmap/stac/__init__.py b/src/openeo_gfmap/stac/__init__.py
new file mode 100644
index 0000000..a0b68a1
--- /dev/null
+++ b/src/openeo_gfmap/stac/__init__.py
@@ -0,0 +1,6 @@
+"""Definitions of the constants in the STAC collection
+"""
+
+from openeo_gfmap.stac.constants import AUXILIARY
+
+__all__ = ["AUXILIARY"]
diff --git a/src/openeo_gfmap/stac/constants.py b/src/openeo_gfmap/stac/constants.py
new file mode 100644
index 0000000..0213547
--- /dev/null
+++ b/src/openeo_gfmap/stac/constants.py
@@ -0,0 +1,229 @@
+"""
+Constants in the STAC collection generated after a series of batch jobs
+"""
+import pystac
+
+LICENSE = "CC-BY-4.0"
+LICENSE_LINK = pystac.Link(
+ rel="license",
+ target="https://spdx.org/licenses/CC-BY-4.0.html",
+ media_type=pystac.MediaType.HTML,
+ title="Creative Commons Attribution 4.0 International License",
+)
+STAC_EXTENSIONS = [
+ "https://stac-extensions.github.io/eo/v1.1.0/schema.json",
+ "https://stac-extensions.github.io/file/v2.1.0/schema.json",
+ "https://stac-extensions.github.io/processing/v1.1.0/schema.json",
+ "https://stac-extensions.github.io/projection/v1.1.0/schema.json",
+]
+CONSTELLATION = ["sentinel-2"]
+PLATFORM = ["sentinel-2a", "sentinel-2b"]
+INSTRUMENTS = ["msi"]
+GSD = [10, 20, 60]
+SUMMARIES = pystac.Summaries(
+ {
+ "constellation": CONSTELLATION,
+ "platform": PLATFORM,
+ "instruments": INSTRUMENTS,
+ "gsd": GSD,
+ }
+)
+
+
+def create_spatial_dimension(
+ name: str,
+) -> pystac.extensions.datacube.HorizontalSpatialDimension:
+ return pystac.extensions.datacube.HorizontalSpatialDimension(
+ {
+ "axis": name,
+ "step": 10,
+ "reference_system": {
+ "$schema": "https://proj.org/schemas/v0.2/projjson.schema.json",
+ "area": "World",
+ "bbox": {
+ "east_longitude": 180,
+ "north_latitude": 90,
+ "south_latitude": -90,
+ "west_longitude": -180,
+ },
+ "coordinate_system": {
+ "axis": [
+ {
+ "abbreviation": "Lat",
+ "direction": "north",
+ "name": "Geodetic latitude",
+ "unit": "degree",
+ },
+ {
+ "abbreviation": "Lon",
+ "direction": "east",
+ "name": "Geodetic longitude",
+ "unit": "degree",
+ },
+ ],
+ "subtype": "ellipsoidal",
+ },
+ "datum": {
+ "ellipsoid": {
+ "inverse_flattening": 298.257223563,
+ "name": "WGS 84",
+ "semi_major_axis": 6378137,
+ },
+ "name": "World Geodetic System 1984",
+ "type": "GeodeticReferenceFrame",
+ },
+ "id": {"authority": "OGC", "code": "Auto42001", "version": "1.3"},
+ "name": "AUTO 42001 (Universal Transverse Mercator)",
+ "type": "GeodeticCRS",
+ },
+ }
+ )
+
+
+TEMPORAL_DIMENSION = pystac.extensions.datacube.TemporalDimension(
+ {"extent": ["2015-06-23T00:00:00Z", "2019-07-10T13:44:56Z"], "step": "P5D"}
+)
+
+BANDS_DIMENSION = pystac.extensions.datacube.AdditionalDimension(
+ {
+ "values": [
+ "S2-L2A-SCL",
+ "S2-L2A-B01",
+ "S2-L2A-B02",
+ "S2-L2A-B03",
+ "S2-L2A-B04",
+ "S2-L2A-B05",
+ "S2-L2A-B06",
+ "S2-L2A-B07",
+ "S2-L2A-B08",
+ "S2-L2A-B8A",
+ "S2-L2A-B09",
+ "S2-L2A-B10",
+ "S2-L2A-B11",
+ "S2-L2A-B12",
+ "LABEL",
+ ]
+ }
+)
+
+CUBE_DIMENSIONS = {
+ "x": create_spatial_dimension("x"),
+ "y": create_spatial_dimension("y"),
+ "time": TEMPORAL_DIMENSION,
+ "spectral": BANDS_DIMENSION,
+}
+
+SENTINEL2 = pystac.extensions.item_assets.AssetDefinition(
+ {
+ "gsd": 10,
+ "title": "Sentinel2",
+ "description": "Sentinel-2 bands",
+ "type": "application/x-netcdf",
+ "roles": ["data"],
+ "proj:shape": [64, 64],
+ "raster:bands": [{"name": "S2-L2A-B01"}, {"name": "S2-L2A-B02"}],
+ "cube:variables": {
+ "S2-L2A-B01": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B02": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B03": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B04": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B05": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B06": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B07": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B8A": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B08": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B11": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-B12": {"dimensions": ["time", "y", "x"], "type": "data"},
+ "S2-L2A-SCL": {"dimensions": ["time", "y", "x"], "type": "data"},
+ },
+ "eo:bands": [
+ {
+ "name": "S2-L2A-B01",
+ "common_name": "coastal",
+ "center_wavelength": 0.443,
+ "full_width_half_max": 0.027,
+ },
+ {
+ "name": "S2-L2A-B02",
+ "common_name": "blue",
+ "center_wavelength": 0.49,
+ "full_width_half_max": 0.098,
+ },
+ {
+ "name": "S2-L2A-B03",
+ "common_name": "green",
+ "center_wavelength": 0.56,
+ "full_width_half_max": 0.045,
+ },
+ {
+ "name": "S2-L2A-B04",
+ "common_name": "red",
+ "center_wavelength": 0.665,
+ "full_width_half_max": 0.038,
+ },
+ {
+ "name": "S2-L2A-B05",
+ "common_name": "rededge",
+ "center_wavelength": 0.704,
+ "full_width_half_max": 0.019,
+ },
+ {
+ "name": "S2-L2A-B06",
+ "common_name": "rededge",
+ "center_wavelength": 0.74,
+ "full_width_half_max": 0.018,
+ },
+ {
+ "name": "S2-L2A-B07",
+ "common_name": "rededge",
+ "center_wavelength": 0.783,
+ "full_width_half_max": 0.028,
+ },
+ {
+ "name": "S2-L2A-B08",
+ "common_name": "nir",
+ "center_wavelength": 0.842,
+ "full_width_half_max": 0.145,
+ },
+ {
+ "name": "S2-L2A-B8A",
+ "common_name": "nir08",
+ "center_wavelength": 0.865,
+ "full_width_half_max": 0.033,
+ },
+ {
+ "name": "S2-L2A-B11",
+ "common_name": "swir16",
+ "center_wavelength": 1.61,
+ "full_width_half_max": 0.143,
+ },
+ {
+ "name": "S2-L2A-B12",
+ "common_name": "swir22",
+ "center_wavelength": 2.19,
+ "full_width_half_max": 0.242,
+ },
+ ],
+ }
+)
+
+AUXILIARY = pystac.extensions.item_assets.AssetDefinition(
+ {
+ "title": "ground truth data",
+ "description": "This asset contains the crop type codes.",
+ "type": "application/x-netcdf",
+ "roles": ["data"],
+ "proj:shape": [64, 64],
+ "raster:bands": [{"name": "CROPTYPE", "data_type": "uint16", "bits_per_sample": 16}],
+ }
+)
+
+SENTINEL1 = pystac.extensions.item_assets.AssetDefinition({})
+
+AGERA5 = pystac.extensions.item_assets.AssetDefinition({})
+ITEM_ASSETS = {
+ "sentinel2": SENTINEL2,
+ "auxiliary": AUXILIARY,
+ "sentinel1": SENTINEL1,
+ "agera5": AGERA5,
+}
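
These constants only take effect when `GFMAPJobManager.create_stac` applies them to the root collection; after a run, the written catalog can be inspected with plain pystac. The path below assumes the default output location and file name, both of which are assumptions here:

```python
import pystac

# Path assumed: the default location written by GFMAPJobManager.create_stac(),
# i.e. <output_dir>/stac/collection.json.
collection = pystac.Collection.from_file("/tmp/extractions/stac/collection.json")

print(collection.license)          # "CC-BY-4.0"
print(collection.stac_extensions)  # the STAC_EXTENSIONS list defined above
print(list(collection.extra_fields.get("item_assets", {})))  # sentinel2, auxiliary, ...
```
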
diff --git a/src/openeo_gfmap/utils/build_df.py b/src/openeo_gfmap/utils/build_df.py
index 7627189..8302b30 100644
--- a/src/openeo_gfmap/utils/build_df.py
+++ b/src/openeo_gfmap/utils/build_df.py
@@ -27,7 +27,7 @@ def load_json(input_file: Path, bands: list) -> pd.DataFrame:
A `pandas.DataFrame` containing a combination of the band names and the
timestamps as column names.
For example, the Sentinel-2 green band on the 1st of October 2020 will
- have the column name `S2-B02:2020-10-01`
+ have the column name `S2-L2A-B02:2020-10-01`
"""
df = pd.read_json(input_file)
diff --git a/src/openeo_gfmap/utils/tile_processing.py b/src/openeo_gfmap/utils/tile_processing.py
index fa1efab..5986eda 100644
--- a/src/openeo_gfmap/utils/tile_processing.py
+++ b/src/openeo_gfmap/utils/tile_processing.py
@@ -20,7 +20,7 @@ def normalize_array(inarr: xr.DataArray, percentile: float = 0.99) -> xr.DataArr
def select_optical_bands(inarr: xr.DataArray) -> xr.DataArray:
"""Filters and keep only the optical bands for a given array."""
return inarr.sel(
- bands=[band for band in inarr.coords["bands"].to_numpy() if band.startswith("S2-B")]
+ bands=[band for band in inarr.coords["bands"].to_numpy() if band.startswith("S2-L2A-B")]
)
diff --git a/tests/test_openeo_gfmap/resources/test_optical_cube.nc b/tests/test_openeo_gfmap/resources/test_optical_cube.nc
index ce7f8fb..8d5e2c0 100644
Binary files a/tests/test_openeo_gfmap/resources/test_optical_cube.nc and b/tests/test_openeo_gfmap/resources/test_optical_cube.nc differ
diff --git a/tests/test_openeo_gfmap/resources/wc_extraction_dataset.gpkg b/tests/test_openeo_gfmap/resources/wc_extraction_dataset.gpkg
new file mode 100644
index 0000000..0136a4a
Binary files /dev/null and b/tests/test_openeo_gfmap/resources/wc_extraction_dataset.gpkg differ
diff --git a/tests/test_openeo_gfmap/test_cloud_masking.py b/tests/test_openeo_gfmap/test_cloud_masking.py
index 2328f9d..d8b88dc 100644
--- a/tests/test_openeo_gfmap/test_cloud_masking.py
+++ b/tests/test_openeo_gfmap/test_cloud_masking.py
@@ -42,7 +42,7 @@ def test_bap_score(backend: Backend):
# Fetch the datacube
s2_extractor = build_sentinel2_l2a_extractor(
backend_context=backend_context,
- bands=["S2-B04", "S2-B08", "S2-SCL"],
+ bands=["S2-L2A-B04", "S2-L2A-B08", "S2-L2A-SCL"],
fetch_type=FetchType.TILE,
**fetching_parameters,
)
@@ -51,9 +51,9 @@ def test_bap_score(backend: Backend):
# Compute the BAP score
bap_score = get_bap_score(cube, **preprocessing_parameters)
- ndvi = cube.ndvi(nir="S2-B08", red="S2-B04")
+ ndvi = cube.ndvi(nir="S2-L2A-B08", red="S2-L2A-B04")
- cube = bap_score.merge_cubes(ndvi).rename_labels("bands", ["S2-BAPSCORE", "S2-NDVI"])
+ cube = bap_score.merge_cubes(ndvi).rename_labels("bands", ["S2-L2A-BAPSCORE", "S2-L2A-NDVI"])
job = cube.create_job(
title="BAP score unittest",
@@ -78,7 +78,7 @@ def test_bap_masking(backend: Backend):
# Fetch the datacube
s2_extractor = build_sentinel2_l2a_extractor(
backend_context=backend_context,
- bands=["S2-B04", "S2-B03", "S2-B02", "S2-SCL"],
+ bands=["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02", "S2-L2A-SCL"],
fetch_type=FetchType.TILE,
**fetching_parameters,
)
@@ -96,7 +96,7 @@ def test_bap_masking(backend: Backend):
cube = cube.linear_scale_range(0, 65535, 0, 65535)
# Remove SCL
- cube = cube.filter_bands([band for band in cube.metadata.band_names if band != "S2-SCL"])
+ cube = cube.filter_bands([band for band in cube.metadata.band_names if band != "S2-L2A-SCL"])
job = cube.create_job(
title="BAP compositing unittest",
@@ -122,7 +122,7 @@ def test_bap_quintad(backend: Backend):
# Fetch the datacube
s2_extractor = build_sentinel2_l2a_extractor(
backend_context=backend_context,
- bands=["S2-SCL"],
+ bands=["S2-L2A-SCL"],
fetch_type=FetchType.TILE,
**fetching_parameters,
)
@@ -171,7 +171,7 @@ def test_bap_quintad(backend: Backend):
s2_extractor = build_sentinel2_l2a_extractor(
backend_context=backend_context,
- bands=["S2-B04", "S2-B03", "S2-B02", "S2-B08", "S2-SCL"],
+ bands=["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02", "S2-L2A-B08", "S2-L2A-SCL"],
fetch_type=FetchType.TILE,
**fetching_parameters,
)
diff --git a/tests/test_openeo_gfmap/test_feature_extractors.py b/tests/test_openeo_gfmap/test_feature_extractors.py
index 621c010..8fe91e4 100644
--- a/tests/test_openeo_gfmap/test_feature_extractors.py
+++ b/tests/test_openeo_gfmap/test_feature_extractors.py
@@ -34,7 +34,7 @@ def execute(self, inarr: xr.DataArray):
from scipy.ndimage import gaussian_filter
# Performs some gaussian filtering to blur the RGB bands
- rgb_bands = inarr.sel(bands=["S2-B04", "S2-B03", "S2-B02"])
+ rgb_bands = inarr.sel(bands=["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02"])
for band in rgb_bands.bands:
for timestamp in rgb_bands.t:
@@ -78,7 +78,7 @@ def test_patch_feature_udf(backend: Backend, connection_fn: Callable):
connection = connection_fn()
output_path = Path(__file__).parent / f"results/patch_features_{backend.value}.nc/"
- bands_to_extract = ["S2-B04", "S2-B03", "S2-B02"]
+ bands_to_extract = ["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02"]
# Setup the RGB cube extraction
extractor = build_sentinel2_l2a_extractor(backend_context, bands_to_extract, FetchType.TILE)
@@ -120,7 +120,7 @@ def test_latlon_extractor(backend: Backend, connection_fn: Callable):
REDUCED_TEMPORAL_CONTEXT = TemporalContext(start_date="2023-06-01", end_date="2023-06-30")
- bands_to_extract = ["S2-B04"]
+ bands_to_extract = ["S2-L2A-B04"]
extractor = build_sentinel2_l2a_extractor(backend_context, bands_to_extract, FetchType.TILE)
diff --git a/tests/test_openeo_gfmap/test_managers.py b/tests/test_openeo_gfmap/test_managers.py
new file mode 100644
index 0000000..5ed266b
--- /dev/null
+++ b/tests/test_openeo_gfmap/test_managers.py
@@ -0,0 +1,23 @@
+"""Test the job splitters and managers of OpenEO GFMAP."""
+from pathlib import Path
+
+import geopandas as gpd
+
+from openeo_gfmap.manager.job_splitters import split_job_hex
+
+
+def test_split_jobs():
+ dataset_path = Path(__file__).parent / "resources/wc_extraction_dataset.gpkg"
+
+ # Load the dataset
+ dataset = gpd.read_file(dataset_path)
+
+ # Split the dataset
+ split_dataset = split_job_hex(dataset, max_points=500)
+
+ # Check the number of splits
+ assert len(split_dataset) > 1
+
+ for ds in split_dataset:
+ print(len(ds))
+ assert len(ds) <= 500
diff --git a/tests/test_openeo_gfmap/test_s1_fetchers.py b/tests/test_openeo_gfmap/test_s1_fetchers.py
index bf1c7d9..d9cde85 100644
--- a/tests/test_openeo_gfmap/test_s1_fetchers.py
+++ b/tests/test_openeo_gfmap/test_s1_fetchers.py
@@ -46,8 +46,8 @@ def sentinel1_grd(
):
context = BackendContext(backend)
country = spatial_extent["country"]
- bands = ["S1-VV", "S1-VH"]
- expected_harmonized_bands = ["S1-VV", "S1-VH"]
+ bands = ["S1-SIGMA0-VV", "S1-SIGMA0-VH"]
+ expected_harmonized_bands = ["S1-SIGMA0-VV", "S1-SIGMA0-VH"]
fetching_parameters = {
"target_resolution": 10.0,
@@ -152,7 +152,7 @@ def sentinel1_grd_point_based(
given polygons.
"""
context = BackendContext(backend)
- bands = ["S1-VV", "S1-VH"]
+ bands = ["S1-SIGMA0-VV", "S1-SIGMA0-VH"]
# Because it is tested in malawi, and this is the EPSG code for
# the UTM projection in that zone
@@ -203,7 +203,7 @@ def sentinel1_grd_polygon_based(
connection: openeo.Connection,
):
context = BackendContext(backend)
- bands = ["S1-VV", "S1-VH"]
+ bands = ["S1-SIGMA0-VV", "S1-SIGMA0-VH"]
fetching_parameters = {
"target_crs": 3035, # Location in Europe
diff --git a/tests/test_openeo_gfmap/test_s2_fetchers.py b/tests/test_openeo_gfmap/test_s2_fetchers.py
index c6bbfe7..0e18672 100644
--- a/tests/test_openeo_gfmap/test_s2_fetchers.py
+++ b/tests/test_openeo_gfmap/test_s2_fetchers.py
@@ -86,14 +86,21 @@ def sentinel2_l2a(
country = spatial_extent["country"]
# Fetch a variety of spatial resolution and metadata from different
# providers.
- bands = ["S2-B01", "S2-B04", "S2-B08", "S2-B11", "S2-SCL", "S2-AOT"]
+ bands = [
+ "S2-L2A-B01",
+ "S2-L2A-B04",
+ "S2-L2A-B08",
+ "S2-L2A-B11",
+ "S2-L2A-SCL",
+ "S2-L2A-AOT",
+ ]
expected_harmonized_names = [
- "S2-B01",
- "S2-B04",
- "S2-B08",
- "S2-B11",
- "S2-SCL",
- "S2-AOT",
+ "S2-L2A-B01",
+ "S2-L2A-B04",
+ "S2-L2A-B08",
+ "S2-L2A-B11",
+ "S2-L2A-SCL",
+ "S2-L2A-AOT",
]
fetching_parameters = (
{"target_resolution": 10.0, "target_crs": 3035} if country == "Belgium" else {}
@@ -190,7 +197,7 @@ def sentinel2_l2a_point_based(
given polygons.
"""
context = BackendContext(backend)
- bands = ["S2-B01", "S2-B04", "S2-B08", "S2-B11"]
+ bands = ["S2-L2A-B01", "S2-L2A-B04", "S2-L2A-B08", "S2-L2A-B11"]
# Because it is tested in Malawi, and this is the EPSG code for the
# UTM projection for that zone
@@ -234,7 +241,7 @@ def sentinel2_l2a_polygon_based(
connection: openeo.Connection,
):
context = BackendContext(backend)
- bands = ["S2-B02", "S2-B03", "S2-B04"]
+ bands = ["S2-L2A-B02", "S2-L2A-B03", "S2-L2A-B04"]
fetching_parameters = {"target_crs": 3035} # Location in Europe
extractor = build_sentinel2_l2a_extractor(