Skip to content

Commit

Permalink
Merge branch 'master' into feature/lambda-docker
Browse files Browse the repository at this point in the history
  • Loading branch information
zcernigoj committed Oct 24, 2023
2 parents 8bb78d1 + ea32438 commit 5e44ddb
Show file tree
Hide file tree
Showing 10 changed files with 1,243 additions and 25 deletions.
62 changes: 47 additions & 15 deletions rest/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@
from dynamodb import JobsPersistence, ProcessGraphsPersistence, ServicesPersistence
from processing.processing import (
check_process_graph_conversion_validity,
get_batch_job_estimate,
process_data_synchronously,
create_batch_job,
start_batch_job,
cancel_batch_job,
modify_batch_job,
get_batch_job_estimate,
get_batch_job_status,
create_or_get_estimate_values_from_db,
)
from post_processing.post_processing import parse_sh_gtiff_to_format
from processing.utils import inject_variables_in_process_graph, overwrite_spatial_extent_without_parameters
Expand Down Expand Up @@ -480,17 +481,28 @@ def api_batch_job(job_id):

if flask.request.method == "GET":
status, error = get_batch_job_status(job["batch_request_id"], job["deployment_endpoint"])
data_to_jsonify = {
"id": job_id,
"title": job.get("title", None),
"description": job.get("description", None),
"process": {"process_graph": json.loads(job["process"])["process_graph"]},
"status": status.value,
"error": error,
"created": convert_timestamp_to_simpler_format(job["created"]),
"updated": convert_timestamp_to_simpler_format(job["last_updated"]),
}

if status is not openEOBatchJobStatus.CREATED:
data_to_jsonify["costs"] = float(job.get("sum_costs", 0))
data_to_jsonify["usage"] = {
"Platform Credits": {"unit": "credits", "value": round(float(job.get("sum_costs", 0)) * 0.15, 3)},
"Sentinel Hub": {
"unit": "sentinelhub_processing_unit",
"value": float(job.get("sum_costs", 0)),
},
}
return flask.make_response(
jsonify(
id=job_id,
title=job.get("title", None),
description=job.get("description", None),
process={"process_graph": json.loads(job["process"])["process_graph"]},
status=status.value,
error=error,
created=convert_timestamp_to_simpler_format(job["created"]),
updated=convert_timestamp_to_simpler_format(job["last_updated"]),
),
jsonify(data_to_jsonify),
200,
)

Expand All @@ -513,6 +525,19 @@ def api_batch_job(job_id):
update_batch_request_id(job_id, job, new_batch_request_id)
data["deployment_endpoint"] = deployment_endpoint

if json.dumps(data.get("process"), sort_keys=True) != json.dumps(
json.loads(job.get("process")), sort_keys=True
):
estimated_sentinelhub_pu, estimated_file_size = get_batch_job_estimate(
new_batch_request_id, data.get("process"), deployment_endpoint
)
estimated_platform_credits = round(estimated_sentinelhub_pu * 0.15, 3)
JobsPersistence.update_key(
job["id"], "estimated_sentinelhub_pu", str(round(estimated_sentinelhub_pu, 3))
)
JobsPersistence.update_key(job["id"], "estimated_platform_credits", str(estimated_platform_credits))
JobsPersistence.update_key(job["id"], "estimated_file_size", str(estimated_file_size))

for key in data:
JobsPersistence.update_key(job_id, key, data[key])

Expand Down Expand Up @@ -612,7 +637,6 @@ def add_job_to_queue(job_id):

# we can create a /results_metadata.json file here
# the contents of the batch job folder in the bucket isn't revealed anywhere else anyway

metadata_creation_time = datetime.utcnow().strftime(ISO8601_UTC_FORMAT)
batch_job_metadata = {
"type": "Feature",
Expand All @@ -627,6 +651,13 @@ def add_job_to_queue(job_id):
"title": job.get("title", None),
"datetime": metadata_creation_time,
"expires": metadata_valid,
"usage": {
"Platform credits": {"unit": "credits", "value": job.get("estimated_platform_credits", 0)},
"Sentinel Hub": {
"unit": "sentinelhub_processing_unit",
"value": job.get("estimated_sentinelhub_pu", 0),
},
},
"processing:expression": {"format": "openeo", "expression": json.loads(job["process"])},
},
"links": links,
Expand Down Expand Up @@ -663,11 +694,12 @@ def estimate_job_cost(job_id):
if job is None:
raise JobNotFound()

