diff --git a/example/benchmark/src/benchmark/bandwidth.cpp b/example/benchmark/src/benchmark/bandwidth.cpp index 91ed655..1c9b74f 100644 --- a/example/benchmark/src/benchmark/bandwidth.cpp +++ b/example/benchmark/src/benchmark/bandwidth.cpp @@ -154,7 +154,8 @@ void Bandwidth::runS3(const Settings& benchmarkSettings, const string& uri) void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri) // The bandwith benchmark for uring interface { - network::TaskedSendReceiverGroup group(benchmarkSettings.concurrentRequests, benchmarkSettings.requests << 1, benchmarkSettings.chunkSize); + network::TaskedSendReceiverGroup group(benchmarkSettings.chunkSize, benchmarkSettings.requests << 1); + group.setConcurrentRequests(benchmarkSettings.concurrentRequests); vector> sendReceivers; vector> taskGroups; for (auto i = 0u; i < benchmarkSettings.concurrentThreads; i++) { diff --git a/example/simple/main.cpp b/example/simple/main.cpp index 2c61794..86005ab 100644 --- a/example/simple/main.cpp +++ b/example/simple/main.cpp @@ -18,8 +18,8 @@ int main(int /*argc*/, char** /*argv*/) { auto bucketName = "s3://anyblob:eu-central-1"; auto fileName = "anyblob/anyblob.txt"; - // Create a new task group (18 concurrent request maximum, and up to 1024 outstanding submissions) - anyblob::network::TaskedSendReceiverGroup group(18, 1024); + // Create a new task group + anyblob::network::TaskedSendReceiverGroup group; // Create an AnyBlob scheduler object for the group anyblob::network::TaskedSendReceiver sendReceiver(group); @@ -27,6 +27,10 @@ int main(int /*argc*/, char** /*argv*/) { // Create the provider for the corresponding filename auto provider = anyblob::cloud::Provider::makeProvider(bucketName, false, "", "", &sendReceiver); + // Update the concurrency according to instance settings + auto config = provider->getConfig(sendReceiver); + group.setConfig(config); + // Create the get request anyblob::network::Transaction getTxn(provider.get()); 
getTxn.getObjectRequest(fileName); diff --git a/include/cloud/provider.hpp b/include/cloud/provider.hpp index 7fa8248..a0f3cd2 100644 --- a/include/cloud/provider.hpp +++ b/include/cloud/provider.hpp @@ -14,6 +14,7 @@ namespace anyblob { namespace network { class TaskedSendReceiver; class Transaction; +struct Config; struct OriginalMessage; }; // namespace network namespace utils { @@ -25,7 +26,7 @@ namespace cloud { //--------------------------------------------------------------------------- /// Implements the cloud provider abstraction class Provider { - public: +public: /// The remote prefixes count static constexpr unsigned remoteFileCount = 6; /// The remote prefixes @@ -44,6 +45,7 @@ class Provider { Local = 255 }; + /// RemoteInfo struct struct RemoteInfo { /// The provider CloudService provider; @@ -57,6 +59,7 @@ class Provider { int port = 80; }; + /// Instance struct struct Instance { /// The instance type std::string type; @@ -65,10 +68,11 @@ class Provider { /// Number of vCPus unsigned vcpu; /// The network performance in Mbit/s - unsigned network; + uint64_t network; }; - protected: + +protected: CloudService _type; /// Builds the http request for downloading a blob or listing a directory [[nodiscard]] virtual std::unique_ptr> getRequest(const std::string& filePath, const std::pair& range) const = 0; @@ -95,7 +99,7 @@ class Provider { /// Initialize secret virtual void initSecret(network::TaskedSendReceiver& /*sendReceiver*/) {} - public: +public: /// The destructor virtual ~Provider() = default; /// Gets the cloud provider type @@ -115,6 +119,7 @@ class Provider { /// Parse a row from csv file [[nodiscard]] static std::vector parseCSVRow(std::string_view body); + /// Create a provider (keyId is access email for GCP/Azure) [[nodiscard]] static std::unique_ptr makeProvider(const std::string& filepath, bool https = true, const std::string& keyId = "", const std::string& keyFile = "", network::TaskedSendReceiver* sendReceiver = nullptr); @@ -122,6 
+127,8 @@ class Provider { virtual void initResolver(network::TaskedSendReceiver& /*sendReceiver*/) {} /// Get the instance infos [[nodiscard]] virtual Instance getInstanceDetails(network::TaskedSendReceiver& sendReceiver) = 0; + /// Get the config + [[nodiscard]] virtual network::Config getConfig(network::TaskedSendReceiver& sendReceiver); friend network::Transaction; }; diff --git a/include/network/config.hpp b/include/network/config.hpp new file mode 100644 index 0000000..ff07d53 --- /dev/null +++ b/include/network/config.hpp @@ -0,0 +1,43 @@ +#pragma once +#include +#include +//--------------------------------------------------------------------------- +// AnyBlob - Universal Cloud Object Storage Library +// Dominik Durner, 2023 +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/. +// SPDX-License-Identifier: MPL-2.0 +//--------------------------------------------------------------------------- +namespace anyblob { +namespace network { +//--------------------------------------------------------------------------- +/// Config for number of retriever threads and bandwidth +struct Config { + /// Default concurrent requests to achieve coreThroughput (based on AWS experiments) + static constexpr uint64_t defaultCoreConcurrency = 20; + /// Default throughput per core in Mbit/s (based on AWS experiments) + /// Per-object bandwidth: 8,000 Mbit/s / 20 requests = 400 Mbit/s per request = 50 MB/s per request + /// Total requests example: 100,000 Mbit/s / 400 Mbit/s = 250 requests + static constexpr uint64_t defaultCoreThroughput = 8000; + + + /// Throughput per core in Mbit/s + uint64_t _coreThroughput; + /// Concurrent requests to achieve coreThroughput + unsigned _coreConcurreny; + /// The network performance in Mbit/s + uint64_t _network; + + /// Get the network bandwidth + constexpr auto bandwidth() const { return _network; } +
/// Get the number of requests per core + constexpr auto coreRequests() const { return _coreConcurreny; } + /// Get the total outstanding requests + constexpr auto totalRequests() const { return retrievers() * coreRequests(); } + /// Get the number of retriever threads to saturate bandwidth + constexpr unsigned retrievers() const { return std::ceil(static_cast(_network) / _coreThroughput); } +}; +//--------------------------------------------------------------------------- +} // namespace network +} // namespace anyblob diff --git a/include/network/tasked_send_receiver.hpp b/include/network/tasked_send_receiver.hpp index 89e78b5..024f493 100644 --- a/include/network/tasked_send_receiver.hpp +++ b/include/network/tasked_send_receiver.hpp @@ -2,6 +2,7 @@ #include "network/io_uring_socket.hpp" #include "network/message_task.hpp" #include "network/tls_context.hpp" +#include "network/config.hpp" #include "utils/ring_buffer.hpp" #include #include @@ -29,7 +30,7 @@ struct TimingHelper; namespace network { //--------------------------------------------------------------------------- class TaskedSendReceiver; -class TaskSendReceiverHandle; +class TaskedSendReceiverHandle; class Resolver; struct OriginalMessage; //--------------------------------------------------------------------------- @@ -48,6 +49,8 @@ class TaskedSendReceiverGroup { /// Implicitly handle the unused send receivers std::atomic _head; + /// The number of submissions in queue (per thread) + static constexpr uint64_t submissionPerCore = 1 << 10; /// The recv chunk size uint64_t _chunkSize; /// The queue maximum for each TaskedSendReceiver @@ -60,7 +63,7 @@ class TaskedSendReceiverGroup { public: /// Initializes the global submissions and completions - TaskedSendReceiverGroup(uint64_t concurrentRequests, uint64_t submissions, uint64_t chunkSize = 64u * 1024, uint64_t reuse = 0); + TaskedSendReceiverGroup(uint64_t chunkSize = 64u * 1024, uint64_t submissions = std::thread::hardware_concurrency() * 
submissionPerCore, uint64_t reuse = 0); /// Destructor ~TaskedSendReceiverGroup(); @@ -69,12 +72,27 @@ class TaskedSendReceiverGroup { /// Adds a span of message to the submission queue [[nodiscard]] bool send(std::span msgs); /// Gets a tasked send receiver deamon - [[nodiscard]] TaskSendReceiverHandle getHandle(); + [[nodiscard]] TaskedSendReceiverHandle getHandle(); /// Submits group queue and waits for result void process(bool oneQueueInvocation = true); + /// Update the concurrent requests via config + void setConfig(const network::Config& config) { + if (_concurrentRequests != config.coreRequests()) + _concurrentRequests = config.coreRequests(); + } + /// Update the concurrent requests + void setConcurrentRequests(uint64_t concurrentRequests) { + if (_concurrentRequests != concurrentRequests) + _concurrentRequests = concurrentRequests; + } + /// Get the concurrent requests + uint64_t getConcurrentRequests() const { + return _concurrentRequests; + } + friend TaskedSendReceiver; - friend TaskSendReceiverHandle; + friend TaskedSendReceiverHandle; }; //--------------------------------------------------------------------------- /// Implements a send recieve roundtrip with the help of IOUringSockets @@ -125,16 +143,6 @@ class TaskedSendReceiver { /// Stops the deamon void stop() { _stopDeamon = true; } - /// Update the concurrent requests - void setConcurrentRequests(uint64_t concurrentRequests) { - if (_group._concurrentRequests != concurrentRequests) - _group._concurrentRequests = concurrentRequests; - } - /// Get the concurrent requests - uint64_t getConcurrentRequests() { - return _group._concurrentRequests; - } - /// Set the timings void setTimings(std::vector* timings) { _timings = timings; @@ -153,11 +161,11 @@ class TaskedSendReceiver { [[nodiscard]] int32_t submitRequests(); friend TaskedSendReceiverGroup; - friend TaskSendReceiverHandle; + friend TaskedSendReceiverHandle; }; //--------------------------------------------------------------------------- /// 
Handle to a TaskedSendReceiver -class TaskSendReceiverHandle { +class TaskedSendReceiverHandle { private: /// The shared group TaskedSendReceiverGroup* _group; @@ -165,22 +173,22 @@ class TaskSendReceiverHandle { TaskedSendReceiver* _sendReceiver; /// Default constructor is deleted - TaskSendReceiverHandle() = delete; + TaskedSendReceiverHandle() = delete; /// Consturctor - TaskSendReceiverHandle(TaskedSendReceiverGroup* group); + TaskedSendReceiverHandle(TaskedSendReceiverGroup* group); /// Delete copy - TaskSendReceiverHandle(TaskSendReceiverHandle& other) = delete; + TaskedSendReceiverHandle(TaskedSendReceiverHandle& other) = delete; /// Delete copy assignment - TaskSendReceiverHandle& operator=(TaskSendReceiverHandle& other) = delete; + TaskedSendReceiverHandle& operator=(TaskedSendReceiverHandle& other) = delete; /// Submits queue and waits for result bool sendReceive(bool oneQueueInvocation = true); public: /// Move constructor - TaskSendReceiverHandle(TaskSendReceiverHandle&& other); + TaskedSendReceiverHandle(TaskedSendReceiverHandle&& other); /// Move assignment - TaskSendReceiverHandle& operator=(TaskSendReceiverHandle&& other); + TaskedSendReceiverHandle& operator=(TaskedSendReceiverHandle&& other); /// Runs the handle, thread becomes the the TaskedSendReceiver bool run(); diff --git a/src/cloud/aws.cpp b/src/cloud/aws.cpp index c8c7918..df0ac48 100644 --- a/src/cloud/aws.cpp +++ b/src/cloud/aws.cpp @@ -185,7 +185,7 @@ void AWS::initResolver(network::TaskedSendReceiver& sendReceiver) // Inits the resolver { if (_type == Provider::CloudService::AWS) { - sendReceiver.addResolver("amazonaws.com", unique_ptr(new cloud::AWSResolver(sendReceiver.getConcurrentRequests()))); + sendReceiver.addResolver("amazonaws.com", unique_ptr(new cloud::AWSResolver(sendReceiver.getGroup()->getConcurrentRequests()))); } } //--------------------------------------------------------------------------- diff --git a/src/cloud/provider.cpp b/src/cloud/provider.cpp index 
f37c811..2bb6f31 100644 --- a/src/cloud/provider.cpp +++ b/src/cloud/provider.cpp @@ -6,6 +6,7 @@ #include "cloud/minio.hpp" #include "cloud/oracle.hpp" #include "network/tasked_send_receiver.hpp" +#include "network/config.hpp" #include "utils/data_vector.hpp" #include #include @@ -172,6 +173,13 @@ unique_ptr> Provider::completeMultiPartRequest(const return nullptr; } //--------------------------------------------------------------------------- +network::Config Provider::getConfig(network::TaskedSendReceiver& sendReceiver) +// Uses the send receiver to get instance configs +{ + auto instance = getInstanceDetails(sendReceiver); + return network::Config{network::Config::defaultCoreThroughput, network::Config::defaultCoreConcurrency, instance.network}; +} +//--------------------------------------------------------------------------- unique_ptr Provider::makeProvider(const string& filepath, bool https, const string& keyId, const string& secret, network::TaskedSendReceiver* sendReceiver) // Create a provider { diff --git a/src/network/tasked_send_receiver.cpp b/src/network/tasked_send_receiver.cpp index 843f0cc..c2ddcaa 100644 --- a/src/network/tasked_send_receiver.cpp +++ b/src/network/tasked_send_receiver.cpp @@ -29,7 +29,7 @@ namespace network { //--------------------------------------------------------------------------- using namespace std; //--------------------------------------------------------------------------- -TaskedSendReceiverGroup::TaskedSendReceiverGroup(uint64_t concurrentRequests, uint64_t submissions, uint64_t chunkSize, uint64_t reuse) : _submissions(submissions), _reuse(!reuse ? submissions : reuse), _sendReceivers(), _resizeMutex(), _head(nullptr), _chunkSize(chunkSize), _concurrentRequests(concurrentRequests), _cv(), _mutex() +TaskedSendReceiverGroup::TaskedSendReceiverGroup(uint64_t chunkSize, uint64_t submissions, uint64_t reuse) : _submissions(submissions), _reuse(!reuse ? 
submissions : reuse), _sendReceivers(), _resizeMutex(), _head(nullptr), _chunkSize(chunkSize), _concurrentRequests(network::Config::defaultCoreConcurrency), _cv(), _mutex() // Initializes the global submissions and completions { TLSContext::initOpenSSL(); @@ -56,12 +56,12 @@ bool TaskedSendReceiverGroup::send(span msgs) return (_submissions.insertAll(msgs) != ~0ull); } //--------------------------------------------------------------------------- -TaskSendReceiverHandle TaskedSendReceiverGroup::getHandle() +TaskedSendReceiverHandle TaskedSendReceiverGroup::getHandle() // Runs a new tasked send receiver deamon { for (auto head = _head.load(); head;) { if (_head.compare_exchange_weak(head, head->_next)) { - TaskSendReceiverHandle handle(this); + TaskedSendReceiverHandle handle(this); handle._sendReceiver = head; return handle; } @@ -69,7 +69,7 @@ TaskSendReceiverHandle TaskedSendReceiverGroup::getHandle() } lock_guard lg(_resizeMutex); auto& ref = _sendReceivers.emplace_back(make_unique(*this)); - TaskSendReceiverHandle handle(this); + TaskedSendReceiverHandle handle(this); handle._sendReceiver = ref.get(); return handle; } @@ -81,12 +81,12 @@ void TaskedSendReceiverGroup::process(bool oneQueueInvocation) handle.sendReceive(oneQueueInvocation); } //--------------------------------------------------------------------------- -TaskSendReceiverHandle::TaskSendReceiverHandle(TaskedSendReceiverGroup* group) : _group(group) +TaskedSendReceiverHandle::TaskedSendReceiverHandle(TaskedSendReceiverGroup* group) : _group(group) // The constructor { } //--------------------------------------------------------------------------- -TaskSendReceiverHandle::TaskSendReceiverHandle(TaskSendReceiverHandle&& other) +TaskedSendReceiverHandle::TaskedSendReceiverHandle(TaskedSendReceiverHandle&& other) // Move constructor { _group = other._group; @@ -94,7 +94,7 @@ TaskSendReceiverHandle::TaskSendReceiverHandle(TaskSendReceiverHandle&& other) other._sendReceiver = nullptr; } 
//--------------------------------------------------------------------------- -TaskSendReceiverHandle& TaskSendReceiverHandle::operator=(TaskSendReceiverHandle&& other) +TaskedSendReceiverHandle& TaskedSendReceiverHandle::operator=(TaskedSendReceiverHandle&& other) // Move assignment { if (other._group == _group) { @@ -104,7 +104,7 @@ TaskSendReceiverHandle& TaskSendReceiverHandle::operator=(TaskSendReceiverHandle return *this; } //--------------------------------------------------------------------------- -bool TaskSendReceiverHandle::run() +bool TaskedSendReceiverHandle::run() // Starts the deamon { if (!_sendReceiver) @@ -113,7 +113,7 @@ bool TaskSendReceiverHandle::run() return true; } //--------------------------------------------------------------------------- -void TaskSendReceiverHandle::stop() +void TaskedSendReceiverHandle::stop() // Stops the deamon { if (!_sendReceiver) @@ -129,7 +129,7 @@ void TaskSendReceiverHandle::stop() } } //--------------------------------------------------------------------------- -bool TaskSendReceiverHandle::sendReceive(bool oneQueueInvocation) +bool TaskedSendReceiverHandle::sendReceive(bool oneQueueInvocation) // Creates a sending message with chaining IOSQE_IO_LINK, creates a receiving message, submits queue, and waits for result { if (!_sendReceiver) diff --git a/test/integration/minio_async.cpp b/test/integration/minio_async.cpp index c2edbb7..38fdfde 100644 --- a/test/integration/minio_async.cpp +++ b/test/integration/minio_async.cpp @@ -49,8 +49,8 @@ TEST_CASE("MinIO Asynchronous Integration") { string longText = stringGen(1 << 24); string content[]{"Hello World!", longText}; - // Create a new task group (18 concurrent request maximum, and up to 1024 outstanding submissions) - anyblob::network::TaskedSendReceiverGroup group(18, 1024); + // Create a new task group + anyblob::network::TaskedSendReceiverGroup group; // Create an AnyBlob scheduler object for the group anyblob::network::TaskedSendReceiver 
sendReceiver(group); @@ -66,6 +66,11 @@ TEST_CASE("MinIO Asynchronous Integration") { // Create the provider for the corresponding filename auto provider = anyblob::cloud::Provider::makeProvider(bucketName, false, key, secret, &sendReceiver); + + // Update the concurrency according to instance settings (noop here, as minio uses default values) + auto config = provider->getConfig(sendReceiver); + group.setConfig(config); + { // Check the upload for success std::atomic finishedMessages = 0; diff --git a/test/integration/minio_sync.cpp b/test/integration/minio_sync.cpp index 3d6b581..8788556 100644 --- a/test/integration/minio_sync.cpp +++ b/test/integration/minio_sync.cpp @@ -48,14 +48,19 @@ TEST_CASE("MinIO Sync Integration") { string longText = stringGen(1 << 24); string content[]{"Hello World!", longText}; - // Create a new task group (18 concurrent request maximum, and up to 1024 outstanding submissions) - anyblob::network::TaskedSendReceiverGroup group(18, 1024); + // Create a new task group + anyblob::network::TaskedSendReceiverGroup group; // Create an AnyBlob scheduler object for the group anyblob::network::TaskedSendReceiver sendReceiver(group); // Create the provider for the corresponding filename auto provider = anyblob::cloud::Provider::makeProvider(bucketName, false, key, secret, &sendReceiver); + + // Update the concurrency according to instance settings (noop here, as minio uses default values) + auto config = provider->getConfig(sendReceiver); + group.setConfig(config); + { // Create the put request anyblob::network::Transaction putTxn(provider.get()); diff --git a/test/unit/network/send_receiver_test.cpp b/test/unit/network/send_receiver_test.cpp index 4bc1c6d..8c11d8e 100644 --- a/test/unit/network/send_receiver_test.cpp +++ b/test/unit/network/send_receiver_test.cpp @@ -29,7 +29,8 @@ TEST_CASE("send_receiver") { auto concurrency = 8; uint64_t length = 4096u; - TaskedSendReceiverGroup group(concurrency >> 1, concurrency << 2); + 
TaskedSendReceiverGroup group; + group.setConcurrentRequests(concurrency); vector> msgs; for (auto i = 0; i < concurrency; i++) {