Merge pull request #16 from FHIR-Aggregator/data/update_version
new data version - updates, testing, and enhancements in mappings
teslajoy authored Oct 30, 2024
2 parents e8c31e0 + 6a783e2 commit dc6ae92
Showing 13 changed files with 992 additions and 90 deletions.
3 changes: 3 additions & 0 deletions .dockerignore
@@ -0,0 +1,3 @@
data/
*.log
*.json
19 changes: 19 additions & 0 deletions Dockerfile
@@ -0,0 +1,19 @@
FROM python:3.12

WORKDIR /app

COPY . /app/CDA2FHIR

RUN apt-get update && apt-get install -y \
build-essential \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*

# pathlib and uuid are part of the Python 3.12 standard library, so they are not pip-installed;
# the gen3-tracker version spec is quoted so the shell does not treat '>' as a redirect
RUN pip install --upgrade pip && \
    pip install --no-cache-dir charset_normalizer idna certifi requests pydantic pytest click \
    orjson tqdm openpyxl pandas inflection iteration_utilities fhir.resources==7.1.0 \
    sqlalchemy==2.0.31 "gen3-tracker>=0.0.7rc1"

RUN pip install -e /app/CDA2FHIR

ENTRYPOINT ["/bin/bash"]
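For orientation, a minimal build-and-run sketch for this image (the tag and volume mount are illustrative; `data/` is excluded by `.dockerignore`, so mount it at runtime):

```
docker build -t cda2fhir .
docker run --rm -it -v $(pwd)/data:/app/CDA2FHIR/data cda2fhir
```

Since the ENTRYPOINT is `/bin/bash`, the second command drops you into a shell inside the container where the installed `cda2fhir` CLI can be run.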
46 changes: 38 additions & 8 deletions README.md
@@ -31,6 +31,8 @@ Options:
-v, --verbose
-ns, --n_samples TEXT Number of samples to randomly select - max 100.
-nd, --n_diagnosis TEXT Number of diagnoses to randomly select - max 100.
-nf, --n_files TEXT Number of files to randomly select - max 100.
-f, --transform_files Transform CDA files to FHIR DocumentReference and Group.
-p, --path TEXT Path to save the FHIR NDJSON files. Default is
CDA2FHIR/data/META.
--help Show this message and exit.
@@ -41,16 +43,44 @@ Options:
```
cda2fhir transform
```
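For example, to also transform CDA files into DocumentReference and Group resources for a small random subset of files (the sample size and output path below are illustrative, using the flags listed above):

```
cda2fhir transform -f -nf 50 -p CDA2FHIR/data/META
```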

NOTE: in case you are interested in validating your FHIR data with GEN3, you will need to go through the [user-guide, setup, and documentation of GEN3 tracker](https://aced-idp.github.io/requirements/) before running the `cda2fhir` commands.

### FHIR data validation

#### Disable gen3-client
```
mv ~/.gen3/gen3_client_config.ini ~/.gen3/gen3_client_config.ini-xxx
mv ~/.gen3/gen3-client ~/.gen3/gen3-client-xxx
```

#### Run validate
```
time cda2fhir validate
{'summary': {'Specimen': 721837, 'Observation': 731005, 'ResearchStudy': 423, 'BodyStructure': 163, 'Condition': 95262, 'ResearchSubject': 160649, 'Patient': 138738}}
real    5m
user    5m
sys     0m5.1s
```

This command validates your FHIR entities and their reference relations to each other, and generates a summary count of all entities in each NDJSON file.

NOTE: This process may take _**5 minutes**_ or more, depending on your platform and compute power, due to the size of the current data.

#### Restore gen3-client
```
mv ~/.gen3/gen3-client-xxx ~/.gen3/gen3-client
mv ~/.gen3/gen3_client_config.ini-xxx ~/.gen3/gen3_client_config.ini
```
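For orientation, the `summary` printed by `cda2fhir validate` above is a per-resource-type count over the NDJSON output. A rough equivalent of that counting step, assuming one FHIR resource per line in each `*.ndjson` file under `data/META` (an illustration, not the validator's implementation):

```
import json
from collections import Counter
from pathlib import Path

# Tally resourceType across every NDJSON file in the META folder.
summary = Counter()
for ndjson_file in Path("data/META").glob("*.ndjson"):
    with ndjson_file.open(encoding="utf8") as fh:
        for line in fh:
            if line.strip():
                summary[json.loads(line)["resourceType"]] += 1

print({"summary": dict(summary)})
```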


### Testing
Current integration testing runs on all data and may take approximately _**2 hours**_.

```
pytest --cov
```
164 changes: 150 additions & 14 deletions cda2fhir/cda2fhir.py
@@ -1,19 +1,24 @@
from cda2fhir import utils
import json
import orjson
from pathlib import Path
import importlib.resources
from fhir.resources.identifier import Identifier
from fhir.resources.reference import Reference
from fhir.resources.documentreference import DocumentReference
from fhir.resources.group import Group
from cda2fhir.load_data import load_data
from cda2fhir.database import SessionLocal
from cda2fhir.cdamodels import CDASubject, CDAResearchSubject, CDASubjectResearchSubject, CDADiagnosis, CDATreatment, \
CDASubjectAlias, CDASubjectProject, CDAResearchSubjectDiagnosis, CDASpecimen, ProjectdbGap, GDCProgramdbGap, \
CDASubjectIdentifier, CDAProjectRelation, CDAFile, CDAFileSubject, CDAFileSpecimen
from cda2fhir.transformer import PatientTransformer, ResearchStudyTransformer, ResearchSubjectTransformer, \
ConditionTransformer, SpecimenTransformer, DocumentReferenceTransformer
from sqlalchemy import select, func, or_
from sqlalchemy.orm import selectinload

gdc_dbgap_names = ['APOLLO', 'CDDP_EAGLE', 'CGCI', 'CTSP', 'EXCEPTIONAL_RESPONDERS', 'FM', 'HCMI', 'MMRF', 'NCICCR',
'OHSU', 'ORGANOID', 'REBC', 'TARGET', 'TCGA', 'TRIO', 'VAREPOP', 'WCDT']


def fhir_ndjson(entity, out_path):
@@ -25,18 +30,19 @@ def fhir_ndjson(entity, out_path):
file.write(json.dumps(entity, ensure_ascii=False))
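The collapsed hunk above hides most of `fhir_ndjson`'s body; a minimal sketch of a writer consistent with the visible fragment, assuming it accepts either a single FHIR resource dict or a list of them (illustrative, not the exact repository code):

```
import json

def fhir_ndjson(entity, out_path):
    """Write one FHIR resource, or a list of them, as newline-delimited JSON."""
    with open(out_path, 'w', encoding='utf8') as file:
        if isinstance(entity, list):
            file.write('\n'.join(json.dumps(e, ensure_ascii=False) for e in entity))
        else:
            file.write(json.dumps(entity, ensure_ascii=False))
```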


def cda2fhir(path, n_samples, n_diagnosis, transform_files, n_files, save=True, verbose=False):
"""CDA2FHIR attempts to transform the baseclass definitions of CDA data defined in cdamodels to query relevant
information to create FHIR entities: Specimen, ResearchSubject,
ResearchStudy, Condition, BodyStructure, Observation utilizing transfomer classes."""
load_data(transform_files)

session = SessionLocal()
patient_transformer = PatientTransformer(session)
research_study_transformer = ResearchStudyTransformer(session)
research_subject_transformer = ResearchSubjectTransformer(session)
condition_transformer = ConditionTransformer(session)
specimen_transformer = SpecimenTransformer(session)
file_transformer = DocumentReferenceTransformer(session, patient_transformer, specimen_transformer)

if path:
meta_path = Path(path)
@@ -182,11 +188,13 @@ def cda2fhir(path, n_samples, n_diagnosis, save=True, verbose=False):
research_study = research_study_transformer.research_study(project, cda_rs_subject)

# gdc_dbgap = session.execute(session.query(GDCProgramdbGap).filter(GDCProgramdbGap.GDC_program_name.in_(gdc_dbgap_names))).all()
gdc_dbgap = session.execute(session.query(GDCProgramdbGap).where(
GDCProgramdbGap.GDC_program_name.contains(research_study.name))).one_or_none()

if gdc_dbgap:
# parent dbGap ID for GDC projects, ex. the TCGA dbGap id applies to all projects containing the TCGA substring (ex. TCGA-BRCA)
research_study.identifier.append(
Identifier(**{"system": "https://www.ncbi.nlm.nih.gov/gap/GDC", "value": gdc_dbgap[0]}))

# query and fetch the project's dbGap id
dbGap_study_accession = session.execute(
@@ -200,7 +208,6 @@ def cda2fhir(path, n_samples, n_diagnosis, save=True, verbose=False):
research_study.identifier.append(dbGap_identifier)

if research_study:
if _patient and research_study:
_research_subject = research_subject_transformer.research_subject(cda_rs_subject,
_patient[0],
@@ -226,14 +233,78 @@ def cda2fhir(path, n_samples, n_diagnosis, save=True, verbose=False):
subject_alias=query_subject_alias[0].subject_alias,
value=subject_id_value))
.all())

part_of_study = []
for _cda_subject_identifier in _cda_subject_identifiers:
_program_research_study = research_study_transformer.program_research_study(
name=_cda_subject_identifier[0].system)
if _program_research_study:
research_studies.append(_program_research_study)
part_of_study.append(
Reference(**{"reference": f"ResearchStudy/{_program_research_study.id}"}))

# ResearchStudy relations
# GDC <- [IDC, PDC, ICDC, CDS] and HTAN & CMPC
project_name = project.associated_project
associated_project_programs = session.query(CDAProjectRelation).filter(
or_(
CDAProjectRelation.project_gdc == project_name,
CDAProjectRelation.project_pdc == project_name,
CDAProjectRelation.project_idc == project_name,
CDAProjectRelation.project_cds == project_name,
CDAProjectRelation.project_icdc == project_name
)
).all()

for _p in associated_project_programs:
print("===========: ", _p, "\n")
_p_name = None
if _p.project_gdc == project_name:
_p_name = 'GDC'
elif _p.project_pdc == project_name:
_p_name = 'PDC'
elif _p.project_idc == project_name:
_p_name = 'IDC'
elif _p.project_cds == project_name:
_p_name = 'CDS'
elif _p.project_icdc == project_name:
_p_name = 'ICDC'

print("Program:", _p.program, "Sub-Program:", _p.sub_program, "GDC:", _p.project_gdc,
"PDC:", _p.project_pdc,
"IDC:", _p.project_idc, "CDS:", _p.project_cds, "ICDC:", _p.project_icdc,
"program_project_match:", _p_name)
if _p.program:
parent_program = research_study_transformer.program_research_study(
name=_p.program)
part_of_study = [p for p in part_of_study if
p.reference != f"ResearchStudy/{parent_program.id}"]
part_of_study.append(
Reference(**{"reference": f"ResearchStudy/{parent_program.id}"}))
research_studies.append(parent_program)

if _p_name:
main_program = research_study_transformer.program_research_study(
name=_p_name)
part_of_study = [p for p in part_of_study if
p.reference != f"ResearchStudy/{main_program.id}"]
part_of_study.append(Reference(**{"reference": f"ResearchStudy/{main_program.id}"}))
research_studies.append(main_program)

if _p.sub_program:
parent_sub_program = research_study_transformer.program_research_study(
name=_p.sub_program)
part_of_study = [p for p in part_of_study if
p.reference != f"ResearchStudy/{parent_sub_program.id}"]
part_of_study.append(
Reference(**{"reference": f"ResearchStudy/{parent_sub_program.id}"}))
research_studies.append(parent_sub_program)

if part_of_study:
research_study.partOf = part_of_study

research_studies.append(research_study)

if save and research_studies:
research_studies = {rstudy.id: rstudy for rstudy in research_studies if
Expand Down Expand Up @@ -304,12 +375,12 @@ def cda2fhir(path, n_samples, n_diagnosis, save=True, verbose=False):
if save and conditions:
_conditions = {_condition.id: _condition for _condition in conditions if _condition}.values()
fhir_conditions = [orjson.loads(c.json()) for c in _conditions if c]
fhir_ndjson(fhir_conditions, str(meta_path / "Condition.ndjson"))

if save and observations:
observations = {_obs.id: _obs for _obs in observations if _obs}.values()
patient_observations = [orjson.loads(observation.json()) for observation in observations]
fhir_ndjson(patient_observations, str(meta_path / "Observation.ndjson"))

# MedicationAdministration and Medication -----------------------------------
treatments = session.query(CDATreatment).all()
@@ -318,6 +389,71 @@ def cda2fhir(path, n_samples, n_diagnosis, save=True, verbose=False):
for treatment in treatments:
print(f"id: {treatment.id}, therapeutic_agent: {treatment.therapeutic_agent}")

# File -----------------------------------
# requires pre-processing and validation
# large record set -> 30+ GB; transformation takes time
if transform_files:
if n_files:
n_files = int(n_files)
files = session.execute(
select(CDAFile)
.order_by(func.random())
.limit(n_files)
.options(
selectinload(CDAFile.file_subject_relation).selectinload(CDAFileSubject.subject),
selectinload(CDAFile.specimen_file_relation).selectinload(CDAFileSpecimen.specimen)
)
).scalars().all()
else:
files = session.query(CDAFile).options(
selectinload(CDAFile.file_subject_relation).selectinload(CDAFileSubject.subject),
selectinload(CDAFile.specimen_file_relation).selectinload(CDAFileSpecimen.specimen)
).all()

assert files, "Files are not defined."

all_files = []
all_groups = []
for file in files:
print(f"File ID: {file.id}, File DRS URI: {file.drs_uri}")

_file_subjects = [
session.query(CDASubject).filter(CDASubject.id == file_subject.subject_id).first()
for file_subject in file.file_subject_relation
]
_file_subjects = [subject for subject in _file_subjects if subject]  # remove None entries
print(f"++++++++++++++ FILE's SUBJECTS: {[_subject.id for _subject in _file_subjects]}")

_file_specimens = [
session.query(CDASpecimen).filter(CDASpecimen.id == file_specimen.specimen_id).first()
for file_specimen in file.specimen_file_relation
]
_file_specimens = [specimen for specimen in _file_specimens if specimen]  # remove None entries
print(f"+++++++++++++ FILE's SPECIMENS: {[_specimen.id for _specimen in _file_specimens]}")

# DocumentReference passing associated CDASubject and CDASpecimen
fhir_file = file_transformer.fhir_document_reference(file, _file_subjects, _file_specimens)
if fhir_file["DocumentReference"] and isinstance(fhir_file["DocumentReference"], DocumentReference):
all_files.append(fhir_file["DocumentReference"])

this_files_group = fhir_file.get("Group")
if this_files_group and isinstance(this_files_group, Group):
all_groups.append(this_files_group)

if save and all_files:
document_references = {_doc_ref.id: _doc_ref for _doc_ref in all_files if _doc_ref}.values()
fhir_document_references = [orjson.loads(document_reference.json()) for document_reference in document_references]

utils.create_or_extend(new_items=fhir_document_references, folder_path='data/META', resource_type='DocumentReference', update_existing=False)
# fhir_ndjson(fhir_document_references, str(meta_path / "DocumentReference.ndjson"))

if save and all_groups:
groups = {group.id: group for group in all_groups if group.id}.values()
fhir_groups = [orjson.loads(group.json()) for group in groups]

utils.create_or_extend(new_items=fhir_groups, folder_path='data/META', resource_type='Group', update_existing=False)
# fhir_ndjson(fhir_groups, str(meta_path / "Group.ndjson"))

finally:
print("****** Closing Session ******")
session.close()
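`utils.create_or_extend` is called above but not shown in this diff. A minimal sketch of such a helper, under the assumption that it merges new resources into an existing `<resource_type>.ndjson` file keyed by resource `id` (an illustration of the intended behavior, not the repository's implementation):

```
import json
from pathlib import Path

def create_or_extend(new_items, folder_path='data/META',
                     resource_type='DocumentReference', update_existing=False):
    """Illustrative: merge new FHIR resources into <folder_path>/<resource_type>.ndjson."""
    path = Path(folder_path) / f"{resource_type}.ndjson"
    existing = {}
    if path.exists():
        with path.open(encoding='utf8') as fh:
            for line in fh:
                if line.strip():
                    item = json.loads(line)
                    existing[item['id']] = item
    for item in new_items:
        # keep what is already on disk unless update_existing is set
        if update_existing or item['id'] not in existing:
            existing[item['id']] = item
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open('w', encoding='utf8') as fh:
        fh.write('\n'.join(json.dumps(item, ensure_ascii=False) for item in existing.values()))
```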
