diff --git a/rust-toolchain b/rust-toolchain new file mode 100644 index 00000000..3e2f5f2c --- /dev/null +++ b/rust-toolchain @@ -0,0 +1 @@ +nightly-2021-03-15 diff --git a/src/transaction/buffer.rs b/src/transaction/buffer.rs index 1d7f61b9..2ec1d86c 100644 --- a/src/transaction/buffer.rs +++ b/src/transaction/buffer.rs @@ -1,17 +1,21 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use crate::{BoundRange, Key, KvPair, Result, Value}; +use derive_new::new; use std::{ - collections::{BTreeMap, HashMap}, + collections::{btree_map::Entry, BTreeMap, HashMap}, future::Future, }; use tikv_client_proto::kvrpcpb; use tokio::sync::{Mutex, MutexGuard}; -#[derive(Default)] +#[derive(new)] struct InnerBuffer { + #[new(default)] primary_key: Option, + #[new(default)] entry_map: BTreeMap, + is_pessimistic: bool, } impl InnerBuffer { @@ -29,16 +33,22 @@ impl InnerBuffer { } /// A caching layer which buffers reads and writes in a transaction. -#[derive(Default)] pub struct Buffer { mutations: Mutex, } impl Buffer { + pub fn new(is_pessimistic: bool) -> Buffer { + Buffer { + mutations: Mutex::new(InnerBuffer::new(is_pessimistic)), + } + } + /// Get the primary key of the buffer. pub async fn get_primary_key(&self) -> Option { self.mutations.lock().await.primary_key.clone() } + /// Get the primary key of the buffer, if not exists, use `key` as the primary key. pub async fn get_primary_key_or(&self, key: &Key) -> Key { self.mutations.lock().await.get_primary_key_or(key).clone() @@ -203,28 +213,30 @@ impl Buffer { /// Mark a value as Insert mutation into the buffer (does not write through). pub async fn insert(&self, key: Key, value: Value) { - self.mutations - .lock() - .await - .insert(key, BufferEntry::Insert(value)); + let mut mutations = self.mutations.lock().await; + let mut entry = mutations.entry_map.entry(key.clone()); + match entry { + Entry::Occupied(ref mut o) if matches!(o.get(), BufferEntry::Del) => { + o.insert(BufferEntry::Put(value)); + } + _ => mutations.insert(key, BufferEntry::Insert(value)), + } } /// Mark a value as deleted. pub async fn delete(&self, key: Key) { let mut mutations = self.mutations.lock().await; - let value = mutations - .entry_map - .entry(key.clone()) - .or_insert(BufferEntry::Del); - - let new_value: BufferEntry; - if let BufferEntry::Insert(_) = value { - new_value = BufferEntry::CheckNotExist - } else { - new_value = BufferEntry::Del + let is_pessimistic = mutations.is_pessimistic; + let mut entry = mutations.entry_map.entry(key.clone()); + + match entry { + Entry::Occupied(ref mut o) + if matches!(o.get(), BufferEntry::Insert(_)) && !is_pessimistic => + { + o.insert(BufferEntry::CheckNotExist); + } + _ => mutations.insert(key, BufferEntry::Del), } - - mutations.insert(key, new_value); } /// Converts the buffered mutations to the proto buffer version @@ -378,7 +390,7 @@ mod tests { #[tokio::test] #[allow(unreachable_code)] async fn set_and_get_from_buffer() { - let buffer = Buffer::default(); + let buffer = Buffer::new(false); buffer .put(b"key1".to_vec().into(), b"value1".to_vec()) .await; @@ -411,7 +423,7 @@ mod tests { #[tokio::test] #[allow(unreachable_code)] async fn insert_and_get_from_buffer() { - let buffer = Buffer::default(); + let buffer = Buffer::new(false); buffer .insert(b"key1".to_vec().into(), b"value1".to_vec()) .await; @@ -453,13 +465,13 @@ mod tests { let v2: Value = b"value2".to_vec(); let v2_ = v2.clone(); - let buffer = Buffer::default(); + let buffer = Buffer::new(false); let r1 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(Ok(Some(v1_))))); let r2 = block_on(buffer.get_or_else(k1.clone(), move |_| ready(panic!()))); assert_eq!(r1.unwrap().unwrap(), v1); assert_eq!(r2.unwrap().unwrap(), v1); - let buffer = Buffer::default(); + let buffer = Buffer::new(false); let r1 = block_on( buffer.batch_get_or_else(vec![k1.clone(), k2.clone()].into_iter(), move |_| { ready(Ok(vec![(k1_, v1__).into(), (k2_, v2_).into()])) diff --git a/src/transaction/lowering.rs b/src/transaction/lowering.rs index ce1243d0..413c6bf8 100644 --- a/src/transaction/lowering.rs +++ b/src/transaction/lowering.rs @@ -106,8 +106,34 @@ pub fn new_pessimistic_rollback_request( ) } +pub trait PessimisticLock: Clone { + fn key(self) -> Key; + + fn assertion(&self) -> kvrpcpb::Assertion; +} + +impl PessimisticLock for Key { + fn key(self) -> Key { + self + } + + fn assertion(&self) -> kvrpcpb::Assertion { + kvrpcpb::Assertion::None + } +} + +impl PessimisticLock for (Key, kvrpcpb::Assertion) { + fn key(self) -> Key { + self.0 + } + + fn assertion(&self) -> kvrpcpb::Assertion { + self.1 + } +} + pub fn new_pessimistic_lock_request( - keys: impl Iterator, + locks: impl Iterator, primary_lock: Key, start_version: Timestamp, lock_ttl: u64, @@ -115,13 +141,15 @@ pub fn new_pessimistic_lock_request( need_value: bool, ) -> kvrpcpb::PessimisticLockRequest { requests::new_pessimistic_lock_request( - keys.map(|key| { - let mut mutation = kvrpcpb::Mutation::default(); - mutation.set_op(kvrpcpb::Op::PessimisticLock); - mutation.set_key(key.into()); - mutation - }) - .collect(), + locks + .map(|pl| { + let mut mutation = kvrpcpb::Mutation::default(); + mutation.set_op(kvrpcpb::Op::PessimisticLock); + mutation.set_assertion(pl.assertion()); + mutation.set_key(pl.key().into()); + mutation + }) + .collect(), primary_lock.into(), start_version.version(), lock_ttl, diff --git a/src/transaction/transaction.rs b/src/transaction/transaction.rs index 1fe8be4c..47374a90 100644 --- a/src/transaction/transaction.rs +++ b/src/transaction/transaction.rs @@ -66,7 +66,7 @@ impl Transaction { Transaction { status: Arc::new(RwLock::new(status)), timestamp, - buffer: Default::default(), + buffer: Buffer::new(options.is_pessimistic()), rpc, options, is_heartbeat_started: false, @@ -401,8 +401,11 @@ impl Transaction { return Err(Error::DuplicateKeyInsertion); } if self.is_pessimistic() { - self.pessimistic_lock(iter::once(key.clone()), false) - .await?; + self.pessimistic_lock( + iter::once((key.clone(), kvrpcpb::Assertion::NotExist)), + false, + ) + .await?; } self.buffer.insert(key, value.into()).await; Ok(()) @@ -630,7 +633,7 @@ impl Transaction { /// Only valid for pessimistic transactions, panics if called on an optimistic transaction. async fn pessimistic_lock( &mut self, - keys: impl IntoIterator, + keys: impl IntoIterator, need_value: bool, ) -> Result> { assert!( @@ -638,12 +641,12 @@ impl Transaction { "`pessimistic_lock` is only valid to use with pessimistic transactions" ); - let keys: Vec = keys.into_iter().collect(); + let keys: Vec<_> = keys.into_iter().collect(); if keys.is_empty() { return Ok(vec![]); } - let first_key = keys[0].clone(); + let first_key = keys[0].clone().key(); let primary_lock = self.buffer.get_primary_key_or(&first_key).await; let for_update_ts = self.rpc.clone().get_timestamp().await?; self.options.push_for_update_ts(for_update_ts.clone()); @@ -667,7 +670,7 @@ impl Transaction { self.start_auto_heartbeat().await; for key in keys { - self.buffer.lock(key).await; + self.buffer.lock(key.key()).await; } pairs @@ -891,6 +894,13 @@ impl TransactionOptions { self.auto_heartbeat = false; self } + + pub fn is_pessimistic(&self) -> bool { + match self.kind { + TransactionKind::Pessimistic(_) => true, + TransactionKind::Optimistic => false, + } + } } /// The default TTL of a lock in milliseconds. diff --git a/tests/integration_tests.rs b/tests/integration_tests.rs index 74e7c9b4..61fd3010 100644 --- a/tests/integration_tests.rs +++ b/tests/integration_tests.rs @@ -596,6 +596,49 @@ async fn pessimistic_rollback() -> Result<()> { Ok(()) } +#[tokio::test] +#[serial] +async fn pessimistic_delete() -> Result<()> { + clear_tikv().await; + let client = + TransactionClient::new_with_config(vec!["127.0.0.1:2379"], Default::default()).await?; + + // The transaction will lock the keys and must release the locks on commit, even when values are + // not written to the DB. + let mut txn = client.begin_pessimistic().await?; + txn.put(vec![1], vec![42]).await?; + txn.delete(vec![1]).await?; + txn.insert(vec![2], vec![42]).await?; + txn.delete(vec![2]).await?; + txn.put(vec![3], vec![42]).await?; + txn.commit().await?; + + // Check that the keys are not locked. + let mut txn2 = client.begin_optimistic().await?; + txn2.put(vec![1], vec![42]).await?; + txn2.put(vec![2], vec![42]).await?; + txn2.put(vec![3], vec![42]).await?; + txn2.commit().await?; + + // As before, but rollback instead of commit. + let mut txn = client.begin_pessimistic().await?; + txn.put(vec![1], vec![42]).await?; + txn.delete(vec![1]).await?; + txn.delete(vec![2]).await?; + txn.insert(vec![2], vec![42]).await?; + txn.delete(vec![2]).await?; + txn.put(vec![3], vec![42]).await?; + txn.rollback().await?; + + let mut txn2 = client.begin_optimistic().await?; + txn2.put(vec![1], vec![42]).await?; + txn2.put(vec![2], vec![42]).await?; + txn2.put(vec![3], vec![42]).await?; + txn2.commit().await?; + + Ok(()) +} + #[tokio::test] #[serial] async fn lock_keys() -> Result<()> {