class SOEBuilder(BaseTableBuilder):
    """Builds the support table consumed by the SOE (sequence of events) SQL.

    The builder looks up the datatype of ``documentreference.context`` and
    queues one query that creates ``core__soe_doc_period``, a table holding
    the document id plus ``start``/``end`` columns taken from
    ``context.period``. Fields the source schema does not provide are
    emitted as typed NULL columns so downstream joins always find them.
    """

    display_text = "Creating SoE support tables..."

    def prepare_queries(self, cursor: object, schema: str):
        """Constructs tables for SOE QA verification

        :param cursor: A database cursor object
        :param schema: the schema/db name, matching the cursor

        """
        table = "documentreference"
        column = "context"
        with get_progress_bar(transient=True) as progress:
            task = progress.add_task(
                "Detecting SOE...",
                total=1,
            )

            query = get_column_datatype_query(schema, table, column)
            cursor.execute(query)
            progress.advance(task)
            row = cursor.fetchone()

        # NOTE(review): presumably the query yields the column's type
        # signature as a single string - confirm against
        # get_column_datatype_query. A missing row is treated as "no period
        # fields available" instead of failing on None[0].
        result = str(row[0]) if row is not None else ""

        field_config = {
            "start": {"present": False, "type": "varchar"},
            "end": {"present": False, "type": "varchar"},
        }

        # Capture only the text between the parentheses that directly follow
        # "period row", i.e. the schema of the period object. Stopping at the
        # first ")" keeps sibling structs out of the capture, and it matches
        # whether the period row is followed by "," or closes the enclosing
        # row with ")".
        # NOTE(review): assumes the period members are not themselves
        # parenthesised types - confirm if non-varchar fields are added.
        period_schema = re.search(r"period row\(\s*([^)]*)\)", result)
        if period_schema is not None:
            field_schema_str = period_schema[1]
            for key, config in field_config.items():
                if f"{key} {config['type']}" in field_schema_str:
                    config["present"] = True

        self.queries.append(
            get_object_denormalize_query(
                schema,
                table,
                "id",
                f"{column}.period",
                field_config,
                "core__soe_doc_period",
            )
        )
"core__count_soe_medreq_week" ] diff --git a/cumulus_library/studies/core/soe_sequence_of_events.sql b/cumulus_library/studies/core/soe_sequence_of_events.sql new file mode 100644 index 00000000..6670c844 --- /dev/null +++ b/cumulus_library/studies/core/soe_sequence_of_events.sql @@ -0,0 +1,466 @@ +-- SOE "Sequence of Events" links FHIR resources based on encounter reference +-- and FHIR resource datetime "period". Ideally, every FHIR resource references +-- an encounter. The primary function of the SOE tables and counts is QA/Verification. +-- +-- ########################################################################### +-- Encounter +-- Encounter specifies the start/end period from which other FHIR resources can +-- be mapped. This is useful when FHIR Resource.encounter.reference is missing. +-- +-- # length of stay calculations +-- from Karen Olson +-- LOS= discharge date/time - admit date/time +-- EDLOS= checkout date/time - check-in date/time +-- check-in date/time generally = admit date/time, sometimes it's off a little +-- Care_class= Emergency is ED only +-- An encounter with care_class= Inpatient or Observation can include time in the ED +-- If patient was admitted from the ED, THEN only encounter. 
-- Encounter spine for the SOE tables. Each distinct encounter record carries
-- its period as both a date and a timestamp, its class, the subject
-- reference, an 'Encounter/<id>' reference string, and the length of stay in
-- hours and in days.
CREATE TABLE core__soe AS
WITH enc_period AS (
    SELECT DISTINCT
        e.class AS enc_class,
        e.subject.reference AS subject_ref,
        concat('Encounter/', e.id) AS encounter_ref,
        date(from_iso8601_timestamp(e.period."start")) AS enc_start_date,
        date(from_iso8601_timestamp(e.period."end")) AS enc_end_date,
        cast(
            from_iso8601_timestamp(e.period."start") AS timestamp
        ) AS enc_start_datetime,
        cast(
            from_iso8601_timestamp(e.period."end") AS timestamp
        ) AS enc_end_datetime
    FROM encounter AS e
)

