diff --git a/doc/developer/design/20230705_v2_txn_management.md b/doc/developer/design/20230705_v2_txn_management.md index 333ed48daf91b..98bf70fcbbd37 100644 --- a/doc/developer/design/20230705_v2_txn_management.md +++ b/doc/developer/design/20230705_v2_txn_management.md @@ -365,6 +365,32 @@ maintenance or a CRDB write, but this is also true for registering a reader. On the balance, I think this is a _much_ better set of tradeoffs than the original plan. +### Compaction + +Compaction of data shards is initially delegated to the txns user (the storage +controller). Because txn writes intentionally never read data shards and in no +way depend on the sinces, the since of a data shard is free to be arbitrarily +far ahead of or behind the txns upper. Data shard reads, when run through the +above process, then follow the usual rules (can read at times beyond the since +but not beyond the upper). + +Compaction of the txns shard relies on the following invariant that is carefully +maintained: every write less than the since of the txns shard has been applied. +Mechanically, this is accomplished by a critical since capability held +internally by the txns system. Any txn writer is free to advance it to a time +once it has proven that all writes before that time have been applied. + +It is advantageous to compact the txns shard aggressively so that applied writes +are promptly consolidated out, minimizing the size. For a snapshot read at +`as_of`, we need to be able to distinguish when the latest write `<= as_of` has +been applied. The above invariant enables this as follows: + +- If `as_of <= txns_shard.since()`, then the invariant guarantees that all + writes `<= as_of` have been applied, so we're free to read as described in the + section above. +- Otherwise, we haven't compacted `as_of` in the txns shard yet, and still have + perfect information about which writes happened when. We can look at the data shard upper to determine which have been applied. 
+ ### Forget A data shard is removed from the txns set using a `forget` operation that writes diff --git a/src/persist-cli/src/maelstrom/txn_list_append_multi.rs b/src/persist-cli/src/maelstrom/txn_list_append_multi.rs index 248da374db918..16967dda2d40b 100644 --- a/src/persist-cli/src/maelstrom/txn_list_append_multi.rs +++ b/src/persist-cli/src/maelstrom/txn_list_append_multi.rs @@ -116,6 +116,11 @@ impl Transactor { txn.tidy(std::mem::take(&mut self.tidy)); match txn.commit_at(&mut self.txns, write_ts).await { Ok(maintenance) => { + // Aggressively allow the txns shard to compact. To + // exercise more edge cases, do it before we apply the + // newly committed txn. + self.txns.compact_to(write_ts).await; + debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts); let tidy = maintenance.apply(&mut self.txns).await; self.tidy.merge(tidy); diff --git a/src/persist-txn/src/lib.rs b/src/persist-txn/src/lib.rs index 9918ba9c44be0..a7fcbb47eb588 100644 --- a/src/persist-txn/src/lib.rs +++ b/src/persist-txn/src/lib.rs @@ -287,11 +287,6 @@ pub mod txn_read; pub mod txn_write; pub mod txns; -// TODO(txn): -// - Closing/deleting data shards. -// - Hold a critical since capability for each registered shard? -// - Figure out the compaction story for both txn and data shard. - /// The in-mem representation of an update in the txns shard.
#[derive(Debug)] pub enum TxnsEntry { diff --git a/src/persist-txn/src/operator.rs b/src/persist-txn/src/operator.rs index 97389c6436693..ae2255be7c92e 100644 --- a/src/persist-txn/src/operator.rs +++ b/src/persist-txn/src/operator.rs @@ -32,7 +32,7 @@ use timely::progress::{Antichain, Timestamp}; use timely::scheduling::Scheduler; use timely::worker::Worker; use timely::{Data, WorkerConfig}; -use tracing::debug; +use tracing::{debug, trace}; use crate::txn_read::{DataListenNext, TxnsCache}; use crate::{TxnsCodec, TxnsCodecDefault}; @@ -132,12 +132,16 @@ where ) .await .expect("schema shouldn't change"); - let () = snap.unblock_read(data_write).await; + let empty_to = snap.unblock_read(data_write).await; + debug!( + "txns_progress({}) {} starting as_of={:?} empty_to={:?}", + name, data_id, as_of, empty_to + ); // We've ensured that the data shard's physical upper is past as_of, so // start by passing through data and frontier updates from the input // until it is past the as_of. - let mut read_data_to = as_of.step_forward(); + let mut read_data_to = empty_to; let mut output_progress_exclusive = T::minimum(); loop { // TODO(txn): Do something more specific when the input returns None @@ -151,6 +155,7 @@ where // disconnected. Event::Data(_data_cap, data) => { for data in data.drain(..) { + trace!("txns_progress({}) emitting data {:?}", name, data); passthrough_output.give(&cap, data).await; } } @@ -166,9 +171,14 @@ where // frontier updates too. if &output_progress_exclusive < input_progress_exclusive { output_progress_exclusive.clone_from(input_progress_exclusive); + trace!( + "txns_progress({}) downgrading cap to {:?}", + name, + output_progress_exclusive + ); cap.downgrade(&output_progress_exclusive); } - if read_data_to <= output_progress_exclusive { + if read_data_to.less_than(&output_progress_exclusive) { break; } } @@ -180,15 +190,9 @@ where // find out what to do next given our current progress. 
loop { txns_cache.update_ge(&output_progress_exclusive).await; + txns_cache.compact_to(&output_progress_exclusive); let data_listen_next = txns_cache.data_listen_next(&data_id, output_progress_exclusive.clone()); - debug!( - "txns_progress({}): data_listen_next {:.9} at {:?}: {:?}", - name, - data_id.to_string(), - output_progress_exclusive, - data_listen_next - ); match data_listen_next { // We've caught up to the txns upper and we have to wait for // it to advance before asking again. @@ -204,7 +208,7 @@ where // The data shard got a write! Loop back above and pass // through data until we see it. DataListenNext::ReadDataTo(new_target) => { - read_data_to = new_target; + read_data_to = Antichain::from_elem(new_target); break; } // We know there are no writes in @@ -213,9 +217,19 @@ where DataListenNext::EmitLogicalProgress(new_progress) => { assert!(output_progress_exclusive < new_progress); output_progress_exclusive = new_progress; + debug!( + "txns_progress({}) downgrading cap to {:?}", + name, output_progress_exclusive + ); cap.downgrade(&output_progress_exclusive); continue; } + DataListenNext::CompactedTo(since_ts) => { + unreachable!( + "internal logic error: {} unexpectedly compacted past {:?} to {:?}", + data_id, output_progress_exclusive, since_ts + ) + } } } } diff --git a/src/persist-txn/src/txn_read.rs b/src/persist-txn/src/txn_read.rs index c16012a6b47cd..f2ae12d35306d 100644 --- a/src/persist-txn/src/txn_read.rs +++ b/src/persist-txn/src/txn_read.rs @@ -9,14 +9,14 @@ //! Interfaces for reading txn shards as well as data shards. 
-use std::collections::{BTreeMap, VecDeque}; +use std::cmp::Reverse; +use std::collections::{BTreeMap, BinaryHeap, VecDeque}; use std::fmt::Debug; use differential_dataflow::difference::Semigroup; use differential_dataflow::hashable::Hashable; use differential_dataflow::lattice::Lattice; use mz_ore::collections::HashMap; -use mz_persist_client::error::UpperMismatch; use mz_persist_client::read::{ListenEvent, ReadHandle, Since, Subscribe}; use mz_persist_client::write::WriteHandle; use mz_persist_client::ShardId; @@ -69,6 +69,8 @@ use crate::{TxnsCodec, TxnsCodecDefault, TxnsEntry}; #[derive(Debug)] pub struct TxnsCache { _txns_id: ShardId, + // Invariant: <= the minimum unapplied batch. + since_ts: T, pub(crate) progress_exclusive: T, txns_subscribe: Subscribe, @@ -79,11 +81,15 @@ pub struct TxnsCache, T)>, + pub(crate) unapplied_batches: BTreeMap, T)>, /// An index into `unapplied_batches` keyed by the serialized batch. batch_idx: HashMap, usize>, /// The times at which each data shard has been written. pub(crate) datas: BTreeMap>, + + /// Invariant: Contains the minimum write time (if any) for each value in + /// `self.datas`. + datas_min_write_ts: BinaryHeap>, } impl TxnsCache { @@ -100,21 +106,22 @@ impl pub(crate) async fn open(txns_read: ReadHandle) -> Self { let _txns_id = txns_read.shard_id(); - // TODO(txn): Figure out the compaction story. This might require - // sorting inserts before retractions within each timestamp. 
let as_of = txns_read.since().clone(); + let since_ts = as_of.as_option().expect("txns shard is not closed").clone(); let subscribe = txns_read .subscribe(as_of) .await .expect("handle holds a capability"); TxnsCache { _txns_id, + since_ts, progress_exclusive: T::minimum(), txns_subscribe: subscribe, next_batch_id: 0, unapplied_batches: BTreeMap::new(), batch_idx: HashMap::new(), datas: BTreeMap::new(), + datas_min_write_ts: BinaryHeap::new(), } } @@ -202,6 +209,9 @@ impl data_times, ); + if ts < self.since_ts { + return CompactedTo(self.since_ts.clone()); + } let Some(data_times) = data_times else { // Not registered, maybe it will be in the future? In the meantime, // treat it like a normal shard (i.e. pass through reads) and check @@ -283,6 +293,30 @@ impl retractions.filter(|(batch_raw, _)| self.batch_idx.contains_key(*batch_raw)) } + /// Allows compaction to internal representations, losing the ability to + /// answer queries about times less_than since_ts. + /// + /// To ensure that this is not `O(data shard)`, we update them lazily as + /// they are touched in self.update. + /// + /// Callers must first wait for `update_ge` with the same or later timestamp + /// to return. Panics otherwise. + pub fn compact_to(&mut self, since_ts: &T) { + // Make things easier on ourselves by allowing update to work on + // un-compacted data. + assert!(&self.progress_exclusive >= since_ts); + + // NB: This intentionally does not compact self.unapplied_batches, + // because we aren't allowed to alter those timestamps. This is fine + // because it and self.batch_idx are self-compacting, anyway. 
+ if &self.since_ts < since_ts { + self.since_ts.clone_from(since_ts); + } else { + return; + } + self.compact_data_times() + } + /// Invariant: afterward, self.progress_exclusive will be > ts #[instrument(level = "debug", skip_all, fields(ts = ?ts))] pub(crate) async fn update_gt(&mut self, ts: &T) { @@ -396,11 +430,12 @@ impl .unapplied_batches .insert(idx, (data_id, batch, ts.clone())); assert_eq!(prev, None); - self.datas - .get_mut(&data_id) - .expect("data is initialized") - .writes - .push_back(ts); + let times = self.datas.get_mut(&data_id).expect("data is initialized"); + if times.writes.is_empty() { + self.datas_min_write_ts.push(Reverse((ts.clone(), data_id))); + } + times.writes.push_back(ts); + self.compact_data_times(); } else if diff == -1 { debug!( "cache learned {:.9} applied t={:?} b={}", @@ -425,6 +460,87 @@ impl } } + fn compact_data_times(&mut self) { + debug!( + "cache compact since={:?} min_writes={:?}", + self.since_ts, self.datas_min_write_ts + ); + loop { + // Repeatedly take the data shard with the minimum write_ts and + // compact it until we get one at or past the since_ts. If that + // results in no registration and no writes, forget about the shard + // entirely, so it doesn't sit around forever. This will eventually + // finish because each data shard we compact will not meet the + // compaction criteria if we see it again. + // + // NB: This intentionally doesn't compact the registration + // timestamps if none of the writes need to be compacted, so that + // compaction isn't `O(compact calls * data shards)`.
+ let data_id = match self.datas_min_write_ts.peek() { + Some(Reverse((ts, _))) if ts < &self.since_ts => { + let Reverse((_, data_id)) = self.datas_min_write_ts.pop().expect("just peeked"); + data_id + } + Some(_) | None => break, + }; + let times = self + .datas + .get_mut(&data_id) + .expect("datas_min_write_ts should be an index into datas"); + debug!( + "cache compact {:.9} since={:?} times={:?}", + data_id.to_string(), + self.since_ts, + times + ); + + // Advance the registration times. + while let Some(x) = times.registered.front_mut() { + if x.register_ts < self.since_ts { + x.register_ts.clone_from(&self.since_ts); + } + if let Some(forget_ts) = x.forget_ts.as_ref() { + if forget_ts < &self.since_ts { + times.registered.pop_front(); + continue; + } + } + break; + } + + // Pop any write times before since_ts. We can stop as soon + // as something is >=. + while let Some(write_ts) = times.writes.front() { + if write_ts < &self.since_ts { + times.writes.pop_front(); + } else { + break; + } + } + + // Now re-insert it into datas_min_write_ts if non-empty. 
+ if let Some(ts) = times.writes.front() { + self.datas_min_write_ts.push(Reverse((ts.clone(), data_id))); + } + + if times.registered.is_empty() { + assert!(times.writes.is_empty()); + self.datas.remove(&data_id); + } + debug!( + "cache compact {:.9} DONE since={:?} times={:?}", + data_id.to_string(), + self.since_ts, + self.datas.get(&data_id), + ); + } + debug!( + "cache compact DONE since={:?} min_writes={:?}", + self.since_ts, self.datas_min_write_ts + ); + debug_assert_eq!(self.validate(), Ok(())); + } + pub(crate) fn validate(&self) -> Result<(), String> { if self.batch_idx.len() != self.unapplied_batches.len() { return Err(format!( @@ -452,8 +568,29 @@ impl prev_ts = ts.clone(); } - for (_, data_times) in self.datas.iter() { - let () = data_times.validate()?; + for (data_id, data_times) in self.datas.iter() { + let () = data_times.validate(&self.since_ts)?; + + if let Some(ts) = data_times.writes.front() { + assert!(&self.since_ts <= ts); + // Oof, bummer to do this iteration. + assert!( + self.datas_min_write_ts + .iter() + .any(|Reverse((t, d))| t == ts && d == data_id), + "{:?} {:?} missing from {:?}", + data_id, + ts, + self.datas_min_write_ts + ); + } + } + + for Reverse((ts, data_id)) in self.datas_min_write_ts.iter() { + assert_eq!( + self.datas.get(data_id).unwrap().writes.front().as_ref(), + Some(&ts) + ); } Ok(()) @@ -472,6 +609,8 @@ pub(crate) struct DataTimes { /// - Everything in writes is in one of these intervals. pub(crate) registered: VecDeque>, /// Invariant: These are in increasing order. + /// + /// Invariant: Each of these is >= self.since_ts. 
pub(crate) writes: VecDeque, } @@ -507,7 +646,7 @@ impl DataTimes { self.registered.back().expect("at least one registration") } - pub(crate) fn validate(&self) -> Result<(), String> { + pub(crate) fn validate(&self, since_ts: &T) -> Result<(), String> { // Writes are sorted let mut prev_ts = T::minimum(); for ts in self.writes.iter() { @@ -517,6 +656,12 @@ impl DataTimes { ts, prev_ts )); } + if ts < since_ts { + return Err(format!( + "write ts {:?} not advanced past since ts {:?}", + ts, since_ts + )); + } prev_ts = ts.clone(); } @@ -531,9 +676,9 @@ impl DataTimes { )); } if let Some(forget_ts) = x.forget_ts.as_ref() { - if !(&x.register_ts < forget_ts) { + if !(&x.register_ts <= forget_ts) { return Err(format!( - "register ts {:?} not less than forget ts {:?}", + "register ts {:?} not less_equal forget ts {:?}", x.register_ts, forget_ts )); } @@ -594,8 +739,16 @@ pub struct DataSnapshot { impl DataSnapshot { /// Unblocks reading a snapshot at `self.as_of` by waiting for the latest /// write before that time and then running an empty CaA if necessary. + /// + /// Returns a frontier that is greater than the as_of and less_equal the + /// physical upper of the data shard. This is suitable for use as an initial + /// input to `TxnsCache::data_listen_next` (after reading up to it, of + /// course). #[instrument(level = "debug", skip_all, fields(shard = %self.data_id, ts = ?self.as_of))] - pub(crate) async fn unblock_read(&self, mut data_write: WriteHandle) + pub(crate) async fn unblock_read( + &self, + mut data_write: WriteHandle, + ) -> Antichain where K: Debug + Codec, V: Debug + Codec, @@ -615,13 +768,8 @@ impl DataSnapshot { } // Now fill `(latest_write,as_of]` with empty updates, so we can read - // the shard at as_of normally. - // - // In practice, because CaA takes an exclusive upper, we actually fill - // `(latest_write, empty_to)`. It is an invariant of DataSnapshot that - // this range is empty and that as_of is "in the middle". 
We really only - // care about as_of being readable, so exit the loop as soon as this is - // the case, even if the upper is not empty_to. + // the shard at as_of normally. In practice, because CaA takes an + // exclusive upper, we actually fill `(latest_write, empty_to)`. // // It's quite counter-intuitive for reads to involve writes, but I think // this is fine. In particular, because writing empty updates to a @@ -632,68 +780,44 @@ impl DataSnapshot { // timestamps to the most recent write and reading that. let Some(mut data_upper) = data_write.shared_upper().into_option() else { // If the upper is the empty antichain, then we've unblocked all - // possible reads. + // possible reads. Return early. debug!( "CaA data snapshot {:.9} shard finalized", self.data_id.to_string(), ); - return; + return Antichain::new(); }; - while data_upper <= self.as_of { + while data_upper < self.empty_to { // It would be very bad if we accidentally filled any times <= // latest_write with empty updates, so defensively assert on each // iteration through the loop. 
if let Some(latest_write) = self.latest_write.as_ref() { assert!(latest_write < &data_upper); } - debug!( - "CaA data snapshot {:.9} [{:?},{:?})", - self.data_id.to_string(), - data_upper, - self.empty_to, - ); - if let Some(latest_write) = self.latest_write.as_ref() { - assert!(latest_write <= &self.as_of); - } assert!(self.as_of < self.empty_to); - let res = data_write - .compare_and_append_batch( - &mut [], - Antichain::from_elem(data_upper.clone()), - Antichain::from_elem(self.empty_to.clone()), - ) - .await - .expect("usage was valid"); + let res = crate::small_caa( + || format!("data {:.9} unblock reads", self.data_id.to_string()), + &mut data_write, + &[], + data_upper.clone(), + self.empty_to.clone(), + ) + .await; match res { Ok(()) => { - debug!( - "CaA data snapshot {:.9} [{:?},{:?}) success", - self.data_id.to_string(), - data_upper, - self.empty_to, - ); - // Persist registers writes on the first write, so politely + // Persist registers writers on the first write, so politely // expire the writer we just created, but (as a performance // optimization) only if we actually wrote something. data_write.expire().await; break; } - Err(UpperMismatch { current, .. }) => { - let current = current - .into_option() - .expect("txns shard should not be closed"); - debug!( - "CaA data snapshot {:.9} [{:?},{:?}) mismatch actual={:?}", - self.data_id.to_string(), - data_upper, - self.empty_to, - current, - ); - data_upper = current; + Err(new_data_upper) => { + data_upper = new_data_upper; continue; } } } + Antichain::from_elem(self.empty_to.clone()) } /// See [ReadHandle::snapshot_and_fetch]. @@ -751,6 +875,9 @@ pub enum DataListenNext { /// shard. Wait for it to progress with `update_gt` and call /// `data_listen_next` again. WaitForTxnsProgress, + /// We've lost historical distinctions and can no longer answer queries + /// about times before the returned one. 
+ CompactedTo(T), } #[cfg(test)] @@ -785,6 +912,7 @@ mod tests { .unwrap(); let mut ret = TxnsCache::open(txns_read).await; ret.update_gt(&init_ts).await; + ret.compact_to(&init_ts); ret } @@ -896,6 +1024,8 @@ mod tests { txns.expect_commit_at(3, d0, &[], &log).await; let mut cache = TxnsCache::expect_open(3, &txns).await; + + assert_eq!(cache.since_ts, 3); assert_eq!(cache.progress_exclusive, 4); // We can answer exclusive queries at <= progress. The data shard is not @@ -939,6 +1069,20 @@ mod tests { // assert_eq!(cache.data_listen_next(&d0, 4), ReadDataTo(7)); assert_eq!(cache.data_listen_next(&d0, 5), ReadDataTo(7)); assert_eq!(cache.data_listen_next(&d0, 6), ReadDataTo(7)); + + // Compacting to the current since is a no-op. + assert_eq!(cache.since_ts, 3); + cache.compact_to(&3); + assert_eq!(cache.data_listen_next(&d0, 5), ReadDataTo(7)); + + // Compacting to the first write doesn't change the answer. + cache.compact_to(&6); + assert_eq!(cache.data_listen_next(&d0, 6), ReadDataTo(7)); + + // Compacting past the first write means we can no longer ask about it. 
+ cache.compact_to(&7); + assert_eq!(cache.data_listen_next(&d0, 6), CompactedTo(7)); + // assert_eq!(cache.data_listen_next(&d0, 7), WaitForTxnsProgress); } #[mz_ore::test(tokio::test)] @@ -999,7 +1143,8 @@ mod tests { }) } dt.writes = write_ts.into_iter().cloned().collect(); - dt.validate().map_err(|_| ()) + let since_ts = u64::minimum(); + dt.validate(&since_ts).map_err(|_| ()) } // Valid @@ -1007,6 +1152,7 @@ mod tests { assert_eq!(dt(&[1, 3], &[2]), Ok(())); assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(())); assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(())); + assert_eq!(dt(&[1, 1], &[1]), Ok(())); // Invalid assert_eq!(dt(&[], &[]), Err(())); diff --git a/src/persist-txn/src/txns.rs b/src/persist-txn/src/txns.rs index 17f97268684bd..3e6f63ca9038c 100644 --- a/src/persist-txn/src/txns.rs +++ b/src/persist-txn/src/txns.rs @@ -106,7 +106,6 @@ where { pub(crate) txns_cache: TxnsCache, pub(crate) txns_write: WriteHandle, - #[allow(dead_code)] // TODO(txn): Becomes used in the txns compaction PR. pub(crate) txns_since: SinceHandle, pub(crate) datas: DataHandles, } @@ -242,8 +241,6 @@ where }) .collect::>(); // If the txns_upper has passed register_ts, we can no longer write. - // Return success if we have nothing left to write or an Error if we - // do. if register_ts < txns_upper { debug!( "txns register {} at {:?} mismatch current={:?}", @@ -323,8 +320,6 @@ where }; // If the txns_upper has passed forget_ts, we can no longer write. - // Return success if we have nothing left to write or an Error if we - // do. if forget_ts < txns_upper { debug!( "txns forget {:.9} at {:?} mismatch current={:?}", @@ -337,8 +332,23 @@ where // Ensure the latest write has been applied, so we don't run into // any issues trying to apply it later. 
- let data_times = self.txns_cache.datas.get(&data_id); - if let Some(latest_write) = data_times.and_then(|x| x.writes.back()) { + // + // NB: It's _very_ important for correctness to get this from the + // unapplied batches (which compact themselves naturally) and not + // from the writes (which are artificially compacted based on when + // we need reads for). + let data_latest_unapplied = self + .txns_cache + .unapplied_batches + .values() + .rev() + .find(|(x, _, _)| x == &data_id); + if let Some((_, _, latest_write)) = data_latest_unapplied { + debug!( + "txns forget {:.9} applying latest write {:?}", + data_id.to_string(), + latest_write, + ); let latest_write = latest_write.clone(); let _tidy = self.apply_le(&latest_write).await; } @@ -372,7 +382,7 @@ where // won't get around to telling the caller that it's safe to use this // shard directly again. Presumably it will retry at some point. let () = crate::empty_caa( - || format!("txns forget fill {:.9}", data_id.to_string()), + || format!("txns {:.9} forget fill", data_id.to_string()), self.datas.get_write(&data_id).await, forget_ts, ) @@ -457,6 +467,28 @@ where Ok(()) } + /// Allows compaction to the txns shard as well as internal representations, + /// losing the ability to answer queries about times less_than since_ts. + /// + /// Only call this from the singleton controller process. + pub async fn compact_to(&mut self, mut since_ts: T) { + tracing::debug!("compact_to {:?}", since_ts); + // It's always safe to compact our internal representation past where + // the txns shard is physically compacted to, so do that regardless of + // whether the maybe_downgrade goes through. + self.txns_cache.update_gt(&since_ts).await; + self.txns_cache.compact_to(&since_ts); + + // NB: A critical invariant for how this all works is that we never + // allow the since of the txns shard to pass any unapplied writes, so + // reduce it as necessary.
+ let min_unapplied_ts = self.txns_cache.min_unapplied_ts(); + if min_unapplied_ts < &since_ts { + since_ts.clone_from(min_unapplied_ts); + } + crate::maybe_cads::(&mut self.txns_since, since_ts).await; + } + /// Returns the [ShardId] of the txns shard. pub fn txns_id(&self) -> ShardId { self.txns_write.shard_id() @@ -800,6 +832,10 @@ mod tests { ts += 1; write_directly(&mut d0_write, &format!("d{}", ts), ts, &log).await; step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + // ts - 1 because we just wrote directly, which doesn't advance txns + txns.compact_to(ts - 1).await; + } info!("{} register", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); @@ -808,6 +844,9 @@ mod tests { .await .unwrap(); step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } info!("{} txns", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); @@ -815,12 +854,18 @@ mod tests { txns.expect_commit_at(ts, d0, &[&format!("t{}", ts)], &log) .await; step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } info!("{} forget", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); ts += 1; txns.forget(ts, d0).await.unwrap(); step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } } // Check all the subscribes. @@ -902,19 +947,27 @@ mod tests { impl StressWorker { pub async fn step(&mut self) { - debug!("{} step {} START ts={}", self.idx, self.step, self.ts); + debug!( + "stress {} step {} START ts={}", + self.idx, self.step, self.ts + ); let data_id = self.data_ids[usize::cast_from(self.rng.next_u64()) % self.data_ids.len()]; - match self.rng.next_u64() % 4 { + match self.rng.next_u64() % 5 { 0 => self.write(data_id).await, // The register and forget impls intentionally don't switch on // whether it's already registered to stress idempotence. 
1 => self.register(data_id).await, 2 => self.forget(data_id).await, - 3 => self.start_read(data_id), + 3 => { + debug!("stress compact {:.9} to {}", data_id.to_string(), self.ts); + self.txns.txns_cache.update_ge(&self.ts).await; + self.txns.txns_cache.compact_to(&self.ts) + } + 4 => self.start_read(data_id), _ => unreachable!(""), } - debug!("{} step {} DONE ts={}", self.idx, self.step, self.ts); + debug!("stress {} step {} DONE ts={}", self.idx, self.step, self.ts); self.step += 1; } @@ -946,7 +999,11 @@ mod tests { } async fn write_via_txns(&mut self, data_id: ShardId) -> Result<(), u64> { - debug!("write_via_txns {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress write_via_txns {:.9} at {}", + data_id.to_string(), + self.ts + ); let mut txn = self.txns.begin(); txn.tidy(std::mem::take(&mut self.tidy)); txn.write(&data_id, self.key(), (), 1).await; @@ -965,37 +1022,45 @@ mod tests { } async fn write_direct(&mut self, data_id: ShardId) -> Result<(), u64> { - debug!("write_direct {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress write_direct {:.9} at {}", + data_id.to_string(), + self.ts + ); // First write an empty txn to ensure that the shard isn't // registered at this ts by someone else. 
self.txns.begin().commit_at(&mut self.txns, self.ts).await?; let mut write = writer(&self.txns.datas.client, data_id).await; - let mut current = write.shared_upper(); + let mut current = write.shared_upper().into_option().unwrap(); loop { - if !current.less_equal(&self.ts) { - return Err(current.into_option().unwrap()); + if !(current <= self.ts) { + return Err(current); } let key = self.key(); - let updates = [((key.clone(), ()), self.ts, 1)]; - let res = write - .compare_and_append(&updates, current, Antichain::from_elem(self.ts + 1)) - .await - .unwrap(); + let updates = [((&key, &()), &self.ts, 1)]; + let res = crate::small_caa( + || format!("data {:.9} direct", data_id.to_string()), + &mut write, + &updates, + current, + self.ts + 1, + ) + .await; match res { Ok(()) => { debug!("log {:.9} {} at {}", data_id.to_string(), key, self.ts); self.log.record((data_id, key, self.ts, 1)); return Ok(()); } - Err(err) => current = err.current, + Err(new_current) => current = new_current, } } } async fn register(&mut self, data_id: ShardId) { self.retry_ts_err(&mut |w: &mut StressWorker| { - debug!("register {:.9} at {}", data_id.to_string(), w.ts); + debug!("stress register {:.9} at {}", data_id.to_string(), w.ts); Box::pin(async move { let data_write = writer(&w.txns.datas.client, data_id).await; let _ = w.txns.register(w.ts, [data_write]).await?; @@ -1007,14 +1072,18 @@ mod tests { async fn forget(&mut self, data_id: ShardId) { self.retry_ts_err(&mut |w: &mut StressWorker| { - debug!("forget {:.9} at {}", data_id.to_string(), w.ts); + debug!("stress forget {:.9} at {}", data_id.to_string(), w.ts); Box::pin(async move { w.txns.forget(w.ts, data_id).await }) }) .await } fn start_read(&mut self, data_id: ShardId) { - debug!("start_read {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress start_read {:.9} at {}", + data_id.to_string(), + self.ts + ); let client = self.txns.datas.client.clone(); let txns_id = self.txns.txns_id(); let as_of = self.ts;