From 2a4f90ddb430c38c43841da184f98b47029b0830 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Tue, 17 Dec 2024 16:40:21 -0600 Subject: [PATCH 01/19] Create README.md --- pipelines/README.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 pipelines/README.md diff --git a/pipelines/README.md b/pipelines/README.md new file mode 100644 index 00000000..920ca953 --- /dev/null +++ b/pipelines/README.md @@ -0,0 +1 @@ +# Pipelines From 7b077fcf6ef4b6e6b9c962daf49bc9fd9a5b7f22 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 18 Dec 2024 12:02:59 -0600 Subject: [PATCH 02/19] move/rename build_model.py --- .../pyrenew/build_pyrenew_model.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename pipelines/{build_model.py => model_fitting/pyrenew/build_pyrenew_model.py} (100%) diff --git a/pipelines/build_model.py b/pipelines/model_fitting/pyrenew/build_pyrenew_model.py similarity index 100% rename from pipelines/build_model.py rename to pipelines/model_fitting/pyrenew/build_pyrenew_model.py From 6d4d8cca36a548db7d19c41e4b27401d6c1990bd Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 18 Dec 2024 13:45:57 -0600 Subject: [PATCH 03/19] delete unused files --- pipelines/default_priors.py | 68 ------------------------------------- pipelines/plot_parameters.R | 31 ----------------- 2 files changed, 99 deletions(-) delete mode 100755 pipelines/default_priors.py delete mode 100644 pipelines/plot_parameters.R diff --git a/pipelines/default_priors.py b/pipelines/default_priors.py deleted file mode 100755 index 05c4a9f4..00000000 --- a/pipelines/default_priors.py +++ /dev/null @@ -1,68 +0,0 @@ -import jax.numpy as jnp -import numpyro.distributions as dist -import pyrenew.transformation as transformation -from numpyro.infer.reparam import LocScaleReparam -from pyrenew.randomvariable import DistributionalVariable, TransformedVariable - -i0_first_obs_n_rv = DistributionalVariable( - "i0_first_obs_n_rv", - dist.Beta(1, 10), -) - -initialization_rate_rv = DistributionalVariable( - "rate", dist.Normal(0, 0.01), reparam=LocScaleReparam(0) -) - -r_logmean = jnp.log(1.2) -r_logsd = jnp.log(jnp.sqrt(2)) - -log_r_mu_intercept_rv = DistributionalVariable( - "log_r_mu_intercept_rv", dist.Normal(r_logmean, r_logsd) -) - -eta_sd_rv = DistributionalVariable( - "eta_sd", dist.TruncatedNormal(0.15, 0.05, low=0) -) - -autoreg_rt_rv = DistributionalVariable("autoreg_rt", dist.Beta(2, 40)) - - -inf_feedback_strength_rv = TransformedVariable( - "inf_feedback", - DistributionalVariable( - "inf_feedback_raw", - dist.LogNormal(jnp.log(50), jnp.log(1.5)), - ), - transforms=transformation.AffineTransform(loc=0, scale=-1), -) -# Could be reparameterized? - -p_ed_visit_mean_rv = DistributionalVariable( - "p_ed_visit_mean", - dist.Normal( - transformation.SigmoidTransform().inv(0.005), - 0.3, - ), -) # logit scale - - -p_ed_visit_w_sd_rv = DistributionalVariable( - "p_ed_visit_w_sd_sd", dist.TruncatedNormal(0, 0.01, low=0) -) - - -autoreg_p_ed_visit_rv = DistributionalVariable( - "autoreg_p_ed_visit_rv", dist.Beta(1, 100) -) - -ed_visit_wday_effect_rv = TransformedVariable( - "ed_visit_wday_effect", - DistributionalVariable( - "ed_visit_wday_effect_raw", - dist.Dirichlet(jnp.array([5, 5, 5, 5, 5, 5, 5])), - ), - transformation.AffineTransform(loc=0, scale=7), -) - -# Based on looking at some historical posteriors. 
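The deleted module's last entry, phi_rv just below, gives phi a LogNormal(4, 1) prior chosen, per the comment above, by inspecting historical posteriors. A quick prior-predictive draw is a cheap sanity check on a choice like this; the following is a minimal sketch in plain numpyro (the pyrenew wrappers imported above are not needed for the check, and the site name "phi" simply mirrors the variable below):

import jax.random as jr
import numpyro
import numpyro.distributions as dist
from numpyro.infer import Predictive

def prior_only():
    # Same distribution as phi_rv below: median exp(4) ~ 55, mean exp(4.5) ~ 90
    numpyro.sample("phi", dist.LogNormal(4.0, 1.0))

draws = Predictive(prior_only, num_samples=1000)(jr.PRNGKey(0))
print(float(draws["phi"].mean()))  # sample mean should land near 90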
-phi_rv = DistributionalVariable("phi", dist.LogNormal(4, 1)) diff --git a/pipelines/plot_parameters.R b/pipelines/plot_parameters.R deleted file mode 100644 index 5219bacf..00000000 --- a/pipelines/plot_parameters.R +++ /dev/null @@ -1,31 +0,0 @@ -library(dplyr) -library(ggplot2) -library(tidybayes) -library(ggdist) -library(tibble) - -pathogen <- "influenza" -job_path <- "" -state <- "US" - -fit <- arrow::read_parquet(fs::path(job_path, - "model_runs", - state, - "mcmc_tidy", - "pyrenew_posterior", - ext = "parquet" -)) |> - tibble() - - -inf_feedback <- fit |> - spread_draws(inf_feedback_raw, p_ed_visit_mean) |> - ggplot(aes( - x = inf_feedback_raw, - y = p_ed_visit_mean, - group = .draw - )) + - geom_point() + - scale_x_continuous(transform = "log10") + - scale_y_continuous(transform = "logit") + - theme_minimal() From 5452574884cf1d7842be107eb9cf279a541a90ed Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 18 Dec 2024 14:08:11 -0600 Subject: [PATCH 04/19] move and rename --- pipelines/{ => collate}/collate_plots.py | 0 pipelines/{ => collate}/collate_score_tables.R | 0 pipelines/{ => collate}/plot_category_pointintervals.R | 0 .../summarize_visualize_scores.R} | 0 .../render_diagnostic_report.R} | 0 pipelines/{fit_model.py => fit_model/fit_pyrenew_model.py} | 0 .../{model_fitting => fit_model}/pyrenew/build_pyrenew_model.py | 0 pipelines/{ => fit_model}/timeseries_forecasts.R | 0 pipelines/{ => hubverse}/create_hubverse_table.R | 0 pipelines/{ => hubverse}/create_observed_data_tables.py | 0 .../make_obs_time_series_for_viz.py} | 0 pipelines/{ => hubverse}/score_hubverse.R | 0 pipelines/{ => postproces}/convert_inferencedata_to_parquet.R | 0 pipelines/{ => postproces}/generate_predictive.py | 0 .../plot_state_forecast.R} | 0 pipelines/{ => postproces}/score_forecast.R | 0 pipelines/{ => preprocess}/generate_epiweekly.R | 0 pipelines/{ => preprocess}/prep_data.py | 0 pipelines/{save_eval_data.py => preprocess/prep_eval_data.py} | 0 pipelines/{ => preprocess}/pull_nhsn.R | 0 pipelines/{ => workflows}/forecast_state.py | 0 21 files changed, 0 insertions(+), 0 deletions(-) rename pipelines/{ => collate}/collate_plots.py (100%) rename pipelines/{ => collate}/collate_score_tables.R (100%) rename pipelines/{ => collate}/plot_category_pointintervals.R (100%) rename pipelines/{postprocess_scoring.R => collate/summarize_visualize_scores.R} (100%) rename pipelines/{render_webpage.R => diagnostic_report/render_diagnostic_report.R} (100%) rename pipelines/{fit_model.py => fit_model/fit_pyrenew_model.py} (100%) rename pipelines/{model_fitting => fit_model}/pyrenew/build_pyrenew_model.py (100%) rename pipelines/{ => fit_model}/timeseries_forecasts.R (100%) rename pipelines/{ => hubverse}/create_hubverse_table.R (100%) rename pipelines/{ => hubverse}/create_observed_data_tables.py (100%) rename pipelines/{pull_state_timeseries.py => hubverse/make_obs_time_series_for_viz.py} (100%) rename pipelines/{ => hubverse}/score_hubverse.R (100%) rename pipelines/{ => postproces}/convert_inferencedata_to_parquet.R (100%) rename pipelines/{ => postproces}/generate_predictive.py (100%) rename pipelines/{postprocess_state_forecast.R => postproces/plot_state_forecast.R} (100%) rename pipelines/{ => postproces}/score_forecast.R (100%) rename pipelines/{ => preprocess}/generate_epiweekly.R (100%) rename pipelines/{ => preprocess}/prep_data.py (100%) rename pipelines/{save_eval_data.py => preprocess/prep_eval_data.py} (100%) rename pipelines/{ => preprocess}/pull_nhsn.R (100%) rename pipelines/{ => 
workflows}/forecast_state.py (100%) diff --git a/pipelines/collate_plots.py b/pipelines/collate/collate_plots.py similarity index 100% rename from pipelines/collate_plots.py rename to pipelines/collate/collate_plots.py diff --git a/pipelines/collate_score_tables.R b/pipelines/collate/collate_score_tables.R similarity index 100% rename from pipelines/collate_score_tables.R rename to pipelines/collate/collate_score_tables.R diff --git a/pipelines/plot_category_pointintervals.R b/pipelines/collate/plot_category_pointintervals.R similarity index 100% rename from pipelines/plot_category_pointintervals.R rename to pipelines/collate/plot_category_pointintervals.R diff --git a/pipelines/postprocess_scoring.R b/pipelines/collate/summarize_visualize_scores.R similarity index 100% rename from pipelines/postprocess_scoring.R rename to pipelines/collate/summarize_visualize_scores.R diff --git a/pipelines/render_webpage.R b/pipelines/diagnostic_report/render_diagnostic_report.R similarity index 100% rename from pipelines/render_webpage.R rename to pipelines/diagnostic_report/render_diagnostic_report.R diff --git a/pipelines/fit_model.py b/pipelines/fit_model/fit_pyrenew_model.py similarity index 100% rename from pipelines/fit_model.py rename to pipelines/fit_model/fit_pyrenew_model.py diff --git a/pipelines/model_fitting/pyrenew/build_pyrenew_model.py b/pipelines/fit_model/pyrenew/build_pyrenew_model.py similarity index 100% rename from pipelines/model_fitting/pyrenew/build_pyrenew_model.py rename to pipelines/fit_model/pyrenew/build_pyrenew_model.py diff --git a/pipelines/timeseries_forecasts.R b/pipelines/fit_model/timeseries_forecasts.R similarity index 100% rename from pipelines/timeseries_forecasts.R rename to pipelines/fit_model/timeseries_forecasts.R diff --git a/pipelines/create_hubverse_table.R b/pipelines/hubverse/create_hubverse_table.R similarity index 100% rename from pipelines/create_hubverse_table.R rename to pipelines/hubverse/create_hubverse_table.R diff --git a/pipelines/create_observed_data_tables.py b/pipelines/hubverse/create_observed_data_tables.py similarity index 100% rename from pipelines/create_observed_data_tables.py rename to pipelines/hubverse/create_observed_data_tables.py diff --git a/pipelines/pull_state_timeseries.py b/pipelines/hubverse/make_obs_time_series_for_viz.py similarity index 100% rename from pipelines/pull_state_timeseries.py rename to pipelines/hubverse/make_obs_time_series_for_viz.py diff --git a/pipelines/score_hubverse.R b/pipelines/hubverse/score_hubverse.R similarity index 100% rename from pipelines/score_hubverse.R rename to pipelines/hubverse/score_hubverse.R diff --git a/pipelines/convert_inferencedata_to_parquet.R b/pipelines/postproces/convert_inferencedata_to_parquet.R similarity index 100% rename from pipelines/convert_inferencedata_to_parquet.R rename to pipelines/postproces/convert_inferencedata_to_parquet.R diff --git a/pipelines/generate_predictive.py b/pipelines/postproces/generate_predictive.py similarity index 100% rename from pipelines/generate_predictive.py rename to pipelines/postproces/generate_predictive.py diff --git a/pipelines/postprocess_state_forecast.R b/pipelines/postproces/plot_state_forecast.R similarity index 100% rename from pipelines/postprocess_state_forecast.R rename to pipelines/postproces/plot_state_forecast.R diff --git a/pipelines/score_forecast.R b/pipelines/postproces/score_forecast.R similarity index 100% rename from pipelines/score_forecast.R rename to pipelines/postproces/score_forecast.R diff --git 
a/pipelines/generate_epiweekly.R b/pipelines/preprocess/generate_epiweekly.R similarity index 100% rename from pipelines/generate_epiweekly.R rename to pipelines/preprocess/generate_epiweekly.R diff --git a/pipelines/prep_data.py b/pipelines/preprocess/prep_data.py similarity index 100% rename from pipelines/prep_data.py rename to pipelines/preprocess/prep_data.py diff --git a/pipelines/save_eval_data.py b/pipelines/preprocess/prep_eval_data.py similarity index 100% rename from pipelines/save_eval_data.py rename to pipelines/preprocess/prep_eval_data.py diff --git a/pipelines/pull_nhsn.R b/pipelines/preprocess/pull_nhsn.R similarity index 100% rename from pipelines/pull_nhsn.R rename to pipelines/preprocess/pull_nhsn.R diff --git a/pipelines/forecast_state.py b/pipelines/workflows/forecast_state.py similarity index 100% rename from pipelines/forecast_state.py rename to pipelines/workflows/forecast_state.py From 310473bdb1a1473e3786224094b4b0c6b752dc42 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 18 Dec 2024 17:11:14 -0600 Subject: [PATCH 05/19] change import references --- pipelines/__init__.py | 0 pipelines/batch/__init__.py | 0 pipelines/batch/setup_eval_job.py | 2 +- .../batch/setup_parameter_inference_job.py | 2 +- pipelines/batch/setup_prod_job.py | 2 +- pipelines/collate/__init__.py | 0 pipelines/diagnostic_report/__init__.py | 0 pipelines/fit_model/__init__.py | 0 pipelines/fit_model/pyrenew/__init__.py | 0 .../{ => pyrenew}/fit_pyrenew_model.py | 2 +- pipelines/forecast_state.py | 528 ++++++++++++++++++ pipelines/hubverse/__init__.py | 0 .../hubverse/create_observed_data_tables.py | 5 +- pipelines/iteration_helpers/loop_fit.sh | 2 +- .../loop_generate_predictive.sh | 2 +- .../iteration_helpers/loop_postprocess.sh | 4 +- pipelines/iteration_helpers/loop_score.sh | 2 +- pipelines/postprocess/__init__.py | 0 .../convert_inferencedata_to_parquet.R | 0 .../generate_predictive.py | 2 +- .../plot_state_forecast.R | 0 .../score_forecast.R | 0 pipelines/preprocess/__init__.py | 0 pipelines/preprocess/prep_eval_data.py | 2 +- pipelines/tests/test_forecast_state.sh | 2 +- pipelines/tests/test_run.sh | 50 -- pipelines/workflows/__init__.py | 0 pipelines/workflows/forecast_state.py | 32 +- 28 files changed, 560 insertions(+), 79 deletions(-) create mode 100644 pipelines/__init__.py create mode 100644 pipelines/batch/__init__.py create mode 100644 pipelines/collate/__init__.py create mode 100644 pipelines/diagnostic_report/__init__.py create mode 100644 pipelines/fit_model/__init__.py create mode 100644 pipelines/fit_model/pyrenew/__init__.py rename pipelines/fit_model/{ => pyrenew}/fit_pyrenew_model.py (97%) create mode 100644 pipelines/forecast_state.py create mode 100644 pipelines/hubverse/__init__.py create mode 100644 pipelines/postprocess/__init__.py rename pipelines/{postproces => postprocess}/convert_inferencedata_to_parquet.R (100%) rename pipelines/{postproces => postprocess}/generate_predictive.py (96%) rename pipelines/{postproces => postprocess}/plot_state_forecast.R (100%) rename pipelines/{postproces => postprocess}/score_forecast.R (100%) create mode 100644 pipelines/preprocess/__init__.py delete mode 100644 pipelines/tests/test_run.sh create mode 100644 pipelines/workflows/__init__.py diff --git a/pipelines/__init__.py b/pipelines/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/batch/__init__.py b/pipelines/batch/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/batch/setup_eval_job.py 
b/pipelines/batch/setup_eval_job.py index 6b51fdaa..1dba1ee6 100644 --- a/pipelines/batch/setup_eval_job.py +++ b/pipelines/batch/setup_eval_job.py @@ -127,7 +127,7 @@ def main( base_call = ( "/bin/bash -c '" - "python pipelines/forecast_state.py " + "python pipelines/workflows/forecast_state.py " "--disease {disease} " "--state {state} " "--n-training-days {n_training} " diff --git a/pipelines/batch/setup_parameter_inference_job.py b/pipelines/batch/setup_parameter_inference_job.py index 72adde94..c49a68dc 100644 --- a/pipelines/batch/setup_parameter_inference_job.py +++ b/pipelines/batch/setup_parameter_inference_job.py @@ -128,7 +128,7 @@ def main( base_call = ( "/bin/bash -c '" - "python pipelines/forecast_state.py " + "python pipelines/workflows/forecast_state.py " "--disease {disease} " "--state {state} " "--n-training-days 450 " diff --git a/pipelines/batch/setup_prod_job.py b/pipelines/batch/setup_prod_job.py index 2b8040f7..f604874d 100644 --- a/pipelines/batch/setup_prod_job.py +++ b/pipelines/batch/setup_prod_job.py @@ -156,7 +156,7 @@ def main( base_call = ( "/bin/bash -c '" - "python pipelines/forecast_state.py " + "python pipelines/workflows/forecast_state.py " "--disease {disease} " "--state {state} " "--n-training-days {n_training_days} " diff --git a/pipelines/collate/__init__.py b/pipelines/collate/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/diagnostic_report/__init__.py b/pipelines/diagnostic_report/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/fit_model/__init__.py b/pipelines/fit_model/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/fit_model/pyrenew/__init__.py b/pipelines/fit_model/pyrenew/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/fit_model/fit_pyrenew_model.py b/pipelines/fit_model/pyrenew/fit_pyrenew_model.py similarity index 97% rename from pipelines/fit_model/fit_pyrenew_model.py rename to pipelines/fit_model/pyrenew/fit_pyrenew_model.py index cc2a731e..8a37456c 100644 --- a/pipelines/fit_model/fit_pyrenew_model.py +++ b/pipelines/fit_model/pyrenew/fit_pyrenew_model.py @@ -4,7 +4,7 @@ import jax import numpy as np -from build_model import build_model_from_dir +from fit_model.pyrenew.build_pyrenew_model import build_model_from_dir def fit_and_save_model( diff --git a/pipelines/forecast_state.py b/pipelines/forecast_state.py new file mode 100644 index 00000000..8f06dd6a --- /dev/null +++ b/pipelines/forecast_state.py @@ -0,0 +1,528 @@ +import argparse +import logging +import os +import shutil +import subprocess +from datetime import datetime, timedelta +from pathlib import Path + +import numpyro +import polars as pl +import tomli_w +import tomllib +from preprocess.prep_data import process_and_save_state +from pygit2 import Repository +from preprocess.prep_eval_data import save_eval_data + +numpyro.set_host_device_count(4) + +from fit_model.pyrenew.fit_pyrenew_model import fit_and_save_model # noqa +from postprocess.generate_predictive import generate_and_save_predictions # noqa + + +def record_git_info(model_run_dir: Path): + metadata_file = Path(model_run_dir, "metadata.toml") + + if metadata_file.exists(): + with open(metadata_file, "rb") as file: + metadata = tomllib.load(file) + else: + metadata = {} + + try: + repo = Repository(os.getcwd()) + branch_name = repo.head.shorthand + commit_sha = str(repo.head.target) + except Exception as e: + branch_name = os.environ.get("GIT_BRANCH_NAME", "unknown") + commit_sha = 
os.environ.get("GIT_COMMIT_SHA", "unknown") + + new_metadata = { + "branch_name": branch_name, + "commit_sha": commit_sha, + } + + metadata.update(new_metadata) + + metadata_file.parent.mkdir(parents=True, exist_ok=True) + with open(metadata_file, "wb") as file: + tomli_w.dump(metadata, file) + + +def copy_and_record_priors(priors_path: Path, model_run_dir: Path): + metadata_file = Path(model_run_dir, "metadata.toml") + shutil.copyfile(priors_path, Path(model_run_dir, "priors.py")) + + if metadata_file.exists(): + with open(metadata_file, "rb") as file: + metadata = tomllib.load(file) + else: + metadata = {} + + new_metadata = { + "priors_path": str(priors_path), + } + + metadata.update(new_metadata) + + with open(metadata_file, "wb") as file: + tomli_w.dump(metadata, file) + + +def generate_epiweekly(model_run_dir: Path) -> None: + result = subprocess.run( + [ + "Rscript", + "pipelines/preprocess/generate_epiweekly.R", + f"{model_run_dir}", + ], + capture_output=True, + ) + if result.returncode != 0: + raise RuntimeError(f"generate_epiweekly: {result.stderr}") + return None + + +def timeseries_forecasts( + model_run_dir: Path, model_name: str, n_forecast_days: int, n_samples: int +) -> None: + result = subprocess.run( + [ + "Rscript", + "pipelines/fit_model/timeseries_forecasts.R", + f"{model_run_dir}", + "--model-name", + f"{model_name}", + "--n-forecast-days", + f"{n_forecast_days}", + "--n-samples", + f"{n_samples}", + ], + capture_output=True, + ) + if result.returncode != 0: + raise RuntimeError(f"timeseries_forecasts: {result.stderr}") + return None + + +def convert_inferencedata_to_parquet( + model_run_dir: Path, model_name: str +) -> None: + result = subprocess.run( + [ + "Rscript", + "pipelines/postprocess/convert_inferencedata_to_parquet.R", + f"{model_run_dir}", + "--model-name", + f"{model_name}", + ], + capture_output=True, + ) + if result.returncode != 0: + raise RuntimeError( + f"convert_inferencedata_to_parquet: {result.stderr}" + ) + return None + + +def plot_state_forecast( + model_run_dir: Path, pyrenew_model_name: str, timeseries_model_name: str +) -> None: + result = subprocess.run( + [ + "Rscript", + "pipelines/postprocess/plot_state_forecast.R", + f"{model_run_dir}", + "--pyrenew-model-name", + f"{pyrenew_model_name}", + "--timeseries-model-name", + f"{timeseries_model_name}", + ], + capture_output=True, + ) + if result.returncode != 0: + raise RuntimeError(f"plot_state_forecast: {result.stderr}") + return None + + +def score_forecast(model_run_dir: Path) -> None: + result = subprocess.run( + [ + "Rscript", + "pipelines/postprocess/score_forecast.R", + f"{model_run_dir}", + ], + capture_output=True, + ) + if result.returncode != 0: + raise RuntimeError(f"score_forecast: {result.stderr}") + return None + + +def render_diagnostic_report(model_run_dir: Path) -> None: + result = subprocess.run( + [ + "Rscript", + "pipelines/diagnostic_report/render_diagnostic_report.R", + f"{model_run_dir}", + ], + capture_output=True, + ) + if result.returncode != 0: + raise RuntimeError(f"render_diagnostic_report: {result.stderr}") + return None + + +def get_available_reports( + data_dir: str | Path, glob_pattern: str = "*.parquet" +): + return [ + datetime.strptime(f.stem, "%Y-%m-%d").date() + for f in Path(data_dir).glob(glob_pattern) + ] + + +def main( + disease: str, + report_date: str, + state: str, + facility_level_nssp_data_dir: Path | str, + state_level_nssp_data_dir: Path | str, + param_data_dir: Path | str, + priors_path: Path | str, + output_dir: Path | str, + 
n_training_days: int, + n_forecast_days: int, + n_chains: int, + n_warmup: int, + n_samples: int, + exclude_last_n_days: int = 0, + score: bool = False, + eval_data_path: Path = None, +): + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + + available_facility_level_reports = get_available_reports( + facility_level_nssp_data_dir + ) + + available_state_level_reports = get_available_reports( + state_level_nssp_data_dir + ) + first_available_state_report = min(available_state_level_reports) + last_available_state_report = max(available_state_level_reports) + + if report_date == "latest": + report_date = max(available_facility_level_reports) + else: + report_date = datetime.strptime(report_date, "%Y-%m-%d").date() + + if report_date in available_state_level_reports: + state_report_date = report_date + elif report_date > last_available_state_report: + state_report_date = last_available_state_report + elif report_date > first_available_state_report: + raise ValueError( + "Dataset appears to be missing some state-level " + f"reports. First entry is {first_available_state_report}, " + f"last is {last_available_state_report}, but no entry " + f"for {report_date}" + ) + else: + raise ValueError( + "Requested report date is earlier than the first " + "state-level vintage. This is not currently supported" + ) + + logger.info(f"Report date: {report_date}") + if state_report_date is not None: + logger.info(f"Using state-level data as of: {state_report_date}") + + # + 1 because max date in dataset is report_date - 1 + last_training_date = report_date - timedelta(days=exclude_last_n_days + 1) + + if last_training_date >= report_date: + raise ValueError( + "Last training date must be before the report date. " + f"Got a last training date of {last_training_date} " + f"with a report date of {report_date}."
+ ) + + logger.info(f"Last training date: {last_training_date}") + + first_training_date = last_training_date - timedelta( + days=n_training_days - 1 + ) + + logger.info(f"First training date: {first_training_date}") + + facility_level_nssp_data, state_level_nssp_data = None, None + + if report_date in available_facility_level_reports: + logger.info( + "Facility level data available for " "the given report date" + ) + facility_datafile = f"{report_date}.parquet" + facility_level_nssp_data = pl.scan_parquet( + Path(facility_level_nssp_data_dir, facility_datafile) + ) + if state_report_date in available_state_level_reports: + logger.info("State-level data available for the given report " "date.") + state_datafile = f"{state_report_date}.parquet" + state_level_nssp_data = pl.scan_parquet( + Path(state_level_nssp_data_dir, state_datafile) + ) + if facility_level_nssp_data is None and state_level_nssp_data is None: + raise ValueError( + "No data available for the requested report date " f"{report_date}" + ) + + param_estimates = pl.scan_parquet(Path(param_data_dir, "prod.parquet")) + model_batch_dir_name = ( + f"{disease.lower()}_r_{report_date}_f_" + f"{first_training_date}_t_{last_training_date}" + ) + + model_batch_dir = Path(output_dir, model_batch_dir_name) + + model_run_dir = Path(model_batch_dir, "model_runs", state) + + os.makedirs(model_run_dir, exist_ok=True) + + logger.info("Recording git info...") + record_git_info(model_run_dir) + + logger.info(f"Copying and recording priors from {priors_path}...") + copy_and_record_priors(priors_path, model_run_dir) + + logger.info(f"Processing {state}") + process_and_save_state( + state_abb=state, + disease=disease, + facility_level_nssp_data=facility_level_nssp_data, + state_level_nssp_data=state_level_nssp_data, + report_date=report_date, + state_level_report_date=state_report_date, + first_training_date=first_training_date, + last_training_date=last_training_date, + param_estimates=param_estimates, + model_run_dir=model_run_dir, + logger=logger, + ) + logger.info("Getting eval data...") + if eval_data_path is None: + raise ValueError("No path to an evaluation dataset provided.") + save_eval_data( + state=state, + report_date=report_date, + disease=disease, + first_training_date=first_training_date, + last_training_date=last_training_date, + latest_comprehensive_path=eval_data_path, + output_data_dir=Path(model_run_dir, "data"), + last_eval_date=report_date + timedelta(days=n_forecast_days), + ) + + logger.info("Generating epiweekly datasets from daily datasets...") + generate_epiweekly(model_run_dir) + + logger.info("Data preparation complete.") + + logger.info("Fitting model") + fit_and_save_model( + model_run_dir, + "pyrenew_e", + n_warmup=n_warmup, + n_samples=n_samples, + n_chains=n_chains, + ) + logger.info("Model fitting complete") + + logger.info("Performing posterior prediction / forecasting...") + + n_days_past_last_training = n_forecast_days + exclude_last_n_days + generate_and_save_predictions( + model_run_dir, "pyrenew_e", n_days_past_last_training + )
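The horizon arithmetic here deserves a line of explanation: the newest observation in a report is dated report_date - 1, so after excluding the last n days the model must project n_forecast_days + exclude_last_n_days days past the last training date to still cover the full report-relative horizon. A small worked sketch with hypothetical dates:

from datetime import date, timedelta

report_date = date(2024, 12, 17)  # hypothetical
exclude_last_n_days, n_forecast_days = 2, 28
last_training_date = report_date - timedelta(days=exclude_last_n_days + 1)
n_days_past_last_training = n_forecast_days + exclude_last_n_days
# last forecast day = last_training_date + 30 days = 2025-01-13,
# i.e. report_date + n_forecast_days - 1
print(last_training_date + timedelta(days=n_days_past_last_training))

+ + logger.info( + "Performing baseline forecasting and non-target pathogen " + "forecasting..."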
+ ) + n_denominator_samples = n_samples * n_chains + timeseries_forecasts( + model_run_dir, + "timeseries_e", + n_days_past_last_training, + n_denominator_samples, + ) + logger.info("All forecasting complete.") + + logger.info("Converting inferencedata to parquet...") + convert_inferencedata_to_parquet(model_run_dir, "pyrenew_e") + logger.info("Conversion complete.") + + logger.info("Postprocessing forecast...") + plot_state_forecast(model_run_dir, "pyrenew_e", "timeseries_e") + logger.info("Postprocessing complete.") + + logger.info("Rendering webpage...") + render_diagnostic_report(model_run_dir) + logger.info("Rendering complete.") + + if score: + logger.info("Scoring forecast...") + score_forecast(model_run_dir) + + logger.info( + "Single state pipeline complete " + f"for state {state} with " + f"report date {report_date}." + ) + return None + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Create fit data for disease modeling." + ) + parser.add_argument( + "--disease", + type=str, + required=True, + help="Disease to model (e.g., COVID-19, Influenza, RSV).", + ) + + parser.add_argument( + "--state", + type=str, + required=True, + help=( + "Two letter abbreviation for the state to fit " + "(e.g. 'AK', 'AL', 'AZ', etc.)." + ), + ) + + parser.add_argument( + "--report-date", + type=str, + default="latest", + help="Report date in YYYY-MM-DD format or latest (default: latest).", + ) + + parser.add_argument( + "--facility-level-nssp-data-dir", + type=Path, + default=Path("private_data", "nssp_etl_gold"), + help=( + "Directory in which to look for facility-level NSSP " + "ED visit data." + ), + ) + + parser.add_argument( + "--state-level-nssp-data-dir", + type=Path, + default=Path("private_data", "nssp_state_level_gold"), + help=( + "Directory in which to look for state-level NSSP " "ED visit data." + ), + ) + + parser.add_argument( + "--param-data-dir", + type=Path, + default=Path("private_data", "prod_param_estimates"), + help=( + "Directory in which to look for parameter estimates " + "such as delay PMFs." + ), + required=True, + ) + + parser.add_argument( + "--priors-path", + type=Path, + help=( + "Path to an executable Python file defining random variables " + "that require priors as pyrenew RandomVariable objects." + ), + required=True, + ) + + parser.add_argument( + "--output-dir", + type=Path, + default="private_data", + help="Directory in which to save output.", + ) + + parser.add_argument( + "--n-training-days", + type=int, + default=180, + help="Number of training days (default: 180).", + ) + + parser.add_argument( + "--n-forecast-days", + type=int, + default=28, + help=( + "Number of days ahead to forecast relative to the " + "report date (default: 28)." + ), + ) + + parser.add_argument( + "--n-chains", + type=int, + default=4, + help="Number of MCMC chains to run (default: 4).", + ) + + parser.add_argument( + "--n-warmup", + type=int, + default=1000, + help=( + "Number of warmup iterations per chain for NUTS " "(default: 1000)." + ), + ) + + parser.add_argument( + "--n-samples", + type=int, + default=1000, + help=( + "Number of posterior samples to draw per " + "chain using NUTS (default: 1000)." + ), + ) + + parser.add_argument( + "--exclude-last-n-days", + type=int, + default=0, + help=( + "Optionally exclude the final n days of available training " + "data (default: 0, i.e.
exclude no available data)." + ), + ) + + parser.add_argument( + "--score", + type=bool, + action=argparse.BooleanOptionalAction, + help=("If this flag is provided, will attempt to score the forecast."), + ) + + parser.add_argument( + "--eval-data-path", + type=Path, + help=("Path to a parquet file containing comprehensive truth data."), + ) + args = parser.parse_args() + numpyro.set_host_device_count(args.n_chains) + main(**vars(args))
diff --git a/pipelines/hubverse/__init__.py b/pipelines/hubverse/__init__.py new file mode 100644 index 00000000..e69de29b
diff --git a/pipelines/hubverse/create_observed_data_tables.py b/pipelines/hubverse/create_observed_data_tables.py index 16ad780c..27c8b4f1 100644 --- a/pipelines/hubverse/create_observed_data_tables.py +++ b/pipelines/hubverse/create_observed_data_tables.py @@ -5,7 +5,10 @@ import epiweeks import polars as pl -from prep_data import aggregate_facility_level_nssp_to_state, get_state_pop_df +from preprocess.prep_data import ( + aggregate_facility_level_nssp_to_state, + get_state_pop_df, +) def save_observed_data_tables(
diff --git a/pipelines/iteration_helpers/loop_fit.sh b/pipelines/iteration_helpers/loop_fit.sh index 837d5cf0..d59d2933 100755 --- a/pipelines/iteration_helpers/loop_fit.sh +++ b/pipelines/iteration_helpers/loop_fit.sh @@ -13,5 +13,5 @@ BASE_DIR="$1" for SUBDIR in "$BASE_DIR"/*/; do # Run the Python script with the current subdirectory as the model_dir argument echo "$SUBDIR" - python fit_model.py "$SUBDIR" + python -m fit_model.pyrenew.fit_pyrenew_model "$SUBDIR" done
diff --git a/pipelines/iteration_helpers/loop_generate_predictive.sh b/pipelines/iteration_helpers/loop_generate_predictive.sh index b2510d21..c533f616 100755 --- a/pipelines/iteration_helpers/loop_generate_predictive.sh +++ b/pipelines/iteration_helpers/loop_generate_predictive.sh @@ -14,5 +14,5 @@ BASE_DIR="$1" for SUBDIR in "$BASE_DIR"/*/; do # Run the Python script with the current subdirectory as the model_dir argument echo "$SUBDIR" - python generate_predictive.py "$SUBDIR" --n-forecast-points 28 + python postprocess/generate_predictive.py "$SUBDIR" --n-forecast-points 28 done
diff --git a/pipelines/iteration_helpers/loop_postprocess.sh b/pipelines/iteration_helpers/loop_postprocess.sh index 22603452..f2fb80db 100755 --- a/pipelines/iteration_helpers/loop_postprocess.sh +++ b/pipelines/iteration_helpers/loop_postprocess.sh @@ -13,6 +13,6 @@ BASE_DIR="$1" for SUBDIR in "$BASE_DIR"/*/; do # Run the R script with the current subdirectory as the model_dir argument echo "$SUBDIR" - Rscript convert_inferencedata_to_parquet.R "$SUBDIR" - Rscript postprocess_state_forecast.R "$SUBDIR" + Rscript postprocess/convert_inferencedata_to_parquet.R "$SUBDIR" + Rscript postprocess/plot_state_forecast.R "$SUBDIR" done
diff --git a/pipelines/iteration_helpers/loop_score.sh b/pipelines/iteration_helpers/loop_score.sh index 6ead92cb..d584e26b 100755 --- a/pipelines/iteration_helpers/loop_score.sh +++ b/pipelines/iteration_helpers/loop_score.sh @@ -13,5 +13,5 @@ BASE_DIR="$1" for SUBDIR in "$BASE_DIR"/*/; do # Run the R script with the current subdirectory as the model_dir argument echo "$SUBDIR" - Rscript score_forecast.R "$SUBDIR" + Rscript postprocess/score_forecast.R "$SUBDIR" done
diff --git a/pipelines/postprocess/__init__.py b/pipelines/postprocess/__init__.py new file mode 100644 index 00000000..e69de29b
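One CLI detail from the parser above: --score is declared with argparse.BooleanOptionalAction (Python 3.9+), which automatically registers a paired --no-score flag and makes the accompanying type=bool argument redundant. A self-contained illustration of the behavior:

import argparse

parser = argparse.ArgumentParser()
# BooleanOptionalAction registers both --score and --no-score
parser.add_argument("--score", action=argparse.BooleanOptionalAction)

print(parser.parse_args(["--score"]).score)     # True
print(parser.parse_args(["--no-score"]).score)  # False
print(parser.parse_args([]).score)              # None when omitted

diff --git a/pipelines/postproces/convert_inferencedata_to_parquet.R b/pipelines/postprocess/convert_inferencedata_to_parquet.R similarity index 100% rename from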
pipelines/postproces/convert_inferencedata_to_parquet.R rename to pipelines/postprocess/convert_inferencedata_to_parquet.R diff --git a/pipelines/postproces/generate_predictive.py b/pipelines/postprocess/generate_predictive.py similarity index 96% rename from pipelines/postproces/generate_predictive.py rename to pipelines/postprocess/generate_predictive.py index 773a5503..589f6c0a 100644 --- a/pipelines/postproces/generate_predictive.py +++ b/pipelines/postprocess/generate_predictive.py @@ -3,7 +3,7 @@ from pathlib import Path import arviz as az -from build_model import build_model_from_dir +from fit_model.pyrenew.build_pyrenew_model import build_model_from_dir def generate_and_save_predictions( diff --git a/pipelines/postproces/plot_state_forecast.R b/pipelines/postprocess/plot_state_forecast.R similarity index 100% rename from pipelines/postproces/plot_state_forecast.R rename to pipelines/postprocess/plot_state_forecast.R diff --git a/pipelines/postproces/score_forecast.R b/pipelines/postprocess/score_forecast.R similarity index 100% rename from pipelines/postproces/score_forecast.R rename to pipelines/postprocess/score_forecast.R diff --git a/pipelines/preprocess/__init__.py b/pipelines/preprocess/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/preprocess/prep_eval_data.py b/pipelines/preprocess/prep_eval_data.py index 6a85389b..fca17f55 100644 --- a/pipelines/preprocess/prep_eval_data.py +++ b/pipelines/preprocess/prep_eval_data.py @@ -3,7 +3,7 @@ from pathlib import Path import polars as pl -from prep_data import get_state_pop_df, process_state_level_data +from preprocess.prep_data import get_state_pop_df, process_state_level_data def save_eval_data( diff --git a/pipelines/tests/test_forecast_state.sh b/pipelines/tests/test_forecast_state.sh index 56a42e4f..1eef160b 100755 --- a/pipelines/tests/test_forecast_state.sh +++ b/pipelines/tests/test_forecast_state.sh @@ -16,7 +16,7 @@ else echo "TEST-MODE: Finished generating test data" fi echo "TEST-MODE: Running forecasting pipeline" -python pipelines/forecast_state.py \ +python pipelines/workflows/forecast_state.py \ --disease "COVID-19" \ --state "CA" \ --facility-level-nssp-data-dir "$BASE_DIR/private_data/nssp_etl_gold" \ diff --git a/pipelines/tests/test_run.sh b/pipelines/tests/test_run.sh deleted file mode 100644 index 50258803..00000000 --- a/pipelines/tests/test_run.sh +++ /dev/null @@ -1,50 +0,0 @@ -#!/bin/bash - -# Check if the base directory is provided as an argument -if [ -z "$1" ]; then - echo "Usage: $0 " - exit 1 -fi - -# Base directory containing subdirectories -BASE_DIR="$1" -N_SAMPLES=$2 -N_AHEAD=$3 - -# Iterate over each subdirectory in the base directory -echo "TEST-MODE: Running loop over subdirectories in $BASE_DIR" -echo "For $N_SAMPLES samples on 1 chain, and $N_AHEAD forecast points" -echo "" -for SUBDIR in "$BASE_DIR"/*/; do - echo "TEST-MODE: Inference for $SUBDIR" - python fit_model.py "$SUBDIR" --n-chains 1 --n-samples $N_SAMPLES - echo "TEST-MODE: Finished inference" - echo "" - echo "TEST-MODE: Generating posterior predictions for $SUBDIR" - python generate_predictive.py "$SUBDIR" --n-forecast-points $N_AHEAD - echo "TEST-MODE: Finished generating posterior predictions" - echo "" - echo "TEST-MODE: Converting inferencedata to parquet for $SUBDIR" - Rscript convert_inferencedata_to_parquet.R "$SUBDIR" - echo "TEST-MODE: Finished converting inferencedata to parquet" - echo "" - echo "TEST-MODE: Generate epiweekly data for $SUBDIR" - Rscript generate_epiweekly.R "$SUBDIR" - 
echo "TEST-MODE: Finished generating epiweekly data" - echo "" - echo "TEST-MODE: Forecasting baseline models for $SUBDIR" - Rscript timeseries_forecasts.R "$SUBDIR" --n-forecast-days $N_AHEAD --n-samples $N_SAMPLES - echo "TEST-MODE: Finished forecasting baseline models" - echo "" - echo "TEST-MODE: Postprocessing state forecast for $SUBDIR" - Rscript postprocess_state_forecast.R "$SUBDIR" - echo "TEST-MODE: Finished postprocessing state forecast" - echo "" - echo "TEST-MODE: Rendering webpage for $SUBDIR" - Rscript render_webpage.R "$SUBDIR" - echo "TEST-MODE: Finished rendering webpage" - echo "" - echo "TEST-MODE: Scoring forecast for $SUBDIR" - Rscript score_forecast.R "$SUBDIR" - echo "TEST-MODE: Finished scoring forecast" -done diff --git a/pipelines/workflows/__init__.py b/pipelines/workflows/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipelines/workflows/forecast_state.py b/pipelines/workflows/forecast_state.py index 97f3946f..8f06dd6a 100644 --- a/pipelines/workflows/forecast_state.py +++ b/pipelines/workflows/forecast_state.py @@ -10,14 +10,14 @@ import polars as pl import tomli_w import tomllib -from prep_data import process_and_save_state +from preprocess.prep_data import process_and_save_state from pygit2 import Repository -from save_eval_data import save_eval_data +from preprocess.prep_eval_data import save_eval_data numpyro.set_host_device_count(4) -from fit_model import fit_and_save_model # noqa -from generate_predictive import generate_and_save_predictions # noqa +from fit_model.pyrenew.fit_pyrenew_model import fit_and_save_model # noqa +from postprocess.generate_predictive import generate_and_save_predictions # noqa def record_git_info(model_run_dir: Path): @@ -73,7 +73,7 @@ def generate_epiweekly(model_run_dir: Path) -> None: result = subprocess.run( [ "Rscript", - "pipelines/generate_epiweekly.R", + "pipelines/preprocess/generate_epiweekly.R", f"{model_run_dir}", ], capture_output=True, @@ -89,7 +89,7 @@ def timeseries_forecasts( result = subprocess.run( [ "Rscript", - "pipelines/timeseries_forecasts.R", + "pipelines/fit_model/timeseries_forecasts.R", f"{model_run_dir}", "--model-name", f"{model_name}", @@ -111,7 +111,7 @@ def convert_inferencedata_to_parquet( result = subprocess.run( [ "Rscript", - "pipelines/convert_inferencedata_to_parquet.R", + "pipelines/postprocess/convert_inferencedata_to_parquet.R", f"{model_run_dir}", "--model-name", f"{model_name}", @@ -125,13 +125,13 @@ def convert_inferencedata_to_parquet( return None -def postprocess_forecast( +def plot_state_forecast( model_run_dir: Path, pyrenew_model_name: str, timeseries_model_name: str ) -> None: result = subprocess.run( [ "Rscript", - "pipelines/postprocess_state_forecast.R", + "pipelines/postprocess/plot_state_forecast.R", f"{model_run_dir}", "--pyrenew-model-name", f"{pyrenew_model_name}", @@ -141,7 +141,7 @@ def postprocess_forecast( capture_output=True, ) if result.returncode != 0: - raise RuntimeError(f"postprocess_forecast: {result.stderr}") + raise RuntimeError(f"plot_state_forecast: {result.stderr}") return None @@ -149,7 +149,7 @@ def score_forecast(model_run_dir: Path) -> None: result = subprocess.run( [ "Rscript", - "pipelines/score_forecast.R", + "pipelines/postprocess/score_forecast.R", f"{model_run_dir}", ], capture_output=True, @@ -159,17 +159,17 @@ def score_forecast(model_run_dir: Path) -> None: return None -def render_webpage(model_run_dir: Path) -> None: +def render_diagnostic_report(model_run_dir: Path) -> None: result = subprocess.run( [ "Rscript", - 
"pipelines/render_webpage.R", + "pipelines/diagnostic_report/render_diagnostic_report.R", f"{model_run_dir}", ], capture_output=True, ) if result.returncode != 0: - raise RuntimeError(f"render_webpage: {result.stderr}") + raise RuntimeError(f"render_diagnostic_report: {result.stderr}") return None @@ -364,11 +364,11 @@ def main( logger.info("Conversion complete.") logger.info("Postprocessing forecast...") - postprocess_forecast(model_run_dir, "pyrenew_e", "timeseries_e") + plot_state_forecast(model_run_dir, "pyrenew_e", "timeseries_e") logger.info("Postprocessing complete.") logger.info("Rendering webpage...") - render_webpage(model_run_dir) + render_diagnostic_report(model_run_dir) logger.info("Rendering complete.") if score: From 44eb59a98811850c507ff104b7cfb736feee14af Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 18 Dec 2024 17:28:32 -0600 Subject: [PATCH 06/19] pre-commit --- pipelines/forecast_state.py | 2 +- pipelines/workflows/forecast_state.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/forecast_state.py b/pipelines/forecast_state.py index 8f06dd6a..0afd2f6f 100644 --- a/pipelines/forecast_state.py +++ b/pipelines/forecast_state.py @@ -11,8 +11,8 @@ import tomli_w import tomllib from preprocess.prep_data import process_and_save_state -from pygit2 import Repository from preprocess.prep_eval_data import save_eval_data +from pygit2 import Repository numpyro.set_host_device_count(4) diff --git a/pipelines/workflows/forecast_state.py b/pipelines/workflows/forecast_state.py index 8f06dd6a..0afd2f6f 100644 --- a/pipelines/workflows/forecast_state.py +++ b/pipelines/workflows/forecast_state.py @@ -11,8 +11,8 @@ import tomli_w import tomllib from preprocess.prep_data import process_and_save_state -from pygit2 import Repository from preprocess.prep_eval_data import save_eval_data +from pygit2 import Repository numpyro.set_host_device_count(4) From 35ac23076f1b1c3ffa46f29956f7d1a308277a1a Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 18 Dec 2024 17:40:20 -0600 Subject: [PATCH 07/19] adjust imports --- .../fit_model/pyrenew/fit_pyrenew_model.py | 4 +- pipelines/forecast_state.py | 10 ++-- .../hubverse/create_observed_data_tables.py | 2 +- pipelines/iteration_helpers/loop_fit.sh | 2 +- pipelines/postprocess/generate_predictive.py | 4 +- pipelines/preprocess/prep_eval_data.py | 5 +- pipelines/tests/test_run.sh | 50 +++++++++++++++++++ pipelines/workflows/forecast_state.py | 10 ++-- 8 files changed, 74 insertions(+), 13 deletions(-) create mode 100644 pipelines/tests/test_run.sh diff --git a/pipelines/fit_model/pyrenew/fit_pyrenew_model.py b/pipelines/fit_model/pyrenew/fit_pyrenew_model.py index 8a37456c..0bdbd7fc 100644 --- a/pipelines/fit_model/pyrenew/fit_pyrenew_model.py +++ b/pipelines/fit_model/pyrenew/fit_pyrenew_model.py @@ -4,7 +4,9 @@ import jax import numpy as np -from fit_model.pyrenew.build_pyrenew_model import build_model_from_dir +from pipelines.fit_model.pyrenew.build_pyrenew_model import ( + build_model_from_dir, +) def fit_and_save_model( diff --git a/pipelines/forecast_state.py b/pipelines/forecast_state.py index 0afd2f6f..3f2e3a20 100644 --- a/pipelines/forecast_state.py +++ b/pipelines/forecast_state.py @@ -10,14 +10,16 @@ import polars as pl import tomli_w import tomllib -from preprocess.prep_data import process_and_save_state -from preprocess.prep_eval_data import save_eval_data +from pipelines.preprocess.prep_data import process_and_save_state +from pipelines.preprocess.prep_eval_data import save_eval_data from pygit2 
import Repository numpyro.set_host_device_count(4) -from fit_model.pyrenew.fit_pyrenew_model import fit_and_save_model # noqa -from postprocess.generate_predictive import generate_and_save_predictions # noqa +from pipelines.fit_model.pyrenew.fit_pyrenew_model import fit_and_save_model # noqa +from pipelines.postprocess.generate_predictive import ( + generate_and_save_predictions, +) # noqa def record_git_info(model_run_dir: Path):
diff --git a/pipelines/hubverse/create_observed_data_tables.py b/pipelines/hubverse/create_observed_data_tables.py index 27c8b4f1..f55b9120 100644 --- a/pipelines/hubverse/create_observed_data_tables.py +++ b/pipelines/hubverse/create_observed_data_tables.py @@ -5,7 +5,7 @@ import epiweeks import polars as pl -from preprocess.prep_data import ( +from pipelines.preprocess.prep_data import ( aggregate_facility_level_nssp_to_state, get_state_pop_df, )
diff --git a/pipelines/iteration_helpers/loop_fit.sh b/pipelines/iteration_helpers/loop_fit.sh index d59d2933..99eafb6a 100755 --- a/pipelines/iteration_helpers/loop_fit.sh +++ b/pipelines/iteration_helpers/loop_fit.sh @@ -13,5 +13,5 @@ BASE_DIR="$1" for SUBDIR in "$BASE_DIR"/*/; do # Run the Python script with the current subdirectory as the model_dir argument echo "$SUBDIR" - python -m fit_model.pyrenew.fit_pyrenew_model "$SUBDIR" + python -m pipelines.fit_model.pyrenew.fit_pyrenew_model "$SUBDIR" done
diff --git a/pipelines/postprocess/generate_predictive.py b/pipelines/postprocess/generate_predictive.py index 589f6c0a..e63ee97b 100644 --- a/pipelines/postprocess/generate_predictive.py +++ b/pipelines/postprocess/generate_predictive.py @@ -3,7 +3,9 @@ from pathlib import Path import arviz as az -from fit_model.pyrenew.build_pyrenew_model import build_model_from_dir +from pipelines.fit_model.pyrenew.build_pyrenew_model import ( + build_model_from_dir, +) def generate_and_save_predictions(
diff --git a/pipelines/preprocess/prep_eval_data.py b/pipelines/preprocess/prep_eval_data.py index fca17f55..b3dce93b 100644 --- a/pipelines/preprocess/prep_eval_data.py +++ b/pipelines/preprocess/prep_eval_data.py @@ -3,7 +3,10 @@ from pathlib import Path import polars as pl -from preprocess.prep_data import get_state_pop_df, process_state_level_data +from pipelines.preprocess.prep_data import ( + get_state_pop_df, + process_state_level_data, +) def save_eval_data(
diff --git a/pipelines/tests/test_run.sh b/pipelines/tests/test_run.sh new file mode 100644 index 00000000..2b45f563 --- /dev/null +++ b/pipelines/tests/test_run.sh @@ -0,0 +1,50 @@ +#!/bin/bash + +# Check if the base directory is provided as an argument +if [ -z "$1" ]; then + echo "Usage: $0 " + exit 1 +fi + +# Base directory containing subdirectories +BASE_DIR="$1" +N_SAMPLES=$2 +N_AHEAD=$3 + +# Iterate over each subdirectory in the base directory +echo "TEST-MODE: Running loop over subdirectories in $BASE_DIR" +echo "For $N_SAMPLES samples on 1 chain, and $N_AHEAD forecast points" +echo "" +for SUBDIR in "$BASE_DIR"/*/; do + echo "TEST-MODE: Inference for $SUBDIR" + python -m pipelines.fit_model.pyrenew.fit_pyrenew_model "$SUBDIR" --n-chains 1 --n-samples $N_SAMPLES + echo "TEST-MODE: Finished inference" + echo "" + echo "TEST-MODE: Generating posterior predictions for $SUBDIR" + python postprocess/generate_predictive.py "$SUBDIR" --n-forecast-points $N_AHEAD + echo "TEST-MODE: Finished generating posterior predictions" + echo "" + echo "TEST-MODE: Converting inferencedata to parquet for $SUBDIR" + Rscript postprocess/convert_inferencedata_to_parquet.R "$SUBDIR"
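Since the loop and test scripts above launch these modules directly, invocation style matters once files import from the absolute pipelines.* namespace: a dotted module name should be run with python -m (no .py suffix) from the repository root, while running a file by path leaves the pipelines package off sys.path. A minimal sketch of the distinction (module name as above, run location assumed to be the repo root):

# From the repository root:
#   python -m pipelines.postprocess.generate_predictive <model_dir>  # resolves `pipelines`
#   python pipelines/postprocess/generate_predictive.py <model_dir>  # ModuleNotFoundError
import sys

# Under -m, sys.path[0] is the current working directory;
# under path invocation, it is the script's own directory.
print(sys.path[0])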
echo "TEST-MODE: Finished converting inferencedata to parquet" + echo "" + echo "TEST-MODE: Generate epiweekly data for $SUBDIR" + Rscript preprocess/generate_epiweekly.R "$SUBDIR" + echo "TEST-MODE: Finished generating epiweekly data" + echo "" + echo "TEST-MODE: Forecasting baseline models for $SUBDIR" + Rscript fit_model/timeseries_forecasts.R "$SUBDIR" --n-forecast-days $N_AHEAD --n-samples $N_SAMPLES + echo "TEST-MODE: Finished forecasting baseline models" + echo "" + echo "TEST-MODE: Plotting state forecast for $SUBDIR" + Rscript postproces/plot_state_forecast.R "$SUBDIR" + echo "TEST-MODE: Finished postprocessing state forecast" + echo "" + echo "TEST-MODE: Rendering webpage for $SUBDIR" + Rscript diagnostic_report/render_diagnostic_report.R "$SUBDIR" + echo "TEST-MODE: Finished rendering webpage" + echo "" + echo "TEST-MODE: Scoring forecast for $SUBDIR" + Rscript postprocess/score_forecast.R "$SUBDIR" + echo "TEST-MODE: Finished scoring forecast" +done diff --git a/pipelines/workflows/forecast_state.py b/pipelines/workflows/forecast_state.py index 0afd2f6f..3f2e3a20 100644 --- a/pipelines/workflows/forecast_state.py +++ b/pipelines/workflows/forecast_state.py @@ -10,14 +10,16 @@ import polars as pl import tomli_w import tomllib -from preprocess.prep_data import process_and_save_state -from preprocess.prep_eval_data import save_eval_data +from pipelines.preprocess.prep_data import process_and_save_state +from pipelines.preprocess.prep_eval_data import save_eval_data from pygit2 import Repository numpyro.set_host_device_count(4) -from fit_model.pyrenew.fit_pyrenew_model import fit_and_save_model # noqa -from postprocess.generate_predictive import generate_and_save_predictions # noqa +from pipelines.fit_model.pyrenew.fit_pyrenew_model import fit_and_save_model # noqa +from pipelines.postprocess.generate_predictive import ( + generate_and_save_predictions, +) # noqa def record_git_info(model_run_dir: Path): From ae73b728631d4172280c1225ad5b4fd0b834b6b9 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 18 Dec 2024 17:44:48 -0600 Subject: [PATCH 08/19] pre-commit --- pipelines/fit_model/pyrenew/fit_pyrenew_model.py | 1 + pipelines/forecast_state.py | 3 ++- pipelines/hubverse/create_observed_data_tables.py | 1 + pipelines/postprocess/generate_predictive.py | 1 + pipelines/preprocess/prep_eval_data.py | 1 + pipelines/workflows/forecast_state.py | 3 ++- 6 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pipelines/fit_model/pyrenew/fit_pyrenew_model.py b/pipelines/fit_model/pyrenew/fit_pyrenew_model.py index 0bdbd7fc..b10648ad 100644 --- a/pipelines/fit_model/pyrenew/fit_pyrenew_model.py +++ b/pipelines/fit_model/pyrenew/fit_pyrenew_model.py @@ -4,6 +4,7 @@ import jax import numpy as np + from pipelines.fit_model.pyrenew.build_pyrenew_model import ( build_model_from_dir, ) diff --git a/pipelines/forecast_state.py b/pipelines/forecast_state.py index 3f2e3a20..ebc5d1bc 100644 --- a/pipelines/forecast_state.py +++ b/pipelines/forecast_state.py @@ -10,9 +10,10 @@ import polars as pl import tomli_w import tomllib +from pygit2 import Repository + from pipelines.preprocess.prep_data import process_and_save_state from pipelines.preprocess.prep_eval_data import save_eval_data -from pygit2 import Repository numpyro.set_host_device_count(4) diff --git a/pipelines/hubverse/create_observed_data_tables.py b/pipelines/hubverse/create_observed_data_tables.py index f55b9120..323a6f09 100644 --- a/pipelines/hubverse/create_observed_data_tables.py +++ 
b/pipelines/hubverse/create_observed_data_tables.py @@ -5,6 +5,7 @@ import epiweeks import polars as pl + from pipelines.preprocess.prep_data import ( aggregate_facility_level_nssp_to_state, get_state_pop_df, diff --git a/pipelines/postprocess/generate_predictive.py b/pipelines/postprocess/generate_predictive.py index e63ee97b..7a8bdceb 100644 --- a/pipelines/postprocess/generate_predictive.py +++ b/pipelines/postprocess/generate_predictive.py @@ -3,6 +3,7 @@ from pathlib import Path import arviz as az + from pipelines.fit_model.pyrenew.build_pyrenew_model import ( build_model_from_dir, ) diff --git a/pipelines/preprocess/prep_eval_data.py b/pipelines/preprocess/prep_eval_data.py index b3dce93b..711fb47c 100644 --- a/pipelines/preprocess/prep_eval_data.py +++ b/pipelines/preprocess/prep_eval_data.py @@ -3,6 +3,7 @@ from pathlib import Path import polars as pl + from pipelines.preprocess.prep_data import ( get_state_pop_df, process_state_level_data, diff --git a/pipelines/workflows/forecast_state.py b/pipelines/workflows/forecast_state.py index 3f2e3a20..ebc5d1bc 100644 --- a/pipelines/workflows/forecast_state.py +++ b/pipelines/workflows/forecast_state.py @@ -10,9 +10,10 @@ import polars as pl import tomli_w import tomllib +from pygit2 import Repository + from pipelines.preprocess.prep_data import process_and_save_state from pipelines.preprocess.prep_eval_data import save_eval_data -from pygit2 import Repository numpyro.set_host_device_count(4) From 6b3653cb8f99b5743b78b1aad18aaeadaf4d759a Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Wed, 18 Dec 2024 17:48:42 -0600 Subject: [PATCH 09/19] remove extra forecast_state --- pipelines/forecast_state.py | 531 ------------------------------------ 1 file changed, 531 deletions(-) delete mode 100644 pipelines/forecast_state.py diff --git a/pipelines/forecast_state.py b/pipelines/forecast_state.py deleted file mode 100644 index ebc5d1bc..00000000 --- a/pipelines/forecast_state.py +++ /dev/null @@ -1,531 +0,0 @@ -import argparse -import logging -import os -import shutil -import subprocess -from datetime import datetime, timedelta -from pathlib import Path - -import numpyro -import polars as pl -import tomli_w -import tomllib -from pygit2 import Repository - -from pipelines.preprocess.prep_data import process_and_save_state -from pipelines.preprocess.prep_eval_data import save_eval_data - -numpyro.set_host_device_count(4) - -from pipelines.fit_model.pyrenew.fit_pyrenew_model import fit_and_save_model # noqa -from pipelines.postprocess.generate_predictive import ( - generate_and_save_predictions, -) # noqa - - -def record_git_info(model_run_dir: Path): - metadata_file = Path(model_run_dir, "metadata.toml") - - if metadata_file.exists(): - with open(metadata_file, "rb") as file: - metadata = tomllib.load(file) - else: - metadata = {} - - try: - repo = Repository(os.getcwd()) - branch_name = repo.head.shorthand - commit_sha = str(repo.head.target) - except Exception as e: - branch_name = os.environ.get("GIT_BRANCH_NAME", "unknown") - commit_sha = os.environ.get("GIT_COMMIT_SHA", "unknown") - - new_metadata = { - "branch_name": branch_name, - "commit_sha": commit_sha, - } - - metadata.update(new_metadata) - - metadata_file.parent.mkdir(parents=True, exist_ok=True) - with open(metadata_file, "wb") as file: - tomli_w.dump(metadata, file) - - -def copy_and_record_priors(priors_path: Path, model_run_dir: Path): - metadata_file = Path(model_run_dir, "metadata.toml") - shutil.copyfile(priors_path, Path(model_run_dir, "priors.py")) - - if 
metadata_file.exists(): - with open(metadata_file, "rb") as file: - metadata = tomllib.load(file) - else: - metadata = {} - - new_metadata = { - "priors_path": str(priors_path), - } - - metadata.update(new_metadata) - - with open(metadata_file, "wb") as file: - tomli_w.dump(metadata, file) - - -def generate_epiweekly(model_run_dir: Path) -> None: - result = subprocess.run( - [ - "Rscript", - "pipelines/preprocess/generate_epiweekly.R", - f"{model_run_dir}", - ], - capture_output=True, - ) - if result.returncode != 0: - raise RuntimeError(f"generate_epiweekly: {result.stderr}") - return None - - -def timeseries_forecasts( - model_run_dir: Path, model_name: str, n_forecast_days: int, n_samples: int -) -> None: - result = subprocess.run( - [ - "Rscript", - "pipelines/fit_model/timeseries_forecasts.R", - f"{model_run_dir}", - "--model-name", - f"{model_name}", - "--n-forecast-days", - f"{n_forecast_days}", - "--n-samples", - f"{n_samples}", - ], - capture_output=True, - ) - if result.returncode != 0: - raise RuntimeError(f"timeseries_forecasts: {result.stderr}") - return None - - -def convert_inferencedata_to_parquet( - model_run_dir: Path, model_name: str -) -> None: - result = subprocess.run( - [ - "Rscript", - "pipelines/postprocess/convert_inferencedata_to_parquet.R", - f"{model_run_dir}", - "--model-name", - f"{model_name}", - ], - capture_output=True, - ) - if result.returncode != 0: - raise RuntimeError( - f"convert_inferencedata_to_parquet: {result.stderr}" - ) - return None - - -def plot_state_forecast( - model_run_dir: Path, pyrenew_model_name: str, timeseries_model_name: str -) -> None: - result = subprocess.run( - [ - "Rscript", - "pipelines/postprocess/plot_state_forecast.R", - f"{model_run_dir}", - "--pyrenew-model-name", - f"{pyrenew_model_name}", - "--timeseries-model-name", - f"{timeseries_model_name}", - ], - capture_output=True, - ) - if result.returncode != 0: - raise RuntimeError(f"plot_state_forecast: {result.stderr}") - return None - - -def score_forecast(model_run_dir: Path) -> None: - result = subprocess.run( - [ - "Rscript", - "pipelines/postprocess/score_forecast.R", - f"{model_run_dir}", - ], - capture_output=True, - ) - if result.returncode != 0: - raise RuntimeError(f"score_forecast: {result.stderr}") - return None - - -def render_diagnostic_report(model_run_dir: Path) -> None: - result = subprocess.run( - [ - "Rscript", - "pipelines/diagnostic_report/render_diagnostic_report.R", - f"{model_run_dir}", - ], - capture_output=True, - ) - if result.returncode != 0: - raise RuntimeError(f"render_diagnostic_report: {result.stderr}") - return None - - -def get_available_reports( - data_dir: str | Path, glob_pattern: str = "*.parquet" -): - return [ - datetime.strptime(f.stem, "%Y-%m-%d").date() - for f in Path(data_dir).glob(glob_pattern) - ] - - -def main( - disease: str, - report_date: str, - state: str, - facility_level_nssp_data_dir: Path | str, - state_level_nssp_data_dir: Path | str, - param_data_dir: Path | str, - priors_path: Path | str, - output_dir: Path | str, - n_training_days: int, - n_forecast_days: int, - n_chains: int, - n_warmup: int, - n_samples: int, - exclude_last_n_days: int = 0, - score: bool = False, - eval_data_path: Path = None, -): - logging.basicConfig(level=logging.INFO) - logger = logging.getLogger(__name__) - - available_facility_level_reports = get_available_reports( - facility_level_nssp_data_dir - ) - - available_state_level_reports = get_available_reports( - state_level_nssp_data_dir - ) - first_available_state_report = 
min(available_state_level_reports) - last_available_state_report = max(available_state_level_reports) - - if report_date == "latest": - report_date = max(available_facility_level_reports) - else: - report_date = datetime.strptime(report_date, "%Y-%m-%d").date() - - if report_date in available_state_level_reports: - state_report_date = report_date - elif report_date > last_available_state_report: - state_report_date = last_available_state_report - elif report_date > first_available_state_report: - raise ValueError( - "Dataset appears to be missing some state-level " - f"reports. First entry is {first_available_state_report}, " - f"last is {last_available_state_report}, but no entry " - f"for {report_date}" - ) - else: - raise ValueError( - "Requested report date is earlier than the first " - "state-level vintage. This is not currently supported." - ) - - logger.info(f"Report date: {report_date}") - if state_report_date is not None: - logger.info(f"Using state-level data as of: {state_report_date}") - - # + 1 because max date in dataset is report_date - 1 - last_training_date = report_date - timedelta(days=exclude_last_n_days + 1) - - if last_training_date >= report_date: - raise ValueError( - "Last training date must be before the report date. " - f"Got a last training date of {last_training_date} " - f"with a report date of {report_date}." - ) - - logger.info(f"last training date: {last_training_date}") - - first_training_date = last_training_date - timedelta( - days=n_training_days - 1 - ) - - logger.info(f"First training date {first_training_date}") - - facility_level_nssp_data, state_level_nssp_data = None, None - - if report_date in available_facility_level_reports: - logger.info( - "Facility level data available for " "the given report date" - ) - facility_datafile = f"{report_date}.parquet" - facility_level_nssp_data = pl.scan_parquet( - Path(facility_level_nssp_data_dir, facility_datafile) - ) - if state_report_date in available_state_level_reports: - logger.info("State-level data available for the given report " "date.") - state_datafile = f"{state_report_date}.parquet" - state_level_nssp_data = pl.scan_parquet( - Path(state_level_nssp_data_dir, state_datafile) - ) - if facility_level_nssp_data is None and state_level_nssp_data is None: - raise ValueError( - "No data available for the requested report date " f"{report_date}" - ) - - param_estimates = pl.scan_parquet(Path(param_data_dir, "prod.parquet")) - model_batch_dir_name = ( - f"{disease.lower()}_r_{report_date}_f_" - f"{first_training_date}_t_{last_training_date}" - ) - - model_batch_dir = Path(output_dir, model_batch_dir_name) - - model_run_dir = Path(model_batch_dir, "model_runs", state) - - os.makedirs(model_run_dir, exist_ok=True) - - logger.info("Recording git info...") - record_git_info(model_run_dir) - - logger.info(f"Copying and recording priors from {priors_path}...") - copy_and_record_priors(priors_path, model_run_dir) - - logger.info(f"Processing {state}") - process_and_save_state( - state_abb=state, - disease=disease, - facility_level_nssp_data=facility_level_nssp_data, - state_level_nssp_data=state_level_nssp_data, - report_date=report_date, - state_level_report_date=state_report_date, - first_training_date=first_training_date, - last_training_date=last_training_date, - param_estimates=param_estimates, - model_run_dir=model_run_dir, - logger=logger, - ) - logger.info("Getting eval data...") - if eval_data_path is None: - raise ValueError("No path to an evaluation dataset provided.") - save_eval_data( - state=state, -
report_date=report_date, - disease=disease, - first_training_date=first_training_date, - last_training_date=last_training_date, - latest_comprehensive_path=eval_data_path, - output_data_dir=Path(model_run_dir, "data"), - last_eval_date=report_date + timedelta(days=n_forecast_days), - ) - - logger.info("Generating epiweekly datasets from daily datasets...") - generate_epiweekly(model_run_dir) - - logger.info("Data preparation complete.") - - logger.info("Fitting model") - fit_and_save_model( - model_run_dir, - "pyrenew_e", - n_warmup=n_warmup, - n_samples=n_samples, - n_chains=n_chains, - ) - logger.info("Model fitting complete") - - logger.info("Performing posterior prediction / forecasting...") - - n_days_past_last_training = n_forecast_days + exclude_last_n_days - generate_and_save_predictions( - model_run_dir, "pyrenew_e", n_days_past_last_training - ) - - logger.info( - "Performing baseline forecasting and non-target pathogen " - "forecasting..." - ) - n_denominator_samples = n_samples * n_chains - timeseries_forecasts( - model_run_dir, - "timeseries_e", - n_days_past_last_training, - n_denominator_samples, - ) - logger.info("All forecasting complete.") - - logger.info("Converting inferencedata to parquet...") - convert_inferencedata_to_parquet(model_run_dir, "pyrenew_e") - logger.info("Conversion complete.") - - logger.info("Postprocessing forecast...") - plot_state_forecast(model_run_dir, "pyrenew_e", "timeseries_e") - logger.info("Postprocessing complete.") - - logger.info("Rendering webpage...") - render_diagnostic_report(model_run_dir) - logger.info("Rendering complete.") - - if score: - logger.info("Scoring forecast...") - score_forecast(model_run_dir) - - logger.info( - "Single state pipeline complete " - f"for state {state} with " - f"report date {report_date}." - ) - return None - - -if __name__ == "__main__": - parser = argparse.ArgumentParser( - description="Create fit data for disease modeling." - ) - parser.add_argument( - "--disease", - type=str, - required=True, - help="Disease to model (e.g., COVID-19, Influenza, RSV).", - ) - - parser.add_argument( - "--state", - type=str, - required=True, - help=( - "Two letter abbreviation for the state to fit " - "(e.g. 'AK', 'AL', 'AZ', etc.)." - ), - ) - - parser.add_argument( - "--report-date", - type=str, - default="latest", - help="Report date in YYYY-MM-DD format or latest (default: latest).", - ) - - parser.add_argument( - "--facility-level-nssp-data-dir", - type=Path, - default=Path("private_data", "nssp_etl_gold"), - help=( - "Directory in which to look for facility-level NSSP " - "ED visit data." - ), - ) - - parser.add_argument( - "--state-level-nssp-data-dir", - type=Path, - default=Path("private_data", "nssp_state_level_gold"), - help=( - "Directory in which to look for state-level NSSP " "ED visit data." - ), - ) - - parser.add_argument( - "--param-data-dir", - type=Path, - default=Path("private_data", "prod_param_estimates"), - help=( - "Directory in which to look for parameter estimates " - "such as delay PMFs." - ), - required=True, - ) - - parser.add_argument( - "--priors-path", - type=Path, - help=( - "Path to an executable Python file defining random variables " - "that require priors as pyrenew RandomVariable objects."
- ), - required=True, - ) - - parser.add_argument( - "--output-dir", - type=Path, - default="private_data", - help="Directory in which to save output.", - ) - - parser.add_argument( - "--n-training-days", - type=int, - default=180, - help="Number of training days (default: 180).", - ) - - parser.add_argument( - "--n-forecast-days", - type=int, - default=28, - help=( - "Number of days ahead to forecast relative to the " - "report date (default: 28)." - ), - ) - - parser.add_argument( - "--n-chains", - type=int, - default=4, - help="Number of MCMC chains to run (default: 4).", - ) - - parser.add_argument( - "--n-warmup", - type=int, - default=1000, - help=( - "Number of warmup iterations per chain for NUTS " "(default: 1000)." - ), - ) - - parser.add_argument( - "--n-samples", - type=int, - default=1000, - help=( - "Number of posterior samples to draw per " - "chain using NUTS (default: 1000)." - ), - ) - - parser.add_argument( - "--exclude-last-n-days", - type=int, - default=0, - help=( - "Optionally exclude the final n days of available training " - "data (Default: 0, i.e. exclude no available data)." - ), - ) - - parser.add_argument( - "--score", - type=bool, - action=argparse.BooleanOptionalAction, - help=("If this flag is provided, will attempt to score the forecast."), - ) - - parser.add_argument( - "--eval-data-path", - type=Path, - help=("Path to a parquet file containing comprehensive truth data."), - ) - args = parser.parse_args() - numpyro.set_host_device_count(args.n_chains) - main(**vars(args)) From 8cd49bd60af9c6796513489f06a24630d4b9797d Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 10:00:34 -0600 Subject: [PATCH 10/19] undo lots of changes --- README.md | 4 +- pipelines/batch/setup_eval_job.py | 2 +- .../batch/setup_parameter_inference_job.py | 2 +- pipelines/batch/setup_prod_job.py | 2 +- .../pyrenew => }/build_pyrenew_model.py | 0 pipelines/collate/__init__.py | 0 pipelines/{collate => }/collate_plots.py | 0 .../{collate => }/collate_score_tables.R | 0 .../convert_inferencedata_to_parquet.R | 0 pipelines/fit_model/__init__.py | 0 pipelines/fit_model/pyrenew/__init__.py | 0 .../pyrenew => }/fit_pyrenew_model.py | 2 +- pipelines/{workflows => }/forecast_state.py | 24 +++---- .../{preprocess => }/generate_epiweekly.R | 0 .../{postprocess => }/generate_predictive.py | 2 +- pipelines/hubverse/__init__.py | 0 ...> hubverse_create_observed_data_tables.py} | 2 +- ...bverse_table.R => hubverse_create_table.R} | 0 ...
hubverse_make_obs_time_series_for_viz.py} | 0 .../score_hubverse.R => hubverse_score.R} | 0 pipelines/iteration_helpers/loop_fit.sh | 17 ----- .../loop_generate_predictive.sh | 18 ----- .../iteration_helpers/loop_postprocess.sh | 18 ----- pipelines/iteration_helpers/loop_score.sh | 17 ----- ...ecast.R => plot_and_save_state_forecast.R} | 0 .../plot_category_pointintervals.R | 0 pipelines/postprocess/__init__.py | 0 pipelines/{preprocess => }/prep_data.py | 0 pipelines/{preprocess => }/prep_eval_data.py | 2 +- pipelines/preprocess/__init__.py | 0 pipelines/preprocess/pull_nhsn.R | 68 ------------------- pipelines/{postprocess => }/score_forecast.R | 0 .../summarize_visualize_scores.R | 0 pipelines/tests/test_forecast_state.sh | 2 +- pipelines/tests/test_run.sh | 14 ++-- .../{fit_model => }/timeseries_forecasts.R | 0 pipelines/workflows/__init__.py | 0 37 files changed, 29 insertions(+), 167 deletions(-) rename pipelines/{fit_model/pyrenew => }/build_pyrenew_model.py (100%) delete mode 100644 pipelines/collate/__init__.py rename pipelines/{collate => }/collate_plots.py (100%) rename pipelines/{collate => }/collate_score_tables.R (100%) rename pipelines/{postprocess => }/convert_inferencedata_to_parquet.R (100%) delete mode 100644 pipelines/fit_model/__init__.py delete mode 100644 pipelines/fit_model/pyrenew/__init__.py rename pipelines/{fit_model/pyrenew => }/fit_pyrenew_model.py (97%) rename pipelines/{workflows => }/forecast_state.py (95%) rename pipelines/{preprocess => }/generate_epiweekly.R (100%) rename pipelines/{postprocess => }/generate_predictive.py (97%) delete mode 100644 pipelines/hubverse/__init__.py rename pipelines/{hubverse/create_observed_data_tables.py => hubverse_create_observed_data_tables.py} (99%) rename pipelines/{hubverse/create_hubverse_table.R => hubverse_create_table.R} (100%) rename pipelines/{hubverse/make_obs_time_series_for_viz.py => hubverse_make_obs_time_series_for_viz.py} (100%) rename pipelines/{hubverse/score_hubverse.R => hubverse_score.R} (100%) delete mode 100755 pipelines/iteration_helpers/loop_fit.sh delete mode 100755 pipelines/iteration_helpers/loop_generate_predictive.sh delete mode 100755 pipelines/iteration_helpers/loop_postprocess.sh delete mode 100755 pipelines/iteration_helpers/loop_score.sh rename pipelines/{postprocess/plot_state_forecast.R => plot_and_save_state_forecast.R} (100%) rename pipelines/{collate => }/plot_category_pointintervals.R (100%) delete mode 100644 pipelines/postprocess/__init__.py rename pipelines/{preprocess => }/prep_data.py (100%) rename pipelines/{preprocess => }/prep_eval_data.py (96%) delete mode 100644 pipelines/preprocess/__init__.py delete mode 100644 pipelines/preprocess/pull_nhsn.R rename pipelines/{postprocess => }/score_forecast.R (100%) rename pipelines/{collate => }/summarize_visualize_scores.R (100%) rename pipelines/{fit_model => }/timeseries_forecasts.R (100%) delete mode 100644 pipelines/workflows/__init__.py diff --git a/README.md b/README.md index 1e95ec33..4cff7858 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ This repository contains code for the [PyRenew-HEW model](https://github.com/CDC ## Containers -The project uses GitHub Actions for automatically building container images based on the project's [Containerfile](Containerfile) and [Containerfile.dependencies](Containerfile.dependencies) files. The images are currently hosted on Azure Container Registry and are built and pushed via the [containers.yaml](.github/workflows/containers.yaml) GitHub Actions workflow. 
+The project uses GitHub Actions for automatically building container images based on the project's [Containerfile](Containerfile) and [Containerfile.dependencies](Containerfile.dependencies) files. The images are currently hosted on Azure Container Registry and are built and pushed via the [containers.yaml](.github/containers.yaml) GitHub Actions workflow. Images can also be built locally. The [Makefile](Makefile) contains several targets for building and pushing images. Although the Makefile uses Docker as the default engine, the `ENGINE` environment variable can be set to `podman` to use Podman instead, for example: @@ -22,7 +22,7 @@ ENGINE=podman make dep_container_build # podman build . -t pyrenew-hew-dependencies -f Containerfile.dependencies ``` -Container images pushed to the Azure Container Registry are automatically tagged as either `latest` (if the commit is on the `main` branch) or with the branch name (if the commit is on a different branch). After a branch is deleted, the image tag is removed from the registry via the [delete-container-tag.yaml](.github/workflows/delete-container-tag.yaml) GitHub Actions workflow. +Container images pushed to the Azure Container Registry are automatically tagged as either `latest` (if the commit is on the `main` branch) or with the branch name (if the commit is on a different branch). After a branch is deleted, the image tag is removed from the registry via the [delete-container-tag.yaml](.github/delete-container-tag.yaml) GitHub Actions workflow. ## General Disclaimer This repository was created for use by CDC programs to collaborate on public health related projects in support of the [CDC mission](https://www.cdc.gov/about/organization/mission.htm). GitHub is not hosted by the CDC, but is a third party website used by CDC and its partners to share information and collaborate on software. CDC use of GitHub does not imply an endorsement of any one particular service, product, or enterprise.
diff --git a/pipelines/batch/setup_eval_job.py b/pipelines/batch/setup_eval_job.py index 1dba1ee6..6b51fdaa 100644 --- a/pipelines/batch/setup_eval_job.py +++ b/pipelines/batch/setup_eval_job.py @@ -127,7 +127,7 @@ def main( base_call = ( "/bin/bash -c '" - "python pipelines/workflows/forecast_state.py " + "python pipelines/forecast_state.py " "--disease {disease} " "--state {state} " "--n-training-days {n_training} " diff --git a/pipelines/batch/setup_parameter_inference_job.py b/pipelines/batch/setup_parameter_inference_job.py index c49a68dc..72adde94 100644 --- a/pipelines/batch/setup_parameter_inference_job.py +++ b/pipelines/batch/setup_parameter_inference_job.py @@ -128,7 +128,7 @@ def main( base_call = ( "/bin/bash -c '" - "python pipelines/workflows/forecast_state.py " + "python pipelines/forecast_state.py " "--disease {disease} " "--state {state} " "--n-training-days 450 " diff --git a/pipelines/batch/setup_prod_job.py b/pipelines/batch/setup_prod_job.py index f604874d..2b8040f7 100644 --- a/pipelines/batch/setup_prod_job.py +++ b/pipelines/batch/setup_prod_job.py @@ -156,7 +156,7 @@ def main( base_call = ( "/bin/bash -c '" - "python pipelines/workflows/forecast_state.py " + "python pipelines/forecast_state.py " "--disease {disease} " "--state {state} " "--n-training-days {n_training_days} " diff --git a/pipelines/fit_model/pyrenew/build_pyrenew_model.py b/pipelines/build_pyrenew_model.py similarity index 100% rename from pipelines/fit_model/pyrenew/build_pyrenew_model.py rename to pipelines/build_pyrenew_model.py diff --git a/pipelines/collate/__init__.py b/pipelines/collate/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pipelines/collate/collate_plots.py b/pipelines/collate_plots.py similarity index 100% rename from pipelines/collate/collate_plots.py rename to pipelines/collate_plots.py diff --git a/pipelines/collate/collate_score_tables.R b/pipelines/collate_score_tables.R similarity index 100% rename from pipelines/collate/collate_score_tables.R rename to pipelines/collate_score_tables.R diff --git a/pipelines/postprocess/convert_inferencedata_to_parquet.R b/pipelines/convert_inferencedata_to_parquet.R similarity index 100% rename from pipelines/postprocess/convert_inferencedata_to_parquet.R rename to pipelines/convert_inferencedata_to_parquet.R diff --git a/pipelines/fit_model/__init__.py b/pipelines/fit_model/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pipelines/fit_model/pyrenew/__init__.py b/pipelines/fit_model/pyrenew/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pipelines/fit_model/pyrenew/fit_pyrenew_model.py b/pipelines/fit_pyrenew_model.py similarity index 97% rename from pipelines/fit_model/pyrenew/fit_pyrenew_model.py rename to pipelines/fit_pyrenew_model.py index b10648ad..6f1bc788 100644 --- a/pipelines/fit_model/pyrenew/fit_pyrenew_model.py +++ b/pipelines/fit_pyrenew_model.py @@ -5,7 +5,7 @@ import jax import numpy as np -from pipelines.fit_model.pyrenew.build_pyrenew_model import ( +from .build_pyrenew_model import ( build_model_from_dir, ) diff --git a/pipelines/workflows/forecast_state.py b/pipelines/forecast_state.py similarity index 95% rename from pipelines/workflows/forecast_state.py rename to pipelines/forecast_state.py index ebc5d1bc..9a217c4c 100644 --- a/pipelines/workflows/forecast_state.py +++ b/pipelines/forecast_state.py @@ -12,13 +12,13 @@ import tomllib from pygit2 import Repository -from pipelines.preprocess.prep_data import process_and_save_state -from 
pipelines.preprocess.prep_eval_data import save_eval_data +from prep_data import process_and_save_state +from prep_eval_data import save_eval_data numpyro.set_host_device_count(4) -from pipelines.fit_model.pyrenew.fit_pyrenew_model import fit_and_save_model # noqa -from pipelines.postprocess.generate_predictive import ( +from fit_pyrenew_model import fit_and_save_model # noqa +from generate_predictive import ( generate_and_save_predictions, ) # noqa @@ -76,7 +76,7 @@ def generate_epiweekly(model_run_dir: Path) -> None: result = subprocess.run( [ "Rscript", - "pipelines/preprocess/generate_epiweekly.R", + "pipelines/generate_epiweekly.R", f"{model_run_dir}", ], capture_output=True, @@ -92,7 +92,7 @@ def timeseries_forecasts( result = subprocess.run( [ "Rscript", - "pipelines/fit_model/timeseries_forecasts.R", + "pipelines/timeseries_forecasts.R", f"{model_run_dir}", "--model-name", f"{model_name}", @@ -114,7 +114,7 @@ def convert_inferencedata_to_parquet( result = subprocess.run( [ "Rscript", - "pipelines/postprocess/convert_inferencedata_to_parquet.R", + "pipelines/convert_inferencedata_to_parquet.R", f"{model_run_dir}", "--model-name", f"{model_name}", @@ -128,13 +128,13 @@ def convert_inferencedata_to_parquet( return None -def plot_state_forecast( +def plot_and_save_state_forecast( model_run_dir: Path, pyrenew_model_name: str, timeseries_model_name: str ) -> None: result = subprocess.run( [ "Rscript", - "pipelines/postprocess/plot_state_forecast.R", + "pipelines/plot_and_save_state_forecast.R", f"{model_run_dir}", "--pyrenew-model-name", f"{pyrenew_model_name}", @@ -144,7 +144,7 @@ def plot_state_forecast( capture_output=True, ) if result.returncode != 0: - raise RuntimeError(f"plot_state_forecast: {result.stderr}") + raise RuntimeError(f"plot_and_save_state_forecast: {result.stderr}") return None @@ -152,7 +152,7 @@ def score_forecast(model_run_dir: Path) -> None: result = subprocess.run( [ "Rscript", - "pipelines/postprocess/score_forecast.R", + "pipelines/score_forecast.R", f"{model_run_dir}", ], capture_output=True, @@ -367,7 +367,7 @@ def main( logger.info("Conversion complete.") logger.info("Postprocessing forecast...") - plot_state_forecast(model_run_dir, "pyrenew_e", "timeseries_e") + plot_and_save_state_forecast(model_run_dir, "pyrenew_e", "timeseries_e") logger.info("Postprocessing complete.") logger.info("Rendering webpage...") diff --git a/pipelines/preprocess/generate_epiweekly.R b/pipelines/generate_epiweekly.R similarity index 100% rename from pipelines/preprocess/generate_epiweekly.R rename to pipelines/generate_epiweekly.R diff --git a/pipelines/postprocess/generate_predictive.py b/pipelines/generate_predictive.py similarity index 97% rename from pipelines/postprocess/generate_predictive.py rename to pipelines/generate_predictive.py index 7a8bdceb..65f20c9d 100644 --- a/pipelines/postprocess/generate_predictive.py +++ b/pipelines/generate_predictive.py @@ -4,7 +4,7 @@ import arviz as az -from pipelines.fit_model.pyrenew.build_pyrenew_model import ( +from .build_pyrenew_model import ( build_model_from_dir, ) diff --git a/pipelines/hubverse/__init__.py b/pipelines/hubverse/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pipelines/hubverse/create_observed_data_tables.py b/pipelines/hubverse_create_observed_data_tables.py similarity index 99% rename from pipelines/hubverse/create_observed_data_tables.py rename to pipelines/hubverse_create_observed_data_tables.py index 323a6f09..a9b44f51 100644 --- a/pipelines/hubverse/create_observed_data_tables.py 
+++ b/pipelines/hubverse_create_observed_data_tables.py @@ -6,7 +6,7 @@ import epiweeks import polars as pl -from pipelines.preprocess.prep_data import ( +from prep_data import ( aggregate_facility_level_nssp_to_state, get_state_pop_df, ) diff --git a/pipelines/hubverse/create_hubverse_table.R b/pipelines/hubverse_create_table.R similarity index 100% rename from pipelines/hubverse/create_hubverse_table.R rename to pipelines/hubverse_create_table.R diff --git a/pipelines/hubverse/make_obs_time_series_for_viz.py b/pipelines/hubverse_make_obs_time_series_for_viz.py similarity index 100% rename from pipelines/hubverse/make_obs_time_series_for_viz.py rename to pipelines/hubverse_make_obs_time_series_for_viz.py diff --git a/pipelines/hubverse/score_hubverse.R b/pipelines/hubverse_score.R similarity index 100% rename from pipelines/hubverse/score_hubverse.R rename to pipelines/hubverse_score.R diff --git a/pipelines/iteration_helpers/loop_fit.sh b/pipelines/iteration_helpers/loop_fit.sh deleted file mode 100755 index 99eafb6a..00000000 --- a/pipelines/iteration_helpers/loop_fit.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash - -# Check if the base directory is provided as an argument -if [ -z "$1" ]; then - echo "Usage: $0 " - exit 1 -fi - -# Base directory containing subdirectories -BASE_DIR="$1" - -# Iterate over each subdirectory in the base directory -for SUBDIR in "$BASE_DIR"/*/; do - # Run the Python script with the current subdirectory as the model_dir argument - echo "$SUBDIR" - python pipelines.fit_model.pyrenew.fit_pyrenew_model.py "$SUBDIR" -done diff --git a/pipelines/iteration_helpers/loop_generate_predictive.sh b/pipelines/iteration_helpers/loop_generate_predictive.sh deleted file mode 100755 index c533f616..00000000 --- a/pipelines/iteration_helpers/loop_generate_predictive.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -# Check if the base directory is provided as an argument -if [ -z "$1" ]; then - echo "Usage: $0 " - exit 1 -fi - -# Base directory containing subdirectories -BASE_DIR="$1" - - -# Iterate over each subdirectory in the base directory -for SUBDIR in "$BASE_DIR"/*/; do - # Run the Python script with the current subdirectory as the model_dir argument - echo "$SUBDIR" - python postprocess/generate_predictive.py "$SUBDIR" --n-forecast-points 28 -done diff --git a/pipelines/iteration_helpers/loop_postprocess.sh b/pipelines/iteration_helpers/loop_postprocess.sh deleted file mode 100755 index f2fb80db..00000000 --- a/pipelines/iteration_helpers/loop_postprocess.sh +++ /dev/null @@ -1,18 +0,0 @@ -#!/bin/bash - -# Check if the base directory is provided as an argument -if [ -z "$1" ]; then - echo "Usage: $0 " - exit 1 -fi - -# Base directory containing subdirectories -BASE_DIR="$1" - -# Iterate over each subdirectory in the base directory -for SUBDIR in "$BASE_DIR"/*/; do - # Run the R script with the current subdirectory as the model_dir argument - echo "$SUBDIR" - Rscript postprocess/convert_inferencedata_to_parquet.R "$SUBDIR" - Rscript postproces/plot_state_forecast.R "$SUBDIR" -done diff --git a/pipelines/iteration_helpers/loop_score.sh b/pipelines/iteration_helpers/loop_score.sh deleted file mode 100755 index d584e26b..00000000 --- a/pipelines/iteration_helpers/loop_score.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash - -# Check if the base directory is provided as an argument -if [ -z "$1" ]; then - echo "Usage: $0 " - exit 1 -fi - -# Base directory containing subdirectories -BASE_DIR="$1" - -# Iterate over each subdirectory in the base directory -for SUBDIR in 
"$BASE_DIR"/*/; do - # Run the R script with the current subdirectory as the model_dir argument - echo "$SUBDIR" - Rscript postprocess/score_forecast.R "$SUBDIR" -done diff --git a/pipelines/postprocess/plot_state_forecast.R b/pipelines/plot_and_save_state_forecast.R similarity index 100% rename from pipelines/postprocess/plot_state_forecast.R rename to pipelines/plot_and_save_state_forecast.R diff --git a/pipelines/collate/plot_category_pointintervals.R b/pipelines/plot_category_pointintervals.R similarity index 100% rename from pipelines/collate/plot_category_pointintervals.R rename to pipelines/plot_category_pointintervals.R diff --git a/pipelines/postprocess/__init__.py b/pipelines/postprocess/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pipelines/preprocess/prep_data.py b/pipelines/prep_data.py similarity index 100% rename from pipelines/preprocess/prep_data.py rename to pipelines/prep_data.py diff --git a/pipelines/preprocess/prep_eval_data.py b/pipelines/prep_eval_data.py similarity index 96% rename from pipelines/preprocess/prep_eval_data.py rename to pipelines/prep_eval_data.py index 711fb47c..a6108949 100644 --- a/pipelines/preprocess/prep_eval_data.py +++ b/pipelines/prep_eval_data.py @@ -4,7 +4,7 @@ import polars as pl -from pipelines.preprocess.prep_data import ( +from prep_data import ( get_state_pop_df, process_state_level_data, ) diff --git a/pipelines/preprocess/__init__.py b/pipelines/preprocess/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pipelines/preprocess/pull_nhsn.R b/pipelines/preprocess/pull_nhsn.R deleted file mode 100644 index c35cc8fb..00000000 --- a/pipelines/preprocess/pull_nhsn.R +++ /dev/null @@ -1,68 +0,0 @@ -script_packages <- c( - "forecasttools", - "dplyr", - "readr", - "lubridate", - "argparser" -) - -# load in packages without messages -purrr::walk(script_packages, \(pkg) { - suppressPackageStartupMessages( - library(pkg, character.only = TRUE) - ) -}) - -disease_nhsn_key <- c( - "COVID-19" = "totalconfc19newadm", - "Influenza" = "totalconfflunewadm" -) - -p <- arg_parser( - "Pull NHSN data" -) |> - add_argument( - "--start-date", - type = "character", - default = NULL, - help = "Start date in YYYY-MM-DD format" - ) |> - add_argument( - "--end-date", - type = "character", - default = NULL, - help = "End date in YYYY-MM-DD format" - ) |> - add_argument( - "--disease", - type = "character", - help = "Disease name" - ) |> - add_argument( - "--output-file", - type = "character", - help = "Path to output file" - ) - - -argv <- parse_args(p) -start_date <- argv$start_date -end_date <- argv$end_date -disease <- argv$disease -output_file <- argv$output_file - -if (is.na(output_file)) { - output_file <- stdout() -} - -columns <- disease_nhsn_key[disease] - -dat <- pull_nhsn( - start_date = start_date, - end_date = end_date, - columns = columns -) |> - mutate(weekendingdate = as_date(weekendingdate)) |> - rename(nhsn_admissions = !!unname(columns)) |> - mutate(nhsn_admissions = as.numeric(nhsn_admissions)) |> - write_tsv(output_file) diff --git a/pipelines/postprocess/score_forecast.R b/pipelines/score_forecast.R similarity index 100% rename from pipelines/postprocess/score_forecast.R rename to pipelines/score_forecast.R diff --git a/pipelines/collate/summarize_visualize_scores.R b/pipelines/summarize_visualize_scores.R similarity index 100% rename from pipelines/collate/summarize_visualize_scores.R rename to pipelines/summarize_visualize_scores.R diff --git a/pipelines/tests/test_forecast_state.sh 
b/pipelines/tests/test_forecast_state.sh index 1eef160b..56a42e4f 100755 --- a/pipelines/tests/test_forecast_state.sh +++ b/pipelines/tests/test_forecast_state.sh @@ -16,7 +16,7 @@ else echo "TEST-MODE: Finished generating test data" fi echo "TEST-MODE: Running forecasting pipeline" -python pipelines/workflows/forecast_state.py \ +python pipelines/forecast_state.py \ --disease "COVID-19" \ --state "CA" \ --facility-level-nssp-data-dir "$BASE_DIR/private_data/nssp_etl_gold" \ diff --git a/pipelines/tests/test_run.sh b/pipelines/tests/test_run.sh index 2b45f563..a805548d 100644 --- a/pipelines/tests/test_run.sh +++ b/pipelines/tests/test_run.sh @@ -17,27 +17,27 @@ echo "For $N_SAMPLES samples on 1 chain, and $N_AHEAD forecast points" echo "" for SUBDIR in "$BASE_DIR"/*/; do echo "TEST-MODE: Inference for $SUBDIR" - python pipelines.fit_model.pyrenew.fit_pyrenew_model.py "$SUBDIR" --n-chains 1 --n-samples $N_SAMPLES + python fit_pyrenew_model.py "$SUBDIR" --n-chains 1 --n-samples $N_SAMPLES echo "TEST-MODE: Finished inference" echo "" echo "TEST-MODE: Generating posterior predictions for $SUBDIR" - python generate_predictive.py "$SUBDIR" --n-forecast-points $N_AHEAD + python generate_predictive.py "$SUBDIR" --n-forecast-points $N_AHEAD echo "TEST-MODE: Finished generating posterior predictions" echo "" echo "TEST-MODE: Converting inferencedata to parquet for $SUBDIR" - Rscript postprocess/convert_inferencedata_to_parquet.R "$SUBDIR" + Rscript convert_inferencedata_to_parquet.R "$SUBDIR" echo "TEST-MODE: Finished converting inferencedata to parquet" echo "" echo "TEST-MODE: Generate epiweekly data for $SUBDIR" - Rscript preprocess/generate_epiweekly.R "$SUBDIR" + Rscript generate_epiweekly.R "$SUBDIR" echo "TEST-MODE: Finished generating epiweekly data" echo "" echo "TEST-MODE: Forecasting baseline models for $SUBDIR" - Rscript fit_model/timeseries_forecasts.R "$SUBDIR" --n-forecast-days $N_AHEAD --n-samples $N_SAMPLES + Rscript timeseries_forecasts.R "$SUBDIR" --n-forecast-days $N_AHEAD --n-samples $N_SAMPLES echo "TEST-MODE: Finished forecasting baseline models" echo "" echo "TEST-MODE: Plotting state forecast for $SUBDIR" - Rscript postproces/plot_state_forecast.R "$SUBDIR" + Rscript plot_and_save_state_forecast.R "$SUBDIR" echo "TEST-MODE: Finished postprocessing state forecast" echo "" echo "TEST-MODE: Rendering webpage for $SUBDIR" @@ -45,6 +45,6 @@ for SUBDIR in "$BASE_DIR"/*/; do echo "TEST-MODE: Finished rendering webpage" echo "" echo "TEST-MODE: Scoring forecast for $SUBDIR" - Rscript postprocess/score_forecast.R "$SUBDIR" + Rscript score_forecast.R "$SUBDIR" echo "TEST-MODE: Finished scoring forecast" done diff --git a/pipelines/fit_model/timeseries_forecasts.R b/pipelines/timeseries_forecasts.R similarity index 100% rename from pipelines/fit_model/timeseries_forecasts.R rename to pipelines/timeseries_forecasts.R diff --git a/pipelines/workflows/__init__.py b/pipelines/workflows/__init__.py deleted file mode 100644 index e69de29b..00000000 From eaa29b4df3b02064f9a02e64c169146e1d059a4d Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 10:02:31 -0600 Subject: [PATCH 11/19] restore readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 4cff7858..1e95ec33 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ This repository contains code for the [PyRenew-HEW model](https://github.com/CDC ## Containers -The project uses GitHub Actions for automatically building container images based on the project's
[Containerfile](Containerfile) and [Containerfile.dependencies](Containerfile.dependencies) files. The images are currently hosted on Azure Container Registry and are built and pushed via the [containers.yaml](.github/containers.yaml) GitHub Actions workflow. +The project uses GitHub Actions for automatically building container images based on the project's [Containerfile](Containerfile) and [Containerfile.dependencies](Containerfile.dependencies) files. The images are currently hosted on Azure Container Registry and are built and pushed via the [containers.yaml](.github/workflows/containers.yaml) GitHub Actions workflow. Images can also be built locally. The [Makefile](Makefile) contains several targets for building and pushing images. Although the Makefile uses Docker as the default engine, the `ENGINE` environment variable can be set to `podman` to use Podman instead, for example: @@ -22,7 +22,7 @@ ENGINE=podman make dep_container_build # podman build . -t pyrenew-hew-dependencies -f Containerfile.dependencies ``` -Container images pushed to the Azure Container Registry are automatically tagged as either `latest` (if the commit is on the `main` branch) or with the branch name (if the commit is on a different branch). After a branch is deleted, the image tag is removed from the registry via the [delete-container-tag.yaml](.github/delete-container-tag.yaml) GitHub Actions workflow. +Container images pushed to the Azure Container Registry are automatically tagged as either `latest` (if the commit is on the `main` branch) or with the branch name (if the commit is on a different branch). After a branch is deleted, the image tag is removed from the registry via the [delete-container-tag.yaml](.github/workflows/delete-container-tag.yaml) GitHub Actions workflow. ## General Disclaimer This repository was created for use by CDC programs to collaborate on public health related projects in support of the [CDC mission](https://www.cdc.gov/about/organization/mission.htm). GitHub is not hosted by the CDC, but is a third party website used by CDC and its partners to share information and collaborate on software. CDC use of GitHub does not imply an endorsement of any one particular service, product, or enterprise.
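A note on the import layout that patches 09, 10, and 15 all preserve in `forecast_state.py`: `numpyro.set_host_device_count(4)` runs mid-file, before `fit_and_save_model` and `generate_and_save_predictions` are imported with `# noqa` markers. The ordering matters because `set_host_device_count` works by setting `XLA_FLAGS`, which JAX reads only when its backend first initializes, so importing modules that touch JAX too early can lock the host to a single device. A minimal, standalone sketch of the pattern (illustrative only, not code from this repository):

```python
import numpyro

# Must run before the JAX backend initializes: it sets XLA_FLAGS so the
# host exposes one (virtual) device per MCMC chain.
numpyro.set_host_device_count(4)

import jax  # noqa: E402  # deliberately imported after the call above

# On a CPU-only host this now reports 4 local devices.
print(jax.local_device_count())
```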
From 28277f69a2537ae3dd6bb96956adb530160d491d Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 10:03:11 -0600 Subject: [PATCH 12/19] remove more inits --- pipelines/batch/__init__.py | 0 pipelines/diagnostic_report/__init__.py | 0 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 pipelines/batch/__init__.py delete mode 100644 pipelines/diagnostic_report/__init__.py diff --git a/pipelines/batch/__init__.py b/pipelines/batch/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pipelines/diagnostic_report/__init__.py b/pipelines/diagnostic_report/__init__.py deleted file mode 100644 index e69de29b..00000000 From bfd07f4565c0ec144e1061ed39df848ff880aaeb Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 10:07:32 -0600 Subject: [PATCH 13/19] remove another init --- pipelines/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 pipelines/__init__.py diff --git a/pipelines/__init__.py b/pipelines/__init__.py deleted file mode 100644 index e69de29b..00000000 From 0b5910bc3cdf5e20dfc495740d31a35f063dde75 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 10:08:37 -0600 Subject: [PATCH 14/19] correct import --- pipelines/fit_pyrenew_model.py | 2 +- pipelines/generate_predictive.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pipelines/fit_pyrenew_model.py b/pipelines/fit_pyrenew_model.py index 6f1bc788..090fa114 100644 --- a/pipelines/fit_pyrenew_model.py +++ b/pipelines/fit_pyrenew_model.py @@ -5,7 +5,7 @@ import jax import numpy as np -from .build_pyrenew_model import ( +from build_pyrenew_model import ( build_model_from_dir, ) diff --git a/pipelines/generate_predictive.py b/pipelines/generate_predictive.py index 65f20c9d..897eb886 100644 --- a/pipelines/generate_predictive.py +++ b/pipelines/generate_predictive.py @@ -4,7 +4,7 @@ import arviz as az -from .build_pyrenew_model import ( +from build_pyrenew_model import ( build_model_from_dir, ) From 31f97446fc46a2ab52f0302b7b5e49f5617cfdaf Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 10:09:44 -0600 Subject: [PATCH 15/19] pre-commit --- pipelines/fit_pyrenew_model.py | 1 - pipelines/forecast_state.py | 3 +-- pipelines/generate_predictive.py | 1 - pipelines/hubverse_create_observed_data_tables.py | 1 - pipelines/prep_eval_data.py | 1 - 5 files changed, 1 insertion(+), 6 deletions(-) diff --git a/pipelines/fit_pyrenew_model.py b/pipelines/fit_pyrenew_model.py index 090fa114..a63278db 100644 --- a/pipelines/fit_pyrenew_model.py +++ b/pipelines/fit_pyrenew_model.py @@ -4,7 +4,6 @@ import jax import numpy as np - from build_pyrenew_model import ( build_model_from_dir, ) diff --git a/pipelines/forecast_state.py b/pipelines/forecast_state.py index 9a217c4c..74756665 100644 --- a/pipelines/forecast_state.py +++ b/pipelines/forecast_state.py @@ -10,10 +10,9 @@ import polars as pl import tomli_w import tomllib -from pygit2 import Repository - from prep_data import process_and_save_state from prep_eval_data import save_eval_data +from pygit2 import Repository numpyro.set_host_device_count(4) diff --git a/pipelines/generate_predictive.py b/pipelines/generate_predictive.py index 897eb886..fb3a5c0d 100644 --- a/pipelines/generate_predictive.py +++ b/pipelines/generate_predictive.py @@ -3,7 +3,6 @@ from pathlib import Path import arviz as az - from build_pyrenew_model import ( build_model_from_dir, ) diff --git a/pipelines/hubverse_create_observed_data_tables.py 
b/pipelines/hubverse_create_observed_data_tables.py index a9b44f51..660e613f 100644 --- a/pipelines/hubverse_create_observed_data_tables.py +++ b/pipelines/hubverse_create_observed_data_tables.py @@ -5,7 +5,6 @@ import epiweeks import polars as pl - from prep_data import ( aggregate_facility_level_nssp_to_state, get_state_pop_df, diff --git a/pipelines/prep_eval_data.py b/pipelines/prep_eval_data.py index a6108949..eddec0f5 100644 --- a/pipelines/prep_eval_data.py +++ b/pipelines/prep_eval_data.py @@ -3,7 +3,6 @@ from pathlib import Path import polars as pl - from prep_data import ( get_state_pop_df, process_state_level_data, From 28d35cd45053bad67292a7dae3bfcaad1f7ab084 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 10:59:01 -0600 Subject: [PATCH 16/19] rename n_datapoints --- demos/hosp_only_ww_model/pyrenew_hew_model.qmd | 4 ++-- demos/ww_model/ww_model_demo.qmd | 6 +++--- pipelines/generate_predictive.py | 5 ++++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/demos/hosp_only_ww_model/pyrenew_hew_model.qmd b/demos/hosp_only_ww_model/pyrenew_hew_model.qmd index 6f6e4478..3e993fe0 100644 --- a/demos/hosp_only_ww_model/pyrenew_hew_model.qmd +++ b/demos/hosp_only_ww_model/pyrenew_hew_model.qmd @@ -50,7 +50,7 @@ We check that we can simulate from the prior predictive n_forecast_days = 35 prior_predictive = my_pyrenew_hew_model.prior_predictive( - n_datapoints=len(data_observed_disease_ed_visits) + n_forecast_days, + n_observed_disease_ed_visits_datapoints=len(data_observed_disease_ed_visits) + n_forecast_days, numpyro_predictive_args={"num_samples": 200}, ) ``` @@ -75,7 +75,7 @@ Create the posterior predictive and forecast: ```{python} # | label: posterior predictive posterior_predictive = my_pyrenew_hew_model.posterior_predictive( - n_datapoints=len(data_observed_disease_ed_visits) + n_forecast_days + n_observed_disease_ed_visits_datapoints=len(data_observed_disease_ed_visits) + n_forecast_days ) ``` diff --git a/demos/ww_model/ww_model_demo.qmd b/demos/ww_model/ww_model_demo.qmd index 9d7d2932..f1e00ef0 100644 --- a/demos/ww_model/ww_model_demo.qmd +++ b/demos/ww_model/ww_model_demo.qmd @@ -327,7 +327,7 @@ Check that we can simulate from the prior predictive n_forecast_days = 35 prior_predictive = my_model.prior_predictive( - n_datapoints= max(len(data_observed_hospital_admissions), max_ww_sampled_days) + n_forecast_days, + n_observed_disease_ed_visits_datapoints= max(len(data_observed_hospital_admissions), max_ww_sampled_days) + n_forecast_days, numpyro_predictive_args={"num_samples": 100}, ) ``` @@ -349,7 +349,7 @@ Simulate the posterior predictive distribution ```{python} # | label: posterior predictive posterior_predictive = my_model.posterior_predictive( - n_datapoints= max(len(data_observed_hospital_admissions), max_ww_sampled_days) + n_forecast_days, is_predictive=True + n_observed_disease_ed_visits_datapoints= max(len(data_observed_hospital_admissions), max_ww_sampled_days) + n_forecast_days, is_predictive=True ) ``` @@ -444,7 +444,7 @@ my_model_hosp_only_fit.run( ```{python} posterior_predictive_hosp_only = my_model_hosp_only_fit.posterior_predictive( - n_datapoints= len(data_observed_hospital_admissions) + n_forecast_days,is_predictive=True + n_observed_disease_ed_visits_datapoints= len(data_observed_hospital_admissions) + n_forecast_days,is_predictive=True ) ``` diff --git a/pipelines/generate_predictive.py b/pipelines/generate_predictive.py index fb3a5c0d..623f99b7 100644 --- a/pipelines/generate_predictive.py +++ 
b/pipelines/generate_predictive.py @@ -33,7 +33,10 @@ def generate_and_save_predictions( my_model.mcmc.sampler = fresh_sampler posterior_predictive = my_model.posterior_predictive( - n_datapoints=len(data_observed_disease_ed_visits) + n_forecast_points + n_observed_disease_ed_visits_datapoints=len( + data_observed_disease_ed_visits + ) + + n_forecast_points ) idata = az.from_numpyro( From 2586edfc87d0edc7bd015fa74243186efcbd5a18 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 11:10:58 -0600 Subject: [PATCH 17/19] recommit n_datapoints change --- pyrenew_hew/pyrenew_hew_model.py | 38 ++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/pyrenew_hew/pyrenew_hew_model.py b/pyrenew_hew/pyrenew_hew_model.py index 0aac754f..fe5b458f 100644 --- a/pyrenew_hew/pyrenew_hew_model.py +++ b/pyrenew_hew/pyrenew_hew_model.py @@ -84,28 +84,35 @@ def validate(self): # numpydoc ignore=GL08 def sample( self, - n_datapoints=None, + n_observed_disease_ed_visits_datapoints=None, data_observed_disease_ed_visits=None, right_truncation_offset=None, ): # numpydoc ignore=GL08 - if n_datapoints is None and data_observed_disease_ed_visits is None: + if ( + n_observed_disease_ed_visits_datapoints is None + and data_observed_disease_ed_visits is None + ): raise ValueError( - "Either n_datapoints or data_observed_disease_ed_visits " + "Either n_observed_disease_ed_visits_datapoints or data_observed_disease_ed_visits " "must be passed." ) elif ( - n_datapoints is not None + n_observed_disease_ed_visits_datapoints is not None and data_observed_disease_ed_visits is not None ): raise ValueError( - "Cannot pass both n_datapoints and data_observed_disease_ed_visits." + "Cannot pass both n_observed_disease_ed_visits_datapoints and data_observed_disease_ed_visits." + ) + elif n_observed_disease_ed_visits_datapoints is None: + n_observed_disease_ed_visits_datapoints = len( + data_observed_disease_ed_visits ) - elif n_datapoints is None: - n_datapoints = len(data_observed_disease_ed_visits) else: - n_datapoints = n_datapoints + n_observed_disease_ed_visits_datapoints = ( + n_observed_disease_ed_visits_datapoints + ) - n_weeks_post_init = n_datapoints // 7 + 1 + n_weeks_post_init = n_observed_disease_ed_visits_datapoints // 7 + 1 i0 = self.infection_initialization_process() eta_sd = self.eta_sd_rv() @@ -127,7 +134,7 @@ def sample( rtu = repeat_until_n( data=jnp.exp(log_rtu_weekly), - n_timepoints=n_datapoints, + n_timepoints=n_observed_disease_ed_visits_datapoints, offset=0, period_size=7, ) @@ -174,14 +181,16 @@ def sample( iedr = jnp.repeat( transformation.SigmoidTransform()(p_ed_ar + p_ed_mean), repeats=7, - )[:n_datapoints] + )[:n_observed_disease_ed_visits_datapoints] # this is only applied after the ed visits are generated, not to all the latent infections. 
This is why we cannot apply the iedr in compute_delay_ascertained_incidence # see https://github.com/CDCgov/ww-inference-model/issues/43 numpyro.deterministic("iedr", iedr) ed_wday_effect_raw = self.ed_wday_effect_rv() - ed_wday_effect = tile_until_n(ed_wday_effect_raw, n_datapoints) + ed_wday_effect = tile_until_n( + ed_wday_effect_raw, n_observed_disease_ed_visits_datapoints + ) inf_to_ed = self.inf_to_ed_rv() @@ -189,7 +198,7 @@ def sample( p_observed_given_incident=1, latent_incidence=latent_infections, delay_incidence_to_observation_pmf=inf_to_ed, - )[-n_datapoints:] + )[-n_observed_disease_ed_visits_datapoints:] latent_ed_visits_final = ( potential_latent_ed_visits * iedr * ed_wday_effect * self.state_pop @@ -200,7 +209,8 @@ def sample( self.right_truncation_cdf_rv()[right_truncation_offset:] ) n_points_to_prepend = ( - n_datapoints - prop_already_reported_tail.shape[0] + n_observed_disease_ed_visits_datapoints + - prop_already_reported_tail.shape[0] ) prop_already_reported = jnp.pad( prop_already_reported_tail, From 81f1b4102087dbc7976b6b6719e049fd857f1b34 Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 11:30:09 -0600 Subject: [PATCH 18/19] allow observing hospital admissions --- pyrenew_hew/pyrenew_hew_model.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/pyrenew_hew/pyrenew_hew_model.py b/pyrenew_hew/pyrenew_hew_model.py index fe5b458f..af43c164 100644 --- a/pyrenew_hew/pyrenew_hew_model.py +++ b/pyrenew_hew/pyrenew_hew_model.py @@ -14,7 +14,7 @@ InitializeInfectionsExponentialGrowth, ) from pyrenew.metaclass import Model -from pyrenew.observation import NegativeBinomialObservation +from pyrenew.observation import NegativeBinomialObservation, PoissonObservation from pyrenew.process import ARProcess, DifferencedProcess from pyrenew.randomvariable import DistributionalVariable, TransformedVariable @@ -85,7 +85,9 @@ def validate(self): # numpydoc ignore=GL08 def sample( self, n_observed_disease_ed_visits_datapoints=None, + n_observed_hospital_admissions_datapoints=None, data_observed_disease_ed_visits=None, + data_observed_hospital_admissions=None, right_truncation_offset=None, ): # numpydoc ignore=GL08 if ( @@ -107,9 +109,13 @@ def sample( n_observed_disease_ed_visits_datapoints = len( data_observed_disease_ed_visits ) - else: - n_observed_disease_ed_visits_datapoints = ( - n_observed_disease_ed_visits_datapoints + + if ( + n_observed_hospital_admissions_datapoints is None + and data_observed_hospital_admissions is not None + ): + n_observed_hospital_admissions_datapoints = len( + data_observed_hospital_admissions ) n_weeks_post_init = n_observed_disease_ed_visits_datapoints // 7 + 1 @@ -233,6 +239,14 @@ def sample( obs=data_observed_disease_ed_visits, ) + if n_observed_hospital_admissions_datapoints is not None: + hospital_admissions_obs_rv = PoissonObservation( + "observed_hospital_admissions" + ) + data_observed_hospital_admissions = hospital_admissions_obs_rv( + mu=jnp.ones(n_observed_hospital_admissions_datapoints) + ) + return observed_ed_visits From 332afd67897c84f56e4a01b4b90345e52f83884f Mon Sep 17 00:00:00 2001 From: Damon Bayer Date: Mon, 23 Dec 2024 15:50:23 -0600 Subject: [PATCH 19/19] update predictive and fix names --- demos/ww_model/ww_model_demo.qmd | 12 ++++++------ pipelines/build_pyrenew_model.py | 4 ++++ pipelines/fit_pyrenew_model.py | 1 + pipelines/generate_predictive.py | 7 ++++++- pipelines/prep_data.py | 2 +- pyrenew_hew/pyrenew_hew_model.py | 12 +++++++----- 6 files changed, 25 insertions(+), 
13 deletions(-) diff --git a/demos/ww_model/ww_model_demo.qmd b/demos/ww_model/ww_model_demo.qmd index f1e00ef0..6747af6c 100644 --- a/demos/ww_model/ww_model_demo.qmd +++ b/demos/ww_model/ww_model_demo.qmd @@ -62,7 +62,7 @@ max_ww_sampled_days = max(stan_data["ww_sampled_times"]) lab_site_to_subpop_map = jnp.array(stan_data["lab_site_to_subpop_map"]) -1 #vector mapping the subpops to lab-site combos data_observed_log_conc = jnp.array(stan_data["log_conc"]) -data_observed_hospital_admissions = jnp.array(stan_data["hosp"]) +data_observed_disease_hospital_admissions = jnp.array(stan_data["hosp"]) ``` ```{python} @@ -327,7 +327,7 @@ Check that we can simulate from the prior predictive n_forecast_days = 35 prior_predictive = my_model.prior_predictive( - n_observed_disease_ed_visits_datapoints= max(len(data_observed_hospital_admissions), max_ww_sampled_days) + n_forecast_days, + n_observed_disease_ed_visits_datapoints= max(len(data_observed_disease_hospital_admissions), max_ww_sampled_days) + n_forecast_days, numpyro_predictive_args={"num_samples": 100}, ) ``` @@ -339,7 +339,7 @@ my_model.run( num_warmup=750, num_samples=500, rng_key=jax.random.key(223), - data_observed_hospital_admissions=data_observed_hospital_admissions, + data_observed_disease_hospital_admissions=data_observed_disease_hospital_admissions, data_observed_log_conc=data_observed_log_conc, mcmc_args=dict(num_chains=4) ) @@ -349,7 +349,7 @@ Simulate the posterior predictive distribution ```{python} # | label: posterior predictive posterior_predictive = my_model.posterior_predictive( - n_observed_disease_ed_visits_datapoints= max(len(data_observed_hospital_admissions), max_ww_sampled_days) + n_forecast_days, is_predictive=True + n_observed_disease_ed_visits_datapoints= max(len(data_observed_disease_hospital_admissions), max_ww_sampled_days) + n_forecast_days, is_predictive=True ) ``` @@ -437,14 +437,14 @@ my_model_hosp_only_fit.run( num_warmup=750, num_samples=500, rng_key=jax.random.key(223), - data_observed_hospital_admissions=data_observed_hospital_admissions, + data_observed_disease_hospital_admissions=data_observed_disease_hospital_admissions, mcmc_args=dict(num_chains=4) ) ``` ```{python} posterior_predictive_hosp_only = my_model_hosp_only_fit.posterior_predictive( - n_observed_disease_ed_visits_datapoints= len(data_observed_hospital_admissions) + n_forecast_days,is_predictive=True + n_observed_disease_ed_visits_datapoints= len(data_observed_disease_hospital_admissions) + n_forecast_days,is_predictive=True ) ``` diff --git a/pipelines/build_pyrenew_model.py b/pipelines/build_pyrenew_model.py index 5d062f38..115c031c 100644 --- a/pipelines/build_pyrenew_model.py +++ b/pipelines/build_pyrenew_model.py @@ -34,6 +34,9 @@ def build_model_from_dir(model_dir): data_observed_disease_ed_visits = jnp.array( model_data["data_observed_disease_ed_visits"] ) + data_observed_disease_hospital_admissions = jnp.array( + model_data["data_observed_disease_hospital_admissions"] + ) state_pop = jnp.array(model_data["state_pop"]) right_truncation_pmf_rv = DeterministicVariable( @@ -75,5 +78,6 @@ def build_model_from_dir(model_dir): return ( my_model, data_observed_disease_ed_visits, + data_observed_disease_hospital_admissions, right_truncation_offset, ) diff --git a/pipelines/fit_pyrenew_model.py b/pipelines/fit_pyrenew_model.py index a63278db..43ce5442 100644 --- a/pipelines/fit_pyrenew_model.py +++ b/pipelines/fit_pyrenew_model.py @@ -29,6 +29,7 @@ def fit_and_save_model( ( my_model, data_observed_disease_ed_visits, + 
data_observed_disease_hospital_admissions, right_truncation_offset, ) = build_model_from_dir(model_run_dir) my_model.run( diff --git a/pipelines/generate_predictive.py b/pipelines/generate_predictive.py index 623f99b7..e483af8c 100644 --- a/pipelines/generate_predictive.py +++ b/pipelines/generate_predictive.py @@ -18,6 +18,7 @@ def generate_and_save_predictions( ( my_model, data_observed_disease_ed_visits, + data_observed_disease_hospital_admissions, right_truncation_offset, ) = build_model_from_dir(model_run_dir) @@ -36,7 +37,11 @@ def generate_and_save_predictions( n_observed_disease_ed_visits_datapoints=len( data_observed_disease_ed_visits ) - + n_forecast_points + + n_forecast_points, + n_observed_hospital_admissions_datapoints=len( + data_observed_disease_hospital_admissions + ) + + n_forecast_points // 7, ) idata = az.from_numpyro( diff --git a/pipelines/prep_data.py b/pipelines/prep_data.py index af4af580..ca30468c 100644 --- a/pipelines/prep_data.py +++ b/pipelines/prep_data.py @@ -376,7 +376,7 @@ def process_and_save_state( "generation_interval_pmf": generation_interval_pmf, "right_truncation_pmf": right_truncation_pmf, "data_observed_disease_ed_visits": train_disease_ed_visits, - "data_observed_total_hospital_admissions": train_total_ed_visits, + "data_observed_disease_hospital_admissions": train_total_ed_visits, "data_observed_disease_hospital_admissions": train_disease_hospital_admissions, "nssp_training_dates": nssp_training_dates, "nhsn_training_dates": nhsn_training_dates, diff --git a/pyrenew_hew/pyrenew_hew_model.py b/pyrenew_hew/pyrenew_hew_model.py index af43c164..88658739 100644 --- a/pyrenew_hew/pyrenew_hew_model.py +++ b/pyrenew_hew/pyrenew_hew_model.py @@ -87,7 +87,7 @@ def sample( n_observed_disease_ed_visits_datapoints=None, n_observed_hospital_admissions_datapoints=None, data_observed_disease_ed_visits=None, - data_observed_hospital_admissions=None, + data_observed_disease_hospital_admissions=None, right_truncation_offset=None, ): # numpydoc ignore=GL08 if ( @@ -112,10 +112,10 @@ def sample( if ( n_observed_hospital_admissions_datapoints is None - and data_observed_hospital_admissions is not None + and data_observed_disease_hospital_admissions is not None ): n_observed_hospital_admissions_datapoints = len( - data_observed_hospital_admissions + data_observed_disease_hospital_admissions ) n_weeks_post_init = n_observed_disease_ed_visits_datapoints // 7 + 1 @@ -243,8 +243,10 @@ def sample( hospital_admissions_obs_rv = PoissonObservation( "observed_hospital_admissions" ) - data_observed_hospital_admissions = hospital_admissions_obs_rv( - mu=jnp.ones(n_observed_hospital_admissions_datapoints) + data_observed_disease_hospital_admissions = ( + hospital_admissions_obs_rv( + mu=jnp.ones(n_observed_hospital_admissions_datapoints) + 50 + ) ) return observed_ed_visits
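Taken together, patches 16 through 19 replace the single `n_datapoints` argument of `sample` with per-signal lengths, enforce that callers pass either an explicit length or an observed series but not both, and have `generate_predictive.py` derive the hospital-admissions length with `n_forecast_points // 7`, since admissions are observed weekly while ED visits are daily. A simplified, standalone sketch of that require-exactly-one validation pattern (toy names, not the real `PyrenewHEWModel`):

```python
def resolve_n_datapoints(n_datapoints=None, data_observed=None):
    """Return a series length from exactly one of the two arguments."""
    if n_datapoints is None and data_observed is None:
        raise ValueError("Either n_datapoints or data_observed must be passed.")
    if n_datapoints is not None and data_observed is not None:
        raise ValueError("Cannot pass both n_datapoints and data_observed.")
    return len(data_observed) if n_datapoints is None else n_datapoints


# Fitting: infer the length from the observed data.
assert resolve_n_datapoints(data_observed=[3, 1, 4]) == 3
# Forecasting: pass training length plus horizon explicitly,
# dividing the horizon by 7 for a weekly signal.
assert resolve_n_datapoints(n_datapoints=180 + 28 // 7) == 184
```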