diff --git a/include/pingcap/kv/2pc.h b/include/pingcap/kv/2pc.h index 416abbe4..11f16781 100644 --- a/include/pingcap/kv/2pc.h +++ b/include/pingcap/kv/2pc.h @@ -8,6 +8,7 @@ #include #include +#include #include #include #include @@ -98,6 +99,7 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this region_txn_size; + // Total bytes of all keys and values in this transaction. Used for lock TTL decisions. uint64_t txn_size = 0; int lock_ttl = 0; @@ -106,6 +108,8 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this & keys) { doActionOnKeys(bo, keys); } + void cleanupKeys(Backoffer & bo, const std::vector & keys) { doActionOnKeys(bo, keys); } + template void doActionOnKeys(Backoffer & bo, const std::vector & cur_keys) { @@ -178,9 +186,12 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this::max() && primary_idx != 0) + if (primary_idx != std::numeric_limits::max()) { - std::swap(batches[0], batches[primary_idx]); + if (primary_idx != 0) + { + std::swap(batches[0], batches[primary_idx]); + } batches[0].is_primary = true; } @@ -213,12 +224,18 @@ struct TwoPhaseCommitter : public std::enable_shared_from_this= txnCommitBatchSize) { - uint64_t txn_size_mb = txn_size / bytesPerMiB; + double txn_size_mb = static_cast(txn_size) / bytesPerMiB; lock_ttl = static_cast(ttlFactor * sqrt(txn_size_mb)); if (lock_ttl < defaultLockTTL) { @@ -47,15 +52,15 @@ TwoPhaseCommitter::TwoPhaseCommitter(Txn * txn, bool _use_async_commit) txn->walkBuffer([&](const std::string & key, const std::string & value) { keys.push_back(key); mutations.emplace(key, value); + txn_size += key.size() + value.size(); }); cluster = txn->cluster; start_ts = txn->start_ts; - primary_lock = keys[0]; - txn_size = mutations.size(); - // TODO: use right lock_ttl - // currently prewrite is not concurrent, so the right lock_ttl is not enough for prewrite to complete - // lock_ttl = txnLockTTL(txn->start_time, txn_size); - lock_ttl = defaultLockTTL; + if (!keys.empty()) + { + primary_lock = keys[0]; + } + lock_ttl = txnLockTTL(txn->start_time, txn_size); if (txn_size > ttlManagerRunThreshold) { lock_ttl = managedLockTTL; @@ -66,6 +71,11 @@ void TwoPhaseCommitter::execute() { try { + if (keys.empty()) + { + return; + } + if (use_async_commit) { // If we want to use async commit or 1PC and also want external consistency across @@ -103,15 +113,32 @@ void TwoPhaseCommitter::execute() // TODO: check expired Backoffer commit_bo(commitMaxBackoff); commitKeys(commit_bo, keys); - // TODO: Process commit exception ttl_manager.close(); } catch (Exception & e) { - if (!commited) + ttl_manager.close(); + if (commited) { - // TODO: Rollback keys. + // The primary commit makes the transaction durable. Match + // client-go/TiDB 2PC semantics: secondary commit failures are logged + // but not surfaced as Txn::commit() failures, because retrying the + // whole transaction could duplicate writes. + log->warning("write commit exception after primary committed: " + e.displayText()); + return; + } + if (!commit_result_undetermined) + { + try + { + Backoffer cleanup_bo(cleanupMaxBackoff); + cleanupKeys(cleanup_bo, keys); + } + catch (Exception & cleanup_error) + { + log->warning("2PC cleanup exception: " + cleanup_error.displayText()); + } } log->warning("write commit exception: " + e.displayText()); throw; @@ -210,7 +237,7 @@ void TwoPhaseCommitter::prewriteSingleBatch(Backoffer & bo, const BatchKeys & ba } else { - if (batch.keys[0] == primary_lock) + if (batch.is_primary) { // After writing the primary key, if the size of the transaction is large than 32M, // start the ttlManager. The ttlManager will be closed in tikvTxn.Commit(). @@ -253,25 +280,80 @@ void TwoPhaseCommitter::commitSingleBatch(Backoffer & bo, const BatchKeys & batc req.set_start_version(start_ts); req.set_commit_version(commit_ts); - kvrpcpb::CommitResponse response; + RegionClient region_client(cluster, batch.region); + uint32_t commit_ts_expired_retry = 0; + for (;;) + { + kvrpcpb::CommitResponse response; + try + { + region_client.sendReqToRegion(bo, req, &response); + } + catch (Exception & e) + { + if (batch.is_primary && e.code() != RegionEpochNotMatch) + { + commit_result_undetermined = true; + throw; + } + bo.backoff(boRegionMiss, e); + commitKeys(bo, batch.keys); + return; + } + + if (response.has_error()) + { + if (response.error().has_commit_ts_expired()) + { + const auto & rejected = response.error().commit_ts_expired(); + if (rejected.min_commit_ts() > rejected.attempted_commit_ts() + && rejected.min_commit_ts() - rejected.attempted_commit_ts() > maxCommitTsExpiredSkew) + { + throw Exception("2PC MinCommitTS is too large, got MinCommitTS: " + std::to_string(rejected.min_commit_ts()) + + ", AttemptedCommitTS: " + std::to_string(rejected.attempted_commit_ts()), + LockError); + } + if (++commit_ts_expired_retry > maxCommitTsExpiredRetry) + { + throw Exception("2PC commit_ts_expired retry limit exceeded: " + response.error().ShortDebugString(), LockError); + } + commit_ts = cluster->pd_client->getTS(); + req.set_commit_version(commit_ts); + continue; + } + throw Exception("meet errors: " + response.error().ShortDebugString(), LockError); + } + break; + } + + commited = true; +} + +void TwoPhaseCommitter::cleanupSingleBatch(Backoffer & bo, const BatchKeys & batch) +{ + kvrpcpb::BatchRollbackRequest req; + for (const auto & key : batch.keys) + { + req.add_keys(key); + } + req.set_start_version(start_ts); + + kvrpcpb::BatchRollbackResponse response; RegionClient region_client(cluster, batch.region); try { - region_client.sendReqToRegion(bo, req, &response); + region_client.sendReqToRegion(bo, req, &response); } catch (Exception & e) { bo.backoff(boRegionMiss, e); - commit_ts = cluster->pd_client->getTS(); - commitKeys(bo, batch.keys); + cleanupKeys(bo, batch.keys); return; } if (response.has_error()) { - throw Exception("meet errors: " + response.error().ShortDebugString(), LockError); + throw Exception("cleanup failed: " + response.error().ShortDebugString(), LockError); } - - commited = true; } uint64_t sendTxnHeartBeat(Backoffer & bo, Cluster * cluster, std::string & primary_key, uint64_t start_ts, uint64_t ttl) diff --git a/src/test/real_tikv_test/2pc_test.cc b/src/test/real_tikv_test/2pc_test.cc index a254b5d0..68cc0a0e 100644 --- a/src/test/real_tikv_test/2pc_test.cc +++ b/src/test/real_tikv_test/2pc_test.cc @@ -19,7 +19,9 @@ struct TestTwoPhaseCommitter TwoPhaseCommitterPtr committer; public: - TestTwoPhaseCommitter(Txn * txn) : committer(std::make_shared(txn)) {} + TestTwoPhaseCommitter(Txn * txn) + : committer(std::make_shared(txn)) + {} void prewriteKeys(Backoffer & bo, const std::vector & keys) { committer->prewriteKeys(bo, keys); } @@ -28,6 +30,12 @@ struct TestTwoPhaseCommitter std::vector keys() { return committer->keys; } void setCommitTS(int64_t commit_ts) { committer->commit_ts = commit_ts; } + + uint64_t startTS() { return committer->start_ts; } + + uint64_t txnSize() { return committer->txn_size; } + + int lockTTL() { return committer->lock_ttl; } }; } // namespace kv @@ -71,7 +79,6 @@ class TestWith2PCRealTiKV : public testing::Test TEST_F(TestWith2PCRealTiKV, testCommitRollback) { - // Commit. { Txn txn(test_cluster.get()); @@ -97,7 +104,7 @@ TEST_F(TestWith2PCRealTiKV, testCommitRollback) txn2.set("c", "c2"); txn2.commit(); - txn1.commit(); + ASSERT_THROW(txn1.commit(), Exception); Snapshot snap(test_cluster.get()); ASSERT_EQ(snap.Get("a"), "a"); @@ -106,9 +113,105 @@ TEST_F(TestWith2PCRealTiKV, testCommitRollback) } } -TEST_F(TestWith2PCRealTiKV, commitAfterReadByOtherTxn) +TEST_F(TestWith2PCRealTiKV, testEmptyTxnCommit) +{ + Txn txn(test_cluster.get()); + ASSERT_NO_THROW(txn.commit()); +} + +TEST_F(TestWith2PCRealTiKV, testCommitTsExpiredRetries) +{ + const std::string prefix = "clientc_commit_ts_expired_" + std::to_string(test_cluster->pd_client->getTS()) + "_"; + const std::string key = prefix + "k"; + + { + Txn txn(test_cluster.get()); + txn.set(key, "v0"); + txn.commit(); + } + + Txn txn(test_cluster.get()); + txn.set(key, "v1"); + TestTwoPhaseCommitter committer{&txn}; + auto keys = committer.keys(); + Backoffer prewrite_bo(prewriteMaxBackoff); + committer.prewriteKeys(prewrite_bo, keys); + + // A reader after prewrite pushes the lock's min_commit_ts. Commit with a + // stale commit_ts should be rejected by TiKV with CommitTsExpired, and the + // client should retry the same primary commit with a fresh commit_ts. + Txn reader(test_cluster.get()); + auto result = reader.get(key); + ASSERT_EQ(result.second, true); + ASSERT_EQ(result.first, "v0"); + + committer.setCommitTS(committer.startTS() + 1); + Backoffer commit_bo(commitMaxBackoff); + ASSERT_NO_THROW(committer.commitKeys(commit_bo, keys)); + + Snapshot snap(test_cluster.get()); + ASSERT_EQ(snap.Get(key), "v1"); +} + +TEST_F(TestWith2PCRealTiKV, testFailedPrewriteCleansWrittenLocks) +{ + const std::string prefix = "clientc_cleanup_" + std::to_string(test_cluster->pd_client->getTS()) + "_"; + const std::string key_a = prefix + "a"; + const std::string key_b = prefix + "b"; + const std::string key_c = prefix + "c"; + + { + Txn txn(test_cluster.get()); + txn.set(key_a, "a0"); + txn.set(key_b, "b0"); + txn.set(key_c, "c0"); + txn.commit(); + } + + test_cluster->splitRegion(key_c); + + Txn older_writer(test_cluster.get()); + older_writer.set(key_a, "a1"); + older_writer.set(key_b, "b1"); + older_writer.set(key_c, "c1"); + + { + Txn newer_writer(test_cluster.get()); + newer_writer.set(key_c, "c2"); + newer_writer.commit(); + } + + ASSERT_THROW(older_writer.commit(), Exception); + + Backoffer mvcc_bo(GetMaxBackoff); + ASSERT_FALSE(Snapshot(test_cluster.get()).mvccGet(mvcc_bo, key_a).has_lock()); + ASSERT_FALSE(Snapshot(test_cluster.get()).mvccGet(mvcc_bo, key_b).has_lock()); + + { + Txn next_writer(test_cluster.get()); + next_writer.set(key_a, "a2"); + next_writer.set(key_b, "b2"); + ASSERT_NO_THROW(next_writer.commit()); + } + + Snapshot snap(test_cluster.get()); + ASSERT_EQ(snap.Get(key_a), "a2"); + ASSERT_EQ(snap.Get(key_b), "b2"); + ASSERT_EQ(snap.Get(key_c), "c2"); +} + +TEST_F(TestWith2PCRealTiKV, testLargeTxnTTLUsesBytes) { + Txn txn(test_cluster.get()); + txn.set("clientc_large_ttl_key", std::string(33 * 1024 * 1024, 'x')); + TestTwoPhaseCommitter committer{&txn}; + + ASSERT_GT(committer.txnSize(), 32ULL * 1024 * 1024); + ASSERT_EQ(committer.lockTTL(), 20000); +} +TEST_F(TestWith2PCRealTiKV, commitAfterReadByOtherTxn) +{ // Commit. { Txn txn(test_cluster.get()); @@ -337,4 +440,4 @@ TEST_F(TestWith2PCRealTiKV, testScanWithLargeTxn) } } -} // namespace +} // namespace pingcap::tests