Skip to content

Commit

Permalink
Merged all transaction interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Jun 26, 2023
1 parent bf75baa commit d7e1d62
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 165 deletions.
13 changes: 6 additions & 7 deletions example/benchmark/src/benchmark/bandwidth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@
#include "cloud/aws_resolver.hpp"
#include "cloud/azure.hpp"
#include "cloud/gcp.hpp"
#include "network/get_transaction.hpp"
#include "network/transaction.hpp"
#include "network/original_message.hpp"
#include "network/put_transaction.hpp"
#include "network/s3_send_receiver.hpp"
#include "network/tasked_send_receiver.hpp"
#include "perfevent/PerfEvent.hpp"
Expand Down Expand Up @@ -217,21 +216,21 @@ void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
sendReceivers.back()->reuse(result.moveDataVector());
};

network::GetTransaction getTxn[benchmarkSettings.concurrentThreads];
network::Transaction getTxn[benchmarkSettings.concurrentThreads];

for (auto i = 0u; i < benchmarkSettings.concurrentThreads; i++)
getTxn[i].setProvider(cloudProvider.get());

if (benchmarkSettings.blobFiles > benchmarkSettings.requests) {
for (auto i = 0u; i < benchmarkSettings.requests; i++) {
auto filePath = benchmarkSettings.filePath + to_string(dist(rng)) + ".bin";
getTxn[0].addRequest(callback, filePath, range, nullptr, 0, i);
getTxn[0].getObjectRequest(callback, filePath, range, nullptr, 0, i);
}
} else {
auto createRequests = [&](uint64_t start, uint64_t end, uint64_t threadId) {
for (auto i = start; i < end; i++) {
auto filePath = benchmarkSettings.filePath + to_string((i % benchmarkSettings.blobFiles) + 1) + ".bin";
getTxn[threadId].addRequest(callback, filePath, range, nullptr, 0, i);
getTxn[threadId].getObjectRequest(callback, filePath, range, nullptr, 0, i);
}
};
auto start = 0ull;
Expand Down Expand Up @@ -288,7 +287,7 @@ void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
finishedMessages++;
};

network::PutTransaction putTxn(cloudProvider.get());
network::Transaction putTxn(cloudProvider.get());
auto blob = make_unique<utils::DataVector<uint8_t>>(1 << 24);
if (benchmarkSettings.encryption) {
auto plainBlob = make_unique<utils::DataVector<uint8_t>>((1 << 24) - 16);
Expand All @@ -308,7 +307,7 @@ void Bandwidth::runUring(const Settings& benchmarkSettings, const string& uri)
end = benchmarkSettings.requests;
for (uint64_t j = start; j < end; j++) {
auto filePath = "upload_" + benchmarkSettings.filePath + to_string(j + 1) + ".bin";
putTxn.addRequest(callbackUpload, filePath, reinterpret_cast<const char*>(blob->data()), blob->size(), nullptr, 0, i);
putTxn.putObjectRequest(callbackUpload, filePath, reinterpret_cast<const char*>(blob->data()), blob->size(), nullptr, 0, i);
}
}

