70 changes: 0 additions & 70 deletions python/ray/tests/test_metrics_agent.py
@@ -620,76 +620,6 @@ def test_case_publisher_specific_metrics_correct(publisher_name: str):
)


-def test_operation_stats(monkeypatch, shutdown_only):
-    # Test operation stats are available when flag is on.
-    operation_metrics = [
-        "ray_operation_count_total",
-        "ray_operation_run_time_ms_bucket",
-        "ray_operation_queue_time_ms_bucket",
-        "ray_operation_active_count",
-    ]
-
-    monkeypatch.setenv("RAY_emit_main_service_metrics", "1")
-    timeseries = PrometheusTimeseries()
-    addr = ray.init()
-    remote_signal = SignalActor.remote()
-
-    @ray.remote
-    class Actor:
-        def __init__(self, signal):
-            self.signal = signal
-
-        def get_worker_id(self):
-            return ray.get_runtime_context().get_worker_id()
-
-        def wait(self):
-            ray.get(self.signal.wait.remote())
-
-    actor = Actor.remote(remote_signal)
-    ray.get(actor.get_worker_id.remote())
-    obj_ref = actor.wait.remote()
-
-    ray.get(remote_signal.send.remote())
-    ray.get(obj_ref)
-
-    def verify():
-        metrics = raw_metric_timeseries(addr, timeseries)
-
-        samples = metrics["ray_operation_active_count"]
-        found = False
-        for sample in samples:
-            if (
-                sample.labels["Name"] == "gcs_server_main_io_context"
-                and sample.labels["Component"] == "gcs_server"
-            ):
-                found = True
-        if not found:
-            return False
-
-        found = False
-        for sample in samples:
-            if (
-                sample.labels["Name"] == "raylet_main_io_context"
-                and sample.labels["Component"] == "raylet"
-            ):
-                found = True
-        if not found:
-            return False
-
-        metric_names = set(metrics.keys())
-        for op_metric in operation_metrics:
-            assert op_metric in metric_names
-            samples = metrics[op_metric]
-            components = set()
-            print(components)
-            for sample in samples:
-                components.add(sample.labels["Component"])
-            assert {"raylet", "gcs_server"} == components
-        return True
-
-    wait_for_condition(verify, timeout=30)
-
-
@pytest.mark.skipif(prometheus_client is None, reason="Prometheus not installed")
@pytest.mark.parametrize("_setup_cluster_for_test", [True], indirect=True)
def test_histogram(_setup_cluster_for_test):
1 change: 1 addition & 0 deletions release/release_tests.yaml
@@ -2953,6 +2953,7 @@
      type: gpu
      runtime_env:
        - LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so
+        - RAY_emit_main_service_metrics=1
    cluster_compute: distributed.yaml

  run:
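Note on the new runtime_env entry above: Ray reads `RAY_<option_name>` environment variables at process startup to override its internal config options, so setting `RAY_emit_main_service_metrics=1` on the release-test cluster enables the corresponding flag in the Ray server processes (the deleted test above exercised it for `gcs_server` and `raylet`). A minimal standalone sketch of this env-var-gating pattern; `EnvFlagEnabled` is an illustrative helper, not a Ray API, and the real code path goes through Ray's `RayConfig`:

```cpp
#include <cstdlib>
#include <cstring>
#include <iostream>

// Returns true when the named environment variable is set to a truthy value.
// Simplified stand-in for Ray's RAY_<option> -> RayConfig override mechanism.
bool EnvFlagEnabled(const char *name) {
  const char *value = std::getenv(name);
  return value != nullptr &&
         (std::strcmp(value, "1") == 0 || std::strcmp(value, "true") == 0);
}

int main() {
  if (EnvFlagEnabled("RAY_emit_main_service_metrics")) {
    std::cout << "main service metrics enabled\n";  // register/emit the extra metrics
  } else {
    std::cout << "main service metrics disabled\n";
  }
  return 0;
}
```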
52 changes: 39 additions & 13 deletions src/ray/common/asio/instrumented_io_context.cc
@@ -24,6 +24,20 @@

namespace {

+// Dedicated io_context for lag probe timer scheduling.
+// This prevents timer scheduling overhead from affecting the lag measurements
+// of the main io_context being monitored.
+boost::asio::io_context &GetLagProbeTimerIOContext() {
+  static boost::asio::io_context timer_io_context;
+  static boost::asio::executor_work_guard<boost::asio::io_context::executor_type> work =
+      boost::asio::make_work_guard(timer_io_context);
+  static std::thread timer_thread([]() {
+    SetThreadName("LagProbeTimer");
+    timer_io_context.run();
+  });
+  return timer_io_context;
+}
+
// Post a probe. Records the lag and schedules another probe.
// Requires: `interval_ms` > 0.
void LagProbeLoop(instrumented_io_context &io_context,
@@ -35,26 +49,38 @@ void LagProbeLoop(instrumented_io_context &io_context,
        auto end = std::chrono::steady_clock::now();
        auto duration =
            std::chrono::duration_cast<std::chrono::milliseconds>(end - begin);
-        ray::stats::STATS_io_context_event_loop_lag_ms.Record(
-            duration.count(),
-            {
-                {"Name", context_name.value_or(GetThreadName())},
-            });
+        // ray::stats::STATS_io_context_event_loop_lag_ms.Record(
+        //     duration.count(),
+        //     {
+        //         {"Name", context_name.value_or(GetThreadName())},
+        //     });

        // Schedule the next probe. If `duration` is larger than `interval_ms`, we
        // should schedule the next probe immediately. Otherwise, we should wait
        // for `interval_ms - duration`.
        auto delay = interval_ms - duration.count();
        if (delay <= 0) {
-          LagProbeLoop(io_context, interval_ms, context_name);
-        } else {
-          execute_after(
-              io_context,
-              [&io_context, interval_ms, context_name]() {
-                LagProbeLoop(io_context, interval_ms, context_name);
-              },
-              std::chrono::milliseconds(delay));
+          delay = 1;
        }
+        // LagProbeLoop(io_context, interval_ms, context_name);
+        // } else {
+        // Use the dedicated timer io_context for scheduling to avoid timer
+        // overhead on the io_context being measured.
+        auto timer = std::make_shared<boost::asio::deadline_timer>(
+            GetLagProbeTimerIOContext());
+        timer->expires_from_now(boost::posix_time::milliseconds(delay));
+        timer->async_wait([timer, &io_context, interval_ms, context_name](
+                              const boost::system::error_code &error) {
+          if (error != boost::asio::error::operation_aborted) {
+            // Post back to the main io_context to continue the probe loop
+            io_context.post(
+                [&io_context, interval_ms, context_name]() {
+                  LagProbeLoop(io_context, interval_ms, context_name);
+                },
+                "event_loop_lag_probe");
+          }
+        });
+        // }
      },
      "event_loop_lag_probe");
}
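
For readers unfamiliar with the scheduling pattern the new code adopts, here is a minimal standalone sketch in plain Boost.Asio. It is an illustration only, not Ray code: the names (`ProbeLoop`, `monitored_ctx`, `timer_ctx`) and the 100 ms interval are made up, `std::cout` stands in for the metric `Record()` call, and shutdown handling is intentionally simplified.

```cpp
// Standalone sketch (not Ray code): measure the scheduling lag of a monitored
// io_context by posting probes into it, while all timer bookkeeping runs on a
// dedicated timer thread so it does not perturb the loop being measured.
#include <boost/asio.hpp>

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

void ProbeLoop(boost::asio::io_context &monitored_ctx,
               boost::asio::io_context &timer_ctx,
               int64_t interval_ms) {
  auto begin = std::chrono::steady_clock::now();
  // The probe runs on the monitored loop; the time from post() to execution
  // approximates how backed up that loop is.
  boost::asio::post(monitored_ctx, [&monitored_ctx, &timer_ctx, begin, interval_ms]() {
    auto lag_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                      std::chrono::steady_clock::now() - begin)
                      .count();
    std::cout << "event loop lag: " << lag_ms << " ms\n";  // stand-in for Record()

    auto delay_ms = interval_ms - lag_ms;
    if (delay_ms <= 0) {
      delay_ms = 1;
    }
    // The timer belongs to the dedicated timer context; capturing the
    // shared_ptr in the handler keeps it alive until the wait completes.
    auto timer = std::make_shared<boost::asio::steady_timer>(
        timer_ctx, std::chrono::milliseconds(delay_ms));
    timer->async_wait([timer, &monitored_ctx, &timer_ctx, interval_ms](
                          const boost::system::error_code &ec) {
      if (ec != boost::asio::error::operation_aborted) {
        // Hop back onto the monitored context to start the next probe.
        boost::asio::post(monitored_ctx, [&monitored_ctx, &timer_ctx, interval_ms]() {
          ProbeLoop(monitored_ctx, timer_ctx, interval_ms);
        });
      }
    });
  });
}

int main() {
  boost::asio::io_context monitored_ctx;
  boost::asio::io_context timer_ctx;
  // Work guards keep both run() calls from returning while no handlers are queued.
  auto monitored_guard = boost::asio::make_work_guard(monitored_ctx);
  auto timer_guard = boost::asio::make_work_guard(timer_ctx);

  std::thread timer_thread([&timer_ctx]() { timer_ctx.run(); });
  ProbeLoop(monitored_ctx, timer_ctx, /*interval_ms=*/100);
  monitored_ctx.run();  // never returns in this sketch; interrupt to stop

  timer_thread.join();
  return 0;
}
```

The point of the second context is that timer bookkeeping (expiry updates, `async_wait` registration, and expiry dispatch) happens on the dedicated timer thread, so the only work the monitored loop executes per probe cycle is the callback whose queueing delay is being measured.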