Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Point extractions sentinel 2 #58

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
bbe6dc2
added an example script for sentinel2 timeseries point extractions
VincentVerelst May 28, 2024
10bf8af
formatting of the point_extractions script
VincentVerelst May 28, 2024
53adf25
adjusted raw_datacube_s2 to handle point extractions as well
VincentVerelst May 29, 2024
733dd7f
removed masked_cube function
VincentVerelst May 29, 2024
da9e31c
added an example script for sentinel2 timeseries point extractions
VincentVerelst May 28, 2024
575c377
formatting of the point_extractions script
VincentVerelst May 28, 2024
bf58f68
adjusted raw_datacube_s2 to handle point extractions as well
VincentVerelst May 29, 2024
92fb911
removed masked_cube function
VincentVerelst May 29, 2024
0ebf058
changed point extraction example to extract EVI as example
VincentVerelst Jun 4, 2024
28bc5b7
changed point extractions script to include a time dimension in the r…
VincentVerelst Jun 6, 2024
a5fa384
small adjustments to raw_datacube_s2
VincentVerelst Jun 11, 2024
dcb95e9
adjusted raw_datacube_s2 to also handle point extractions
VincentVerelst Jun 12, 2024
f20e64e
removed unnecessary imports
VincentVerelst Jun 12, 2024
8d624b5
small changes to point extractions to handle new format of reference …
VincentVerelst Jun 13, 2024
0cddbf1
added post_job_action to the point extractions to change the dtype of…
VincentVerelst Jun 13, 2024
33eabe3
Merge branch 'vv_point_extractions' of https://github.com/WorldCereal…
jdegerickx Jun 19, 2024
286bbc0
changed the preprocessing of the point extractions to match the prepr…
VincentVerelst Jun 20, 2024
aa45245
changed preprossing to also handle point extractions
VincentVerelst Jun 20, 2024
a3c1f97
formatting
VincentVerelst Jun 20, 2024
52d8be0
removed unnecessary imports
VincentVerelst Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ def create_datacube_optical(
bands_to_download,
FetchType.POLYGON,
filter_tile=s2_tile,
apply_mask=False,
additional_masks=True,
distance_to_cloud_flag=True,
apply_mask_flag=False,
additional_masks_flag=True,
)

# Increase the memory of the jobs depending on the number of polygons to extract
Expand Down
281 changes: 281 additions & 0 deletions scripts/extractions/point_extractions/point_extractions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
"""Extract point data using OpenEO-GFMAP package."""
import argparse
import logging
from functools import partial
from pathlib import Path
from typing import List, Optional

import geojson
import geopandas as gpd
import openeo
import pandas as pd
import pystac
from openeo_gfmap import Backend, BackendContext, FetchType, TemporalContext
from openeo_gfmap.backend import cdse_connection
from openeo_gfmap.manager.job_manager import GFMAPJobManager
from openeo_gfmap.manager.job_splitters import split_job_s2grid
from openeo_gfmap.preprocessing import linear_interpolation, median_compositing

from worldcereal.openeo.preprocessing import raw_datacube_S2

# Logger for this current pipeline
pipeline_log: Optional[logging.Logger] = None


def setup_logger(level=logging.INFO) -> None:
"""Setup the logger from the openeo_gfmap package to the assigned level."""
global pipeline_log
pipeline_log = logging.getLogger("pipeline_sar")

pipeline_log.setLevel(level)

stream_handler = logging.StreamHandler()
pipeline_log.addHandler(stream_handler)

formatter = logging.Formatter("%(asctime)s|%(name)s|%(levelname)s: %(message)s")
stream_handler.setFormatter(formatter)

# Exclude the other loggers from other libraries
class ManagerLoggerFilter(logging.Filter):
"""Filter to only accept the OpenEO-GFMAP manager logs."""

def filter(self, record):
return record.name in [pipeline_log.name]

stream_handler.addFilter(ManagerLoggerFilter())


def filter_extract_true(
geometries: geojson.FeatureCollection,
) -> geojson.FeatureCollection:
"""Remove all the geometries from the Feature Collection that have the property field `extract` set to `False`"""
return geojson.FeatureCollection(
[f for f in geometries.features if f.properties.get("extract", 0) == 1]
)


def get_job_nb_points(row: pd.Series) -> int:
"""Get the number of points in the geometry."""
return len(
list(
filter(
lambda feat: feat.properties.get("extract"),
geojson.loads(row.geometry)["features"],
)
)
)


# TODO: this is an example output_path. Adjust this function to your needs for production.
def generate_output_path(root_folder: Path, geometry_index: int, row: pd.Series):
features = geojson.loads(row.geometry)
sample_id = features[geometry_index].properties.get("sample_id", None)
if sample_id is None:
sample_id = features[geometry_index].properties["sampleID"]

