Skip to content

Commit

Permalink
Merge pull request #1144 from milroy/issue-1137
Browse files Browse the repository at this point in the history
Improve sched.resource-status RPC and search performance
  • Loading branch information
mergify[bot] authored Mar 1, 2024
2 parents 2c61dcf + 568f27e commit f9dc433
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 75 deletions.
149 changes: 87 additions & 62 deletions resource/modules/resource_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,15 @@ struct resource_ctx_t : public resource_interface_t {
std::map<uint64_t, uint64_t> allocations; /* Allocation table */
std::map<uint64_t, uint64_t> reservations; /* Reservation table */
std::map<std::string, std::shared_ptr<msg_wrap_t>> notify_msgs;
bool m_resources_updated = true; /* resources have been updated */
bool m_resources_down_updated = true; /* down resources have been updated */
/* last time allocated resources search updated */
std::chrono::time_point<
std::chrono::system_clock> m_resources_alloc_updated;
/* R caches */
json_t *m_r_all;
json_t *m_r_down;
json_t *m_r_alloc;
};

msg_wrap_t::msg_wrap_t (const msg_wrap_t &o)
Expand Down Expand Up @@ -296,13 +305,6 @@ static const struct flux_msg_handler_spec htab[] = {
FLUX_MSGHANDLER_TABLE_END
};

/*! Return the wall-clock time elapsed between two timestamps, in seconds.
 *
 *  The seconds and microseconds fields are subtracted as integers before
 *  being converted to floating point. Converting each timestamp to an
 *  absolute double first (seconds since the epoch are on the order of 1e9)
 *  leaves only a few hundred nanoseconds of resolution, so short intervals
 *  would be quantized; differencing first keeps microsecond accuracy.
 *
 *  \param st  start timestamp.
 *  \param et  end timestamp.
 *  \return    et - st in seconds; negative if et precedes st.
 */
static double get_elapse_time (timeval &st, timeval &et)
{
    const double sec_delta = static_cast<double> (et.tv_sec - st.tv_sec);
    // tv_usec difference may be negative when the end timestamp has rolled
    // over into the next second; the signed sum below handles that case.
    const double usec_delta = static_cast<double> (et.tv_usec - st.tv_usec);
    return sec_delta + usec_delta / 1000000.0;
}

/******************************************************************************
* *
* Module Initialization Routines *
Expand All @@ -317,6 +319,7 @@ static void set_default_args (std::shared_ptr<resource_ctx_t> &ctx)
ct_opts.set_match_policy ("first");
ct_opts.set_prune_filters ("ALL:core");
ct_opts.set_match_format ("rv1_nosched");
ct_opts.set_update_interval (0);
ctx->opts += ct_opts;
}

Expand Down Expand Up @@ -348,6 +351,13 @@ static std::shared_ptr<resource_ctx_t> getctx (flux_t *h)
ctx->fgraph = nullptr; /* Cannot be allocated at this point */
ctx->writers = nullptr; /* Cannot be allocated at this point */
ctx->reader = nullptr; /* Cannot be allocated at this point */
ctx->m_r_all = nullptr;
ctx->m_r_down = nullptr;
ctx->m_r_alloc = nullptr;
ctx->m_resources_updated = true;
ctx->m_resources_down_updated = true;
//gettimeofday (&(ctx->m_resources_alloc_updated), NULL);
ctx->m_resources_alloc_updated = std::chrono::system_clock::now ();
}

done:
Expand Down Expand Up @@ -1110,6 +1120,7 @@ static int grow_resource_db (std::shared_ptr<resource_ctx_t> &ctx,
}
}
ctx->db->metadata.set_graph_duration (duration);
ctx->m_resources_updated = true;

