Skip to content

Commit

Permalink
Enhance MockS3Client to support real client delegation and configurab…
Browse files Browse the repository at this point in the history
…le failures (#2158)

#### Reference Issues/PRs
ref monday ticket: 7971351691

#### What does this implement or fix?

This change introduces several improvements to the MockS3Client:
- Add support for wrapping and delegating to a real S3 client
- Implement a new `check_failure` method to enable configurable failure
simulation for specific buckets through env variables
- Update S3 storage initialization to use the new MockS3Client with real
client delegation
- Modify methods to first check for simulated failures before performing
operations

The changes allow for more flexible testing scenarios and improved mock
storage behavior.

#### Any other comments?
The intended use is something like:
``` python
# Enable failure
with config_context("S3Mock.EnableFailures", 1):
    with config_context_string("S3Mock.FailureBucket", target_bucket_names[0]):
        time.sleep(5)
    with config_context_string("S3Mock.FailureBucket", target_bucket_names[1]):
        time.sleep(5)
    # In these 5 seconds, all of the targets should have failed
    time.sleep(5)
    with config_context_string("S3Mock.FailureBucket", target_bucket_names[0]):
        time.sleep(5)

# continue as usual
```

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->
  • Loading branch information
G-D-Petrov authored Feb 7, 2025
1 parent 1478b04 commit f6b8933
Show file tree
Hide file tree
Showing 14 changed files with 646 additions and 248 deletions.
2 changes: 2 additions & 0 deletions cpp/arcticdb/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,8 @@ set(arcticdb_srcs
storage/mock/s3_mock_client.cpp
storage/s3/s3_storage.cpp
storage/s3/s3_storage_tool.cpp
storage/s3/s3_client_wrapper.cpp
storage/s3/s3_client_wrapper.hpp
storage/storage_factory.cpp
storage/storage_utils.cpp
stream/aggregator.cpp
Expand Down
28 changes: 19 additions & 9 deletions cpp/arcticdb/storage/python_bindings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,23 +105,30 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept

enum class S3SettingsPickleOrder : uint32_t {
AWS_AUTH = 0,
AWS_PROFILE = 1
AWS_PROFILE = 1,
USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING = 2
};

py::class_<s3::S3Settings>(storage, "S3Settings")
.def(py::init<s3::AWSAuthMethod, const std::string&>())
.def(py::init<s3::AWSAuthMethod, const std::string&, bool>())
.def(py::pickle(
[](const s3::S3Settings &settings) {
return py::make_tuple(settings.aws_auth(), settings.aws_profile());
return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing());
},
[](py::tuple t) {
util::check(t.size() == 2, "Invalid S3Settings pickle objects");
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(), t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>());
util::check(t.size() == 3, "Invalid S3Settings pickle objects");
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
);
return settings;
}
))
.def_property_readonly("aws_profile", [](const s3::S3Settings &settings) { return settings.aws_profile(); })
.def_property_readonly("aws_auth", [](const s3::S3Settings &settings) { return settings.aws_auth(); });
.def_property_readonly("aws_auth", [](const s3::S3Settings &settings) { return settings.aws_auth(); })
.def_property_readonly("use_internal_client_wrapper_for_testing", [](const s3::S3Settings &settings) {
return settings.use_internal_client_wrapper_for_testing();
});

py::class_<NativeVariantStorage>(storage, "NativeVariantStorage")
.def(py::init<>())
Expand All @@ -130,7 +137,7 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
[](const NativeVariantStorage &settings) {
return util::variant_match(settings.variant(),
[] (const s3::S3Settings& settings) {
return py::make_tuple(settings.aws_auth(), settings.aws_profile());
return py::make_tuple(settings.aws_auth(), settings.aws_profile(), settings.use_internal_client_wrapper_for_testing());
},
[](const auto &) {
util::raise_rte("Invalid native storage setting type");
Expand All @@ -139,8 +146,11 @@ void register_bindings(py::module& storage, py::exception<arcticdb::ArcticExcept
);
},
[](py::tuple t) {
util::check(t.size() == 2, "Invalid S3Settings pickle objects");
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(), t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>());
util::check(t.size() == 3, "Invalid S3Settings pickle objects");
s3::S3Settings settings(t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_AUTH)].cast<s3::AWSAuthMethod>(),
t[static_cast<uint32_t>(S3SettingsPickleOrder::AWS_PROFILE)].cast<std::string>(),
t[static_cast<uint32_t>(S3SettingsPickleOrder::USE_INTERNAL_CLIENT_WRAPPER_FOR_TESTING)].cast<bool>()
);
return NativeVariantStorage(std::move(settings));
}
))
Expand Down
2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/s3/nfs_backed_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include <arcticdb/storage/s3/s3_client_impl.hpp>
#include <arcticdb/storage/s3/s3_client_interface.hpp>
#include <arcticdb/util/simple_string_hash.hpp>

