Skip to content

Commit

Permalink
Added new config interface
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Nov 17, 2023
1 parent 514195a commit 916a191
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 45 deletions.
3 changes: 2 additions & 1 deletion example/benchmark/src/benchmark/bandwidth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ void Bandwidth::runS3(const Settings& benchmarkSettings, const string& uri)
void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
// The bandwith benchmark for uring interface
{
network::TaskedSendReceiverGroup group(benchmarkSettings.concurrentRequests, benchmarkSettings.requests << 1, benchmarkSettings.chunkSize);
network::TaskedSendReceiverGroup group(benchmarkSettings.chunkSize, benchmarkSettings.requests << 1);
group.setConcurrentRequests(benchmarkSettings.concurrentRequests);
vector<unique_ptr<network::TaskedSendReceiver>> sendReceivers;
vector<unique_ptr<network::TaskedSendReceiverGroup>> taskGroups;
for (auto i = 0u; i < benchmarkSettings.concurrentThreads; i++) {
Expand Down
8 changes: 6 additions & 2 deletions example/simple/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ int main(int /*argc*/, char** /*argv*/) {
auto bucketName = "s3://anyblob:eu-central-1";
auto fileName = "anyblob/anyblob.txt";

// Create a new task group (18 concurrent request maximum, and up to 1024 outstanding submissions)
anyblob::network::TaskedSendReceiverGroup group(18, 1024);
// Create a new task group
anyblob::network::TaskedSendReceiverGroup group;

// Create an AnyBlob scheduler object for the group
anyblob::network::TaskedSendReceiver sendReceiver(group);

// Create the provider for the corresponding filename
auto provider = anyblob::cloud::Provider::makeProvider(bucketName, false, "", "", &sendReceiver);

// Update the concurrency according to instance settings
auto config = provider->getConfig(sendReceiver);
group.setConfig(config);

// Create the get request
anyblob::network::Transaction getTxn(provider.get());
getTxn.getObjectRequest(fileName);
Expand Down
15 changes: 11 additions & 4 deletions include/cloud/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ namespace anyblob {
namespace network {
class TaskedSendReceiver;
class Transaction;
struct Config;
struct OriginalMessage;
}; // namespace network
namespace utils {
Expand All @@ -25,7 +26,7 @@ namespace cloud {
//---------------------------------------------------------------------------
/// Implements the cloud provider abstraction
class Provider {
public:
public:
/// The remote prefixes count
static constexpr unsigned remoteFileCount = 6;
/// The remote prefixes
Expand All @@ -44,6 +45,7 @@ class Provider {
Local = 255
};

/// RemoteInfo struct
struct RemoteInfo {
/// The provider
CloudService provider;
Expand All @@ -57,6 +59,7 @@ class Provider {
int port = 80;
};

/// Instance struct
struct Instance {
/// The instance type
std::string type;
Expand All @@ -65,10 +68,11 @@ class Provider {
/// Number of vCPus
unsigned vcpu;
/// The network performance in Mbit/s
unsigned network;
uint64_t network;
};

protected:

protected:
CloudService _type;
/// Builds the http request for downloading a blob or listing a directory
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> getRequest(const std::string& filePath, const std::pair<uint64_t, uint64_t>& range) const = 0;
Expand All @@ -95,7 +99,7 @@ class Provider {
/// Initialize secret
virtual void initSecret(network::TaskedSendReceiver& /*sendReceiver*/) {}

public:
public:
/// The destructor
virtual ~Provider() = default;
/// Gets the cloud provider type
Expand All @@ -115,13 +119,16 @@ class Provider {
/// Parse a row from csv file
[[nodiscard]] static std::vector<std::string> parseCSVRow(std::string_view body);


/// Create a provider (keyId is access email for GCP/Azure)
[[nodiscard]] static std::unique_ptr<Provider> makeProvider(const std::string& filepath, bool https = true, const std::string& keyId = "", const std::string& keyFile = "", network::TaskedSendReceiver* sendReceiver = nullptr);

/// Init the resolver for specific provider
virtual void initResolver(network::TaskedSendReceiver& /*sendReceiver*/) {}
/// Get the instance infos
[[nodiscard]] virtual Instance getInstanceDetails(network::TaskedSendReceiver& sendReceiver) = 0;
/// Get the config
[[nodiscard]] virtual network::Config getConfig(network::TaskedSendReceiver& sendReceiver);

friend network::Transaction;
};
Expand Down
43 changes: 43 additions & 0 deletions include/network/config.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once
#include <cmath>
#include <cstdint>
//---------------------------------------------------------------------------
// AnyBlob - Universal Cloud Object Storage Library
// Dominik Durner, 2023
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
// SPDX-License-Identifier: MPL-2.0
//---------------------------------------------------------------------------
namespace anyblob {
namespace network {
//---------------------------------------------------------------------------
/// Config for number of retriever threads and bandwidth
struct Config {
/// Default concurrent requests to achieve coreThroughput (based on AWS experiments)
static constexpr uint64_t defaultCoreConcurrency = 20;
/// Default throuput per core in Mbit/s (based on AWS experiments)
/// Per-object bandwidth: 8,000 Mbits / 20 Requests = 400 Mbits / Requests = 50 MiBs / Request
/// Total requests example: 100,000 Mbits / 400 Mbits = 250 Requests
static constexpr uint64_t defaultCoreThroughput = 8000;


/// Throuput per core in Mbit/s
uint64_t _coreThroughput;
/// Concurrent requests to achieve coreThroughput
unsigned _coreConcurreny;
/// The network performance in Mbit/s
uint64_t _network;

/// Get the network bandwidth
constexpr auto bandwidth() const { return _network; }
/// Get the number of requests per core
constexpr auto coreRequests() const { return _coreConcurreny; }
/// Get the total outstanding requests
constexpr auto totalRequests() const { return retrievers() * coreRequests(); }
/// Get the number of retriever threads to saturate bandwidth
constexpr unsigned retrievers() const { return std::ceil(static_cast<double>(_network) / _coreThroughput); }
};
//---------------------------------------------------------------------------
} // namespace network
} // namespace anyblob
52 changes: 30 additions & 22 deletions include/network/tasked_send_receiver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "network/io_uring_socket.hpp"
#include "network/message_task.hpp"
#include "network/tls_context.hpp"
#include "network/config.hpp"
#include "utils/ring_buffer.hpp"
#include <atomic>
#include <condition_variable>
Expand Down Expand Up @@ -29,7 +30,7 @@ struct TimingHelper;
namespace network {
//---------------------------------------------------------------------------
class TaskedSendReceiver;
class TaskSendReceiverHandle;
class TaskedSendReceiverHandle;
class Resolver;
struct OriginalMessage;
//---------------------------------------------------------------------------
Expand All @@ -48,6 +49,8 @@ class TaskedSendReceiverGroup {
/// Implicitly handle the unused send receivers
std::atomic<TaskedSendReceiver*> _head;

/// The number of submissions in queue (per thread)
static constexpr uint64_t submissionPerCore = 1 << 10;
/// The recv chunk size
uint64_t _chunkSize;
/// The queue maximum for each TaskedSendReceiver
Expand All @@ -60,7 +63,7 @@ class TaskedSendReceiverGroup {

public:
/// Initializes the global submissions and completions
TaskedSendReceiverGroup(uint64_t concurrentRequests, uint64_t submissions, uint64_t chunkSize = 64u * 1024, uint64_t reuse = 0);
TaskedSendReceiverGroup(uint64_t chunkSize = 64u * 1024, uint64_t submissions = std::thread::hardware_concurrency() * submissionPerCore, uint64_t reuse = 0);
/// Destructor
~TaskedSendReceiverGroup();

Expand All @@ -69,12 +72,27 @@ class TaskedSendReceiverGroup {
/// Adds a span of message to the submission queue
[[nodiscard]] bool send(std::span<OriginalMessage*> msgs);
/// Gets a tasked send receiver deamon
[[nodiscard]] TaskSendReceiverHandle getHandle();
[[nodiscard]] TaskedSendReceiverHandle getHandle();
/// Submits group queue and waits for result
void process(bool oneQueueInvocation = true);

/// Update the concurrent requests via config
void setConfig(const network::Config& config) {
if (_concurrentRequests != config.coreRequests())
_concurrentRequests = config.coreRequests();
}
/// Update the concurrent requests
void setConcurrentRequests(uint64_t concurrentRequests) {
if (_concurrentRequests != concurrentRequests)
_concurrentRequests = concurrentRequests;
}
/// Get the concurrent requests
uint64_t getConcurrentRequests() const {
return _concurrentRequests;
}

friend TaskedSendReceiver;
friend TaskSendReceiverHandle;
friend TaskedSendReceiverHandle;
};
//---------------------------------------------------------------------------
/// Implements a send recieve roundtrip with the help of IOUringSockets
Expand Down Expand Up @@ -125,16 +143,6 @@ class TaskedSendReceiver {
/// Stops the deamon
void stop() { _stopDeamon = true; }

/// Update the concurrent requests
void setConcurrentRequests(uint64_t concurrentRequests) {
if (_group._concurrentRequests != concurrentRequests)
_group._concurrentRequests = concurrentRequests;
}
/// Get the concurrent requests
uint64_t getConcurrentRequests() {
return _group._concurrentRequests;
}

/// Set the timings
void setTimings(std::vector<utils::TimingHelper>* timings) {
_timings = timings;
Expand All @@ -153,34 +161,34 @@ class TaskedSendReceiver {
[[nodiscard]] int32_t submitRequests();

friend TaskedSendReceiverGroup;
friend TaskSendReceiverHandle;
friend TaskedSendReceiverHandle;
};
//---------------------------------------------------------------------------
/// Handle to a TaskedSendReceiver
class TaskSendReceiverHandle {
class TaskedSendReceiverHandle {
private:
/// The shared group
TaskedSendReceiverGroup* _group;
/// The send receiver
TaskedSendReceiver* _sendReceiver;

/// Default constructor is deleted
TaskSendReceiverHandle() = delete;
TaskedSendReceiverHandle() = delete;
/// Consturctor
TaskSendReceiverHandle(TaskedSendReceiverGroup* group);
TaskedSendReceiverHandle(TaskedSendReceiverGroup* group);
/// Delete copy
TaskSendReceiverHandle(TaskSendReceiverHandle& other) = delete;
TaskedSendReceiverHandle(TaskedSendReceiverHandle& other) = delete;
/// Delete copy assignment
TaskSendReceiverHandle& operator=(TaskSendReceiverHandle& other) = delete;
TaskedSendReceiverHandle& operator=(TaskedSendReceiverHandle& other) = delete;

/// Submits queue and waits for result
bool sendReceive(bool oneQueueInvocation = true);

public:
/// Move constructor
TaskSendReceiverHandle(TaskSendReceiverHandle&& other);
TaskedSendReceiverHandle(TaskedSendReceiverHandle&& other);
/// Move assignment
TaskSendReceiverHandle& operator=(TaskSendReceiverHandle&& other);
TaskedSendReceiverHandle& operator=(TaskedSendReceiverHandle&& other);

/// Runs the handle, thread becomes the the TaskedSendReceiver
bool run();
Expand Down
2 changes: 1 addition & 1 deletion src/cloud/aws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ void AWS::initResolver(network::TaskedSendReceiver& sendReceiver)
// Inits the resolver
{
if (_type == Provider::CloudService::AWS) {
sendReceiver.addResolver("amazonaws.com", unique_ptr<network::Resolver>(new cloud::AWSResolver(sendReceiver.getConcurrentRequests())));
sendReceiver.addResolver("amazonaws.com", unique_ptr<network::Resolver>(new cloud::AWSResolver(sendReceiver.getGroup()->getConcurrentRequests())));
}
}
//---------------------------------------------------------------------------
Expand Down
8 changes: 8 additions & 0 deletions src/cloud/provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "cloud/minio.hpp"
#include "cloud/oracle.hpp"
#include "network/tasked_send_receiver.hpp"
#include "network/config.hpp"
#include "utils/data_vector.hpp"
#include <fstream>
#include <istream>
Expand Down Expand Up @@ -172,6 +173,13 @@ unique_ptr<utils::DataVector<uint8_t>> Provider::completeMultiPartRequest(const
return nullptr;
}
//---------------------------------------------------------------------------
network::Config Provider::getConfig(network::TaskedSendReceiver& sendReceiver)
// Uses the send receiver to get instance configs
{
auto instance = getInstanceDetails(sendReceiver);
return network::Config{network::Config::defaultCoreThroughput, network::Config::defaultCoreConcurrency, instance.network};
}
//---------------------------------------------------------------------------
unique_ptr<Provider> Provider::makeProvider(const string& filepath, bool https, const string& keyId, const string& secret, network::TaskedSendReceiver* sendReceiver)
// Create a provider
{
Expand Down
20 changes: 10 additions & 10 deletions src/network/tasked_send_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace network {
//---------------------------------------------------------------------------
using namespace std;
//---------------------------------------------------------------------------
TaskedSendReceiverGroup::TaskedSendReceiverGroup(uint64_t concurrentRequests, uint64_t submissions, uint64_t chunkSize, uint64_t reuse) : _submissions(submissions), _reuse(!reuse ? submissions : reuse), _sendReceivers(), _resizeMutex(), _head(nullptr), _chunkSize(chunkSize), _concurrentRequests(concurrentRequests), _cv(), _mutex()
TaskedSendReceiverGroup::TaskedSendReceiverGroup(uint64_t chunkSize, uint64_t submissions, uint64_t reuse) : _submissions(submissions), _reuse(!reuse ? submissions : reuse), _sendReceivers(), _resizeMutex(), _head(nullptr), _chunkSize(chunkSize), _concurrentRequests(network::Config::defaultCoreConcurrency), _cv(), _mutex()
// Initializes the global submissions and completions
{
TLSContext::initOpenSSL();
Expand All @@ -56,20 +56,20 @@ bool TaskedSendReceiverGroup::send(span<OriginalMessage*> msgs)
return (_submissions.insertAll(msgs) != ~0ull);
}
//---------------------------------------------------------------------------
TaskSendReceiverHandle TaskedSendReceiverGroup::getHandle()
TaskedSendReceiverHandle TaskedSendReceiverGroup::getHandle()
// Runs a new tasked send receiver deamon
{
for (auto head = _head.load(); head;) {
if (_head.compare_exchange_weak(head, head->_next)) {
TaskSendReceiverHandle handle(this);
TaskedSendReceiverHandle handle(this);
handle._sendReceiver = head;
return handle;
}
head = _head;
}
lock_guard<mutex> lg(_resizeMutex);
auto& ref = _sendReceivers.emplace_back(make_unique<TaskedSendReceiver>(*this));
TaskSendReceiverHandle handle(this);
TaskedSendReceiverHandle handle(this);
handle._sendReceiver = ref.get();
return handle;
}
Expand All @@ -81,20 +81,20 @@ void TaskedSendReceiverGroup::process(bool oneQueueInvocation)
handle.sendReceive(oneQueueInvocation);
}
//---------------------------------------------------------------------------
TaskSendReceiverHandle::TaskSendReceiverHandle(TaskedSendReceiverGroup* group) : _group(group)
TaskedSendReceiverHandle::TaskedSendReceiverHandle(TaskedSendReceiverGroup* group) : _group(group)
// The constructor
{
}
//---------------------------------------------------------------------------
TaskSendReceiverHandle::TaskSendReceiverHandle(TaskSendReceiverHandle&& other)
TaskedSendReceiverHandle::TaskedSendReceiverHandle(TaskedSendReceiverHandle&& other)
// Move constructor
{
_group = other._group;
_sendReceiver = other._sendReceiver;
other._sendReceiver = nullptr;
}
//---------------------------------------------------------------------------
TaskSendReceiverHandle& TaskSendReceiverHandle::operator=(TaskSendReceiverHandle&& other)
TaskedSendReceiverHandle& TaskedSendReceiverHandle::operator=(TaskedSendReceiverHandle&& other)
// Move assignment
{
if (other._group == _group) {
Expand All @@ -104,7 +104,7 @@ TaskSendReceiverHandle& TaskSendReceiverHandle::operator=(TaskSendReceiverHandle
return *this;
}
//---------------------------------------------------------------------------
bool TaskSendReceiverHandle::run()
bool TaskedSendReceiverHandle::run()
// Starts the deamon
{
if (!_sendReceiver)
Expand All @@ -113,7 +113,7 @@ bool TaskSendReceiverHandle::run()
return true;
}
//---------------------------------------------------------------------------
void TaskSendReceiverHandle::stop()
void TaskedSendReceiverHandle::stop()
// Stops the deamon
{
if (!_sendReceiver)
Expand All @@ -129,7 +129,7 @@ void TaskSendReceiverHandle::stop()
}
}
//---------------------------------------------------------------------------
bool TaskSendReceiverHandle::sendReceive(bool oneQueueInvocation)
bool TaskedSendReceiverHandle::sendReceive(bool oneQueueInvocation)
// Creates a sending message with chaining IOSQE_IO_LINK, creates a receiving message, submits queue, and waits for result
{
if (!_sendReceiver)
Expand Down
Loading

0 comments on commit 916a191

Please sign in to comment.