Skip to content

Commit

Permalink
Fixes issues with PR #810
Browse files Browse the repository at this point in the history
Refs #772
  • Loading branch information
shawncrawley committed Nov 27, 2024
1 parent a4d2917 commit dfb6066
Show file tree
Hide file tree
Showing 12 changed files with 276 additions and 193 deletions.
18 changes: 4 additions & 14 deletions Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ def __init__(self, db_type):
self._engine = None
self._connection = None

def __exit__(self, exc_type, exc_value, traceback):
if self._connection:
self._connection.close()

@property
def engine(self):
if not self._engine:
Expand Down Expand Up @@ -89,18 +93,6 @@ def load_df_into_db(self, table_name, df, drop_first=True):
df.to_sql(con=db_engine, schema=schema, name=table, index=False, if_exists='append')
db_engine.dispose()

###################################
def execute_sql(self, sql):
if sql.endswith(".sql"):
sql = open(sql, 'r').read()
with self.connection:
try:
with self.connection.cursor() as cur:
cur.execute(sql)
except Exception as e:
raise e
self.connection.close()

###################################
def execute_sql(self, sql):
if sql.endswith('.sql') and os.path.exists(sql):
Expand All @@ -112,7 +104,6 @@ def execute_sql(self, sql):
cur.execute(sql)
except Exception as e:
raise e
self.connection.close()

###################################
def sql_to_dataframe(self, sql, return_geodataframe=False):
Expand Down Expand Up @@ -148,7 +139,6 @@ def get_est_row_count_in_table(self, table):
rows = cur.fetchone()[0]
except Exception as e:
raise e
self.connection.close()

return rows

Expand Down
2 changes: 1 addition & 1 deletion Core/LAMBDA/viz_functions/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ data "archive_file" "viz_test_wrds_db_zip" {
for_each = fileset("${path.module}", "**/*.sql")
content {
content = file("${path.module}/${source.key}")
filename = basename(source.key)
filename = "sql_files/${basename(source.key)}"
}
}
}
Expand Down
121 changes: 61 additions & 60 deletions Core/LAMBDA/viz_functions/viz_initialize_pipeline/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,56 @@

SF_CLIENT = boto3.client('stepfunctions')

# PIPELINE_INIT_FILES = [] #Swap out this for the following list to pause all pipelines.
PIPELINE_INIT_FILES = [
## ANA ##
"analysis_assim.channel_rt.tm00.conus.nc",
"analysis_assim.forcing.tm00.conus.nc",
"analysis_assim.channel_rt.tm0000.hawaii.nc",
"analysis_assim.forcing.tm00.hawaii.nc",
"analysis_assim.channel_rt.tm00.puertorico.nc",
"analysis_assim.forcing.tm00.puertorico.nc",
"analysis_assim.channel_rt.tm00.alaska.nc",
"analysis_assim.forcing.tm00.alaska.nc",

## SRF ##
"short_range.channel_rt.f018.conus.nc",
"short_range.forcing.f018.conus.nc",
"short_range.channel_rt.f04800.hawaii.nc",
"short_range.forcing.f048.hawaii.nc",
"short_range.channel_rt.f048.puertorico.nc",
"short_range.forcing.f048.puertorico.nc",
"short_range.forcing.f015.alaska.nc",
"short_range.channel_rt.f015.alaska.nc",

## MRF GFS ##
"medium_range.channel_rt_1.f240.conus.nc",
"medium_range.forcing.f240.conus.nc",
"medium_range.channel_rt.f119.conus.nc",
"medium_range.channel_rt_1.f240.alaska.nc",
"medium_range.forcing.f240.alaska.nc",

## MRF NBM ##
"medium_range_blend.channel_rt.f240.conus.nc",
"medium_range_blend.forcing.f240.conus.nc",
"medium_range_blend.channel_rt.f240.alaska.nc",
"medium_range_blend.forcing.f240.alaska.nc",

## MRF Ensemble - Currently CONUS only - Ensemble member 6 kicks off other ensemble member ingests##
# "medium_range.channel_rt_6.f240.conus.nc",

# NOTE: https://github.com/NOAA-OWP/hydrovis/issues/982
## Coastal ##
# "analysis_assim_coastal.total_water.tm00.atlgulf.nc",
# "analysis_assim_coastal.total_water.tm00.hawaii.nc",
# "analysis_assim_coastal.total_water.tm00.puertorico.nc",
# "medium_range_coastal.total_water.f240.atlgulf.nc",
# "medium_range_blend_coastal.total_water.f240.atlgulf.nc",
# "short_range_coastal.total_water.f018.atlgulf.nc",
# "short_range_coastal.total_water.f048.puertorico.nc",
# "short_range_coastal.total_water.f048.hawaii.nc"
]

