Merge pull request #690 from NOAA-OWP/viz-rds-parameter-group-tuning
In an effort to stabilize the Viz RDS database, this PR makes a handful of changes:

- Upgrade the RDS instance type to provide more CPU and memory
- Tune the parameter group to better utilize the RDS instance's resources
- Refactor all of the Lambdas used in the Viz Pipelines to better handle DB queries and connections (a sketch of the pattern follows this list)
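
As a quick illustration of the third item, the refactored Lambdas converge on psycopg2's context managers for transactions and cursors, followed by an explicit close. A minimal sketch of that pattern (the get_db_connection stub, host, and query below are placeholders, not the project's actual configuration):

    # Minimal sketch of the connection handling the refactored Lambdas converge on.
    # The connection details and query are illustrative placeholders.
    import psycopg2

    def get_db_connection(db_type="viz"):
        # Stand-in for the shared helper that builds a connection from environment/secrets.
        return psycopg2.connect(host="example-host", dbname=db_type)

    def run_query(sql):
        connection = get_db_connection("viz")
        try:
            with connection:                      # transaction scope: commit on success, rollback on error
                with connection.cursor() as cur:  # cursor is closed when this block exits
                    cur.execute(sql)
                    return cur.fetchall()
        finally:
            # psycopg2's connection context manager does not close the connection,
            # so close it explicitly rather than holding RDS connections open.
            connection.close()

The explicit connection.close() matters because "with connection:" only manages the transaction; keeping connections open across many concurrent Lambda invocations is presumably part of what "better handle DB queries and connections" is addressing.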
nickchadwick-noaa authored Apr 8, 2024
2 parents 8d80bc4 + ab93e79 commit 008e3c5
Showing 26 changed files with 313 additions and 303 deletions.
36 changes: 19 additions & 17 deletions Core/LAMBDA/layers/viz_lambda_shared_funcs/python/viz_classes.py
@@ -91,14 +91,14 @@ def load_df_into_db(self, table_name, df, drop_first=True):
     ###################################
     def run_sql_file_in_db(self, sql_file):
         sql = open(sql_file, 'r').read()
-        with self.connection as db_connection:
+        with self.connection:
             try:
-                cur = db_connection.cursor()
-                print(f"---> Running {sql_file}")
-                cur.execute(sql)
-                db_connection.commit()
+                with self.connection.cursor() as cur:
+                    print(f"---> Running {sql_file}")
+                    cur.execute(sql)
             except Exception as e:
                 raise e
+        self.connection.close()
 
     ###################################
     def run_sql_in_db(self, sql, return_geodataframe=False):
@@ -119,21 +119,23 @@ def run_sql_in_db(self, sql, return_geodataframe=False):
     ###################################
     def get_est_row_count_in_table(self, table):
         print(f"Getting estimated total rows in {table}.")
-        with self.connection as db_connection:
+        with self.connection:
             try:
-                cur = db_connection.cursor()
-                sql = f"""
-                    SELECT (CASE WHEN c.reltuples < 0 THEN NULL -- never vacuumed
-                                 WHEN c.relpages = 0 THEN float8 '0' -- empty table
-                                 ELSE c.reltuples / c.relpages END
-                            * (pg_catalog.pg_relation_size(c.oid) / pg_catalog.current_setting('block_size')::int))::bigint
-                    FROM pg_catalog.pg_class c
-                    WHERE c.oid = '{table}'::regclass; -- schema-qualified table here
-                """
-                cur.execute(sql)
-                rows = cur.fetchone()[0]
+                with self.connection.cursor() as cur:
+                    sql = f"""
+                        SELECT (CASE WHEN c.reltuples < 0 THEN NULL -- never vacuumed
+                                     WHEN c.relpages = 0 THEN float8 '0' -- empty table
+                                     ELSE c.reltuples / c.relpages END
+                                * (pg_catalog.pg_relation_size(c.oid) / pg_catalog.current_setting('block_size')::int))::bigint
+                        FROM pg_catalog.pg_class c
+                        WHERE c.oid = '{table}'::regclass; -- schema-qualified table here
+                    """
+                    cur.execute(sql)
+                    rows = cur.fetchone()[0]
             except Exception as e:
                 raise e
+        self.connection.close()
 
         return rows
 
     ###################################
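
For context, the estimate above comes from planner statistics rather than a COUNT(*): c.reltuples / c.relpages is the average number of rows per page recorded by the last ANALYZE, and scaling that by the table's current page count (pg_relation_size divided by block_size) keeps the figure roughly current as the table grows, at negligible cost on large ingest tables. A hypothetical call through the same class (the import path and table name are assumptions, not taken from this diff):

    # Hypothetical usage sketch; assumes the database wrapper class defined in
    # viz_classes.py is importable, and uses an illustrative table name.
    from viz_classes import database

    viz_db = database(db_type="viz")
    est_rows = viz_db.get_est_row_count_in_table("ingest.example_ana_table")  # placeholder table
    print(f"~{est_rows} rows (planner estimate, not an exact count)")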
@@ -456,12 +456,14 @@ def get_service_metadata(include_ingest_sources=True, include_latest_ref_time=Fa
                                 ON a.ingest_table = b.target group by service) AS ref_times
                                 on admin.services.service = ref_times.service2"""
 
+
     connection = get_db_connection("viz")
-    with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
-        cur.execute(f"SELECT * FROM admin.services{extra_sql};")
-        column_names = [desc[0] for desc in cur.description]
-        response = cur.fetchall()
-        cur.close()
+    with connection:
+        with connection.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur:
+            cur.execute(f"SELECT * FROM admin.services{extra_sql};")
+            column_names = [desc[0] for desc in cur.description]
+            response = cur.fetchall()
+            # cur.close()
     connection.close()
     return list(map(lambda x: dict(zip(column_names, x)), response))

@@ -493,17 +495,16 @@ def load_df_into_db(table_name, db_engine, df):
 def run_sql_file_in_db(db_type, sql_file):
     print("Getting connection to run sql files")
     sql = open(sql_file, 'r').read()
-    db_connection = get_db_connection(db_type)
 
-    try:
-        cur = db_connection.cursor()
-        print(f"Running {sql_file}")
-        cur.execute(sql)
-        db_connection.commit()
-    except Exception as e:
-        raise e
-    finally:
-        db_connection.close()
+    connection = get_db_connection(db_type)
+    with connection:
+        try:
+            with connection.cursor() as cur:
+                print(f"Running {sql_file}")
+                cur.execute(sql)
+        except Exception as e:
+            raise e
+    connection.close()
 
 
 def move_data_to_another_db(origin_db, dest_db, origin_table, dest_table, stage=True, add_oid=True,
@@ -33,11 +33,12 @@ def lambda_handler(event, context):
     if step == 'domain':
         print(f'Executing {step}.sql...')
 
-        with viz_db.get_db_connection() as connection:
-            cur = connection.cursor()
-            cur.execute(sql)
-            result = cur.fetchone()
-            connection.commit()
+        connection = viz_db.get_db_connection()
+        with connection:
+            with connection.cursor() as cur:
+                cur.execute(sql)
+                result = cur.fetchone()
+        connection.close()
 
         reference_time = dt.datetime.strptime(result[0], '%Y-%m-%d %H:%M:%S UTC').strftime(DT_FORMAT)
         run_time_obj = dt.datetime.strptime(event['run_time'], '%Y-%m-%dT%H:%M:%SZ')
@@ -543,30 +543,32 @@ def setup_db_table(db_fim_table, viz_db, sql_replace=None):
     db_schema = db_fim_table.split('.')[0]
 
     print(f"Setting up {db_fim_table}")
 
-    with viz_db.get_db_connection() as connection:
-        cur = connection.cursor()
-
-        # See if the target table exists #TODO: Ensure table exists would make a good helper function
-        cur.execute(f"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{db_fim_table.split('.')[0]}' AND tablename = '{db_fim_table.split('.')[1]}');")
-        table_exists = cur.fetchone()[0]
-
-        # If the target table doesn't exist, create one basd on the sql_replace dict.
-        if not table_exists:
-            print(f"--> {db_fim_table} does not exist. Creating now.")
-            original_table = list(sql_replace.keys())[list(sql_replace.values()).index(db_fim_table)] #ToDo: error handling if not in list
-            cur.execute(f"DROP TABLE IF EXISTS {db_fim_table}; CREATE TABLE {db_fim_table} (LIKE {original_table})")
+    connection = viz_db.get_db_connection()
+    with connection:
+        with connection.cursor() as cur:
+            # See if the target table exists #TODO: Ensure table exists would make a good helper function
+            cur.execute(f"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '{db_fim_table.split('.')[0]}' AND tablename = '{db_fim_table.split('.')[1]}');")
+            table_exists = cur.fetchone()[0]
 
+            # If the target table doesn't exist, create one basd on the sql_replace dict.
+            if not table_exists:
+                print(f"--> {db_fim_table} does not exist. Creating now.")
+                original_table = list(sql_replace.keys())[list(sql_replace.values()).index(db_fim_table)] #ToDo: error handling if not in list
+                cur.execute(f"DROP TABLE IF EXISTS {db_fim_table}; CREATE TABLE {db_fim_table} (LIKE {original_table})")
+                connection.commit()
 
-        # Drop the existing index on the target table
-        print("Dropping target table index (if exists).")
-        SQL = f"DROP INDEX IF EXISTS {db_schema}.{index_name};"
-        cur.execute(SQL)
-
-        # Truncate all records.
-        print("Truncating target table.")
-        SQL = f"TRUNCATE TABLE {db_fim_table};"
-        cur.execute(SQL)
-        connection.commit()
+            # Drop the existing index on the target table
+            print("Dropping target table index (if exists).")
+            SQL = f"DROP INDEX IF EXISTS {db_schema}.{index_name};"
+            cur.execute(SQL)
 
+            # Truncate all records.
+            print("Truncating target table.")
+            SQL = f"TRUNCATE TABLE {db_fim_table};"
+            cur.execute(SQL)
+            connection.commit()
+    connection.close()
 
     return db_fim_table
132 changes: 66 additions & 66 deletions Core/LAMBDA/viz_functions/viz_db_ingest/lambda_function.py
@@ -56,80 +56,80 @@ def lambda_handler(event, context):
         return json.dumps(dump_dict)
 
     viz_db = database(db_type="viz")
-    with viz_db.get_db_connection() as connection:
-        cursor = connection.cursor()
-        try:
-            nwm_version = 0
+    try:
+        nwm_version = 0
 
-            if file.endswith('.nc'):
-                ds = xr.open_dataset(download_path)
-                ds_vars = [var for var in ds.variables]
+        if file.endswith('.nc'):
+            ds = xr.open_dataset(download_path)
+            ds_vars = [var for var in ds.variables]
 
-                if not target_cols:
-                    target_cols = ds_vars
+            if not target_cols:
+                target_cols = ds_vars
 
-                try:
-                    if "hawaii" in file:
-                        ds['forecast_hour'] = int(int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])/100)
-                    else:
-                        ds['forecast_hour'] = int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])
-                    if 'forecast_hour' not in target_cols:
-                        target_cols.append('forecast_hour')
-                except:
-                    print("Regex pattern for the forecast hour didn't match the netcdf file")
-                try:
-                    ds['nwm_vers'] = float(ds.NWM_version_number.replace("v",""))
-                    if 'nwm_vers' not in target_cols:
-                        target_cols.append('nwm_vers')
-                except:
-                    print("NWM_version_number property is not available in the netcdf file")
+            try:
+                if "hawaii" in file:
+                    ds['forecast_hour'] = int(int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])/100)
+                else:
+                    ds['forecast_hour'] = int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])
+                if 'forecast_hour' not in target_cols:
+                    target_cols.append('forecast_hour')
+            except:
+                print("Regex pattern for the forecast hour didn't match the netcdf file")
+
+            try:
+                ds['nwm_vers'] = float(ds.NWM_version_number.replace("v",""))
+                if 'nwm_vers' not in target_cols:
+                    target_cols.append('nwm_vers')
+            except:
+                print("NWM_version_number property is not available in the netcdf file")
 
-                drop_vars = [var for var in ds_vars if var not in target_cols]
-                df = ds.to_dataframe().reset_index()
-                df = df.drop(columns=drop_vars)
-                ds.close()
-                if 'streamflow' in target_cols:
-                    df = df.loc[df['streamflow'] >= keep_flows_at_or_above].round({'streamflow': 2}).copy() # noqa
-                df = df[target_cols]
+            drop_vars = [var for var in ds_vars if var not in target_cols]
+            df = ds.to_dataframe().reset_index()
+            df = df.drop(columns=drop_vars)
+            ds.close()
+            if 'streamflow' in target_cols:
+                df = df.loc[df['streamflow'] >= keep_flows_at_or_above].round({'streamflow': 2}).copy() # noqa
+            df = df[target_cols]
 
-            elif file.endswith('.csv'):
-                df = pd.read_csv(download_path)
-                for column in df: # Replace any 'None' strings with nulls
-                    df[column].replace('None', np.nan, inplace=True)
-                df = df.copy()
-            else:
-                print("File format not supported.")
-                exit()
+        elif file.endswith('.csv'):
+            df = pd.read_csv(download_path)
+            for column in df: # Replace any 'None' strings with nulls
+                df[column].replace('None', np.nan, inplace=True)
+            df = df.copy()
+        else:
+            print("File format not supported.")
+            exit()
 
-            print(f"--> Preparing and Importing {file}")
-            f = StringIO() # Use StringIO to store the temporary text file in memory (faster than on disk)
-            df.to_csv(f, sep='\t', index=False, header=False)
-            f.seek(0)
-            try:
-                with viz_db.get_db_connection() as connection:
-                    cursor = connection.cursor()
-                    cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
-                    connection.commit()
-            except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
-                if not create_table:
-                    raise
+        print(f"--> Preparing and Importing {file}")
+        f = StringIO() # Use StringIO to store the temporary text file in memory (faster than on disk)
+        df.to_csv(f, sep='\t', index=False, header=False)
+        f.seek(0)
+        try:
+            connection = viz_db.get_db_connection()
+            with connection:
+                with connection.cursor() as cur:
+                    cur.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
+            connection.close()
+        except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
+            if not create_table:
+                raise
 
-                print("Error encountered. Recreating table now and retrying import...")
-                create_table_df = df.head(0)
-                schema, table = target_table.split('.')
-                create_table_df.to_sql(con=viz_db.engine, schema=schema, name=table, index=False, if_exists='replace')
-                with viz_db.get_db_connection() as connection:
-                    cursor = connection.cursor()
-                    cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
-                    connection.commit()
+            print("Error encountered. Recreating table now and retrying import...")
+            create_table_df = df.head(0)
+            schema, table = target_table.split('.')
+            create_table_df.to_sql(con=viz_db.engine, schema=schema, name=table, index=False, if_exists='replace')
+            connection = viz_db.get_db_connection()
+            with connection:
+                with connection.cursor() as cur:
+                    cur.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
+            connection.close()
 
-            print(f"--> Import of {len(df)} rows Complete. Removing {download_path} and closing db connection.")
-            os.remove(download_path)
-        except Exception as e:
-            print(f"Error: {e}")
-            raise e
+        print(f"--> Import of {len(df)} rows Complete. Removing {download_path} and closing db connection.")
+        os.remove(download_path)
+
+    except Exception as e:
+        print(f"Error: {e}")
+        raise e
 
     dump_dict = {
         "file": file,
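
The import path above stages each DataFrame as a tab-separated buffer in memory (StringIO) and streams it into Postgres with COPY via copy_expert, which is typically much faster than row-by-row inserts for loads of this size. A condensed sketch of that pattern with illustrative names (the connection setup and target table are placeholders, not the Lambda's actual helpers):

    # Condensed sketch of the StringIO + copy_expert import used in viz_db_ingest;
    # the connection setup and target table are illustrative placeholders.
    from io import StringIO

    import pandas as pd
    import psycopg2

    def copy_df_to_table(df: pd.DataFrame, connection, target_table: str) -> None:
        buf = StringIO()                                     # stage the file in memory, not on disk
        df.to_csv(buf, sep='\t', index=False, header=False)
        buf.seek(0)
        with connection:                                     # commit on success, roll back on error
            with connection.cursor() as cur:
                cur.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", buf)

    connection = psycopg2.connect(host="example-host", dbname="viz")
    copy_df_to_table(pd.DataFrame({"feature_id": [1], "streamflow": [2.5]}), connection, "ingest.example_table")
    connection.close()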
@@ -103,13 +103,15 @@ def run_sql(sql_path_or_str, sql_replace=None):
         sql = re.sub(re.escape(word), replacement, sql, flags=re.IGNORECASE).replace('utc', 'UTC')
 
     viz_db = database(db_type="viz")
-    with viz_db.get_db_connection() as connection:
-        cur = connection.cursor()
-        cur.execute(sql)
-        try:
-            result = cur.fetchone()
-        except:
-            pass
-        connection.commit()
+    connection = viz_db.get_db_connection()
+    with connection:
+        with connection.cursor() as cur:
+            cur.execute(sql)
+            try:
+                result = cur.fetchone()
+            except:
+                pass
+    connection.close()
 
     print(f"Finished executing the SQL statement above.")
     return result
@@ -1,5 +1,3 @@
--- We'll temporarily increase work_mem to 512MB, to help with performance on PostGIS spatial joins (default is 4MB)
-SET work_mem TO '512MB';
 --------------- Building Footprints ---------------
 DROP TABLE IF EXISTS publish.ana_inundation_building_footprints;
 SELECT
@@ -1,5 +1,3 @@
--- We'll temporarily increase work_mem to 512MB, to help with performance on PostGIS spatial joins (default is 4MB)
-SET work_mem TO '512MB';
 --------------- Building Footprints ---------------
 DROP TABLE IF EXISTS publish.ana_inundation_building_footprints_hi;
 SELECT
@@ -1,5 +1,3 @@
--- We'll temporarily increase work_mem to 512MB, to help with performance on PostGIS spatial joins (default is 4MB)
-SET work_mem TO '512MB';
 --------------- Building Footprints ---------------
 DROP TABLE IF EXISTS publish.ana_inundation_building_footprints_prvi;
 SELECT
@@ -1,5 +1,3 @@
--- We'll temporarily increase work_mem to 512MB, to help with performance on PostGIS spatial joins (default is 4MB)
-SET work_mem TO '512MB';
 --------------- Building Footprints ---------------
 DROP TABLE IF EXISTS publish.ana_past_14day_max_inundation_building_footprints;
 SELECT
@@ -1,5 +1,3 @@
--- We'll temporarily increase work_mem to 512MB, to help with performance on PostGIS spatial joins (default is 4MB)
-SET work_mem TO '512MB';
 --------------- Building Footprints ---------------
 DROP TABLE IF EXISTS publish.ana_past_7day_max_inundation_building_footprints;
 SELECT
@@ -1,5 +1,3 @@
--- We'll temporarily increase work_mem to 512MB, to help with performance on PostGIS spatial joins (default is 4MB)
-SET work_mem TO '512MB';
 --------------- Building Footprints ---------------
 DROP TABLE IF EXISTS publish.mrf_gfs_max_inundation_10day_building_footprints;
 SELECT
@@ -1,5 +1,3 @@
--- We'll temporarily increase work_mem to 512MB, to help with performance on PostGIS spatial joins (default is 4MB)
-SET work_mem TO '512MB';
 --------------- Building Footprints ---------------
 DROP TABLE IF EXISTS publish.mrf_gfs_max_inundation_3day_building_footprints;
 SELECT
@@ -1,5 +1,3 @@
--- We'll temporarily increase work_mem to 512MB, to help with performance on PostGIS spatial joins (default is 4MB)
-SET work_mem TO '512MB';
 --------------- Building Footprints ---------------
 DROP TABLE IF EXISTS publish.mrf_gfs_max_inundation_5day_building_footprints;
 SELECT
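
Each of the hunks above drops a per-script work_mem bump, which presumably lines up with the parameter-group tuning described at the top of this PR. If a single PostGIS-heavy statement ever needs more memory than the new default again, one option is to scope the increase to a transaction with SET LOCAL; a hypothetical sketch (connection details and the query are placeholders):

    # Hypothetical alternative: scope a work_mem increase to one transaction with SET LOCAL,
    # instead of a session-wide SET. Connection details and the query are placeholders.
    import psycopg2

    connection = psycopg2.connect(host="example-host", dbname="viz")
    with connection:                                   # SET LOCAL reverts when this transaction ends
        with connection.cursor() as cur:
            cur.execute("SET LOCAL work_mem = '512MB';")
            cur.execute("SELECT 1;")                   # stand-in for the PostGIS spatial join
    connection.close()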
