Skip to content

Commit

Permalink
Merge branch 'valkey-io:unstable' into duplicate-options
Browse files Browse the repository at this point in the history
  • Loading branch information
Shivshankar-Reddy authored Sep 11, 2024
2 parents 7f3e967 + fa348e2 commit ef1de4b
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 53 deletions.
14 changes: 14 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# This is a file that can be used by git-blame to ignore some revisions.
# (git 2.23+, released in August 2019)
#
# Can be configured as follow:
#
# $ git config blame.ignoreRevsFile .git-blame-ignore-revs
#
# For more information you can look at git-blame(1) man page.

# Applied clang-format (#323)
c41dd77a3e93e02be3c4bc75d8c76b7b4169a4ce

# Removed terms `master` and `slave` from the source code (#591)
54c97479356ecf41b4b63733494a1be2ab919e17
1 change: 1 addition & 0 deletions runtest-moduleapi
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/moduleauth \
--single unit/moduleapi/rdbloadsave \
--single unit/moduleapi/crash \
--single unit/moduleapi/getchannels \
"${@}"
9 changes: 8 additions & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3156,14 +3156,21 @@ int clusterProcessPacket(clusterLink *link) {
/* Add this node if it is new for us and the msg type is MEET.
* In this stage we don't try to add the node with the right
* flags, replicaof pointer, and so forth, as this details will be
* resolved when we'll receive PONGs from the node. */
* resolved when we'll receive PONGs from the node. The exception
* to this is the flag that indicates extensions are supported, as
* we want to send extensions right away in the return PONG in order
* to reduce the amount of time needed to stabilize the shard ID. */
if (!sender && type == CLUSTERMSG_TYPE_MEET) {
clusterNode *node;

node = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE);
serverAssert(nodeIp2String(node->ip, link, hdr->myip) == C_OK);
getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
node->cport = ntohs(hdr->cport);
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
node->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}
setClusterNodeToInboundClusterLink(node, link);
clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
}
Expand Down
62 changes: 32 additions & 30 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ typedef enum {
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags);
int keyIsExpired(serverDb *db, robj *key);
static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de);
static int getKVStoreIndexForKey(sds key);

/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
Expand Down Expand Up @@ -125,7 +126,7 @@ robj *lookupKey(serverDb *db, robj *key, int flags) {
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)) {
if (!canUseSharedObject() && val->refcount == OBJ_SHARED_REFCOUNT) {
val = dupStringObject(val);
kvstoreDictSetVal(db->keys, getKeySlot(key->ptr), de, val);
kvstoreDictSetVal(db->keys, getKVStoreIndexForKey(key->ptr), de, val);
}
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
Expand Down Expand Up @@ -202,15 +203,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
* if the key already exists, otherwise, it can fall back to dbOverwrite. */
static void dbAddInternal(serverDb *db, robj *key, robj *val, int update_if_existing) {
dictEntry *existing;
int slot = getKeySlot(key->ptr);
dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key->ptr, &existing);
int dict_index = getKVStoreIndexForKey(key->ptr);
dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key->ptr, &existing);
if (update_if_existing && existing) {
dbSetValue(db, key, val, 1, existing);
return;
}
serverAssertWithInfo(NULL, key, de != NULL);
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
kvstoreDictSetVal(db->keys, dict_index, de, val);
signalKeyAsReady(db, key, val->type);
notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id);
}
Expand All @@ -219,32 +220,33 @@ void dbAdd(serverDb *db, robj *key, robj *val) {
dbAddInternal(db, key, val, 0);
}