done:
idset_destroy (grow_set);
Expand Down Expand Up @@ -1202,6 +1213,10 @@ static int mark_now (std::shared_ptr<resource_ctx_t> &ctx,
flux_log (ctx->h, LOG_DEBUG,
"resource status changed (rankset=[%s] status=%s)",
ids, resource_pool_t::status_to_str (status).c_str ());

// Updated the ranks
ctx->m_resources_down_updated = true;

done:
return rc;
}
Expand Down Expand Up @@ -1308,16 +1323,14 @@ static int populate_resource_db_acquire (std::shared_ptr<resource_ctx_t> &ctx)
static int populate_resource_db (std::shared_ptr<resource_ctx_t> &ctx)
{
int rc = -1;
double elapse;
struct timeval st, et;
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::duration<double> elapsed;

if (ctx->opts.get_opt ().is_reserve_vtx_vec_set ())
ctx->db->resource_graph.m_vertices.reserve (
ctx->opts.get_opt ().get_reserve_vtx_vec ());
if ( (rc = gettimeofday (&st, NULL)) < 0) {
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
goto done;
}

start = std::chrono::system_clock::now ();
if (ctx->opts.get_opt ().is_load_file_set ()) {
if (populate_resource_db_file (ctx) < 0)
goto done;
Expand All @@ -1335,11 +1348,9 @@ static int populate_resource_db (std::shared_ptr<resource_ctx_t> &ctx)
"%s: loaded resources from core's resource.acquire",
__FUNCTION__);
}
if ( (rc = gettimeofday (&et, NULL)) < 0) {
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
goto done;
}
ctx->perf.load = get_elapse_time (st, et);

elapsed = std::chrono::system_clock::now () - start;
ctx->perf.load = elapsed.count ();
rc = 0;

