Skip to content

Commit

Permalink
Add MultiPart Upload implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
durner committed Jun 29, 2023
1 parent d7e1d62 commit 40023be
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 4 deletions.
13 changes: 12 additions & 1 deletion include/cloud/aws.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "cloud/aws_instances.hpp"
#include "cloud/provider.hpp"
#include "utils/data_vector.hpp"
#include <cassert>
#include <string>
//---------------------------------------------------------------------------
Expand Down Expand Up @@ -102,13 +103,23 @@ class AWS : public Provider {
[[nodiscard]] bool validKeys() const;
/// Get the settings (returned by value, i.e. the caller receives a copy of _settings)
[[nodiscard]] inline Settings getSettings() { return _settings; }
/// Multipart upload size; a value > 0 signals that multipart upload is allowed.
/// AWS reports 128 << 20 (128 MiB, presumably counted in bytes - TODO confirm the unit against the callers).
[[nodiscard]] uint64_t multipartUploadSize() const override { return 128ull << 20; }

/// Builds the http request for downloading a blob or listing the directory
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> getRequest(const std::string& filePath, const std::pair<uint64_t, uint64_t>& range) const override;
/// Builds the http request for putting objects without the object data itself
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> putRequest(const std::string& filePath, const std::string_view object) const override;
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> putRequestGeneric(const std::string& filePath, const std::string_view object, uint16_t part, const std::string_view uploadId) const override;
/// Builds the http request for putting an object in a single request, without the object data itself.
/// Forwards to putRequestGeneric with part number 0 and an empty upload id.
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> putRequest(const std::string& filePath, const std::string_view object) const override {
    constexpr uint16_t noPart = 0;
    const std::string_view noUploadId = "";
    return putRequestGeneric(filePath, object, noPart, noUploadId);
}
// Builds the http request for deleting an object
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override;
/// Builds the http request for creating multipart put objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& filePath) const override;
/// Builds the http request for completing multipart put objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> completeMultiPartRequest(const std::string& filePath, const std::string_view uploadId, const std::vector<std::string>& etags) const override;

/// Get the address of the server
[[nodiscard]] std::string getAddress() const override;
Expand Down
14 changes: 13 additions & 1 deletion include/cloud/gcp.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "cloud/gcp_instances.hpp"
#include "cloud/provider.hpp"
#include "utils/data_vector.hpp"
#include <cassert>
#include <string>
//---------------------------------------------------------------------------
Expand Down Expand Up @@ -71,13 +72,24 @@ class GCP : public Provider {
private:
/// Get the settings (returned by value, i.e. the caller receives a copy of _settings)
[[nodiscard]] inline Settings getSettings() { return _settings; }
/// Multipart upload size; a value > 0 signals that multipart upload is allowed.
/// GCP reports 128 << 20 (128 MiB, presumably counted in bytes - TODO confirm the unit against the callers).
[[nodiscard]] uint64_t multipartUploadSize() const override { return 128ull << 20; }


/// Builds the http request for downloading a blob or listing the directory
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> getRequest(const std::string& filePath, const std::pair<uint64_t, uint64_t>& range) const override;
/// Builds the http request for putting objects without the object data itself
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> putRequest(const std::string& filePath, const std::string_view object) const override;
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> putRequestGeneric(const std::string& filePath, const std::string_view object, uint16_t part, const std::string_view uploadId) const override;
/// Builds the http request for putting an object in a single request, without the object data itself.
/// Forwards to putRequestGeneric with part number 0 and an empty upload id.
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> putRequest(const std::string& filePath, const std::string_view object) const override {
    constexpr uint16_t noPart = 0;
    const std::string_view noUploadId = "";
    return putRequestGeneric(filePath, object, noPart, noUploadId);
}
// Builds the http request for deleting an object
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> deleteRequest(const std::string& filePath) const override;
/// Builds the http request for creating multipart put objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& filePath) const override;
/// Builds the http request for completing multipart put objects
[[nodiscard]] std::unique_ptr<utils::DataVector<uint8_t>> completeMultiPartRequest(const std::string& filePath, const std::string_view uploadId, const std::vector<std::string>& etags) const override;

/// Get the address of the server
[[nodiscard]] std::string getAddress() const override;
Expand Down
14 changes: 14 additions & 0 deletions include/cloud/provider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ class Provider {
[[nodiscard]] virtual std::string getAddress() const = 0;
/// Get the port of the server
[[nodiscard]] virtual uint32_t getPort() const = 0;

/// Multipart upload size; multipart upload is supported only if the returned size is > 0.
/// The default of 0 therefore signals "not supported"; providers with multipart support override this.
[[nodiscard]] virtual uint64_t multipartUploadSize() const { return 0; }
/// Builds the http request for putting multipart objects without the object data itself
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> putRequestGeneric(const std::string& /*filePath*/, const std::string_view /*object*/, uint16_t /*part*/, const std::string_view /*uploadId*/) const;
/// Builds the http request for creating multipart put objects
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> createMultiPartRequest(const std::string& /*filePath*/) const;
/// Builds the http request for completing multipart put objects
[[nodiscard]] virtual std::unique_ptr<utils::DataVector<uint8_t>> completeMultiPartRequest(const std::string& /*filePath*/, const std::string_view /*uploadId*/, const std::vector<std::string>& /*etags*/) const;

/// Initialize secret; the default implementation is empty (does nothing) and ignores the sendReceiver
virtual void initSecret(network::TaskedSendReceiver& /*sendReceiver*/) {}

Expand All @@ -96,6 +106,10 @@ class Provider {
[[nodiscard]] static Provider::RemoteInfo getRemoteInfo(const std::string& fileName);
/// Get the key from a keyFile
[[nodiscard]] static std::string getKey(const std::string& keyFile);
/// Get the etag from the upload header
[[nodiscard]] static std::string getETag(const std::string_view header);
/// Get the upload id from the multipart request body
[[nodiscard]] static std::string getUploadId(const std::string_view body);

/// Create a provider (keyId is access email for GCP/Azure)
[[nodiscard]] static std::unique_ptr<Provider> makeProvider(const std::string& filepath, bool https = true, const std::string& keyId = "", const std::string& keyFile = "", network::TaskedSendReceiver* sendReceiver = nullptr);
Expand Down
93 changes: 92 additions & 1 deletion src/cloud/aws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::getRequest(const string& filePath, c
return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> AWS::putRequest(const string& filePath, const string_view object) const
unique_ptr<utils::DataVector<uint8_t>> AWS::putRequestGeneric(const string& filePath, const string_view object, uint16_t part, const string_view uploadId) const
// Builds the http request for putting objects without the object data itself
{
if (!validKeys())
Expand All @@ -244,6 +244,13 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::putRequest(const string& filePath, c
request.path = "/" + filePath;
else
request.path = "/" + _settings.bucket + "/" + filePath;

// Is it a multipart upload?
if (part) {
request.path += "?partNumber=" + to_string(part) + "&uploadId=";
request.path += uploadId;
}

request.bodyLength = object.size();
request.headers.emplace("Host", getAddress());
request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp());
Expand Down Expand Up @@ -278,6 +285,7 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::deleteRequest(const string& filePath
request.path = "/" + filePath;
else
request.path = "/" + _settings.bucket + "/" + filePath;

request.bodyData = nullptr;
request.bodyLength = 0;
request.headers.emplace("Host", getAddress());
Expand All @@ -297,6 +305,89 @@ unique_ptr<utils::DataVector<uint8_t>> AWS::deleteRequest(const string& filePath
return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> AWS::createMultiPartRequest(const string& filePath) const
// Assembles the signed http POST header ("<path>?uploads") that creates a multipart upload for filePath.
// Returns nullptr if validKeys() fails; otherwise the serialized header bytes (no body is attached).
{
    if (!validKeys())
        return nullptr;

    AWSSigner::Request request;
    request.method = "POST";
    request.type = "HTTP/1.1";

    // Virtual hosted-style requests are the default; with a configured endpoint the path-style request
    // is used, which carries the bucket as first path component.
    request.path = "/";
    if (!_settings.endpoint.empty()) {
        request.path += _settings.bucket;
        request.path += "/";
    }
    request.path += filePath;
    request.path += "?uploads";

    // This request carries no payload
    request.bodyData = nullptr;
    request.bodyLength = 0;

    request.headers.emplace("Host", getAddress());
    request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp());
    request.headers.emplace("x-amz-request-payer", "requester");
    if (!_secret->sessionToken.empty())
        request.headers.emplace("x-amz-security-token", _secret->sessionToken);

    // Sign first: the header list is serialized only after the signer has seen the request
    auto canonicalRequest = AWSSigner::createCanonicalRequest(request);
    AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonicalRequest.second, .region = _settings.region, .service = "s3"};
    const auto signedUri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign);

    // Request line, followed by one "name: value" line per header and the terminating empty line
    string httpHeader = request.method;
    httpHeader += " ";
    httpHeader += signedUri;
    httpHeader += " ";
    httpHeader += request.type;
    httpHeader += "\r\n";
    for (const auto& [name, value] : request.headers) {
        httpHeader += name;
        httpHeader += ": ";
        httpHeader += value;
        httpHeader += "\r\n";
    }
    httpHeader += "\r\n";

    return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> AWS::completeMultiPartRequest(const string& filePath, const string_view uploadId, const std::vector<std::string>& etags) const
// Builds the http request for completing multipart upload objects.
//   filePath: object path the multipart upload was created for
//   uploadId: upload id of the multipart upload, sent as the "uploadId" query parameter
//   etags:    etags of the uploaded parts; etags[i] is listed as part number i + 1 and is wrapped in double quotes
// Returns nullptr if validKeys() fails; otherwise the serialized http header followed by the XML body.
{
    if (!validKeys())
        return nullptr;

    // XML body that lists all parts in ascending part number order
    string content = "<CompleteMultipartUpload>\n";
    for (auto i = 0ull; i < etags.size(); i++) {
        content += "<Part>\n<PartNumber>";
        content += to_string(i + 1);
        content += "</PartNumber>\n<ETag>\"";
        content += etags[i];
        content += "\"</ETag>\n</Part>\n";
    }
    content += "</CompleteMultipartUpload>\n";

    AWSSigner::Request request;
    request.method = "POST";
    request.type = "HTTP/1.1";

    // If an endpoint is defined, we use the path-style request. The default is the usage of virtual hosted-style requests.
    if (_settings.endpoint.empty())
        request.path = "/" + filePath;
    else
        request.path = "/" + _settings.bucket + "/" + filePath;
    // The upload id is the first (and only) query parameter of this request, so the query string has to be
    // opened with '?'. An '&' here would turn "&uploadId=..." into a part of the object key.
    request.path += "?uploadId=";
    request.path += uploadId;

    // NOTE(review): the request is handed to the signer with an empty body (bodyData == nullptr, bodyLength == 0)
    // although `content` is sent as body below - confirm that the signer does not need to hash the payload.
    request.bodyData = nullptr;
    request.bodyLength = 0;
    request.headers.emplace("Host", getAddress());
    request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp());
    request.headers.emplace("Content-Length", to_string(content.size()));
    request.headers.emplace("x-amz-request-payer", "requester");
    if (!_secret->sessionToken.empty())
        request.headers.emplace("x-amz-security-token", _secret->sessionToken);

    auto canonical = AWSSigner::createCanonicalRequest(request);

    AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"};
    const auto uri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign);

    // Request line, headers, empty line and finally the XML body
    auto httpHeaderMessage = request.method + " " + uri + " " + request.type + "\r\n";
    for (auto& h : request.headers)
        httpHeaderMessage += h.first + ": " + h.second + "\r\n";
    httpHeaderMessage += "\r\n" + content;
    return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeaderMessage.data()), reinterpret_cast<uint8_t*>(httpHeaderMessage.data() + httpHeaderMessage.size()));
}
//---------------------------------------------------------------------------
uint32_t AWS::getPort() const
// Gets the port of AWS S3 on http
{
Expand Down
74 changes: 73 additions & 1 deletion src/cloud/gcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,20 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::getRequest(const string& filePath, c
return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> GCP::putRequest(const string& filePath, const string_view object) const
unique_ptr<utils::DataVector<uint8_t>> GCP::putRequestGeneric(const string& filePath, const string_view object, uint16_t part, const string_view uploadId) const
// Builds the http request for putting objects without the object data itself
{
GCPSigner::Request request;
request.method = "PUT";
request.type = "HTTP/1.1";
request.path = "/" + filePath;

// Is it a multipart upload?
if (part) {
request.path += "?partNumber=" + to_string(part) + "&uploadId=";
request.path += uploadId;
}

request.bodyData = reinterpret_cast<const uint8_t*>(object.data());
request.bodyLength = object.size();

Expand Down Expand Up @@ -157,6 +164,71 @@ unique_ptr<utils::DataVector<uint8_t>> GCP::deleteRequest(const string& filePath
return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> GCP::createMultiPartRequest(const string& filePath) const
// Assembles the signed http POST header ("/<filePath>?uploads") that creates a multipart upload for filePath.
// Returns the serialized header bytes (no body is attached).
{
    GCPSigner::Request request;
    request.method = "POST";
    request.type = "HTTP/1.1";
    request.path = "/";
    request.path += filePath;
    request.path += "?uploads";

    // This request carries no payload
    request.bodyData = nullptr;
    request.bodyLength = 0;

    // The same timestamp is used for the X-Goog-Date query and the Date header
    auto timestamp = testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp();
    request.queries.emplace("X-Goog-Date", timestamp);
    request.headers.emplace("Host", getAddress());
    request.headers.emplace("Date", timestamp);

    // The signer returns the path that is put on the request line
    GCPSigner::StringToSign stringToSign = {.region = _settings.region, .service = "storage"};
    request.path = GCPSigner::createSignedRequest(_secret->serviceAccountEmail, _secret->privateKey, request, stringToSign);

    // Request line, followed by one "name: value" line per header and the terminating empty line
    string httpHeader = request.method;
    httpHeader += " ";
    httpHeader += request.path;
    httpHeader += " ";
    httpHeader += request.type;
    httpHeader += "\r\n";
    for (const auto& [name, value] : request.headers) {
        httpHeader += name;
        httpHeader += ": ";
        httpHeader += value;
        httpHeader += "\r\n";
    }
    httpHeader += "\r\n";

    return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeader.data()), reinterpret_cast<uint8_t*>(httpHeader.data() + httpHeader.size()));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> GCP::completeMultiPartRequest(const string& filePath, const string_view uploadId, const std::vector<std::string>& etags) const
// Builds the http request for completing multipart upload objects.
//   filePath: object path the multipart upload was created for
//   uploadId: upload id of the multipart upload, sent as the "uploadId" query parameter
//   etags:    etags of the uploaded parts; etags[i] is listed as part number i + 1 and is wrapped in double quotes
// Returns the serialized http header followed by the XML body.
{
    // XML body that lists all parts in ascending part number order
    string content = "<CompleteMultipartUpload>\n";
    for (auto i = 0ull; i < etags.size(); i++) {
        content += "<Part>\n<PartNumber>";
        content += to_string(i + 1);
        content += "</PartNumber>\n<ETag>\"";
        content += etags[i];
        content += "\"</ETag>\n</Part>\n";
    }
    content += "</CompleteMultipartUpload>\n";

    GCPSigner::Request request;
    request.method = "POST";
    request.type = "HTTP/1.1";
    request.path = "/" + filePath;
    // The upload id is the first query parameter written into the path, so the query string has to be
    // opened with '?'. An '&' here would turn "&uploadId=..." into a part of the object name.
    request.path += "?uploadId=";
    request.path += uploadId;
    // NOTE(review): the request is handed to the signer with an empty body (bodyData == nullptr, bodyLength == 0)
    // although `content` is sent as body below - confirm that the signer does not need the payload.
    request.bodyData = nullptr;
    request.bodyLength = 0;

    // The same timestamp is used for the X-Goog-Date query and the Date header
    auto date = testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp();
    request.queries.emplace("X-Goog-Date", date);
    request.headers.emplace("Host", getAddress());
    request.headers.emplace("Date", date);
    request.headers.emplace("Content-Length", to_string(content.size()));

    GCPSigner::StringToSign stringToSign = {.region = _settings.region, .service = "storage"};
    request.path = GCPSigner::createSignedRequest(_secret->serviceAccountEmail, _secret->privateKey, request, stringToSign);

    // Request line, headers, empty line and finally the XML body
    auto httpHeaderMessage = request.method + " " + request.path + " " + request.type + "\r\n";
    for (auto& h : request.headers)
        httpHeaderMessage += h.first + ": " + h.second + "\r\n";
    httpHeaderMessage += "\r\n" + content;
    return make_unique<utils::DataVector<uint8_t>>(reinterpret_cast<uint8_t*>(httpHeaderMessage.data()), reinterpret_cast<uint8_t*>(httpHeaderMessage.data() + httpHeaderMessage.size()));
}
//---------------------------------------------------------------------------
uint32_t GCP::getPort() const
// Gets the port of GCP on http
{
Expand Down
42 changes: 42 additions & 0 deletions src/cloud/provider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,48 @@ string Provider::getKey(const string& keyFile)
return string((istreambuf_iterator<char>(ifs)), (istreambuf_iterator<char>()));
}
//---------------------------------------------------------------------------
string Provider::getETag(const string_view header)
// Get the etag from the upload header.
// Returns the text between the double quotes of the first `ETag: "..."` occurrence in header (quotes excluded).
// Returns "" if the header contains no `ETag: "` or if the closing quote is missing (e.g. truncated header).
{
    constexpr string_view needle = "ETag: \"";
    const auto start = header.find(needle);
    if (start == string_view::npos)
        return "";

    const auto valueBegin = start + needle.size();
    const auto valueEnd = header.find('"', valueBegin);
    // Without this check an unterminated value would make substr return the whole remainder of the header
    if (valueEnd == string_view::npos)
        return "";

    return string(header.substr(valueBegin, valueEnd - valueBegin));
}
//---------------------------------------------------------------------------
string Provider::getUploadId(const string_view body)
// Get the upload id from the multipart request body.
// Returns the text between the first "<UploadId>" and the following "</UploadId>" in body.
// Returns "" if the opening tag is not found or if the closing tag is missing (e.g. truncated body).
{
    constexpr string_view openTag = "<UploadId>";
    constexpr string_view closeTag = "</UploadId>";

    const auto open = body.find(openTag);
    if (open == string_view::npos)
        return "";

    const auto idBegin = open + openTag.size();
    const auto idEnd = body.find(closeTag, idBegin);
    // Without this check an unterminated element would make substr return the whole remainder of the body
    if (idEnd == string_view::npos)
        return "";

    return string(body.substr(idBegin, idEnd - idBegin));
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> Provider::putRequestGeneric(const string& /*filePath*/, const string_view /*object*/, uint16_t /*part*/, const string_view /*uploadId*/) const
// Default implementation of the (multipart) put request: ignores all arguments, builds no request
// and returns a null pointer. Providers that support it override this method.
{
    return {};
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> Provider::createMultiPartRequest(const string& /*filePath*/) const
// Default implementation of the request that creates a multipart upload: ignores the path, builds no
// request and returns a null pointer. Providers that support it override this method.
{
    return {};
}
//---------------------------------------------------------------------------
unique_ptr<utils::DataVector<uint8_t>> Provider::completeMultiPartRequest(const string& /*filePath*/, const string_view /*uploadId*/, const vector<string>& /*etags*/) const
// Default implementation of the request that completes a multipart upload: ignores all arguments, builds
// no request and returns a null pointer. Providers that support it override this method.
{
    return {};
}
//---------------------------------------------------------------------------
unique_ptr<Provider> Provider::makeProvider(const string& filepath, bool https, const string& keyId, const string& secret, network::TaskedSendReceiver* sendReceiver)
// Create a provider
{
Expand Down

0 comments on commit 40023be

Please sign in to comment.