Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Packaging mode for studies #323

Merged
merged 4 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cumulus_library/.sqlfluff
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ exclude_rules=
# these rule overfires on athena nested arrays
references.from,
structure.column_order,
structure.unused_join,
aliasing.unused,
# this rule interferes with FHIR naming conventions
capitalisation.identifiers
Expand Down
230 changes: 195 additions & 35 deletions cumulus_library/actions/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import re
import sys
import tomllib
import zipfile

from rich.progress import Progress, TaskID

from cumulus_library import (
BaseTableBuilder,
CountsBuilder,
base_utils,
databases,
enums,
Expand Down Expand Up @@ -40,7 +42,10 @@ def _load_and_execute_builder(
db_parser: databases.DatabaseParser = None,
write_reference_sql: bool = False,
doc_str: str | None = None,
) -> None:
prepare: bool = False,
data_path: pathlib.Path,
query_count: int | None = None,
) -> int:
"""Loads a table builder from a file.

:param config: a StudyConfig object
Expand All @@ -49,6 +54,10 @@ def _load_and_execute_builder(
:keyword db_parser: an object implementing DatabaseParser for the target database
:keyword write_reference_sql: if true, writes sql to disk inside a study's directory
:keyword doc_str: A string to insert between queries written to disk
:keyword prepare: If true, will render query instead of executing
:keyword data_path: If prepare is true, the path to write rendered data to
:keyword query_count: if prepare is true, the number of queries already rendered
:returns: number of processed queries
"""

# Since we have to support arbitrary user-defined python files here, we
Expand Down Expand Up @@ -79,8 +88,7 @@ def _load_and_execute_builder(
f"Error loading {manifest._study_path}{filename}\n"
"Custom builders must extend the BaseTableBuilder class."
)

