-
Notifications
You must be signed in to change notification settings - Fork 0
/
generators.py
212 lines (180 loc) · 10.7 KB
/
generators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import STU3Templates
import dateutil.parser as parser
import requests
import entities
class STU3Generator:
    """Render a parsed script entity as CQL text using the ``STU3Templates`` strings.

    Every method is a stateless helper meant to be called on the class
    (``STU3Generator.generate(script)``).  The only shared state is
    ``baseNameToFinalNameMap``, which ``generateIndexEventAndInclusion``
    resets at the start of each run and ``generateInclusions`` fills in.
    """

    # Maps an inclusion's base name to the definition name that derived
    # expressions should reference (the filter-prefixed name when the
    # inclusion has a filter, otherwise the base name itself).
    baseNameToFinalNameMap = {}

    # Seconds to wait on the Atlas WebAPI before the request is abandoned.
    atlasRequestTimeout = 30

    @staticmethod
    def generate(script):
        """Dispatch ``script`` to the generator that matches its ``scriptType``.

        Returns the generated CQL string, or ``None`` when the script type is
        not ``'IndexEventAndInclusionScript'`` (the only type handled here).
        """
        if script.scriptType == 'IndexEventAndInclusionScript':
            return STU3Generator.generateIndexEventAndInclusion(script)
        return None

    @staticmethod
    def generateIndexEventAndInclusion(script):
        """Build the full CQL document for an index-event-and-inclusion script.

        Sections are joined with newlines in this order: header, codesystems,
        concepts, ``context Patient``, index event, inclusions, deriveds,
        aggregator, shared functions.

        Concepts whose ``codesets`` is a string are treated as Atlas
        references and resolved through ``translateFromAtlas``; all others
        are used as given.
        """
        STU3Generator.baseNameToFinalNameMap = {}
        header = STU3Templates.cql_header.format(script.name)

        inline_concepts = []
        atlas_concepts = []
        for concept in script.concepts:
            if isinstance(concept.codesets, str):
                atlas_concepts.append(concept)
            else:
                inline_concepts.append(concept)

        # Merge both kinds into one flat list.  Appending the translated list
        # as a single element and then reading index 0 only worked when every
        # concept came from Atlas.
        all_concepts = inline_concepts + STU3Generator.translateFromAtlas(atlas_concepts)

        codesystems = STU3Generator.generateCodesystems(all_concepts)
        concepts = STU3Generator.generateConcepts(all_concepts)
        indexEvent = STU3Generator.generateIndexEvent(script.indexEvent)
        aggregatorEntity = script.returnAggregator or None
        inclusions, inclusion_names = STU3Generator.generateInclusions(
            script.inclusions, aggregatorEntity)
        deriveds = STU3Generator.generateDervied(script.deriveds)
        aggregator = STU3Generator.generateAggregator(inclusion_names, aggregatorEntity)
        functions = STU3Generator.includeFunctions()

        return '\n'.join([header, codesystems, concepts, 'context Patient\n',
                          indexEvent, inclusions, deriveds, aggregator, functions])

    @staticmethod
    def generateCodesystems(concepts):
        """Return one codesystem declaration per distinct ``codeset.system``.

        Declarations are emitted in first-seen order so that the generated
        text is the same on every run (iterating a ``set`` of strings is not).
        Returns ``''`` when ``concepts`` is empty.
        """
        unique_codesystems = dict.fromkeys(
            codeset.system
            for concept in concepts
            for codeset in concept.codesets
        )
        return ''.join(
            STU3Templates.cql_codesystem.format(
                STU3Generator.getCommonCodesystemName(codesystem_uri), codesystem_uri)
            for codesystem_uri in unique_codesystems
        )

    @staticmethod
    def translateFromAtlas(concepts):
        """Resolve Atlas-backed concepts into ``entities.ConceptEntity`` objects.

        For each concept, ``concept.codesets`` is taken as a URL in which
        ``'#'`` is replaced by ``'WebAPI'`` and ``'/expression'`` is appended
        unless the URL already ends in ``'expression'``.  The JSON response is
        wrapped in ``entities.AtlasConceptSetEntity`` and flattened into one
        single-code codeset per distinct ``CONCEPT_ID``.

        Raises whatever ``requests`` raises on timeout, connection failure or
        a non-success HTTP status.
        """
        output_concepts = []
        for concept in concepts:
            full_url = concept.codesets.replace('#', 'WebAPI')
            if not full_url.endswith('expression'):
                full_url = full_url + '/expression'

            response = requests.get(url=full_url,
                                    timeout=STU3Generator.atlasRequestTimeout)
            # Fail loudly instead of parsing an HTTP error body as concepts.
            response.raise_for_status()
            atlas_concept_set = entities.AtlasConceptSetEntity(response.json())

            # Keyed by CONCEPT_ID so a repeated concept keeps only its last entry.
            pulled_concepts = {}
            for atlas_concept in atlas_concept_set.concepts:
                pulled_concepts[atlas_concept.CONCEPT_ID] = {
                    'codelist': [atlas_concept.CONCEPT_CODE],
                    'system': STU3Generator._codesystemUriFor(atlas_concept.VOCABULARY_ID),
                }

            output_json = {'name': concept.name,
                           'codesets': list(pulled_concepts.values())}
            output_concepts.append(entities.ConceptEntity(output_json))
        return output_concepts

    @staticmethod
    def _codesystemUriFor(vocabulary_id):
        """Return the first ``codesystems_map`` key whose value is ``vocabulary_id``.

        Falls back to ``vocabulary_id`` itself when no entry matches.
        """
        for codesystem_uri, common_name in STU3Templates.codesystems_map.items():
            if common_name == vocabulary_id:
                return codesystem_uri
        return vocabulary_id

    @staticmethod
    def generateConcepts(concepts):
        """Return the concept definitions for ``concepts``, newline-separated.

        Each concept yields one ``cql_concept_template`` entry whose body is
        the ``',\\n\\t'``-joined list of its codes.  Returns ``''`` when
        ``concepts`` is empty.
        """
        concepts_cql_list = []
        for concept in concepts:
            concept_code_values = []
            for codeset in concept.codesets:
                codesystem_common_name = STU3Generator.getCommonCodesystemName(codeset.system)
                concept_code_values.extend(
                    STU3Templates.cql_concept_code_template.format(code, codesystem_common_name)
                    for code in codeset.codelist
                )
            concept_values_block = ',\n\t'.join(concept_code_values)
            concepts_cql_list.append(
                STU3Templates.cql_concept_template.format(concept.name, concept_values_block))
        return '\n'.join(concepts_cql_list)

    @staticmethod
    def getCommonCodesystemName(codesystem):
        """Return the ``codesystems_map`` entry for ``codesystem``, or ``codesystem`` if absent."""
        return STU3Templates.codesystems_map.get(codesystem, codesystem)

    @staticmethod
    def generateIndexEvent(indexEvent):
        """Return the index-event definition followed by its return clause."""
        base_inclusion = STU3Templates.cql_index_event.format(
            indexEvent.name, indexEvent.fhirResource, indexEvent.conceptReference)
        index_event_return = STU3Templates.cql_index_event_return_with_choice_cast.format(
            indexEvent.name, indexEvent.returnField.lower(), indexEvent.returnType)
        return '\n'.join([base_inclusion, index_event_return])

    @staticmethod
    def convertToISO(date):
        """Parse ``date`` with ``dateutil`` and return it in ISO 8601 form."""
        return parser.parse(date).isoformat()

    @staticmethod
    def _isRelativeBound(bound):
        """Return True when ``bound`` is non-empty and starts with ``'+'``."""
        return bool(bound) and bound[0] == '+'

    @staticmethod
    def _temporalSuffix(timeFrame, fhirResource):
        """Return the temporal clause for ``timeFrame``, or ``None`` if it has no bounds.

        A time frame is relative when either bound starts with ``'+'``; its
        bounds are then passed to the templates unchanged.  Any other bound is
        treated as an absolute date and converted with ``convertToISO``.
        """
        start = timeFrame.start
        end = timeFrame.end
        if not start and not end:
            return None

        # Looked up only when a clause is actually built, so resources with no
        # temporal field can still be used in inclusions without a time frame.
        temporal_field = STU3Templates.resource_temporal_map[fhirResource]

        if STU3Generator._isRelativeBound(start) or STU3Generator._isRelativeBound(end):
            if start and end:
                return STU3Templates.cql_temporal_both_suffix.format(
                    temporal_field, 'dateTime', start,
                    temporal_field, 'dateTime', end,
                    temporal_field, start, end)
            if start:
                return STU3Templates.cql_temporal_start_suffix.format(
                    temporal_field, 'dateTime', start)
            return STU3Templates.cql_temporal_end_suffix.format(
                temporal_field, 'dateTime', end)

        # Absolute dates.  The previous guard referenced ``isnumeric`` without
        # calling it, so it was always true; this unconditional fallback keeps
        # that behaviour and lets dateutil reject anything it cannot parse.
        if start and end:
            return STU3Templates.cql_temporal_interval_suffix.format(
                temporal_field,
                STU3Generator.convertToISO(start),
                STU3Generator.convertToISO(end))
        if start:
            return STU3Templates.cql_temporal_datetime_start_suffix.format(
                temporal_field, STU3Generator.convertToISO(start))
        return STU3Templates.cql_temporal_datetime_end_suffix.format(
            temporal_field, STU3Generator.convertToISO(end))

    @staticmethod
    def generateInclusions(inclusions, aggregator):
        """Return ``(cql_text, inclusion_names)`` for the given inclusions.

        For every inclusion this emits its base definition (with a
        ``target``-aliased temporal clause when a time frame with at least one
        bound is present) and, when ``filterType`` is set, a filter definition
        named ``filterType + name``.  The name later definitions should use is
        recorded in ``baseNameToFinalNameMap``.

        ``aggregator`` is accepted for call compatibility and is not used.
        """
        inclusions_cql_list = []
        inclusion_names = []
        for inclusion in inclusions:
            inclusion_names.append(inclusion.name)
            base_inclusion = STU3Templates.cql_index_event.format(
                inclusion.name, inclusion.fhirResource, inclusion.conceptReference)

            temporal_suffix = None
            if inclusion.timeFrame:
                temporal_suffix = STU3Generator._temporalSuffix(
                    inclusion.timeFrame, inclusion.fhirResource)

            if temporal_suffix is None:
                inclusions_cql_list.append(base_inclusion)
            else:
                base_inclusion_target = ' '.join([base_inclusion, 'target'])
                inclusions_cql_list.append(
                    '\n\t'.join([base_inclusion_target, temporal_suffix]))

            filter_name = inclusion.name
            # Truthiness rather than != '' so a missing (None) filter is skipped too.
            if inclusion.filterType:
                filter_name = ''.join([inclusion.filterType, inclusion.name])
                inclusions_cql_list.append(STU3Templates.cql_filter.format(
                    filter_name, inclusion.filterType, inclusion.name))
            STU3Generator.baseNameToFinalNameMap[inclusion.name] = filter_name

        return '\n'.join(inclusions_cql_list), inclusion_names

    @staticmethod
    def _applyDefaultTyping(fhirField, value):
        """Wrap ``value`` in the default typing template for ``fhirField``.

        Returns ``value`` unchanged when ``fhir_choice_fields_typing_default_map``
        has no entry for ``fhirField``.
        """
        try:
            template = STU3Templates.fhir_choice_fields_typing_default_map[fhirField]
        except KeyError:
            return value
        return template.format(value)

    @staticmethod
    def generateDervied(deriveds):
        """Return the derived (shaping) definitions, newline-separated.

        Each derived references the final name recorded for its base
        inclusion, or the base name itself when none was recorded.  The
        answer value is single-quoted verbatim unless
        ``renderAnswerWithCQL`` is set, in which case it gets the same
        default typing as the source note.  Returns ``''`` for no deriveds.
        """
        final_names = STU3Generator.baseNameToFinalNameMap
        deriveds_cql_list = []
        for derived in deriveds:
            # Fall back to the raw name for bases that are not inclusions.
            baseFinalName = final_names.get(derived.baseInclusion, derived.baseInclusion)
            sourceNote = STU3Generator._applyDefaultTyping(
                derived.fhirField, derived.sourceNote)
            if derived.answerValue.renderAnswerWithCQL:
                answerValue = STU3Generator._applyDefaultTyping(
                    derived.fhirField, derived.answerValue.value)
            else:
                answerValue = "'" + derived.answerValue.value + "'"
            deriveds_cql_list.append(STU3Templates.cql_shaping_derived.format(
                derived.name, baseFinalName, derived.fhirField,
                derived.questionConcept, sourceNote,
                answerValue, derived.answerValue.valueType))
        return '\n'.join(deriveds_cql_list)

    # Correctly spelled alias; the original name is kept for existing callers.
    generateDerived = generateDervied

    @staticmethod
    def generateAggregator(inclusion_names, aggregator):
        """Return the ``returnAggregator`` expression combining all inclusions.

        The first name goes through ``cql_aggregator_prefix``; each following
        name is appended with ``cql_aggregator_suffix`` and the aggregator's
        ``aggregateType``.  Returns ``''`` when there is no aggregator or no
        inclusion names.
        """
        if not aggregator or len(inclusion_names) < 1:
            return ''
        aggregate_cql_list = [
            STU3Templates.cql_aggregator_prefix.format('returnAggregator', inclusion_names[0])
        ]
        aggregate_cql_list.extend(
            STU3Templates.cql_aggregator_suffix.format(aggregator.aggregateType, name)
            for name in inclusion_names[1:]
        )
        return ''.join(aggregate_cql_list)

    @staticmethod
    def handleChoice(choice_field_name):
        """Return the newline-joined handlers for every type of a choice field.

        Raises ``KeyError`` when ``choice_field_name`` is not in
        ``fhir_choice_fields_map``.
        """
        choice_types = STU3Templates.fhir_choice_fields_map[choice_field_name]
        return '\n'.join(
            STU3Generator.generateChoiceOptionHandler(choice_field_name, choice_type)
            for choice_type in choice_types
        )

    @staticmethod
    def generateChoiceOptionHandler(input_field_name, choice_type):
        """Placeholder: always returns ``''``.

        TODO: render the per-type handler template, e.g.
        STU3Template.${choice_type}_handler.format(input_field_name)
        """
        return ''

    @staticmethod
    def includeFunctions():
        """Return the shared function text appended to every generated script."""
        return STU3Templates.msConvertEffectiveFunction