Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
PFZheng authored Apr 25, 2019
2 parents dcc204e + 20ae73b commit 681367a
Show file tree
Hide file tree
Showing 31 changed files with 1,954 additions and 183 deletions.
14 changes: 13 additions & 1 deletion src/braft/ballot_box.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,22 @@ void BallotBox::describe(std::ostream& os, bool use_html) {
lck.unlock();
const char *newline = use_html ? "<br>" : "\r\n";
os << "last_committed_index: " << committed_index << newline;
if (pending_index != 0) {
if (pending_queue_size != 0) {
os << "pending_index: " << pending_index << newline;
os << "pending_queue_size: " << pending_queue_size << newline;
}
}

void BallotBox::get_status(BallotBoxStatus* status) {
if (!status) {
return;
}
std::unique_lock<raft_mutex_t> lck(_mutex);
status->committed_index = _last_committed_index;
if (_pending_meta_queue.size() != 0) {
status->pending_index = _pending_index;
status->pending_queue_size = _pending_meta_queue.size();
}
}

} // namespace braft
11 changes: 11 additions & 0 deletions src/braft/ballot_box.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ struct BallotBoxOptions {
ClosureQueue* closure_queue;
};

struct BallotBoxStatus {
BallotBoxStatus()
: committed_index(0), pending_index(0), pending_queue_size(0)
{}
int64_t committed_index;
int64_t pending_index;
int64_t pending_queue_size;
};

