Skip to content

Commit

Permalink
add paused_actions for INFO Clients (#1519)
Browse files Browse the repository at this point in the history
Add `paused_actions` and `paused_timeout_milliseconds` for INFO Clients
to inform users about if clients are paused.

---------

Signed-off-by: zhaozhao.zz <[email protected]>
  • Loading branch information
soloestoy authored Jan 14, 2025
1 parent 2a1a65b commit c5a1585
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 1 deletion.
10 changes: 10 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4520,6 +4520,16 @@ void flushReplicasOutputBuffers(void) {
}
}

mstime_t getPausedActionTimeout(uint32_t action) {
mstime_t timeout = 0;
for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
pause_event *p = &(server.client_pause_per_purpose[i]);
if (p->paused_actions & action && (p->end - server.mstime) > timeout)
timeout = p->end - server.mstime;
}
return timeout;
}

/* Compute current paused actions and its end time, aggregated for
* all pause purposes. */
void updatePausedActions(void) {
Expand Down
15 changes: 14 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -5668,6 +5668,17 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
unsigned long blocking_keys, blocking_keys_on_nokey, watched_keys;
getExpansiveClientsInfo(&maxin, &maxout);
totalNumberOfStatefulKeys(&blocking_keys, &blocking_keys_on_nokey, &watched_keys);

char *paused_actions = "none";
long long paused_timeout = 0;
if (server.paused_actions & PAUSE_ACTION_CLIENT_ALL) {
paused_actions = "all";
paused_timeout = getPausedActionTimeout(PAUSE_ACTION_CLIENT_ALL);
} else if (server.paused_actions & PAUSE_ACTION_CLIENT_WRITE) {
paused_actions = "write";
paused_timeout = getPausedActionTimeout(PAUSE_ACTION_CLIENT_WRITE);
}

if (sections++) info = sdscat(info, "\r\n");
info = sdscatprintf(
info,
Expand All @@ -5684,7 +5695,9 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"clients_in_timeout_table:%llu\r\n", (unsigned long long)raxSize(server.clients_timeout_table),
"total_watched_keys:%lu\r\n", watched_keys,
"total_blocking_keys:%lu\r\n", blocking_keys,
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey));
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey,
"paused_actions:%s\r\n", paused_actions,
"paused_timeout_milliseconds:%lld\r\n", paused_timeout));
}

/* Memory */
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2714,6 +2714,7 @@ void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions);
void unpauseActions(pause_purpose purpose);
uint32_t isPausedActions(uint32_t action_bitmask);
uint32_t isPausedActionsWithUpdate(uint32_t action_bitmask);
mstime_t getPausedActionTimeout(uint32_t action);
void updatePausedActions(void);
void unblockPostponedClients(void);
void processEventsWhileBlocked(void);
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/pause.tcl
Original file line number Diff line number Diff line change
@@ -1,4 +1,23 @@
start_server {tags {"pause network"}} {
test "Test check paused_actions in info stats" {
assert_equal [s paused_actions] "none"
assert_equal [s paused_timeout_milliseconds] 0

r client PAUSE 10000 WRITE
assert_equal [s paused_actions] "write"
after 1000
set timeout [s paused_timeout_milliseconds]
assert {$timeout > 0 && $timeout < 9000}
r client unpause

r multi
r client PAUSE 1000 ALL
r info clients
assert_match "*paused_actions:all*" [r exec]

r client unpause
}

test "Test read commands are not blocked by client pause" {
r client PAUSE 100000 WRITE
set rd [valkey_deferring_client]
Expand Down

0 comments on commit c5a1585

Please sign in to comment.