Skip to content

Commit

Permalink
Merge pull request #216 from WorldCereal/118-consolidate-patch-point
Browse files Browse the repository at this point in the history
Consolidate extraction endpoints
  • Loading branch information
VincentVerelst authored Nov 13, 2024
2 parents ff5534f + be2ea78 commit 56b3e67
Show file tree
Hide file tree
Showing 13 changed files with 292 additions and 356 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,44 @@
import geopandas as gpd
import pandas as pd
import requests
from extract_common import generate_output_path, pipeline_log, post_job_action
from extract_meteo import create_datacube_meteo, create_job_dataframe_meteo
from extract_optical import create_datacube_optical, create_job_dataframe_s2
from openeo.rest import OpenEoApiError, OpenEoApiPlainError, OpenEoRestError
from openeo_gfmap import Backend
from openeo_gfmap.backend import cdse_connection
from openeo_gfmap.manager.job_manager import GFMAPJobManager
from openeo_gfmap.manager.job_splitters import load_s2_grid, split_job_s2grid
from patch_extractions.extract_patch_meteo import (
create_job_dataframe_patch_meteo,
create_job_patch_meteo,
)
from patch_extractions.extract_patch_s2 import (
create_job_dataframe_patch_s2,
create_job_patch_s2,
)
from point_extractions.extract_point_worldcereal import (
create_job_dataframe_point_worldcereal,
create_job_point_worldcereal,
generate_output_path_point_worldcereal,
post_job_action_point_worldcereal,
)

from worldcereal.openeo.extract import (
generate_output_path_patch,
pipeline_log,
post_job_action_patch,
)
from worldcereal.stac.constants import ExtractionCollection