class BallotBox {
public:
BallotBox();
Expand Down Expand Up @@ -79,6 +88,8 @@ class BallotBox {

void describe(std::ostream& os, bool use_html);

void get_status(BallotBoxStatus* ballot_box_status);

private:

FSMCaller* _waiter;
Expand Down
2 changes: 2 additions & 0 deletions src/braft/errno.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,7 @@ enum RaftError {
ELOGDELETED = 10014;
// No available user log to read
ENOMOREUSERLOG = 10015;
// Raft node in readonly mode
EREADONLY = 10016;
};

6 changes: 3 additions & 3 deletions src/braft/file_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ void FileServiceImpl::get_file(::google::protobuf::RpcController* controller,
// Don't touch iter ever after
reader = iter->second;
lck.unlock();
BRAFT_VLOG << "get_file from " << cntl->remote_side() << " path=" << reader->path()
<< " filename=" << request->filename()
<< " offset=" << request->offset() << " count=" << request->count();
BRAFT_VLOG << "get_file for " << cntl->remote_side() << " path=" << reader->path()
<< " filename=" << request->filename()
<< " offset=" << request->offset() << " count=" << request->count();

if (request->count() <= 0 || request->offset() < 0) {
cntl->SetFailed(brpc::EREQUEST, "Invalid request=%s",
Expand Down
10 changes: 9 additions & 1 deletion src/braft/file_system_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ const char* PosixDirReader::name() const {
}

PosixFileAdaptor::~PosixFileAdaptor() {
::close(_fd);
}

ssize_t PosixFileAdaptor::write(const butil::IOBuf& data, off_t offset) {
Expand All @@ -57,6 +56,15 @@ bool PosixFileAdaptor::sync() {
return raft_fsync(_fd) == 0;
}

bool PosixFileAdaptor::close() {
if (_fd > 0) {
bool res = ::close(_fd) == 0;
_fd = -1;
return res;
}
return true;
}

static pthread_once_t s_check_cloexec_once = PTHREAD_ONCE_INIT;
static bool s_support_cloexec_on_open = false;

Expand Down
9 changes: 5 additions & 4 deletions src/braft/file_system_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,13 @@ class DirReader {

template <typename T>
struct DestroyObj {
void operator()(T* const obj) { obj->destroy(); }
void operator()(T* const obj) { obj->close(); delete obj; }
};


class FileAdaptor {
public:
virtual ~FileAdaptor() {}
// Write to the file. Different from posix ::pwrite(), write will retry automatically
// when occur EINTR.
// Return |data.size()| if successful, -1 otherwise.
Expand All @@ -79,13 +80,12 @@ class FileAdaptor {
// Sync data of the file to disk device
virtual bool sync() = 0;

// Destroy this adaptor
virtual void destroy() { delete this; }
// Close the descriptor of this file adaptor
virtual bool close() = 0;

protected:

FileAdaptor() {}
virtual ~FileAdaptor() {}

private:
DISALLOW_COPY_AND_ASSIGN(FileAdaptor);
Expand Down Expand Up @@ -177,6 +177,7 @@ friend class PosixFileSystemAdaptor;
virtual ssize_t read(butil::IOPortal* portal, off_t offset, size_t size);
virtual ssize_t size();
virtual bool sync();
virtual bool close();

protected:
PosixFileAdaptor(int fd) : _fd(fd) {}
Expand Down
46 changes: 38 additions & 8 deletions src/braft/fsm_caller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@

namespace braft {

static bvar::CounterRecorder g_commit_tasks_batch_counter(
"raft_commit_tasks_batch_counter");

FSMCaller::FSMCaller()
: _log_manager(NULL)
, _fsm(NULL)
Expand All @@ -41,6 +44,7 @@ FSMCaller::FSMCaller()
, _node(NULL)
, _cur_task(IDLE)
, _applying_index(0)
, _queue_started(false)
{
}

Expand All @@ -55,16 +59,20 @@ int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
return 0;
}
int64_t max_committed_index = -1;
int64_t counter = 0;
for (; iter; ++iter) {
if (iter->type == COMMITTED) {
if (iter->committed_index > max_committed_index) {
max_committed_index = iter->committed_index;
counter++;
}
} else {
if (max_committed_index >= 0) {
caller->_cur_task = COMMITTED;
caller->do_committed(max_committed_index);
max_committed_index = -1;
g_commit_tasks_batch_counter << counter;
counter = 0;
}
switch (iter->type) {
case COMMITTED:
Expand Down Expand Up @@ -115,6 +123,8 @@ int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
if (max_committed_index >= 0) {
caller->_cur_task = COMMITTED;
caller->do_committed(max_committed_index);
g_commit_tasks_batch_counter << counter;
counter = 0;
}
caller->_cur_task = IDLE;
return 0;
Expand Down Expand Up @@ -155,15 +165,22 @@ int FSMCaller::init(const FSMCallerOptions &options) {
execq_opt.bthread_attr = options.usercode_in_pthread
? BTHREAD_ATTR_PTHREAD
: BTHREAD_ATTR_NORMAL;
bthread::execution_queue_start(&_queue_id,
if (bthread::execution_queue_start(&_queue_id,
&execq_opt,
FSMCaller::run,
this);
this) != 0) {
LOG(ERROR) << "fsm fail to start execution_queue";
return -1;
}
_queue_started = true;
return 0;
}

int FSMCaller::shutdown() {
return bthread::execution_queue_stop(_queue_id);
if (_queue_started) {
return bthread::execution_queue_stop(_queue_id);
}
return 0;
}

void FSMCaller::do_shutdown() {
Expand Down Expand Up @@ -257,7 +274,8 @@ void FSMCaller::do_committed(int64_t committed_index) {
if (iter_impl.entry()->old_peers == NULL) {
// Joint stage is not supposed to be noticeable by end users.
_fsm->on_configuration_committed(
Configuration(*iter_impl.entry()->peers));
Configuration(*iter_impl.entry()->peers),
iter_impl.entry()->id.index);
}
}
// For other entries, we have nothing to do besides flush the
Expand All @@ -271,8 +289,8 @@ void FSMCaller::do_committed(int64_t committed_index) {
}
Iterator iter(&iter_impl);
_fsm->on_apply(iter);
LOG_IF(ERROR, iter.valid())
<< "Node " << _node->node_id()
LOG_IF(ERROR, iter.valid())
<< "Node " << _node->node_id()
<< " Iterator is still valid, did you return before iterator "
" reached the end?";
// Try move to next in case that we pass the same log twice.
Expand Down Expand Up @@ -391,7 +409,7 @@ void FSMCaller::do_snapshot_load(LoadSnapshotClosure* done) {
for (int i = 0; i < meta.peers_size(); ++i) {
conf.add_peer(meta.peers(i));
}
_fsm->on_configuration_committed(conf);
_fsm->on_configuration_committed(conf, meta.last_included_index());
}

_last_applied_index.store(meta.last_included_index(),
Expand Down Expand Up @@ -499,8 +517,20 @@ void FSMCaller::describe(std::ostream &os, bool use_html) {
os << newline;
}

int64_t FSMCaller::applying_index() const {
TaskType cur_task = _cur_task;
if (cur_task != COMMITTED) {
return 0;
} else {
return _applying_index.load(butil::memory_order_relaxed);
}
}

void FSMCaller::join() {
bthread::execution_queue_join(_queue_id);
if (_queue_started) {
bthread::execution_queue_join(_queue_id);
_queue_started = false;
}
}

IteratorImpl::IteratorImpl(StateMachine* sm, LogManager* lm,
Expand Down
2 changes: 2 additions & 0 deletions src/braft/fsm_caller.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class BAIDU_CACHELINE_ALIGNMENT FSMCaller {
int64_t last_applied_index() const {
return _last_applied_index.load(butil::memory_order_relaxed);
}
int64_t applying_index() const;
void describe(std::ostream& os, bool use_html);
void join();
private:
Expand Down Expand Up @@ -183,6 +184,7 @@ friend class IteratorImpl;
TaskType _cur_task;
butil::atomic<int64_t> _applying_index;
Error _error;
bool _queue_started;
};

};
Expand Down
26 changes: 18 additions & 8 deletions src/braft/log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ int Segment::load(ConfigurationManager* configuration_manager) {
// load entry index
int64_t file_size = st_buf.st_size;
int64_t entry_off = 0;
int64_t actual_last_index = _first_index - 1;
for (int64_t i = _first_index; entry_off < file_size; i++) {
EntryHeader header;
const int rc = _load_entry(entry_off, &header, NULL, ENTRY_HEADER_SIZE);
Expand Down Expand Up @@ -310,12 +311,21 @@ int Segment::load(ConfigurationManager* configuration_manager) {
}
}
_offset_and_term.push_back(std::make_pair(entry_off, header.term));
if (_is_open) {
++_last_index;
}
++actual_last_index;
entry_off += skip_len;
}

if (ret == 0 && !_is_open && actual_last_index < _last_index) {
LOG(ERROR) << "data lost in a full segment, path: " << _path
<< " first_index: " << _first_index << " expect_last_index: "
<< _last_index << " actual_last_index: " << actual_last_index;
ret = -1;
}

if (_is_open) {
_last_index = actual_last_index;
}

// truncate last uncompleted entry
if (ret == 0 && entry_off != file_size) {
LOG(INFO) << "truncate last uncompleted write entry, path: " << _path
Expand Down Expand Up @@ -983,7 +993,7 @@ int SegmentLogStorage::list_segments(bool is_empty) {
}

last_log_index = segment->last_index();
it++;
++it;
}
if (_open_segment) {
if (last_log_index == -1 &&
Expand Down Expand Up @@ -1059,8 +1069,8 @@ int SegmentLogStorage::save_meta(const int64_t log_index) {

timer.stop();
PLOG_IF(ERROR, ret != 0) << "Fail to save meta to " << meta_path;
BRAFT_VLOG << "log save_meta " << meta_path << " log_index: " << log_index
<< " time: " << timer.u_elapsed();
LOG(INFO) << "log save_meta " << meta_path << " first_log_index: " << log_index
<< " time: " << timer.u_elapsed();
return ret;
}

Expand All @@ -1081,8 +1091,8 @@ int SegmentLogStorage::load_meta() {
_first_log_index.store(meta.first_log_index());

timer.stop();
BRAFT_VLOG << "log load_meta " << meta_path << " log_index: " << meta.first_log_index()
<< " time: " << timer.u_elapsed();
LOG(INFO) << "log load_meta " << meta_path << " first_log_index: " << meta.first_log_index()
<< " time: " << timer.u_elapsed();
return 0;
}

Expand Down
2 changes: 1 addition & 1 deletion src/braft/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class BAIDU_CACHELINE_ALIGNMENT Segment

std::string file_name();
private:
friend butil::RefCountedThreadSafe<Segment>;
friend class butil::RefCountedThreadSafe<Segment>;
~Segment() {
if (_fd >= 0) {
::close(_fd);
Expand Down
1 change: 0 additions & 1 deletion src/braft/log_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

// Authors: Zhangyi Chen([email protected])

#include <bvar/bvar.h>
#include "braft/log_entry.h"
#include "braft/local_storage.pb.h"

Expand Down
1 change: 0 additions & 1 deletion src/braft/log_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <butil/iobuf.h> // butil::IOBuf
#include <butil/memory/ref_counted.h> // butil::RefCountedThreadSafe
#include <bvar/bvar.h>
#include <butil/third_party/murmurhash3/murmurhash3.h> // fmix64
#include "braft/configuration.h"
#include "braft/raft.pb.h"
Expand Down
Loading

0 comments on commit 681367a

Please sign in to comment.