Skip to content

Commit

Permalink
Added getError at result that returns response header and body
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Jul 7, 2023
1 parent ad3eb18 commit 10b6b01
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 15 deletions.
6 changes: 5 additions & 1 deletion include/network/message_result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class MessageResult {
uint64_t size;
/// The offset of the result; the start after the message header
uint64_t offset;
/// The error response
const MessageResult* originError;
/// The failure code
uint16_t failureCode;
/// The state
Expand Down Expand Up @@ -96,8 +98,10 @@ class MessageResult {
[[nodiscard]] uint64_t getOffset() const;
/// Get the state
[[nodiscard]] MessageState getState() const;
/// Get the original message
/// Get the failure code
[[nodiscard]] uint16_t getFailureCode() const;
/// Get the error response (incl. header)
[[nodiscard]] std::string_view getError() const;
/// Is the data owned by this object
[[nodiscard]] bool owned() const;
/// Was the request successful
Expand Down
10 changes: 8 additions & 2 deletions include/network/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class Transaction {
std::atomic<int> outstanding;
/// The state
std::atomic<State> state;
/// The error message id
std::atomic<int> errorMessageId;

/// The constructor
explicit MultipartUpload(int parts) : messages(parts + 1), eTags(parts), outstanding(parts), state(State::Default) {}
Expand Down Expand Up @@ -179,23 +181,27 @@ class Transaction {
for (auto i = 1ull; i <= parts; i++) {
auto finishMultipart = [&callback, &initalRequestResult, position, remotePath, traceId, i, parts, this](network::MessageResult& result) {
if (!result.success()) [[unlikely]] {
_multipartUploads[position].errorMessageId = i - 1;
_multipartUploads[position].state = MultipartUpload::State::Aborted;
} else {
_multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
}
if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) {
if (_multipartUploads[position].state != MultipartUpload::State::Aborted) [[likely]] {
auto finished = [&callback, &initalRequestResult, this](network::MessageResult& result) {
if (!result.success())
if (!result.success()) {
initalRequestResult.state = network::MessageState::Cancelled;
initalRequestResult.originError = &result;
}
_completedMultiparts++;
std::forward<Callback>(callback)(initalRequestResult);
};
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
} else {
auto finished = [&callback, &initalRequestResult, this](network::MessageResult& /*result*/) {
auto finished = [&callback, &initalRequestResult, position, this](network::MessageResult& /*result*/) {
initalRequestResult.state = network::MessageState::Cancelled;
initalRequestResult.originError = &_multipartUploads[position].messages[_multipartUploads[position].errorMessageId]->result;
_completedMultiparts++;
std::forward<Callback>(callback)(initalRequestResult);
};
Expand Down
5 changes: 4 additions & 1 deletion src/network/http_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ MessageState HTTPMessage::execute(IOUringSocket& socket)
length = originalMessage->putLength + originalMessage->message->size() - sendBufferOffset;
}
request = unique_ptr<IOUringSocket::Request>(new IOUringSocket::Request{{.cdata = ptr}, length, fd, IOUringSocket::EventType::write, this});
socket.send_prep_to(request.get(), &tcpSettings.kernelTimeout);
if (length <= static_cast<int64_t>(chunkSize))
socket.send_prep_to(request.get(), &tcpSettings.kernelTimeout);
else
socket.send_prep(request.get());
}
break;
}
Expand Down
22 changes: 17 additions & 5 deletions src/network/message_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ namespace network {
//---------------------------------------------------------------------------
using namespace std;
//---------------------------------------------------------------------------
MessageResult::MessageResult() : size(), offset(), failureCode(), state(MessageState::Init)
MessageResult::MessageResult() : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init)
// The default constructor
{
dataVector = make_unique<utils::DataVector<uint8_t>>();
}
//---------------------------------------------------------------------------
MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), failureCode(), state(MessageState::Init)
MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init)
// The constructor with buffer input
{
if (data)
Expand All @@ -28,7 +28,7 @@ MessageResult::MessageResult(uint8_t* data, uint64_t size) : size(), offset(), f
dataVector = make_unique<utils::DataVector<uint8_t>>();
}
//---------------------------------------------------------------------------
MessageResult::MessageResult(utils::DataVector<uint8_t>* dataVector) : size(), offset(), failureCode(), state(MessageState::Init)
MessageResult::MessageResult(utils::DataVector<uint8_t>* dataVector) : size(), offset(), originError(nullptr), failureCode(), state(MessageState::Init)
// The constructor with buffer input
{
this->dataVector = unique_ptr<utils::DataVector<uint8_t>>(dataVector);
Expand Down Expand Up @@ -83,9 +83,21 @@ MessageState MessageResult::getState() const
}
//---------------------------------------------------------------------------
uint16_t MessageResult::getFailureCode() const
// Get the original message
// Get the failure code
{
return failureCode;
if (originError)
return originError->getFailureCode();
else
return failureCode;
}
//---------------------------------------------------------------------------
std::string_view MessageResult::getError() const
// Get the error header
{
if (originError)
return originError->getError();
else
return string_view(reinterpret_cast<char*>(dataVector->data()), dataVector->size());
}
//---------------------------------------------------------------------------
bool MessageResult::owned() const
Expand Down
1 change: 0 additions & 1 deletion src/network/tasked_send_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <algorithm>
#include <cassert>
#include <cstring>
#include <iostream>
#include <unistd.h>
//---------------------------------------------------------------------------
// AnyBlob - Universal Cloud Object Storage Library
Expand Down
9 changes: 5 additions & 4 deletions src/network/tls_connection.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "network/tls_connection.hpp"
#include "network/https_message.hpp"
#include "network/tls_context.hpp"
#include <iostream>
#include <openssl/bio.h>
#include <openssl/crypto.h>
#include <openssl/err.h>
Expand Down Expand Up @@ -141,8 +140,7 @@ TLSConnection::Progress TLSConnection::shutdown(IOUringSocket& socket, bool fail
if (!failedOnce) [[likely]] {
status = Progress::Init;
return shutdown(socket, true);
}
else {
} else {
_context.dropSession(_message.fd);
}
}
Expand Down Expand Up @@ -184,7 +182,10 @@ TLSConnection::Progress TLSConnection::process(IOUringSocket& socket)
auto writeSize = _state.networkBioRead - _state.socketWrite;
const uint8_t* ptr = reinterpret_cast<uint8_t*>(_buffer.get()) + _state.socketWrite;
_message.request = unique_ptr<IOUringSocket::Request>(new IOUringSocket::Request{{.cdata = ptr}, writeSize, _message.fd, IOUringSocket::EventType::write, &_message});
socket.send_prep_to(_message.request.get(), &_message.tcpSettings.kernelTimeout);
if (writeSize <= static_cast<int64_t>(_message.chunkSize))
socket.send_prep_to(_message.request.get(), &_message.tcpSettings.kernelTimeout);
else
socket.send_prep(_message.request.get());
return _state.progress;
} else {
_state.progress = Progress::ReceivingInit;
Expand Down
22 changes: 22 additions & 0 deletions test/integration/minio_async.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,28 @@ TEST_CASE("MinIO Asynchronous Integration") {
usleep(100);
}
}
{
// Check the upload for failure due to too small part
std::atomic<uint16_t> finishedMessages = 0;
auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) {
// Sucessful request
REQUIRE(!result.success());
finishedMessages++;
};

// Create the multipart put request
auto minio = static_cast<anyblob::cloud::MinIO*>(provider.get());
minio->setMultipartUploadSize(1ull << 20); // too small, requires at least 5MiB parts
anyblob::network::Transaction putTxn(provider.get());
putTxn.putObjectRequest(checkSuccess, fileName[1], content[1].data(), content[1].size());

while (finishedMessages != 1) {
// Upload the new request asynchronously
putTxn.processAsync(group);
// Wait for the upload
usleep(100);
}
}
{
std::atomic<uint16_t> finishedMessages = 0;
// Create the get request
Expand Down
18 changes: 17 additions & 1 deletion test/integration/minio_sync.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "catch2/single_include/catch2/catch.hpp"
#include "cloud/provider.hpp"
#include "cloud/minio.hpp"
#include "cloud/provider.hpp"
#include "network/tasked_send_receiver.hpp"
#include "network/transaction.hpp"
#include <cstdlib>
Expand Down Expand Up @@ -88,6 +88,22 @@ TEST_CASE("MinIO Sync Integration") {
REQUIRE(it.success());
}
}
{
// Create the multipart put request but enforce failure due to small part size
auto minio = static_cast<anyblob::cloud::MinIO*>(provider.get());
minio->setMultipartUploadSize(1ull << 20); // part size must be larger than 5MiB
anyblob::network::Transaction putTxn(provider.get());
putTxn.putObjectRequest(fileName[1], content[1].data(), content[1].size());

// Upload the request synchronously with the scheduler object on this thread
putTxn.processSync(sendReceiver);

// Check the upload
for (const auto& it : putTxn) {
// Sucessful request
REQUIRE(!it.success());
}
}
{
// Create the get request
anyblob::network::Transaction getTxn(provider.get());
Expand Down

0 comments on commit 10b6b01

Please sign in to comment.