from extract_sar import ( # isort: skip
create_datacube_sar,
create_job_dataframe_s1,
from patch_extractions.extract_patch_s1 import ( # isort: skip
create_job_patch_s1,
create_job_dataframe_patch_s1,
)

from extract_worldcereal import ( # isort: skip
create_datacube_worldcereal,
create_job_dataframe_worldcereal,
post_job_action_worldcereal,
generate_output_path_worldcereal,

from patch_extractions.extract_patch_worldcereal import ( # isort: skip
create_job_patch_worldcereal,
create_job_dataframe_patch_worldcereal,
post_job_action_patch_worldcereal,
generate_output_path_patch_worldcereal,
)


Expand Down Expand Up @@ -104,10 +121,11 @@ def prepare_job_dataframe(

pipeline_log.info("Dataframes split to jobs, creating the job dataframe...")
collection_switch: dict[ExtractionCollection, typing.Callable] = {
ExtractionCollection.SENTINEL1: create_job_dataframe_s1,
ExtractionCollection.SENTINEL2: create_job_dataframe_s2,
ExtractionCollection.METEO: create_job_dataframe_meteo,
ExtractionCollection.WORLDCEREAL: create_job_dataframe_worldcereal,
ExtractionCollection.PATCH_SENTINEL1: create_job_dataframe_patch_s1,
ExtractionCollection.PATCH_SENTINEL2: create_job_dataframe_patch_s2,
ExtractionCollection.PATCH_METEO: create_job_dataframe_patch_meteo,
ExtractionCollection.PATCH_WORLDCEREAL: create_job_dataframe_patch_worldcereal,
ExtractionCollection.POINT_WORLDCEREAL: create_job_dataframe_point_worldcereal,
}

create_job_dataframe_fn = collection_switch.get(
Expand Down Expand Up @@ -138,26 +156,32 @@ def setup_extraction_functions(
"""

datacube_creation = {
ExtractionCollection.SENTINEL1: partial(
create_datacube_sar,
ExtractionCollection.PATCH_SENTINEL1: partial(
create_job_patch_s1,
executor_memory=memory,
python_memory=python_memory,
max_executors=max_executors,
),
ExtractionCollection.SENTINEL2: partial(
create_datacube_optical,
ExtractionCollection.PATCH_SENTINEL2: partial(
create_job_patch_s2,
executor_memory=memory,
python_memory=python_memory,
max_executors=max_executors,
),
ExtractionCollection.METEO: partial(
create_datacube_meteo,
ExtractionCollection.PATCH_METEO: partial(
create_job_patch_meteo,
executor_memory=memory,
python_memory=python_memory,
max_executors=max_executors,
),
ExtractionCollection.WORLDCEREAL: partial(
create_datacube_worldcereal,
ExtractionCollection.PATCH_WORLDCEREAL: partial(
create_job_patch_worldcereal,
executor_memory=memory,
python_memory=python_memory,
max_executors=max_executors,
),
ExtractionCollection.POINT_WORLDCEREAL: partial(
create_job_point_worldcereal,
executor_memory=memory,
python_memory=python_memory,
max_executors=max_executors,
Expand All @@ -172,17 +196,20 @@ def setup_extraction_functions(
)

path_fns = {
ExtractionCollection.SENTINEL1: partial(
generate_output_path, s2_grid=load_s2_grid()
ExtractionCollection.PATCH_SENTINEL1: partial(
generate_output_path_patch, s2_grid=load_s2_grid()
),
ExtractionCollection.SENTINEL2: partial(
generate_output_path, s2_grid=load_s2_grid()
ExtractionCollection.PATCH_SENTINEL2: partial(
generate_output_path_patch, s2_grid=load_s2_grid()
),
ExtractionCollection.METEO: partial(
generate_output_path, s2_grid=load_s2_grid()
ExtractionCollection.PATCH_METEO: partial(
generate_output_path_patch, s2_grid=load_s2_grid()
),
ExtractionCollection.WORLDCEREAL: partial(
generate_output_path_worldcereal, s2_grid=load_s2_grid()
ExtractionCollection.PATCH_WORLDCEREAL: partial(
generate_output_path_patch_worldcereal, s2_grid=load_s2_grid()
),
ExtractionCollection.POINT_WORLDCEREAL: partial(
generate_output_path_point_worldcereal
),
}

Expand All @@ -194,35 +221,38 @@ def setup_extraction_functions(
)

post_job_actions = {
ExtractionCollection.SENTINEL1: partial(
post_job_action,
ExtractionCollection.PATCH_SENTINEL1: partial(
post_job_action_patch,
extract_value=extract_value,
description="Sentinel1 GRD raw observations, unprocessed.",
title="Sentinel-1 GRD",
spatial_resolution="20m",
s1_orbit_fix=True,
),
ExtractionCollection.SENTINEL2: partial(
post_job_action,
ExtractionCollection.PATCH_SENTINEL2: partial(
post_job_action_patch,
extract_value=extract_value,
description="Sentinel2 L2A observations, processed.",
title="Sentinel-2 L2A",
spatial_resolution="10m",
),
ExtractionCollection.METEO: partial(
post_job_action,
ExtractionCollection.PATCH_METEO: partial(
post_job_action_patch,
extract_value=extract_value,
description="Meteo observations",
title="Meteo observations",
spatial_resolution="1deg",
),
ExtractionCollection.WORLDCEREAL: partial(
post_job_action_worldcereal,
ExtractionCollection.PATCH_WORLDCEREAL: partial(
post_job_action_patch_worldcereal,
extract_value=extract_value,
description="WorldCereal preprocessed inputs",
title="WorldCereal inputs",
spatial_resolution="10m",
),
ExtractionCollection.POINT_WORLDCEREAL: partial(
post_job_action_point_worldcereal,
),
}

post_job_fn = post_job_actions.get(
Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@
import pandas as pd
from openeo_gfmap import Backend, TemporalContext

from extract_common import ( # isort: skip
from worldcereal.openeo.extract import ( # isort: skip
buffer_geometry, # isort: skip
filter_extract_true, # isort: skip
upload_geoparquet_artifactory, # isort: skip
) # isort: skip


def create_job_dataframe_patch_meteo(
    backend: Backend, split_jobs: List[gpd.GeoDataFrame]
) -> pd.DataFrame:
    """Create the job dataframe for patch-based METEO extractions.

    Parameters
    ----------
    backend : Backend
        openEO backend the extraction jobs would be submitted to.
    split_jobs : List[gpd.GeoDataFrame]
        Per-job splits of the extraction geometries.

    Returns
    -------
    pd.DataFrame
        Never returned; see Raises.

    Raises
    ------
    NotImplementedError
        Always — METEO patch extraction is not implemented yet.
    """
    raise NotImplementedError("This function is not implemented yet.")


def create_datacube_meteo(
def create_job_patch_meteo(
row: pd.Series,
connection: openeo.DataCube,
provider=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from worldcereal.openeo.preprocessing import raw_datacube_S1

from extract_common import ( # isort: skip
from worldcereal.openeo.extract import ( # isort: skip
buffer_geometry, # isort: skip
get_job_nb_polygons, # isort: skip
pipeline_log, # isort: skip
Expand All @@ -30,7 +30,7 @@
S1_GRD_CATALOGUE_BEGIN_DATE = datetime(2014, 10, 1)


def create_job_dataframe_s1(
def create_job_dataframe_patch_s1(
backend: Backend,
split_jobs: List[gpd.GeoDataFrame],
) -> pd.DataFrame:
Expand Down Expand Up @@ -117,7 +117,7 @@ def create_job_dataframe_s1(
return pd.DataFrame(rows)


def create_datacube_sar(
def create_job_patch_s1(
row: pd.Series,
connection: openeo.DataCube,
provider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from worldcereal.openeo.preprocessing import raw_datacube_S2

from extract_common import ( # isort: skip
from worldcereal.openeo.extract import ( # isort: skip
buffer_geometry, # isort: skip
get_job_nb_polygons, # isort: skip
upload_geoparquet_artifactory, # isort: skip
Expand All @@ -23,7 +23,7 @@
S2_L2A_CATALOGUE_BEGIN_DATE = datetime(2017, 1, 1)


def create_job_dataframe_s2(
def create_job_dataframe_patch_s2(
backend: Backend,
split_jobs: List[gpd.GeoDataFrame],
) -> pd.DataFrame:
Expand All @@ -41,7 +41,7 @@ def create_job_dataframe_s2(
# start_date = max(start_date, S2_L2A_CATALOGUE_BEGIN_DATE)
# end_date = min(end_date, datetime.now())

s2_tile = job.tile.iloc[0] # Job dataframes are split depending on the
s2_tile = job.tile.iloc[0]
h3_l3_cell = job.h3_l3_cell.iloc[0]

# Convert dates to string format
Expand All @@ -67,7 +67,7 @@ def create_job_dataframe_s2(
return pd.DataFrame(rows)


def create_datacube_optical(
def create_job_patch_s2(
row: pd.Series,
connection: openeo.DataCube,
provider=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
)
from worldcereal.utils.geoloader import load_reproject

from extract_common import ( # isort: skip
from worldcereal.openeo.extract import ( # isort: skip
get_job_nb_polygons, # isort: skip
pipeline_log, # isort: skip
upload_geoparquet_artifactory, # isort: skip
Expand All @@ -43,7 +43,7 @@
WORLDCEREAL_BEGIN_DATE = datetime(2017, 1, 1)


def create_job_dataframe_worldcereal(
def create_job_dataframe_patch_worldcereal(
backend: Backend,
split_jobs: List[gpd.GeoDataFrame],
) -> pd.DataFrame:
Expand Down Expand Up @@ -119,7 +119,7 @@ def create_job_dataframe_worldcereal(
return pd.DataFrame(rows)


def create_datacube_worldcereal(
def create_job_patch_worldcereal(
row: pd.Series,
connection: openeo.DataCube,
provider,
Expand Down Expand Up @@ -333,7 +333,7 @@ def postprocess_extracted_file(
shutil.move(tempfile, item_asset_path)


def post_job_action_worldcereal(
def post_job_action_patch_worldcereal(
job_items: List[pystac.Item],
row: pd.Series,
extract_value: int,
Expand Down Expand Up @@ -397,7 +397,7 @@ def post_job_action_worldcereal(
return job_items


def generate_output_path_worldcereal(
def generate_output_path_patch_worldcereal(
root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame
):
"""Generate the output path for the extracted data, from a base path and
Expand Down
Empty file.
Loading

0 comments on commit 56b3e67

Please sign in to comment.