Skip to content

Commit

Permalink
Replace worker process when memory pressure high (#109)
Browse files Browse the repository at this point in the history
* Replace worker process when memory pressure high

* Make mem limit configurable

* Add to docstring

* Add tests

* fixup! Make mem limit configurable

* Bump version

* Update src/ReTestItems.jl
  • Loading branch information
nickrobinson251 authored Sep 1, 2023
1 parent 0c4a9d8 commit ea52cd5
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name = "ReTestItems"
uuid = "817f1d60-ba6b-4fd5-9520-3cf149f6a823"
version = "1.17.0"
version = "1.18.0"

[deps]
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Expand Down
36 changes: 27 additions & 9 deletions src/ReTestItems.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export TestSetup, TestItem, TestItemResult
const RETESTITEMS_TEMP_FOLDER = mkpath(joinpath(tempdir(), "ReTestItemsTempLogsDirectory"))
const DEFAULT_TESTITEM_TIMEOUT = 30*60
const DEFAULT_RETRIES = 0
const DEFAULT_MEMORY_THRESHOLD = 0.99

if isdefined(Base, :errormonitor)
const errmon = Base.errormonitor
Expand Down Expand Up @@ -139,6 +140,14 @@ will be run.
Can be used to load packages or set up the environment. Must be a `:block` expression.
- `test_end_expr::Expr`: an expression that will be evaluated after each testitem is run.
Can be used to verify that global state is unchanged after running a test. Must be a `:block` expression.
- `memory_threshold::Real`: Sets the fraction of memory that can be in use before a worker process is
restarted to free memory. Defaults to $DEFAULT_MEMORY_THRESHOLD. Only supported with `nworkers > 0`.
For example, if set to 0.8, then when >80% of the available memory is in use, a worker process will be killed and
replaced with a new worker before the next testitem is evaluated. The testitem will then be run on the new worker
process, regardless of whether memory pressure dropped below the threshold. If the memory pressure remains above the
threshold, then a worker process will again be replaced before the next testitem is evaluated.
Can also be set using the `RETESTITEMS_MEMORY_THRESHOLD` environment variable.
**Note**: the `memory_threshold` keyword is experimental and may be removed in future versions.
- `report::Bool=false`: If `true`, write a JUnit-format XML file summarising the test results.
Can also be set using the `RETESTITEMS_REPORT` environment variable. The location at which
the XML report is saved can be set using the `RETESTITEMS_REPORT_LOCATION` environment variable.
Expand Down Expand Up @@ -189,6 +198,7 @@ function runtests(
worker_init_expr::Expr=Expr(:block),
testitem_timeout::Real=parse(Float64, get(ENV, "RETESTITEMS_TESTITEM_TIMEOUT", string(DEFAULT_TESTITEM_TIMEOUT))),
retries::Int=parse(Int, get(ENV, "RETESTITEMS_RETRIES", string(DEFAULT_RETRIES))),
memory_threshold::Real=parse(Float64, get(ENV, "RETESTITEMS_MEMORY_THRESHOLD", string(DEFAULT_MEMORY_THRESHOLD))),
debug=0,
name::Union{Regex,AbstractString,Nothing}=nothing,
tags::Union{Symbol,AbstractVector{Symbol},Nothing}=nothing,
Expand All @@ -211,6 +221,7 @@ function runtests(
end
logs in LOG_DISPLAY_MODES || throw(ArgumentError("`logs` must be one of $LOG_DISPLAY_MODES, got $(repr(logs))"))
report && logs == :eager && throw(ArgumentError("`report=true` is not compatible with `logs=:eager`"))
(0 ≤ memory_threshold ≤ 1) || throw(ArgumentError("`memory_threshold` must be between 0 and 1, got $(repr(memory_threshold))"))
# If we were given paths but none were valid, then nothing to run.
!isempty(paths) && isempty(paths′) && return nothing
shouldrun_combined(ti) = shouldrun(ti) && _shouldrun(name, ti.name) && _shouldrun(tags, ti.tags)
Expand All @@ -221,10 +232,10 @@ function runtests(
debuglvl = Int(debug)
if debuglvl > 0
LoggingExtras.withlevel(LoggingExtras.Debug; verbosity=debuglvl) do
_runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, verbose_results, debuglvl, report, logs)
_runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs)
end
else
return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, verbose_results, debuglvl, report, logs)
return _runtests(shouldrun_combined, paths′, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debuglvl, report, logs)
end
end

Expand All @@ -238,7 +249,7 @@ end
# By tracking and reusing test environments, we can avoid this issue.
const TEST_ENVS = Dict{String, String}()

function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Real, retries::Int, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol)
function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, worker_init_expr::Expr, test_end_expr::Expr, testitem_timeout::Real, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol)
# Don't recursively call `runtests` e.g. if we `include` a file which calls it.
# So we ignore the `runtests(...)` call in `test/runtests.jl` when `runtests(...)`
# was called from the command line.
Expand All @@ -258,7 +269,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor
if is_running_test_runtests_jl(proj_file)
# Assume this is `Pkg.test`, so test env already active.
@debugv 2 "Running in current environment `$(Base.active_project())`"
return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, verbose_results, debug, report, logs)
return _runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs)
else
@debugv 1 "Activating test environment for `$proj_file`"
orig_proj = Base.active_project()
Expand All @@ -271,7 +282,7 @@ function _runtests(shouldrun, paths, nworkers::Int, nworker_threads::String, wor
testenv = TestEnv.activate()
TEST_ENVS[proj_file] = testenv
end
_runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, verbose_results, debug, report, logs)
_runtests_in_current_env(shouldrun, paths, proj_file, nworkers, nworker_threads, worker_init_expr, test_end_expr, testitem_timeout, retries, memory_threshold, verbose_results, debug, report, logs)
finally
Base.set_active_project(orig_proj)
end
Expand All @@ -281,7 +292,7 @@ end

