Revert "Fixes bug with viz_db_ingest Lambda (#652)"
This reverts commit 4c34bb8.
nickchadwick-noaa authored Mar 28, 2024
1 parent 4c34bb8 commit ca218e7
Showing 1 changed file with 68 additions and 63 deletions.
Core/LAMBDA/viz_functions/viz_db_ingest/lambda_function.py (131 changes: 68 additions & 63 deletions)
@@ -56,81 +56,86 @@ def lambda_handler(event, context):
         return json.dumps(dump_dict)
 
     viz_db = database(db_type="viz")
-    nwm_version = 0
+    with viz_db.get_db_connection() as connection:
+        cursor = connection.cursor()
+        try:
+            nwm_version = 0
 
-    if file.endswith('.nc'):
-        ds = xr.open_dataset(download_path)
-        ds_vars = [var for var in ds.variables]
+            if file.endswith('.nc'):
+                ds = xr.open_dataset(download_path)
+                ds_vars = [var for var in ds.variables]
 
-        if not target_cols:
-            target_cols = ds_vars
+                if not target_cols:
+                    target_cols = ds_vars
 
-        try:
-            if "hawaii" in file:
-                ds['forecast_hour'] = int(int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])/100)
-            else:
-                ds['forecast_hour'] = int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])
-            if 'forecast_hour' not in target_cols:
-                target_cols.append('forecast_hour')
-        except:
-            print("Regex pattern for the forecast hour didn't match the netcdf file")
+                try:
+                    if "hawaii" in file:
+                        ds['forecast_hour'] = int(int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])/100)
+                    else:
+                        ds['forecast_hour'] = int(re.findall("(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)[0][-1])
+                    if 'forecast_hour' not in target_cols:
+                        target_cols.append('forecast_hour')
+                except:
+                    print("Regex pattern for the forecast hour didn't match the netcdf file")
 
-        try:
-            ds['nwm_vers'] = float(ds.NWM_version_number.replace("v",""))
-            if 'nwm_vers' not in target_cols:
-                target_cols.append('nwm_vers')
-        except:
-            print("NWM_version_number property is not available in the netcdf file")
+                try:
+                    ds['nwm_vers'] = float(ds.NWM_version_number.replace("v",""))
+                    if 'nwm_vers' not in target_cols:
+                        target_cols.append('nwm_vers')
+                except:
+                    print("NWM_version_number property is not available in the netcdf file")
 
-        drop_vars = [var for var in ds_vars if var not in target_cols]
-        df = ds.to_dataframe().reset_index()
-        df = df.drop(columns=drop_vars)
-        ds.close()
-        if 'streamflow' in target_cols:
-            df = df.loc[df['streamflow'] >= keep_flows_at_or_above].round({'streamflow': 2}).copy()  # noqa
-        df = df[target_cols]
+                drop_vars = [var for var in ds_vars if var not in target_cols]
+                df = ds.to_dataframe().reset_index()
+                df = df.drop(columns=drop_vars)
+                ds.close()
+                if 'streamflow' in target_cols:
+                    df = df.loc[df['streamflow'] >= keep_flows_at_or_above].round({'streamflow': 2}).copy()  # noqa
+                df = df[target_cols]
 
-    elif file.endswith('.csv'):
-        df = pd.read_csv(download_path)
-        for column in df:  # Replace any 'None' strings with nulls
-            df[column].replace('None', np.nan, inplace=True)
-        df = df.copy()
-    else:
-        print("File format not supported.")
-        exit()
+            elif file.endswith('.csv'):
+                df = pd.read_csv(download_path)
+                for column in df:  # Replace any 'None' strings with nulls
+                    df[column].replace('None', np.nan, inplace=True)
+                df = df.copy()
+            else:
+                print("File format not supported.")
+                exit()
 
-    print(f"--> Preparing and Importing {file}")
-    f = StringIO()  # Use StringIO to store the temporary text file in memory (faster than on disk)
-    df.to_csv(f, sep='\t', index=False, header=False)
-    f.seek(0)
-    try:
-        with viz_db.get_db_connection() as connection:
-            cursor = connection.cursor()
-            cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
-            connection.commit()
-            connection.close()
-    except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
-        if not create_table:
-            raise
+            print(f"--> Preparing and Importing {file}")
+            f = StringIO()  # Use StringIO to store the temporary text file in memory (faster than on disk)
+            df.to_csv(f, sep='\t', index=False, header=False)
+            f.seek(0)
+            try:
+                with viz_db.get_db_connection() as connection:
+                    cursor = connection.cursor()
+                    cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
+                    connection.commit()
+            except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
+                if not create_table:
+                    raise
 
-        print("Error encountered. Recreating table now and retrying import...")
-        create_table_df = df.head(0)
-        schema, table = target_table.split('.')
-        create_table_df.to_sql(con=viz_db.engine, schema=schema, name=table, index=False, if_exists='replace')
-        with viz_db.get_db_connection() as connection:
-            cursor = connection.cursor()
-            cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
-            connection.commit()
-            connection.close()
+                print("Error encountered. Recreating table now and retrying import...")
+                create_table_df = df.head(0)
+                schema, table = target_table.split('.')
+                create_table_df.to_sql(con=viz_db.engine, schema=schema, name=table, index=False, if_exists='replace')
+                with viz_db.get_db_connection() as connection:
+                    cursor = connection.cursor()
+                    cursor.copy_expert(f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''", f)
+                    connection.commit()
 
-    print(f"--> Import of {len(df)} rows Complete. Removing {download_path} and closing db connection.")
-    os.remove(download_path)
+            print(f"--> Import of {len(df)} rows Complete. Removing {download_path} and closing db connection.")
+            os.remove(download_path)
+
+        except Exception as e:
+            print(f"Error: {e}")
+            raise e
 
     dump_dict = {
         "file": file,
         "target_table": target_table,
         "reference_time": reference_time,
         "rows_imported": len(df),
         "nwm_version": nwm_version
     }
     return json.dumps(dump_dict)  # Return some info on the import
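Both sides of this diff share the same COPY-based import path: the DataFrame is serialized to an in-memory tab-separated buffer, streamed into Postgres with copy_expert, and, if that first COPY fails because the table is missing or mismatched, the table is recreated from the DataFrame's schema and the COPY is retried. The sketch below illustrates that pattern in isolation, under stated assumptions: the function name and the dsn and engine_url parameters are hypothetical stand-ins for the Lambda's viz_db helper, and the retry reuses one connection after a rollback rather than opening a second connection as the Lambda does.

# Minimal sketch (illustrative only): COPY a DataFrame into Postgres from an
# in-memory TSV buffer, recreating the target table and retrying on failure.
# The dsn/engine_url arguments are hypothetical; the real Lambda gets its
# connection and SQLAlchemy engine from the viz_db helper instead.
from io import StringIO

import pandas as pd
import psycopg2
from psycopg2.errors import BadCopyFileFormat, InvalidTextRepresentation, UndefinedTable
from sqlalchemy import create_engine

def copy_df_to_postgres(df: pd.DataFrame, target_table: str, dsn: str, engine_url: str):
    buffer = StringIO()  # keep the TSV in memory rather than writing a temp file
    df.to_csv(buffer, sep='\t', index=False, header=False)
    buffer.seek(0)

    copy_sql = f"COPY {target_table} FROM STDIN WITH DELIMITER E'\t' null as ''"
    connection = psycopg2.connect(dsn)
    try:
        cursor = connection.cursor()
        try:
            cursor.copy_expert(copy_sql, buffer)
            connection.commit()
        except (UndefinedTable, BadCopyFileFormat, InvalidTextRepresentation):
            connection.rollback()  # clear the failed transaction before retrying
            # Recreate an empty table from the DataFrame's schema, then retry the COPY.
            schema, table = target_table.split('.')
            df.head(0).to_sql(con=create_engine(engine_url), schema=schema,
                              name=table, index=False, if_exists='replace')
            buffer.seek(0)
            cursor.copy_expert(copy_sql, buffer)
            connection.commit()
    finally:
        connection.close()

Keeping the buffer in memory avoids writing a temporary file inside the Lambda sandbox, which matches the comment in the function about StringIO being faster than a file on disk.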

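The least obvious line in the hunk is the forecast-hour regex, which appears unchanged on both sides. A short worked example against a hypothetical NWM object key (the real key arrives in the Lambda's input event) shows what the three capture groups return and why the code keeps only the last one; as the diff shows, Hawaii files additionally divide the captured value by 100.

# Worked example of the forecast-hour pattern used above. The object key below
# is a hypothetical NWM channel_rt file name, not taken from the repository.
import re

file = "nwm.20240328/short_range/nwm.t14z.short_range.channel_rt.f018.conus.nc"
groups = re.findall(r"(\d{8})/[a-z0-9_]*/.*t(\d{2})z.*[ftm](\d*)\.", file)
print(groups)                    # [('20240328', '14', '018')] -> date, cycle, forecast hour
forecast_hour = int(groups[0][-1])
print(forecast_hour)             # 18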