Skip to content

Commit

Permalink
Added MultiPart Upload to Transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Jul 4, 2023
1 parent 5c35ea8 commit a86ff1f
Show file tree
Hide file tree
Showing 9 changed files with 416 additions and 57 deletions.
4 changes: 2 additions & 2 deletions example/benchmark/src/benchmark/bandwidth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
}

for (auto i = 0u; i < benchmarkSettings.concurrentThreads; i++)
getTxn[i].processAsync(*sendReceivers.back());
getTxn[i].processAsync(group);

fstream s;
unique_ptr<utils::Timer> timer;
Expand Down Expand Up @@ -317,7 +317,7 @@ void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
sendReceivers[i]->setTimings(nullptr);
}

putTxn.processAsync(*sendReceivers.back());
putTxn.processAsync(group);

while (finishedMessages != requestPerSocket * benchmarkSettings.concurrentThreads)
usleep(100);
Expand Down
4 changes: 3 additions & 1 deletion include/cloud/aws.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class AWS : public Provider {
Settings _settings;
/// The secret
std::unique_ptr<Secret> _secret;
/// The multipart upload size
uint64_t _multipartUploadSize = 128ull << 20;

public:
/// Get instance details
Expand Down Expand Up @@ -104,7 +106,7 @@ class AWS : public Provider {
/// Get the settings
[[nodiscard]] inline Settings getSettings() { return _settings; }
/// Allows multipart upload if size > 0
[[nodiscard]] uint64_t multipartUploadSize() const override { return 128ull << 20; }
[[nodiscard]] uint64_t multipartUploadSize() const override { return _multipartUploadSize; }

/// Builds the http request for downloading a blob or listing the directory
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> getRequest(const std::string& filePath, const std::pair<uint64_t, uint64_t>& range) const override;
Expand Down
2 changes: 2 additions & 0 deletions include/cloud/minio.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class MinIO : public AWS {
[[nodiscard]] std::string getAddress() const override;
/// Get the instance details
[[nodiscard]] Provider::Instance getInstanceDetails(network::TaskedSendReceiver& sendReceiver) override;
/// Set the upload split size; this is the value subsequently reported by multipartUploadSize()
/// Deliberately not constexpr: MinIO derives from AWS, which owns a std::unique_ptr member,
/// so a call to this mutating setter can never be part of a constant expression
void setMultipartUploadSize(uint64_t size) { _multipartUploadSize = size; }

friend Provider;
};
Expand Down
199 changes: 169 additions & 30 deletions include/network/transaction.hpp

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions src/cloud/aws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,8 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::putRequestGeneric(const string& file

// Is it a multipart upload?
if (part) {
request.path += "?partNumber=" + to_string(part) + "&uploadId=";
request.path += uploadId;
request.queries.emplace("partNumber", to_string(part));
request.queries.emplace("uploadId", uploadId);
}

request.bodyLength = object.size();
Expand Down Expand Up @@ -320,7 +320,7 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::createMultiPartRequest(const string&
request.path = "/" + filePath;
else
request.path = "/" + _settings.bucket + "/" + filePath;
request.path += "?uploads";
request.queries.emplace("uploads", "");
request.bodyData = nullptr;
request.bodyLength = 0;
request.headers.emplace("Host", getAddress());
Expand Down Expand Up @@ -365,9 +365,8 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::completeMultiPartRequest(const strin
request.path = "/" + filePath;
else
request.path = "/" + _settings.bucket + "/" + filePath;
request.path += "&uploadId=";
request.path += uploadId;

request.queries.emplace("uploadId", uploadId);
request.bodyData = nullptr;
request.bodyLength = 0;
request.headers.emplace("Host", getAddress());
Expand Down
9 changes: 4 additions & 5 deletions src/cloud/gcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::putRequestGeneric(const string& file

// Is it a multipart upload?
if (part) {
request.path += "?partNumber=" + to_string(part) + "&uploadId=";
request.path += uploadId;
request.queries.emplace("partNumber", to_string(part));
request.queries.emplace("uploadId", uploadId);
}

request.bodyData = reinterpret_cast<const uint8_t*>(object.data());
Expand Down Expand Up @@ -171,7 +171,7 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::createMultiPartRequest(const string&
request.method = "POST";
request.type = "HTTP/1.1";
request.path = "/" + filePath;
request.path += "?uploads";
request.queries.emplace("uploads", "");
request.bodyData = nullptr;
request.bodyLength = 0;

Expand Down Expand Up @@ -208,8 +208,7 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::completeMultiPartRequest(const strin
request.method = "POST";
request.type = "HTTP/1.1";
request.path = "/" + filePath;
request.path += "&uploadId=";
request.path += uploadId;
request.queries.emplace("uploadId", uploadId);
request.bodyData = nullptr;
request.bodyLength = 0;

Expand Down
41 changes: 32 additions & 9 deletions src/network/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,44 @@ using namespace std;
void Transaction::processSync(TaskedSendReceiver& sendReceiver)
// Processes the request messages
{
// send the original request message
for (auto& msg : messages) {
sendReceiver.sendSync(msg.get());
}
do {
// send the original request message
for (; _messageCounter < _messages.size(); _messageCounter++) {
sendReceiver.sendSync(_messages[_messageCounter].get());
}

// do the download work
sendReceiver.processSync();
for (auto& multipart : _multipartUploads) {
if (multipart.state == MultipartUpload::State::Sending) {
for (auto i = 0ull; i < multipart.eTags.size(); i++)
sendReceiver.sendSync(multipart.messages[i].get());
multipart.state = MultipartUpload::State::Default;
} else if (multipart.state == MultipartUpload::State::Validating) {
sendReceiver.sendSync(multipart.messages[multipart.eTags.size()].get());
multipart.state = MultipartUpload::State::Default;
}
}

// do the download work
sendReceiver.processSync();
} while (_multipartUploads.size() != _completedMultiparts);
}
//---------------------------------------------------------------------------
void Transaction::processAsync(TaskedSendReceiver& sendReceiver)
void Transaction::processAsync(TaskedSendReceiverGroup& group)
// Sends the request messages to the task group
{
// send the original request message
for (auto& msg : messages) {
while (!sendReceiver.send(msg.get())) {}
for (; _messageCounter < _messages.size(); _messageCounter++) {
while (!group.send(_messages[_messageCounter].get())) {}
}
for (auto& multipart : _multipartUploads) {
if (multipart.state == MultipartUpload::State::Sending) {
for (auto i = 0ull; i < multipart.eTags.size(); i++)
while (!group.send(multipart.messages[i].get())) {}
multipart.state = MultipartUpload::State::Default;
} else if (multipart.state == MultipartUpload::State::Validating) {
while (!group.send(multipart.messages[multipart.eTags.size()].get())) {}
multipart.state = MultipartUpload::State::Default;
}
}
}
//---------------------------------------------------------------------------
Expand Down
178 changes: 178 additions & 0 deletions test/integration/minio_async.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#include "catch2/single_include/catch2/catch.hpp"
#include "cloud/minio.hpp"
#include "cloud/provider.hpp"
#include "network/tasked_send_receiver.hpp"
#include "network/transaction.hpp"
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <future>
//---------------------------------------------------------------------------
// AnyBlob - Universal Cloud Object Storage Library
// Dominik Durner, 2022
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
// SPDX-License-Identifier: MPL-2.0
//---------------------------------------------------------------------------
namespace anyblob {
namespace test {
//---------------------------------------------------------------------------
using namespace std;
//---------------------------------------------------------------------------
// End-to-end test against a MinIO endpoint: uploads two objects with a regular put,
// uploads them again after lowering the multipart split size, downloads and verifies
// both objects, and finally deletes them. All requests are submitted through
// Transaction::processAsync and executed by a TaskedSendReceiver that runs on a
// separate std::async thread for the lifetime of the test.
TEST_CASE("MinIO Asynchronous Integration") {
// Get the environment
const char* bucket = getenv("AWS_S3_BUCKET");
const char* region = getenv("AWS_S3_REGION");
const char* endpoint = getenv("AWS_S3_ENDPOINT");
const char* key = getenv("AWS_S3_ACCESS_KEY");
const char* secret = getenv("AWS_S3_SECRET_ACCESS_KEY");

// All five variables must be set, otherwise the test cannot address the bucket
REQUIRE(bucket);
REQUIRE(region);
REQUIRE(endpoint);
REQUIRE(key);
REQUIRE(secret);

// Builds a string of `len` characters drawn via rand() from the digits and the upper/lower case letters
auto stringGen = [](auto len) {
static constexpr auto chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
auto resultString = string(len, '\0');
generate_n(begin(resultString), len, [&]() { return chars[rand() % strlen(chars)]; });
return resultString;
};

// The file to be uploaded and downloaded
// The resulting URI has the form minio://<endpoint>/<bucket>:<region>
string bucketName = "minio://";
bucketName = bucketName + endpoint + "/" + bucket + ":" + region;
string fileName[]{"test.txt", "long.txt"};
// 1 << 24 characters (16 MiB), which is larger than the 6 MiB split size configured further below
string longText = stringGen(1 << 24);
string content[]{"Hello World!", longText};

// Create a new task group (18 concurrent requests maximum, and up to 1024 outstanding submissions)
anyblob::network::TaskedSendReceiverGroup group(18, 1024);

// Create an AnyBlob scheduler object for the group
anyblob::network::TaskedSendReceiver sendReceiver(group);

// Async thread
future<void> asyncSendReceiverThread;

auto runLambda = [&]() {
// Runs the download thread to asynchronously retrieve data
sendReceiver.run();
};
asyncSendReceiverThread = async(launch::async, runLambda);

// Create the provider for the corresponding filename
// NOTE(review): the meaning of the `false` argument is not visible here - confirm against Provider::makeProvider
auto provider = anyblob::cloud::Provider::makeProvider(bucketName, false, key, secret, &sendReceiver);
{
// Check the upload for success
// Counts the callbacks that have run; the waiting loop below polls this counter
std::atomic<uint16_t> finishedMessages = 0;
auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) {
// Successful request
REQUIRE(result.success());
finishedMessages++;
};

// Create the put request
anyblob::network::Transaction putTxn(provider.get());
for (auto i = 0u; i < 2; i++)
putTxn.putObjectRequest(checkSuccess, fileName[i], content[i].data(), content[i].size());

// Upload the request asynchronously
putTxn.processAsync(group);

// Wait for the upload
while (finishedMessages != 2)
usleep(100);
}
{
// Check the upload for success
std::atomic<uint16_t> finishedMessages = 0;
auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) {
// Successful request
REQUIRE(result.success());
finishedMessages++;
};

// Create the multipart put request
// The provider was created from a minio:// URI above, so it is downcast to MinIO to reach the setter
auto minio = static_cast<anyblob::cloud::MinIO*>(provider.get());
// Lower the split size to 6 MiB so that the 16 MiB object exceeds it
minio->setMultipartUploadSize(6ull << 20);
anyblob::network::Transaction putTxn(provider.get());
for (auto i = 0u; i < 2; i++)
putTxn.putObjectRequest(checkSuccess, fileName[i], content[i].data(), content[i].size());

// Unlike the other phases, processAsync is called on every poll iteration
// NOTE(review): presumably required so that follow-up multipart messages are submitted as the upload advances - confirm against Transaction
while (finishedMessages != 2) {
// Upload the new request asynchronously
putTxn.processAsync(group);
// Wait for the upload
usleep(100);
}
}
{
std::atomic<uint16_t> finishedMessages = 0;
// Create the get request
anyblob::network::Transaction getTxn(provider.get());
for (auto i = 0u; i < 2; i++) {
// Check the download for success
// `i` is captured by value so that each callback compares against its own expected content
auto checkSuccess = [&finishedMessages, &content, i](anyblob::network::MessageResult& result) {
// Successful request
REQUIRE(result.success());
// Simple string_view interface
REQUIRE(!content[i].compare(result.getResult()));
// Check from the other side too
REQUIRE(!result.getResult().compare(content[i]));
// Check the size
REQUIRE(result.getSize() == content[i].size());

// Advanced raw interface
// Note that the data lies in the data buffer but after the offset to skip the HTTP header
// Note that the size is already without the header, so the full request has size + offset length
string_view rawDataString(reinterpret_cast<const char*>(result.getData()) + result.getOffset(), result.getSize());
REQUIRE(!content[i].compare(rawDataString));
REQUIRE(!rawDataString.compare(result.getResult()));
REQUIRE(!rawDataString.compare(content[i]));
finishedMessages++;
};

getTxn.getObjectRequest(std::move(checkSuccess), fileName[i]);
}

// Retrieve the request asynchronously
getTxn.processAsync(group);

// Wait for the download
while (finishedMessages != 2)
usleep(100);
}
{
// Check the delete for success
std::atomic<uint16_t> finishedMessages = 0;
auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) {
// Successful request
REQUIRE(result.success());
finishedMessages++;
};

// Create the delete request
anyblob::network::Transaction deleteTxn(provider.get());
for (auto i = 0u; i < 2; i++)
deleteTxn.deleteObjectRequest(checkSuccess, fileName[i]);

// Process the request asynchronously
deleteTxn.processAsync(group);

// Wait for the deletion
while (finishedMessages != 2)
usleep(100);
}

// Stop the send receiver daemon
sendReceiver.stop();
// Join the thread
asyncSendReceiverThread.get();
}
//---------------------------------------------------------------------------
} // namespace test
} // namespace anyblob
Loading

0 comments on commit a86ff1f

Please sign in to comment.