Merge pull request #22619 from danhhz/persist_txn_compaction
persist-txn: compaction
danhhz authored Oct 27, 2023
2 parents 9dd7d80 + d381993 commit 0cba4cf
Showing 6 changed files with 503 additions and 142 deletions.
26 changes: 26 additions & 0 deletions doc/developer/design/20230705_v2_txn_management.md
@@ -365,6 +365,32 @@ maintenance or a CRDB write, but this is also true for registering a reader. On
the balance, I think this is a _much_ better set of tradeoffs than the original
plan.

### Compaction

Compaction of data shards is initially delegated to the txns user (the storage
controller). Because txn writes intentionally never read data shards and in no
way depend on the sinces, the since of a data shard is free to be arbitrarily
far ahead of or behind the txns upper. Data shard reads, when run through the
above process, then follow the usual rules (can read at times beyond the since
but not beyond the upper).

Compaction of the txns shard relies on the following invariant that is carefully
maintained: every write less than the since of the txns shard has been applied.
Mechanically, this is accomplished by a critical since capability held
internally by the txns system. Any txn writer is free to advance it to a time
once it has proven that all writes before that time have been applied.
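
As a rough illustration of this rule (not the actual persist-txn implementation), the sketch below models the txns shard in memory. `TxnsModel`, its fields, and `max_safe_since` are hypothetical names invented for this example; only the invariant itself comes from the text above.

```rust
use std::collections::BTreeSet;

/// Hypothetical in-memory model of the txns shard, for illustration only.
struct TxnsModel {
    /// Timestamps of committed writes that have not been applied yet.
    unapplied: BTreeSet<u64>,
    /// Exclusive upper of the txns shard: every write is at a time < upper.
    upper: u64,
    /// Current since of the txns shard.
    since: u64,
}

impl TxnsModel {
    /// Largest since that keeps the invariant "every write less than the
    /// since has been applied": the earliest unapplied write, or the upper
    /// if everything has been applied.
    fn max_safe_since(&self) -> u64 {
        match self.unapplied.iter().next() {
            Some(&first_unapplied) => first_unapplied,
            None => self.upper,
        }
    }

    /// Advances the since to `new_since` if that is provably safe. Returns
    /// false and leaves the since unchanged otherwise. Never regresses.
    fn compact_to(&mut self, new_since: u64) -> bool {
        if new_since <= self.max_safe_since() {
            self.since = self.since.max(new_since);
            true
        } else {
            false
        }
    }
}
```

Note that in this model the since may equal the timestamp of an unapplied write, because the invariant only covers writes strictly less than the since.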

It is advantageous to compact the txns shard aggressively so that applied writes
are promptly consolidated out, minimizing the size. For a snapshot read at
`as_of`, we need to be able to distinguish when the latest write `<= as_of` has
been applied. The above invariant enables this as follows:

- If `as_of < txns_shard.since()`, then the invariant guarantees that all
writes `<= as_of` have been applied, so we're free to read as described in the
section above.
- Otherwise, we haven't compacted `as_of` in the txns shard yet, and still have
perfect information about which writes happened when. We can look at the data
shard upper to determine which have been applied.
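
The two cases can be sketched as a small decision function. This is a simplified model, not the persist-txn API: `ReadPlan` and `plan_snapshot` are hypothetical names. It uses the strict comparison implied by the invariant (writes less than the since are applied), and it assumes that a write at time `t` counts as applied once the data shard upper is greater than `t`.

```rust
/// How a snapshot read at `as_of` learns which writes have been applied.
/// Hypothetical type, for illustration only.
#[derive(Debug, PartialEq)]
enum ReadPlan {
    /// `as_of` is below the txns since, so every write `<= as_of` is applied.
    AllApplied,
    /// The txns shard still has full detail at `as_of`. `latest_write` is
    /// the latest write `<= as_of`, if any, and `applied` says whether the
    /// data shard upper shows it as applied.
    CheckDataUpper { latest_write: Option<u64>, applied: bool },
}

fn plan_snapshot(
    as_of: u64,
    txns_since: u64,
    write_times: &[u64],
    data_upper: u64,
) -> ReadPlan {
    if as_of < txns_since {
        return ReadPlan::AllApplied;
    }
    // Not compacted yet: find the latest write at or before as_of.
    let latest_write = write_times.iter().copied().filter(|t| *t <= as_of).max();
    // Assumption: a write at time t is applied iff data_upper > t. With no
    // write at or before as_of there is nothing left to apply.
    let applied = latest_write.map_or(true, |t| t < data_upper);
    ReadPlan::CheckDataUpper { latest_write, applied }
}
```

For example, `plan_snapshot(6, 5, &[2, 6], 6)` reports the write at 6 as not yet applied, since the data shard upper has not passed it.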

### Forget

A data shard is removed from the txns set using a `forget` operation that writes
5 changes: 5 additions & 0 deletions src/persist-cli/src/maelstrom/txn_list_append_multi.rs
@@ -116,6 +116,11 @@ impl Transactor {
txn.tidy(std::mem::take(&mut self.tidy));
match txn.commit_at(&mut self.txns, write_ts).await {
Ok(maintenance) => {
// Aggressively allow the txns shard to compact. To
// exercise more edge cases, do it before we apply the
// newly committed txn.
self.txns.compact_to(write_ts).await;

debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts);
let tidy = maintenance.apply(&mut self.txns).await;
self.tidy.merge(tidy);
48 changes: 40 additions & 8 deletions src/persist-txn/src/lib.rs
@@ -287,11 +287,6 @@ pub mod txn_read;
pub mod txn_write;
pub mod txns;

// TODO(txn):
// - Closing/deleting data shards.
// - Hold a critical since capability for each registered shard?
// - Figure out the compaction story for both txn and data shard.

/// The in-mem representation of an update in the txns shard.
#[derive(Debug)]
pub enum TxnsEntry {
@@ -654,7 +649,7 @@ pub mod tests {
.collect()
};
consolidate_updates(&mut expected);
let mut actual = actual.into_iter().collect();
let mut actual = actual.into_iter().filter(|(_, t, _)| t < &until).collect();
consolidate_updates(&mut actual);
// NB: Extra spaces after actual are so it lines up with expected.
tracing::debug!(
@@ -677,10 +672,16 @@ pub mod tests {
#[allow(ungated_async_fn_track_caller)]
#[track_caller]
pub async fn assert_snapshot(&self, data_id: ShardId, as_of: u64) {
self.assert_subscribe(data_id, as_of, as_of + 1).await;
}

#[allow(ungated_async_fn_track_caller)]
#[track_caller]
pub async fn assert_subscribe(&self, data_id: ShardId, as_of: u64, until: u64) {
let mut data_subscribe =
DataSubscribe::new("test", self.client.clone(), self.txns_id, data_id, as_of);
data_subscribe.step_past(as_of).await;
self.assert_eq(data_id, as_of, as_of + 1, data_subscribe.output().clone())
data_subscribe.step_past(until - 1).await;
self.assert_eq(data_id, as_of, until, data_subscribe.output().clone());
}
}

@@ -714,6 +715,37 @@ pub mod tests {
.expect("codecs should not change")
}

pub(crate) async fn write_directly(
ts: u64,
data_write: &mut WriteHandle<String, (), u64, i64>,
keys: &[&str],
log: &CommitLog,
) {
let data_id = data_write.shard_id();
let keys = keys.iter().map(|x| (*x).to_owned()).collect::<Vec<_>>();
let updates = keys.iter().map(|k| ((k, &()), &ts, 1)).collect::<Vec<_>>();
let mut current = data_write.shared_upper().into_option().unwrap();
loop {
let res = crate::small_caa(
|| format!("data {:.9} directly", data_id),
data_write,
&updates,
current,
ts + 1,
)
.await;
match res {
Ok(()) => {
for ((k, ()), t, d) in updates {
log.record((data_id, k.to_owned(), *t, d));
}
return;
}
Err(new_current) => current = new_current,
}
}
}

#[mz_ore::test(tokio::test)]
#[cfg_attr(miri, ignore)] // unsupported operation: returning ready events from epoll_wait is not yet implemented
async fn commit_log() {
108 changes: 100 additions & 8 deletions src/persist-txn/src/operator.rs
@@ -32,7 +32,7 @@ use timely::progress::{Antichain, Timestamp};
use timely::scheduling::Scheduler;
use timely::worker::Worker;
use timely::{Data, WorkerConfig};
use tracing::debug;
use tracing::{debug, trace};

use crate::txn_read::{DataListenNext, TxnsCache};
use crate::{TxnsCodec, TxnsCodecDefault};
@@ -122,12 +122,19 @@ where
)
.await
.expect("schema shouldn't change");
let () = snap.unblock_read(data_write).await;
let empty_to = snap.unblock_read(data_write).await;
debug!(
"txns_progress({}) {:.9} starting as_of={:?} empty_to={:?}",
name,
data_id.to_string(),
as_of,
empty_to.elements()
);

// We've ensured that the data shard's physical upper is past as_of, so
// start by passing through data and frontier updates from the input
// until it is past the as_of.
let mut read_data_to = as_of.step_forward();
let mut read_data_to = empty_to;
let mut output_progress_exclusive = T::minimum();
loop {
// TODO(txn): Do something more specific when the input returns None
@@ -141,6 +148,12 @@ where
// disconnected.
Event::Data(_data_cap, data) => {
for data in data.drain(..) {
trace!(
"txns_progress({}) {:.9} emitting data {:?}",
name,
data_id.to_string(),
data
);
passthrough_output.give(&cap, data).await;
}
}
@@ -156,9 +169,15 @@ where
// frontier updates too.
if &output_progress_exclusive < input_progress_exclusive {
output_progress_exclusive.clone_from(input_progress_exclusive);
trace!(
"txns_progress({}) {:.9} downgrading cap to {:?}",
name,
data_id.to_string(),
output_progress_exclusive
);
cap.downgrade(&output_progress_exclusive);
}
if read_data_to <= output_progress_exclusive {
if read_data_to.less_equal(&output_progress_exclusive) {
break;
}
}
@@ -170,14 +189,16 @@ where
// find out what to do next given our current progress.
loop {
txns_cache.update_ge(&output_progress_exclusive).await;
txns_cache.compact_to(&output_progress_exclusive);
let data_listen_next =
txns_cache.data_listen_next(&data_id, output_progress_exclusive.clone());
debug!(
"txns_progress({}): data_listen_next {:.9} at {:?}: {:?}",
"txns_progress({}) {:.9} data_listen_next at {:?}({:?}): {:?}",
name,
data_id.to_string(),
output_progress_exclusive,
data_listen_next
read_data_to,
txns_cache.progress_exclusive,
data_listen_next,
);
match data_listen_next {
// We've caught up to the txns upper and we have to wait for
@@ -194,7 +215,7 @@ where
// The data shard got a write! Loop back above and pass
// through data until we see it.
DataListenNext::ReadDataTo(new_target) => {
read_data_to = new_target;
read_data_to = Antichain::from_elem(new_target);
break;
}
// We know there are no writes in
@@ -203,9 +224,21 @@ where
DataListenNext::EmitLogicalProgress(new_progress) => {
assert!(output_progress_exclusive < new_progress);
output_progress_exclusive = new_progress;
trace!(
"txns_progress({}) {:.9} downgrading cap to {:?}",
name,
data_id.to_string(),
output_progress_exclusive
);
cap.downgrade(&output_progress_exclusive);
continue;
}
DataListenNext::CompactedTo(since_ts) => {
unreachable!(
"internal logic error: {} unexpectedly compacted past {:?} to {:?}",
data_id, output_progress_exclusive, since_ts
)
}
}
}
}
@@ -355,6 +388,10 @@ impl DataSubscribe {
#[cfg(test)]
pub async fn step_past(&mut self, ts: u64) {
while self.txns.less_equal(&ts) {
trace!(
"progress at {:?}",
self.txns.with_frontier(|x| x.to_owned()).elements()
);
self.step();
tokio::task::yield_now().await;
}
@@ -438,4 +475,59 @@ mod tests {
log.assert_eq(d0, 5, 8, sub.output().clone());
}
}

#[mz_ore::test(tokio::test(flavor = "multi_thread"))]
#[cfg_attr(miri, ignore)] // too slow
async fn subscribe_shard_finalize() {
let client = PersistClient::new_for_tests().await;
let mut txns = TxnsHandle::expect_open(client.clone()).await;
let log = txns.new_log();
let d0 = txns.expect_register(1).await;

// Start the operator as_of the register ts.
let mut sub = txns.read_cache().expect_subscribe(&client, d0, 1);
sub.step_past(1).await;

// Write to it via txns.
txns.expect_commit_at(2, d0, &["foo"], &log).await;
sub.step_past(2).await;

// Unregister it.
txns.forget(3, d0).await.unwrap();
sub.step_past(3).await;

// TODO: Hard mode, see if we can get the rest of this test to work even
// _without_ the txns shard advancing.
txns.begin().commit_at(&mut txns, 7).await.unwrap();

// The operator should continue to emit data written directly even
// though it's no longer in the txns set.
let mut d0_write = writer(&client, d0).await;
let key = "bar".to_owned();
crate::small_caa(|| "test", &mut d0_write, &[((&key, &()), &5, 1)], 4, 6)
.await
.unwrap();
log.record((d0, key, 5, 1));
sub.step_past(4).await;

// Now finalize the shard to writes.
let () = d0_write
.compare_and_append_batch(&mut [], Antichain::from_elem(6), Antichain::new())
.await
.unwrap()
.unwrap();
while sub.txns.less_than(&u64::MAX) {
sub.step();
tokio::task::yield_now().await;
}

// Make sure we read the correct things.
log.assert_eq(d0, 1, u64::MAX, sub.output().clone());

// Also make sure that we can read the right things if we start up after
// the forget but before the direct write and ditto after the direct
// write.
log.assert_subscribe(d0, 4, u64::MAX).await;
log.assert_subscribe(d0, 6, u64::MAX).await;
}
}