Expand Down
6 changes: 3 additions & 3 deletions example/simple/main.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "cloud/provider.hpp"
#include "network/get_transaction.hpp"
#include "network/transaction.hpp"
#include "network/tasked_send_receiver.hpp"
#include <cstring>
#include <iostream>
Expand Down Expand Up @@ -28,8 +28,8 @@ int main(int /*argc*/, char** /*argv*/) {
auto provider = anyblob::cloud::Provider::makeProvider(bucketName, false, "", "", &sendReceiver);

// Create the get request
anyblob::network::GetTransaction getTxn(provider.get());
getTxn.addRequest(fileName);
anyblob::network::Transaction getTxn(provider.get());
getTxn.getObjectRequest(fileName);

// Retrieve the request synchronously with the scheduler object on this thread
getTxn.processSync(sendReceiver);
Expand Down
6 changes: 0 additions & 6 deletions include/cloud/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ namespace anyblob {
namespace network {
class TaskedSendReceiver;
class Transaction;
class GetTransaction;
class PutTransaction;
class DeleteTransaction;
struct OriginalMessage;
}; // namespace network
namespace utils {
Expand Down Expand Up @@ -109,9 +106,6 @@ class Provider {
[[nodiscard]] virtual Instance getInstanceDetails(network::TaskedSendReceiver& sendReceiver) = 0;

friend network::Transaction;
friend network::GetTransaction;
friend network::PutTransaction;
friend network::DeleteTransaction;
};
//---------------------------------------------------------------------------
} // namespace cloud
Expand Down
43 changes: 0 additions & 43 deletions include/network/delete_transaction.hpp

This file was deleted.

45 changes: 0 additions & 45 deletions include/network/get_transaction.hpp

This file was deleted.

47 changes: 0 additions & 47 deletions include/network/put_transaction.hpp

This file was deleted.

58 changes: 56 additions & 2 deletions include/network/transaction.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#pragma once
#include "cloud/provider.hpp"
#include "network/message_result.hpp"
#include "network/original_message.hpp"
#include <cassert>
#include <memory>
#include <span>
#include <string>
Expand All @@ -24,7 +27,6 @@ class TaskedSendReceiver;
class TaskedSendReceiverGroup;
//---------------------------------------------------------------------------
/// The request/response transaction handler used to transfer data from and to remote storage
/// Used as interface in the initiation GetTransaction and PutTransaction
class Transaction {
public:
class Iterator;
Expand All @@ -38,19 +40,71 @@ class Transaction {
/// The message
message_vector_type messages;

public:

/// The constructor
Transaction() = default;
/// The explicit constructor with the provider
explicit Transaction(const cloud::Provider* provider) : provider(provider), messages() {}

public:
/// Set the provider
constexpr void setProvider(const cloud::Provider* provider) { this->provider = provider; }
/// Sends the request messages to the task group
void processAsync(TaskedSendReceiver& sendReceiver);
/// Processes the request messages
void processSync(TaskedSendReceiver& sendReceiver);

/// Build a new get request for synchronous calls
/// Note that the range is [start, end[, [0, 0[ gets the whole object
inline void getObjectRequest(const std::string& remotePath, std::pair<uint64_t, uint64_t> range = {0, 0}, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
assert(provider);
auto originalMsg = std::make_unique<network::OriginalMessage>(provider->getRequest(remotePath, range), provider->getAddress(), provider->getPort(), result, capacity, traceId);
messages.push_back(std::move(originalMsg));
}

/// Build a new get request with callback
/// Note that the range is [start, end[, [0, 0[ gets the whole object
template <typename Callback>
inline void getObjectRequest(Callback&& callback, const std::string& remotePath, std::pair<uint64_t, uint64_t> range = {0, 0}, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
assert(provider);
auto originalMsg = std::make_unique<network::OriginalCallbackMessage<Callback>>(std::forward<Callback&&>(callback), provider->getRequest(remotePath, range), provider->getAddress(), provider->getPort(), result, capacity, traceId);
messages.push_back(std::move(originalMsg));
}

/// Build a new put request for synchronous calls
inline void putObjectRequest(const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
assert(provider);
auto object = std::string_view(data, size);
auto originalMsg = std::make_unique<network::OriginalMessage>(provider->putRequest(remotePath, object), provider->getAddress(), provider->getPort(), result, capacity, traceId);
originalMsg->setPutRequestData(reinterpret_cast<const uint8_t*>(data), size);
messages.push_back(std::move(originalMsg));
}

/// Build a new put request with callback
template <typename Callback>
inline void putObjectRequest(Callback&& callback, const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
assert(provider);
auto object = std::string_view(data, size);
auto originalMsg = std::make_unique<network::OriginalCallbackMessage<Callback>>(std::forward<Callback&&>(callback), provider->putRequest(remotePath, object), provider->getAddress(), provider->getPort(), result, capacity, traceId);
originalMsg->setPutRequestData(reinterpret_cast<const uint8_t*>(data), size);
messages.push_back(std::move(originalMsg));
}

/// Build a new delete request for synchronous calls
inline void deleteObjectRequest(const std::string& remotePath, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
assert(provider);
auto originalMsg = std::make_unique<network::OriginalMessage>(provider->deleteRequest(remotePath), provider->getAddress(), provider->getPort(), result, capacity, traceId);
messages.push_back(std::move(originalMsg));
}

/// Build a new delete request with callback
template <typename Callback>
inline void deleteObjectRequest(Callback&& callback, const std::string& remotePath, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
assert(provider);
auto originalMsg = std::make_unique<network::OriginalCallbackMessage<Callback>>(std::forward<Callback&&>(callback), provider->deleteRequest(remotePath), provider->getAddress(), provider->getPort(), result, capacity, traceId);
messages.push_back(std::move(originalMsg));
}

/// The iterator
using iterator = Iterator;
/// The const iterator
Expand Down
21 changes: 9 additions & 12 deletions test/integration/minio.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
#include "catch2/single_include/catch2/catch.hpp"
#include "cloud/provider.hpp"
#include "network/get_transaction.hpp"
#include "network/put_transaction.hpp"
#include "network/delete_transaction.hpp"
#include "network/tasked_send_receiver.hpp"
#include "network/transaction.hpp"
#include <cstdlib>
#include <cstring>
#include <filesystem>
Expand Down Expand Up @@ -46,9 +44,9 @@ TEST_CASE("MinIO Integration") {
// The file to be uploaded and downloaded
string bucketName = "minio://";
bucketName = bucketName + endpoint + "/" + bucket + ":" + region;
string fileName[] {"test.txt", "long.txt"};
string fileName[]{"test.txt", "long.txt"};
string longText = stringGen(1 << 24);
string content[] {"Hello World!", longText};
string content[]{"Hello World!", longText};

// Create a new task group (18 concurrent request maximum, and up to 1024 outstanding submissions)
anyblob::network::TaskedSendReceiverGroup group(18, 1024);
Expand All @@ -60,9 +58,9 @@ TEST_CASE("MinIO Integration") {
auto provider = anyblob::cloud::Provider::makeProvider(bucketName, false, key, secret, &sendReceiver);
{
// Create the put request
anyblob::network::PutTransaction putTxn(provider.get());
anyblob::network::Transaction putTxn(provider.get());
for (auto i = 0u; i < 2; i++)
putTxn.addRequest(fileName[i], content[i].data(), content[i].size());
putTxn.putObjectRequest(fileName[i], content[i].data(), content[i].size());

// Upload the request synchronously with the scheduler object on this thread
putTxn.processSync(sendReceiver);
Expand All @@ -75,9 +73,9 @@ TEST_CASE("MinIO Integration") {
}
{
// Create the get request
anyblob::network::GetTransaction getTxn(provider.get());
anyblob::network::Transaction getTxn(provider.get());
for (auto i = 0u; i < 2; i++)
getTxn.addRequest(fileName[i]);
getTxn.getObjectRequest(fileName[i]);

// Retrieve the request synchronously with the scheduler object on this thread
getTxn.processSync(sendReceiver);
Expand Down Expand Up @@ -105,9 +103,9 @@ TEST_CASE("MinIO Integration") {
}
{
// Create the put request
anyblob::network::DeleteTransaction deleteTxn(provider.get());
anyblob::network::Transaction deleteTxn(provider.get());
for (auto i = 0u; i < 2; i++)
deleteTxn.addRequest(fileName[i]);
deleteTxn.deleteObjectRequest(fileName[i]);

// Upload the request synchronously with the scheduler object on this thread
deleteTxn.processSync(sendReceiver);
Expand All @@ -118,7 +116,6 @@ TEST_CASE("MinIO Integration") {
REQUIRE(it.success());
}
}

}
//---------------------------------------------------------------------------
} // namespace test
Expand Down

0 comments on commit d7e1d62

Please sign in to comment.