Skip to content

Commit

Permalink
Require C11 atomics (valkey-io#490)
Browse files Browse the repository at this point in the history
- Replaces custom atomics logic with C11 default atomics logic.
- Drops  "atomicvar_api" field from server info

Closes valkey-io#485

---------

Signed-off-by: adetunjii <[email protected]>
Signed-off-by: Samuel Adetunji <[email protected]>
Co-authored-by: teej4y <[email protected]>
  • Loading branch information
adetunjii and teej4y authored May 26, 2024
1 parent 1c55f3c commit 5d0f4bc
Show file tree
Hide file tree
Showing 15 changed files with 318 additions and 332 deletions.
22 changes: 11 additions & 11 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ void stopAppendOnly(void) {
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.fsynced_reploff = -1;
atomicSet(server.fsynced_reploff_pending, 0);
atomic_store_explicit(&server.fsynced_reploff_pending, 0, memory_order_relaxed);
killAppendOnlyChild();
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
Expand Down Expand Up @@ -985,11 +985,11 @@ int startAppendOnly(void) {
}
server.aof_last_fsync = server.mstime;
/* If AOF fsync error in bio job, we just ignore it and log the event. */
int aof_bio_fsync_status;
atomicGet(server.aof_bio_fsync_status, aof_bio_fsync_status);
int aof_bio_fsync_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);
if (aof_bio_fsync_status == C_ERR) {
serverLog(LL_WARNING, "AOF reopen, just ignore the AOF fsync error in bio job");
atomicSet(server.aof_bio_fsync_status, C_OK);
serverLog(LL_WARNING,
"AOF reopen, just ignore the AOF fsync error in bio job");
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
}

/* If AOF was in error state, we just ignore it and log the event. */
Expand Down Expand Up @@ -1074,7 +1074,7 @@ void flushAppendOnlyFile(int force) {
* (because there's no reason, from the AOF POV, to call fsync) and then WAITAOF may wait on
* the higher offset (which contains data that was only propagated to replicas, and not to AOF) */
if (!sync_in_progress && server.aof_fsync != AOF_FSYNC_NO)
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
return;
}
}
Expand Down Expand Up @@ -1244,8 +1244,9 @@ void flushAppendOnlyFile(int force) {
latencyAddSampleIfNeeded("aof-fsync-always", latency);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
server.aof_last_fsync = server.mstime;
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC && server.mstime - server.aof_last_fsync >= 1000) {
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
} else if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.mstime - server.aof_last_fsync >= 1000) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
server.aof_last_incr_fsync_offset = server.aof_last_incr_size;
Expand Down Expand Up @@ -2409,7 +2410,7 @@ int rewriteAppendOnlyFileBackground(void) {

/* Set the initial repl_offset, which will be applied to fsynced_reploff
* when AOFRW finishes (after possibly being updated by a bio thread) */
atomicSet(server.fsynced_reploff_pending, server.master_repl_offset);
atomic_store_explicit(&server.fsynced_reploff_pending, server.master_repl_offset, memory_order_relaxed);
server.fsynced_reploff = 0;
}

Expand Down Expand Up @@ -2647,8 +2648,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
/* Update the fsynced replication offset that just now become valid.
* This could either be the one we took in startAppendOnly, or a
* newer one set by the bio thread. */
long long fsynced_reploff_pending;
atomicGet(server.fsynced_reploff_pending, fsynced_reploff_pending);
long long fsynced_reploff_pending = atomic_load_explicit(&server.fsynced_reploff_pending, memory_order_relaxed);
server.fsynced_reploff = fsynced_reploff_pending;
}

Expand Down
17 changes: 10 additions & 7 deletions src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@

#include "server.h"
#include "bio.h"
#include <stdatomic.h>