estimated_pu, estimated_file_size = get_batch_job_estimate(
job["batch_request_id"], json.loads(job["process"]), job["deployment_endpoint"]
estimated_sentinelhub_pu, _, estimated_file_size = create_or_get_estimate_values_from_db(
job, job["batch_request_id"]
)

return flask.make_response(
jsonify(costs=estimated_pu, size=estimated_file_size),
jsonify(costs=estimated_sentinelhub_pu, size=estimated_file_size),
200,
)

Expand Down
3 changes: 2 additions & 1 deletion rest/authentication/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
Internal,
CredentialsInvalid,
BillingPlanInvalid,
TokenInvalid,
)
from authentication.oidc_providers import oidc_providers
from authentication.user import OIDCUser, SHUser
Expand Down Expand Up @@ -62,7 +63,7 @@ def authenticate_user_oidc(self, access_token, oidc_provider_id):
user_id = userinfo["sub"]

try:
user = OIDCUser(user_id, oidc_userinfo=userinfo)
user = OIDCUser(user_id, oidc_userinfo=userinfo, access_token=access_token)
except BillingPlanInvalid:
return None

Expand Down
9 changes: 8 additions & 1 deletion rest/authentication/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@ def get_user_info(self):
user_info["default_plan"] = self.default_plan.name
return user_info

def get_leftover_credits(self):
pass

def report_usage(self, pu_spent, job_id=None):
pass


class OIDCUser(User):
def __init__(self, user_id=None, oidc_userinfo={}):
def __init__(self, user_id=None, oidc_userinfo={}, access_token=None):
super().__init__(user_id)
self.entitlements = [
self.convert_entitlement(entitlement) for entitlement in oidc_userinfo.get("eduperson_entitlement", [])
]
self.oidc_userinfo = oidc_userinfo
self.default_plan = OpenEOPBillingPlan.get_billing_plan(self.entitlements)
self.session = central_user_sentinelhub_session
self.access_token = access_token

def __str__(self):
return f"{self.__class__.__name__}: {self.user_id}"
Expand All @@ -60,6 +64,9 @@ def get_user_info(self):
user_info["info"] = {"oidc_userinfo": self.oidc_userinfo}
return user_info

def get_leftover_credits(self):
return usageReporting.get_leftover_credits_for_user(self.access_token)

def report_usage(self, pu_spent, job_id=None):
usageReporting.report_usage(self.user_id, pu_spent, job_id)

Expand Down
14 changes: 12 additions & 2 deletions rest/dynamodb/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
FAKE_AWS_ACCESS_KEY_ID = "AKIAIOSFODNN7EXAMPLE"
FAKE_AWS_SECRET_ACCESS_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"

USED_RESERVED_WORDS = ["plan"]


class DeploymentTypes(Enum):
PRODUCTION = "production"
Expand Down Expand Up @@ -121,12 +123,17 @@ def update_key(cls, record_id, key, new_value):
else:
new_value = str(new_value)

updated_item = cls.dynamodb.update_item(
kwargs = dict(
TableName=cls.TABLE_NAME,
Key={"id": {"S": record_id}},
UpdateExpression="SET {} = :new_content".format(key),
ExpressionAttributeValues={":new_content": {data_type: new_value}},
)
if key in USED_RESERVED_WORDS:
kwargs["UpdateExpression"] = "SET #{} = :new_content".format(key)
kwargs["ExpressionAttributeNames"] = {"#{}".format(key): "{}".format(key)}

updated_item = cls.dynamodb.update_item(**kwargs)
return updated_item

@classmethod
Expand Down Expand Up @@ -204,6 +211,10 @@ def create(cls, data):
"http_code": {"N": data.get("http_code", "200")},
"results": {"S": json.dumps(data.get("results"))},
"deployment_endpoint": {"S": data.get("deployment_endpoint", "https://services.sentinel-hub.com")},
"estimated_sentinelhub_pu": {"N": data.get("estimated_sentinelhub_pu", "0")},
"estimated_platform_credits": {"N": data.get("estimated_platform_credits", "0")},
"estimated_file_size": {"N": data.get("estimated_file_size", "0")},
"sum_costs": {"N": data.get("sum_costs", "0")},
}
if data.get("title"):
item["title"] = {"S": str(data.get("title"))}
Expand Down Expand Up @@ -316,7 +327,6 @@ def create(cls, data):


if __name__ == "__main__":

# To create tables, run:
# $ pipenv shell
# <shell> $ DEPLOYMENT_TYPE="production" ./dynamodb.py
Expand Down
Loading

0 comments on commit 5e44ddb

Please sign in to comment.