###################################################################################################################################################
class DuplicatePipelineException(Exception):
""" my custom exception class """
Expand All @@ -45,70 +95,23 @@ def lambda_handler(event, context):
# Initializing the pipeline class below also does some start-up logic like this based on the event, but I'm keeping this seperate at the very top to keep the timing of those false starts as low as possible.
if "Records" in event:
sns_event = event.get('Records')[0].get('Sns')
s3_event = json.loads(sns_event.get('Message'))
s3_message = json.loads(sns_event.get('Message'))
s3_event = s3_message.get('Records')[0].get('s3')
s3_key = s3_event.get('object').get('key')

if sns_event["TopicArn"] == SNS_TOPIC__WRDS_DB_DUMP:
if any(suffix in s3_key for suffix in PIPELINE_INIT_FILES):
print(f"Continuing pipeline initialization with Shared Bucket S3 key: {s3_key}")
elif sns_event["TopicArn"] == SNS_TOPIC__WRDS_DB_DUMP:
print("Starting WRDS DB Sync Step Function...")
SF_CLIENT.start_execution(
stateMachineArn=SF_ARN__SYNC_WRDS_DB,
name=s3_event.get('object').get('key').split('/')[-1].split('.')[0],
name=s3_key.split('/')[-1].split('.')[0],
input=json.dumps(s3_event)
)
# pipeline_iniitializing_files = [] #Swap out this for the following list to pause all pipelines.
pipeline_iniitializing_files = [
## ANA ##
"analysis_assim.channel_rt.tm00.conus.nc",
"analysis_assim.forcing.tm00.conus.nc",
"analysis_assim.channel_rt.tm0000.hawaii.nc",
"analysis_assim.forcing.tm00.hawaii.nc",
"analysis_assim.channel_rt.tm00.puertorico.nc",
"analysis_assim.forcing.tm00.puertorico.nc",
"analysis_assim.channel_rt.tm00.alaska.nc",
"analysis_assim.forcing.tm00.alaska.nc",

## SRF ##
"short_range.channel_rt.f018.conus.nc",
"short_range.forcing.f018.conus.nc",
"short_range.channel_rt.f04800.hawaii.nc",
"short_range.forcing.f048.hawaii.nc",
"short_range.channel_rt.f048.puertorico.nc",
"short_range.forcing.f048.puertorico.nc",
"short_range.forcing.f015.alaska.nc",
"short_range.channel_rt.f015.alaska.nc",

## MRF GFS ##
"medium_range.channel_rt_1.f240.conus.nc",
"medium_range.forcing.f240.conus.nc",
"medium_range.channel_rt.f119.conus.nc",
"medium_range.channel_rt_1.f240.alaska.nc",
"medium_range.forcing.f240.alaska.nc",

## MRF NBM ##
"medium_range_blend.channel_rt.f240.conus.nc",
"medium_range_blend.forcing.f240.conus.nc",
"medium_range_blend.channel_rt.f240.alaska.nc",
"medium_range_blend.forcing.f240.alaska.nc",

## MRF Ensemble - Currently CONUS only - Ensemble member 6 kicks off other ensemble member ingests##
# "medium_range.channel_rt_6.f240.conus.nc",
return
else:
return

# NOTE: https://github.com/NOAA-OWP/hydrovis/issues/982
## Coastal ##
# "analysis_assim_coastal.total_water.tm00.atlgulf.nc",
# "analysis_assim_coastal.total_water.tm00.hawaii.nc",
# "analysis_assim_coastal.total_water.tm00.puertorico.nc",
# "medium_range_coastal.total_water.f240.atlgulf.nc",
# "medium_range_blend_coastal.total_water.f240.atlgulf.nc",
# "short_range_coastal.total_water.f018.atlgulf.nc",
# "short_range_coastal.total_water.f048.puertorico.nc",
# "short_range_coastal.total_water.f048.hawaii.nc"
]
if s3_event.get('Records')[0].get('s3').get('object').get('key'):
s3_key = s3_event.get('Records')[0].get('s3').get('object').get('key')
if any(suffix in s3_key for suffix in pipeline_iniitializing_files) == False:
return
else:
print(f"Continuing pipeline initialization with Shared Bucket S3 key: {s3_key}")

###### Initialize the pipeline class & configuration classes ######
#Initialize the pipeline object - This will parse the lambda event, initialize a configuration, and pull service metadata for that configuration from the viz processing database.
try:
Expand Down Expand Up @@ -667,5 +670,3 @@ def get_configuration_data_flow(self):
"db_ingest_groups": self.db_ingest_groups,
"python_preprocessing": self.lambda_input_sets
}

return
8 changes: 4 additions & 4 deletions Core/LAMBDA/viz_functions/viz_test_wrds_db/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@


THIS_DIR = Path(__file__).parent
FILES_DIR = THIS_DIR / 'files'
FILES_DIR = THIS_DIR / 'sql_files'
IGNORE_FILES = ['dba_stuff.sql']
IGNORE_TABLES = ['building_footprints_fema']

