diff --git a/scripts/extractions/point_extractions/point_extractions.py b/scripts/extractions/point_extractions/point_extractions.py
index 6ef61126..4398079f 100644
--- a/scripts/extractions/point_extractions/point_extractions.py
+++ b/scripts/extractions/point_extractions/point_extractions.py
@@ -16,7 +16,10 @@
 from openeo_gfmap.manager.job_splitters import split_job_s2grid
 from openeo_gfmap.preprocessing import linear_interpolation, median_compositing
 
-from worldcereal.openeo.preprocessing import raw_datacube_S2
+from worldcereal.openeo.preprocessing import (
+    raw_datacube_S2,
+    worldcereal_preprocessed_inputs_gfmap,
+)
 
 # Logger for this current pipeline
 pipeline_log: Optional[logging.Logger] = None
@@ -127,8 +130,8 @@ def create_datacube(
     connection: openeo.DataCube,
     provider,
     connection_provider,
-    executor_memory: str = "5G",
-    executor_memory_overhead: str = "2G",
+    executor_memory: str = "3G",
+    executor_memory_overhead: str = "5G",
 ):
     """Creates an OpenEO BatchJob from the given row information."""
 
@@ -147,34 +150,16 @@ def create_datacube(
     backend = Backend(row.backend_name)
     backend_context = BackendContext(backend)
 
-    # TODO: Adjust this to the desired bands to download
-    bands_to_download = [
-        "S2-L2A-B04",
-        "S2-L2A-B08",
-    ]
-
-    fetch_type = FetchType.POINT
-
-    cube = raw_datacube_S2(
+    inputs = worldcereal_preprocessed_inputs_gfmap(
         connection=connection,
         backend_context=backend_context,
         spatial_extent=geometry,
         temporal_extent=temporal_extent,
-        bands=bands_to_download,
-        fetch_type=fetch_type,
-        distance_to_cloud_flag=True,
-        additional_masks_flag=False,
-        apply_mask_flag=True,
+        fetch_type=FetchType.POINT,
     )
 
-    # Create monthly median composites
-    cube = median_compositing(cube=cube, period="month")
-
-    # Perform linear interpolation
-    cube = linear_interpolation(cube)
-
     # Finally, create a vector cube based on the Point geometries
-    cube = cube.aggregate_spatial(geometries=geometry, reducer="mean")
+    cube = inputs.aggregate_spatial(geometries=geometry, reducer="mean")
 
     # Increase the memory of the jobs depending on the number of polygons to extract
     number_points = get_job_nb_points(row)
@@ -183,6 +168,7 @@
     job_options = {
         "executor-memory": executor_memory,
         "executor-memoryOverhead": executor_memory_overhead,
+        "soft-error": True,
     }
     return cube.create_job(
         out_format="Parquet",
@@ -204,7 +190,19 @@ def post_job_action(
 
     # Convert band dtype to uint16 (temporary fix)
     # TODO: remove this step when the issue is fixed on the OpenEO backend
-    bands = ["S2-L2A-B04", "S2-L2A-B08"]
+    bands = [
+        "S2-L2A-B02",
+        "S2-L2A-B03",
+        "S2-L2A-B04",
+        "S2-L2A-B05",
+        "S2-L2A-B06",
+        "S2-L2A-B07",
+        "S2-L2A-B08",
+        "S2-L2A-B11",
+        "S2-L2A-B12",
+        "S1-SIGMA0-VH",
+        "S1-SIGMA0-VV",
+    ]
     gdf[bands] = gdf[bands].fillna(65535).astype("uint16")
 
     gdf.to_parquet(item_asset_path, index=False)
@@ -238,7 +236,7 @@
     parser.add_argument(
         "--memory-overhead",
        type=str,
-        default="1G",
+        default="5G",
         help="Memory overhead to allocate for the executor.",
     )
 
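
For reference, a minimal standalone sketch of the dtype workaround that `post_job_action` now applies to the extended band list. Toy values and plain pandas are used here for illustration; the actual code operates on a GeoDataFrame read back from the job's Parquet asset.

import pandas as pd

# Toy stand-in for the extracted point data; the column names match two of
# the bands handled in post_job_action, the values are made up.
gdf = pd.DataFrame(
    {
        "S2-L2A-B04": [1200.0, None],
        "S2-L2A-B08": [3400.0, 560.0],
    }
)

# Same cast as in the patch: missing observations become the sentinel
# value 65535, then the float columns are narrowed to uint16.
bands = ["S2-L2A-B04", "S2-L2A-B08"]
gdf[bands] = gdf[bands].fillna(65535).astype("uint16")

Since 65535 is the maximum uint16 value, the sentinel survives the cast unchanged and can be treated as nodata downstream.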