From 4fe88ddd39107a67c10f2beb3207883be1830586 Mon Sep 17 00:00:00 2001 From: Kyle Ambroff-Kao Date: Thu, 6 Feb 2025 02:55:22 -0800 Subject: [PATCH] snapshot: Transfer files concurrently to speed up snapshot transfer Currently snapshot transfer happens by sending GetFileRequest for every file known to be in the remote snapshot. This happens sequentially for each file. The only real configurations which allow tuning the throughput of this transfer are the throttle which can be set when initializing the braft::Node, or the runtime configuration raft_max_byte_count_per_rpc which determines how many chunks a large file will be broken into during the transfer. The default is 128KiB, so a 1MiB file will be transfered in about 8 GetFileRequests. This works great for snapshots which have a handful of large files. But if a snapshot has hundreds or thousands of small files then transfer of these snapshots can be pretty slow. I locally create a snapshot with 100k files on my development machine, for example, it can take up to 30 minutes to transfer all of those files in that snapshot. Even though the latency per transfer is low, there is a full round trip plus a flush of __raft_meta on the receiving end for each file. This patch adds concurrency to these transfers. When a remote snapshot is transferred locally, up to raft_max_get_file_request_concurrency GetFileRequests will be sent concurrently. This defaults to 64. With this patch, the 100k file snapshot consistently transfers in under 10 seconds on my development machine. This should resolve https://github.com/baidu/braft/issues/362. --- src/braft/snapshot.cpp | 181 ++++++++++++++++++++++++++++++----------- src/braft/snapshot.h | 2 +- 2 files changed, 136 insertions(+), 47 deletions(-) diff --git a/src/braft/snapshot.cpp b/src/braft/snapshot.cpp index 7a97132a..2528cb20 100644 --- a/src/braft/snapshot.cpp +++ b/src/braft/snapshot.cpp @@ -17,6 +17,7 @@ // Zheng,Pengfei(zhengpengfei@baidu.com) // Xiong,Kai(xiongkai@baidu.com) +#include #include #include // butil::string_appendf #include @@ -34,6 +35,13 @@ namespace braft { +DEFINE_int32(raft_max_get_file_request_concurrency, 64, + "Maximum number of concurrent GetFileRequests which will be " + "in-flight when copying a snapshot from a " + "remote peer to local."); +BRPC_VALIDATE_GFLAG(raft_max_get_file_request_concurrency, + brpc::PositiveInteger); + const char* LocalSnapshotStorage::_s_temp_path = "temp"; LocalSnapshotMetaTable::LocalSnapshotMetaTable() {} @@ -777,8 +785,30 @@ void LocalSnapshotCopier::copy() { } std::vector files; _remote_snapshot.list_files(&files); - for (size_t i = 0; i < files.size() && ok(); ++i) { - copy_file(files[i]); + + // Do the copy in chunks. We don't want to submit 1 million concurrent + // GetFileRequests to the remote + // server. This is configurable via + // raft_max_get_file_request_concurrency. Making a copy of this onto the + // stack in case it changes while this loop is running. + std::span files_span{files}; + std::size_t chunk_size = FLAGS_raft_max_get_file_request_concurrency; + for (std::size_t i = 0; i < files_span.size(); i += chunk_size) { + { + BAIDU_SCOPED_LOCK(_mutex); + if (_cancelled) { + break; + } + } + + if (!ok()) { + break; + } + + auto remaining = files_span.size() - i; + auto current_chunk_size = std::min(chunk_size, remaining); + auto chunk = files_span.subspan(i, current_chunk_size); + copy_files(chunk); } } while (0); if (!ok() && _writer && _writer->ok()) { @@ -949,65 +979,124 @@ void LocalSnapshotCopier::filter() { } } -void LocalSnapshotCopier::copy_file(const std::string& filename) { - if (_writer->get_file_meta(filename, NULL) == 0) { - LOG(INFO) << "Skipped downloading " << filename - << " path: " << _writer->get_path(); - return; +void LocalSnapshotCopier::copy_files(std::span filenames) { + // We start a concurrent file copy session for each file we will download and then wait for all of them to complete or + // fail. This struct contains the context for each file in the snapshot, which is kept in the vector below. + struct RemoteFileContext { + std::string filename; + LocalFileMeta meta; + scoped_refptr session; + + RemoteFileContext(std::string filename, LocalFileMeta meta, scoped_refptr session) + : filename{std::move(filename)}, meta{std::move(meta)}, session{std::move(session)} {} + }; + + std::vector sessions; + sessions.reserve(filenames.size()); + + // Create a copy session for each input file. + { + std::unique_lock lck{_mutex}; + + if (_cancelled) { + set_error(ECANCELED, "%s", berror(ECANCELED)); + return; } - std::string file_path = _writer->get_path() + '/' + filename; - butil::FilePath sub_path(filename); - if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") { + + for (const auto& filename : filenames) { + if (_writer->get_file_meta(filename, nullptr) == 0) { + LOG(INFO) << "Skipped downloading " << filename << " path: " << _writer->get_path(); + continue; + } + + std::string file_path = _writer->get_path() + '/' + filename; + butil::FilePath sub_path(filename); + if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") { butil::File::Error e; bool rc = false; if (FLAGS_raft_create_parent_directories) { - butil::FilePath sub_dir = - butil::FilePath(_writer->get_path()).Append(sub_path.DirName()); - rc = _fs->create_directory(sub_dir.value(), &e, true); + butil::FilePath sub_dir = butil::FilePath(_writer->get_path()).Append(sub_path.DirName()); + rc = _fs->create_directory(sub_dir.value(), &e, true); } else { - rc = create_sub_directory( - _writer->get_path(), sub_path.DirName().value(), _fs, &e); + rc = create_sub_directory(_writer->get_path(), sub_path.DirName().value(), _fs, &e); } if (!rc) { - LOG(ERROR) << "Fail to create directory for " << file_path - << " : " << butil::File::ErrorToString(e); - set_error(file_error_to_os_error(e), - "Fail to create directory"); + LOG(ERROR) << "Fail to create directory for " << file_path << " : " << butil::File::ErrorToString(e); + set_error(file_error_to_os_error(e), "Fail to create directory"); } - } - LocalFileMeta meta; - _remote_snapshot.get_file_meta(filename, &meta); - std::unique_lock lck(_mutex); - if (_cancelled) { - set_error(ECANCELED, "%s", berror(ECANCELED)); - return; - } - scoped_refptr session - = _copier.start_to_copy_to_file(filename, file_path, NULL); - if (session == NULL) { - LOG(WARNING) << "Fail to copy " << filename - << " path: " << _writer->get_path(); + } + + LocalFileMeta meta; + _remote_snapshot.get_file_meta(filename, &meta); + + scoped_refptr session = _copier.start_to_copy_to_file(filename, file_path, nullptr); + if (session == nullptr) { + LOG(WARNING) << "Fail to copy " << filename << " path: " << _writer->get_path(); set_error(-1, "Fail to copy %s", filename.c_str()); return; + } + + sessions.emplace_back(filename, std::move(meta), std::move(session)); } - _cur_session = session.get(); - lck.unlock(); - session->join(); - lck.lock(); - _cur_session = NULL; - lck.unlock(); - if (!session->status().ok()) { - set_error(session->status().error_code(), session->status().error_cstr()); - return; + } + + bool failed = false; + + // Now wait for each concurrent session to complete. + for (const auto& remote_file_context : sessions) { + if (failed) { + remote_file_context.session->cancel(); + remote_file_context.session->join(); } - if (_writer->add_file(filename, &meta) != 0) { - set_error(EIO, "Fail to add file to writer"); - return; + + // If LocalSnapshotCopier::cancel() has been called, just cancel the remaining items. + { + std::unique_lock lck{_mutex}; + if (_cancelled) { + remote_file_context.session->cancel(); + lck.unlock(); + remote_file_context.session->join(); + continue; + } } + + { + std::unique_lock lck{_mutex}; + _cur_session = remote_file_context.session.get(); + } + + remote_file_context.session->join(); + + { + std::unique_lock lck{_mutex}; + _cur_session = nullptr; + } + + // If LocalSnapshotCopier::cancel() has been called, just continue cancelling the remaining ones. + if (_cancelled) { + continue; + } + + if (!remote_file_context.session->status().ok()) { + set_error(remote_file_context.session->status().error_code(), "%s", + remote_file_context.session->status().error_cstr()); + failed = true; + continue; + } + + if (_writer->add_file(remote_file_context.filename, &remote_file_context.meta) != 0) { + set_error(EIO, "Fail to add file to writer"); + failed = true; + continue; + } + } + + if (!failed) { if (_writer->sync() != 0) { - set_error(EIO, "Fail to sync writer"); - return; + set_error(EIO, "Fail to sync writer"); + return; } + } } void LocalSnapshotCopier::start() { diff --git a/src/braft/snapshot.h b/src/braft/snapshot.h index 8d617d92..6c8df9c6 100644 --- a/src/braft/snapshot.h +++ b/src/braft/snapshot.h @@ -161,7 +161,7 @@ friend class LocalSnapshotStorage; int filter_before_copy(LocalSnapshotWriter* writer, SnapshotReader* last_snapshot); void filter(); - void copy_file(const std::string& filename); + void copy_files(std::span filenames); raft_mutex_t _mutex; bthread_t _tid;