
Commit

Merge branch 'master' into feature/deploy-production-github
zcernigoj committed Oct 24, 2023
2 parents b9676fb + ea32438 commit c027420
Showing 18 changed files with 688 additions and 32 deletions.
5 changes: 4 additions & 1 deletion rest/Pipfile
@@ -24,10 +24,13 @@ openeo-pg-parser = {editable = true,git = "https://github.com/Ardweaden/openeo-p
requests = "==2.27.1"
flask-cors = "==3.0.10"
honeycomb-beeline = "==3.2.0"
pg-to-evalscript = "==0.2.6"
pg-to-evalscript = "==0.2.8"
sentinelhub = "==3.6.3"
isodate = "==0.6.1"
pyjwt = {extras = ["crypto"], version = "==2.3.0"}
cryptography = "==36.0.1"
placebo = "==0.9.0"
rioxarray = "*"
zarr = "*"
netcdf4 = "*"
werkzeug = "==2.3.7"
255 changes: 251 additions & 4 deletions rest/Pipfile.lock

Large diffs are not rendered by default.

70 changes: 55 additions & 15 deletions rest/app.py
@@ -31,14 +31,16 @@
from dynamodb import JobsPersistence, ProcessGraphsPersistence, ServicesPersistence
from processing.processing import (
check_process_graph_conversion_validity,
get_batch_job_estimate,
process_data_synchronously,
create_batch_job,
start_batch_job,
cancel_batch_job,
modify_batch_job,
get_batch_job_estimate,
get_batch_job_status,
create_or_get_estimate_values_from_db,
)
from post_processing.post_processing import parse_sh_gtiff_to_format
from processing.utils import inject_variables_in_process_graph, overwrite_spatial_extent_without_parameters
from processing.openeo_process_errors import OpenEOProcessError
from authentication.authentication import authentication_provider
@@ -479,17 +481,28 @@ def api_batch_job(job_id):

if flask.request.method == "GET":
status, error = get_batch_job_status(job["batch_request_id"], job["deployment_endpoint"])
data_to_jsonify = {
"id": job_id,
"title": job.get("title", None),
"description": job.get("description", None),
"process": {"process_graph": json.loads(job["process"])["process_graph"]},
"status": status.value,
"error": error,
"created": convert_timestamp_to_simpler_format(job["created"]),
"updated": convert_timestamp_to_simpler_format(job["last_updated"]),
}

if status is not openEOBatchJobStatus.CREATED:
data_to_jsonify["costs"] = float(job.get("sum_costs", 0))
data_to_jsonify["usage"] = {
"Platform Credits": {"unit": "credits", "value": round(float(job.get("sum_costs", 0)) * 0.15, 3)},
"Sentinel Hub": {
"unit": "sentinelhub_processing_unit",
"value": float(job.get("sum_costs", 0)),
},
}
return flask.make_response(
jsonify(
id=job_id,
title=job.get("title", None),
description=job.get("description", None),
process={"process_graph": json.loads(job["process"])["process_graph"]},
status=status.value,
error=error,
created=convert_timestamp_to_simpler_format(job["created"]),
updated=convert_timestamp_to_simpler_format(job["last_updated"]),
),
jsonify(data_to_jsonify),
200,
)

@@ -512,6 +525,19 @@ def api_batch_job(job_id):
update_batch_request_id(job_id, job, new_batch_request_id)
data["deployment_endpoint"] = deployment_endpoint

if json.dumps(data.get("process"), sort_keys=True) != json.dumps(
json.loads(job.get("process")), sort_keys=True
):
estimated_sentinelhub_pu, estimated_file_size = get_batch_job_estimate(
new_batch_request_id, data.get("process"), deployment_endpoint
)
estimated_platform_credits = round(estimated_sentinelhub_pu * 0.15, 3)
JobsPersistence.update_key(
job["id"], "estimated_sentinelhub_pu", str(round(estimated_sentinelhub_pu, 3))
)
JobsPersistence.update_key(job["id"], "estimated_platform_credits", str(estimated_platform_credits))
JobsPersistence.update_key(job["id"], "estimated_file_size", str(estimated_file_size))

for key in data:
JobsPersistence.update_key(job_id, key, data[key])

@@ -568,6 +594,13 @@ def add_job_to_queue(job_id):
metadata_filename = "metadata.json"
bucket.put_file_to_bucket("", prefix=job["batch_request_id"], file_name=metadata_filename)

# START OF POST_PROCESSING
# post-process gtiffs to appropriate formats

parse_sh_gtiff_to_format(job, bucket)

# END OF POST_PROCESSING

results = bucket.get_data_from_bucket(prefix=job["batch_request_id"])
log(INFO, f"Fetched all results: {str(results)}")

@@ -604,7 +637,6 @@ def add_job_to_queue(job_id):

# we can create a /results_metadata.json file here
# the contents of the batch job folder in the bucket isn't revealed anywhere else anyway

metadata_creation_time = datetime.utcnow().strftime(ISO8601_UTC_FORMAT)
batch_job_metadata = {
"type": "Feature",
@@ -619,6 +651,13 @@ def add_job_to_queue(job_id):
"title": job.get("title", None),
"datetime": metadata_creation_time,
"expires": metadata_valid,
"usage": {
"Platform credits": {"unit": "credits", "value": job.get("estimated_platform_credits", 0)},
"Sentinel Hub": {
"unit": "sentinelhub_processing_unit",
"value": job.get("estimated_sentinelhub_pu", 0),
},
},
"processing:expression": {"format": "openeo", "expression": json.loads(job["process"])},
},
"links": links,
@@ -655,11 +694,12 @@ def estimate_job_cost(job_id):
if job is None:
raise JobNotFound()

estimated_pu, estimated_file_size = get_batch_job_estimate(
job["batch_request_id"], json.loads(job["process"]), job["deployment_endpoint"]
estimated_sentinelhub_pu, _, estimated_file_size = create_or_get_estimate_values_from_db(
job, job["batch_request_id"]
)

return flask.make_response(
jsonify(costs=estimated_pu, size=estimated_file_size),
jsonify(costs=estimated_sentinelhub_pu, size=estimated_file_size),
200,
)

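
The GET branch of api_batch_job above now reports costs and a usage breakdown once a job has left the CREATED state: the accumulated Sentinel Hub processing units stored in sum_costs are returned as-is and also converted to Platform Credits at a fixed 0.15 rate. A minimal sketch of that conversion, assuming only the field names visible in the diff (build_usage itself is a hypothetical helper, not part of the repository):

# Sketch of the PU -> Platform Credits conversion used in the GET /jobs/<job_id> response.
# The 0.15 rate and the "sum_costs" field come from the diff; build_usage() is illustrative.
def build_usage(job: dict) -> dict:
    sentinelhub_pu = float(job.get("sum_costs", 0))
    return {
        "Platform Credits": {"unit": "credits", "value": round(sentinelhub_pu * 0.15, 3)},
        "Sentinel Hub": {"unit": "sentinelhub_processing_unit", "value": sentinelhub_pu},
    }

print(build_usage({"sum_costs": "2.4"}))
# -> {'Platform Credits': {'unit': 'credits', 'value': 0.36},
#     'Sentinel Hub': {'unit': 'sentinelhub_processing_unit', 'value': 2.4}}
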
5 changes: 4 additions & 1 deletion rest/authentication/authentication.py
@@ -15,11 +15,14 @@
Internal,
CredentialsInvalid,
BillingPlanInvalid,
TokenInvalid,
)
from authentication.oidc_providers import oidc_providers
from authentication.user import OIDCUser, SHUser
from authentication.utils import decode_sh_access_token

from openeoerrors import TokenInvalid


class AuthScheme(Enum):
BASIC = "basic"
@@ -60,7 +63,7 @@ def authenticate_user_oidc(self, access_token, oidc_provider_id):
user_id = userinfo["sub"]

try:
user = OIDCUser(user_id, oidc_userinfo=userinfo)
user = OIDCUser(user_id, oidc_userinfo=userinfo, access_token=access_token)
except BillingPlanInvalid:
return None

9 changes: 8 additions & 1 deletion rest/authentication/user.py
@@ -25,19 +25,23 @@ def get_user_info(self):
user_info["default_plan"] = self.default_plan.name
return user_info

def get_leftover_credits(self):
pass

def report_usage(self, pu_spent, job_id=None):
pass


class OIDCUser(User):
def __init__(self, user_id=None, oidc_userinfo={}):
def __init__(self, user_id=None, oidc_userinfo={}, access_token=None):
super().__init__(user_id)
self.entitlements = [
self.convert_entitlement(entitlement) for entitlement in oidc_userinfo.get("eduperson_entitlement", [])
]
self.oidc_userinfo = oidc_userinfo
self.default_plan = OpenEOPBillingPlan.get_billing_plan(self.entitlements)
self.session = central_user_sentinelhub_session
self.access_token = access_token

def __str__(self):
return f"{self.__class__.__name__}: {self.user_id}"
@@ -60,6 +64,9 @@ def get_user_info(self):
user_info["info"] = {"oidc_userinfo": self.oidc_userinfo}
return user_info

def get_leftover_credits(self):
return usageReporting.get_leftover_credits_for_user(self.access_token)

def report_usage(self, pu_spent, job_id=None):
usageReporting.report_usage(self.user_id, pu_spent, job_id)

5 changes: 5 additions & 0 deletions rest/buckets/results_bucket.py
@@ -17,6 +17,11 @@ def put_file_to_bucket(self, content_as_string, prefix=None, file_name="file"):

self.client.put_object(Bucket=self.bucket_name, Key=file_path, Body=content_as_string)

def upload_file_to_bucket(self, local_file_path, prefix=None, file_name="file"):
s3_file_path = prefix + "/" + file_name if prefix else file_name

self.client.upload_file(local_file_path, self.bucket_name, s3_file_path)

def get_data_from_bucket(self, prefix=None):
continuation_token = None
results = []
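
The new upload_file_to_bucket complements the put_object-based put_file_to_bucket: it uploads a file that already exists on local disk (e.g. one written by the post-processing step) and builds the S3 key as "<prefix>/<file_name>". A minimal sketch of the equivalent low-level boto3 call, with the bucket name and paths assumed for illustration:

import boto3

# Equivalent of what upload_file_to_bucket does under the hood; bucket name, prefix and
# local path here are illustrative assumptions, not values from this repository.
client = boto3.client("s3")
local_path = "/tmp/<batch_request_id>/output.nc"        # file written by post-processing
prefix, file_name = "<batch_request_id>", "output.nc"
s3_key = f"{prefix}/{file_name}" if prefix else file_name
client.upload_file(local_path, "openeo-results-bucket", s3_key)  # bucket name assumed
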
4 changes: 4 additions & 0 deletions rest/dynamodb/dynamodb.py
@@ -211,6 +211,10 @@ def create(cls, data):
"http_code": {"N": data.get("http_code", "200")},
"results": {"S": json.dumps(data.get("results"))},
"deployment_endpoint": {"S": data.get("deployment_endpoint", "https://services.sentinel-hub.com")},
"estimated_sentinelhub_pu": {"N": data.get("estimated_sentinelhub_pu", "0")},
"estimated_platform_credits": {"N": data.get("estimated_platform_credits", "0")},
"estimated_file_size": {"N": data.get("estimated_file_size", "0")},
"sum_costs": {"N": data.get("sum_costs", "0")},
}
if data.get("title"):
item["title"] = {"S": str(data.get("title"))}
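
The four new attributes on the jobs item use DynamoDB's "N" (number) type, whose low-level API expects numeric values serialized as strings, hence the quoted "0" defaults in the diff. A sketch of the resulting item shape, assuming a hypothetical table name:

import boto3

# Illustrative only: the real item is built inside JobsPersistence.create(); the table
# name is a guess. Note that "N" attribute values must be passed as strings.
client = boto3.client("dynamodb")
client.put_item(
    TableName="openeo-jobs",  # assumed table name
    Item={
        "id": {"S": "job-1234"},
        "estimated_sentinelhub_pu": {"N": "12.5"},
        "estimated_platform_credits": {"N": "1.875"},  # 12.5 PU * 0.15
        "estimated_file_size": {"N": "20480"},
        "sum_costs": {"N": "0"},
    },
)
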
6 changes: 6 additions & 0 deletions rest/openeoerrors.py
@@ -153,3 +153,9 @@ def __init__(self, width, height) -> None:

error_code = "ImageDimensionInvalid"
http_code = 400


class InsufficientCredits(SHOpenEOError):
error_code = "InsufficientCredits"
http_code = 402
message = "You do not have sufficient credits to perform this request. Please visit https://portal.terrascope.be/pages/pricing to find more information on how to buy additional credits."
17 changes: 17 additions & 0 deletions rest/output_formats/NetCDF.json
@@ -0,0 +1,17 @@
{
"gis_data_types": [
"raster"
],
"parameters": {
"datatype": {
"type": "string",
"description": "The values data type.",
"enum": [
"byte",
"uint16",
"float32"
],
"default": "float32"
}
}
}
17 changes: 17 additions & 0 deletions rest/output_formats/ZARR.json
@@ -0,0 +1,17 @@
{
"gis_data_types": [
"raster"
],
"parameters": {
"datatype": {
"type": "string",
"description": "The values data type.",
"enum": [
"byte",
"uint16",
"float32"
],
"default": "float32"
}
}
}
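
NetCDF.json and ZARR.json above advertise the two new output formats with a single datatype parameter (byte, uint16 or float32, defaulting to float32). In an openEO process graph such format options are typically passed through save_result; the node below is an illustrative sketch written as a Python dict, not something taken from this commit:

# Hypothetical save_result node selecting one of the new formats; the upstream node id is made up.
save_result_node = {
    "process_id": "save_result",
    "arguments": {
        "data": {"from_node": "ndvi1"},
        "format": "NetCDF",                 # or "ZARR"
        "options": {"datatype": "uint16"},  # one of the enum values advertised above
    },
    "result": True,
}
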
Empty file.
10 changes: 10 additions & 0 deletions rest/post_processing/const.py
@@ -0,0 +1,10 @@
from sentinelhub import MimeType
from processing.const import CustomMimeType

TMP_FOLDER = "/tmp/"

parsed_output_file_name = {
MimeType.TIFF: {"name": "output", "ext": ".tif"},
CustomMimeType.ZARR: {"name": "output", "ext": ".zarr"},
CustomMimeType.NETCDF: {"name": "output", "ext": ".nc"},
}
110 changes: 110 additions & 0 deletions rest/post_processing/gtiff_parser.py
@@ -0,0 +1,110 @@
import os
import shutil
import rioxarray
from dateutil import parser
import pandas as pd
import xarray as xr
import requests
from sentinelhub import MimeType

from processing.const import CustomMimeType
from openeoerrors import Internal

# assume it's only 1 time and 1 bands dimension
def check_dimensions(time_dimensions, bands_dimensions):
if len(time_dimensions) == 0:
raise Internal("No time dimensions exist. Only 1 time dimension is supported.")

if len(time_dimensions) > 1:
raise Internal("More than 1 time dimension exist. Only 1 time dimension is supported.")

if len(bands_dimensions) == 0:
raise Internal("No bands dimensions exist. Only 1 bands dimension is supported.")

if len(bands_dimensions) > 1:
raise Internal("More than 1 bands dimension exist. Only 1 bands dimension is supported.")


def get_timestamps_arrays(datacube_time_as_bands, time_dimensions, bands_dimensions, output_format):
num_of_img_bands = len(datacube_time_as_bands["band"])
num_of_bands_dimension = len(bands_dimensions[0]["labels"])

list_of_timestamps = []
list_of_timestamp_arrays = []

for i in range(0, num_of_img_bands, num_of_bands_dimension):
date = time_dimensions[0]["labels"][int(i / num_of_bands_dimension)]
timestamp_array = datacube_time_as_bands[i : i + num_of_bands_dimension]

if output_format in [CustomMimeType.NETCDF, CustomMimeType.ZARR]:
pandas_time = pd.to_datetime(parser.parse(date))
timestamp_array = timestamp_array.assign_coords(band=bands_dimensions[0]["labels"])
timestamp_array = timestamp_array.assign_coords(t=pandas_time)
timestamp_array = timestamp_array.expand_dims(dim="t")

list_of_timestamps.append(date)
list_of_timestamp_arrays.append(timestamp_array)

return list_of_timestamps, list_of_timestamp_arrays


def save_as_gtiff(list_of_timestamps, list_of_timestamp_arrays, output_dir, output_name):
output_file_paths = []
for array, date in zip(list_of_timestamp_arrays, list_of_timestamps):
file_name = f"{output_name['name']}_{date}{output_name['ext']}"
file_path = os.path.join(output_dir, file_name)
output_file_paths.append(file_path)

array.rio.to_raster(
file_path,
tiled=True, # GDAL: By default striped TIFF files are created. This option can be used to force creation of tiled TIFF files.
windowed=True, # rioxarray: read & write one window at a time
)
return output_file_paths


def save_as_netcdf(list_of_timestamp_arrays, output_dir, output_name):
datacube_with_time_dimension = xr.combine_by_coords(list_of_timestamp_arrays)
output_file_path = os.path.join(output_dir, f"{output_name['name']}{output_name['ext']}")
datacube_with_time_dimension.to_netcdf(output_file_path)
return [output_file_path]


def save_as_zarr(list_of_timestamp_arrays, output_dir, output_name):
datacube_with_time_dimension = xr.combine_by_coords(list_of_timestamp_arrays)
output_file_path = os.path.join(output_dir, f"{output_name['name']}{output_name['ext']}")
datacube_with_time_dimension.to_zarr(output_file_path)
# zip the zarr folder to avoid listing a bunch of files
shutil.make_archive(output_file_path, "zip", output_file_path)
output_file_path = f"{output_file_path}.zip"
return [output_file_path]


def parse_multitemporal_gtiff_to_format(input_tiff, input_metadata, output_dir, output_name, output_format):
datacube_time_as_bands = rioxarray.open_rasterio(input_tiff)
datacube_metadata = requests.get(input_metadata).json()

time_dimensions = [dim for dim in datacube_metadata["outputDimensions"] if dim["type"] == "temporal"]
bands_dimensions = [dim for dim in datacube_metadata["outputDimensions"] if dim["type"] == "bands"]

# mock a bands dimension (with 1 band) if it's not present in the data
# e.g. save_result process right after ndvi process which doesn't have a target band set
if len(bands_dimensions) == 0:
bands_dimensions = [{"name": "bands", "type": "bands", "labels": ["results"]}]

check_dimensions(time_dimensions, bands_dimensions)

list_of_timestamps, list_of_timestamp_arrays = get_timestamps_arrays(
datacube_time_as_bands, time_dimensions, bands_dimensions, output_format
)

if output_format == MimeType.TIFF:
return save_as_gtiff(list_of_timestamps, list_of_timestamp_arrays, output_dir, output_name)

if output_format == CustomMimeType.NETCDF:
return save_as_netcdf(list_of_timestamp_arrays, output_dir, output_name)

if output_format == CustomMimeType.ZARR:
return save_as_zarr(list_of_timestamp_arrays, output_dir, output_name)

raise Internal(f"Parsing to format {output_format} is not supported")