Expand All @@ -31,7 +31,7 @@ def lambda_handler(event, context):
ALTER SERVER test_wrds_location OPTIONS (fetch_size '150000');
'''

db.execute_sql(connection, sql)
db.execute_sql(sql)

for fname in FILES_DIR.iterdir():
if fname.name in IGNORE_FILES: continue
Expand Down Expand Up @@ -65,13 +65,13 @@ def lambda_handler(event, context):
sql = re.sub(f'DROP TABLE IF EXISTS {table}\\b;?', '', sql, flags=re.IGNORECASE)

print(f"Executing {fname.name} in test environment...")
db.execute_sql(connection, sql)
db.execute_sql(sql)

sql = f'''
DROP SERVER IF EXISTS test_wrds_location CASCADE;
DROP SCHEMA IF EXISTS automated_test CASCADE;
DROP SCHEMA IF EXISTS test_external CASCADE;
'''
db.execute_sql(connection, sql)
db.execute_sql(sql)

connection.close()
112 changes: 112 additions & 0 deletions Core/StepFunctions/utils/main.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
variable "environment" {
type = string
}

variable "region" {
type = string
}

variable "rds_bastion_id" {
description = "ID of the RDS Bastion EC2 machine that the DB deploys will be executed from."
type = string
}

variable "test_wrds_db_lambda_arn" {
type = string
}

variable "sync_wrds_db_role" {
type = string
}

variable "aws_instances_to_reboot" {
type = list(string)
}

variable "email_sns_topics" {
description = "SnS topics"
type = map(any)
}

####################################################
## Ensure EC2 Ready For Use Step Function ##
####################################################

resource "aws_sfn_state_machine" "ensure_ec2_ready_for_use_step_function" {
name = "hv-vpp-${var.environment}-ensure-ec2-ready-for-use"
role_arn = var.sync_wrds_db_role

definition = templatefile("${path.module}/ensure_ec2_ready.json.tftpl", {})
}

###################################################
## Restore DB From S3 Dump Step Function ##
###################################################

resource "aws_sfn_state_machine" "restore_db_from_s3_dump_step_function" {
name = "hv-vpp-${var.environment}-restore-db-from-s3"
role_arn = var.sync_wrds_db_role

definition = templatefile("${path.module}/restore_db_from_s3_dump.json.tftpl", {
ensure_ec2_ready_step_function_arn = aws_sfn_state_machine.ensure_ec2_ready_for_use_step_function.arn
rds_bastion_id = var.rds_bastion_id
region = var.region
})
}

#################################################
## Sync WRDS Location DB Step Function ##
#################################################

resource "aws_sfn_state_machine" "sync_wrds_location_db_step_function" {
name = "hv-vpp-${var.environment}-sync-wrds-location-db"
role_arn = var.sync_wrds_db_role

definition = templatefile("${path.module}/sync_wrds_location_db.json.tftpl", {
restore_db_dump_from_s3_step_function_arn = aws_sfn_state_machine.restore_db_from_s3_dump_step_function.arn
test_wrds_db_lambda_arn = var.test_wrds_db_lambda_arn
rds_bastion_id = var.rds_bastion_id
region = var.region
})
}

####### Step Function Failure / Time Out SNS #######
resource "aws_cloudwatch_event_rule" "sync_wrds_location_db_step_function_failure" {
name = "hv-vpp-${var.environment}-sync-wrds-location-db-step-function-failure"
description = "Alert when the sync wrds location db step function times out or fails."

event_pattern = <<EOF
{
"source": ["aws.states"],
"detail-type": ["Step Functions Execution Status Change"],
"detail": {
"status": ["FAILED", "TIMED_OUT"],
"stateMachineArn": ["${aws_sfn_state_machine.sync_wrds_location_db_step_function.arn}"]
}
}
EOF
}

resource "aws_cloudwatch_event_target" "sync_wrds_location_db_step_function_failure_sns" {
rule = aws_cloudwatch_event_rule.sync_wrds_location_db_step_function_failure.name
target_id = "SendToSNS"
arn = var.email_sns_topics["viz_lambda_errors"].arn
input_path = "$.detail.name"
}

################################################
## Reboot EC2 Instances Step Function ##
################################################

resource "aws_sfn_state_machine" "reboot_ec2_instances_step_function" {
name = "hv-vpp-${var.environment}-reboot-ec2-instances"
role_arn = var.sync_wrds_db_role

definition = templatefile("${path.module}/reboot_ec2_instances.json.tftpl", {
aws_instances_to_reboot = var.aws_instances_to_reboot
})
}

output "sync_wrds_location_db_step_function" {
value = aws_sfn_state_machine.sync_wrds_location_db_step_function
}
Loading

0 comments on commit dfb6066

Please sign in to comment.