snapshot: Transfer files concurrently to speed up snapshot transfer #482
Open. ambroff wants to merge 1 commit into baidu:master from ambroff:concurrent-snapshot-transfer
@@ -17,6 +17,7 @@
 // Zheng,Pengfei([email protected])
 // Xiong,Kai([email protected])
 
+#include <gflags/gflags.h>
 #include <butil/time.h>
 #include <butil/string_printf.h> // butil::string_appendf
 #include <brpc/uri.h>
@@ -34,6 +35,13 @@
 
 namespace braft {
 
+DEFINE_int32(raft_max_get_file_request_concurrency, 64,
+             "Maximum number of concurrent GetFileRequests which will be "
+             "in-flight when copying a snapshot from a "
+             "remote peer to local.");
+BRPC_VALIDATE_GFLAG(raft_max_get_file_request_concurrency,
+                    brpc::PositiveInteger);
+
 const char* LocalSnapshotStorage::_s_temp_path = "temp";
 
 LocalSnapshotMetaTable::LocalSnapshotMetaTable() {}
@@ -777,8 +785,30 @@ void LocalSnapshotCopier::copy() {
         }
         std::vector<std::string> files;
         _remote_snapshot.list_files(&files);
-        for (size_t i = 0; i < files.size() && ok(); ++i) {
-            copy_file(files[i]);
+
+        // Do the copy in chunks. We don't want to submit 1 million concurrent
+        // GetFileRequests to the remote
+        // server. This is configurable via
+        // raft_max_get_file_request_concurrency. Making a copy of this onto the
+        // stack in case it changes while this loop is running.
+        std::span<std::string> files_span{files};
+        std::size_t chunk_size = FLAGS_raft_max_get_file_request_concurrency;
+        for (std::size_t i = 0; i < files_span.size(); i += chunk_size) {
+            {
+                BAIDU_SCOPED_LOCK(_mutex);
+                if (_cancelled) {
+                    break;
+                }
+            }
+
+            if (!ok()) {
+                break;
+            }
+
+            auto remaining = files_span.size() - i;
+            auto current_chunk_size = std::min(chunk_size, remaining);
+            auto chunk = files_span.subspan(i, current_chunk_size);
+            copy_files(chunk);
         }
     } while (0);
     if (!ok() && _writer && _writer->ok()) {
@@ -949,65 +979,124 @@ void LocalSnapshotCopier::filter() {
 }
 }
 
-void LocalSnapshotCopier::copy_file(const std::string& filename) {
-if (_writer->get_file_meta(filename, NULL) == 0) {
-LOG(INFO) << "Skipped downloading " << filename
-<< " path: " << _writer->get_path();
-return;
+void LocalSnapshotCopier::copy_files(std::span<std::string const> filenames) {
+// We start a concurrent file copy session for each file we will download and then wait for all of them to complete or
+// fail. This struct contains the context for each file in the snapshot, which is kept in the vector below.
+struct RemoteFileContext {
+std::string filename;
+LocalFileMeta meta;
+scoped_refptr<RemoteFileCopier::Session> session;
+
+RemoteFileContext(std::string filename, LocalFileMeta meta, scoped_refptr<RemoteFileCopier::Session> session)
+: filename{std::move(filename)}, meta{std::move(meta)}, session{std::move(session)} {}
+};
+
+std::vector<RemoteFileContext> sessions;
+sessions.reserve(filenames.size());
+
+// Create a copy session for each input file.
+{
+std::unique_lock lck{_mutex};
+
+if (_cancelled) {
+set_error(ECANCELED, "%s", berror(ECANCELED));
+return;
 }
-std::string file_path = _writer->get_path() + '/' + filename;
-butil::FilePath sub_path(filename);
-if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") {
+
+for (const auto& filename : filenames) {
+if (_writer->get_file_meta(filename, nullptr) == 0) {
+LOG(INFO) << "Skipped downloading " << filename << " path: " << _writer->get_path();
+continue;
+}
+
+std::string file_path = _writer->get_path() + '/' + filename;
+butil::FilePath sub_path(filename);
+if (sub_path != sub_path.DirName() && sub_path.DirName().value() != ".") {
 butil::File::Error e;
 bool rc = false;
 if (FLAGS_raft_create_parent_directories) {
-butil::FilePath sub_dir =
-butil::FilePath(_writer->get_path()).Append(sub_path.DirName());
-rc = _fs->create_directory(sub_dir.value(), &e, true);
+butil::FilePath sub_dir = butil::FilePath(_writer->get_path()).Append(sub_path.DirName());
+rc = _fs->create_directory(sub_dir.value(), &e, true);
 } else {
-rc = create_sub_directory(
-_writer->get_path(), sub_path.DirName().value(), _fs, &e);
+rc = create_sub_directory(_writer->get_path(), sub_path.DirName().value(), _fs, &e);
 }
 if (!rc) {
-LOG(ERROR) << "Fail to create directory for " << file_path
-<< " : " << butil::File::ErrorToString(e);
-set_error(file_error_to_os_error(e),
-"Fail to create directory");
+LOG(ERROR) << "Fail to create directory for " << file_path << " : " << butil::File::ErrorToString(e);
+set_error(file_error_to_os_error(e), "Fail to create directory");
 }
 }
-LocalFileMeta meta;
-_remote_snapshot.get_file_meta(filename, &meta);
-std::unique_lock<raft_mutex_t> lck(_mutex);
-if (_cancelled) {
-set_error(ECANCELED, "%s", berror(ECANCELED));
-return;
-}
-scoped_refptr<RemoteFileCopier::Session> session
-= _copier.start_to_copy_to_file(filename, file_path, NULL);
-if (session == NULL) {
-LOG(WARNING) << "Fail to copy " << filename
-<< " path: " << _writer->get_path();
+}
+
+LocalFileMeta meta;
+_remote_snapshot.get_file_meta(filename, &meta);
+
+scoped_refptr<RemoteFileCopier::Session> session = _copier.start_to_copy_to_file(filename, file_path, nullptr);
+if (session == nullptr) {
+LOG(WARNING) << "Fail to copy " << filename << " path: " << _writer->get_path();
 set_error(-1, "Fail to copy %s", filename.c_str());
 return;
 }
+
+sessions.emplace_back(filename, std::move(meta), std::move(session));
+}
-_cur_session = session.get();
-lck.unlock();
-session->join();
-lck.lock();
-_cur_session = NULL;
-lck.unlock();
-if (!session->status().ok()) {
-set_error(session->status().error_code(), session->status().error_cstr());
-return;
-}
+
+bool failed = false;
+
+// Now wait for each concurrent session to complete.
+for (const auto& remote_file_context : sessions) {
+if (failed) {
+remote_file_context.session->cancel();
+remote_file_context.session->join();
+}
-if (_writer->add_file(filename, &meta) != 0) {
-set_error(EIO, "Fail to add file to writer");
-return;
+
+// If LocalSnapshotCopier::cancel() has been called, just cancel the remaining items.
+{
+std::unique_lock lck{_mutex};
+if (_cancelled) {
+remote_file_context.session->cancel();
+lck.unlock();
+remote_file_context.session->join();
+continue;
+}
 }
+
+{
+std::unique_lock lck{_mutex};
+_cur_session = remote_file_context.session.get();
+}
+
+remote_file_context.session->join();
+
+{
+std::unique_lock lck{_mutex};
+_cur_session = nullptr;
+}
+
+// If LocalSnapshotCopier::cancel() has been called, just continue cancelling the remaining ones.
+if (_cancelled) {
+continue;
+}
+
+if (!remote_file_context.session->status().ok()) {
+set_error(remote_file_context.session->status().error_code(), "%s",
+remote_file_context.session->status().error_cstr());
+failed = true;
+continue;
+}
+
+if (_writer->add_file(remote_file_context.filename, &remote_file_context.meta) != 0) {
+set_error(EIO, "Fail to add file to writer");
+failed = true;
+continue;
+}
+}
+
+if (!failed) {
 if (_writer->sync() != 0) {
-set_error(EIO, "Fail to sync writer");
-return;
+set_error(EIO, "Fail to sync writer");
+return;
 }
+}
 }
 
 void LocalSnapshotCopier::start() {
Copy 64 files (the default value) first, and then the next 64 files?
That does not seem like a good concurrency strategy.
You're right, it is not ideal, but it is still a dramatic improvement and would have prevented a recent production incident.
I can see if there is an easy way to try to send requests as others complete so we don't do it in chunks. I'm worried that will require more fundamental changes. You can't just call Session::join() because you don't know which request will finish first. You could repeatedly poll Session::status() but that would be a waste of CPU. I think a callback / continuation would have to be added to Session to do this properly.
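The callback / continuation idea can be sketched in isolation. This is a minimal single-threaded illustration under stated assumptions, not braft code: `FakeTransport`, `WindowedCopier`, and the `on_finish` hook are hypothetical names, and the source does not show `RemoteFileCopier::Session` exposing any such callback. The point is only that a completion callback lets the copier refill a freed slot immediately, without `join()` or polling.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical transport: records each started transfer and lets the caller
// finish transfers in any order, standing in for out-of-order RPC completion.
class FakeTransport {
public:
    using Callback = std::function<void(bool)>;

    void start(const std::string& file, Callback on_finish) {
        pending_.emplace_back(file, std::move(on_finish));
    }

    // Finish the pending transfer at `index` and run its callback.
    void finish(std::size_t index, bool ok) {
        assert(index < pending_.size());
        Callback cb = std::move(pending_[index].second);
        pending_.erase(pending_.begin() + static_cast<std::ptrdiff_t>(index));
        cb(ok);  // may start another transfer; pending_ is already consistent
    }

    std::size_t in_flight() const { return pending_.size(); }

private:
    std::vector<std::pair<std::string, Callback>> pending_;
};

// Hypothetical copier that keeps at most `window` transfers in flight and
// starts the next file from the completion callback of a finished one.
class WindowedCopier {
public:
    WindowedCopier(FakeTransport& transport, std::vector<std::string> files,
                   std::size_t window)
        : transport_(transport), files_(std::move(files)), window_(window) {}

    // Fill the window once; later starts happen inside on_finish().
    void run() {
        while (in_flight_ < window_ && next_ < files_.size()) {
            start_next();
        }
    }

    bool done() const {
        return in_flight_ == 0 && (failed_ || next_ == files_.size());
    }
    bool failed() const { return failed_; }
    std::size_t completed() const { return completed_; }

private:
    void start_next() {
        const std::string& file = files_[next_++];
        ++in_flight_;
        transport_.start(file, [this](bool ok) { on_finish(ok); });
    }

    void on_finish(bool ok) {
        --in_flight_;
        if (ok) {
            ++completed_;
        } else {
            failed_ = true;  // stop scheduling new work after a failure
        }
        if (!failed_ && next_ < files_.size()) {
            start_next();
        }
    }

    FakeTransport& transport_;
    std::vector<std::string> files_;
    std::size_t window_;
    std::size_t next_ = 0;
    std::size_t in_flight_ = 0;
    std::size_t completed_ = 0;
    bool failed_ = false;
};
```

In a real multi-threaded setting the counters would need to be protected by a lock, and cancellation would have to reach every in-flight session rather than a single `_cur_session`.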
I guess another alternative is to just send all of them concurrently, but enable the snapshot throttle. But that could break anyone who doesn't use the snapshot throttle.
I got you. If `RemoteFileCopier::Session` had a routine such as `on_finish`, we could have a better solution. But if the files are (roughly) the same size, we can assume that the files in one chunk will finish at about the same time.
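The trade-off discussed here can be made concrete with a small model. The sketch below assumes each file has a known transfer duration in abstract time units and ignores bandwidth sharing between transfers; `chunked_makespan` and `windowed_makespan` are hypothetical helper names, not part of braft. It compares waiting for a whole chunk against starting the next file as soon as a slot frees up.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <vector>

// Total time when files are copied in fixed chunks and each chunk must finish
// before the next one starts: every chunk costs as much as its slowest file.
long chunked_makespan(const std::vector<long>& durations, std::size_t chunk_size) {
    assert(chunk_size > 0);
    long total = 0;
    for (std::size_t i = 0; i < durations.size(); i += chunk_size) {
        const std::size_t end = std::min(i + chunk_size, durations.size());
        total += *std::max_element(
            durations.begin() + static_cast<std::ptrdiff_t>(i),
            durations.begin() + static_cast<std::ptrdiff_t>(end));
    }
    return total;
}

// Total time when at most `window` transfers run at once and the next file
// starts as soon as the earliest running transfer finishes.
long windowed_makespan(const std::vector<long>& durations, std::size_t window) {
    assert(window > 0);
    // Min-heap of finish times of the transfers currently in flight.
    std::priority_queue<long, std::vector<long>, std::greater<long>> finish_times;
    long makespan = 0;
    for (long d : durations) {
        long start = 0;
        if (finish_times.size() == window) {
            start = finish_times.top();  // wait for the earliest slot to free up
            finish_times.pop();
        }
        const long finish = start + d;
        finish_times.push(finish);
        makespan = std::max(makespan, finish);
    }
    return makespan;
}
```

With durations `{8, 1, 1, 8}` and a limit of 2, the chunked model takes 16 units while the windowed model takes 10. With equal durations `{5, 5, 5, 5}` both take 10, which matches the observation that chunking costs little when files are roughly the same size.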
I have started making this change but probably won’t finish it until next week.