From 42f819a4bd0a516da8b8a9efeac38f04216853fa Mon Sep 17 00:00:00 2001
From: Ping Yu <yuping@pingcap.com>
Date: Tue, 15 Aug 2023 21:02:44 +0800
Subject: [PATCH 1/7] API v2 part1

Signed-off-by: Ping Yu <yuping@pingcap.com>
---
 src/mock.rs                    | 21 ++++++++++--
 src/pd/client.rs               | 38 ++++++++++++++++++----
 src/raw/client.rs              | 17 +++++++---
 src/raw/requests.rs            |  5 +++
 src/request/codec.rs           | 37 +++++++++++++++++++++
 src/request/mod.rs             |  6 ++++
 src/request/plan.rs            | 12 +++++--
 src/request/plan_builder.rs    | 19 +++++++----
 src/request/shard.rs           |  5 +--
 src/store/request.rs           |  6 ++++
 src/transaction/client.rs      | 59 +++++++++++++++++++++++++++-------
 src/transaction/snapshot.rs    |  8 +++--
 src/transaction/transaction.rs |  3 +-
 13 files changed, 198 insertions(+), 38 deletions(-)
 create mode 100644 src/request/codec.rs

diff --git a/src/mock.rs b/src/mock.rs
index eada6a8e..887bb2d2 100644
--- a/src/mock.rs
+++ b/src/mock.rs
@@ -18,6 +18,7 @@ use crate::proto::metapb::RegionEpoch;
 use crate::proto::metapb::{self};
 use crate::region::RegionId;
 use crate::region::RegionWithLeader;
+use crate::request::codec::ApiV1Codec;
 use crate::store::KvClient;
 use crate::store::KvConnect;
 use crate::store::RegionStore;
@@ -30,7 +31,7 @@ use crate::Timestamp;
 
 /// Create a `PdRpcClient` with it's internals replaced with mocks so that the
 /// client can be tested without doing any RPC calls.
-pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
+pub async fn pd_rpc_client() -> PdRpcClient<ApiV1Codec, MockKvConnect, MockCluster> {
     let config = Config::default();
     PdRpcClient::new(
         config.clone(),
@@ -43,6 +44,7 @@ pub async fn pd_rpc_client() -> PdRpcClient<MockKvConnect, MockCluster> {
             ))
         },
         false,
+        Some(ApiV1Codec::default()),
     )
     .await
     .unwrap()
@@ -71,9 +73,18 @@ pub struct MockKvConnect;
 
 pub struct MockCluster;
 
-#[derive(new)]
 pub struct MockPdClient {
     client: MockKvClient,
+    codec: ApiV1Codec,
+}
+
+impl MockPdClient {
+    pub fn new(client: MockKvClient) -> MockPdClient {
+        MockPdClient {
+            client,
+            codec: ApiV1Codec::default(),
+        }
+    }
 }
 
 #[async_trait]
@@ -102,6 +113,7 @@ impl MockPdClient {
     pub fn default() -> MockPdClient {
         MockPdClient {
             client: MockKvClient::default(),
+            codec: ApiV1Codec::default(),
         }
     }
 
@@ -165,6 +177,7 @@ impl MockPdClient {
 
 #[async_trait]
 impl PdClient for MockPdClient {
+    type Codec = ApiV1Codec;
     type KvClient = MockKvClient;
 
     async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
@@ -210,4 +223,8 @@ impl PdClient for MockPdClient {
     }
 
     async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}
+
+    fn get_codec(&self) -> &Self::Codec {
+        &self.codec
+    }
 }
diff --git a/src/pd/client.rs b/src/pd/client.rs
index 31f88968..e22b7c90 100644
--- a/src/pd/client.rs
+++ b/src/pd/client.rs
@@ -20,6 +20,7 @@ use crate::region::RegionId;
 use crate::region::RegionVerId;
 use crate::region::RegionWithLeader;
 use crate::region_cache::RegionCache;
+use crate::request::codec::{ApiV1Codec, Codec};
 use crate::store::KvClient;
 use crate::store::KvConnect;
 use crate::store::RegionStore;
@@ -50,6 +51,7 @@ use crate::Timestamp;
 /// So if we use transactional APIs, keys in PD are encoded and PD does not know about the encoding stuff.
 #[async_trait]
 pub trait PdClient: Send + Sync + 'static {
+    type Codec: Codec;
     type KvClient: KvClient + Send + Sync + 'static;
 
     /// In transactional API, `region` is decoded (keys in raw format).
@@ -200,20 +202,30 @@ pub trait PdClient: Send + Sync + 'static {
     async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;
 
     async fn invalidate_region_cache(&self, ver_id: RegionVerId);
+
+    /// Get the codec carried by `PdClient`.
+    /// The purpose of carrying the codec is to reduce the passing of it on so many calling paths.
+    fn get_codec(&self) -> &Self::Codec;
 }
 
 /// This client converts requests for the logical TiKV cluster into requests
 /// for a single TiKV store using PD and internal logic.
-pub struct PdRpcClient<KvC: KvConnect + Send + Sync + 'static = TikvConnect, Cl = Cluster> {
+pub struct PdRpcClient<
+    Cod: Codec = ApiV1Codec,
+    KvC: KvConnect + Send + Sync + 'static = TikvConnect,
+    Cl = Cluster,
+> {
     pd: Arc<RetryClient<Cl>>,
     kv_connect: KvC,
     kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
     enable_codec: bool,
     region_cache: RegionCache<RetryClient<Cl>>,
+    codec: Option<Cod>,
 }
 
 #[async_trait]
-impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
+impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<Cod, KvC> {
+    type Codec = Cod;
     type KvClient = KvC::KvClient;
 
     async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
@@ -255,31 +267,40 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
     async fn invalidate_region_cache(&self, ver_id: RegionVerId) {
         self.region_cache.invalidate_region_cache(ver_id).await
     }
+
+    fn get_codec(&self) -> &Self::Codec {
+        self.codec
+            .as_ref()
+            .unwrap_or_else(|| panic!("codec not set"))
+    }
 }
 
-impl PdRpcClient<TikvConnect, Cluster> {
+impl<Cod: Codec> PdRpcClient<Cod, TikvConnect, Cluster> {
     pub async fn connect(
         pd_endpoints: &[String],
         config: Config,
         enable_codec: bool,
-    ) -> Result<PdRpcClient> {
+        codec: Option<Cod>,
+    ) -> Result<PdRpcClient<Cod>> {
         PdRpcClient::new(
             config.clone(),
             |security_mgr| TikvConnect::new(security_mgr, config.timeout),
             |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
             enable_codec,
+            codec,
         )
         .await
     }
 }
 
-impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
+impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, KvC, Cl> {
     pub async fn new<PdFut, MakeKvC, MakePd>(
         config: Config,
         kv_connect: MakeKvC,
         pd: MakePd,
         enable_codec: bool,
-    ) -> Result<PdRpcClient<KvC, Cl>>
+        codec: Option<Cod>,
+    ) -> Result<PdRpcClient<Cod, KvC, Cl>>
     where
         PdFut: Future<Output = Result<RetryClient<Cl>>>,
         MakeKvC: FnOnce(Arc<SecurityManager>) -> KvC,
@@ -303,6 +324,7 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
             kv_connect: kv_connect(security_mgr),
             enable_codec,
             region_cache: RegionCache::new(pd),
+            codec,
         })
     }
 
@@ -322,6 +344,10 @@ impl<KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<KvC, Cl> {
             Err(e) => Err(e),
         }
     }
