From a86ff1fe305664e7a90f91f5a9c545f20ac45a00 Mon Sep 17 00:00:00 2001
From: Dominik Durner
Date: Mon, 3 Jul 2023 19:31:47 +0200
Subject: [PATCH] Added MultiPart Upload to Transaction

---
 example/benchmark/src/benchmark/bandwidth.cpp |   4 +-
 include/cloud/aws.hpp                         |   4 +-
 include/cloud/minio.hpp                       |   2 +
 include/network/transaction.hpp               | 199 +++++++++++++++---
 src/cloud/aws.cpp                             |   9 +-
 src/cloud/gcp.cpp                             |   9 +-
 src/network/transaction.cpp                   |  41 +++-
 test/integration/minio_async.cpp              | 178 ++++++++++++++++
 .../integration/{minio.cpp => minio_sync.cpp} |  27 ++-
 9 files changed, 416 insertions(+), 57 deletions(-)
 create mode 100644 test/integration/minio_async.cpp
 rename test/integration/{minio.cpp => minio_sync.cpp} (86%)

diff --git a/example/benchmark/src/benchmark/bandwidth.cpp b/example/benchmark/src/benchmark/bandwidth.cpp
index c3e17d3..91ed655 100644
--- a/example/benchmark/src/benchmark/bandwidth.cpp
+++ b/example/benchmark/src/benchmark/bandwidth.cpp
@@ -249,7 +249,7 @@ void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
     }
 
     for (auto i = 0u; i < benchmarkSettings.concurrentThreads; i++)
-        getTxn[i].processAsync(*sendReceivers.back());
+        getTxn[i].processAsync(group);
 
     fstream s;
     unique_ptr timer;
@@ -317,7 +317,7 @@ void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
         sendReceivers[i]->setTimings(nullptr);
     }
 
-    putTxn.processAsync(*sendReceivers.back());
+    putTxn.processAsync(group);
 
     while (finishedMessages != requestPerSocket * benchmarkSettings.concurrentThreads)
         usleep(100);
diff --git a/include/cloud/aws.hpp b/include/cloud/aws.hpp
index 0a57cc2..2b68a4e 100644
--- a/include/cloud/aws.hpp
+++ b/include/cloud/aws.hpp
@@ -63,6 +63,8 @@ class AWS : public Provider {
     Settings _settings;
     /// The secret
     std::unique_ptr<Secret> _secret;
+    /// The multipart upload size
+    uint64_t _multipartUploadSize = 128ull << 20;
 
     public:
     /// Get instance details
@@ -104,7 +106,7 @@ class AWS : public Provider {
     /// Get the settings
     [[nodiscard]] inline Settings getSettings() { return _settings; }
     /// Allows multipart upload if size > 0
-    [[nodiscard]] uint64_t multipartUploadSize() const override { return 128ull << 20; }
+    [[nodiscard]] uint64_t multipartUploadSize() const override { return _multipartUploadSize; }
     /// Builds the http request for downloading a blob or listing the directory
     [[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> getRequest(const std::string& filePath, const std::pair<uint64_t, uint64_t>& range) const override;
diff --git a/include/cloud/minio.hpp b/include/cloud/minio.hpp
index b804b1f..fff6f67 100644
--- a/include/cloud/minio.hpp
+++ b/include/cloud/minio.hpp
@@ -29,6 +29,8 @@ class MinIO : public AWS {
     [[nodiscard]] std::string getAddress() const override;
     /// Get the instance details
     [[nodiscard]] Provider::Instance getInstanceDetails(network::TaskedSendReceiver& sendReceiver) override;
+    /// Set the upload split size
+    constexpr void setMultipartUploadSize(uint64_t size) { _multipartUploadSize = size; }
 
     friend Provider;
 };
diff --git a/include/network/transaction.hpp b/include/network/transaction.hpp
index 0c99764..dc4ab90 100644
--- a/include/network/transaction.hpp
+++ b/include/network/transaction.hpp
@@ -2,6 +2,7 @@
 #include "cloud/provider.hpp"
 #include "network/message_result.hpp"
 #include "network/original_message.hpp"
+#include <atomic>
 #include
 #include
 #include
@@ -33,92 +34,230 @@ class Transaction {
     class ConstIterator;
 
     protected:
-    /// The provider
-    const cloud::Provider* provider;
     /// The message typedef
     using message_vector_type = std::vector<std::unique_ptr<OriginalMessage>>;
+
+    /// The multipart upload struct
+    struct MultipartUpload {
+        /// The multipart state
+        enum State : uint8_t {
+            Default = 0,
+            Sending = 1,
+            Validating = 2,
+        };
+        /// The uploadId
+        std::string uploadId;
+        /// The submessages
+        message_vector_type messages;
+        /// The eTags
+        std::vector<std::string> eTags;
+        /// The number of outstanding part requests
+        std::atomic<uint64_t> outstanding;
+        /// The state
+        std::atomic<State> state;
+
+        /// The constructor
+        explicit MultipartUpload(int parts) : messages(parts + 1), eTags(parts), outstanding(parts), state(State::Default) {}
+        /// Copy constructor
+        MultipartUpload(MultipartUpload& other) = delete;
+        /// Move constructor
+        MultipartUpload(MultipartUpload&& other) noexcept : uploadId(std::move(other.uploadId)), messages(std::move(other.messages)), eTags(std::move(other.eTags)), outstanding(other.outstanding.load()), state(other.state.load()) {}
+        /// Copy assignment
+        MultipartUpload& operator=(MultipartUpload other) = delete;
+    };
+
+    /// The provider
+    const cloud::Provider* _provider;
+
     /// The message
-    message_vector_type messages;
+    message_vector_type _messages;
+    /// The message counter
+    std::atomic<uint64_t> _messageCounter;
+    /// Multipart uploads
+    std::vector<MultipartUpload> _multipartUploads;
+    /// Finished multipart uploads
+    std::atomic<uint64_t> _completedMultiparts;
 
-    public:
+    /// Helper function to build callback messages
+    template <typename Callback, typename... Arguments>
+    std::unique_ptr<OriginalCallbackMessage<Callback>> makeCallbackMessage(Callback&& c, Arguments&&... args) {
+        return std::make_unique<OriginalCallbackMessage<Callback>>(std::forward<Callback>(c), std::forward<Arguments>(args)...);
+    }
 
+    public:
     /// The constructor
-    Transaction() = default;
+    Transaction() : _provider(), _messages(), _messageCounter(), _multipartUploads(), _completedMultiparts() {}
     /// The explicit constructor with the provider
-    explicit Transaction(const cloud::Provider* provider) : provider(provider), messages() {}
+    explicit Transaction(const cloud::Provider* provider) : _provider(provider), _messages(), _messageCounter(), _multipartUploads(), _completedMultiparts() {}
     /// Set the provider
-    constexpr void setProvider(const cloud::Provider* provider) { this->provider = provider; }
+    constexpr void setProvider(const cloud::Provider* provider) { this->_provider = provider; }
 
     /// Sends the request messages to the task group
-    void processAsync(TaskedSendReceiver& sendReceiver);
+    void processAsync(network::TaskedSendReceiverGroup& group);
     /// Processes the request messages
     void processSync(TaskedSendReceiver& sendReceiver);
 
     /// Build a new get request for synchronous calls
     /// Note that the range is [start, end[, [0, 0[ gets the whole object
     inline void getObjectRequest(const std::string& remotePath, std::pair<uint64_t, uint64_t> range = {0, 0}, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
-        assert(provider);
-        auto originalMsg = std::make_unique<OriginalMessage>(provider->getRequest(remotePath, range), provider->getAddress(), provider->getPort(), result, capacity, traceId);
-        messages.push_back(std::move(originalMsg));
+        assert(_provider);
+        auto originalMsg = std::make_unique<OriginalMessage>(_provider->getRequest(remotePath, range), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
+        _messages.push_back(std::move(originalMsg));
     }
 
     /// Build a new get request with callback
     /// Note that the range is [start, end[, [0, 0[ gets the whole object
     template <typename Callback>
     inline void getObjectRequest(Callback&& callback, const std::string& remotePath, std::pair<uint64_t, uint64_t> range = {0, 0}, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
-        assert(provider);
-        auto originalMsg = std::make_unique<OriginalCallbackMessage<Callback>>(std::forward<Callback>(callback), provider->getRequest(remotePath, range), provider->getAddress(), provider->getPort(), result, capacity, traceId);
-        messages.push_back(std::move(originalMsg));
+        assert(_provider);
+        auto originalMsg = std::make_unique<OriginalCallbackMessage<Callback>>(std::forward<Callback>(callback), _provider->getRequest(remotePath, range), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
+        _messages.push_back(std::move(originalMsg));
     }
 
     /// Build a new put request for synchronous calls
     inline void putObjectRequest(const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
-        assert(provider);
+        assert(_provider);
+        if (_provider->multipartUploadSize() && size > _provider->multipartUploadSize())
+            return putObjectRequestMultiPart(remotePath, data, size, result, capacity, traceId);
         auto object = std::string_view(data, size);
-        auto originalMsg = std::make_unique<OriginalMessage>(provider->putRequest(remotePath, object), provider->getAddress(), provider->getPort(), result, capacity, traceId);
+        auto originalMsg = std::make_unique<OriginalMessage>(_provider->putRequest(remotePath, object), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
         originalMsg->setPutRequestData(reinterpret_cast<const uint8_t*>(data), size);
-        messages.push_back(std::move(originalMsg));
+        _messages.push_back(std::move(originalMsg));
     }
 
     /// Build a new put request with callback
     template <typename Callback>
     inline void putObjectRequest(Callback&& callback, const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
-        assert(provider);
+        assert(_provider);
+        if (_provider->multipartUploadSize() && size > _provider->multipartUploadSize())
+            return putObjectRequestMultiPart(std::forward<Callback>(callback), remotePath, data, size, result, capacity, traceId);
         auto object = std::string_view(data, size);
-        auto originalMsg = std::make_unique<OriginalCallbackMessage<Callback>>(std::forward<Callback>(callback), provider->putRequest(remotePath, object), provider->getAddress(), provider->getPort(), result, capacity, traceId);
+        auto originalMsg = std::make_unique<OriginalCallbackMessage<Callback>>(std::forward<Callback>(callback), _provider->putRequest(remotePath, object), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
         originalMsg->setPutRequestData(reinterpret_cast<const uint8_t*>(data), size);
-        messages.push_back(std::move(originalMsg));
+        _messages.push_back(std::move(originalMsg));
     }
 
     /// Build a new delete request for synchronous calls
     inline void deleteObjectRequest(const std::string& remotePath, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
-        assert(provider);
-        auto originalMsg = std::make_unique<OriginalMessage>(provider->deleteRequest(remotePath), provider->getAddress(), provider->getPort(), result, capacity, traceId);
-        messages.push_back(std::move(originalMsg));
+        assert(_provider);
+        auto originalMsg = std::make_unique<OriginalMessage>(_provider->deleteRequest(remotePath), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
+        _messages.push_back(std::move(originalMsg));
     }
 
     /// Build a new delete request with callback
     template <typename Callback>
     inline void deleteObjectRequest(Callback&& callback, const std::string& remotePath, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
-        assert(provider);
-        auto originalMsg = std::make_unique<OriginalCallbackMessage<Callback>>(std::forward<Callback>(callback), provider->deleteRequest(remotePath), provider->getAddress(), provider->getPort(), result, capacity, traceId);
-        messages.push_back(std::move(originalMsg));
+        assert(_provider);
+        auto originalMsg = std::make_unique<OriginalCallbackMessage<Callback>>(std::forward<Callback>(callback), _provider->deleteRequest(remotePath), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
+        _messages.push_back(std::move(originalMsg));
     }
 
+    private:
+    /// Build a new put request for synchronous calls
+    inline void putObjectRequestMultiPart(const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
+        assert(_provider);
+        auto splitSize = _provider->multipartUploadSize();
+        auto parts = (size / splitSize) + ((size % splitSize) ? 1u : 0u);
+        _multipartUploads.emplace_back(parts);
+        auto position = _multipartUploads.size() - 1;
+
+        auto uploadMessages = [position, parts, data, remotePath, traceId, splitSize, size, this](network::MessageResult& result) {
+            if (!result.success())
+                return;
+
+            _multipartUploads[position].uploadId = _provider->getUploadId(result.getResult());
+            auto offset = 0ull;
+            for (auto i = 1ull; i <= parts; i++) {
+                auto finishMultipart = [position, remotePath, traceId, i, parts, this](network::MessageResult& result) {
+                    // TODO: requires abort handling
+                    if (!result.success())
+                        return;
+
+                    _multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
+                    if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) {
+                        auto finished = [this](network::MessageResult& /*result*/) {
+                            _completedMultiparts++;
+                        };
+                        auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
+                        _multipartUploads[position].messages[parts] = std::move(originalMsg);
+                        _multipartUploads[position].state = MultipartUpload::State::Validating;
+                    }
+                };
+                auto partSize = (i != parts) ? splitSize : size - offset;
+                auto object = std::string_view(data + offset, partSize);
+                auto originalMsg = makeCallbackMessage(std::move(finishMultipart), _provider->putRequestGeneric(remotePath, object, i, _multipartUploads[position].uploadId), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
+                originalMsg->setPutRequestData(reinterpret_cast<const uint8_t*>(data + offset), partSize);
+                _multipartUploads[position].messages[i - 1] = std::move(originalMsg);
+                offset += partSize;
+            }
+            _multipartUploads[position].state = MultipartUpload::State::Sending;
+        };
+
+        auto originalMsg = makeCallbackMessage(std::move(uploadMessages), _provider->createMultiPartRequest(remotePath), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
+        _messages.push_back(std::move(originalMsg));
+    }
+
+    /// Build a new put request with callback
+    template <typename Callback>
+    inline void putObjectRequestMultiPart(Callback&& callback, const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
+        assert(_provider);
+        auto splitSize = _provider->multipartUploadSize();
+        auto parts = (size / splitSize) + ((size % splitSize) ? 1u : 0u);
+        _multipartUploads.emplace_back(parts);
+        auto position = _multipartUploads.size() - 1;
+
+        auto uploadMessages = [&callback, position, parts, data, remotePath, traceId, splitSize, size, this](network::MessageResult& result) {
+            if (!result.success())
+                return;
+
+            _multipartUploads[position].uploadId = _provider->getUploadId(result.getResult());
+            auto offset = 0ull;
+            for (auto i = 1ull; i <= parts; i++) {
+                auto finishMultipart = [&callback, position, remotePath, traceId, i, parts, this](network::MessageResult& result) {
+                    // TODO: requires abort handling
+                    if (!result.success())
+                        return;
+
+                    _multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
+                    if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) {
+                        auto finished = [&callback, this](network::MessageResult& result) {
+                            _completedMultiparts++;
+                            std::forward<Callback>(callback)(result);
+                        };
+                        auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
+                        _multipartUploads[position].messages[parts] = std::move(originalMsg);
+                        _multipartUploads[position].state = MultipartUpload::State::Validating;
+                    }
+                };
+                auto partSize = (i != parts) ? splitSize : size - offset;
+                auto object = std::string_view(data + offset, partSize);
+                auto originalMsg = makeCallbackMessage(std::move(finishMultipart), _provider->putRequestGeneric(remotePath, object, i, _multipartUploads[position].uploadId), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
+                originalMsg->setPutRequestData(reinterpret_cast<const uint8_t*>(data + offset), partSize);
+                _multipartUploads[position].messages[i - 1] = std::move(originalMsg);
+                offset += partSize;
+            }
+            _multipartUploads[position].state = MultipartUpload::State::Sending;
+        };
+
+        auto originalMsg = makeCallbackMessage(std::move(uploadMessages), _provider->createMultiPartRequest(remotePath), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
+        _messages.push_back(std::move(originalMsg));
+    }
+
+    public:
     /// The iterator
     using iterator = Iterator;
     /// The const iterator
     using const_iterator = ConstIterator;
 
     /// The begin it
-    inline iterator begin() { return iterator(messages.begin()); }
+    inline iterator begin() { return iterator(_messages.begin()); }
     /// The end it
-    inline iterator end() { return iterator(messages.end()); }
+    inline iterator end() { return iterator(_messages.end()); }
     /// The begin it
-    inline const_iterator cbegin() const { return const_iterator(messages.cbegin()); }
+    inline const_iterator cbegin() const { return const_iterator(_messages.cbegin()); }
     /// The end it
-    inline const_iterator cend() const { return const_iterator(messages.cend()); }
+    inline const_iterator cend() const { return const_iterator(_messages.cend()); }
 
     /// The iterator
     class Iterator {
diff --git a/src/cloud/aws.cpp b/src/cloud/aws.cpp
index c01f0cf..1e35043 100644
--- a/src/cloud/aws.cpp
+++ b/src/cloud/aws.cpp
@@ -247,8 +247,8 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::putRequestGeneric(const string& file
     // Is it a multipart upload?
     if (part) {
-        request.path += "?partNumber=" + to_string(part) + "&uploadId=";
-        request.path += uploadId;
+        request.queries.emplace("partNumber", to_string(part));
+        request.queries.emplace("uploadId", uploadId);
     }
 
     request.bodyLength = object.size();
@@ -320,7 +320,7 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::createMultiPartRequest(const string&
         request.path = "/" + filePath;
     else
         request.path = "/" + _settings.bucket + "/" + filePath;
-    request.path += "?uploads";
+    request.queries.emplace("uploads", "");
 
     request.bodyData = nullptr;
     request.bodyLength = 0;
     request.headers.emplace("Host", getAddress());
@@ -365,9 +365,8 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::completeMultiPartRequest(const strin
         request.path = "/" + filePath;
     else
         request.path = "/" + _settings.bucket + "/" + filePath;
-    request.path += "&uploadId=";
-    request.path += uploadId;
+    request.queries.emplace("uploadId", uploadId);
 
     request.bodyData = nullptr;
     request.bodyLength = 0;
     request.headers.emplace("Host", getAddress());
diff --git a/src/cloud/gcp.cpp b/src/cloud/gcp.cpp
index 8d1dd61..b6dc430 100644
--- a/src/cloud/gcp.cpp
+++ b/src/cloud/gcp.cpp
@@ -114,8 +114,8 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::putRequestGeneric(const string& file
     // Is it a multipart upload?
     if (part) {
-        request.path += "?partNumber=" + to_string(part) + "&uploadId=";
-        request.path += uploadId;
+        request.queries.emplace("partNumber", to_string(part));
+        request.queries.emplace("uploadId", uploadId);
     }
 
     request.bodyData = reinterpret_cast<const uint8_t*>(object.data());
@@ -171,7 +171,7 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::createMultiPartRequest(const string&
     request.method = "POST";
     request.type = "HTTP/1.1";
     request.path = "/" + filePath;
-    request.path += "?uploads";
+    request.queries.emplace("uploads", "");
 
     request.bodyData = nullptr;
     request.bodyLength = 0;
@@ -208,8 +208,7 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::completeMultiPartRequest(const strin
     request.method = "POST";
     request.type = "HTTP/1.1";
     request.path = "/" + filePath;
-    request.path += "&uploadId=";
-    request.path += uploadId;
+    request.queries.emplace("uploadId", uploadId);
 
     request.bodyData = nullptr;
     request.bodyLength = 0;
diff --git a/src/network/transaction.cpp b/src/network/transaction.cpp
index ca2e89f..52371d8 100644
--- a/src/network/transaction.cpp
+++ b/src/network/transaction.cpp
@@ -18,21 +18,44 @@ using namespace std;
 void Transaction::processSync(TaskedSendReceiver& sendReceiver)
 // Processes the request messages
 {
-    // send the original request message
-    for (auto& msg : messages) {
-        sendReceiver.sendSync(msg.get());
-    }
+    do {
+        // send the original request message
+        for (; _messageCounter < _messages.size(); _messageCounter++) {
+            sendReceiver.sendSync(_messages[_messageCounter].get());
+        }
 
-    // do the download work
-    sendReceiver.processSync();
+        for (auto& multipart : _multipartUploads) {
+            if (multipart.state == MultipartUpload::State::Sending) {
+                for (auto i = 0ull; i < multipart.eTags.size(); i++)
+                    sendReceiver.sendSync(multipart.messages[i].get());
+                multipart.state = MultipartUpload::State::Default;
+            } else if (multipart.state == MultipartUpload::State::Validating) {
+                sendReceiver.sendSync(multipart.messages[multipart.eTags.size()].get());
+                multipart.state = MultipartUpload::State::Default;
+            }
+        }
+
+        // do the download work
+        sendReceiver.processSync();
+    } while (_multipartUploads.size() != _completedMultiparts);
 }
 //---------------------------------------------------------------------------
-void Transaction::processAsync(TaskedSendReceiver& sendReceiver)
+void Transaction::processAsync(TaskedSendReceiverGroup& group)
 // Sends the request messages to the task group
 {
     // send the original request message
-    for (auto& msg : messages) {
-        while (!sendReceiver.send(msg.get())) {}
+    for (; _messageCounter < _messages.size(); _messageCounter++) {
+        while (!group.send(_messages[_messageCounter].get())) {}
+    }
+    for (auto& multipart : _multipartUploads) {
+        if (multipart.state == MultipartUpload::State::Sending) {
+            for (auto i = 0ull; i < multipart.eTags.size(); i++)
+                while (!group.send(multipart.messages[i].get())) {}
+            multipart.state = MultipartUpload::State::Default;
+        } else if (multipart.state == MultipartUpload::State::Validating) {
+            while (!group.send(multipart.messages[multipart.eTags.size()].get())) {}
+            multipart.state = MultipartUpload::State::Default;
+        }
     }
 }
 //---------------------------------------------------------------------------
diff --git a/test/integration/minio_async.cpp b/test/integration/minio_async.cpp
new file mode 100644
index 0000000..dcea2c2
--- /dev/null
+++ b/test/integration/minio_async.cpp
@@ -0,0 +1,178 @@
+#include "catch2/single_include/catch2/catch.hpp"
+#include "cloud/minio.hpp"
+#include "cloud/provider.hpp"
+#include "network/tasked_send_receiver.hpp"
+#include "network/transaction.hpp"
+#include
+#include
+#include
+#include
+#include
+//---------------------------------------------------------------------------
+// AnyBlob - Universal Cloud Object Storage Library
+// Dominik Durner, 2022
+//
+// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
+// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
+// SPDX-License-Identifier: MPL-2.0
+//---------------------------------------------------------------------------
+namespace anyblob {
+namespace test {
+//---------------------------------------------------------------------------
+using namespace std;
+//---------------------------------------------------------------------------
+TEST_CASE("MinIO Asynchronous Integration") {
+    // Get the environment
+    const char* bucket = getenv("AWS_S3_BUCKET");
+    const char* region = getenv("AWS_S3_REGION");
+    const char* endpoint = getenv("AWS_S3_ENDPOINT");
+    const char* key = getenv("AWS_S3_ACCESS_KEY");
+    const char* secret = getenv("AWS_S3_SECRET_ACCESS_KEY");
+
+    REQUIRE(bucket);
+    REQUIRE(region);
+    REQUIRE(endpoint);
+    REQUIRE(key);
+    REQUIRE(secret);
+
+    auto stringGen = [](auto len) {
+        static constexpr auto chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+        auto resultString = string(len, '\0');
+        generate_n(begin(resultString), len, [&]() { return chars[rand() % strlen(chars)]; });
+        return resultString;
+    };
+
+    // The file to be uploaded and downloaded
+    string bucketName = "minio://";
+    bucketName = bucketName + endpoint + "/" + bucket + ":" + region;
+    string fileName[]{"test.txt", "long.txt"};
+    string longText = stringGen(1 << 24);
+    string content[]{"Hello World!", longText};
+
+    // Create a new task group (18 concurrent requests maximum, and up to 1024 outstanding submissions)
+    anyblob::network::TaskedSendReceiverGroup group(18, 1024);
+
+    // Create an AnyBlob scheduler object for the group
+    anyblob::network::TaskedSendReceiver sendReceiver(group);
+
+    // Async thread
+    future<void> asyncSendReceiverThread;
+
+    auto runLambda = [&]() {
+        // Runs the download thread to asynchronously retrieve data
+        sendReceiver.run();
+    };
+    asyncSendReceiverThread = async(launch::async, runLambda);
+
+    // Create the provider for the corresponding filename
+    auto provider = anyblob::cloud::Provider::makeProvider(bucketName, false, key, secret, &sendReceiver);
+    {
+        // Check the upload for success
+        std::atomic<uint64_t> finishedMessages = 0;
+        auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) {
+            // Successful request
+            REQUIRE(result.success());
+            finishedMessages++;
+        };
+
+        // Create the put request
+        anyblob::network::Transaction putTxn(provider.get());
+        for (auto i = 0u; i < 2; i++)
+            putTxn.putObjectRequest(checkSuccess, fileName[i], content[i].data(), content[i].size());
+
+        // Upload the request asynchronously
+        putTxn.processAsync(group);
+
+        // Wait for the upload
+        while (finishedMessages != 2)
+            usleep(100);
+    }
+    {
+        // Check the upload for success
+        std::atomic<uint64_t> finishedMessages = 0;
+        auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) {
+            // Successful request
+            REQUIRE(result.success());
+            finishedMessages++;
+        };
+
+        // Create the multipart put request
+        auto minio = static_cast<anyblob::cloud::MinIO*>(provider.get());
+        minio->setMultipartUploadSize(6ull << 20);
+        anyblob::network::Transaction putTxn(provider.get());
+        for (auto i = 0u; i < 2; i++)
+            putTxn.putObjectRequest(checkSuccess, fileName[i], content[i].data(), content[i].size());
+
+        while (finishedMessages != 2) {
+            // Upload the new request asynchronously
+            putTxn.processAsync(group);
+            // Wait for the upload
+            usleep(100);
+        }
+    }
+    {
+        std::atomic<uint64_t> finishedMessages = 0;
+        // Create the get request
+        anyblob::network::Transaction getTxn(provider.get());
+        for (auto i = 0u; i < 2; i++) {
+            // Check the download for success
+            auto checkSuccess = [&finishedMessages, &content, i](anyblob::network::MessageResult& result) {
+                // Successful request
+                REQUIRE(result.success());
+                // Simple string_view interface
+                REQUIRE(!content[i].compare(result.getResult()));
+                // Check from the other side too
+                REQUIRE(!result.getResult().compare(content[i]));
+                // Check the size
+                REQUIRE(result.getSize() == content[i].size());
+
+                // Advanced raw interface
+                // Note that the data lies in the data buffer but after the offset to skip the HTTP header
+                // Note that the size is already without the header, so the full request has size + offset length
+                string_view rawDataString(reinterpret_cast<const char*>(result.getData()) + result.getOffset(), result.getSize());
+                REQUIRE(!content[i].compare(rawDataString));
+                REQUIRE(!rawDataString.compare(result.getResult()));
+                REQUIRE(!rawDataString.compare(content[i]));
+                finishedMessages++;
+            };
+
+            getTxn.getObjectRequest(std::move(checkSuccess), fileName[i]);
+        }
+
+        // Retrieve the request asynchronously
+        getTxn.processAsync(group);
+
+        // Wait for the download
+        while (finishedMessages != 2)
+            usleep(100);
+    }
+    {
+        // Check the delete for success
+        std::atomic<uint64_t> finishedMessages = 0;
+        auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) {
+            // Successful request
+            REQUIRE(result.success());
+            finishedMessages++;
+        };
+
+        // Create the delete request
+        anyblob::network::Transaction deleteTxn(provider.get());
+        for (auto i = 0u; i < 2; i++)
+            deleteTxn.deleteObjectRequest(checkSuccess, fileName[i]);
+
+        // Process the request asynchronously
+        deleteTxn.processAsync(group);
+
+        // Wait for the deletion
+        while (finishedMessages != 2)
+            usleep(100);
+    }
+
+    // Stop the send receiver daemon
+    sendReceiver.stop();
+    // Join the thread
+    asyncSendReceiverThread.get();
+}
+//---------------------------------------------------------------------------
+} // namespace test
+} // namespace anyblob
diff --git a/test/integration/minio.cpp b/test/integration/minio_sync.cpp
similarity index 86%
rename from test/integration/minio.cpp
rename to test/integration/minio_sync.cpp
index 9464c20..07382d6 100644
--- a/test/integration/minio.cpp
+++ b/test/integration/minio_sync.cpp
@@ -1,12 +1,12 @@
 #include "catch2/single_include/catch2/catch.hpp"
 #include "cloud/provider.hpp"
+#include "cloud/minio.hpp"
 #include "network/tasked_send_receiver.hpp"
 #include "network/transaction.hpp"
 #include
 #include
 #include
 #include
-#include
 //---------------------------------------------------------------------------
 // AnyBlob - Universal Cloud Object Storage Library
 // Dominik Durner, 2022
@@ -20,7 +20,7 @@ namespace test {
 //---------------------------------------------------------------------------
 using namespace std;
 //---------------------------------------------------------------------------
-TEST_CASE("MinIO Integration") {
+TEST_CASE("MinIO Sync Integration") {
     // Get the environment
     const char* bucket = getenv("AWS_S3_BUCKET");
     const char* region = getenv("AWS_S3_REGION");
@@ -71,6 +71,23 @@ TEST_CASE("MinIO Integration") {
             REQUIRE(it.success());
         }
     }
+    {
+        // Create the multipart put request
+        auto minio = static_cast<anyblob::cloud::MinIO*>(provider.get());
+        minio->setMultipartUploadSize(6ull << 20);
+        anyblob::network::Transaction putTxn(provider.get());
+        for (auto i = 0u; i < 2; i++)
+            putTxn.putObjectRequest(fileName[i], content[i].data(), content[i].size());
+
+        // Upload the request synchronously with the scheduler object on this thread
+        putTxn.processSync(sendReceiver);
+
+        // Check the upload
+        for (const auto& it : putTxn) {
+            // Successful request
+            REQUIRE(it.success());
+        }
+    }
     {
         // Create the get request
         anyblob::network::Transaction getTxn(provider.get());
@@ -102,15 +119,15 @@ TEST_CASE("MinIO Integration") {
         }
     }
     {
-        // Create the put request
+        // Create the delete request
         anyblob::network::Transaction deleteTxn(provider.get());
         for (auto i = 0u; i < 2; i++)
             deleteTxn.deleteObjectRequest(fileName[i]);
 
-        // Upload the request synchronously with the scheduler object on this thread
+        // Process the request synchronously with the scheduler object on this thread
         deleteTxn.processSync(sendReceiver);
 
-        // Check the upload
+        // Check the deletion
         for (const auto& it : deleteTxn) {
             // Successful request
             REQUIRE(it.success());
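
Usage note: the sketch below is distilled from the new minio_async.cpp integration test above, not an additional API; the 6 MiB split size and the polling loop come from that test, and `provider`, `group`, and `data` (assumed to be a std::string holding the object bytes) are set up as in the test. With this patch, a put request larger than the provider's multipartUploadSize() is transparently split into a create-upload request, one put per part, and a completion request; the caller keeps invoking processAsync() (or a single processSync()) until the callback fires, because the part uploads only become available after the create-upload response has been parsed.

    // A minimal multipart upload sketch (assumptions noted above).
    auto minio = static_cast<anyblob::cloud::MinIO*>(provider.get());
    minio->setMultipartUploadSize(6ull << 20); // objects larger than 6 MiB take the multipart path
    std::atomic<uint64_t> finished = 0;
    auto done = [&finished](anyblob::network::MessageResult& result) {
        // Invoked once the complete-multipart request has succeeded
        REQUIRE(result.success());
        finished++;
    };
    anyblob::network::Transaction txn(provider.get());
    txn.putObjectRequest(done, "large.bin", data.data(), data.size());
    while (finished != 1) {
        // Re-submit: later calls pick up the part messages and the completion message
        // that the earlier callbacks have prepared in the meantime.
        txn.processAsync(group);
        usleep(100);
    }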