From cffe68c038b142e2b0487eb65db67d2707142eae Mon Sep 17 00:00:00 2001 From: Daniel Harrison <52528+danhhz@users.noreply.github.com> Date: Thu, 19 Oct 2023 08:32:34 -0700 Subject: [PATCH] persist-txn: compaction Compaction of data shards is initially delegated to the txns user (the storage controller). Because txn writes intentionally never read data shards and in no way depend on the sinces, the since of a data shard is free to be arbitrarily far ahead of or behind the txns upper. Data shard reads, when run through the above process, then follow the usual rules (can read at times beyond the since but not beyond the upper). Compaction of the txns shard relies on the following invariant that is carefully maintained: every write less than the since of the txns shard has been applied. Mechanically, this is accomplished by a critical since capability held internally by the txns system. Any txn writer is free to advance it to a time once it has proven that all writes before that time have been applied. It is advantageous to compact the txns shard aggressively so that applied writes are promptly consolidated out, minimizing the size. For a snapshot read at `as_of`, we need to be able to distinguish when the latest write `<= as_of` has been applied. The above invariant enables this as follows: - If `as_of <= txns_shard.since()`, then the invariant guarantees that all writes `<= as_of` have been applied, so we're free to read as described in the section above. - Otherwise, we haven't compacted `as_of` in the txns shard yet, and still have perfect information about which writes happened when. We can look at the data shard upper to determine which have been applied. --- .../design/20230705_v2_txn_management.md | 26 ++ .../src/maelstrom/txn_list_append_multi.rs | 5 + src/persist-txn/src/lib.rs | 5 - src/persist-txn/src/operator.rs | 38 ++- src/persist-txn/src/txn_read.rs | 272 ++++++++++++++---- src/persist-txn/src/txns.rs | 121 ++++++-- 6 files changed, 361 insertions(+), 106 deletions(-) diff --git a/doc/developer/design/20230705_v2_txn_management.md b/doc/developer/design/20230705_v2_txn_management.md index 333ed48daf91b..98bf70fcbbd37 100644 --- a/doc/developer/design/20230705_v2_txn_management.md +++ b/doc/developer/design/20230705_v2_txn_management.md @@ -365,6 +365,32 @@ maintenance or a CRDB write, but this is also true for registering a reader. On the balance, I think this is a _much_ better set of tradeoffs than the original plan. +### Compaction + +Compaction of data shards is initially delegated to the txns user (the storage +controller). Because txn writes intentionally never read data shards and in no +way depend on the sinces, the since of a data shard is free to be arbitrarily +far ahead of or behind the txns upper. Data shard reads, when run through the +above process, then follow the usual rules (can read at times beyond the since +but not beyond the upper). + +Compaction of the txns shard relies on the following invariant that is carefully +maintained: every write less than the since of the txns shard has been applied. +Mechanically, this is accomplished by a critical since capability held +internally by the txns system. Any txn writer is free to advance it to a time +once it has proven that all writes before that time have been applied. + +It is advantageous to compact the txns shard aggressively so that applied writes +are promptly consolidated out, minimizing the size. For a snapshot read at +`as_of`, we need to be able to distinguish when the latest write `<= as_of` has +been applied. The above invariant enables this as follows: + +- If `as_of <= txns_shard.since()`, then the invariant guarantees that all + writes `<= as_of` have been applied, so we're free to read as described in the + section above. +- Otherwise, we haven't compacted `as_of` in the txns shard yet, and still have + perfect information about which writes happened when. We can look at the data shard upper to determine which have been applied. + ### Forget A data shard is removed from the txns set using a `forget` operation that writes diff --git a/src/persist-cli/src/maelstrom/txn_list_append_multi.rs b/src/persist-cli/src/maelstrom/txn_list_append_multi.rs index 248da374db918..16967dda2d40b 100644 --- a/src/persist-cli/src/maelstrom/txn_list_append_multi.rs +++ b/src/persist-cli/src/maelstrom/txn_list_append_multi.rs @@ -116,6 +116,11 @@ impl Transactor { txn.tidy(std::mem::take(&mut self.tidy)); match txn.commit_at(&mut self.txns, write_ts).await { Ok(maintenance) => { + // Aggressively allow the txns shard compact. To + // exercise more edge cases, do it before we apply the + // newly committed txn. + self.txns.compact_to(write_ts).await; + debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts); let tidy = maintenance.apply(&mut self.txns).await; self.tidy.merge(tidy); diff --git a/src/persist-txn/src/lib.rs b/src/persist-txn/src/lib.rs index 9918ba9c44be0..a7fcbb47eb588 100644 --- a/src/persist-txn/src/lib.rs +++ b/src/persist-txn/src/lib.rs @@ -287,11 +287,6 @@ pub mod txn_read; pub mod txn_write; pub mod txns; -// TODO(txn): -// - Closing/deleting data shards. -// - Hold a critical since capability for each registered shard? -// - Figure out the compaction story for both txn and data shard. - /// The in-mem representation of an update in the txns shard. #[derive(Debug)] pub enum TxnsEntry { diff --git a/src/persist-txn/src/operator.rs b/src/persist-txn/src/operator.rs index 97389c6436693..ae2255be7c92e 100644 --- a/src/persist-txn/src/operator.rs +++ b/src/persist-txn/src/operator.rs @@ -32,7 +32,7 @@ use timely::progress::{Antichain, Timestamp}; use timely::scheduling::Scheduler; use timely::worker::Worker; use timely::{Data, WorkerConfig}; -use tracing::debug; +use tracing::{debug, trace}; use crate::txn_read::{DataListenNext, TxnsCache}; use crate::{TxnsCodec, TxnsCodecDefault}; @@ -132,12 +132,16 @@ where ) .await .expect("schema shouldn't change"); - let () = snap.unblock_read(data_write).await; + let empty_to = snap.unblock_read(data_write).await; + debug!( + "txns_progress({}) {} starting as_of={:?} empty_to={:?}", + name, data_id, as_of, empty_to + ); // We've ensured that the data shard's physical upper is past as_of, so // start by passing through data and frontier updates from the input // until it is past the as_of. - let mut read_data_to = as_of.step_forward(); + let mut read_data_to = empty_to; let mut output_progress_exclusive = T::minimum(); loop { // TODO(txn): Do something more specific when the input returns None @@ -151,6 +155,7 @@ where // disconnected. Event::Data(_data_cap, data) => { for data in data.drain(..) { + trace!("txns_progress({}) emitting data {:?}", name, data); passthrough_output.give(&cap, data).await; } } @@ -166,9 +171,14 @@ where // frontier updates too. if &output_progress_exclusive < input_progress_exclusive { output_progress_exclusive.clone_from(input_progress_exclusive); + trace!( + "txns_progress({}) downgrading cap to {:?}", + name, + output_progress_exclusive + ); cap.downgrade(&output_progress_exclusive); } - if read_data_to <= output_progress_exclusive { + if read_data_to.less_than(&output_progress_exclusive) { break; } } @@ -180,15 +190,9 @@ where // find out what to do next given our current progress. loop { txns_cache.update_ge(&output_progress_exclusive).await; + txns_cache.compact_to(&output_progress_exclusive); let data_listen_next = txns_cache.data_listen_next(&data_id, output_progress_exclusive.clone()); - debug!( - "txns_progress({}): data_listen_next {:.9} at {:?}: {:?}", - name, - data_id.to_string(), - output_progress_exclusive, - data_listen_next - ); match data_listen_next { // We've caught up to the txns upper and we have to wait for // it to advance before asking again. @@ -204,7 +208,7 @@ where // The data shard got a write! Loop back above and pass // through data until we see it. DataListenNext::ReadDataTo(new_target) => { - read_data_to = new_target; + read_data_to = Antichain::from_elem(new_target); break; } // We know there are no writes in @@ -213,9 +217,19 @@ where DataListenNext::EmitLogicalProgress(new_progress) => { assert!(output_progress_exclusive < new_progress); output_progress_exclusive = new_progress; + debug!( + "txns_progress({}) downgrading cap to {:?}", + name, output_progress_exclusive + ); cap.downgrade(&output_progress_exclusive); continue; } + DataListenNext::CompactedTo(since_ts) => { + unreachable!( + "internal logic error: {} unexpectedly compacted past {:?} to {:?}", + data_id, output_progress_exclusive, since_ts + ) + } } } } diff --git a/src/persist-txn/src/txn_read.rs b/src/persist-txn/src/txn_read.rs index c16012a6b47cd..f2ae12d35306d 100644 --- a/src/persist-txn/src/txn_read.rs +++ b/src/persist-txn/src/txn_read.rs @@ -9,14 +9,14 @@ //! Interfaces for reading txn shards as well as data shards. -use std::collections::{BTreeMap, VecDeque}; +use std::cmp::Reverse; +use std::collections::{BTreeMap, BinaryHeap, VecDeque}; use std::fmt::Debug; use differential_dataflow::difference::Semigroup; use differential_dataflow::hashable::Hashable; use differential_dataflow::lattice::Lattice; use mz_ore::collections::HashMap; -use mz_persist_client::error::UpperMismatch; use mz_persist_client::read::{ListenEvent, ReadHandle, Since, Subscribe}; use mz_persist_client::write::WriteHandle; use mz_persist_client::ShardId; @@ -69,6 +69,8 @@ use crate::{TxnsCodec, TxnsCodecDefault, TxnsEntry}; #[derive(Debug)] pub struct TxnsCache { _txns_id: ShardId, + // Invariant: <= the minimum unapplied batch. + since_ts: T, pub(crate) progress_exclusive: T, txns_subscribe: Subscribe, @@ -79,11 +81,15 @@ pub struct TxnsCache, T)>, + pub(crate) unapplied_batches: BTreeMap, T)>, /// An index into `unapplied_batches` keyed by the serialized batch. batch_idx: HashMap, usize>, /// The times at which each data shard has been written. pub(crate) datas: BTreeMap>, + + /// Invariant: Contains the minimum write time (if any) for each value in + /// `self.datas`. + datas_min_write_ts: BinaryHeap>, } impl TxnsCache { @@ -100,21 +106,22 @@ impl pub(crate) async fn open(txns_read: ReadHandle) -> Self { let _txns_id = txns_read.shard_id(); - // TODO(txn): Figure out the compaction story. This might require - // sorting inserts before retractions within each timestamp. let as_of = txns_read.since().clone(); + let since_ts = as_of.as_option().expect("txns shard is not closed").clone(); let subscribe = txns_read .subscribe(as_of) .await .expect("handle holds a capability"); TxnsCache { _txns_id, + since_ts, progress_exclusive: T::minimum(), txns_subscribe: subscribe, next_batch_id: 0, unapplied_batches: BTreeMap::new(), batch_idx: HashMap::new(), datas: BTreeMap::new(), + datas_min_write_ts: BinaryHeap::new(), } } @@ -202,6 +209,9 @@ impl data_times, ); + if ts < self.since_ts { + return CompactedTo(self.since_ts.clone()); + } let Some(data_times) = data_times else { // Not registered, maybe it will be in the future? In the meantime, // treat it like a normal shard (i.e. pass through reads) and check @@ -283,6 +293,30 @@ impl retractions.filter(|(batch_raw, _)| self.batch_idx.contains_key(*batch_raw)) } + /// Allows compaction to internal representations, losing the ability to + /// answer queries about times less_than since_ts. + /// + /// To ensure that this is not `O(data shard)`, we update them lazily as + /// they are touched in self.update. + /// + /// Callers must first wait for `update_ge` with the same or later timestamp + /// to return. Panics otherwise. + pub fn compact_to(&mut self, since_ts: &T) { + // Make things easier on ourselves by allowing update to work on + // un-compacted data. + assert!(&self.progress_exclusive >= since_ts); + + // NB: This intentionally does not compact self.unapplied_batches, + // because we aren't allowed to alter those timestamps. This is fine + // because it and self.batch_idx are self-compacting, anyway. + if &self.since_ts < since_ts { + self.since_ts.clone_from(since_ts); + } else { + return; + } + self.compact_data_times() + } + /// Invariant: afterward, self.progress_exclusive will be > ts #[instrument(level = "debug", skip_all, fields(ts = ?ts))] pub(crate) async fn update_gt(&mut self, ts: &T) { @@ -396,11 +430,12 @@ impl .unapplied_batches .insert(idx, (data_id, batch, ts.clone())); assert_eq!(prev, None); - self.datas - .get_mut(&data_id) - .expect("data is initialized") - .writes - .push_back(ts); + let times = self.datas.get_mut(&data_id).expect("data is initialized"); + if times.writes.is_empty() { + self.datas_min_write_ts.push(Reverse((ts.clone(), data_id))); + } + times.writes.push_back(ts); + self.compact_data_times(); } else if diff == -1 { debug!( "cache learned {:.9} applied t={:?} b={}", @@ -425,6 +460,87 @@ impl } } + fn compact_data_times(&mut self) { + debug!( + "cache compact since={:?} min_writes={:?}", + self.since_ts, self.datas_min_write_ts + ); + loop { + // Repeatedly compact the data shard with the minimum write_ts and + // compact it until we get one at or past the since_ts. If that + // results in no registration and no writes, forget about the shard + // entirely, so it doesn't sit around forever. This will eventually + // finish because each data shard we compact will not meet the + // compaction criteria if we see it again. + // + // NB: This intentionally doesn't compact the registration + // timestamps if none of the writes need to be compacted, so that + // compaction isn't `O(compact calls * data shards)`. + let data_id = match self.datas_min_write_ts.peek() { + Some(Reverse((ts, _))) if ts < &self.since_ts => { + let Reverse((_, data_id)) = self.datas_min_write_ts.pop().expect("just peeked"); + data_id + } + Some(_) | None => break, + }; + let times = self + .datas + .get_mut(&data_id) + .expect("datas_min_write_ts should be an index into datas"); + debug!( + "cache compact {:.9} since={:?} times={:?}", + data_id.to_string(), + self.since_ts, + times + ); + + // Advance the registration times. + while let Some(x) = times.registered.front_mut() { + if x.register_ts < self.since_ts { + x.register_ts.clone_from(&self.since_ts); + } + if let Some(forget_ts) = x.forget_ts.as_ref() { + if forget_ts < &self.since_ts { + times.registered.pop_front(); + continue; + } + } + break; + } + + // Pop any write times before since_ts. We can stop as soon + // as something is >=. + while let Some(write_ts) = times.writes.front() { + if write_ts < &self.since_ts { + times.writes.pop_front(); + } else { + break; + } + } + + // Now re-insert it into datas_min_write_ts if non-empty. + if let Some(ts) = times.writes.front() { + self.datas_min_write_ts.push(Reverse((ts.clone(), data_id))); + } + + if times.registered.is_empty() { + assert!(times.writes.is_empty()); + self.datas.remove(&data_id); + } + debug!( + "cache compact {:.9} DONE since={:?} times={:?}", + data_id.to_string(), + self.since_ts, + self.datas.get(&data_id), + ); + } + debug!( + "cache compact DONE since={:?} min_writes={:?}", + self.since_ts, self.datas_min_write_ts + ); + debug_assert_eq!(self.validate(), Ok(())); + } + pub(crate) fn validate(&self) -> Result<(), String> { if self.batch_idx.len() != self.unapplied_batches.len() { return Err(format!( @@ -452,8 +568,29 @@ impl prev_ts = ts.clone(); } - for (_, data_times) in self.datas.iter() { - let () = data_times.validate()?; + for (data_id, data_times) in self.datas.iter() { + let () = data_times.validate(&self.since_ts)?; + + if let Some(ts) = data_times.writes.front() { + assert!(&self.since_ts <= ts); + // Oof, bummer to do this iteration. + assert!( + self.datas_min_write_ts + .iter() + .any(|Reverse((t, d))| t == ts && d == data_id), + "{:?} {:?} missing from {:?}", + data_id, + ts, + self.datas_min_write_ts + ); + } + } + + for Reverse((ts, data_id)) in self.datas_min_write_ts.iter() { + assert_eq!( + self.datas.get(data_id).unwrap().writes.front().as_ref(), + Some(&ts) + ); } Ok(()) @@ -472,6 +609,8 @@ pub(crate) struct DataTimes { /// - Everything in writes is in one of these intervals. pub(crate) registered: VecDeque>, /// Invariant: These are in increasing order. + /// + /// Invariant: Each of these is >= self.since_ts. pub(crate) writes: VecDeque, } @@ -507,7 +646,7 @@ impl DataTimes { self.registered.back().expect("at least one registration") } - pub(crate) fn validate(&self) -> Result<(), String> { + pub(crate) fn validate(&self, since_ts: &T) -> Result<(), String> { // Writes are sorted let mut prev_ts = T::minimum(); for ts in self.writes.iter() { @@ -517,6 +656,12 @@ impl DataTimes { ts, prev_ts )); } + if ts < since_ts { + return Err(format!( + "write ts {:?} not advanced past since ts {:?}", + ts, since_ts + )); + } prev_ts = ts.clone(); } @@ -531,9 +676,9 @@ impl DataTimes { )); } if let Some(forget_ts) = x.forget_ts.as_ref() { - if !(&x.register_ts < forget_ts) { + if !(&x.register_ts <= forget_ts) { return Err(format!( - "register ts {:?} not less than forget ts {:?}", + "register ts {:?} not less_equal forget ts {:?}", x.register_ts, forget_ts )); } @@ -594,8 +739,16 @@ pub struct DataSnapshot { impl DataSnapshot { /// Unblocks reading a snapshot at `self.as_of` by waiting for the latest /// write before that time and then running an empty CaA if necessary. + /// + /// Returns a frontier that is greater than the as_of and less_equal the + /// physical upper of the data shard. This is suitable for use as an initial + /// input to `TxnsCache::data_listen_next` (after reading up to it, of + /// course). #[instrument(level = "debug", skip_all, fields(shard = %self.data_id, ts = ?self.as_of))] - pub(crate) async fn unblock_read(&self, mut data_write: WriteHandle) + pub(crate) async fn unblock_read( + &self, + mut data_write: WriteHandle, + ) -> Antichain where K: Debug + Codec, V: Debug + Codec, @@ -615,13 +768,8 @@ impl DataSnapshot { } // Now fill `(latest_write,as_of]` with empty updates, so we can read - // the shard at as_of normally. - // - // In practice, because CaA takes an exclusive upper, we actually fill - // `(latest_write, empty_to)`. It is an invariant of DataSnapshot that - // this range is empty and that as_of is "in the middle". We really only - // care about as_of being readable, so exit the loop as soon as this is - // the case, even if the upper is not empty_to. + // the shard at as_of normally. In practice, because CaA takes an + // exclusive upper, we actually fill `(latest_write, empty_to)`. // // It's quite counter-intuitive for reads to involve writes, but I think // this is fine. In particular, because writing empty updates to a @@ -632,68 +780,44 @@ impl DataSnapshot { // timestamps to the most recent write and reading that. let Some(mut data_upper) = data_write.shared_upper().into_option() else { // If the upper is the empty antichain, then we've unblocked all - // possible reads. + // possible reads. Return early. debug!( "CaA data snapshot {:.9} shard finalized", self.data_id.to_string(), ); - return; + return Antichain::new(); }; - while data_upper <= self.as_of { + while data_upper < self.empty_to { // It would be very bad if we accidentally filled any times <= // latest_write with empty updates, so defensively assert on each // iteration through the loop. if let Some(latest_write) = self.latest_write.as_ref() { assert!(latest_write < &data_upper); } - debug!( - "CaA data snapshot {:.9} [{:?},{:?})", - self.data_id.to_string(), - data_upper, - self.empty_to, - ); - if let Some(latest_write) = self.latest_write.as_ref() { - assert!(latest_write <= &self.as_of); - } assert!(self.as_of < self.empty_to); - let res = data_write - .compare_and_append_batch( - &mut [], - Antichain::from_elem(data_upper.clone()), - Antichain::from_elem(self.empty_to.clone()), - ) - .await - .expect("usage was valid"); + let res = crate::small_caa( + || format!("data {:.9} unblock reads", self.data_id.to_string()), + &mut data_write, + &[], + data_upper.clone(), + self.empty_to.clone(), + ) + .await; match res { Ok(()) => { - debug!( - "CaA data snapshot {:.9} [{:?},{:?}) success", - self.data_id.to_string(), - data_upper, - self.empty_to, - ); - // Persist registers writes on the first write, so politely + // Persist registers writers on the first write, so politely // expire the writer we just created, but (as a performance // optimization) only if we actually wrote something. data_write.expire().await; break; } - Err(UpperMismatch { current, .. }) => { - let current = current - .into_option() - .expect("txns shard should not be closed"); - debug!( - "CaA data snapshot {:.9} [{:?},{:?}) mismatch actual={:?}", - self.data_id.to_string(), - data_upper, - self.empty_to, - current, - ); - data_upper = current; + Err(new_data_upper) => { + data_upper = new_data_upper; continue; } } } + Antichain::from_elem(self.empty_to.clone()) } /// See [ReadHandle::snapshot_and_fetch]. @@ -751,6 +875,9 @@ pub enum DataListenNext { /// shard. Wait for it to progress with `update_gt` and call /// `data_listen_next` again. WaitForTxnsProgress, + /// We've lost historical distinctions and can no longer answer queries + /// about times before the returned one. + CompactedTo(T), } #[cfg(test)] @@ -785,6 +912,7 @@ mod tests { .unwrap(); let mut ret = TxnsCache::open(txns_read).await; ret.update_gt(&init_ts).await; + ret.compact_to(&init_ts); ret } @@ -896,6 +1024,8 @@ mod tests { txns.expect_commit_at(3, d0, &[], &log).await; let mut cache = TxnsCache::expect_open(3, &txns).await; + + assert_eq!(cache.since_ts, 3); assert_eq!(cache.progress_exclusive, 4); // We can answer exclusive queries at <= progress. The data shard is not @@ -939,6 +1069,20 @@ mod tests { // assert_eq!(cache.data_listen_next(&d0, 4), ReadDataTo(7)); assert_eq!(cache.data_listen_next(&d0, 5), ReadDataTo(7)); assert_eq!(cache.data_listen_next(&d0, 6), ReadDataTo(7)); + + // Compacting to the current since is a no-op. + assert_eq!(cache.since_ts, 3); + cache.compact_to(&3); + assert_eq!(cache.data_listen_next(&d0, 5), ReadDataTo(7)); + + // Compacting to the first write doesn't change the answer. + cache.compact_to(&6); + assert_eq!(cache.data_listen_next(&d0, 6), ReadDataTo(7)); + + // Compacting past the first write means we can no longer ask about it. + cache.compact_to(&7); + assert_eq!(cache.data_listen_next(&d0, 6), CompactedTo(7)); + // assert_eq!(cache.data_listen_next(&d0, 7), WaitForTxnsProgress); } #[mz_ore::test(tokio::test)] @@ -999,7 +1143,8 @@ mod tests { }) } dt.writes = write_ts.into_iter().cloned().collect(); - dt.validate().map_err(|_| ()) + let since_ts = u64::minimum(); + dt.validate(&since_ts).map_err(|_| ()) } // Valid @@ -1007,6 +1152,7 @@ mod tests { assert_eq!(dt(&[1, 3], &[2]), Ok(())); assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(())); assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(())); + assert_eq!(dt(&[1, 1], &[1]), Ok(())); // Invalid assert_eq!(dt(&[], &[]), Err(())); diff --git a/src/persist-txn/src/txns.rs b/src/persist-txn/src/txns.rs index 17f97268684bd..3e6f63ca9038c 100644 --- a/src/persist-txn/src/txns.rs +++ b/src/persist-txn/src/txns.rs @@ -106,7 +106,6 @@ where { pub(crate) txns_cache: TxnsCache, pub(crate) txns_write: WriteHandle, - #[allow(dead_code)] // TODO(txn): Becomes used in the txns compaction PR. pub(crate) txns_since: SinceHandle, pub(crate) datas: DataHandles, } @@ -242,8 +241,6 @@ where }) .collect::>(); // If the txns_upper has passed register_ts, we can no longer write. - // Return success if we have nothing left to write or an Error if we - // do. if register_ts < txns_upper { debug!( "txns register {} at {:?} mismatch current={:?}", @@ -323,8 +320,6 @@ where }; // If the txns_upper has passed forget_ts, we can no longer write. - // Return success if we have nothing left to write or an Error if we - // do. if forget_ts < txns_upper { debug!( "txns forget {:.9} at {:?} mismatch current={:?}", @@ -337,8 +332,23 @@ where // Ensure the latest write has been applied, so we don't run into // any issues trying to apply it later. - let data_times = self.txns_cache.datas.get(&data_id); - if let Some(latest_write) = data_times.and_then(|x| x.writes.back()) { + // + // NB: It's _very_ important for correctness to get this from the + // unapplied batches (which compact themselves naturally) and not + // from the writes (which are artificially compacted based on when + // we need reads for). + let data_latest_unapplied = self + .txns_cache + .unapplied_batches + .values() + .rev() + .find(|(x, _, _)| x == &data_id); + if let Some((_, _, latest_write)) = data_latest_unapplied { + debug!( + "txns forget {:.9} applying latest write {:?}", + data_id.to_string(), + latest_write, + ); let latest_write = latest_write.clone(); let _tidy = self.apply_le(&latest_write).await; } @@ -372,7 +382,7 @@ where // won't get around to telling the caller that it's safe to use this // shard directly again. Presumably it will retry at some point. let () = crate::empty_caa( - || format!("txns forget fill {:.9}", data_id.to_string()), + || format!("txns {:.9} forget fill", data_id.to_string()), self.datas.get_write(&data_id).await, forget_ts, ) @@ -457,6 +467,28 @@ where Ok(()) } + /// Allows compaction to the txns shard as well as internal representations, + /// losing the ability to answer queries about times less_than since_ts. + /// + /// only call this from the singleton controller process + pub async fn compact_to(&mut self, mut since_ts: T) { + tracing::debug!("compact_to {:?}", since_ts); + // It's always safe to compact our internal representation past where + // the txns shard is physically compacted to, so do that regardless of + // whether the maybe_downgrade goes through. + self.txns_cache.update_gt(&since_ts).await; + self.txns_cache.compact_to(&since_ts); + + // NB: A critical invariant for how this all works is that we never + // allow the since of the txns shard to pass any unapplied writes, so + // reduce it as necessary. + let min_unapplied_ts = self.txns_cache.min_unapplied_ts(); + if min_unapplied_ts < &since_ts { + since_ts.clone_from(min_unapplied_ts); + } + crate::maybe_cads::(&mut self.txns_since, since_ts).await; + } + /// Returns the [ShardId] of the txns shard. pub fn txns_id(&self) -> ShardId { self.txns_write.shard_id() @@ -800,6 +832,10 @@ mod tests { ts += 1; write_directly(&mut d0_write, &format!("d{}", ts), ts, &log).await; step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + // ts - 1 because we just wrote directly, which doesn't advance txns + txns.compact_to(ts - 1).await; + } info!("{} register", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); @@ -808,6 +844,9 @@ mod tests { .await .unwrap(); step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } info!("{} txns", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); @@ -815,12 +854,18 @@ mod tests { txns.expect_commit_at(ts, d0, &[&format!("t{}", ts)], &log) .await; step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } info!("{} forget", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); ts += 1; txns.forget(ts, d0).await.unwrap(); step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } } // Check all the subscribes. @@ -902,19 +947,27 @@ mod tests { impl StressWorker { pub async fn step(&mut self) { - debug!("{} step {} START ts={}", self.idx, self.step, self.ts); + debug!( + "stress {} step {} START ts={}", + self.idx, self.step, self.ts + ); let data_id = self.data_ids[usize::cast_from(self.rng.next_u64()) % self.data_ids.len()]; - match self.rng.next_u64() % 4 { + match self.rng.next_u64() % 5 { 0 => self.write(data_id).await, // The register and forget impls intentionally don't switch on // whether it's already registered to stress idempotence. 1 => self.register(data_id).await, 2 => self.forget(data_id).await, - 3 => self.start_read(data_id), + 3 => { + debug!("stress compact {:.9} to {}", data_id.to_string(), self.ts); + self.txns.txns_cache.update_ge(&self.ts).await; + self.txns.txns_cache.compact_to(&self.ts) + } + 4 => self.start_read(data_id), _ => unreachable!(""), } - debug!("{} step {} DONE ts={}", self.idx, self.step, self.ts); + debug!("stress {} step {} DONE ts={}", self.idx, self.step, self.ts); self.step += 1; } @@ -946,7 +999,11 @@ mod tests { } async fn write_via_txns(&mut self, data_id: ShardId) -> Result<(), u64> { - debug!("write_via_txns {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress write_via_txns {:.9} at {}", + data_id.to_string(), + self.ts + ); let mut txn = self.txns.begin(); txn.tidy(std::mem::take(&mut self.tidy)); txn.write(&data_id, self.key(), (), 1).await; @@ -965,37 +1022,45 @@ mod tests { } async fn write_direct(&mut self, data_id: ShardId) -> Result<(), u64> { - debug!("write_direct {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress write_direct {:.9} at {}", + data_id.to_string(), + self.ts + ); // First write an empty txn to ensure that the shard isn't // registered at this ts by someone else. self.txns.begin().commit_at(&mut self.txns, self.ts).await?; let mut write = writer(&self.txns.datas.client, data_id).await; - let mut current = write.shared_upper(); + let mut current = write.shared_upper().into_option().unwrap(); loop { - if !current.less_equal(&self.ts) { - return Err(current.into_option().unwrap()); + if !(current <= self.ts) { + return Err(current); } let key = self.key(); - let updates = [((key.clone(), ()), self.ts, 1)]; - let res = write - .compare_and_append(&updates, current, Antichain::from_elem(self.ts + 1)) - .await - .unwrap(); + let updates = [((&key, &()), &self.ts, 1)]; + let res = crate::small_caa( + || format!("data {:.9} direct", data_id.to_string()), + &mut write, + &updates, + current, + self.ts + 1, + ) + .await; match res { Ok(()) => { debug!("log {:.9} {} at {}", data_id.to_string(), key, self.ts); self.log.record((data_id, key, self.ts, 1)); return Ok(()); } - Err(err) => current = err.current, + Err(new_current) => current = new_current, } } } async fn register(&mut self, data_id: ShardId) { self.retry_ts_err(&mut |w: &mut StressWorker| { - debug!("register {:.9} at {}", data_id.to_string(), w.ts); + debug!("stress register {:.9} at {}", data_id.to_string(), w.ts); Box::pin(async move { let data_write = writer(&w.txns.datas.client, data_id).await; let _ = w.txns.register(w.ts, [data_write]).await?; @@ -1007,14 +1072,18 @@ mod tests { async fn forget(&mut self, data_id: ShardId) { self.retry_ts_err(&mut |w: &mut StressWorker| { - debug!("forget {:.9} at {}", data_id.to_string(), w.ts); + debug!("stress forget {:.9} at {}", data_id.to_string(), w.ts); Box::pin(async move { w.txns.forget(w.ts, data_id).await }) }) .await } fn start_read(&mut self, data_id: ShardId) { - debug!("start_read {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress start_read {:.9} at {}", + data_id.to_string(), + self.ts + ); let client = self.txns.datas.client.clone(); let txns_id = self.txns.txns_id(); let as_of = self.ts;