Skip to content

Commit

Permalink
client关闭健康检查
Browse files Browse the repository at this point in the history
Change-Id: I45d5674836db3e8d5075755cb494ad9d7a5e662d
  • Loading branch information
wu-hanqing committed Apr 30, 2020
1 parent 16550e2 commit 593e8ce
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 20 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ bind(
git_repository(
name = "com_netease_storage_gerrit_curve_curve_brpc",
remote = "http://gerrit.storage.netease.com/curve/curve-brpc",
commit = "8d5e3c085b38598b6c436999029ebdefa9301450",
commit = "0f0287df3f232b8b12a30b4d99392d716de1a16b",
)

bind(
Expand Down
3 changes: 3 additions & 0 deletions conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,6 @@ global.logPath=/data/log/curve/
############### metric 配置信息 #############
#
global.metricDummyServerStartPort=9000

# 是否关闭健康检查: true/关闭 false/不关闭
global.turnOffHealthCheck=true
2 changes: 2 additions & 0 deletions src/client/chunk_closure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ void ClientClosure::Run() {
}

void ClientClosure::OnRpcFailed() {
client_->ResetSenderIfNotHealth(chunkserverID_);

status_ = cntl_->ErrorCode();

// 如果连接失败,再等一定时间再重试
Expand Down
6 changes: 6 additions & 0 deletions src/client/client_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ int ClientConfig::Init(const char* configpath) {
LOG_IF(ERROR, ret == false) << "config no mds.registerToMDS info";
RETURN_IF_FALSE(ret)

ret = conf_.GetBoolValue("global.turnOffHealthCheck",
&fileServiceOption_.commonOpt.turnOffHealthCheck);
LOG_IF(WARNING, ret == false)
<< "config no global.turnOffHealthCheck info, using default value "
<< fileServiceOption_.commonOpt.turnOffHealthCheck;

return 0;
}

Expand Down
7 changes: 3 additions & 4 deletions src/client/config_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,11 @@ typedef struct IOOption {
* @mdsRegisterToMDS: 是否向mds注册client信息,因为client需要通过dummy server导出
* metric信息,为了配合普罗米修斯的自动服务发现机制,会将其监听的
* ip和端口信息发送给mds。
* @turnOffHealthCheck: 是否关闭健康检查
*/
typedef struct CommonConfigOpt {
bool mdsRegisterToMDS;
CommonConfigOpt() {
mdsRegisterToMDS = false;
}
bool mdsRegisterToMDS{false};
bool turnOffHealthCheck{false};
} CommonConfigOpt_t;

/**
Expand Down
8 changes: 8 additions & 0 deletions src/client/copyset_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,14 @@ class CopysetClient : public Uncopyable {
uint64_t len,
Closure *done);

/**
* @brief Reset the RequestSender associated with csId if it is not healthy.
*        This simply forwards the request to senderManager_.
* @param csId chunkserver id
*/
void ResetSenderIfNotHealth(const ChunkServerID& csId) {
senderManager_->ResetSenderIfNotHealth(csId);
}

/**
* session过期,需要将重试RPC停住
*/
Expand Down
8 changes: 8 additions & 0 deletions src/client/libcurve_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ curve::client::FileClient* globalclient = nullptr;
static const int PROCESS_NAME_MAX = 32;
static char g_processname[PROCESS_NAME_MAX];

// Declare the brpc flag health_check_interval so that it can be assigned
// from this file: FileClient::Init sets it to -1 when the client option
// commonOpt.turnOffHealthCheck is true.
namespace brpc {
DECLARE_int32(health_check_interval);
} // namespace brpc

namespace curve {
namespace client {

Expand Down Expand Up @@ -102,6 +106,10 @@ int FileClient::Init(const std::string& configpath) {
return -LIBCURVE_ERROR::FAILED;
}

if (clientconfig_.GetFileServiceOption().commonOpt.turnOffHealthCheck) {
brpc::FLAGS_health_check_interval = -1;
}

bool rc = StartDummyServer();
if (rc == false) {
mdsClient_->UnInitialize();
Expand Down
38 changes: 23 additions & 15 deletions src/client/metacache_struct.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#ifndef SRC_CLIENT_METACACHE_STRUCT_H_
#define SRC_CLIENT_METACACHE_STRUCT_H_

#include <atomic>
#include <string>
#include <list>
#include <map>
Expand Down Expand Up @@ -62,7 +63,7 @@ typedef struct CURVE_CACHELINE_ALIGNMENT CopysetInfo {
// 当前copyset的节点信息
std::vector<CopysetPeerInfo_t> csinfos_;
// 当前节点的apply信息,在read的时候需要,用来避免读IO进入raft
uint32_t lastappliedindex_;
std::atomic<uint64_t> lastappliedindex_{0};
// leader在本copyset信息中的索引,用于后面避免重复尝试同一个leader
int16_t leaderindex_;
// 当前copyset的id信息
Expand All @@ -86,21 +87,20 @@ typedef struct CURVE_CACHELINE_ALIGNMENT CopysetInfo {
this->cpid_ = other.cpid_;
this->csinfos_.assign(other.csinfos_.begin(), other.csinfos_.end());
this->leaderindex_ = other.leaderindex_;
this->lastappliedindex_ = other.lastappliedindex_;
this->lastappliedindex_.store(other.lastappliedindex_);
this->leaderMayChange_ = other.leaderMayChange_;
return *this;
}

CopysetInfo(const CopysetInfo& other) {
this->cpid_ = other.cpid_;
this->csinfos_.assign(other.csinfos_.begin(), other.csinfos_.end());
this->leaderindex_ = other.leaderindex_;
this->lastappliedindex_ = other.lastappliedindex_;
this->leaderMayChange_ = other.leaderMayChange_;
}
CopysetInfo(const CopysetInfo& other)
: leaderMayChange_(other.leaderMayChange_),
csinfos_(other.csinfos_),
lastappliedindex_(other.lastappliedindex_.load()),
leaderindex_(other.leaderindex_),
cpid_(other.cpid_) {}

uint64_t GetAppliedIndex() {
return lastappliedindex_;
uint64_t GetAppliedIndex() const {
return lastappliedindex_.load(std::memory_order_acquire);
}

void SetLeaderUnstableFlag() {
Expand All @@ -121,10 +121,18 @@ typedef struct CURVE_CACHELINE_ALIGNMENT CopysetInfo {
* @param: appliedindex为待更新的值
*/
void UpdateAppliedIndex(uint64_t appliedindex) {
spinlock_.Lock();
if (appliedindex == 0 || appliedindex > lastappliedindex_)
lastappliedindex_ = appliedindex;
spinlock_.UnLock();
uint64_t curIndex = lastappliedindex_.load(std::memory_order_acquire);

if (appliedindex != 0 && appliedindex <= curIndex) {
return;
}

while (!lastappliedindex_.compare_exchange_strong(
curIndex, appliedindex, std::memory_order_acq_rel)) {
if (curIndex >= appliedindex) {
break;
}
}
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/client/request_sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ class RequestSender {
int ResetSender(ChunkServerID chunkServerId,
butil::EndPoint serverEndPoint);

/**
 * @brief Query the health of the underlying channel_.
 * @return true when channel_.CheckHealth() returns 0, false otherwise
 */
bool IsSocketHealth() {
    const auto healthRc = channel_.CheckHealth();
    return healthRc == 0;
}

private:
// Rpc stub配置
IOSenderOption_t iosenderopt_;
Expand Down
16 changes: 16 additions & 0 deletions src/client/request_sender_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,21 @@ RequestSenderManager::SenderPtr RequestSenderManager::GetOrCreateSender(
return senderPtr;
}

// Removes the cached RequestSender for csId from senderPool_ when that sender
// reports an unhealthy socket. Nothing happens if no sender is cached for
// csId, or if the cached sender is healthy.
void RequestSenderManager::ResetSenderIfNotHealth(const ChunkServerID& csId) {
    // lock_ is held across the lookup, the health probe and the erase.
    std::lock_guard<std::mutex> guard(lock_);

    auto pos = senderPool_.find(csId);

    // The health probe is only evaluated when an entry actually exists.
    const bool cachedButUnhealthy =
        pos != senderPool_.end() && !pos->second->IsSocketHealth();

    if (cachedButUnhealthy) {
        senderPool_.erase(pos);
    }
}

} // namespace client
} // namespace curve
6 changes: 6 additions & 0 deletions src/client/request_sender_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ class RequestSenderManager : public Uncopyable {
const butil::EndPoint &leaderAddr,
IOSenderOption_t senderopt);

/**
* @brief Reset the RequestSender associated with csId if it is not healthy.
*        An unhealthy sender is removed from senderPool_; if no sender is
*        cached for csId, or the cached sender is healthy, nothing changes.
* @param csId chunkserver id
*/
void ResetSenderIfNotHealth(const ChunkServerID& csId);

private:
// 互斥锁,保护senderPool_
mutable std::mutex lock_;
Expand Down

0 comments on commit 593e8ce

Please sign in to comment.