diff --git a/include/network/message_result.hpp b/include/network/message_result.hpp index 3e3829c..7c7d114 100644 --- a/include/network/message_result.hpp +++ b/include/network/message_result.hpp @@ -67,6 +67,8 @@ class MessageResult { uint64_t size; /// The offset of the result; the start after the message header uint64_t offset; + /// The error response + const MessageResult* originError; /// The failure code uint16_t failureCode; /// The state @@ -96,8 +98,10 @@ class MessageResult { [[nodiscard]] uint64_t getOffset() const; /// Get the state [[nodiscard]] MessageState getState() const; - /// Get the original message + /// Get the failure code [[nodiscard]] uint16_t getFailureCode() const; + /// Get the error response (incl. header) + [[nodiscard]] std::string_view getError() const; /// Is the data owned by this object [[nodiscard]] bool owned() const; /// Was the request successful diff --git a/include/network/transaction.hpp b/include/network/transaction.hpp index dfa9086..c3cb09f 100644 --- a/include/network/transaction.hpp +++ b/include/network/transaction.hpp @@ -56,6 +56,8 @@ class Transaction { std::atomic outstanding; /// The state std::atomic state; + /// The error message id + std::atomic errorMessageId; /// The constructor explicit MultipartUpload(int parts) : messages(parts + 1), eTags(parts), outstanding(parts), state(State::Default) {} @@ -179,6 +181,7 @@ class Transaction { for (auto i = 1ull; i <= parts; i++) { auto finishMultipart = [&callback, &initalRequestResult, position, remotePath, traceId, i, parts, this](network::MessageResult& result) { if (!result.success()) [[unlikely]] { + _multipartUploads[position].errorMessageId = i - 1; _multipartUploads[position].state = MultipartUpload::State::Aborted; } else { _multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast(result.getData()), result.getOffset())); @@ -186,16 +189,19 @@ class Transaction { if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) { if (_multipartUploads[position].state != MultipartUpload::State::Aborted) [[likely]] { auto finished = [&callback, &initalRequestResult, this](network::MessageResult& result) { - if (!result.success()) + if (!result.success()) { initalRequestResult.state = network::MessageState::Cancelled; + initalRequestResult.originError = &result; + } _completedMultiparts++; std::forward(callback)(initalRequestResult); }; auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId); _multipartUploads[position].messages[parts] = std::move(originalMsg); } else { - auto finished = [&callback, &initalRequestResult, this](network::MessageResult& /*result*/) { + auto finished = [&callback, &initalRequestResult, position, this](network::MessageResult& /*result*/) { initalRequestResult.state = network::MessageState::Cancelled; + initalRequestResult.originError = &_multipartUploads[position].messages[_multipartUploads[position].errorMessageId]->result; _completedMultiparts++; std::forward(callback)(initalRequestResult); }; diff --git a/src/network/message_result.cpp b/src/network/message_result.cpp index d436071..d885c93 100644 --- a/src/network/message_result.cpp +++ b/src/network/message_result.cpp @@ -13,13 +13,13 @@ namespace network { //--------------------------------------------------------------------------- using namespace std; //--------------------------------------------------------------------------- -MessageResult::MessageResult() : size(), offset(), failureCode(), state(MessageState::Init) +MessageResult::MessageResult() : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init) // The default constructor { dataVector = make_unique>(); } //--------------------------------------------------------------------------- -MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), failureCode(), state(MessageState::Init) +MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init) // The constructor with buffer input { if (data) @@ -28,7 +28,7 @@ MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), f dataVector = make_unique>(); } //--------------------------------------------------------------------------- -MessageResult::MessageResult(utils::DataVector* dataVector) : size(), offset(), failureCode(), state(MessageState::Init) +MessageResult::MessageResult(utils::DataVector* dataVector) : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init) // The constructor with buffer input { this->dataVector = unique_ptr>(dataVector); @@ -83,11 +83,20 @@ MessageState MessageResult::getState() const } //--------------------------------------------------------------------------- uint16_t MessageResult::getFailureCode() const -// Get the original message +// Get the failure code { return failureCode; } //--------------------------------------------------------------------------- +std::string_view MessageResult::getError() const +// Get the error header +{ + if (originError) + return originError->getError(); + else + return string_view(reinterpret_cast(dataVector->data()), dataVector->size()); +} +//--------------------------------------------------------------------------- bool MessageResult::owned() const // Is the data onwed by this { diff --git a/src/network/tasked_send_receiver.cpp b/src/network/tasked_send_receiver.cpp index f37313b..843f0cc 100644 --- a/src/network/tasked_send_receiver.cpp +++ b/src/network/tasked_send_receiver.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include //--------------------------------------------------------------------------- // AnyBlob - Universal Cloud Object Storage Library diff --git a/src/network/tls_connection.cpp b/src/network/tls_connection.cpp index 136d64e..49adf8a 100644 --- a/src/network/tls_connection.cpp +++ b/src/network/tls_connection.cpp @@ -1,7 +1,6 @@ #include "network/tls_connection.hpp" #include "network/https_message.hpp" #include "network/tls_context.hpp" -#include #include #include #include diff --git a/test/integration/minio_async.cpp b/test/integration/minio_async.cpp index dcea2c2..c2edbb7 100644 --- a/test/integration/minio_async.cpp +++ b/test/integration/minio_async.cpp @@ -110,6 +110,28 @@ TEST_CASE("MinIO Asynchronous Integration") { usleep(100); } } + { + // Check the upload for failure due to too small part + std::atomic finishedMessages = 0; + auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) { + // Sucessful request + REQUIRE(!result.success()); + finishedMessages++; + }; + + // Create the multipart put request + auto minio = static_cast(provider.get()); + minio->setMultipartUploadSize(1ull << 20); // too small, requires at least 5MiB parts + anyblob::network::Transaction putTxn(provider.get()); + putTxn.putObjectRequest(checkSuccess, fileName[1], content[1].data(), content[1].size()); + + while (finishedMessages != 1) { + // Upload the new request asynchronously + putTxn.processAsync(group); + // Wait for the upload + usleep(100); + } + } { std::atomic finishedMessages = 0; // Create the get request diff --git a/test/integration/minio_sync.cpp b/test/integration/minio_sync.cpp index 07382d6..3d6b581 100644 --- a/test/integration/minio_sync.cpp +++ b/test/integration/minio_sync.cpp @@ -1,6 +1,6 @@ #include "catch2/single_include/catch2/catch.hpp" -#include "cloud/provider.hpp" #include "cloud/minio.hpp" +#include "cloud/provider.hpp" #include "network/tasked_send_receiver.hpp" #include "network/transaction.hpp" #include @@ -88,6 +88,22 @@ TEST_CASE("MinIO Sync Integration") { REQUIRE(it.success()); } } + { + // Create the multipart put request but enforce failure due to small part size + auto minio = static_cast(provider.get()); + minio->setMultipartUploadSize(1ull << 20); // part size must be larger than 5MiB + anyblob::network::Transaction putTxn(provider.get()); + putTxn.putObjectRequest(fileName[1], content[1].data(), content[1].size()); + + // Upload the request synchronously with the scheduler object on this thread + putTxn.processSync(sendReceiver); + + // Check the upload + for (const auto& it : putTxn) { + // Sucessful request + REQUIRE(!it.success()); + } + } { // Create the get request anyblob::network::Transaction getTxn(provider.get());