#include <arcticdb/storage/s3/s3_client_wrapper.hpp>
namespace arcticdb::storage::nfs_backed {

std::string add_suffix_char(const std::string& str) {
Expand Down
145 changes: 145 additions & 0 deletions cpp/arcticdb/storage/s3/s3_client_wrapper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#include <arcticdb/storage/s3/s3_client_interface.hpp>
#include <arcticdb/log/log.hpp>
#include <arcticdb/util/buffer_pool.hpp>
#include <arcticdb/storage/storage_utils.hpp>
#include <arcticdb/storage/s3/s3_client_wrapper.hpp>

#include <aws/s3/S3Errors.h>

namespace arcticdb::storage{

using namespace object_store_utils;

namespace s3 {

std::optional<Aws::S3::S3Error> S3ClientTestWrapper::has_failure_trigger(const std::string& bucket_name) const {
bool static_failures_enabled = ConfigsMap::instance()->get_int("S3ClientTestWrapper.EnableFailures", 0) == 1;
// Check if mock failures are enabled
if (!static_failures_enabled) {
return std::nullopt;
}

// Get target buckets (if not set or "all", affects all buckets)
auto failure_buckets_str = ConfigsMap::instance()->get_string("S3ClientTestWrapper.FailureBucket", "all");

if (failure_buckets_str != "all") {
// Split the comma-separated bucket names and check if current bucket is in the list
std::istringstream bucket_stream(failure_buckets_str);
std::string target_bucket;
bool bucket_found = false;

while (std::getline(bucket_stream, target_bucket, ',')) {
// Trim whitespace
target_bucket.erase(0, target_bucket.find_first_not_of(" \t"));
target_bucket.erase(target_bucket.find_last_not_of(" \t") + 1);

if (target_bucket == bucket_name) {
bucket_found = true;
break;
}
}

if (!bucket_found) {
return std::nullopt;
}
}

// Get error configuration
auto error_code = ConfigsMap::instance()->get_int("S3ClientTestWrapper.ErrorCode", static_cast<int>(Aws::S3::S3Errors::NETWORK_CONNECTION));
auto retryable = ConfigsMap::instance()->get_int("S3ClientTestWrapper.ErrorRetryable", 0) == 1;

auto failure_error_ = Aws::S3::S3Error(Aws::Client::AWSError<Aws::S3::S3Errors>(
static_cast<Aws::S3::S3Errors>(error_code),
"SimulatedFailure",
"Simulated failure from environment variables",
retryable
));


return failure_error_;
}

S3Result<std::monostate> S3ClientTestWrapper::head_object(
const std::string& s3_object_name,
const std::string &bucket_name) const {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}


return actual_client_->head_object(s3_object_name, bucket_name);
}

S3Result<Segment> S3ClientTestWrapper::get_object(
const std::string &s3_object_name,
const std::string &bucket_name) const {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}

return actual_client_->get_object(s3_object_name, bucket_name);
}

folly::Future<S3Result<Segment>> S3ClientTestWrapper::get_object_async(
const std::string &s3_object_name,
const std::string &bucket_name) const {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return folly::makeFuture<S3Result<Segment>>({*maybe_error});
}

return actual_client_->get_object_async(s3_object_name, bucket_name);
}

S3Result<std::monostate> S3ClientTestWrapper::put_object(
const std::string &s3_object_name,
Segment &segment,
const std::string &bucket_name,
PutHeader header) {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}

return actual_client_->put_object(s3_object_name, segment, bucket_name, header);
}

S3Result<DeleteOutput> S3ClientTestWrapper::delete_objects(
const std::vector<std::string>& s3_object_names,
const std::string& bucket_name) {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}


return actual_client_->delete_objects(s3_object_names, bucket_name);
}

// Using a fixed page size since it's only being used for simple tests.
// If we ever need to configure it we should move it to the s3 proto config instead.
constexpr auto page_size = 10;
S3Result<ListObjectsOutput> S3ClientTestWrapper::list_objects(
const std::string& name_prefix,
const std::string& bucket_name,
const std::optional<std::string>& continuation_token) const {
auto maybe_error = has_failure_trigger(bucket_name);
if (maybe_error.has_value()) {
return {*maybe_error};
}

return actual_client_->list_objects(name_prefix, bucket_name, continuation_token);
}

}

}
73 changes: 73 additions & 0 deletions cpp/arcticdb/storage/s3/s3_client_wrapper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/* Copyright 2023 Man Group Operations Limited
*
* Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
*
* As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
*/

#pragma once

#include <aws/s3/S3Client.h>

