diff --git a/doc/developer/design/20230705_v2_txn_management.md b/doc/developer/design/20230705_v2_txn_management.md index 333ed48daf91..98bf70fcbbd3 100644 --- a/doc/developer/design/20230705_v2_txn_management.md +++ b/doc/developer/design/20230705_v2_txn_management.md @@ -365,6 +365,32 @@ maintenance or a CRDB write, but this is also true for registering a reader. On the balance, I think this is a _much_ better set of tradeoffs than the original plan. +### Compaction + +Compaction of data shards is initially delegated to the txns user (the storage +controller). Because txn writes intentionally never read data shards and in no +way depend on the sinces, the since of a data shard is free to be arbitrarily +far ahead of or behind the txns upper. Data shard reads, when run through the +above process, then follow the usual rules (can read at times beyond the since +but not beyond the upper). + +Compaction of the txns shard relies on the following invariant that is carefully +maintained: every write less than the since of the txns shard has been applied. +Mechanically, this is accomplished by a critical since capability held +internally by the txns system. Any txn writer is free to advance it to a time +once it has proven that all writes before that time have been applied. + +It is advantageous to compact the txns shard aggressively so that applied writes +are promptly consolidated out, minimizing the size. For a snapshot read at +`as_of`, we need to be able to distinguish when the latest write `<= as_of` has +been applied. The above invariant enables this as follows: + +- If `as_of <= txns_shard.since()`, then the invariant guarantees that all + writes `<= as_of` have been applied, so we're free to read as described in the + section above. +- Otherwise, we haven't compacted `as_of` in the txns shard yet, and still have + perfect information about which writes happened when. We can look at the data shard upper to determine which have been applied. + ### Forget A data shard is removed from the txns set using a `forget` operation that writes diff --git a/src/persist-cli/src/maelstrom/txn_list_append_multi.rs b/src/persist-cli/src/maelstrom/txn_list_append_multi.rs index 248da374db91..0b7801b5d12c 100644 --- a/src/persist-cli/src/maelstrom/txn_list_append_multi.rs +++ b/src/persist-cli/src/maelstrom/txn_list_append_multi.rs @@ -116,6 +116,11 @@ impl Transactor { txn.tidy(std::mem::take(&mut self.tidy)); match txn.commit_at(&mut self.txns, write_ts).await { Ok(maintenance) => { + // Aggressively allow the txns shard to compact. To + // exercise more edge cases, do it before we apply the + // newly committed txn. + self.txns.compact_to(write_ts).await; + debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts); let tidy = maintenance.apply(&mut self.txns).await; self.tidy.merge(tidy); diff --git a/src/persist-txn/src/lib.rs b/src/persist-txn/src/lib.rs index 9918ba9c44be..b5ac1da31e95 100644 --- a/src/persist-txn/src/lib.rs +++ b/src/persist-txn/src/lib.rs @@ -287,11 +287,6 @@ pub mod txn_read; pub mod txn_write; pub mod txns; -// TODO(txn): -// - Closing/deleting data shards. -// - Hold a critical since capability for each registered shard? -// - Figure out the compaction story for both txn and data shard. - /// The in-mem representation of an update in the txns shard. #[derive(Debug)] pub enum TxnsEntry { @@ -654,7 +649,7 @@ pub mod tests { .collect() }; consolidate_updates(&mut expected); - let mut actual = actual.into_iter().collect(); + let mut actual = actual.into_iter().filter(|(_, t, _)| t < &until).collect(); consolidate_updates(&mut actual); // NB: Extra spaces after actual are so it lines up with expected. tracing::debug!( @@ -677,10 +672,16 @@ pub mod tests { #[allow(ungated_async_fn_track_caller)] #[track_caller] pub async fn assert_snapshot(&self, data_id: ShardId, as_of: u64) { + self.assert_subscribe(data_id, as_of, as_of + 1).await; + } + + #[allow(ungated_async_fn_track_caller)] + #[track_caller] + pub async fn assert_subscribe(&self, data_id: ShardId, as_of: u64, until: u64) { let mut data_subscribe = DataSubscribe::new("test", self.client.clone(), self.txns_id, data_id, as_of); - data_subscribe.step_past(as_of).await; - self.assert_eq(data_id, as_of, as_of + 1, data_subscribe.output().clone()) + data_subscribe.step_past(until - 1).await; + self.assert_eq(data_id, as_of, until, data_subscribe.output().clone()); } } @@ -714,6 +715,37 @@ pub mod tests { .expect("codecs should not change") } + pub(crate) async fn write_directly( + ts: u64, + data_write: &mut WriteHandle, + keys: &[&str], + log: &CommitLog, + ) { + let data_id = data_write.shard_id(); + let keys = keys.iter().map(|x| (*x).to_owned()).collect::>(); + let updates = keys.iter().map(|k| ((k, &()), &ts, 1)).collect::>(); + let mut current = data_write.shared_upper().into_option().unwrap(); + loop { + let res = crate::small_caa( + || format!("data {:.9} directly", data_id), + data_write, + &updates, + current, + ts + 1, + ) + .await; + match res { + Ok(()) => { + for ((k, ()), t, d) in updates { + log.record((data_id, k.to_owned(), *t, d)); + } + return; + } + Err(new_current) => current = new_current, + } + } + } + #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented async fn commit_log() { diff --git a/src/persist-txn/src/operator.rs b/src/persist-txn/src/operator.rs index fdd25fce0503..7adab37cbed7 100644 --- a/src/persist-txn/src/operator.rs +++ b/src/persist-txn/src/operator.rs @@ -32,7 +32,7 @@ use timely::progress::{Antichain, Timestamp}; use timely::scheduling::Scheduler; use timely::worker::Worker; use timely::{Data, WorkerConfig}; -use tracing::debug; +use tracing::{debug, trace}; use crate::txn_read::{DataListenNext, TxnsCache}; use crate::{TxnsCodec, TxnsCodecDefault}; @@ -122,12 +122,19 @@ where ) .await .expect("schema shouldn't change"); - let () = snap.unblock_read(data_write).await; + let empty_to = snap.unblock_read(data_write).await; + debug!( + "txns_progress({}) {:.9} starting as_of={:?} empty_to={:?}", + name, + data_id.to_string(), + as_of, + empty_to.elements() + ); // We've ensured that the data shard's physical upper is past as_of, so // start by passing through data and frontier updates from the input // until it is past the as_of. - let mut read_data_to = as_of.step_forward(); + let mut read_data_to = empty_to; let mut output_progress_exclusive = T::minimum(); loop { // TODO(txn): Do something more specific when the input returns None @@ -141,6 +148,12 @@ where // disconnected. Event::Data(_data_cap, data) => { for data in data.drain(..) { + trace!( + "txns_progress({}) {:.9} emitting data {:?}", + name, + data_id.to_string(), + data + ); passthrough_output.give(&cap, data).await; } } @@ -156,9 +169,15 @@ where // frontier updates too. if &output_progress_exclusive < input_progress_exclusive { output_progress_exclusive.clone_from(input_progress_exclusive); + trace!( + "txns_progress({}) {:.9} downgrading cap to {:?}", + name, + data_id.to_string(), + output_progress_exclusive + ); cap.downgrade(&output_progress_exclusive); } - if read_data_to <= output_progress_exclusive { + if read_data_to.less_equal(&output_progress_exclusive) { break; } } @@ -170,14 +189,16 @@ where // find out what to do next given our current progress. loop { txns_cache.update_ge(&output_progress_exclusive).await; + txns_cache.compact_to(&output_progress_exclusive); let data_listen_next = txns_cache.data_listen_next(&data_id, output_progress_exclusive.clone()); debug!( - "txns_progress({}): data_listen_next {:.9} at {:?}: {:?}", + "txns_progress({}) {:.9} data_listen_next at {:?}({:?}): {:?}", name, data_id.to_string(), - output_progress_exclusive, - data_listen_next + read_data_to, + txns_cache.progress_exclusive, + data_listen_next, ); match data_listen_next { // We've caught up to the txns upper and we have to wait for @@ -194,7 +215,7 @@ where // The data shard got a write! Loop back above and pass // through data until we see it. DataListenNext::ReadDataTo(new_target) => { - read_data_to = new_target; + read_data_to = Antichain::from_elem(new_target); break; } // We know there are no writes in @@ -203,9 +224,21 @@ where DataListenNext::EmitLogicalProgress(new_progress) => { assert!(output_progress_exclusive < new_progress); output_progress_exclusive = new_progress; + trace!( + "txns_progress({}) {:.9} downgrading cap to {:?}", + name, + data_id.to_string(), + output_progress_exclusive + ); cap.downgrade(&output_progress_exclusive); continue; } + DataListenNext::CompactedTo(since_ts) => { + unreachable!( + "internal logic error: {} unexpectedly compacted past {:?} to {:?}", + data_id, output_progress_exclusive, since_ts + ) + } } } } @@ -355,6 +388,10 @@ impl DataSubscribe { #[cfg(test)] pub async fn step_past(&mut self, ts: u64) { while self.txns.less_equal(&ts) { + trace!( + "progress at {:?}", + self.txns.with_frontier(|x| x.to_owned()).elements() + ); self.step(); tokio::task::yield_now().await; } @@ -438,4 +475,59 @@ mod tests { log.assert_eq(d0, 5, 8, sub.output().clone()); } } + + #[mz_ore::test(tokio::test(flavor = "multi_thread"))] + #[cfg_attr(miri, ignore)] // too slow + async fn subscribe_shard_finalize() { + let client = PersistClient::new_for_tests().await; + let mut txns = TxnsHandle::expect_open(client.clone()).await; + let log = txns.new_log(); + let d0 = txns.expect_register(1).await; + + // Start the operator as_of the register ts. + let mut sub = txns.read_cache().expect_subscribe(&client, d0, 1); + sub.step_past(1).await; + + // Write to it via txns. + txns.expect_commit_at(2, d0, &["foo"], &log).await; + sub.step_past(2).await; + + // Unregister it. + txns.forget(3, d0).await.unwrap(); + sub.step_past(3).await; + + // TODO: Hard mode, see if we can get the rest of this test to work even + // _without_ the txns shard advancing. + txns.begin().commit_at(&mut txns, 7).await.unwrap(); + + // The operator should continue to emit data written directly even + // though it's no longer in the txns set. + let mut d0_write = writer(&client, d0).await; + let key = "bar".to_owned(); + crate::small_caa(|| "test", &mut d0_write, &[((&key, &()), &5, 1)], 4, 6) + .await + .unwrap(); + log.record((d0, key, 5, 1)); + sub.step_past(4).await; + + // Now finalize the shard to writes. + let () = d0_write + .compare_and_append_batch(&mut [], Antichain::from_elem(6), Antichain::new()) + .await + .unwrap() + .unwrap(); + while sub.txns.less_than(&u64::MAX) { + sub.step(); + tokio::task::yield_now().await; + } + + // Make sure we read the correct things. + log.assert_eq(d0, 1, u64::MAX, sub.output().clone()); + + // Also make sure that we can read the right things if we start up after + // the forget but before the direct write and ditto after the direct + // write. + log.assert_subscribe(d0, 4, u64::MAX).await; + log.assert_subscribe(d0, 6, u64::MAX).await; + } } diff --git a/src/persist-txn/src/txn_read.rs b/src/persist-txn/src/txn_read.rs index 093b9acc73c2..957ce7b892fa 100644 --- a/src/persist-txn/src/txn_read.rs +++ b/src/persist-txn/src/txn_read.rs @@ -9,7 +9,8 @@ //! Interfaces for reading txn shards as well as data shards. -use std::collections::{BTreeMap, VecDeque}; +use std::cmp::Reverse; +use std::collections::{BTreeMap, BinaryHeap, VecDeque}; use std::fmt::Debug; use std::sync::Arc; @@ -17,7 +18,6 @@ use differential_dataflow::difference::Semigroup; use differential_dataflow::hashable::Hashable; use differential_dataflow::lattice::Lattice; use mz_ore::collections::HashMap; -use mz_persist_client::error::UpperMismatch; use mz_persist_client::read::{ListenEvent, ReadHandle, Since, Subscribe}; use mz_persist_client::write::WriteHandle; use mz_persist_client::{Diagnostics, PersistClient, ShardId}; @@ -70,6 +70,8 @@ use crate::{TxnsCodec, TxnsCodecDefault, TxnsEntry}; #[derive(Debug)] pub struct TxnsCache { txns_id: ShardId, + // Invariant: <= the minimum unapplied batch. + since_ts: T, pub(crate) progress_exclusive: T, txns_subscribe: Subscribe, @@ -80,11 +82,15 @@ pub struct TxnsCache, T)>, + pub(crate) unapplied_batches: BTreeMap, T)>, /// An index into `unapplied_batches` keyed by the serialized batch. batch_idx: HashMap, usize>, /// The times at which each data shard has been written. pub(crate) datas: BTreeMap>, + + /// Invariant: Contains the minimum write time (if any) for each value in + /// `self.datas`. + datas_min_write_ts: BinaryHeap>, } impl TxnsCache { @@ -122,22 +128,23 @@ impl } pub(crate) async fn from_read(txns_read: ReadHandle) -> Self { - // TODO(txn): Figure out the compaction story. This might require - // sorting inserts before retractions within each timestamp. let txns_id = txns_read.shard_id(); let as_of = txns_read.since().clone(); + let since_ts = as_of.as_option().expect("txns shard is not closed").clone(); let subscribe = txns_read .subscribe(as_of) .await .expect("handle holds a capability"); TxnsCache { txns_id, + since_ts, progress_exclusive: T::minimum(), txns_subscribe: subscribe, next_batch_id: 0, unapplied_batches: BTreeMap::new(), batch_idx: HashMap::new(), datas: BTreeMap::new(), + datas_min_write_ts: BinaryHeap::new(), } } @@ -230,6 +237,9 @@ impl data_times, ); + if ts < self.since_ts { + return CompactedTo(self.since_ts.clone()); + } let Some(data_times) = data_times else { // Not registered, maybe it will be in the future? In the meantime, // treat it like a normal shard (i.e. pass through reads) and check @@ -311,6 +321,30 @@ impl retractions.filter(|(batch_raw, _)| self.batch_idx.contains_key(*batch_raw)) } + /// Allows compaction to internal representations, losing the ability to + /// answer queries about times less_than since_ts. + /// + /// To ensure that this is not `O(data shard)`, we update them lazily as + /// they are touched in self.update. + /// + /// Callers must first wait for `update_ge` with the same or later timestamp + /// to return. Panics otherwise. + pub fn compact_to(&mut self, since_ts: &T) { + // Make things easier on ourselves by allowing update to work on + // un-compacted data. + assert!(&self.progress_exclusive >= since_ts); + + // NB: This intentionally does not compact self.unapplied_batches, + // because we aren't allowed to alter those timestamps. This is fine + // because it and self.batch_idx are self-compacting, anyway. + if &self.since_ts < since_ts { + self.since_ts.clone_from(since_ts); + } else { + return; + } + self.compact_data_times() + } + /// Invariant: afterward, self.progress_exclusive will be > ts #[instrument(level = "debug", skip_all, fields(ts = ?ts))] pub async fn update_gt(&mut self, ts: &T) { @@ -424,11 +458,12 @@ impl .unapplied_batches .insert(idx, (data_id, batch, ts.clone())); assert_eq!(prev, None); - self.datas - .get_mut(&data_id) - .expect("data is initialized") - .writes - .push_back(ts); + let times = self.datas.get_mut(&data_id).expect("data is initialized"); + if times.writes.is_empty() { + self.datas_min_write_ts.push(Reverse((ts.clone(), data_id))); + } + times.writes.push_back(ts); + self.compact_data_times(); } else if diff == -1 { debug!( "cache learned {:.9} applied t={:?} b={}", @@ -453,6 +488,87 @@ impl } } + fn compact_data_times(&mut self) { + debug!( + "cache compact since={:?} min_writes={:?}", + self.since_ts, self.datas_min_write_ts + ); + loop { + // Repeatedly grab the data shard with the minimum write_ts while it + // (the minimum since_ts) is less_than the since_ts. If that results + // in no registration and no writes, forget about the data shard + // entirely, so it doesn't sit around in the map forever. This will + // eventually finish because each data shard we compact will not + // meet the compaction criteria if we see it again. + // + // NB: This intentionally doesn't compact the registration + // timestamps if none of the writes need to be compacted, so that + // compaction isn't `O(compact calls * data shards)`. + let data_id = match self.datas_min_write_ts.peek() { + Some(Reverse((ts, _))) if ts < &self.since_ts => { + let Reverse((_, data_id)) = self.datas_min_write_ts.pop().expect("just peeked"); + data_id + } + Some(_) | None => break, + }; + let times = self + .datas + .get_mut(&data_id) + .expect("datas_min_write_ts should be an index into datas"); + debug!( + "cache compact {:.9} since={:?} times={:?}", + data_id.to_string(), + self.since_ts, + times + ); + + // Advance the registration times. + while let Some(x) = times.registered.front_mut() { + if x.register_ts < self.since_ts { + x.register_ts.clone_from(&self.since_ts); + } + if let Some(forget_ts) = x.forget_ts.as_ref() { + if forget_ts < &self.since_ts { + times.registered.pop_front(); + continue; + } + } + break; + } + + // Pop any write times before since_ts. We can stop as soon + // as something is >=. + while let Some(write_ts) = times.writes.front() { + if write_ts < &self.since_ts { + times.writes.pop_front(); + } else { + break; + } + } + + // Now re-insert it into datas_min_write_ts if non-empty. + if let Some(ts) = times.writes.front() { + self.datas_min_write_ts.push(Reverse((ts.clone(), data_id))); + } + + if times.registered.is_empty() { + assert!(times.writes.is_empty()); + self.datas.remove(&data_id); + } + debug!( + "cache compact {:.9} DONE since={:?} times={:?}", + data_id.to_string(), + self.since_ts, + self.datas.get(&data_id), + ); + } + debug!( + "cache compact DONE since={:?} min_writes={:?}", + self.since_ts, self.datas_min_write_ts + ); + debug_assert_eq!(self.validate(), Ok(())); + } + pub(crate) fn validate(&self) -> Result<(), String> { if self.batch_idx.len() != self.unapplied_batches.len() { return Err(format!( @@ -480,8 +596,29 @@ impl prev_ts = ts.clone(); } - for (_, data_times) in self.datas.iter() { - let () = data_times.validate()?; + for (data_id, data_times) in self.datas.iter() { + let () = data_times.validate(&self.since_ts)?; + + if let Some(ts) = data_times.writes.front() { + assert!(&self.since_ts <= ts); + // Oof, bummer to do this iteration. + assert!( + self.datas_min_write_ts + .iter() + .any(|Reverse((t, d))| t == ts && d == data_id), + "{:?} {:?} missing from {:?}", + data_id, + ts, + self.datas_min_write_ts + ); + } + } + + for Reverse((ts, data_id)) in self.datas_min_write_ts.iter() { + assert_eq!( + self.datas.get(data_id).unwrap().writes.front().as_ref(), + Some(&ts) + ); } Ok(()) @@ -500,6 +637,8 @@ pub(crate) struct DataTimes { /// - Everything in writes is in one of these intervals. pub(crate) registered: VecDeque>, /// Invariant: These are in increasing order. + /// + /// Invariant: Each of these is >= self.since_ts. pub(crate) writes: VecDeque, } @@ -535,7 +674,7 @@ impl DataTimes { self.registered.back().expect("at least one registration") } - pub(crate) fn validate(&self) -> Result<(), String> { + pub(crate) fn validate(&self, since_ts: &T) -> Result<(), String> { // Writes are sorted let mut prev_ts = T::minimum(); for ts in self.writes.iter() { @@ -545,6 +684,12 @@ impl DataTimes { ts, prev_ts )); } + if ts < since_ts { + return Err(format!( + "write ts {:?} not advanced past since ts {:?}", + ts, since_ts + )); + } prev_ts = ts.clone(); } @@ -559,9 +704,9 @@ impl DataTimes { )); } if let Some(forget_ts) = x.forget_ts.as_ref() { - if !(&x.register_ts < forget_ts) { + if !(&x.register_ts <= forget_ts) { return Err(format!( - "register ts {:?} not less than forget ts {:?}", + "register ts {:?} not less_equal forget ts {:?}", x.register_ts, forget_ts )); } @@ -622,8 +767,16 @@ pub struct DataSnapshot { impl DataSnapshot { /// Unblocks reading a snapshot at `self.as_of` by waiting for the latest /// write before that time and then running an empty CaA if necessary. + /// + /// Returns a frontier that is greater than the as_of and less_equal the + /// physical upper of the data shard. This is suitable for use as an initial + /// input to `TxnsCache::data_listen_next` (after reading up to it, of + /// course). #[instrument(level = "debug", skip_all, fields(shard = %self.data_id, ts = ?self.as_of))] - pub(crate) async fn unblock_read(&self, mut data_write: WriteHandle) + pub(crate) async fn unblock_read( + &self, + mut data_write: WriteHandle, + ) -> Antichain where K: Debug + Codec, V: Debug + Codec, @@ -643,13 +796,8 @@ impl DataSnapshot { } // Now fill `(latest_write,as_of]` with empty updates, so we can read - // the shard at as_of normally. - // - // In practice, because CaA takes an exclusive upper, we actually fill - // `(latest_write, empty_to)`. It is an invariant of DataSnapshot that - // this range is empty and that as_of is "in the middle". We really only - // care about as_of being readable, so exit the loop as soon as this is - // the case, even if the upper is not empty_to. + // the shard at as_of normally. In practice, because CaA takes an + // exclusive upper, we actually fill `(latest_write, empty_to)`. // // It's quite counter-intuitive for reads to involve writes, but I think // this is fine. In particular, because writing empty updates to a @@ -660,68 +808,44 @@ impl DataSnapshot { // timestamps to the most recent write and reading that. let Some(mut data_upper) = data_write.shared_upper().into_option() else { // If the upper is the empty antichain, then we've unblocked all - // possible reads. + // possible reads. Return early. debug!( "CaA data snapshot {:.9} shard finalized", self.data_id.to_string(), ); - return; + return Antichain::new(); }; - while data_upper <= self.as_of { + while data_upper < self.empty_to { // It would be very bad if we accidentally filled any times <= // latest_write with empty updates, so defensively assert on each // iteration through the loop. if let Some(latest_write) = self.latest_write.as_ref() { assert!(latest_write < &data_upper); } - debug!( - "CaA data snapshot {:.9} [{:?},{:?})", - self.data_id.to_string(), - data_upper, - self.empty_to, - ); - if let Some(latest_write) = self.latest_write.as_ref() { - assert!(latest_write <= &self.as_of); - } assert!(self.as_of < self.empty_to); - let res = data_write - .compare_and_append_batch( - &mut [], - Antichain::from_elem(data_upper.clone()), - Antichain::from_elem(self.empty_to.clone()), - ) - .await - .expect("usage was valid"); + let res = crate::small_caa( + || format!("data {:.9} unblock reads", self.data_id.to_string()), + &mut data_write, + &[], + data_upper.clone(), + self.empty_to.clone(), + ) + .await; match res { Ok(()) => { - debug!( - "CaA data snapshot {:.9} [{:?},{:?}) success", - self.data_id.to_string(), - data_upper, - self.empty_to, - ); - // Persist registers writes on the first write, so politely + // Persist registers writers on the first write, so politely // expire the writer we just created, but (as a performance // optimization) only if we actually wrote something. data_write.expire().await; break; } - Err(UpperMismatch { current, .. }) => { - let current = current - .into_option() - .expect("txns shard should not be closed"); - debug!( - "CaA data snapshot {:.9} [{:?},{:?}) mismatch actual={:?}", - self.data_id.to_string(), - data_upper, - self.empty_to, - current, - ); - data_upper = current; + Err(new_data_upper) => { + data_upper = new_data_upper; continue; } } } + Antichain::from_elem(self.empty_to.clone()) } /// See [ReadHandle::snapshot_and_fetch]. @@ -779,6 +903,9 @@ pub enum DataListenNext { /// shard. Wait for it to progress with `update_gt` and call /// `data_listen_next` again. WaitForTxnsProgress, + /// We've lost historical distinctions and can no longer answer queries + /// about times before the returned one. + CompactedTo(T), } #[cfg(test)] @@ -799,6 +926,7 @@ mod tests { ) -> Self { let mut ret = TxnsCache::open(&txns.datas.client, txns.txns_id()).await; ret.update_gt(&init_ts).await; + ret.compact_to(&init_ts); ret } @@ -836,6 +964,8 @@ mod tests { } } + // TODO(txn): Rewrite this test to exercise more edge cases, something like: + // registrations at `[2,4], [8,9]` and writes at 1, 3, 6, 10, 12. #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // too slow async fn data_snapshot() { @@ -901,6 +1031,8 @@ mod tests { assert_eq!(cache.data_snapshot(d0, 6), ds(d0, Some(6), 6, 8)); } + // TODO(txn): Rewrite this test to exercise more edge cases, something like: + // registrations at `[2,4], [8,9]` and writes at 1, 3, 6, 10, 12. #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // too slow async fn txns_cache_data_listen_next() { @@ -1013,7 +1145,8 @@ mod tests { }) } dt.writes = write_ts.into_iter().cloned().collect(); - dt.validate().map_err(|_| ()) + let since_ts = u64::minimum(); + dt.validate(&since_ts).map_err(|_| ()) } // Valid @@ -1021,6 +1154,7 @@ mod tests { assert_eq!(dt(&[1, 3], &[2]), Ok(())); assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(())); assert_eq!(dt(&[1, 3, 5], &[2, 6, 7]), Ok(())); + assert_eq!(dt(&[1, 1], &[1]), Ok(())); // Invalid assert_eq!(dt(&[], &[]), Err(())); diff --git a/src/persist-txn/src/txns.rs b/src/persist-txn/src/txns.rs index 17f97268684b..54af35e11609 100644 --- a/src/persist-txn/src/txns.rs +++ b/src/persist-txn/src/txns.rs @@ -106,7 +106,6 @@ where { pub(crate) txns_cache: TxnsCache, pub(crate) txns_write: WriteHandle, - #[allow(dead_code)] // TODO(txn): Becomes used in the txns compaction PR. pub(crate) txns_since: SinceHandle, pub(crate) datas: DataHandles, } @@ -242,8 +241,6 @@ where }) .collect::>(); // If the txns_upper has passed register_ts, we can no longer write. - // Return success if we have nothing left to write or an Error if we - // do. if register_ts < txns_upper { debug!( "txns register {} at {:?} mismatch current={:?}", @@ -323,8 +320,6 @@ where }; // If the txns_upper has passed forget_ts, we can no longer write. - // Return success if we have nothing left to write or an Error if we - // do. if forget_ts < txns_upper { debug!( "txns forget {:.9} at {:?} mismatch current={:?}", @@ -337,8 +332,23 @@ where // Ensure the latest write has been applied, so we don't run into // any issues trying to apply it later. - let data_times = self.txns_cache.datas.get(&data_id); - if let Some(latest_write) = data_times.and_then(|x| x.writes.back()) { + // + // NB: It's _very_ important for correctness to get this from the + // unapplied batches (which compact themselves naturally) and not + // from the writes (which are artificially compacted based on when + // we need reads for). + let data_latest_unapplied = self + .txns_cache + .unapplied_batches + .values() + .rev() + .find(|(x, _, _)| x == &data_id); + if let Some((_, _, latest_write)) = data_latest_unapplied { + debug!( + "txns forget {:.9} applying latest write {:?}", + data_id.to_string(), + latest_write, + ); let latest_write = latest_write.clone(); let _tidy = self.apply_le(&latest_write).await; } @@ -372,7 +382,7 @@ where // won't get around to telling the caller that it's safe to use this // shard directly again. Presumably it will retry at some point. let () = crate::empty_caa( - || format!("txns forget fill {:.9}", data_id.to_string()), + || format!("txns {:.9} forget fill", data_id.to_string()), self.datas.get_write(&data_id).await, forget_ts, ) @@ -457,6 +467,33 @@ where Ok(()) } + /// Allows compaction to the txns shard as well as internal representations, + /// losing the ability to answer queries about times less_than since_ts. + /// + /// In practice, this will likely only be called from the singleton + /// controller process. + pub async fn compact_to(&mut self, mut since_ts: T) { + tracing::debug!("compact_to {:?}", since_ts); + // This call to compact the cache only affects the write and + // registration times, not the unapplied batches. The unapplied batches + // have a very important correctness invariant to hold, are + // self-compacting as batches are applied, and are handled below. This + // means it's always safe to compact the cache past where the txns shard + // is physically compacted to, so do that regardless of min_unapplied_ts + // and of whether the maybe_downgrade goes through. + self.txns_cache.update_gt(&since_ts).await; + self.txns_cache.compact_to(&since_ts); + + // NB: A critical invariant for how this all works is that we never + // allow the since of the txns shard to pass any unapplied writes, so + // reduce it as necessary. + let min_unapplied_ts = self.txns_cache.min_unapplied_ts(); + if min_unapplied_ts < &since_ts { + since_ts.clone_from(min_unapplied_ts); + } + crate::maybe_cads::(&mut self.txns_since, since_ts).await; + } + /// Returns the [ShardId] of the txns shard. pub fn txns_id(&self) -> ShardId { self.txns_write.shard_id() @@ -544,12 +581,13 @@ where #[cfg(test)] mod tests { - use std::time::UNIX_EPOCH; + use std::time::{Duration, UNIX_EPOCH}; use differential_dataflow::Hashable; use futures::future::BoxFuture; use mz_ore::cast::CastFrom; use mz_persist_client::cache::PersistClientCache; + use mz_persist_client::cfg::{PersistParameters, RetryParameters}; use mz_persist_client::PersistLocation; use mz_persist_types::codec_impls::{StringSchema, UnitSchema}; use rand::rngs::SmallRng; @@ -559,7 +597,7 @@ mod tests { use tracing::{info, info_span, Instrument}; use crate::operator::DataSubscribe; - use crate::tests::{reader, writer, CommitLog}; + use crate::tests::{reader, write_directly, writer, CommitLog}; use super::*; @@ -741,28 +779,6 @@ mod tests { #[mz_ore::test(tokio::test)] #[cfg_attr(miri, ignore)] // too slow async fn register_forget() { - async fn write_directly( - data_write: &mut WriteHandle, - key: &str, - ts: u64, - log: &CommitLog, - ) { - let updates = [((key.to_owned(), ()), ts, 1)]; - let mut current = data_write.shared_upper(); - loop { - let res = data_write - .compare_and_append(&updates, current, Antichain::from_elem(ts + 1)) - .await - .unwrap(); - match res { - Ok(()) => { - log.record((data_write.shard_id(), key.to_owned(), ts, 1)); - return; - } - Err(err) => current = err.current, - } - } - } async fn step_some_past(subs: &mut Vec, ts: u64) { for (idx, sub) in subs.iter_mut().enumerate() { // Only step some of them to try to maximize edge cases. @@ -798,8 +814,12 @@ mod tests { info!("{} direct", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); ts += 1; - write_directly(&mut d0_write, &format!("d{}", ts), ts, &log).await; + write_directly(ts, &mut d0_write, &[&format!("d{}", ts)], &log).await; step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + // ts - 1 because we just wrote directly, which doesn't advance txns + txns.compact_to(ts - 1).await; + } info!("{} register", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); @@ -808,6 +828,9 @@ mod tests { .await .unwrap(); step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } info!("{} txns", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); @@ -815,12 +838,18 @@ mod tests { txns.expect_commit_at(ts, d0, &[&format!("t{}", ts)], &log) .await; step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } info!("{} forget", ts); subs.push(txns.read_cache().expect_subscribe(&client, d0, ts)); ts += 1; txns.forget(ts, d0).await.unwrap(); step_some_past(&mut subs, ts).await; + if ts % 11 == 0 { + txns.compact_to(ts).await; + } } // Check all the subscribes. @@ -902,19 +931,27 @@ mod tests { impl StressWorker { pub async fn step(&mut self) { - debug!("{} step {} START ts={}", self.idx, self.step, self.ts); + debug!( + "stress {} step {} START ts={}", + self.idx, self.step, self.ts + ); let data_id = self.data_ids[usize::cast_from(self.rng.next_u64()) % self.data_ids.len()]; - match self.rng.next_u64() % 4 { + match self.rng.next_u64() % 5 { 0 => self.write(data_id).await, // The register and forget impls intentionally don't switch on // whether it's already registered to stress idempotence. 1 => self.register(data_id).await, 2 => self.forget(data_id).await, - 3 => self.start_read(data_id), + 3 => { + debug!("stress compact {:.9} to {}", data_id.to_string(), self.ts); + self.txns.txns_cache.update_ge(&self.ts).await; + self.txns.txns_cache.compact_to(&self.ts) + } + 4 => self.start_read(data_id), _ => unreachable!(""), } - debug!("{} step {} DONE ts={}", self.idx, self.step, self.ts); + debug!("stress {} step {} DONE ts={}", self.idx, self.step, self.ts); self.step += 1; } @@ -946,7 +983,11 @@ mod tests { } async fn write_via_txns(&mut self, data_id: ShardId) -> Result<(), u64> { - debug!("write_via_txns {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress write_via_txns {:.9} at {}", + data_id.to_string(), + self.ts + ); let mut txn = self.txns.begin(); txn.tidy(std::mem::take(&mut self.tidy)); txn.write(&data_id, self.key(), (), 1).await; @@ -965,37 +1006,45 @@ mod tests { } async fn write_direct(&mut self, data_id: ShardId) -> Result<(), u64> { - debug!("write_direct {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress write_direct {:.9} at {}", + data_id.to_string(), + self.ts + ); // First write an empty txn to ensure that the shard isn't // registered at this ts by someone else. self.txns.begin().commit_at(&mut self.txns, self.ts).await?; let mut write = writer(&self.txns.datas.client, data_id).await; - let mut current = write.shared_upper(); + let mut current = write.shared_upper().into_option().unwrap(); loop { - if !current.less_equal(&self.ts) { - return Err(current.into_option().unwrap()); + if !(current <= self.ts) { + return Err(current); } let key = self.key(); - let updates = [((key.clone(), ()), self.ts, 1)]; - let res = write - .compare_and_append(&updates, current, Antichain::from_elem(self.ts + 1)) - .await - .unwrap(); + let updates = [((&key, &()), &self.ts, 1)]; + let res = crate::small_caa( + || format!("data {:.9} direct", data_id.to_string()), + &mut write, + &updates, + current, + self.ts + 1, + ) + .await; match res { Ok(()) => { debug!("log {:.9} {} at {}", data_id.to_string(), key, self.ts); self.log.record((data_id, key, self.ts, 1)); return Ok(()); } - Err(err) => current = err.current, + Err(new_current) => current = new_current, } } } async fn register(&mut self, data_id: ShardId) { self.retry_ts_err(&mut |w: &mut StressWorker| { - debug!("register {:.9} at {}", data_id.to_string(), w.ts); + debug!("stress register {:.9} at {}", data_id.to_string(), w.ts); Box::pin(async move { let data_write = writer(&w.txns.datas.client, data_id).await; let _ = w.txns.register(w.ts, [data_write]).await?; @@ -1007,14 +1056,18 @@ mod tests { async fn forget(&mut self, data_id: ShardId) { self.retry_ts_err(&mut |w: &mut StressWorker| { - debug!("forget {:.9} at {}", data_id.to_string(), w.ts); + debug!("stress forget {:.9} at {}", data_id.to_string(), w.ts); Box::pin(async move { w.txns.forget(w.ts, data_id).await }) }) .await } fn start_read(&mut self, data_id: ShardId) { - debug!("start_read {:.9} at {}", data_id.to_string(), self.ts); + debug!( + "stress start_read {:.9} at {}", + data_id.to_string(), + self.ts + ); let client = self.txns.datas.client.clone(); let txns_id = self.txns.txns_id(); let as_of = self.ts; @@ -1070,6 +1123,17 @@ mod tests { eprintln!("using seed {}", seed); let mut clients = PersistClientCache::new_no_metrics(); + // We disable pubsub below, so retune the listen retries (pubsub + // fallback) to keep the test speedy. + PersistParameters { + next_listen_batch_retryer: Some(RetryParameters { + initial_backoff: Duration::from_millis(1), + multiplier: 1, + clamp: Duration::from_millis(1), + }), + ..Default::default() + } + .apply(clients.cfg()); let client = clients.open(PersistLocation::new_in_mem()).await.unwrap(); let mut txns = TxnsHandle::expect_open(client.clone()).await; let log = txns.new_log(); @@ -1103,7 +1167,7 @@ mod tests { txns: TxnsHandle::expect_open_id(client.clone(), txns.txns_id()).await, data_ids: data_ids.clone(), tidy: Tidy::default(), - ts: register_ts + 1, + ts: register_ts, step: 0, rng: SmallRng::seed_from_u64(seed.wrapping_add(u64::cast_from(idx))), reads: Vec::new(), @@ -1126,16 +1190,24 @@ mod tests { reads.append(&mut r); } - info!("finished with max_ts of {}", max_ts); - txns.apply_le(&max_ts).await; - for data_id in data_ids { - log.assert_snapshot(data_id, max_ts).await; - } - info!("now waiting for reads {}", max_ts); - for (tx, data_id, as_of, subscribe) in reads { - let _ = tx.send(max_ts + 1); - let output = subscribe.await.unwrap(); - log.assert_eq(data_id, as_of, max_ts + 1, output); - } + // Run all of the following in a timeout to make hangs easier to debug. + tokio::time::timeout(Duration::from_secs(30), async { + info!("finished with max_ts of {}", max_ts); + txns.apply_le(&max_ts).await; + for data_id in data_ids { + info!("reading data shard {}", data_id); + log.assert_snapshot(data_id, max_ts) + .instrument(info_span!("read_data", data_id = format!("{:.9}", data_id))) + .await; + } + info!("now waiting for reads {}", max_ts); + for (tx, data_id, as_of, subscribe) in reads { + let _ = tx.send(max_ts + 1); + let output = subscribe.await.unwrap(); + log.assert_eq(data_id, as_of, max_ts + 1, output); + } + }) + .await + .unwrap(); } }