Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add paused_actions for INFO Clients #1519

Merged
merged 5 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -4613,6 +4613,16 @@ void flushReplicasOutputBuffers(void) {
}
}

mstime_t getPausedActionTimeout(uint32_t action) {
mstime_t timeout = 0;
for (int i = 0; i < NUM_PAUSE_PURPOSES; i++) {
pause_event *p = &(server.client_pause_per_purpose[i]);
if (p->paused_actions & action && (p->end - server.mstime) > timeout)
timeout = p->end - server.mstime;
}
return timeout;
}

/* Compute current paused actions and its end time, aggregated for
* all pause purposes. */
void updatePausedActions(void) {
Expand Down
15 changes: 14 additions & 1 deletion src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -5649,6 +5649,17 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
unsigned long blocking_keys, blocking_keys_on_nokey, watched_keys;
getExpansiveClientsInfo(&maxin, &maxout);
totalNumberOfStatefulKeys(&blocking_keys, &blocking_keys_on_nokey, &watched_keys);

char *paused_actions = "none";
long long paused_timeout = 0;
if (server.paused_actions & PAUSE_ACTION_CLIENT_ALL) {
paused_actions = "all";
paused_timeout = getPausedActionTimeout(PAUSE_ACTION_CLIENT_ALL);
} else if (server.paused_actions & PAUSE_ACTION_CLIENT_WRITE) {
paused_actions = "write";
paused_timeout = getPausedActionTimeout(PAUSE_ACTION_CLIENT_WRITE);
}

if (sections++) info = sdscat(info, "\r\n");
info = sdscatprintf(
info,
Expand All @@ -5665,7 +5676,9 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"clients_in_timeout_table:%llu\r\n", (unsigned long long)raxSize(server.clients_timeout_table),
"total_watched_keys:%lu\r\n", watched_keys,
"total_blocking_keys:%lu\r\n", blocking_keys,
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey));
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey,
"paused_actions:%s\r\n", paused_actions,
"paused_timeout_milliseconds:%lld\r\n", paused_timeout));
}

/* Memory */
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2701,6 +2701,7 @@ void pauseActions(pause_purpose purpose, mstime_t end, uint32_t actions);
void unpauseActions(pause_purpose purpose);
uint32_t isPausedActions(uint32_t action_bitmask);
uint32_t isPausedActionsWithUpdate(uint32_t action_bitmask);
mstime_t getPausedActionTimeout(uint32_t action);
void updatePausedActions(void);
void unblockPostponedClients(void);
void processEventsWhileBlocked(void);
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/pause.tcl
Original file line number Diff line number Diff line change
@@ -1,4 +1,23 @@
start_server {tags {"pause network"}} {
test "Test check paused_actions in info stats" {
assert_equal [s paused_actions] "none"
assert_equal [s paused_timeout_milliseconds] 0

r client PAUSE 10000 WRITE
assert_equal [s paused_actions] "write"
after 1000
set timeout [s paused_timeout_milliseconds]
assert {$timeout > 0 && $timeout < 9000}
r client unpause

r multi
r client PAUSE 1000 ALL
r info clients
assert_match "*paused_actions:all*" [r exec]

r client unpause
}

test "Test read commands are not blocked by client pause" {
r client PAUSE 100000 WRITE
set rd [valkey_deferring_client]
Expand Down
Loading