function _runtests_in_current_env(
shouldrun, paths, projectfile::String, nworkers::Int, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
testitem_timeout::Real, retries::Int, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol,
testitem_timeout::Real, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol,
)
start_time = time()
proj_name = something(Pkg.Types.read_project(projectfile).name, "")
Expand Down Expand Up @@ -346,7 +357,7 @@ function _runtests_in_current_env(
ti = starting[i]
@spawn begin
with_logger(original_logger) do
manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $verbose_results, $debug, $report, $logs)
manage_worker($w, $proj_name, $testitems, $ti, $nworker_threads, $worker_init_expr, $test_end_expr, $testitem_timeout, $retries, $memory_threshold, $verbose_results, $debug, $report, $logs)
end
end
end
Expand Down Expand Up @@ -454,13 +465,20 @@ function record_test_error!(testitem, msg, elapsed_seconds::Real=0.0)
end

function manage_worker(
worker::Worker, proj_name, testitems, testitem, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
timeout::Real, retries::Int, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol
worker::Worker, proj_name::AbstractString, testitems::TestItems, testitem::Union{TestItem,Nothing}, nworker_threads, worker_init_expr::Expr, test_end_expr::Expr,
timeout::Real, retries::Int, memory_threshold::Real, verbose_results::Bool, debug::Int, report::Bool, logs::Symbol
)
ntestitems = length(testitems.testitems)
run_number = 1
memory_threshold_percent = 100*memory_threshold
while testitem !== nothing
ch = Channel{TestItemResult}(1)
if memory_percent() > memory_threshold_percent
@warn "Memory usage ($(Base.Ryu.writefixed(memory_percent(), 1))%) is higher than threshold ($(Base.Ryu.writefixed(memory_threshold_percent, 1))%). Restarting worker process to try to free memory."
terminate!(worker)
wait(worker)
worker = robust_start_worker(proj_name, nworker_threads, worker_init_expr, ntestitems)
end
testitem.workerid[] = worker.pid
fut = remote_eval(worker, :(ReTestItems.runtestitem($testitem, GLOBAL_TEST_CONTEXT; test_end_expr=$(QuoteNode(test_end_expr)), verbose_results=$verbose_results, logs=$(QuoteNode(logs)))))
max_runs = 1 + max(retries, testitem.retries)
Expand Down
6 changes: 4 additions & 2 deletions src/log_capture.jl
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,13 @@ _has_logs(ti::TestItem, i=nothing) = (path = logpath(ti, i); (isfile(path) && fi
# Stats to help diagnose OOM issues.
# Build a short summary string of memory statistics: the peak memory usage (maxrss) of the
# current process and the overall memory in use on the machine, each formatted as a
# percentage of total memory with one decimal place and left-padded to 4 characters.
# NOTE(review): the two consecutive "maxrss" arguments (and likewise the two "% | mem"
# arguments) look like the before/after versions of the same line as shown in a diff view;
# presumably only the helper-based forms are meant to remain -- confirm against the real file.
_mem_watermark() = string(
# Tracks the peak memory usage of a process / worker
"maxrss ", lpad(Base.Ryu.writefixed(100 * Float64(Sys.maxrss()/Sys.total_memory()), 1), 4),
"maxrss ", lpad(Base.Ryu.writefixed(maxrss_percent(), 1), 4),
# Total memory pressure on the machine
"% | mem ", lpad(Base.Ryu.writefixed(100 * Float64(1 - (Sys.free_memory()/Sys.total_memory())), 1), 4),
"% | mem ", lpad(Base.Ryu.writefixed(memory_percent(), 1), 4),
"% | "
)
# Peak memory usage (maxrss) of the current process, as a percentage of total memory.
function maxrss_percent()
    peak_fraction = Sys.maxrss() / Sys.total_memory()
    return 100 * Float64(peak_fraction)
end
# Memory currently in use on the machine (everything that is not free), as a percentage
# of total memory.
function memory_percent()
    free_fraction = Sys.free_memory() / Sys.total_memory()
    return 100 * Float64(1 - free_fraction)
end

"""
print_errors_and_captured_logs(ti::TestItem, run_number::Int; logs=:batched, errors_first=false)
Expand Down
45 changes: 45 additions & 0 deletions test/integrationtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -946,4 +946,49 @@ end
end
end

# Checks that a worker process is replaced (and a warning is logged) when the reported
# memory usage exceeds `memory_threshold`, whether the threshold is given via the keyword
# or the `RETESTITEMS_MEMORY_THRESHOLD` environment variable, and that an out-of-range
# threshold is rejected with an `ArgumentError`.
@testset "Replace workers when we hit memory threshold" begin
    using IOCapture
    file = joinpath(TEST_FILES_DIR, "_happy_tests.jl")
    try
        # monkey-patch the internal `memory_percent` function to return a fixed value, so we
        # can control if we hit the `memory_threshold`.
        @eval ReTestItems.memory_percent() = 83.1
        expected_warning = "Warning: Memory usage (83.1%) is higher than threshold (7.0%). Restarting worker process to try to free memory."

        # Pass `memory_threshold` keyword, and hit the memory threshold.
        c1 = IOCapture.capture() do
            encased_testset(()->runtests(file; nworkers=1, memory_threshold=0.07))
        end
        results1 = c1.value
        @test all_passed(results1)
        @test contains(c1.output, expected_warning)

        # Set the `RETESTITEMS_MEMORY_THRESHOLD` env variable, and hit the memory threshold.
        c2 = IOCapture.capture() do
            withenv("RETESTITEMS_MEMORY_THRESHOLD" => 0.07) do
                encased_testset(()->runtests(file; nworkers=1))
            end
        end
        results2 = c2.value
        @test all_passed(results2)
        @test contains(c2.output, expected_warning)

        # Set the memory_threshold, but don't hit it.
        c3 = IOCapture.capture() do
            withenv("RETESTITEMS_MEMORY_THRESHOLD" => 0.9) do
                encased_testset(()->runtests(file; nworkers=1))
            end
        end
        results3 = c3.value
        @test all_passed(results3)
        @test !contains(c3.output, expected_warning)
    finally
        # Restore the real `memory_percent` (share of total memory that is not free).
        # This must mirror the definition in src/log_capture.jl; restoring the
        # `maxrss_percent` formula here would leave the memory-pressure check wrongly
        # patched for every test that runs afterwards.
        @eval ReTestItems.memory_percent() = 100 * Float64(1 - (Sys.free_memory()/Sys.total_memory()))
    end
    # A threshold outside [0, 1] must be rejected before any tests run.
    xx = 99
    err_msg = "ArgumentError: `memory_threshold` must be between 0 and 1, got $xx"
    # Matching on the error message with `@test_throws` requires Julia 1.8+.
    expected_err = VERSION < v"1.8" ? ArgumentError : err_msg
    @test_throws expected_err runtests(file; nworkers=1, memory_threshold=xx)
end

end # integrationtests.jl testset

2 comments on commit ea52cd5

@nickrobinson251
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JuliaRegistrator
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Registration pull request created: JuliaRegistries/General/90616

After the above pull request is merged, it is recommended that a tag is created on this repository for the registered package version.

This will be done automatically if the Julia TagBot GitHub Action is installed, or can be done manually through the github interface, or via:

git tag -a v1.18.0 -m "<description of version>" ea52cd566da748d12e6d7add15c6e96ebcb8ae94
git push origin v1.18.0

Please sign in to comment.