Skip to content

Commit

Permalink
Consolidate various BLOCKED_WAIT* states (valkey-io#562)
Browse files Browse the repository at this point in the history
There are currently three block types: BLOCKED_WAIT, BLOCKED_WAITAOF,
and BLOCKED_WAIT_PREREPL, used to block clients executing `WAIT`,
`WAITAOF`, and `CLUSTER SETSLOT`, respectively. They share the same
workflow: the client is blocked until replication to the expected number
of replicas completes. However, they provide different responses
depending on the commands involved. Using distinct block types leads to
code duplication and reduced readability. This PR consolidates the three
types into a single WAIT type, differentiating them using the pending
command to ensure the appropriate response is returned.


Fix valkey-io#427

---------

Signed-off-by: Ping Xie <[email protected]>
  • Loading branch information
PingXie authored May 31, 2024
1 parent 6fb90ad commit f927565
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 57 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,5 @@ Makefile.dep
compile_commands.json
redis.code-workspace
.cache
.cscope*
.swp
50 changes: 18 additions & 32 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,7 @@ void queueClientForReprocessing(client *c) {
void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) {
unblockClientWaitingData(c);
} else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF ||
c->bstate.btype == BLOCKED_WAIT_PREREPL) {
} else if (c->bstate.btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c);
} else if (c->bstate.btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
Expand All @@ -200,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) {

/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN &&
c->bstate.btype != BLOCKED_WAIT_PREREPL) {
if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
Expand All @@ -211,11 +209,11 @@ void unblockClient(client *c, int queue_for_reprocessing) {
resetClient(c);
}

/* We count blocked client stats on regular clients and not on module clients */
if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--;
server.blocked_clients_by_type[c->bstate.btype]--;
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
if (!(c->flags & CLIENT_MODULE))
server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[c->bstate.btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
Expand All @@ -231,15 +229,19 @@ void replyToBlockedClientTimedOut(client *c) {
addReplyNullArray(c);
updateStatsOnUnblock(c, 0, 0, 0);
} else if (c->bstate.btype == BLOCKED_WAIT) {
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset));
} else if (c->bstate.btype == BLOCKED_WAITAOF) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset));
if (c->cmd->proc == waitCommand) {
addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset));
} else if (c->cmd->proc == waitaofCommand) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset);
addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset));
} else if (c->cmd->proc == clusterCommand) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown wait command %s in replyToBlockedClientTimedOut().", c->cmd->declared_name);
}
} else if (c->bstate.btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) {
addReplyErrorObject(c, shared.noreplicaserr);
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down Expand Up @@ -585,29 +587,13 @@ static void handleClientsBlockedOnKey(readyList *rl) {
}

/* block a client for replica acknowledgement */
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) {
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) {
c->bstate.timeout = timeout;
c->bstate.reploffset = offset;
c->bstate.numreplicas = numreplicas;
c->bstate.numlocal = numlocal;
listAddNodeHead(server.clients_waiting_acks, c);
blockClient(c, btype);
}

/* block a client due to pre-replication */
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0);
c->flags |= CLIENT_PENDING_COMMAND;
}

/* block a client due to wait command */
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0);
}

/* block a client due to waitaof command */
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) {
blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal);
blockClient(c, BLOCKED_WAIT);
}

/* Postpone client from executing a command. For example the server might be busy
Expand Down
6 changes: 4 additions & 2 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -6018,7 +6018,7 @@ void clusterCommandSetSlot(client *c) {
* This ensures that all replicas have the latest topology information, enabling
* a reliable slot ownership transfer even if the primary node went down during
* the process. */
if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_PREREPL_DONE) == 0) {
if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_REPLICATION_DONE) == 0) {
forceCommandPropagation(c, PROPAGATE_REPL);
/* We are a primary and this is the first time we see this `SETSLOT`
* command. Force-replicate the command to all of our replicas
Expand All @@ -6028,7 +6028,9 @@ void clusterCommandSetSlot(client *c) {
* 2. The repl offset target is set to the master's current repl offset + 1.
* There is no concern of partial replication because replicas always
* ack the repl offset at the command boundary. */
blockForPreReplication(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves);
blockClientForReplicaAck(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves, 0);
/* Mark client as pending command for execution after replication to replicas. */
c->flags |= CLIENT_PENDING_COMMAND;
replicationRequestAckFromSlaves();
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2068,7 +2068,7 @@ void resetClient(client *c) {
c->multibulklen = 0;
c->bulklen = -1;
c->slot = -1;
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE);
c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_REPLICATION_DONE);

/* Make sure the duration has been recorded to some command. */
serverAssert(c->duration == 0);
Expand Down
11 changes: 5 additions & 6 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -3457,7 +3457,7 @@ void waitCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
blockForReplication(c, timeout, offset, numreplicas);
blockClientForReplicaAck(c, timeout, offset, numreplicas, 0);

/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
Expand Down Expand Up @@ -3497,7 +3497,7 @@ void waitaofCommand(client *c) {

/* Otherwise block the client and put it into our list of clients
* waiting for ack from slaves. */
blockForAofFsync(c, timeout, c->woff, numlocal, numreplicas);
blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal);

/* Make sure that the server will send an ACK request to all the slaves
* before returning to the event loop. */
Expand Down Expand Up @@ -3532,8 +3532,7 @@ void processClientsWaitingReplicas(void) {
int numreplicas = 0;

client *c = ln->value;
int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL;
int is_wait_aof = c->cmd->proc == waitaofCommand;

if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
Expand Down Expand Up @@ -3580,8 +3579,8 @@ void processClientsWaitingReplicas(void) {
addReplyArrayLen(c, 2);
addReplyLongLong(c, numlocal);
addReplyLongLong(c, numreplicas);
} else if (is_wait_prerepl) {
c->flags |= CLIENT_PREREPL_DONE;
} else if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags |= CLIENT_REPLICATION_DONE;
} else {
addReplyLongLong(c, numreplicas);
}
Expand Down
28 changes: 12 additions & 16 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -426,23 +426,21 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL << 48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL << 49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL << 50) /* The client is re-processing the command. */
#define CLIENT_PREREPL_DONE (1ULL << 51) /* Indicate that pre-replication has been done on the client */
#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */

/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
typedef enum blocking_type {
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
BLOCKED_LIST, /* BLPOP & co. */
BLOCKED_WAIT, /* WAIT for synchronous replication. */
BLOCKED_WAITAOF, /* WAITAOF for AOF file fsync. */
BLOCKED_MODULE, /* Blocked by a loadable module. */
BLOCKED_STREAM, /* XREAD. */
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */
BLOCKED_LIST, /* BLPOP & co. */
BLOCKED_WAIT, /* WAIT for synchronous replication. */
BLOCKED_MODULE, /* Blocked by a loadable module. */
BLOCKED_STREAM, /* XREAD. */
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
} blocking_type;

/* Client request types */
Expand Down Expand Up @@ -3498,9 +3496,7 @@ void signalKeyAsReady(serverDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey);
void blockClientShutdown(client *c);
void blockPostponeClient(client *c);
void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas);
void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas);
void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal);
void replicationRequestAckFromSlaves(void);
void signalDeletedKeyAsReady(serverDb *db, robj *key, int type);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
Expand Down

0 comments on commit f927565

Please sign in to comment.