s2_tile_id = row.s2_tile

subfolder = root_folder / s2_tile_id
return subfolder / f"{sample_id}{row.out_extension}"


def create_job_dataframe(
backend: Backend, split_jobs: List[gpd.GeoDataFrame]
) -> pd.DataFrame:
"""Create a dataframe from the split jobs, containg all the necessary information to run the job."""
columns = [
"backend_name",
"out_extension",
"start_date",
"end_date",
"s2_tile",
"geometry",
]
rows = []
for job in split_jobs:
# Compute the average in the valid date and make a buffer of 1.5 year around
median_time = pd.to_datetime(job.valid_time).mean()
start_date = median_time - pd.Timedelta(days=275) # A bit more than 9 months
end_date = median_time + pd.Timedelta(days=275) # A bit more than 9 months
s2_tile = job.tile.iloc[0]
rows.append(
pd.Series(
dict(
zip(
columns,
[
backend.value,
".parquet",
start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"),
s2_tile,
job.to_json(),
],
)
)
)
)

return pd.DataFrame(rows)


def create_datacube(
row: pd.Series,
connection: openeo.DataCube,
provider,
connection_provider,
executor_memory: str = "5G",
executor_memory_overhead: str = "2G",
):
"""Creates an OpenEO BatchJob from the given row information."""

# Load the temporal and spatial extent
temporal_extent = TemporalContext(row.start_date, row.end_date)

# Get the feature collection containing the geometry to the job
geometry = geojson.loads(row.geometry)
assert isinstance(geometry, geojson.FeatureCollection)

# Filter the geometry to the rows with the extract only flag
geometry = filter_extract_true(geometry)
assert len(geometry.features) > 0, "No geometries with the extract flag found"

# Backend name and fetching type
backend = Backend(row.backend_name)
backend_context = BackendContext(backend)

# TODO: Adjust this to the desired bands to download
bands_to_download = [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will need to expand to S1 and meteo too. Has to reflect the inputs required by the models.

"S2-L2A-B04",
"S2-L2A-B08",
]

fetch_type = FetchType.POINT

cube = raw_datacube_S2(
connection=connection,
backend_context=backend_context,
spatial_extent=geometry,
temporal_extent=temporal_extent,
bands=bands_to_download,
fetch_type=fetch_type,
distance_to_cloud_flag=True,
additional_masks_flag=False,
apply_mask_flag=True,
)

# Create monthly median composites
cube = median_compositing(cube=cube, period="month")

# Perform linear interpolation
cube = linear_interpolation(cube)
VincentVerelst marked this conversation as resolved.
Show resolved Hide resolved

# Finally, create a vector cube based on the Point geometries
cube = cube.aggregate_spatial(geometries=geometry, reducer="mean")

# Increase the memory of the jobs depending on the number of polygons to extract
number_points = get_job_nb_points(row)
pipeline_log.debug("Number of polygons to extract %s", number_points)

job_options = {
"executor-memory": executor_memory,
"executor-memoryOverhead": executor_memory_overhead,
}
return cube.create_job(
out_format="Parquet",
title=f"GFMAP_Feature_Extraction_S2_{row.s2_tile}",
job_options=job_options,
)


def post_job_action(
job_items: List[pystac.Item], row: pd.Series, parameters: dict = None
) -> list:
for idx, item in enumerate(job_items):
item_asset_path = Path(list(item.assets.values())[0].href)

gdf = gpd.read_parquet(item_asset_path)

# Convert the dates to datetime format
gdf["date"] = pd.to_datetime(gdf["date"])

# Convert band dtype to uint16 (temporary fix)
# TODO: remove this step when the issue is fixed on the OpenEO backend
bands = ["S2-L2A-B04", "S2-L2A-B08"]
gdf[bands] = gdf[bands].fillna(65535).astype("uint16")

gdf.to_parquet(item_asset_path, index=False)

return job_items


if __name__ == "__main__":
setup_logger()

parser = argparse.ArgumentParser(
description="S2 point extractions with OpenEO-GFMAP package."
)
parser.add_argument(
"output_path", type=Path, help="Path where to save the extraction results."
)

# TODO: get the reference data from the RDM API.
parser.add_argument(
"input_df", type=str, help="Path to the input dataframe for the training data."
)
parser.add_argument(
"--max_locations",
type=int,
default=500,
help="Maximum number of locations to extract per job.",
)
parser.add_argument(
"--memory", type=str, default="3G", help="Memory to allocate for the executor."
)
parser.add_argument(
"--memory-overhead",
type=str,
default="1G",
help="Memory overhead to allocate for the executor.",
)

args = parser.parse_args()

