diff --git a/src/braft/ballot_box.cpp b/src/braft/ballot_box.cpp
index 705c72e3..2b8471af 100644
--- a/src/braft/ballot_box.cpp
+++ b/src/braft/ballot_box.cpp
@@ -167,10 +167,22 @@ void BallotBox::describe(std::ostream& os, bool use_html) {
lck.unlock();
const char *newline = use_html ? "
" : "\r\n";
os << "last_committed_index: " << committed_index << newline;
- if (pending_index != 0) {
+ if (pending_queue_size != 0) {
os << "pending_index: " << pending_index << newline;
os << "pending_queue_size: " << pending_queue_size << newline;
}
}
+void BallotBox::get_status(BallotBoxStatus* status) {
+ if (!status) {
+ return;
+ }
+ std::unique_lock lck(_mutex);
+ status->committed_index = _last_committed_index;
+ if (_pending_meta_queue.size() != 0) {
+ status->pending_index = _pending_index;
+ status->pending_queue_size = _pending_meta_queue.size();
+ }
+}
+
} // namespace braft
diff --git a/src/braft/ballot_box.h b/src/braft/ballot_box.h
index 6148831c..f824f3ea 100644
--- a/src/braft/ballot_box.h
+++ b/src/braft/ballot_box.h
@@ -39,6 +39,15 @@ struct BallotBoxOptions {
ClosureQueue* closure_queue;
};
+struct BallotBoxStatus {
+ BallotBoxStatus()
+ : committed_index(0), pending_index(0), pending_queue_size(0)
+ {}
+ int64_t committed_index;
+ int64_t pending_index;
+ int64_t pending_queue_size;
+};
+
class BallotBox {
public:
BallotBox();
@@ -79,6 +88,8 @@ class BallotBox {
void describe(std::ostream& os, bool use_html);
+ void get_status(BallotBoxStatus* ballot_box_status);
+
private:
FSMCaller* _waiter;
diff --git a/src/braft/errno.proto b/src/braft/errno.proto
index ae7f64a9..3fc909ec 100644
--- a/src/braft/errno.proto
+++ b/src/braft/errno.proto
@@ -30,5 +30,7 @@ enum RaftError {
ELOGDELETED = 10014;
// No available user log to read
ENOMOREUSERLOG = 10015;
+ // Raft node in readonly mode
+ EREADONLY = 10016;
};
diff --git a/src/braft/file_service.cpp b/src/braft/file_service.cpp
index 23f7f510..516ff2df 100644
--- a/src/braft/file_service.cpp
+++ b/src/braft/file_service.cpp
@@ -46,9 +46,9 @@ void FileServiceImpl::get_file(::google::protobuf::RpcController* controller,
// Don't touch iter ever after
reader = iter->second;
lck.unlock();
- BRAFT_VLOG << "get_file from " << cntl->remote_side() << " path=" << reader->path()
- << " filename=" << request->filename()
- << " offset=" << request->offset() << " count=" << request->count();
+ BRAFT_VLOG << "get_file for " << cntl->remote_side() << " path=" << reader->path()
+ << " filename=" << request->filename()
+ << " offset=" << request->offset() << " count=" << request->count();
if (request->count() <= 0 || request->offset() < 0) {
cntl->SetFailed(brpc::EREQUEST, "Invalid request=%s",
diff --git a/src/braft/file_system_adaptor.cpp b/src/braft/file_system_adaptor.cpp
index 0a376f96..942883ce 100644
--- a/src/braft/file_system_adaptor.cpp
+++ b/src/braft/file_system_adaptor.cpp
@@ -37,7 +37,6 @@ const char* PosixDirReader::name() const {
}
PosixFileAdaptor::~PosixFileAdaptor() {
- ::close(_fd);
}
ssize_t PosixFileAdaptor::write(const butil::IOBuf& data, off_t offset) {
@@ -57,6 +56,15 @@ bool PosixFileAdaptor::sync() {
return raft_fsync(_fd) == 0;
}
+bool PosixFileAdaptor::close() {
+ if (_fd > 0) {
+ bool res = ::close(_fd) == 0;
+ _fd = -1;
+ return res;
+ }
+ return true;
+}
+
static pthread_once_t s_check_cloexec_once = PTHREAD_ONCE_INIT;
static bool s_support_cloexec_on_open = false;
diff --git a/src/braft/file_system_adaptor.h b/src/braft/file_system_adaptor.h
index 31fc7af7..18e83b47 100644
--- a/src/braft/file_system_adaptor.h
+++ b/src/braft/file_system_adaptor.h
@@ -56,12 +56,13 @@ class DirReader {
template
struct DestroyObj {
- void operator()(T* const obj) { obj->destroy(); }
+ void operator()(T* const obj) { obj->close(); delete obj; }
};
class FileAdaptor {
public:
+ virtual ~FileAdaptor() {}
// Write to the file. Different from posix ::pwrite(), write will retry automatically
// when occur EINTR.
// Return |data.size()| if successful, -1 otherwise.
@@ -79,13 +80,12 @@ class FileAdaptor {
// Sync data of the file to disk device
virtual bool sync() = 0;
- // Destroy this adaptor
- virtual void destroy() { delete this; }
+ // Close the descriptor of this file adaptor
+ virtual bool close() = 0;
protected:
FileAdaptor() {}
- virtual ~FileAdaptor() {}
private:
DISALLOW_COPY_AND_ASSIGN(FileAdaptor);
@@ -177,6 +177,7 @@ friend class PosixFileSystemAdaptor;
virtual ssize_t read(butil::IOPortal* portal, off_t offset, size_t size);
virtual ssize_t size();
virtual bool sync();
+ virtual bool close();
protected:
PosixFileAdaptor(int fd) : _fd(fd) {}
diff --git a/src/braft/fsm_caller.cpp b/src/braft/fsm_caller.cpp
index 0410281f..2a1eca26 100644
--- a/src/braft/fsm_caller.cpp
+++ b/src/braft/fsm_caller.cpp
@@ -31,6 +31,9 @@
namespace braft {
+static bvar::CounterRecorder g_commit_tasks_batch_counter(
+ "raft_commit_tasks_batch_counter");
+
FSMCaller::FSMCaller()
: _log_manager(NULL)
, _fsm(NULL)
@@ -41,6 +44,7 @@ FSMCaller::FSMCaller()
, _node(NULL)
, _cur_task(IDLE)
, _applying_index(0)
+ , _queue_started(false)
{
}
@@ -55,16 +59,20 @@ int FSMCaller::run(void* meta, bthread::TaskIterator& iter) {
return 0;
}
int64_t max_committed_index = -1;
+ int64_t counter = 0;
for (; iter; ++iter) {
if (iter->type == COMMITTED) {
if (iter->committed_index > max_committed_index) {
max_committed_index = iter->committed_index;
+ counter++;
}
} else {
if (max_committed_index >= 0) {
caller->_cur_task = COMMITTED;
caller->do_committed(max_committed_index);
max_committed_index = -1;
+ g_commit_tasks_batch_counter << counter;
+ counter = 0;
}
switch (iter->type) {
case COMMITTED:
@@ -115,6 +123,8 @@ int FSMCaller::run(void* meta, bthread::TaskIterator& iter) {
if (max_committed_index >= 0) {
caller->_cur_task = COMMITTED;
caller->do_committed(max_committed_index);
+ g_commit_tasks_batch_counter << counter;
+ counter = 0;
}
caller->_cur_task = IDLE;
return 0;
@@ -155,15 +165,22 @@ int FSMCaller::init(const FSMCallerOptions &options) {
execq_opt.bthread_attr = options.usercode_in_pthread
? BTHREAD_ATTR_PTHREAD
: BTHREAD_ATTR_NORMAL;
- bthread::execution_queue_start(&_queue_id,
+ if (bthread::execution_queue_start(&_queue_id,
&execq_opt,
FSMCaller::run,
- this);
+ this) != 0) {
+ LOG(ERROR) << "fsm fail to start execution_queue";
+ return -1;
+ }
+ _queue_started = true;
return 0;
}
int FSMCaller::shutdown() {
- return bthread::execution_queue_stop(_queue_id);
+ if (_queue_started) {
+ return bthread::execution_queue_stop(_queue_id);
+ }
+ return 0;
}
void FSMCaller::do_shutdown() {
@@ -257,7 +274,8 @@ void FSMCaller::do_committed(int64_t committed_index) {
if (iter_impl.entry()->old_peers == NULL) {
// Joint stage is not supposed to be noticeable by end users.
_fsm->on_configuration_committed(
- Configuration(*iter_impl.entry()->peers));
+ Configuration(*iter_impl.entry()->peers),
+ iter_impl.entry()->id.index);
}
}
// For other entries, we have nothing to do besides flush the
@@ -271,8 +289,8 @@ void FSMCaller::do_committed(int64_t committed_index) {
}
Iterator iter(&iter_impl);
_fsm->on_apply(iter);
- LOG_IF(ERROR, iter.valid())
- << "Node " << _node->node_id()
+ LOG_IF(ERROR, iter.valid())
+ << "Node " << _node->node_id()
<< " Iterator is still valid, did you return before iterator "
" reached the end?";
// Try move to next in case that we pass the same log twice.
@@ -391,7 +409,7 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
for (int i = 0; i < meta.peers_size(); ++i) {
conf.add_peer(meta.peers(i));
}
- _fsm->on_configuration_committed(conf);
+ _fsm->on_configuration_committed(conf, meta.last_included_index());
}
_last_applied_index.store(meta.last_included_index(),
@@ -499,8 +517,20 @@ void FSMCaller::describe(std::ostream &os, bool use_html) {
os << newline;
}
+int64_t FSMCaller::applying_index() const {
+ TaskType cur_task = _cur_task;
+ if (cur_task != COMMITTED) {
+ return 0;
+ } else {
+ return _applying_index.load(butil::memory_order_relaxed);
+ }
+}
+
void FSMCaller::join() {
- bthread::execution_queue_join(_queue_id);
+ if (_queue_started) {
+ bthread::execution_queue_join(_queue_id);
+ _queue_started = false;
+ }
}
IteratorImpl::IteratorImpl(StateMachine* sm, LogManager* lm,
diff --git a/src/braft/fsm_caller.h b/src/braft/fsm_caller.h
index 7c508658..de8a67b3 100644
--- a/src/braft/fsm_caller.h
+++ b/src/braft/fsm_caller.h
@@ -119,6 +119,7 @@ class BAIDU_CACHELINE_ALIGNMENT FSMCaller {
int64_t last_applied_index() const {
return _last_applied_index.load(butil::memory_order_relaxed);
}
+ int64_t applying_index() const;
void describe(std::ostream& os, bool use_html);
void join();
private:
@@ -183,6 +184,7 @@ friend class IteratorImpl;
TaskType _cur_task;
butil::atomic _applying_index;
Error _error;
+ bool _queue_started;
};
};
diff --git a/src/braft/log.cpp b/src/braft/log.cpp
index 872a3004..27c01cde 100644
--- a/src/braft/log.cpp
+++ b/src/braft/log.cpp
@@ -272,6 +272,7 @@ int Segment::load(ConfigurationManager* configuration_manager) {
// load entry index
int64_t file_size = st_buf.st_size;
int64_t entry_off = 0;
+ int64_t actual_last_index = _first_index - 1;
for (int64_t i = _first_index; entry_off < file_size; i++) {
EntryHeader header;
const int rc = _load_entry(entry_off, &header, NULL, ENTRY_HEADER_SIZE);
@@ -310,12 +311,21 @@ int Segment::load(ConfigurationManager* configuration_manager) {
}
}
_offset_and_term.push_back(std::make_pair(entry_off, header.term));
- if (_is_open) {
- ++_last_index;
- }
+ ++actual_last_index;
entry_off += skip_len;
}
+ if (ret == 0 && !_is_open && actual_last_index < _last_index) {
+ LOG(ERROR) << "data lost in a full segment, path: " << _path
+ << " first_index: " << _first_index << " expect_last_index: "
+ << _last_index << " actual_last_index: " << actual_last_index;
+ ret = -1;
+ }
+
+ if (_is_open) {
+ _last_index = actual_last_index;
+ }
+
// truncate last uncompleted entry
if (ret == 0 && entry_off != file_size) {
LOG(INFO) << "truncate last uncompleted write entry, path: " << _path
@@ -983,7 +993,7 @@ int SegmentLogStorage::list_segments(bool is_empty) {
}
last_log_index = segment->last_index();
- it++;
+ ++it;
}
if (_open_segment) {
if (last_log_index == -1 &&
@@ -1059,8 +1069,8 @@ int SegmentLogStorage::save_meta(const int64_t log_index) {
timer.stop();
PLOG_IF(ERROR, ret != 0) << "Fail to save meta to " << meta_path;
- BRAFT_VLOG << "log save_meta " << meta_path << " log_index: " << log_index
- << " time: " << timer.u_elapsed();
+ LOG(INFO) << "log save_meta " << meta_path << " first_log_index: " << log_index
+ << " time: " << timer.u_elapsed();
return ret;
}
@@ -1081,8 +1091,8 @@ int SegmentLogStorage::load_meta() {
_first_log_index.store(meta.first_log_index());
timer.stop();
- BRAFT_VLOG << "log load_meta " << meta_path << " log_index: " << meta.first_log_index()
- << " time: " << timer.u_elapsed();
+ LOG(INFO) << "log load_meta " << meta_path << " first_log_index: " << meta.first_log_index()
+ << " time: " << timer.u_elapsed();
return 0;
}
diff --git a/src/braft/log.h b/src/braft/log.h
index 549fa4fa..6c9fa81d 100644
--- a/src/braft/log.h
+++ b/src/braft/log.h
@@ -96,7 +96,7 @@ class BAIDU_CACHELINE_ALIGNMENT Segment
std::string file_name();
private:
-friend butil::RefCountedThreadSafe;
+friend class butil::RefCountedThreadSafe;
~Segment() {
if (_fd >= 0) {
::close(_fd);
diff --git a/src/braft/log_entry.cpp b/src/braft/log_entry.cpp
index 83b536b9..540ac17a 100644
--- a/src/braft/log_entry.cpp
+++ b/src/braft/log_entry.cpp
@@ -14,7 +14,6 @@
// Authors: Zhangyi Chen(chenzhangyi01@baidu.com)
-#include
#include "braft/log_entry.h"
#include "braft/local_storage.pb.h"
diff --git a/src/braft/log_entry.h b/src/braft/log_entry.h
index 4de5d50f..2d1a106c 100644
--- a/src/braft/log_entry.h
+++ b/src/braft/log_entry.h
@@ -19,7 +19,6 @@
#include // butil::IOBuf
#include // butil::RefCountedThreadSafe
-#include
#include // fmix64
#include "braft/configuration.h"
#include "braft/raft.pb.h"
diff --git a/src/braft/log_manager.cpp b/src/braft/log_manager.cpp
index 083584c8..c5c4a193 100644
--- a/src/braft/log_manager.cpp
+++ b/src/braft/log_manager.cpp
@@ -40,8 +40,13 @@ static bvar::Adder g_read_term_from_storage
static bvar::PerSecond > g_read_term_from_storage_second
("raft_read_term_from_storage_second", &g_read_term_from_storage);
-static bvar::LatencyRecorder g_storage_append_entries_latency("raft_storage_append_entries");
-static bvar::LatencyRecorder g_nomralized_append_entries_latency("raft_storage_append_entries_normalized");
+static bvar::LatencyRecorder g_storage_append_entries_latency(
+ "raft_storage_append_entries");
+static bvar::LatencyRecorder g_nomralized_append_entries_latency(
+ "raft_storage_append_entries_normalized");
+
+static bvar::CounterRecorder g_storage_flush_batch_counter(
+ "raft_storage_flush_batch_counter");
LogManagerOptions::LogManagerOptions()
: log_storage(NULL)
@@ -483,6 +488,7 @@ class AppendBatcher {
void flush() {
if (_size > 0) {
_lm->append_to_storage(&_to_append, _last_id);
+ g_storage_flush_batch_counter << _size;
for (size_t i = 0; i < _size; ++i) {
_storage[i]->_entries.clear();
if (_lm->_has_error.load(butil::memory_order_relaxed)) {
@@ -619,7 +625,7 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
_config_manager->set_snapshot(entry);
int64_t term = unsafe_get_term(meta->last_included_index());
- const int64_t saved_last_snapshot_index = _last_snapshot_id.index;
+ const LogId last_but_one_snapshot_id = _last_snapshot_id;
_last_snapshot_id.index = meta->last_included_index();
_last_snapshot_id.term = meta->last_included_term();
if (_last_snapshot_id > _applied_id) {
@@ -628,6 +634,7 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
if (term == 0) {
// last_included_index is larger than last_index
// FIXME: what if last_included_index is less than first_index?
+ _virtual_first_log_id = _last_snapshot_id;
truncate_prefix(meta->last_included_index() + 1, lck);
return;
} else if (term == meta->last_included_term()) {
@@ -635,13 +642,15 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
// We don't truncate log before the lastest snapshot immediately since
// some log around last_snapshot_index is probably needed by some
// followers
- if (saved_last_snapshot_index > 0) {
+ if (last_but_one_snapshot_id.index > 0) {
// We have last snapshot index
- truncate_prefix(saved_last_snapshot_index + 1, lck);
+ _virtual_first_log_id = last_but_one_snapshot_id;
+ truncate_prefix(last_but_one_snapshot_id.index + 1, lck);
}
return;
} else {
// TODO: check the result of reset.
+ _virtual_first_log_id = _last_snapshot_id;
reset(meta->last_included_index() + 1, lck);
return;
}
@@ -651,6 +660,7 @@ void LogManager::set_snapshot(const SnapshotMeta* meta) {
void LogManager::clear_bufferred_logs() {
std::unique_lock lck(_mutex);
if (_last_snapshot_id.index != 0) {
+ _virtual_first_log_id = _last_snapshot_id;
truncate_prefix(_last_snapshot_id.index + 1, lck);
}
}
@@ -672,15 +682,20 @@ int64_t LogManager::unsafe_get_term(const int64_t index) {
if (index == 0) {
return 0;
}
-
- if (index > _last_log_index) {
- return 0;
+ // check virtual first log
+ if (index == _virtual_first_log_id.index) {
+ return _virtual_first_log_id.term;
}
-
- // check index equal snapshot_index, return snapshot_term
+ // check last_snapshot_id
if (index == _last_snapshot_id.index) {
return _last_snapshot_id.term;
}
+ // out of range, direct return NULL
+ // check this after check last_snapshot_id, because it is likely that
+ // last_snapshot_id < first_log_index
+ if (index > _last_log_index || index < _first_log_index) {
+ return 0;
+ }
LogEntry* entry = get_entry_from_memory(index);
if (entry) {
@@ -694,17 +709,21 @@ int64_t LogManager::get_term(const int64_t index) {
if (index == 0) {
return 0;
}
-
std::unique_lock lck(_mutex);
- // out of range, direct return NULL
- if (index > _last_log_index) {
- return 0;
+ // check virtual first log
+ if (index == _virtual_first_log_id.index) {
+ return _virtual_first_log_id.term;
}
-
- // check index equal snapshot_index, return snapshot_term
+ // check last_snapshot_id
if (index == _last_snapshot_id.index) {
return _last_snapshot_id.term;
}
+ // out of range, direct return NULL
+ // check this after check last_snapshot_id, because it is likely that
+ // last_snapshot_id < first_log_index
+ if (index > _last_log_index || index < _first_log_index) {
+ return 0;
+ }
LogEntry* entry = get_entry_from_memory(index);
if (entry) {
@@ -881,6 +900,17 @@ void LogManager::describe(std::ostream& os, bool use_html) {
os << "last_log_id: " << last_log_id() << newline;
}
+void LogManager::get_status(LogManagerStatus* status) {
+ if (!status) {
+ return;
+ }
+ std::unique_lock lck(_mutex);
+ status->first_index = _log_storage->first_log_index();
+ status->last_index = _log_storage->last_log_index();
+ status->disk_index = _disk_id.index;
+ status->known_applied_index = _applied_id.index;
+}
+
void LogManager::report_error(int error_code, const char* fmt, ...) {
_has_error.store(true, butil::memory_order_relaxed);
va_list ap;
diff --git a/src/braft/log_manager.h b/src/braft/log_manager.h
index d51054a1..8dd4cef6 100644
--- a/src/braft/log_manager.h
+++ b/src/braft/log_manager.h
@@ -40,6 +40,16 @@ struct LogManagerOptions {
FSMCaller* fsm_caller; // To report log error
};
+struct LogManagerStatus {
+ LogManagerStatus()
+ : first_index(1), last_index(0), disk_index(0), known_applied_index(0)
+ {}
+ int64_t first_index;
+ int64_t last_index;
+ int64_t disk_index;
+ int64_t known_applied_index;
+};
+
class SnapshotMeta;
class BAIDU_CACHELINE_ALIGNMENT LogManager {
@@ -133,6 +143,9 @@ class BAIDU_CACHELINE_ALIGNMENT LogManager {
void describe(std::ostream& os, bool use_html);
+ // Get the internal status of LogManager.
+ void get_status(LogManagerStatus* status);
+
private:
friend class AppendBatcher;
struct WaitMeta {
@@ -200,7 +213,14 @@ friend class AppendBatcher;
std::deque _logs_in_memory;
int64_t _first_log_index;
int64_t _last_log_index;
+ // the last snapshot's log_id
LogId _last_snapshot_id;
+ // the virtual first log, for finding next_index of replicator, which
+ // can avoid install_snapshot too often in extreme case where a follower's
+ // install_snapshot is slower than leader's save_snapshot
+ // [NOTICE] there should not be hole between this log_id and _last_snapshot_id,
+ // or may cause some unexpect cases
+ LogId _virtual_first_log_id;
bthread::ExecutionQueueId _disk_queue;
};
diff --git a/src/braft/node.cpp b/src/braft/node.cpp
index 2b5cc8fc..91bde804 100644
--- a/src/braft/node.cpp
+++ b/src/braft/node.cpp
@@ -60,6 +60,20 @@ static bvar::Adder g_num_nodes("raft_node_count");
bvar::Adder g_num_nodes("raft_node_count");
#endif
+static bvar::CounterRecorder g_apply_tasks_batch_counter(
+ "raft_apply_tasks_batch_counter");
+
+int SnapshotTimer::adjust_timeout_ms(int timeout_ms) {
+ if (!_first_schedule) {
+ return timeout_ms;
+ }
+ if (timeout_ms > 0) {
+ timeout_ms = butil::fast_rand_less_than(timeout_ms) + 1;
+ }
+ _first_schedule = false;
+ return timeout_ms;
+}
+
class ConfigurationChangeDone : public Closure {
public:
void Run() {
@@ -115,9 +129,12 @@ NodeImpl::NodeImpl(const GroupId& group_id, const PeerId& peer_id)
, _ballot_box(NULL)
, _snapshot_executor(NULL)
, _stop_transfer_arg(NULL)
+ , _vote_triggered(false)
, _waking_candidate(0)
, _append_entries_cache(NULL)
- , _append_entries_cache_version(0) {
+ , _append_entries_cache_version(0)
+ , _node_readonly(false)
+ , _majority_nodes_readonly(false) {
_server_id = peer_id;
AddRef();
g_num_nodes << 1;
@@ -138,6 +155,7 @@ NodeImpl::NodeImpl()
, _ballot_box(NULL)
, _snapshot_executor(NULL)
, _stop_transfer_arg(NULL)
+ , _vote_triggered(false)
, _waking_candidate(0) {
AddRef();
g_num_nodes << 1;
@@ -168,9 +186,11 @@ NodeImpl::~NodeImpl() {
_ballot_box = NULL;
}
- if (_log_storage) {
- delete _log_storage;
- _log_storage = NULL;
+ if (_options.node_owns_log_storage) {
+ if (_log_storage) {
+ delete _log_storage;
+ _log_storage = NULL;
+ }
}
if (_closure_queue) {
delete _closure_queue;
@@ -218,7 +238,11 @@ int NodeImpl::init_snapshot_storage() {
int NodeImpl::init_log_storage() {
CHECK(_fsm_caller);
- _log_storage = LogStorage::create(_options.log_uri);
+ if (_options.log_storage) {
+ _log_storage = _options.log_storage;
+ } else {
+ _log_storage = LogStorage::create(_options.log_uri);
+ }
if (!_log_storage) {
LOG(ERROR) << "node " << _group_id << ":" << _server_id
<< " find log storage failed, uri " << _options.log_uri;
@@ -337,7 +361,6 @@ int NodeImpl::bootstrap(const BootstrapOptions& options) {
_options.log_uri = options.log_uri;
_options.raft_meta_uri = options.raft_meta_uri;
_options.snapshot_uri = options.snapshot_uri;
- _options.snapshot_file_system_adaptor = _options.snapshot_file_system_adaptor;
_config_manager = new ConfigurationManager();
// Create _fsm_caller first as log_manager needs it to report error
@@ -418,7 +441,7 @@ int NodeImpl::init(const NodeOptions& options) {
if (!NodeManager::GetInstance()->server_exists(_server_id.addr)) {
LOG(ERROR) << "Group " << _group_id
<< " No RPC Server attached to " << _server_id.addr
- << ", did you forget to call raft::add_service()?";
+ << ", did you forget to call braft::add_service()?";
return -1;
}
@@ -644,10 +667,14 @@ class OnCaughtUp : public CatchupClosure {
void NodeImpl::on_caughtup(const PeerId& peer, int64_t term,
int64_t version, const butil::Status& st) {
BAIDU_SCOPED_LOCK(_mutex);
- // CHECK _current_term and _state to avoid ABA problem
- if (term != _current_term && _state != STATE_LEADER) {
- // term has changed and nothing should be done, otherwise there will be
- // an ABA problem.
+ // CHECK _state and _current_term to avoid ABA problem
+ if (_state != STATE_LEADER || term != _current_term) {
+ // if leader stepped down, reset() has already been called in step_down(),
+ // so nothing needs to be done here
+ LOG(WARNING) << "node " << node_id() << " stepped down when waiting peer "
+ << peer << " to catch up, current state is " << state2str(_state)
+ << ", current term is " << _current_term
+ << ", expect term is " << term;
return;
}
@@ -662,8 +689,8 @@ void NodeImpl::on_caughtup(const PeerId& peer, int64_t term,
- _replicator_group.last_rpc_send_timestamp(peer))
<= _options.election_timeout_ms) {
- BRAFT_VLOG << "node " << _group_id << ":" << _server_id
- << " waits peer " << peer << " to catch up";
+ LOG(INFO) << "node " << _group_id << ":" << _server_id
+ << " waits peer " << peer << " to catch up";
OnCaughtUp* caught_up = new OnCaughtUp(this, _current_term, peer, version);
timespec due_time = butil::milliseconds_from_now(
@@ -851,6 +878,8 @@ void NodeImpl::snapshot(Closure* done) {
}
void NodeImpl::do_snapshot(Closure* done) {
+ LOG(INFO) << "node " << _group_id << ":" << _server_id
+ << " starts to do snapshot";
if (_snapshot_executor) {
_snapshot_executor->do_snapshot(done);
} else {
@@ -947,10 +976,13 @@ void NodeImpl::handle_election_timeout() {
return;
}
- if ((butil::monotonic_time_ms() - _last_leader_timestamp)
+ // check timestamp, skip one cycle check when trigger vote
+ if (!_vote_triggered &&
+ (butil::monotonic_time_ms() - _last_leader_timestamp)
< _options.election_timeout_ms) {
return;
}
+ _vote_triggered = false;
// Reset leader as the leader is uncerntain on election timeout.
PeerId empty_id;
@@ -1128,6 +1160,22 @@ int NodeImpl::transfer_leadership_to(const PeerId& peer) {
return 0;
}
+void NodeImpl::vote(int election_timeout) {
+ std::unique_lock lck(_mutex);
+ _options.election_timeout_ms = election_timeout;
+ _replicator_group.reset_heartbeat_interval(
+ heartbeat_timeout(_options.election_timeout_ms));
+ _replicator_group.reset_election_timeout_interval(_options.election_timeout_ms);
+ if (_state != STATE_FOLLOWER) {
+ return;
+ }
+ _vote_triggered = true;
+ LOG(INFO) << "node " << _group_id << ":" << _server_id << " trigger-vote,"
+ " current_term " << _current_term << " state " << state2str(_state) <<
+ " election_timeout " << election_timeout;
+ _election_timer.reset(election_timeout);
+}
+
void NodeImpl::reset_election_timeout_ms(int election_timeout_ms) {
std::unique_lock lck(_mutex);
_options.election_timeout_ms = election_timeout_ms;
@@ -1201,8 +1249,8 @@ void NodeImpl::handle_request_vote_response(const PeerId& peer_id, const int64_t
// check stale response
if (term != _current_term) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
- << " received stale RequestVoteResponse from " << peer_id
- << " term " << term << " current_term " << _current_term;
+ << " received stale RequestVoteResponse from " << peer_id
+ << " term " << term << " current_term " << _current_term;
return;
}
// check response term
@@ -1243,7 +1291,7 @@ struct OnRequestVoteRPCDone : public google::protobuf::Closure {
if (cntl.ErrorCode() != 0) {
LOG(WARNING) << "node " << node->node_id()
<< " received RequestVoteResponse from " << peer
- << " error: " << cntl.ErrorText();
+ << " error: " << cntl.ErrorText();
break;
}
node->handle_request_vote_response(peer, term, response);
@@ -1486,8 +1534,8 @@ void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate,
BRAFT_VLOG << "node " << _group_id << ":" << _server_id
<< " term " << _current_term
<< " stepdown from " << state2str(_state)
- << " new_term " << term <<
- " wakeup_a_candidate=" << wakeup_a_candidate;
+ << " new_term " << term
+ << " wakeup_a_candidate=" << wakeup_a_candidate;
if (!is_active_state(_state)) {
return;
@@ -1512,8 +1560,10 @@ void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate,
// soft state in memory
_state = STATE_FOLLOWER;
+ // _conf_ctx.reset() will stop replicators of catching up nodes
_conf_ctx.reset();
_last_leader_timestamp = butil::monotonic_time_ms();
+ _majority_nodes_readonly = false;
clear_append_entries_cache();
@@ -1671,12 +1721,17 @@ void LeaderStableClosure::Run() {
}
void NodeImpl::apply(LogEntryAndClosure tasks[], size_t size) {
+ g_apply_tasks_batch_counter << size;
+
std::vector entries;
entries.reserve(size);
std::unique_lock lck(_mutex);
- if (_state != STATE_LEADER) {
+ bool reject_new_user_logs = (_node_readonly || _majority_nodes_readonly);
+ if (_state != STATE_LEADER || reject_new_user_logs) {
butil::Status st;
- if (_state != STATE_TRANSFERRING) {
+ if (_state == STATE_LEADER && reject_new_user_logs) {
+ st.set_error(EREADONLY, "readonly mode reject new user logs");
+ } else if (_state != STATE_TRANSFERRING) {
st.set_error(EPERM, "is not leader");
} else {
st.set_error(EBUSY, "is transferring leadership");
@@ -1760,7 +1815,7 @@ int NodeImpl::handle_pre_vote_request(const RequestVoteRequest* request,
lck.unlock();
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " is not in active state " << "current_term "
- << saved_current_term
+ << saved_current_term
<< " state " << state2str(saved_state);
return EINVAL;
}
@@ -1815,7 +1870,8 @@ int NodeImpl::handle_request_vote_request(const RequestVoteRequest* request,
const State saved_state = _state;
lck.unlock();
LOG(WARNING) << "node " << _group_id << ":" << _server_id
- << " is not in active state " << "current_term " << saved_current_term
+ << " is not in active state " << "current_term "
+ << saved_current_term
<< " state " << state2str(saved_state);
return EINVAL;
}
@@ -2053,6 +2109,7 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
int64_t last_index = _log_manager->last_log_index();
int64_t saved_term = request->term();
int saved_entries_size = request->entries_size();
+ std::string rpc_server_id = request->server_id();
if (!from_append_entries_cache &&
handle_out_of_order_append_entries(
cntl, request, response, done, last_index)) {
@@ -2062,7 +2119,7 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
done_guard.release();
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " cache out-of-order AppendEntries from "
- << request->server_id()
+ << rpc_server_id
<< " in term " << saved_term
<< " prev_log_index " << prev_log_index
<< " prev_log_term " << prev_log_term
@@ -2084,7 +2141,8 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
<< " prev_log_term " << request->prev_log_term()
<< " local_prev_log_term " << local_prev_log_term
<< " last_log_index " << last_index
- << " entries_size " << request->entries_size();
+ << " entries_size " << request->entries_size()
+ << " from_append_entries_cache: " << from_append_entries_cache;
return;
}
@@ -2092,6 +2150,7 @@ void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
response->set_success(true);
response->set_term(_current_term);
response->set_last_log_index(_log_manager->last_log_index());
+ response->set_readonly(_node_readonly);
lck.unlock();
// see the comments at FollowerStableClosure::run()
_ballot_box->set_last_committed_index(
@@ -2201,8 +2260,8 @@ void NodeImpl::handle_install_snapshot_request(brpc::Controller* cntl,
const State saved_state = _state;
lck.unlock();
LOG(WARNING) << "node " << _group_id << ":" << _server_id
- << " is not in active state " << "current_term " << saved_current_term
- << " state " << state2str(saved_state);
+ << " is not in active state " << "current_term "
+ << saved_current_term << " state " << state2str(saved_state);
cntl->SetFailed(EINVAL, "node %s:%s is not in active state, state %s",
_group_id.c_str(), _server_id.to_string().c_str(), state2str(saved_state));
return;
@@ -2301,13 +2360,25 @@ void NodeImpl::describe(std::ostream& os, bool use_html) {
//const int ref_count = ref_count_;
std::vector peers;
_conf.conf.list_peers(&peers);
+
+ const std::string is_changing_conf = _conf_ctx.is_busy() ? "YES" : "NO";
+ const char* conf_statge = _conf_ctx.stage_str();
+ // new_peers and old_peers during all conf-change stages, namely
+ // STAGE_CATCHING_UP->STAGE_JOINT->STAGE_STABLE
+ std::vector new_peers;
+ _conf_ctx.list_new_peers(&new_peers);
+ std::vector old_peers;
+ _conf_ctx.list_old_peers(&old_peers);
+
// No replicator attached to nodes that are not leader;
_replicator_group.list_replicators(&replicators);
const int64_t leader_timestamp = _last_leader_timestamp;
+ const bool readonly = (_node_readonly || _majority_nodes_readonly);
lck.unlock();
const char *newline = use_html ? "
" : "\r\n";
os << "peer_id: " << _server_id << newline;
os << "state: " << state2str(st) << newline;
+ os << "readonly: " << readonly << newline;
os << "term: " << term << newline;
os << "conf_index: " << conf_index << newline;
os << "peers:";
@@ -2324,6 +2395,42 @@ void NodeImpl::describe(std::ostream& os, bool use_html) {
}
os << newline; // newline for peers
+ // info of configuration change
+ if (st == STATE_LEADER) {
+ os << "changing_conf: " << is_changing_conf
+ << " stage: " << conf_statge << newline;
+ }
+ if (!new_peers.empty()) {
+ os << "new_peers:";
+ for (size_t j = 0; j < new_peers.size(); ++j) {
+ os << ' ';
+ if (use_html && new_peers[j] != _server_id) {
+ os << "";
+ }
+ os << new_peers[j];
+ if (use_html && new_peers[j] != _server_id) {
+ os << "";
+ }
+ }
+ os << newline; // newline for new_peers
+ }
+ if (!old_peers.empty()) {
+ os << "old_peers:";
+ for (size_t j = 0; j < old_peers.size(); ++j) {
+ os << ' ';
+ if (use_html && old_peers[j] != _server_id) {
+ os << "";
+ }
+ os << old_peers[j];
+ if (use_html && old_peers[j] != _server_id) {
+ os << "";
+ }
+ }
+ os << newline; // newline for old_peers
+ }
+
if (st == STATE_FOLLOWER) {
os << "leader: ";
if (use_html) {
@@ -2362,6 +2469,67 @@ void NodeImpl::describe(std::ostream& os, bool use_html) {
Replicator::describe(replicators[i], os, use_html);
}
}
+
+void NodeImpl::get_status(NodeStatus* status) {
+ if (status == NULL) {
+ return;
+ }
+
+ std::vector peers;
+ std::vector > replicators;
+ std::unique_lock lck(_mutex);
+ status->state = _state;
+ status->term = _current_term;
+ status->peer_id = _server_id;
+ status->readonly = (_node_readonly || _majority_nodes_readonly);
+ _conf.conf.list_peers(&peers);
+ _replicator_group.list_replicators(&replicators);
+ lck.unlock();
+
+ if (status->state == STATE_LEADER ||
+ status->state == STATE_TRANSFERRING) {
+ status->leader_id = _server_id;
+ } else if (status->state == STATE_FOLLOWER) {
+ status->leader_id = _leader_id;
+ }
+
+ LogManagerStatus log_manager_status;
+ _log_manager->get_status(&log_manager_status);
+ status->known_applied_index = log_manager_status.known_applied_index;
+ status->first_index = log_manager_status.first_index;
+ status->last_index = log_manager_status.last_index;
+ status->disk_index = log_manager_status.disk_index;
+
+ BallotBoxStatus ballot_box_status;
+ _ballot_box->get_status(&ballot_box_status);
+ status->committed_index = ballot_box_status.committed_index;
+ status->pending_index = ballot_box_status.pending_index;
+ status->pending_queue_size = ballot_box_status.pending_queue_size;
+
+ status->applying_index = _fsm_caller->applying_index();
+
+ if (replicators.size() == 0) {
+ return;
+ }
+
+ for (size_t i = 0; i < peers.size(); ++i) {
+ if (peers[i] == _server_id) {
+ continue;
+ }
+ status->stable_followers.insert(std::make_pair(peers[i], PeerStatus()));
+ }
+
+ for (size_t i = 0; i < replicators.size(); ++i) {
+ NodeStatus::PeerStatusMap::iterator it =
+ status->stable_followers.find(replicators[i].first);
+ if (it == status->stable_followers.end()) {
+ it = status->unstable_followers.insert(
+ std::make_pair(replicators[i].first, PeerStatus())).first;
+ }
+ Replicator::get_status(replicators[i].second, &(it->second));
+ }
+}
+
void NodeImpl::stop_replicator(const std::set& keep,
const std::set& drop) {
for (std::set::const_iterator
@@ -2680,7 +2848,7 @@ void NodeImpl::ConfigurationCtx::start(const Configuration& old_conf,
new_conf.diffs(old_conf, &adding, &removing);
_nchanges = adding.size() + removing.size();
- LOG(INFO) << "node " << _node->_group_id << ":" << _node->_server_id
+ LOG(INFO) << "node " << _node->_group_id << ":" << _node->_server_id
<< " change_peers from " << old_conf << " to " << new_conf << noflush;
if (adding.empty()) {
@@ -2729,6 +2897,9 @@ void NodeImpl::ConfigurationCtx::flush(const Configuration& conf,
void NodeImpl::ConfigurationCtx::on_caughtup(
int64_t version, const PeerId& peer_id, bool succ) {
if (version != _version) {
+ LOG(WARNING) << "Node " << _node->node_id()
+ << " on_caughtup with unmatched version=" << version
+ << ", expect version=" << _version;
return;
}
CHECK_EQ(STAGE_CATCHING_UP, _stage);
@@ -2786,9 +2957,21 @@ void NodeImpl::ConfigurationCtx::next_stage() {
}
void NodeImpl::ConfigurationCtx::reset(butil::Status* st) {
+ // reset() should be called only once
+ if (_stage == STAGE_NONE) {
+ BRAFT_VLOG << "node " << _node->node_id()
+ << " reset ConfigurationCtx when stage is STAGE_NONE already";
+ return;
+ }
+
+ LOG(INFO) << "node " << _node->node_id()
+ << " reset ConfigurationCtx, new_peers: " << Configuration(_new_peers)
+ << ", old_peers: " << Configuration(_old_peers);
if (st && st->ok()) {
_node->stop_replicator(_new_peers, _old_peers);
} else {
+ // leader step_down may stop replicators of catching up nodes, leading to
+ // run catchup_closure
_node->stop_replicator(_old_peers, _new_peers);
}
_new_peers.clear();
@@ -2797,6 +2980,7 @@ void NodeImpl::ConfigurationCtx::reset(butil::Status* st) {
++_version;
_stage = STAGE_NONE;
_nchanges = 0;
+ _node->check_majority_nodes_readonly();
if (_done) {
if (!st) {
_done->status().set_error(EPERM, "leader stepped down");
@@ -2808,6 +2992,69 @@ void NodeImpl::ConfigurationCtx::reset(butil::Status* st) {
}
}
+void NodeImpl::enter_readonly_mode() {
+ BAIDU_SCOPED_LOCK(_mutex);
+ if (!_node_readonly) {
+ LOG(INFO) << "node " << _group_id << ":" << _server_id
+ << " enter readonly mode";
+ _node_readonly = true;
+ }
+}
+
+void NodeImpl::leave_readonly_mode() {
+ BAIDU_SCOPED_LOCK(_mutex);
+ if (_node_readonly) {
+ LOG(INFO) << "node " << _group_id << ":" << _server_id
+ << " leave readonly mode";
+ _node_readonly = false;
+ }
+}
+
+bool NodeImpl::readonly() {
+ BAIDU_SCOPED_LOCK(_mutex);
+ return _node_readonly || _majority_nodes_readonly;
+}
+
+int NodeImpl::change_readonly_config(int64_t term, const PeerId& peer_id, bool readonly) {
+ BAIDU_SCOPED_LOCK(_mutex);
+ if (term != _current_term && _state != STATE_LEADER) {
+ return EINVAL;
+ }
+ _replicator_group.change_readonly_config(peer_id, readonly);
+ check_majority_nodes_readonly();
+ return 0;
+}
+
+void NodeImpl::check_majority_nodes_readonly() {
+ check_majority_nodes_readonly(_conf.conf);
+ if (!_conf.old_conf.empty()) {
+ check_majority_nodes_readonly(_conf.old_conf);
+ }
+}
+
+void NodeImpl::check_majority_nodes_readonly(const Configuration& conf) {
+ std::vector peers;
+ conf.list_peers(&peers);
+ size_t readonly_nodes = 0;
+ for (size_t i = 0; i < peers.size(); i++) {
+ if (peers[i] == _server_id) {
+ readonly_nodes += ((_node_readonly) ? 1: 0);
+ continue;
+ }
+ if (_replicator_group.readonly(peers[i])) {
+ ++readonly_nodes;
+ }
+ }
+ size_t writable_nodes = peers.size() - readonly_nodes;
+ bool prev_readonly = _majority_nodes_readonly;
+ _majority_nodes_readonly = !(writable_nodes >= (peers.size() / 2 + 1));
+ if (prev_readonly != _majority_nodes_readonly) {
+ LOG(INFO) << "node " << _group_id << ":" << _server_id
+ << " majority readonly change from " << (prev_readonly ? "enable" : "disable")
+ << " to " << (_majority_nodes_readonly ? " enable" : "disable");
+ }
+}
+
// Timers
int NodeTimer::init(NodeImpl* node, int timeout_ms) {
BRAFT_RETURN_IF(RepeatedTimerTask::init(timeout_ms) != 0, -1);
diff --git a/src/braft/node.h b/src/braft/node.h
index b2e43b20..8bf60cc7 100644
--- a/src/braft/node.h
+++ b/src/braft/node.h
@@ -76,8 +76,13 @@ class StepdownTimer : public NodeTimer {
};
class SnapshotTimer : public NodeTimer {
+public:
+ SnapshotTimer() : _first_schedule(true) {}
protected:
void run();
+ int adjust_timeout_ms(int timeout_ms);
+private:
+ bool _first_schedule;
};
class BAIDU_CACHELINE_ALIGNMENT NodeImpl
@@ -139,6 +144,9 @@ friend class ConfigurationChangeDone;
// trigger snapshot
void snapshot(Closure* done);
+ // trigger vote
+ void vote(int election_timeout);
+
// reset the election_timeout for the very node
void reset_election_timeout_ms(int election_timeout_ms);
@@ -195,6 +203,18 @@ friend class ConfigurationChangeDone;
void update_configuration_after_installing_snapshot();
void describe(std::ostream& os, bool use_html);
+
+ // Get the internal status of this node, the information is mostly the same as we
+ // see from the website, which is generated by |describe| actually.
+ void get_status(NodeStatus* status);
+
+ // Readonly mode func
+ void enter_readonly_mode();
+ void leave_readonly_mode();
+ bool readonly();
+ int change_readonly_config(int64_t term, const PeerId& peer_id, bool readonly);
+ void check_majority_nodes_readonly();
+ void check_majority_nodes_readonly(const Configuration& conf);
// Call on_error when some error happens, after this is called.
// After this point:
@@ -284,13 +304,38 @@ friend class butil::RefCountedThreadSafe;
DISALLOW_COPY_AND_ASSIGN(ConfigurationCtx);
public:
enum Stage {
- STAGE_NONE,
- STAGE_CATCHING_UP,
- STAGE_JOINT,
- STAGE_STABLE,
+ // Don't change the order if you are not sure about the usage
+ STAGE_NONE = 0,
+ STAGE_CATCHING_UP = 1,
+ STAGE_JOINT = 2,
+ STAGE_STABLE = 3,
};
ConfigurationCtx(NodeImpl* node) :
_node(node), _stage(STAGE_NONE), _version(0), _done(NULL) {}
+ void list_new_peers(std::vector* new_peers) const {
+ new_peers->clear();
+ std::set::iterator it;
+ for (it = _new_peers.begin(); it != _new_peers.end(); ++it) {
+ new_peers->push_back(*it);
+ }
+ }
+ void list_old_peers(std::vector* old_peers) const {
+ old_peers->clear();
+ std::set::iterator it;
+ for (it = _old_peers.begin(); it != _old_peers.end(); ++it) {
+ old_peers->push_back(*it);
+ }
+ }
+ const char* stage_str() {
+ const char* str[] = {"STAGE_NONE", "STAGE_CATCHING_UP",
+ "STAGE_JOINT", "STAGE_STABLE", };
+ if (_stage <= STAGE_STABLE) {
+ return str[(int)_stage];
+ } else {
+ return "UNKNOWN";
+ }
+ }
+ int32_t stage() const { return _stage; }
void reset(butil::Status* st = NULL);
bool is_busy() const { return _stage != STAGE_NONE; }
// Start change configuration.
@@ -394,11 +439,16 @@ friend class butil::RefCountedThreadSafe;
SnapshotTimer _snapshot_timer;
bthread_timer_t _transfer_timer;
StopTransferArg* _stop_transfer_arg;
+ bool _vote_triggered;
ReplicatorId _waking_candidate;
bthread::ExecutionQueueId _apply_queue_id;
bthread::ExecutionQueue::scoped_ptr_t _apply_queue;
AppendEntriesCache* _append_entries_cache;
int64_t _append_entries_cache_version;
+
+ // for readonly mode
+ bool _node_readonly;
+ bool _majority_nodes_readonly;
};
}
diff --git a/src/braft/raft.cpp b/src/braft/raft.cpp
index 1e66fe09..93b3f1c4 100644
--- a/src/braft/raft.cpp
+++ b/src/braft/raft.cpp
@@ -152,6 +152,10 @@ void Node::snapshot(Closure* done) {
_impl->snapshot(done);
}
+void Node::vote(int election_timeout) {
+ _impl->vote(election_timeout);
+}
+
void Node::reset_election_timeout_ms(int election_timeout_ms) {
_impl->reset_election_timeout_ms(election_timeout_ms);
}
@@ -164,6 +168,22 @@ butil::Status Node::read_committed_user_log(const int64_t index, UserLog* user_l
return _impl->read_committed_user_log(index, user_log);
}
+void Node::get_status(NodeStatus* status) {
+ return _impl->get_status(status);
+}
+
+void Node::enter_readonly_mode() {
+ return _impl->enter_readonly_mode();
+}
+
+void Node::leave_readonly_mode() {
+ return _impl->leave_readonly_mode();
+}
+
+bool Node::readonly() {
+ return _impl->readonly();
+}
+
// ------------- Iterator
void Iterator::next() {
if (valid()) {
@@ -228,6 +248,11 @@ void StateMachine::on_configuration_committed(const Configuration& conf) {
return;
}
+void StateMachine::on_configuration_committed(const Configuration& conf, int64_t index) {
+ (void)index;
+ return on_configuration_committed(conf);
+}
+
void StateMachine::on_stop_following(const LeaderChangeContext&) {}
void StateMachine::on_start_following(const LeaderChangeContext&) {}
diff --git a/src/braft/raft.h b/src/braft/raft.h
index 2b7c79f0..1604b14b 100644
--- a/src/braft/raft.h
+++ b/src/braft/raft.h
@@ -43,6 +43,7 @@ class SnapshotHook;
class LeaderChangeContext;
class FileSystemAdaptor;
class SnapshotThrottle;
+class LogStorage;
const PeerId ANY_PEER(butil::EndPoint(butil::IP_ANY, 0), 0);
@@ -241,6 +242,7 @@ class StateMachine {
// Invoked when a configuration has been committed to the group
virtual void on_configuration_committed(const ::braft::Configuration& conf);
+ virtual void on_configuration_committed(const ::braft::Configuration& conf, int64_t index);
// this method is called when a follower stops following a leader and its leader_id becomes NULL,
// situations including:
@@ -352,6 +354,84 @@ inline std::ostream& operator<<(std::ostream& os, const UserLog& user_log) {
return os;
}
+// Status of a peer
+struct PeerStatus {
+ PeerStatus()
+ : valid(false), installing_snapshot(false), next_index(0)
+ , last_rpc_send_timestamp(0), flying_append_entries_size(0)
+ , readonly_index(0), consecutive_error_times(0)
+ {}
+
+ bool valid;
+ bool installing_snapshot;
+ int64_t next_index;
+ int64_t last_rpc_send_timestamp;
+ int64_t flying_append_entries_size;
+ int64_t readonly_index;
+ int consecutive_error_times;
+};
+
+// Status of Node
+class NodeStatus {
+friend class NodeImpl;
+public:
+ typedef std::map PeerStatusMap;
+
+ NodeStatus()
+ : state(STATE_END), readonly(false), term(0), committed_index(0), known_applied_index(0)
+ , pending_index(0), pending_queue_size(0), applying_index(0), first_index(0)
+ , last_index(-1), disk_index(0)
+ {}
+
+ State state;
+ PeerId peer_id;
+ PeerId leader_id;
+ bool readonly;
+ int64_t term;
+ int64_t committed_index;
+ int64_t known_applied_index;
+
+ // The start index of the logs waiting to be committed.
+ // If the value is 0, means no pending logs.
+ //
+ // WARNING: if this value is not 0, and keep the same in a long time,
+ // means something happend to prevent the node to commit logs in a
+ // large probability, and users should check carefully to find out
+ // the reasons.
+ int64_t pending_index;
+
+ // How many pending logs waiting to be committed.
+ //
+ // WARNING: too many pending logs, means the processing rate can't catup with
+ // the writing rate. Users can consider to slow down the writing rate to avoid
+ // exhaustion of resources.
+ int64_t pending_queue_size;
+
+ // The current applying index. If the value is 0, means no applying log.
+ //
+ // WARNING: if this value is not 0, and keep the same in a long time, means
+ // the apply thread hung, users should check if a deadlock happend, or some
+ // time-consuming operations is handling in place.
+ int64_t applying_index;
+
+ // The first log of the node, including the logs in memory and disk.
+ int64_t first_index;
+
+ // The last log of the node, including the logs in memory and disk.
+ int64_t last_index;
+
+ // The max log in disk.
+ int64_t disk_index;
+
+ // Stable followers are peers in current configuration.
+ // If the node is not leader, this map is empty.
+ PeerStatusMap stable_followers;
+
+ // Unstable followers are peers not in current configurations. For example,
+ // if a new peer is added and not catchup now, it's in this map.
+ PeerStatusMap unstable_followers;
+};
+
struct NodeOptions {
// A follower would become a candidate if it doesn't receive any message
// from the leader in |election_timeout_ms| milliseconds
@@ -390,6 +470,18 @@ struct NodeOptions {
// Default: false
bool node_owns_fsm;
+ // If |node_owns_log_storage| is true. |log_storage| would be destroyed when the backing
+ // Node is no longer referenced.
+ //
+ // Default: true
+ bool node_owns_log_storage;
+
+ // The specific LogStorage implemented at the bussiness layer, which should be a valid
+ // instance, otherwise use SegmentLogStorage by default.
+ //
+ // Default: null
+ LogStorage* log_storage;
+
// Run the user callbacks and user closures in pthread rather than bthread
//
// Default: false
@@ -432,6 +524,8 @@ inline NodeOptions::NodeOptions()
, catchup_margin(1000)
, fsm(NULL)
, node_owns_fsm(false)
+ , node_owns_log_storage(true)
+ , log_storage(NULL)
, usercode_in_pthread(false)
, filter_before_copy_remote(false)
, snapshot_file_system_adaptor(NULL)
@@ -509,6 +603,11 @@ class Node {
// when the snapshot finishes, describing the detailed result.
void snapshot(Closure* done);
+ // user trigger vote
+ // reset election_timeout, suggest some peer to become the leader in a
+ // higher probability
+ void vote(int election_timeout);
+
// reset the election_timeout for the very node
void reset_election_timeout_ms(int election_timeout_ms);
@@ -529,6 +628,37 @@ class Node {
// in code implementation.
butil::Status read_committed_user_log(const int64_t index, UserLog* user_log);
+ // Get the internal status of this node, the information is mostly the same as we
+ // see from the website.
+ void get_status(NodeStatus* status);
+
+ // Make this node enter readonly mode.
+ // Readonly mode should only be used to protect the system in some extreme cases.
+ // For exampe, in a storage system, too many write requests flood into the system
+ // unexpectly, and the system is in the danger of exhaust capacity. There's not enough
+ // time to add new machines, and wait for capacity balance. Once many disks become
+ // full, quorum dead happen to raft groups. One choice in this example is readonly
+ // mode, to let leader reject new write requests, but still handle reads request,
+ // and configuration changes.
+ // If a follower become readonly, the leader stop replicate new logs to it. This
+ // may cause the data far behind the leader, in the case that the leader is still
+ // writable. After the follower exit readonly mode, the leader will resume to
+ // replicate missing logs.
+ // A leader is readonly, if the node itself is readonly, or writable nodes (nodes that
+ // are not marked as readonly) in the group is less than majority. Once a leader become
+ // readonly, no new users logs will be acceptted.
+ void enter_readonly_mode();
+
+ // Node leave readonly node.
+ void leave_readonly_mode();
+
+ // Check if this node is readonly.
+ // There are two situations that if a node is readonly:
+ // - This node is marked as readonly, by calling enter_readonly_mode();
+ // - This node is a leader, and the count of writable nodes in the group
+ // is less than the majority.
+ bool readonly();
+
private:
NodeImpl* _impl;
};
diff --git a/src/braft/raft.proto b/src/braft/raft.proto
index 19294b9c..bc057082 100644
--- a/src/braft/raft.proto
+++ b/src/braft/raft.proto
@@ -45,6 +45,7 @@ message AppendEntriesResponse {
required int64 term = 1;
required bool success = 2;
optional int64 last_log_index = 3;
+ optional bool readonly = 4;
};
message SnapshotMeta {
diff --git a/src/braft/remote_file_copier.cpp b/src/braft/remote_file_copier.cpp
index 43825113..8ebeb26a 100644
--- a/src/braft/remote_file_copier.cpp
+++ b/src/braft/remote_file_copier.cpp
@@ -189,7 +189,8 @@ RemoteFileCopier::Session::Session()
RemoteFileCopier::Session::~Session() {
if (_file) {
- _file->destroy();
+ _file->close();
+ delete _file;
_file = NULL;
}
}
@@ -197,7 +198,7 @@ RemoteFileCopier::Session::~Session() {
void RemoteFileCopier::Session::send_next_rpc() {
_cntl.Reset();
_response.Clear();
- // Not clear request as we need some fields of the previouse RPC
+ // Not clear request as we need some fields of the previous RPC
off_t offset = _request.offset() + _request.count();
const size_t max_count =
(!_buf) ? FLAGS_raft_max_byte_count_per_rpc : UINT_MAX;
@@ -216,6 +217,7 @@ void RemoteFileCopier::Session::send_next_rpc() {
new_max_count = _throttle->throttled_by_throughput(max_count);
if (new_max_count == 0) {
// Reset count to make next rpc retry the previous one
+ BRAFT_VLOG << "Copy file throttled, path: " << _dest_path;
_request.set_count(0);
AddRef();
int64_t retry_interval_ms_when_throttled =
@@ -293,7 +295,8 @@ void RemoteFileCopier::Session::on_rpc_returned() {
}
_retry_times = 0;
// Reset count to |real_read_size| to make next rpc get the right offset
- if (_response.has_read_size() && (_response.read_size() != 0)) {
+ if (_response.has_read_size() && (_response.read_size() != 0)
+ && FLAGS_raft_allow_read_partly_when_install_snapshot) {
_request.set_count(_response.read_size());
}
if (_file) {
@@ -346,7 +349,10 @@ void RemoteFileCopier::Session::on_timer(void* arg) {
void RemoteFileCopier::Session::on_finished() {
if (!_finished) {
if (_file) {
- _file->destroy();
+ if (!_file->close()) {
+ _st.set_error(EIO, "%s", berror(EIO));
+ }
+ delete _file;
_file = NULL;
}
_finished = true;
diff --git a/src/braft/replicator.cpp b/src/braft/replicator.cpp
index c66b7e45..f63a3083 100644
--- a/src/braft/replicator.cpp
+++ b/src/braft/replicator.cpp
@@ -37,7 +37,8 @@ BRPC_VALIDATE_GFLAG(raft_max_entries_size, ::brpc::PositiveInteger);
DEFINE_int32(raft_max_parallel_append_entries_rpc_num, 1,
"The max number of parallel AppendEntries requests");
-BRPC_VALIDATE_GFLAG(raft_max_parallel_append_entries_rpc_num, ::brpc::PositiveInteger);
+BRPC_VALIDATE_GFLAG(raft_max_parallel_append_entries_rpc_num,
+ ::brpc::PositiveInteger);
DEFINE_int32(raft_max_body_size, 512 * 1024,
"The max byte size of AppendEntriesRequest");
@@ -45,10 +46,14 @@ BRPC_VALIDATE_GFLAG(raft_max_body_size, ::brpc::PositiveInteger);
DEFINE_int32(raft_retry_replicate_interval_ms, 1000,
"Interval of retry to append entries or install snapshot");
-BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms, brpc::PositiveInteger);
+BRPC_VALIDATE_GFLAG(raft_retry_replicate_interval_ms,
+ brpc::PositiveInteger);
static bvar::LatencyRecorder g_send_entries_latency("raft_send_entries");
-static bvar::LatencyRecorder g_normalized_send_entries_latency("raft_send_entries_normalized");
+static bvar::LatencyRecorder g_normalized_send_entries_latency(
+ "raft_send_entries_normalized");
+static bvar::CounterRecorder g_send_entries_batch_counter(
+ "raft_send_entries_batch_counter");
ReplicatorOptions::ReplicatorOptions()
: dynamic_heartbeat_timeout_ms(NULL)
@@ -70,6 +75,7 @@ Replicator::Replicator()
, _heartbeat_counter(0)
, _append_entries_counter(0)
, _install_snapshot_counter(0)
+ , _readonly_index(0)
, _wait_id(0)
, _is_waiter_canceled(false)
, _reader(NULL)
@@ -84,13 +90,7 @@ Replicator::Replicator()
Replicator::~Replicator() {
// bind lifecycle with node, Release
// Replicator stop is async
- if (_reader) {
- _options.snapshot_storage->close(_reader);
- _reader = NULL;
- if (_options.snapshot_throttle) {
- _options.snapshot_throttle->finish_one_task(true);
- }
- }
+ _close_reader();
if (_options.node) {
_options.node->Release();
_options.node = NULL;
@@ -142,6 +142,14 @@ int Replicator::start(const ReplicatorOptions& options, ReplicatorId *id) {
int Replicator::stop(ReplicatorId id) {
bthread_id_t dummy_id = { id };
+ Replicator* r = NULL;
+ // already stopped
+ if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
+ return 0;
+ }
+ // to run _catchup_closure if it is not NULL
+ r->_notify_on_caught_up(EPERM, true);
+ CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id;
return bthread_id_error(dummy_id, ESTOP);
}
@@ -226,15 +234,18 @@ void Replicator::_on_block_timedout(void *arg) {
}
void Replicator::_block(long start_time_us, int error_code) {
+ // mainly for pipeline case, to avoid too many block timer when this
+ // replicator is something wrong
+ if (_st.st == BLOCKING) {
+ CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
+ return;
+ }
+
// TODO: Currently we don't care about error_code which indicates why the
// very RPC fails. To make it better there should be different timeout for
// each individual error (e.g. we don't need check every
// heartbeat_timeout_ms whether a dead follower has come back), but it's just
// fine now.
- if (_st.st == BLOCKING) {
- CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
- return;
- }
int blocking_time = 0;
if (error_code == EBUSY || error_code == EINTR) {
blocking_time = FLAGS_raft_retry_replicate_interval_ms;
@@ -274,13 +285,15 @@ void Replicator::_on_heartbeat_returned(
return;
}
- BRAFT_VLOG << "node " << r->_options.group_id << ":" << r->_options.server_id
- << " received HeartbeatResponse from "
- << r->_options.peer_id << " prev_log_index " << request->prev_log_index()
- << " prev_log_term " << request->prev_log_term() << noflush;
+ std::stringstream ss;
+ ss << "node " << r->_options.group_id << ":" << r->_options.server_id
+ << " received HeartbeatResponse from "
+ << r->_options.peer_id << " prev_log_index " << request->prev_log_index()
+ << " prev_log_term " << request->prev_log_term();
if (cntl->Failed()) {
- BRAFT_VLOG << " fail, sleep.";
+ ss << " fail, sleep.";
+ BRAFT_VLOG << ss.str();
// TODO: Should it be VLOG?
LOG_IF(WARNING, (r->_consecutive_error_times++) % 10 == 0)
@@ -294,8 +307,9 @@ void Replicator::_on_heartbeat_returned(
}
r->_consecutive_error_times = 0;
if (response->term() > r->_options.term) {
- BRAFT_VLOG << " fail, greater term " << response->term()
- << " expect term " << r->_options.term;
+ ss << " fail, greater term " << response->term()
+ << " expect term " << r->_options.term;
+ BRAFT_VLOG << ss.str();
NodeImpl *node_impl = r->_options.node;
// Acquire a reference of Node here in case that Node is detroyed
@@ -312,12 +326,29 @@ void Replicator::_on_heartbeat_returned(
node_impl->Release();
return;
}
- BRAFT_VLOG << " success";
+
+ bool readonly = response->has_readonly() && response->readonly();
+ BRAFT_VLOG << ss.str() << " readonly " << readonly;
if (rpc_send_time > r->_last_rpc_send_timestamp) {
r->_last_rpc_send_timestamp = rpc_send_time;
}
r->_start_heartbeat_timer(start_time_us);
+ NodeImpl* node_impl = NULL;
+ // Check if readonly config changed
+ if ((readonly && r->_readonly_index == 0) ||
+ (!readonly && r->_readonly_index != 0)) {
+ node_impl = r->_options.node;
+ node_impl->AddRef();
+ }
+ if (!node_impl) {
+ CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id;
+ return;
+ }
+ const PeerId peer_id = r->_options.peer_id;
+ int64_t term = r->_options.term;
CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id;
+ node_impl->change_readonly_config(term, peer_id, readonly);
+ node_impl->Release();
return;
}
@@ -335,12 +366,11 @@ void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
return;
}
- BRAFT_VLOG << "node " << r->_options.group_id << ":" << r->_options.server_id
- << " received AppendEntriesResponse from "
- << r->_options.peer_id << " prev_log_index " << request->prev_log_index()
- << " prev_log_term " << request->prev_log_term()
- << " count " << request->entries_size()
- << noflush;
+ std::stringstream ss;
+ ss << "node " << r->_options.group_id << ":" << r->_options.server_id
+ << " received AppendEntriesResponse from "
+ << r->_options.peer_id << " prev_log_index " << request->prev_log_index()
+ << " prev_log_term " << request->prev_log_term() << " count " << request->entries_size();
bool valid_rpc = false;
int64_t rpc_first_index = request->prev_log_index() + 1;
@@ -357,13 +387,15 @@ void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
}
}
if (!valid_rpc) {
- BRAFT_VLOG << " ignore invalid rpc";
+ ss << " ignore invalid rpc";
+ BRAFT_VLOG << ss.str();
CHECK_EQ(0, bthread_id_unlock(r->_id)) << "Fail to unlock " << r->_id;
return;
}
if (cntl->Failed()) {
- BRAFT_VLOG << " fail, sleep.";
+ ss << " fail, sleep.";
+ BRAFT_VLOG << ss.str();
// TODO: Should it be VLOG?
LOG_IF(WARNING, (r->_consecutive_error_times++) % 10 == 0)
@@ -398,9 +430,10 @@ void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
node_impl->Release();
return;
}
- BRAFT_VLOG << " fail, find next_index remote last_log_index " << response->last_log_index()
- << " local next_index " << r->_next_index
- << " rpc prev_log_index " << request->prev_log_index();
+ ss << " fail, find next_index remote last_log_index " << response->last_log_index()
+ << " local next_index " << r->_next_index
+ << " rpc prev_log_index " << request->prev_log_index();
+ BRAFT_VLOG << ss.str();
if (rpc_send_time > r->_last_rpc_send_timestamp) {
r->_last_rpc_send_timestamp = rpc_send_time;
}
@@ -431,7 +464,8 @@ void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
return;
}
- BRAFT_VLOG << " success";
+ ss << " success";
+ BRAFT_VLOG << ss.str();
if (response->term() != r->_options.term) {
LOG(ERROR) << "Group " << r->_options.group_id
@@ -447,7 +481,7 @@ void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
const int entries_size = request->entries_size();
const int64_t rpc_last_log_index = request->prev_log_index() + entries_size;
BRAFT_VLOG_IF(entries_size > 0) << "Group " << r->_options.group_id
- << " Replicated logs in ["
+ << " replicated logs in ["
<< min_flying_index << ", "
<< rpc_last_log_index
<< "] to peer " << r->_options.peer_id;
@@ -551,12 +585,22 @@ void Replicator::_send_empty_entries(bool is_heartbeat) {
int Replicator::_prepare_entry(int offset, EntryMeta* em, butil::IOBuf *data) {
if (data->length() >= (size_t)FLAGS_raft_max_body_size) {
- return -1;
+ return ERANGE;
}
- const size_t log_index = _next_index + offset;
+ const int64_t log_index = _next_index + offset;
LogEntry *entry = _options.log_manager->get_entry(log_index);
if (entry == NULL) {
- return -1;
+ return ENOENT;
+ }
+ // When leader become readonly, no new user logs can submit. On the other side,
+ // if any user log are accepted after this replicator become readonly, the leader
+ // still have enough followers to commit logs, we can safely stop waiting new logs
+ // until the replicator leave readonly mode.
+ if (_readonly_index != 0 && log_index >= _readonly_index) {
+ if (entry->type != ENTRY_TYPE_CONFIGURATION) {
+ return EREADONLY;
+ }
+ _readonly_index = log_index + 1;
}
em->set_term(entry->id.term);
em->set_type(entry->type);
@@ -600,9 +644,11 @@ void Replicator::_send_entries() {
}
EntryMeta em;
const int max_entries_size = FLAGS_raft_max_entries_size - _flying_append_entries_size;
+ int prepare_entry_rc = 0;
CHECK_GT(max_entries_size, 0);
for (int i = 0; i < max_entries_size; ++i) {
- if (_prepare_entry(i, &em, &cntl->request_attachment()) != 0) {
+ prepare_entry_rc = _prepare_entry(i, &em, &cntl->request_attachment());
+ if (prepare_entry_rc != 0) {
break;
}
request->add_entries()->Swap(&em);
@@ -613,6 +659,16 @@ void Replicator::_send_entries() {
_reset_next_index();
return _install_snapshot();
}
+ // NOTICE: a follower's readonly mode does not prevent install_snapshot
+ // as we need followers to commit conf log(like add_node) when
+ // leader reaches readonly as well
+ if (prepare_entry_rc == EREADONLY) {
+ if (_flying_append_entries_size == 0) {
+ _st.st = IDLE;
+ }
+ CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
+ return;
+ }
return _wait_more_entries();
}
@@ -621,6 +677,8 @@ void Replicator::_send_entries() {
_append_entries_counter++;
_next_index += request->entries_size();
_flying_append_entries_size += request->entries_size();
+
+ g_send_entries_batch_counter << request->entries_size();
BRAFT_VLOG << "node " << _options.group_id << ":" << _options.server_id
<< " send AppendEntriesRequest to " << _options.peer_id << " term " << _options.term
@@ -647,11 +705,18 @@ int Replicator::_continue_sending(void* arg, int error_code) {
return -1;
}
if (error_code == ETIMEDOUT) {
+ // Replication is in progress when block timedout, no need to start again
+ // this case can happen when
+ // 1. pipeline is enabled and
+ // 2. disable readonly mode triggers another replication
+ if (r->_wait_id != 0) {
+ return 0;
+ }
+
// Send empty entries after block timeout to check the correct
// _next_index otherwise the replictor is likely waits in
// _wait_more_entries and no further logs would be replicated even if the
// last_index of this followers is less than |next_index - 1|
- CHECK_EQ(r->_wait_id, 0);
r->_send_empty_entries(false);
} else if (error_code != ESTOP && !r->_is_waiter_canceled) {
// id is unlock in _send_entries
@@ -680,7 +745,7 @@ void Replicator::_wait_more_entries() {
_next_index - 1, _continue_sending, (void*)_id.value);
_is_waiter_canceled = false;
BRAFT_VLOG << "node " << _options.group_id << ":" << _options.peer_id
- << " wait more entries";
+ << " wait more entries, wait_id " << _wait_id;
}
if (_flying_append_entries_size == 0) {
_st.st = IDLE;
@@ -689,11 +754,26 @@ void Replicator::_wait_more_entries() {
}
void Replicator::_install_snapshot() {
- CHECK(!_reader);
+ if (_reader) {
+ // follower's readonly mode change may cause two install_snapshot
+ // one possible case is:
+ // enable -> install_snapshot -> disable -> wait_more_entries ->
+ // install_snapshot again
+ LOG(WARNING) << "node " << _options.group_id << ":" << _options.server_id
+ << " refuse to send InstallSnapshotRequest to " << _options.peer_id
+ << " because there is an running one";
+ CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
+ return;
+ }
- if (_options.snapshot_throttle && !_options.snapshot_throttle->add_one_more_task(true)) {
+ if (_options.snapshot_throttle && !_options.snapshot_throttle->
+ add_one_more_task(true)) {
return _block(butil::gettimeofday_us(), EBUSY);
}
+
+ // pre-set replictor state to INSTALLING_SNAPSHOT, so replicator could be
+ // blocked if something is wrong, such as throttled for a period of time
+ _st.st = INSTALLING_SNAPSHOT;
_reader = _options.snapshot_storage->open();
if (!_reader) {
@@ -711,6 +791,16 @@ void Replicator::_install_snapshot() {
return;
}
std::string uri = _reader->generate_uri_for_copy();
+ // NOTICE: If uri is something wrong, retry later instead of reporting error
+ // immediately(making raft Node error), as FileSystemAdaptor layer of _reader is
+ // user defined and may need some control logic when opened
+ if (uri.empty()) {
+ LOG(WARNING) << "node " << _options.group_id << ":" << _options.server_id
+ << " refuse to send InstallSnapshotRequest to " << _options.peer_id
+ << " because snapshot uri is empty";
+ _close_reader();
+ return _block(butil::gettimeofday_us(), EBUSY);
+ }
SnapshotMeta meta;
// report error on failure
if (_reader->load_meta(&meta) != 0) {
@@ -744,7 +834,6 @@ void Replicator::_install_snapshot() {
_install_snapshot_in_fly = cntl->call_id();
_install_snapshot_counter++;
- _st.st = INSTALLING_SNAPSHOT;
_st.last_log_included = meta.last_included_index();
_st.last_term_included = meta.last_included_term();
google::protobuf::Closure* done = brpc::NewCallback<
@@ -777,14 +866,15 @@ void Replicator::_on_install_snapshot_returned(
r->_options.snapshot_throttle->finish_one_task(true);
}
}
- LOG(INFO) << "received InstallSnapshotResponse from "
- << r->_options.group_id << ":" << r->_options.peer_id
- << " last_included_index " << request->meta().last_included_index()
- << " last_included_term " << request->meta().last_included_term()
- << noflush;
+ std::stringstream ss;
+ ss << "received InstallSnapshotResponse from "
+ << r->_options.group_id << ":" << r->_options.peer_id
+ << " last_included_index " << request->meta().last_included_index()
+ << " last_included_term " << request->meta().last_included_term();
do {
if (cntl->Failed()) {
- LOG(INFO) << " error: " << cntl->ErrorText();
+ ss << " error: " << cntl->ErrorText();
+ LOG(INFO) << ss.str();
LOG_IF(WARNING, (r->_consecutive_error_times++) % 10 == 0)
<< "Group " << r->_options.group_id
@@ -796,13 +886,15 @@ void Replicator::_on_install_snapshot_returned(
}
if (!response->success()) {
succ = false;
- LOG(INFO) << " fail.";
+ ss << " fail.";
+ LOG(INFO) << ss.str();
// Let hearbeat do step down
break;
}
// Success
r->_next_index = request->meta().last_included_index() + 1;
- LOG(INFO) << " success.";
+ ss << " success.";
+ LOG(INFO) << ss.str();
} while (0);
// We don't retry installing the snapshot explicitly.
@@ -824,9 +916,9 @@ void Replicator::_notify_on_caught_up(int error_code, bool before_destroy) {
return;
}
if (error_code != ETIMEDOUT && error_code != EPERM) {
- if (!_is_catchup(_catchup_closure->_max_margin)) {
+ if (!_is_catchup(_catchup_closure->_max_margin)) {
return;
- }
+ }
if (_catchup_closure->_error_was_set) {
return;
}
@@ -841,7 +933,7 @@ void Replicator::_notify_on_caught_up(int error_code, bool before_destroy) {
return;
}
}
- } else { // Timed out
+ } else { // Timed out or leader step_down
if (!_catchup_closure->_error_was_set) {
_catchup_closure->status().set_error(error_code, "%s", berror(error_code));
}
@@ -917,6 +1009,7 @@ void Replicator::_on_catch_up_timedout(void* arg) {
bthread_id_t id = { (uint64_t)arg };
Replicator* r = NULL;
if (bthread_id_lock(id, (void**)&r) != 0) {
+ LOG(WARNING) << "Replicator is destroyed when catch_up_timedout.";
return;
}
r->_notify_on_caught_up(ETIMEDOUT, false);
@@ -1023,12 +1116,14 @@ void Replicator::_on_timeout_now_returned(
return;
}
- BRAFT_VLOG << "node " << r->_options.group_id << ":" << r->_options.server_id
- << " received TimeoutNowResponse from "
- << r->_options.peer_id;
+ std::stringstream ss;
+ ss << "node " << r->_options.group_id << ":" << r->_options.server_id
+ << " received TimeoutNowResponse from "
+ << r->_options.peer_id;
if (cntl->Failed()) {
- BRAFT_VLOG << " fail : " << cntl->ErrorText();
+ ss << " fail : " << cntl->ErrorText();
+ BRAFT_VLOG << ss.str();
if (stop_after_finish) {
r->_notify_on_caught_up(ESTOP, true);
@@ -1038,7 +1133,8 @@ void Replicator::_on_timeout_now_returned(
}
return;
}
- BRAFT_VLOG << (response->success() ? " success " : "fail:");
+ ss << (response->success() ? " success " : "fail:");
+ BRAFT_VLOG << ss.str();
if (response->term() > r->_options.term) {
NodeImpl *node_impl = r->_options.node;
@@ -1087,6 +1183,52 @@ int64_t Replicator::get_next_index(ReplicatorId id) {
return next_index;
}
+int Replicator::change_readonly_config(ReplicatorId id, bool readonly) {
+ Replicator *r = NULL;
+ bthread_id_t dummy_id = { id };
+ if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
+ return 0;
+ }
+ return r->_change_readonly_config(readonly);
+}
+
+int Replicator::_change_readonly_config(bool readonly) {
+ if ((readonly && _readonly_index != 0) ||
+ (!readonly && _readonly_index == 0)) {
+ // Check if readonly already set
+ BRAFT_VLOG << "node " << _options.group_id << ":" << _options.server_id
+ << " ignore change readonly config of " << _options.peer_id
+ << " to " << readonly << ", readonly_index " << _readonly_index;
+ CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
+ return 0;
+ }
+ if (readonly) {
+ // Keep a readonly index here to make sure the pending logs can be committed.
+ _readonly_index = _options.log_manager->last_log_index() + 1;
+ LOG(INFO) << "node " << _options.group_id << ":" << _options.server_id
+ << " enable readonly for " << _options.peer_id
+ << ", readonly_index " << _readonly_index;
+ CHECK_EQ(0, bthread_id_unlock(_id)) << "Fail to unlock " << _id;
+ } else {
+ _readonly_index = 0;
+ LOG(INFO) << "node " << _options.group_id << ":" << _options.server_id
+ << " disable readonly for " << _options.peer_id;
+ _wait_more_entries();
+ }
+ return 0;
+}
+
+bool Replicator::readonly(ReplicatorId id) {
+ Replicator *r = NULL;
+ bthread_id_t dummy_id = { id };
+ if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
+ return 0;
+ }
+ bool readonly = (r->_readonly_index != 0);
+ CHECK_EQ(0, bthread_id_unlock(dummy_id)) << "Fail to unlock " << dummy_id;
+ return readonly;
+}
+
void Replicator::_destroy() {
bthread_id_t saved_id = _id;
CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_id));
@@ -1105,12 +1247,16 @@ void Replicator::_describe(std::ostream& os, bool use_html) {
const int64_t heartbeat_counter = _heartbeat_counter;
const int64_t append_entries_counter = _append_entries_counter;
const int64_t install_snapshot_counter = _install_snapshot_counter;
+ const int64_t readonly_index = _readonly_index;
CHECK_EQ(0, bthread_id_unlock(_id));
// Don't touch *this ever after
const char* new_line = use_html ? "
" : "\r\n";
os << "replicator_" << id << '@' << peer_id << ':';
os << " next_index=" << next_index << ' ';
os << " flying_append_entries_size=" << flying_append_entries_size << ' ';
+ if (readonly_index != 0) {
+ os << " readonly_index=" << readonly_index << ' ';
+ }
switch (st.st) {
case IDLE:
os << "idle";
@@ -1129,6 +1275,17 @@ void Replicator::_describe(std::ostream& os, bool use_html) {
os << " hc=" << heartbeat_counter << " ac=" << append_entries_counter << " ic=" << install_snapshot_counter << new_line;
}
+void Replicator::_get_status(PeerStatus* status) {
+ status->valid = true;
+ status->installing_snapshot = (_st.st == INSTALLING_SNAPSHOT);
+ status->next_index = _next_index;
+ status->flying_append_entries_size = _flying_append_entries_size;
+ status->last_rpc_send_timestamp = _last_rpc_send_timestamp;
+ status->consecutive_error_times = _consecutive_error_times;
+ status->readonly_index = _readonly_index;
+ CHECK_EQ(0, bthread_id_unlock(_id));
+}
+
void Replicator::describe(ReplicatorId id, std::ostream& os, bool use_html) {
bthread_id_t dummy_id = { id };
Replicator* r = NULL;
@@ -1139,6 +1296,28 @@ void Replicator::describe(ReplicatorId id, std::ostream& os, bool use_html) {
return r->_describe(os, use_html);
}
+void Replicator::get_status(ReplicatorId id, PeerStatus* status) {
+ if (!status) {
+ return;
+ }
+ bthread_id_t dummy_id = { id };
+ Replicator* r = NULL;
+ if (bthread_id_lock(dummy_id, (void**)&r) != 0) {
+ return;
+ }
+ return r->_get_status(status);
+}
+
+void Replicator::_close_reader() {
+ if (_reader) {
+ _options.snapshot_storage->close(_reader);
+ _reader = NULL;
+ if (_options.snapshot_throttle) {
+ _options.snapshot_throttle->finish_one_task(true);
+ }
+ }
+}
+
// ==================== ReplicatorGroup ==========================
ReplicatorGroupOptions::ReplicatorGroupOptions()
@@ -1336,4 +1515,32 @@ void ReplicatorGroup::list_replicators(std::vector* out) const {
}
}
+void ReplicatorGroup::list_replicators(
+ std::vector >* out) const {
+ out->clear();
+ out->reserve(_rmap.size());
+ for (std::map::const_iterator
+ iter = _rmap.begin(); iter != _rmap.end(); ++iter) {
+ out->push_back(*iter);
+ }
+}
+
+int ReplicatorGroup::change_readonly_config(const PeerId& peer, bool readonly) {
+ std::map::const_iterator iter = _rmap.find(peer);
+ if (iter == _rmap.end()) {
+ return -1;
+ }
+ ReplicatorId rid = iter->second;
+ return Replicator::change_readonly_config(rid, readonly);
+}
+
+bool ReplicatorGroup::readonly(const PeerId& peer) const {
+ std::map::const_iterator iter = _rmap.find(peer);
+ if (iter == _rmap.end()) {
+ return false;
+ }
+ ReplicatorId rid = iter->second;
+ return Replicator::readonly(rid);
+}
+
} // namespace braft
diff --git a/src/braft/replicator.h b/src/braft/replicator.h
index 0a8ea5ed..9bdf380e 100644
--- a/src/braft/replicator.h
+++ b/src/braft/replicator.h
@@ -106,6 +106,16 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
static int64_t get_next_index(ReplicatorId id);
static void describe(ReplicatorId id, std::ostream& os, bool use_html);
+
+ // Get replicator internal status.
+ static void get_status(ReplicatorId id, PeerStatus* status);
+
+ // Change the readonly config.
+ // Return 0 if success, the error code otherwise.
+ static int change_readonly_config(ReplicatorId id, bool readonly);
+
+ // Check if a replicator is readonly
+ static bool readonly(ReplicatorId id);
private:
enum St {
@@ -147,6 +157,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
int64_t _min_flying_index() {
return _next_index - _flying_append_entries_size;
}
+ int _change_readonly_config(bool readonly);
static void _on_rpc_returned(
ReplicatorId id, brpc::Controller* cntl,
@@ -181,6 +192,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
InstallSnapshotResponse* response);
void _destroy();
void _describe(std::ostream& os, bool use_html);
+ void _get_status(PeerStatus* status);
bool _is_catchup(int64_t max_margin) {
// We should wait until install snapshot finish. If the process is throttled,
// it maybe very slow.
@@ -193,6 +205,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
}
return true;
}
+ void _close_reader();
private:
struct FlyingAppendEntriesRpc {
@@ -214,6 +227,7 @@ class BAIDU_CACHELINE_ALIGNMENT Replicator {
int64_t _heartbeat_counter;
int64_t _append_entries_counter;
int64_t _install_snapshot_counter;
+ int64_t _readonly_index;
Stat _st;
std::deque _append_entries_in_fly;
brpc::CallId _install_snapshot_in_fly;
@@ -317,6 +331,15 @@ class ReplicatorGroup {
// List all the existing replicators
void list_replicators(std::vector* out) const;
+ // List all the existing replicators with PeerId
+ void list_replicators(std::vector >* out) const;
+
+ // Change the readonly config for a peer
+ int change_readonly_config(const PeerId& peer, bool readonly);
+
+ // Check if a replicator is in readonly
+ bool readonly(const PeerId& peer) const;
+
private:
int _add_replicator(const PeerId& peer, ReplicatorId *rid);
diff --git a/src/braft/snapshot.cpp b/src/braft/snapshot.cpp
index 443a7d66..e8a15035 100644
--- a/src/braft/snapshot.cpp
+++ b/src/braft/snapshot.cpp
@@ -194,6 +194,35 @@ int LocalSnapshotWriter::init() {
set_error(EIO, "Fail to load metatable from %s", meta_path.c_str());
return EIO;
}
+
+ // remove file if meta_path not exist or it's not in _meta_table
+ // to avoid dirty data
+ {
+ std::vector to_remove;
+ DirReader* dir_reader = _fs->directory_reader(_path);
+ if (!dir_reader->is_valid()) {
+ LOG(WARNING) << "directory reader failed, maybe NOEXIST or PERMISSION,"
+ " path: " << _path;
+ delete dir_reader;
+ return EIO;
+ }
+ while (dir_reader->next()) {
+ std::string filename = dir_reader->name();
+ if (filename != BRAFT_SNAPSHOT_META_FILE) {
+ if (get_file_meta(filename, NULL) != 0) {
+ to_remove.push_back(filename);
+ }
+ }
+ }
+ delete dir_reader;
+ for (size_t i = 0; i < to_remove.size(); ++i) {
+ std::string file_path = _path + "/" + to_remove[i];
+ _fs->delete_file(file_path, false);
+ LOG(WARNING) << "Snapshot file exist but meta not found so delete it,"
+ " path: " << file_path;
+ }
+ }
+
return 0;
}
@@ -350,6 +379,7 @@ class SnapshotFileReader : public LocalDirReader {
// if it's not allowed to read partly or it's allowed but
// throughput is throttled to 0, try again.
if (!read_partly || new_max_count == 0) {
+ BRAFT_VLOG << "Read file throttled, path: " << path();
ret = EAGAIN;
}
}
@@ -383,11 +413,10 @@ std::string LocalSnapshotReader::generate_uri_for_copy() {
scoped_refptr reader(
new SnapshotFileReader(_fs.get(), _path, _snapshot_throttle.get()));
reader->set_meta_table(_meta_table);
-
- if (!reader->open()) {
- LOG(ERROR) << "Open snapshot=" << _path << " failed";
+ if (!reader->open()) {
+ LOG(ERROR) << "Open snapshot=" << _path << " failed";
return std::string();
- }
+ }
if (file_service_add(reader.get(), &_reader_id) != 0) {
LOG(ERROR) << "Fail to add reader to file_service, path: " << _path;
return std::string();
@@ -535,6 +564,7 @@ SnapshotWriter* LocalSnapshotStorage::create(bool from_empty) {
writer = NULL;
break;
}
+ BRAFT_VLOG << "Create writer success, path: " << snapshot_path;
} while (0);
return writer;
@@ -730,10 +760,17 @@ void LocalSnapshotCopier::copy() {
}
} while (0);
if (!ok() && _writer && _writer->ok()) {
- _writer->set_error(error_code(), error_data());
+ LOG(WARNING) << "Fail to copy, error_code " << error_code()
+ << " error_msg " << error_cstr()
+ << " writer path " << _writer->get_path();
+ _writer->set_error(error_code(), error_cstr());
}
if (_writer) {
- _storage->close(_writer, _filter_before_copy_remote);
+ // set_error for copier only when failed to close writer and copier was
+ // ok before this moment
+ if (_storage->close(_writer, _filter_before_copy_remote) != 0 && ok()) {
+ set_error(EIO, "Fail to close writer");
+ }
_writer = NULL;
}
if (ok()) {
@@ -759,7 +796,7 @@ void LocalSnapshotCopier::load_meta_table() {
lck.unlock();
if (!session->status().ok()) {
LOG(WARNING) << "Fail to copy meta file : " << session->status();
- set_error(session->status().error_code(), session->status().error_data());
+ set_error(session->status().error_code(), session->status().error_cstr());
return;
}
if (_remote_snapshot._meta_table.load_from_iobuf_as_remote(meta_buf) != 0) {
@@ -868,8 +905,8 @@ void LocalSnapshotCopier::filter() {
if (_filter_before_copy_remote) {
SnapshotReader* reader = _storage->open();
if (filter_before_copy(_writer, reader) != 0) {
- LOG(WARNING) << "Fail to filter writer before copying, path: "
- << _writer->get_path()
+ LOG(WARNING) << "Fail to filter writer before copying"
+ ", path: " << _writer->get_path()
<< ", destroy and create a new writer";
_writer->set_error(-1, "Fail to filter");
_storage->close(_writer, false);
@@ -938,7 +975,7 @@ void LocalSnapshotCopier::copy_file(const std::string& filename) {
_cur_session = NULL;
lck.unlock();
if (!session->status().ok()) {
- set_error(session->status().error_code(), session->status().error_data());
+ set_error(session->status().error_code(), session->status().error_cstr());
return;
}
if (_writer->add_file(filename, &meta) != 0) {
diff --git a/src/braft/snapshot_executor.cpp b/src/braft/snapshot_executor.cpp
index d1fe2bb1..90ac450e 100644
--- a/src/braft/snapshot_executor.cpp
+++ b/src/braft/snapshot_executor.cpp
@@ -108,6 +108,8 @@ SnapshotExecutor::~SnapshotExecutor() {
void SnapshotExecutor::do_snapshot(Closure* done) {
std::unique_lock lck(_mutex);
+ int64_t saved_last_snapshot_index = _last_snapshot_index;
+ int64_t saved_last_snapshot_term = _last_snapshot_term;
if (_stopped) {
lck.unlock();
if (done) {
@@ -140,6 +142,11 @@ void SnapshotExecutor::do_snapshot(Closure* done) {
// updated. But it's fine since we will do next snapshot saving in a
// predictable time.
lck.unlock();
+ LOG_IF(INFO, _node != NULL) << "node " << _node->node_id()
+ << " has no applied logs since last snapshot, "
+ << " last_snapshot_index " << saved_last_snapshot_index
+ << " last_snapshot_term " << saved_last_snapshot_term
+ << ", will clear bufferred log and return success";
_log_manager->clear_bufferred_logs();
if (done) {
run_closure_in_bthread(done, _usercode_in_pthread);
@@ -599,7 +606,7 @@ void SnapshotExecutor::report_error(int error_code, const char* fmt, ...) {
va_start(ap, fmt);
Error e;
e.set_type(ERROR_TYPE_SNAPSHOT);
- e.status().set_error(error_code, fmt, ap);
+ e.status().set_errorv(error_code, fmt, ap);
va_end(ap);
_fsm_caller->on_error(e);
}
diff --git a/src/braft/storage.cpp b/src/braft/storage.cpp
index 4a7791ed..7f9f3dea 100644
--- a/src/braft/storage.cpp
+++ b/src/braft/storage.cpp
@@ -109,7 +109,7 @@ RaftMetaStorage* RaftMetaStorage::create(const std::string& uri) {
const RaftMetaStorage* type = meta_storage_extension()->Find(
protocol.as_string().c_str());
if (type == NULL) {
- LOG(ERROR) << "Fail to find stable storage type " << protocol
+ LOG(ERROR) << "Fail to find meta storage type " << protocol
<< ", uri=" << uri;
return NULL;
}
diff --git a/src/braft/util.cpp b/src/braft/util.cpp
index 7fed4873..053d7384 100644
--- a/src/braft/util.cpp
+++ b/src/braft/util.cpp
@@ -16,14 +16,236 @@
// Wang,Yao(wangyao02@baidu.com)
#include "braft/util.h"
-
+#include
#include
#include
#include // butil::RawPacker
#include
-
#include "braft/raft.h"
+namespace bvar {
+
+// Reloading following gflags does not change names of the corresponding bvars.
+// Avoid reloading in practice.
+DEFINE_int32(bvar_counter_p1, 80, "First counter percentile");
+DEFINE_int32(bvar_counter_p2, 90, "Second counter percentile");
+DEFINE_int32(bvar_counter_p3, 99, "Third counter percentile");
+
+static bool valid_percentile(const char*, int32_t v) {
+ return v > 0 && v < 100;
+}
+
+const bool ALLOW_UNUSED dummy_bvar_counter_p1 = ::google::RegisterFlagValidator(
+ &FLAGS_bvar_counter_p1, valid_percentile);
+const bool ALLOW_UNUSED dummy_bvar_counter_p2 = ::google::RegisterFlagValidator(
+ &FLAGS_bvar_counter_p2, valid_percentile);
+const bool ALLOW_UNUSED dummy_bvar_counter_p3 = ::google::RegisterFlagValidator(
+ &FLAGS_bvar_counter_p3, valid_percentile);
+
+namespace detail {
+
+typedef PercentileSamples<1022> CombinedPercentileSamples;
+
+static int64_t get_window_recorder_qps(void* arg) {
+ detail::Sample s;
+ static_cast(arg)->get_span(1, &s);
+ // Use floating point to avoid overflow.
+ if (s.time_us <= 0) {
+ return 0;
+ }
+ return static_cast(round(s.data.num * 1000000.0 / s.time_us));
+}
+
+static int64_t get_recorder_count(void* arg) {
+ return static_cast(arg)->get_value().num;
+}
+
+// Caller is responsible for deleting the return value.
+static CombinedPercentileSamples* combine(PercentileWindow* w) {
+ CombinedPercentileSamples* cb = new CombinedPercentileSamples;
+ std::vector buckets;
+ w->get_samples(&buckets);
+ cb->combine_of(buckets.begin(), buckets.end());
+ return cb;
+}
+
+template
+static int64_t get_counter_percetile(void* arg) {
+ return ((CounterRecorder*)arg)->counter_percentile(
+ (double)numerator / double(denominator));
+}
+
+static int64_t get_p1_counter(void* arg) {
+ CounterRecorder* cr = static_cast(arg);
+ return cr->counter_percentile(FLAGS_bvar_counter_p1 / 100.0);
+}
+static int64_t get_p2_counter(void* arg) {
+ CounterRecorder* cr = static_cast(arg);
+ return cr->counter_percentile(FLAGS_bvar_counter_p2 / 100.0);
+}
+static int64_t get_p3_counter(void* arg) {
+ CounterRecorder* cr = static_cast(arg);
+ return cr->counter_percentile(FLAGS_bvar_counter_p3 / 100.0);
+}
+
+static Vector get_counters(void *arg) {
+ std::unique_ptr cb(
+ combine((PercentileWindow*)arg));
+ // NOTE: We don't show 99.99% since it's often significantly larger than
+ // other values and make other curves on the plotted graph small and
+ // hard to read.ggggnnn
+ Vector result;
+ result[0] = cb->get_number(FLAGS_bvar_counter_p1 / 100.0);
+ result[1] = cb->get_number(FLAGS_bvar_counter_p2 / 100.0);
+ result[2] = cb->get_number(FLAGS_bvar_counter_p3 / 100.0);
+ result[3] = cb->get_number(0.999);
+ return result;
+}
+
+CounterRecorderBase::CounterRecorderBase(time_t window_size)
+ : _max_counter()
+ , _avg_counter_window(&_avg_counter, window_size)
+ , _max_counter_window(&_max_counter, window_size)
+ , _counter_percentile_window(&_counter_percentile, window_size)
+ , _total_times(get_recorder_count, &_avg_counter)
+ , _qps(get_window_recorder_qps, &_avg_counter_window)
+ , _counter_p1(get_p1_counter, this)
+ , _counter_p2(get_p2_counter, this)
+ , _counter_p3(get_p3_counter, this)
+ , _counter_999(get_counter_percetile<999, 1000>, this)
+ , _counter_9999(get_counter_percetile<9999, 10000>, this)
+ , _counter_cdf(&_counter_percentile_window)
+ , _counter_percentiles(get_counters, &_counter_percentile_window)
+{}
+
+} // namespace detail
+
+// CounterRecorder
+Vector CounterRecorder::counter_percentiles() const {
+ // const_cast here is just to adapt parameter type and safe.
+ return detail::get_counters(
+ const_cast(&_counter_percentile_window));
+}
+
+int64_t CounterRecorder::qps(time_t window_size) const {
+ detail::Sample s;
+ _avg_counter_window.get_span(window_size, &s);
+ // Use floating point to avoid overflow.
+ if (s.time_us <= 0) {
+ return 0;
+ }
+ return static_cast(round(s.data.num * 1000000.0 / s.time_us));
+}
+
+int CounterRecorder::expose(const butil::StringPiece& prefix1,
+ const butil::StringPiece& prefix2) {
+ if (prefix2.empty()) {
+ LOG(ERROR) << "Parameter[prefix2] is empty";
+ return -1;
+ }
+ butil::StringPiece prefix = prefix2;
+ // User may add "_counter" as the suffix, remove it.
+ if (prefix.ends_with("counter") || prefix.ends_with("Counter")) {
+ prefix.remove_suffix(7);
+ if (prefix.empty()) {
+ LOG(ERROR) << "Invalid prefix2=" << prefix2;
+ return -1;
+ }
+ }
+ std::string tmp;
+ if (!prefix1.empty()) {
+ tmp.reserve(prefix1.size() + prefix.size() + 1);
+ tmp.append(prefix1.data(), prefix1.size());
+ tmp.push_back('_'); // prefix1 ending with _ is good.
+ tmp.append(prefix.data(), prefix.size());
+ prefix = tmp;
+ }
+
+ // set debug names for printing helpful error log.
+ _avg_counter.set_debug_name(prefix);
+ _counter_percentile.set_debug_name(prefix);
+
+ if (_avg_counter_window.expose_as(prefix, "avg_counter") != 0) {
+ return -1;
+ }
+ if (_max_counter_window.expose_as(prefix, "max_counter") != 0) {
+ return -1;
+ }
+ if (_total_times.expose_as(prefix, "total_times") != 0) {
+ return -1;
+ }
+ if (_qps.expose_as(prefix, "qps") != 0) {
+ return -1;
+ }
+ char namebuf[32];
+ snprintf(namebuf, sizeof(namebuf), "counter_%d", (int)FLAGS_bvar_counter_p1);
+ if (_counter_p1.expose_as(prefix, namebuf, DISPLAY_ON_PLAIN_TEXT) != 0) {
+ return -1;
+ }
+ snprintf(namebuf, sizeof(namebuf), "counter_%d", (int)FLAGS_bvar_counter_p2);
+ if (_counter_p2.expose_as(prefix, namebuf, DISPLAY_ON_PLAIN_TEXT) != 0) {
+ return -1;
+ }
+ snprintf(namebuf, sizeof(namebuf), "counter_%u", (int)FLAGS_bvar_counter_p3);
+ if (_counter_p3.expose_as(prefix, namebuf, DISPLAY_ON_PLAIN_TEXT) != 0) {
+ return -1;
+ }
+ if (_counter_999.expose_as(prefix, "counter_999", DISPLAY_ON_PLAIN_TEXT) != 0) {
+ return -1;
+ }
+ if (_counter_9999.expose_as(prefix, "counter_9999") != 0) {
+ return -1;
+ }
+ if (_counter_cdf.expose_as(prefix, "counter_cdf", DISPLAY_ON_HTML) != 0) {
+ return -1;
+ }
+ if (_counter_percentiles.expose_as(prefix, "counter_percentiles", DISPLAY_ON_HTML) != 0) {
+ return -1;
+ }
+ snprintf(namebuf, sizeof(namebuf), "%d%%,%d%%,%d%%,99.9%%",
+ (int)FLAGS_bvar_counter_p1, (int)FLAGS_bvar_counter_p2,
+ (int)FLAGS_bvar_counter_p3);
+ CHECK_EQ(0, _counter_percentiles.set_vector_names(namebuf));
+ return 0;
+}
+
+int64_t CounterRecorder::counter_percentile(double ratio) const {
+ std::unique_ptr cb(
+ combine((detail::PercentileWindow*)&_counter_percentile_window));
+ return cb->get_number(ratio);
+}
+
+void CounterRecorder::hide() {
+ _avg_counter_window.hide();
+ _max_counter_window.hide();
+ _total_times.hide();
+ _qps.hide();
+ _counter_p1.hide();
+ _counter_p2.hide();
+ _counter_p3.hide();
+ _counter_999.hide();
+ _counter_9999.hide();
+ _counter_cdf.hide();
+ _counter_percentiles.hide();
+}
+
+CounterRecorder& CounterRecorder::operator<<(int64_t count_num) {
+ _avg_counter << count_num;
+ _max_counter << count_num;
+ _counter_percentile << count_num;
+ return *this;
+}
+
+std::ostream& operator<<(std::ostream& os, const CounterRecorder& rec) {
+ return os << "{avg=" << rec.avg_counter()
+ << " max" << rec.window_size() << '=' << rec.max_counter()
+ << " qps=" << rec.qps()
+ << " count=" << rec.total_times() << '}';
+}
+
+} // namespace bvar
+
+
namespace braft {
static void* run_closure(void* arg) {
diff --git a/src/braft/util.h b/src/braft/util.h
index 50359845..50bec34b 100644
--- a/src/braft/util.h
+++ b/src/braft/util.h
@@ -40,9 +40,135 @@
#include
#include
#include
+#include
#include "braft/macros.h"
#include "braft/raft.h"
+namespace bvar {
+namespace detail {
+
+class Percentile;
+
+typedef Window RecorderWindow;
+typedef Window, SERIES_IN_SECOND> MaxUint64Window;
+typedef Window PercentileWindow;
+
+// For mimic constructor inheritance.
+class CounterRecorderBase {
+public:
+ explicit CounterRecorderBase(time_t window_size);
+ time_t window_size() const { return _avg_counter_window.window_size(); }
+protected:
+ IntRecorder _avg_counter;
+ Maxer _max_counter;
+ Percentile _counter_percentile;
+ RecorderWindow _avg_counter_window;
+ MaxUint64Window _max_counter_window;
+ PercentileWindow _counter_percentile_window;
+
+ PassiveStatus _total_times;
+ PassiveStatus _qps;
+ PassiveStatus _counter_p1;
+ PassiveStatus _counter_p2;
+ PassiveStatus _counter_p3;
+ PassiveStatus _counter_999; // 99.9%
+ PassiveStatus _counter_9999; // 99.99%
+ CDF _counter_cdf;
+ PassiveStatus > _counter_percentiles;
+};
+} // namespace detail
+
+// Specialized structure to record counter.
+// It's not a Variable, but it contains multiple bvar inside.
+class CounterRecorder : public detail::CounterRecorderBase {
+ typedef detail::CounterRecorderBase Base;
+public:
+ CounterRecorder() : Base(-1) {}
+ explicit CounterRecorder(time_t window_size) : Base(window_size) {}
+ explicit CounterRecorder(const butil::StringPiece& prefix) : Base(-1) {
+ expose(prefix);
+ }
+ CounterRecorder(const butil::StringPiece& prefix,
+ time_t window_size) : Base(window_size) {
+ expose(prefix);
+ }
+ CounterRecorder(const butil::StringPiece& prefix1,
+ const butil::StringPiece& prefix2) : Base(-1) {
+ expose(prefix1, prefix2);
+ }
+ CounterRecorder(const butil::StringPiece& prefix1,
+ const butil::StringPiece& prefix2,
+ time_t window_size) : Base(window_size) {
+ expose(prefix1, prefix2);
+ }
+
+ ~CounterRecorder() { hide(); }
+
+ // Record the counter num.
+ CounterRecorder& operator<<(int64_t count_num);
+
+ // Expose all internal variables using `prefix' as prefix.
+ // Returns 0 on success, -1 otherwise.
+ // Example:
+ // CounterRecorder rec;
+ // rec.expose("foo_bar_add"); // foo_bar_add_avg_counter
+ // // foo_bar_add_max_counter
+ // // foo_bar_add_total_times
+ // // foo_bar_add_qps
+ // rec.expose("foo_bar", "apply"); // foo_bar_apply_avg_counter
+ // // foo_bar_apply_max_counter
+ // // foo_bar_apply_total_times
+ // // foo_bar_apply_qps
+ int expose(const butil::StringPiece& prefix) {
+ return expose(butil::StringPiece(), prefix);
+ }
+ int expose(const butil::StringPiece& prefix1,
+ const butil::StringPiece& prefix2);
+
+ // Hide all internal variables, called in dtor as well.
+ void hide();
+
+ // Get the average counter num in recent |window_size| seconds
+ // If |window_size| is absent, use the window_size to ctor.
+ int64_t avg_counter(time_t window_size) const
+ { return _avg_counter_window.get_value(window_size).get_average_int(); }
+ int64_t avg_counter() const
+ { return _avg_counter_window.get_value().get_average_int(); }
+
+ // Get p1/p2/p3/99.9-ile counter num in recent window_size-to-ctor seconds.
+ Vector counter_percentiles() const;
+
+ // Get the max counter numer in recent window_size-to-ctor seconds.
+ int64_t max_counter() const { return _max_counter_window.get_value(); }
+
+ // Get the total number of recorded counter nums
+ int64_t total_times() const { return _avg_counter.get_value().num; }
+
+ // Get qps in recent |window_size| seconds. The `q' means counter nums.
+ // recorded by operator<<().
+ // If |window_size| is absent, use the window_size to ctor.
+ int64_t qps(time_t window_size) const;
+ int64_t qps() const { return _qps.get_value(); }
+
+ // Get |ratio|-ile counter num in recent |window_size| seconds
+ // E.g. 0.99 means 99%-ile
+ int64_t counter_percentile(double ratio) const;
+
+ // Get name of a sub-bvar.
+ const std::string& avg_counter_name() const { return _avg_counter_window.name(); }
+ const std::string& counter_percentiles_name() const
+ { return _counter_percentiles.name(); }
+ const std::string& counter_cdf_name() const { return _counter_cdf.name(); }
+ const std::string& max_counter_name() const
+ { return _max_counter_window.name(); }
+ const std::string& total_times_name() const { return _total_times.name(); }
+ const std::string& qps_name() const { return _qps.name(); }
+};
+
+std::ostream& operator<<(std::ostream& os, const CounterRecorder&);
+
+} // namespace bvar
+
namespace braft {
class Closure;
diff --git a/test/memory_file_system_adaptor.h b/test/memory_file_system_adaptor.h
index 8450e7c1..deec27cd 100644
--- a/test/memory_file_system_adaptor.h
+++ b/test/memory_file_system_adaptor.h
@@ -97,6 +97,11 @@ class MemoryFileAdaptor : public braft::FileAdaptor {
return true;
}
+ bool close() {
+ return true;
+ }
+
+
private:
scoped_refptr _node_impl;
};
diff --git a/test/test_log.cpp b/test/test_log.cpp
index 893aa7fb..3cc4aa1d 100644
--- a/test/test_log.cpp
+++ b/test/test_log.cpp
@@ -12,6 +12,7 @@
#include
#include
#include
+#include
#include
#include
#include "braft/log.h"
@@ -482,6 +483,110 @@ TEST_F(LogStorageTest, append_close_load_append) {
delete configuration_manager;
}
+ssize_t file_size(const char* filename) {
+ struct stat st;
+ stat(filename, &st);
+ return st.st_size;
+}
+
+int truncate_uninterrupted(const char* filename, off_t length) {
+ int rc = 0;
+ do {
+ rc = truncate(filename, length);
+ } while (rc == -1 && errno == EINTR);
+ return rc;
+}
+
+TEST_F(LogStorageTest, data_lost) {
+ ::system("rm -rf data");
+ braft::LogStorage* storage = new braft::SegmentLogStorage("./data");
+ braft::ConfigurationManager* configuration_manager = new braft::ConfigurationManager;
+ ASSERT_EQ(0, storage->init(configuration_manager));
+
+ // append entry
+ for (int i = 0; i < 100000; i++) {
+ std::vector entries;
+ for (int j = 0; j < 5; j++) {
+ int64_t index = 5*i + j + 1;
+ braft::LogEntry* entry = new braft::LogEntry();
+ entry->type = braft::ENTRY_TYPE_DATA;
+ entry->id.term = 1;
+ entry->id.index = index;
+
+ char data_buf[128];
+ snprintf(data_buf, sizeof(data_buf), "hello, world: %ld", index);
+ entry->data.append(data_buf);
+ entries.push_back(entry);
+ }
+
+ ASSERT_EQ(5, storage->append_entries(entries));
+
+ for (size_t j = 0; j < entries.size(); j++) {
+ delete entries[j];
+ }
+ }
+
+ delete storage;
+ delete configuration_manager;
+
+ // reinit
+ storage = new braft::SegmentLogStorage("./data");
+ configuration_manager = new braft::ConfigurationManager;
+ ASSERT_EQ(0, storage->init(configuration_manager));
+
+ ASSERT_EQ(storage->first_log_index(), 1);
+ ASSERT_EQ(storage->last_log_index(), 100000*5);
+
+ delete storage;
+ delete configuration_manager;
+
+ // last segment lost data
+ butil::DirReaderPosix dir_reader1("./data");
+ ASSERT_TRUE(dir_reader1.IsValid());
+ while (dir_reader1.Next()) {
+ int64_t first_index = 0;
+ int match = sscanf(dir_reader1.name(), "log_inprogress_%020ld",
+ &first_index);
+ std::string path;
+ butil::string_appendf(&path, "./data/%s", dir_reader1.name());
+ if (match == 1) {
+ ASSERT_EQ(truncate_uninterrupted(path.c_str(), file_size(path.c_str()) - 1), 0);
+ }
+ }
+
+ storage = new braft::SegmentLogStorage("./data");
+ configuration_manager = new braft::ConfigurationManager;
+ ASSERT_EQ(0, storage->init(configuration_manager));
+
+ ASSERT_EQ(storage->first_log_index(), 1);
+ ASSERT_EQ(storage->last_log_index(), 100000*5 - 1);
+
+ delete storage;
+ delete configuration_manager;
+
+ // middle segment lost data
+ butil::DirReaderPosix dir_reader2("./data");
+ ASSERT_TRUE(dir_reader2.IsValid());
+ while (dir_reader2.Next()) {
+ int64_t first_index = 0;
+ int64_t last_index = 0;
+ int match = sscanf(dir_reader2.name(), "log_%020ld_%020ld",
+ &first_index, &last_index);
+ std::string path;
+ butil::string_appendf(&path, "./data/%s", dir_reader2.name());
+ if (match == 2) {
+ ASSERT_EQ(truncate_uninterrupted(path.c_str(), file_size(path.c_str()) - 1), 0);
+ }
+ }
+
+ storage = new braft::SegmentLogStorage("./data");
+ configuration_manager = new braft::ConfigurationManager;
+ ASSERT_NE(0, storage->init(configuration_manager));
+
+ delete storage;
+ delete configuration_manager;
+}
+
TEST_F(LogStorageTest, append_read_badcase) {
::system("rm -rf data");
braft::LogStorage* storage = new braft::SegmentLogStorage("./data");
diff --git a/test/test_node.cpp b/test/test_node.cpp
index 6664ad89..a1744ec6 100644
--- a/test/test_node.cpp
+++ b/test/test_node.cpp
@@ -20,6 +20,8 @@
#include "braft/node.h"
#include "braft/enum.pb.h"
#include "braft/errno.pb.h"
+#include
+#include
namespace braft {
extern bvar::Adder g_num_nodes;
@@ -82,7 +84,8 @@ class MockFSM : public braft::StateMachine {
virtual void on_apply(braft::Iterator& iter) {
for (; iter.valid(); iter.next()) {
- LOG_IF(TRACE, !g_dont_print_apply_log) << "addr " << address << " apply " << iter.index();
+ LOG_IF(TRACE, !g_dont_print_apply_log) << "addr " << address
+ << " apply " << iter.index();
::brpc::ClosureGuard guard(iter.done());
lock();
logs.push_back(iter.data());
@@ -166,8 +169,8 @@ class MockFSM : public braft::StateMachine {
++_on_stop_following_times;
}
- virtual void on_configuration_committed(const ::braft::Configuration& conf) {
- LOG(TRACE) << "address " << address << " commit conf: " << conf;
+ virtual void on_configuration_committed(const ::braft::Configuration& conf, int64_t index) {
+ LOG(TRACE) << "address " << address << " commit conf: " << conf << " at index " << index;
}
};
@@ -176,9 +179,8 @@ class ExpectClosure : public braft::Closure {
public:
void Run() {
if (_expect_err_code >= 0) {
- EXPECT_EQ(status().error_code(), _expect_err_code)
+ ASSERT_EQ(status().error_code(), _expect_err_code)
<< _pos << " : " << status();
-
}
if (_cond) {
_cond->signal();
@@ -217,7 +219,7 @@ typedef ExpectClosure SnapshotClosure;
class Cluster {
public:
Cluster(const std::string& name, const std::vector& peers,
- int32_t election_timeout_ms = 300)
+ int32_t election_timeout_ms = 3000)
: _name(name), _peers(peers)
, _election_timeout_ms(election_timeout_ms) {
int64_t throttle_throughput_bytes = 10 * 1024 * 1024;
@@ -256,13 +258,17 @@ class Cluster {
butil::endpoint2str(listen_addr).c_str());
butil::string_printf(&options.snapshot_uri, "local://./data/%s/snapshot",
butil::endpoint2str(listen_addr).c_str());
+
scoped_refptr tst(_throttle);
options.snapshot_throttle = &tst;
+ options.catchup_margin = 2;
+
braft::Node* node = new braft::Node(_name, braft::PeerId(listen_addr, 0));
int ret = node->init(options);
if (ret != 0) {
LOG(WARNING) << "init_node failed, server: " << listen_addr;
+ delete node;
return ret;
} else {
LOG(NOTICE) << "init node " << listen_addr;
@@ -356,6 +362,26 @@ class Cluster {
}
}
+ void check_node_status() {
+ std::vector nodes;
+ {
+ std::lock_guard guard(_mutex);
+ for (size_t i = 0; i < _nodes.size(); i++) {
+ nodes.push_back(_nodes[i]);
+ }
+ }
+ for (size_t i = 0; i < _nodes.size(); ++i) {
+ braft::NodeStatus status;
+ nodes[i]->get_status(&status);
+ if (nodes[i]->is_leader()) {
+ ASSERT_EQ(status.state, braft::STATE_LEADER);
+ } else {
+ ASSERT_NE(status.state, braft::STATE_LEADER);
+ ASSERT_EQ(status.stable_followers.size(), 0);
+ }
+ }
+ }
+
void ensure_leader(const butil::EndPoint& expect_addr) {
CHECK:
std::lock_guard guard(_mutex);
@@ -373,7 +399,7 @@ class Cluster {
}
bool ensure_same(int wait_time_s = -1) {
- std::lock_guard guard(_mutex);
+ std::unique_lock guard(_mutex);
if (_fsms.size() <= 1) {
return true;
}
@@ -388,6 +414,10 @@ class Cluster {
fsm->lock();
if (first->logs.size() != fsm->logs.size()) {
+ LOG(INFO) << "logs size not match, "
+ << " addr: " << first->address << " vs "
+ << fsm->address << ", log num "
+ << first->logs.size() << " vs " << fsm->logs.size();
fsm->unlock();
goto WAIT;
}
@@ -398,9 +428,9 @@ class Cluster {
if (first_data.to_string() != fsm_data.to_string()) {
LOG(INFO) << "log data of index=" << j << " not match, "
<< " addr: " << first->address << " vs "
- << fsm->address << ", data "
- << first_data.to_string() << " vs "
- << fsm_data.to_string();
+ << fsm->address << ", data ("
+ << first_data.to_string() << ") vs "
+ << fsm_data.to_string() << ")";
fsm->unlock();
goto WAIT;
}
@@ -409,6 +439,8 @@ class Cluster {
fsm->unlock();
}
first->unlock();
+ guard.unlock();
+ check_node_status();
return true;
WAIT:
@@ -936,6 +968,226 @@ TEST_P(NodeTest, JoinNode) {
cluster.stop_all();
}
+TEST_P(NodeTest, Leader_step_down_during_install_snapshot) {
+ std::vector peers;
+ braft::PeerId peer0;
+ peer0.addr.ip = butil::my_ip();
+ peer0.addr.port = 5006;
+ peer0.idx = 0;
+
+ // start cluster
+ peers.push_back(peer0);
+ Cluster cluster("unittest", peers, 1000);
+ ASSERT_EQ(0, cluster.start(peer0.addr));
+ LOG(NOTICE) << "start single cluster " << peer0;
+
+ cluster.wait_leader();
+
+ braft::Node* leader = cluster.leader();
+ ASSERT_TRUE(leader != NULL);
+ ASSERT_EQ(leader->node_id().peer_id, peer0);
+ LOG(WARNING) << "leader is " << leader->node_id();
+
+ bthread::CountdownEvent cond(10);
+ // apply something
+ for (int i = 0; i < 10; i++) {
+ butil::IOBuf data;
+ std::string data_buf;
+ data_buf.resize(256 * 1024, 'a');
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // trigger leader snapshot
+ LOG(WARNING) << "trigger leader snapshot ";
+ cond.reset(1);
+ leader->snapshot(NEW_SNAPSHOTCLOSURE(&cond, 0));
+ cond.wait();
+
+ cond.reset(10);
+ // apply something
+ for (int i = 0; i < 10; i++) {
+ butil::IOBuf data;
+ std::string data_buf;
+ data_buf.resize(256 * 1024, 'b');
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // trigger leader snapshot again to compact logs
+ LOG(WARNING) << "trigger leader snapshot again";
+ cond.reset(1);
+ leader->snapshot(NEW_SNAPSHOTCLOSURE(&cond, 0));
+ cond.wait();
+
+ // start peer1
+ braft::PeerId peer1;
+ peer1.addr.ip = butil::my_ip();
+ peer1.addr.port = 5006 + 1;
+ peer1.idx = 0;
+ ASSERT_EQ(0, cluster.start(peer1.addr, true));
+ LOG(NOTICE) << "start peer " << peer1;
+ // wait until started successfully
+ usleep(1000* 1000);
+
+ // add peer1, leader step down while caught_up
+ cond.reset(1);
+ LOG(NOTICE) << "add peer: " << peer1;
+ leader->add_peer(peer1, NEW_ADDPEERCLOSURE(&cond, EPERM));
+ usleep(500 * 1000);
+
+ {
+ brpc::Channel channel;
+ brpc::ChannelOptions options;
+ options.protocol = brpc::PROTOCOL_HTTP;
+ if (channel.Init(leader->node_id().peer_id.addr, &options) != 0) {
+ LOG(ERROR) << "Fail to initialize channel";
+ }
+ {
+ brpc::Controller cntl;
+ cntl.http_request().uri() = "/raft_stat/unittest";
+ cntl.http_request().set_method(brpc::HTTP_METHOD_GET);
+ channel.CallMethod(NULL, &cntl, NULL, NULL, NULL/* done*/);
+ LOG(NOTICE) << "http return: \n" << cntl.response_attachment();
+ }
+ }
+
+ LOG(NOTICE) << "leader " << leader->node_id()
+ << " step_down because of some error";
+ butil::Status status;
+ status.set_error(braft::ERAFTTIMEDOUT, "Majority of the group dies");
+ leader->_impl->step_down(leader->_impl->_current_term, false, status);
+ cond.wait();
+
+ // add peer1 again, success
+ LOG(NOTICE) << "add peer again: " << peer1;
+ cond.reset(1);
+ cluster.wait_leader();
+ leader = cluster.leader();
+ leader->add_peer(peer1, NEW_ADDPEERCLOSURE(&cond, 0));
+ cond.wait();
+
+ cluster.ensure_same();
+
+ LOG(TRACE) << "stop cluster";
+ cluster.stop_all();
+}
+
+
+TEST_P(NodeTest, Report_error_during_install_snapshot) {
+ std::vector peers;
+ for (int i = 0; i < 3; i++) {
+ braft::PeerId peer;
+ peer.addr.ip = butil::my_ip();
+ peer.addr.port = 5006 + i;
+ peer.idx = 0;
+
+ peers.push_back(peer);
+ }
+
+ // start cluster
+ Cluster cluster("unittest", peers);
+ for (size_t i = 0; i < peers.size(); i++) {
+ ASSERT_EQ(0, cluster.start(peers[i].addr));
+ }
+
+ // elect leader
+ cluster.wait_leader();
+ braft::Node* leader = cluster.leader();
+ ASSERT_TRUE(leader != NULL);
+ LOG(WARNING) << "leader is " << leader->node_id();
+
+ // apply something
+ bthread::CountdownEvent cond(10);
+ for (int i = 0; i < 10; i++) {
+ butil::IOBuf data;
+ std::string data_buf;
+ data_buf.resize(256 * 1024, 'a');
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ cluster.ensure_same();
+
+ std::vector nodes;
+ cluster.followers(&nodes);
+ ASSERT_EQ(2, nodes.size());
+
+ // stop follower
+ LOG(WARNING) << "stop follower";
+ butil::EndPoint follower_addr = nodes[0]->node_id().peer_id.addr;
+ cluster.stop(follower_addr);
+
+ // apply something
+ cond.reset(10);
+ for (int i = 10; i < 20; i++) {
+ butil::IOBuf data;
+ std::string data_buf;
+ data_buf.resize(256 * 1024, 'b');
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // trigger leader snapshot
+ LOG(WARNING) << "trigger leader snapshot ";
+ cond.reset(1);
+ leader->snapshot(NEW_SNAPSHOTCLOSURE(&cond, 0));
+ cond.wait();
+
+ // apply something
+ cond.reset(10);
+ for (int i = 20; i < 30; i++) {
+ butil::IOBuf data;
+ std::string data_buf;
+ data_buf.resize(256 * 1024, 'c');
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // trigger leader snapshot again to compact logs
+ LOG(WARNING) << "trigger leader snapshot again";
+ cond.reset(1);
+ leader->snapshot(NEW_SNAPSHOTCLOSURE(&cond, 0));
+ cond.wait();
+
+ LOG(WARNING) << "restart follower";
+ ASSERT_EQ(0, cluster.start(follower_addr));
+ usleep(1*1000*1000);
+
+ // trigger newly-started follower report_error when install_snapshot
+ cluster._nodes.back()->_impl->_snapshot_executor->report_error(EIO, "%s",
+ "Fail to close writer");
+
+ sleep(2);
+ LOG(WARNING) << "cluster stop";
+ cluster.stop_all();
+}
+
TEST_P(NodeTest, RemoveFollower) {
std::vector peers;
for (int i = 0; i < 3; i++) {
@@ -1570,8 +1822,11 @@ TEST_P(NodeTest, InstallSnapshot) {
}
cond.wait();
- // wait leader to compact logs
- usleep(5000 * 1000);
+ // trigger leader snapshot again to compact logs
+ LOG(WARNING) << "trigger leader snapshot again";
+ cond.reset(1);
+ leader->snapshot(NEW_SNAPSHOTCLOSURE(&cond, 0));
+ cond.wait();
LOG(WARNING) << "restart follower";
ASSERT_EQ(0, cluster.start(follower_addr));
@@ -2423,15 +2678,15 @@ TEST_P(NodeTest, on_start_following_and_on_stop_following) {
// When it was still in follower state, it would do handle_election_timeout and
// trigger on_stop_following when not receiving heartbeat for a long
// time(election_timeout_ms).
- ASSERT_EQ(static_cast(leader_second->_impl->_options.fsm)->_on_start_following_times, 1);
- ASSERT_EQ(static_cast(leader_second->_impl->_options.fsm)->_on_stop_following_times, 1);
+ ASSERT_GE(static_cast(leader_second->_impl->_options.fsm)->_on_start_following_times, 1);
+ ASSERT_GE(static_cast(leader_second->_impl->_options.fsm)->_on_stop_following_times, 1);
for (int i = 0; i < 3; i++) {
// Firstly these followers have a leader, but it stops and a candidate
// sends request_vote_request to them, which triggers on_stop_following.
// When the candidate becomes new leader, on_start_following is triggled
// again so _on_start_following_times increase by 1.
- ASSERT_EQ(static_cast(followers_second[i]->_impl->_options.fsm)->_on_start_following_times, 2);
- ASSERT_EQ(static_cast(followers_second[i]->_impl->_options.fsm)->_on_stop_following_times, 1);
+ ASSERT_GE(static_cast(followers_second[i]->_impl->_options.fsm)->_on_start_following_times, 2);
+ ASSERT_GE(static_cast(followers_second[i]->_impl->_options.fsm)->_on_stop_following_times, 1);
}
// transfer leadership to a follower
@@ -2463,20 +2718,20 @@ TEST_P(NodeTest, on_start_following_and_on_stop_following) {
// leader_third's _on_start_following_times and _on_stop_following_times should both be 2.
// When it was still in follower state, it would do handle_timeout_now_request and
// trigger on_stop_following when leader_second transferred leadership to it.
- ASSERT_EQ(static_cast(leader_third->_impl->_options.fsm)->_on_start_following_times, 2);
- ASSERT_EQ(static_cast(leader_third->_impl->_options.fsm)->_on_stop_following_times, 2);
+ ASSERT_GE(static_cast(leader_third->_impl->_options.fsm)->_on_start_following_times, 2);
+ ASSERT_GE(static_cast(leader_third->_impl->_options.fsm)->_on_stop_following_times, 2);
for (int i = 0; i < 3; i++) {
// leader_second became follower when it transferred leadership to target,
// and when it receives leader_third's append_entries_request on_start_following is triggled.
if (followers_third[i]->node_id().peer_id == leader_second->node_id().peer_id) {
- ASSERT_EQ(static_cast(followers_third[i]->_impl->_options.fsm)->_on_start_following_times, 2);
- ASSERT_EQ(static_cast(followers_third[i]->_impl->_options.fsm)->_on_stop_following_times, 1);
+ ASSERT_GE(static_cast(followers_third[i]->_impl->_options.fsm)->_on_start_following_times, 2);
+ ASSERT_GE(static_cast(followers_third[i]->_impl->_options.fsm)->_on_stop_following_times, 1);
continue;
}
// other followers just lose the leader_second and get leader_third, so _on_stop_following_times and
// _on_start_following_times both increase by 1.
- ASSERT_EQ(static_cast(followers_third[i]->_impl->_options.fsm)->_on_start_following_times, 3);
- ASSERT_EQ(static_cast(followers_third[i]->_impl->_options.fsm)->_on_stop_following_times, 2);
+ ASSERT_GE(static_cast(followers_third[i]->_impl->_options.fsm)->_on_start_following_times, 3);
+ ASSERT_GE(static_cast(followers_third[i]->_impl->_options.fsm)->_on_stop_following_times, 2);
}
cluster.ensure_same();
@@ -3305,6 +3560,197 @@ TEST_P(NodeTest, follower_handle_out_of_order_append_entries) {
cluster.stop_all();
}
+TEST_P(NodeTest, readonly) {
+ std::vector peers;
+ for (int i = 0; i < 3; i++) {
+ braft::PeerId peer;
+ peer.addr.ip = butil::my_ip();
+ peer.addr.port = 5006 + i;
+ peer.idx = 0;
+
+ peers.push_back(peer);
+ }
+
+ // start cluster
+ Cluster cluster("unittest", peers);
+ for (size_t i = 0; i < peers.size(); i++) {
+ ASSERT_EQ(0, cluster.start(peers[i].addr));
+ }
+
+ // elect leader
+ cluster.wait_leader();
+ braft::Node* leader = cluster.leader();
+ ASSERT_TRUE(leader != NULL);
+ LOG(WARNING) << "leader is " << leader->node_id();
+
+ // apply something
+ bthread::CountdownEvent cond(10);
+ int start_index = 0;
+ for (int i = start_index; i < start_index + 10; i++) {
+ butil::IOBuf data;
+ char data_buf[128];
+ snprintf(data_buf, sizeof(data_buf), "hello: %d", i);
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // let leader enter readonly mode, reject user logs
+ leader->enter_readonly_mode();
+ ASSERT_TRUE(leader->readonly());
+ cond.reset(10);
+ start_index += 10;
+ for (int i = start_index; i < start_index + 10; i++) {
+ butil::IOBuf data;
+ char data_buf[128];
+ snprintf(data_buf, sizeof(data_buf), "hello: %d", i);
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, braft::EREADONLY);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // let leader leave readonly mode, accept user logs
+ leader->leave_readonly_mode();
+ ASSERT_FALSE(leader->readonly());
+ cond.reset(10);
+ start_index += 10;
+ for (int i = start_index; i < start_index + 10; i++) {
+ butil::IOBuf data;
+ char data_buf[128];
+ snprintf(data_buf, sizeof(data_buf), "hello: %d", i);
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ std::vector followers;
+ cluster.followers(&followers);
+ ASSERT_EQ(2, followers.size());
+
+ // Let follower 0 enter readonly mode, still can accept user logs
+ followers[0]->enter_readonly_mode();
+ bthread_usleep(2000 * 1000); // wait a while for heartbeat
+ cond.reset(10);
+ start_index += 10;
+ for (int i = start_index; i < start_index + 10; i++) {
+ butil::IOBuf data;
+ char data_buf[128];
+ snprintf(data_buf, sizeof(data_buf), "hello: %d", i);
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // Let follower 1 enter readonly mode, majority readonly, reject user logs
+ followers[1]->enter_readonly_mode();
+ int retry = 5;
+ while (!leader->readonly() && --retry >= 0) {
+ bthread_usleep(1000 * 1000);
+ }
+ ASSERT_TRUE(leader->readonly());
+ cond.reset(10);
+ start_index += 10;
+ for (int i = start_index; i < start_index + 10; i++) {
+ butil::IOBuf data;
+ char data_buf[128];
+ snprintf(data_buf, sizeof(data_buf), "hello: %d", i);
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, braft::EREADONLY);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // Add a new follower
+ braft::PeerId peer3;
+ peer3.addr.ip = butil::my_ip();
+ peer3.addr.port = 5006 + 3;
+ peer3.idx = 0;
+ ASSERT_EQ(0, cluster.start(peer3.addr, true));
+ bthread_usleep(1000* 1000);
+ cond.reset(1);
+ leader->add_peer(peer3, NEW_ADDPEERCLOSURE(&cond, 0));
+ cond.wait();
+
+ // Trigger follower 0 do snapshot
+ cond.reset(1);
+ followers[0]->snapshot(NEW_SNAPSHOTCLOSURE(&cond, 0));
+ cond.wait();
+
+ // 2/4 readonly, leader still in readonly
+ retry = 5;
+ while (!leader->readonly() && --retry >= 0) {
+ bthread_usleep(1000 * 1000);
+ }
+ ASSERT_TRUE(leader->readonly());
+ start_index += 10;
+ cond.reset(10);
+ for (int i = start_index; i < start_index + 10; i++) {
+ butil::IOBuf data;
+ char data_buf[128];
+ snprintf(data_buf, sizeof(data_buf), "hello: %d", i);
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, braft::EREADONLY);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // Remove follower 0
+ cond.reset(1);
+ leader->remove_peer(followers[0]->node_id().peer_id, NEW_REMOVEPEERCLOSURE(&cond, 0));
+ cond.wait();
+ cluster.stop(followers[0]->node_id().peer_id.addr);
+
+ // 1/3 readonly, leader leave Readonly
+ retry = 5;
+ while (leader->readonly() && --retry >= 0) {
+ bthread_usleep(1000 * 1000);
+ }
+ ASSERT_TRUE(!leader->readonly());
+ cond.reset(10);
+ start_index += 10;
+ for (int i = start_index; i < start_index + 10; i++) {
+ butil::IOBuf data;
+ char data_buf[128];
+ snprintf(data_buf, sizeof(data_buf), "hello: %d", i);
+ data.append(data_buf);
+
+ braft::Task task;
+ task.data = &data;
+ task.done = NEW_APPLYCLOSURE(&cond, 0);
+ leader->apply(task);
+ }
+ cond.wait();
+
+ // Follower 1 leave readonly, catch up logs
+ followers[1]->leave_readonly_mode();
+ cluster.ensure_same();
+
+ LOG(WARNING) << "cluster stop";
+ cluster.stop_all();
+}
+
INSTANTIATE_TEST_CASE_P(NodeTestWithoutPipelineReplication,
NodeTest,
::testing::Values("NoReplcation"));
diff --git a/test/test_snapshot.cpp b/test/test_snapshot.cpp
index 98d050a7..5224640e 100644
--- a/test/test_snapshot.cpp
+++ b/test/test_snapshot.cpp
@@ -405,10 +405,17 @@ void add_file_meta(braft::FileSystemAdaptor* fs, braft::SnapshotWriter* writer,
if (checksum) {
file_meta.set_checksum(*checksum);
}
- write_file(fs, writer->get_path() + "/" + path.str(), path.str() + data);
+ write_file(fs, writer->get_path() + "/" + path.str(), path.str() + ": " + data);
ASSERT_EQ(0, writer->add_file(path.str(), &file_meta));
}
+void add_file_without_meta(braft::FileSystemAdaptor* fs, braft::SnapshotWriter* writer, int index,
+ const std::string& data) {
+ std::stringstream path;
+ path << "file" << index;
+ write_file(fs, writer->get_path() + "/" + path.str(), path.str() + ": " + data);
+}
+
bool check_file_exist(braft::FileSystemAdaptor* fs, const std::string& path, int index) {
if (fs == NULL) {
fs = braft::default_file_system();
@@ -522,6 +529,8 @@ TEST_F(SnapshotTest, filter_before_copy) {
add_file_meta(fs, writer2, 4, &checksum2, data2);
// file not exist in remote, will delete
add_file_meta(fs, writer2, 100, &checksum2, data2);
+ // file exit but meta not exit, will delete
+ add_file_without_meta(fs, writer2, 102, data2);
ASSERT_EQ(0, writer2->save_meta(meta));
ASSERT_EQ(0, storage2->close(writer2));
@@ -537,15 +546,15 @@ TEST_F(SnapshotTest, filter_before_copy) {
meta.set_last_included_index(901);
const std::string data3("ccc");
const std::string checksum3("3");
- // same checksum, will not copy
+ // same checksum, will copy from last_snapshot with index=901
add_file_meta(fs, writer2, 6, &checksum1, data3);
- // remote checksum not set, local set, will copy
+ // remote checksum not set, local last_snapshot set, will copy
add_file_meta(fs, writer2, 7, &checksum1, data3);
- // remote checksum set, local not set, will copy
+ // remote checksum set, local last_snapshot not set, will copy
add_file_meta(fs, writer2, 8, NULL, data3);
- // different checksum, will copy
+ // remote and local last_snapshot different checksum, will copy
add_file_meta(fs, writer2, 9, &checksum3, data3);
- // file not exist in remote, will delete
+ // file not exist in remote, will not copy
add_file_meta(fs, writer2, 101, &checksum3, data3);
ASSERT_EQ(0, writer2->save_meta(meta));
ASSERT_EQ(0, storage2->close(writer2));
@@ -566,7 +575,7 @@ TEST_F(SnapshotTest, filter_before_copy) {
for (int i = 1; i <= 9; ++i) {
ASSERT_TRUE(check_file_exist(fs, snapshot_path, i));
std::stringstream content;
- content << "file" << i;
+ content << "file" << i << ": ";
if (i == 1) {
content << data2;
} else if (i == 6) {
@@ -578,6 +587,7 @@ TEST_F(SnapshotTest, filter_before_copy) {
}
ASSERT_TRUE(!check_file_exist(fs, snapshot_path, 100));
ASSERT_TRUE(!check_file_exist(fs, snapshot_path, 101));
+ ASSERT_TRUE(!check_file_exist(fs, snapshot_path, 102));
delete storage2;
delete storage1;