# Remove instances of intermediate classes, if present
# Remove instances of intermediate classes, if present (usually not)
table_builder_subclasses = list(
filter(lambda x: x.__name__ != "CountsBuilder", table_builder_subclasses)
)
Expand All @@ -89,7 +97,10 @@ def _load_and_execute_builder(
# remove it so it doesn't interfere with the next python module to
# execute, since the subclass would otherwise hang around.
table_builder_class = table_builder_subclasses[0]
table_builder = table_builder_class()
if issubclass(table_builder_class, CountsBuilder):
table_builder = table_builder_class(study_prefix=manifest.get_study_prefix())
else:
table_builder = table_builder_class()
Comment on lines +100 to +103
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we just eventually retool builders to just accept a manifest object in the init method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh gosh, probably?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if write_reference_sql:
prefix = manifest.get_study_prefix()
table_builder.prepare_queries(config=config, manifest=manifest, parser=db_parser)
Expand All @@ -105,19 +116,30 @@ def _load_and_execute_builder(
table_builder.write_queries(
path=pathlib.Path(f"{manifest._study_path}/reference_sql/" + new_filename)
)
elif prepare:
table_builder.prepare_queries(
config=config,
manifest=manifest,
parser=db_parser,
)
_render_output(
manifest.get_study_prefix(), table_builder.queries, data_path, filename, query_count
)
else:
table_builder.execute_queries(
config=config,
manifest=manifest,
parser=db_parser,
)

num_queries = len(table_builder.queries)
# After running the executor code, we'll remove
# it so it doesn't interfere with the next python module to
# execute, since the subclass would otherwise hang around.
del sys.modules[table_builder_module.__name__]
del table_builder_module

return num_queries


def run_protected_table_builder(
config: base_utils.StudyConfig,
Expand All @@ -136,13 +158,36 @@ def run_protected_table_builder(


def _run_workflow(
config: base_utils.StudyConfig, manifest: study_manifest.StudyManifest, filename: str
) -> None:
config: base_utils.StudyConfig,
manifest: study_manifest.StudyManifest,
filename: str,
prepare: str,
data_path: pathlib.Path,
query_count: int,
) -> int:
"""Loads workflow config from toml definitions and executes workflow

:param config: a StudyConfig object
:param manifest: a StudyManifest object
:param filename: Filename of the workflow config
:param prepare: If true, will render query instead of executing
:param data_path: If prepare is true, the path to write rendered data to
:param query_count: if prepare is true, the number of queries already rendered
:returns: a count of processed queries
"""
toml_path = pathlib.Path(f"{manifest._study_path}/{filename}")
if prepare:
with open(toml_path, encoding="utf-8") as file:
workflow_config = file.read()
_render_output(
manifest.get_study_prefix(),
[workflow_config],
data_path,
filename,
query_count,
is_toml=True,
)
return 1
existing_stats = []
if not config.stats_build:
existing_stats = (
Expand All @@ -157,14 +202,13 @@ def _run_workflow(
# but we're letting it slide so that builders function similarly
# across the board
safe_timestamp = base_utils.get_tablename_safe_iso_timestamp()
toml_path = pathlib.Path(f"{manifest._study_path}/{filename}")
with open(toml_path, "rb") as file:
workflow_config = tomllib.load(file)
config_type = workflow_config["config_type"]
target_table = workflow_config.get("target_table", workflow_config.get("table_prefix", ""))

if (target_table,) in existing_stats and not config.stats_build:
return
return 0
match config_type:
case "psm":
builder = psm_builder.PsmBuilder(
Expand Down Expand Up @@ -195,6 +239,7 @@ def _run_workflow(
table_name=f"{target_table}_{safe_timestamp}",
view_name=target_table,
)
return len(builder.queries)


def build_matching_files(
Expand All @@ -203,13 +248,20 @@ def build_matching_files(
*,
builder: str | None,
db_parser: databases.DatabaseParser = None,
prepare: bool,
data_path: pathlib.Path,
):
"""targets all table builders matching a target string for running

:param config: a StudyConfig object
:param manifest: a StudyManifest object
:keyword builder: filename of a module implementing a TableBuilder
:keyword db_parser: an object implementing DatabaseParser for the target database"""
:keyword db_parser: an object implementing DatabaseParser for the target database
:keyword prepare: If true, will render query instead of executing
:keyword data_path: If prepare is true, the path to write rendered data to
"""
if prepare:
_check_if_preparable(manifest.get_study_prefix()) # pragma: no cover
all_generators = manifest.get_all_generators()
matches = []
if not builder: # pragma: no cover
Expand All @@ -218,7 +270,14 @@ def build_matching_files(
for file in all_generators:
if file.find(builder) != -1:
matches.append(file)
build_study(config, manifest, db_parser=db_parser, file_list=matches)
build_study(
config,
manifest,
db_parser=db_parser,
file_list=matches,
prepare=prepare,
data_path=data_path,
)


def build_study(
Expand All @@ -228,35 +287,86 @@ def build_study(
db_parser: databases.DatabaseParser = None,
continue_from: str | None = None,
file_list: list | None = None,
) -> list:
prepare: bool,
data_path: pathlib.Path | None,
) -> None:
"""Creates tables in the schema by iterating through the sql_config.file_names

:param config: a StudyConfig object
:param manifest: a StudyManifest object
:keyword db_parser: a parser for the target database
:keyword continue_from: Name of a file to resume table creation from
:returns: loaded queries (for unit testing only)
:keyword prepare: If true, will render query instead of executing
:keyword data_path: If prepare is true, the path to write rendered data to
"""
if prepare:
_check_if_preparable(manifest.get_study_prefix())
if file_list is None:
file_list = manifest.get_file_list(continue_from)
if prepare:
data_dir = data_path / manifest.get_study_prefix()
for file in data_dir.glob("*"):
if file.is_file(): # pragma: no cover
file.unlink()
query_count = 0
for file in file_list:
if file.endswith(".py"):
_load_and_execute_builder(
query_count += _load_and_execute_builder(
config=config,
manifest=manifest,
filename=file,
db_parser=db_parser,
data_path=data_path,
prepare=prepare,
query_count=query_count,
)
elif file.endswith(".toml"):
_run_workflow(config=config, manifest=manifest, filename=file)
query_count += _run_workflow(
config=config,
manifest=manifest,
filename=file,
data_path=data_path,
prepare=prepare,
query_count=query_count,
)
elif file.endswith(".sql"):
_run_raw_queries(config=config, manifest=manifest, filename=file)
query_count += _run_raw_queries(
config=config,
manifest=manifest,
filename=file,
data_path=data_path,
prepare=prepare,
query_count=query_count,
)
else:
raise errors.StudyManifestParsingError
if prepare:
with zipfile.ZipFile(
f"{data_path}/{manifest.get_study_prefix()}.zip", "w", zipfile.ZIP_DEFLATED
) as z:
for file in data_dir.iterdir():
z.write(file, file.relative_to(data_dir))


def _run_raw_queries(
config: base_utils.StudyConfig, manifest: study_manifest.StudyManifest, filename: str
):
config: base_utils.StudyConfig,
manifest: study_manifest.StudyManifest,
filename: str,
*,
data_path: pathlib.Path | None,
prepare: bool,
query_count: int,
) -> int:
"""Creates tables in the schema by iterating through the sql_config.file_names

:param config: a StudyConfig object
:param manifest: a StudyManifest object
:param filename: the name of the sql file to read
:param prepare: If true, will render query instead of executing
:param data_path: If prepare is true, the path to write rendered data to
:param query_count: the number of queries currently processed
:returns number of queries processed:
"""
queries = []
for query in base_utils.parse_sql(base_utils.load_text(f"{manifest._study_path}/{filename}")):
queries.append([query, filename])
Expand All @@ -266,25 +376,75 @@ def _run_raw_queries(
f"`{manifest.get_study_prefix()}__",
"`",
)
# We'll explicitly create a cursor since recreating cursors for each
# table in a study is slightly slower for some databases
cursor = config.db.cursor()
# We want to only show a progress bar if we are :not: printing SQL lines
with base_utils.get_progress_bar(disable=config.verbose) as progress:
task = progress.add_task(
f"Building tables from {filename}...",
total=len(queries),
visible=not config.verbose,
if prepare:
_render_output(
manifest.get_study_prefix(), [q[0] for q in queries], data_path, filename, query_count
)
_execute_build_queries(
config=config,
manifest=manifest,
cursor=cursor,
queries=queries,
progress=progress,
task=task,
else:
# We'll explicitly create a cursor since recreating cursors for each
# table in a study is slightly slower for some databases
cursor = config.db.cursor()
# We want to only show a progress bar if we are :not: printing SQL lines
with base_utils.get_progress_bar(disable=config.verbose) as progress:
task = progress.add_task(
f"Building tables from {filename}...",
total=len(queries),
visible=not config.verbose,
)
_execute_build_queries(
config=config,
manifest=manifest,
cursor=cursor,
queries=queries,
progress=progress,
task=task,
)
return len(queries)


def _render_output(
study_name: str,
outputs: list,
data_path: pathlib.Path,
filename: str,
count: int,
*,
is_toml: bool = False,
):
if is_toml:
suffix = "toml"
else:
suffix = "sql"
for index, output in enumerate(outputs):
if is_toml:
name = "config"
else:
# This regex attempts to discover the table name, via looking for the first
# dunder, and then gets the start of the line its on as a non-quote requiring
# part of a file name. So for example, finding SQL that looks like this:
# CREATE TABLE foo__bar AS (varchar baz)
# would result in `create_table_foo__bar` being assigned to name. The goal
# is to make this at least mildly parsable at the file system level if someone
# is reviewing a prepared study
name = re.search(r"(.*)__\w*", output)[0].lower().replace(" ", "_")
total_index = count + index
new_filename = f"{total_index:04d}.{filename.rsplit('.', 1)[0]}.{index:02d}.{name}.{suffix}"
file_path = data_path / f"{study_name}/{new_filename}"

file_path.parent.mkdir(exist_ok=True, parents=True)
with open(file_path, "w", encoding="UTF-8") as f:
f.write(output)


def _check_if_preparable(prefix):
# This list should include any study which requires interrogating the database to
# find if data is available to query (outside of a toml-driven workflow),
# which isn't doable as a distributed query
if prefix in ["core", "discovery", "data-metrics"]:
sys.exit(
f"Study '{prefix}'' does not support prepare mode. It must be run "
"directly against a target database."
)
return queries


def _query_error(
Expand Down
Loading
Loading