Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions include/pingcap/kv/2pc.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <cmath>
#include <memory>
#include <shared_mutex>
#include <thread>
#include <unordered_map>
#include <utility>
Expand Down Expand Up @@ -98,6 +99,7 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this<TwoPhaseCommitter
Cluster * cluster;

std::unordered_map<uint64_t, int> region_txn_size;
// Total bytes of all keys and values in this transaction. Used for lock TTL decisions.
uint64_t txn_size = 0;

int lock_ttl = 0;
Expand All @@ -106,6 +108,8 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this<TwoPhaseCommitter
// commited means primary key has been written to kv stores.
bool commited;

bool commit_result_undetermined = false;

// Only for test now
bool use_async_commit;

Expand All @@ -120,6 +124,8 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this<TwoPhaseCommitter
public:
explicit TwoPhaseCommitter(Txn * txn, bool _use_async_commit = false);

// Executes TiKV 2PC. After the primary key is committed, the transaction is
// durable; secondary commit errors are logged but not returned to callers.
void execute();

private:
Expand Down Expand Up @@ -148,6 +154,8 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this<TwoPhaseCommitter

// Applies the commit action to `keys` by forwarding to doActionOnKeys<ActionCommit>
// with the caller's backoffer.
void commitKeys(Backoffer & bo, const std::vector<std::string> & keys)
{
    doActionOnKeys<ActionCommit>(bo, keys);
}

// Applies the cleanup action to `keys` by forwarding to doActionOnKeys<ActionCleanUp>
// with the caller's backoffer.
void cleanupKeys(Backoffer & bo, const std::vector<std::string> & keys)
{
    doActionOnKeys<ActionCleanUp>(bo, keys);
}

template <Action action>
void doActionOnKeys(Backoffer & bo, const std::vector<std::string> & cur_keys)
{
Expand Down Expand Up @@ -178,9 +186,12 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this<TwoPhaseCommitter
batches.emplace_back(BatchKeys(group.first, sub_keys));
}
}
if (primary_idx != std::numeric_limits<uint64_t>::max() && primary_idx != 0)
if (primary_idx != std::numeric_limits<uint64_t>::max())
{
std::swap(batches[0], batches[primary_idx]);
if (primary_idx != 0)
{
std::swap(batches[0], batches[primary_idx]);
}
batches[0].is_primary = true;
}

Expand Down Expand Up @@ -213,12 +224,18 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this<TwoPhaseCommitter
{
commitSingleBatch(bo, batch);
}
else if constexpr (action == ActionCleanUp)
{
cleanupSingleBatch(bo, batch);
}
}
}

void prewriteSingleBatch(Backoffer & bo, const BatchKeys & batch);

void commitSingleBatch(Backoffer & bo, const BatchKeys & batch);

void cleanupSingleBatch(Backoffer & bo, const BatchKeys & batch);
};

} // namespace kv
Expand Down
118 changes: 100 additions & 18 deletions src/kv/2pc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ constexpr uint64_t bytesPerMiB = 1024 * 1024;

constexpr uint64_t ttlManagerRunThreshold = 32 * 1024 * 1024;

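// One hour (3600000 ms) expressed in PD TSO units. A min_commit_ts that exceeds the
// attempted commit_ts by more than this is not expected during normal retry, so the
// commit is failed instead of being retried with a fresh commit_ts.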
constexpr uint64_t maxCommitTsExpiredSkew = 3600000ULL << pd::physicalShiftBits;

constexpr uint32_t maxCommitTsExpiredRetry = 8;

uint64_t txnLockTTL(std::chrono::milliseconds start, uint64_t txn_size)
{
uint64_t lock_ttl = defaultLockTTL;

if (txn_size >= txnCommitBatchSize)
{
uint64_t txn_size_mb = txn_size / bytesPerMiB;
double txn_size_mb = static_cast<double>(txn_size) / bytesPerMiB;
lock_ttl = static_cast<uint64_t>(ttlFactor * sqrt(txn_size_mb));
if (lock_ttl < defaultLockTTL)
{
Expand All @@ -47,15 +52,15 @@ TwoPhaseCommitter::TwoPhaseCommitter(Txn * txn, bool _use_async_commit)
txn->walkBuffer([&](const std::string & key, const std::string & value) {
keys.push_back(key);
mutations.emplace(key, value);
txn_size += key.size() + value.size();
});
cluster = txn->cluster;
start_ts = txn->start_ts;
primary_lock = keys[0];
txn_size = mutations.size();
// TODO: use right lock_ttl
// currently prewrite is not concurrent, so the right lock_ttl is not enough for prewrite to complete
// lock_ttl = txnLockTTL(txn->start_time, txn_size);
lock_ttl = defaultLockTTL;
if (!keys.empty())
{
primary_lock = keys[0];
}
lock_ttl = txnLockTTL(txn->start_time, txn_size);
if (txn_size > ttlManagerRunThreshold)
{
lock_ttl = managedLockTTL;
Expand All @@ -66,6 +71,11 @@ void TwoPhaseCommitter::execute()
{
try
{
if (keys.empty())
{
return;
}

if (use_async_commit)
{
// If we want to use async commit or 1PC and also want external consistency across
Expand Down Expand Up @@ -103,15 +113,32 @@ void TwoPhaseCommitter::execute()
// TODO: check expired
Backoffer commit_bo(commitMaxBackoff);
commitKeys(commit_bo, keys);
// TODO: Process commit exception

ttl_manager.close();
}
catch (Exception & e)
{
if (!commited)
ttl_manager.close();
if (commited)
{
// TODO: Rollback keys.
// The primary commit makes the transaction durable. Match
// client-go/TiDB 2PC semantics: secondary commit failures are logged
// but not surfaced as Txn::commit() failures, because retrying the
// whole transaction could duplicate writes.
log->warning("write commit exception after primary committed: " + e.displayText());
return;
}
Comment on lines +121 to +130
if (!commit_result_undetermined)
{
try
{
Backoffer cleanup_bo(cleanupMaxBackoff);
cleanupKeys(cleanup_bo, keys);
}
catch (Exception & cleanup_error)
{
log->warning("2PC cleanup exception: " + cleanup_error.displayText());
}
}
log->warning("write commit exception: " + e.displayText());
throw;
Expand Down Expand Up @@ -210,7 +237,7 @@ void TwoPhaseCommitter::prewriteSingleBatch(Backoffer & bo, const BatchKeys & ba
}
else
{
if (batch.keys[0] == primary_lock)
if (batch.is_primary)
{
// After writing the primary key, if the size of the transaction is larger than 32M,
// start the ttlManager. The ttlManager will be closed in tikvTxn.Commit().
Expand Down Expand Up @@ -253,25 +280,80 @@ void TwoPhaseCommitter::commitSingleBatch(Backoffer & bo, const BatchKeys & batc
req.set_start_version(start_ts);
req.set_commit_version(commit_ts);

kvrpcpb::CommitResponse response;
RegionClient region_client(cluster, batch.region);
uint32_t commit_ts_expired_retry = 0;
for (;;)
{
kvrpcpb::CommitResponse response;
try
{
region_client.sendReqToRegion<RPC_NAME(KvCommit)>(bo, req, &response);
}
catch (Exception & e)
{
if (batch.is_primary && e.code() != RegionEpochNotMatch)
{
commit_result_undetermined = true;
throw;
}
bo.backoff(boRegionMiss, e);
commitKeys(bo, batch.keys);
return;
}

if (response.has_error())
{
if (response.error().has_commit_ts_expired())
{
const auto & rejected = response.error().commit_ts_expired();
if (rejected.min_commit_ts() > rejected.attempted_commit_ts()
&& rejected.min_commit_ts() - rejected.attempted_commit_ts() > maxCommitTsExpiredSkew)
{
throw Exception("2PC MinCommitTS is too large, got MinCommitTS: " + std::to_string(rejected.min_commit_ts())
+ ", AttemptedCommitTS: " + std::to_string(rejected.attempted_commit_ts()),
LockError);
}
if (++commit_ts_expired_retry > maxCommitTsExpiredRetry)
{
throw Exception("2PC commit_ts_expired retry limit exceeded: " + response.error().ShortDebugString(), LockError);
}
commit_ts = cluster->pd_client->getTS();
req.set_commit_version(commit_ts);
continue;
}
throw Exception("meet errors: " + response.error().ShortDebugString(), LockError);
}
break;
}

commited = true;
}

// Rolls back the keys of a single batch for this transaction.
//
// Builds a BatchRollbackRequest holding every key in `batch` together with the
// transaction's start_ts and sends it to the batch's region as KvBatchRollback.
//
// @param bo     backoffer shared by the whole cleanup attempt.
// @param batch  region and keys to roll back.
// @throws Exception(LockError) when the response reports an error.
//
// If sending the request throws, the error is fed to the backoffer as a region miss
// and the same keys are retried through cleanupKeys.
//
// Cleanup must stay independent of the commit path: it never issues KvCommit,
// never refreshes commit_ts, and never marks the transaction as `commited`.
void TwoPhaseCommitter::cleanupSingleBatch(Backoffer & bo, const BatchKeys & batch)
{
    kvrpcpb::BatchRollbackRequest req;
    for (const auto & key : batch.keys)
    {
        req.add_keys(key);
    }
    req.set_start_version(start_ts);

    kvrpcpb::BatchRollbackResponse response;
    RegionClient region_client(cluster, batch.region);
    try
    {
        region_client.sendReqToRegion<RPC_NAME(KvBatchRollback)>(bo, req, &response);
    }
    catch (Exception & e)
    {
        // Retry the rollback itself; re-running the commit action here would
        // commit keys that are supposed to be rolled back.
        bo.backoff(boRegionMiss, e);
        cleanupKeys(bo, batch.keys);
        return;
    }
    if (response.has_error())
    {
        throw Exception("cleanup failed: " + response.error().ShortDebugString(), LockError);
    }
}

uint64_t sendTxnHeartBeat(Backoffer & bo, Cluster * cluster, std::string & primary_key, uint64_t start_ts, uint64_t ttl)
Expand Down
Loading
Loading