Skip to content

Commit

Permalink
Packaging mode for studies (#323)
Browse files Browse the repository at this point in the history
* Packaging mode for studies

* PR feedback

* sqlfluff update

* coverage
  • Loading branch information
dogversioning authored Dec 10, 2024
1 parent dd41536 commit 763e0de
Show file tree
Hide file tree
Showing 23 changed files with 372 additions and 196 deletions.
1 change: 1 addition & 0 deletions cumulus_library/.sqlfluff
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ exclude_rules=
# these rules overfire on athena nested arrays
references.from,
structure.column_order,
structure.unused_join,
aliasing.unused,
# this rule interferes with FHIR naming conventions
capitalisation.identifiers
Expand Down
230 changes: 195 additions & 35 deletions cumulus_library/actions/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import re
import sys
import tomllib
import zipfile

from rich.progress import Progress, TaskID

from cumulus_library import (
BaseTableBuilder,
CountsBuilder,
base_utils,
databases,
enums,
Expand Down Expand Up @@ -40,7 +42,10 @@ def _load_and_execute_builder(
db_parser: databases.DatabaseParser = None,
write_reference_sql: bool = False,
doc_str: str | None = None,
) -> None:
prepare: bool = False,
data_path: pathlib.Path,
query_count: int | None = None,
) -> int:
"""Loads a table builder from a file.
:param config: a StudyConfig object
Expand All @@ -49,6 +54,10 @@ def _load_and_execute_builder(
:keyword db_parser: an object implementing DatabaseParser for the target database
:keyword write_reference_sql: if true, writes sql to disk inside a study's directory
:keyword doc_str: A string to insert between queries written to disk
:keyword prepare: If true, will render query instead of executing
:keyword data_path: If prepare is true, the path to write rendered data to
:keyword query_count: if prepare is true, the number of queries already rendered
:returns: number of processed queries
"""

# Since we have to support arbitrary user-defined python files here, we
Expand Down Expand Up @@ -79,8 +88,7 @@ def _load_and_execute_builder(
f"Error loading {manifest._study_path}{filename}\n"
"Custom builders must extend the BaseTableBuilder class."
)

# Remove instances of intermediate classes, if present
# Remove instances of intermediate classes, if present (usually not)
table_builder_subclasses = list(
filter(lambda x: x.__name__ != "CountsBuilder", table_builder_subclasses)
)
Expand All @@ -89,7 +97,10 @@ def _load_and_execute_builder(
# remove it so it doesn't interfere with the next python module to
# execute, since the subclass would otherwise hang around.
table_builder_class = table_builder_subclasses[0]
table_builder = table_builder_class()
if issubclass(table_builder_class, CountsBuilder):
table_builder = table_builder_class(study_prefix=manifest.get_study_prefix())
else:
table_builder = table_builder_class()
if write_reference_sql:
prefix = manifest.get_study_prefix()
table_builder.prepare_queries(config=config, manifest=manifest, parser=db_parser)
Expand All @@ -105,19 +116,30 @@ def _load_and_execute_builder(
table_builder.write_queries(
path=pathlib.Path(f"{manifest._study_path}/reference_sql/" + new_filename)
)
elif prepare:
table_builder.prepare_queries(
config=config,
manifest=manifest,
parser=db_parser,
)
_render_output(
manifest.get_study_prefix(), table_builder.queries, data_path, filename, query_count
)
else:
table_builder.execute_queries(
config=config,
manifest=manifest,
parser=db_parser,
)

num_queries = len(table_builder.queries)
# After running the executor code, we'll remove
# it so it doesn't interfere with the next python module to
# execute, since the subclass would otherwise hang around.
del sys.modules[table_builder_module.__name__]
del table_builder_module

return num_queries


def run_protected_table_builder(
config: base_utils.StudyConfig,
Expand All @@ -136,13 +158,36 @@ def run_protected_table_builder(


def _run_workflow(
config: base_utils.StudyConfig, manifest: study_manifest.StudyManifest, filename: str
) -> None:
config: base_utils.StudyConfig,
manifest: study_manifest.StudyManifest,
filename: str,
prepare: str,
data_path: pathlib.Path,
query_count: int,
) -> int:
"""Loads workflow config from toml definitions and executes workflow
:param config: a StudyConfig object
:param manifest: a StudyManifest object
:param filename: Filename of the workflow config
:param prepare: If true, will render query instead of executing
:param data_path: If prepare is true, the path to write rendered data to
:param query_count: if prepare is true, the number of queries already rendered
:returns: a count of processed queries
"""
toml_path = pathlib.Path(f"{manifest._study_path}/{filename}")
if prepare:
with open(toml_path, encoding="utf-8") as file:
workflow_config = file.read()
_render_output(
manifest.get_study_prefix(),
[workflow_config],
data_path,
filename,
query_count,
is_toml=True,
)
return 1
existing_stats = []
if not config.stats_build:
existing_stats = (
Expand All @@ -157,14 +202,13 @@ def _run_workflow(
# but we're letting it slide so that builders function similarly
# across the board
safe_timestamp = base_utils.get_tablename_safe_iso_timestamp()
toml_path = pathlib.Path(f"{manifest._study_path}/{filename}")
with open(toml_path, "rb") as file:
workflow_config = tomllib.load(file)
config_type = workflow_config["config_type"]
target_table = workflow_config.get("target_table", workflow_config.get("table_prefix", ""))

if (target_table,) in existing_stats and not config.stats_build:
return
return 0
match config_type:
case "psm":
builder = psm_builder.PsmBuilder(
Expand Down Expand Up @@ -195,6 +239,7 @@ def _run_workflow(
table_name=f"{target_table}_{safe_timestamp}",
view_name=target_table,
)
return len(builder.queries)


def build_matching_files(
Expand All @@ -203,13 +248,20 @@ def build_matching_files(
*,
builder: str | None,
db_parser: databases.DatabaseParser = None,
prepare: bool,
data_path: pathlib.Path,
):
"""targets all table builders matching a target string for running
:param config: a StudyConfig object
:param manifest: a StudyManifest object
:keyword builder: filename of a module implementing a TableBuilder
:keyword db_parser: an object implementing DatabaseParser for the target database"""
:keyword db_parser: an object implementing DatabaseParser for the target database
:keyword prepare: If true, will render query instead of executing
:keyword data_path: If prepare is true, the path to write rendered data to
"""
if prepare:
_check_if_preparable(manifest.get_study_prefix()) # pragma: no cover
all_generators = manifest.get_all_generators()
matches = []
if not builder: # pragma: no cover
Expand All @@ -218,7 +270,14 @@ def build_matching_files(
for file in all_generators:
if file.find(builder) != -1:
matches.append(file)
build_study(config, manifest, db_parser=db_parser, file_list=matches)
build_study(
config,
manifest,
db_parser=db_parser,
file_list=matches,
prepare=prepare,
data_path=data_path,
)


def build_study(
Expand All @@ -228,35 +287,86 @@ def build_study(
db_parser: databases.DatabaseParser = None,
continue_from: str | None = None,
file_list: list | None = None,
) -> list:
prepare: bool,
data_path: pathlib.Path | None,
) -> None:
"""Creates tables in the schema by iterating through the sql_config.file_names
:param config: a StudyConfig object
:param manifest: a StudyManifest object
:keyword db_parser: a parser for the target database
:keyword continue_from: Name of a file to resume table creation from
:returns: loaded queries (for unit testing only)
:keyword prepare: If true, will render query instead of executing
:keyword data_path: If prepare is true, the path to write rendered data to
"""
if prepare:
_check_if_preparable(manifest.get_study_prefix())
if file_list is None:
file_list = manifest.get_file_list(continue_from)
if prepare:
data_dir = data_path / manifest.get_study_prefix()
for file in data_dir.glob("*"):
if file.is_file(): # pragma: no cover
file.unlink()
query_count = 0
for file in file_list:
if file.endswith(".py"):
_load_and_execute_builder(
query_count += _load_and_execute_builder(
config=config,
manifest=manifest,
filename=file,
db_parser=db_parser,
data_path=data_path,
prepare=prepare,
query_count=query_count,
)
elif file.endswith(".toml"):
_run_workflow(config=config, manifest=manifest, filename=file)
query_count += _run_workflow(
config=config,
manifest=manifest,
filename=file,
data_path=data_path,
prepare=prepare,
query_count=query_count,
)
elif file.endswith(".sql"):
_run_raw_queries(config=config, manifest=manifest, filename=file)
query_count += _run_raw_queries(
config=config,
manifest=manifest,
filename=file,
data_path=data_path,
prepare=prepare,
query_count=query_count,
)
else:
raise errors.StudyManifestParsingError
if prepare:
with zipfile.ZipFile(
f"{data_path}/{manifest.get_study_prefix()}.zip", "w", zipfile.ZIP_DEFLATED
) as z:
for file in data_dir.iterdir():
z.write(file, file.relative_to(data_dir))


def _run_raw_queries(
config: base_utils.StudyConfig, manifest: study_manifest.StudyManifest, filename: str
):
config: base_utils.StudyConfig,
manifest: study_manifest.StudyManifest,
filename: str,
*,
data_path: pathlib.Path | None,
prepare: bool,
query_count: int,
) -> int:
"""Creates tables in the schema by iterating through the sql_config.file_names
:param config: a StudyConfig object
:param manifest: a StudyManifest object
:param filename: the name of the sql file to read
:param prepare: If true, will render query instead of executing
:param data_path: If prepare is true, the path to write rendered data to
:param query_count: the number of queries currently processed
:returns: number of queries processed
"""
queries = []
for query in base_utils.parse_sql(base_utils.load_text(f"{manifest._study_path}/{filename}")):
queries.append([query, filename])
Expand All @@ -266,25 +376,75 @@ def _run_raw_queries(
f"`{manifest.get_study_prefix()}__",
"`",
)
# We'll explicitly create a cursor since recreating cursors for each
# table in a study is slightly slower for some databases
cursor = config.db.cursor()
# We want to only show a progress bar if we are :not: printing SQL lines
with base_utils.get_progress_bar(disable=config.verbose) as progress:
task = progress.add_task(
f"Building tables from {filename}...",
total=len(queries),
visible=not config.verbose,
if prepare:
_render_output(
manifest.get_study_prefix(), [q[0] for q in queries], data_path, filename, query_count
)
_execute_build_queries(
config=config,
manifest=manifest,
cursor=cursor,
queries=queries,
progress=progress,
task=task,
else:
# We'll explicitly create a cursor since recreating cursors for each
# table in a study is slightly slower for some databases
cursor = config.db.cursor()
# We want to only show a progress bar if we are :not: printing SQL lines
with base_utils.get_progress_bar(disable=config.verbose) as progress:
task = progress.add_task(
f"Building tables from {filename}...",
total=len(queries),
visible=not config.verbose,
)
_execute_build_queries(
config=config,
manifest=manifest,
cursor=cursor,
queries=queries,
progress=progress,
task=task,
)
return len(queries)


def _render_output(
    study_name: str,
    outputs: list,
    data_path: pathlib.Path,
    filename: str,
    count: int,
    *,
    is_toml: bool = False,
):
    """Writes rendered queries (or a workflow config) to disk, one file per item.

    Each item is written to
    `data_path/study_name/NNNN.<filename stem>.MM.<name>.<suffix>`, where NNNN is
    `count` plus the item's position in `outputs`, and MM is that position alone.

    :param study_name: the name of the subdirectory of data_path to write into
    :param outputs: a list of strings, each written to its own file
    :param data_path: the root directory for rendered output
    :param filename: the source file the outputs came from; everything after the
        last '.' is dropped when building the output file name
    :param count: the number of items already rendered; None is treated as 0
    :keyword is_toml: if true, files are named `config` and given a `.toml` suffix
        rather than being named from the SQL and given a `.sql` suffix
    """
    suffix = "toml" if is_toml else "sql"
    # Callers may pass through an unset (None) count; treat that as starting from zero
    # rather than failing on the addition below.
    base_index = count or 0
    stem = filename.rsplit(".", 1)[0]
    # This regex attempts to discover the table name: it finds the first line
    # containing a dunder and captures from the start of that line through the
    # word following the last dunder on it. So for example, finding SQL that
    # looks like this:
    #   CREATE TABLE foo__bar AS (varchar baz)
    # would result in `create_table_foo__bar` being assigned to name. The goal
    # is to make this at least mildly parsable at the file system level if someone
    # is reviewing a prepared study
    table_pattern = re.compile(r"(.*)__\w*")
    for index, output in enumerate(outputs):
        if is_toml:
            name = "config"
        else:
            found = table_pattern.search(output)
            if found is None:
                # No dunder anywhere in the query, so there is nothing to derive a
                # name from; use a generic label instead of raising a TypeError.
                name = "query"
            else:
                name = found[0].lower().replace(" ", "_")
                # Path separators in the matched line would otherwise be read as
                # directories when the name is joined onto data_path below.
                name = name.replace("/", "_").replace("\\", "_")
        total_index = base_index + index
        new_filename = f"{total_index:04d}.{stem}.{index:02d}.{name}.{suffix}"
        file_path = data_path / f"{study_name}/{new_filename}"

        file_path.parent.mkdir(exist_ok=True, parents=True)
        with open(file_path, "w", encoding="UTF-8") as f:
            f.write(output)


def _check_if_preparable(prefix):
    """Exits the process if the given study cannot be built in prepare mode.

    :param prefix: the study prefix to check
    :returns: None if the study is allowed; otherwise exits via sys.exit with a message
    """
    # This list should include any study which requires interrogating the database to
    # find if data is available to query (outside of a toml-driven workflow),
    # which isn't doable as a distributed query
    if prefix in ("core", "discovery", "data-metrics"):
        sys.exit(
            f"Study '{prefix}' does not support prepare mode. It must be run "
            "directly against a target database."
        )
return queries


def _query_error(
Expand Down
Loading

0 comments on commit 763e0de

Please sign in to comment.