tracking_df_path = Path(args.output_path) / "job_tracking.csv"

# Load the input dataframe, and perform dataset splitting using the h3 tile
# to respect the area of interest. Also filters out the jobs that have
# no location with the extract=True flag.
pipeline_log.info("Loading input dataframe from %s.", args.input_df)

input_df = gpd.read_parquet(args.input_df)

split_dfs = split_job_s2grid(input_df, max_points=args.max_locations)
split_dfs = [df for df in split_dfs if df.extract.any()]

job_df = create_job_dataframe(Backend.CDSE, split_dfs).iloc[
[2]
] # TODO: remove iloc

# Setup the memory parameters for the job creator.
create_datacube = partial(
create_datacube,
executor_memory=args.memory,
executor_memory_overhead=args.memory_overhead,
)

manager = GFMAPJobManager(
output_dir=args.output_path,
output_path_generator=generate_output_path,
post_job_action=post_job_action,
collection_id="SENTINEL2-POINT-FEATURE-EXTRACTION",
collection_description="Sentinel-2 basic point feature extraction.",
poll_sleep=60,
n_threads=2,
post_job_params={},
)

manager.add_backend(Backend.CDSE.value, cdse_connection, parallel_jobs=2)

pipeline_log.info("Launching the jobs from the manager.")
manager.run_jobs(job_df, create_datacube, tracking_df_path)
40 changes: 28 additions & 12 deletions src/worldcereal/openeo/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ def raw_datacube_S2(
bands: List[str],
fetch_type: FetchType,
filter_tile: Optional[str] = None,
additional_masks: bool = True,
apply_mask: bool = False,
distance_to_cloud_flag: Optional[bool] = True,
additional_masks_flag: Optional[bool] = True,
apply_mask_flag: Optional[bool] = False,
) -> DataCube:
"""Extract Sentinel-2 datacube from OpenEO using GFMAP routines.
Raw data is extracted with no cloud masking applied by default (can be
Expand All @@ -55,7 +56,12 @@ def raw_datacube_S2(
filter_tile : Optional[str], optional
Filter by tile ID, by default disabled. This forces the process to only
one tile ID from the Sentinel-2 collection.
apply_mask : bool, optional
distance_to_cloud_flag : Optional[bool], optional
Compute the distance to cloud, by default True.
additional_masks_flag : bool, optional
Add the additional masks to the cube, by default True. This includes the
distance to cloud and the SCL dilation mask.
apply_mask_flag : bool, optional
Apply cloud masking, by default False. Can be enabled for high
optimization of memory usage.
"""
Expand Down Expand Up @@ -95,7 +101,9 @@ def raw_datacube_S2(
erosion_kernel_size=3,
).rename_labels("bands", ["S2-L2A-SCL_DILATED_MASK"])

if additional_masks:
additional_masks = scl_dilated_mask

if distance_to_cloud_flag:
# Compute the distance to cloud and add it to the cube
distance_to_cloud = scl_cube.apply_neighborhood(
process=UDF.from_file(Path(__file__).parent / "udf_distance_to_cloud.py"),
Expand All @@ -111,20 +119,27 @@ def raw_datacube_S2(
).rename_labels("bands", ["S2-L2A-DISTANCE-TO-CLOUD"])

additional_masks = scl_dilated_mask.merge_cubes(distance_to_cloud)
additional_masks = scl_dilated_mask.merge_cubes(distance_to_cloud)

# Try filtering using the geometry
if fetch_type == FetchType.TILE:
additional_masks = additional_masks.filter_spatial(
spatial_extent.to_geojson()
)
# Try filtering using the geometry
if fetch_type == FetchType.TILE:
additional_masks = additional_masks.filter_spatial(spatial_extent.to_geojson())

# Create the job to extract S2
extraction_parameters = {
"target_resolution": None, # Disable target resolution
"load_collection": {
"eo:cloud_cover": lambda val: val <= 95.0,
},
}
if additional_masks_flag:
extraction_parameters["pre_merge"] = additional_masks

if filter_tile:
extraction_parameters["load_collection"]["tileId"] = (
lambda val: val == filter_tile
)
if apply_mask:
if apply_mask_flag:
extraction_parameters["pre_mask"] = scl_dilated_mask

extractor = build_sentinel2_l2a_extractor(
Expand Down Expand Up @@ -263,8 +278,9 @@ def worldcereal_preprocessed_inputs_gfmap(
],
fetch_type=FetchType.TILE,
filter_tile=False,
additional_masks=False,
apply_mask=True,
distance_to_cloud_flag=True,
additional_masks_flag=False,
apply_mask_flag=True,
)

s2_data = median_compositing(s2_data, period="month")
Expand Down
Loading