Skip to content

Add support for max_retries to req_perform_parallel() #769

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
control of whether a browser is opened or the URL is printed (@plietar, #763)

* `req_perform_parallel()` handles `progress` argument consistently with other
functions (#726).
functions (#726) and now respects the `max_tries` argument to `req_retry()`.
  (It also correctly reports the number of _pending_ retries, rather than the
  total number of retries, in the progress bar.)

* `req_url_query()` now re-calculates n lengths when using `.multi = "explode"`
to avoid select/recycling issues (@Kevanness, #719).
Expand Down
54 changes: 31 additions & 23 deletions R/req-perform-parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#' While running, you'll get a progress bar that looks like:
#' `[working] (1 + 4) -> 5 -> 5`. The string tells you the current status of
#' the queue (e.g. working, waiting, errored) followed by (the
#' number of pending requests + pending retried requests) -> the number of
#' number of pending requests + pending retries) -> the number of
#' active requests -> the number of complete requests.
#'
#' ## Limitations
Expand All @@ -22,10 +22,9 @@
#' these limitations, but it's enough work that I'm unlikely to do it unless
#' I know that people would find it useful: so please let me know!
#'
#' Additionally, it does not respect the `max_tries` argument to `req_retry()`
#' because if you have five requests in flight and the first one gets rate
#' limited, it's likely that all the others do too. This also means that
#' the circuit breaker is never triggered.
#' Additionally, while `req_perform_parallel()` respects the `max_tries`
#' argument to `req_retry()`, it does not currently respect `max_seconds`.
#' It also does not trigger or respect the circuit breaker.
#'
#' @inherit req_perform_sequential params return
#' @param max_active Maximum number of concurrent requests.
Expand Down Expand Up @@ -124,7 +123,7 @@
n_pending = 0,
n_active = 0,
n_complete = 0,
n_retries = 0,
n_retrying = 0,
on_error = "stop",
mock = NULL,
progress = NULL,
Expand Down Expand Up @@ -155,8 +154,8 @@
progress,
total = n,
format = paste0(
"[{self$queue_status}] ",
"({self$n_pending} + {self$n_retries}) -> {self$n_active} -> {self$n_complete} | ",
"[{self$queue_status} {cli::pb_spin}] ",
"({self$n_pending} + {self$n_retrying}) -> {self$n_active} -> {self$n_complete} | ",
"{cli::pb_bar} {cli::pb_percent}"
),
frame = frame
Expand All @@ -176,6 +175,7 @@

self$queue_status <- "working"
self$n_pending <- n
self$n_retrying <- 0
self$n_active <- 0
self$n_complete <- 0

Expand Down Expand Up @@ -236,9 +236,11 @@
)
NULL
} else if (self$queue_status == "working") {
if (self$n_pending == 0 && self$n_active == 0) {
to_do <- self$n_pending + self$n_retrying

if (to_do == 0 && self$n_active == 0) {
self$queue_status <- "done"
} else if (self$n_pending > 0 && self$n_active <= self$max_active) {
} else if (to_do > 0 && self$n_active <= self$max_active) {
if (!self$submit_next(deadline)) {
self$queue_status <- "waiting"
}
Expand All @@ -258,7 +260,7 @@
},

submit_next = function(deadline) {
i <- which(self$status == "pending")[[1]]
i <- which(self$status %in% c("pending", "retrying"))[[1]]

self$token_deadline <- throttle_deadline(self$reqs[[i]])
if (self$token_deadline > unix_time()) {
Expand Down Expand Up @@ -293,24 +295,29 @@
req <- self$reqs[[i]]
resp <- error$resp
self$resps[[i]] <- error
tries <- self$tries[[i]]

if (retry_is_transient(req, resp) && self$can_retry(i)) {
delay <- retry_after(req, resp, tries)
self$rate_limit_deadline <- unix_time() + delay

self$set_status(i, "pending")
self$n_retries <- self$n_retries + 1
self$set_status(i, "retrying")

Check warning on line 300 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L300

Added line #L300 was not covered by tests
self$queue_status <- "waiting"

delay <- retry_after(req, resp, self$tries[[i]])
signal(class = "httr2_retry", tries = self$tries[[i]], delay = delay)

Check warning on line 304 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L303-L304

Added lines #L303 - L304 were not covered by tests

self$rate_limit_deadline <- max(
self$rate_limit_deadline,
unix_time() + delay
)

Check warning on line 309 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L306-L309

Added lines #L306 - L309 were not covered by tests
} else if (resp_is_invalid_oauth_token(req, resp) && self$can_reauth(i)) {
self$set_status(i, "retrying")

Check warning on line 311 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L311

Added line #L311 was not covered by tests

# This isn't quite right, because if there are (e.g.) four requests in
# the queue and the first one fails, we'll clear the cache for all four,
# causing a token refresh more often than necessary. This shouldn't
# affect correctness, but it does make it slower than necessary.
self$oauth_failed <- c(self$oauth_failed, i)
req_auth_clear_cache(self$reqs[[i]])
self$set_status(i, "pending")
self$n_retries <- self$n_retries + 1
# Don't count this as a retry for the purpose of limiting retries
self$tries[[i]] <- self$tries[[i]] - 1

Check warning on line 320 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L320

Added line #L320 was not covered by tests
} else {
self$set_status(i, "complete")
if (self$on_error != "continue") {
Expand All @@ -323,21 +330,22 @@
switch(
self$status[[i]], # old status
pending = self$n_pending <- self$n_pending - 1,
active = self$n_active <- self$n_active - 1
active = self$n_active <- self$n_active - 1,
retrying = self$n_retrying <- self$n_retrying - 1

Check warning on line 334 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L334

Added line #L334 was not covered by tests
)
switch(
status, # new status
pending = self$n_pending <- self$n_pending + 1,
active = self$n_active <- self$n_active + 1,
complete = self$n_complete <- self$n_complete + 1
complete = self$n_complete <- self$n_complete + 1,
retrying = self$n_retrying <- self$n_retrying + 1

Check warning on line 341 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L341

Added line #L341 was not covered by tests
)

self$status[[i]] <- status
},

can_retry = function(i) {
TRUE
# self$tries[[i]] < retry_max_tries(self$reqs[[i]])
self$tries[[i]] < retry_max_tries(self$reqs[[i]])

Check warning on line 348 in R/req-perform-parallel.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-parallel.R#L348

Added line #L348 was not covered by tests
},
can_reauth = function(i) {
!i %in% self$oauth_failed
Expand Down
9 changes: 4 additions & 5 deletions man/req_perform_parallel.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 23 additions & 2 deletions tests/testthat/test-req-perform-parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,27 @@ test_that("requests are throttled", {
expect_equal(mock_time, 4)
})

test_that("respects max_tries", {
  # Handler that fails with a 429 on the first two requests and only
  # succeeds from the third request onwards.
  flaky <- function(req, res) {
    if (res$app$locals$i >= 3) {
      res$send_json(TRUE)
    } else {
      res$set_status(429)$set_header("retry-after", 0)$send_json(FALSE)
    }
  }

  # Without req_retry() there is a single try, so the 429 surfaces.
  out <- req_perform_parallel(list(local_app_request(flaky)), on_error = "return")
  expect_s3_class(out[[1]], "httr2_http_429")

  # Two tries is still not enough to reach the successful third response.
  req <- req_retry(local_app_request(flaky), max_tries = 2)
  out <- req_perform_parallel(list(req), on_error = "return")
  expect_s3_class(out[[1]], "httr2_http_429")

  # A generous retry budget lets the request eventually succeed.
  req <- req_retry(local_app_request(flaky), max_tries = 10)
  out <- req_perform_parallel(list(req), on_error = "return")
  expect_s3_class(out[[1]], "httr2_response")
})

# Tests of lower-level operation -----------------------------------------------

Expand Down Expand Up @@ -255,7 +276,7 @@ test_that("can retry a transient error", {
expect_null(queue$process1())
expect_equal(queue$queue_status, "waiting")
expect_equal(queue$rate_limit_deadline, mock_time + 2)
expect_equal(queue$n_pending, 1)
expect_equal(queue$n_retrying, 1)
expect_s3_class(queue$resps[[1]], "httr2_http_429")
expect_equal(resp_body_json(queue$resps[[1]]$resp), list(status = "waiting"))

Expand All @@ -268,7 +289,7 @@ test_that("can retry a transient error", {
expect_null(queue$process1())
expect_equal(queue$queue_status, "working")
expect_equal(queue$n_active, 0)
expect_equal(queue$n_pending, 1)
expect_equal(queue$n_retrying, 1)

# Resubmit
expect_null(queue$process1())
Expand Down