GH-40036: [C++] Azure file system write buffering & async writes #43096

Merged 14 commits on Aug 21, 2024
276 changes: 247 additions & 29 deletions cpp/src/arrow/filesystem/azurefs.cc
@@ -22,6 +22,7 @@

#include "arrow/filesystem/azurefs.h"
#include "arrow/filesystem/azurefs_internal.h"
#include "arrow/io/memory.h"

// identity.hpp triggers -Wattributes warnings that cause -Werror builds to fail,
// so disable them for this file with pragmas.
@@ -144,6 +145,9 @@ Status AzureOptions::ExtractFromUriQuery(const Uri& uri) {
blob_storage_scheme = "http";
dfs_storage_scheme = "http";
}
} else if (kv.first == "background_writes") {
ARROW_ASSIGN_OR_RAISE(background_writes,
::arrow::internal::ParseBoolean(kv.second));
} else {
return Status::Invalid(
"Unexpected query parameter in Azure Blob File System URI: '", kv.first, "'");
@@ -937,8 +941,8 @@ Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
const std::vector<std::string>& block_ids,
const Blobs::CommitBlockListOptions& options) {
try {
// CommitBlockList puts all block_ids in the latest element. That means in the case
// of overlapping block_ids the newly staged block ids will always replace the
// previously committed blocks.
// https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-list?tabs=microsoft-entra-id#request-body
block_blob_client->CommitBlockList(block_ids, options);
@@ -950,15 +954,43 @@ Status CommitBlockList(std::shared_ptr<Storage::Blobs::BlockBlobClient> block_bl
return Status::OK();
}

Status StageBlock(Blobs::BlockBlobClient* block_blob_client, const std::string& id,
Core::IO::MemoryBodyStream& content) {
try {
block_blob_client->StageBlock(id, content);
} catch (const Storage::StorageException& exception) {
return ExceptionToStatus(
exception, "StageBlock failed for '", block_blob_client->GetUrl(),
"' new_block_id: '", id,
"'. Staging new blocks is fundamental to streaming writes to blob storage.");
}

return Status::OK();
}

/// Writes will be buffered up to this size (in bytes) before actually uploading them.
static constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024;
/// The maximum size of a block in Azure Blob (as per docs).
static constexpr int64_t kMaxBlockSizeBytes = 4LL * 1024 * 1024 * 1024;

/// This output stream, like other Arrow OutputStreams, is not thread-safe.
class ObjectAppendStream final : public io::OutputStream {
private:
struct UploadState;

std::shared_ptr<ObjectAppendStream> Self() {
return std::dynamic_pointer_cast<ObjectAppendStream>(shared_from_this());
}

public:
ObjectAppendStream(std::shared_ptr<Blobs::BlockBlobClient> block_blob_client,
const io::IOContext& io_context, const AzureLocation& location,
const std::shared_ptr<const KeyValueMetadata>& metadata,
const AzureOptions& options)
: block_blob_client_(std::move(block_blob_client)),
io_context_(io_context),
location_(location),
background_writes_(options.background_writes) {
if (metadata && metadata->size() != 0) {
ArrowMetadataToCommitBlockListOptions(metadata, commit_block_list_options_);
} else if (options.default_metadata && options.default_metadata->size() != 0) {
@@ -1008,10 +1040,13 @@ class ObjectAppendStream final : public io::OutputStream {
content_length_ = 0;
}
}

upload_state_ = std::make_shared<UploadState>();

if (content_length_ > 0) {
ARROW_ASSIGN_OR_RAISE(auto block_list, GetBlockList(block_blob_client_));
for (auto block : block_list.CommittedBlocks) {
upload_state_->block_ids.push_back(block.Name);
}
}
initialised_ = true;
@@ -1031,12 +1066,34 @@
if (closed_) {
return Status::OK();
}

if (current_block_) {
// Upload remaining buffer
RETURN_NOT_OK(AppendCurrentBlock());
}

RETURN_NOT_OK(Flush());
block_blob_client_ = nullptr;
closed_ = true;
return Status::OK();
}

Future<> CloseAsync() override {
if (closed_) {
return Status::OK();
}

if (current_block_) {
// Upload remaining buffer
RETURN_NOT_OK(AppendCurrentBlock());
}

return FlushAsync().Then([self = Self()]() {
self->block_blob_client_ = nullptr;
self->closed_ = true;
});
}
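As a caller-side sketch (illustrative only; the helper name is hypothetical), CloseAsync lets a caller overlap the final uploads and commit with other work:

#include <memory>
#include "arrow/io/interfaces.h"
#include "arrow/util/future.h"

arrow::Status FinishInBackground(std::shared_ptr<arrow::io::OutputStream> stream) {
  arrow::Future<> done = stream->CloseAsync();
  // ... other work can proceed here while the remaining blocks upload ...
  return done.status();  // waits until the block list has been committed
}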

bool closed() const override { return closed_; }

Status CheckClosed(const char* action) const {
@@ -1052,11 +1109,11 @@
}

Status Write(const std::shared_ptr<Buffer>& buffer) override {
return DoWrite(buffer->data(), buffer->size(), buffer);
}

Status Write(const void* data, int64_t nbytes) override {
return DoWrite(data, nbytes);
}

Status Flush() override {
@@ -1066,20 +1123,111 @@
// flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}

Future<> pending_blocks_completed;
{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
pending_blocks_completed = upload_state_->pending_blocks_completed;
}

RETURN_NOT_OK(pending_blocks_completed.status());
std::unique_lock<std::mutex> lock(upload_state_->mutex);
return CommitBlockList(block_blob_client_, upload_state_->block_ids,
commit_block_list_options_);
}

Future<> FlushAsync() {
RETURN_NOT_OK(CheckClosed("flush async"));
if (!initialised_) {
// If the stream has not been successfully initialized then there is nothing to
// flush. This also avoids some unhandled errors when flushing in the destructor.
return Status::OK();
}

Future<> pending_blocks_completed;
{
std::unique_lock<std::mutex> lock(upload_state_->mutex);
pending_blocks_completed = upload_state_->pending_blocks_completed;
}

return pending_blocks_completed.Then([self = Self()] {
std::unique_lock<std::mutex> lock(self->upload_state_->mutex);
return CommitBlockList(self->block_blob_client_, self->upload_state_->block_ids,
self->commit_block_list_options_);
});
}

private:
Status AppendCurrentBlock() {
ARROW_ASSIGN_OR_RAISE(auto buf, current_block_->Finish());
current_block_.reset();
current_block_size_ = 0;
return AppendBlock(buf);
}

Status DoWrite(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
if (closed_) {
return Status::Invalid("Operation on closed stream");
}

const auto* data_ptr = reinterpret_cast<const int8_t*>(data);
auto advance_ptr = [this, &data_ptr, &nbytes](const int64_t offset) {
data_ptr += offset;
nbytes -= offset;
pos_ += offset;
content_length_ += offset;
};

// Handle case where we have some bytes buffered from prior calls.
if (current_block_size_ > 0) {
// Try to fill current buffer
const int64_t to_copy =
std::min(nbytes, kBlockUploadSizeBytes - current_block_size_);
RETURN_NOT_OK(current_block_->Write(data_ptr, to_copy));
current_block_size_ += to_copy;
advance_ptr(to_copy);

// If the buffer still isn't full, wait for more data before uploading
if (current_block_size_ < kBlockUploadSizeBytes) {
return Status::OK();
}

// Upload current buffer
RETURN_NOT_OK(AppendCurrentBlock());
}

// We can upload chunks without copying them into a buffer
while (nbytes >= kBlockUploadSizeBytes) {
const auto upload_size = std::min(nbytes, kMaxBlockSizeBytes);
RETURN_NOT_OK(AppendBlock(data_ptr, upload_size));
advance_ptr(upload_size);
}

// Buffer remaining bytes
if (nbytes > 0) {
current_block_size_ = nbytes;

if (current_block_ == nullptr) {
ARROW_ASSIGN_OR_RAISE(
current_block_,
io::BufferOutputStream::Create(kBlockUploadSizeBytes, io_context_.pool()));
} else {
// Re-use the allocation from before.
RETURN_NOT_OK(current_block_->Reset(kBlockUploadSizeBytes, io_context_.pool()));
}

RETURN_NOT_OK(current_block_->Write(data_ptr, current_block_size_));
pos_ += current_block_size_;
content_length_ += current_block_size_;
}

return Status::OK();
}
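To make the splitting logic above concrete, here is a standalone sketch (illustrative only; PlanBlocks is not part of the change) that reproduces the same arithmetic for a single write when some bytes are already buffered:

#include <algorithm>
#include <cstdint>
#include <vector>

// Returns the sizes of the blocks that DoWrite() would stage for this write.
std::vector<int64_t> PlanBlocks(int64_t buffered, int64_t nbytes) {
  constexpr int64_t kBlockUploadSizeBytes = 10 * 1024 * 1024;
  constexpr int64_t kMaxBlockSizeBytes = 4LL * 1024 * 1024 * 1024;
  std::vector<int64_t> blocks;
  if (buffered > 0) {
    const int64_t to_copy = std::min(nbytes, kBlockUploadSizeBytes - buffered);
    buffered += to_copy;
    nbytes -= to_copy;
    if (buffered < kBlockUploadSizeBytes) return blocks;  // still buffering
    blocks.push_back(buffered);  // full buffer -> staged as one block
  }
  while (nbytes >= kBlockUploadSizeBytes) {
    const int64_t upload_size = std::min(nbytes, kMaxBlockSizeBytes);
    blocks.push_back(upload_size);  // staged directly, without copying
    nbytes -= upload_size;
  }
  // Any remainder (< 10 MiB) stays in current_block_ for the next write.
  return blocks;
}

For example, PlanBlocks(2 MiB, 25 MiB) yields a 10 MiB block (8 MiB topping up the buffer) followed by a 17 MiB block staged directly.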

std::string CreateBlock() {
std::unique_lock<std::mutex> lock(upload_state_->mutex);
const auto n_block_ids = upload_state_->block_ids.size();

// New block ID must always be distinct from the existing block IDs. Otherwise we
// will accidentally replace the content of existing blocks, causing corruption.
@@ -1093,36 +1241,106 @@ class ObjectAppendStream final : public io::OutputStream {
new_block_id.insert(0, required_padding_digits, '0');
// There is a small risk when appending to a blob created by another client that
// `new_block_id` may overlap with an existing block id. Adding the `-arrow`
// suffix significantly reduces the risk, but does not 100% eliminate it. For
// example if the blob was previously created with one block, with id `00001-arrow`
// then the next block we append will conflict with that, and cause corruption.
new_block_id += "-arrow";
new_block_id = Core::Convert::Base64Encode(
std::vector<uint8_t>(new_block_id.begin(), new_block_id.end()));

upload_state_->block_ids.push_back(new_block_id);

// We only use the future when background writes are enabled. Without
// background writes the future is initialized as finished and never mutated.
if (background_writes_ && upload_state_->blocks_in_progress++ == 0) {
upload_state_->pending_blocks_completed = Future<>::Make();
}

return new_block_id;
}
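As an illustration of the id scheme (the padding width is an assumption inferred from the `00001-arrow` example in the comment above), the raw pre-Base64 id could be derived like this:

#include <string>

// Illustrative only: raw block id for a blob that already has n_block_ids blocks.
std::string MakeRawBlockId(size_t n_block_ids) {
  const size_t kBlockIdLength = 5;  // assumed width, cf. "00001-arrow" above
  std::string id = std::to_string(n_block_ids);
  id.insert(0, kBlockIdLength - id.size(), '0');
  return id + "-arrow";  // e.g. one existing block -> "00001-arrow"
}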

Status AppendBlock(const void* data, int64_t nbytes,
std::shared_ptr<Buffer> owned_buffer = nullptr) {
RETURN_NOT_OK(CheckClosed("append"));

if (nbytes == 0) {
return Status::OK();
}

const auto block_id = CreateBlock();

if (background_writes_) {
if (owned_buffer == nullptr) {
ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool()));
memcpy(owned_buffer->mutable_data(), data, nbytes);
} else {
DCHECK_EQ(data, owned_buffer->data());
DCHECK_EQ(nbytes, owned_buffer->size());
}

// The closure keeps the buffer and the upload state alive
auto deferred = [owned_buffer, block_id, block_blob_client = block_blob_client_,
state = upload_state_]() mutable -> Status {
Core::IO::MemoryBodyStream block_content(owned_buffer->data(),
owned_buffer->size());

auto status = StageBlock(block_blob_client.get(), block_id, block_content);
HandleUploadOutcome(state, status);
return Status::OK();
};
RETURN_NOT_OK(io::internal::SubmitIO(io_context_, std::move(deferred)));
} else {
auto append_data = reinterpret_cast<const uint8_t*>(data);
Core::IO::MemoryBodyStream block_content(append_data, nbytes);

RETURN_NOT_OK(StageBlock(block_blob_client_.get(), block_id, block_content));
}

return Status::OK();
}

Status AppendBlock(std::shared_ptr<Buffer> buffer) {
return AppendBlock(buffer->data(), buffer->size(), buffer);
}

static void HandleUploadOutcome(const std::shared_ptr<UploadState>& state,
const Status& status) {
std::unique_lock<std::mutex> lock(state->mutex);
if (!status.ok()) {
state->status &= status;
}
// Notify completion
if (--state->blocks_in_progress == 0) {
auto fut = state->pending_blocks_completed;
lock.unlock();
fut.MarkFinished(state->status);
}
}

std::shared_ptr<Blobs::BlockBlobClient> block_blob_client_;
const io::IOContext io_context_;
const AzureLocation location_;
const bool background_writes_;
int64_t content_length_ = kNoSize;

std::shared_ptr<io::BufferOutputStream> current_block_;
int64_t current_block_size_ = 0;

bool closed_ = false;
bool initialised_ = false;
int64_t pos_ = 0;

// This struct is kept alive by the background write closures so the
// completion handler can run safely even after the stream is destroyed.
struct UploadState {
std::mutex mutex;
std::vector<std::string> block_ids;
int64_t blocks_in_progress = 0;
Status status;
Future<> pending_blocks_completed = Future<>::MakeFinished(Status::OK());
};
std::shared_ptr<UploadState> upload_state_;

Blobs::CommitBlockListOptions commit_block_list_options_;
};
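Tying the pieces together, a caller-side sketch of the new behavior (account and container names are placeholders; error handling condensed):

#include "arrow/filesystem/azurefs.h"

arrow::Status WriteExample() {
  arrow::fs::AzureOptions options;
  options.account_name = "myaccount";  // placeholder
  options.background_writes = true;    // the new option (also the default)
  ARROW_RETURN_NOT_OK(options.ConfigureDefaultCredential());
  ARROW_ASSIGN_OR_RAISE(auto fs, arrow::fs::AzureFileSystem::Make(options));
  ARROW_ASSIGN_OR_RAISE(auto stream, fs->OpenOutputStream("container/data.bin"));
  // Write() returns as soon as the bytes are buffered or handed to the I/O
  // pool; Close() flushes the buffer, waits for in-flight blocks and commits.
  ARROW_RETURN_NOT_OK(stream->Write("hello", 5));
  return stream->Close();
}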

3 changes: 3 additions & 0 deletions cpp/src/arrow/filesystem/azurefs.h
@@ -112,6 +112,9 @@ struct ARROW_EXPORT AzureOptions {
/// This will be ignored if non-empty metadata is passed to OpenOutputStream.
std::shared_ptr<const KeyValueMetadata> default_metadata;

/// Whether OutputStream writes will be issued in the background, without blocking.
bool background_writes = true;

private:
enum class CredentialKind {
kDefault,