Skip to content

Commit

Permalink
Add 'WithDictIndex' expiry API and update RANDOMKEY command (valkey-i…
Browse files Browse the repository at this point in the history
…o#1155)

valkey-io#1145

First part of a two-step effort to add `WithSlot` API for expiry. This
PR is to fix a crash that occurs when a RANDOMKEY uses a different slot
than the cached slot of a client during a multi-exec.

The next part will be to utilize the new API as an optimization to
prevent duplicate work when calculating the slot for a key.

---------

Signed-off-by: Nadav Levanoni <[email protected]>
Signed-off-by: Madelyn Olson <[email protected]>
Co-authored-by: Nadav Levanoni <[email protected]>
Co-authored-by: Madelyn Olson <[email protected]>
  • Loading branch information
3 people authored Oct 17, 2024
1 parent 06cfe2c commit 136d0fd
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 53 deletions.
141 changes: 88 additions & 53 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ typedef enum {
KEY_DELETED /* The key was deleted now. */
} keyStatus;

keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int dict_index);
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags);
int keyIsExpiredWithDictIndex(serverDb *db, robj *key, int dict_index);
int keyIsExpired(serverDb *db, robj *key);
static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de);
static int getKVStoreIndexForKey(sds key);
dictEntry *dbFindExpiresWithDictIndex(serverDb *db, void *key, int dict_index);

