Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ struct Cluster

Cluster()
: pd_client(std::make_shared<pd::MockPDClient>())
, rpc_client(std::make_unique<RpcClient>())
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(1))
, rpc_client(std::make_unique<RpcClient>(pd_client, ClusterConfig{}))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -44,11 +44,11 @@ struct Cluster
Cluster(const std::vector<std::string> & pd_addrs, const ClusterConfig & config)
: pd_client(std::make_shared<pd::CodecClient>(pd_addrs, config))
, region_cache(std::make_unique<RegionCache>(pd_client, config))
, rpc_client(std::make_unique<RpcClient>(config))
, rpc_client(std::make_unique<RpcClient>(pd_client, config))
, oracle(std::make_unique<pd::Oracle>(pd_client, std::chrono::milliseconds(oracle_update_interval)))
, lock_resolver(std::make_unique<LockResolver>(this))
, api_version(config.api_version)
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(2))
, thread_pool(std::make_unique<pingcap::common::FixedThreadPool>(3))
, mpp_prober(std::make_unique<common::MPPProber>(this))
{
startBackgroundTasks();
Expand All @@ -64,6 +64,7 @@ struct Cluster
// (e.g. background threads) that cluster object holds so as to exit elegantly.
~Cluster()
{
rpc_client->stop();
mpp_prober->stop();
if (region_cache)
region_cache->stop();
Expand Down
6 changes: 3 additions & 3 deletions include/pingcap/kv/RegionClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ struct RegionClient
}
std::string err_msg = rpc.errMsg(status, extra_msg);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
continue;
}
if (resp->has_region_error())
Expand Down Expand Up @@ -202,15 +202,15 @@ struct RegionClient
}
std::string err_msg = rpc.errMsg(status, extra_msg);
log->warning(err_msg);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx);
onSendFail(bo, Exception(err_msg, GRPCErrorCode), ctx, status);
}
}

protected:
void onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const errorpb::Error & err) const;

// Normally, it happens when machine down or network partition between tidb and kv or process crash.
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const;
void onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const;
};

} // namespace kv
Expand Down
33 changes: 33 additions & 0 deletions include/pingcap/kv/Rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@
#include <pingcap/kv/RegionCache.h>
#include <pingcap/kv/internal/type_traits.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <map>
#include <mutex>
#include <utility>
#include <vector>

