From 10b6b013052e6a021ddd7ac50fdc26ffcf404f1c Mon Sep 17 00:00:00 2001 From: Dominik Durner Date: Wed, 5 Jul 2023 19:14:12 +0200 Subject: [PATCH] Added getError at result that returns response header and body --- include/network/message_result.hpp | 6 +++++- include/network/transaction.hpp | 10 ++++++++-- src/network/http_message.cpp | 5 ++++- src/network/message_result.cpp | 22 +++++++++++++++++----- src/network/tasked_send_receiver.cpp | 1 - src/network/tls_connection.cpp | 9 +++++---- test/integration/minio_async.cpp | 22 ++++++++++++++++++++++ test/integration/minio_sync.cpp | 18 +++++++++++++++++- 8 files changed, 78 insertions(+), 15 deletions(-) diff --git a/include/network/message_result.hpp b/include/network/message_result.hpp index 3e3829c..7c7d114 100644 --- a/include/network/message_result.hpp +++ b/include/network/message_result.hpp @@ -67,6 +67,8 @@ class MessageResult { uint64_t size; /// The offset of the result; the start after the message header uint64_t offset; + /// The error response + const MessageResult* originError; /// The failure code uint16_t failureCode; /// The state @@ -96,8 +98,10 @@ class MessageResult { [[nodiscard]] uint64_t getOffset() const; /// Get the state [[nodiscard]] MessageState getState() const; - /// Get the original message + /// Get the failure code [[nodiscard]] uint16_t getFailureCode() const; + /// Get the error response (incl. header) + [[nodiscard]] std::string_view getError() const; /// Is the data owned by this object [[nodiscard]] bool owned() const; /// Was the request successful diff --git a/include/network/transaction.hpp b/include/network/transaction.hpp index dfa9086..c3cb09f 100644 --- a/include/network/transaction.hpp +++ b/include/network/transaction.hpp @@ -56,6 +56,8 @@ class Transaction { std::atomic outstanding; /// The state std::atomic state; + /// The error message id + std::atomic errorMessageId; /// The constructor explicit MultipartUpload(int parts) : messages(parts + 1), eTags(parts), outstanding(parts), state(State::Default) {} @@ -179,6 +181,7 @@ class Transaction { for (auto i = 1ull; i <= parts; i++) { auto finishMultipart = [&callback, &initalRequestResult, position, remotePath, traceId, i, parts, this](network::MessageResult& result) { if (!result.success()) [[unlikely]] { + _multipartUploads[position].errorMessageId = i - 1; _multipartUploads[position].state = MultipartUpload::State::Aborted; } else { _multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast(result.getData()), result.getOffset())); @@ -186,16 +189,19 @@ class Transaction { if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) { if (_multipartUploads[position].state != MultipartUpload::State::Aborted) [[likely]] { auto finished = [&callback, &initalRequestResult, this](network::MessageResult& result) { - if (!result.success()) + if (!result.success()) { initalRequestResult.state = network::MessageState::Cancelled; + initalRequestResult.originError = &result; + } _completedMultiparts++; std::forward(callback)(initalRequestResult); }; auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId); _multipartUploads[position].messages[parts] = std::move(originalMsg); } else { - auto finished = [&callback, &initalRequestResult, this](network::MessageResult& /*result*/) { + auto finished = [&callback, &initalRequestResult, position, this](network::MessageResult& /*result*/) { initalRequestResult.state = network::MessageState::Cancelled; + initalRequestResult.originError = &_multipartUploads[position].messages[_multipartUploads[position].errorMessageId]->result; _completedMultiparts++; std::forward(callback)(initalRequestResult); }; diff --git a/src/network/http_message.cpp b/src/network/http_message.cpp index 85ba80f..82a72bf 100644 --- a/src/network/http_message.cpp +++ b/src/network/http_message.cpp @@ -71,7 +71,10 @@ MessageState HTTPMessage::execute(IOUringSocket& socket) length = originalMessage->putLength + originalMessage->message->size() - sendBufferOffset; } request = unique_ptr(new IOUringSocket::Request{{.cdata = ptr}, length, fd, IOUringSocket::EventType::write, this}); - socket.send_prep_to(request.get(), &tcpSettings.kernelTimeout); + if (length <= static_cast(chunkSize)) + socket.send_prep_to(request.get(), &tcpSettings.kernelTimeout); + else + socket.send_prep(request.get()); } break; } diff --git a/src/network/message_result.cpp b/src/network/message_result.cpp index d436071..fab186c 100644 --- a/src/network/message_result.cpp +++ b/src/network/message_result.cpp @@ -13,13 +13,13 @@ namespace network { //--------------------------------------------------------------------------- using namespace std; //--------------------------------------------------------------------------- -MessageResult::MessageResult() : size(), offset(), failureCode(), state(MessageState::Init) +MessageResult::MessageResult() : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init) // The default constructor { dataVector = make_unique>(); } //--------------------------------------------------------------------------- -MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), failureCode(), state(MessageState::Init) +MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init) // The constructor with buffer input { if (data) @@ -28,7 +28,7 @@ MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), f dataVector = make_unique>(); } //--------------------------------------------------------------------------- -MessageResult::MessageResult(utils::DataVector* dataVector) : size(), offset(), failureCode(), state(MessageState::Init) +MessageResult::MessageResult(utils::DataVector* dataVector) : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init) // The constructor with buffer input { this->dataVector = unique_ptr>(dataVector); @@ -83,9 +83,21 @@ MessageState MessageResult::getState() const } //--------------------------------------------------------------------------- uint16_t MessageResult::getFailureCode() const -// Get the original message +// Get the failure code { - return failureCode; + if (originError) + return originError->getFailureCode(); + else + return failureCode; +} +//--------------------------------------------------------------------------- +std::string_view MessageResult::getError() const +// Get the error header +{ + if (originError) + return originError->getError(); + else + return string_view(reinterpret_cast(dataVector->data()), dataVector->size()); } //--------------------------------------------------------------------------- bool MessageResult::owned() const diff --git a/src/network/tasked_send_receiver.cpp b/src/network/tasked_send_receiver.cpp index f37313b..843f0cc 100644 --- a/src/network/tasked_send_receiver.cpp +++ b/src/network/tasked_send_receiver.cpp @@ -9,7 +9,6 @@ #include #include #include -#include #include //--------------------------------------------------------------------------- // AnyBlob - Universal Cloud Object Storage Library diff --git a/src/network/tls_connection.cpp b/src/network/tls_connection.cpp index 136d64e..0128938 100644 --- a/src/network/tls_connection.cpp +++ b/src/network/tls_connection.cpp @@ -1,7 +1,6 @@ #include "network/tls_connection.hpp" #include "network/https_message.hpp" #include "network/tls_context.hpp" -#include #include #include #include @@ -141,8 +140,7 @@ TLSConnection::Progress TLSConnection::shutdown(IOUringSocket& socket, bool fail if (!failedOnce) [[likely]] { status = Progress::Init; return shutdown(socket, true); - } - else { + } else { _context.dropSession(_message.fd); } } @@ -184,7 +182,10 @@ TLSConnection::Progress TLSConnection::process(IOUringSocket& socket) auto writeSize = _state.networkBioRead - _state.socketWrite; const uint8_t* ptr = reinterpret_cast(_buffer.get()) + _state.socketWrite; _message.request = unique_ptr(new IOUringSocket::Request{{.cdata = ptr}, writeSize, _message.fd, IOUringSocket::EventType::write, &_message}); - socket.send_prep_to(_message.request.get(), &_message.tcpSettings.kernelTimeout); + if (writeSize <= static_cast(_message.chunkSize)) + socket.send_prep_to(_message.request.get(), &_message.tcpSettings.kernelTimeout); + else + socket.send_prep(_message.request.get()); return _state.progress; } else { _state.progress = Progress::ReceivingInit; diff --git a/test/integration/minio_async.cpp b/test/integration/minio_async.cpp index dcea2c2..c2edbb7 100644 --- a/test/integration/minio_async.cpp +++ b/test/integration/minio_async.cpp @@ -110,6 +110,28 @@ TEST_CASE("MinIO Asynchronous Integration") { usleep(100); } } + { + // Check the upload for failure due to too small part + std::atomic finishedMessages = 0; + auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) { + // Sucessful request + REQUIRE(!result.success()); + finishedMessages++; + }; + + // Create the multipart put request + auto minio = static_cast(provider.get()); + minio->setMultipartUploadSize(1ull << 20); // too small, requires at least 5MiB parts + anyblob::network::Transaction putTxn(provider.get()); + putTxn.putObjectRequest(checkSuccess, fileName[1], content[1].data(), content[1].size()); + + while (finishedMessages != 1) { + // Upload the new request asynchronously + putTxn.processAsync(group); + // Wait for the upload + usleep(100); + } + } { std::atomic finishedMessages = 0; // Create the get request diff --git a/test/integration/minio_sync.cpp b/test/integration/minio_sync.cpp index 07382d6..3d6b581 100644 --- a/test/integration/minio_sync.cpp +++ b/test/integration/minio_sync.cpp @@ -1,6 +1,6 @@ #include "catch2/single_include/catch2/catch.hpp" -#include "cloud/provider.hpp" #include "cloud/minio.hpp" +#include "cloud/provider.hpp" #include "network/tasked_send_receiver.hpp" #include "network/transaction.hpp" #include @@ -88,6 +88,22 @@ TEST_CASE("MinIO Sync Integration") { REQUIRE(it.success()); } } + { + // Create the multipart put request but enforce failure due to small part size + auto minio = static_cast(provider.get()); + minio->setMultipartUploadSize(1ull << 20); // part size must be larger than 5MiB + anyblob::network::Transaction putTxn(provider.get()); + putTxn.putObjectRequest(fileName[1], content[1].data(), content[1].size()); + + // Upload the request synchronously with the scheduler object on this thread + putTxn.processSync(sendReceiver); + + // Check the upload + for (const auto& it : putTxn) { + // Sucessful request + REQUIRE(!it.success()); + } + } { // Create the get request anyblob::network::Transaction getTxn(provider.get());