/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled.
* The only difference between this function and getKeySlot, is that it's not using cached key slot from the
* current_client and always calculates CRC hash. This is useful when slot needs to be calculated for a key that user
* didn't request for, such as in case of eviction. */
int calculateKeySlot(sds key) {
return server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0;
/* Returns which dict index should be used with kvstore for a given key. */
static int getKVStoreIndexForKey(sds key) {
return server.cluster_enabled ? getKeySlot(key) : 0;
}

/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/
/* Returns the cluster hash slot for a given key, trying to use the cached slot that
* stored on the server.current_client first. If there is no cached value, it will compute the hash slot
* and then cache the value.*/
int getKeySlot(sds key) {
serverAssert(server.cluster_enabled);
/* This is performance optimization that uses pre-set slot id from the current command,
* in order to avoid calculation of the key hash.
*
* This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set.
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
* the key slot would fallback to keyHashSlot.
*
* Modules and scripts executed on the primary may get replicated as multi-execs that operate on multiple slots,
* so we must always recompute the slot for commands coming from the primary.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flag.executing_command &&
!server.current_client->flag.primary) {
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key) == server.current_client->slot);
debugServerAssertWithInfo(server.current_client, NULL,
(int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot);
return server.current_client->slot;
}
int slot = calculateKeySlot(key);
int slot = keyHashSlot(key, (int)sdslen(key));
/* For the case of replicated commands from primary, getNodeByQuery() never gets called,
* and thus c->slot never gets populated. That said, if this command ends up accessing a key,
* we are able to backfill c->slot here, where the key's hash calculation is made. */
Expand All @@ -267,11 +269,11 @@ int getKeySlot(sds key) {
* In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed.
*/
int dbAddRDBLoad(serverDb *db, sds key, robj *val) {
int slot = getKeySlot(key);
dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key, NULL);
int dict_index = server.cluster_enabled ? getKeySlot(key) : 0;
dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key, NULL);
if (de == NULL) return 0;
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
kvstoreDictSetVal(db->keys, dict_index, de, val);
return 1;
}

Expand All @@ -288,8 +290,8 @@ int dbAddRDBLoad(serverDb *db, sds key, robj *val) {
*
* The program is aborted if the key was not already present. */
static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de) {
int slot = getKeySlot(key->ptr);
if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr);
int dict_index = getKVStoreIndexForKey(key->ptr);
if (!de) de = kvstoreDictFind(db->keys, dict_index, key->ptr);
serverAssertWithInfo(NULL, key, de != NULL);
robj *old = dictGetVal(de);

Expand All @@ -309,7 +311,7 @@ static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEn
/* Because of RM_StringDMA, old may be changed, so we need get old again */
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
kvstoreDictSetVal(db->keys, dict_index, de, val);
/* For efficiency, let the I/O thread that allocated an object also deallocate it. */
if (tryOffloadFreeObjToIOThreads(old) == C_OK) {
/* OK */
Expand Down Expand Up @@ -404,8 +406,8 @@ robj *dbRandomKey(serverDb *db) {
int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
dictEntry **plink;
int table;
int slot = getKeySlot(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table);
int dict_index = getKVStoreIndexForKey(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table);
if (de) {
robj *val = dictGetVal(de);
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
Expand All @@ -421,13 +423,13 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
if (async) {
/* Because of dbUnshareStringValue, the val in de may change. */
freeObjAsync(key, dictGetVal(de), db->id);
kvstoreDictSetVal(db->keys, slot, de, NULL);
kvstoreDictSetVal(db->keys, dict_index, de, NULL);
}
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
kvstoreDictDelete(db->expires, slot, key->ptr);
kvstoreDictDelete(db->expires, dict_index, key->ptr);

kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table);
kvstoreDictTwoPhaseUnlinkFree(db->keys, dict_index, de, plink, table);
return 1;
} else {
return 0;
Expand Down Expand Up @@ -1664,7 +1666,7 @@ void swapdbCommand(client *c) {
*----------------------------------------------------------------------------*/

int removeExpire(serverDb *db, robj *key) {
return kvstoreDictDelete(db->expires, getKeySlot(key->ptr), key->ptr) == DICT_OK;
return kvstoreDictDelete(db->expires, getKVStoreIndexForKey(key->ptr), key->ptr) == DICT_OK;
}

/* Set an expire to the specified key. If the expire is set in the context
Expand All @@ -1675,10 +1677,10 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) {
dictEntry *kde, *de, *existing;

/* Reuse the sds from the main dict in the expire dict */
int slot = getKeySlot(key->ptr);
kde = kvstoreDictFind(db->keys, slot, key->ptr);
int dict_index = getKVStoreIndexForKey(key->ptr);
kde = kvstoreDictFind(db->keys, dict_index, key->ptr);
serverAssertWithInfo(NULL, key, kde != NULL);
de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing);
de = kvstoreDictAddRaw(db->expires, dict_index, dictGetKey(kde), &existing);
if (existing) {
dictSetSignedIntegerVal(existing, when);
} else {
Expand Down Expand Up @@ -1896,7 +1898,7 @@ int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) {
}

static dictEntry *dbFindGeneric(kvstore *kvs, void *key) {
return kvstoreDictFind(kvs, getKeySlot(key), key);
return kvstoreDictFind(kvs, server.cluster_enabled ? getKeySlot(key) : 0, key);
}

dictEntry *dbFind(serverDb *db, void *key) {
Expand Down
6 changes: 6 additions & 0 deletions src/kvstore.c
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ kvstore *kvstoreCreate(dictType *type, int num_dicts_bits, int flags) {
* for the dict cursor, see kvstoreScan */
assert(num_dicts_bits <= 16);

/* The dictType of kvstore needs to use the specific callbacks.
* If there are any changes in the future, it will need to be modified. */
assert(type->rehashingStarted == kvstoreDictRehashingStarted);
assert(type->rehashingCompleted == kvstoreDictRehashingCompleted);
assert(type->dictMetadataBytes == kvstoreDictMetadataSize);

kvstore *kvs = zcalloc(sizeof(*kvs));
kvs->dtype = type;
kvs->flags = flags;
Expand Down
8 changes: 7 additions & 1 deletion src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -12965,6 +12965,9 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
disconnectReplicas();
freeReplicationBacklog();

/* Stop and kill existing AOF rewriting fork as it is saving outdated data,
* we will re-enable it after the rdbLoad. Also killing it will prevent COW
* memory issue. */
if (server.aof_state != AOF_OFF) stopAppendOnly();

/* Kill existing RDB fork as it is saving outdated data. Also killing it
Expand All @@ -12983,7 +12986,10 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE);

if (server.current_client) unprotectClient(server.current_client);
if (server.aof_state != AOF_OFF) startAppendOnly();

/* Here we need to decide whether to enable the AOF based on the aof_enabled,
* since the previous stopAppendOnly sets aof_state to AOF_OFF. */
if (server.aof_enabled) startAppendOnly();

if (ret != RDB_OK) {
errno = (ret == RDB_NOT_EXIST) ? ENOENT : EIO;
Expand Down
12 changes: 7 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -2384,8 +2384,9 @@ parseResult handleParseResults(client *c) {

/* Process the completion of an IO write operation for a client.
* This function handles various post-write tasks, including updating client state,
* allow_async_writes - A flag indicating whether I/O threads can handle pending writes for this client.
* returns 1 if processing completed successfully, 0 if processing is skipped. */
int processClientIOWriteDone(client *c) {
int processClientIOWriteDone(client *c, int allow_async_writes) {
/* memory barrier acquire to get the latest client state */
atomic_thread_fence(memory_order_acquire);
/* If a client is protected, don't proceed to check the write results as it may trigger conn close. */
Expand Down Expand Up @@ -2414,7 +2415,7 @@ int processClientIOWriteDone(client *c) {
installClientWriteHandler(c);
} else {
/* If we can send the client to the I/O thread, let it handle the write. */
if (trySendWriteToIOThreads(c) == C_OK) return 1;
if (allow_async_writes && trySendWriteToIOThreads(c) == C_OK) return 1;
/* Try again in the next eventloop */
putClientInPendingWriteQueue(c);
}
Expand Down Expand Up @@ -2442,7 +2443,7 @@ int processIOThreadsWriteDone(void) {
/* Client is still waiting for a pending I/O - skip it */
if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue;

processed += processClientIOWriteDone(c);
processed += processClientIOWriteDone(c, 1);
}

return processed;
Expand Down Expand Up @@ -4663,7 +4664,8 @@ int processIOThreadsReadDone(void) {
if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue;
/* If the write job is done, process it ASAP to free the buffer and handle connection errors */
if (c->io_write_state == CLIENT_COMPLETED_IO) {
processClientIOWriteDone(c);
int allow_async_writes = 0; /* Don't send writes for the client to IO threads before processing the reads */
processClientIOWriteDone(c, allow_async_writes);
}
/* memory barrier acquire to get the updated client state */
atomic_thread_fence(memory_order_acquire);
Expand Down Expand Up @@ -4825,7 +4827,7 @@ void ioThreadReadQueryFromClient(void *data) {
int numkeys = getKeysFromCommand(c->io_parsed_cmd, c->argv, c->argc, &result);
if (numkeys) {
robj *first_key = c->argv[result.keys[0].pos];
c->slot = calculateKeySlot(first_key->ptr);
c->slot = keyHashSlot(first_key->ptr, sdslen(first_key->ptr));
}
getKeysFreeResult(&result);
}
Expand Down
3 changes: 2 additions & 1 deletion src/pubsub.c
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,8 @@ void pubsubCommand(client *c) {
int j;
addReplyArrayLen(c, (c->argc - 2) * 2);
for (j = 2; j < c->argc; j++) {
unsigned int slot = calculateKeySlot(c->argv[j]->ptr);
sds key = c->argv[j]->ptr;
unsigned int slot = server.cluster_enabled ? keyHashSlot(key, (int)sdslen(key)) : 0;
dict *clients = kvstoreDictFetchValue(server.pubsubshard_channels, slot, c->argv[j]);

addReplyBulk(c, c->argv[j]);
Expand Down
3 changes: 2 additions & 1 deletion src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2805,7 +2805,8 @@ void bufferReplData(connection *conn) {
remaining_bytes = readIntoReplDataBlock(conn, tail, remaining_bytes);
}
if (readlen && remaining_bytes == 0) {
if (server.pending_repl_data.len > server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes) {
if (server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes &&
server.pending_repl_data.len > server.client_obuf_limits[CLIENT_TYPE_REPLICA].hard_limit_bytes) {
serverLog(LL_NOTICE, "Replication buffer limit reached, stopping buffering.");
/* Stop accumulating primary commands. */
connSetReadHandler(conn, NULL);
Expand Down
24 changes: 23 additions & 1 deletion src/t_set.c
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstke
robj **sets = zmalloc(sizeof(robj *) * setnum);
setTypeIterator *si;
robj *dstset = NULL;
int dstset_encoding = OBJ_ENCODING_INTSET;
char *str;
size_t len;
int64_t llval;
Expand All @@ -1463,6 +1464,23 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstke
zfree(sets);
return;
}
/* For a SET's encoding, according to the factory method setTypeCreate(), currently have 3 types:
* 1. OBJ_ENCODING_INTSET
* 2. OBJ_ENCODING_LISTPACK
* 3. OBJ_ENCODING_HT
* 'dstset_encoding' is used to determine which kind of encoding to use when initialize 'dstset'.
*
* If all sets are all OBJ_ENCODING_INTSET encoding or 'dstkey' is not null, keep 'dstset'
* OBJ_ENCODING_INTSET encoding when initialize. Otherwise it is not efficient to create the 'dstset'
* from intset and then convert to listpack or hashtable.
*
* If one of the set is OBJ_ENCODING_LISTPACK, let's set 'dstset' to hashtable default encoding,
* the hashtable is more efficient when find and compare than the listpack. The corresponding
* time complexity are O(1) vs O(n). */
if (!dstkey && dstset_encoding == OBJ_ENCODING_INTSET &&
(setobj->encoding == OBJ_ENCODING_LISTPACK || setobj->encoding == OBJ_ENCODING_HT)) {
dstset_encoding = OBJ_ENCODING_HT;
}
sets[j] = setobj;
if (j > 0 && sets[0] == sets[j]) {
sameset = 1;
Expand Down Expand Up @@ -1504,7 +1522,11 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, robj *dstke
/* We need a temp set object to store our union/diff. If the dstkey
* is not NULL (that is, we are inside an SUNIONSTORE/SDIFFSTORE operation) then
* this set object will be the resulting object to set into the target key*/
dstset = createIntsetObject();
if (dstset_encoding == OBJ_ENCODING_INTSET) {
dstset = createIntsetObject();
} else {
dstset = createSetObject();
}

if (op == SET_OP_UNION) {
/* Union is trivial, just add every element of every set to the
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/replication-buffer.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ start_server {} {
# with master.
$master config set repl-timeout 1000
$replica1 config set repl-timeout 1000
$replica1 config set client-output-buffer-limit "replica 1024 0 0"
$replica2 config set repl-timeout 1000
$replica2 config set client-output-buffer-limit "replica 0 0 0"
$replica2 config set client-output-buffer-limit "replica 1024 0 0"
$replica2 config set dual-channel-replication-enabled $dualchannel

$replica1 replicaof $master_host $master_port
Expand Down
Loading

0 comments on commit ef1de4b

Please sign in to comment.