SELECT DISTINCT
    ep.enc_start_date,
    ep.enc_end_date,
    ep.enc_start_datetime,
    ep.enc_end_datetime,
    ep.enc_class,
    ep.subject_ref,
    ep.encounter_ref,
    -- LOS = discharge date/time - admit date/time
    date_diff('hour', ep.enc_start_datetime, ep.enc_end_datetime) AS los_hours,
    date_diff('day', ep.enc_start_date, ep.enc_end_date) AS los_days
FROM enc_period AS ep
-- Only encounters starting between 2016-01-01 and today are kept; a NULL
-- start date fails both comparisons and is therefore dropped as well.
WHERE
    ep.enc_start_date >= date('2016-01-01')
    AND ep.enc_start_date <= current_date;
cs.enc_end_datetime, + cs.enc_class, + cs.subject_ref, + cs.encounter_ref, + cs.los_hours, + cs.los_days + FROM cond, core__soe AS cs + WHERE + cond.encounter_ref IS NOT NULL + AND cond.encounter_ref = cs.encounter_ref +), + +cond_link_period AS ( + SELECT DISTINCT + cond.cond_recorded_datetime, + cond.cond_onset_datetime, + cond.condition_ref, + cs.enc_start_date, + cs.enc_end_date, + cs.enc_start_datetime, + cs.enc_end_datetime, + cs.enc_class, + cs.subject_ref, + cs.encounter_ref, + cs.los_hours, + cs.los_days + FROM cond, core__soe AS cs + WHERE + cond.encounter_ref IS NULL + AND cond.subject_ref = cs.subject_ref + AND ( + ( + cond.cond_recorded_datetime + BETWEEN cs.enc_start_datetime + AND cs.enc_end_datetime + ) + OR + ( + cond.cond_onset_datetime + BETWEEN cs.enc_start_datetime + AND cs.enc_end_datetime + ) + ) +) + +SELECT DISTINCT + cond.category_row AS category, + date(cond.cond_recorded_datetime) AS cond_recorded_date, + date(cond.cond_onset_datetime) AS cond_onset_date, + cond.cond_recorded_datetime, + cond.cond_onset_datetime, + lr.encounter_ref AS encounter_ref, + lp.encounter_ref AS period_ref, + coalesce(lr.encounter_ref, lp.encounter_ref) AS encounter_link, + coalesce(lr.enc_class, lp.enc_class) AS enc_class, + -- condition and subject must match. 
-- ###########################################################################
-- MedicationRequest
--
-- Links each MedicationRequest to an encounter in core__soe. Two strategies
-- are used and both results are exposed:
--   encounter_ref : the request's own encounter reference matched an encounter
--   period_ref    : the request has no encounter reference, but its authoredOn
--                   timestamp falls inside an encounter period of the same
--                   subject
-- encounter_link / enc_class prefer the reference match over the period match.

CREATE TABLE core__soe_medreq AS
WITH authored AS (
    SELECT DISTINCT
        cast(
            from_iso8601_timestamp(m.authoredon) AS timestamp
        ) AS medreq_authored_datetime,
        m.subject.reference AS subject_ref,
        m.encounter.reference AS encounter_ref,
        m.id AS medreq_id,
        m.status AS status,
        concat('MedicationRequest/', m.id) AS medreq_ref,
        m.medicationreference.reference AS medication_ref
    FROM medicationrequest AS m
),

