Skip to content

Commit

Permalink
Added Multipart Abort
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Jul 4, 2023
1 parent ece4ab7 commit 6ed5d09
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 21 deletions.
6 changes: 5 additions & 1 deletion include/cloud/aws.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ class AWS : public Provider {
return putRequestGeneric(filePath, object, 0, "");
}
// Builds the http request for deleting an objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override;
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override {
return deleteRequestGeneric(filePath, "");
}
/// Builds the http request for deleting objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequestGeneric(const std::string& filePath, const std::string_view uploadId) const override;
/// Builds the http request for creating multipart put objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& filePath) const override;
/// Builds the http request for completing multipart put objects
Expand Down
8 changes: 6 additions & 2 deletions include/cloud/gcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ class GCP : public Provider {
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> putRequest(const std::string& filePath, const std::string_view object) const override {
return putRequestGeneric(filePath, object, 0, "");
}
// Builds the http request for deleting an objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override;
// Builds the http request for deleting an objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override {
return deleteRequestGeneric(filePath, "");
}
/// Builds the http request for deleting objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequestGeneric(const std::string& filePath, const std::string_view uploadId) const override;
/// Builds the http request for creating multipart put objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& filePath) const override;
/// Builds the http request for completing multipart put objects
Expand Down
2 changes: 2 additions & 0 deletions include/cloud/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class Provider {
[[nodiscard]] virtual uint64_t multipartUploadSize() const { return 0; }
/// Builds the http request for putting multipart objects without the object data itself
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> putRequestGeneric(const std::string& /*filePath*/, const std::string_view /*object*/, uint16_t /*part*/, const std::string_view /*uploadId*/) const;
/// Builds the http request for deleting multipart aborted objects
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> deleteRequestGeneric(const std::string& /*filePath*/, const std::string_view /*uploadId*/) const;
/// Builds the http request for creating multipart put objects
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& /*filePath*/) const;
/// Builds the http request for completing multipart put objects
Expand Down
40 changes: 25 additions & 15 deletions include/network/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Transaction {
Default = 0,
Sending = 1,
Validating = 2,
Aborted = 1u << 7
};
/// The uploadId
std::string uploadId;
Expand Down Expand Up @@ -166,22 +167,26 @@ class Transaction {
_completedMultiparts++;
return;
}

_multipartUploads[position].uploadId = _provider->getUploadId(result.getResult());
auto offset = 0ull;
for (auto i = 1ull; i <= parts; i++) {
auto finishMultipart = [position, remotePath, traceId, i, parts, this](network::MessageResult& result) {
// TODO: requires abort handling
if (!result.success())
return;

_multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
if (!result.success()) [[unlikely]] {
_multipartUploads[position].state = MultipartUpload::State::Aborted;
} else {
_multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
}
if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) {
auto finished = [this](network::MessageResult& /*result*/) {
_completedMultiparts++;
};
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
if (_multipartUploads[position].state != MultipartUpload::State::Aborted) [[likely]] {
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
} else {
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->deleteRequestGeneric(remotePath, _multipartUploads[position].uploadId), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
}
_multipartUploads[position].state = MultipartUpload::State::Validating;
}
};
Expand Down Expand Up @@ -217,18 +222,23 @@ class Transaction {
auto offset = 0ull;
for (auto i = 1ull; i <= parts; i++) {
auto finishMultipart = [&callback, position, remotePath, traceId, i, parts, this](network::MessageResult& result) {
// TODO: requires abort handling
if (!result.success())
return;

_multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
if (!result.success()) [[unlikely]] {
_multipartUploads[position].state = MultipartUpload::State::Aborted;
} else {
_multipartUploads[position].eTags[i - 1] = _provider->getETag(std::string_view(reinterpret_cast<const char*>(result.getData()), result.getOffset()));
}
if (_multipartUploads[position].outstanding.fetch_sub(1) == 1) {
auto finished = [&callback, this](network::MessageResult& result) {
_completedMultiparts++;
std::forward<Callback>(callback)(result);
};
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
if (_multipartUploads[position].state != MultipartUpload::State::Aborted) [[likely]] {
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->completeMultiPartRequest(remotePath, _multipartUploads[position].uploadId, _multipartUploads[position].eTags), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
} else {
auto originalMsg = makeCallbackMessage(std::move(finished), _provider->deleteRequestGeneric(remotePath, _multipartUploads[position].uploadId), _provider->getAddress(), _provider->getPort(), nullptr, 0, traceId);
_multipartUploads[position].messages[parts] = std::move(originalMsg);
}
_multipartUploads[position].state = MultipartUpload::State::Validating;
}
};
Expand Down
7 changes: 6 additions & 1 deletion src/cloud/aws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::putRequestGeneric(const string& file
return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> AWS::deleteRequest(const string& filePath) const
unique_ptr<utils::DataVector<uint8_t>> AWS::deleteRequestGeneric(const string& filePath, const string_view uploadId) const
// Builds the http request for deleting an objects
{
if (!validKeys())
Expand All @@ -286,6 +286,11 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::deleteRequest(const string& filePath
else
request.path = "/" + _settings.bucket + "/" + filePath;

// Is it a multipart upload?
if (!uploadId.empty()) {
request.queries.emplace("uploadId", uploadId);
}

request.bodyData = nullptr;
request.bodyLength = 0;
request.headers.emplace("Host", getAddress());
Expand Down
9 changes: 7 additions & 2 deletions src/cloud/gcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::putRequestGeneric(const string& file
return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> GCP::deleteRequest(const string& filePath) const
unique_ptr<utils::DataVector<uint8_t>> GCP::deleteRequestGeneric(const string& filePath, const string_view uploadId) const
// Builds the http request for deleting objects
{
GCPSigner::Request request;
Expand All @@ -149,6 +149,11 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::deleteRequest(const string& filePath
request.bodyData = nullptr;
request.bodyLength = 0;

// Is it a multipart upload?
if (!uploadId.empty()) {
request.queries.emplace("uploadId", uploadId);
}

auto date = testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp();
request.queries.emplace("X-Goog-Date", date);
request.headers.emplace("Host", getAddress());
Expand Down Expand Up @@ -197,7 +202,7 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::completeMultiPartRequest(const strin
string content = "<CompleteMultipartUpload>\n";
for (auto i = 0ull; i < etags.size(); i++) {
content += "<Part>\n<PartNumber>";
content += to_string(i+1);
content += to_string(i + 1);
content += "</PartNumber>\n<ETag>\"";
content += etags[i];
content += "\"</ETag>\n</Part>\n";
Expand Down
6 changes: 6 additions & 0 deletions src/cloud/provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@ unique_ptr<utils::DataVector<uint8_t>> Provider::putRequestGeneric(const string&
return nullptr;
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> Provider::deleteRequestGeneric(const string& /*filePath*/, const string_view /*uploadId*/) const
// Builds the http request for delete multipart objects
{
return nullptr;
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> Provider::createMultiPartRequest(const string& /*filePath*/) const
// Builds the http request for creating multipart put objects
{
Expand Down

0 comments on commit 6ed5d09

Please sign in to comment.