diff --git a/src/braft/snapshot.cpp b/src/braft/snapshot.cpp
index 7a97132a..2528cb20 100644
--- a/src/braft/snapshot.cpp
+++ b/src/braft/snapshot.cpp
@@ -17,6 +17,8 @@
 //          Zheng,Pengfei(zhengpengfei@baidu.com)
 //          Xiong,Kai(xiongkai@baidu.com)
 
+#include <algorithm>
+#include <span>
 #include <butil/time.h>
 #include <butil/string_printf.h>                     // butil::string_appendf
 #include <brpc/uri.h>
@@ -34,6 +36,13 @@
 
 namespace braft {
 
+DEFINE_int32(raft_max_get_file_request_concurrency, 64,
+             "Maximum number of concurrent GetFileRequests which will be "
+             "in-flight when copying a snapshot from a "
+             "remote peer to local.");
+BRPC_VALIDATE_GFLAG(raft_max_get_file_request_concurrency,
+                    brpc::PositiveInteger);
+
 const char* LocalSnapshotStorage::_s_temp_path = "temp";
 
 LocalSnapshotMetaTable::LocalSnapshotMetaTable() {}
@@ -777,8 +786,24 @@ void LocalSnapshotCopier::copy() {
         }
         std::vector<std::string> files;
         _remote_snapshot.list_files(&files);
-        for (size_t i = 0; i < files.size() && ok(); ++i) {
-            copy_file(files[i]);
+
+        // Copy the files in chunks of at most
+        // raft_max_get_file_request_concurrency, so that a snapshot made of a
+        // huge number of files does not flood the remote peer with concurrent
+        // GetFileRequests. The flag is read once up front so that a runtime
+        // change cannot alter the stride in the middle of the loop.
+        //
+        // Cancellation is not checked here: copy_files() records ECANCELED
+        // when it observes it, which ends the loop through ok(). Breaking out
+        // without recording an error would leave ok() true even though only
+        // part of the snapshot was copied.
+        const std::span<const std::string> files_span(files);
+        const std::size_t chunk_size = static_cast<std::size_t>(
+                FLAGS_raft_max_get_file_request_concurrency);
+        for (std::size_t i = 0; i < files_span.size() && ok();
+             i += chunk_size) {
+            const std::size_t remaining = files_span.size() - i;
+            copy_files(files_span.subspan(i, std::min(chunk_size, remaining)));
         }
     } while (0);
     if (!ok() && _writer && _writer->ok()) {
@@ -949,65 +974,136 @@ void LocalSnapshotCopier::filter() {
     }
 }
 
-void LocalSnapshotCopier::copy_file(const std::string& filename) {
-    if (_writer->get_file_meta(filename, NULL) == 0) {
-        LOG(INFO) << "Skipped downloading " << filename
-                  << " path: " << _writer->get_path();
-        return;
-    }
-    std::string file_path = _writer->get_path() + '/' + filename;
-    butil::FilePath sub_path(filename);
-    if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") {
-        butil::File::Error e;
-        bool rc = false;
-        if (FLAGS_raft_create_parent_directories) {
-            butil::FilePath sub_dir =
-                    butil::FilePath(_writer->get_path()).Append(sub_path.DirName());
-            rc = _fs->create_directory(sub_dir.value(), &e, true);
-        } else {
-            rc = create_sub_directory(
-                    _writer->get_path(), sub_path.DirName().value(), _fs, &e);
-        }
-        if (!rc) {
-            LOG(ERROR) << "Fail to create directory for " << file_path
-                       << " : " << butil::File::ErrorToString(e);
-            set_error(file_error_to_os_error(e),
-                      "Fail to create directory");
-        }
-    }
-    LocalFileMeta meta;
-    _remote_snapshot.get_file_meta(filename, &meta);
-    std::unique_lock<raft_mutex_t> lck(_mutex);
-    if (_cancelled) {
-        set_error(ECANCELED, "%s", berror(ECANCELED));
-        return;
-    }
-    scoped_refptr<RemoteFileCopier::Session> session
-        = _copier.start_to_copy_to_file(filename, file_path, NULL);
-    if (session == NULL) {
-        LOG(WARNING) << "Fail to copy " << filename
-                     << " path: " << _writer->get_path();
-        set_error(-1, "Fail to copy %s", filename.c_str());
-        return;
-    }
-    _cur_session = session.get();
-    lck.unlock();
-    session->join();
-    lck.lock();
-    _cur_session = NULL;
-    lck.unlock();
-    if (!session->status().ok()) {
-        set_error(session->status().error_code(), session->status().error_cstr());
-        return;
-    }
-    if (_writer->add_file(filename, &meta) != 0) {
-        set_error(EIO, "Fail to add file to writer");
-        return;
-    }
-    if (_writer->sync() != 0) {
-        set_error(EIO, "Fail to sync writer");
-        return;
-    }
+// Downloads one chunk of snapshot files from the remote peer concurrently.
+//
+// A copy session is started for every file in |filenames| that the writer does
+// not already hold. The sessions are then joined in order and each file that
+// was downloaded successfully is added to the writer, which is synced once at
+// the end if the whole chunk succeeded.
+//
+// On the first failure, or as soon as _cancelled is observed, the error is
+// recorded with set_error() and every session not yet joined is cancelled
+// and joined, so no download of this chunk is still in flight on return.
+void LocalSnapshotCopier::copy_files(std::span<const std::string> filenames) {
+    // State of one started download; kept until its session has been joined.
+    struct RemoteFileContext {
+        std::string filename;
+        LocalFileMeta meta;
+        scoped_refptr<RemoteFileCopier::Session> session;
+    };
+
+    std::vector<RemoteFileContext> sessions;
+    sessions.reserve(filenames.size());
+
+    // Set once an error has been recorded. From then on nothing new is started
+    // and the sessions that are still outstanding are only cancelled and
+    // joined.
+    bool aborted = false;
+
+    // Phase 1: start a copy session for each file of the chunk.
+    for (const std::string& filename : filenames) {
+        if (_writer->get_file_meta(filename, nullptr) == 0) {
+            LOG(INFO) << "Skipped downloading " << filename
+                      << " path: " << _writer->get_path();
+            continue;
+        }
+
+        std::string file_path = _writer->get_path() + '/' + filename;
+        butil::FilePath sub_path(filename);
+        if (sub_path != sub_path.DirName() &&
+            sub_path.DirName().value() != ".") {
+            butil::File::Error e;
+            bool rc = false;
+            if (FLAGS_raft_create_parent_directories) {
+                butil::FilePath sub_dir = butil::FilePath(_writer->get_path())
+                                                  .Append(sub_path.DirName());
+                rc = _fs->create_directory(sub_dir.value(), &e, true);
+            } else {
+                rc = create_sub_directory(_writer->get_path(),
+                                          sub_path.DirName().value(), _fs, &e);
+            }
+            if (!rc) {
+                LOG(ERROR) << "Fail to create directory for " << file_path
+                           << " : " << butil::File::ErrorToString(e);
+                set_error(file_error_to_os_error(e),
+                          "Fail to create directory");
+                // The target directory is missing, so stop starting new
+                // sessions and let phase 2 tear down the ones already running.
+                aborted = true;
+                break;
+            }
+        }
+
+        LocalFileMeta meta;
+        _remote_snapshot.get_file_meta(filename, &meta);
+
+        // The filesystem work above is done without _mutex so that other users
+        // of the lock (presumably cancel()) are not blocked behind it; the
+        // lock only covers the cancellation check and the session start.
+        BAIDU_SCOPED_LOCK(_mutex);
+        if (_cancelled) {
+            set_error(ECANCELED, "%s", berror(ECANCELED));
+            aborted = true;
+            break;
+        }
+        scoped_refptr<RemoteFileCopier::Session> session =
+                _copier.start_to_copy_to_file(filename, file_path, nullptr);
+        if (session.get() == nullptr) {
+            LOG(WARNING) << "Fail to copy " << filename
+                         << " path: " << _writer->get_path();
+            set_error(-1, "Fail to copy %s", filename.c_str());
+            aborted = true;
+            break;
+        }
+        sessions.push_back(RemoteFileContext{filename, std::move(meta),
+                                             std::move(session)});
+    }
+
+    // Phase 2: wait for every started session, in order.
+    for (const RemoteFileContext& ctx : sessions) {
+        {
+            BAIDU_SCOPED_LOCK(_mutex);
+            if (!aborted && _cancelled) {
+                set_error(ECANCELED, "%s", berror(ECANCELED));
+                aborted = true;
+            }
+            if (aborted) {
+                ctx.session->cancel();
+            } else {
+                // Published in the same critical section as the _cancelled
+                // check, so a cancellation cannot slip in between the check
+                // and the point where this session becomes _cur_session.
+                _cur_session = ctx.session.get();
+            }
+        }
+
+        ctx.session->join();
+        if (aborted) {
+            // Only draining: the outcome of a cancelled session is ignored so
+            // that the error recorded first is preserved.
+            continue;
+        }
+
+        {
+            BAIDU_SCOPED_LOCK(_mutex);
+            _cur_session = nullptr;
+        }
+
+        const auto& st = ctx.session->status();
+        if (!st.ok()) {
+            set_error(st.error_code(), "%s", st.error_cstr());
+            aborted = true;
+            continue;
+        }
+        if (_writer->add_file(ctx.filename, &ctx.meta) != 0) {
+            set_error(EIO, "Fail to add file to writer");
+            aborted = true;
+        }
+    }
+
+    if (!aborted && _writer->sync() != 0) {
+        set_error(EIO, "Fail to sync writer");
+    }
 }
 
 void LocalSnapshotCopier::start() {
diff --git a/src/braft/snapshot.h b/src/braft/snapshot.h
index 8d617d92..6c8df9c6 100644
--- a/src/braft/snapshot.h
+++ b/src/braft/snapshot.h
@@ -161,7 +161,7 @@ friend class LocalSnapshotStorage;
     int filter_before_copy(LocalSnapshotWriter* writer,
                            SnapshotReader* last_snapshot);
     void filter();
-    void copy_file(const std::string& filename);
+    void copy_files(std::span<const std::string> filenames);
 
     raft_mutex_t _mutex;
     bthread_t _tid;