Skip to content

Commit

Permalink
Added Multipart Abort
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Jul 4, 2023
1 parent ece4ab7 commit ad3eb18
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 68 deletions.
6 changes: 5 additions & 1 deletion include/cloud/aws.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ class AWS : public Provider {
return putRequestGeneric(filePath, object, 0, "");
}
// Builds the http request for deleting an objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override;
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override {
return deleteRequestGeneric(filePath, "");
}
/// Builds the http request for deleting objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequestGeneric(const std::string& filePath, const std::string_view uploadId) const override;
/// Builds the http request for creating multipart put objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& filePath) const override;
/// Builds the http request for completing multipart put objects
Expand Down
8 changes: 6 additions & 2 deletions include/cloud/gcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ class GCP : public Provider {
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> putRequest(const std::string& filePath, const std::string_view object) const override {
return putRequestGeneric(filePath, object, 0, "");
}
// Builds the http request for deleting an objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override;
// Builds the http request for deleting an objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override {
return deleteRequestGeneric(filePath, "");
}
/// Builds the http request for deleting objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequestGeneric(const std::string& filePath, const std::string_view uploadId) const override;
/// Builds the http request for creating multipart put objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& filePath) const override;
/// Builds the http request for completing multipart put objects
Expand Down
2 changes: 2 additions & 0 deletions include/cloud/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class Provider {
[[nodiscard]] virtual uint64_t multipartUploadSize() const { return 0; }
/// Builds the http request for putting multipart objects without the object data itself
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> putRequestGeneric(const std::string& /*filePath*/, const std::string_view /*object*/, uint16_t /*part*/, const std::string_view /*uploadId*/) const;
/// Builds the http request for deleting multipart aborted objects
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> deleteRequestGeneric(const std::string& /*filePath*/, const std::string_view /*uploadId*/) const;
/// Builds the http request for creating multipart put objects
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& /*filePath*/) const;
/// Builds the http request for completing multipart put objects
Expand Down
5 changes: 4 additions & 1 deletion include/network/message_result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ struct OriginalMessage;
struct HTTPMessage;
struct HTTPSMessage;
class TLSConnection;
class Transaction;
//---------------------------------------------------------------------------
/// Current status of the message
enum class MessageState : uint8_t {
Expand All @@ -35,7 +36,8 @@ enum class MessageState : uint8_t {
Receiving,
TLSShutdown,
Finished,
Aborted
Aborted,
Cancelled
};
//---------------------------------------------------------------------------
/// The failure codes
Expand Down Expand Up @@ -111,6 +113,7 @@ class MessageResult {
friend HTTPSMessage;
friend OriginalMessage;
friend TLSConnection;
friend Transaction;
};
//---------------------------------------------------------------------------
} // namespace network
Expand Down
87 changes: 30 additions & 57 deletions include/network/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Transaction {
Default = 0,
Sending = 1,
Validating = 2,
Aborted = 1u << 7
};
/// The uploadId
std::string uploadId;
Expand Down Expand Up @@ -155,48 +156,8 @@ class Transaction {
private:
/// Build a new put request for synchronous calls
inline void putObjectRequestMultiPart(const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) {
assert(_provider);
auto splitSize = _provider->multipartUploadSize();
auto parts = (size / splitSize) + ((size % splitSize) ? 1u : 0u);
_multipartUploads.emplace_back(parts);
auto position = _multipartUploads.size() - 1;

auto uploadMessages = [position, parts, data, remotePath, traceId, splitSize, size, this](network::MessageResult& result) {
if (!result.success()) {
_completedMultiparts++;
return;
}

_multipartUploads[position].uploadId = _provider->getUploadId(result.getResult());
auto offset = 0ull;
for (auto i = 1ull; i <= parts; i++) {
auto finishMultipart = [position, remotePath, traceId, i, parts, this](network::MessageResult& result) {
// TODO: requires abort handling
if (!result.success())
return;

_multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) {
auto finished = [this](network::MessageResult& /*result*/) {
_completedMultiparts++;
};
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
_multipartUploads[position].state = MultipartUpload::State::Validating;
}
};
auto partSize = (i != parts) ? splitSize : size - offset;
auto object = std::string_view(data + offset, partSize);
auto originalMsg = makeCallbackMessage(std::move(finishMultipart), _provider->putRequestGeneric(remotePath, object, i, _multipartUploads[position].uploadId), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
originalMsg->setPutRequestData(reinterpret_cast<const uint8_t*>(data + offset), partSize);
_multipartUploads[position].messages[i - 1] = std::move(originalMsg);
offset += partSize;
}
_multipartUploads[position].state = MultipartUpload::State::Sending;
};

auto originalMsg = makeCallbackMessage(std::move(uploadMessages), _provider->createMultiPartRequest(remotePath), _provider->getAddress(), _provider->getPort(), result, capacity, traceId);
_messages.push_back(std::move(originalMsg));
auto finished = [](network::MessageResult& /*result*/) {};
putObjectRequestMultiPart(std::move(finished), remotePath, data, size, result, capacity, traceId);
}

/// Build a new put request with callback
Expand All @@ -208,27 +169,39 @@ class Transaction {
_multipartUploads.emplace_back(parts);
auto position = _multipartUploads.size() - 1;

auto uploadMessages = [&callback, position, parts, data, remotePath, traceId, splitSize, size, this](network::MessageResult& result) {
if (!result.success()) {
auto uploadMessages = [&callback, position, parts, data, remotePath, traceId, splitSize, size, this](network::MessageResult& initalRequestResult) {
if (!initalRequestResult.success()) {
_completedMultiparts++;
return;
}
_multipartUploads[position].uploadId = _provider->getUploadId(result.getResult());
_multipartUploads[position].uploadId = _provider->getUploadId(initalRequestResult.getResult());
auto offset = 0ull;
for (auto i = 1ull; i <= parts; i++) {
auto finishMultipart = [&callback, position, remotePath, traceId, i, parts, this](network::MessageResult& result) {
// TODO: requires abort handling
if (!result.success())
return;

_multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
auto finishMultipart = [&callback, &initalRequestResult, position, remotePath, traceId, i, parts, this](network::MessageResult& result) {
if (!result.success()) [[unlikely]] {
_multipartUploads[position].state = MultipartUpload::State::Aborted;
} else {
_multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
}
if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) {
auto finished = [&callback, this](network::MessageResult& result) {
_completedMultiparts++;
std::forward<Callback>(callback)(result);
};
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
if (_multipartUploads[position].state != MultipartUpload::State::Aborted) [[likely]] {
auto finished = [&callback, &initalRequestResult, this](network::MessageResult& result) {
if (!result.success())
initalRequestResult.state = network::MessageState::Cancelled;
_completedMultiparts++;
std::forward<Callback>(callback)(initalRequestResult);
};
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
} else {
auto finished = [&callback, &initalRequestResult, this](network::MessageResult& /*result*/) {
initalRequestResult.state = network::MessageState::Cancelled;
_completedMultiparts++;
std::forward<Callback>(callback)(initalRequestResult);
};
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->deleteRequestGeneric(remotePath, _multipartUploads[position].uploadId), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
}
_multipartUploads[position].state = MultipartUpload::State::Validating;
}
};
Expand Down
11 changes: 8 additions & 3 deletions src/cloud/aws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::putRequestGeneric(const string& file
return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> AWS::deleteRequest(const string& filePath) const
unique_ptr<utils::DataVector<uint8_t>> AWS::deleteRequestGeneric(const string& filePath, const string_view uploadId) const
// Builds the http request for deleting an objects
{
if (!validKeys())
Expand All @@ -286,6 +286,11 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::deleteRequest(const string& filePath
else
request.path = "/" + _settings.bucket + "/" + filePath;

// Is it a multipart upload?
if (!uploadId.empty()) {
request.queries.emplace("uploadId", uploadId);
}

request.bodyData = nullptr;
request.bodyLength = 0;
request.headers.emplace("Host", getAddress());
Expand Down Expand Up @@ -367,8 +372,8 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::completeMultiPartRequest(const strin
request.path = "/" + _settings.bucket + "/" + filePath;

request.queries.emplace("uploadId", uploadId);
request.bodyData = nullptr;
request.bodyLength = 0;
request.bodyData = reinterpret_cast<const uint8_t*>(content.data());
request.bodyLength = content.size();
request.headers.emplace("Host", getAddress());
request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp());
request.headers.emplace("Content-Length", to_string(content.size()));
Expand Down
13 changes: 9 additions & 4 deletions src/cloud/gcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::putRequestGeneric(const string& file
return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> GCP::deleteRequest(const string& filePath) const
unique_ptr<utils::DataVector<uint8_t>> GCP::deleteRequestGeneric(const string& filePath, const string_view uploadId) const
// Builds the http request for deleting objects
{
GCPSigner::Request request;
Expand All @@ -149,6 +149,11 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::deleteRequest(const string& filePath
request.bodyData = nullptr;
request.bodyLength = 0;

// Is it a multipart upload?
if (!uploadId.empty()) {
request.queries.emplace("uploadId", uploadId);
}

auto date = testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp();
request.queries.emplace("X-Goog-Date", date);
request.headers.emplace("Host", getAddress());
Expand Down Expand Up @@ -197,7 +202,7 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::completeMultiPartRequest(const strin
string content = "<CompleteMultipartUpload>\n";
for (auto i = 0ull; i < etags.size(); i++) {
content += "<Part>\n<PartNumber>";
content += to_string(i+1);
content += to_string(i + 1);
content += "</PartNumber>\n<ETag>\"";
content += etags[i];
content += "\"</ETag>\n</Part>\n";
Expand All @@ -209,8 +214,8 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::completeMultiPartRequest(const strin
request.type = "HTTP/1.1";
request.path = "/" + filePath;
request.queries.emplace("uploadId", uploadId);
request.bodyData = nullptr;
request.bodyLength = 0;
request.bodyData = reinterpret_cast<const uint8_t*>(content.data());
request.bodyLength = content.size();

auto date = testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp();
request.queries.emplace("X-Goog-Date", date);
Expand Down
6 changes: 6 additions & 0 deletions src/cloud/provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ unique_ptr<utils::DataVector<uint8_t>> Provider::putRequestGeneric(const string&
return nullptr;
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> Provider::deleteRequestGeneric(const string& /*filePath*/, const string_view /*uploadId*/) const
// Builds the http request for delete multipart objects
{
return nullptr;
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> Provider::createMultiPartRequest(const string& /*filePath*/) const
// Builds the http request for creating multipart put objects
{
Expand Down

0 comments on commit ad3eb18

Please sign in to comment.