Fix for Issue #38 and #39 (#40)
* Added Logging

* Logging

* Fixed Logging

* Logger and Exceptions

* Uncommenting

* Fixed Logger

* Debugging

* Replaced Strings with Paths

* Logging

* Removed Space

* Only make relative location lower case

* Improved Logging Messages

* Updated Logging

* More Logging During Zip File Download

* backward compatible prefix and suffix removal

* Fixed Import

* refactor type for py3.8

Co-authored-by: saifulkhan <[email protected]>
ErikRZH and saifulkhan authored Jan 24, 2022
1 parent a603870 commit 6a6f2d7
Showing 15 changed files with 105 additions and 67 deletions.
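
Most of the diff below replaces bare print() calls with loguru's logger (already used elsewhere in the project) and wraps the long-running agent pipelines in try/except so failures end up in the log. A minimal sketch of the calls the diff relies on; the values here are placeholders, not taken from the repository:

from loguru import logger

save_to = "data/live/output.csv"                      # placeholder path
logger.info("Saving CSV file to: " + str(save_to))    # replaces print(...)

try:
    raise RuntimeError("simulated agent failure")     # stand-in for a failing agent
except Exception as e:
    logger.info("Agents failed.")
    logger.exception(e)                               # logs the message plus the full traceback
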
72 changes: 40 additions & 32 deletions data-api/app/controllers/agents/__init__.py
@@ -14,41 +14,49 @@


 def uncertainty_agents():
-    print("Running Uncertainty Analysis Agents")
-    # Start clustering
-    t_u_cluster = threading.Thread(target=uncertainty_clustering_agent)
-    t_u_cluster.start()
-
-    # Start non-clustered computations
-    threading.Thread(target=uncertainty_mean_sample_agent).start()
-
-    t_u_cluster.join()
-    print("Uncertainty Clustering Complete")
-    # Start computations requiring clustered data
-    print("Running Uncertainty Cluster Analysis")
-    threading.Thread(target=uncertainty_cluster_mean_sample_agent).start()
+    try:
+        logger.info("Running Uncertainty Analysis Agents")
+        # Start clustering
+        t_u_cluster = threading.Thread(target=uncertainty_clustering_agent)
+        t_u_cluster.start()
+
+        # Start non-clustered computations
+        threading.Thread(target=uncertainty_mean_sample_agent).start()
+
+        t_u_cluster.join()
+        logger.info("Uncertainty Clustering Complete")
+        # Start computations requiring clustered data
+        logger.info("Running Uncertainty Cluster Analysis")
+        threading.Thread(target=uncertainty_cluster_mean_sample_agent).start()
+    except Exception as e:
+        logger.info("Uncertainty Agents failed.")
+        logger.exception(e)


 def sensitivity_agents():
-    print("Converting ents files to sandu SensitivityInput objects.")
-    #Converts data from Ensemble Time-series (ents) format into sandu SensitivtyInput objects.
-    t_convert = threading.Thread(target=ents_to_sandu_agent)
-    t_convert.start()
-
-    # Start non clustered threads
-    threading.Thread(target=convert_data).start()
-
-    t_convert.join()
-    print(" SensitivityInput objects created from ents files.")
-    threading.Thread(target=summary_curves_agent).start()
-
-    print("Running Sensitivity Analysis Agents")
-    t_s_cluster = threading.Thread(target=sensitivity_clustering_agent)
-    t_s_cluster.start()
-
-    t_s_cluster.join()
-    print("Sensitivity Clustering Complete")
-    threading.Thread(target=sensitivity_clustering_range_mean_agent).start()
+    try:
+        logger.info("Converting ents files to sandu SensitivityInput objects.")
+        #Converts data from Ensemble Time-series (ents) format into sandu SensitivtyInput objects.
+        t_convert = threading.Thread(target=ents_to_sandu_agent)
+        t_convert.start()
+
+        # Start non clustered threads
+        threading.Thread(target=convert_data).start()
+
+        t_convert.join()
+        logger.info("SensitivityInput objects created from ents files.")
+        threading.Thread(target=summary_curves_agent).start()
+
+        logger.info("Running Sensitivity Analysis Agents")
+        t_s_cluster = threading.Thread(target=sensitivity_clustering_agent)
+        t_s_cluster.start()
+
+        t_s_cluster.join()
+        logger.info("Sensitivity Clustering Complete")
+        threading.Thread(target=sensitivity_clustering_range_mean_agent).start()
+    except Exception as e:
+        logger.info("Sensitivity Agents failed.")
+        logger.exception(e)

 def run_agents():
     download_data()
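
Note that an exception raised inside a Thread target does not propagate back to the thread that called start() or join(), so the try/except blocks above only cover thread creation and joining. A sketch of one way to also capture failures inside the workers themselves, under the same loguru setup (run_safely is illustrative and not part of this change):

import threading
from loguru import logger

def run_safely(target, name: str) -> threading.Thread:
    """Wrap target so any exception it raises is logged with a traceback instead of being lost."""
    def wrapper():
        try:
            target()
        except Exception:
            logger.exception(name + " failed.")
    return threading.Thread(target=wrapper, name=name)

# Example (hypothetical agent function):
# t = run_safely(uncertainty_clustering_agent, "uncertainty_clustering_agent")
# t.start(); t.join()
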
14 changes: 7 additions & 7 deletions data-api/app/controllers/agents/ents_to_sandu_agent.py
@@ -27,7 +27,7 @@ def output_file_name(ents_in: str, index: str) -> str:
     Returns:
         output_file_name: The name and path to file output_i.csv.
     """
-    output_file_name = ents_in + "/output_" + str(index) + ".csv"
+    output_file_name = ents_in / Path("output_" + str(index) + ".csv")
     return output_file_name


@@ -46,8 +46,8 @@ def make_dataframe(ents_in: str, quantities_of_interest: List[dict]) -> pd.DataF
     Returns:
         df: dataframe containing parameters-quantity_of_interests_mean/variance
     """
-    df = pd.read_csv(ents_in + "/parameters.csv")
-    df_temp = pd.read_csv(ents_in + "/output_metadata.csv")
+    df = pd.read_csv(ents_in / "parameters.csv")
+    df_temp = pd.read_csv(ents_in / "output_metadata.csv")
     desc = df_temp["description"]
     time_name = df_temp.loc[df_temp["description"] == "time_unit"]["output"].at[0]
     for quantity in quantities_of_interest:
@@ -75,7 +75,7 @@ def get_parameters_and_bounds(ents_in: str) -> Tuple[list, list]:
         bounds: List containing the lower and upper bounds of the parameters as [lower, upper],
             in the same order as the parameters list.
     """
-    df = pd.read_csv(ents_in + "/parameters_metadata.csv")
+    df = pd.read_csv(ents_in / "parameters_metadata.csv")
     parameters = df["parameter"].to_list()
     bounds = get_bounds(ents_in, parameters)
     return parameters, bounds
@@ -92,7 +92,7 @@ def get_bounds(ents_in: str, parameters):
         bounds: List containing the lower and upper bounds of the parameters as [lower, upper],
             in the same order as the parameters list.
     """
-    df = pd.read_csv(ents_in + "/parameters.csv")
+    df = pd.read_csv(ents_in / "parameters.csv")
     bounds = []
     for parameter in parameters:
         upper_bound = df[parameter].max().item()
@@ -107,8 +107,8 @@ def ents_to_sandu_agent():
     for model in model_list:
         # filepath to folder containing an Ensemble Time Series (ents) format dataset.
         folder = Path(DATA_PATH_LIVE)
-        relative_location = "ents_format_datasets/" + model["name"]
-        location = str(folder / relative_location)
+        relative_location = Path("ents_format_datasets") / Path(model["name"])
+        location = folder / relative_location

         # List containing dictionaries with information needed to make SensitivityInput objects.
         quantities_of_interest = model["quantities_of_interest"]
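
The path handling in this file now uses pathlib's / operator instead of string concatenation, and pandas accepts the resulting Path objects directly in pd.read_csv. A small illustration with a made-up base directory:

from pathlib import Path

folder = Path("/data/live")                                    # stand-in for DATA_PATH_LIVE
location = folder / Path("ents_format_datasets") / Path("some_model")
print(location / Path("output_" + str(3) + ".csv"))            # /data/live/ents_format_datasets/some_model/output_3.csv
print(location / "parameters.csv")                             # mixing Path and str operands also works
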
@@ -47,7 +47,6 @@ def parameter_variation(df_in: pd.DataFrame, parameters_in: list, bounds_in: lis
     parameters_var = [parameters_in[i] for i in range(len(parameters_in)) if len(bounds_in[i]) > 1]
     bounds_var = [bounds_in[i] for i in range(len(parameters_in)) if len(bounds_in[i]) > 1]

-    print("")
     df_s = gpe.get_scalar_features(df_in, quantity_mean_in, quantity_variance_in, scalar_mean_function,
                                    scalar_variance_function)

@@ -2,7 +2,7 @@
 import json
 from app.core.settings import DATA_PATH_LIVE
 from typing import List
-
+from loguru import logger

 def get_sensitivity_models() -> List[dict]:
     """Parses the inventory file, containing information on what models and clusterings are present.
@@ -21,7 +21,7 @@ def get_sensitivity_models() -> List[dict]:
     filename_model_list = Path(DATA_PATH_LIVE) / "models/sensitivity/sensitivity_inventory.json"

     if not filename_model_list.is_file():
-        print("CANNOT FIND ", filename_model_list)
+        logger.info("CANNOT FIND " + str(filename_model_list))
         return
     with open(filename_model_list) as f:
         model_list = json.load(f)
@@ -7,7 +7,8 @@
 import app.controllers.agents.uncertainty_analysis.clustering_tools as ct
 import app.controllers.agents.sensitivity_analysis.sensitivity_model_inventory as inventory
 from app.core.settings import DATA_PATH_LIVE
-
+from loguru import logger
+from app.utils.common import own_removesuffix

 def form_output_clusters(clusters: List[dict]):
     """Find clusters in raw input data, by looking at the output/time series, to enable for cluster-wise analysis.
@@ -25,7 +26,7 @@ def form_output_clusters(clusters: List[dict]):

     # Check if file exists
     if not filename.is_file():
-        print("CANNOT FIND ", filename)
+        logger.info("CANNOT FIND " + str(filename))
         return

     with open(filename, "r") as read_file:
@@ -57,7 +58,7 @@ def form_input_clusters(clusters: List[dict]):

     # Check if file exists
     if not filename.is_file():
-        print("CANNOT FIND ", filename)
+        logger.info("CANNOT FIND " + str(filename))
         return

     with open(filename, "r") as read_file:
@@ -74,7 +75,7 @@ def form_input_clusters(clusters: List[dict]):
         df["cluster"] = clusters

         # Generate filename
-        output_filename_param = str(output_filename).removesuffix('.json')
+        output_filename_param = own_removesuffix(str(output_filename), '.json')
         output_filename_param = output_filename_param + "_" + parameter + ".json"

         # cluster_raw_data:
@@ -62,7 +62,7 @@ def get_cluster_list(model_list: List[dict]) -> List[dict]:
         for i in model["k"]:
             for metric in model["metric"]:
                 for quantity in model["quantities_of_interest"]:
-                    with open(str(folder) + "/models/sensitivity/" + model["name"] + "/" + quantity["name"] + "_raw.json", "r") as read_file:
+                    with open(str(folder / Path("models/sensitivity/" + model["name"] + "/" + quantity["name"] + "_raw.json")), "r") as read_file:
                         x = json.load(read_file, object_hook=lambda d: SensitivityInput(**d))

                         parameters = [x.parameters[i] for i in range(len(x.parameters)) if len(x.bounds[i]) > 1]
@@ -6,7 +6,7 @@
 from typing import Callable
 import random
 import numpy as np
-
+from loguru import logger
 from app.core.settings import DATA_PATH_LIVE
 from pathlib import Path

@@ -55,16 +55,16 @@ def calculate_summary_curves_on_models(model_list):
     folder = Path(DATA_PATH_LIVE)
     for model in model_list:
         relative_location = "models/sensitivity/" + model["name"]
-        location = str(folder / relative_location)
+        location = folder / relative_location
         for quantity in model["quantities_of_interest"]:
-            input_filename = location + "/" + quantity["name"] + "_raw.json"
+            input_filename = location / Path(quantity["name"] + "_raw.json")
             for scalar_feature in model["scalar_features"]:
                 scalar_function = {"sum": sum, "max": np.max}
-                output_filename = location + "/" + quantity["name"] +"_" + scalar_feature + "_summary_curves.json"
+                output_filename = location / Path(quantity["name"] +"_" + scalar_feature + "_summary_curves.json")
                 calculate_summary_curves(input_filename,output_filename,scalar_mean_function = scalar_function[scalar_feature], scalar_variance_function= scalar_function[scalar_feature])


 def summary_curves_agent():
     model_list = inventory.get_sensitivity_models()
     calculate_summary_curves_on_models(model_list)
-    print("Summary Curves Calculated")
+    logger.info("Summary Curves Calculated")
@@ -2,7 +2,7 @@
 import json
 from app.core.settings import DATA_PATH_LIVE
 from typing import List
-
+from loguru import logger

 def get_uncertainty_models() -> List[dict]:
     """Parses the inventory file, containing information on what models and clusterings are present.
@@ -16,7 +16,7 @@ def get_uncertainty_models() -> List[dict]:
     filename_model_list = Path(DATA_PATH_LIVE) / "models/uncertainty/uncertainty_inventory.json"

     if not filename_model_list.is_file():
-        print("CANNOT FIND ", filename_model_list)
+        logger.info("CANNOT FIND " + str(filename_model_list))
         return
     with open(filename_model_list) as f:
         model_list = json.load(f)
@@ -23,7 +23,7 @@ def clusters_mean_sample(raw_clusters: List[dict]):

     # Check if file exists
     if not filename.is_file():
-        print("CANNOT FIND ", filename)
+        logger.info("CANNOT FIND " + str(filename))
         return

     with open(filename, "r") as read_file:
@@ -30,7 +30,7 @@ def uncertainty_form_clusters(clusters: List[dict]):

     # Check if file exists
     if not filename.is_file():
-        print("CANNOT FIND ", filename)
+        logger.info("CANNOT FIND " + str(filename))
         return
     with open(filename, "r") as read_file:
         x = json.load(read_file, object_hook=lambda d: UncertaintyInput(**d))
@@ -19,7 +19,7 @@ def compute_mean_sample(input_filename, output_filename):

     # Check if file exists
     if not input_filename.is_file():
-        print("CANNOT FIND ", input_filename)
+        logger.info("CANNOT FIND " + str(input_filename))
         return

     with open(input_filename, "r") as read_file:
2 changes: 1 addition & 1 deletion data-api/app/core/propagate_data_query_model.py
@@ -7,7 +7,7 @@ class PropagateDataQueryModel(BaseModel):
     mustKeys: list
     shouldKeys: list
     filterKeys: list
-    mustNotKeys: Optional[list[str]]
+    mustNotKeys: Optional[list]
     minimumShouldMatch: int
     alpha: float
     beta: float
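
The list[str] annotation removed above is only valid on Python 3.9+ (PEP 585); on 3.8 it raises TypeError: 'type' object is not subscriptable when the model class is defined, which is why the element type is dropped. If the element type is still wanted on 3.8, typing.List is the usual spelling — an abridged sketch, not part of this change:

from typing import List, Optional
from pydantic import BaseModel

class PropagateDataQueryModelSketch(BaseModel):   # abridged; only the affected field shown
    mustNotKeys: Optional[List[str]] = None       # runs on Python 3.8, unlike Optional[list[str]]
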
17 changes: 14 additions & 3 deletions data-api/app/services/download_service.py
@@ -16,6 +16,7 @@

 from data_pipeline_api.registry.downloader import Downloader
 from ..utils.naming import format_component_name
+from app.utils.common import own_removeprefix

 def to_df(f, key):
     print('converting', key)
@@ -151,35 +152,45 @@ def download_urls(urls, folder):
     for url in urls:
         try:
             # Convert to lower case to make it consistent with previous convention
-            save_to = str(folder/url['save_to']).lower()
+            save_to = folder/(url['save_to'].lower())
             parentfolder = Path(save_to).parents[0]
             parentfolder.mkdir(parents=True, exist_ok=True) # Create directory for file if not already present

             if url['name'] == 'phe':
                 df = pd.read_csv(url['url'], encoding='iso-8859-1')

+                logger.info("Saving phe data to: " + str(save_to))
                 # Split the file based on area code
                 area_types = parse_qs(urlparse(url['url']).query).get('areaType')
                 if area_types and len(area_types) and area_types[0] != 'overview':
                     df.groupby('areaCode').apply(lambda x: save_code(x, save_to))
                 else:
                     df.to_csv(save_to, index=None)
             elif url['url'].lower().endswith('.csv'):
+                logger.info("Saving CSV file to: " + str(save_to))
                 df = pd.read_csv(url['url'])
                 df.to_csv(save_to, index=None)
             elif url['url'].lower().endswith('.json'):
+                logger.info("Saving JSON file saved: " + str(save_to))
                 r = requests.get(url['url'])
                 with open(save_to, "w", encoding="utf-8") as f:
                     json.dump(r.json(), f, ensure_ascii=False, indent=4)
             elif url['url'].lower().endswith('.zip'): #download and unzip zip files
+                logger.info("Downloading Zip file from: " + url['url'])
+                logger.info("Saving Zip file to: " + str(save_to))
                 http_response = urlopen(url['url'])
+                logger.info("Getting http_response")
                 zipfile = ZipFile(BytesIO(http_response.read()))
+                logger.info("Creating zip file")
                 zipinfos = zipfile.infolist()
+                logger.info("Zip file created")
                 # iterate through each file and remove the top directory
                 for zipinfo in zipinfos:
-                    zipinfo.filename = zipinfo.filename.removeprefix(zipinfo.filename.split('/')[0]) #Removes the top level folder when extracting
+                    zipinfo.filename = own_removeprefix(zipinfo.filename, (zipinfo.filename.split('/')[0])) #Removes the top level folder when extracting
                     zipfile.extract(zipinfo, path=save_to)
+                logger.info("Zip file extracted")
+                logger.info("File saved")
         except Exception as e:
-            logger.error("Failed to download a file from URL", e)
+            logger.exception(e)

     print('Download from URLs has finished')
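
The zip branch above rewrites each ZipInfo.filename before extraction so that the archive's top-level folder is stripped, using own_removeprefix because str.removeprefix only exists on Python 3.9+. A standalone sketch of that renaming step with a made-up entry name:

def own_removeprefix(input_string: str, prefix: str) -> str:
    # same behaviour as the helper added in app/utils/common.py
    return input_string[len(prefix):] if input_string.startswith(prefix) else input_string[:]

entry = "toplevel_folder/subdir/data.csv"
top = entry.split('/')[0]                    # "toplevel_folder"
print(own_removeprefix(entry, top))          # "/subdir/data.csv" -- leading slash retained, as in the code above
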
12 changes: 6 additions & 6 deletions data-api/app/services/elasticsearch_service.py
@@ -40,10 +40,10 @@ def ping(self):

     def build_query(
         self,
-        must_keys: list[str],
-        should_keys: list[str],
-        filter_keys: list[str],
-        must_not_keys: list[str] = None,
+        must_keys: list,
+        should_keys: list,
+        filter_keys: list,
+        must_not_keys: list = None,
         minimum_should_match: int = 1,
     ) -> dict:

@@ -54,8 +54,8 @@ def build_query(
             must_not_keys = [d.lower() for d in must_not_keys]

         if len(should_keys) == 0:
-            minimum_should_match = 0
+            minimum_should_match = 0

         if not must_not_keys:
             must_not_keys = [d.lower() for d in must_not_keys]

19 changes: 19 additions & 0 deletions data-api/app/utils/common.py
@@ -0,0 +1,19 @@
+def own_removesuffix(input_string: str, suffix: str, /) -> str:
+    """Method to replace the .removesuffix method introduced in Python 3.9, thus working in older versions.
+    Adapted from: https://www.python.org/dev/peps/pep-0616/
+    """
+    # suffix='' should not call self[:-0].
+    if suffix and input_string.endswith(suffix):
+        return input_string[:-len(suffix)]
+    else:
+        return input_string[:]
+
+
+def own_removeprefix(input_string: str, prefix: str, /) -> str:
+    """Method to replace the .removeprefix method introduced in Python 3.9, thus working in older versions.
+    Adapted from: https://www.python.org/dev/peps/pep-0616/
+    """
+    if input_string.startswith(prefix):
+        return input_string[len(prefix):]
+    else:
+        return input_string[:]
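
A quick usage check of the two helpers, matching the PEP 616 semantics they are adapted from (run inside the data-api package so the import resolves):

from app.utils.common import own_removeprefix, own_removesuffix

assert own_removesuffix("clusters_raw.json", ".json") == "clusters_raw"
assert own_removesuffix("clusters_raw.json", "") == "clusters_raw.json"          # empty suffix is a no-op
assert own_removeprefix("toplevel_folder/data.csv", "toplevel_folder") == "/data.csv"
# On Python 3.9+, "clusters_raw.json".removesuffix(".json") gives the same result as the helper.
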
