diff --git a/examples/extraction_pipelines/S2_extraction_example.ipynb b/examples/extraction_pipelines/S2_extraction_example.ipynb
new file mode 100644
index 0000000..35e05ab
--- /dev/null
+++ b/examples/extraction_pipelines/S2_extraction_example.ipynb
@@ -0,0 +1,908 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Example of a GFMAP full-extraction pipeline\n",
+    "\n",
+    "Designing a full-extraction pipeline using the openeo-gfmap GFMAPJobManager.\n",
+    "\n",
+    "The pipeline consists of the following elements:\n",
+    "* An input dataframe where each row corresponds to one executed job.\n",
+    "* A user function that creates an OpenEO BatchJob from an input row of the aforementioned dataframe.\n",
+    "* A user function that generates an output path for each of the job products.\n",
+    "* A user function executed after the assets of a finished job are downloaded and saved (optional). This **post-job action** can do anything and will be executed locally inside the GFMAPJobManager.\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Setting up the logging module"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Configuring the logging for the openeo_gfmap package\n",
+    "from openeo_gfmap.manager import _log\n",
+    "import logging\n",
+    "\n",
+    "_log.setLevel(logging.DEBUG)\n",
+    "\n",
+    "stream_handler = logging.StreamHandler()\n",
+    "_log.addHandler(stream_handler)\n",
+    "\n",
+    "formatter = logging.Formatter('%(asctime)s|%(name)s|%(levelname)s: %(message)s')\n",
+    "stream_handler.setFormatter(formatter)\n",
+    "\n",
+    "# Exclude records coming from the loggers of other libraries\n",
+    "class MyLoggerFilter(logging.Filter):\n",
+    "    def filter(self, record):\n",
+    "        return record.name == _log.name\n",
+    "\n",
+    "stream_handler.addFilter(MyLoggerFilter())\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### First step: splitting the job\n",
+    "\n",
+    "We load the initial crop-type dataset that will be the base of our extractions.\n",
+    "\n",
+    "Splitting the extraction dataset into multiple jobs based on location is necessary to respect OpenEO limitations.\n",
+    "\n",
+    "This script performs a split with the H3 hexagonal grid, yielding a list of sub-geodataframes.\n",
+    "\n",
+    "A subtlety here is that some polygons are not directly extracted (field with `extract=False`) but should be kept for the post-job actions. This requirement is fulfilled by removing sub-dataframes that do not contain any extractable polygons."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "/data/users/Private/couchard/openeo-gfmap/src/openeo_gfmap/manager/job_splitters.py:51: UserWarning: Geometry is in a geographic CRS. Results from 'centroid' are likely incorrect.
Use 'GeoSeries.to_crs()' to re-project geometries to a projected CRS before this operation.\n",
+      "\n",
+      "  polygons[\"h3index\"] = polygons.geometry.centroid.apply(\n"
+     ]
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "334 sub-datasets.\n",
+      "236 sub-datasets after filtering sub-datasets with no point to extract.\n"
+     ]
+    }
+   ],
+   "source": [
+    "from pathlib import Path\n",
+    "import geopandas as gpd\n",
+    "from openeo_gfmap.manager.job_splitters import split_job_hex\n",
+    "\n",
+    "base_df_path = \"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/DEMO_CROPTYPE.gpkg\"\n",
+    "base_df = gpd.read_file(base_df_path)\n",
+    "# Split the dataset into jobs using GFMAP\n",
+    "split_jobs = split_job_hex(\n",
+    "    base_df, max_points=60, grid_resolution=4\n",
+    ")\n",
+    "\n",
+    "print(f'{len(split_jobs)} sub-datasets.')\n",
+    "\n",
+    "# Remove the sub-datasets where there are no points with the \"extract\" flag\n",
+    "split_jobs = [\n",
+    "    job for job in split_jobs if job.extract.any()\n",
+    "]\n",
+    "print(f'{len(split_jobs)} sub-datasets after filtering sub-datasets with no point to extract.')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Second step: creating a dataframe for the GFMAP Job Manager\n",
+    "\n",
+    "Implementing a function that yields a `pandas.DataFrame` where each row corresponds to a job.\n",
+    "\n",
+    "The dataframe should contain the information required by the GFMAP Job Manager, as well as additional information used by the datacube creation function and the post-job action function.\n",
+    "\n",
+    "The output dataframe should be savable as a .csv file.\n",
+    "\n",
+    "Note: the full information of a sub-geodataframe of polygons can be saved into a row of a `pandas.DataFrame` by storing it as a string implementing the `geojson.FeatureCollection` interface. To convert the `geopandas.GeoDataFrame` into such a string, simply use the `.to_json()` function, as sketched below."
+   ]
+  },
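+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "A minimal sketch of this round-trip (the variable names are purely illustrative):\n",
+    "\n",
+    "```python\n",
+    "import json\n",
+    "import geopandas as gpd\n",
+    "\n",
+    "# GeoDataFrame -> GeoJSON string, storable in a pandas cell\n",
+    "geometry_str = sub_gdf.to_json()\n",
+    "\n",
+    "# GeoJSON string -> GeoDataFrame, e.g. inside a post-job action\n",
+    "sub_gdf = gpd.GeoDataFrame.from_features(json.loads(geometry_str)).set_crs(epsg=4326)\n",
+    "```"
+   ]
+  },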
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
backend_nameout_prefixout_extensionstart_dateend_dategeometry
0cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
1cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
2cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
3cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
4cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
.....................
231cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
232cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
233cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
234cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
235cdseS2_10m.nc2020-08-302022-03-03{\"type\": \"FeatureCollection\", \"features\": [{\"i...
\n", + "

236 rows × 6 columns

\n", + "
" + ], + "text/plain": [ + " backend_name out_prefix out_extension start_date end_date \\\n", + "0 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "1 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "2 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "3 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "4 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + ".. ... ... ... ... ... \n", + "231 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "232 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "233 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "234 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "235 cdse S2_10m .nc 2020-08-30 2022-03-03 \n", + "\n", + " geometry \n", + "0 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "1 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "2 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "3 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "4 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + ".. ... \n", + "231 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "232 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "233 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "234 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "235 {\"type\": \"FeatureCollection\", \"features\": [{\"i... \n", + "\n", + "[236 rows x 6 columns]" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from openeo_gfmap import Backend\n", + "from typing import List\n", + "import pandas as pd\n", + "\n", + "def create_job_dataframe_s2(backend: Backend, split_jobs: List[gpd.GeoDataFrame]) -> pd.DataFrame:\n", + " \"\"\"Create a dataframe from the split jobs, containg all the necessary information to run the job.\"\"\"\n", + " columns = ['backend_name', 'out_prefix', 'out_extension', 'start_date', 'end_date', 'geometry']\n", + " rows = []\n", + " for job in split_jobs:\n", + " # Compute the average in the valid date and make a buffer of 1.5 year around\n", + " median_time = pd.to_datetime(job.valid_date).mean()\n", + " start_date = median_time - pd.Timedelta(days=275) # A bit more than 9 months\n", + " end_date = median_time + pd.Timedelta(days=275) # A bit more than 9 months\n", + " \n", + " rows.append(\n", + " pd.Series(\n", + " dict(zip(columns, [backend.value, 'S2_10m', '.nc', start_date.strftime('%Y-%m-%d'), end_date.strftime('%Y-%m-%d'), job.to_json()]))\n", + " )\n", + " )\n", + "\n", + " return pd.DataFrame(rows)\n", + "\n", + "job_df = create_job_dataframe_s2(Backend.CDSE, split_jobs)\n", + "\n", + "job_df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Sub-sampling job dataframe to reduce execution time" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# Run a subset of the jobs to test the manager, the selected jobs have a fair amount of geometries to extract\n", + "job_df = job_df.iloc[[0, 2, 3, -6]].reset_index(drop=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "array([3])" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import geojson\n", + "\n", + "def get_job_nb_polygons(row: pd.Series) -> int:\n", + " \"\"\"Get the number of polygons in the geometry.\"\"\"\n", + " return len(list(filter(lambda feat: feat.properties.get(\"extract\"), 
geojson.loads(row.geometry)['features'])))\n",
+    "\n",
+    "job_df['nb_polygons'] = job_df.apply(get_job_nb_polygons, axis=1)\n",
+    "job_df.nb_polygons.values"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Third step: implement the datacube creator function\n",
+    "\n",
+    "Implement a function that creates, from the dataframe rows defined before, the `openeo.BatchJob` that will be used to run the job.\n",
+    "\n",
+    "In this case, we extract Sentinel-2 data in a 64x64 pixel square around each polygon that has the field `extract=True` (the other polygons are kept in the row for the post-job action).\n",
+    "\n",
+    "Note:\n",
+    "Because the polygons to extract are specified in UTM coordinates (required to give them a specific size), the polygon dataset cannot be sent directly through the openeo process graph (GeoJSON only supports lat/lon coordinates). The sub-datasets of polygons are therefore uploaded to a publicly accessible URL, so that openeo can load them during the execution of the job."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import openeo\n",
+    "\n",
+    "import requests\n",
+    "from tempfile import NamedTemporaryFile\n",
+    "import os\n",
+    "import pandas as pd\n",
+    "import geojson\n",
+    "from shapely.geometry import Point\n",
+    "\n",
+    "from openeo_gfmap import TemporalContext, Backend, BackendContext, FetchType, SpatialContext\n",
+    "from openeo_gfmap.fetching import build_sentinel2_l2a_extractor\n",
+    "\n",
+    "def upload_geoparquet_artifactory(gdf: gpd.GeoDataFrame, row_id: int) -> str:\n",
+    "    # Save the dataframe as geoparquet to upload it to artifactory\n",
+    "    temporary_file = NamedTemporaryFile()\n",
+    "    gdf.to_parquet(temporary_file.name)\n",
+    "\n",
+    "    artifactory_username = os.getenv('ARTIFACTORY_USERNAME')\n",
+    "    artifactory_password = os.getenv('ARTIFACTORY_PASSWORD')\n",
+    "\n",
+    "    headers = {\n",
+    "        \"Content-Type\": \"application/octet-stream\"\n",
+    "    }\n",
+    "\n",
+    "    upload_url = f\"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap-temp/openeogfmap_dataframe_{row_id}.parquet\"\n",
+    "\n",
+    "    with open(temporary_file.name, 'rb') as f:\n",
+    "        response = requests.put(upload_url, headers=headers, data=f, auth=(artifactory_username, artifactory_password))\n",
+    "\n",
+    "    assert response.status_code == 201, f\"Error uploading the dataframe to artifactory: {response.text}\"\n",
+    "\n",
+    "    return upload_url\n",
+    "\n",
+    "\n",
+    "def create_datacube_s2(row: pd.Series, connection: openeo.Connection, provider=None, connection_provider=None) -> openeo.BatchJob:\n",
+    "\n",
+    "    def buffer_geometry(geometry: geojson.FeatureCollection, buffer: int) -> gpd.GeoDataFrame:\n",
+    "        gdf = gpd.GeoDataFrame.from_features(geometry).set_crs(epsg=4326)\n",
+    "        utm = gdf.estimate_utm_crs()\n",
+    "        gdf = gdf.to_crs(utm)\n",
+    "\n",
+    "        gdf['geometry'] = gdf.centroid.apply(\n",
+    "            # Snap the point to the closest 20m multiple of the S2 grid\n",
+    "            lambda point: Point(round(point.x / 20.0) * 20.0, round(point.y / 20.0) * 20.0)\n",
+    "        ).buffer(distance=buffer, cap_style=3)  # cap_style=3 produces square buffers\n",
+    "\n",
+    "        return gdf\n",
+    "\n",
+    "    def filter_extractonly_geometries(collection: geojson.FeatureCollection):\n",
+    "        # Filter out geometries that do not have the field extract=True\n",
+    "        features = [f for f in collection.features if f.properties.get('extract', False)]\n",
+    "        return geojson.FeatureCollection(features)\n",
+    "\n",
+    "    start_date = row.start_date\n",
+    "    end_date = row.end_date\n",
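+    "    # The TemporalContext bundles the start and end dates (ISO-8601 strings) used as the openEO temporal extent.\n",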
+    "    temporal_context = TemporalContext(start_date, end_date)\n",
+    "\n",
+    "    # Get the feature collection containing the geometries for the job\n",
+    "    geometry = geojson.loads(row.geometry)\n",
+    "    assert isinstance(geometry, geojson.FeatureCollection)\n",
+    "\n",
+    "    # Keep only the geometries with the extract flag\n",
+    "    geometry = filter_extractonly_geometries(geometry)\n",
+    "    assert len(geometry.features) > 0, \"No geometries with the extract flag found\"\n",
+    "\n",
+    "    # Buffer the centroids by 320m (32 px at 10m), yielding 64x64 pixel patches\n",
+    "    geometry_df = buffer_geometry(geometry, 320)\n",
+    "    spatial_extent_url = upload_geoparquet_artifactory(geometry_df, row.name)\n",
+    "\n",
+    "    # Backend name and fetching type\n",
+    "    backend = Backend(row.backend_name)\n",
+    "    backend_context = BackendContext(backend)\n",
+    "\n",
+    "    fetch_type = FetchType.POLYGON\n",
+    "    bands_to_download = ['S2-L2A-B01', 'S2-L2A-B02', 'S2-L2A-B03', 'S2-L2A-B04', 'S2-L2A-B05', 'S2-L2A-B06', 'S2-L2A-B07', 'S2-L2A-B08', 'S2-L2A-B8A', 'S2-L2A-B09', 'S2-L2A-B11', 'S2-L2A-B12', 'S2-L2A-SCL']\n",
+    "\n",
+    "    # Create the job to extract S2\n",
+    "    extraction_parameters = {\n",
+    "        \"target_resolution\": 10,\n",
+    "        \"load_collection\": {\n",
+    "            \"eo:cloud_cover\": lambda val: val <= 95.0,\n",
+    "        },\n",
+    "    }\n",
+    "    extractor = build_sentinel2_l2a_extractor(\n",
+    "        backend_context, bands=bands_to_download, fetch_type=fetch_type, **extraction_parameters\n",
+    "    )\n",
+    "\n",
+    "    cube = extractor.get_cube(connection, spatial_extent_url, temporal_context)\n",
+    "\n",
+    "    # Compute the SCL dilation mask and add it to the cube\n",
+    "    scl_dilated_mask = cube.process(\n",
+    "        \"to_scl_dilation_mask\",\n",
+    "        data=cube,\n",
+    "        scl_band_name=\"S2-L2A-SCL\",\n",
+    "        kernel1_size=17,  # 17px dilation on a 20m layer\n",
+    "        kernel2_size=77,  # 77px dilation on a 20m layer\n",
+    "        mask1_values=[2, 4, 5, 6, 7],\n",
+    "        mask2_values=[3, 8, 9, 10, 11],\n",
+    "        erosion_kernel_size=3\n",
+    "    ).rename_labels(\"bands\", [\"S2-L2A-SCL_DILATED_MASK\"])\n",
+    "\n",
+    "    cube = cube.merge_cubes(scl_dilated_mask)\n",
+    "    cube = cube.linear_scale_range(0, 65534, 0, 65534)\n",
+    "\n",
+    "    # Get the h3index and valid_date of the first feature, used in the job title\n",
+    "    h3index = geometry.features[0].properties['h3index']\n",
+    "    valid_date = geometry.features[0].properties['valid_date']\n",
+    "\n",
+    "    # The number of polygons to extract could be used to scale the job resources\n",
+    "    number_polygons = get_job_nb_polygons(row)\n",
+    "    _log.debug(f\"Number of polygons to extract: {number_polygons}\")\n",
+    "\n",
+    "    job_options = {\n",
+    "        \"executor-memory\": \"5G\",\n",
+    "        \"executor-memoryOverhead\": \"2G\",\n",
+    "    }\n",
+    "\n",
+    "    return cube.create_job(\n",
+    "        out_format=\"NetCDF\",\n",
+    "        title=f\"GFMAP_Extraction_S2_{h3index}_{valid_date}\",\n",
+    "        sample_by_feature=True,\n",
+    "        job_options=job_options\n",
+    "    )\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Fourth step: create output paths\n",
+    "\n",
+    "Implement a function that, given the temporary path of a downloaded job result, the job dataframe row and the root output folder, chooses the final path where that job result will be saved. A minimal sketch of the expected signature is shown below; the full implementation follows in the next cells.\n",
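+    "\n",
+    "```python\n",
+    "# A hedged sketch; `minimal_output_path_generator` is an illustrative name, not a GFMAP API.\n",
+    "def minimal_output_path_generator(root_folder: Path, tmp_path: Path,\n",
+    "                                  geometry_index: int, row: pd.Series) -> Path:\n",
+    "    return root_folder / f\"{row.out_prefix}_{geometry_index}{row.out_extension}\"\n",
+    "```"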
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "CPU times: user 4.62 s, sys: 85.2 ms, total: 4.71 s\n",
+      "Wall time: 5.03 s\n"
+     ]
+    }
+   ],
+   "source": [
+    "%%time\n",
+    "\n",
+    "# Load the S2 grid\n",
+    "s2_grid = gpd.read_file(\"https://artifactory.vgt.vito.be/artifactory/auxdata-public/gfmap/s2grid_bounds.gpkg\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pathlib import Path\n",
+    "import xarray as xr\n",
+    "from pyproj import Transformer, CRS\n",
+    "from shapely.geometry import box, Point\n",
+    "\n",
+    "def generate_output_path_s2(root_folder: Path, tmp_path: Path, geometry_index: int, row: pd.Series):\n",
+    "    features = geojson.loads(row.geometry)['features']\n",
+    "    sample_id = features[geometry_index].properties['sample_id']\n",
+    "    ref_id = features[geometry_index].properties['ref_id']\n",
+    "\n",
+    "    # Open the dataset lazily\n",
+    "    try:\n",
+    "        inds = xr.open_dataset(tmp_path, chunks='auto')\n",
+    "\n",
+    "        source_crs = CRS.from_wkt(inds.crs.attrs['crs_wkt'])\n",
+    "        dst_crs = CRS.from_epsg(4326)\n",
+    "\n",
+    "        transformer = Transformer.from_crs(source_crs, dst_crs, always_xy=True)\n",
+    "\n",
+    "        # Get the center point of the tile\n",
+    "        centroid_utm = box(\n",
+    "            inds.x.min().item(), inds.y.min().item(), inds.x.max().item(), inds.y.max().item()\n",
+    "        ).centroid\n",
+    "        centroid_latlon = Point(*transformer.transform(centroid_utm.x, centroid_utm.y))\n",
+    "\n",
+    "        # Intersecting with the s2 grid\n",
+    "        intersecting = s2_grid.geometry.intersects(centroid_latlon)\n",
+    "\n",
+    "        # Select the intersecting cell whose centroid is the closest to the point\n",
+    "        intersecting_cells = s2_grid[intersecting].copy()\n",
+    "        intersecting_cells['distance'] = intersecting_cells.distance(centroid_latlon)\n",
+    "        intersecting_cells.sort_values('distance', inplace=True)\n",
+    "        s2_tile = intersecting_cells.iloc[0]\n",
+    "\n",
+    "        s2_tile_id = s2_tile.tile\n",
+    "\n",
+    "        subfolder = root_folder / ref_id / str(source_crs.to_epsg()) / s2_tile_id / sample_id\n",
+    "    except Exception:\n",
+    "        # source_crs is undefined when the file cannot be parsed, so return early\n",
+    "        return root_folder / 'unsortable' / f'{row.out_prefix}_{sample_id}{row.out_extension}'\n",
+    "\n",
+    "    return subfolder / f'{row.out_prefix}_{sample_id}_{source_crs.to_epsg()}_{row.start_date}_{row.end_date}{row.out_extension}'\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Fifth step: Define the post-job action\n",
+    "\n",
+    "The post-job action will be called once the job result has been downloaded and saved to its output path.\n",
+    "\n",
+    "A post-job action function must receive 3 parameters:\n",
+    "* `job_items`: STAC items containing the currently extracted data for the job.\n",
+    "* `row`: The current job dataframe row.\n",
+    "* `parameters`: User-defined parameters set in the `GFMAPJobManager` constructor.\n",
+    "\n",
+    "The post-job action must return the list of job items. The user is responsible for updating,\n",
+    "adding and removing the items. For example, in this case the user creates a raster file with\n",
+    "ground-truth data and then adds an asset to the related item using the predefined `openeo_gfmap.stac.AUXILIARY`\n",
+    "AssetDefinition, pointing to the generated NetCDF file. A minimal no-op action is sketched below:\n",
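+    "\n",
+    "```python\n",
+    "# A hedged sketch: the simplest valid post-job action returns the items untouched.\n",
+    "def noop_post_job_action(job_items: List[pystac.Item], row: pd.Series, parameters: dict = {}) -> list:\n",
+    "    # Inspect, modify, add or remove items and assets here.\n",
+    "    return job_items\n",
+    "```"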
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from importlib.metadata import version\n",
+    "from datetime import datetime\n",
+    "\n",
+    "from openeo_gfmap.stac import AUXILIARY\n",
+    "from rasterio.features import rasterize\n",
+    "from rasterio.transform import from_bounds\n",
+    "import pystac\n",
+    "import json\n",
+    "\n",
+    "def add_item_asset(related_item: pystac.Item, path: Path):\n",
+    "    asset = AUXILIARY.create_asset(\n",
+    "        href=path.as_posix()\n",
+    "    )\n",
+    "    related_item.add_asset('auxiliary', asset)\n",
+    "\n",
+    "def post_job_action(job_items: List[pystac.Item], row: pd.Series, parameters: dict = {}) -> list:\n",
+    "    base_gpd = gpd.GeoDataFrame.from_features(json.loads(row.geometry)).set_crs(epsg=4326)\n",
+    "    assert len(base_gpd[base_gpd.extract == True]) == len(job_items), (\n",
+    "        \"The number of result paths should be the same as the number of geometries\"\n",
+    "    )\n",
+    "    extracted_gpd = base_gpd[base_gpd.extract == True].reset_index(drop=True)\n",
+    "    # In this case we want to burn the metadata in a new file in the same folder as the S2 product\n",
+    "    for idx, item in enumerate(job_items):\n",
+    "        sample_id = extracted_gpd.iloc[idx].sample_id\n",
+    "        ref_id = extracted_gpd.iloc[idx].ref_id\n",
+    "        confidence = extracted_gpd.iloc[idx].confidence\n",
+    "        valid_date = extracted_gpd.iloc[idx].valid_date\n",
+    "\n",
+    "        item_asset_path = Path(\n",
+    "            list(item.assets.values())[0].href\n",
+    "        )\n",
+    "        # Read information from the item file (could also read it from the item object metadata)\n",
+    "        result_ds = xr.open_dataset(item_asset_path, chunks='auto')\n",
+    "\n",
+    "        # Add some metadata to the result_ds netcdf file\n",
+    "        result_ds.attrs.update({\n",
+    "            'start_date': row.start_date,\n",
+    "            'end_date': row.end_date,\n",
+    "            'valid_date': valid_date,\n",
+    "            'GFMAP_version': version('openeo_gfmap'),\n",
+    "            'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),\n",
+    "            'description': f'Sentinel2 L2A observations for sample: {sample_id}, unprocessed.',\n",
+    "            'title': f'Sentinel2 L2A - {sample_id}',\n",
+    "            'sample_id': sample_id,\n",
+    "            'spatial_resolution': '10m'\n",
+    "        })\n",
+    "        result_ds.to_netcdf(item_asset_path, format='NETCDF4', engine='h5netcdf')\n",
+    "\n",
+    "        target_crs = CRS.from_wkt(result_ds.crs.attrs['crs_wkt'])\n",
+    "\n",
+    "        # Get the polygons surrounding our extracted center geometry, to rasterize them\n",
+    "        bounds = (\n",
+    "            result_ds.x.min().item(),\n",
+    "            result_ds.y.min().item(),\n",
+    "            result_ds.x.max().item(),\n",
+    "            result_ds.y.max().item()\n",
+    "        )\n",
+    "        bbox = box(*bounds)\n",
+    "        surround_gpd = base_gpd.to_crs(target_crs).clip(bbox)\n",
+    "\n",
+    "        # Burn the polygon croptypes\n",
+    "        transform = from_bounds(*bounds, result_ds.x.size, result_ds.y.size)\n",
+    "        croptype_shapes = list(zip(surround_gpd.geometry, surround_gpd.croptype_label))\n",
+    "\n",
+    "        fill_value = 0\n",
+    "        croptype = rasterize(\n",
+    "            croptype_shapes,\n",
+    "            out_shape=(result_ds.y.size, result_ds.x.size),\n",
+    "            transform=transform,\n",
+    "            all_touched=False,\n",
+    "            fill=fill_value,\n",
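+    "            # all_touched=False only burns pixels whose center falls inside a polygon;\n",
+    "            # pixels not covered by any polygon keep fill_value.\n",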
+    "            default_value=0,\n",
+    "            dtype='int64'\n",
+    "        )\n",
+    "\n",
+    "        # Create the attributes to add to the metadata\n",
+    "        attributes = {\n",
+    "            'ref_id': ref_id,\n",
+    "            'sample_id': sample_id,\n",
+    "            'confidence': str(confidence),\n",
+    "            'valid_date': valid_date,\n",
+    "            '_FillValue': fill_value,\n",
+    "            'Conventions': 'CF-1.9',\n",
+    "            'GFMAP_version': version('openeo_gfmap'),\n",
+    "            'creation_date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),\n",
+    "            'description': f'Contains rasterized WorldCereal labels for sample: {sample_id}.',\n",
+    "            'title': f'WORLDCEREAL Auxiliary file for sample: {sample_id}',\n",
+    "            'spatial_resolution': '10m'\n",
+    "        }\n",
+    "\n",
+    "        aux_dataset = xr.Dataset(\n",
+    "            {'LABEL': (('y', 'x'), croptype), 'crs': result_ds['crs']},\n",
+    "            coords={'y': result_ds.y, 'x': result_ds.x},\n",
+    "            attrs=attributes\n",
+    "        )\n",
+    "        # Required to map the 'crs' layer as geo-reference for the 'LABEL' layer.\n",
+    "        aux_dataset['LABEL'].attrs['grid_mapping'] = 'crs'\n",
+    "\n",
+    "        # Save the metadata in the same folder as the S2 product\n",
+    "        metadata_path = (\n",
+    "            item_asset_path.parent /\n",
+    "            f'WORLDCEREAL_10m_{sample_id}_{target_crs.to_epsg()}_{valid_date}.nc'\n",
+    "        )\n",
+    "        aux_dataset.to_netcdf(metadata_path, format='NETCDF4', engine='h5netcdf')\n",
+    "        aux_dataset.close()\n",
+    "\n",
+    "        # Add this metadata as a new asset\n",
+    "        add_item_asset(item, metadata_path)\n",
+    "\n",
+    "    return job_items\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Sixth and last step: Running the manager\n",
+    "\n",
+    "Let's initialize and run the Job Manager as defined in GFMAP, using the functions defined previously.\n",
+    "\n",
+    "STAC-related parameters such as `collection_id` and `collection_description` are also required."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from openeo_gfmap.manager.job_manager import GFMAPJobManager\n",
+    "from openeo_gfmap.backend import cdse_staging_connection\n",
+    "\n",
+    "\n",
+    "base_output_dir = Path('/data/users/Public/couchard/world_cereal/extractions_5/')\n",
+    "tracking_job_csv = base_output_dir / 'job_tracker.csv'\n",
+    "\n",
+    "manager = GFMAPJobManager(\n",
+    "    output_dir=base_output_dir,\n",
+    "    output_path_generator=generate_output_path_s2,\n",
+    "    collection_id='SENTINEL-EXTRACTION',\n",
+    "    collection_description=(\n",
+    "        \"Sentinel-2 and Auxiliary data extraction example.\"\n",
+    "    ),\n",
+    "    post_job_action=post_job_action,\n",
+    "    poll_sleep=60,\n",
+    "    n_threads=2,\n",
+    "    post_job_params={}\n",
+    ")\n",
+    "\n",
+    "manager.add_backend(\n",
+    "    Backend.CDSE_STAGING.value, cdse_staging_connection, parallel_jobs=6\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "2024-03-07 14:32:38,392|openeo_gfmap.manager|INFO: Starting job manager using 2 worker threads.\n",
+      "2024-03-07 14:32:38,395|openeo_gfmap.manager|INFO: Workers started, creating and running jobs.\n",
+      "2024-03-07 14:32:38,592|openeo_gfmap.manager|DEBUG: Normalizing dataframe.
Columns: Index(['backend_name', 'out_prefix', 'out_extension', 'start_date', 'end_date',\n", + " 'geometry', 'nb_polygons', 'status', 'id', 'start_time', 'cpu',\n", + " 'memory', 'duration', 'description', 'costs'],\n", + " dtype='object')\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Authenticated using refresh token.\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-03-07 14:32:40,375|openeo_gfmap.manager|DEBUG: Number of polygons to extract: 3\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "DataCube()\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-03-07 14:34:05,989|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:35:06,713|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:36:07,149|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:37:08,393|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:38:08,991|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:39:09,497|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:40:11,296|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:41:12,353|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:42:13,100|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:43:13,535|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:44:14,023|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:45:14,444|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:46:14,884|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:47:15,338|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:48:15,855|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:49:18,136|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:50:19,075|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:51:19,412|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:52:19,838|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:53:21,069|openeo_gfmap.manager|DEBUG: Status of job 
j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:54:21,621|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:55:22,007|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:56:22,363|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is running (on backend cdse-staging).\n", + "2024-03-07 14:57:23,108|openeo_gfmap.manager|DEBUG: Status of job j-240307f2186344ee9bed9c649679dc92 is finished (on backend cdse-staging).\n", + "2024-03-07 14:57:23,110|openeo_gfmap.manager|INFO: Job j-240307f2186344ee9bed9c649679dc92 finished successfully, queueing on_job_done...\n", + "2024-03-07 14:57:23,115|openeo_gfmap.manager|DEBUG: Worker thread Thread-5: polled finished job with status PostJobStatus.FINISHED.\n", + "2024-03-07 14:57:24,650|openeo_gfmap.manager|DEBUG: Downloading asset openEO.nc from job j-240307f2186344ee9bed9c649679dc92 -> /tmp/tmpyu7z9urd\n", + "2024-03-07 14:57:43,859|openeo_gfmap.manager|DEBUG: Generating output path for asset openEO.nc from job j-240307f2186344ee9bed9c649679dc92...\n", + "/tmp/ipykernel_4054/3116825532.py:31: UserWarning: Geometry is in a geographic CRS. Results from 'distance' are likely incorrect. Use 'GeoSeries.to_crs()' to re-project geometries to a projected CRS before this operation.\n", + "\n", + " intersecting_cells['distance'] = intersecting_cells.distance(centroid_latlon)\n", + "/home/couchard/miniconda3/envs/gfmap/lib/python3.9/site-packages/geopandas/geodataframe.py:1543: SettingWithCopyWarning: \n", + "A value is trying to be set on a copy of a slice from a DataFrame.\n", + "Try using .loc[row_indexer,col_indexer] = value instead\n", + "\n", + "See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy\n", + " super().__setitem__(key, value)\n", + "/tmp/ipykernel_4054/3116825532.py:32: SettingWithCopyWarning: \n", + "A value is trying to be set on a copy of a slice from a DataFrame\n", + "\n", + "See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy\n", + " intersecting_cells.sort_values('distance', inplace=True)\n", + "2024-03-07 14:57:44,390|openeo_gfmap.manager|DEBUG: Generated path for asset openEO.nc from job j-240307f2186344ee9bed9c649679dc92 -> /data/users/Public/couchard/world_cereal/extractions_5/2021_EUR_DEMO_POLY_110/32635/35VMD/2021_LV_LPIS_POLY_110-12880341/S2_10m_2021_LV_LPIS_POLY_110-12880341_32635_2020-08-30_2022-03-03.nc\n", + "2024-03-07 14:57:48,864|openeo_gfmap.manager|INFO: Downloaded asset openEO.nc from job j-240307f2186344ee9bed9c649679dc92 -> /data/users/Public/couchard/world_cereal/extractions_5/2021_EUR_DEMO_POLY_110/32635/35VMD/2021_LV_LPIS_POLY_110-12880341/S2_10m_2021_LV_LPIS_POLY_110-12880341_32635_2020-08-30_2022-03-03.nc\n", + "2024-03-07 14:57:54,824|openeo_gfmap.manager|INFO: Parsed item j-240307f2186344ee9bed9c649679dc92_openEO.nc from job j-240307f2186344ee9bed9c649679dc92\n", + "2024-03-07 14:57:54,826|openeo_gfmap.manager|DEBUG: Calling post job action for job j-240307f2186344ee9bed9c649679dc92...\n", + "2024-03-07 14:57:55,644|openeo_gfmap.manager|INFO: Added 1 items to the STAC collection.\n", + "2024-03-07 14:57:55,646|openeo_gfmap.manager|INFO: Job j-240307f2186344ee9bed9c649679dc92 and post job action finished 
successfully.\n" + ] + } + ], + "source": [ + "# Run the jobs and create the STAC catalogue\n", + "manager.run_jobs(job_df, create_datacube_s2, tracking_job_csv)\n", + "manager.create_stac() # By default the STAC catalogue will be saved in the stac subfolder of the base_output_dir folder." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "gfmap", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.18" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/examples/feature_extractors/quantile_feature_extractor.py b/examples/feature_extractors/quantile_feature_extractor.py index aafd8b1..8e2f9e3 100644 --- a/examples/feature_extractors/quantile_feature_extractor.py +++ b/examples/feature_extractors/quantile_feature_extractor.py @@ -42,13 +42,13 @@ def output_labels(self) -> list: def execute(self, inarr: xr.DataArray) -> xr.DataArray: # compute indices - b03 = inarr.sel(bands="S2-B03") - b04 = inarr.sel(bands="S2-B04") - b05 = inarr.sel(bands="S2-B05") - b06 = inarr.sel(bands="S2-B06") - b08 = inarr.sel(bands="S2-B08") - b11 = inarr.sel(bands="S2-B11") - b12 = inarr.sel(bands="S2-B12") + b03 = inarr.sel(bands="S2-L2A-B03") + b04 = inarr.sel(bands="S2-L2A-B04") + b05 = inarr.sel(bands="S2-L2A-B05") + b06 = inarr.sel(bands="S2-L2A-B06") + b08 = inarr.sel(bands="S2-L2A-B08") + b11 = inarr.sel(bands="S2-L2A-B11") + b12 = inarr.sel(bands="S2-L2A-B12") ndvi = (b08 - b04) / (b08 + b04) ndwi = (b03 - b08) / (b03 + b08) @@ -100,14 +100,14 @@ def execute(self, inarr: xr.DataArray) -> xr.DataArray: # The bands that you can extract are defined in the code openeo_gfmap.fetching.s2.BASE_SENTINEL2_L2A_MAPPING bands = [ - "S2-B03", - "S2-B04", - "S2-B05", - "S2-B06", - "S2-B08", - "S2-B11", - "S2-B12", - "S2-SCL", + "S2-L2A-B03", + "S2-L2A-B04", + "S2-L2A-B05", + "S2-L2A-B06", + "S2-L2A-B08", + "S2-L2A-B11", + "S2-L2A-B12", + "S2-L2A-SCL", ] # Use the base feching diff --git a/pyproject.toml b/pyproject.toml index 98b6556..bd7c450 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,8 @@ dependencies = [ "openeo[localprocessing]", "cftime", "pyarrow", - "fastparquet" + "fastparquet", + "h3", ] [project.urls] diff --git a/src/openeo_gfmap/backend.py b/src/openeo_gfmap/backend.py index ba8cdb8..b932ea8 100644 --- a/src/openeo_gfmap/backend.py +++ b/src/openeo_gfmap/backend.py @@ -19,6 +19,7 @@ class Backend(Enum): TERRASCOPE = "terrascope" EODC = "eodc" # Dask implementation. Do not test on this yet. 
CDSE = "cdse" # Terrascope implementation (pyspark) #URL: openeo.dataspace.copernicus.eu (need to register) + CDSE_STAGING = "cdse-staging" LOCAL = "local" # Based on the same components of EODc @@ -82,6 +83,14 @@ def cdse_connection() -> openeo.Connection: ) +def cdse_staging_connection() -> openeo.Connection: + """Performs a connection to the CDSE backend using oidc authentication.""" + return _create_connection( + url="openeo-staging.dataspace.copernicus.eu", + env_var_suffix="CDSE_STAGING", + ) + + def eodc_connection() -> openeo.Connection: """Perfroms a connection to the EODC backend using the oidc authentication.""" return _create_connection( @@ -93,4 +102,5 @@ def eodc_connection() -> openeo.Connection: BACKEND_CONNECTIONS: Dict[Backend, Callable] = { Backend.TERRASCOPE: vito_connection, Backend.CDSE: cdse_connection, + Backend.CDSE_STAGING: cdse_staging_connection, } diff --git a/src/openeo_gfmap/extractions/__init__.py b/src/openeo_gfmap/extractions/__init__.py deleted file mode 100644 index 543009c..0000000 --- a/src/openeo_gfmap/extractions/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -"""Extraction sub-module. - -Logic behind the extraction of training or inference data. Different backends -are supported in order to obtain a very similar result at the end of this -component. -""" - -from .extraction import CollectionExtractor -from .s2 import build_sentinel2_l2a_extractor - -__all__ = ["build_sentinel2_l2a_extractor", "CollectionExtractor"] diff --git a/src/openeo_gfmap/extractions/commons.py b/src/openeo_gfmap/extractions/commons.py deleted file mode 100644 index d43d523..0000000 --- a/src/openeo_gfmap/extractions/commons.py +++ /dev/null @@ -1,41 +0,0 @@ -""" Common operations within collection extraction logic, such as reprojection. -""" -from typing import Optional, Union - -import openeo -from rasterio import CRS -from rasterio.errors import CRSError - - -def resample_reproject( - datacube: openeo.DataCube, resolution: float, epsg_code: Optional[Union[str, int]] -) -> openeo.DataCube: - """Reprojects the given datacube to the target epsg code, if the provided - epsg code is not None. Also performs checks on the give code to check - its validity. - """ - if epsg_code is not None: - # Checks that the code is valid - try: - CRS.from_epsg(int(epsg_code)) - except (CRSError, ValueError) as exc: - raise ValueError( - f"Specified target_crs: {epsg_code} is not a valid " "EPSG code." 
- ) from exc - return datacube.resample_spatial(resolution=resolution, projection=epsg_code) - return datacube.resample_spatial(resolution=resolution) - - -def rename_bands(datacube: openeo.DataCube, mapping: dict) -> openeo.DataCube: - """Rename the bands from the given mapping scheme""" - # Filter out bands that are not part of the datacube - print(datacube.dimension_labels("bands")) - - def filter_condition(band_name, _): - return band_name in datacube.dimension_labels("bands") - - mapping = {k: v for k, v in mapping.items() if filter_condition(k, v)} - - return datacube.rename_labels( - dimension="bands", target=list(mapping.values()), source=list(mapping.keys()) - ) diff --git a/src/openeo_gfmap/extractions/extraction.py b/src/openeo_gfmap/extractions/extraction.py deleted file mode 100644 index 902dc57..0000000 --- a/src/openeo_gfmap/extractions/extraction.py +++ /dev/null @@ -1,78 +0,0 @@ -""" Main file for extractions and pre-processing of data through OpenEO -""" - -from typing import Callable - -import openeo - -from openeo_gfmap import BackendContext -from openeo_gfmap.spatial import SpatialContext -from openeo_gfmap.temporal import TemporalContext - - -class CollectionExtractor: - """Base class to extract a particolar collection. - - Parameters - ---------- - backend_context: BackendContext - Information about the backend in use, useful in certain cases. - bands: list - List of band names to load from that collection. - collection_fetch: Callable - Function defining how to fetch a collection for a specific backend, - the function accepts the following parameters: connection, - spatial extent, temporal extent, bands and additional parameters. - collection_preprocessing: Callable - Function defining how to harmonize the data of a collection in a - backend. For example, this function could rename the bands as they - can be different for every backend/collection (SENTINEL2_L2A or - SENTINEL2_L2A_SENTINELHUB). Accepts the following parameters: - datacube (of pre-fetched collection) and additional parameters. - colection_params: dict - Additional parameters encoded within a dictionnary that will be - passed in the fetch and preprocessing function. - """ - - def __init__( - self, - backend_context: BackendContext, - bands: list, - collection_fetch: Callable, - collection_preprocessing: Callable, - **collection_params, - ): - self.backend_contect = backend_context - self.bands = bands - self.fetcher = collection_fetch - self.processing = collection_preprocessing - self.params = collection_params - - def get_cube( - self, - connection: openeo.Connection, - spatial_context: SpatialContext, - temporal_context: TemporalContext, - ) -> openeo.DataCube: - """Retrieve a data cube from the given spatial and temporal context. - - Parameters - ---------- - connection: openeo.Connection - A connection to an OpenEO backend. The backend provided must be the - same as the one this extractor class is configured for. - spatial_extent: SpatialContext - Either a GeoJSON collection on which spatial filtering will be - applied or a bounding box with an EPSG code. If a bounding box is - provided, no filtering is applied and the entirety of the data is - fetched for that region. - temporal_extent: TemporalContext - The begin and end date of the extraction. 
- """ - collection_data = self.fetcher( - connection, spatial_context, temporal_context, self.bands, **self.params - ) - - preprocessed_data = self.processing(collection_data, **self.params) - - return preprocessed_data diff --git a/src/openeo_gfmap/extractions/s2.py b/src/openeo_gfmap/extractions/s2.py deleted file mode 100644 index 342f963..0000000 --- a/src/openeo_gfmap/extractions/s2.py +++ /dev/null @@ -1,129 +0,0 @@ -""" Extraction of S2 features, depending on the backend. -""" -from typing import Callable - -import openeo -from geojson import GeoJSON - -from openeo_gfmap.backend import Backend, BackendContext -from openeo_gfmap.spatial import BoundingBoxExtent, SpatialContext -from openeo_gfmap.temporal import TemporalContext - -from .commons import rename_bands, resample_reproject -from .extraction import CollectionExtractor - -BASE_SENTINEL2_L2A_MAPPING = { - "B01": "S2-B01", - "B02": "S2-B02", - "B03": "S2-B03", - "B04": "S2-B04", - "B05": "S2-B05", - "B06": "S2-B06", - "B07": "S2-B07", - "B08": "S2-B08", - "B8A": "S2-B8A", - "B09": "S2-B09", - "B11": "S2-B11", - "B12": "S2-B12", - "AOT": "S2-AOT", - "SCL": "S2-SCL", - "SNW": "S2-SNW", - "CLD": "S2-CLD", - "CLP": "s2cloudless-CLP", - "CLM": "s2clodless-CLM", -} - - -BAND_MAPPINGS = { - "SENTINEL2_L2A": BASE_SENTINEL2_L2A_MAPPING, -} - - -def get_s2_l2a_default_fetcher(collection_name: str) -> Callable: - """Builds the fetch function from the collection name as it stored in the - target backend. - """ - - def s2_l2a_fetch_default( - connection: openeo.Connection, - spatial_extent: SpatialContext, - temporal_extent: TemporalContext, - bands: list, - **params, - ) -> openeo.DataCube: - """Default collection fetcher for Sentinel_L2A collections. - Parameters - ---------- - connection: openeo.Connection - Connection to a general backend. - spatial_extent: SpatialContext - Either a GeoJSON collection or a bounding box of locations. - Performs spatial filtering if the spatial context is a GeoJSON - collection, as it implies sparse data. - temporal_extent: TemporalContexct - A time range, defined by a start and end date. - bands: list - The name of the bands to load from that collection - Returns - ------- - openeo.DataCube: a datacube containing the collection raw products. - """ - if isinstance(spatial_extent, BoundingBoxExtent): - spatial_extent = dict(spatial_extent) - elif isinstance(spatial_extent, GeoJSON): - assert ( - spatial_extent.get("crs", None) is not None - ), "CRS not defined within GeoJSON collection." - spatial_extent = dict(spatial_extent) - - cube = connection.load_collection(collection_name, spatial_extent, temporal_extent, bands) - - # Apply if the collection is a GeoJSON Feature collection - if isinstance(spatial_extent, GeoJSON): - cube = cube.filter_spatial(spatial_extent) - - return cube - - return s2_l2a_fetch_default - - -def get_s2_l2a_default_processor(collection_name: str) -> Callable: - """Builds the preprocessing function from the collection name as it stored - in the target backend. - """ - - def s2_l2a_default_processor(cube: openeo.DataCube, **params): - """Default collection preprocessing method. - This method performs reprojection if specified, upsampling of bands - at 10m resolution as well as band reprojection. 
- """ - # Reproject collection data to target CRS, if specified so - cube = resample_reproject(cube, 10.0, params.get("target_crs", None)) - - cube = rename_bands(cube, BAND_MAPPINGS.get(collection_name)) - - return cube - - return s2_l2a_default_processor - - -SENTINEL2_L2A_BACKEND_MAP = { - Backend.TERRASCOPE: { - "fetch": get_s2_l2a_default_fetcher("SENTINEL2_L2A"), - "preprocessor": get_s2_l2a_default_processor("SENTINEL2_L2A"), - } -} - - -def build_sentinel2_l2a_extractor( - backend_context: BackendContext, bands: list, **params -) -> CollectionExtractor: - """Creates a S2 L2A extractor adapted to the given backend.""" - backend_functions = SENTINEL2_L2A_BACKEND_MAP.get(backend_context.backend) - - fetcher, preprocessor = ( - backend_functions["fetch"], - backend_functions["preprocessor"], - ) - - return CollectionExtractor(backend_context, bands, fetcher, preprocessor, **params) diff --git a/src/openeo_gfmap/features/feature_extractor.py b/src/openeo_gfmap/features/feature_extractor.py index 28f6b79..b52ffa0 100644 --- a/src/openeo_gfmap/features/feature_extractor.py +++ b/src/openeo_gfmap/features/feature_extractor.py @@ -2,6 +2,8 @@ implementation of feature extractors of a UDF. """ +import inspect +import re from abc import ABC, abstractmethod import numpy as np @@ -13,59 +15,10 @@ from pyproj import Transformer from pyproj.crs import CRS -REQUIRED_IMPORTS = """ -from abc import ABC, abstractmethod - -import openeo -from openeo.udf import XarrayDataCube, inspect -from openeo.udf.udf_data import UdfData - -import xarray as xr -import numpy as np -from pyproj import Transformer -from pyproj.crs import CRS - -from typing import Union -""" - - LAT_HARMONIZED_NAME = "GEO-LAT" LON_HARMONIZED_NAME = "GEO-LON" EPSG_HARMONIZED_NAME = "GEO-EPSG" -# To fill in: EPSG_HARMONIZED_NAME, Is it pixel based and Feature Extractor class -APPLY_DATACUBE_SOURCE_CODE = """ -LAT_HARMONIZED_NAME = "{lat_harmonized_name}" -LON_HARMONIZED_NAME = "{lon_harmonized_name}" -EPSG_HARMONIZED_NAME = "{epsg_harmonized_name}" - -from openeo.udf import XarrayDataCube -from openeo.udf.udf_data import UdfData - -IS_PIXEL_BASED = {is_pixel_based} - -def apply_udf_data(udf_data: UdfData) -> XarrayDataCube: - feature_extractor = {feature_extractor_class}() # User-defined, feature extractor class initialized here - - if not IS_PIXEL_BASED: - assert len(udf_data.datacube_list) == 1, "OpenEO GFMAP Feature extractor pipeline only supports single input cubes for the tile." - - cube = udf_data.datacube_list[0] - parameters = udf_data.user_context - - proj = udf_data.proj - if proj is not None: - proj = proj["EPSG"] - - parameters[EPSG_HARMONIZED_NAME] = proj - - cube = feature_extractor._execute(cube, parameters=parameters) - - udf_data.datacube_list = [cube] - - return udf_data -""" - class FeatureExtractor(ABC): """Base class for all feature extractor UDFs. It provides some common @@ -184,12 +137,65 @@ def execute(self, inarr: xr.DataArray) -> xr.DataArray: pass +def apply_udf_data(udf_data: UdfData) -> XarrayDataCube: + feature_extractor_class = "" + + # User-defined, feature extractor class initialized here + feature_extractor = feature_extractor_class() + + is_pixel_based = issubclass(feature_extractor_class, PointFeatureExtractor) + + if not is_pixel_based: + assert ( + len(udf_data.datacube_list) == 1 + ), "OpenEO GFMAP Feature extractor pipeline only supports single input cubes for the tile." 
+
+    cube = udf_data.datacube_list[0]
+    parameters = udf_data.user_context
+
+    proj = udf_data.proj
+    if proj is not None:
+        proj = proj["EPSG"]
+
+    parameters[EPSG_HARMONIZED_NAME] = proj
+
+    cube = feature_extractor._execute(cube, parameters=parameters)
+
+    udf_data.datacube_list = [cube]
+
+    return udf_data
+
+
+def _get_imports() -> str:
+    with open(__file__, "r", encoding="UTF-8") as f:
+        script_source = f.read()
+
+    lines = script_source.split("\n")
+
+    imports = []
+    static_globals = []
+
+    for line in lines:
+        if line.strip().startswith(("import ", "from ")):
+            imports.append(line)
+        elif re.match(r"^[A-Z_0-9]+\s*=.*$", line):
+            static_globals.append(line)
+
+    return "\n".join(imports) + "\n\n" + "\n".join(static_globals)
+
+
+def _get_apply_udf_data(feature_extractor: FeatureExtractor) -> str:
+    source = inspect.getsource(apply_udf_data)
+    # Replace the empty `feature_extractor_class` placeholder with the user's class name
+    return source.replace('""', feature_extractor.__name__)
+
+
 def generate_udf_code(feature_extractor_class: FeatureExtractor) -> openeo.UDF:
     """Generates the udf code by packing imports of this file, the necessary
     superclass and subclasses as well as the user defined feature extractor
     class and the apply_datacube function.
     """
-    import inspect
 
     # UDF code that will be built here
     udf_code = ""
@@ -198,36 +204,12 @@ class and the apply_datacube function.
         feature_extractor_class, FeatureExtractor
     ), "The feature extractor class must be a subclass of FeatureExtractor."
 
-    if issubclass(feature_extractor_class, PatchFeatureExtractor):
-        udf_code += f"{REQUIRED_IMPORTS}\n\n"
-        udf_code += f"{inspect.getsource(FeatureExtractor)}\n\n"
-        udf_code += f"{inspect.getsource(PatchFeatureExtractor)}\n\n"
-        udf_code += f"{inspect.getsource(feature_extractor_class)}\n\n"
-        udf_code += APPLY_DATACUBE_SOURCE_CODE.format(
-            lat_harmonized_name=LAT_HARMONIZED_NAME,
-            lon_harmonized_name=LON_HARMONIZED_NAME,
-            epsg_harmonized_name=EPSG_HARMONIZED_NAME,
-            is_pixel_based=False,
-            feature_extractor_class=feature_extractor_class.__name__,
-        )
-    elif issubclass(feature_extractor_class, PointFeatureExtractor):
-        udf_code += f"{REQUIRED_IMPORTS}\n\n"
-        udf_code += f"{inspect.getsource(FeatureExtractor)}\n\n"
-        udf_code += f"{inspect.getsource(PointFeatureExtractor)}\n\n"
-        udf_code += f"{inspect.getsource(feature_extractor_class)}\n\n"
-        udf_code += APPLY_DATACUBE_SOURCE_CODE.format(
-            lat_harmonized_name=LAT_HARMONIZED_NAME,
-            lon_harmonized_name=LON_HARMONIZED_NAME,
-            epsg_harmonized_name=EPSG_HARMONIZED_NAME,
-            is_pixel_based=True,
-            feature_extractor_class=feature_extractor_class.__name__,
-        )
-    else:
-        raise NotImplementedError(
-            "The feature extractor must be a subclass of either "
-            "PatchFeatureExtractor or PointFeatureExtractor."
-        )
-
+    udf_code += _get_imports() + "\n\n"
+    udf_code += f"{inspect.getsource(FeatureExtractor)}\n\n"
+    udf_code += f"{inspect.getsource(PatchFeatureExtractor)}\n\n"
+    udf_code += f"{inspect.getsource(PointFeatureExtractor)}\n\n"
+    udf_code += f"{inspect.getsource(feature_extractor_class)}\n\n"
+    udf_code += _get_apply_udf_data(feature_extractor_class)
 
     return udf_code
 
@@ -247,13 +229,16 @@ def apply_feature_extractor(
     default, the feature extractor expects to receive S1 and S2 data stored in
     uint16 with the harmonized naming as implemented in the fetching module.
""" + feature_extractor = feature_extractor_class() + feature_extractor._parameters = parameters + output_labels = feature_extractor.output_labels() udf_code = generate_udf_code(feature_extractor_class) udf = openeo.UDF(code=udf_code, context=parameters) cube = cube.apply_neighborhood(process=udf, size=size, overlap=overlap) - return cube.rename_labels(dimension="bands", target=feature_extractor_class().output_labels()) + return cube.rename_labels(dimension="bands", target=output_labels) def apply_feature_extractor_local( @@ -264,6 +249,10 @@ def apply_feature_extractor_local( excepts for the cube parameter which expects a `xarray.DataArray` instead of a `openeo.rest.datacube.DataCube` object. """ + feature_extractor = feature_extractor_class() + feature_extractor._parameters = parameters + output_labels = feature_extractor.output_labels() + udf_code = generate_udf_code(feature_extractor_class) udf = openeo.UDF(code=udf_code, context=parameters) @@ -276,8 +265,4 @@ def apply_feature_extractor_local( assert len(output_cubes) == 1, "UDF should have only a single output cube." - return ( - output_cubes[0] - .get_array() - .assign_coords({"bands": feature_extractor_class().output_labels()}) - ) + return output_cubes[0].get_array().assign_coords({"bands": output_labels}) diff --git a/src/openeo_gfmap/fetching/commons.py b/src/openeo_gfmap/fetching/commons.py index 898c86d..69dc65f 100644 --- a/src/openeo_gfmap/fetching/commons.py +++ b/src/openeo_gfmap/fetching/commons.py @@ -111,12 +111,16 @@ def load_collection( properties=load_collection_parameters, ) elif fetch_type == FetchType.POLYGON: - assert isinstance( - spatial_extent, GeoJSON - ), "Please provide only a GeoJSON FeatureCollection for point based fetching." - assert ( - spatial_extent["type"] == "FeatureCollection" - ), "Please provide a FeatureCollection type of GeoJSON" + if isinstance(spatial_extent, GeoJSON): + assert ( + spatial_extent["type"] == "FeatureCollection" + ), "Please provide a FeatureCollection type of GeoJSON" + elif isinstance(spatial_extent, str): + assert spatial_extent.startswith("https://") or spatial_extent.startswith( + "http://" + ), "Please provide a valid URL or a path to a GeoJSON file." 
+ else: + raise ValueError("Please provide a valid URL to a GeoParquet or GeoJSON file.") cube = connection.load_collection( collection_id=collection_name, temporal_extent=[temporal_extent.start_date, temporal_extent.end_date], @@ -133,6 +137,13 @@ def load_collection( cube = cube.mask(pre_mask.resample_cube_spatial(cube)) if fetch_type == FetchType.POLYGON: - cube = cube.filter_spatial(spatial_extent) + if isinstance(spatial_extent, str): + geometry = connection.load_url( + spatial_extent, + format="Parquet" if ".parquet" in spatial_extent else "GeoJSON", + ) + cube = cube.filter_spatial(geometry) + else: + cube = cube.filter_spatial(spatial_extent) return cube diff --git a/src/openeo_gfmap/fetching/s1.py b/src/openeo_gfmap/fetching/s1.py index cf061fb..17d8454 100644 --- a/src/openeo_gfmap/fetching/s1.py +++ b/src/openeo_gfmap/fetching/s1.py @@ -19,10 +19,10 @@ from .fetching import CollectionFetcher, FetchType BASE_SENTINEL1_GRD_MAPPING = { - "VH": "S1-VH", - "HH": "S1-HH", - "HV": "S1-HV", - "VV": "S1-VV", + "VH": "S1-SIGMA0-VH", + "HH": "S1-SIGMA0-HH", + "HV": "S1-SIGMA0-HV", + "VV": "S1-SIGMA0-VV", } @@ -130,6 +130,10 @@ def s1_grd_default_processor(cube: openeo.DataCube, **params): "default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"), "preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"), }, + Backend.CDSE_STAGING: { + "default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"), + "preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"), + }, } diff --git a/src/openeo_gfmap/fetching/s2.py b/src/openeo_gfmap/fetching/s2.py index 0f7b3b3..d513fba 100644 --- a/src/openeo_gfmap/fetching/s2.py +++ b/src/openeo_gfmap/fetching/s2.py @@ -20,39 +20,39 @@ from .fetching import CollectionFetcher, FetchType BASE_SENTINEL2_L2A_MAPPING = { - "B01": "S2-B01", - "B02": "S2-B02", - "B03": "S2-B03", - "B04": "S2-B04", - "B05": "S2-B05", - "B06": "S2-B06", - "B07": "S2-B07", - "B08": "S2-B08", - "B8A": "S2-B8A", - "B09": "S2-B09", - "B11": "S2-B11", - "B12": "S2-B12", - "AOT": "S2-AOT", - "SCL": "S2-SCL", - "SNW": "S2-SNW", + "B01": "S2-L2A-B01", + "B02": "S2-L2A-B02", + "B03": "S2-L2A-B03", + "B04": "S2-L2A-B04", + "B05": "S2-L2A-B05", + "B06": "S2-L2A-B06", + "B07": "S2-L2A-B07", + "B08": "S2-L2A-B08", + "B8A": "S2-L2A-B8A", + "B09": "S2-L2A-B09", + "B11": "S2-L2A-B11", + "B12": "S2-L2A-B12", + "AOT": "S2-L2A-AOT", + "SCL": "S2-L2A-SCL", + "SNW": "S2-L2A-SNW", } ELEMENT84_SENTINEL2_L2A_MAPPING = { - "coastal": "S2-B01", - "blue": "S2-B02", - "green": "S2-B03", - "red": "S2-B04", - "rededge1": "S2-B05", - "rededge2": "S2-B06", - "rededge3": "S2-B07", - "nir": "S2-B08", - "nir08": "S2-B8A", - "nir09": "S2-B09", - "cirrus": "S2-B10", - "swir16": "S2-B11", - "swir22": "S2-B12", - "scl": "S2-SCL", - "aot": "S2-AOT", + "coastal": "S2-L2A-B01", + "blue": "S2-L2A-B02", + "green": "S2-L2A-B03", + "red": "S2-L2A-B04", + "rededge1": "S2-L2A-B05", + "rededge2": "S2-L2A-B06", + "rededge3": "S2-L2A-B07", + "nir": "S2-L2A-B08", + "nir08": "S2-L2A-B8A", + "nir09": "S2-L2A-B09", + "cirrus": "S2-L2A-B10", + "swir16": "S2-L2A-B11", + "swir22": "S2-L2A-B12", + "scl": "S2-L2A-SCL", + "aot": "S2-L2A-AOT", } @@ -105,10 +105,6 @@ def s2_l2a_fetch_default( **params, ) - # Apply if the collection is a GeoJSON Feature collection - if isinstance(spatial_extent, GeoJSON): - cube = cube.filter_spatial(spatial_extent) - return cube return s2_l2a_fetch_default @@ -190,6 +186,10 @@ def s2_l2a_default_processor(cube: 
diff --git a/src/openeo_gfmap/fetching/s1.py b/src/openeo_gfmap/fetching/s1.py
index cf061fb..17d8454 100644
--- a/src/openeo_gfmap/fetching/s1.py
+++ b/src/openeo_gfmap/fetching/s1.py
@@ -19,10 +19,10 @@ from .fetching import CollectionFetcher, FetchType
 
 BASE_SENTINEL1_GRD_MAPPING = {
-    "VH": "S1-VH",
-    "HH": "S1-HH",
-    "HV": "S1-HV",
-    "VV": "S1-VV",
+    "VH": "S1-SIGMA0-VH",
+    "HH": "S1-SIGMA0-HH",
+    "HV": "S1-SIGMA0-HV",
+    "VV": "S1-SIGMA0-VV",
 }
 
@@ -130,6 +130,10 @@ def s1_grd_default_processor(cube: openeo.DataCube, **params):
         "default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
         "preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"),
     },
+    Backend.CDSE_STAGING: {
+        "default": partial(get_s1_grd_default_fetcher, collection_name="SENTINEL1_GRD"),
+        "preprocessor": partial(get_s1_grd_default_processor, collection_name="SENTINEL1_GRD"),
+    },
 }
diff --git a/src/openeo_gfmap/fetching/s2.py b/src/openeo_gfmap/fetching/s2.py
index 0f7b3b3..d513fba 100644
--- a/src/openeo_gfmap/fetching/s2.py
+++ b/src/openeo_gfmap/fetching/s2.py
@@ -20,39 +20,39 @@ from .fetching import CollectionFetcher, FetchType
 
 BASE_SENTINEL2_L2A_MAPPING = {
-    "B01": "S2-B01",
-    "B02": "S2-B02",
-    "B03": "S2-B03",
-    "B04": "S2-B04",
-    "B05": "S2-B05",
-    "B06": "S2-B06",
-    "B07": "S2-B07",
-    "B08": "S2-B08",
-    "B8A": "S2-B8A",
-    "B09": "S2-B09",
-    "B11": "S2-B11",
-    "B12": "S2-B12",
-    "AOT": "S2-AOT",
-    "SCL": "S2-SCL",
-    "SNW": "S2-SNW",
+    "B01": "S2-L2A-B01",
+    "B02": "S2-L2A-B02",
+    "B03": "S2-L2A-B03",
+    "B04": "S2-L2A-B04",
+    "B05": "S2-L2A-B05",
+    "B06": "S2-L2A-B06",
+    "B07": "S2-L2A-B07",
+    "B08": "S2-L2A-B08",
+    "B8A": "S2-L2A-B8A",
+    "B09": "S2-L2A-B09",
+    "B11": "S2-L2A-B11",
+    "B12": "S2-L2A-B12",
+    "AOT": "S2-L2A-AOT",
+    "SCL": "S2-L2A-SCL",
+    "SNW": "S2-L2A-SNW",
 }
 
 ELEMENT84_SENTINEL2_L2A_MAPPING = {
-    "coastal": "S2-B01",
-    "blue": "S2-B02",
-    "green": "S2-B03",
-    "red": "S2-B04",
-    "rededge1": "S2-B05",
-    "rededge2": "S2-B06",
-    "rededge3": "S2-B07",
-    "nir": "S2-B08",
-    "nir08": "S2-B8A",
-    "nir09": "S2-B09",
-    "cirrus": "S2-B10",
-    "swir16": "S2-B11",
-    "swir22": "S2-B12",
-    "scl": "S2-SCL",
-    "aot": "S2-AOT",
+    "coastal": "S2-L2A-B01",
+    "blue": "S2-L2A-B02",
+    "green": "S2-L2A-B03",
+    "red": "S2-L2A-B04",
+    "rededge1": "S2-L2A-B05",
+    "rededge2": "S2-L2A-B06",
+    "rededge3": "S2-L2A-B07",
+    "nir": "S2-L2A-B08",
+    "nir08": "S2-L2A-B8A",
+    "nir09": "S2-L2A-B09",
+    "cirrus": "S2-L2A-B10",
+    "swir16": "S2-L2A-B11",
+    "swir22": "S2-L2A-B12",
+    "scl": "S2-L2A-SCL",
+    "aot": "S2-L2A-AOT",
 }
 
@@ -105,10 +105,6 @@ def s2_l2a_fetch_default(
             **params,
         )
 
-        # Apply if the collection is a GeoJSON Feature collection
-        if isinstance(spatial_extent, GeoJSON):
-            cube = cube.filter_spatial(spatial_extent)
-
         return cube
 
     return s2_l2a_fetch_default
@@ -190,6 +186,10 @@ def s2_l2a_default_processor(cube: openeo.DataCube, **params):
         "fetch": partial(get_s2_l2a_default_fetcher, collection_name="SENTINEL2_L2A"),
         "preprocessor": partial(get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"),
     },
+    Backend.CDSE_STAGING: {
+        "fetch": partial(get_s2_l2a_default_fetcher, collection_name="SENTINEL2_L2A"),
+        "preprocessor": partial(get_s2_l2a_default_processor, collection_name="SENTINEL2_L2A"),
+    },
 }
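Every harmonized band name gains an explicit collection prefix here (`S2-*` becomes `S2-L2A-*`, `S1-*` becomes `S1-SIGMA0-*`), so downstream code that hard-codes the old names needs a one-time rename. A hypothetical migration helper, not part of this changeset, mirroring the mappings above:

```python
from typing import List

# Old harmonized name -> new harmonized name.
S2_RENAMES = {
    f"S2-{band}": f"S2-L2A-{band}"
    for band in ["B01", "B02", "B03", "B04", "B05", "B06", "B07", "B08",
                 "B8A", "B09", "B11", "B12", "AOT", "SCL", "SNW"]
}
S1_RENAMES = {f"S1-{pol}": f"S1-SIGMA0-{pol}" for pol in ["VV", "VH", "HH", "HV"]}


def migrate_band_names(bands: List[str]) -> List[str]:
    """Rewrite a band list from the pre-change naming scheme to the new one."""
    renames = {**S2_RENAMES, **S1_RENAMES}
    return [renames.get(band, band) for band in bands]


assert migrate_band_names(["S2-B04", "S1-VV"]) == ["S2-L2A-B04", "S1-SIGMA0-VV"]
```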
diff --git a/src/openeo_gfmap/manager/__init__.py b/src/openeo_gfmap/manager/__init__.py
new file mode 100644
index 0000000..4e6c070
--- /dev/null
+++ b/src/openeo_gfmap/manager/__init__.py
@@ -0,0 +1,6 @@
+"""OpenEO GFMAP Manager submodule. Implements the logic of splitting the jobs into subjobs and
+managing the subjobs.
+"""
+import logging
+
+_log = logging.getLogger(__name__)
diff --git a/src/openeo_gfmap/manager/job_manager.py b/src/openeo_gfmap/manager/job_manager.py
new file mode 100644
index 0000000..6021cb2
--- /dev/null
+++ b/src/openeo_gfmap/manager/job_manager.py
@@ -0,0 +1,304 @@
+import json
+import queue
+import shutil
+import threading
+from enum import Enum
+from pathlib import Path
+from tempfile import NamedTemporaryFile
+from typing import Callable, Optional, Union
+
+import pandas as pd
+import pystac
+from openeo.extra.job_management import MultiBackendJobManager
+from openeo.rest.job import BatchJob
+from pystac import CatalogType
+
+from openeo_gfmap.manager import _log
+from openeo_gfmap.stac import constants
+
+
+class PostJobStatus(Enum):
+    """Indicates to the workers whether the job finished successfully or with an error."""
+
+    FINISHED = "finished"
+    ERROR = "error"
+
+
+class GFMAPJobManager(MultiBackendJobManager):
+    """A job manager for the GFMAP backend."""
+
+    def __init__(
+        self,
+        output_dir: Path,
+        output_path_generator: Callable,
+        collection_id: str,
+        collection_description: str = "",
+        post_job_action: Optional[Callable] = None,
+        poll_sleep: int = 5,
+        n_threads: int = 1,
+        post_job_params: dict = {},
+    ):
+        self._output_dir = output_dir
+
+        # Setup the threads to work on the on_job_done and on_job_error methods
+        self._finished_job_queue = queue.Queue()
+        self._n_threads = n_threads
+
+        self._threads = []
+
+        self._output_path_gen = output_path_generator
+        self._post_job_action = post_job_action
+        self._post_job_params = post_job_params
+
+        # Monkey patching the _normalize_df method to ensure we have no modification on the
+        # geometry column
+        MultiBackendJobManager._normalize_df = self._normalize_df
+        super().__init__(poll_sleep)
+
+        # Generate the root STAC collection
+        self._root_collection = pystac.Collection(
+            id=collection_id,
+            description=collection_description,
+            extent=None,
+        )
+
+    def _post_job_worker(self):
+        """Checks which jobs are finished or failed and calls the `on_job_done` or `on_job_error`
+        methods."""
+        while True:
+            try:
+                status, job, row = self._finished_job_queue.get(timeout=1)
+                _log.debug(
+                    f"Worker thread {threading.current_thread().name}: polled finished job with status {status}."
+                )
+                if status == PostJobStatus.ERROR:
+                    self.on_job_error(job, row)
+                elif status == PostJobStatus.FINISHED:
+                    self.on_job_done(job, row)
+                else:
+                    raise ValueError(f"Unknown status: {status}")
+                self._finished_job_queue.task_done()
+            except queue.Empty:
+                continue
+            except KeyboardInterrupt:
+                _log.debug(f"Worker thread {threading.current_thread().name} interrupted.")
+                return
+
+    def _update_statuses(self, df: pd.DataFrame):
+        """Updates the statuses of the jobs in the dataframe from the backend. If a job is finished
+        or failed, it will be queued to the `on_job_done` or `on_job_error` methods.
+
+        The method is executed every `poll_sleep` seconds.
+        """
+        active = df[df.status.isin(["created", "queued", "running"])]
+        for idx, row in active.iterrows():
+            # Parses the backend from the csv
+            connection = self._get_connection(row.backend_name)
+            job = connection.job(row.id)
+            job_metadata = job.describe_job()
+            job_status = job_metadata["status"]
+            _log.debug(
+                msg=f"Status of job {job.job_id} is {job_status} (on backend {row.backend_name}).",
+            )
+
+            # Update the status if the job finished since last check
+            # Case in which it finished successfully
+            if (df.loc[idx, "status"] in ["created", "queued", "running"]) and (
+                job_metadata["status"] == "finished"
+            ):
+                _log.info(f"Job {job.job_id} finished successfully, queueing on_job_done...")
+                self._finished_job_queue.put((PostJobStatus.FINISHED, job, row))
+                df.loc[idx, "costs"] = job_metadata["costs"]
+
+            # Case in which it failed
+            if (df.loc[idx, "status"] != "error") and (job_metadata["status"] == "error"):
+                _log.info(f"Job {job.job_id} finished with error, queueing on_job_error...")
+                self._finished_job_queue.put((PostJobStatus.ERROR, job, row))
+                df.loc[idx, "costs"] = job_metadata["costs"]
+
+            df.loc[idx, "status"] = job_status
+
+    def on_job_error(self, job: BatchJob, row: pd.Series):
+        """Method called when a job finishes with an error.
+
+        Parameters
+        ----------
+        job: BatchJob
+            The job that finished with an error.
+        row: pd.Series
+            The row in the dataframe that contains the job relative information.
+        """
+        logs = job.logs()
+        error_logs = [log for log in logs if log.level.lower() == "error"]
+
+        job_metadata = job.describe_job()
+        title = job_metadata["title"]
+        job_id = job_metadata["id"]
+
+        output_log_path = Path(self._output_dir) / "failed_jobs" / f"{title}_{job_id}.log"
+        output_log_path.parent.mkdir(parents=True, exist_ok=True)
+
+        if len(error_logs) > 0:
+            output_log_path.write_text(json.dumps(error_logs, indent=2))
+        else:
+            output_log_path.write_text(
+                f"Couldn't find any error logs. Please check the error manually on job ID: {job.job_id}."
+            )
+
+    def on_job_done(self, job: BatchJob, row: pd.Series):
+        """Method called when a job finishes successfully. It will first download the results of
+        the job and then call the `post_job_action` method.
+        """
+        job_products = {}
+        for idx, asset in enumerate(job.get_results().get_assets()):
+            temp_file = NamedTemporaryFile(delete=False)
+            try:
+                _log.debug(
+                    f"Downloading asset {asset.name} from job {job.job_id} -> {temp_file.name}"
+                )
+                asset.download(temp_file.name)
+                _log.debug(
+                    f"Generating output path for asset {asset.name} from job {job.job_id}..."
+                )
+                output_path = self._output_path_gen(self._output_dir, temp_file.name, idx, row)
+                _log.debug(
+                    f"Generated path for asset {asset.name} from job {job.job_id} -> {output_path}"
+                )
+                # Make the output path
+                output_path.parent.mkdir(parents=True, exist_ok=True)
+                # Move the temporary file to the final location
+                shutil.move(temp_file.name, output_path)
+                # Add to the list of downloaded products
+                job_products[f"{job.job_id}_{asset.name}"] = [output_path]
+                _log.info(f"Downloaded asset {asset.name} from job {job.job_id} -> {output_path}")
+            except Exception as e:
+                _log.exception(f"Error downloading asset {asset.name} from job {job.job_id}", e)
+                raise e
+            finally:
+                shutil.rmtree(temp_file.name, ignore_errors=True)
+
+        # First update the STAC collection with the assets directly resulting from the OpenEO batch job
+        job_metadata = pystac.Collection.from_dict(job.get_results().get_metadata())
+        job_items = []
+
+        for item_metadata in job_metadata.get_all_items():
+            try:
+                item = pystac.read_file(item_metadata.get_self_href())
+                asset_path = job_products[f"{job.job_id}_{item.id}"][0]
+
+                assert len(item.assets.values()) == 1, "Each item should only contain one asset"
+                for asset in item.assets.values():
+                    asset.href = str(
+                        asset_path
+                    )  # Update the asset href to the output location set by the output_path_generator
+                item.id = f"{job.job_id}_{item.id}"
+                # Add the item to the current job items.
+                job_items.append(item)
+                _log.info(f"Parsed item {item.id} from job {job.job_id}")
+            except Exception as e:
+                _log.exception(
+                    f"Error failed to add item {item.id} from job {job.job_id} to STAC collection",
+                    e,
+                )
+                raise e
+
+        # _post_job_action returns an updated list of stac items. Post job action can therefore
+        # update the stac items and access their products through the HREF. It is also
+        # responsible for adding the appropriate metadata/assets to the items.
+        if self._post_job_action is not None:
+            _log.debug(f"Calling post job action for job {job.job_id}...")
+            job_items = self._post_job_action(job_items, row, self._post_job_params)
+
+        self._root_collection.add_items(job_items)
+        _log.info(f"Added {len(job_items)} items to the STAC collection.")
+
+        _log.info(f"Job {job.job_id} and post job action finished successfully.")
+
+    def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Ensure we have the required columns and the expected type for the geometry column.
+
+        :param df: The dataframe to normalize.
+        :return: a new dataframe that is normalized.
+        """
+
+        # check for some required columns.
+        required_with_default = [
+            ("status", "not_started"),
+            ("id", None),
+            ("start_time", None),
+            ("cpu", None),
+            ("memory", None),
+            ("duration", None),
+            ("backend_name", None),
+            ("description", None),
+            ("costs", None),
+        ]
+        new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
+        df = df.assign(**new_columns)
+
+        _log.debug(f"Normalizing dataframe. Columns: {df.columns}")
+
+        return df
+
+    def run_jobs(self, df: pd.DataFrame, start_job: Callable, output_file: Union[str, Path]):
+        """Starts the jobs defined in the dataframe and runs the `start_job` function on each job.
+
+        Parameters
+        ----------
+        df: pd.DataFrame
+            The dataframe containing the jobs to be started. The dataframe expects the following columns:
+
+            * `backend_name`: Name of the backend to use.
+            * Additional fields that will be used in your custom job creation function `start_job`
+              as well as in post-job actions and path generator.
+
+            The following column names are RESERVED for the management of the jobs, please do not
+            provide them in the input df:
+
+            * `status`: Current status of the job.
+            * `id`: Job ID, used to access job information from the backend.
+            * `start_time`: The time at which the job was started.
+            * `cpu`: The amount of CPU used by the job.
+            * `memory`: The amount of memory used by the job.
+            * `duration`: The duration of the job.
+
+        start_job: Callable
+            Callable function that takes the row of each job as argument and creates a datacube.
+        output_file: Union[str, Path]
+            The file to track the results of the jobs.
+        """
+        _log.info(f"Starting job manager using {self._n_threads} worker threads.")
+        # Starts the thread pool to work on the on_job_done and on_job_error methods
+        for _ in range(self._n_threads):
+            thread = threading.Thread(target=self._post_job_worker)
+            thread.start()
+            self._threads.append(thread)
+
+        _log.info("Workers started, creating and running jobs.")
+        super().run_jobs(df, start_job, output_file)
+
+    def create_stac(self, output_path: Optional[Union[str, Path]] = None):
+        """Method to be called after run_jobs to create a STAC catalog
+        and write it to self._output_dir
+        """
+        if output_path is None:
+            output_path = self._output_dir / "stac"
+
+        self._root_collection.license = constants.LICENSE
+        self._root_collection.add_link(constants.LICENSE_LINK)
+        self._root_collection.stac_extensions = constants.STAC_EXTENSIONS
+
+        datacube_extension = pystac.extensions.datacube.DatacubeExtension.ext(
+            self._root_collection, add_if_missing=True
+        )
+        datacube_extension.apply(constants.CUBE_DIMENSIONS)
+
+        item_asset_extension = pystac.extensions.item_assets.ItemAssetsExtension.ext(
+            self._root_collection, add_if_missing=True
+        )
+        item_asset_extension.item_assets = constants.ITEM_ASSETS
+
+        self._root_collection.update_extent_from_items()
+        self._root_collection.normalize_hrefs(str(output_path))
+        self._root_collection.save(catalog_type=CatalogType.SELF_CONTAINED)
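Putting the manager together end to end: the constructor wires in the two user callbacks, `run_jobs` consumes the job dataframe, and `create_stac` writes the collection assembled from the finished jobs. A minimal sketch under stated assumptions — the job dataframe columns beyond `backend_name`, the job titles, and the `start_datacube` body are placeholders the caller must supply, and `add_backend` is inherited from openeo's `MultiBackendJobManager`:

```python
from pathlib import Path

import openeo
import pandas as pd
from openeo.rest.job import BatchJob

from openeo_gfmap.manager.job_manager import GFMAPJobManager


def output_path_generator(root: Path, tmp_path: str, asset_idx: int, row: pd.Series) -> Path:
    # Called once per downloaded asset; parent directories are created by the manager.
    return Path(root) / str(row.backend_name) / f"job{row.name}_asset{asset_idx}.nc"


def post_job_action(items: list, row: pd.Series, params: dict) -> list:
    # Runs locally in a worker thread; may enrich the STAC items in place.
    return items


def start_datacube(row: pd.Series, connection: openeo.Connection, **kwargs) -> BatchJob:
    # Placeholder: build the process graph for one job row and return the created batch job.
    cube = connection.load_collection("SENTINEL2_L2A", bands=["B04"])
    return cube.create_job(title=f"extraction_{row.name}")


connection = openeo.connect("openeo.dataspace.copernicus.eu").authenticate_oidc()
job_df = pd.DataFrame({"backend_name": ["cdse"]})  # plus any fields your callbacks need

manager = GFMAPJobManager(
    output_dir=Path("/tmp/extractions"),
    output_path_generator=output_path_generator,
    collection_id="DEMO-EXTRACTIONS",
    collection_description="Demo extraction results",
    post_job_action=post_job_action,
    n_threads=2,
)
manager.add_backend("cdse", connection=connection, parallel_jobs=2)

manager.run_jobs(job_df, start_job=start_datacube, output_file=Path("job_tracking.csv"))
manager.create_stac()
```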
diff --git a/src/openeo_gfmap/manager/job_splitters.py b/src/openeo_gfmap/manager/job_splitters.py
new file mode 100644
index 0000000..a696b5c
--- /dev/null
+++ b/src/openeo_gfmap/manager/job_splitters.py
@@ -0,0 +1,63 @@
+"""Job splitter functionalities, accepting input points/polygons to extract in the
+form of a GeoDataFrame.
+"""
+from typing import List
+
+import geopandas as gpd
+import h3
+
+
+def _resplit_group(polygons: gpd.GeoDataFrame, max_points: int) -> List[gpd.GeoDataFrame]:
+    """Performs re-splitting of a dataset of polygons into a list of datasets."""
+    for i in range(0, len(polygons), max_points):
+        yield polygons.iloc[i : i + max_points].reset_index(drop=True)
+
+
+def split_job_hex(
+    polygons: gpd.GeoDataFrame, max_points: int = 500, grid_resolution: int = 3
+) -> List[gpd.GeoDataFrame]:
+    """Split a job into multiple jobs from the position of the polygons/points. The centroids of
+    the geometries to extract are used to select a hexagon in the H3 grid. Using the H3 grid
+    allows splitting jobs into roughly equal areas, which is useful for parallel processing while
+    taking into account OpenEO's limitations.
+
+    Parameters
+    ----------
+    polygons: gpd.GeoDataFrame
+        Dataset containing the polygons to split the job by, with a `geometry` column.
+    max_points: int
+        The maximum number of points to be included in each job.
+    grid_resolution: int
+        The scale to use in the H3 hexagonal grid to split jobs to, default is 3. Changing the
+        grid scale will drastically increase/decrease the area on which jobs will work.
+        More information on the H3 grid can be found at
+        https://h3geo.org/docs/core-library/restable
+
+    Returns
+    -------
+    split_polygons: list
+        List of sub-GeoDataFrames, one per job.
+    """
+
+    if "geometry" not in polygons.columns:
+        raise ValueError("The GeoDataFrame must contain a 'geometry' column.")
+
+    if polygons.crs is None:
+        raise ValueError("The GeoDataFrame must contain a CRS")
+
+    # Project to lat/lon positions
+    polygons = polygons.to_crs(epsg=4326)
+
+    # Split the polygons into multiple jobs
+    polygons["h3index"] = polygons.geometry.centroid.apply(
+        lambda pt: h3.geo_to_h3(pt.y, pt.x, grid_resolution)
+    )
+
+    split_datasets = []
+    for _, sub_gdf in polygons.groupby("h3index"):
+        if len(sub_gdf) > max_points:
+            # Performs another split
+            split_datasets.extend(_resplit_group(sub_gdf, max_points))
+        else:
+            split_datasets.append(sub_gdf.reset_index(drop=True))
+
+    return split_datasets
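The `grid_resolution` argument is the main lever on job footprint: each H3 resolution step shrinks cells roughly sevenfold in area. A quick sanity check of the indexing step used above, assuming the h3-py v3 API that matches the `geo_to_h3` call in `split_job_hex`:

```python
import h3

# Index a centroid position (lat, lon) at two resolutions; geometries sharing
# a cell index end up in the same sub-dataframe, i.e. the same job.
lat, lon = 50.85, 4.35  # example location (Brussels), purely illustrative
print(h3.geo_to_h3(lat, lon, 3))  # coarser cells -> fewer, larger jobs
print(h3.geo_to_h3(lat, lon, 4))  # finer cells -> more, smaller jobs

# Average cell areas, to gauge how much one resolution step changes job extent.
print(h3.hex_area(3, unit="km^2"))  # on the order of 12,000 km2
print(h3.hex_area(4, unit="km^2"))  # on the order of 1,800 km2
```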
diff --git a/src/openeo_gfmap/preprocessing/cloudmasking.py b/src/openeo_gfmap/preprocessing/cloudmasking.py
index e605689..93289d3 100644
--- a/src/openeo_gfmap/preprocessing/cloudmasking.py
+++ b/src/openeo_gfmap/preprocessing/cloudmasking.py
@@ -6,8 +6,8 @@ import openeo
 from openeo.processes import if_, is_nan
 
-SCL_HARMONIZED_NAME: str = "S2-SCL"
-BAPSCORE_HARMONIZED_NAME: str = "S2-BAPSCORE"
+SCL_HARMONIZED_NAME: str = "S2-L2A-SCL"
+BAPSCORE_HARMONIZED_NAME: str = "S2-L2A-BAPSCORE"
 
 def mask_scl_dilation(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
@@ -91,7 +91,7 @@ def get_bap_score(cube: openeo.DataCube, **params: dict) -> openeo.DataCube:
     Returns
     -------
     openeo.DataCube
-        A 4D datacube containing the BAP score as name 'S2-BAPSCORE'.
+        A 4D datacube containing the BAP score as name 'S2-L2A-BAPSCORE'.
     """
     udf_path = Path(__file__).parent / "udf_score.py"
 
@@ -159,7 +159,7 @@ def get_bap_mask(cube: openeo.DataCube, period: Union[str, list], **params: dict
     openeo.DataCube
         The datacube with the BAP mask applied.
     """
-    # Checks if the S2-SCL band is present in the datacube
+    # Checks if the S2-L2A-SCL band is present in the datacube
     assert (
         SCL_HARMONIZED_NAME in cube.metadata.band_names
     ), f"The {SCL_HARMONIZED_NAME} band is not present in the datacube."
@@ -196,7 +196,7 @@ def max_score_selection(score):
         f"'period' must be a string or a list of dates (in YYYY-mm-dd format), got {period}."
     )
 
-    return rank_mask.rename_labels("bands", ["S2-BAPMASK"])
+    return rank_mask.rename_labels("bands", ["S2-L2A-BAPMASK"])
 
 
 def bap_masking(cube: openeo.DataCube, period: Union[str, list], **params: dict):
diff --git a/src/openeo_gfmap/preprocessing/udf_rank.py b/src/openeo_gfmap/preprocessing/udf_rank.py
index 7271b61..043712d 100644
--- a/src/openeo_gfmap/preprocessing/udf_rank.py
+++ b/src/openeo_gfmap/preprocessing/udf_rank.py
@@ -16,7 +16,7 @@ def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
     intervals = context.get("intervals", None)
     array = cube.get_array().transpose("t", "bands", "y", "x")
 
-    bap_score = array.sel(bands="S2-BAPSCORE")
+    bap_score = array.sel(bands="S2-L2A-BAPSCORE")
 
     def select_maximum(score: xr.DataArray):
         max_score = score.max(dim="t")
diff --git a/src/openeo_gfmap/spatial.py b/src/openeo_gfmap/spatial.py
index 6625120..f4d419f 100644
--- a/src/openeo_gfmap/spatial.py
+++ b/src/openeo_gfmap/spatial.py
@@ -44,4 +44,4 @@ def to_geometry(self) -> Polygon:
     return box(self.west, self.south, self.east, self.north)
 
-SpatialContext = Union[GeoJSON, BoundingBoxExtent]
+SpatialContext = Union[GeoJSON, BoundingBoxExtent, str]
diff --git a/src/openeo_gfmap/stac/__init__.py b/src/openeo_gfmap/stac/__init__.py
new file mode 100644
index 0000000..a0b68a1
--- /dev/null
+++ b/src/openeo_gfmap/stac/__init__.py
@@ -0,0 +1,6 @@
+"""Definitions of the constants in the STAC collection
+"""
+
+from openeo_gfmap.stac.constants import AUXILIARY
+
+__all__ = ["AUXILIARY"]
diff --git a/src/openeo_gfmap/stac/constants.py b/src/openeo_gfmap/stac/constants.py
new file mode 100644
index 0000000..0213547
--- /dev/null
+++ b/src/openeo_gfmap/stac/constants.py
@@ -0,0 +1,229 @@
+"""
+Constants in the STAC collection generated after a series of batch jobs
+"""
+import pystac
+
+LICENSE = "CC-BY-4.0"
+LICENSE_LINK = pystac.Link(
+    rel="license",
+    target="https://spdx.org/licenses/CC-BY-4.0.html",
+    media_type=pystac.MediaType.HTML,
+    title="Creative Commons Attribution 4.0 International License",
+)
+STAC_EXTENSIONS = [
+    "https://stac-extensions.github.io/eo/v1.1.0/schema.json",
+    "https://stac-extensions.github.io/file/v2.1.0/schema.json",
+    "https://stac-extensions.github.io/processing/v1.1.0/schema.json",
+    "https://stac-extensions.github.io/projection/v1.1.0/schema.json",
+]
+CONSTELLATION = ["sentinel-2"]
+PLATFORM = ["sentinel-2a", "sentinel-2b"]
+INSTRUMENTS = ["msi"]
+GSD = [10, 20, 60]
+SUMMARIES = pystac.Summaries(
+    {
+        "constellation": CONSTELLATION,
+        "platform": PLATFORM,
+        "instruments": INSTRUMENTS,
+        "gsd": GSD,
+    }
+)
+
+
+def create_spatial_dimension(
+    name: str,
+) -> pystac.extensions.datacube.HorizontalSpatialDimension:
+    return pystac.extensions.datacube.HorizontalSpatialDimension(
+        {
+            "axis": name,
+            "step": 10,
+            "reference_system": {
+                "$schema": "https://proj.org/schemas/v0.2/projjson.schema.json",
+                "area": "World",
+                "bbox": {
+                    "east_longitude": 180,
+                    "north_latitude": 90,
+                    "south_latitude": -90,
+                    "west_longitude": -180,
+                },
+                "coordinate_system": {
+                    "axis": [
+                        {
+                            "abbreviation": "Lat",
+                            "direction": "north",
+                            "name": "Geodetic latitude",
+                            "unit": "degree",
+                        },
+                        {
+                            "abbreviation": "Lon",
+                            "direction": "east",
+                            "name": "Geodetic longitude",
+                            "unit": "degree",
+                        },
+                    ],
+                    "subtype": "ellipsoidal",
+                },
+                "datum": {
+                    "ellipsoid": {
+                        "inverse_flattening": 298.257223563,
+                        "name": "WGS 84",
+                        "semi_major_axis": 6378137,
+                    },
+                    "name": "World Geodetic System 1984",
+                    "type": "GeodeticReferenceFrame",
+                },
+                "id": {"authority": "OGC", "code": "Auto42001", "version": "1.3"},
+                "name": "AUTO 42001 (Universal Transverse Mercator)",
+                "type": "GeodeticCRS",
+            },
+        }
+    )
+
+
+TEMPORAL_DIMENSION = pystac.extensions.datacube.TemporalDimension(
+    {"extent": ["2015-06-23T00:00:00Z", "2019-07-10T13:44:56Z"], "step": "P5D"}
+)
+
+BANDS_DIMENSION = pystac.extensions.datacube.AdditionalDimension(
+    {
+        "values": [
+            "S2-L2A-SCL",
+            "S2-L2A-B01",
+            "S2-L2A-B02",
+            "S2-L2A-B03",
+            "S2-L2A-B04",
+            "S2-L2A-B05",
+            "S2-L2A-B06",
+            "S2-L2A-B07",
+            "S2-L2A-B08",
+            "S2-L2A-B8A",
+            "S2-L2A-B09",
+            "S2-L2A-B10",
+            "S2-L2A-B11",
+            "S2-L2A-B12",
+            "LABEL",
+        ]
+    }
+)
+
+CUBE_DIMENSIONS = {
+    "x": create_spatial_dimension("x"),
+    "y": create_spatial_dimension("y"),
+    "time": TEMPORAL_DIMENSION,
+    "spectral": BANDS_DIMENSION,
+}
+
+SENTINEL2 = pystac.extensions.item_assets.AssetDefinition(
+    {
+        "gsd": 10,
+        "title": "Sentinel2",
+        "description": "Sentinel-2 bands",
+        "type": "application/x-netcdf",
+        "roles": ["data"],
+        "proj:shape": [64, 64],
+        "raster:bands": [{"name": "S2-L2A-B01"}, {"name": "S2-L2A-B02"}],
+        "cube:variables": {
+            "S2-L2A-B01": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B02": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B03": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B04": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B05": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B06": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B07": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B8A": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B08": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B11": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-B12": {"dimensions": ["time", "y", "x"], "type": "data"},
+            "S2-L2A-SCL": {"dimensions": ["time", "y", "x"], "type": "data"},
+        },
+        "eo:bands": [
+            {
+                "name": "S2-L2A-B01",
+                "common_name": "coastal",
+                "center_wavelength": 0.443,
+                "full_width_half_max": 0.027,
+            },
+            {
+                "name": "S2-L2A-B02",
+                "common_name": "blue",
+                "center_wavelength": 0.49,
+                "full_width_half_max": 0.098,
+            },
+            {
+                "name": "S2-L2A-B03",
+                "common_name": "green",
+                "center_wavelength": 0.56,
+                "full_width_half_max": 0.045,
+            },
+            {
+                "name": "S2-L2A-B04",
+                "common_name": "red",
+                "center_wavelength": 0.665,
+                "full_width_half_max": 0.038,
+            },
+            {
+                "name": "S2-L2A-B05",
+                "common_name": "rededge",
+                "center_wavelength": 0.704,
+                "full_width_half_max": 0.019,
+            },
+            {
+                "name": "S2-L2A-B06",
+                "common_name": "rededge",
+                "center_wavelength": 0.74,
+                "full_width_half_max": 0.018,
+            },
+            {
+                "name": "S2-L2A-B07",
+                "common_name": "rededge",
+                "center_wavelength": 0.783,
+                "full_width_half_max": 0.028,
+            },
+            {
+                "name": "S2-L2A-B08",
+                "common_name": "nir",
+                "center_wavelength": 0.842,
+                "full_width_half_max": 0.145,
+            },
+            {
+                "name": "S2-L2A-B8A",
+                "common_name": "nir08",
+                "center_wavelength": 0.865,
+                "full_width_half_max": 0.033,
+            },
+            {
+                "name": "S2-L2A-B11",
+                "common_name": "swir16",
+                "center_wavelength": 1.61,
+                "full_width_half_max": 0.143,
+            },
+            {
+                "name": "S2-L2A-B12",
+                "common_name": "swir22",
+                "center_wavelength": 2.19,
+                "full_width_half_max": 0.242,
+            },
+        ],
+    }
+)
+
+AUXILIARY = pystac.extensions.item_assets.AssetDefinition(
+    {
+        "title": "ground truth data",
+        "description": "This asset contains the crop type codes.",
+        "type": "application/x-netcdf",
+        "roles": ["data"],
+        "proj:shape": [64, 64],
+        "raster:bands": [{"name": "CROPTYPE", "data_type": "uint16", "bits_per_sample": 16}],
+    }
+)
+
+SENTINEL1 = pystac.extensions.item_assets.AssetDefinition({})
+
+AGERA5 = pystac.extensions.item_assets.AssetDefinition({})
+
+ITEM_ASSETS = {
+    "sentinel2": SENTINEL2,
+    "auxiliary": AUXILIARY,
+    "sentinel1": SENTINEL1,
+    "agera5": AGERA5,
+}
diff --git a/src/openeo_gfmap/utils/build_df.py b/src/openeo_gfmap/utils/build_df.py
index 7627189..8302b30 100644
--- a/src/openeo_gfmap/utils/build_df.py
+++ b/src/openeo_gfmap/utils/build_df.py
@@ -27,7 +27,7 @@ def load_json(input_file: Path, bands: list) -> pd.DataFrame:
         A `pandas.DataFrame` containing a combination of the band names and the
         timestamps as column names. For example, the Sentinel-2 green band on the 1st October 2020 will
-        have the column name `S2-B02:2020-10-01`
+        have the column name `S2-L2A-B02:2020-10-01`
 
     df = pd.read_json(input_file)
diff --git a/src/openeo_gfmap/utils/tile_processing.py b/src/openeo_gfmap/utils/tile_processing.py
index fa1efab..5986eda 100644
--- a/src/openeo_gfmap/utils/tile_processing.py
+++ b/src/openeo_gfmap/utils/tile_processing.py
@@ -20,7 +20,7 @@ def normalize_array(inarr: xr.DataArray, percentile: float = 0.99) -> xr.DataArr
 
 def select_optical_bands(inarr: xr.DataArray) -> xr.DataArray:
     """Filters and keeps only the optical bands for a given array."""
     return inarr.sel(
-        bands=[band for band in inarr.coords["bands"].to_numpy() if band.startswith("S2-B")]
+        bands=[band for band in inarr.coords["bands"].to_numpy() if band.startswith("S2-L2A-B")]
     )
diff --git a/tests/test_openeo_gfmap/resources/test_optical_cube.nc b/tests/test_openeo_gfmap/resources/test_optical_cube.nc
index ce7f8fb..8d5e2c0 100644
Binary files a/tests/test_openeo_gfmap/resources/test_optical_cube.nc and b/tests/test_openeo_gfmap/resources/test_optical_cube.nc differ
diff --git a/tests/test_openeo_gfmap/resources/wc_extraction_dataset.gpkg b/tests/test_openeo_gfmap/resources/wc_extraction_dataset.gpkg
new file mode 100644
index 0000000..0136a4a
Binary files /dev/null and b/tests/test_openeo_gfmap/resources/wc_extraction_dataset.gpkg differ
diff --git a/tests/test_openeo_gfmap/test_cloud_masking.py b/tests/test_openeo_gfmap/test_cloud_masking.py
index 2328f9d..d8b88dc 100644
--- a/tests/test_openeo_gfmap/test_cloud_masking.py
+++ b/tests/test_openeo_gfmap/test_cloud_masking.py
@@ -42,7 +42,7 @@ def test_bap_score(backend: Backend):
     # Fetch the datacube
     s2_extractor = build_sentinel2_l2a_extractor(
         backend_context=backend_context,
-        bands=["S2-B04", "S2-B08", "S2-SCL"],
+        bands=["S2-L2A-B04", "S2-L2A-B08", "S2-L2A-SCL"],
         fetch_type=FetchType.TILE,
         **fetching_parameters,
     )
@@ -51,9 +51,9 @@ def test_bap_score(backend: Backend):
     # Compute the BAP score
     bap_score = get_bap_score(cube, **preprocessing_parameters)
-    ndvi = cube.ndvi(nir="S2-B08", red="S2-B04")
+    ndvi = cube.ndvi(nir="S2-L2A-B08", red="S2-L2A-B04")
 
-    cube = bap_score.merge_cubes(ndvi).rename_labels("bands", ["S2-BAPSCORE", "S2-NDVI"])
+    cube = bap_score.merge_cubes(ndvi).rename_labels("bands", ["S2-L2A-BAPSCORE", "S2-L2A-NDVI"])
 
     job = cube.create_job(
         title="BAP score unittest",
@@ -78,7 +78,7 @@ def test_bap_masking(backend: Backend):
     # Fetch the datacube
     s2_extractor = build_sentinel2_l2a_extractor(
         backend_context=backend_context,
-        bands=["S2-B04", "S2-B03", "S2-B02", "S2-SCL"],
+        bands=["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02", "S2-L2A-SCL"],
         fetch_type=FetchType.TILE,
         **fetching_parameters,
     )
@@ -96,7 +96,7 @@ def test_bap_masking(backend: Backend):
     cube = cube.linear_scale_range(0, 65535, 0, 65535)
 
     # Remove SCL
-    cube = cube.filter_bands([band for band in cube.metadata.band_names if band != "S2-SCL"])
+    cube = cube.filter_bands([band for band in cube.metadata.band_names if band != "S2-L2A-SCL"])
 
     job = cube.create_job(
         title="BAP compositing unittest",
@@ -122,7 +122,7 @@ def test_bap_quintad(backend: Backend):
     # Fetch the datacube
     s2_extractor = build_sentinel2_l2a_extractor(
         backend_context=backend_context,
-        bands=["S2-SCL"],
+        bands=["S2-L2A-SCL"],
         fetch_type=FetchType.TILE,
         **fetching_parameters,
     )
@@ -171,7 +171,7 @@ def test_bap_quintad(backend: Backend):
 
     s2_extractor = build_sentinel2_l2a_extractor(
         backend_context=backend_context,
-        bands=["S2-B04", "S2-B03", "S2-B02", "S2-B08", "S2-SCL"],
+        bands=["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02", "S2-L2A-B08", "S2-L2A-SCL"],
         fetch_type=FetchType.TILE,
         **fetching_parameters,
     )
diff --git a/tests/test_openeo_gfmap/test_feature_extractors.py b/tests/test_openeo_gfmap/test_feature_extractors.py
index 621c010..8fe91e4 100644
--- a/tests/test_openeo_gfmap/test_feature_extractors.py
+++ b/tests/test_openeo_gfmap/test_feature_extractors.py
@@ -34,7 +34,7 @@ def execute(self, inarr: xr.DataArray):
         from scipy.ndimage import gaussian_filter
 
         # Performs some gaussian filtering to blur the RGB bands
-        rgb_bands = inarr.sel(bands=["S2-B04", "S2-B03", "S2-B02"])
+        rgb_bands = inarr.sel(bands=["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02"])
 
         for band in rgb_bands.bands:
             for timestamp in rgb_bands.t:
@@ -78,7 +78,7 @@ def test_patch_feature_udf(backend: Backend, connection_fn: Callable):
     connection = connection_fn()
     output_path = Path(__file__).parent / f"results/patch_features_{backend.value}.nc/"
 
-    bands_to_extract = ["S2-B04", "S2-B03", "S2-B02"]
+    bands_to_extract = ["S2-L2A-B04", "S2-L2A-B03", "S2-L2A-B02"]
 
     # Setup the RGB cube extraction
     extractor = build_sentinel2_l2a_extractor(backend_context, bands_to_extract, FetchType.TILE)
@@ -120,7 +120,7 @@ def test_latlon_extractor(backend: Backend, connection_fn: Callable):
     REDUCED_TEMPORAL_CONTEXT = TemporalContext(start_date="2023-06-01", end_date="2023-06-30")
 
-    bands_to_extract = ["S2-B04"]
+    bands_to_extract = ["S2-L2A-B04"]
 
     extractor = build_sentinel2_l2a_extractor(backend_context, bands_to_extract, FetchType.TILE)
diff --git a/tests/test_openeo_gfmap/test_managers.py b/tests/test_openeo_gfmap/test_managers.py
new file mode 100644
index 0000000..5ed266b
--- /dev/null
+++ b/tests/test_openeo_gfmap/test_managers.py
@@ -0,0 +1,23 @@
+"""Test the job splitters and managers of OpenEO GFMAP."""
+from pathlib import Path
+
+import geopandas as gpd
+
+from openeo_gfmap.manager.job_splitters import split_job_hex
+
+
+def test_split_jobs():
+    dataset_path = Path(__file__).parent / "resources/wc_extraction_dataset.gpkg"
+
+    # Load the dataset
+    dataset = gpd.read_file(dataset_path)
+
+    # Split the dataset
+    split_dataset = split_job_hex(dataset, max_points=500)
+
+    # Check the number of splits
+    assert len(split_dataset) > 1
+
+    for ds in split_dataset:
+        print(len(ds))
+        assert len(ds) <= 500
diff --git a/tests/test_openeo_gfmap/test_s1_fetchers.py b/tests/test_openeo_gfmap/test_s1_fetchers.py
index bf1c7d9..d9cde85 100644
--- a/tests/test_openeo_gfmap/test_s1_fetchers.py
+++ b/tests/test_openeo_gfmap/test_s1_fetchers.py
@@ -46,8 +46,8 @@ def sentinel1_grd(
     ):
         context = BackendContext(backend)
         country = spatial_extent["country"]
-        bands = ["S1-VV", "S1-VH"]
-        expected_harmonized_bands = ["S1-VV", "S1-VH"]
+        bands = ["S1-SIGMA0-VV", "S1-SIGMA0-VH"]
+        expected_harmonized_bands = ["S1-SIGMA0-VV", "S1-SIGMA0-VH"]
 
         fetching_parameters = {
             "target_resolution": 10.0,
@@ -152,7 +152,7 @@ def sentinel1_grd_point_based(
         given polygons.
         """
         context = BackendContext(backend)
-        bands = ["S1-VV", "S1-VH"]
+        bands = ["S1-SIGMA0-VV", "S1-SIGMA0-VH"]
 
         # Because it is tested in Malawi, and this is the EPSG code for
         # the UTM projection in that zone
@@ -203,7 +203,7 @@ def sentinel1_grd_polygon_based(
         connection: openeo.Connection,
     ):
         context = BackendContext(backend)
-        bands = ["S1-VV", "S1-VH"]
+        bands = ["S1-SIGMA0-VV", "S1-SIGMA0-VH"]
 
         fetching_parameters = {
             "target_crs": 3035,  # Location in Europe
diff --git a/tests/test_openeo_gfmap/test_s2_fetchers.py b/tests/test_openeo_gfmap/test_s2_fetchers.py
index c6bbfe7..0e18672 100644
--- a/tests/test_openeo_gfmap/test_s2_fetchers.py
+++ b/tests/test_openeo_gfmap/test_s2_fetchers.py
@@ -86,14 +86,21 @@ def sentinel2_l2a(
         country = spatial_extent["country"]
         # Fetch a variety of spatial resolution and metadata from different
         # providers.
-        bands = ["S2-B01", "S2-B04", "S2-B08", "S2-B11", "S2-SCL", "S2-AOT"]
+        bands = [
+            "S2-L2A-B01",
+            "S2-L2A-B04",
+            "S2-L2A-B08",
+            "S2-L2A-B11",
+            "S2-L2A-SCL",
+            "S2-L2A-AOT",
+        ]
         expected_harmonized_names = [
-            "S2-B01",
-            "S2-B04",
-            "S2-B08",
-            "S2-B11",
-            "S2-SCL",
-            "S2-AOT",
+            "S2-L2A-B01",
+            "S2-L2A-B04",
+            "S2-L2A-B08",
+            "S2-L2A-B11",
+            "S2-L2A-SCL",
+            "S2-L2A-AOT",
         ]
         fetching_parameters = (
             {"target_resolution": 10.0, "target_crs": 3035} if country == "Belgium" else {}
@@ -190,7 +197,7 @@ def sentinel2_l2a_point_based(
         given polygons.
         """
         context = BackendContext(backend)
-        bands = ["S2-B01", "S2-B04", "S2-B08", "S2-B11"]
+        bands = ["S2-L2A-B01", "S2-L2A-B04", "S2-L2A-B08", "S2-L2A-B11"]
 
         # Because it is tested in Malawi, and this is the EPSG code for the
         # UTM projection for that zone
@@ -234,7 +241,7 @@ def sentinel2_l2a_polygon_based(
         connection: openeo.Connection,
     ):
         context = BackendContext(backend)
-        bands = ["S2-B02", "S2-B03", "S2-B04"]
+        bands = ["S2-L2A-B02", "S2-L2A-B03", "S2-L2A-B04"]
 
         fetching_parameters = {"target_crs": 3035}  # Location in Europe
        extractor = build_sentinel2_l2a_extractor(