update version with shared logic
stephanieshong committed Jan 30, 2023
1 parent e6d044e commit d9f55ca
Showing 325 changed files with 18,766 additions and 0 deletions.
@@ -0,0 +1,18 @@
"""
This is a very precisely created file; do not change it. It was created to trick Foundry Templates into giving us the
path of the root folder of the deployed template. In generate-anchor.py, we use the anchor path defined in path.py to
create a dummy anchor dataset at the root of the project. Then when a new instance of the template is deployed, this
anchor path is automatically replaced with the path of the anchor dataset in the deployed template. Then to get the
root, we simply remove the name "anchor". Finally, we can use this root path in the rest of the repo. Doing this
allowed us to massively de-duplicate repeated code, in some steps reducing the number of lines of code by more than 90%.
"""

from transforms.api import transform_df, Output
from act.anchor import path


@transform_df(
    Output(path.anchor)
)
def compute(ctx):
    return ctx.spark_session.range(1)
23 changes: 23 additions & 0 deletions pipeline_logic/v2/act/transform-python/src/act/anchor/path.py
@@ -0,0 +1,23 @@
"""
This is a very precisely created file; do not change it. It was created to trick Foundry Templates into giving us the
path of the root folder of the deployed template. In generate-anchor.py, we use the anchor path defined in path.py to
create a dummy anchor dataset at the root of the project. Then when a new instance of the template is deployed, this
anchor path is automatically replaced with the path of the anchor dataset in the deployed template. Then to get the
root, we simply remove the name "anchor". Finally, we can use this root path in the rest of the repo. Doing this
allowed us to massively de-duplicate repeated code, in some steps reducing the number of lines of code by more than 90%.
"""

anchor = "/UNITE/Data Ingestion & OMOP Mapping/Source Data Model: ACT/Site 411/anchor"
root = anchor[:-len("anchor")]
transform = root + "transform/"
metadata = root + "metadata/"
union_staging = root + "union_staging/"

input_zip = "/UNITE/Data Ingestion & OMOP Mapping/raw_data/Zipped Datasets/site_411_act_raw_zips"
site_id = '/UNITE/Data Ingestion & OMOP Mapping/raw_data/data partner id tables/Data Partner IDs - Site 411'
all_ids = "/UNITE/Data Ingestion & OMOP Mapping/raw_data/data partner id tables/Data Partner IDs - ALL"
mapping = "/UNITE/Data Ingestion & OMOP Mapping/Source Data Model: ACT/Site 411/metadata/n3c_vocab_map"
vocab = "/N3C Export Area/OMOP Vocabularies/vocabulary"
concept = "/N3C Export Area/OMOP Vocabularies/concept"

mapping_overrides = "/UNITE/Data Ingestion & OMOP Mapping/Source Data Model: ACT/Reference Tables/Vocab ID Mapping/act_vocab_id_mapping_table"
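
Every downstream transform in the template can then build its Input and Output paths from these shared constants. A minimal sketch of that pattern, assuming a hypothetical dataset name ("observation_fact_parsed" is illustrative and not part of this commit):

from transforms.api import transform_df, Input, Output
from act.anchor import path


# Hypothetical downstream transform: the use of path.* is the only point being illustrated.
@transform_df(
    Output(path.transform + "observation_fact_parsed"),
    site_df=Input(path.site_id),
)
def compute(site_df):
    # Real transforms do domain-specific work; passing the input through keeps the sketch self-contained.
    return site_df
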
209 changes: 209 additions & 0 deletions pipeline_logic/v2/act/transform-python/src/act/local_schemas.py
@@ -0,0 +1,209 @@
from pyspark.sql import types as T


complete_domain_schema_dict = {
    'concept_dimension': {
        'CONCEPT_PATH': T.StringType(),
        'CONCEPT_CD': T.StringType(),
        'NAME_CHAR': T.StringType(),
        'UPDATE_DATE': T.TimestampType(),
        'DOWNLOAD_DATE': T.TimestampType(),
        'IMPORT_DATE': T.TimestampType(),
        'SOURCESYSTEM_CD': T.StringType(),
        'UPLOAD_ID': T.IntegerType(),
    },

    "control_map": {
        "CASE_PATID": T.StringType(),
        "BUDDY_NUM": T.IntegerType(),
        "CONTROL_PATID": T.StringType(),
        "CASE_AGE": T.IntegerType(),
        "CASE_SEX": T.StringType(),
        "CASE_RACE": T.StringType(),
        "CASE_ETHN": T.StringType(),
        "CONTROL_AGE": T.IntegerType(),
        "CONTROL_SEX": T.StringType(),
        "CONTROL_RACE": T.StringType(),
        "CONTROL_ETHN": T.StringType()
    },

    "note": {
        "NOTE_ID": T.StringType(),
        "PERSON_ID": T.LongType(),
        "NOTE_DATE": T.DateType(),
        "NOTE_DATETIME": T.TimestampType(),
        "NOTE_TYPE_CONCEPT_ID": T.IntegerType(),
        "NOTE_CLASS_CONCEPT_ID": T.IntegerType(),
        "NOTE_TITLE": T.StringType(),
        "NOTE_TEXT": T.StringType(),
        "ENCODING_CONCEPT_ID": T.IntegerType(),
        "LANGUAGE_CONCEPT_ID": T.IntegerType(),
        "PROVIDER_ID": T.IntegerType(),
        "VISIT_OCCURRENCE_ID": T.IntegerType(),
        "VISIT_DETAIL_ID": T.IntegerType(),
        "NOTE_SOURCE_VALUE": T.StringType(),
    },

    "note_nlp": {
        "NOTE_NLP_ID": T.LongType(),
        "NOTE_ID": T.LongType(),
        "SECTION_CONCEPT_ID": T.IntegerType(),
        "SNIPPET": T.StringType(),
        "OFFSET": T.StringType(),
        "LEXICAL_VARIANT": T.StringType(),
        "NOTE_NLP_CONCEPT_ID": T.IntegerType(),
        "NOTE_NLP_SOURCE_CONCEPT_ID": T.IntegerType(),
        "NLP_SYSTEM": T.StringType(),
        "NLP_DATE": T.DateType(),
        "NLP_DATETIME": T.TimestampType(),
        "TERM_EXISTS": T.BooleanType(),
        "TERM_TEMPORAL": T.StringType(),
        "TERM_MODIFIERS": T.StringType()
    },

    'observation_fact': {
        'ENCOUNTER_NUM': T.StringType(),
        'CONCEPT_CD': T.StringType(),
        'PROVIDER_ID': T.StringType(),
        'START_DATE': T.TimestampType(),
        'PATIENT_NUM': T.StringType(),
        'MODIFIER_CD': T.StringType(),
        'INSTANCE_NUM': T.StringType(),
        'VALTYPE_CD': T.StringType(),
        'TVAL_CHAR': T.StringType(),
        'NVAL_NUM': T.DecimalType(18, 5),
        'VALUEFLAG_CD': T.StringType(),
        'QUANTITY_NUM': T.DecimalType(18, 5),
        'UNITS_CD': T.StringType(),
        'END_DATE': T.TimestampType(),
        'LOCATION_CD': T.StringType(),
        'OBSERVATION_BLOB': T.StringType(),
        'CONFIDENCE_NUM': T.DecimalType(18, 5),
        'UPDATE_DATE': T.TimestampType(),
        'DOWNLOAD_DATE': T.TimestampType(),
        'IMPORT_DATE': T.TimestampType(),
        'SOURCESYSTEM_CD': T.StringType(),
        'UPLOAD_ID': T.IntegerType(),
    },

    'patient_dimension': {
        'PATIENT_NUM': T.StringType(),
        'VITAL_STATUS_CD': T.StringType(),
        'BIRTH_DATE': T.TimestampType(),
        'DEATH_DATE': T.TimestampType(),
        'SEX_CD': T.StringType(),
        'AGE_IN_YEARS_NUM': T.IntegerType(),
        'LANGUAGE_CD': T.StringType(),
        'RACE_CD': T.StringType(),
        'MARITAL_STATUS_CD': T.StringType(),
        'RELIGION_CD': T.StringType(),
        'ZIP_CD': T.StringType(),
        'STATECITYZIP_PATH': T.StringType(),
        'PATIENT_BLOB': T.StringType(),
        'UPDATE_DATE': T.TimestampType(),
        'DOWNLOAD_DATE': T.TimestampType(),
        'IMPORT_DATE': T.TimestampType(),
        'SOURCESYSTEM_CD': T.StringType(),
        'UPLOAD_ID': T.IntegerType(),
        'INCOME_CD': T.StringType(),
        "ETHNICITY_CD": T.StringType()
    },

    'visit_dimension': {
        'ENCOUNTER_NUM': T.StringType(),
        'PATIENT_NUM': T.StringType(),
        'ACTIVE_STATUS_CD': T.StringType(),
        'START_DATE': T.TimestampType(),
        'END_DATE': T.TimestampType(),
        'INOUT_CD': T.StringType(),
        'LOCATION_CD': T.StringType(),
        'LOCATION_PATH': T.StringType(),
        'LENGTH_OF_STAY': T.StringType(),
        'VISIT_BLOB': T.StringType(),
        'UPDATE_DATE': T.TimestampType(),
        'DOWNLOAD_DATE': T.TimestampType(),
        'IMPORT_DATE': T.TimestampType(),
        'SOURCESYSTEM_CD': T.StringType(),
        'UPLOAD_ID': T.IntegerType(),
    },
}

required_domain_schema_dict = {
    'concept_dimension': {
        'CONCEPT_PATH': T.StringType(),
        'CONCEPT_CD': T.StringType(),
        'NAME_CHAR': T.StringType(),
    },

    'control_map': {
        "CASE_PATID": T.StringType(),
        "BUDDY_NUM": T.IntegerType(),
        "CONTROL_PATID": T.StringType()
    },

    'note': {},

    'note_nlp': {},

    'observation_fact': {
        'ENCOUNTER_NUM': T.StringType(),
        'CONCEPT_CD': T.StringType(),
        'PROVIDER_ID': T.StringType(),
        'START_DATE': T.TimestampType(),
        'PATIENT_NUM': T.StringType(),
        'MODIFIER_CD': T.StringType(),
        'INSTANCE_NUM': T.StringType(),
    },

    'patient_dimension': {
        'PATIENT_NUM': T.StringType(),
    },

    'visit_dimension': {
        'ENCOUNTER_NUM': T.StringType(),
        'PATIENT_NUM': T.StringType(),
    },
}
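
The split between the complete and required schemas suggests a validation step that checks a site submission for at least the required columns. The actual check is not part of this excerpt; a sketch of what it could look like, assuming column names are compared case-insensitively:

def missing_required_columns(df, domain):
    # Hypothetical helper built on required_domain_schema_dict; the pipeline's
    # real validation logic is not shown in this commit.
    required_cols = set(required_domain_schema_dict[domain].keys())
    present_cols = {c.upper() for c in df.columns}
    return sorted(required_cols - present_cols)
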

act_local_code_map_schema = {
    "ACT_STANDARD_CODE": T.StringType(),
    "LOCAL_CONCEPT_CD": T.StringType(),
    "NAME_CHAR": T.StringType(),
    "PARENT_CONCEPT_PATH": T.StringType(),
    "CONCEPT_PATH": T.StringType(),
    "PATH_ELEMENT": T.StringType()
}

data_counts_schema = {
    "TABLE_NAME": T.StringType(),
    "ROW_COUNT": T.StringType()
}

manifest_schema = {
    "SITE_ABBREV": T.StringType(),
    "SITE_NAME": T.StringType(),
    "CONTACT_NAME": T.StringType(),
    "CONTACT_EMAIL": T.StringType(),
    "CDM_NAME": T.StringType(),
    "CDM_VERSION": T.StringType(),
    "VOCABULARY_VERSION": T.StringType(),
    "N3C_PHENOTYPE_YN": T.StringType(),
    "N3C_PHENOTYPE_VERSION": T.StringType(),
    "SHIFT_DATE_YN": T.StringType(),
    "MAX_NUM_SHIFT_DAYS": T.StringType(),
    "RUN_DATE": T.StringType(),
    "UPDATE_DATE": T.StringType(),
    "NEXT_SUBMISSION_DATE": T.StringType(),
}

n3c_vocab_map_schema = {
    "LOCAL_PREFIX": T.StringType(),
    "OMOP_VOCAB": T.StringType()
}

metadata_schemas = {
    "act_standard2local_code_map": act_local_code_map_schema,
    "data_counts": data_counts_schema,
    "manifest": manifest_schema,
    "n3c_vocab_map": n3c_vocab_map_schema
}
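
parsing.py below imports a schema_dict_to_struct helper from act.act_schemas, which is not included in this excerpt. A plausible implementation is sketched here only to show how these dicts would become Spark schemas; the real definition may differ:

from pyspark.sql import types as T


def schema_dict_to_struct(schema_dict, all_string_type=False):
    # Assumed shape of the helper used in parsing.py; not the actual act_schemas code.
    fields = []
    for col_name, col_type in schema_dict.items():
        dtype = T.StringType() if all_string_type else col_type
        fields.append(T.StructField(col_name, dtype, True))
    return T.StructType(fields)


# e.g. an all-string schema for an empty observation_fact dataset:
# struct = schema_dict_to_struct(complete_domain_schema_dict["observation_fact"], all_string_type=True)
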
66 changes: 66 additions & 0 deletions pipeline_logic/v2/act/transform-python/src/act/parsing.py
@@ -0,0 +1,66 @@
import csv
import tempfile
import shutil
from transforms.api import TransformInput, TransformOutput
from pyspark.sql import Row
from act.act_schemas import complete_domain_schema_dict, schema_dict_to_struct
from act.site_specific_utils import get_site_dialect_params


def parse_input(ctx, my_input: TransformInput, error_df: TransformOutput, site_id: int, domain: str, regex: str):

    def process_file(file_status):
        # Copy contents of file from Foundry into temp file
        with tempfile.NamedTemporaryFile() as t:
            with my_input.filesystem().open(file_status.path, 'rb') as f_bytes:
                shutil.copyfileobj(f_bytes, t)
                t.flush()

            # Read the csv, line by line, and use csv.Sniffer to infer the delimiter
            # Write any improperly formatted rows to the errors DataFrame
            with open(t.name, newline="", encoding="utf8", errors='ignore') as f:
                with error_df.filesystem().open('error_rows', 'w', newline='') as writeback:
                    dialect = csv.Sniffer().sniff(f.read(1024))
                    f.seek(0)
                    dialect_params = get_site_dialect_params(site_id, domain)
                    r = csv.reader(f, delimiter=dialect.delimiter, **dialect_params)
                    w = csv.writer(writeback)

                    # Construct a pyspark.Row from our header row
                    header = next(r)
                    MyRow = Row(*header)
                    expected_num_fields = len(header)

                    error_encountered = False
                    for i, row in enumerate(r):
                        if len(row) == expected_num_fields:
                            # Properly formatted row
                            yield MyRow(*row)
                        elif not row:
                            continue  # ignore empty rows/extra newlines
                        else:
                            # Improperly formatted row
                            if not error_encountered:
                                # Create header for output csv
                                w.writerow(["row_number", "row_contents"])
                                error_encountered = True
                            # Write to a csv file in the errors dataset, recording the row number and malformed row
                            malformed_row = "|".join(row)
                            w.writerow([str(i), malformed_row])

    files_df = my_input.filesystem().files(regex=regex)
    processed_rdd = files_df.rdd.flatMap(process_file)

    if processed_rdd.isEmpty():
        # The csv file for the domain is empty
        # Get OrderedDict that specifies this domain's schema
        schema_dict = complete_domain_schema_dict[domain]
        # Create StructType for the schema with all types as strings
        struct_schema = schema_dict_to_struct(schema_dict, all_string_type=True)
        # Create empty dataset with proper columns, all string types
        processed_df = processed_rdd.toDF(struct_schema)
    else:
        # Create dataset with whatever columns the site gave us, all string types
        processed_df = processed_rdd.toDF()

    return processed_df
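
parse_input is meant to be called from per-domain transforms elsewhere in the repo, none of which are among the files shown here. A hedged sketch of that wiring, with illustrative dataset names, a made-up filename regex, and the site id hard-coded for clarity:

from transforms.api import transform, Input, Output
from act.anchor import path
from act.parsing import parse_input


# Hypothetical wiring only: dataset names, regex, and domain are illustrative.
@transform(
    parsed=Output(path.transform + "observation_fact_parsed"),
    errors=Output(path.transform + "observation_fact_parse_errors"),
    my_input=Input(path.input_zip),
)
def compute(ctx, parsed, errors, my_input):
    df = parse_input(
        ctx, my_input, errors,
        site_id=411,
        domain="observation_fact",
        regex=r".*observation_fact.*\.csv",
    )
    parsed.write_dataframe(df)
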
7 changes: 7 additions & 0 deletions pipeline_logic/v2/act/transform-python/src/act/pipeline.py
@@ -0,0 +1,7 @@
from transforms.api import Pipeline

from act import datasets, anchor


my_pipeline = Pipeline()
my_pipeline.discover_transforms(datasets, anchor)
47 changes: 47 additions & 0 deletions pipeline_logic/v2/act/transform-python/src/act/pkey_utils.py
@@ -0,0 +1,47 @@
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.window import Window


def new_duplicate_rows_with_collision_bits(omop_domain, lookup_df, ctx, pk_col, full_hash_col):

    # Extract all duplicate rows from domain table
    # Keep two columns: 51 bit hash (which caused collision) and full hash (to differentiate collisions)
    w = Window.partitionBy(pk_col)
    duplicates_df = omop_domain.dataframe().select('*', F.count(pk_col).over(w).alias('dupeCount'))\
        .where('dupeCount > 1')\
        .drop('dupeCount')
    duplicates_df = duplicates_df.select(pk_col, full_hash_col)

    if ctx.is_incremental:
        # Count how many rows in the lookup table exist for the collided hash value
        cache = lookup_df.dataframe('previous', schema=T.StructType([
            T.StructField(pk_col, T.LongType(), True),
            T.StructField(full_hash_col, T.StringType(), True),
            T.StructField("collision_bits", T.IntegerType(), True)
        ]))
        cache_count = cache.groupby(pk_col).count()

        # Keep only the rows in duplicates_df that are not currently in the lookup table
        cond = [pk_col, full_hash_col]
        duplicates_df = duplicates_df.join(cache, cond, 'left_anti')

    # Create counter for rows in duplicates_df
    # Subtract 1 because the default collision resolution bit value is 0
    w2 = Window.partitionBy(pk_col).orderBy(pk_col)
    duplicates_df = duplicates_df.withColumn('row_num', F.row_number().over(w2))
    duplicates_df = duplicates_df.withColumn('row_num', (F.col('row_num') - 1))

    # If there are already entries in the lookup table for the given primary key,
    # then add the number of existing entries to the row number counter
    if ctx.is_incremental:
        duplicates_df = duplicates_df.join(cache_count, pk_col, 'left')
        duplicates_df = duplicates_df.fillna(0, subset=['count'])
        duplicates_df = duplicates_df.withColumn('row_num', (F.col('row_num') + F.col('count').cast(T.IntegerType())))

    duplicates_df = duplicates_df.withColumnRenamed('row_num', 'collision_bits')

    # Remove 'count' column for incremental transforms
    duplicates_df = duplicates_df.select(pk_col, full_hash_col, 'collision_bits')

    return duplicates_df
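
A small non-incremental illustration of how the collision bits come out. The wrapper classes below are throwaway stand-ins for Foundry's transform objects, not real API, and the data is made up:

from pyspark.sql import SparkSession

from act.pkey_utils import new_duplicate_rows_with_collision_bits


class FakeInput:
    # Stand-in for a Foundry TransformInput; only .dataframe() is needed here.
    def __init__(self, df):
        self._df = df

    def dataframe(self):
        return self._df


class FakeCtx:
    # Stand-in for the transform context; non-incremental for this sketch.
    is_incremental = False


spark = SparkSession.builder.master("local[1]").getOrCreate()
domain_df = spark.createDataFrame(
    [(1001, "full_a"), (1001, "full_b"), (1001, "full_c"), (2002, "full_d")],
    ["primary_key", "full_hash"],
)

bits = new_duplicate_rows_with_collision_bits(
    FakeInput(domain_df), None, FakeCtx(), "primary_key", "full_hash",
)
# The three rows sharing primary_key 1001 get collision_bits 0, 1 and 2
# (assignment among tied rows is not deterministic); 2002 is not duplicated, so it is dropped.
bits.show()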