From f927565d28229691b39f873f40d2d8b9cacc425d Mon Sep 17 00:00:00 2001 From: Ping Xie Date: Thu, 30 May 2024 23:45:47 -0700 Subject: [PATCH] Consolidate various BLOCKED_WAIT* states (#562) There are currently three block types: BLOCKED_WAIT, BLOCKED_WAITAOF, and BLOCKED_WAIT_PREREPL, used to block clients executing `WAIT`, `WAITAOF`, and `CLUSTER SETSLOT`, respectively. They share the same workflow: the client is blocked until replication to the expected number of replicas completes. However, they provide different responses depending on the commands involved. Using distinct block types leads to code duplication and reduced readability. This PR consolidates the three types into a single WAIT type, differentiating them using the pending command to ensure the appropriate response is returned. Fix #427 --------- Signed-off-by: Ping Xie --- .gitignore | 2 ++ src/blocked.c | 50 ++++++++++++++++---------------------------- src/cluster_legacy.c | 6 ++++-- src/networking.c | 2 +- src/replication.c | 11 +++++----- src/server.h | 28 +++++++++++-------------- 6 files changed, 42 insertions(+), 57 deletions(-) diff --git a/.gitignore b/.gitignore index 8ed98aa326..e745f76a04 100644 --- a/.gitignore +++ b/.gitignore @@ -43,3 +43,5 @@ Makefile.dep compile_commands.json redis.code-workspace .cache +.cscope* +.swp diff --git a/src/blocked.c b/src/blocked.c index 0291505cb9..85ef9170a0 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -183,8 +183,7 @@ void queueClientForReprocessing(client *c) { void unblockClient(client *c, int queue_for_reprocessing) { if (c->bstate.btype == BLOCKED_LIST || c->bstate.btype == BLOCKED_ZSET || c->bstate.btype == BLOCKED_STREAM) { unblockClientWaitingData(c); - } else if (c->bstate.btype == BLOCKED_WAIT || c->bstate.btype == BLOCKED_WAITAOF || - c->bstate.btype == BLOCKED_WAIT_PREREPL) { + } else if (c->bstate.btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); } else if (c->bstate.btype == BLOCKED_MODULE) { if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); @@ -200,8 +199,7 @@ void unblockClient(client *c, int queue_for_reprocessing) { /* Reset the client for a new query, unless the client has pending command to process * or in case a shutdown operation was canceled and we are still in the processCommand sequence */ - if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN && - c->bstate.btype != BLOCKED_WAIT_PREREPL) { + if (!(c->flags & CLIENT_PENDING_COMMAND) && c->bstate.btype != BLOCKED_SHUTDOWN) { freeClientOriginalArgv(c); /* Clients that are not blocked on keys are not reprocessed so we must * call reqresAppendResponse here (for clients blocked on key, @@ -211,11 +209,11 @@ void unblockClient(client *c, int queue_for_reprocessing) { resetClient(c); } + /* We count blocked client stats on regular clients and not on module clients */ + if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; + server.blocked_clients_by_type[c->bstate.btype]--; /* Clear the flags, and put the client in the unblocked list so that * we'll process new commands in its query buffer ASAP. */ - if (!(c->flags & CLIENT_MODULE)) - server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */ - server.blocked_clients_by_type[c->bstate.btype]--; c->flags &= ~CLIENT_BLOCKED; c->bstate.btype = BLOCKED_NONE; c->bstate.unblock_on_nokey = 0; @@ -231,15 +229,19 @@ void replyToBlockedClientTimedOut(client *c) { addReplyNullArray(c); updateStatsOnUnblock(c, 0, 0, 0); } else if (c->bstate.btype == BLOCKED_WAIT) { - addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset)); - } else if (c->bstate.btype == BLOCKED_WAITAOF) { - addReplyArrayLen(c, 2); - addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset); - addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset)); + if (c->cmd->proc == waitCommand) { + addReplyLongLong(c, replicationCountAcksByOffset(c->bstate.reploffset)); + } else if (c->cmd->proc == waitaofCommand) { + addReplyArrayLen(c, 2); + addReplyLongLong(c, server.fsynced_reploff >= c->bstate.reploffset); + addReplyLongLong(c, replicationCountAOFAcksByOffset(c->bstate.reploffset)); + } else if (c->cmd->proc == clusterCommand) { + addReplyErrorObject(c, shared.noreplicaserr); + } else { + serverPanic("Unknown wait command %s in replyToBlockedClientTimedOut().", c->cmd->declared_name); + } } else if (c->bstate.btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c, 0); - } else if (c->bstate.btype == BLOCKED_WAIT_PREREPL) { - addReplyErrorObject(c, shared.noreplicaserr); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } @@ -585,29 +587,13 @@ static void handleClientsBlockedOnKey(readyList *rl) { } /* block a client for replica acknowledgement */ -void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int btype, int numlocal) { +void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal) { c->bstate.timeout = timeout; c->bstate.reploffset = offset; c->bstate.numreplicas = numreplicas; c->bstate.numlocal = numlocal; listAddNodeHead(server.clients_waiting_acks, c); - blockClient(c, btype); -} - -/* block a client due to pre-replication */ -void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { - blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT_PREREPL, 0); - c->flags |= CLIENT_PENDING_COMMAND; -} - -/* block a client due to wait command */ -void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas) { - blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAIT, 0); -} - -/* block a client due to waitaof command */ -void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas) { - blockClientForReplicaAck(c, timeout, offset, numreplicas, BLOCKED_WAITAOF, numlocal); + blockClient(c, BLOCKED_WAIT); } /* Postpone client from executing a command. For example the server might be busy diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 22fdb20cf7..0de6351e90 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -6018,7 +6018,7 @@ void clusterCommandSetSlot(client *c) { * This ensures that all replicas have the latest topology information, enabling * a reliable slot ownership transfer even if the primary node went down during * the process. */ - if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_PREREPL_DONE) == 0) { + if (nodeIsMaster(myself) && myself->numslaves != 0 && (c->flags & CLIENT_REPLICATION_DONE) == 0) { forceCommandPropagation(c, PROPAGATE_REPL); /* We are a primary and this is the first time we see this `SETSLOT` * command. Force-replicate the command to all of our replicas @@ -6028,7 +6028,9 @@ void clusterCommandSetSlot(client *c) { * 2. The repl offset target is set to the master's current repl offset + 1. * There is no concern of partial replication because replicas always * ack the repl offset at the command boundary. */ - blockForPreReplication(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves); + blockClientForReplicaAck(c, timeout_ms, server.master_repl_offset + 1, myself->numslaves, 0); + /* Mark client as pending command for execution after replication to replicas. */ + c->flags |= CLIENT_PENDING_COMMAND; replicationRequestAckFromSlaves(); return; } diff --git a/src/networking.c b/src/networking.c index e062bc3aba..9274f21c05 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2068,7 +2068,7 @@ void resetClient(client *c) { c->multibulklen = 0; c->bulklen = -1; c->slot = -1; - c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_PREREPL_DONE); + c->flags &= ~(CLIENT_EXECUTING_COMMAND | CLIENT_REPLICATION_DONE); /* Make sure the duration has been recorded to some command. */ serverAssert(c->duration == 0); diff --git a/src/replication.c b/src/replication.c index 1d5e0fe290..375b637f61 100644 --- a/src/replication.c +++ b/src/replication.c @@ -3457,7 +3457,7 @@ void waitCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from slaves. */ - blockForReplication(c, timeout, offset, numreplicas); + blockClientForReplicaAck(c, timeout, offset, numreplicas, 0); /* Make sure that the server will send an ACK request to all the slaves * before returning to the event loop. */ @@ -3497,7 +3497,7 @@ void waitaofCommand(client *c) { /* Otherwise block the client and put it into our list of clients * waiting for ack from slaves. */ - blockForAofFsync(c, timeout, c->woff, numlocal, numreplicas); + blockClientForReplicaAck(c, timeout, c->woff, numreplicas, numlocal); /* Make sure that the server will send an ACK request to all the slaves * before returning to the event loop. */ @@ -3532,8 +3532,7 @@ void processClientsWaitingReplicas(void) { int numreplicas = 0; client *c = ln->value; - int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF; - int is_wait_prerepl = c->bstate.btype == BLOCKED_WAIT_PREREPL; + int is_wait_aof = c->cmd->proc == waitaofCommand; if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) { addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled."); @@ -3580,8 +3579,8 @@ void processClientsWaitingReplicas(void) { addReplyArrayLen(c, 2); addReplyLongLong(c, numlocal); addReplyLongLong(c, numreplicas); - } else if (is_wait_prerepl) { - c->flags |= CLIENT_PREREPL_DONE; + } else if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags |= CLIENT_REPLICATION_DONE; } else { addReplyLongLong(c, numreplicas); } diff --git a/src/server.h b/src/server.h index 249d896d35..2bacc991e1 100644 --- a/src/server.h +++ b/src/server.h @@ -426,23 +426,21 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL << 48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL << 49) /* Module client do not want to propagate to replica */ #define CLIENT_REPROCESSING_COMMAND (1ULL << 50) /* The client is re-processing the command. */ -#define CLIENT_PREREPL_DONE (1ULL << 51) /* Indicate that pre-replication has been done on the client */ +#define CLIENT_REPLICATION_DONE (1ULL << 51) /* Indicate that replication has been done on the client */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ typedef enum blocking_type { - BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */ - BLOCKED_LIST, /* BLPOP & co. */ - BLOCKED_WAIT, /* WAIT for synchronous replication. */ - BLOCKED_WAITAOF, /* WAITAOF for AOF file fsync. */ - BLOCKED_MODULE, /* Blocked by a loadable module. */ - BLOCKED_STREAM, /* XREAD. */ - BLOCKED_ZSET, /* BZPOP et al. */ - BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ - BLOCKED_SHUTDOWN, /* SHUTDOWN. */ - BLOCKED_WAIT_PREREPL, /* WAIT for pre-replication and then run the command. */ - BLOCKED_NUM, /* Number of blocked states. */ - BLOCKED_END /* End of enumeration */ + BLOCKED_NONE, /* Not blocked, no CLIENT_BLOCKED flag set. */ + BLOCKED_LIST, /* BLPOP & co. */ + BLOCKED_WAIT, /* WAIT for synchronous replication. */ + BLOCKED_MODULE, /* Blocked by a loadable module. */ + BLOCKED_STREAM, /* XREAD. */ + BLOCKED_ZSET, /* BZPOP et al. */ + BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ + BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_NUM, /* Number of blocked states. */ + BLOCKED_END /* End of enumeration */ } blocking_type; /* Client request types */ @@ -3498,9 +3496,7 @@ void signalKeyAsReady(serverDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); void blockClientShutdown(client *c); void blockPostponeClient(client *c); -void blockForReplication(client *c, mstime_t timeout, long long offset, long numreplicas); -void blockForPreReplication(client *c, mstime_t timeout, long long offset, long numreplicas); -void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numlocal, long numreplicas); +void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal); void replicationRequestAckFromSlaves(void); void signalDeletedKeyAsReady(serverDb *db, robj *key, int type); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);