+
+    pub fn set_codec(&mut self, codec: Cod) {
+        self.codec = Some(codec);
+    }
 }
 
 fn make_key_range(start_key: Vec<u8>, end_key: Vec<u8>) -> kvrpcpb::KeyRange {
diff --git a/src/raw/client.rs b/src/raw/client.rs
index 0bdc2f8b..b69662bc 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -15,6 +15,7 @@ use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::metapb;
 use crate::raw::lowering::*;
+use crate::request::codec::{ApiV1Codec, Codec};
 use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::Plan;
@@ -35,7 +36,11 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
 ///
 /// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
 /// awaited to execute.
-pub struct Client<PdC: PdClient = PdRpcClient> {
+pub struct Client<Cod = ApiV1Codec, PdC = PdRpcClient<Cod>>
+where
+    Cod: Codec,
+    PdC: PdClient<Codec = Cod>,
+{
     rpc: Arc<PdC>,
     cf: Option<ColumnFamily>,
     backoff: Backoff,
@@ -54,7 +59,7 @@ impl Clone for Client {
     }
 }
 
-impl Client<PdRpcClient> {
+impl Client<ApiV1Codec, PdRpcClient> {
     /// Create a raw [`Client`] and connect to the TiKV cluster.
     ///
     /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -100,7 +105,9 @@ impl Client<PdRpcClient> {
         config: Config,
     ) -> Result<Self> {
         let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
-        let rpc = Arc::new(PdRpcClient::connect(&pd_endpoints, config, false).await?);
+        let rpc = Arc::new(
+            PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1Codec::default())).await?,
+        );
         Ok(Client {
             rpc,
             cf: None,
@@ -142,7 +149,9 @@ impl Client<PdRpcClient> {
             atomic: self.atomic,
         }
     }
+}
 
+impl<Cod: Codec> Client<Cod, PdRpcClient<Cod>> {
     /// Set the [`Backoff`] strategy for retrying requests.
     /// The default strategy is [`DEFAULT_REGION_BACKOFF`](crate::backoff::DEFAULT_REGION_BACKOFF).
     /// See [`Backoff`] for more information.
@@ -189,7 +198,7 @@ impl Client<PdRpcClient> {
     }
 }
 
-impl<PdC: PdClient> Client<PdC> {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
     /// Create a new 'get' request.
     ///
     /// Once resolved this request will result in the fetching of the value associated with the
diff --git a/src/raw/requests.rs b/src/raw/requests.rs
index 107b8807..e0e64c5e 100644
--- a/src/raw/requests.rs
+++ b/src/raw/requests.rs
@@ -13,6 +13,7 @@ use super::RawRpcRequest;
 use crate::collect_first;
 use crate::pd::PdClient;
 use crate::proto::kvrpcpb;
+use crate::proto::kvrpcpb::ApiVersion;
 use crate::proto::metapb;
 use crate::proto::tikvpb::tikv_client::TikvClient;
 use crate::request::plan::ResponseWithShard;
@@ -397,6 +398,10 @@ impl Request for RawCoprocessorRequest {
     fn set_context(&mut self, context: kvrpcpb::Context) {
         self.inner.set_context(context);
     }
+
+    fn set_api_version(&mut self, api_version: ApiVersion) {
+        self.inner.set_api_version(api_version);
+    }
 }
 
 impl KvRequest for RawCoprocessorRequest {
diff --git a/src/request/codec.rs b/src/request/codec.rs
new file mode 100644
index 00000000..be5716ca
--- /dev/null
+++ b/src/request/codec.rs
@@ -0,0 +1,37 @@
+// Copyright 2023 TiKV Project Authors. Licensed under Apache-2.0.
+
+use crate::proto::kvrpcpb;
+use crate::request::KvRequest;
+use std::borrow::Cow;
+
+pub trait Codec: Clone + Sync + Send + 'static {
+    fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> {
+        Cow::Borrowed(req)
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct ApiV1Codec {}
+
+impl Codec for ApiV1Codec {}
+
+#[derive(Clone)]
+pub struct ApiV2Codec {
+    _keyspace_id: u32,
+}
+
+impl ApiV2Codec {
+    pub fn new(keyspace_id: u32) -> Self {
+        Self {
+            _keyspace_id: keyspace_id,
+        }
+    }
+}
+
+impl Codec for ApiV2Codec {
+    fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> {
+        let mut req = req.clone();
+        req.set_api_version(kvrpcpb::ApiVersion::V2);
+        Cow::Owned(req)
+    }
+}
diff --git a/src/request/mod.rs b/src/request/mod.rs
index a1ac759e..22839443 100644
--- a/src/request/mod.rs
+++ b/src/request/mod.rs
@@ -32,6 +32,7 @@ use crate::store::HasKeyErrors;
 use crate::store::Request;
 use crate::transaction::HasLocks;
 
+pub mod codec;
 pub mod plan;
 mod plan_builder;
 mod shard;
@@ -88,6 +89,7 @@ mod test {
     use crate::mock::MockKvClient;
     use crate::mock::MockPdClient;
     use crate::proto::kvrpcpb;
+    use crate::proto::kvrpcpb::ApiVersion;
     use crate::proto::pdpb::Timestamp;
     use crate::proto::tikvpb::tikv_client::TikvClient;
     use crate::store::store_stream_for_keys;
@@ -138,6 +140,10 @@ mod test {
             fn set_context(&mut self, _: kvrpcpb::Context) {
                 unreachable!();
             }
+
+            fn set_api_version(&mut self, _api_version: ApiVersion) {
+                unreachable!();
+            }
         }
 
         #[async_trait]
diff --git a/src/request/plan.rs b/src/request/plan.rs
index 905e7fad..2f10d5c7 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -17,6 +17,7 @@ use crate::pd::PdClient;
 use crate::proto::errorpb;
 use crate::proto::errorpb::EpochNotMatch;
 use crate::proto::kvrpcpb;
+use crate::request::codec::Codec;
 use crate::request::shard::HasNextBatch;
 use crate::request::KvRequest;
 use crate::request::NextBatch;
@@ -48,22 +49,27 @@ pub trait Plan: Sized + Clone + Sync + Send + 'static {
 
 /// The simplest plan which just dispatches a request to a specific kv server.
 #[derive(Clone)]
-pub struct Dispatch<Req: KvRequest> {
+pub struct Dispatch<Cod: Codec, Req: KvRequest> {
     pub request: Req,
     pub kv_client: Option<Arc<dyn KvClient + Send + Sync>>,
+    pub codec: Cod,
 }
 
 #[async_trait]
-impl<Req: KvRequest> Plan for Dispatch<Req> {
+impl<Cod: Codec, Req: KvRequest> Plan for Dispatch<Cod, Req> {
     type Result = Req::Response;
 
     async fn execute(&self) -> Result<Self::Result> {
+        // `encode_request` will clone the request, which would have high overhead.
+        // TODO: consider in-place encoding.
+        let req = self.codec.encode_request(&self.request);
+
         let stats = tikv_stats(self.request.label());
         let result = self
             .kv_client
             .as_ref()
             .expect("Unreachable: kv_client has not been initialised in Dispatch")
-            .dispatch(&self.request)
+            .dispatch(req.as_ref())
             .await;
         let result = stats.done(result);
         result.map(|r| {
diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs
index 5ce2350c..678c9686 100644
--- a/src/request/plan_builder.rs
+++ b/src/request/plan_builder.rs
@@ -6,6 +6,7 @@ use std::sync::Arc;
 use super::plan::PreserveShard;
 use crate::backoff::Backoff;
 use crate::pd::PdClient;
+use crate::request::codec::Codec;
 use crate::request::plan::CleanupLocks;
 use crate::request::shard::HasNextBatch;
 use crate::request::DefaultProcessor;
@@ -45,13 +46,17 @@ impl PlanBuilderPhase for NoTarget {}
 pub struct Targetted;
 impl PlanBuilderPhase for Targetted {}
 
-impl<PdC: PdClient, Req: KvRequest> PlanBuilder<PdC, Dispatch<Req>, NoTarget> {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>, Req: KvRequest>
+    PlanBuilder<PdC, Dispatch<Cod, Req>, NoTarget>
+{
     pub fn new(pd_client: Arc<PdC>, request: Req) -> Self {
+        let codec = pd_client.get_codec().clone();
         PlanBuilder {
             pd_client,
             plan: Dispatch {
                 request,
                 kv_client: None,
+                codec,
             },
             phantom: PhantomData,
         }
@@ -183,12 +188,14 @@ where
     }
 }
 
-impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>, R: KvRequest>
+    PlanBuilder<PdC, Dispatch<Cod, R>, NoTarget>
+{
     /// Target the request at a single region; caller supplies the store to target.
     pub async fn single_region_with_store(
         self,
         store: RegionStore,
-    ) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
+    ) -> Result<PlanBuilder<PdC, Dispatch<Cod, R>, Targetted>> {
         set_single_region_store(self.plan, store, self.pd_client)
     }
 }
@@ -222,11 +229,11 @@ where
     }
 }
 
-fn set_single_region_store<PdC: PdClient, R: KvRequest>(
-    mut plan: Dispatch<R>,
+fn set_single_region_store<Cod: Codec, PdC: PdClient<Codec = Cod>, R: KvRequest>(
+    mut plan: Dispatch<Cod, R>,
     store: RegionStore,
     pd_client: Arc<PdC>,
-) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
+) -> Result<PlanBuilder<PdC, Dispatch<Cod, R>, Targetted>> {
     plan.request
         .set_context(store.region_with_leader.context()?);
     plan.kv_client = Some(store.client);
diff --git a/src/request/shard.rs b/src/request/shard.rs
index aaefab72..413ed9ed 100644
--- a/src/request/shard.rs
+++ b/src/request/shard.rs
@@ -6,6 +6,7 @@ use futures::stream::BoxStream;
 
 use super::plan::PreserveShard;
 use crate::pd::PdClient;
+use crate::request::codec::Codec;
 use crate::request::plan::CleanupLocks;
 use crate::request::Dispatch;
 use crate::request::KvRequest;
@@ -80,7 +81,7 @@ pub trait NextBatch {
     fn next_batch(&mut self, _range: (Vec<u8>, Vec<u8>));
 }
 
-impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
+impl<Cod: Codec, Req: KvRequest + Shardable> Shardable for Dispatch<Cod, Req> {
     type Shard = Req::Shard;
 
     fn shards(
@@ -96,7 +97,7 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
     }
 }
 
-impl<Req: KvRequest + NextBatch> NextBatch for Dispatch<Req> {
+impl<Cod: Codec, Req: KvRequest + NextBatch> NextBatch for Dispatch<Cod, Req> {
     fn next_batch(&mut self, range: (Vec<u8>, Vec<u8>)) {
         self.request.next_batch(range);
     }
diff --git a/src/store/request.rs b/src/store/request.rs
index 5060eb01..a26e57e0 100644
--- a/src/store/request.rs
+++ b/src/store/request.rs
@@ -22,6 +22,7 @@ pub trait Request: Any + Sync + Send + 'static {
     fn label(&self) -> &'static str;
     fn as_any(&self) -> &dyn Any;
     fn set_context(&mut self, context: kvrpcpb::Context);
+    fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion);
 }
 
 macro_rules! impl_request {
@@ -54,6 +55,11 @@ macro_rules! impl_request {
             fn set_context(&mut self, context: kvrpcpb::Context) {
                 self.context = Some(context);
             }
+
+            fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) {
+                let context = self.context.get_or_insert(kvrpcpb::Context::default());
+                context.api_version = api_version.into();
+            }
         }
     };
 }
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index 64d32451..4212079c 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -10,6 +10,7 @@ use crate::config::Config;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::pdpb::Timestamp;
+use crate::request::codec::{ApiV1Codec, ApiV2Codec, Codec};
 use crate::request::plan::CleanupLocksResult;
 use crate::request::Plan;
 use crate::timestamp::TimestampExt;
@@ -42,11 +43,11 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
 ///
 /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be
 /// awaited to execute.
-pub struct Client {
-    pd: Arc<PdRpcClient>,
+pub struct Client<Cod: Codec = ApiV1Codec> {
+    pd: Arc<PdRpcClient<Cod>>,
 }
 
-impl Clone for Client {
+impl<Cod: Codec> Clone for Client<Cod> {
     fn clone(&self) -> Self {
         Self {
             pd: self.pd.clone(),
@@ -54,7 +55,7 @@ impl Clone for Client {
     }
 }
 
-impl Client {
+impl Client<ApiV1Codec> {
     /// Create a transactional [`Client`] and connect to the TiKV cluster.
     ///
     /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -71,7 +72,6 @@ impl Client {
     /// # });
     /// ```
     pub async fn new<S: Into<String>>(pd_endpoints: Vec<S>) -> Result<Client> {
-        // debug!("creating transactional client");
         Self::new_with_config(pd_endpoints, Config::default()).await
     }
 
@@ -100,9 +100,35 @@ impl Client {
         pd_endpoints: Vec<S>,
         config: Config,
     ) -> Result<Client> {
+        Self::new_with_codec(pd_endpoints, config, ApiV1Codec::default()).await
+    }
+}
+
+impl Client<ApiV2Codec> {
+    pub async fn new_with_config_v2<S: Into<String>>(
+        _keyspace_name: &str,
+        pd_endpoints: Vec<S>,
+        config: Config,
+    ) -> Result<Client<ApiV2Codec>> {
+        debug!("creating new transactional client APIv2");
+        let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
+        let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?;
+        let keyspace_id = 0; // TODO: get keyspace_id by pd.get_keyspace(keyspace_name)
+        pd.set_codec(ApiV2Codec::new(keyspace_id));
+        Ok(Client { pd: Arc::new(pd) })
+    }
+}
+
+impl<Cod: Codec> Client<Cod> {
+    pub async fn new_with_codec<S: Into<String>>(
+        pd_endpoints: Vec<S>,
+        config: Config,
+        codec: Cod,
+    ) -> Result<Client<Cod>> {
         debug!("creating new transactional client");
         let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
-        let pd = Arc::new(PdRpcClient::connect(&pd_endpoints, config, true).await?);
+        let pd =
+            Arc::new(PdRpcClient::<Cod>::connect(&pd_endpoints, config, true, Some(codec)).await?);
         Ok(Client { pd })
     }
 
@@ -126,7 +152,7 @@ impl Client {
     /// transaction.commit().await.unwrap();
     /// # });
     /// ```
-    pub async fn begin_optimistic(&self) -> Result<Transaction> {
+    pub async fn begin_optimistic(&self) -> Result<Transaction<PdRpcClient<Cod>>> {
         debug!("creating new optimistic transaction");
         let timestamp = self.current_timestamp().await?;
         Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
@@ -149,7 +175,7 @@ impl Client {
     /// transaction.commit().await.unwrap();
     /// # });
     /// ```
-    pub async fn begin_pessimistic(&self) -> Result<Transaction> {
+    pub async fn begin_pessimistic(&self) -> Result<Transaction<PdRpcClient<Cod>>> {
         debug!("creating new pessimistic transaction");
         let timestamp = self.current_timestamp().await?;
         Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
@@ -172,14 +198,21 @@ impl Client {
     /// transaction.commit().await.unwrap();
     /// # });
     /// ```
-    pub async fn begin_with_options(&self, options: TransactionOptions) -> Result<Transaction> {
+    pub async fn begin_with_options(
+        &self,
+        options: TransactionOptions,
+    ) -> Result<Transaction<PdRpcClient<Cod>>> {
         debug!("creating new customized transaction");
         let timestamp = self.current_timestamp().await?;
         Ok(self.new_transaction(timestamp, options))
     }
 
     /// Create a new [`Snapshot`](Snapshot) at the given [`Timestamp`](Timestamp).
-    pub fn snapshot(&self, timestamp: Timestamp, options: TransactionOptions) -> Snapshot {
+    pub fn snapshot(
+        &self,
+        timestamp: Timestamp,
+        options: TransactionOptions,
+    ) -> Snapshot<PdRpcClient<Cod>> {
         debug!("creating new snapshot");
         Snapshot::new(self.new_transaction(timestamp, options.read_only()))
     }
@@ -272,7 +305,11 @@ impl Client {
         plan.execute().await
     }
 
-    fn new_transaction(&self, timestamp: Timestamp, options: TransactionOptions) -> Transaction {
+    fn new_transaction(
+        &self,
+        timestamp: Timestamp,
+        options: TransactionOptions,
+    ) -> Transaction<PdRpcClient<Cod>> {
         Transaction::new(timestamp, self.pd.clone(), options)
     }
 }
diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs
index 5694614b..2ae15769 100644
--- a/src/transaction/snapshot.rs
+++ b/src/transaction/snapshot.rs
@@ -3,6 +3,8 @@
 use derive_new::new;
 use log::debug;
 
+use crate::pd::{PdClient, PdRpcClient};
+use crate::request::codec::Codec;
 use crate::BoundRange;
 use crate::Key;
 use crate::KvPair;
@@ -18,11 +20,11 @@ use crate::Value;
 ///
 /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
 #[derive(new)]
-pub struct Snapshot {
-    transaction: Transaction,
+pub struct Snapshot<PdC: PdClient = PdRpcClient> {
+    transaction: Transaction<PdC>,
 }
 
-impl Snapshot {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<PdC> {
     /// Get the value associated with the given key.
     pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
         debug!("invoking get request on snapshot");
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index 44532f7e..f4b49a3e 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -18,6 +18,7 @@ use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::kvrpcpb;
 use crate::proto::pdpb::Timestamp;
+use crate::request::codec::Codec;
 use crate::request::Collect;
 use crate::request::CollectError;
 use crate::request::CollectSingle;
@@ -83,7 +84,7 @@ pub struct Transaction<PdC: PdClient = PdRpcClient> {
     start_instant: Instant,
 }
 
-impl<PdC: PdClient> Transaction<PdC> {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
     pub(crate) fn new(
         timestamp: Timestamp,
         rpc: Arc<PdC>,

From 40cdb54082c17625d2f534c80d065de050a1b717 Mon Sep 17 00:00:00 2001
From: Ping Yu <yuping@pingcap.com>
Date: Fri, 25 Aug 2023 20:28:43 +0800
Subject: [PATCH 2/7] inplace encoding

Signed-off-by: Ping Yu <yuping@pingcap.com>
---
 src/lib.rs                     |  2 ++
 src/raw/client.rs              | 35 +++++++++++++++++++++-----------
 src/raw/requests.rs            |  4 +++-
 src/request/codec.rs           | 24 +++++++++++++++-------
 src/request/mod.rs             | 13 +++++++++---
 src/request/plan.rs            | 12 +++--------
 src/request/plan_builder.rs    | 24 +++++++++-------------
 src/request/shard.rs           |  5 ++---
 src/transaction/client.rs      |  8 +++++---
 src/transaction/lock.rs        | 16 ++++++++++-----
 src/transaction/transaction.rs | 37 ++++++++++++++++++++++------------
 11 files changed, 109 insertions(+), 71 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 60dc2956..b9ba66c0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -145,6 +145,8 @@ pub use crate::raw::Client as RawClient;
 #[doc(inline)]
 pub use crate::raw::ColumnFamily;
 #[doc(inline)]
+pub use crate::request::codec;
+#[doc(inline)]
 pub use crate::request::RetryOptions;
 #[doc(inline)]
 pub use crate::timestamp::Timestamp;
diff --git a/src/raw/client.rs b/src/raw/client.rs
index b69662bc..5387ac04 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -15,7 +15,7 @@ use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::metapb;
 use crate::raw::lowering::*;
-use crate::request::codec::{ApiV1Codec, Codec};
+use crate::request::codec::{ApiV1Codec, Codec, EncodedRequest};
 use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::Plan;
@@ -220,7 +220,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
     pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
         debug!("invoking raw get request");
         let request = new_raw_get_request(key.into(), self.cf.clone());
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .merge(CollectSingle)
             .post_process_default()
@@ -252,7 +253,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
     ) -> Result<Vec<KvPair>> {
         debug!("invoking raw batch_get request");
         let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .merge(Collect)
             .plan();
@@ -280,7 +282,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
     pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
         debug!("invoking raw put request");
         let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .merge(CollectSingle)
             .extract_error()
@@ -316,7 +319,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
             self.cf.clone(),
             self.atomic,
         );
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .extract_error()
             .plan();
@@ -344,7 +348,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
     pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
         debug!("invoking raw delete request");
         let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .merge(CollectSingle)
             .extract_error()
@@ -375,7 +380,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
         self.assert_non_atomic()?;
         let request =
             new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .extract_error()
             .plan();
@@ -402,7 +408,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
         debug!("invoking raw delete_range request");
         self.assert_non_atomic()?;
         let request = new_raw_delete_range_request(range.into(), self.cf.clone());
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .extract_error()
             .plan();
@@ -558,7 +565,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
             previous_value.into(),
             self.cf.clone(),
         );
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
+        let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .merge(CollectSingle)
             .post_process_default()
@@ -581,7 +589,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
             ranges.into_iter().map(Into::into),
             request_builder,
         );
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
+        let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .preserve_shard()
             .retry_multi_region(self.backoff.clone())
             .post_process_default()
@@ -615,7 +624,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
         while cur_limit > 0 {
             let request =
                 new_raw_scan_request(cur_range.clone(), cur_limit, key_only, self.cf.clone());
-            let resp = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+            let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+            let resp = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
                 .single_region_with_store(region_store.clone())
                 .await?
                 .plan()
@@ -670,7 +680,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Client<Cod, PdC> {
             key_only,
             self.cf.clone(),
         );
-        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.rpc.clone(), encoded_req)
             .retry_multi_region(self.backoff.clone())
             .merge(Collect)
             .plan();
diff --git a/src/raw/requests.rs b/src/raw/requests.rs
index e0e64c5e..ff1cc14f 100644
--- a/src/raw/requests.rs
+++ b/src/raw/requests.rs
@@ -501,6 +501,7 @@ mod test {
     use crate::mock::MockKvClient;
     use crate::mock::MockPdClient;
     use crate::proto::kvrpcpb;
+    use crate::request::codec::EncodedRequest;
     use crate::request::Plan;
     use crate::Key;
 
@@ -535,7 +536,8 @@ mod test {
             key_only: true,
             ..Default::default()
         };
-        let plan = crate::request::PlanBuilder::new(client, scan)
+        let encoded_scan = EncodedRequest::new(scan, client.get_codec());
+        let plan = crate::request::PlanBuilder::new(client, encoded_scan)
             .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_multi_region(DEFAULT_REGION_BACKOFF)
             .merge(Collect)
diff --git a/src/request/codec.rs b/src/request/codec.rs
index be5716ca..06e447c6 100644
--- a/src/request/codec.rs
+++ b/src/request/codec.rs
@@ -2,12 +2,10 @@
 
 use crate::proto::kvrpcpb;
 use crate::request::KvRequest;
-use std::borrow::Cow;
 
 pub trait Codec: Clone + Sync + Send + 'static {
-    fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> {
-        Cow::Borrowed(req)
-    }
+    fn encode_request<R: KvRequest>(&self, _req: &mut R) {}
+    // TODO: fn decode_response()
 }
 
 #[derive(Clone, Default)]
@@ -29,9 +27,21 @@ impl ApiV2Codec {
 }
 
 impl Codec for ApiV2Codec {
-    fn encode_request<'a, R: KvRequest>(&self, req: &'a R) -> Cow<'a, R> {
-        let mut req = req.clone();
+    fn encode_request<R: KvRequest>(&self, req: &mut R) {
         req.set_api_version(kvrpcpb::ApiVersion::V2);
-        Cow::Owned(req)
+        // TODO: req.encode_request(self);
+    }
+}
+
+// EncodeRequest is just a type wrapper to avoid passing not encoded request to `PlanBuilder` by mistake.
+#[derive(Clone)]
+pub struct EncodedRequest<Req: KvRequest> {
+    pub inner: Req,
+}
+
+impl<Req: KvRequest> EncodedRequest<Req> {
+    pub fn new<C: Codec>(mut req: Req, codec: &C) -> Self {
+        codec.encode_request(&mut req);
+        Self { inner: req }
     }
 }
diff --git a/src/request/mod.rs b/src/request/mod.rs
index 22839443..55746c5c 100644
--- a/src/request/mod.rs
+++ b/src/request/mod.rs
@@ -42,6 +42,9 @@ mod shard;
 pub trait KvRequest: Request + Sized + Clone + Sync + Send + 'static {
     /// The expected response to the request.
     type Response: HasKeyErrors + HasLocks + Clone + Send + 'static;
+
+    // TODO: fn encode_request()
+    // TODO: fn decode_response()
 }
 
 #[derive(Clone, Debug, new, Eq, PartialEq)]
@@ -88,10 +91,12 @@ mod test {
     use super::*;
     use crate::mock::MockKvClient;
     use crate::mock::MockPdClient;
+    use crate::pd::PdClient;
     use crate::proto::kvrpcpb;
     use crate::proto::kvrpcpb::ApiVersion;
     use crate::proto::pdpb::Timestamp;
     use crate::proto::tikvpb::tikv_client::TikvClient;
+    use crate::request::codec::EncodedRequest;
     use crate::store::store_stream_for_keys;
     use crate::store::HasRegionError;
     use crate::transaction::lowering::new_commit_request;
@@ -189,7 +194,8 @@ mod test {
             |_: &dyn Any| Ok(Box::new(MockRpcResponse) as Box<dyn Any>),
         )));
 
-        let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
+        let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
+        let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
             .resolve_lock(Backoff::no_jitter_backoff(1, 1, 3))
             .retry_multi_region(Backoff::no_jitter_backoff(1, 1, 3))
             .extract_error()
@@ -213,16 +219,17 @@ mod test {
 
         let key: Key = "key".to_owned().into();
         let req = new_commit_request(iter::once(key), Timestamp::default(), Timestamp::default());
+        let encoded_req = EncodedRequest::new(req, pd_client.get_codec());
 
         // does not extract error
-        let plan = crate::request::PlanBuilder::new(pd_client.clone(), req.clone())
+        let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req.clone())
             .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_multi_region(OPTIMISTIC_BACKOFF)
             .plan();
         assert!(plan.execute().await.is_ok());
 
         // extract error
-        let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
+        let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
             .resolve_lock(OPTIMISTIC_BACKOFF)
             .retry_multi_region(OPTIMISTIC_BACKOFF)
             .extract_error()
diff --git a/src/request/plan.rs b/src/request/plan.rs
index 2f10d5c7..905e7fad 100644
--- a/src/request/plan.rs
+++ b/src/request/plan.rs
@@ -17,7 +17,6 @@ use crate::pd::PdClient;
 use crate::proto::errorpb;
 use crate::proto::errorpb::EpochNotMatch;
 use crate::proto::kvrpcpb;
-use crate::request::codec::Codec;
 use crate::request::shard::HasNextBatch;
 use crate::request::KvRequest;
 use crate::request::NextBatch;
@@ -49,27 +48,22 @@ pub trait Plan: Sized + Clone + Sync + Send + 'static {
 
 /// The simplest plan which just dispatches a request to a specific kv server.
 #[derive(Clone)]
-pub struct Dispatch<Cod: Codec, Req: KvRequest> {
+pub struct Dispatch<Req: KvRequest> {
     pub request: Req,
     pub kv_client: Option<Arc<dyn KvClient + Send + Sync>>,
-    pub codec: Cod,
 }
 
 #[async_trait]
-impl<Cod: Codec, Req: KvRequest> Plan for Dispatch<Cod, Req> {
+impl<Req: KvRequest> Plan for Dispatch<Req> {
     type Result = Req::Response;
 
     async fn execute(&self) -> Result<Self::Result> {
-        // `encode_request` will clone the request, which would have high overhead.
-        // TODO: consider in-place encoding.
-        let req = self.codec.encode_request(&self.request);
-
         let stats = tikv_stats(self.request.label());
         let result = self
             .kv_client
             .as_ref()
             .expect("Unreachable: kv_client has not been initialised in Dispatch")
-            .dispatch(req.as_ref())
+            .dispatch(&self.request)
             .await;
         let result = stats.done(result);
         result.map(|r| {
diff --git a/src/request/plan_builder.rs b/src/request/plan_builder.rs
index 678c9686..88a5e9ad 100644
--- a/src/request/plan_builder.rs
+++ b/src/request/plan_builder.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 use super::plan::PreserveShard;
 use crate::backoff::Backoff;
 use crate::pd::PdClient;
-use crate::request::codec::Codec;
+use crate::request::codec::EncodedRequest;
 use crate::request::plan::CleanupLocks;
 use crate::request::shard::HasNextBatch;
 use crate::request::DefaultProcessor;
@@ -46,17 +46,13 @@ impl PlanBuilderPhase for NoTarget {}
 pub struct Targetted;
 impl PlanBuilderPhase for Targetted {}
 
-impl<Cod: Codec, PdC: PdClient<Codec = Cod>, Req: KvRequest>
-    PlanBuilder<PdC, Dispatch<Cod, Req>, NoTarget>
-{
-    pub fn new(pd_client: Arc<PdC>, request: Req) -> Self {
-        let codec = pd_client.get_codec().clone();
+impl<PdC: PdClient, Req: KvRequest> PlanBuilder<PdC, Dispatch<Req>, NoTarget> {
+    pub fn new(pd_client: Arc<PdC>, encoded_request: EncodedRequest<Req>) -> Self {
         PlanBuilder {
             pd_client,
             plan: Dispatch {
-                request,
+                request: encoded_request.inner,
                 kv_client: None,
-                codec,
             },
             phantom: PhantomData,
         }
@@ -188,14 +184,12 @@ where
     }
 }
 
-impl<Cod: Codec, PdC: PdClient<Codec = Cod>, R: KvRequest>
-    PlanBuilder<PdC, Dispatch<Cod, R>, NoTarget>
-{
+impl<PdC: PdClient, R: KvRequest> PlanBuilder<PdC, Dispatch<R>, NoTarget> {
     /// Target the request at a single region; caller supplies the store to target.
     pub async fn single_region_with_store(
         self,
         store: RegionStore,
-    ) -> Result<PlanBuilder<PdC, Dispatch<Cod, R>, Targetted>> {
+    ) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
         set_single_region_store(self.plan, store, self.pd_client)
     }
 }
@@ -229,11 +223,11 @@ where
     }
 }
 
-fn set_single_region_store<Cod: Codec, PdC: PdClient<Codec = Cod>, R: KvRequest>(
-    mut plan: Dispatch<Cod, R>,
+fn set_single_region_store<PdC: PdClient, R: KvRequest>(
+    mut plan: Dispatch<R>,
     store: RegionStore,
     pd_client: Arc<PdC>,
-) -> Result<PlanBuilder<PdC, Dispatch<Cod, R>, Targetted>> {
+) -> Result<PlanBuilder<PdC, Dispatch<R>, Targetted>> {
     plan.request
         .set_context(store.region_with_leader.context()?);
     plan.kv_client = Some(store.client);
diff --git a/src/request/shard.rs b/src/request/shard.rs
index 413ed9ed..aaefab72 100644
--- a/src/request/shard.rs
+++ b/src/request/shard.rs
@@ -6,7 +6,6 @@ use futures::stream::BoxStream;
 
 use super::plan::PreserveShard;
 use crate::pd::PdClient;
-use crate::request::codec::Codec;
 use crate::request::plan::CleanupLocks;
 use crate::request::Dispatch;
 use crate::request::KvRequest;
@@ -81,7 +80,7 @@ pub trait NextBatch {
     fn next_batch(&mut self, _range: (Vec<u8>, Vec<u8>));
 }
 
-impl<Cod: Codec, Req: KvRequest + Shardable> Shardable for Dispatch<Cod, Req> {
+impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
     type Shard = Req::Shard;
 
     fn shards(
@@ -97,7 +96,7 @@ impl<Cod: Codec, Req: KvRequest + Shardable> Shardable for Dispatch<Cod, Req> {
     }
 }
 
-impl<Cod: Codec, Req: KvRequest + NextBatch> NextBatch for Dispatch<Cod, Req> {
+impl<Req: KvRequest + NextBatch> NextBatch for Dispatch<Req> {
     fn next_batch(&mut self, range: (Vec<u8>, Vec<u8>)) {
         self.request.next_batch(range);
     }
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index 4212079c..8d1cf43d 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -10,7 +10,7 @@ use crate::config::Config;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::pdpb::Timestamp;
-use crate::request::codec::{ApiV1Codec, ApiV2Codec, Codec};
+use crate::request::codec::{ApiV1Codec, ApiV2Codec, Codec, EncodedRequest};
 use crate::request::plan::CleanupLocksResult;
 use crate::request::Plan;
 use crate::timestamp::TimestampExt;
@@ -279,7 +279,8 @@ impl<Cod: Codec> Client<Cod> {
         let ctx = ResolveLocksContext::default();
         let backoff = Backoff::equal_jitter_backoff(100, 10000, 50);
         let req = new_scan_lock_request(range.into(), safepoint, options.batch_size);
-        let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
+        let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
             .cleanup_locks(ctx.clone(), options, backoff)
             .retry_multi_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
@@ -298,7 +299,8 @@ impl<Cod: Codec> Client<Cod> {
         batch_size: u32,
     ) -> Result<Vec<crate::proto::kvrpcpb::LockInfo>> {
         let req = new_scan_lock_request(range.into(), safepoint, batch_size);
-        let plan = crate::request::PlanBuilder::new(self.pd.clone(), req)
+        let encoded_req = EncodedRequest::new(req, self.pd.get_codec());
+        let plan = crate::request::PlanBuilder::new(self.pd.clone(), encoded_req)
             .retry_multi_region(DEFAULT_REGION_BACKOFF)
             .merge(crate::request::Collect)
             .plan();
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index 25522422..c55ab381 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -17,6 +17,7 @@ use crate::proto::kvrpcpb;
 use crate::proto::kvrpcpb::TxnInfo;
 use crate::proto::pdpb::Timestamp;
 use crate::region::RegionVerId;
+use crate::request::codec::EncodedRequest;
 use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::Plan;
@@ -77,7 +78,8 @@ pub async fn resolve_locks(
             Some(&commit_version) => commit_version,
             None => {
                 let request = requests::new_cleanup_request(lock.primary_lock, lock.lock_version);
-                let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
+                let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
+                let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
                     .resolve_lock(OPTIMISTIC_BACKOFF)
                     .retry_multi_region(DEFAULT_REGION_BACKOFF)
                     .merge(CollectSingle)
@@ -118,8 +120,9 @@ async fn resolve_lock_with_retry(
         let store = pd_client.clone().store_for_key(key.into()).await?;
         let ver_id = store.region_with_leader.ver_id();
         let request = requests::new_resolve_lock_request(start_version, commit_version);
+        let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
         // The only place where single-region is used
-        let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
+        let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
             .single_region_with_store(store)
             .await?
             .resolve_lock(Backoff::no_backoff())
@@ -359,7 +362,8 @@ impl LockResolver {
             force_sync_commit,
             resolving_pessimistic_lock,
         );
-        let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
+        let encoded_req = EncodedRequest::new(req, pd_client.get_codec());
+        let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
             .retry_multi_region(DEFAULT_REGION_BACKOFF)
             .merge(CollectSingle)
             .extract_error()
@@ -383,7 +387,8 @@ impl LockResolver {
         txn_id: u64,
     ) -> Result<SecondaryLocksStatus> {
         let req = new_check_secondary_locks_request(keys, txn_id);
-        let plan = crate::request::PlanBuilder::new(pd_client.clone(), req)
+        let encoded_req = EncodedRequest::new(req, pd_client.get_codec());
+        let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
             .retry_multi_region(DEFAULT_REGION_BACKOFF)
             .extract_error()
             .merge(Collect)
@@ -399,7 +404,8 @@ impl LockResolver {
     ) -> Result<RegionVerId> {
         let ver_id = store.region_with_leader.ver_id();
         let request = requests::new_batch_resolve_lock_request(txn_infos.clone());
-        let plan = crate::request::PlanBuilder::new(pd_client.clone(), request)
+        let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
+        let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
             .single_region_with_store(store.clone())
             .await?
             .extract_error()
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index f4b49a3e..b28ea2b3 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -18,7 +18,7 @@ use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::kvrpcpb;
 use crate::proto::pdpb::Timestamp;
-use crate::request::codec::Codec;
+use crate::request::codec::{Codec, EncodedRequest};
 use crate::request::Collect;
 use crate::request::CollectError;
 use crate::request::CollectSingle;
@@ -134,7 +134,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
 
         self.buffer
             .get_or_else(key, |key| async move {
-                let request = new_get_request(key, timestamp);
+                let request = EncodedRequest::new(new_get_request(key, timestamp), rpc.get_codec());
                 let plan = PlanBuilder::new(rpc, request)
                     .resolve_lock(retry_options.lock_backoff)
                     .retry_multi_region(DEFAULT_REGION_BACKOFF)
@@ -265,7 +265,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
         self.buffer
             .batch_get_or_else(keys.into_iter().map(|k| k.into()), move |keys| async move {
                 let request = new_batch_get_request(keys, timestamp);
-                let plan = PlanBuilder::new(rpc, request)
+                let encoded_req = EncodedRequest::new(request, rpc.get_codec());
+                let plan = PlanBuilder::new(rpc, encoded_req)
                     .resolve_lock(retry_options.lock_backoff)
                     .retry_multi_region(retry_options.region_backoff)
                     .merge(Collect)
@@ -692,7 +693,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
             primary_key,
             self.start_instant.elapsed().as_millis() as u64 + MAX_TTL,
         );
-        let plan = PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
             .resolve_lock(self.options.retry_options.lock_backoff.clone())
             .retry_multi_region(self.options.retry_options.region_backoff.clone())
             .merge(CollectSingle)
@@ -722,7 +724,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
                 move |new_range, new_limit| async move {
                     let request =
                         new_scan_request(new_range, timestamp, new_limit, key_only, reverse);
-                    let plan = PlanBuilder::new(rpc, request)
+                    let encoded_req = EncodedRequest::new(request, rpc.get_codec());
+                    let plan = PlanBuilder::new(rpc, encoded_req)
                         .resolve_lock(retry_options.lock_backoff)
                         .retry_multi_region(retry_options.region_backoff)
                         .merge(Collect)
@@ -777,7 +780,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
             for_update_ts.clone(),
             need_value,
         );
-        let plan = PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
             .resolve_lock(self.options.retry_options.lock_backoff.clone())
             .preserve_shard()
             .retry_multi_region_preserve_results(self.options.retry_options.region_backoff.clone())
@@ -831,7 +835,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
             start_version,
             for_update_ts,
         );
-        let plan = PlanBuilder::new(self.rpc.clone(), req)
+        let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
+        let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
             .resolve_lock(self.options.retry_options.lock_backoff.clone())
             .retry_multi_region(self.options.retry_options.region_backoff.clone())
             .extract_error()
@@ -901,7 +906,8 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
                     primary_key.clone(),
                     start_instant.elapsed().as_millis() as u64 + MAX_TTL,
                 );
-                let plan = PlanBuilder::new(rpc.clone(), request)
+                let encoded_req = EncodedRequest::new(request, rpc.get_codec());
+                let plan = PlanBuilder::new(rpc.clone(), encoded_req)
                     .retry_multi_region(region_backoff.clone())
                     .merge(CollectSingle)
                     .plan();
@@ -1210,7 +1216,8 @@ impl<PdC: PdClient> Committer<PdC> {
             .collect();
         // FIXME set max_commit_ts and min_commit_ts
 
-        let plan = PlanBuilder::new(self.rpc.clone(), request)
+        let encoded_req = EncodedRequest::new(request, self.rpc.get_codec());
+        let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
             .resolve_lock(self.options.retry_options.lock_backoff.clone())
             .retry_multi_region(self.options.retry_options.region_backoff.clone())
             .merge(CollectError)
@@ -1250,7 +1257,8 @@ impl<PdC: PdClient> Committer<PdC> {
             self.start_version.clone(),
             commit_version.clone(),
         );
-        let plan = PlanBuilder::new(self.rpc.clone(), req)
+        let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
+        let plan = PlanBuilder::new(self.rpc.clone(), encoded_req)
             .resolve_lock(self.options.retry_options.lock_backoff.clone())
             .retry_multi_region(self.options.retry_options.region_backoff.clone())
             .extract_error()
@@ -1314,7 +1322,8 @@ impl<PdC: PdClient> Committer<PdC> {
                 .filter(|key| &primary_key != key);
             new_commit_request(keys, self.start_version, commit_version)
         };
-        let plan = PlanBuilder::new(self.rpc, req)
+        let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
+        let plan = PlanBuilder::new(self.rpc, encoded_req)
             .resolve_lock(self.options.retry_options.lock_backoff)
             .retry_multi_region(self.options.retry_options.region_backoff)
             .extract_error()
@@ -1335,7 +1344,8 @@ impl<PdC: PdClient> Committer<PdC> {
         match self.options.kind {
             TransactionKind::Optimistic => {
                 let req = new_batch_rollback_request(keys, self.start_version);
-                let plan = PlanBuilder::new(self.rpc, req)
+                let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
+                let plan = PlanBuilder::new(self.rpc, encoded_req)
                     .resolve_lock(self.options.retry_options.lock_backoff)
                     .retry_multi_region(self.options.retry_options.region_backoff)
                     .extract_error()
@@ -1344,7 +1354,8 @@ impl<PdC: PdClient> Committer<PdC> {
             }
             TransactionKind::Pessimistic(for_update_ts) => {
                 let req = new_pessimistic_rollback_request(keys, self.start_version, for_update_ts);
-                let plan = PlanBuilder::new(self.rpc, req)
+                let encoded_req = EncodedRequest::new(req, self.rpc.get_codec());
+                let plan = PlanBuilder::new(self.rpc, encoded_req)
                     .resolve_lock(self.options.retry_options.lock_backoff)
                     .retry_multi_region(self.options.retry_options.region_backoff)
                     .extract_error()

From 9201cf6b80d6bf57ce9f63ceccd7c681627c2525 Mon Sep 17 00:00:00 2001
From: Ping Yu <yuping@pingcap.com>
Date: Fri, 25 Aug 2023 20:55:09 +0800
Subject: [PATCH 3/7] polish

Signed-off-by: Ping Yu <yuping@pingcap.com>
---
 src/mock.rs               | 14 +++++++-------
 src/pd/client.rs          | 31 +++++++++++++++++--------------
 src/raw/client.rs         |  9 +++++----
 src/request/codec.rs      | 17 ++++++++++++-----
 src/transaction/client.rs | 14 +++++++-------
 src/transaction/lock.rs   |  1 -
 6 files changed, 48 insertions(+), 38 deletions(-)

diff --git a/src/mock.rs b/src/mock.rs
index 887bb2d2..3b0b157f 100644
--- a/src/mock.rs
+++ b/src/mock.rs
@@ -18,7 +18,7 @@ use crate::proto::metapb::RegionEpoch;
 use crate::proto::metapb::{self};
 use crate::region::RegionId;
 use crate::region::RegionWithLeader;
-use crate::request::codec::ApiV1Codec;
+use crate::request::codec::ApiV1TxnCodec;
 use crate::store::KvClient;
 use crate::store::KvConnect;
 use crate::store::RegionStore;
@@ -31,7 +31,7 @@ use crate::Timestamp;
 
 /// Create a `PdRpcClient` with it's internals replaced with mocks so that the
 /// client can be tested without doing any RPC calls.
-pub async fn pd_rpc_client() -> PdRpcClient<ApiV1Codec, MockKvConnect, MockCluster> {
+pub async fn pd_rpc_client() -> PdRpcClient<ApiV1TxnCodec, MockKvConnect, MockCluster> {
     let config = Config::default();
     PdRpcClient::new(
         config.clone(),
@@ -44,7 +44,7 @@ pub async fn pd_rpc_client() -> PdRpcClient<ApiV1Codec, MockKvConnect, MockClust
             ))
         },
         false,
-        Some(ApiV1Codec::default()),
+        Some(ApiV1TxnCodec::default()),
     )
     .await
     .unwrap()
@@ -75,14 +75,14 @@ pub struct MockCluster;
 
 pub struct MockPdClient {
     client: MockKvClient,
-    codec: ApiV1Codec,
+    codec: ApiV1TxnCodec,
 }
 
 impl MockPdClient {
     pub fn new(client: MockKvClient) -> MockPdClient {
         MockPdClient {
             client,
-            codec: ApiV1Codec::default(),
+            codec: ApiV1TxnCodec::default(),
         }
     }
 }
@@ -113,7 +113,7 @@ impl MockPdClient {
     pub fn default() -> MockPdClient {
         MockPdClient {
             client: MockKvClient::default(),
-            codec: ApiV1Codec::default(),
+            codec: ApiV1TxnCodec::default(),
         }
     }
 
@@ -177,7 +177,7 @@ impl MockPdClient {
 
 #[async_trait]
 impl PdClient for MockPdClient {
-    type Codec = ApiV1Codec;
+    type Codec = ApiV1TxnCodec;
     type KvClient = MockKvClient;
 
     async fn map_region_to_store(self: Arc<Self>, region: RegionWithLeader) -> Result<RegionStore> {
diff --git a/src/pd/client.rs b/src/pd/client.rs
index e22b7c90..f291c62c 100644
--- a/src/pd/client.rs
+++ b/src/pd/client.rs
@@ -20,7 +20,7 @@ use crate::region::RegionId;
 use crate::region::RegionVerId;
 use crate::region::RegionWithLeader;
 use crate::region_cache::RegionCache;
-use crate::request::codec::{ApiV1Codec, Codec};
+use crate::request::codec::{ApiV1TxnCodec, Codec};
 use crate::store::KvClient;
 use crate::store::KvConnect;
 use crate::store::RegionStore;
@@ -191,8 +191,11 @@ pub trait PdClient: Send + Sync + 'static {
         .boxed()
     }
 
-    fn decode_region(mut region: RegionWithLeader, enable_codec: bool) -> Result<RegionWithLeader> {
-        if enable_codec {
+    fn decode_region(
+        mut region: RegionWithLeader,
+        enable_mvcc_codec: bool,
+    ) -> Result<RegionWithLeader> {
+        if enable_mvcc_codec {
             codec::decode_bytes_in_place(&mut region.region.start_key, false)?;
             codec::decode_bytes_in_place(&mut region.region.end_key, false)?;
         }
@@ -204,21 +207,21 @@ pub trait PdClient: Send + Sync + 'static {
     async fn invalidate_region_cache(&self, ver_id: RegionVerId);
 
     /// Get the codec carried by `PdClient`.
-    /// The purpose of carrying the codec is to reduce the passing of it on so many calling paths.
+    /// The purpose of carrying the codec is to avoid passing it on so many calling paths.
     fn get_codec(&self) -> &Self::Codec;
 }
 
 /// This client converts requests for the logical TiKV cluster into requests
 /// for a single TiKV store using PD and internal logic.
 pub struct PdRpcClient<
-    Cod: Codec = ApiV1Codec,
+    Cod: Codec = ApiV1TxnCodec,
     KvC: KvConnect + Send + Sync + 'static = TikvConnect,
     Cl = Cluster,
 > {
     pd: Arc<RetryClient<Cl>>,
     kv_connect: KvC,
     kv_client_cache: Arc<RwLock<HashMap<String, KvC::KvClient>>>,
-    enable_codec: bool,
+    enable_mvcc_codec: bool,
     region_cache: RegionCache<RetryClient<Cl>>,
     codec: Option<Cod>,
 }
@@ -236,20 +239,20 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClien
     }
 
     async fn region_for_key(&self, key: &Key) -> Result<RegionWithLeader> {
-        let enable_codec = self.enable_codec;
-        let key = if enable_codec {
+        let enable_mvcc_codec = self.enable_mvcc_codec;
+        let key = if enable_mvcc_codec {
             key.to_encoded()
         } else {
             key.clone()
         };
 
         let region = self.region_cache.get_region_by_key(&key).await?;
-        Self::decode_region(region, enable_codec)
+        Self::decode_region(region, enable_mvcc_codec)
     }
 
     async fn region_for_id(&self, id: RegionId) -> Result<RegionWithLeader> {
         let region = self.region_cache.get_region_by_id(id).await?;
-        Self::decode_region(region, self.enable_codec)
+        Self::decode_region(region, self.enable_mvcc_codec)
     }
 
     async fn get_timestamp(self: Arc<Self>) -> Result<Timestamp> {
@@ -279,14 +282,14 @@ impl<Cod: Codec> PdRpcClient<Cod, TikvConnect, Cluster> {
     pub async fn connect(
         pd_endpoints: &[String],
         config: Config,
-        enable_codec: bool,
+        enable_mvcc_codec: bool, // TODO: infer from `codec`.
         codec: Option<Cod>,
     ) -> Result<PdRpcClient<Cod>> {
         PdRpcClient::new(
             config.clone(),
             |security_mgr| TikvConnect::new(security_mgr, config.timeout),
             |security_mgr| RetryClient::connect(pd_endpoints, security_mgr, config.timeout),
-            enable_codec,
+            enable_mvcc_codec,
             codec,
         )
         .await
@@ -298,7 +301,7 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, Kv
         config: Config,
         kv_connect: MakeKvC,
         pd: MakePd,
-        enable_codec: bool,
+        enable_mvcc_codec: bool,
         codec: Option<Cod>,
     ) -> Result<PdRpcClient<Cod, KvC, Cl>>
     where
@@ -322,7 +325,7 @@ impl<Cod: Codec, KvC: KvConnect + Send + Sync + 'static, Cl> PdRpcClient<Cod, Kv
             pd: pd.clone(),
             kv_client_cache,
             kv_connect: kv_connect(security_mgr),
-            enable_codec,
+            enable_mvcc_codec,
             region_cache: RegionCache::new(pd),
             codec,
         })
diff --git a/src/raw/client.rs b/src/raw/client.rs
index 5387ac04..fc733015 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -15,7 +15,7 @@ use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::metapb;
 use crate::raw::lowering::*;
-use crate::request::codec::{ApiV1Codec, Codec, EncodedRequest};
+use crate::request::codec::{ApiV1RawCodec, Codec, EncodedRequest};
 use crate::request::Collect;
 use crate::request::CollectSingle;
 use crate::request::Plan;
@@ -36,7 +36,7 @@ const MAX_RAW_KV_SCAN_LIMIT: u32 = 10240;
 ///
 /// The returned results of raw request methods are [`Future`](std::future::Future)s that must be
 /// awaited to execute.
-pub struct Client<Cod = ApiV1Codec, PdC = PdRpcClient<Cod>>
+pub struct Client<Cod = ApiV1RawCodec, PdC = PdRpcClient<Cod>>
 where
     Cod: Codec,
     PdC: PdClient<Codec = Cod>,
@@ -59,7 +59,7 @@ impl Clone for Client {
     }
 }
 
-impl Client<ApiV1Codec, PdRpcClient> {
+impl Client<ApiV1RawCodec, PdRpcClient<ApiV1RawCodec>> {
     /// Create a raw [`Client`] and connect to the TiKV cluster.
     ///
     /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -106,7 +106,8 @@ impl Client<ApiV1Codec, PdRpcClient> {
     ) -> Result<Self> {
         let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
         let rpc = Arc::new(
-            PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1Codec::default())).await?,
+            PdRpcClient::connect(&pd_endpoints, config, false, Some(ApiV1RawCodec::default()))
+                .await?,
         );
         Ok(Client {
             rpc,
diff --git a/src/request/codec.rs b/src/request/codec.rs
index 06e447c6..a409a8e7 100644
--- a/src/request/codec.rs
+++ b/src/request/codec.rs
@@ -9,16 +9,21 @@ pub trait Codec: Clone + Sync + Send + 'static {
 }
 
 #[derive(Clone, Default)]
-pub struct ApiV1Codec {}
+pub struct ApiV1TxnCodec {}
 
-impl Codec for ApiV1Codec {}
+impl Codec for ApiV1TxnCodec {}
+
+#[derive(Clone, Default)]
+pub struct ApiV1RawCodec {}
+
+impl Codec for ApiV1RawCodec {}
 
 #[derive(Clone)]
-pub struct ApiV2Codec {
+pub struct ApiV2TxnCodec {
     _keyspace_id: u32,
 }
 
-impl ApiV2Codec {
+impl ApiV2TxnCodec {
     pub fn new(keyspace_id: u32) -> Self {
         Self {
             _keyspace_id: keyspace_id,
@@ -26,13 +31,15 @@ impl ApiV2Codec {
     }
 }
 
-impl Codec for ApiV2Codec {
+impl Codec for ApiV2TxnCodec {
     fn encode_request<R: KvRequest>(&self, req: &mut R) {
         req.set_api_version(kvrpcpb::ApiVersion::V2);
         // TODO: req.encode_request(self);
     }
 }
 
+// TODO: pub struct ApiV2RawCodec
+
 // EncodeRequest is just a type wrapper to avoid passing not encoded request to `PlanBuilder` by mistake.
 #[derive(Clone)]
 pub struct EncodedRequest<Req: KvRequest> {
diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index 8d1cf43d..e4d40618 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -10,7 +10,7 @@ use crate::config::Config;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::pdpb::Timestamp;
-use crate::request::codec::{ApiV1Codec, ApiV2Codec, Codec, EncodedRequest};
+use crate::request::codec::{ApiV1TxnCodec, ApiV2TxnCodec, Codec, EncodedRequest};
 use crate::request::plan::CleanupLocksResult;
 use crate::request::Plan;
 use crate::timestamp::TimestampExt;
@@ -43,7 +43,7 @@ const SCAN_LOCK_BATCH_SIZE: u32 = 1024;
 ///
 /// The returned results of transactional requests are [`Future`](std::future::Future)s that must be
 /// awaited to execute.
-pub struct Client<Cod: Codec = ApiV1Codec> {
+pub struct Client<Cod: Codec = ApiV1TxnCodec> {
     pd: Arc<PdRpcClient<Cod>>,
 }
 
@@ -55,7 +55,7 @@ impl<Cod: Codec> Clone for Client<Cod> {
     }
 }
 
-impl Client<ApiV1Codec> {
+impl Client<ApiV1TxnCodec> {
     /// Create a transactional [`Client`] and connect to the TiKV cluster.
     ///
     /// Because TiKV is managed by a [PD](https://github.com/pingcap/pd/) cluster, the endpoints for
@@ -100,21 +100,21 @@ impl Client<ApiV1Codec> {
         pd_endpoints: Vec<S>,
         config: Config,
     ) -> Result<Client> {
-        Self::new_with_codec(pd_endpoints, config, ApiV1Codec::default()).await
+        Self::new_with_codec(pd_endpoints, config, ApiV1TxnCodec::default()).await
     }
 }
 
-impl Client<ApiV2Codec> {
+impl Client<ApiV2TxnCodec> {
     pub async fn new_with_config_v2<S: Into<String>>(
         _keyspace_name: &str,
         pd_endpoints: Vec<S>,
         config: Config,
-    ) -> Result<Client<ApiV2Codec>> {
+    ) -> Result<Client<ApiV2TxnCodec>> {
         debug!("creating new transactional client APIv2");
         let pd_endpoints: Vec<String> = pd_endpoints.into_iter().map(Into::into).collect();
         let mut pd = PdRpcClient::connect(&pd_endpoints, config, true, None).await?;
         let keyspace_id = 0; // TODO: get keyspace_id by pd.get_keyspace(keyspace_name)
-        pd.set_codec(ApiV2Codec::new(keyspace_id));
+        pd.set_codec(ApiV2TxnCodec::new(keyspace_id));
         Ok(Client { pd: Arc::new(pd) })
     }
 }
diff --git a/src/transaction/lock.rs b/src/transaction/lock.rs
index c55ab381..0ccb5e46 100644
--- a/src/transaction/lock.rs
+++ b/src/transaction/lock.rs
@@ -121,7 +121,6 @@ async fn resolve_lock_with_retry(
         let ver_id = store.region_with_leader.ver_id();
         let request = requests::new_resolve_lock_request(start_version, commit_version);
         let encoded_req = EncodedRequest::new(request, pd_client.get_codec());
-        // The only place where single-region is used
         let plan = crate::request::PlanBuilder::new(pd_client.clone(), encoded_req)
             .single_region_with_store(store)
             .await?

From c9fbf69e30187c9af8b23ea99025d3b7ac9a3a06 Mon Sep 17 00:00:00 2001
From: Ping Yu <yuping@pingcap.com>
Date: Mon, 28 Aug 2023 11:22:17 +0800
Subject: [PATCH 4/7] polish

Signed-off-by: Ping Yu <yuping@pingcap.com>
---
 src/request/mod.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/request/mod.rs b/src/request/mod.rs
index 55746c5c..429adea5 100644
--- a/src/request/mod.rs
+++ b/src/request/mod.rs
@@ -146,9 +146,7 @@ mod test {
                 unreachable!();
             }
 
-            fn set_api_version(&mut self, _api_version: ApiVersion) {
-                unreachable!();
-            }
+            fn set_api_version(&mut self, _api_version: ApiVersion) {}
         }
 
         #[async_trait]

From 12b217db644c4187adb30403ee7db2d6817b7136 Mon Sep 17 00:00:00 2001
From: Ping Yu <yuping@pingcap.com>
Date: Mon, 28 Aug 2023 12:25:00 +0800
Subject: [PATCH 5/7] export proto

Signed-off-by: Ping Yu <yuping@pingcap.com>
---
 src/lib.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/lib.rs b/src/lib.rs
index b9ba66c0..a2acf57b 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -94,6 +94,8 @@
 
 pub mod backoff;
 #[doc(hidden)]
+pub mod proto; // export `proto` to enable user customized codec
+#[doc(hidden)]
 pub mod raw;
 pub mod request;
 #[doc(hidden)]
@@ -104,7 +106,6 @@ mod compat;
 mod config;
 mod kv;
 mod pd;
-mod proto;
 mod region;
 mod region_cache;
 mod stats;

From 19f829d46cbd7fe51c0dbe521b6163b2385ae410 Mon Sep 17 00:00:00 2001
From: Ping Yu <yuping@pingcap.com>
Date: Mon, 28 Aug 2023 17:11:36 +0800
Subject: [PATCH 6/7] fix set_context

Signed-off-by: Ping Yu <yuping@pingcap.com>
---
 src/raw/requests.rs         |  4 ++--
 src/request/shard.rs        |  6 +++---
 src/store/request.rs        |  8 ++++++++
 src/transaction/requests.rs | 13 +++++++------
 4 files changed, 20 insertions(+), 11 deletions(-)

diff --git a/src/raw/requests.rs b/src/raw/requests.rs
index ff1cc14f..23bfce73 100644
--- a/src/raw/requests.rs
+++ b/src/raw/requests.rs
@@ -162,7 +162,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
     }
 
     fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.context = Some(store.region_with_leader.context()?);
+        self.set_context(store.region_with_leader.context()?);
         self.pairs = shard;
         Ok(())
     }
@@ -293,7 +293,7 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
     }
 
     fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.context = Some(store.region_with_leader.context()?);
+        self.set_context(store.region_with_leader.context()?);
         self.ranges = shard;
         Ok(())
     }
diff --git a/src/request/shard.rs b/src/request/shard.rs
index aaefab72..7c78743d 100644
--- a/src/request/shard.rs
+++ b/src/request/shard.rs
@@ -163,7 +163,7 @@ macro_rules! shardable_key {
                 mut shard: Self::Shard,
                 store: &$crate::store::RegionStore,
             ) -> $crate::Result<()> {
-                self.context = Some(store.region_with_leader.context()?);
+                self.set_context(store.region_with_leader.context()?);
                 assert!(shard.len() == 1);
                 self.key = shard.pop().unwrap();
                 Ok(())
@@ -196,7 +196,7 @@ macro_rules! shardable_keys {
                 shard: Self::Shard,
                 store: &$crate::store::RegionStore,
             ) -> $crate::Result<()> {
-                self.context = Some(store.region_with_leader.context()?);
+                self.set_context(store.region_with_leader.context()?);
                 self.keys = shard.into_iter().map(Into::into).collect();
                 Ok(())
             }
@@ -225,7 +225,7 @@ macro_rules! shardable_range {
                 shard: Self::Shard,
                 store: &$crate::store::RegionStore,
             ) -> $crate::Result<()> {
-                self.context = Some(store.region_with_leader.context()?);
+                self.set_context(store.region_with_leader.context()?);
 
                 self.start_key = shard.0.into();
                 self.end_key = shard.1.into();
diff --git a/src/store/request.rs b/src/store/request.rs
index a26e57e0..2f1a31a0 100644
--- a/src/store/request.rs
+++ b/src/store/request.rs
@@ -21,6 +21,8 @@ pub trait Request: Any + Sync + Send + 'static {
     ) -> Result<Box<dyn Any>>;
     fn label(&self) -> &'static str;
     fn as_any(&self) -> &dyn Any;
+    /// Set the context for the request.
+    /// Should always use `set_context` other than modify the `self.context` directly.
     fn set_context(&mut self, context: kvrpcpb::Context);
     fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion);
 }
@@ -53,7 +55,13 @@ macro_rules! impl_request {
             }
 
             fn set_context(&mut self, context: kvrpcpb::Context) {
+                let api_version = self
+                    .context
+                    .as_ref()
+                    .map(|c| c.api_version)
+                    .unwrap_or_default();
                 self.context = Some(context);
+                self.set_api_version(kvrpcpb::ApiVersion::from_i32(api_version).unwrap());
             }
 
             fn set_api_version(&mut self, api_version: kvrpcpb::ApiVersion) {
diff --git a/src/transaction/requests.rs b/src/transaction/requests.rs
index 9cc4993f..bd3852e3 100644
--- a/src/transaction/requests.rs
+++ b/src/transaction/requests.rs
@@ -38,6 +38,7 @@ use crate::shardable_range;
 use crate::store::store_stream_for_keys;
 use crate::store::store_stream_for_range;
 use crate::store::RegionStore;
+use crate::store::Request;
 use crate::timestamp::TimestampExt;
 use crate::transaction::HasLocks;
 use crate::util::iter::FlatMapOkIterExt;
@@ -294,7 +295,7 @@ impl Shardable for kvrpcpb::PrewriteRequest {
     }
 
     fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.context = Some(store.region_with_leader.context()?);
+        self.set_context(store.region_with_leader.context()?);
 
         // Only need to set secondary keys if we're sending the primary key.
         if self.use_async_commit && !self.mutations.iter().any(|m| m.key == self.primary_lock) {
@@ -361,7 +362,7 @@ impl Shardable for kvrpcpb::CommitRequest {
     }
 
     fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.context = Some(store.region_with_leader.context()?);
+        self.set_context(store.region_with_leader.context()?);
         self.keys = shard.into_iter().map(Into::into).collect();
         Ok(())
     }
@@ -452,7 +453,7 @@ impl Shardable for kvrpcpb::PessimisticLockRequest {
     }
 
     fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.context = Some(store.region_with_leader.context()?);
+        self.set_context(store.region_with_leader.context()?);
         self.mutations = shard;
         Ok(())
     }
@@ -553,7 +554,7 @@ impl Shardable for kvrpcpb::ScanLockRequest {
     }
 
     fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.context = Some(store.region_with_leader.context()?);
+        self.set_context(store.region_with_leader.context()?);
         self.start_key = shard.0;
         Ok(())
     }
@@ -614,7 +615,7 @@ impl Shardable for kvrpcpb::TxnHeartBeatRequest {
     }
 
     fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.context = Some(store.region_with_leader.context()?);
+        self.set_context(store.region_with_leader.context()?);
         assert!(shard.len() == 1);
         self.primary_lock = shard.pop().unwrap();
         Ok(())
@@ -672,7 +673,7 @@ impl Shardable for kvrpcpb::CheckTxnStatusRequest {
     }
 
     fn apply_shard(&mut self, mut shard: Self::Shard, store: &RegionStore) -> Result<()> {
-        self.context = Some(store.region_with_leader.context()?);
+        self.set_context(store.region_with_leader.context()?);
         assert!(shard.len() == 1);
         self.primary_key = shard.pop().unwrap();
         Ok(())

From 34b99f5a7eb0af64cbc6e3bf21a605643dcf0506 Mon Sep 17 00:00:00 2001
From: Ping Yu <yuping@pingcap.com>
Date: Wed, 30 Aug 2023 11:30:32 +0800
Subject: [PATCH 7/7] add Codec parameter to Transaction & Snapshot

Signed-off-by: Ping Yu <yuping@pingcap.com>
---
 src/transaction/client.rs      | 10 +++++-----
 src/transaction/snapshot.rs    |  9 ++++++---
 src/transaction/transaction.rs | 17 +++++++++++------
 3 files changed, 22 insertions(+), 14 deletions(-)

diff --git a/src/transaction/client.rs b/src/transaction/client.rs
index e4d40618..610ea065 100644
--- a/src/transaction/client.rs
+++ b/src/transaction/client.rs
@@ -152,7 +152,7 @@ impl<Cod: Codec> Client<Cod> {
     /// transaction.commit().await.unwrap();
     /// # });
     /// ```
-    pub async fn begin_optimistic(&self) -> Result<Transaction<PdRpcClient<Cod>>> {
+    pub async fn begin_optimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
         debug!("creating new optimistic transaction");
         let timestamp = self.current_timestamp().await?;
         Ok(self.new_transaction(timestamp, TransactionOptions::new_optimistic()))
@@ -175,7 +175,7 @@ impl<Cod: Codec> Client<Cod> {
     /// transaction.commit().await.unwrap();
     /// # });
     /// ```
-    pub async fn begin_pessimistic(&self) -> Result<Transaction<PdRpcClient<Cod>>> {
+    pub async fn begin_pessimistic(&self) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
         debug!("creating new pessimistic transaction");
         let timestamp = self.current_timestamp().await?;
         Ok(self.new_transaction(timestamp, TransactionOptions::new_pessimistic()))
@@ -201,7 +201,7 @@ impl<Cod: Codec> Client<Cod> {
     pub async fn begin_with_options(
         &self,
         options: TransactionOptions,
-    ) -> Result<Transaction<PdRpcClient<Cod>>> {
+    ) -> Result<Transaction<Cod, PdRpcClient<Cod>>> {
         debug!("creating new customized transaction");
         let timestamp = self.current_timestamp().await?;
         Ok(self.new_transaction(timestamp, options))
@@ -212,7 +212,7 @@ impl<Cod: Codec> Client<Cod> {
         &self,
         timestamp: Timestamp,
         options: TransactionOptions,
-    ) -> Snapshot<PdRpcClient<Cod>> {
+    ) -> Snapshot<Cod, PdRpcClient<Cod>> {
         debug!("creating new snapshot");
         Snapshot::new(self.new_transaction(timestamp, options.read_only()))
     }
@@ -311,7 +311,7 @@ impl<Cod: Codec> Client<Cod> {
         &self,
         timestamp: Timestamp,
         options: TransactionOptions,
-    ) -> Transaction<PdRpcClient<Cod>> {
+    ) -> Transaction<Cod, PdRpcClient<Cod>> {
         Transaction::new(timestamp, self.pd.clone(), options)
     }
 }
diff --git a/src/transaction/snapshot.rs b/src/transaction/snapshot.rs
index 2ae15769..a8aa9464 100644
--- a/src/transaction/snapshot.rs
+++ b/src/transaction/snapshot.rs
@@ -2,7 +2,9 @@
 
 use derive_new::new;
 use log::debug;
+use std::marker::PhantomData;
 
+use crate::codec::ApiV1TxnCodec;
 use crate::pd::{PdClient, PdRpcClient};
 use crate::request::codec::Codec;
 use crate::BoundRange;
@@ -20,11 +22,12 @@ use crate::Value;
 ///
 /// See the [Transaction](struct@crate::Transaction) docs for more information on the methods.
 #[derive(new)]
-pub struct Snapshot<PdC: PdClient = PdRpcClient> {
-    transaction: Transaction<PdC>,
+pub struct Snapshot<Cod: Codec = ApiV1TxnCodec, PdC: PdClient = PdRpcClient<Cod>> {
+    transaction: Transaction<Cod, PdC>,
+    phantom: PhantomData<Cod>,
 }
 
-impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<PdC> {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Snapshot<Cod, PdC> {
     /// Get the value associated with the given key.
     pub async fn get(&mut self, key: impl Into<Key>) -> Result<Option<Value>> {
         debug!("invoking get request on snapshot");
diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs
index b28ea2b3..9317e171 100644
--- a/src/transaction/transaction.rs
+++ b/src/transaction/transaction.rs
@@ -1,6 +1,7 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
 use std::iter;
+use std::marker::PhantomData;
 use std::sync::Arc;
 use std::time::Instant;
 
@@ -14,6 +15,7 @@ use tokio::time::Duration;
 
 use crate::backoff::Backoff;
 use crate::backoff::DEFAULT_REGION_BACKOFF;
+use crate::codec::ApiV1TxnCodec;
 use crate::pd::PdClient;
 use crate::pd::PdRpcClient;
 use crate::proto::kvrpcpb;
@@ -74,7 +76,7 @@ use crate::Value;
 /// txn.commit().await.unwrap();
 /// # });
 /// ```
-pub struct Transaction<PdC: PdClient = PdRpcClient> {
+pub struct Transaction<Cod: Codec = ApiV1TxnCodec, PdC: PdClient = PdRpcClient<Cod>> {
     status: Arc<RwLock<TransactionStatus>>,
     timestamp: Timestamp,
     buffer: Buffer,
@@ -82,14 +84,15 @@ pub struct Transaction<PdC: PdClient = PdRpcClient> {
     options: TransactionOptions,
     is_heartbeat_started: bool,
     start_instant: Instant,
+    phantom: PhantomData<Cod>,
 }
 
-impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
+impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<Cod, PdC> {
     pub(crate) fn new(
         timestamp: Timestamp,
         rpc: Arc<PdC>,
         options: TransactionOptions,
-    ) -> Transaction<PdC> {
+    ) -> Transaction<Cod, PdC> {
         let status = if options.read_only {
             TransactionStatus::ReadOnly
         } else {
@@ -103,6 +106,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
             options,
             is_heartbeat_started: false,
             start_instant: std::time::Instant::now(),
+            phantom: PhantomData,
         }
     }
 
@@ -134,8 +138,9 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
 
         self.buffer
             .get_or_else(key, |key| async move {
-                let request = EncodedRequest::new(new_get_request(key, timestamp), rpc.get_codec());
-                let plan = PlanBuilder::new(rpc, request)
+                let request = new_get_request(key, timestamp);
+                let encoded_req = EncodedRequest::new(request, rpc.get_codec());
+                let plan = PlanBuilder::new(rpc, encoded_req)
                     .resolve_lock(retry_options.lock_backoff)
                     .retry_multi_region(DEFAULT_REGION_BACKOFF)
                     .merge(CollectSingle)
@@ -924,7 +929,7 @@ impl<Cod: Codec, PdC: PdClient<Codec = Cod>> Transaction<PdC> {
     }
 }
 
-impl<PdC: PdClient> Drop for Transaction<PdC> {
+impl<Cod: Codec, PdC: PdClient> Drop for Transaction<Cod, PdC> {
     fn drop(&mut self) {
         debug!("dropping transaction");
         if std::thread::panicking() {