Merge pull request #177 from SANDAG/mwe-abm_15_1_0-database
Update datalake_exporter.py database
mwe-sandag authored Jul 25, 2024
2 parents ec063af + c7c5ce1 commit 83fc78e
Showing 1 changed file with 7 additions and 6 deletions.
src/main/python/datalake_exporter.py (7 additions, 6 deletions)
@@ -111,7 +111,7 @@ def create_scenario_df(ts, EMME_metadata, parent_dir_name, output_path):
         "scenario_id": [EMME_metadata["scenario_id"]],
         "scenario_guid": [EMME_metadata["scenario_guid"]],
         "main_directory" : [EMME_metadata["main_directory"]],
-        "datalake_path" : ["/".join(["bronze/abm3dev/abm_15_0_0",parent_dir_name])],
+        "datalake_path" : ["/".join(["bronze/abm3dev",database,parent_dir_name])],
         "select_link" : [EMME_metadata["select_link"]],
         "sample_rate" : [EMME_metadata["sample_rate"]]
     }
@@ -139,9 +139,9 @@ def create_model_metadata_df(model, model_metadata):
 def export_table(table, name, model, parent_dir_name, container):
     model_output_file = name+".parquet"
     if model == '':
-        lake_file_name = "/".join(["abm_15_0_0",parent_dir_name,model_output_file])
+        lake_file_name = "/".join([database,parent_dir_name,model_output_file])
     else:
-        lake_file_name = "/".join(["abm_15_0_0",parent_dir_name,model,model_output_file])
+        lake_file_name = "/".join([database,parent_dir_name,model,model_output_file])
 
     parquet_file = BytesIO()
     table.to_parquet(parquet_file, engine="pyarrow")
@@ -219,9 +219,9 @@ def write_to_datalake(output_path, models, exclude, env):
             else:
                 with open(file, "rb") as data:
                     if model == '':
-                        lake_file_name = "/".join(["abm_15_0_0",parent_dir_name,name+ext])
+                        lake_file_name = "/".join([database,parent_dir_name,name+ext])
                     else:
-                        lake_file_name = "/".join(["abm_15_0_0",parent_dir_name,model,name+ext])
+                        lake_file_name = "/".join([database,parent_dir_name,model,name+ext])
                     container.upload_blob(name=lake_file_name, data=data)
 
     report_path = os.path.join(os.path.split(output_path)[0], 'report')
@@ -239,7 +239,7 @@ def write_to_datalake(output_path, models, exclude, env):
     for file in other_files:
         try:
             with open(file, "rb") as data:
-                lake_file_name = "/".join(["abm_15_0_0",parent_dir_name,os.path.basename(file)])
+                lake_file_name = "/".join([database,parent_dir_name,os.path.basename(file)])
                 container.upload_blob(name=lake_file_name, data=data)
         except (FileNotFoundError, KeyError):
             print(("%s not found" % file), file=sys.stderr)
@@ -261,4 +261,5 @@ def write_to_datalake(output_path, models, exclude, env):
 exclude = [
     'final_pipeline.h5'
 ]
+database = 'abm_15_1_0'
 write_to_datalake(output_path, models, exclude, env)
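
For context, here is a minimal sketch of the pattern this commit changes: the datalake prefix that was hard-coded as "abm_15_0_0" is now read from a module-level `database` variable (set to 'abm_15_1_0' at the bottom of the script). The helper names `build_lake_file_name` and `upload_parquet` below are hypothetical, and the `container` argument stands in for the script's Azure container client; only `container.upload_blob` and the BytesIO/pyarrow flow appear in the diff itself.

from io import BytesIO

database = "abm_15_1_0"  # the module-level variable introduced by this commit

def build_lake_file_name(parent_dir_name, file_name, model=""):
    # Mirrors the if/else in export_table(): model-specific outputs get an
    # extra path segment between the run directory and the file name.
    if model == "":
        return "/".join([database, parent_dir_name, file_name])
    return "/".join([database, parent_dir_name, model, file_name])

def upload_parquet(table, name, model, parent_dir_name, container):
    # Same flow as export_table(): serialize the (pandas) table to an
    # in-memory parquet buffer, then upload it under the versioned prefix.
    parquet_file = BytesIO()
    table.to_parquet(parquet_file, engine="pyarrow")
    parquet_file.seek(0)  # rewind before handing the buffer to upload_blob
    container.upload_blob(
        name=build_lake_file_name(parent_dir_name, name + ".parquet", model),
        data=parquet_file,
    )

# Illustrative paths (directory and model names are made up):
print(build_lake_file_name("run_2024_07_25", "scenario.parquet"))
# abm_15_1_0/run_2024_07_25/scenario.parquet
print(build_lake_file_name("run_2024_07_25", "trips.parquet", model="resident"))
# abm_15_1_0/run_2024_07_25/resident/trips.parquet

Centralizing the version string in one variable means a bump to a new ABM release touches a single line instead of every path join in the exporter.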
