Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshot: Transfer files concurrently to speed up snapshot transfer #482

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 135 additions & 46 deletions src/braft/snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// Zheng,Pengfei([email protected])
// Xiong,Kai([email protected])

#include <gflags/gflags.h>
#include <butil/time.h>
#include <butil/string_printf.h> // butil::string_appendf
#include <brpc/uri.h>
Expand All @@ -34,6 +35,13 @@

namespace braft {

DEFINE_int32(raft_max_get_file_request_concurrency, 64,
"Maximum number of concurrent GetFileRequests which will be "
"in-flight when copying a snapshot from a "
"remote peer to local.");
BRPC_VALIDATE_GFLAG(raft_max_get_file_request_concurrency,
brpc::PositiveInteger);

const char* LocalSnapshotStorage::_s_temp_path = "temp";

LocalSnapshotMetaTable::LocalSnapshotMetaTable() {}
Expand Down Expand Up @@ -777,8 +785,30 @@ void LocalSnapshotCopier::copy() {
}
std::vector<std::string> files;
_remote_snapshot.list_files(&files);
for (size_t i = 0; i < files.size() && ok(); ++i) {
copy_file(files[i]);

// Do the copy in chunks. We don't want to submit 1 million concurrent
// GetFileRequests to the remote
// server. This is configurable via
// raft_max_get_file_request_concurrency. Making a copy of this onto the
// stack in case it changes while this loop is running.
std::span<std::string> files_span{files};
std::size_t chunk_size = FLAGS_raft_max_get_file_request_concurrency;
for (std::size_t i = 0; i < files_span.size(); i += chunk_size) {
{
BAIDU_SCOPED_LOCK(_mutex);
if (_cancelled) {
break;
}
}

if (!ok()) {
break;
}

auto remaining = files_span.size() - i;
auto current_chunk_size = std::min(chunk_size, remaining);
auto chunk = files_span.subspan(i, current_chunk_size);
copy_files(chunk);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy 64(default value) files first, and then the next 64 files?
It seems not a good concurrency strategy.

Copy link
Author

@ambroff ambroff Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right it is not ideal, but it is still a dramatic improvement and would have prevented a production incident recently.

I can see if there is an easy way to try to send requests as others complete so we don't do it in chunks. I'm worried that will require more fundamental changes. You can't just call Session::join() because you don't know which request will finish first. You could repeatedly poll Session::status() but that would be a waste of CPU. I think a callback / continuation would have to be added to Session to do this properly.

Copy link
Author

@ambroff ambroff Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess another alternative is to just send all of them concurrently, but enable the snapshot throttle. But that could break anyone who doesn't use the snapshot throttle.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got you. If RemoteFileCopier::Session has such routine as on_finish we can have a better solution.
But if the files have the same size (roughly), we can assume that the files in one chunk would be finished in the same time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have started making this change but probably won’t finish it until next week.

}
} while (0);
if (!ok() && _writer && _writer->ok()) {
Expand Down Expand Up @@ -949,65 +979,124 @@ void LocalSnapshotCopier::filter() {
}
}

void LocalSnapshotCopier::copy_file(const std::string& filename) {
if (_writer->get_file_meta(filename, NULL) == 0) {
LOG(INFO) << "Skipped downloading " << filename
<< " path: " << _writer->get_path();
return;
void LocalSnapshotCopier::copy_files(std::span<std::string const> filenames) {
// We start a concurrent file copy session for each file we will download and then wait for all of them to complete or
// fail. This struct contains the context for each file in the snapshot, which is kept in the vector below.
struct RemoteFileContext {
std::string filename;
LocalFileMeta meta;
scoped_refptr<RemoteFileCopier::Session> session;

RemoteFileContext(std::string filename, LocalFileMeta meta, scoped_refptr<RemoteFileCopier::Session> session)
: filename{std::move(filename)}, meta{std::move(meta)}, session{std::move(session)} {}
};

std::vector<RemoteFileContext> sessions;
sessions.reserve(filenames.size());

// Create a copy session for each input file.
{
std::unique_lock lck{_mutex};

if (_cancelled) {
set_error(ECANCELED, "%s", berror(ECANCELED));
return;
}
std::string file_path = _writer->get_path() + '/' + filename;
butil::FilePath sub_path(filename);
if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") {

for (const auto& filename : filenames) {
if (_writer->get_file_meta(filename, nullptr) == 0) {
LOG(INFO) << "Skipped downloading " << filename << " path: " << _writer->get_path();
continue;
}

std::string file_path = _writer->get_path() + '/' + filename;
butil::FilePath sub_path(filename);
if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") {
butil::File::Error e;
bool rc = false;
if (FLAGS_raft_create_parent_directories) {
butil::FilePath sub_dir =
butil::FilePath(_writer->get_path()).Append(sub_path.DirName());
rc = _fs->create_directory(sub_dir.value(), &e, true);
butil::FilePath sub_dir = butil::FilePath(_writer->get_path()).Append(sub_path.DirName());
rc = _fs->create_directory(sub_dir.value(), &e, true);
} else {
rc = create_sub_directory(
_writer->get_path(), sub_path.DirName().value(), _fs, &e);
rc = create_sub_directory(_writer->get_path(), sub_path.DirName().value(), _fs, &e);
}
if (!rc) {
LOG(ERROR) << "Fail to create directory for " << file_path
<< " : " << butil::File::ErrorToString(e);
set_error(file_error_to_os_error(e),
"Fail to create directory");
LOG(ERROR) << "Fail to create directory for " << file_path << " : " << butil::File::ErrorToString(e);
set_error(file_error_to_os_error(e), "Fail to create directory");
}
}
LocalFileMeta meta;
_remote_snapshot.get_file_meta(filename, &meta);
std::unique_lock<raft_mutex_t> lck(_mutex);
if (_cancelled) {
set_error(ECANCELED, "%s", berror(ECANCELED));
return;
}
scoped_refptr<RemoteFileCopier::Session> session
= _copier.start_to_copy_to_file(filename, file_path, NULL);
if (session == NULL) {
LOG(WARNING) << "Fail to copy " << filename
<< " path: " << _writer->get_path();
}

LocalFileMeta meta;
_remote_snapshot.get_file_meta(filename, &meta);

scoped_refptr<RemoteFileCopier::Session> session = _copier.start_to_copy_to_file(filename, file_path, nullptr);
if (session == nullptr) {
LOG(WARNING) << "Fail to copy " << filename << " path: " << _writer->get_path();
set_error(-1, "Fail to copy %s", filename.c_str());
return;
}

sessions.emplace_back(filename, std::move(meta), std::move(session));
}
_cur_session = session.get();
lck.unlock();
session->join();
lck.lock();
_cur_session = NULL;
lck.unlock();
if (!session->status().ok()) {
set_error(session->status().error_code(), session->status().error_cstr());
return;
}

bool failed = false;

// Now wait for each concurrent session to complete.
for (const auto& remote_file_context : sessions) {
if (failed) {
remote_file_context.session->cancel();
remote_file_context.session->join();
}
if (_writer->add_file(filename, &meta) != 0) {
set_error(EIO, "Fail to add file to writer");
return;

// If LocalSnapshotCopier::cancel() has been called, just cancel the remaining items.
{
std::unique_lock lck{_mutex};
if (_cancelled) {
remote_file_context.session->cancel();
lck.unlock();
remote_file_context.session->join();
continue;
}
}

{
std::unique_lock lck{_mutex};
_cur_session = remote_file_context.session.get();
}

remote_file_context.session->join();

{
std::unique_lock lck{_mutex};
_cur_session = nullptr;
}

// If LocalSnapshotCopier::cancel() has been called, just continue cancelling the remaining ones.
if (_cancelled) {
continue;
}

if (!remote_file_context.session->status().ok()) {
set_error(remote_file_context.session->status().error_code(), "%s",
remote_file_context.session->status().error_cstr());
failed = true;
continue;
}

if (_writer->add_file(remote_file_context.filename, &remote_file_context.meta) != 0) {
set_error(EIO, "Fail to add file to writer");
failed = true;
continue;
}
}

if (!failed) {
if (_writer->sync() != 0) {
set_error(EIO, "Fail to sync writer");
return;
set_error(EIO, "Fail to sync writer");
return;
}
}
}

void LocalSnapshotCopier::start() {
Expand Down
2 changes: 1 addition & 1 deletion src/braft/snapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ friend class LocalSnapshotStorage;
int filter_before_copy(LocalSnapshotWriter* writer,
SnapshotReader* last_snapshot);
void filter();
void copy_file(const std::string& filename);
void copy_files(std::span<std::string const> filenames);

raft_mutex_t _mutex;
bthread_t _tid;
Expand Down