static char *bio_worker_title[] = {
"bio_close_file",
Expand Down Expand Up @@ -256,17 +257,19 @@ void *bioProcessBackgroundJobs(void *arg) {
/* The fd may be closed by main thread and reused for another
* socket, pipe, or file. We just ignore these errno because
* aof fsync did not really fail. */
if (valkey_fsync(job->fd_args.fd) == -1 && errno != EBADF && errno != EINVAL) {
int last_status;
atomicGet(server.aof_bio_fsync_status, last_status);
atomicSet(server.aof_bio_fsync_status, C_ERR);
atomicSet(server.aof_bio_fsync_errno, errno);
if (valkey_fsync(job->fd_args.fd) == -1 &&
errno != EBADF && errno != EINVAL)
{
int last_status = atomic_load_explicit(&server.aof_bio_fsync_status, memory_order_relaxed);

atomic_store_explicit(&server.aof_bio_fsync_errno, errno, memory_order_relaxed);
atomic_store_explicit(&server.aof_bio_fsync_status, C_ERR, memory_order_release);
if (last_status == C_OK) {
serverLog(LL_WARNING, "Fail to fsync the AOF file: %s", strerror(errno));
}
} else {
atomicSet(server.aof_bio_fsync_status, C_OK);
atomicSet(server.fsynced_reploff_pending, job->fd_args.offset);
atomic_store_explicit(&server.aof_bio_fsync_status, C_OK, memory_order_relaxed);
atomic_store_explicit(&server.fsynced_reploff_pending, job->fd_args.offset, memory_order_relaxed);
}

if (job->fd_args.need_reclaim_cache) {
Expand Down
1 change: 0 additions & 1 deletion src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

#include "server.h"
#include "cluster.h"
#include "atomicvar.h"
#include "latency.h"
#include "script.h"
#include "functions.h"
Expand Down
1 change: 0 additions & 1 deletion src/evict.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "script.h"
#include <math.h>

Expand Down
1 change: 0 additions & 1 deletion src/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
#include "sds.h"
#include "dict.h"
#include "adlist.h"
#include "atomicvar.h"

#define LOAD_TIMEOUT_MS 500

Expand Down
73 changes: 37 additions & 36 deletions src/lazyfree.c
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "functions.h"
#include "cluster.h"

static serverAtomic size_t lazyfree_objects = 0;
static serverAtomic size_t lazyfreed_objects = 0;
#include <stdatomic.h>

static _Atomic size_t lazyfree_objects = 0;
static _Atomic size_t lazyfreed_objects = 0;

/* Release objects from the lazyfree thread. It's just decrRefCount()
* updating the count of objects to release. */
void lazyfreeFreeObject(void *args[]) {
robj *o = (robj *)args[0];
decrRefCount(o);
atomicDecr(lazyfree_objects, 1);
atomicIncr(lazyfreed_objects, 1);
atomic_fetch_sub_explicit(&lazyfree_objects,1,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,1,memory_order_relaxed);
}

/* Release a database from the lazyfree thread. The 'db' pointer is the
Expand All @@ -26,26 +27,26 @@ void lazyfreeFreeDatabase(void *args[]) {
size_t numkeys = kvstoreSize(da1);
kvstoreRelease(da1);
kvstoreRelease(da2);
atomicDecr(lazyfree_objects, numkeys);
atomicIncr(lazyfreed_objects, numkeys);
atomic_fetch_sub_explicit(&lazyfree_objects,numkeys,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,numkeys,memory_order_relaxed);
}

/* Release the key tracking table. */
void lazyFreeTrackingTable(void *args[]) {
rax *rt = args[0];
size_t len = rt->numele;
freeTrackingRadixTree(rt);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Release the error stats rax tree. */
void lazyFreeErrors(void *args[]) {
rax *errors = args[0];
size_t len = errors->numele;
raxFreeWithCallback(errors, zfree);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Release the lua_scripts dict. */
Expand All @@ -55,17 +56,17 @@ void lazyFreeLuaScripts(void *args[]) {
lua_State *lua = args[2];
long long len = dictSize(lua_scripts);
freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Release the functions ctx. */
void lazyFreeFunctionsCtx(void *args[]) {
functionsLibCtx *functions_lib_ctx = args[0];
size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx);
functionsLibCtxFree(functions_lib_ctx);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Release replication backlog referencing memory. */
Expand All @@ -76,26 +77,24 @@ void lazyFreeReplicationBacklogRefMem(void *args[]) {
len += raxSize(index);
listRelease(blocks);
raxFree(index);
atomicDecr(lazyfree_objects, len);
atomicIncr(lazyfreed_objects, len);
atomic_fetch_sub_explicit(&lazyfree_objects,len,memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects,len,memory_order_relaxed);
}

/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
size_t aux;
atomicGet(lazyfree_objects, aux);
size_t aux = atomic_load_explicit(&lazyfree_objects,memory_order_relaxed);
return aux;
}

/* Return the number of objects that have been freed. */
size_t lazyfreeGetFreedObjectsCount(void) {
size_t aux;
atomicGet(lazyfreed_objects, aux);
size_t aux = atomic_load_explicit(&lazyfreed_objects,memory_order_relaxed);
return aux;
}

void lazyfreeResetStats(void) {
atomicSet(lazyfreed_objects, 0);
atomic_store_explicit(&lazyfreed_objects,0,memory_order_relaxed);
}

/* Return the amount of work needed in order to free an object.
Expand Down Expand Up @@ -175,8 +174,8 @@ void freeObjAsync(robj *key, robj *obj, int dbid) {
* of parts of the server core may call incrRefCount() to protect
* objects, and then call dbDelete(). */
if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
atomicIncr(lazyfree_objects, 1);
bioCreateLazyFreeJob(lazyfreeFreeObject, 1, obj);
atomic_fetch_add_explicit(&lazyfree_objects,1,memory_order_relaxed);
bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
} else {
decrRefCount(obj);
}
Expand All @@ -195,7 +194,7 @@ void emptyDbAsync(serverDb *db) {
kvstore *oldkeys = db->keys, *oldexpires = db->expires;
db->keys = kvstoreCreate(&dbDictType, slot_count_bits, flags);
db->expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags);
atomicIncr(lazyfree_objects, kvstoreSize(oldkeys));
atomic_fetch_add_explicit(&lazyfree_objects, kvstoreSize(oldkeys), memory_order_relaxed);
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 2, oldkeys, oldexpires);
}

Expand All @@ -204,8 +203,8 @@ void emptyDbAsync(serverDb *db) {
void freeTrackingRadixTreeAsync(rax *tracking) {
/* Because this rax has only keys and no values so we use numnodes. */
if (tracking->numnodes > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, tracking->numele);
bioCreateLazyFreeJob(lazyFreeTrackingTable, 1, tracking);
atomic_fetch_add_explicit(&lazyfree_objects,tracking->numele,memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
} else {
freeTrackingRadixTree(tracking);
}
Expand All @@ -216,8 +215,8 @@ void freeTrackingRadixTreeAsync(rax *tracking) {
void freeErrorsRadixTreeAsync(rax *errors) {
/* Because this rax has only keys and no values so we use numnodes. */
if (errors->numnodes > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, errors->numele);
bioCreateLazyFreeJob(lazyFreeErrors, 1, errors);
atomic_fetch_add_explicit(&lazyfree_objects,errors->numele,memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeErrors,1,errors);
} else {
raxFreeWithCallback(errors, zfree);
}
Expand All @@ -227,8 +226,8 @@ void freeErrorsRadixTreeAsync(rax *errors) {
* Close lua interpreter, if there are a lot of lua scripts, close it in async way. */
void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) {
if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, dictSize(lua_scripts));
bioCreateLazyFreeJob(lazyFreeLuaScripts, 3, lua_scripts, lua_scripts_lru_list, lua);
atomic_fetch_add_explicit(&lazyfree_objects,dictSize(lua_scripts),memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeLuaScripts,3,lua_scripts,lua_scripts_lru_list,lua);
} else {
freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
}
Expand All @@ -237,18 +236,20 @@ void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_Stat
/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) {
if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, functionsLibCtxFunctionsLen(functions_lib_ctx));
bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 1, functions_lib_ctx);
atomic_fetch_add_explicit(&lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx),memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx);
} else {
functionsLibCtxFree(functions_lib_ctx);
}
}

