From 545869eb7d9365bf8e41683f6d2c711882b071aa Mon Sep 17 00:00:00 2001 From: Su Ko Date: Sun, 23 Nov 2025 16:21:39 +0900 Subject: [PATCH] Add replica sync checks before rebalancing in cluster manager Signed-off-by: Su Ko --- src/valkey-cli.c | 102 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 101 insertions(+), 1 deletion(-) diff --git a/src/valkey-cli.c b/src/valkey-cli.c index c902d1255f..fd919f3a38 100644 --- a/src/valkey-cli.c +++ b/src/valkey-cli.c @@ -3714,7 +3714,8 @@ static void clusterManagerNodeArrayInit(clusterManagerNodeArray *array, int len) static void clusterManagerNodeArrayReset(clusterManagerNodeArray *array); static void clusterManagerNodeArrayShift(clusterManagerNodeArray *array, clusterManagerNode **nodeptr); static void clusterManagerNodeArrayAdd(clusterManagerNodeArray *array, clusterManagerNode *node); - +static int clusterManagerIsReplicaSynced(clusterManagerNode *node, char **err); +static int clusterManagerWaitReplicasFullSync(long long timeout_ms); /* Cluster Manager commands. */ static int clusterManagerCommandCreate(int argc, char **argv); @@ -6786,6 +6787,95 @@ static void clusterManagerMode(clusterManagerCommandProc *proc) { exit(success ? 0 : 1); } +static int clusterManagerIsReplicaSynced(clusterManagerNode *node, char **err) { + if (err) *err = NULL; + + if (node == NULL || node->replicate == NULL) { + if (err) *err = sdsnew("Node is not a replica"); + return 0; + } + + valkeyReply *r = CLUSTER_MANAGER_COMMAND(node, "INFO replication"); + if (!clusterManagerCheckValkeyReply(node, r, err)) { + if (r) freeReplyObject(r); + return 0; + } + + if (r->type != VALKEY_REPLY_STRING) { + if (err) *err = sdsnew("Unexpected INFO replication reply type"); + freeReplyObject(r); + return 0; + } + + int link_up = 0; + int sync_in_progress = 0; + + char *str = r->str; + char *p = strstr(str, "master_link_status:"); + if (p) { + p += strlen("master_link_status:"); + if (!strncmp(p, "up", 2)) link_up = 1; + } + + p = strstr(str, "master_sync_in_progress:"); + if (p) { + p += strlen("master_sync_in_progress:"); + sync_in_progress = (int)strtol(p, NULL, 10); + } + + freeReplyObject(r); + + if (link_up && sync_in_progress == 0) + return 1; + + if (err) *err = sdsnew("Replica still syncing"); + return 0; +} + +static int clusterManagerWaitReplicasFullSync(long long timeout_ms) { + long long start = mstime(); + long long backoff = 100; + long long max_backoff = 2000; + + while (mstime() - start < timeout_ms) { + + int all_good = 1; + listIter li; + listNode *ln; + + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *node = ln->value; + + if (node->replicate == NULL) continue; + + char *err = NULL; + if (!clusterManagerIsReplicaSynced(node, &err)) { + all_good = 0; + + if (config.verbose) { + if (err) + clusterManagerLogInfo("[replica-sync] %s:%d not synced: %s\n", + node->ip, node->port, err); + else + clusterManagerLogInfo("[replica-sync] %s:%d not synced\n", + node->ip, node->port); + } + if (err) sdsfree(err); + } + } + + if (all_good) return 1; + + usleep(backoff * 1000); + backoff = backoff * 2; + if (backoff > max_backoff) backoff = max_backoff; + } + + clusterManagerLogErr("[ERR] replica full-sync wait timed-out.\n"); + return 0; +} + /* Cluster Manager Commands */ static int clusterManagerCommandCreate(int argc, char **argv) { @@ -7567,6 +7657,16 @@ static int clusterManagerCommandRebalance(int argc, char **argv) { result = 0; goto cleanup; } + + clusterManagerLogInfo(">>> Checking replica sync state before rebalancing...\n"); + + if (!clusterManagerWaitReplicasFullSync(30000)) { /* 30 seconds timeout */ + clusterManagerLogErr("[ERR] Replica sync incomplete. Aborting rebalance.\n"); + return 0; + } + + clusterManagerLogInfo(">>> All replicas are online. Safe to proceed with rebalancing.\n"); + /* Calculate the slots balance for each node. It's the number of * slots the node should lose (if positive) or gain (if negative) * in order to be balanced. */