Skip to content

Commit

Permalink
add a check to skip checksum for diskless replication
Browse files Browse the repository at this point in the history
Signed-off-by: Shivshankar-Reddy <[email protected]>
  • Loading branch information
Shivshankar-Reddy committed Oct 17, 2024
1 parent 0142198 commit c11b6ce
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 3 deletions.
7 changes: 5 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -1407,7 +1407,7 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
long key_counter = 0;
int j;

if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum;
if (server.rdb_checksum && !(rdbflags & RDBFLAGS_CKSUM_SKIP)) rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
if (rdbWriteRaw(rdb, magic, 9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb, rdbflags, rsi) == -1) goto werr;
Expand Down Expand Up @@ -1451,14 +1451,17 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
* without doing any processing of the content. */
int rdbSaveRioWithEOFMark(int req, rio *rdb, int *error, rdbSaveInfo *rsi) {
char eofmark[RDB_EOF_MARK_SIZE];
int skip_cksum_repl = RDBFLAGS_REPLICATION;

startSaving(RDBFLAGS_REPLICATION);
getRandomHexChars(eofmark, RDB_EOF_MARK_SIZE);
if (error) *error = 0;
if (rioWrite(rdb, "$EOF:", 5) == 0) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
if (rioWrite(rdb, "\r\n", 2) == 0) goto werr;
if (rdbSaveRio(req, rdb, error, RDBFLAGS_REPLICATION, rsi) == C_ERR) goto werr;
if (server.repl_diskless_sync && req & REPLICA_REQ_CHKSUM_SKIP)
skip_cksum_repl |= RDBFLAGS_CKSUM_SKIP;
if (rdbSaveRio(req, rdb, error, skip_cksum_repl, rsi) == C_ERR) goto werr;
if (rioWrite(rdb, eofmark, RDB_EOF_MARK_SIZE) == 0) goto werr;
stopSaving(1);
return C_OK;
Expand Down
1 change: 1 addition & 0 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
#define RDBFLAGS_CKSUM_SKIP (1 << 5) /* Skip checksum for diskless sync. */

/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
Expand Down
19 changes: 18 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1109,9 +1109,11 @@ void syncCommand(client *c) {
}

if (primaryTryPartialResynchronization(c, psync_offset) == C_OK) {
serverLog(LL_NOTICE, "===> completed partial resync");
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
serverLog(LL_NOTICE, "===> completed partial resync with failure");
char *primary_replid = c->argv[1]->ptr;

/* Increment stats for failed PSYNCs, but only if the
Expand Down Expand Up @@ -1271,6 +1273,9 @@ void syncCommand(client *c) {
* - rdb-channel <1|0>
* Used to identify the client as a replica's rdb connection in an dual channel
* sync session.
*
* - repl-diskless-load <1|0>
* Replica is capable of load data from replication stream, request to skip checksum.
* */
void replconfCommand(client *c) {
int j;
Expand Down Expand Up @@ -1313,6 +1318,13 @@ void replconfCommand(client *c) {
* replconf option. */
c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
}
} else if (!strcasecmp(c->argv[j]->ptr, "repl-diskless-load")) {
/* REPLCONF repl-diskless-load is used to identify the client is capable of
* load directly without creating rdb file */
long rdb_diskless_load = 0;
if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 0, 1, &rdb_diskless_load, NULL) != C_OK) return;
if (rdb_diskless_load == 1)
c->replica_req |= REPLICA_REQ_CHKSUM_SKIP;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -2632,7 +2644,7 @@ static void fullSyncWithPrimary(connection *conn) {
/* Send replica lisening port to primary for clarification */
sds portstr = getReplicaPortString();
err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-channel", "1", "listening-port",
portstr, NULL);
portstr, "repl-diskless-load", useDisklessLoad() ? "1" : "0", NULL);
sdsfree(portstr);
if (err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", err);
Expand Down Expand Up @@ -3427,6 +3439,11 @@ void syncWithPrimary(connection *conn) {
if (err) goto write_error;
}

if (useDisklessLoad()) {
err = sendCommand(conn, "REPLCONF", "repl-diskless-load", "1", NULL);
if (err) goto write_error;
}

/* Inform the primary of our (replica) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
Expand Down
2 changes: 2 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,8 @@ typedef enum {
#define REPLICA_REQ_RDB_EXCLUDE_DATA (1 << 0) /* Exclude data from RDB */
#define REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS (1 << 1) /* Exclude functions from RDB */
#define REPLICA_REQ_RDB_CHANNEL (1 << 2) /* Use dual-channel-replication */
#define REPLICA_REQ_CHKSUM_SKIP (1 << 3) /* Exclude checksum from RDB */

/* Mask of all bits in the replica requirements bitfield that represent non-standard (filtered) RDB requirements */
#define REPLICA_REQ_RDB_MASK (REPLICA_REQ_RDB_EXCLUDE_DATA | REPLICA_REQ_RDB_EXCLUDE_FUNCTIONS)

Expand Down

0 comments on commit c11b6ce

Please sign in to comment.