Skip to content

Commit

Permalink
Merge pull request #117 from NASA-PDS/sqs-to-lambda
Browse files Browse the repository at this point in the history
Tracking product label processing status in an RDS database and using SQS to control the flow of S3 events to lambda functions
  • Loading branch information
ramesh-maddegoda authored Jul 24, 2024
2 parents 4f93b5a + df0867d commit 434ede5
Show file tree
Hide file tree
Showing 16 changed files with 557 additions and 80 deletions.
50 changes: 44 additions & 6 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -280,26 +280,64 @@
"is_secret": false
}
],
"terraform/terraform-modules/mwaa-env/mwaa_iam_policy.json": [
"terraform/terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json": [
{
"type": "AWS Sensitive Information (Experimental Plugin)",
"filename": "terraform/terraform-modules/mwaa-env/mwaa_iam_policy.json",
"hashed_secret": "55357933a7310d2db90c3fa1ed0970a7bb34ed39",
"filename": "terraform/terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json",
"hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a",
"is_verified": false,
"line_number": 8,
"is_secret": false
}
],
"terraform/terraform-modules/mwaa-env/template_mwaa_iam_policy.json": [
"terraform/terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json": [
{
"type": "AWS Sensitive Information (Experimental Plugin)",
"filename": "terraform/terraform-modules/mwaa-env/template_mwaa_iam_policy.json",
"filename": "terraform/terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json",
"hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a",
"is_verified": false,
"line_number": 8,
"is_secret": false
}
],
"terraform/terraform-modules/product-copy-completion-checker/lambda_inline_policy.json": [
{
"type": "AWS Sensitive Information (Experimental Plugin)",
"filename": "terraform/terraform-modules/product-copy-completion-checker/lambda_inline_policy.json",
"hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a",
"is_verified": false,
"line_number": 11,
"is_secret": false
}
],
"terraform/terraform-modules/product-copy-completion-checker/product-copy-completion-checker.tf": [
{
"type": "AWS Sensitive Information (Experimental Plugin)",
"filename": "terraform/terraform-modules/product-copy-completion-checker/product-copy-completion-checker.tf",
"hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a",
"is_verified": false,
"line_number": 108,
"is_secret": false
}
],
"terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json": [
{
"type": "AWS Sensitive Information (Experimental Plugin)",
"filename": "terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json",
"hashed_secret": "9ad897024d8c36c541d7fe84084c4e9f4df00b2a",
"is_verified": false,
"line_number": 11,
"is_secret": false
},
{
"type": "AWS Sensitive Information (Experimental Plugin)",
"filename": "terraform/terraform-modules/product-copy-completion-checker/template_lambda_inline_policy.json",
"hashed_secret": "55357933a7310d2db90c3fa1ed0970a7bb34ed39",
"is_verified": false,
"line_number": 41,
"is_secret": false
}
]
},
"generated_at": "2024-06-20T18:08:52Z"
"generated_at": "2024-07-24T00:38:52Z"
}
1 change: 1 addition & 0 deletions terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ module "product-copy-completion-checker" {

database_availability_zones = var.database_availability_zones
airflow_env_name = var.airflow_env_name
region = var.region

depends_on = [module.common]
}
Expand Down
4 changes: 2 additions & 2 deletions terraform/terraform-modules/common/common.tf
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# Terraform script to create the common resources for PDS Nucleus