#include <arcticdb/storage/s3/s3_client_interface.hpp>
#include <arcticdb/storage/s3/s3_client_impl.hpp>

#include <arcticdb/util/preconditions.hpp>
#include <arcticdb/util/pb_util.hpp>
#include <arcticdb/log/log.hpp>
#include <arcticdb/util/buffer_pool.hpp>

#include <arcticdb/storage/object_store_utils.hpp>
#include <arcticdb/storage/storage_utils.hpp>
#include <arcticdb/entity/serialized_key.hpp>
#include <arcticdb/util/configs_map.hpp>
#include <arcticdb/util/composite.hpp>

namespace arcticdb::storage::s3 {

// A wrapper around the actual S3 client which can simulate failures based on the configuration.
// The S3ClientTestWrapper delegates to the real client by default, but can intercept operations
// to simulate failures or track operations for testing purposes.
class S3ClientTestWrapper : public S3ClientInterface {
public:
explicit S3ClientTestWrapper(std::unique_ptr<S3ClientInterface> actual_client) :
actual_client_(std::move(actual_client)) {
}

~S3ClientTestWrapper() override = default;

[[nodiscard]] S3Result<std::monostate> head_object(
const std::string& s3_object_name,
const std::string& bucket_name) const override;

[[nodiscard]] S3Result<Segment> get_object(
const std::string& s3_object_name,
const std::string& bucket_name) const override;

[[nodiscard]] folly::Future<S3Result<Segment>> get_object_async(
const std::string& s3_object_name,
const std::string& bucket_name) const override;

S3Result<std::monostate> put_object(
const std::string& s3_object_name,
Segment& segment,
const std::string& bucket_name,
PutHeader header = PutHeader::NONE) override;

S3Result<DeleteOutput> delete_objects(
const std::vector<std::string>& s3_object_names,
const std::string& bucket_name) override;

S3Result<ListObjectsOutput> list_objects(
const std::string& prefix,
const std::string& bucket_name,
const std::optional<std::string>& continuation_token) const override;

private:
// Returns error if failures are enabled for the given bucket
std::optional<Aws::S3::S3Error> has_failure_trigger(const std::string& bucket_name) const;

std::unique_ptr<S3ClientInterface> actual_client_;
};

}
14 changes: 11 additions & 3 deletions cpp/arcticdb/storage/s3/s3_settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,19 @@ class S3Settings {
bool use_raw_prefix_;
AWSAuthMethod aws_auth_;
std::string aws_profile_;
bool use_internal_client_wrapper_for_testing_;

public:
explicit S3Settings(AWSAuthMethod aws_auth, const std::string& aws_profile) :
explicit S3Settings(AWSAuthMethod aws_auth,
const std::string& aws_profile,
bool use_internal_client_wrapper_for_testing) :
aws_auth_(aws_auth),
aws_profile_(aws_profile) {
aws_profile_(aws_profile),
use_internal_client_wrapper_for_testing_(use_internal_client_wrapper_for_testing) {
}

explicit S3Settings(const arcticc::pb2::s3_storage_pb2::Config& config) :
S3Settings(AWSAuthMethod::DISABLED, "")
S3Settings(AWSAuthMethod::DISABLED, "", false)
{
update(config);
}
Expand Down Expand Up @@ -135,6 +139,10 @@ class S3Settings {
return aws_auth_;
}

bool use_internal_client_wrapper_for_testing() const {
return use_internal_client_wrapper_for_testing_;
}

std::string aws_profile() const {
return aws_profile_;
}
Expand Down
6 changes: 6 additions & 0 deletions cpp/arcticdb/storage/s3/s3_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <arcticdb/log/log.hpp>
#include <arcticdb/storage/s3/s3_api.hpp>
#include <arcticdb/storage/s3/s3_client_wrapper.hpp>
#include <arcticdb/util/buffer_pool.hpp>
#include <arcticdb/storage/object_store_utils.hpp>
#include <arcticdb/storage/storage_options.hpp>
Expand Down Expand Up @@ -135,6 +136,11 @@ void S3Storage::create_s3_client(const S3Settings &conf, const Aws::Auth::AWSCre
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using provided auth credentials");
s3_client_ = std::make_unique<S3ClientImpl>(creds, get_s3_config(conf), Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, conf.use_virtual_addressing());
}

if (conf.use_internal_client_wrapper_for_testing()){
ARCTICDB_RUNTIME_DEBUG(log::storage(), "Using internal client wrapper for testing");
s3_client_ = std::make_unique<S3ClientTestWrapper>(std::move(s3_client_));
}
}

S3Storage::S3Storage(const LibraryPath &library_path, OpenMode mode, const S3Settings &conf) :
Expand Down
Loading

0 comments on commit f6b8933

Please sign in to comment.