From 136d0fd212dc5fbc1f9ef48123d1fab723ec117b Mon Sep 17 00:00:00 2001 From: Nadav Levanoni <38641521+nadav-levanoni@users.noreply.github.com> Date: Wed, 16 Oct 2024 17:40:11 -0700 Subject: [PATCH] Add 'WithDictIndex' expiry API and update RANDOMKEY command (#1155) https://github.com/valkey-io/valkey/issues/1145 First part of a two-step effort to add `WithSlot` API for expiry. This PR is to fix a crash that occurs when a RANDOMKEY uses a different slot than the cached slot of a client during a multi-exec. The next part will be to utilize the new API as an optimization to prevent duplicate work when calculating the slot for a key. --------- Signed-off-by: Nadav Levanoni Signed-off-by: Madelyn Olson Co-authored-by: Nadav Levanoni Co-authored-by: Madelyn Olson --- src/db.c | 141 +++++++++++++++++++++++++++---------------- tests/unit/multi.tcl | 12 ++++ 2 files changed, 100 insertions(+), 53 deletions(-) diff --git a/src/db.c b/src/db.c index 3493e2d863..00e6e7b2d6 100644 --- a/src/db.c +++ b/src/db.c @@ -52,10 +52,13 @@ typedef enum { KEY_DELETED /* The key was deleted now. */ } keyStatus; +keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int dict_index); keyStatus expireIfNeeded(serverDb *db, robj *key, int flags); +int keyIsExpiredWithDictIndex(serverDb *db, robj *key, int dict_index); int keyIsExpired(serverDb *db, robj *key); static void dbSetValue(serverDb *db, robj *key, robj *val, int overwrite, dictEntry *de); static int getKVStoreIndexForKey(sds key); +dictEntry *dbFindExpiresWithDictIndex(serverDb *db, void *key, int dict_index); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. @@ -269,7 +272,7 @@ int getKeySlot(sds key) { * In this case a copy of `key` is copied in kvstore, the caller must ensure the `key` is properly freed. */ int dbAddRDBLoad(serverDb *db, sds key, robj *val) { - int dict_index = server.cluster_enabled ? getKeySlot(key) : 0; + int dict_index = getKVStoreIndexForKey(key); dictEntry *de = kvstoreDictAddRaw(db->keys, dict_index, key, NULL); if (de == NULL) return 0; initObjectLRUOrLFU(val); @@ -375,13 +378,13 @@ robj *dbRandomKey(serverDb *db) { while (1) { sds key; robj *keyobj; - int randomSlot = kvstoreGetFairRandomDictIndex(db->keys); - de = kvstoreDictGetFairRandomKey(db->keys, randomSlot); + int randomDictIndex = kvstoreGetFairRandomDictIndex(db->keys); + de = kvstoreDictGetFairRandomKey(db->keys, randomDictIndex); if (de == NULL) return NULL; key = dictGetKey(de); keyobj = createStringObject(key, sdslen(key)); - if (dbFindExpires(db, key)) { + if (dbFindExpiresWithDictIndex(db, key, randomDictIndex)) { if (allvolatile && server.primary_host && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, * it could happen that all the keys are already logically @@ -393,7 +396,7 @@ robj *dbRandomKey(serverDb *db) { * return a key name that may be already expired. */ return keyobj; } - if (expireIfNeeded(db, keyobj, 0) != KEY_VALID) { + if (expireIfNeededWithDictIndex(db, keyobj, 0, randomDictIndex) != KEY_VALID) { decrRefCount(keyobj); continue; /* search for another key. This expired. */ } @@ -402,11 +405,9 @@ robj *dbRandomKey(serverDb *db) { } } -/* Helper for sync and async delete. */ -int dbGenericDelete(serverDb *db, robj *key, int async, int flags) { +int dbGenericDeleteWithDictIndex(serverDb *db, robj *key, int async, int flags, int dict_index) { dictEntry **plink; int table; - int dict_index = getKVStoreIndexForKey(key->ptr); dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, dict_index, key->ptr, &plink, &table); if (de) { robj *val = dictGetVal(de); @@ -436,6 +437,12 @@ int dbGenericDelete(serverDb *db, robj *key, int async, int flags) { } } +/* Helper for sync and async delete. */ +int dbGenericDelete(serverDb *db, robj *key, int async, int flags) { + int dict_index = getKVStoreIndexForKey(key->ptr); + return dbGenericDeleteWithDictIndex(db, key, async, flags, dict_index); +} + /* Delete a key, value, and associated expiration entry if any, from the DB */ int dbSyncDelete(serverDb *db, robj *key) { return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED); @@ -1693,19 +1700,25 @@ void setExpire(client *c, serverDb *db, robj *key, long long when) { /* Return the expire time of the specified key, or -1 if no expire * is associated with this key (i.e. the key is non volatile) */ -long long getExpire(serverDb *db, robj *key) { +long long getExpireWithDictIndex(serverDb *db, robj *key, int dict_index) { dictEntry *de; - if ((de = dbFindExpires(db, key->ptr)) == NULL) return -1; + if ((de = dbFindExpiresWithDictIndex(db, key->ptr, dict_index)) == NULL) return -1; return dictGetSignedIntegerVal(de); } -/* Delete the specified expired key and propagate expire. */ -void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) { +/* Return the expire time of the specified key, or -1 if no expire + * is associated with this key (i.e. the key is non volatile) */ +long long getExpire(serverDb *db, robj *key) { + int dict_index = getKVStoreIndexForKey(key->ptr); + return getExpireWithDictIndex(db, key, dict_index); +} + +void deleteExpiredKeyAndPropagateWithDictIndex(serverDb *db, robj *keyobj, int dict_index) { mstime_t expire_latency; latencyStartMonitor(expire_latency); - dbGenericDelete(db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED); + dbGenericDeleteWithDictIndex(db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED, dict_index); latencyEndMonitor(expire_latency); latencyAddSampleIfNeeded("expire-del", expire_latency); notifyKeyspaceEvent(NOTIFY_EXPIRED, "expired", keyobj, db->id); @@ -1714,6 +1727,12 @@ void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) { server.stat_expiredkeys++; } +/* Delete the specified expired key and propagate expire. */ +void deleteExpiredKeyAndPropagate(serverDb *db, robj *keyobj) { + int dict_index = getKVStoreIndexForKey(keyobj->ptr); + deleteExpiredKeyAndPropagateWithDictIndex(db, keyobj, dict_index); +} + /* Delete the specified expired key from overwriting and propagate the DEL or UNLINK. */ void deleteExpiredKeyFromOverwriteAndPropagate(client *c, robj *keyobj) { int deleted = dbGenericDelete(c->db, keyobj, server.lazyfree_lazy_expire, DB_FLAG_KEY_EXPIRED); @@ -1765,12 +1784,11 @@ void propagateDeletion(serverDb *db, robj *key, int lazy) { decrRefCount(argv[1]); } -/* Check if the key is expired. */ -int keyIsExpired(serverDb *db, robj *key) { +int keyIsExpiredWithDictIndex(serverDb *db, robj *key, int dict_index) { /* Don't expire anything while loading. It will be done later. */ if (server.loading) return 0; - mstime_t when = getExpire(db, key); + mstime_t when = getExpireWithDictIndex(db, key, dict_index); mstime_t now; if (when < 0) return 0; /* No expire for this key */ @@ -1782,39 +1800,15 @@ int keyIsExpired(serverDb *db, robj *key) { return now > when; } -/* This function is called when we are going to perform some operation - * in a given key, but such key may be already logically expired even if - * it still exists in the database. The main way this function is called - * is via lookupKey*() family of functions. - * - * The behavior of the function depends on the replication role of the - * instance, because by default replicas do not delete expired keys. They - * wait for DELs from the primary for consistency matters. However even - * replicas will try to have a coherent return value for the function, - * so that read commands executed in the replica side will be able to - * behave like if the key is expired even if still present (because the - * primary has yet to propagate the DEL). - * - * In primary as a side effect of finding a key which is expired, such - * key will be evicted from the database. Also this may trigger the - * propagation of a DEL/UNLINK command in AOF / replication stream. - * - * On replicas, this function does not delete expired keys by default, but - * it still returns KEY_EXPIRED if the key is logically expired. To force deletion - * of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED - * flag. Note though that if the current client is executing - * replicated commands from the primary, keys are never considered expired. - * - * On the other hand, if you just want expiration check, but need to avoid - * the actual key deletion and propagation of the deletion, use the - * EXPIRE_AVOID_DELETE_EXPIRED flag. - * - * The return value of the function is KEY_VALID if the key is still valid. - * The function returns KEY_EXPIRED if the key is expired BUT not deleted, - * or returns KEY_DELETED if the key is expired and deleted. */ -keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) { +/* Check if the key is expired. */ +int keyIsExpired(serverDb *db, robj *key) { + int dict_index = getKVStoreIndexForKey(key->ptr); + return keyIsExpiredWithDictIndex(db, key, dict_index); +} + +keyStatus expireIfNeededWithDictIndex(serverDb *db, robj *key, int flags, int dict_index) { if (server.lazy_expire_disabled) return KEY_VALID; - if (!keyIsExpired(db, key)) return KEY_VALID; + if (!keyIsExpiredWithDictIndex(db, key, dict_index)) return KEY_VALID; /* If we are running in the context of a replica, instead of * evicting the expired key from the database, we return ASAP: @@ -1849,13 +1843,48 @@ keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) { key = createStringObject(key->ptr, sdslen(key->ptr)); } /* Delete the key */ - deleteExpiredKeyAndPropagate(db, key); + deleteExpiredKeyAndPropagateWithDictIndex(db, key, dict_index); if (static_key) { decrRefCount(key); } return KEY_DELETED; } +/* This function is called when we are going to perform some operation + * in a given key, but such key may be already logically expired even if + * it still exists in the database. The main way this function is called + * is via lookupKey*() family of functions. + * + * The behavior of the function depends on the replication role of the + * instance, because by default replicas do not delete expired keys. They + * wait for DELs from the primary for consistency matters. However even + * replicas will try to have a coherent return value for the function, + * so that read commands executed in the replica side will be able to + * behave like if the key is expired even if still present (because the + * primary has yet to propagate the DEL). + * + * In primary as a side effect of finding a key which is expired, such + * key will be evicted from the database. Also this may trigger the + * propagation of a DEL/UNLINK command in AOF / replication stream. + * + * On replicas, this function does not delete expired keys by default, but + * it still returns KEY_EXPIRED if the key is logically expired. To force deletion + * of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED + * flag. Note though that if the current client is executing + * replicated commands from the primary, keys are never considered expired. + * + * On the other hand, if you just want expiration check, but need to avoid + * the actual key deletion and propagation of the deletion, use the + * EXPIRE_AVOID_DELETE_EXPIRED flag. + * + * The return value of the function is KEY_VALID if the key is still valid. + * The function returns KEY_EXPIRED if the key is expired BUT not deleted, + * or returns KEY_DELETED if the key is expired and deleted. */ +keyStatus expireIfNeeded(serverDb *db, robj *key, int flags) { + int dict_index = getKVStoreIndexForKey(key->ptr); + return expireIfNeededWithDictIndex(db, key, flags, dict_index); +} + /* CB passed to kvstoreExpand. * The purpose is to skip expansion of unused dicts in cluster mode (all * dicts not mapped to *my* slots) */ @@ -1897,16 +1926,22 @@ int dbExpandExpires(serverDb *db, uint64_t db_size, int try_expand) { return dbExpandGeneric(db->expires, db_size, try_expand); } -static dictEntry *dbFindGeneric(kvstore *kvs, void *key) { - return kvstoreDictFind(kvs, server.cluster_enabled ? getKeySlot(key) : 0, key); +dictEntry *dbFindWithDictIndex(serverDb *db, void *key, int dict_index) { + return kvstoreDictFind(db->keys, dict_index, key); } dictEntry *dbFind(serverDb *db, void *key) { - return dbFindGeneric(db->keys, key); + int dict_index = getKVStoreIndexForKey(key); + return dbFindWithDictIndex(db, key, dict_index); +} + +dictEntry *dbFindExpiresWithDictIndex(serverDb *db, void *key, int dict_index) { + return kvstoreDictFind(db->expires, dict_index, key); } dictEntry *dbFindExpires(serverDb *db, void *key) { - return dbFindGeneric(db->expires, key); + int dict_index = getKVStoreIndexForKey(key); + return dbFindExpiresWithDictIndex(db, key, dict_index); } unsigned long long dbSize(serverDb *db) { diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index dafbc66c10..ffdaa3edd4 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -919,3 +919,15 @@ start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} append r get foo } {} } + +start_cluster 1 0 {tags {"external:skip cluster"}} { + test "Regression test for multi-exec with RANDOMKEY accessing the wrong per-slot dictionary" { + R 0 SETEX FOO 10000 BAR + R 0 SETEX FIZZ 10000 BUZZ + + R 0 MULTI + R 0 DEL FOO + R 0 RANDOMKEY + assert_equal [R 0 EXEC] {1 FIZZ} + } +}