resource "aws_security_group" "nucleus_security_group" {
name = "nucleus_security_group"
description = "nucleus_security_group"
name = var.nucleus_security_group_name
description = "PDS Nucleus security group"
vpc_id = var.vpc_id

ingress {
Expand Down
8 changes: 8 additions & 0 deletions terraform/terraform-modules/common/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,11 @@ variable "mwaa_dag_s3_bucket_name" {
type = string
sensitive = true
}

variable "nucleus_security_group_name" {
description = "The name of the PDS Nucleus security group"
default = "pds_nucleus_security_group"
type = string
sensitive = true
}

6 changes: 3 additions & 3 deletions terraform/terraform-modules/mwaa-env/mwaa_env.tf
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ data "aws_iam_policy_document" "assume_role" {
data "aws_caller_identity" "current" {}

data "template_file" "mwaa_inline_policy_template" {
template = file("terraform-modules/mwaa-env/template_mwaa_iam_policy.json")
template = file("terraform-modules/mwaa-env/template_mwaa_execution_role_iam_policy.json")
vars = {
pds_nucleus_aws_account_id = data.aws_caller_identity.current.account_id
pds_nucleus_region = var.region
Expand All @@ -27,14 +27,14 @@ data "template_file" "mwaa_inline_policy_template" {

resource "local_file" "mwaa_inline_policy_file" {
content = data.template_file.mwaa_inline_policy_template.rendered
filename = "terraform-modules/mwaa-env/mwaa_iam_policy.json"
filename = "terraform-modules/mwaa-env/mwaa_execution_role_iam_policy.json"

depends_on = [data.template_file.mwaa_inline_policy_template]
}

# IAM Policy Document for Inline Policy
data "aws_iam_policy_document" "mwaa_inline_policy" {
source_policy_documents = [file("${path.module}/mwaa_iam_policy.json")]
source_policy_documents = [file("${path.module}/mwaa_execution_role_iam_policy.json")]

depends_on = [local_file.mwaa_inline_policy_file]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@
"Action": "iam:PassRole",
"Effect": "Allow",
"Resource": "arn:aws:iam::${pds_nucleus_aws_account_id}:role/pds_nucleus_*"
},
{
"Action": "lambda:InvokeFunction",
"Effect": "Allow",
"Resource": "arn:aws:lambda:${pds_nucleus_region}:${pds_nucleus_aws_account_id}:function:pds_nucleus_*"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
"Effect": "Allow",
"Action": "airflow:PublishMetrics",
"Resource": [
"arn:aws:airflow:*:441083951559:role/*/*",
"arn:aws:airflow:*:441083951559:environment/*"
"arn:aws:airflow:*:${pds_nucleus_aws_account_id}:role/*/*",
"arn:aws:airflow:*:${pds_nucleus_aws_account_id}:environment/*"
]
},
{
Expand All @@ -21,14 +21,14 @@
"ecs:DescribeTasks"
],
"Resource": [
"arn:aws:ecs:*:441083951559:task-definition/pds*:*",
"arn:aws:ecs:*:441083951559:task/pds*/*"
"arn:aws:ecs:*:${pds_nucleus_aws_account_id}:task-definition/pds*:*",
"arn:aws:ecs:*:${pds_nucleus_aws_account_id}:task/pds*/*"
]
},
{
"Effect": "Allow",
"Action": "iam:PassRole",
"Resource": "arn:aws:iam::441083951559:role/pds-nucleus*"
"Resource": "arn:aws:iam::${pds_nucleus_aws_account_id}:role/pds-nucleus*"
},
{
"Effect": "Allow",
Expand All @@ -38,11 +38,11 @@
"kms:GenerateDataKey*",
"kms:Encrypt"
],
"NotResource": "arn:aws:kms:*:441083951559:key/*",
"NotResource": "arn:aws:kms:*:${pds_nucleus_aws_account_id}:key/*",
"Condition": {
"StringLike": {
"kms:ViaService": [
"sqs.us-west-2.amazonaws.com"
"sqs.${pds_nucleus_region}.amazonaws.com"
]
}
}
Expand All @@ -54,7 +54,7 @@
"logs:GetLogEvents",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:441083951559:log-group:*:log-stream:*"
"Resource": "arn:aws:logs:*:${pds_nucleus_aws_account_id}:log-group:*:log-stream:*"
},
{
"Effect": "Allow",
Expand All @@ -69,7 +69,7 @@
"logs:GetLogGroupFields",
"logs:CreateLogGroup"
],
"Resource": "arn:aws:logs:*:441083951559:log-group:*"
"Resource": "arn:aws:logs:*:${pds_nucleus_aws_account_id}:log-group:*"
},
{
"Effect": "Allow",
Expand All @@ -81,7 +81,7 @@
"sqs:ReceiveMessage",
"sqs:SendMessage"
],
"Resource": "arn:aws:sqs:us-west-2:*:airflow-celery-*"
"Resource": "arn:aws:sqs:${pds_nucleus_region}:*:airflow-celery-*"
},
{
"Effect": "Deny",
Expand Down Expand Up @@ -113,7 +113,7 @@
"logs:GetQueryResults"
],
"Resource": [
"arn:aws:logs:us-west-2:441083951559:log-group:airflow-pds-nucleus-airflow-env-*"
"arn:aws:logs:${pds_nucleus_region}:${pds_nucleus_aws_account_id}:log-group:airflow-${airflow_env_name}-*"
]
},
{
Expand All @@ -133,7 +133,12 @@
{
"Action": "iam:PassRole",
"Effect": "Allow",
"Resource": "arn:aws:iam::441083951559:role/pds_nucleus_*"
"Resource": "arn:aws:iam::${pds_nucleus_aws_account_id}:role/pds_nucleus_*"
},
{
"Action": "lambda:InvokeFunction",
"Effect": "Allow",
"Resource": "arn:aws:lambda:${pds_nucleus_region}:${pds_nucleus_aws_account_id}:function:pds_nucleus_*"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def lambda_handler(event, context):
create_product_table()
create_datafile_table()
create_product_datafile_mapping_table()
create_product_processing_status_table()
return f"Processed lambda request ID: {context.aws_request_id}"
except Exception as e:
logger.error(f"Error creating database tables. Exception: {str(e)}")
Expand All @@ -41,9 +42,9 @@ def create_product_table():
CREATE TABLE product
(
s3_url_of_product_label VARCHAR(1500) CHARACTER SET latin1,
processing_status VARCHAR(10),
completion_status VARCHAR(50),
last_updated_epoch_time BIGINT,
pds_node VARCHAR(10),
pds_node VARCHAR(50),
PRIMARY KEY (s3_url_of_product_label)
);
"""
Expand Down Expand Up @@ -92,3 +93,24 @@ def create_product_datafile_mapping_table():
database='pds_nucleus',
sql=sql)
logger.debug(f"Response for create_product_datafile_mapping_table() : {str(response)}")


def create_product_processing_status_table():
    """ Creates the product_processing_status table in the pds_nucleus database.

    One row per product label, keyed by the S3 URL of the label. Columns:
      - s3_url_of_product_label: S3 URL of the product label (primary key)
      - processing_status: current processing status string
      - last_updated_epoch_time: last update time in epoch milliseconds
        (callers elsewhere in this module store round(time.time() * 1000))
      - pds_node: name of the PDS node the product belongs to
      - batch_number: identifier of the workflow batch that processed the product

    NOTE(review): no IF NOT EXISTS guard — a second invocation will raise from
    execute_statement and is expected to be caught by the caller's handler.
    """
    sql = """
        CREATE TABLE product_processing_status
        (
            s3_url_of_product_label VARCHAR(1500) CHARACTER SET latin1,
            processing_status VARCHAR(50),
            last_updated_epoch_time BIGINT,
            pds_node VARCHAR(50),
            batch_number VARCHAR(100),
            PRIMARY KEY (s3_url_of_product_label)
        );
    """
    # Executes the DDL through the RDS Data API against the Aurora cluster;
    # rds_data, db_clust_arn and db_secret_arn are module-level globals
    # initialized elsewhere in this file.
    response = rds_data.execute_statement(
        resourceArn=db_clust_arn,
        secretArn=db_secret_arn,
        database='pds_nucleus',
        sql=sql)
    logger.debug(f"Response for create_product_processing_status_table() : {str(response)}")
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ def process_completed_products():

sql = """
SELECT DISTINCT s3_url_of_product_label from product
WHERE processing_status = 'INCOMPLETE' and
WHERE completion_status = 'INCOMPLETE' and
pds_node = :pds_node_param and
s3_url_of_product_label
NOT IN (SELECT s3_url_of_product_label from product_data_file_mapping
where s3_url_of_data_file
NOT IN (SELECT s3_url_of_data_file from data_file)) and s3_url_of_product_label
IN (SELECT s3_url_of_product_label from product_data_file_mapping) limit 5;
IN (SELECT s3_url_of_product_label from product_data_file_mapping) limit 100;
"""

pds_node_param = {'name': 'pds_node_param',
Expand All @@ -104,7 +104,7 @@ def process_completed_products():

for data_dict in record:
for data_type, s3_url_of_product_label in data_dict.items():
update_product_processing_status_in_database(s3_url_of_product_label, 'COMPLETE')
update_product_completion_status_in_database(s3_url_of_product_label, 'COMPLETE')
list_of_product_labels_to_process.append(s3_url_of_product_label)

if count == n:
Expand All @@ -116,22 +116,22 @@ def process_completed_products():
count = 0
list_of_product_labels_to_process = []

def update_product_processing_status_in_database(s3_url_of_product_label, processing_status):
def update_product_completion_status_in_database(s3_url_of_product_label, completion_status):
""" Updates the product processing status of the given s3_url_of_product_label """
sql = """
UPDATE product
SET processing_status = :processing_status_param,
SET completion_status = :completion_status_param,
last_updated_epoch_time = :last_updated_epoch_time_param
WHERE s3_url_of_product_label = :s3_url_of_product_label_param
"""

processing_status_param = {'name': 'processing_status_param', 'value': {'stringValue': processing_status}}
completion_status_param = {'name': 'completion_status_param', 'value': {'stringValue': completion_status}}
last_updated_epoch_time_param = {'name': 'last_updated_epoch_time_param',
'value': {'longValue': round(time.time() * 1000)}}
s3_url_of_product_label_param = {'name': 's3_url_of_product_label_param',
'value': {'stringValue': s3_url_of_product_label}}

param_set = [processing_status_param, last_updated_epoch_time_param, s3_url_of_product_label_param]
param_set = [completion_status_param, last_updated_epoch_time_param, s3_url_of_product_label_param]

response = rds_data.execute_statement(
resourceArn=db_clust_arn,
Expand All @@ -140,7 +140,7 @@ def update_product_processing_status_in_database(s3_url_of_product_label, proces
sql=sql,
parameters=param_set)

logger.debug(f"Response for update_product_processing_status_in_database: {str(response)}")
logger.debug(f"Response for update_product_completion_status_in_database: {str(response)}")

def submit_data_to_nucleus(list_of_product_labels_to_process):
""" Submits data to Nucleus """
Expand Down Expand Up @@ -168,7 +168,7 @@ def create_harvest_configs_and_trigger_nucleus(list_of_product_labels_to_process
list_of_s3_urls_to_copy.extend(get_list_of_data_files(s3_url_of_product_label))

# Generate a random suffix for harvest config file name and manifest file name to avoid conflicting duplicate file names
current_time = datetime.now().strftime("%m-%d-%Y-%H-%M-%S")
current_time = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
random_batch_number = current_time + uuid.uuid4().hex

try:
Expand Down Expand Up @@ -225,7 +225,7 @@ def create_harvest_configs_and_trigger_nucleus(list_of_product_labels_to_process
logger.error(f"Error creating harvest config files in s3 bucker: {pds_nucleus_config_bucket_name}. Exception: {str(e)}")
return

trigger_nucleus_workflow(list_of_product_labels_to_process_with_file_paths, s3_config_dir, efs_config_dir)
trigger_nucleus_workflow(random_batch_number, list_of_product_labels_to_process_with_file_paths, s3_config_dir, efs_config_dir)

logger.info(f"Triggered Nucleus workflow: {dag_name} for product labels: {list_of_product_labels_to_process_with_file_paths}")

Expand Down Expand Up @@ -268,7 +268,7 @@ def get_list_of_data_files(s3_url_of_product_label):
return list_of_data_files


def trigger_nucleus_workflow(list_of_product_labels_to_process, s3_config_dir, efs_config_dir):
def trigger_nucleus_workflow(random_batch_number, list_of_product_labels_to_process, s3_config_dir, efs_config_dir):
""" Triggers Nucleus workflow with parameters """

# Convert list to comma seperated list
Expand All @@ -290,9 +290,17 @@ def trigger_nucleus_workflow(list_of_product_labels_to_process, s3_config_dir, e
list_of_product_labels_to_process_key = "list_of_product_labels_to_process"
list_of_product_labels_to_process_value = str(comma_seperated_list_of_product_labels_to_process)

pds_node_name_key = "pds_node_name"
pds_node_name_value = pds_node_name

batch_number_key = "batch_number"
batch_number_value = random_batch_number

conf = "{\"" + \
s3_config_dir_key + "\":\"" + s3_config_dir_value + "\",\"" + \
list_of_product_labels_to_process_key + "\":\"" + list_of_product_labels_to_process_value + "\",\"" + \
pds_node_name_key + "\":\"" + pds_node_name_value + "\",\"" + \
batch_number_key + "\":\"" + batch_number_value + "\",\"" + \
efs_config_dir_key + "\":\"" + efs_config_dir_value + "\"}"

logger.info(f"Triggering Nucleus workflow {dag_name} with parameters : {conf}")
Expand Down
Loading

0 comments on commit 434ede5

Please sign in to comment.