changed patch and point extraction naming conventions #118
VincentVerelst committed Nov 6, 2024
1 parent cf61a6a commit 19f728b
Showing 8 changed files with 98 additions and 94 deletions.
112 changes: 57 additions & 55 deletions scripts/extractions/extract.py
@@ -16,39 +16,39 @@
 from openeo_gfmap.backend import cdse_connection
 from openeo_gfmap.manager.job_manager import GFMAPJobManager
 from openeo_gfmap.manager.job_splitters import load_s2_grid, split_job_s2grid
-from patch_extractions.extract_meteo import (
-    create_datacube_meteo,
-    create_job_dataframe_meteo,
+from patch_extractions.extract_patch_meteo import (
+    create_job_dataframe_patch_meteo,
+    create_job_patch_meteo,
 )
-from patch_extractions.extract_optical import (
-    create_datacube_optical,
-    create_job_dataframe_s2,
+from patch_extractions.extract_patch_s2 import (
+    create_job_dataframe_patch_s2,
+    create_job_patch_s2,
 )
-from point_extractions.extract_point import (
-    create_datacube_point,
-    create_job_dataframe_point,
-    generate_output_path_point,
-    post_job_action_point,
+from point_extractions.extract_point_worldcereal import (
+    create_job_dataframe_point_worldcereal,
+    create_job_point_worldcereal,
+    generate_output_path_point_worldcereal,
+    post_job_action_point_worldcereal,
 )
 
 from worldcereal.openeo.extract import (
-    generate_output_path,
+    generate_output_path_patch,
     pipeline_log,
-    post_job_action,
+    post_job_action_patch,
 )
 from worldcereal.stac.constants import ExtractionCollection
 
-from patch_extractions.extract_sar import (  # isort: skip
-    create_datacube_sar,
-    create_job_dataframe_s1,
+from patch_extractions.extract_patch_s1 import (  # isort: skip
+    create_job_patch_s1,
+    create_job_dataframe_patch_s1,
 )
 
 
-from patch_extractions.extract_worldcereal import (  # isort: skip
-    create_datacube_worldcereal,
-    create_job_dataframe_worldcereal,
-    post_job_action_worldcereal,
-    generate_output_path_worldcereal,
+from patch_extractions.extract_patch_worldcereal import (  # isort: skip
+    create_job_patch_worldcereal,
+    create_job_dataframe_patch_worldcereal,
+    post_job_action_patch_worldcereal,
+    generate_output_path_patch_worldcereal,
 )
 
 
@@ -121,11 +121,11 @@ def prepare_job_dataframe(
 
     pipeline_log.info("Dataframes split to jobs, creating the job dataframe...")
     collection_switch: dict[ExtractionCollection, typing.Callable] = {
-        ExtractionCollection.SENTINEL1: create_job_dataframe_s1,
-        ExtractionCollection.SENTINEL2: create_job_dataframe_s2,
-        ExtractionCollection.METEO: create_job_dataframe_meteo,
-        ExtractionCollection.WORLDCEREAL: create_job_dataframe_worldcereal,
-        ExtractionCollection.POINT: create_job_dataframe_point,
+        ExtractionCollection.PATCH_SENTINEL1: create_job_dataframe_patch_s1,
+        ExtractionCollection.PATCH_SENTINEL2: create_job_dataframe_patch_s2,
+        ExtractionCollection.PATCH_METEO: create_job_dataframe_patch_meteo,
+        ExtractionCollection.PATCH_WORLDCEREAL: create_job_dataframe_patch_worldcereal,
+        ExtractionCollection.POINT_WORLDCEREAL: create_job_dataframe_point_worldcereal,
     }
 
     create_job_dataframe_fn = collection_switch.get(
@@ -156,32 +156,32 @@ def setup_extraction_functions(
     """
 
     datacube_creation = {
-        ExtractionCollection.SENTINEL1: partial(
-            create_datacube_sar,
+        ExtractionCollection.PATCH_SENTINEL1: partial(
+            create_job_patch_s1,
             executor_memory=memory,
             python_memory=python_memory,
             max_executors=max_executors,
         ),
-        ExtractionCollection.SENTINEL2: partial(
-            create_datacube_optical,
+        ExtractionCollection.PATCH_SENTINEL2: partial(
+            create_job_patch_s2,
             executor_memory=memory,
             python_memory=python_memory,
             max_executors=max_executors,
         ),
-        ExtractionCollection.METEO: partial(
-            create_datacube_meteo,
+        ExtractionCollection.PATCH_METEO: partial(
+            create_job_patch_meteo,
             executor_memory=memory,
            python_memory=python_memory,
             max_executors=max_executors,
         ),
-        ExtractionCollection.WORLDCEREAL: partial(
-            create_datacube_worldcereal,
+        ExtractionCollection.PATCH_WORLDCEREAL: partial(
+            create_job_patch_worldcereal,
             executor_memory=memory,
             python_memory=python_memory,
             max_executors=max_executors,
         ),
-        ExtractionCollection.POINT: partial(
-            create_datacube_point,
+        ExtractionCollection.POINT_WORLDCEREAL: partial(
+            create_job_point_worldcereal,
             executor_memory=memory,
             python_memory=python_memory,
             max_executors=max_executors,
@@ -196,19 +196,21 @@
     )
 
     path_fns = {
-        ExtractionCollection.SENTINEL1: partial(
-            generate_output_path, s2_grid=load_s2_grid()
+        ExtractionCollection.PATCH_SENTINEL1: partial(
+            generate_output_path_patch, s2_grid=load_s2_grid()
         ),
-        ExtractionCollection.SENTINEL2: partial(
-            generate_output_path, s2_grid=load_s2_grid()
+        ExtractionCollection.PATCH_SENTINEL2: partial(
+            generate_output_path_patch, s2_grid=load_s2_grid()
         ),
-        ExtractionCollection.METEO: partial(
-            generate_output_path, s2_grid=load_s2_grid()
+        ExtractionCollection.PATCH_METEO: partial(
+            generate_output_path_patch, s2_grid=load_s2_grid()
        ),
-        ExtractionCollection.WORLDCEREAL: partial(
-            generate_output_path_worldcereal, s2_grid=load_s2_grid()
+        ExtractionCollection.PATCH_WORLDCEREAL: partial(
+            generate_output_path_patch_worldcereal, s2_grid=load_s2_grid()
         ),
-        ExtractionCollection.POINT: partial(generate_output_path_point),
+        ExtractionCollection.POINT_WORLDCEREAL: partial(
+            generate_output_path_point_worldcereal
+        ),
     }
 
     path_fn = path_fns.get(
@@ -219,37 +221,37 @@
     )
 
     post_job_actions = {
-        ExtractionCollection.SENTINEL1: partial(
-            post_job_action,
+        ExtractionCollection.PATCH_SENTINEL1: partial(
+            post_job_action_patch,
             extract_value=extract_value,
             description="Sentinel1 GRD raw observations, unprocessed.",
             title="Sentinel-1 GRD",
             spatial_resolution="20m",
             s1_orbit_fix=True,
         ),
-        ExtractionCollection.SENTINEL2: partial(
-            post_job_action,
+        ExtractionCollection.PATCH_SENTINEL2: partial(
+            post_job_action_patch,
             extract_value=extract_value,
             description="Sentinel2 L2A observations, processed.",
             title="Sentinel-2 L2A",
             spatial_resolution="10m",
         ),
-        ExtractionCollection.METEO: partial(
-            post_job_action,
+        ExtractionCollection.PATCH_METEO: partial(
+            post_job_action_patch,
             extract_value=extract_value,
             description="Meteo observations",
             title="Meteo observations",
             spatial_resolution="1deg",
         ),
-        ExtractionCollection.WORLDCEREAL: partial(
-            post_job_action_worldcereal,
+        ExtractionCollection.PATCH_WORLDCEREAL: partial(
+            post_job_action_patch_worldcereal,
            extract_value=extract_value,
             description="WorldCereal preprocessed inputs",
             title="WorldCereal inputs",
             spatial_resolution="10m",
         ),
-        ExtractionCollection.POINT: partial(
-            post_job_action_point,
+        ExtractionCollection.POINT_WORLDCEREAL: partial(
+            post_job_action_point_worldcereal,
         ),
     }
 
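The dispatch tables above all follow one pattern: each ExtractionCollection member maps to a callable, and functools.partial pre-binds the collection-independent settings so the job manager only has to supply per-job arguments later. A minimal, self-contained sketch of that pattern (the resource values and stand-in functions below are illustrative, not the real WorldCereal implementations):

from enum import Enum
from functools import partial


class ExtractionCollection(Enum):
    PATCH_SENTINEL2 = "PATCH_SENTINEL2"
    POINT_WORLDCEREAL = "POINT_WORLDCEREAL"


def create_job_patch_s2(row, connection, executor_memory, python_memory, max_executors):
    # Stand-in for the real patch-extraction job builder.
    return f"S2 patch job for {row!r} with {executor_memory} executor memory"


def create_job_point_worldcereal(row, connection, executor_memory, python_memory, max_executors):
    # Stand-in for the real point-extraction job builder.
    return f"point job for {row!r} on {max_executors} executors"


# Resource settings are bound once; per-job arguments stay free.
datacube_creation = {
    ExtractionCollection.PATCH_SENTINEL2: partial(
        create_job_patch_s2,
        executor_memory="4G",
        python_memory="2G",
        max_executors=20,
    ),
    ExtractionCollection.POINT_WORLDCEREAL: partial(
        create_job_point_worldcereal,
        executor_memory="4G",
        python_memory="2G",
        max_executors=20,
    ),
}

collection = ExtractionCollection.POINT_WORLDCEREAL
create_fn = datacube_creation.get(collection)
if create_fn is None:
    raise ValueError(f"No job creation function for {collection}")
print(create_fn(row="job-001", connection=None))

Because the partials freeze only keyword arguments, every collection can be invoked through the same call signature, which is what lets the renamed functions slot into GFMAPJobManager unchanged.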
scripts/extractions/patch_extractions/extract_patch_meteo.py
@@ -15,13 +15,13 @@
 )  # isort: skip
 
 
-def create_job_dataframe_meteo(
+def create_job_dataframe_patch_meteo(
     backend: Backend, split_jobs: List[gpd.GeoDataFrame]
 ) -> pd.DataFrame:
     raise NotImplementedError("This function is not implemented yet.")
 
 
-def create_datacube_meteo(
+def create_job_patch_meteo(
     row: pd.Series,
     connection: openeo.DataCube,
     provider=None,
scripts/extractions/patch_extractions/extract_patch_s1.py
@@ -30,7 +30,7 @@
 S1_GRD_CATALOGUE_BEGIN_DATE = datetime(2014, 10, 1)
 
 
-def create_job_dataframe_s1(
+def create_job_dataframe_patch_s1(
     backend: Backend,
     split_jobs: List[gpd.GeoDataFrame],
 ) -> pd.DataFrame:
@@ -117,7 +117,7 @@ def create_job_dataframe_s1(
     return pd.DataFrame(rows)
 
 
-def create_datacube_sar(
+def create_job_patch_s1(
     row: pd.Series,
     connection: openeo.DataCube,
     provider,
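The create_job_dataframe_patch_* functions in these modules share the shape signalled by their final visible line, return pd.DataFrame(rows): each split GeoDataFrame contributes one row describing an openEO job. A rough, self-contained sketch of that shape (the column names and to_json serialization are assumptions for illustration, not the exact WorldCereal schema):

from typing import List

import geopandas as gpd
import pandas as pd
from shapely.geometry import box


def create_job_dataframe_sketch(
    backend_name: str, split_jobs: List[gpd.GeoDataFrame]
) -> pd.DataFrame:
    rows = []
    for job in split_jobs:  # one GeoDataFrame per openEO job
        rows.append(
            {
                "backend_name": backend_name,
                "start_date": job["start_date"].min(),
                "end_date": job["end_date"].max(),
                # Serialize the job geometries so the row fits a plain DataFrame.
                "geometry": job.to_json(),
            }
        )
    return pd.DataFrame(rows)


job = gpd.GeoDataFrame(
    {"start_date": ["2021-01-01"], "end_date": ["2021-12-31"]},
    geometry=[box(4.0, 50.0, 4.1, 50.1)],
    crs="EPSG:4326",
)
print(create_job_dataframe_sketch("cdse", [job]))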
scripts/extractions/patch_extractions/extract_patch_s2.py
@@ -23,7 +23,7 @@
 S2_L2A_CATALOGUE_BEGIN_DATE = datetime(2017, 1, 1)
 
 
-def create_job_dataframe_s2(
+def create_job_dataframe_patch_s2(
     backend: Backend,
     split_jobs: List[gpd.GeoDataFrame],
 ) -> pd.DataFrame:
@@ -67,7 +67,7 @@ def create_job_dataframe_s2(
     return pd.DataFrame(rows)
 
 
-def create_datacube_optical(
+def create_job_patch_s2(
     row: pd.Series,
     connection: openeo.DataCube,
     provider=None,
scripts/extractions/patch_extractions/extract_patch_worldcereal.py
@@ -43,7 +43,7 @@
 WORLDCEREAL_BEGIN_DATE = datetime(2017, 1, 1)
 
 
-def create_job_dataframe_worldcereal(
+def create_job_dataframe_patch_worldcereal(
     backend: Backend,
     split_jobs: List[gpd.GeoDataFrame],
 ) -> pd.DataFrame:
@@ -119,7 +119,7 @@ def create_job_dataframe_worldcereal(
     return pd.DataFrame(rows)
 
 
-def create_datacube_worldcereal(
+def create_job_patch_worldcereal(
     row: pd.Series,
     connection: openeo.DataCube,
     provider,
@@ -333,7 +333,7 @@ def postprocess_extracted_file(
     shutil.move(tempfile, item_asset_path)
 
 
-def post_job_action_worldcereal(
+def post_job_action_patch_worldcereal(
     job_items: List[pystac.Item],
     row: pd.Series,
     extract_value: int,
@@ -397,7 +397,7 @@ def post_job_action_worldcereal(
     return job_items
 
 
-def generate_output_path_worldcereal(
+def generate_output_path_patch_worldcereal(
     root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame
 ):
     """Generate the output path for the extracted data, from a base path and
scripts/extractions/point_extractions/extract_point_worldcereal.py
@@ -18,7 +18,9 @@
 )
 
 
-def generate_output_path_point(root_folder: Path, geometry_index: int, row: pd.Series):
+def generate_output_path_point_worldcereal(
+    root_folder: Path, geometry_index: int, row: pd.Series
+):
     """
     For point extractions, only one asset (a geoparquet file) is generated per job.
     Therefore geometry_index is always 0.
@@ -45,7 +47,7 @@ def generate_output_path_point(root_folder: Path, geometry_index: int, row: pd.S
     return real_subfolder / f"point_extractions{row.out_extension}"
 
 
-def create_job_dataframe_point(
+def create_job_dataframe_point_worldcereal(
     backend: Backend, split_jobs: List[gpd.GeoDataFrame]
 ) -> pd.DataFrame:
     """Create a dataframe from the split jobs, containing all the necessary information to run the job."""
@@ -84,7 +86,7 @@ def create_job_dataframe_point(
     return pd.DataFrame(rows)
 
 
-def create_datacube_point(
+def create_job_point_worldcereal(
     row: pd.Series,
     connection: openeo.DataCube,
     provider,
@@ -144,7 +146,7 @@
     )
 
 
-def post_job_action_point(
+def post_job_action_point_worldcereal(
     job_items: List[pystac.Item], row: pd.Series, parameters: Optional[dict] = None
 ) -> list:
     for idx, item in enumerate(job_items):
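The renamed generate_output_path_point_worldcereal is the one path helper that takes no s2_grid: per its docstring, a point-extraction job yields a single geoparquet asset, so geometry_index is always 0 and the filename is fixed. A hypothetical sketch of that contract (the ref_id-based subfolder below is an assumption; the real derivation sits in the collapsed lines of the diff):

from pathlib import Path

import pandas as pd


def generate_output_path_point_worldcereal_sketch(
    root_folder: Path, geometry_index: int, row: pd.Series
) -> Path:
    # One geoparquet asset per job, so only index 0 is meaningful.
    assert geometry_index == 0
    subfolder = root_folder / str(row.ref_id)  # assumed layout, not the real one
    return subfolder / f"point_extractions{row.out_extension}"


row = pd.Series({"ref_id": "demo_ref", "out_extension": ".geoparquet"})
print(generate_output_path_point_worldcereal_sketch(Path("/data/extractions"), 0, row))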
4 changes: 2 additions & 2 deletions src/worldcereal/openeo/extract.py
@@ -41,7 +41,7 @@ def filter(self, record):
 stream_handler.addFilter(ManagerLoggerFilter())
 
 
-def post_job_action(
+def post_job_action_patch(
     job_items: List[pystac.Item],
     row: pd.Series,
     extract_value: int,
@@ -131,7 +131,7 @@ def post_job_action(
     return job_items
 
 
-def generate_output_path(
+def generate_output_path_patch(
     root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame
 ):
     """Generate the output path for the extracted data, from a base path and