/* Free replication backlog referencing buffer blocks and rax index. */
void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
if (listLength(blocks) > LAZYFREE_THRESHOLD || raxSize(index) > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects, listLength(blocks) + raxSize(index));
bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem, 2, blocks, index);
if (listLength(blocks) > LAZYFREE_THRESHOLD ||
raxSize(index) > LAZYFREE_THRESHOLD)
{
atomic_fetch_add_explicit(&lazyfree_objects,listLength(blocks)+raxSize(index),memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index);
} else {
listRelease(blocks);
raxFree(index);
Expand Down
5 changes: 2 additions & 3 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -2413,8 +2413,7 @@ void VM_Yield(ValkeyModuleCtx *ctx, int flags, const char *busy_reply) {
* after the main thread enters acquiring GIL state in order to protect the event
* loop (ae.c) and avoid potential race conditions. */

int acquiring;
atomicGet(server.module_gil_acquring, acquiring);
int acquiring = atomic_load_explicit(&server.module_gil_acquiring, memory_order_relaxed);
if (!acquiring) {
/* If the main thread has not yet entered the acquiring GIL state,
* we attempt to wake it up and exit without waiting for it to
Expand Down Expand Up @@ -11823,7 +11822,7 @@ void moduleInitModulesSystem(void) {
moduleUnblockedClients = listCreate();
server.loadmodule_queue = listCreate();
server.module_configs_queue = dictCreate(&sdsKeyValueHashDictType);
server.module_gil_acquring = 0;
server.module_gil_acquiring = 0;
modules = dictCreate(&modulesDictType);
moduleAuthCallbacks = listCreate();

Expand Down
Loading

0 comments on commit 5d0f4bc

Please sign in to comment.