Skip to content

Commit

Permalink
feat: Allow concurrent world state access (#11216)
Browse files Browse the repository at this point in the history
Implements per-fork queues for requests to the native world state
following it's concurrency rules. Also tightens up aspects of the cached
store to ensure reads of committed data don't access anything
uncommitted.

```
1. Reads of committed state never need to be queued. LMDB uses MVCC to ensure readers see a consistent view of the DB.
2. Reads of uncommitted state can happen concurrently with other reads of uncommitted state on the same fork (or reads of committed state)
3. All writes require exclusive access to their respective fork
 ```
  • Loading branch information
PhilWindle authored and AztecBot committed Jan 16, 2025
1 parent 45915b1 commit 5cd38aa
Show file tree
Hide file tree
Showing 10 changed files with 373 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ void ContentAddressedAppendOnlyTree<Store, HashingPolicy>::get_leaf(const index_
RequestContext requestContext;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = store_->get_current_root(*tx, includeUncommitted);
std::optional<fr> leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx);
std::optional<fr> leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx, false);
response.success = leaf_hash.has_value();
if (response.success) {
response.inner.leaf = leaf_hash.value();
Expand Down Expand Up @@ -690,15 +690,15 @@ void ContentAddressedAppendOnlyTree<Store, HashingPolicy>::get_leaf(const index_
leaf_index,
" for block ",
blockNumber,
", leaf index is too high.");
", leaf index out of range.");
response.success = false;
return;
}
RequestContext requestContext;
requestContext.blockNumber = blockNumber;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = blockData.root;
std::optional<fr> leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx);
std::optional<fr> leaf_hash = find_leaf_hash(leaf_index, requestContext, *tx, false);
response.success = leaf_hash.has_value();
if (response.success) {
response.inner.leaf = leaf_hash.value();
Expand Down Expand Up @@ -746,7 +746,6 @@ void ContentAddressedAppendOnlyTree<Store, HashingPolicy>::find_leaf_indices_fro

RequestContext requestContext;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = store_->get_current_root(*tx, includeUncommitted);

for (const auto& leaf : leaves) {
std::optional<index_t> leaf_index =
Expand Down Expand Up @@ -787,7 +786,6 @@ void ContentAddressedAppendOnlyTree<Store, HashingPolicy>::find_leaf_indices_fro
RequestContext requestContext;
requestContext.blockNumber = blockNumber;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = blockData.root;
requestContext.maxIndex = blockData.size;

for (const auto& leaf : leaves) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::get_leaf(const index_t&
RequestContext requestContext;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = store_->get_current_root(*tx, includeUncommitted);
std::optional<fr> leaf_hash = find_leaf_hash(index, requestContext, *tx);
std::optional<fr> leaf_hash = find_leaf_hash(index, requestContext, *tx, false);
if (!leaf_hash.has_value()) {
response.success = false;
response.message = "Failed to find leaf hash for current root";
Expand Down Expand Up @@ -390,7 +390,7 @@ void ContentAddressedIndexedTree<Store, HashingPolicy>::get_leaf(const index_t&
requestContext.blockNumber = blockNumber;
requestContext.includeUncommitted = includeUncommitted;
requestContext.root = blockData.root;
std::optional<fr> leaf_hash = find_leaf_hash(index, requestContext, *tx);
std::optional<fr> leaf_hash = find_leaf_hash(index, requestContext, *tx, false);
if (!leaf_hash.has_value()) {
response.success = false;
response.message = format("Failed to find leaf hash for root of block ", blockNumber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class LMDBDatabaseCreationTransaction;
class LMDBDatabase {
public:
using Ptr = std::unique_ptr<LMDBDatabase>;
using SharedPtr = std::shared_ptr<LMDBDatabase>;

LMDBDatabase(LMDBEnvironment::SharedPtr env,
const LMDBDatabaseCreationTransaction& transaction,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#include <cstddef>
#include <cstdint>
#include <gtest/gtest.h>

#include <chrono>
#include <cstdlib>
#include <filesystem>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <vector>

#include "barretenberg/common/serialize.hpp"
#include "barretenberg/common/streams.hpp"
#include "barretenberg/common/test.hpp"
#include "barretenberg/crypto/merkle_tree/fixtures.hpp"
#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_database.hpp"
#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_db_transaction.hpp"
#include "barretenberg/crypto/merkle_tree/lmdb_store/lmdb_environment.hpp"
#include "barretenberg/crypto/merkle_tree/lmdb_store/queries.hpp"
#include "barretenberg/crypto/merkle_tree/signal.hpp"
#include "barretenberg/crypto/merkle_tree/types.hpp"
#include "barretenberg/numeric/random/engine.hpp"
#include "barretenberg/numeric/uint128/uint128.hpp"
#include "barretenberg/numeric/uint256/uint256.hpp"
#include "barretenberg/polynomials/serialize.hpp"
#include "barretenberg/stdlib/primitives/field/field.hpp"
#include "lmdb_tree_store.hpp"

using namespace bb::stdlib;
using namespace bb::crypto::merkle_tree;

class LMDBEnvironmentTest : public testing::Test {
protected:
void SetUp() override
{
_directory = random_temp_directory();
_mapSize = 1024 * 1024;
_maxReaders = 16;
std::filesystem::create_directories(_directory);
}

void TearDown() override { std::filesystem::remove_all(_directory); }

static std::string _directory;
static uint32_t _maxReaders;
static uint64_t _mapSize;
};

std::string LMDBEnvironmentTest::_directory;
uint32_t LMDBEnvironmentTest::_maxReaders;
uint64_t LMDBEnvironmentTest::_mapSize;

std::vector<uint8_t> serialise(std::string key)
{
std::vector<uint8_t> data(key.begin(), key.end());
return data;
}

TEST_F(LMDBEnvironmentTest, can_create_environment)
{
EXPECT_NO_THROW(LMDBEnvironment environment(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders));
}

TEST_F(LMDBEnvironmentTest, can_create_database)
{
LMDBEnvironment::SharedPtr environment = std::make_shared<LMDBEnvironment>(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders);

{
LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());
}
}

TEST_F(LMDBEnvironmentTest, can_write_to_database)
{
LMDBEnvironment::SharedPtr environment = std::make_shared<LMDBEnvironment>(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders);

LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());

{
LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared<LMDBTreeWriteTransaction>(environment);
auto key = serialise(std::string("Key"));
auto data = serialise(std::string("TestData"));
EXPECT_NO_THROW(tx->put_value(key, data, *db));
EXPECT_NO_THROW(tx->commit());
}
}

TEST_F(LMDBEnvironmentTest, can_read_from_database)
{
LMDBEnvironment::SharedPtr environment = std::make_shared<LMDBEnvironment>(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders);

LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());

{
LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared<LMDBTreeWriteTransaction>(environment);
auto key = serialise(std::string("Key"));
auto data = serialise(std::string("TestData"));
EXPECT_NO_THROW(tx->put_value(key, data, *db));
EXPECT_NO_THROW(tx->commit());
}

{
environment->wait_for_reader();
LMDBTreeReadTransaction::SharedPtr tx = std::make_shared<LMDBTreeReadTransaction>(environment);
auto key = serialise(std::string("Key"));
auto expected = serialise(std::string("TestData"));
std::vector<uint8_t> data;
tx->get_value(key, data, *db);
EXPECT_EQ(data, expected);
}
}

TEST_F(LMDBEnvironmentTest, can_write_and_read_multiple)
{
LMDBEnvironment::SharedPtr environment = std::make_shared<LMDBEnvironment>(
LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, LMDBEnvironmentTest::_maxReaders);

LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());

uint64_t numValues = 10;

{
for (uint64_t count = 0; count < numValues; count++) {
LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared<LMDBTreeWriteTransaction>(environment);
auto key = serialise((std::stringstream() << "Key" << count).str());
auto data = serialise((std::stringstream() << "TestData" << count).str());
EXPECT_NO_THROW(tx->put_value(key, data, *db));
EXPECT_NO_THROW(tx->commit());
}
}

{
for (uint64_t count = 0; count < numValues; count++) {
environment->wait_for_reader();
LMDBTreeReadTransaction::SharedPtr tx = std::make_shared<LMDBTreeReadTransaction>(environment);
auto key = serialise((std::stringstream() << "Key" << count).str());
auto expected = serialise((std::stringstream() << "TestData" << count).str());
std::vector<uint8_t> data;
tx->get_value(key, data, *db);
EXPECT_EQ(data, expected);
}
}
}

TEST_F(LMDBEnvironmentTest, can_read_multiple_threads)
{
LMDBEnvironment::SharedPtr environment =
std::make_shared<LMDBEnvironment>(LMDBEnvironmentTest::_directory, LMDBEnvironmentTest::_mapSize, 1, 2);

LMDBDatabaseCreationTransaction tx(environment);
LMDBDatabase::SharedPtr db = std::make_unique<LMDBDatabase>(environment, tx, "DB", false, false);
EXPECT_NO_THROW(tx.commit());

uint64_t numValues = 10;
uint64_t numIterationsPerThread = 1000;
uint32_t numThreads = 16;

{
for (uint64_t count = 0; count < numValues; count++) {
LMDBTreeWriteTransaction::SharedPtr tx = std::make_shared<LMDBTreeWriteTransaction>(environment);
auto key = serialise((std::stringstream() << "Key" << count).str());
auto data = serialise((std::stringstream() << "TestData" << count).str());
EXPECT_NO_THROW(tx->put_value(key, data, *db));
EXPECT_NO_THROW(tx->commit());
}
}

{
auto func = [&]() -> void {
for (uint64_t iteration = 0; iteration < numIterationsPerThread; iteration++) {
for (uint64_t count = 0; count < numValues; count++) {
environment->wait_for_reader();
LMDBTreeReadTransaction::SharedPtr tx = std::make_shared<LMDBTreeReadTransaction>(environment);
auto key = serialise((std::stringstream() << "Key" << count).str());
auto expected = serialise((std::stringstream() << "TestData" << count).str());
std::vector<uint8_t> data;
tx->get_value(key, data, *db);
EXPECT_EQ(data, expected);
}
}
};
std::vector<std::unique_ptr<std::thread>> threads;
for (uint64_t count = 0; count < numThreads; count++) {
threads.emplace_back(std::make_unique<std::thread>(func));
}
for (uint64_t count = 0; count < numThreads; count++) {
threads[count]->join();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cstring>
#include <exception>
#include <functional>
#include <memory>
#include <vector>

namespace bb::crypto::merkle_tree {
Expand All @@ -22,6 +23,7 @@ namespace bb::crypto::merkle_tree {
class LMDBTreeReadTransaction : public LMDBTransaction {
public:
using Ptr = std::unique_ptr<LMDBTreeReadTransaction>;
using SharedPtr = std::shared_ptr<LMDBTreeReadTransaction>;

LMDBTreeReadTransaction(LMDBEnvironment::SharedPtr env);
LMDBTreeReadTransaction(const LMDBTreeReadTransaction& other) = delete;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,48 @@ TEST_F(LMDBTreeStoreTest, can_write_and_read_meta_data)
}
}

TEST_F(LMDBTreeStoreTest, can_read_data_from_multiple_threads)
{
TreeMeta metaData;
metaData.committedSize = 56;
metaData.initialSize = 12;
metaData.initialRoot = VALUES[1];
metaData.root = VALUES[2];
metaData.depth = 40;
metaData.oldestHistoricBlock = 87;
metaData.unfinalisedBlockHeight = 95;
metaData.name = "Note hash tree";
metaData.size = 60;
LMDBTreeStore store(_directory, "DB1", _mapSize, 2);
{
LMDBTreeWriteTransaction::Ptr transaction = store.create_write_transaction();
store.write_meta_data(metaData, *transaction);
transaction->commit();
}

uint64_t numIterationsPerThread = 1000;
uint32_t numThreads = 16;

{
auto func = [&]() -> void {
for (uint64_t iteration = 0; iteration < numIterationsPerThread; iteration++) {
LMDBTreeReadTransaction::Ptr transaction = store.create_read_transaction();
TreeMeta readBack;
bool success = store.read_meta_data(readBack, *transaction);
EXPECT_TRUE(success);
EXPECT_EQ(readBack, metaData);
}
};
std::vector<std::unique_ptr<std::thread>> threads;
for (uint64_t count = 0; count < numThreads; count++) {
threads.emplace_back(std::make_unique<std::thread>(func));
}
for (uint64_t count = 0; count < numThreads; count++) {
threads[count]->join();
}
}
}

TEST_F(LMDBTreeStoreTest, can_write_and_read_multiple_blocks_with_meta)
{
LMDBTreeStore store(_directory, "DB1", _mapSize, _maxReaders);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace bb::crypto::merkle_tree {
class LMDBTreeWriteTransaction : public LMDBTransaction {
public:
using Ptr = std::unique_ptr<LMDBTreeWriteTransaction>;
using SharedPtr = std::shared_ptr<LMDBTreeWriteTransaction>;

LMDBTreeWriteTransaction(LMDBEnvironment::SharedPtr env);
LMDBTreeWriteTransaction(const LMDBTreeWriteTransaction& other) = delete;
Expand Down
Loading

0 comments on commit 5cd38aa

Please sign in to comment.