From dd6a5f2adaf8c027187f17c0132a21fdb4e4998f Mon Sep 17 00:00:00 2001 From: Dominik Durner Date: Sun, 3 Dec 2023 20:13:41 +0100 Subject: [PATCH] Integrating s3 express one zone API and added verifyKeyRequest method --- .clang-format | 141 +++++--- .../include/network/s3_send_receiver.hpp | 2 +- example/benchmark/src/benchmark/bandwidth.cpp | 2 +- example/simple/main.cpp | 2 +- include/cloud/aws.hpp | 51 ++- include/cloud/gcp.hpp | 3 +- include/cloud/provider.hpp | 12 +- include/network/config.hpp | 1 - include/network/message_task.hpp | 3 +- include/network/tls_context.hpp | 6 +- include/network/transaction.hpp | 63 +++- include/utils/data_vector.hpp | 12 +- include/utils/load_tracker.hpp | 4 +- include/utils/ring_buffer.hpp | 8 +- include/utils/unordered_map.hpp | 22 +- src/cloud/aws.cpp | 314 ++++++++++++------ src/cloud/azure.cpp | 9 +- src/cloud/gcp.cpp | 8 +- src/cloud/provider.cpp | 8 +- test/integration/minio_async.cpp | 47 ++- test/unit/cloud/aws_test.cpp | 5 +- 21 files changed, 486 insertions(+), 237 deletions(-) diff --git a/.clang-format b/.clang-format index 6255728..af987d2 100644 --- a/.clang-format +++ b/.clang-format @@ -1,36 +1,68 @@ --- -Language: Cpp -# BasedOnStyle: Chromium +Language: Cpp AccessModifierOffset: 0 AlignAfterOpenBracket: Align -AlignConsecutiveAssignments: false -AlignConsecutiveDeclarations: false +AlignArrayOfStructures: None +AlignConsecutiveAssignments: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + PadOperators: true +AlignConsecutiveBitFields: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + PadOperators: true +AlignConsecutiveDeclarations: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + PadOperators: true +AlignConsecutiveMacros: + Enabled: false + AcrossEmptyLines: false + AcrossComments: false + AlignCompound: false + PadOperators: true AlignEscapedNewlines: Left -AlignOperands: false -AlignTrailingComments: false +AlignOperands: Align +AlignTrailingComments: + Kind: Never + OverEmptyLines: 0 +AllowAllArgumentsOnNextLine: true AllowAllParametersOfDeclarationOnNextLine: true -AllowShortBlocksOnASingleLine: true +AllowShortBlocksOnASingleLine: Empty AllowShortCaseLabelsOnASingleLine: true AllowShortFunctionsOnASingleLine: All -AllowShortIfStatementsOnASingleLine: true +AllowShortIfStatementsOnASingleLine: OnlyFirstIf AllowShortLoopsOnASingleLine: true +AllowShortLambdasOnASingleLine: All AlwaysBreakAfterDefinitionReturnType: None AlwaysBreakAfterReturnType: None BinPackArguments: true BinPackParameters: true BraceWrapping: - AfterClass: false - AfterControlStatement: false - AfterEnum: false - AfterFunction: false - AfterNamespace: false + AfterClass: false + AfterControlStatement: MultiLine + AfterEnum: false + AfterFunction: false + AfterNamespace: false AfterObjCDeclaration: false - AfterStruct: false - AfterUnion: false + AfterStruct: false + AfterUnion: false AfterExternBlock: false - BeforeCatch: false - BeforeElse: false - IndentBraces: false + BeforeCatch: false + BeforeElse: false + IndentBraces: false + AfterCaseLabel: false + BeforeLambdaBody: false + BeforeWhile: false + SplitEmptyFunction: false + SplitEmptyRecord: false + SplitEmptyNamespace: false BreakBeforeBinaryOperators: None BreakBeforeBraces: Attach BreakBeforeInheritanceComma: false @@ -39,36 +71,50 @@ BreakConstructorInitializersBeforeComma: false BreakConstructorInitializers: BeforeColon BreakAfterJavaFieldAnnotations: false BreakStringLiterals: true -ColumnLimit: 0 -CommentPragmas: '^ IWYU pragma:' +ColumnLimit: 0 +CommentPragmas: "^ IWYU pragma:" +CompactNamespaces: false ConstructorInitializerAllOnOneLineOrOnePerLine: true ConstructorInitializerIndentWidth: 4 ContinuationIndentWidth: 4 Cpp11BracedListStyle: true DerivePointerAlignment: false -DisableFormat: false +DisableFormat: false ExperimentalAutoDetectBinPacking: true -ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] +FixNamespaceComments: true +ForEachMacros: + - foreach + - Q_FOREACH + - BOOST_FOREACH IncludeCategories: - - Regex: '^"' - Priority: 1 - - Regex: '^' - Priority: 2 - - Regex: '^<.*\.h>' - Priority: 4 - - Regex: '^<' - Priority: 3 - - Regex: '.\*' - Priority: 5 -IncludeIsMainRegex: '([-_](test|unittest))?$' + - Regex: ^" + Priority: 1 + - Regex: ^ + Priority: 2 + - Regex: ^<.*\.h> + Priority: 4 + - Regex: ^< + Priority: 3 + - Regex: .\* + Priority: 5 +IncludeIsMainRegex: ([-_](test|unittest))?$ +IndentAccessModifiers: false IndentCaseLabels: true -IndentWidth: 4 +IndentCaseBlocks: false +IndentExternBlock: AfterExternBlock +IndentGotoLabels: true +IndentPPDirectives: None +IndentRequiresClause: false +IndentWidth: 4 IndentWrappedFunctionNames: false +InsertBraces: false +InsertNewlineAtEOF: true +InsertTrailingCommas: None JavaScriptQuotes: Leave JavaScriptWrapImports: true KeepEmptyLinesAtTheStartOfBlocks: false -MacroBlockBegin: '' -MacroBlockEnd: '' +MacroBlockBegin: "" +MacroBlockEnd: "" MaxEmptyLinesToKeep: 1 NamespaceIndentation: None ObjCBlockIndentWidth: 4 @@ -81,20 +127,29 @@ PenaltyBreakString: 1000 PenaltyExcessCharacter: 1000000 PenaltyReturnTypeOnItsOwnLine: 200 PointerAlignment: Left -ReflowComments: false -SortIncludes: true +ReflowComments: false +SortIncludes: CaseSensitive SpaceAfterCStyleCast: true SpaceAfterTemplateKeyword: true SpaceBeforeAssignmentOperators: true SpaceBeforeParens: ControlStatements +SpaceBeforeParensOptions: + AfterControlStatements: true + AfterForeachMacros: true + AfterFunctionDefinitionName: false + AfterFunctionDeclarationName: false + AfterIfMacros: true + AfterOverloadedOperator: false + AfterRequiresInClause: false + AfterRequiresInExpression: false + BeforeNonEmptyParentheses: false SpaceInEmptyParentheses: false SpacesBeforeTrailingComments: 1 -SpacesInAngles: false +SpacesInAngles: Leave SpacesInContainerLiterals: false SpacesInCStyleCastParentheses: false SpacesInParentheses: false SpacesInSquareBrackets: false -Standard: Auto -TabWidth: 4 -UseTab: Never -... +Standard: Latest +TabWidth: 4 +UseTab: Never diff --git a/example/benchmark/include/network/s3_send_receiver.hpp b/example/benchmark/include/network/s3_send_receiver.hpp index 10157c3..add7ee7 100644 --- a/example/benchmark/include/network/s3_send_receiver.hpp +++ b/example/benchmark/include/network/s3_send_receiver.hpp @@ -15,7 +15,6 @@ #include #include #include -#include #include #include #include @@ -25,6 +24,7 @@ #include #include #include +#include // --------------------------------------------------------------------------- // AnyBlob - Universal Cloud Object Storage Library // Dominik Durner, 2021 diff --git a/example/benchmark/src/benchmark/bandwidth.cpp b/example/benchmark/src/benchmark/bandwidth.cpp index 1c9b74f..c8f8b3f 100644 --- a/example/benchmark/src/benchmark/bandwidth.cpp +++ b/example/benchmark/src/benchmark/bandwidth.cpp @@ -3,10 +3,10 @@ #include "cloud/aws_resolver.hpp" #include "cloud/azure.hpp" #include "cloud/gcp.hpp" -#include "network/transaction.hpp" #include "network/original_message.hpp" #include "network/s3_send_receiver.hpp" #include "network/tasked_send_receiver.hpp" +#include "network/transaction.hpp" #include "perfevent/PerfEvent.hpp" #include "utils/timer.hpp" #include "utils/utils.hpp" diff --git a/example/simple/main.cpp b/example/simple/main.cpp index 86005ab..4e52855 100644 --- a/example/simple/main.cpp +++ b/example/simple/main.cpp @@ -1,6 +1,6 @@ #include "cloud/provider.hpp" -#include "network/transaction.hpp" #include "network/tasked_send_receiver.hpp" +#include "network/transaction.hpp" #include #include //--------------------------------------------------------------------------- diff --git a/include/cloud/aws.hpp b/include/cloud/aws.hpp index 943f48e..e508d9e 100644 --- a/include/cloud/aws.hpp +++ b/include/cloud/aws.hpp @@ -1,8 +1,11 @@ #pragma once #include "cloud/aws_instances.hpp" +#include "cloud/aws_signer.hpp" #include "cloud/provider.hpp" #include "utils/data_vector.hpp" #include +#include +#include #include //--------------------------------------------------------------------------- // AnyBlob - Universal Cloud Object Storage Library @@ -37,6 +40,8 @@ class AWS : public Provider { std::string endpoint; /// The port int port = 80; + /// Is zonal request required? + bool zonal = false; }; /// The secret @@ -48,9 +53,9 @@ class AWS : public Provider { /// The secret std::string secret; /// The session token - std::string sessionToken; - /// The expieration - int64_t experiation; + std::string token; + /// The expiration + int64_t expiration; }; /// The fake AMZ timestamp @@ -61,10 +66,19 @@ class AWS : public Provider { protected: /// The settings Settings _settings; - /// The secret - std::unique_ptr _secret; + /// The global secret + std::shared_ptr _globalSecret; + /// The global session secret + std::shared_ptr _globalSessionSecret; /// The multipart upload size uint64_t _multipartUploadSize = 128ull << 20; + /// The secret mutex + std::mutex _mutex; + /// The thread local secret + thread_local static std::shared_ptr _secret; + /// The session secret + thread_local static std::shared_ptr _sessionSecret; + public: /// Get instance details @@ -75,7 +89,7 @@ class AWS : public Provider { void initResolver(network::TaskedSendReceiver& sendReceiver) override; /// The constructor - explicit AWS(const RemoteInfo& info) : _settings({info.bucket, info.region, info.endpoint, info.port}) { + explicit AWS(const RemoteInfo& info) : _settings({info.bucket, info.region, info.endpoint, info.port, info.zonal}), _mutex() { // Check for compatible clouds assert(info.provider == Provider::CloudService::AWS || info.provider == Provider::CloudService::MinIO || info.provider == Provider::CloudService::Oracle || info.provider == Provider::CloudService::IBM); // Requires a bucket @@ -87,27 +101,37 @@ class AWS : public Provider { } /// The custom endpoint constructor AWS(const RemoteInfo& info, const std::string& keyId, const std::string& key) : AWS(info) { - _secret = std::make_unique(); - _secret->keyId = keyId; - _secret->secret = key; + _globalSecret = std::make_unique(); + // At init it is fine to simply overwrite + _globalSecret->keyId = keyId; + _globalSecret->secret = key; + _secret = _globalSecret; } private: /// Initialize secret void initSecret(network::TaskedSendReceiver& sendReceiver) override; + /// Get a local copy of the global secret + void getSecret() override; /// Builds the secret http request [[nodiscard]] std::unique_ptr> downloadIAMUser() const; /// Builds the secret http request - [[nodiscard]] std::unique_ptr> downloadSecret(std::string_view content); + [[nodiscard]] std::unique_ptr> downloadSecret(std::string_view content, std::string& iamUser); /// Update secret - bool updateSecret(std::string_view content); + bool updateSecret(std::string_view content, std::string_view iamUser); + /// Update session token + bool updateSessionToken(std::string_view content); /// Checks whether the keys are still valid - [[nodiscard]] bool validKeys() const; + [[nodiscard]] bool validKeys(uint32_t offset = 60) const; + /// Checks whether the keys are still valid + [[nodiscard]] bool validSession(uint32_t offset = 60) const; /// Get the settings [[nodiscard]] inline Settings getSettings() { return _settings; } /// Allows multipart upload if size > 0 [[nodiscard]] uint64_t multipartUploadSize() const override { return _multipartUploadSize; } + /// Creates the generic http request and signs it + [[nodiscard]] std::unique_ptr> buildRequest(AWSSigner::Request& request, std::string_view payload = "", bool initHeaders = true) const; /// Builds the http request for downloading a blob or listing the directory [[nodiscard]] std::unique_ptr> getRequest(const std::string& filePath, const std::pair& range) const override; /// Builds the http request for putting objects without the object data itself @@ -126,7 +150,8 @@ class AWS : public Provider { [[nodiscard]] std::unique_ptr> createMultiPartRequest(const std::string& filePath) const override; /// Builds the http request for completing multipart put objects [[nodiscard]] std::unique_ptr> completeMultiPartRequest(const std::string& filePath, std::string_view uploadId, const std::vector& etags) const override; - + /// Builds the http request for getting the session token objects + [[nodiscard]] std::unique_ptr> getSessionToken(std::string_view type = "ReadWrite") const; /// Get the address of the server [[nodiscard]] std::string getAddress() const override; /// Get the port of the server diff --git a/include/cloud/gcp.hpp b/include/cloud/gcp.hpp index b81392b..587bb5e 100644 --- a/include/cloud/gcp.hpp +++ b/include/cloud/gcp.hpp @@ -75,7 +75,6 @@ class GCP : public Provider { /// Allows multipart upload if size > 0 [[nodiscard]] uint64_t multipartUploadSize() const override { return 128ull << 20; } - /// Builds the http request for downloading a blob or listing the directory [[nodiscard]] std::unique_ptr> getRequest(const std::string& filePath, const std::pair& range) const override; /// Builds the http request for putting objects without the object data itself @@ -84,7 +83,7 @@ class GCP : public Provider { [[nodiscard]] std::unique_ptr> putRequest(const std::string& filePath, std::string_view object) const override { return putRequestGeneric(filePath, object, 0, ""); } - // Builds the http request for deleting an objects + // Builds the http request for deleting an objects [[nodiscard]] std::unique_ptr> deleteRequest(const std::string& filePath) const override { return deleteRequestGeneric(filePath, ""); } diff --git a/include/cloud/provider.hpp b/include/cloud/provider.hpp index e03eb52..b0bc6e7 100644 --- a/include/cloud/provider.hpp +++ b/include/cloud/provider.hpp @@ -28,7 +28,7 @@ namespace cloud { //--------------------------------------------------------------------------- /// Implements the cloud provider abstraction class Provider { -public: + public: /// The remote prefixes count static constexpr unsigned remoteFileCount = 6; /// The remote prefixes @@ -59,6 +59,8 @@ class Provider { std::string endpoint = ""; /// The port int port = 80; + /// Is zonal endpoint? + bool zonal = false; }; /// Instance struct @@ -73,8 +75,7 @@ class Provider { uint64_t network; }; - -protected: + protected: CloudService _type; /// Builds the http request for downloading a blob or listing a directory [[nodiscard]] virtual std::unique_ptr> getRequest(const std::string& filePath, const std::pair& range) const = 0; @@ -100,8 +101,10 @@ class Provider { /// Initialize secret virtual void initSecret(network::TaskedSendReceiver& /*sendReceiver*/) {} + /// Get a local copy of the global secret + virtual void getSecret() {} -public: + public: /// The destructor virtual ~Provider() = default; /// Gets the cloud provider type @@ -121,7 +124,6 @@ class Provider { /// Parse a row from csv file [[nodiscard]] static std::vector parseCSVRow(std::string_view body); - /// Create a provider (keyId is access email for GCP/Azure) [[nodiscard]] static std::unique_ptr makeProvider(const std::string& filepath, bool https = true, const std::string& keyId = "", const std::string& keyFile = "", network::TaskedSendReceiver* sendReceiver = nullptr); diff --git a/include/network/config.hpp b/include/network/config.hpp index c886057..cc01e0c 100644 --- a/include/network/config.hpp +++ b/include/network/config.hpp @@ -21,7 +21,6 @@ struct Config { /// Total requests example: 100,000 Mbits / 400 Mbits = 250 Requests static constexpr uint64_t defaultCoreThroughput = 8000; - /// Throughput per core in Mbit/s uint64_t coreThroughput; /// Concurrent requests to achieve coreThroughput diff --git a/include/network/message_task.hpp b/include/network/message_task.hpp index f604b46..b5c1535 100644 --- a/include/network/message_task.hpp +++ b/include/network/message_task.hpp @@ -59,8 +59,7 @@ struct MessageTask { std::string_view s(reinterpret_cast(sendingMessage->message->data()), sendingMessage->message->size()); if (s.find("HTTP") != std::string_view::npos && sendingMessage->port == 443) { return std::make_unique(sendingMessage, context, std::forward(args)...); - } - else if (s.find("HTTP") != std::string_view::npos) { + } else if (s.find("HTTP") != std::string_view::npos) { return std::make_unique(sendingMessage, std::forward(args)...); } return nullptr; diff --git a/include/network/tls_context.hpp b/include/network/tls_context.hpp index 42ba5d9..3ee12e8 100644 --- a/include/network/tls_context.hpp +++ b/include/network/tls_context.hpp @@ -1,9 +1,9 @@ #pragma once -#include #include +#include #include -#include #include +#include //--------------------------------------------------------------------------- // AnyBlob - Universal Cloud Object Storage Library // Dominik Durner, 2023 @@ -28,7 +28,7 @@ class TLSContext { /// The cache mask static constexpr uint64_t cacheMask = (~0ull) >> (64 - cachePower); /// The session cache - std::array, 1ull << cachePower> _sessionCache; + std::array, 1ull << cachePower> _sessionCache; public: /// The constructor diff --git a/include/network/transaction.hpp b/include/network/transaction.hpp index c3cb09f..7dd5f2e 100644 --- a/include/network/transaction.hpp +++ b/include/network/transaction.hpp @@ -70,7 +70,7 @@ class Transaction { }; /// The provider - const cloud::Provider* _provider; + cloud::Provider* _provider; /// The message message_vector_type _messages; @@ -91,80 +91,114 @@ class Transaction { /// The constructor Transaction() : _provider(), _messages(), _messageCounter(), _multipartUploads(), _completedMultiparts() {} /// The explicit constructor with the provider - explicit Transaction(const cloud::Provider* provider) : _provider(provider), _messages(), _messageCounter(), _multipartUploads(), _completedMultiparts() {} + explicit Transaction(cloud::Provider* provider) : _provider(provider), _messages(), _messageCounter(), _multipartUploads(), _completedMultiparts() {} /// Set the provider - constexpr void setProvider(const cloud::Provider* provider) { this->_provider = provider; } + constexpr void setProvider(cloud::Provider* provider) { this->_provider = provider; } /// Sends the request messages to the task group void processAsync(network::TaskedSendReceiverGroup& group); /// Processes the request messages void processSync(TaskedSendReceiver& sendReceiver); + /// Function to ensure fresh keys before creating messages + /// This is needed to ensure valid keys before a message is requested + /// Simply forward a task send receiver the message function and the args of this message + template + bool verifyKeyRequest(TaskedSendReceiver& sendReceiver, Function&& func) { + assert(_provider); + _provider->initSecret(sendReceiver); + return std::forward(func)(); + } + /// Build a new get request for synchronous calls /// Note that the range is [start, end[, [0, 0[ gets the whole object - inline void getObjectRequest(const std::string& remotePath, std::pair range = {0, 0}, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { + inline bool getObjectRequest(const std::string& remotePath, std::pair range = {0, 0}, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { assert(_provider); + _provider->getSecret(); auto originalMsg = std::make_unique(_provider->getRequest(remotePath, range), _provider->getAddress(), _provider->getPort(), result, capacity, traceId); + if (!originalMsg) + return false; _messages.push_back(std::move(originalMsg)); + return true; } /// Build a new get request with callback /// Note that the range is [start, end[, [0, 0[ gets the whole object template - inline void getObjectRequest(Callback&& callback, const std::string& remotePath, std::pair range = {0, 0}, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { + inline bool getObjectRequest(Callback&& callback, const std::string& remotePath, std::pair range = {0, 0}, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { assert(_provider); + _provider->getSecret(); auto originalMsg = std::make_unique>(std::forward(callback), _provider->getRequest(remotePath, range), _provider->getAddress(), _provider->getPort(), result, capacity, traceId); + if (!originalMsg) + return false; _messages.push_back(std::move(originalMsg)); + return true; } /// Build a new put request for synchronous calls - inline void putObjectRequest(const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { + inline bool putObjectRequest(const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { assert(_provider); + _provider->getSecret(); if (_provider->multipartUploadSize() && size > _provider->multipartUploadSize()) return putObjectRequestMultiPart(remotePath, data, size, result, capacity, traceId); auto object = std::string_view(data, size); auto originalMsg = std::make_unique(_provider->putRequest(remotePath, object), _provider->getAddress(), _provider->getPort(), result, capacity, traceId); originalMsg->setPutRequestData(reinterpret_cast(data), size); + if (!originalMsg) + return false; _messages.push_back(std::move(originalMsg)); + return true; } /// Build a new put request with callback template - inline void putObjectRequest(Callback&& callback, const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { + inline bool putObjectRequest(Callback&& callback, const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { assert(_provider); + _provider->getSecret(); if (_provider->multipartUploadSize() && size > _provider->multipartUploadSize()) return putObjectRequestMultiPart(std::forward(callback), remotePath, data, size, result, capacity, traceId); auto object = std::string_view(data, size); auto originalMsg = std::make_unique>(std::forward(callback), _provider->putRequest(remotePath, object), _provider->getAddress(), _provider->getPort(), result, capacity, traceId); originalMsg->setPutRequestData(reinterpret_cast(data), size); + if (!originalMsg) + return false; _messages.push_back(std::move(originalMsg)); + return true; } /// Build a new delete request for synchronous calls - inline void deleteObjectRequest(const std::string& remotePath, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { + inline bool deleteObjectRequest(const std::string& remotePath, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { assert(_provider); + _provider->getSecret(); auto originalMsg = std::make_unique(_provider->deleteRequest(remotePath), _provider->getAddress(), _provider->getPort(), result, capacity, traceId); + if (!originalMsg) + return false; _messages.push_back(std::move(originalMsg)); + return true; } /// Build a new delete request with callback template - inline void deleteObjectRequest(Callback&& callback, const std::string& remotePath, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { + inline bool deleteObjectRequest(Callback&& callback, const std::string& remotePath, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { assert(_provider); + _provider->getSecret(); auto originalMsg = std::make_unique>(std::forward(callback), _provider->deleteRequest(remotePath), _provider->getAddress(), _provider->getPort(), result, capacity, traceId); + if (!originalMsg) + return false; _messages.push_back(std::move(originalMsg)); + return true; } private: /// Build a new put request for synchronous calls - inline void putObjectRequestMultiPart(const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { + inline bool putObjectRequestMultiPart(const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { auto finished = [](network::MessageResult& /*result*/) {}; - putObjectRequestMultiPart(std::move(finished), remotePath, data, size, result, capacity, traceId); + return putObjectRequestMultiPart(std::move(finished), remotePath, data, size, result, capacity, traceId); } /// Build a new put request with callback template - inline void putObjectRequestMultiPart(Callback&& callback, const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { + inline bool putObjectRequestMultiPart(Callback&& callback, const std::string& remotePath, const char* data, uint64_t size, uint8_t* result = nullptr, uint64_t capacity = 0, uint64_t traceId = 0) { assert(_provider); auto splitSize = _provider->multipartUploadSize(); auto parts = (size / splitSize) + ((size % splitSize) ? 1u : 0u); @@ -176,10 +210,12 @@ class Transaction { _completedMultiparts++; return; } + _provider->getSecret(); _multipartUploads[position].uploadId = _provider->getUploadId(initalRequestResult.getResult()); auto offset = 0ull; for (auto i = 1ull; i <= parts; i++) { auto finishMultipart = [&callback, &initalRequestResult, position, remotePath, traceId, i, parts, this](network::MessageResult& result) { + _provider->getSecret(); if (!result.success()) [[unlikely]] { _multipartUploads[position].errorMessageId = i - 1; _multipartUploads[position].state = MultipartUpload::State::Aborted; @@ -222,7 +258,10 @@ class Transaction { }; auto originalMsg = makeCallbackMessage(std::move(uploadMessages), _provider->createMultiPartRequest(remotePath), _provider->getAddress(), _provider->getPort(), result, capacity, traceId); + if (!originalMsg) + return false; _messages.push_back(std::move(originalMsg)); + return true; } public: diff --git a/include/utils/data_vector.hpp b/include/utils/data_vector.hpp index 9b7b774..bc9cb9c 100644 --- a/include/utils/data_vector.hpp +++ b/include/utils/data_vector.hpp @@ -55,22 +55,22 @@ class DataVector { } /// Get the data - [[ nodiscard ]] constexpr T* data() { + [[nodiscard]] constexpr T* data() { return _data; } /// Get the data - [[ nodiscard ]] constexpr const T* cdata() const { + [[nodiscard]] constexpr const T* cdata() const { return _data; } /// Get the size - [[ nodiscard ]] constexpr uint64_t size() const { + [[nodiscard]] constexpr uint64_t size() const { return _size; } /// Get the capacity - [[ nodiscard ]] constexpr uint64_t capacity() const { + [[nodiscard]] constexpr uint64_t capacity() const { return _capacity; } @@ -80,7 +80,7 @@ class DataVector { } /// Is the data owned - [[ nodiscard ]] constexpr bool owned() { + [[nodiscard]] constexpr bool owned() { return _dataOwned || !_capacity; } @@ -107,7 +107,7 @@ class DataVector { } /// Transfer the ownership of the data - [[ nodiscard ]] constexpr std::unique_ptr transferBuffer() { + [[nodiscard]] constexpr std::unique_ptr transferBuffer() { return move(_dataOwned); } }; diff --git a/include/utils/load_tracker.hpp b/include/utils/load_tracker.hpp index c0048e0..f014eb1 100644 --- a/include/utils/load_tracker.hpp +++ b/include/utils/load_tracker.hpp @@ -107,8 +107,8 @@ class LoadTracker { /// Get the active cpu times static constexpr size_t getActiveTime(const CPUData& e) { return e.times[static_cast(CPUStates::S_USER)] + e.times[static_cast(CPUStates::S_NICE)] + - e.times[static_cast(CPUStates::S_SYSTEM)] + e.times[static_cast(CPUStates::S_IRQ)] + - e.times[static_cast(CPUStates::S_SOFTIRQ)] + e.times[static_cast(CPUStates::S_STEAL)]; + e.times[static_cast(CPUStates::S_SYSTEM)] + e.times[static_cast(CPUStates::S_IRQ)] + + e.times[static_cast(CPUStates::S_SOFTIRQ)] + e.times[static_cast(CPUStates::S_STEAL)]; } /// Write stats as csv diff --git a/include/utils/ring_buffer.hpp b/include/utils/ring_buffer.hpp index a69eb90..6e93b07 100644 --- a/include/utils/ring_buffer.hpp +++ b/include/utils/ring_buffer.hpp @@ -68,7 +68,7 @@ class RingBuffer { /// Insert a tuple into buffer template - [[ nodiscard ]] constexpr uint64_t insert(T tuple) { + [[nodiscard]] constexpr uint64_t insert(T tuple) { while (true) { std::unique_lock lock(_insert.mutex); auto seenHead = _seen.commited.load(std::memory_order_acquire); @@ -86,7 +86,7 @@ class RingBuffer { /// Insert a span into buffer template - [[ nodiscard ]] constexpr uint64_t insertAll(std::span tuples) { + [[nodiscard]] constexpr uint64_t insertAll(std::span tuples) { while (true) { std::unique_lock lock(_insert.mutex); auto seenHead = _seen.commited.load(); @@ -108,7 +108,7 @@ class RingBuffer { /// Consume from buffer template - [[ nodiscard ]] constexpr std::optional consume() { + [[nodiscard]] constexpr std::optional consume() { while (true) { std::unique_lock lock(_seen.mutex); auto curInsert = _insert.commited.load(std::memory_order_acquire); @@ -125,7 +125,7 @@ class RingBuffer { } /// Check if empty - [[ nodiscard ]] constexpr bool empty() const { + [[nodiscard]] constexpr bool empty() const { return !(_insert.commited.load(std::memory_order_acquire) - _seen.commited.load(std::memory_order_acquire)); } }; diff --git a/include/utils/unordered_map.hpp b/include/utils/unordered_map.hpp index 65d49d4..adab4f4 100644 --- a/include/utils/unordered_map.hpp +++ b/include/utils/unordered_map.hpp @@ -99,19 +99,19 @@ class UnorderedMap { } /// Equality operator - [[ nodiscard ]] constexpr bool operator==(const Iterator& rhs) const { + [[nodiscard]] constexpr bool operator==(const Iterator& rhs) const { return (_tableBucket == rhs._tableBucket) && (_valueBucket == rhs._valueBucket); } /// Non equality operator - [[ nodiscard ]] constexpr bool operator!=(const Iterator& rhs) const { + [[nodiscard]] constexpr bool operator!=(const Iterator& rhs) const { return (_tableBucket != rhs._tableBucket) || (_valueBucket != rhs._valueBucket); } /// Get the value bucket reference - [[ nodiscard ]] constexpr std::pair& operator*() const { + [[nodiscard]] constexpr std::pair& operator*() const { return _valueBucket->keyValue; } /// Get the value bucket pointer - [[ nodiscard ]] constexpr std::pair* operator->() const { + [[nodiscard]] constexpr std::pair* operator->() const { return &_valueBucket->keyValue; } @@ -139,14 +139,14 @@ class UnorderedMap { } /// The non-const begin iterator - [[ nodiscard ]] constexpr Iterator begin() { return Iterator(_map[0].get()); } + [[nodiscard]] constexpr Iterator begin() { return Iterator(_map[0].get()); } /// The non-const end iterator - [[ nodiscard ]] constexpr Iterator end() { return Iterator(); } + [[nodiscard]] constexpr Iterator end() { return Iterator(); } /// Returns the number of buckets - [[ nodiscard ]] constexpr size_t buckets() const { return _map.size(); } + [[nodiscard]] constexpr size_t buckets() const { return _map.size(); } /// Returns the size of the unordered map - [[ nodiscard ]] constexpr size_t size() const { return _size; } + [[nodiscard]] constexpr size_t size() const { return _size; } /// Insert element, returns iterator template @@ -186,7 +186,7 @@ class UnorderedMap { /// Erase element template - [[ nodiscard ]] constexpr bool erase(K&& key) { + [[nodiscard]] constexpr bool erase(K&& key) { auto position = Hash{}(key) % buckets(); std::unique_lock lock(_map[position]->sharedMutex); auto* chain = &_map[position]->chain; @@ -202,7 +202,7 @@ class UnorderedMap { } /// Erase element from iterator - [[ nodiscard ]] constexpr bool erase(Iterator it) { + [[nodiscard]] constexpr bool erase(Iterator it) { auto key = it->first; it.release(); return erase(std::move(key)); @@ -210,7 +210,7 @@ class UnorderedMap { /// Find element, returns iterator template - [[ nodiscard ]] constexpr Iterator find(K&& key) { + [[nodiscard]] constexpr Iterator find(K&& key) { auto position = Hash{}(key) % buckets(); std::shared_lock lock(_map[position]->sharedMutex); auto* chain = &_map[position]->chain; diff --git a/src/cloud/aws.cpp b/src/cloud/aws.cpp index d4799e4..813e9a5 100644 --- a/src/cloud/aws.cpp +++ b/src/cloud/aws.cpp @@ -1,14 +1,13 @@ #include "cloud/aws.hpp" #include "cloud/aws_resolver.hpp" -#include "cloud/aws_signer.hpp" #include "network/http_helper.hpp" #include "network/original_message.hpp" #include "network/tasked_send_receiver.hpp" #include "utils/data_vector.hpp" #include #include -#include #include +#include //--------------------------------------------------------------------------- // AnyBlob - Universal Cloud Object Storage Library // Dominik Durner, 2021 @@ -22,6 +21,9 @@ namespace cloud { //--------------------------------------------------------------------------- using namespace std; //--------------------------------------------------------------------------- +thread_local shared_ptr AWS::_secret = nullptr; +thread_local shared_ptr AWS::_sessionSecret = nullptr; +//--------------------------------------------------------------------------- static string buildAMZTimestamp() // Creates the AWS timestamp { @@ -56,8 +58,8 @@ Provider::Instance AWS::getInstanceDetails(network::TaskedSendReceiver& sendRece if (_type == Provider::CloudService::AWS) { auto message = downloadInstanceInfo(); auto originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); - sendReceiver.send(originalMsg.get()); - sendReceiver.process(); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HTTPHelper::retrieveContent(content.cdata(), content.size(), infoPtr); @@ -72,12 +74,12 @@ Provider::Instance AWS::getInstanceDetails(network::TaskedSendReceiver& sendRece } //--------------------------------------------------------------------------- string AWS::getInstanceRegion(network::TaskedSendReceiver& sendReceiver) -// Uses the send receiver to initialize the secret +// Uses the send receiver to get the region { auto message = downloadInstanceInfo("placement/region"); auto originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); - sendReceiver.send(originalMsg.get()); - sendReceiver.process(); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HTTPHelper::retrieveContent(content.cdata(), content.size(), infoPtr); @@ -94,7 +96,7 @@ unique_ptr> AWS::downloadIAMUser() const return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); } //--------------------------------------------------------------------------- -unique_ptr> AWS::downloadSecret(string_view content) +unique_ptr> AWS::downloadSecret(string_view content, string& iamUser) // Builds the secret http request { auto pos = content.find("\n"); @@ -107,22 +109,21 @@ unique_ptr> AWS::downloadSecret(string_view content) httpHeader += getIAMAddress(); httpHeader += "\r\n\r\n"; - _secret = make_unique(); - _secret->iamUser = content.substr(0, pos); - + iamUser = content.substr(0, pos); return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); } //--------------------------------------------------------------------------- -bool AWS::updateSecret(string_view content) +bool AWS::updateSecret(string_view content, string_view iamUser) // Update secret { + auto secret = make_shared(); string needle = "\"AccessKeyId\" : \""; auto pos = content.find(needle); if (pos == content.npos) return false; pos += needle.length(); auto end = content.find("\"", pos); - _secret->keyId = content.substr(pos, end - pos); + secret->keyId = content.substr(pos, end - pos); needle = "\"SecretAccessKey\" : \""; pos = content.find(needle); @@ -130,7 +131,7 @@ bool AWS::updateSecret(string_view content) return false; pos += needle.length(); end = content.find("\"", pos); - _secret->secret = content.substr(pos, end - pos); + secret->secret = content.substr(pos, end - pos); needle = "\"Token\" : \""; pos = content.find(needle); @@ -138,7 +139,7 @@ bool AWS::updateSecret(string_view content) return false; pos += needle.length(); end = content.find("\"", pos); - _secret->sessionToken = content.substr(pos, end - pos); + secret->token = content.substr(pos, end - pos); needle = "\"Expiration\" : \""; pos = content.find(needle); @@ -148,14 +149,72 @@ bool AWS::updateSecret(string_view content) end = content.find("\"", pos); auto sv = content.substr(pos, end - pos); string timestamp(sv.begin(), sv.end()); - _secret->experiation = convertIAMTimestamp(timestamp); + secret->expiration = convertIAMTimestamp(timestamp); + secret->iamUser = iamUser; + _globalSecret = secret; + _secret = secret; + return true; +} +//--------------------------------------------------------------------------- +bool AWS::updateSessionToken(string_view content) +// Update secret +{ + auto secret = make_shared(); + string needle = ""; + auto pos = content.find(needle); + if (pos == content.npos) + return false; + pos += needle.length(); + needle = ""; + auto end = content.find(needle, pos); + secret->keyId = content.substr(pos, end - pos); + + needle = ""; + pos = content.find(needle); + if (pos == content.npos) + return false; + pos += needle.length(); + needle = ""; + end = content.find(needle, pos); + secret->secret = content.substr(pos, end - pos); + + needle = ""; + pos = content.find(needle); + if (pos == content.npos) + return false; + pos += needle.length(); + needle = ""; + end = content.find(needle, pos); + secret->token = content.substr(pos, end - pos); + + needle = ""; + pos = content.find(needle); + if (pos == content.npos) + return false; + pos += needle.length(); + needle = ""; + end = content.find(needle, pos); + + auto sv = content.substr(pos, end - pos); + string timestamp(sv.begin(), sv.end()); + secret->expiration = convertIAMTimestamp(timestamp); + _globalSessionSecret = secret; + _sessionSecret = secret; return true; } //--------------------------------------------------------------------------- -bool AWS::validKeys() const +bool AWS::validKeys(uint32_t offset) const // Checks whether keys need to be refresehd { - if (!_secret || ((!_secret->sessionToken.empty() && _secret->experiation - 60 < chrono::system_clock::to_time_t(chrono::system_clock::now())) || _secret->secret.empty())) + if (!_secret || ((!_secret->token.empty() && _secret->expiration - offset < chrono::system_clock::to_time_t(chrono::system_clock::now())) || _secret->secret.empty())) + return false; + return true; +} +//--------------------------------------------------------------------------- +bool AWS::validSession(uint32_t offset) const +// Checks whether the session token needs to be refresehd +{ + if (!_sessionSecret || ((!_sessionSecret->token.empty() && _sessionSecret->expiration - offset < chrono::system_clock::to_time_t(chrono::system_clock::now())) || _sessionSecret->secret.empty())) return false; return true; } @@ -163,22 +222,68 @@ bool AWS::validKeys() const void AWS::initSecret(network::TaskedSendReceiver& sendReceiver) // Uses the send receiver to initialize the secret { - if (_type == Provider::CloudService::AWS) { - auto message = downloadIAMUser(); - auto originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); - sendReceiver.send(originalMsg.get()); - sendReceiver.process(); - auto& content = originalMsg->result.getDataVector(); - unique_ptr infoPtr; - auto s = network::HTTPHelper::retrieveContent(content.cdata(), content.size(), infoPtr); - message = downloadSecret(s); - originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); - sendReceiver.send(originalMsg.get()); - sendReceiver.process(); - auto& secretContent = originalMsg->result.getDataVector(); - infoPtr.reset(); - s = network::HTTPHelper::retrieveContent(secretContent.cdata(), secretContent.size(), infoPtr); - updateSecret(s); + if (_type == Provider::CloudService::AWS && !validKeys(180)) { + while (true) { + if (_mutex.try_lock()) { + _secret = _globalSecret; + if (validKeys(180)) + return; + auto secret = make_shared(); + auto message = downloadIAMUser(); + auto originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); + auto& content = originalMsg->result.getDataVector(); + unique_ptr infoPtr; + auto s = network::HTTPHelper::retrieveContent(content.cdata(), content.size(), infoPtr); + string iamUser; + message = downloadSecret(s, iamUser); + originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); + auto& secretContent = originalMsg->result.getDataVector(); + infoPtr.reset(); + s = network::HTTPHelper::retrieveContent(secretContent.cdata(), secretContent.size(), infoPtr); + updateSecret(s, iamUser); + _mutex.unlock(); + } + if (validKeys(60)) + return; + } + } + if (_type == Provider::CloudService::AWS && _settings.zonal && !validSession(180)) { + while (true) { + if (_mutex.try_lock()) { + _sessionSecret = _globalSessionSecret; + if (validSession(180)) + return; + + auto message = getSessionToken(); + auto originalMsg = make_unique(move(message), _settings.bucket + ".s3.amazonaws.com", getPort()); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); + auto& secretContent = originalMsg->result.getDataVector(); + unique_ptr infoPtr; + auto s = network::HTTPHelper::retrieveContent(secretContent.cdata(), secretContent.size(), infoPtr); + updateSessionToken(s); + _mutex.unlock(); + } + if (validSession(60)) + return; + } + } +} +//--------------------------------------------------------------------------- +void AWS::getSecret() +// Updates the local secret +{ + if (!_secret) { + unique_lock lock(_mutex); + _secret = _globalSecret; + } + if (_type == Provider::CloudService::AWS && _settings.zonal && !_sessionSecret) { + unique_lock lock(_mutex); + _sessionSecret = _globalSessionSecret; } } //--------------------------------------------------------------------------- @@ -190,10 +295,39 @@ void AWS::initResolver(network::TaskedSendReceiver& sendReceiver) } } //--------------------------------------------------------------------------- +unique_ptr> AWS::buildRequest(AWSSigner::Request& request, string_view payload, bool initHeaders) const +// Creates and signs the request +{ + shared_ptr secret; + if (initHeaders) { + request.headers.emplace("Host", getAddress()); + request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); + if (!_settings.zonal) { + request.headers.emplace("x-amz-request-payer", "requester"); + secret = _secret; + if (!secret->token.empty()) + request.headers.emplace("x-amz-security-token", secret->token); + } else { + secret = _sessionSecret; + request.headers.emplace("x-amz-s3session-token", secret->token); + } + } + + auto canonical = AWSSigner::createCanonicalRequest(request); + AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"}; + auto httpHeader = request.method + " "; + httpHeader += AWSSigner::createSignedRequest(secret->keyId, secret->secret, stringToSign) + " " + request.type + "\r\n"; + for (auto& h : request.headers) + httpHeader += h.first + ": " + h.second + "\r\n"; + httpHeader += "\r\n"; + httpHeader += payload; + return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); +} +//--------------------------------------------------------------------------- unique_ptr> AWS::getRequest(const string& filePath, const pair& range) const // Builds the http request for downloading a blob { - if (!validKeys()) + if (!validKeys() || (_settings.zonal && !validSession())) return nullptr; AWSSigner::Request request; @@ -207,32 +341,20 @@ unique_ptr> AWS::getRequest(const string& filePath, c request.path = "/" + _settings.bucket + "/" + filePath; request.bodyData = nullptr; request.bodyLength = 0; - request.headers.emplace("Host", getAddress()); - request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); - request.headers.emplace("x-amz-request-payer", "requester"); - if (!_secret->sessionToken.empty()) - request.headers.emplace("x-amz-security-token", _secret->sessionToken); if (range.first != range.second) { stringstream rangeString; rangeString << "bytes=" << range.first << "-" << range.second; request.headers.emplace("Range", rangeString.str()); } - auto canonical = AWSSigner::createCanonicalRequest(request); - AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"}; - const auto uri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign); - auto httpHeader = request.method + " " + uri + " " + request.type + "\r\n"; - for (auto& h : request.headers) - httpHeader += h.first + ": " + h.second + "\r\n"; - httpHeader += "\r\n"; - return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); + return buildRequest(request); } //--------------------------------------------------------------------------- unique_ptr> AWS::putRequestGeneric(const string& filePath, string_view object, uint16_t part, string_view uploadId) const // Builds the http request for putting objects without the object data itself { - if (!validKeys()) + if (!validKeys() || (_settings.zonal && !validSession())) return nullptr; AWSSigner::Request request; @@ -254,27 +376,15 @@ unique_ptr> AWS::putRequestGeneric(const string& file request.bodyLength = object.size(); request.headers.emplace("Host", getAddress()); - request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); request.headers.emplace("Content-Length", to_string(request.bodyLength)); - request.headers.emplace("x-amz-request-payer", "requester"); - if (!_secret->sessionToken.empty()) - request.headers.emplace("x-amz-security-token", _secret->sessionToken); - auto canonical = AWSSigner::createCanonicalRequest(request); - - AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"}; - const auto uri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign); - auto httpHeader = request.method + " " + uri + " " + request.type + "\r\n"; - for (auto& h : request.headers) - httpHeader += h.first + ": " + h.second + "\r\n"; - httpHeader += "\r\n"; - return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); + return buildRequest(request); } //--------------------------------------------------------------------------- unique_ptr> AWS::deleteRequestGeneric(const string& filePath, string_view uploadId) const // Builds the http request for deleting an objects { - if (!validKeys()) + if (!validKeys() || (_settings.zonal && !validSession())) return nullptr; AWSSigner::Request request; @@ -294,27 +404,14 @@ unique_ptr> AWS::deleteRequestGeneric(const string& f request.bodyData = nullptr; request.bodyLength = 0; - request.headers.emplace("Host", getAddress()); - request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); - request.headers.emplace("x-amz-request-payer", "requester"); - if (!_secret->sessionToken.empty()) - request.headers.emplace("x-amz-security-token", _secret->sessionToken); - auto canonical = AWSSigner::createCanonicalRequest(request); - - AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"}; - const auto uri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign); - auto httpHeader = request.method + " " + uri + " " + request.type + "\r\n"; - for (auto& h : request.headers) - httpHeader += h.first + ": " + h.second + "\r\n"; - httpHeader += "\r\n"; - return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); + return buildRequest(request); } //--------------------------------------------------------------------------- unique_ptr> AWS::createMultiPartRequest(const string& filePath) const // Builds the http request for creating multipart upload objects { - if (!validKeys()) + if (!validKeys() || (_settings.zonal && !validSession())) return nullptr; AWSSigner::Request request; @@ -330,32 +427,20 @@ unique_ptr> AWS::createMultiPartRequest(const string& request.bodyData = nullptr; request.bodyLength = 0; request.headers.emplace("Host", getAddress()); - request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); - request.headers.emplace("x-amz-request-payer", "requester"); - if (!_secret->sessionToken.empty()) - request.headers.emplace("x-amz-security-token", _secret->sessionToken); - - auto canonical = AWSSigner::createCanonicalRequest(request); - AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"}; - const auto uri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign); - auto httpHeader = request.method + " " + uri + " " + request.type + "\r\n"; - for (auto& h : request.headers) - httpHeader += h.first + ": " + h.second + "\r\n"; - httpHeader += "\r\n"; - return make_unique>(reinterpret_cast(httpHeader.data()), reinterpret_cast(httpHeader.data() + httpHeader.size())); + return buildRequest(request); } //--------------------------------------------------------------------------- unique_ptr> AWS::completeMultiPartRequest(const string& filePath, string_view uploadId, const std::vector& etags) const // Builds the http request for completing multipart upload objects { - if (!validKeys()) + if (!validKeys() || (_settings.zonal && !validSession())) return nullptr; string content = "\n"; for (auto i = 0ull; i < etags.size(); i++) { content += "\n"; - content += to_string(i+1); + content += to_string(i + 1); content += "\n\""; content += etags[i]; content += "\"\n\n"; @@ -375,22 +460,31 @@ unique_ptr> AWS::completeMultiPartRequest(const strin request.queries.emplace("uploadId", uploadId); request.bodyData = reinterpret_cast(content.data()); request.bodyLength = content.size(); - request.headers.emplace("Host", getAddress()); - request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); request.headers.emplace("Content-Length", to_string(content.size())); - request.headers.emplace("x-amz-request-payer", "requester"); - if (!_secret->sessionToken.empty()) - request.headers.emplace("x-amz-security-token", _secret->sessionToken); - auto canonical = AWSSigner::createCanonicalRequest(request); + return buildRequest(request, content); +} +//--------------------------------------------------------------------------- +unique_ptr> AWS::getSessionToken(string_view type) const +// Builds the http request for retrieving the session token of createsession +{ + if (!validKeys()) + return nullptr; - AWSSigner::StringToSign stringToSign = {.request = request, .requestSHA = canonical.second, .region = _settings.region, .service = "s3"}; - const auto uri = AWSSigner::createSignedRequest(_secret->keyId, _secret->secret, stringToSign); - auto httpHeaderMessage = request.method + " " + uri + " " + request.type + "\r\n"; - for (auto& h : request.headers) - httpHeaderMessage += h.first + ": " + h.second + "\r\n"; - httpHeaderMessage += "\r\n" + content; - return make_unique>(reinterpret_cast(httpHeaderMessage.data()), reinterpret_cast(httpHeaderMessage.data() + httpHeaderMessage.size())); + AWSSigner::Request request; + request.method = "GET"; + request.type = "HTTP/1.1"; + request.path = "/"; + request.queries.emplace("session", ""); + request.bodyData = nullptr; + request.bodyLength = 0; + request.headers.emplace("Host", _settings.bucket + ".s3.amazonaws.com"); + request.headers.emplace("x-amz-create-session-mode", type); + request.headers.emplace("x-amz-date", testEnviornment ? fakeAMZTimestamp : buildAMZTimestamp()); + if (!_secret->token.empty()) + request.headers.emplace("x-amz-security-token", _secret->token); + + return buildRequest(request, "", false); } //--------------------------------------------------------------------------- uint32_t AWS::getPort() const @@ -404,6 +498,14 @@ string AWS::getAddress() const { if (!_settings.endpoint.empty()) return _settings.endpoint; + if (_settings.zonal) { + // remove --x-s3 and use at most 9 characters az id + -- + auto bucket = _settings.bucket.substr(0, _settings.bucket.size() - 6); + bucket = bucket.substr(bucket.size() - 11); + auto find = bucket.find("--"); + auto zone = bucket.substr(find + 2); + return _settings.bucket + ".s3express-" + zone + "." + _settings.region + ".amazonaws.com"; + } return _settings.bucket + ".s3." + _settings.region + ".amazonaws.com"; } //--------------------------------------------------------------------------- diff --git a/src/cloud/azure.cpp b/src/cloud/azure.cpp index 5f667bd..d070ed4 100644 --- a/src/cloud/azure.cpp +++ b/src/cloud/azure.cpp @@ -5,6 +5,7 @@ #include "network/tasked_send_receiver.hpp" #include "network/resolver.hpp" #include "utils/data_vector.hpp" +#include #include #include #include @@ -51,8 +52,8 @@ Provider::Instance Azure::getInstanceDetails(network::TaskedSendReceiver& sendRe { auto message = downloadInstanceInfo(); auto originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); - sendReceiver.send(originalMsg.get()); - sendReceiver.process(); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HTTPHelper::retrieveContent(content.cdata(), content.size(), infoPtr); @@ -76,8 +77,8 @@ string Azure::getRegion(network::TaskedSendReceiver& sendReceiver) { auto message = downloadInstanceInfo(); auto originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); - sendReceiver.send(originalMsg.get()); - sendReceiver.process(); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HTTPHelper::retrieveContent(content.cdata(), content.size(), infoPtr); diff --git a/src/cloud/gcp.cpp b/src/cloud/gcp.cpp index 94d8a0c..b28e465 100644 --- a/src/cloud/gcp.cpp +++ b/src/cloud/gcp.cpp @@ -46,8 +46,8 @@ Provider::Instance GCP::getInstanceDetails(network::TaskedSendReceiver& sendRece { auto message = downloadInstanceInfo(); auto originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); - sendReceiver.send(originalMsg.get()); - sendReceiver.process(); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HTTPHelper::retrieveContent(content.cdata(), content.size(), infoPtr); @@ -65,8 +65,8 @@ string GCP::getInstanceRegion(network::TaskedSendReceiver& sendReceiver) { auto message = downloadInstanceInfo("zone"); auto originalMsg = make_unique(move(message), getIAMAddress(), getIAMPort()); - sendReceiver.send(originalMsg.get()); - sendReceiver.process(); + sendReceiver.sendSync(originalMsg.get()); + sendReceiver.processSync(); auto& content = originalMsg->result.getDataVector(); unique_ptr infoPtr; auto s = network::HTTPHelper::retrieveContent(content.cdata(), content.size(), infoPtr); diff --git a/src/cloud/provider.cpp b/src/cloud/provider.cpp index 2bb6f31..0badd08 100644 --- a/src/cloud/provider.cpp +++ b/src/cloud/provider.cpp @@ -5,8 +5,8 @@ #include "cloud/ibm.hpp" #include "cloud/minio.hpp" #include "cloud/oracle.hpp" -#include "network/tasked_send_receiver.hpp" #include "network/config.hpp" +#include "network/tasked_send_receiver.hpp" #include "utils/data_vector.hpp" #include #include @@ -81,6 +81,12 @@ Provider::RemoteInfo Provider::getRemoteInfo(const string& fileName) { info.bucket = bucketRegion; info.region = ""; } + if (!remoteFile[i].compare("s3://")) { + // Handle s3 one zone express + if (info.bucket.size() > 6 && info.bucket.find("--x-s3", info.bucket.size() - 7)) { + info.zonal = true; + } + } info.provider = static_cast(i); } } diff --git a/test/integration/minio_async.cpp b/test/integration/minio_async.cpp index 38fdfde..2627531 100644 --- a/test/integration/minio_async.cpp +++ b/test/integration/minio_async.cpp @@ -73,7 +73,7 @@ TEST_CASE("MinIO Asynchronous Integration") { { // Check the upload for success - std::atomic finishedMessages = 0; + atomic finishedMessages = 0; auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) { // Sucessful request REQUIRE(result.success()); @@ -82,8 +82,12 @@ TEST_CASE("MinIO Asynchronous Integration") { // Create the put request anyblob::network::Transaction putTxn(provider.get()); - for (auto i = 0u; i < 2; i++) - putTxn.putObjectRequest(checkSuccess, fileName[i], content[i].data(), content[i].size()); + for (auto i = 0u; i < 2; i++) { + auto putObjectRequest = [&putTxn, &fileName, &content, &checkSuccess, i]() { + return putTxn.putObjectRequest(checkSuccess, fileName[i], content[i].data(), content[i].size()); + }; + putTxn.verifyKeyRequest(sendReceiver, move(putObjectRequest)); + } // Upload the request asynchronously putTxn.processAsync(group); @@ -94,7 +98,7 @@ TEST_CASE("MinIO Asynchronous Integration") { } { // Check the upload for success - std::atomic finishedMessages = 0; + atomic finishedMessages = 0; auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) { // Sucessful request REQUIRE(result.success()); @@ -105,8 +109,12 @@ TEST_CASE("MinIO Asynchronous Integration") { auto minio = static_cast(provider.get()); minio->setMultipartUploadSize(6ull << 20); anyblob::network::Transaction putTxn(provider.get()); - for (auto i = 0u; i < 2; i++) - putTxn.putObjectRequest(checkSuccess, fileName[i], content[i].data(), content[i].size()); + for (auto i = 0u; i < 2; i++) { + auto putObjectRequest = [&putTxn, &fileName, &content, &checkSuccess, i]() { + return putTxn.putObjectRequest(checkSuccess, fileName[i], content[i].data(), content[i].size()); + }; + putTxn.verifyKeyRequest(sendReceiver, move(putObjectRequest)); + } while (finishedMessages != 2) { // Upload the new request asynchronously @@ -117,7 +125,7 @@ TEST_CASE("MinIO Asynchronous Integration") { } { // Check the upload for failure due to too small part - std::atomic finishedMessages = 0; + atomic finishedMessages = 0; auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) { // Sucessful request REQUIRE(!result.success()); @@ -128,7 +136,11 @@ TEST_CASE("MinIO Asynchronous Integration") { auto minio = static_cast(provider.get()); minio->setMultipartUploadSize(1ull << 20); // too small, requires at least 5MiB parts anyblob::network::Transaction putTxn(provider.get()); - putTxn.putObjectRequest(checkSuccess, fileName[1], content[1].data(), content[1].size()); + + auto putObjectRequest = [&putTxn, &fileName, &content, callback = move(checkSuccess)]() { + return putTxn.putObjectRequest(move(callback), fileName[1], content[1].data(), content[1].size()); + }; + putTxn.verifyKeyRequest(sendReceiver, move(putObjectRequest)); while (finishedMessages != 1) { // Upload the new request asynchronously @@ -138,7 +150,7 @@ TEST_CASE("MinIO Asynchronous Integration") { } } { - std::atomic finishedMessages = 0; + atomic finishedMessages = 0; // Create the get request anyblob::network::Transaction getTxn(provider.get()); for (auto i = 0u; i < 2; i++) { @@ -163,7 +175,11 @@ TEST_CASE("MinIO Asynchronous Integration") { finishedMessages++; }; - getTxn.getObjectRequest(std::move(checkSuccess), fileName[i]); + auto& currentFileName = fileName[i]; + auto getObjectRequest = [&getTxn, ¤tFileName, callback = move(checkSuccess)]() { + return getTxn.getObjectRequest(move(callback), currentFileName); + }; + getTxn.verifyKeyRequest(sendReceiver, move(getObjectRequest)); } // Retrieve the request asynchronously @@ -175,7 +191,7 @@ TEST_CASE("MinIO Asynchronous Integration") { } { // Check the delete for success - std::atomic finishedMessages = 0; + atomic finishedMessages = 0; auto checkSuccess = [&finishedMessages](anyblob::network::MessageResult& result) { // Sucessful request REQUIRE(result.success()); @@ -184,8 +200,13 @@ TEST_CASE("MinIO Asynchronous Integration") { // Create the delete request anyblob::network::Transaction deleteTxn(provider.get()); - for (auto i = 0u; i < 2; i++) - deleteTxn.deleteObjectRequest(checkSuccess, fileName[i]); + for (auto i = 0u; i < 2; i++) { + auto& currentFileName = fileName[i]; + auto deleteRequest = [&deleteTxn, ¤tFileName, callback = move(checkSuccess)]() { + return deleteTxn.deleteObjectRequest(move(callback), currentFileName); + }; + deleteTxn.verifyKeyRequest(sendReceiver, move(deleteRequest)); + } // Process the request asynchronously deleteTxn.processAsync(group); diff --git a/test/unit/cloud/aws_test.cpp b/test/unit/cloud/aws_test.cpp index 9e5857c..c5f5f56 100644 --- a/test/unit/cloud/aws_test.cpp +++ b/test/unit/cloud/aws_test.cpp @@ -39,14 +39,15 @@ class AWSTester { resultString = "GET /latest/meta-data/iam/security-credentials HTTP/1.1\r\nHost: 169.254.169.254\r\n\r\n"; REQUIRE(string_view(reinterpret_cast(dv->data()), dv->size()) == resultString); - dv = aws.downloadSecret("ABCDEF\n"); + string iamUser; + dv = aws.downloadSecret("ABCDEF\n", iamUser); resultString = "GET /latest/meta-data/iam/security-credentials/ABCDEF HTTP/1.1\r\nHost: 169.254.169.254\r\n\r\n"; REQUIRE(string_view(reinterpret_cast(dv->data()), dv->size()) == resultString); string keyService = "{\"AccessKeyId\" : \"ABC\", \"SecretAccessKey\" : \"ABC\", \"Token\" : \"ABC\", \"Expiration\" : \""; keyService += aws.fakeIAMTimestamp; keyService += "\"}"; - REQUIRE(aws.updateSecret(keyService)); + REQUIRE(aws.updateSecret(keyService, iamUser)); auto p = pair(numeric_limits::max(), numeric_limits::max()); dv = aws.getRequest("a/b/c.d", p);