-- Requests authored between 2016-01-01 and today.
medreq AS (
    SELECT DISTINCT
        a.medreq_authored_datetime,
        a.subject_ref,
        a.encounter_ref,
        a.medreq_id,
        a.status,
        a.medreq_ref,
        a.medication_ref
    FROM authored AS a
    WHERE a.medreq_authored_datetime BETWEEN date('2016-01-01') AND current_date
),

-- Strategy 1: the request names its encounter explicitly.
by_reference AS (
    SELECT DISTINCT
        medreq.medreq_authored_datetime,
        medreq.medreq_ref,
        cs.enc_start_date,
        cs.enc_end_date,
        cs.enc_start_datetime,
        cs.enc_end_datetime,
        cs.enc_class,
        cs.subject_ref,
        cs.encounter_ref,
        cs.los_hours,
        cs.los_days
    FROM medreq
    INNER JOIN core__soe AS cs ON medreq.encounter_ref = cs.encounter_ref
    WHERE medreq.encounter_ref IS NOT NULL
),

-- Strategy 2: no encounter reference, so match on subject and on the
-- authored timestamp lying within the encounter period.
by_period AS (
    SELECT DISTINCT
        medreq.medreq_authored_datetime,
        medreq.medreq_ref,
        cs.enc_start_date,
        cs.enc_end_date,
        cs.enc_start_datetime,
        cs.enc_end_datetime,
        cs.enc_class,
        cs.subject_ref,
        cs.encounter_ref,
        cs.los_hours,
        cs.los_days
    FROM medreq
    INNER JOIN core__soe AS cs ON medreq.subject_ref = cs.subject_ref
    WHERE
        medreq.encounter_ref IS NULL
        AND medreq.medreq_authored_datetime
        BETWEEN cs.enc_start_datetime AND cs.enc_end_datetime
)

SELECT DISTINCT
    medreq.status,
    date(medreq.medreq_authored_datetime) AS medreq_recorded_date,
    medreq.medreq_authored_datetime,
    lr.encounter_ref AS encounter_ref,
    lp.encounter_ref AS period_ref,
    coalesce(lr.encounter_ref, lp.encounter_ref) AS encounter_link,
    coalesce(lr.enc_class, lp.enc_class) AS enc_class,
    -- subject and request identifiers always come from the request itself,
    -- so unlinked requests are still reported
    medreq.subject_ref,
    medreq.medreq_ref
FROM medreq
LEFT JOIN by_reference AS lr ON medreq.medreq_ref = lr.medreq_ref
LEFT JOIN by_period AS lp ON medreq.medreq_ref = lp.medreq_ref;
document.subject_ref = cs.subject_ref + AND ( + ( + document.doc_start_datetime + BETWEEN cs.enc_start_datetime + AND cs.enc_end_datetime + ) + OR + ( + document.doc_end_datetime + BETWEEN cs.enc_start_datetime + AND cs.enc_end_datetime + ) + ) +), + +doc_link_reference AS ( + SELECT DISTINCT + document_enc.doc_start_datetime, + document_enc.doc_end_datetime, + document_enc.doc_ref, + cs.enc_start_date, + cs.enc_end_date, + cs.enc_start_datetime, + cs.enc_end_datetime, + cs.enc_class, + cs.subject_ref, + cs.encounter_ref, + cs.los_hours, + cs.los_days + FROM document_enc, core__soe AS cs + WHERE + document_enc.encounter_ref IS NOT NULL + AND document_enc.encounter_ref = cs.encounter_ref +) + +SELECT DISTINCT + document.doc_ref, + document.doc_start_datetime, + document.doc_end_datetime, + lr.encounter_ref AS encounter_ref, + lp.encounter_ref AS period_ref, + -- link REF via Encounter.reference when provided, ELSE link Encounter.period + coalesce(lr.encounter_ref, lp.encounter_ref) AS encounter_link, + -- link CLASS via Encounter.reference when provided, ELSE link Encounter.period + coalesce(lr.enc_class, lp.enc_class) AS enc_class, + -- document must match + document.subject_ref +FROM document +LEFT JOIN doc_link_period AS lp ON document.doc_ref = lp.doc_ref +LEFT JOIN doc_link_reference AS lr ON document.doc_ref = lr.doc_ref; + +-- ########################################################################### +-- Sequence Of Events, COUNTS +-- +-- COUNT Condition by week +CREATE TABLE core__count_soe_cond_week AS +WITH discrete AS ( + SELECT + core__soe_cond.condition_ref, + enc_class.display AS enc_class_display, + enc_class.system AS enc_class_system, + date_trunc('week', core__soe_cond.cond_recorded_date) AS cond_recorded_week, + coalesce( + core__soe_cond.cond_onset_date IS NOT NULL, FALSE + ) AS cond_onset_date_exists, + coalesce( + core__soe_cond.encounter_ref IS NOT NULL, FALSE + ) AS encounter_ref_exists, + coalesce( + core__soe_cond.period_ref IS NOT NULL, 
-- COUNT MedReq by week
--
-- Distinct MedicationRequest counts from core__soe_medreq, cubed over the
-- encounter class (display/system), the week of the recorded date, and
-- whether the request was linked by encounter reference and/or by period.

