Chunking SQL export queries
dogversioning committed Jun 10, 2024
1 parent 5db1c2a commit 3258e66
Showing 18 changed files with 305 additions and 199 deletions.
13 changes: 12 additions & 1 deletion .github/workflows/ci.yaml
@@ -31,7 +31,18 @@ jobs:
         pip install ".[test]"
     - name: Test with pytest
       run: |
-        python -m pytest
+        python -m pytest --cov-report xml --cov=cumulus_library tests
+    - name: debug
+      run: |
+        ls
+        pwd
+    - name: Get Cover
+      uses: orgoro/[email protected]
+      with:
+        coverageFile: coverage.xml
+        token: ${{ secrets.GITHUB_TOKEN }}
+
+
   lint:
     runs-on: ubuntu-22.04
     steps:
1 change: 1 addition & 0 deletions .gitignore
@@ -9,6 +9,7 @@ output.sql
 *generated.md
 MRCONSO.RRF
 *.zip
+coverage.xml
 
 # Byte-compiled / optimized / DLL files
 __pycache__/
45 changes: 38 additions & 7 deletions cumulus_library/actions/exporter.py
@@ -1,6 +1,7 @@
-import csv
 import pathlib
 
+import pyarrow
+from pyarrow import csv, parquet
 from rich.progress import track
 
 from cumulus_library import base_utils, databases, study_parser
@@ -25,6 +26,18 @@ def reset_counts_exports(
             file.unlink()
 
 
+def _write_chunk(writer, chunk, schema):
+    writer.write(
+        pyarrow.Table.from_pandas(
+            chunk.sort_values(
+                by=list(chunk.columns), ascending=False, na_position="first"
+            ),
+            preserve_index=False,
+            schema=schema,
+        )
+    )
+
+
 def export_study(
     manifest_parser: study_parser.StudyManifestParser,
     db: databases.DatabaseBackend,
@@ -56,13 +69,31 @@ def export_study(
         description=f"Exporting {manifest_parser.get_study_prefix()} data...",
     ):
         query = f"SELECT * FROM {table}"
-        dataframe = db.execute_as_pandas(query)
+        # Note: we assume that, for duckdb, you are unlikely to be dealing with
+        # data large enough to need chunking, so this chunksize should return
+        # everything in a single chunk.
+        dataframe_chunks, db_schema = db.execute_as_pandas(query, chunksize=100000000)
+        first_chunk = next(dataframe_chunks)
         path.mkdir(parents=True, exist_ok=True)
-        dataframe = dataframe.sort_values(
-            by=list(dataframe.columns), ascending=False, na_position="first"
-        )
-        dataframe.to_csv(f"{path}/{table}.csv", index=False, quoting=csv.QUOTE_MINIMAL)
-        dataframe.to_parquet(f"{path}/{table}.parquet", index=False)
+        schema = pyarrow.schema(db.col_pyarrow_types_from_sql(db_schema))
+        with parquet.ParquetWriter(f"{path}/{table}.parquet", schema) as p_writer:
+            with csv.CSVWriter(
+                f"{path}/{table}.csv",
+                schema,
+                write_options=csv.WriteOptions(
+                    # Note that this quoting style is not exactly csv.QUOTE_MINIMAL
+                    # https://github.com/apache/arrow/issues/42032
+                    quoting_style="needed"
+                ),
+            ) as c_writer:
+                _write_chunk(p_writer, first_chunk, schema)
+                _write_chunk(c_writer, first_chunk, schema)
+                for chunk in dataframe_chunks:
+                    _write_chunk(p_writer, chunk, schema)
+                    _write_chunk(c_writer, chunk, schema)
         queries.append(query)
     if archive:
         base_utils.zip_dir(path, data_path, manifest_parser.get_study_prefix())
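For reference, the write path above can be exercised on its own: both ParquetWriter and CSVWriter take a fixed schema up front and then append table chunks, so a large result set never has to be materialized whole. A minimal sketch of the same pattern, where the chunk source, file names, and sample data are invented for illustration:

import pandas
import pyarrow
from pyarrow import csv, parquet

# Hypothetical stand-in for db.execute_as_pandas(query, chunksize=...):
# an iterator of DataFrame chunks that all share one column layout.
def chunked_results():
    yield pandas.DataFrame({"cnt": [2, 1], "label": ["a", None]})
    yield pandas.DataFrame({"cnt": [4, 3], "label": [None, "c"]})

schema = pyarrow.schema([("cnt", pyarrow.int64()), ("label", pyarrow.string())])
with parquet.ParquetWriter("example.parquet", schema) as p_writer:
    with csv.CSVWriter(
        "example.csv",
        schema,
        write_options=csv.WriteOptions(quoting_style="needed"),
    ) as c_writer:
        for chunk in chunked_results():
            table = pyarrow.Table.from_pandas(
                chunk, schema=schema, preserve_index=False
            )
            p_writer.write(table)  # appends a row group to the parquet file
            c_writer.write(table)  # appends rows to the csv file

As the linked Arrow issue notes, quoting_style="needed" quotes more aggressively than csv.QUOTE_MINIMAL, which is why the regenerated test fixtures further down gain quotes around every string field.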
72 changes: 67 additions & 5 deletions cumulus_library/databases.py
@@ -9,6 +9,7 @@
 """
 
 import abc
+import collections
 import datetime
 import json
 import os
@@ -151,7 +152,9 @@ def pandas_cursor(self) -> DatabaseCursor:
         """
 
     @abc.abstractmethod
-    def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
+    def execute_as_pandas(
+        self, sql: str, chunksize: int | None = None
+    ) -> tuple[pandas.DataFrame | collections.abc.Iterator[pandas.DataFrame], list[tuple]]:
         """Returns the results of the provided SQL as a pandas.DataFrame (or an
         iterator of DataFrame chunks when chunksize is set), plus the cursor's
         column description."""
 
     @abc.abstractmethod
@@ -200,6 +203,9 @@ def col_parquet_types_from_pandas(self, field_types: list) -> list:
 
         return field_types
 
+    def col_pyarrow_types_from_sql(self, columns: list[tuple]) -> list:
+        return columns
+
     def upload_file(
         self,
         *,
@@ -257,8 +263,11 @@ def cursor(self) -> AthenaCursor:
     def pandas_cursor(self) -> AthenaPandasCursor:
         return self.connection.cursor(cursor=AthenaPandasCursor)
 
-    def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
-        return self.pandas_cursor().execute(sql).as_pandas()
+    def execute_as_pandas(
+        self, sql: str, chunksize: int | None = None
+    ) -> tuple[pandas.DataFrame | collections.abc.Iterator[pandas.DataFrame], list[tuple]]:
+        query = self.pandas_cursor().execute(sql, chunksize=chunksize)
+        return query.as_pandas(), query.description
 
     def parser(self) -> DatabaseParser:
         return AthenaParser()
@@ -286,6 +295,30 @@ def col_parquet_types_from_pandas(self, field_types: list) -> list:
         )
         return output
 
+    def col_pyarrow_types_from_sql(self, columns: list[tuple]) -> list:
+        output = []
+        for column in columns:
+            match column[1]:
+                case "varchar":
+                    output.append((column[0], pyarrow.string()))
+                case "bigint":
+                    output.append((column[0], pyarrow.int64()))
+                case "integer":
+                    output.append((column[0], pyarrow.int64()))
+                case "double":
+                    output.append((column[0], pyarrow.float64()))
+                case "boolean":
+                    output.append((column[0], pyarrow.bool_()))
+                case "date":
+                    output.append((column[0], pyarrow.date64()))
+                case "timestamp":
+                    output.append((column[0], pyarrow.timestamp("s")))
+                case _:
+                    raise errors.CumulusLibraryError(
+                        f"{column[0],column[1]} does not have a conversion type"
+                    )
+        return output
+
     def upload_file(
         self,
         *,
@@ -525,12 +558,41 @@ def pandas_cursor(self) -> duckdb.DuckDBPyConnection:
         # Since this is not provided, return the vanilla cursor
         return self.connection
 
-    def execute_as_pandas(self, sql: str) -> pandas.DataFrame:
+    def execute_as_pandas(
+        self, sql: str, chunksize: int | None = None
+    ) -> tuple[pandas.DataFrame | collections.abc.Iterator[pandas.DataFrame], list[tuple]]:
         # We call convert_dtypes here in case there are integer columns.
         # Pandas will normally cast nullable-int as a float type unless
         # we call this to convert to its nullable int column type.
         # PyAthena seems to do this correctly for us, but not DuckDB.
-        return self.connection.execute(sql).df().convert_dtypes()
+        result = self.connection.execute(sql)
+        if chunksize:
+            return iter([result.df().convert_dtypes()]), result.description
+        return result.df().convert_dtypes(), result.description
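A quick illustration of why that convert_dtypes call matters, with made-up values: pandas upcasts an integer column containing nulls to float64 unless it is moved to the nullable Int64 extension dtype.

import pandas

s = pandas.Series([1, None, 3])
print(s.dtype)                   # float64: the null forced a float upcast
print(s.convert_dtypes().dtype)  # Int64: integers survive, the gap becomes <NA>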

+    def col_pyarrow_types_from_sql(self, columns: list[tuple]) -> list:
+        output = []
+        for column in columns:
+            match column[1]:
+                case "STRING":
+                    output.append((column[0], pyarrow.string()))
+                case "INTEGER":
+                    output.append((column[0], pyarrow.int64()))
+                case "NUMBER":
+                    output.append((column[0], pyarrow.float64()))
+                case "DOUBLE":
+                    output.append((column[0], pyarrow.float64()))
+                case "boolean" | "bool":
+                    output.append((column[0], pyarrow.bool_()))
+                case "Date":
+                    output.append((column[0], pyarrow.date64()))
+                case "TIMESTAMP" | "DATETIME":
+                    output.append((column[0], pyarrow.timestamp("s")))
+                case _:
+                    raise errors.CumulusLibraryError(
+                        f"{column[0],column[1]} does not have a conversion type"
+                    )
+        return output
+
     def parser(self) -> DatabaseParser:
         return DuckDbParser()
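Taken together, the backends now share one contract: execute_as_pandas returns the results plus the cursor's column description, and col_pyarrow_types_from_sql maps that description onto (name, pyarrow type) pairs that pyarrow.schema() accepts. A rough usage sketch, assuming db is an already-constructed DuckDatabaseBackend (constructor arguments omitted here):

import pyarrow

# db: assumed DuckDatabaseBackend instance (see class above)
chunks, description = db.execute_as_pandas(
    "SELECT * FROM core__observation", chunksize=100000000
)
# description holds DBAPI (name, type_code, ...) tuples; the type codes
# ("STRING", "NUMBER", ...) are what the match statement above consumes.
schema = pyarrow.schema(db.col_pyarrow_types_from_sql(description))
for chunk in chunks:  # for DuckDB, a single-element iterator (see above)
    print(len(chunk), schema.names)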
1 change: 1 addition & 0 deletions pyproject.toml
@@ -38,6 +38,7 @@ dev = [
 test = [
     "freezegun",
     "pytest",
+    "pytest-cov",
     "responses"
 ]
 
@@ -1,3 +1,3 @@
-cnt,category_code,recordedDate_month,code_display
+"cnt","category_code","recordedDate_month","code_display"
 15,,,
-15,encounter-diagnosis,,
+15,"encounter-diagnosis",,
@@ -1,19 +1,19 @@
-cnt,type_display,author_month,class_display
+"cnt","type_display","author_month","class_display"
 50,,,
-50,Evaluation + Plan note,,
-50,Emergency department note,,
-46,,,ambulatory
-46,Evaluation + Plan note,,ambulatory
-46,Emergency department note,,ambulatory
-26,,2018-07-01,
-26,Evaluation + Plan note,2018-07-01,
-26,Emergency department note,2018-07-01,
-24,,2018-07-01,ambulatory
-24,,2018-06-01,
-24,Evaluation + Plan note,2018-07-01,ambulatory
-24,Evaluation + Plan note,2018-06-01,
-24,Emergency department note,2018-07-01,ambulatory
-24,Emergency department note,2018-06-01,
-22,,2018-06-01,ambulatory
-22,Evaluation + Plan note,2018-06-01,ambulatory
-22,Emergency department note,2018-06-01,ambulatory
+50,"Evaluation + Plan note",,
+50,"Emergency department note",,
+46,,,"ambulatory"
+46,"Evaluation + Plan note",,"ambulatory"
+46,"Emergency department note",,"ambulatory"
+26,,"2018-07-01",
+26,"Evaluation + Plan note","2018-07-01",
+26,"Emergency department note","2018-07-01",
+24,,"2018-07-01","ambulatory"
+24,,"2018-06-01",
+24,"Evaluation + Plan note","2018-07-01","ambulatory"
+24,"Evaluation + Plan note","2018-06-01",
+24,"Emergency department note","2018-07-01","ambulatory"
+24,"Emergency department note","2018-06-01",
+22,,"2018-06-01","ambulatory"
+22,"Evaluation + Plan note","2018-06-01","ambulatory"
+22,"Emergency department note","2018-06-01","ambulatory"
@@ -1,9 +1,9 @@
-cnt,class_display,type_display,serviceType_display,priority_display
+"cnt","class_display","type_display","serviceType_display","priority_display"
 50,,,,
-50,,,,cumulus__none
-50,,,cumulus__none,
-50,,,cumulus__none,cumulus__none
-46,ambulatory,,,
-46,ambulatory,,,cumulus__none
-46,ambulatory,,cumulus__none,
-46,ambulatory,,cumulus__none,cumulus__none
+50,,,,"cumulus__none"
+50,,,"cumulus__none",
+50,,,"cumulus__none","cumulus__none"
+46,"ambulatory",,,
+46,"ambulatory",,,"cumulus__none"
+46,"ambulatory",,"cumulus__none",
+46,"ambulatory",,"cumulus__none","cumulus__none"
@@ -1,25 +1,25 @@
-cnt,class_display,type_display,serviceType_display,priority_display,period_start_month
+"cnt","class_display","type_display","serviceType_display","priority_display","period_start_month"
 50,,,,,
-50,,,,cumulus__none,
-50,,,cumulus__none,,
-50,,,cumulus__none,cumulus__none,
-46,ambulatory,,,,
-46,ambulatory,,,cumulus__none,
-46,ambulatory,,cumulus__none,,
-46,ambulatory,,cumulus__none,cumulus__none,
-26,,,,,2018-07-01
-26,,,,cumulus__none,2018-07-01
-26,,,cumulus__none,,2018-07-01
-26,,,cumulus__none,cumulus__none,2018-07-01
-24,,,,,2018-06-01
-24,,,,cumulus__none,2018-06-01
-24,,,cumulus__none,,2018-06-01
-24,,,cumulus__none,cumulus__none,2018-06-01
-24,ambulatory,,,,2018-07-01
-24,ambulatory,,,cumulus__none,2018-07-01
-24,ambulatory,,cumulus__none,,2018-07-01
-24,ambulatory,,cumulus__none,cumulus__none,2018-07-01
-22,ambulatory,,,,2018-06-01
-22,ambulatory,,,cumulus__none,2018-06-01
-22,ambulatory,,cumulus__none,,2018-06-01
-22,ambulatory,,cumulus__none,cumulus__none,2018-06-01
+50,,,,"cumulus__none",
+50,,,"cumulus__none",,
+50,,,"cumulus__none","cumulus__none",
+46,"ambulatory",,,,
+46,"ambulatory",,,"cumulus__none",
+46,"ambulatory",,"cumulus__none",,
+46,"ambulatory",,"cumulus__none","cumulus__none",
+26,,,,,"2018-07-01"
+26,,,,"cumulus__none","2018-07-01"
+26,,,"cumulus__none",,"2018-07-01"
+26,,,"cumulus__none","cumulus__none","2018-07-01"
+24,,,,,"2018-06-01"
+24,,,,"cumulus__none","2018-06-01"
+24,,,"cumulus__none",,"2018-06-01"
+24,,,"cumulus__none","cumulus__none","2018-06-01"
+24,"ambulatory",,,,"2018-07-01"
+24,"ambulatory",,,"cumulus__none","2018-07-01"
+24,"ambulatory",,"cumulus__none",,"2018-07-01"
+24,"ambulatory",,"cumulus__none","cumulus__none","2018-07-01"
+22,"ambulatory",,,,"2018-06-01"
+22,"ambulatory",,,"cumulus__none","2018-06-01"
+22,"ambulatory",,"cumulus__none",,"2018-06-01"
+22,"ambulatory",,"cumulus__none","cumulus__none","2018-06-01"