From 801217579fac7a5d88f4b8402950de7378f0db29 Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Thu, 3 Jul 2025 09:20:03 -0500 Subject: [PATCH] Add support for `max_retries` to `req_perform_parallel()` And improve the progress bar. --- NEWS.md | 4 +- R/req-perform-parallel.R | 54 +++++++++++++--------- man/req_perform_parallel.Rd | 9 ++-- tests/testthat/test-req-perform-parallel.R | 25 +++++++++- 4 files changed, 61 insertions(+), 31 deletions(-) diff --git a/NEWS.md b/NEWS.md index b3896a09..3cb31bcd 100644 --- a/NEWS.md +++ b/NEWS.md @@ -58,7 +58,9 @@ control of whether a browser is opened or the URL is printed (@plietar, #763) * `req_perform_parallel()` handles `progress` argument consistently with other - functions (#726). + functions (#726) and now respects the `max_tries` argument to `req_retry()`. + (It also correctly reports the number of _pending_ retries, not total retries + in the progress bar.) * `req_url_query()` now re-calculates n lengths when using `.multi = "explode"` to avoid select/recycling issues (@Kevanness, #719). diff --git a/R/req-perform-parallel.R b/R/req-perform-parallel.R index 2781af26..ffab933b 100644 --- a/R/req-perform-parallel.R +++ b/R/req-perform-parallel.R @@ -8,7 +8,7 @@ #' While running, you'll get a progress bar that looks like: #' `[working] (1 + 4) -> 5 -> 5`. The string tells you the current status of #' the queue (e.g. working, waiting, errored) followed by (the -#' number of pending requests + pending retried requests) -> the number of +#' number of pending requests + pending retries) -> the number of #' active requests -> the number of complete requests. #' #' ## Limitations @@ -22,10 +22,9 @@ #' these limitation, but it's enough work that I'm unlikely to do it unless #' I know that people would fine it useful: so please let me know! #' -#' Additionally, it does not respect the `max_tries` argument to `req_retry()` -#' because if you have five requests in flight and the first one gets rate -#' limited, it's likely that all the others do too. This also means that -#' the circuit breaker is never triggered. +#' Additionally, while `req_perform_parallel()` respects the `max_tries` +#' argument to `req_retry()`, it does not currently respect `max_seconds`. +#' Additionally, it does not trigger or respect the circuit breaker. #' #' @inherit req_perform_sequential params return #' @param max_active Maximum number of concurrent requests. @@ -124,7 +123,7 @@ RequestQueue <- R6::R6Class( n_pending = 0, n_active = 0, n_complete = 0, - n_retries = 0, + n_retrying = 0, on_error = "stop", mock = NULL, progress = NULL, @@ -155,8 +154,8 @@ RequestQueue <- R6::R6Class( progress, total = n, format = paste0( - "[{self$queue_status}] ", - "({self$n_pending} + {self$n_retries}) -> {self$n_active} -> {self$n_complete} | ", + "[{self$queue_status} {cli::pb_spin}] ", + "({self$n_pending} + {self$n_retrying}) -> {self$n_active} -> {self$n_complete} | ", "{cli::pb_bar} {cli::pb_percent}" ), frame = frame @@ -176,6 +175,7 @@ RequestQueue <- R6::R6Class( self$queue_status <- "working" self$n_pending <- n + self$n_retrying <- 0 self$n_active <- 0 self$n_complete <- 0 @@ -236,9 +236,11 @@ RequestQueue <- R6::R6Class( ) NULL } else if (self$queue_status == "working") { - if (self$n_pending == 0 && self$n_active == 0) { + to_do <- self$n_pending + self$n_retrying + + if (to_do == 0 && self$n_active == 0) { self$queue_status <- "done" - } else if (self$n_pending > 0 && self$n_active <= self$max_active) { + } else if (to_do > 0 && self$n_active <= self$max_active) { if (!self$submit_next(deadline)) { self$queue_status <- "waiting" } @@ -258,7 +260,7 @@ RequestQueue <- R6::R6Class( }, submit_next = function(deadline) { - i <- which(self$status == "pending")[[1]] + i <- which(self$status %in% c("pending", "retrying"))[[1]] self$token_deadline <- throttle_deadline(self$reqs[[i]]) if (self$token_deadline > unix_time()) { @@ -293,24 +295,29 @@ RequestQueue <- R6::R6Class( req <- self$reqs[[i]] resp <- error$resp self$resps[[i]] <- error - tries <- self$tries[[i]] if (retry_is_transient(req, resp) && self$can_retry(i)) { - delay <- retry_after(req, resp, tries) - self$rate_limit_deadline <- unix_time() + delay - - self$set_status(i, "pending") - self$n_retries <- self$n_retries + 1 + self$set_status(i, "retrying") self$queue_status <- "waiting" + + delay <- retry_after(req, resp, self$tries[[i]]) + signal(class = "httr2_retry", tries = self$tries[[i]], delay = delay) + + self$rate_limit_deadline <- max( + self$rate_limit_deadline, + unix_time() + delay + ) } else if (resp_is_invalid_oauth_token(req, resp) && self$can_reauth(i)) { + self$set_status(i, "retrying") + # This isn't quite right, because if there are (e.g.) four requests in # the queue and the first one fails, we'll clear the cache for all four, # causing a token refresh more often than necessary. This shouldn't # affect correctness, but it does make it slower than necessary. self$oauth_failed <- c(self$oauth_failed, i) req_auth_clear_cache(self$reqs[[i]]) - self$set_status(i, "pending") - self$n_retries <- self$n_retries + 1 + # Don't count this as a retry for the purpose of limiting retries + self$tries[[i]] <- self$tries[[i]] - 1 } else { self$set_status(i, "complete") if (self$on_error != "continue") { @@ -323,21 +330,22 @@ RequestQueue <- R6::R6Class( switch( self$status[[i]], # old status pending = self$n_pending <- self$n_pending - 1, - active = self$n_active <- self$n_active - 1 + active = self$n_active <- self$n_active - 1, + retrying = self$n_retrying <- self$n_retrying - 1 ) switch( status, # new status pending = self$n_pending <- self$n_pending + 1, active = self$n_active <- self$n_active + 1, - complete = self$n_complete <- self$n_complete + 1 + complete = self$n_complete <- self$n_complete + 1, + retrying = self$n_retrying <- self$n_retrying + 1 ) self$status[[i]] <- status }, can_retry = function(i) { - TRUE - # self$tries[[i]] < retry_max_tries(self$reqs[[i]]) + self$tries[[i]] < retry_max_tries(self$reqs[[i]]) }, can_reauth = function(i) { !i %in% self$oauth_failed diff --git a/man/req_perform_parallel.Rd b/man/req_perform_parallel.Rd index c5be53a5..13134e18 100644 --- a/man/req_perform_parallel.Rd +++ b/man/req_perform_parallel.Rd @@ -59,7 +59,7 @@ pummel a server with a very large number of simultaneous requests. While running, you'll get a progress bar that looks like: \verb{[working] (1 + 4) -> 5 -> 5}. The string tells you the current status of the queue (e.g. working, waiting, errored) followed by (the -number of pending requests + pending retried requests) -> the number of +number of pending requests + pending retries) -> the number of active requests -> the number of complete requests. \subsection{Limitations}{ @@ -72,10 +72,9 @@ host, rather than a mix of different hosts. It's probably possible to remove these limitation, but it's enough work that I'm unlikely to do it unless I know that people would fine it useful: so please let me know! -Additionally, it does not respect the \code{max_tries} argument to \code{req_retry()} -because if you have five requests in flight and the first one gets rate -limited, it's likely that all the others do too. This also means that -the circuit breaker is never triggered. +Additionally, while \code{req_perform_parallel()} respects the \code{max_tries} +argument to \code{req_retry()}, it does not currently respect \code{max_seconds}. +Additionally, it does not trigger or respect the circuit breaker. } } \examples{ diff --git a/tests/testthat/test-req-perform-parallel.R b/tests/testthat/test-req-perform-parallel.R index 11fca536..364b7b67 100644 --- a/tests/testthat/test-req-perform-parallel.R +++ b/tests/testthat/test-req-perform-parallel.R @@ -181,6 +181,27 @@ test_that("requests are throttled", { expect_equal(mock_time, 4) }) +test_that("respects max_tries", { + three_tries <- function(req, res) { + if (res$app$locals$i < 3) { + res$set_status(429)$set_header("retry-after", 0)$send_json(FALSE) + } else { + res$send_json(TRUE) + } + } + + req <- local_app_request(three_tries) + out <- req_perform_parallel(list(req), on_error = "return") + expect_s3_class(out[[1]], "httr2_http_429") + + req <- req_retry(local_app_request(three_tries), max_tries = 2) + out <- req_perform_parallel(list(req), on_error = "return") + expect_s3_class(out[[1]], "httr2_http_429") + + req <- req_retry(local_app_request(three_tries), max_tries = 10) + out <- req_perform_parallel(list(req), on_error = "return") + expect_s3_class(out[[1]], "httr2_response") +}) # Tests of lower-level operation ----------------------------------------------- @@ -255,7 +276,7 @@ test_that("can retry a transient error", { expect_null(queue$process1()) expect_equal(queue$queue_status, "waiting") expect_equal(queue$rate_limit_deadline, mock_time + 2) - expect_equal(queue$n_pending, 1) + expect_equal(queue$n_retrying, 1) expect_s3_class(queue$resps[[1]], "httr2_http_429") expect_equal(resp_body_json(queue$resps[[1]]$resp), list(status = "waiting")) @@ -268,7 +289,7 @@ test_that("can retry a transient error", { expect_null(queue$process1()) expect_equal(queue$queue_status, "working") expect_equal(queue$n_active, 0) - expect_equal(queue$n_pending, 1) + expect_equal(queue$n_retrying, 1) # Resubmit expect_null(queue$process1())