code cleanup
awalsh272 committed Apr 8, 2024
1 parent 7fbcdbd commit 13d46cc
Showing 3 changed files with 60 additions and 103 deletions.
37 changes: 22 additions & 15 deletions puente-analytics-service/lambdas/etl/shared_modules/utils.py
@@ -1,10 +1,10 @@
import hashlib
import uuid
import requests
from pandas import json_normalize, read_sql_query
import json
import psycopg2
import numpy as np
from functools import reduce

from shared_modules.env_utils import (
APP_ID,
@@ -231,17 +231,24 @@ def unique_values(items):
unique.add(item_str)

return list(unique)
# flat = items.values.ravel()
# flat_list = list(flat)
# flattened_list = [item for sublist in flat_list for item in sublist]
# try:
# flat_set = set(flattened_list)
# except TypeError:
# print(items)
# print(flat)
# print(flat_list)
# print(flattened_list)
# raise TypeError("HI")
# flat_set_list = list(flat_set)
# flat_str = str(flat_set_list)
# return flat_str


def get_unique_from_table(table, column):
existing_values = list(query_db(f"SELECT DISTINCT {column} FROM {table}")[column].unique())
return existing_values


def get_missing_ind(df, cols_to_check):
    # boolean mask per column: True where the value is present
    missing_ind_dict = {col: df[col].notnull() for col in cols_to_check}

    conditions = list(missing_ind_dict.values())

    # only not na in all check columns
    combined_condition = reduce(lambda x, y: x & y, conditions)
    return combined_condition
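
Note: get_unique_from_table consolidates the repeated SELECT DISTINCT lookups used in the other two files, and get_missing_ind just ANDs together per-column notnull masks. A minimal, self-contained sketch of that reduction (the toy DataFrame and its values are illustrative, not from the repo):

```python
import pandas as pd
from functools import reduce

# toy survey frame standing in for the real data (values are illustrative)
df = pd.DataFrame({
    "surveyingUser": ["alice", None, "bob"],
    "communityname": ["Constanza", "La Romana", None],
    "answer": ["yes", "no", "maybe"],
})

cols_to_check = ["surveyingUser", "communityname", "answer"]

# the same reduction get_missing_ind performs: AND the per-column masks
conditions = [df[col].notnull() for col in cols_to_check]
combined = reduce(lambda x, y: x & y, conditions)

print(df[combined])   # only row 0 survives; it is non-null in every key column
print(df[~combined])  # the incomplete rows a caller might log or write to CSV
```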
44 changes: 13 additions & 31 deletions puente-analytics-service/lambdas/etl/silver/dimensions.py
@@ -1,14 +1,13 @@
from shared_modules.utils import (
connection,
unique_combos,
coalesce_pkey,
md5_encode,
add_surveyuser_column,
parse_json_config,
title_str,
unique_values,
query_db,
replace_bad_characters,
get_unique_from_table,
)
from shared_modules.env_utils import CONFIGS, CSV_PATH

@@ -68,7 +67,7 @@ def get_form_dim(con, df):
)
forms = coalesce_pkey(forms, "objectId")
now = datetime.datetime.utcnow()
for i, form_row in forms.iterrows():
for _, form_row in forms.iterrows():
form = form_row.get("objectId")
name = form_row.get("name")
description = form_row.get("description")
@@ -154,7 +153,7 @@ def get_users_dim(con, df):
dups.to_csv(f"{CSV_PATH}/all_duplicates.csv", index=False)
missing_names = []
missing_surveyorgs = []
for i, user_row in users.iterrows():
for _, user_row in users.iterrows():
survey_user = user_row.get("survey_user")
if survey_user in dups["survey_user"].values:
continue
@@ -365,7 +364,7 @@ def get_patient_dim(con, df):
missing_hhid = []
missing_names = []
now = datetime.datetime.utcnow()
for i, patient_row in patients.iterrows():
for _, patient_row in patients.iterrows():
patient_id = patient_row.get("objectId")
household_id = patient_row.get("householdId")
first_name = patient_row.get("fname")
@@ -479,7 +478,7 @@ def get_question_dim(con, df):
inserted_uuids = []
missing_ids = []
missing_labels = []
for i, form_row in forms.iterrows():
for _, form_row in forms.iterrows():
form = form_row.get("objectId")
form_created_at = form_row.get("createdAt")
form_updated_at = form_row.get("updatedAt")
@@ -674,65 +673,48 @@ def ingest_nosql_table_questions(con, table_name):
def get_custom_form_questions(con, form_results):
cur = con.cursor()

#form_results = form_results[~form_results["formSpecificationsId"].isin(inactive_forms)]

ignore_questions = [
"surveyingUser",
"surveyingOrganization",
"appVersion",
"phoneOS"
]

print("1")
print(form_results[form_results['title']=='Nombre de Medicamento'])

# remove fake "questions"
options_fr = form_results[~form_results["title"].isin(ignore_questions)]
# get unique answers to each question, make them a list of options
options = options_fr.groupby(["title"])["question_answer"].agg(lambda x: unique_values(x)).reset_index().rename({"question_answer": "options"}, axis=1)
options["num_answers"] = options["options"].apply(len)

# options and form results together
options_fr = options_fr.merge(options, on="title", how="left")

print("2")
print(options_fr[options_fr['title']=='Nombre de Medicamento'])

options_fr["field_type"] = None
options_fr["is_list"] = options_fr["question_answer"].apply(lambda x: isinstance(x, list))

# define different field types based on number of answers
options_fr.loc[options_fr["num_answers"] == 1, "field_type"] = "input"
options_fr.loc[options_fr["num_answers"] > 1, "field_type"] = "select"
options_fr.loc[options_fr["is_list"], "field_type"] = "selectMulti"

options_fr["form_id"] = options_fr["formSpecificationsId"].apply(lambda x: md5_encode(x))

existing_forms = list(query_db("SELECT DISTINCT uuid FROM form_dim")["uuid"].unique())
# make sure the form exists
existing_forms = get_unique_from_table("form_dim", "uuid")
options_fr = options_fr[options_fr["form_id"].isin(existing_forms)]

print("3")
print(options_fr.shape)
#print(options_fr[options_fr['title']=='Nombre de Medicamento'])

inserted_uuids = []
#existing_qs = list(query_db("SELECT DISTINCT question FROM question_dim")["question"].unique())


#options_fr = options_fr[~options_fr["title"].isin(existing_qs)]
options_fr.to_csv("./custom_questions_options.csv")
options_fr = coalesce_pkey(options_fr, "title")

print("4")
print(options_fr[options_fr['title']=='Nombre de Medicamento'])

for i, row in options_fr.iterrows():
for _, row in options_fr.iterrows():
form = row.get("formSpecificationsId")
form_created_at = row.get("createdAt")
form_updated_at = row.get("updatedAt")
question = row.get("title")
options_list = row.get("options")
if not isinstance(options_list, list):
print(options_list, type(options_list))
# if options_list:
# options_list = options_list.replace("[", "{").replace("]", "}")
# print(options_list)

field_type = row.get("field_type")
# TODO: come up with a way of defining this
formik_key = None
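Note: the core logic touched in dimensions.py is the field-type heuristic in get_custom_form_questions: a question whose recorded answers collapse to one unique value is treated as a free-text input, several unique values make it a select, and list-valued answers make it a selectMulti. A condensed, runnable sketch of that heuristic (toy data; unique_values is approximated inline by stringifying answers into a set):

```python
import pandas as pd

# one row per (question, answer) pair; values are illustrative
fr = pd.DataFrame({
    "title": ["Age", "Sex", "Sex", "Symptoms"],
    "question_answer": ["34", "M", "F", ["cough", "fever"]],
})

# unique answers per question become that question's option list
options = (
    fr.groupby("title")["question_answer"]
      .agg(lambda x: list({str(v) for v in x}))
      .reset_index()
      .rename({"question_answer": "options"}, axis=1)
)
options["num_answers"] = options["options"].apply(len)

fr = fr.merge(options, on="title", how="left")
fr["is_list"] = fr["question_answer"].apply(lambda x: isinstance(x, list))

# the heuristic: one answer -> input, several -> select, lists -> selectMulti
fr["field_type"] = None
fr.loc[fr["num_answers"] == 1, "field_type"] = "input"
fr.loc[fr["num_answers"] > 1, "field_type"] = "select"
fr.loc[fr["is_list"], "field_type"] = "selectMulti"

print(fr[["title", "field_type"]].drop_duplicates())
```
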
82 changes: 25 additions & 57 deletions puente-analytics-service/lambdas/etl/silver/facts.py
@@ -4,18 +4,14 @@
import uuid
from functools import reduce
from psycopg2.errors import ForeignKeyViolation
from sqlalchemy import create_engine

from shared_modules.utils import (
get_subquestions,
connection,
md5_encode,
parse_json_config,
query_bronze_layer,
title_str,
encode,
query_db,
replace_bad_characters,
get_unique_from_table
)
from shared_modules.env_utils import CONFIGS, CSV_PATH, get_engine_str

@@ -26,22 +22,12 @@ def get_custom_forms(conn, df):
fk_missing_rows = []
missing_qa_rows = []

# df["fields"] = df["fields"].apply(json.loads)
# exploded_df = df.explode("fields")

# exploded_df["fields"] = exploded_df["fields"].apply(lambda x: get_subquestions(x))
# exploded_df = exploded_df.explode("fields")

# exploded_df["title"] = exploded_df["fields"].apply(lambda x: x.get("title"))
# exploded_df["question_answer"] = exploded_df["fields"].apply(
# lambda x: x.get("answer")
# )

cols_to_check = [
"surveyingUser",
"communityname",
"question_answer"
]
# get rows with no missing values in key columns
missing_ind_dict = {col: df[col].notnull() for col in cols_to_check}
missing_rows_dict = {col: df[~idx] for col, idx in missing_ind_dict.items()}

@@ -68,33 +54,23 @@ def get_custom_forms(conn, df):
exploded_df[col] = exploded_df[col].apply(lambda x: title_str(x))

inserted_uuids = []
existing_qs = list(query_db("SELECT DISTINCT question FROM question_dim")["question"].unique())

existing_patients = list(query_db("SELECT DISTINCT FROM question_dim")["question"].unique())

print(exploded_df.shape)
# only existing questions
existing_qs = get_unique_from_table("question_dim", "question")
exploded_df = exploded_df[exploded_df["title"].isin(existing_qs)]
print("post qs")
print(exploded_df.shape)

existing_patients = get_unique_from_table("patient_dim", "uuid")

# only existing forms
exploded_df["form_id"] = exploded_df["formSpecificationsId"].apply(lambda x: md5_encode(x))

#print(exploded_df["form_id"].unique())

existing_forms = list(query_db("SELECT DISTINCT uuid FROM form_dim")["uuid"].unique())
#print(existing_forms)

existing_forms = get_unique_from_table("form_dim", "uuid")
exploded_df = exploded_df[exploded_df["form_id"].isin(existing_forms)]
print("final exploded shape")
print(exploded_df.shape)

print(len(exploded_df["title"].unique()))

error_dict = {}

missing_patients = []


for i, row in exploded_df.iterrows():
for _, row in exploded_df.iterrows():
object_id = row.get("client.objectId")
user = row.get("surveyingUser")
survey_org = row.get("surveyingOrganization")
@@ -106,16 +82,7 @@ def get_custom_forms(conn, df):
title = row.get("title")
question_answer = row.get("question_answer")

row_insert = (
object_id,
user,
survey_org,
form,
household,
community_name,
title,
question_answer,
)
# remove test rows
check_list = []
for field in [household, community_name, question_answer, user]:
if isinstance(field, str):
@@ -131,34 +98,30 @@ def get_custom_forms(conn, df):
household_id = md5_encode(household)

patient_id = md5_encode(object_id)

# track patients that aren't already existing but in custom forms
if patient_id not in existing_patients:
missing_patients.append(object_id)
user_id = md5_encode(user)
surveying_organization_id = md5_encode(survey_org)
form_id = md5_encode(form)
community_id = md5_encode(community_name)

# if form_id not in existing_forms:
# continue

# fake "questions" to ignore
ignore_questions = [
'surveyinguser',
'surveyingorganization',
'phoneos',
'appversion',
]

if title.lower().strip() in ignore_questions:
ignore_count += 1
#print(title)
continue

#title = replace_bad_characters(title)

question_id = md5_encode(title)

#print("title")
#print(title)
#print(question_id)

id = str(uuid.uuid4())

insert_tuple = (
@@ -203,6 +166,7 @@ def get_custom_forms(conn, df):
list(insert_tuple)[:5] + [title, object_id] + list(insert_tuple[5:])
)
fk_missing_rows.append(insert_tuple)
# show which table is doing the FK error
if "question_dim" in str(e):
error_table = "question_dim"
elif "patient_dim" in str(e):
@@ -233,6 +197,9 @@ def get_custom_forms(conn, df):
print(ignore_count)
print("error dict")
print(error_dict)
print("missing patients")
print(len(list(set(missing_patients))))
print(list(set(missing_patients)))

cols = [
"object_id",
@@ -319,17 +286,17 @@ def add_nosql_to_fact(con, table_name, survey_df):
_, _, formik_key, _, _ = question_tuple
questions.append(formik_key)

# questions that are column names
questions = [question for question in questions if question in list(merged.columns)]
merged.to_csv(F"{CSV_PATH}/merged_{table_name}.csv", index=False)
# melt these question columns into question and answer columns. Long data gang
comb_df = merged[id_cols + questions].melt(
id_vars=id_cols, var_name="question", value_name="answer"
)

# ignore some questions
ignore_questions = ["searchIndex", "surveyingUser"] + [col for col in questions if "location" in col]
comb_df = comb_df[~comb_df['question'].isin(ignore_questions)]

comb_df.to_csv(f"{CSV_PATH}/comb_df_{table_name}.csv")

fk_missing_rows = []
notnull_missing_rows = []
user_fk = []
@@ -414,7 +381,7 @@ def add_nosql_to_fact(con, table_name, survey_df):

# # Close the database connection
# engine.dispose()
for i, row in comb_df.iterrows():
for _, row in comb_df.iterrows():
created_at = row["createdAt"]
updated_at = row["updatedAt"]
question_name = row["question"]
@@ -429,6 +396,7 @@ def add_nosql_to_fact(con, table_name, survey_df):
question_name = replace_bad_characters(question_name)

check_list = []
# remove test rows
for field in [nosql_household_id, user, community_name]:
if isinstance(field, str):
check = ("test" in field.lower()) or ("forgot" in field.lower()) or ("experimental" in field.lower())
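Note: both fact loaders in facts.py share the same per-row insert pattern: attempt the INSERT, and on a ForeignKeyViolation keep the row for a CSV dump and attribute the failure to whichever dimension table the Postgres error message names (the new comment above marks that branch). A stripped-down sketch of the pattern; the transaction handling is an assumption (per-row commits so a rollback discards only the failed row), and insert_sql and the table list are placeholders:

```python
from psycopg2.errors import ForeignKeyViolation

# dimension tables named in the error-attribution branch of the diff
DIM_TABLES = ("question_dim", "patient_dim", "form_dim")

def insert_rows(conn, insert_sql, rows):
    error_dict = {}
    fk_missing_rows = []
    cur = conn.cursor()
    for row in rows:
        try:
            cur.execute(insert_sql, row)
            conn.commit()  # assumed: commit per row so one failure doesn't abort the batch
        except ForeignKeyViolation as e:
            conn.rollback()  # discard just this row's aborted transaction
            fk_missing_rows.append(row)
            # the violated constraint's table name shows up in str(e)
            error_table = next((t for t in DIM_TABLES if t in str(e)), "unknown")
            error_dict[error_table] = error_dict.get(error_table, 0) + 1
    return error_dict, fk_missing_rows
```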
