From 3eecdb60d9d09c80bf8522e26402d464c82c8287 Mon Sep 17 00:00:00 2001 From: Dominik Durner Date: Thu, 29 Jun 2023 12:00:39 +0200 Subject: [PATCH] Add MultiPart Upload implementation --- .../include/network/s3_send_receiver.hpp | 6 ++ include/cloud/aws.hpp | 13 ++- include/cloud/gcp.hpp | 14 ++- include/cloud/provider.hpp | 14 +++ src/cloud/aws.cpp | 93 ++++++++++++++++++- src/cloud/gcp.cpp | 74 ++++++++++++++- src/cloud/provider.cpp | 42 +++++++++ 7 files changed, 252 insertions(+), 4 deletions(-) diff --git a/example/benchmark/include/network/s3_send_receiver.hpp b/example/benchmark/include/network/s3_send_receiver.hpp index 2e40039..76419e9 100644 --- a/example/benchmark/include/network/s3_send_receiver.hpp +++ b/example/benchmark/include/network/s3_send_receiver.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -145,6 +146,7 @@ class S3SendReceiver { _config->maxConnections = threadsOrThroughput; _config->verifySSL = false; _config->enableTcpKeepAlive = true; + _config->executor = Aws::MakeShared("s3-downloader", threadsOrThroughput); } else { _config->throughputTargetGbps = threadsOrThroughput; } @@ -284,6 +286,10 @@ class S3SendReceiver { (*_timings)[submissionId].finish = std::chrono::steady_clock::now(); } submissions--; + } else { + std::cout << "S3 download error - retry object!" << std::endl; + while (!this->send(req)) {} + submissions--; } }); submissions++; diff --git a/include/cloud/aws.hpp b/include/cloud/aws.hpp index f42fe0b..0a57cc2 100644 --- a/include/cloud/aws.hpp +++ b/include/cloud/aws.hpp @@ -1,6 +1,7 @@ #pragma once #include "cloud/aws_instances.hpp" #include "cloud/provider.hpp" +#include "utils/data_vector.hpp" #include #include //--------------------------------------------------------------------------- @@ -102,13 +103,23 @@ class AWS : public Provider { [[nodiscard]] bool validKeys() const; /// Get the settings [[nodiscard]] inline Settings getSettings() { return _settings; } + /// Allows multipart upload if size > 0 + [[nodiscard]] uint64_t multipartUploadSize() const override { return 128ull << 20; } /// Builds the http request for downloading a blob or listing the directory [[nodiscard]] std::unique_ptr> getRequest(const std::string& filePath, const std::pair& range) const override; /// Builds the http request for putting objects without the object data itself - [[nodiscard]] std::unique_ptr> putRequest(const std::string& filePath, const std::string_view object) const override; + [[nodiscard]] std::unique_ptr> putRequestGeneric(const std::string& filePath, const std::string_view object, uint16_t part, const std::string_view uploadId) const override; + /// Builds the http request for putting objects without the object data itself + [[nodiscard]] std::unique_ptr> putRequest(const std::string& filePath, const std::string_view object) const override { + return putRequestGeneric(filePath, object, 0, ""); + } // Builds the http request for deleting an objects [[nodiscard]] std::unique_ptr> deleteRequest(const std::string& filePath) const override; + /// Builds the http request for creating multipart put objects + [[nodiscard]] std::unique_ptr> createMultiPartRequest(const std::string& filePath) const override; + /// Builds the http request for completing multipart put objects + [[nodiscard]] std::unique_ptr> completeMultiPartRequest(const std::string& filePath, const std::string_view uploadId, const std::vector& etags) const override; /// Get the address of the server [[nodiscard]] std::string getAddress() const override; diff --git a/include/cloud/gcp.hpp b/include/cloud/gcp.hpp index fe8e520..f9b062a 100644 --- a/include/cloud/gcp.hpp +++ b/include/cloud/gcp.hpp @@ -1,6 +1,7 @@ #pragma once #include "cloud/gcp_instances.hpp" #include "cloud/provider.hpp" +#include "utils/data_vector.hpp" #include #include //--------------------------------------------------------------------------- @@ -71,13 +72,24 @@ class GCP : public Provider { private: /// Get the settings [[nodiscard]] inline Settings getSettings() { return _settings; } + /// Allows multipart upload if size > 0 + [[nodiscard]] uint64_t multipartUploadSize() const override { return 128ull << 20; } + /// Builds the http request for downloading a blob or listing the directory [[nodiscard]] std::unique_ptr> getRequest(const std::string& filePath, const std::pair& range) const override; /// Builds the http request for putting objects without the object data itself - [[nodiscard]] std::unique_ptr> putRequest(const std::string& filePath, const std::string_view object) const override; + [[nodiscard]] std::unique_ptr> putRequestGeneric(const std::string& filePath, const std::string_view object, uint16_t part, const std::string_view uploadId) const override; + /// Builds the http request for putting objects without the object data itself + [[nodiscard]] std::unique_ptr> putRequest(const std::string& filePath, const std::string_view object) const override { + return putRequestGeneric(filePath, object, 0, ""); + } // Builds the http request for deleting an objects [[nodiscard]] std::unique_ptr> deleteRequest(const std::string& filePath) const override; + /// Builds the http request for creating multipart put objects + [[nodiscard]] std::unique_ptr> createMultiPartRequest(const std::string& filePath) const override; + /// Builds the http request for completing multipart put objects + [[nodiscard]] std::unique_ptr> completeMultiPartRequest(const std::string& filePath, const std::string_view uploadId, const std::vector& etags) const override; /// Get the address of the server [[nodiscard]] std::string getAddress() const override; diff --git a/include/cloud/provider.hpp b/include/cloud/provider.hpp index 3dd3a7f..bbad8a7 100644 --- a/include/cloud/provider.hpp +++ b/include/cloud/provider.hpp @@ -80,6 +80,16 @@ class Provider { [[nodiscard]] virtual std::string getAddress() const = 0; /// Get the port of the server [[nodiscard]] virtual uint32_t getPort() const = 0; + + /// Is multipart upload supported, if size > 0? + [[nodiscard]] virtual uint64_t multipartUploadSize() const { return 0; } + /// Builds the http request for putting multipart objects without the object data itself + [[nodiscard]] virtual std::unique_ptr> putRequestGeneric(const std::string& /*filePath*/, const std::string_view /*object*/, uint16_t /*part*/, const std::string_view /*uploadId*/) const; + /// Builds the http request for creating multipart put objects + [[nodiscard]] virtual std::unique_ptr> createMultiPartRequest(const std::string& /*filePath*/) const; + /// Builds the http request for completing multipart put objects + [[nodiscard]] virtual std::unique_ptr> completeMultiPartRequest(const std::string& /*filePath*/, const std::string_view /*uploadId*/, const std::vector& /*etags*/) const; + /// Initialize secret virtual void initSecret(network::TaskedSendReceiver& /*sendReceiver*/) {} @@ -96,6 +106,10 @@ class Provider { [[nodiscard]] static Provider::RemoteInfo getRemoteInfo(const std::string& fileName); /// Get the key from a keyFile [[nodiscard]] static std::string getKey(const std::string& keyFile); + /// Get the etag from the upload header + [[nodiscard]] static std::string getETag(const std::string_view header); + /// Get the upload id from the multipart request body + [[nodiscard]] static std::string getUploadId(const std::string_view body); /// Create a provider (keyId is access email for GCP/Azure) [[nodiscard]] static std::unique_ptr makeProvider(const std::string& filepath, bool https = true, const std::string& keyId = "", const std::string& keyFile = "", network::TaskedSendReceiver* sendReceiver = nullptr); diff --git a/src/cloud/aws.cpp b/src/cloud/aws.cpp index aa3183c..c01f0cf 100644 --- a/src/cloud/aws.cpp +++ b/src/cloud/aws.cpp @@ -228,7 +228,7 @@ unique_ptr> AWS::getRequest(const string& filePath, c return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); } //--------------------------------------------------------------------------- -unique_ptr> AWS::putRequest(const string& filePath, const string_view object) const +unique_ptr> AWS::putRequestGeneric(const string& filePath, const string_view object, uint16_t part, const string_view uploadId) const // Builds the http request for putting objects without the object data itself { if (!validKeys()) @@ -244,6 +244,13 @@ unique_ptr> AWS::putRequest(const string& filePath, c request.path = "/" + filePath; else request.path = "/" + _settings.bucket + "/" + filePath; + + // Is it a multipart upload? + if (part) { + request.path += "?partNumber=" + to_string(part) + "&uploadId="; + request.path += uploadId; + } + request.bodyLength = object.size(); request.headers.emplace("Host", getAddress()); request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); @@ -278,6 +285,7 @@ unique_ptr> AWS::deleteRequest(const string& filePath request.path = "/" + filePath; else request.path = "/" + _settings.bucket + "/" + filePath; + request.bodyData = nullptr; request.bodyLength = 0; request.headers.emplace("Host", getAddress()); @@ -297,6 +305,89 @@ unique_ptr> AWS::deleteRequest(const string& filePath return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); } //--------------------------------------------------------------------------- +unique_ptr> AWS::createMultiPartRequest(const string& filePath) const +// Builds the http request for creating multipart upload objects +{ + if (!validKeys()) + return nullptr; + + AWSSigner::Request request; + request.method = "POST"; + request.type = "HTTP/1.1"; + + // If an endpoint is defined, we use the path-style request. The default is the usage of virtual hosted-style requests. + if (_settings.endpoint.empty()) + request.path = "/" + filePath; + else + request.path = "/" + _settings.bucket + "/" + filePath; + request.path += "?uploads"; + request.bodyData = nullptr; + request.bodyLength = 0; + request.headers.emplace("Host", getAddress()); + request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); + request.headers.emplace("x-amz-request-payer", "requester"); + if (!_secret->sessionToken.empty()) + request.headers.emplace("x-amz-security-token", _secret->sessionToken); + + auto canonical = AWSSigner::createCanonicalRequest(request); + + AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"}; + const auto uri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign); + auto httpHeader = request.method + " " + uri + " " + request.type + "\r\n"; + for (auto& h : request.headers) + httpHeader += h.first + ": " + h.second + "\r\n"; + httpHeader += "\r\n"; + return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); +} +//--------------------------------------------------------------------------- +unique_ptr> AWS::completeMultiPartRequest(const string& filePath, const string_view uploadId, const std::vector& etags) const +// Builds the http request for completing multipart upload objects +{ + if (!validKeys()) + return nullptr; + + string content = "\n"; + for (auto i = 0ull; i < etags.size(); i++) { + content += "\n"; + content += to_string(i+1); + content += "\n\""; + content += etags[i]; + content += "\"\n\n"; + } + content += "\n"; + + AWSSigner::Request request; + request.method = "POST"; + request.type = "HTTP/1.1"; + + // If an endpoint is defined, we use the path-style request. The default is the usage of virtual hosted-style requests. + if (_settings.endpoint.empty()) + request.path = "/" + filePath; + else + request.path = "/" + _settings.bucket + "/" + filePath; + request.path += "&uploadId="; + request.path += uploadId; + + request.bodyData = nullptr; + request.bodyLength = 0; + request.headers.emplace("Host", getAddress()); + request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); + request.headers.emplace("Content-Length", to_string(content.size())); + request.headers.emplace("x-amz-request-payer", "requester"); + if (!_secret->sessionToken.empty()) + request.headers.emplace("x-amz-security-token", _secret->sessionToken); + + auto canonical = AWSSigner::createCanonicalRequest(request); + + AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"}; + const auto uri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign); + auto httpHeaderMessage = request.method + " " + uri + " " + request.type + "\r\n"; + for (auto& h : request.headers) + httpHeaderMessage += h.first + ": " + h.second + "\r\n"; + httpHeaderMessage += "\r\n" + content; + return make_unique>(reinterpret_cast(httpHeaderMessage.data()), reinterpret_cast(httpHeaderMessage.data() + httpHeaderMessage.size())); +} +//--------------------------------------------------------------------------- uint32_t AWS::getPort() const // Gets the port of AWS S3 on http { diff --git a/src/cloud/gcp.cpp b/src/cloud/gcp.cpp index 35d4e5e..8d1dd61 100644 --- a/src/cloud/gcp.cpp +++ b/src/cloud/gcp.cpp @@ -104,13 +104,20 @@ unique_ptr> GCP::getRequest(const string& filePath, c return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); } //--------------------------------------------------------------------------- -unique_ptr> GCP::putRequest(const string& filePath, const string_view object) const +unique_ptr> GCP::putRequestGeneric(const string& filePath, const string_view object, uint16_t part, const string_view uploadId) const // Builds the http request for putting objects without the object data itself { GCPSigner::Request request; request.method = "PUT"; request.type = "HTTP/1.1"; request.path = "/" + filePath; + + // Is it a multipart upload? + if (part) { + request.path += "?partNumber=" + to_string(part) + "&uploadId="; + request.path += uploadId; + } + request.bodyData = reinterpret_cast(object.data()); request.bodyLength = object.size(); @@ -157,6 +164,71 @@ unique_ptr> GCP::deleteRequest(const string& filePath return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); } //--------------------------------------------------------------------------- +unique_ptr> GCP::createMultiPartRequest(const string& filePath) const +// Builds the http request for creating multipart upload objects +{ + GCPSigner::Request request; + request.method = "POST"; + request.type = "HTTP/1.1"; + request.path = "/" + filePath; + request.path += "?uploads"; + request.bodyData = nullptr; + request.bodyLength = 0; + + auto date = testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp(); + request.queries.emplace("X-Goog-Date", date); + request.headers.emplace("Host", getAddress()); + request.headers.emplace("Date", date); + + GCPSigner::StringToSign stringToSign = {.region = _settings.region, .service = "storage"}; + request.path = GCPSigner::createSignedRequest(_secret->serviceAccountEmail, _secret->privateKey, request, stringToSign); + + auto httpHeader = request.method + " " + request.path + " " + request.type + "\r\n"; + for (auto& h : request.headers) + httpHeader += h.first + ": " + h.second + "\r\n"; + httpHeader += "\r\n"; + + return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); +} +//--------------------------------------------------------------------------- +unique_ptr> GCP::completeMultiPartRequest(const string& filePath, const string_view uploadId, const std::vector& etags) const +// Builds the http request for completing multipart upload objects +{ + string content = "\n"; + for (auto i = 0ull; i < etags.size(); i++) { + content += "\n"; + content += to_string(i+1); + content += "\n\""; + content += etags[i]; + content += "\"\n\n"; + } + content += "\n"; + + GCPSigner::Request request; + request.method = "POST"; + request.type = "HTTP/1.1"; + request.path = "/" + filePath; + request.path += "&uploadId="; + request.path += uploadId; + request.bodyData = nullptr; + request.bodyLength = 0; + + auto date = testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp(); + request.queries.emplace("X-Goog-Date", date); + request.headers.emplace("Host", getAddress()); + request.headers.emplace("Date", date); + request.headers.emplace("Content-Length", to_string(content.size())); + + GCPSigner::StringToSign stringToSign = {.region = _settings.region, .service = "storage"}; + request.path = GCPSigner::createSignedRequest(_secret->serviceAccountEmail, _secret->privateKey, request, stringToSign); + + auto httpHeaderMessage = request.method + " " + request.path + " " + request.type + "\r\n"; + for (auto& h : request.headers) + httpHeaderMessage += h.first + ": " + h.second + "\r\n"; + httpHeaderMessage += "\r\n" + content; + return make_unique>(reinterpret_cast(httpHeaderMessage.data()), reinterpret_cast(httpHeaderMessage.data() + httpHeaderMessage.size())); +} +//--------------------------------------------------------------------------- uint32_t GCP::getPort() const // Gets the port of GCP on http { diff --git a/src/cloud/provider.cpp b/src/cloud/provider.cpp index 2a5b1a8..f8610c2 100644 --- a/src/cloud/provider.cpp +++ b/src/cloud/provider.cpp @@ -93,6 +93,48 @@ string Provider::getKey(const string& keyFile) return string((istreambuf_iterator(ifs)), (istreambuf_iterator())); } //--------------------------------------------------------------------------- +string Provider::getETag(const string_view header) +// Get the etag from the upload header +{ + string needle = "ETag: \""; + auto pos = header.find(needle); + if (pos == header.npos) + return ""; + pos += needle.length(); + auto end = header.find("\"", pos); + return string(header.substr(pos, end - pos)); +} +//--------------------------------------------------------------------------- +string Provider::getUploadId(const string_view body) +// Get the upload id from the multipart request body +{ + string needle = ""; + auto pos = body.find(needle); + if (pos == body.npos) + return ""; + pos += needle.length(); + auto end = body.find("", pos); + return string(body.substr(pos, end - pos)); +} +//--------------------------------------------------------------------------- +unique_ptr> Provider::putRequestGeneric(const string& /*filePath*/, const string_view /*object*/, uint16_t /*part*/, const string_view /*uploadId*/) const +// Builds the http request for putting multipart objects without the object data itself +{ + return nullptr; +} +//--------------------------------------------------------------------------- +unique_ptr> Provider::createMultiPartRequest(const string& /*filePath*/) const +// Builds the http request for creating multipart put objects +{ + return nullptr; +} +//--------------------------------------------------------------------------- +unique_ptr> Provider::completeMultiPartRequest(const string& /*filePath*/, const string_view /*uploadId*/, const vector& /*etags*/) const +// Builds the http request for completing multipart put objects +{ + return nullptr; +} +//--------------------------------------------------------------------------- unique_ptr Provider::makeProvider(const string& filepath, bool https, const string& keyId, const string& secret, network::TaskedSendReceiver* sendReceiver) // Create a provider {