Store pipeline output in compressed CSVs and read/write them using data.table #308

Open
wants to merge 9 commits into dev
8 changes: 4 additions & 4 deletions Makefile
@@ -29,15 +29,15 @@ r_build:
docker build --no-cache --force-rm --pull -t forecast-eval-build docker_build

# Download the named file from the AWS S3 bucket
%.rds: dist
%.csv.gz: dist
test -f dist/$@ || curl -o dist/$@ $(S3_URL)/$@

# Specify all the data files we want to download
pull_data: score_cards_state_deaths.rds score_cards_state_cases.rds score_cards_nation_cases.rds score_cards_nation_deaths.rds score_cards_state_hospitalizations.rds score_cards_nation_hospitalizations.rds datetime_created_utc.rds
pull_data: score_cards_state_deaths.csv.gz score_cards_state_cases.csv.gz score_cards_nation_cases.csv.gz score_cards_nation_deaths.csv.gz score_cards_state_hospitalizations.csv.gz score_cards_nation_hospitalizations.csv.gz datetime_created_utc.csv.gz

# Download all the predictions cards objects. This is
# useful for development and debugging
pull_pred_cards: predictions_cards_confirmed_admissions_covid_1d.rds predictions_cards_confirmed_incidence_num.rds predictions_cards_deaths_incidence_num.rds
pull_pred_cards: predictions_cards_confirmed_admissions_covid_1d.csv.gz predictions_cards_confirmed_incidence_num.csv.gz predictions_cards_deaths_incidence_num.csv.gz

# Create the dist directory
dist:
@@ -59,7 +59,7 @@ score_forecast: r_build dist pull_data

# Post scoring pipeline output files to the AWS S3 bucket
deploy: score_forecast
aws s3 cp dist/ $(S3_BUCKET)/ --recursive --exclude "*" --include "*rds" --acl public-read
aws s3 cp dist/ $(S3_BUCKET)/ --recursive --exclude "*" --include "*csv.gz" --acl public-read

# Run bash in a docker container with a full preconfigured R environment
#
14 changes: 7 additions & 7 deletions Report/create_reports.R
@@ -4,6 +4,7 @@ library("dplyr")
library("evalcast")
library("lubridate")
library("stringr")
library("data.table")

# TODO: Contains fixed versions of WIS component metrics, to be ported over to evalcast
# Redefines overprediction, underprediction and sharpness
@@ -23,7 +24,8 @@ option_list <- list(
opt_parser <- OptionParser(option_list = option_list)
opt <- parse_args(opt_parser)
output_dir <- opt$dir
prediction_cards_filename <- "predictions_cards_${signal}.rds"
# Compress since the prediction cards object is big; fwrite() gzips automatically when the file name ends in .gz
prediction_cards_filename <- "predictions_cards_${signal}.csv.gz"
prediction_cards_filepath <- case_when(
!is.null(output_dir) ~ file.path(output_dir, prediction_cards_filename),
TRUE ~ prediction_cards_filename
@@ -106,17 +108,15 @@ class(predictions_cards) <- c("predictions_cards", class(predictions_cards))
print("Saving predictions...")
if (length(signals) == 1) {
signal <- signals
saveRDS(predictions_cards,
file = str_interp(prediction_cards_filepath),
compress = "xz"
fwrite(predictions_cards,
file = str_interp(prediction_cards_filepath)
)
} else {
# Save each signal separately.
for (signal_group in group_split(predictions_cards, signal)) {
signal <- signal_group$signal[1]
saveRDS(signal_group,
fwrite(signal_group,
file = str_interp(prediction_cards_filepath),
file = str_interp(prediction_cards_filepath)
compress = "xz"
)
}
}
@@ -228,5 +228,5 @@ if (length(save_score_errors) > 0) {
stop(paste(save_score_errors, collapse = "\n"))
}

saveRDS(data.frame(datetime = c(data_pull_timestamp)), file = file.path(output_dir, "datetime_created_utc.rds"))
fwrite(data.frame(datetime = c(data_pull_timestamp)), file = file.path(output_dir, "datetime_created_utc.csv.gz"))
print("Done")
7 changes: 2 additions & 5 deletions Report/score.R
@@ -13,7 +13,7 @@ generate_score_card_file_path <- function(geo_type, signal_name, output_dir) {
output_dir,
paste0(
"score_cards_", geo_type, "_",
sig_suffix, ".rds"
sig_suffix, ".csv.gz"
)
)
return(output_file_name)
@@ -44,10 +44,7 @@ save_score_cards <- function(score_card, geo_type = c("state", "nation"),
}

output_file_name <- generate_score_card_file_path(geo_type, signal_name, output_dir)
saveRDS(score_card,
file = output_file_name,
compress = "xz"
)
fwrite(score_card, file = output_file_name)
}

save_score_cards_wrapper <- function(score_card, geo_type, signal_name, output_dir) {
2 changes: 1 addition & 1 deletion Report/utils.R
@@ -1,6 +1,6 @@
check_for_missing_forecasters <- function(predictions_cards, forecasters_list, geo_type, signal_name, output_dir) {
output_file_name <- generate_score_card_file_path(geo_type, signal_name, output_dir)
previous_run_forecasters <- readRDS(output_file_name) %>%
previous_run_forecasters <- fread(output_file_name, data.table = FALSE) %>%
filter(signal == signal_name) %>%
distinct(forecaster) %>%
pull()
32 changes: 21 additions & 11 deletions app/R/data.R
@@ -1,4 +1,5 @@
library(aws.s3)
library(data.table)

# Set application-level caching location. Stores up to 1GB of caches. Removes
# least recently used objects first.
@@ -33,16 +34,25 @@ getData <- function(filename, s3bucket) {
if (!is.null(s3bucket)) {
tryCatch(
result <- tryCatch( # assign the tryCatch value so the error-handler fallback is kept
{
aws.s3::s3readRDS(object = filename, bucket = s3bucket)
s3read_using(fread, data.table = FALSE, object = filename, bucket = s3bucket)
},
error = function(e) {
e
getFallbackData(filename)
}
)
} else {
getFallbackData(filename)
result <- getFallbackData(filename)
}

if (filename != "datetime_created_utc.csv.gz") {
# fread uses the `IDate` class for dates. This causes problems downstream,
# so cast to base `Date`.
result$target_end_date <- as.Date(result$target_end_date)
result$forecast_date <- as.Date(result$forecast_date)
}

return(result)
}

createS3DataFactory <- function(s3bucket) {
@@ -57,29 +67,29 @@ getFallbackData <- function(filename) {
filename,
file.path("../dist/", filename)
)
readRDS(path)
fread(path, data.table = FALSE)
}


getCreationDate <- function(loadFile) {
dataCreationDate <- loadFile("datetime_created_utc.rds")
dataCreationDate <- loadFile("datetime_created_utc.csv.gz")
return(dataCreationDate %>% pull(datetime) %>% as.Date())
}


getAllData <- function(loadFile, targetVariable) {
df <- switch(targetVariable,
"Deaths" = bind_rows(
loadFile("score_cards_state_deaths.rds"),
loadFile("score_cards_nation_deaths.rds")
loadFile("score_cards_state_deaths.csv.gz"),
loadFile("score_cards_nation_deaths.csv.gz")
),
"Cases" = bind_rows(
loadFile("score_cards_state_cases.rds"),
loadFile("score_cards_nation_cases.rds")
loadFile("score_cards_state_cases.csv.gz"),
loadFile("score_cards_nation_cases.csv.gz")
),
"Hospitalizations" = bind_rows(
loadFile("score_cards_state_hospitalizations.rds"),
loadFile("score_cards_nation_hospitalizations.rds")
loadFile("score_cards_state_hospitalizations.csv.gz"),
loadFile("score_cards_nation_hospitalizations.csv.gz")
)
)

30 changes: 20 additions & 10 deletions app/assets/about.md
@@ -91,21 +91,25 @@
The forecasts and scores are available as RDS files and are uploaded weekly to a
The forecasts and scores are available as compressed CSV files and are uploaded weekly to a
You can download any of the files from the bucket by appending its filename to the base
URL https://forecast-eval.s3.us-east-2.amazonaws.com/.

For instance: https://forecast-eval.s3.us-east-2.amazonaws.com/score_cards_nation_cases.rds to download scores for nation level case predictions.
For instance, https://forecast-eval.s3.us-east-2.amazonaws.com/score_cards_nation_cases.csv.gz downloads the scores for nation-level case predictions.
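
The same file can also be fetched from within R using base `download.file()` (a minimal sketch; adjust the destination path as needed):

```
download.file(
  "https://forecast-eval.s3.us-east-2.amazonaws.com/score_cards_nation_cases.csv.gz",
  destfile = "score_cards_nation_cases.csv.gz"
)
```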

The available files are:
* predictions_cards.rds (forecasts)
* score_cards_nation_cases.rds
* score_cards_nation_deaths.rds
* score_cards_state_cases.rds
* score_cards_state_deaths.rds
* score_cards_state_hospitalizations.rds
* score_cards_nation_hospitalizations.rds
* predictions_cards_confirmed_incidence_num.csv.gz (forecasts for cases)
* predictions_cards_deaths_incidence_num.csv.gz (forecasts for deaths)
* predictions_cards_confirmed_admissions_covid_1d.csv.gz (forecasts for hospitalizations)
* score_cards_nation_cases.csv.gz
* score_cards_nation_deaths.csv.gz
* score_cards_state_cases.csv.gz
* score_cards_state_deaths.csv.gz
* score_cards_state_hospitalizations.csv.gz
* score_cards_nation_hospitalizations.csv.gz

You can also connect to AWS and retrieve the data in R. For example, to retrieve the state cases file:

```
library(aws.s3)
library(data.table)

Sys.setenv("AWS_DEFAULT_REGION" = "us-east-2")
s3bucket = tryCatch(
{
@@ -118,14 +122,20 @@

stateCases = tryCatch(
{
s3readRDS(object = "score_cards_state_cases.rds", bucket = s3bucket)
s3read_using(fread, object = "score_cards_state_cases.csv.gz", bucket = s3bucket)
},
error = function(e) {
e
}
)
```


or using the URL of the file:

```
library(data.table)
stateCases <- fread("https://forecast-eval.s3.us-east-2.amazonaws.com/score_cards_state_cases.csv.gz")
```
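
Note that `fread()` relies on the `R.utils` package to decompress `.gz` files, so install it first if it is not already available:

```
install.packages("R.utils")
```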


##### Forecasts with actuals
Expand Down
33 changes: 19 additions & 14 deletions app/assets/forecastsWithActuals.R
@@ -1,11 +1,12 @@
library(dplyr)
library(tidyr)
library(aws.s3)
library(data.table)

Sys.setenv("AWS_DEFAULT_REGION" = "us-east-2")
s3bucket <- tryCatch(
{
get_bucket(bucket = "forecast-eval")
aws.s3::get_bucket(bucket = "forecast-eval")
},
error = function(e) {
e
@@ -15,7 +16,7 @@ s3bucket <- tryCatch(
readbucket <- function(name) {
tryCatch(
{
s3readRDS(object = name, bucket = s3bucket)
s3read_using(fread, data.table = FALSE, object = name, bucket = s3bucket)
},
error = function(e) {
e
@@ -25,31 +26,31 @@ readbucket <- function(name) {

# Cases, deaths, hosp scores: needed for "actual"s
cases <- bind_rows(
readbucket("score_cards_nation_cases.rds"),
readbucket("score_cards_state_cases.rds")
readbucket("score_cards_nation_cases.csv.gz"),
readbucket("score_cards_state_cases.csv.gz")
)
deaths <- bind_rows(
readbucket("score_cards_nation_deaths.rds"),
readbucket("score_cards_state_deaths.rds")
readbucket("score_cards_nation_deaths.csv.gz"),
readbucket("score_cards_state_deaths.csv.gz")
)
hosp <- bind_rows(
readbucket("score_cards_nation_hospitalizations.rds"),
readbucket("score_cards_state_hospitalizations.rds")
readbucket("score_cards_nation_hospitalizations.csv.gz"),
readbucket("score_cards_state_hospitalizations.csv.gz")
)

# The big one: predictions from all forecasters
pred <- readbucket("predictions_cards.rds")

# Cases
pred <- readbucket("predictions_cards_confirmed_incidence_num.csv.gz")
pred_cases <- pred %>%
filter(signal == "confirmed_incidence_num") %>%
mutate(signal = NULL, data_source = NULL, incidence_period = NULL) %>%
pivot_wider(
names_from = quantile,
values_from = value,
names_prefix = "forecast_"
)

rm(pred)
gc()

actual_cases <- cases %>%
select(ahead, geo_value, forecaster, forecast_date, target_end_date, actual)

@@ -58,15 +59,18 @@ sum(is.na(actual_cases$actual)) == sum(is.na(joined_cases$actual))
write.csv(joined_cases, "cases.csv")

# Deaths
pred <- readbucket("predictions_cards_deaths_incidence_num.csv.gz")
pred_deaths <- pred %>%
filter(signal == "deaths_incidence_num") %>%
mutate(signal = NULL, data_source = NULL, incidence_period = NULL) %>%
pivot_wider(
names_from = quantile,
values_from = value,
names_prefix = "forecast_"
)

rm(pred)
gc()

actual_deaths <- deaths %>%
select(ahead, geo_value, forecaster, forecast_date, target_end_date, actual)

@@ -75,12 +79,13 @@ sum(is.na(actual_deaths$actual)) == sum(is.na(joined_deaths$actual))
write.csv(joined_deaths, "deaths.csv")

# Hospitalizations: break up by weeks since we run into memory errors otherwise!
pred <- readbucket("predictions_cards_confirmed_admissions_covid_1d.csv.gz")
pred_hosp <- actual_hosp <- joined_hosp <- vector(mode = "list", length = 4)
for (k in 1:4) {
cat(k, "... ")
days <- (k - 1) * 7 + 1:7
pred_hosp[[k]] <- pred %>%
filter(signal == "confirmed_admissions_covid_1d", ahead %in% days) %>%
filter(ahead %in% days) %>%
mutate(signal = NULL, data_source = NULL, incidence_period = NULL) %>%
pivot_wider(
names_from = quantile,
1 change: 1 addition & 0 deletions app/server.R
@@ -402,6 +402,7 @@ server <- function(input, output, session) {
# Fill gaps so there are line breaks on weeks without data
# This is failing for CU-select on US deaths (https://github.com/cmu-delphi/forecast-eval/issues/157)
filteredScoreDf <- filteredScoreDf %>%
mutate(Week_End_Date = as.Date(Week_End_Date)) %>%
as_tsibble(key = c(Forecaster, ahead), index = Week_End_Date) %>%
group_by(Forecaster, Forecast_Date, ahead) %>%
fill_gaps(.full = TRUE)
4 changes: 2 additions & 2 deletions devops/Dockerfile
@@ -9,7 +9,7 @@ RUN apt-get update && apt-get install -qq -y \
COPY devops/shiny_server.conf /etc/shiny-server/shiny-server.conf
WORKDIR /srv/shinyapp/
COPY DESCRIPTION ./
RUN Rscript -e "devtools::install_deps(dependencies = NA)"
COPY dist/*.rds ./
RUN Rscript -e "devtools::install_deps(dependencies = NA); install.packages('R.utils')"
COPY dist/*.csv.gz ./
COPY app/ ./
RUN chmod -R a+rw .