Support a flat ndjson dir layout as well as a hierarchical one (#244)
* Support a flat ndjson dir layout as well as a hierarchical one

Previously, --load-ndjson-dir would only look for an ETL output-style
format like this:

dir/condition/*.ndjson
dir/patient/*.ndjson

Now it will also look for flat files (i.e. ETL input-style format)
like this:

dir/1.Patient.ndjson
dir/Patient.october.ndjson
dir/Patient.ndjson

This will make it nicer to use the --load-ndjson-dir flow when you are
working on ndjson files directly, without going through Cumulus ETL
first.

* docs: update for newer, easier ndjson layout format
mikix authored May 29, 2024
1 parent 031bbd8 commit 27438b3
Showing 3 changed files with 66 additions and 33 deletions.
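As a quick aside (not part of the commit itself), the flat-layout matching added in `databases.py` below can be sketched standalone. The `flat_matches` helper here is hypothetical, but the regex is the one the diff introduces:

```python
import re

def flat_matches(name: str, resource: str) -> bool:
    """True if a flat filename like "1.Patient.ndjson" belongs to the resource."""
    # Optional numeric prefix, optional dotted suffix, then ".ndjson".
    return bool(re.match(rf"([0-9]+\.)?{resource}(\.[^/]+)?\.ndjson", name))

assert flat_matches("1.Patient.ndjson", "Patient")
assert flat_matches("Patient.october.ndjson", "Patient")
assert flat_matches("Patient.ndjson", "Patient")
assert not flat_matches("patient.ndjson", "Patient")  # matching is case-sensitive
```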
42 changes: 29 additions & 13 deletions cumulus_library/databases.py
````diff
@@ -515,21 +515,37 @@ def parse_found_schema(self, schema: dict[str, str]) -> dict:
         return parsed
 
 
-def _read_rows_from_table_dir(path: str) -> list[dict]:
-    # Grab filenames to load (ignoring .meta files and handling missing folders)
-    folder = Path(path)
-    filenames = []
-    if folder.exists():
-        filenames = sorted(
-            str(x) for x in folder.iterdir() if x.name.endswith(".ndjson")
-        )
-
-    # Read all ndjson directly into memory
+def _read_rows_from_files(filenames: list[str]) -> list[dict]:
+    """Reads all provided ndjson files directly into memory"""
     rows = []
-    for filename in filenames:
+    for filename in sorted(filenames):
         with open(filename, encoding="utf8") as f:
             for line in f:
                 rows.append(json.loads(line))
     return rows
 
 
+def _read_rows_from_table_dir(path: Path) -> list[dict]:
+    """Grab ndjson files in the Cumulus ETL output format: path/tablename/*.ndjson"""
+    if not path.exists():
+        return []
+
+    filenames = [str(x) for x in path.iterdir() if x.name.endswith(".ndjson")]
+    return _read_rows_from_files(filenames)
+
+
+def _read_rows_for_resource(path: Path, resource: str) -> list[dict]:
+    rows = []
+
+    # Grab any ndjson files in Cumulus ETL input format: path/*.Resource.*.ndjson
+    if path.exists():
+        # This pattern is copied from the ETL, allowing a suffix or a numbered prefix.
+        pattern = re.compile(rf"([0-9]+\.)?{resource}(\.[^/]+)?\.ndjson")
+        filenames = [str(x) for x in path.iterdir() if pattern.match(x.name)]
+        rows += _read_rows_from_files(filenames)
+
+    # Also grab any ndjson files in Cumulus ETL output format
+    rows += _read_rows_from_table_dir(path / resource.lower())
+
+    return rows
@@ -562,7 +578,7 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
     ]
     for resource in resources:
         table_name = resource.lower()
-        rows = _read_rows_from_table_dir(f"{path}/{table_name}")
+        rows = _read_rows_for_resource(Path(path), resource)
 
         # Make a pyarrow table with full schema from the data
         schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, rows)
@@ -574,7 +590,7 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
         "etl__completion_encounters",
     ]
     for metadata_table in metadata_tables:
-        rows = _read_rows_from_table_dir(f"{path}/{metadata_table}")
+        rows = _read_rows_from_table_dir(Path(f"{path}/{metadata_table}"))
         if rows:
             # Auto-detecting the schema works for these simple tables
             all_tables[metadata_table] = pyarrow.Table.from_pylist(rows)
````
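Taken together, these helpers mean one call now reads both layouts for a resource. A minimal sketch, assuming the package is importable and a `local_dir/ndjson/` folder exists with files in either (or both) styles:

```python
from cumulus_library import databases

# Hypothetical contents, mixing both layouts; rows from each are combined:
#   local_dir/ndjson/1.Patient.ndjson        (flat, ETL input style)
#   local_dir/ndjson/patient/part-00.ndjson  (nested, ETL output style)
tables = databases.read_ndjson_dir("local_dir/ndjson")
print(tables["patient"].num_rows)  # counts rows from both files
```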
22 changes: 2 additions & 20 deletions docs/creating-studies.md
````diff
@@ -248,32 +248,14 @@
 You can grab fake
 [Synthea data](https://github.com/smart-on-fhir/sample-bulk-fhir-datasets)
 or use the result of actual bulk-export results from your EHR.
 
-Once you have that,
-run [Cumulus ETL](https://docs.smarthealthit.org/cumulus/etl/)
-on your source ndjson with the
-`--output-format=ndjson` flag and pointing at some local directory.
-For example:
-```shell
-docker compose run \
-  --volume local_dir:/in \
-  cumulus-etl \
-  /in/ndjson \
-  /in/output \
-  /in/phi \
-  --output-format=ndjson
-```
-
-This will generate a tree of processed & anonymized ndjson
-(just like the ETL normally makes).
-
 ### Run your study on the local ndjson
 
-Now you can run Cumulus Library but point it at the output files with the
+Now you can run Cumulus Library but point it at the ndjson folder with the
 `--db-type=duckdb` and `--load-ndjson-dir=DIR` flags. For example:
 ```shell
 cumulus-library build \
   --db-type duckdb \
-  --load-ndjson-dir local_dir/output \
+  --load-ndjson-dir local_dir/ndjson/ \
   --database local_dir/duck.db \
   --target my_study
 ```
````
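With this change, the `local_dir/ndjson/` folder passed above can simply hold bulk-export files directly, no ETL run needed first; a hypothetical listing:

```shell
ls local_dir/ndjson/
# 1.Patient.ndjson  Condition.ndjson  Patient.october.ndjson
```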
35 changes: 35 additions & 0 deletions tests/test_duckdb.py
````diff
@@ -72,6 +72,41 @@ def test_duckdb_from_iso8601_timestamp(timestamp, expected):
     assert parsed == expected
 
 
+def test_duckdb_load_ndjson_dir(tmp_path):
+    filenames = {
+        "A.Patient.ndjson": False,
+        "1.Patient.ndjson": True,
+        "Patient.ndjson": True,
+        "patient.ndjson": False,
+        "Patient.hello.bye.ndjson": True,
+        "Patient.nope": False,
+        "patient/blarg.ndjson": True,
+        "patient/blarg.meta": False,
+    }
+    os.mkdir(f"{tmp_path}/patient")
+    for index, (filename, valid) in enumerate(filenames.items()):
+        with open(f"{tmp_path}/{filename}", "w", encoding="utf8") as f:
+            row_id = f"Good{index}" if valid else f"Bad{index}"
+            f.write(f'{{"id":"{row_id}"}}')
+
+    db = databases.create_db_backend(
+        {
+            "db_type": "duckdb",
+            "schema_name": ":memory:",
+            "load_ndjson_dir": tmp_path,
+        }
+    )
+
+    expected_good_count = len({f for f, v in filenames.items() if v})
+    found_ids = {
+        row[0] for row in db.cursor().execute("select id from patient").fetchall()
+    }
+    found_good = {row_id for row_id in found_ids if row_id.startswith("Good")}
+    found_bad = found_ids - found_good
+    assert len(found_good) == expected_good_count
+    assert len(found_bad) == 0
+
+
 def test_duckdb_table_schema():
     """Verify we can detect schemas correctly, even for nested camel case fields"""
     db = databases.DuckDatabaseBackend(":memory:")
````
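To try just the new test locally (the `tmp_path` fixture implies a standard pytest setup):

```shell
pytest tests/test_duckdb.py::test_duckdb_load_ndjson_dir
```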