namespace pingcap
{
namespace kv
{
constexpr auto rpc_conn_check_interval = std::chrono::minutes(10);
constexpr auto rpc_conn_check_interval_jitter = std::chrono::minutes(5);

struct ConnArray
{
std::mutex mutex;
Expand All @@ -33,31 +41,56 @@ using GRPCMetaData = std::multimap<std::string, std::string>;
struct RpcClient
{
ClusterConfig config;
pd::ClientPtr pd_client;

std::mutex mutex;

std::map<std::string, ConnArrayPtr> conns;

Logger * log = &Logger::get("pingcap.RpcClient");
std::chrono::minutes scan_interval = rpc_conn_check_interval;
std::atomic<bool> stopped = false;
std::condition_variable scan_cv;

RpcClient() = default;

explicit RpcClient(const ClusterConfig & config_)
: config(config_)
{}

RpcClient(pd::ClientPtr pd_client_, const ClusterConfig & config_)
: config(config_)
, pd_client(std::move(pd_client_))
{}

void update(const ClusterConfig & config_)
{
std::unique_lock lk(mutex);
config = config_;
conns.clear();
}

void run();

void stop();

void scanAndRemoveInvalidConns();

void removeConn(const std::string & addr);

ConnArrayPtr getConnArray(const std::string & addr);

ConnArrayPtr createConnArray(const std::string & addr);
};

using RpcClientPtr = std::unique_ptr<RpcClient>;

inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
client->removeConn(addr);
}
Comment on lines +88 to +92
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Align dropConnIfNeeded() with the shared removal predicate.

shouldRemoveConnOnStatus() treats both UNAVAILABLE and CANCELLED as removable, but dropConnIfNeeded() only handles UNAVAILABLE. Since onSendFail() uses this helper, unary failures and stream setup failures with CANCELLED won’t invalidate the connection, while StreamReader::finish() will.

Proposed fix
 inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
 {
-    if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
+    if (shouldRemoveConnOnStatus(status))
         client->markConnInvalid(addr);
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (status.error_code() == grpc::StatusCode::UNAVAILABLE)
client->markConnInvalid(addr);
}
inline void dropConnIfNeeded(const RpcClientPtr & client, const std::string & addr, const ::grpc::Status & status)
{
if (shouldRemoveConnOnStatus(status))
client->markConnInvalid(addr);
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/pingcap/kv/Rpc.h` around lines 102 - 106, The helper dropConnIfNeeded
currently only treats grpc::StatusCode::UNAVAILABLE as removable while
shouldRemoveConnOnStatus also considers CANCELLED; update dropConnIfNeeded to
mark the connection invalid for both UNAVAILABLE and CANCELLED (i.e., check for
status.error_code() == UNAVAILABLE || status.error_code() == CANCELLED) so that
callers like onSendFail and stream setup failures behave consistently with
StreamReader::finish and shouldRemoveConnOnStatus, still calling
client->markConnInvalid(addr) when either code is seen.


// RpcCall holds the request and response, and delegates RPC calls.
template <typename T>
class RpcCall
Expand Down
3 changes: 3 additions & 0 deletions src/kv/Cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ void Cluster::startBackgroundTasks()
{
thread_pool->start();

thread_pool->enqueue([this] {
rpc_client->run();
});
thread_pool->enqueue([this] {
mpp_prober->run();
});
Expand Down
3 changes: 2 additions & 1 deletion src/kv/RegionClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,10 @@ void RegionClient::onRegionError(Backoffer & bo, RPCContextPtr rpc_ctx, const er
cluster->region_cache->dropRegion(rpc_ctx->region);
}

void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx) const
void RegionClient::onSendFail(Backoffer & bo, const Exception & e, RPCContextPtr rpc_ctx, const ::grpc::Status & status) const
{
cluster->region_cache->onSendReqFail(rpc_ctx, e);
dropConnIfNeeded(cluster->rpc_client, rpc_ctx->addr, status);
// Retry on send request failure when it's not canceled.
// When a store is not available, the leader of related region should be elected quickly.
bo.backoff(boTiKVRPC, e);
Expand Down
104 changes: 103 additions & 1 deletion src/kv/Rpc.cc
Original file line number Diff line number Diff line change
@@ -1,9 +1,40 @@
#include <pingcap/Exception.h>
#include <pingcap/kv/Rpc.h>

#include <random>
#include <unordered_set>

namespace pingcap
{
namespace kv
{
namespace
{
std::unordered_set<std::string> getStoreAddresses(const pd::ClientPtr & pd_client)
{
std::unordered_set<std::string> store_addrs;
const auto stores = pd_client->getAllStores(true);
store_addrs.reserve(stores.size());
for (const auto & store : stores)
{
if (!store.address().empty())
store_addrs.emplace(store.address());
}
return store_addrs;
}

std::chrono::seconds getRandomScanInterval(std::chrono::minutes scan_interval)
{
const auto min_seconds = std::chrono::duration_cast<std::chrono::seconds>(scan_interval);
const auto max_seconds = std::chrono::duration_cast<std::chrono::seconds>(
scan_interval + rpc_conn_check_interval_jitter);

thread_local std::mt19937_64 generator(std::random_device{}());
std::uniform_int_distribution<std::chrono::seconds::rep> distribution(min_seconds.count(), max_seconds.count());
return std::chrono::seconds(distribution(generator));
}
} // namespace

ConnArray::ConnArray(size_t max_size, const std::string & addr, const ClusterConfig & config_)
: address(addr)
, index(0)
Expand All @@ -22,6 +53,78 @@ std::shared_ptr<KvConnClient> ConnArray::get()
return vec[index];
}

void RpcClient::run()
{
while (!stopped.load())
{
{
const auto wait_interval = getRandomScanInterval(scan_interval);
std::unique_lock lock(mutex);
scan_cv.wait_for(lock, wait_interval, [this] {
return stopped.load();
});
}

if (stopped.load())
return;

try
{
scanAndRemoveInvalidConns();
}
catch (...)
{
log->warning(getCurrentExceptionMsg("RpcClient scan conns failed: "));
}
}
}

void RpcClient::stop()
{
stopped.store(true);
scan_cv.notify_all();
}

void RpcClient::scanAndRemoveInvalidConns()
{
if (!pd_client || pd_client->isMock())
return;

std::vector<std::string> conn_snapshot;
{
std::lock_guard<std::mutex> lock(mutex);
conn_snapshot.reserve(conns.size());
for (const auto & conn : conns)
conn_snapshot.emplace_back(conn.first);
}

if (conn_snapshot.empty())
return;

const auto store_addrs = getStoreAddresses(pd_client);
std::vector<std::string> invalid_conns;
for (const auto & addr : conn_snapshot)
{
if (store_addrs.find(addr) == store_addrs.end())
invalid_conns.push_back(addr);
}

if (invalid_conns.empty())
return;

for (const auto & addr : invalid_conns)
{
removeConn(addr);
}
}

void RpcClient::removeConn(const std::string & addr)
{
std::lock_guard<std::mutex> lock(mutex);
if (conns.erase(addr))
log->information("delete invalid addr: " + addr);
}

ConnArrayPtr RpcClient::getConnArray(const std::string & addr)
{
std::lock_guard<std::mutex> lock(mutex);
Expand All @@ -39,6 +142,5 @@ ConnArrayPtr RpcClient::createConnArray(const std::string & addr)
conns[addr] = conn_array;
return conn_array;
}

} // namespace kv
} // namespace pingcap
Loading