Skip to content

Rocksdb manual flush code changes #11849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions fdbclient/ServerKnobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_CF_METRICS_DELAY, 900.0 );
init( ROCKSDB_MAX_LOG_FILE_SIZE, 10485760 ); // 10MB.
init( ROCKSDB_KEEP_LOG_FILE_NUM, 100 ); // Keeps 1GB log per storage server.
// Does manual flushes at regular intervals(seconds), incase rocksdb did not flush. Feature disable if the value is 0.
init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 0 ); if( randomize && BUGGIFY ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 10);
// Performs manual flushes at regular intervals (seconds), in case rocksdb did not flush. Feature disabled if the value is 0.
init( ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL, 600 ); if( isSimulated ) ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL = deterministicRandom()->randomInt(4, 1200);
init( ROCKSDB_SKIP_STATS_UPDATE_ON_OPEN, true );
init( ROCKSDB_SKIP_FILE_SIZE_CHECK_ON_OPEN, true );
init( ROCKSDB_FULLFILE_CHECKSUM, false ); if ( randomize && BUGGIFY ) ROCKSDB_FULLFILE_CHECKSUM = true;
Expand All @@ -609,7 +609,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
init( ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
// Block cache key-value checksum. Checksum is validated during read, so has non-trivial impact on read performance.
init( ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY, 0 ); if ( randomize && BUGGIFY ) ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY = 8; // Default: 0 (disabled). Supported values: 0, 1, 2, 4, 8.
init( ROCKSDB_METRICS_IN_SIMULATION, false );
init( ROCKSDB_ENABLE_NONDETERMINISM, false );
init( SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH, false );
init( SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO, 0.01 ); if (isSimulated) SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO = deterministicRandom()->random01();
init( SHARD_METADATA_SCAN_BYTES_LIMIT, 10485760 ); // 10MB
Expand Down
8 changes: 4 additions & 4 deletions fdbclient/include/fdbclient/ServerKnobs.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,10 +578,10 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKno
int ROCKSDB_WRITEBATCH_PROTECTION_BYTES_PER_KEY;
int ROCKSDB_MEMTABLE_PROTECTION_BYTES_PER_KEY;
int ROCKSDB_BLOCK_PROTECTION_BYTES_PER_KEY;
bool ROCKSDB_METRICS_IN_SIMULATION; // Whether rocksdb traceevent metrics will be emitted in simulation. Note that
// turning this on in simulation could lead to non-deterministic runs since we
// rely on rocksdb metadata. This knob also applies to sharded rocks storage
// engine.
bool ROCKSDB_ENABLE_NONDETERMINISM; // Whether rocksdb nondeterministic behavior should be enabled in simulation.
// Note that turning this on in simulation could lead to non-deterministic runs
// since we rely on rocksdb metadata. This knob also applies to sharded rocks
// storage engine.
bool SHARDED_ROCKSDB_ALLOW_WRITE_STALL_ON_FLUSH;
int SHARDED_ROCKSDB_MEMTABLE_MAX_RANGE_DELETIONS;
double SHARDED_ROCKSDB_VALIDATE_MAPPING_RATIO;
Expand Down
56 changes: 34 additions & 22 deletions fdbserver/KeyValueStoreRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ class SharedRocksDBState {
rocksdb::Options getOptions() const { return rocksdb::Options(this->dbOptions, this->cfOptions); }
rocksdb::ReadOptions getReadOptions() { return this->readOptions; }
rocksdb::FlushOptions getFlushOptions() { return this->flushOptions; }
double getLastFlushTime() const { return this->lastFlushTime_; }
void setLastFlushTime(double lastFlushTime) { this->lastFlushTime_ = lastFlushTime; }

private:
const UID id;
Expand All @@ -120,6 +122,7 @@ class SharedRocksDBState {
rocksdb::ColumnFamilyOptions cfOptions;
rocksdb::ReadOptions readOptions;
rocksdb::FlushOptions flushOptions;
std::atomic<double> lastFlushTime_;
};

SharedRocksDBState::SharedRocksDBState(UID id)
Expand Down Expand Up @@ -374,12 +377,14 @@ class RocksDBErrorListener : public rocksdb::EventListener {

class RocksDBEventListener : public rocksdb::EventListener {
public:
RocksDBEventListener(std::shared_ptr<double> lastFlushTime) : lastFlushTime(lastFlushTime){};
RocksDBEventListener(std::shared_ptr<SharedRocksDBState> sharedState) : sharedState(sharedState){};

void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override { *lastFlushTime = now(); }
void OnFlushCompleted(rocksdb::DB* db, const rocksdb::FlushJobInfo& info) override {
sharedState->setLastFlushTime(now());
}

private:
std::shared_ptr<double> lastFlushTime;
std::shared_ptr<SharedRocksDBState> sharedState;
};

using DB = rocksdb::DB*;
Expand Down Expand Up @@ -986,19 +991,30 @@ ACTOR Future<Void> flowLockLogger(UID id, const FlowLock* readLock, const FlowLo
}
}

ACTOR Future<Void> manualFlush(UID id,
rocksdb::DB* db,
std::shared_ptr<SharedRocksDBState> sharedState,
std::shared_ptr<double> lastFlushTime,
CF cf) {
ACTOR Future<Void> manualFlush(UID id, rocksdb::DB* db, std::shared_ptr<SharedRocksDBState> sharedState, CF cf) {
if (SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
state rocksdb::FlushOptions fOptions = sharedState->getFlushOptions();
state double waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
state double currTime = 0;
state int timeElapsedAfterLastFlush = 0;
loop {
wait(delay(SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL));
wait(delay(waitTime));

if ((now() - *lastFlushTime) > SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
currTime = now();
timeElapsedAfterLastFlush = currTime - sharedState->getLastFlushTime();
if (timeElapsedAfterLastFlush >= SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL) {
db->Flush(fOptions, cf);
TraceEvent e("RocksDBManualFlush", id);
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
TraceEvent("RocksDBManualFlush", id).detail("TimeElapsedAfterLastFlush", timeElapsedAfterLastFlush);
} else {
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL - timeElapsedAfterLastFlush;
}

// The code above computes a waitTime that depends on when rocksdb's background flushes happen, which
// causes non-deterministic behavior. To avoid this, use a constant waitTime in simulation; the variable
// waitTime is enabled only in the RocksdbNondeterministic (ROCKSDB_ENABLE_NONDETERMINISM=true) test.
if (g_network->isSimulated() && !SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) {
waitTime = SERVER_KNOBS->ROCKSDB_MANUAL_FLUSH_TIME_INTERVAL;
}
}
}
Expand Down Expand Up @@ -1289,11 +1305,9 @@ struct RocksDBKeyValueStore : IKeyValueStore {
const FlowLock* fetchLock,
std::shared_ptr<RocksDBErrorListener> errorListener,
std::shared_ptr<RocksDBEventListener> eventListener,
std::shared_ptr<double> lastFlushTime,
Counters& counters)
: path(std::move(path)), metrics(metrics), readLock(readLock), fetchLock(fetchLock),
errorListener(errorListener), eventListener(eventListener), lastFlushTime(lastFlushTime),
counters(counters) {}
errorListener(errorListener), eventListener(eventListener), counters(counters) {}

double getTimeEstimate() const override { return SERVER_KNOBS->COMMIT_TIME_ESTIMATE; }
};
Expand Down Expand Up @@ -1358,7 +1372,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
// In simulation the current thread and the main thread are the same, so
// blockUntilReady() would put the thread into a deadlock state; call the
// metricsLogger directly instead.
if (SERVER_KNOBS->ROCKSDB_METRICS_IN_SIMULATION) {
if (SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) {
a.metrics = rocksDBMetricLogger(id,
sharedState,
options.statistics,
Expand All @@ -1368,10 +1382,10 @@ struct RocksDBKeyValueStore : IKeyValueStore {
&a.counters,
cf) &&
flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
} else {
a.metrics = flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
}
} else {
onMainThread([&] {
Expand All @@ -1384,7 +1398,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
&a.counters,
cf) &&
flowLockLogger(id, a.readLock, a.fetchLock) && refreshReadIteratorPool(readIterPool) &&
manualFlush(id, db, sharedState, a.lastFlushTime, cf);
manualFlush(id, db, sharedState, cf);
return Future<bool>(true);
}).blockUntilReady();
}
Expand Down Expand Up @@ -1898,8 +1912,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
numReadWaiters(SERVER_KNOBS->ROCKSDB_READ_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_READ_QUEUE_SOFT_MAX),
numFetchWaiters(SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_HARD_MAX - SERVER_KNOBS->ROCKSDB_FETCH_QUEUE_SOFT_MAX),
errorListener(std::make_shared<RocksDBErrorListener>(id)), errorFuture(errorListener->getFuture()) {
lastFlushTime = std::make_shared<double>(now());
eventListener = std::make_shared<RocksDBEventListener>(lastFlushTime);
eventListener = std::make_shared<RocksDBEventListener>(sharedState);
// In simulation, run the reader/writer threads as Coro threads (i.e. in the network thread). The storage engine
// is still multi-threaded as background compaction threads are still present. Reads/writes to disk will also
// block the network thread in a way that would be unacceptable in production but is a necessary evil here. When
Expand Down Expand Up @@ -2093,7 +2106,7 @@ struct RocksDBKeyValueStore : IKeyValueStore {
return openFuture;
}
auto a = std::make_unique<Writer::OpenAction>(
path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, lastFlushTime, counters);
path, metrics, &readSemaphore, &fetchSemaphore, errorListener, eventListener, counters);
openFuture = a->done.getFuture();
writeThread->post(a.release());
return openFuture;
Expand Down Expand Up @@ -2427,7 +2440,6 @@ struct RocksDBKeyValueStore : IKeyValueStore {
Reference<IThreadPool> readThreads;
std::shared_ptr<RocksDBErrorListener> errorListener;
std::shared_ptr<RocksDBEventListener> eventListener;
std::shared_ptr<double> lastFlushTime;
Future<Void> errorFuture;
Promise<Void> closePromise;
Future<Void> openFuture;
Expand Down
2 changes: 1 addition & 1 deletion fdbserver/KeyValueStoreShardedRocksDB.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3713,7 +3713,7 @@ struct ShardedRocksDBKeyValueStore : IKeyValueStore {
} else {
auto a = std::make_unique<Writer::OpenAction>(&shardManager, metrics, &readSemaphore, &fetchSemaphore);
openFuture = a->done.getFuture();
if (SERVER_KNOBS->ROCKSDB_METRICS_IN_SIMULATION) {
if (SERVER_KNOBS->ROCKSDB_ENABLE_NONDETERMINISM) {
this->metrics =
ShardManager::shardMetricsLogger(this->rState, openFuture, &shardManager) &&
rocksDBAggregatedMetricsLogger(this->rState, openFuture, rocksDBMetrics, &shardManager, this->path);
Expand Down
2 changes: 1 addition & 1 deletion tests/fast/RocksdbNondeterministicTest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
storageEngineType = 4 # Always pick RocksDB

[[knobs]]
rocksdb_metrics_in_simulation = true # This can cause non-determinism, but we don't do unseed check for this test
rocksdb_enable_nondeterminism = true # This can cause non-determinism, but we don't do unseed check for this test

[[test]]
testTitle = 'Clogged'
Expand Down
2 changes: 1 addition & 1 deletion tests/fast/ShardedRocksNondeterministicTest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ storageEngineType = 5 # Always pick ShardedRocks

# These can cause non-determinism, but we don't do unseed check for this test
[[knobs]]
rocksdb_metrics_in_simulation = true
rocksdb_enable_nondeterminism = true
shard_encode_location_metadata = true

[[test]]
Expand Down