CREATE TABLE core__count_soe_medreq_week AS
WITH discrete AS (
    SELECT
        smr.medreq_ref,
        smr.enc_class.display AS enc_class_display,
        smr.enc_class.system AS enc_class_system,
        date_trunc('week', smr.medreq_recorded_date) AS medreq_recorded_week,
        -- IS NOT NULL always yields TRUE or FALSE, never NULL
        smr.encounter_ref IS NOT NULL AS encounter_ref_exists,
        smr.period_ref IS NOT NULL AS period_ref_exists
    FROM core__soe_medreq AS smr
)

SELECT
    count(DISTINCT d.medreq_ref) AS cnt,
    d.enc_class_display,
    d.enc_class_system,
    d.medreq_recorded_week,
    d.encounter_ref_exists,
    d.period_ref_exists
FROM discrete AS d
GROUP BY cube(
    d.enc_class_display,
    d.enc_class_system,
    d.medreq_recorded_week,
    d.encounter_ref_exists,
    d.period_ref_exists
);
def get_object_denormalize_query(
    schema_name: str,
    source_table: str,
    source_id: str,
    field: str,
    field_config: dict,
    target_table: str,
) -> str:
    """Generates a table by expanding a specified row element.

    More generally, this is meant to help deal with nested FHIR elements that
    differ in implementation between different EHR platforms. As an example,
    the first way this is used was to deal with the DocumentReference.context
    field which contains a period element (represented by a row in SQL)
    (https://www.hl7.org/fhir/datatypes.html#Period). In one EHR vendor's data,
    both start and end were present, while in another, only start was present.

    This method allows us to extract the data from these two fields, if present,
    and otherwise cast a null value as a missing column, such that the output
    table is always guaranteed to have the desired columns for downstream
    joins.

    The rendered statement is a ``CREATE TABLE ... AS SELECT`` that selects
    ``source_id`` followed by one column per key of ``field_config``.

    :param schema_name: The schema/database the target table is created in
    :param source_table: The name of the table to select from
    :param source_id: The ID field for use in downstream joins
    :param field: the field to target (either column or nested element)
    :param field_config: config dict: {'field':{'present':bool,'type':type}};
        keys with 'present' set are read from ``field``, all others become
        ``cast(NULL AS type)``. Must contain at least one key, since the
        template always emits a comma after ``source_id``.
    :param target_table: name of the table to create
    :returns: the rendered SQL query string
    """
    path = Path(__file__).parent
    with open(f"{path}/object_denormalize.sql.jinja") as object_denormalize:
        return Template(object_denormalize.read()).render(
            schema_name=schema_name,
            source_table=source_table,
            source_id=source_id,
            field=field,
            field_config=field_config,
            target_table=target_table,
        )