done:
Expand Down Expand Up @@ -1399,9 +1410,12 @@ static std::shared_ptr<f_resource_graph_t> create_filtered_graph (
edg_infra_map_t emap = get (&resource_relation_t::idata, g);

// Set vertex and edge filters based on subsystems to use
int subsys_size = ctx->db->metadata.roots.size ();
const multi_subsystemsS &filter = ctx->matcher->subsystemsS ();
subsystem_selector_t<vtx_t, f_vtx_infra_map_t> vtxsel (vmap, filter);
subsystem_selector_t<edg_t, f_edg_infra_map_t> edgsel (emap, filter);
subsystem_selector_t<vtx_t, f_vtx_infra_map_t> vtxsel (vmap, filter,
subsys_size);
subsystem_selector_t<edg_t, f_edg_infra_map_t> edgsel (emap, filter,
subsys_size);
fg = std::make_shared<f_resource_graph_t> (g, edgsel, vtxsel);
} catch (std::bad_alloc &e) {
errno = ENOMEM;
Expand Down Expand Up @@ -1706,15 +1720,12 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
flux_error_t *errp)
{
int rc = 0;
double elapse = 0.0f;
struct timeval start;
struct timeval end;
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::duration<double> elapsed;
std::chrono::duration<int64_t> epoch;
bool rsv = false;

if ( (rc = gettimeofday (&start, NULL)) < 0) {
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
goto done;
}
start = std::chrono::system_clock::now ();
if (strcmp ("allocate", cmd) != 0
&& strcmp ("allocate_orelse_reserve", cmd) != 0
&& strcmp ("allocate_with_satisfiability", cmd) != 0
Expand All @@ -1725,7 +1736,9 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
goto done;
}

*at = *now = (int64_t)start.tv_sec;
epoch = std::chrono::duration_cast<std::chrono::seconds>
(start.time_since_epoch ());
*at = *now = epoch.count ();
if ( (rc = run (ctx, jobid, cmd, jstr, at, errp)) < 0) {
goto done;
}
Expand All @@ -1735,11 +1748,8 @@ static int run_match (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
}

rsv = (*now != *at)? true : false;
if ( (rc = gettimeofday (&end, NULL)) < 0) {
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
goto done;
}
*ov = get_elapse_time (start, end);
elapsed = std::chrono::system_clock::now () - start;
*ov = elapsed.count ();
update_match_perf (ctx, *ov);

if (cmd != std::string ("satisfiability")) {
Expand All @@ -1761,16 +1771,12 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
{
int rc = 0;
uint64_t duration = 0;
double elapse = 0.0f;
struct timeval start;
struct timeval end;
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::duration<double> elapsed;
std::string jgf;
std::string R2;

if ( (rc = gettimeofday (&start, NULL)) < 0) {
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
goto done;
}
start = std::chrono::system_clock::now ();
if ( (rc = parse_R (ctx, R, jgf, at, duration)) < 0) {
flux_log_error (ctx->h, "%s: parsing R", __FUNCTION__);
goto done;
Expand All @@ -1783,11 +1789,8 @@ static int run_update (std::shared_ptr<resource_ctx_t> &ctx, int64_t jobid,
flux_log_error (ctx->h, "%s: writers->emit", __FUNCTION__);
goto done;
}
if ( (rc = gettimeofday (&end, NULL)) < 0) {
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
goto done;
}
ov = get_elapse_time (start, end);
elapsed = std::chrono::system_clock::now () - start;
ov = elapsed.count ();
update_match_perf (ctx, ov);
if ( (rc = track_schedule_info (ctx, jobid, false, at, "", o, ov)) != 0) {
flux_log_error (ctx->h, "%s: can't add job info (id=%jd)",
Expand All @@ -1809,6 +1812,8 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
uint64_t duration = 0;
std::string status = "";
std::stringstream o;
std::chrono::time_point<std::chrono::system_clock> start;
std::chrono::duration<double> elapsed;

std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);
if (flux_request_unpack (msg, NULL, "{s:I s:s}",
Expand All @@ -1819,11 +1824,7 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
}
if (is_existent_jobid (ctx, jobid)) {
int rc = 0;
struct timeval st, et;
if ( (rc = gettimeofday (&st, NULL)) < 0) {
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
goto error;
}
start = std::chrono::system_clock::now ();
if ( (rc = Rlite_equal (ctx, R, ctx->jobs[jobid]->R.c_str ())) < 0) {
flux_log_error (ctx->h, "%s: Rlite_equal", __FUNCTION__);
goto error;
Expand All @@ -1834,12 +1835,9 @@ static void update_request_cb (flux_t *h, flux_msg_handler_t *w,
__FUNCTION__, static_cast<intmax_t> (jobid));
goto error;
}
if ( (rc = gettimeofday (&et, NULL)) < 0) {
flux_log_error (ctx->h, "%s: gettimeofday", __FUNCTION__);
goto error;
}
elapsed = std::chrono::system_clock::now () - start;
// If a jobid with matching R exists, no need to update
ov = get_elapse_time (st, et);
ov = elapsed.count ();
get_jobstate_str (ctx->jobs[jobid]->state, status);
o << ctx->jobs[jobid]->R;
at = ctx->jobs[jobid]->scheduled_at;
Expand Down Expand Up @@ -2485,22 +2483,48 @@ static void status_request_cb (flux_t *h, flux_msg_handler_t *w,
json_t *R_all = nullptr;
json_t *R_down = nullptr;
json_t *R_alloc = nullptr;
std::chrono::time_point<std::chrono::system_clock> now;
std::chrono::duration<double> elapsed;
std::shared_ptr<resource_ctx_t> ctx = getctx ((flux_t *)arg);

if (run_find (ctx, "status=up or status=down", "rv1_nosched", &R_all) < 0)
goto error;
if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0)
goto error;
if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0)
goto error;
now = std::chrono::system_clock::now ();
elapsed = now - ctx->m_resources_alloc_updated;
// Get R alloc whenever m_resources_alloc_updated or
// the elapsed time is greater than configured limit
if ( (elapsed.count () >
static_cast<double> (ctx->opts.get_opt ().get_update_interval ())) ||
ctx->m_resources_updated) {
if (run_find (ctx, "sched-now=allocated", "rv1_nosched", &R_alloc) < 0)
goto error;
ctx->m_r_alloc = json_deep_copy (R_alloc);
ctx->m_resources_alloc_updated = std::chrono::system_clock::now ();
} else
R_alloc = json_deep_copy (ctx->m_r_alloc);

if (ctx->m_resources_updated) {
if (run_find (ctx, "status=up or status=down", "rv1_nosched",
&R_all) < 0)
goto error;
ctx->m_r_all = json_deep_copy (R_all);
ctx->m_resources_updated = false;
} else
R_all = json_deep_copy (ctx->m_r_all);

if (ctx->m_resources_down_updated) {
if (run_find (ctx, "status=down", "rv1_nosched", &R_down) < 0)
goto error;
ctx->m_r_down = json_deep_copy (R_down);
ctx->m_resources_down_updated = false;
} else
R_down = json_deep_copy (ctx->m_r_down);

if (flux_respond_pack (h, msg, "{s:o? s:o? s:o?}",
"all", R_all,
"down", R_down,
"allocated", R_alloc) < 0) {
flux_log_error (h, "%s: flux_respond_pack", __FUNCTION__);
goto error;
}

flux_log (h, LOG_DEBUG, "%s: status succeeded", __FUNCTION__);
return;

Expand Down Expand Up @@ -2679,6 +2703,7 @@ static void set_status_request_cb (flux_t *h, flux_msg_handler_t *w,
errmsg = "Failed to set status of resource vertex";
goto error;
}
ctx->m_resources_down_updated = true;
if (flux_respond (h, msg, NULL) < 0) {
flux_log_error (h, "%s: flux_respond", __FUNCTION__);
}
Expand Down
Loading

0 comments on commit f9dc433

Please sign in to comment.