Merge branch 'feature/zarr-output' into 'master'
Add Zarr and NetCDF output support; split a multitemporal GTIFF into separate GTIFFs, each containing a single time point

See merge request team-6/openeo-sentinelhub-python-driver!315
zcernigoj committed Oct 24, 2023
2 parents cc434e4 + 249d3ac commit ea32438
Showing 13 changed files with 1,262 additions and 626 deletions.
8 changes: 6 additions & 2 deletions rest/Pipfile
@@ -22,9 +22,13 @@ openeo-pg-parser = {editable = true,git = "https://github.com/Ardweaden/openeo-p
requests = "==2.27.1"
flask-cors = "==3.0.10"
honeycomb-beeline = "==3.2.0"
pg-to-evalscript = "==0.2.6"
pg-to-evalscript = "==0.2.8"
sentinelhub = "==3.6.3"
isodate = "==0.6.1"
pyjwt = {extras = ["crypto"], version = "==2.3.0"}
cryptography = "==3.4.8"
cryptography = "==36.0.1"
placebo = "==0.9.0"
rioxarray = "*"
zarr = "*"
netcdf4 = "*"
pip-tools = "==6.1.0"
1,599 changes: 979 additions & 620 deletions rest/Pipfile.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions rest/app.py
@@ -40,6 +40,7 @@
get_batch_job_status,
create_or_get_estimate_values_from_db,
)
from post_processing.post_processing import parse_sh_gtiff_to_format
from processing.utils import inject_variables_in_process_graph, overwrite_spatial_extent_without_parameters
from processing.openeo_process_errors import OpenEOProcessError
from authentication.authentication import authentication_provider
@@ -593,6 +594,13 @@ def add_job_to_queue(job_id):
metadata_filename = "metadata.json"
bucket.put_file_to_bucket("", prefix=job["batch_request_id"], file_name=metadata_filename)

# START OF POST_PROCESSING
# post-process gtiffs to appropriate formats

parse_sh_gtiff_to_format(job, bucket)

# END OF POST_PROCESSING

results = bucket.get_data_from_bucket(prefix=job["batch_request_id"])
log(INFO, f"Fetched all results: {str(results)}")

2 changes: 2 additions & 0 deletions rest/authentication/authentication.py
@@ -21,6 +21,8 @@
from authentication.user import OIDCUser, SHUser
from authentication.utils import decode_sh_access_token

from openeoerrors import TokenInvalid


class AuthScheme(Enum):
BASIC = "basic"
5 changes: 5 additions & 0 deletions rest/buckets/results_bucket.py
@@ -17,6 +17,11 @@ def put_file_to_bucket(self, content_as_string, prefix=None, file_name="file"):

self.client.put_object(Bucket=self.bucket_name, Key=file_path, Body=content_as_string)

def upload_file_to_bucket(self, local_file_path, prefix=None, file_name="file"):
s3_file_path = prefix + "/" + file_name if prefix else file_name

self.client.upload_file(local_file_path, self.bucket_name, s3_file_path)

def get_data_from_bucket(self, prefix=None):
continuation_token = None
results = []
17 changes: 17 additions & 0 deletions rest/output_formats/NetCDF.json
@@ -0,0 +1,17 @@
{
"gis_data_types": [
"raster"
],
"parameters": {
"datatype": {
"type": "string",
"description": "The values data type.",
"enum": [
"byte",
"uint16",
"float32"
],
"default": "float32"
}
}
}
17 changes: 17 additions & 0 deletions rest/output_formats/ZARR.json
@@ -0,0 +1,17 @@
{
"gis_data_types": [
"raster"
],
"parameters": {
"datatype": {
"type": "string",
"description": "The values data type.",
"enum": [
"byte",
"uint16",
"float32"
],
"default": "float32"
}
}
}
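The two JSON descriptors above advertise the new formats through the backend's output-formats listing. A minimal sketch of how a batch job might request them via `save_result`, assuming the format names match the keys added to `supported_mime_types` further down ("zarr", "netcdf") and that `options.datatype` corresponds to the `datatype` parameter defined here; node ids and the rest of the process graph are illustrative only:

```python
# Illustrative save_result node of an openEO process graph (not taken from the repository).
save_result_node = {
    "process_id": "save_result",
    "arguments": {
        "data": {"from_node": "loadcollection1"},  # hypothetical upstream node
        "format": "netcdf",                        # or "zarr" / "gtiff"
        "options": {"datatype": "float32"},        # parameter from the descriptor above
    },
    "result": True,
}
```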
Empty file.
10 changes: 10 additions & 0 deletions rest/post_processing/const.py
@@ -0,0 +1,10 @@
from sentinelhub import MimeType
from processing.const import CustomMimeType

TMP_FOLDER = "/tmp/"

parsed_output_file_name = {
MimeType.TIFF: {"name": "output", "ext": ".tif"},
CustomMimeType.ZARR: {"name": "output", "ext": ".zarr"},
CustomMimeType.NETCDF: {"name": "output", "ext": ".nc"},
}
110 changes: 110 additions & 0 deletions rest/post_processing/gtiff_parser.py
@@ -0,0 +1,110 @@
import os
import shutil
import rioxarray
from dateutil import parser
import pandas as pd
import xarray as xr
import requests
from sentinelhub import MimeType

from processing.const import CustomMimeType
from openeoerrors import Internal

# assume there is exactly one time dimension and one bands dimension
def check_dimensions(time_dimensions, bands_dimensions):
if len(time_dimensions) == 0:
raise Internal("No time dimensions exist. Only 1 time dimension is supported.")

if len(time_dimensions) > 1:
raise Internal("More than 1 time dimension exist. Only 1 time dimension is supported.")

if len(bands_dimensions) == 0:
raise Internal("No bands dimensions exist. Only 1 bands dimension is supported.")

if len(bands_dimensions) > 1:
raise Internal("More than 1 bands dimension exist. Only 1 bands dimension is supported.")


def get_timestamps_arrays(datacube_time_as_bands, time_dimensions, bands_dimensions, output_format):
num_of_img_bands = len(datacube_time_as_bands["band"])
num_of_bands_dimension = len(bands_dimensions[0]["labels"])

list_of_timestamps = []
list_of_timestamp_arrays = []

for i in range(0, num_of_img_bands, num_of_bands_dimension):
date = time_dimensions[0]["labels"][int(i / num_of_bands_dimension)]
timestamp_array = datacube_time_as_bands[i : i + num_of_bands_dimension]

if output_format in [CustomMimeType.NETCDF, CustomMimeType.ZARR]:
pandas_time = pd.to_datetime(parser.parse(date))
timestamp_array = timestamp_array.assign_coords(band=bands_dimensions[0]["labels"])
timestamp_array = timestamp_array.assign_coords(t=pandas_time)
timestamp_array = timestamp_array.expand_dims(dim="t")

list_of_timestamps.append(date)
list_of_timestamp_arrays.append(timestamp_array)

return list_of_timestamps, list_of_timestamp_arrays
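Sentinel Hub delivers every (timestamp, band) pair as one flat image band, so the loop above steps through the band axis in chunks of `num_of_bands_dimension` and re-attaches the time coordinate. A self-contained sketch of that slicing on synthetic data (2 timestamps × 3 bands → 6 flat bands; all labels and sizes are made up):

```python
import numpy as np
import pandas as pd
import xarray as xr

band_labels = ["B02", "B03", "B04"]            # bands dimension labels (illustrative)
time_labels = ["2022-06-01", "2022-06-11"]     # time dimension labels (illustrative)

# 6 flat image bands on a tiny 4x5 grid, as rioxarray would expose them
flat = xr.DataArray(
    np.arange(6 * 4 * 5, dtype="float32").reshape(6, 4, 5),
    dims=("band", "y", "x"),
    coords={"band": np.arange(1, 7)},
)

per_timestamp = []
for i in range(0, len(flat["band"]), len(band_labels)):
    chunk = flat[i : i + len(band_labels)]                      # one timestamp's bands
    chunk = chunk.assign_coords(band=band_labels)               # label the bands dimension
    chunk = chunk.assign_coords(t=pd.to_datetime(time_labels[i // len(band_labels)]))
    per_timestamp.append(chunk.expand_dims(dim="t"))            # add a length-1 time axis

print([dict(a.sizes) for a in per_timestamp])
# [{'t': 1, 'band': 3, 'y': 4, 'x': 5}, {'t': 1, 'band': 3, 'y': 4, 'x': 5}]
```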


def save_as_gtiff(list_of_timestamps, list_of_timestamp_arrays, output_dir, output_name):
output_file_paths = []
for array, date in zip(list_of_timestamp_arrays, list_of_timestamps):
file_name = f"{output_name['name']}_{date}{output_name['ext']}"
file_path = os.path.join(output_dir, file_name)
output_file_paths.append(file_path)

array.rio.to_raster(
file_path,
tiled=True, # GDAL: By default striped TIFF files are created. This option can be used to force creation of tiled TIFF files.
windowed=True, # rioxarray: read & write one window at a time
)
return output_file_paths


def save_as_netcdf(list_of_timestamp_arrays, output_dir, output_name):
datacube_with_time_dimension = xr.combine_by_coords(list_of_timestamp_arrays)
output_file_path = os.path.join(output_dir, f"{output_name['name']}{output_name['ext']}")
datacube_with_time_dimension.to_netcdf(output_file_path)
return [output_file_path]


def save_as_zarr(list_of_timestamp_arrays, output_dir, output_name):
datacube_with_time_dimension = xr.combine_by_coords(list_of_timestamp_arrays)
output_file_path = os.path.join(output_dir, f"{output_name['name']}{output_name['ext']}")
datacube_with_time_dimension.to_zarr(output_file_path)
# zip the zarr folder to avoid listing a bunch of files
shutil.make_archive(output_file_path, "zip", output_file_path)
output_file_path = f"{output_file_path}.zip"
return [output_file_path]


def parse_multitemporal_gtiff_to_format(input_tiff, input_metadata, output_dir, output_name, output_format):
datacube_time_as_bands = rioxarray.open_rasterio(input_tiff)
datacube_metadata = requests.get(input_metadata).json()

time_dimensions = [dim for dim in datacube_metadata["outputDimensions"] if dim["type"] == "temporal"]
bands_dimensions = [dim for dim in datacube_metadata["outputDimensions"] if dim["type"] == "bands"]

# mock a bands dimension (with 1 band) if it's not present in the data
# e.g. save_result process right after ndvi process which doesn't have a target band set
if len(bands_dimensions) == 0:
bands_dimensions = [{"name": "bands", "type": "bands", "labels": ["results"]}]

check_dimensions(time_dimensions, bands_dimensions)

list_of_timestamps, list_of_timestamp_arrays = get_timestamps_arrays(
datacube_time_as_bands, time_dimensions, bands_dimensions, output_format
)

if output_format == MimeType.TIFF:
return save_as_gtiff(list_of_timestamps, list_of_timestamp_arrays, output_dir, output_name)

if output_format == CustomMimeType.NETCDF:
return save_as_netcdf(list_of_timestamp_arrays, output_dir, output_name)

if output_format == CustomMimeType.ZARR:
return save_as_zarr(list_of_timestamp_arrays, output_dir, output_name)

raise Internal(f"Parsing to format {output_format} is not supported")
73 changes: 73 additions & 0 deletions rest/post_processing/post_processing.py
@@ -0,0 +1,73 @@
import os
import json
import shutil

from processing.const import ShBatchResponseOutput, ProcessingRequestTypes
from processing.processing import new_process
from post_processing.gtiff_parser import parse_multitemporal_gtiff_to_format
from post_processing.const import TMP_FOLDER, parsed_output_file_name


def check_if_already_parsed(results, output_format):
for result in results:
if (
parsed_output_file_name[output_format]["name"] in result["Key"]
and parsed_output_file_name[output_format]["ext"] in result["Key"]
):
return True

return False


def generate_subfolder_groups(batch_request_id, bucket, results):
subfolder_groups = {}
for result in results:
for output in [ShBatchResponseOutput.DATA, ShBatchResponseOutput.METADATA]:
if output.value in result["Key"]:
url = bucket.generate_presigned_url(object_key=result["Key"])
subfolder_name = (
result["Key"].replace(f"{batch_request_id}", "").replace("/", "").split(output.value)[0]
)
if subfolder_name not in subfolder_groups:
subfolder_groups[subfolder_name] = {}
subfolder_groups[subfolder_name][output.value] = url

return subfolder_groups
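`generate_subfolder_groups` assumes each tile subfolder of the batch results contains a `default` (data) object and a `userdata` (metadata) object, and pairs their presigned URLs per subfolder. A small illustration with made-up keys:

```python
# Hypothetical listing for batch request "abc123" (keys are illustrative).
results = [
    {"Key": "abc123/tile_0_0/default.tif"},
    {"Key": "abc123/tile_0_0/userdata.json"},
    {"Key": "abc123/tile_0_1/default.tif"},
    {"Key": "abc123/tile_0_1/userdata.json"},
]

# generate_subfolder_groups("abc123", bucket, results) would then return roughly:
# {
#     "tile_0_0": {"default": "<presigned url>", "userdata": "<presigned url>"},
#     "tile_0_1": {"default": "<presigned url>", "userdata": "<presigned url>"},
# }
```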


def upload_output_to_bucket(local_file_paths, bucket):
for path in local_file_paths:
s3_path = path[len(f"{TMP_FOLDER}") :]
bucket.upload_file_to_bucket(path, None, s3_path)


def parse_sh_gtiff_to_format(job, bucket):
batch_request_id = job["batch_request_id"]
results = bucket.get_data_from_bucket(prefix=batch_request_id)

process = new_process(json.loads(job["process"]), request_type=ProcessingRequestTypes.BATCH)
output_format = process.get_mimetype()

if check_if_already_parsed(results, output_format):
return

subfolder_groups = generate_subfolder_groups(batch_request_id, bucket, results)

for subfolder_id, subfolder_group in subfolder_groups.items():
input_tiff = subfolder_group[ShBatchResponseOutput.DATA.value]
input_metadata = subfolder_group[ShBatchResponseOutput.METADATA.value]

# preventively remove directory and create it again
batch_request_dir = f"{TMP_FOLDER}{batch_request_id}"
batch_subfolder = f"{batch_request_dir}/{subfolder_id}/"
if os.path.exists(batch_request_dir):
shutil.rmtree(batch_request_dir)
os.makedirs(batch_subfolder)

output_file_paths = parse_multitemporal_gtiff_to_format(
input_tiff, input_metadata, batch_subfolder, parsed_output_file_name[output_format], output_format
)
upload_output_to_bucket(output_file_paths, bucket)

# remove folder after the folder/file has been uploaded
shutil.rmtree(batch_request_dir)
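`upload_output_to_bucket` builds the S3 key by stripping the `/tmp/` prefix from the local path, so parsed files land under the batch request id next to the original Sentinel Hub output, where `check_if_already_parsed` finds them on later runs. The path flow for one NetCDF result, with illustrative ids:

```python
# Illustrative values only.
TMP_FOLDER = "/tmp/"
local_path = "/tmp/abc123/tile_0_0/output.nc"    # written by parse_multitemporal_gtiff_to_format

s3_key = local_path[len(TMP_FOLDER):]            # "abc123/tile_0_0/output.nc"
# bucket.upload_file_to_bucket(local_path, None, s3_key) then stores the file under that key;
# a later run sees "output" and ".nc" in the key and skips re-parsing.
```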
33 changes: 30 additions & 3 deletions rest/processing/const.py
@@ -1,6 +1,27 @@
from enum import Enum
import mimetypes
-from sentinelhub import MimeType
+from sentinelhub import MimeType, SentinelHubBatch


class ShBatchResponseOutput(Enum):
DATA = "default"
METADATA = "userdata"


# Driver needs a way to save the originally requested format in the Process class so that
# the post-processing can parse geotiff from Sentinel Hub to the correct format.
# inspired by sentinelhub.py MimeType class
# https://github.com/sentinel-hub/sentinelhub-py/blob/master/sentinelhub/constants.py#L261
class CustomMimeType(Enum):
ZARR = "zarr"
NETCDF = "netcdf"

# This method is needed because mimetype.get_string() is called in construct_output() in rest/processing/sentinel_hub.py
def get_string(self) -> str:
# Need to get geotiff from Sentinel Hub (parsing to the correct format is done in post-processing)
if self is CustomMimeType.ZARR or self is CustomMimeType.NETCDF:
return MimeType.TIFF.get_string()
return mimetypes.types_map["." + self.value]


class SampleType(Enum):
@@ -24,12 +45,16 @@ def from_gdal_datatype(gdal_datatype):
MimeType.PNG: SampleType.UINT8,
MimeType.JPG: SampleType.UINT8,
MimeType.TIFF: SampleType.FLOAT32,
CustomMimeType.ZARR: SampleType.FLOAT32,
CustomMimeType.NETCDF: SampleType.FLOAT32,
}

supported_sample_types = {
MimeType.PNG: [SampleType.UINT8, SampleType.UINT16],
MimeType.JPG: [SampleType.UINT8],
MimeType.TIFF: [SampleType.UINT8, SampleType.UINT16, SampleType.FLOAT32],
CustomMimeType.ZARR: [SampleType.UINT8, SampleType.UINT16, SampleType.FLOAT32],
CustomMimeType.NETCDF: [SampleType.UINT8, SampleType.UINT16, SampleType.FLOAT32],
}

sample_types_to_bytes = {
@@ -53,6 +78,8 @@ def get_unsupported_mimetype_message(self):
supported_mime_types = {
ProcessingRequestTypes.BATCH: {
"gtiff": MimeType.TIFF,
"zarr": CustomMimeType.ZARR,
"netcdf": CustomMimeType.NETCDF,
},
ProcessingRequestTypes.SYNC: {
"gtiff": MimeType.TIFF,
@@ -62,6 +89,6 @@ def get_unsupported_mimetype_message(self):
}

supported_mime_types_error_msg = {
ProcessingRequestTypes.BATCH: "Currently only GTIFF is supported.'",
ProcessingRequestTypes.SYNC: "Currently supported formats are GTIFF, PNG or JPEG",
ProcessingRequestTypes.BATCH: "Currently supported formats are GTIFF, NETCDF and ZARR.",
ProcessingRequestTypes.SYNC: "Currently supported formats are GTIFF, PNG and JPEG.",
}
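Because `CustomMimeType.get_string()` falls back to GeoTIFF for ZARR and NETCDF, Sentinel Hub is always asked for a GeoTIFF and the conversion happens entirely in post-processing. A quick check of that behaviour, assuming sentinelhub's `MimeType.TIFF.get_string()` returns `"image/tiff"`:

```python
from processing.const import CustomMimeType

# Both custom formats are requested from Sentinel Hub as GeoTIFF...
assert CustomMimeType.ZARR.get_string() == "image/tiff"
assert CustomMimeType.NETCDF.get_string() == "image/tiff"

# ...while their values still identify the target format for post-processing.
assert CustomMimeType.ZARR.value == "zarr"
assert CustomMimeType.NETCDF.value == "netcdf"
```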
6 changes: 5 additions & 1 deletion rest/processing/sentinel_hub.py
@@ -8,6 +8,7 @@

from buckets import BUCKET_NAMES
from processing.processing_api_request import ProcessingAPIRequest
from processing.const import ShBatchResponseOutput


class SentinelHub:
@@ -111,7 +112,10 @@ def construct_data_processing(self, resampling_method):

def construct_output(self, width, height, mimetype):
output = {
"responses": [{"identifier": "default", "format": {"type": mimetype.get_string()}}],
"responses": [
{"identifier": ShBatchResponseOutput.DATA.value, "format": {"type": mimetype.get_string()}},
{"identifier": ShBatchResponseOutput.METADATA.value, "format": {"type": "application/json"}},
],
}
if width is not None:
output["width"] = width