/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
Expand Down Expand Up @@ -269,7 +272,7 @@ int getKeySlot(sds key) {
* In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed.
*/
int dbAddRDBLoad(serverDb *db, sds key, robj *val) {
int dict_index = server.cluster_enabled ? getKeySlot(key) : 0;
int dict_index = getKVStoreIndexForKey(key);
dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key, NULL);
if (de == NULL) return 0;
initObjectLRUOrLFU(val);
Expand Down Expand Up @@ -375,13 +378,13 @@ robj *dbRandomKey(serverDb *db) {
while (1) {
sds key;
robj *keyobj;
int randomSlot = kvstoreGetFairRandomDictIndex(db->keys);
de = kvstoreDictGetFairRandomKey(db->keys, randomSlot);
int randomDictIndex = kvstoreGetFairRandomDictIndex(db->keys);
de = kvstoreDictGetFairRandomKey(db->keys, randomDictIndex);
if (de == NULL) return NULL;

key = dictGetKey(de);
keyobj = createStringObject(key, sdslen(key));
if (dbFindExpires(db, key)) {
if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) {
if (allvolatile && server.primary_host && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
Expand All @@ -393,7 +396,7 @@ robj *dbRandomKey(serverDb *db) {
* return a key name that may be already expired. */
return keyobj;
}
if (expireIfNeeded(db, keyobj, 0) != KEY_VALID) {
if (expireIfNeededWithDictIndex(db, keyobj, 0, randomDictIndex) != KEY_VALID) {
decrRefCount(keyobj);
continue; /* search for another key. This expired. */
}
Expand All @@ -402,11 +405,9 @@ robj *dbRandomKey(serverDb *db) {
}
}

/* Helper for sync and async delete. */
int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
int dbGenericDeleteWithDictIndex(serverDb *db, robj *key, int async, int flags, int dict_index) {
dictEntry **plink;
int table;
int dict_index = getKVStoreIndexForKey(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table);
if (de) {
robj *val = dictGetVal(de);
Expand Down Expand Up @@ -436,6 +437,12 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
}
}

/* Helper for sync and async delete. */
int dbGenericDelete(serverDb *db, robj *key, int async, int flags) {
int dict_index = getKVStoreIndexForKey(key->ptr);
return dbGenericDeleteWithDictIndex(db, key, async, flags, dict_index);
}

/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(serverDb *db, robj *key) {
return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
Expand Down Expand Up @@ -1693,19 +1700,25 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) {

/* Return the expire time of the specified key, or -1 if no expire
* is associated with this key (i.e. the key is non volatile) */
long long getExpire(serverDb *db, robj *key) {
long long getExpireWithDictIndex(serverDb *db, robj *key, int dict_index) {
dictEntry *de;

if ((de = dbFindExpires(db, key->ptr)) == NULL) return -1;
if ((de = dbFindExpiresWithDictIndex(db, key->ptr, dict_index)) == NULL) return -1;

return dictGetSignedIntegerVal(de);
}

/* Delete the specified expired key and propagate expire. */
void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) {
/* Return the expire time of the specified key, or -1 if no expire
* is associated with this key (i.e. the key is non volatile) */
long long getExpire(serverDb *db, robj *key) {
int dict_index = getKVStoreIndexForKey(key->ptr);
return getExpireWithDictIndex(db, key, dict_index);
}

void deleteExpiredKeyAndPropagateWithDictIndex(serverDb *db, robj *keyobj, int dict_index) {
mstime_t expire_latency;
latencyStartMonitor(expire_latency);
dbGenericDelete(db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED);
dbGenericDeleteWithDictIndex(db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED, dict_index);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del", expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, db->id);
Expand All @@ -1714,6 +1727,12 @@ void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) {
server.stat_expiredkeys++;
}

/* Delete the specified expired key and propagate expire. */
void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) {
int dict_index = getKVStoreIndexForKey(keyobj->ptr);
deleteExpiredKeyAndPropagateWithDictIndex(db, keyobj, dict_index);
}

/* Delete the specified expired key from overwriting and propagate the DEL or UNLINK. */
void deleteExpiredKeyFromOverwriteAndPropagate(client *c, robj *keyobj) {
int deleted = dbGenericDelete(c->db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED);
Expand Down Expand Up @@ -1765,12 +1784,11 @@ void propagateDeletion(serverDb *db, robj *key, int lazy) {
decrRefCount(argv[1]);
}

/* Check if the key is expired. */
int keyIsExpired(serverDb *db, robj *key) {
int keyIsExpiredWithDictIndex(serverDb *db, robj *key, int dict_index) {
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;

mstime_t when = getExpire(db, key);
mstime_t when = getExpireWithDictIndex(db, key, dict_index);
mstime_t now;

if (when < 0) return 0; /* No expire for this key */
Expand All @@ -1782,39 +1800,15 @@ int keyIsExpired(serverDb *db, robj *key) {
return now > when;
}

/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because by default replicas do not delete expired keys. They
* wait for DELs from the primary for consistency matters. However even
* replicas will try to have a coherent return value for the function,
* so that read commands executed in the replica side will be able to
* behave like if the key is expired even if still present (because the
* primary has yet to propagate the DEL).
*
* In primary as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* On replicas, this function does not delete expired keys by default, but
* it still returns KEY_EXPIRED if the key is logically expired. To force deletion
* of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
* flag. Note though that if the current client is executing
* replicated commands from the primary, keys are never considered expired.
*
* On the other hand, if you just want expiration check, but need to avoid
* the actual key deletion and propagation of the deletion, use the
* EXPIRE_AVOID_DELETE_EXPIRED flag.
*
* The return value of the function is KEY_VALID if the key is still valid.
* The function returns KEY_EXPIRED if the key is expired BUT not deleted,
* or returns KEY_DELETED if the key is expired and deleted. */
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) {
/* Check if the key is expired. */
int keyIsExpired(serverDb *db, robj *key) {
int dict_index = getKVStoreIndexForKey(key->ptr);
return keyIsExpiredWithDictIndex(db, key, dict_index);
}

keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int dict_index) {
if (server.lazy_expire_disabled) return KEY_VALID;
if (!keyIsExpired(db, key)) return KEY_VALID;
if (!keyIsExpiredWithDictIndex(db, key, dict_index)) return KEY_VALID;

/* If we are running in the context of a replica, instead of
* evicting the expired key from the database, we return ASAP:
Expand Down Expand Up @@ -1849,13 +1843,48 @@ keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) {
key = createStringObject(key->ptr, sdslen(key->ptr));
}
/* Delete the key */
deleteExpiredKeyAndPropagate(db, key);
deleteExpiredKeyAndPropagateWithDictIndex(db, key, dict_index);
if (static_key) {
decrRefCount(key);
}
return KEY_DELETED;
}

/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because by default replicas do not delete expired keys. They
* wait for DELs from the primary for consistency matters. However even
* replicas will try to have a coherent return value for the function,
* so that read commands executed in the replica side will be able to
* behave like if the key is expired even if still present (because the
* primary has yet to propagate the DEL).
*
* In primary as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* On replicas, this function does not delete expired keys by default, but
* it still returns KEY_EXPIRED if the key is logically expired. To force deletion
* of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
* flag. Note though that if the current client is executing
* replicated commands from the primary, keys are never considered expired.
*
* On the other hand, if you just want expiration check, but need to avoid
* the actual key deletion and propagation of the deletion, use the
* EXPIRE_AVOID_DELETE_EXPIRED flag.
*
* The return value of the function is KEY_VALID if the key is still valid.
* The function returns KEY_EXPIRED if the key is expired BUT not deleted,
* or returns KEY_DELETED if the key is expired and deleted. */
keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) {
int dict_index = getKVStoreIndexForKey(key->ptr);
return expireIfNeededWithDictIndex(db, key, flags, dict_index);
}

/* CB passed to kvstoreExpand.
* The purpose is to skip expansion of unused dicts in cluster mode (all
* dicts not mapped to *my* slots) */
Expand Down Expand Up @@ -1897,16 +1926,22 @@ int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->expires, db_size, try_expand);
}

static dictEntry *dbFindGeneric(kvstore *kvs, void *key) {
return kvstoreDictFind(kvs, server.cluster_enabled ? getKeySlot(key) : 0, key);
dictEntry *dbFindWithDictIndex(serverDb *db, void *key, int dict_index) {
return kvstoreDictFind(db->keys, dict_index, key);
}

dictEntry *dbFind(serverDb *db, void *key) {
return dbFindGeneric(db->keys, key);
int dict_index = getKVStoreIndexForKey(key);
return dbFindWithDictIndex(db, key, dict_index);
}

dictEntry *dbFindExpiresWithDictIndex(serverDb *db, void *key, int dict_index) {
return kvstoreDictFind(db->expires, dict_index, key);
}

dictEntry *dbFindExpires(serverDb *db, void *key) {
return dbFindGeneric(db->expires, key);
int dict_index = getKVStoreIndexForKey(key);
return dbFindExpiresWithDictIndex(db, key, dict_index);
}

unsigned long long dbSize(serverDb *db) {
Expand Down
12 changes: 12 additions & 0 deletions tests/unit/multi.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -919,3 +919,15 @@ start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} append
r get foo
} {}
}

start_cluster 1 0 {tags {"external:skip cluster"}} {
test "Regression test for multi-exec with RANDOMKEY accessing the wrong per-slot dictionary" {
R 0 SETEX FOO 10000 BAR
R 0 SETEX FIZZ 10000 BUZZ

R 0 MULTI
R 0 DEL FOO
R 0 RANDOMKEY
assert_equal [R 0 EXEC] {1 FIZZ}
}
}

0 comments on commit 136d0fd

Please sign in to comment.