Skip to content

Commit cffe68c

Browse files
committed
persist-txn: compaction
Compaction of data shards is initially delegated to the txns user (the storage controller). Because txn writes intentionally never read data shards and in no way depend on the sinces, the since of a data shard is free to be arbitrarily far ahead of or behind the txns upper. Data shard reads, when run through the above process, then follow the usual rules (can read at times beyond the since but not beyond the upper). Compaction of the txns shard relies on the following invariant that is carefully maintained: every write less than the since of the txns shard has been applied. Mechanically, this is accomplished by a critical since capability held internally by the txns system. Any txn writer is free to advance it to a time once it has proven that all writes before that time have been applied. It is advantageous to compact the txns shard aggressively so that applied writes are promptly consolidated out, minimizing the size. For a snapshot read at `as_of`, we need to be able to distinguish when the latest write `<= as_of` has been applied. The above invariant enables this as follows: - If `as_of <= txns_shard.since()`, then the invariant guarantees that all writes `<= as_of` have been applied, so we're free to read as described in the section above. - Otherwise, we haven't compacted `as_of` in the txns shard yet, and still have perfect information about which writes happened when. We can look at the data shard upper to determine which have been applied.
1 parent f138058 commit cffe68c

File tree

6 files changed

+361
-106
lines changed

6 files changed

+361
-106
lines changed

doc/developer/design/20230705_v2_txn_management.md

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,32 @@ maintenance or a CRDB write, but this is also true for registering a reader. On
365365
the balance, I think this is a _much_ better set of tradeoffs than the original
366366
plan.
367367

368+
### Compaction
369+
370+
Compaction of data shards is initially delegated to the txns user (the storage
371+
controller). Because txn writes intentionally never read data shards and in no
372+
way depend on the sinces, the since of a data shard is free to be arbitrarily
373+
far ahead of or behind the txns upper. Data shard reads, when run through the
374+
above process, then follow the usual rules (can read at times beyond the since
375+
but not beyond the upper).
376+
377+
Compaction of the txns shard relies on the following invariant that is carefully
378+
maintained: every write less than the since of the txns shard has been applied.
379+
Mechanically, this is accomplished by a critical since capability held
380+
internally by the txns system. Any txn writer is free to advance it to a time
381+
once it has proven that all writes before that time have been applied.
382+
383+
It is advantageous to compact the txns shard aggressively so that applied writes
384+
are promptly consolidated out, minimizing the size. For a snapshot read at
385+
`as_of`, we need to be able to distinguish when the latest write `<= as_of` has
386+
been applied. The above invariant enables this as follows:
387+
388+
- If `as_of <= txns_shard.since()`, then the invariant guarantees that all
389+
writes `<= as_of` have been applied, so we're free to read as described in the
390+
section above.
391+
- Otherwise, we haven't compacted `as_of` in the txns shard yet, and still have
392+
perfect information about which writes happened when. We can look at the data shard upper to determine which have been applied.
393+
368394
### Forget
369395

370396
A data shard is removed from the txns set using a `forget` operation that writes

src/persist-cli/src/maelstrom/txn_list_append_multi.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ impl Transactor {
116116
txn.tidy(std::mem::take(&mut self.tidy));
117117
match txn.commit_at(&mut self.txns, write_ts).await {
118118
Ok(maintenance) => {
119+
// Aggressively allow the txns shard compact. To
120+
// exercise more edge cases, do it before we apply the
121+
// newly committed txn.
122+
self.txns.compact_to(write_ts).await;
123+
119124
debug!("req committed at read_ts={} write_ts={}", read_ts, write_ts);
120125
let tidy = maintenance.apply(&mut self.txns).await;
121126
self.tidy.merge(tidy);

src/persist-txn/src/lib.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -287,11 +287,6 @@ pub mod txn_read;
287287
pub mod txn_write;
288288
pub mod txns;
289289

290-
// TODO(txn):
291-
// - Closing/deleting data shards.
292-
// - Hold a critical since capability for each registered shard?
293-
// - Figure out the compaction story for both txn and data shard.
294-
295290
/// The in-mem representation of an update in the txns shard.
296291
#[derive(Debug)]
297292
pub enum TxnsEntry {

src/persist-txn/src/operator.rs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use timely::progress::{Antichain, Timestamp};
3232
use timely::scheduling::Scheduler;
3333
use timely::worker::Worker;
3434
use timely::{Data, WorkerConfig};
35-
use tracing::debug;
35+
use tracing::{debug, trace};
3636

3737
use crate::txn_read::{DataListenNext, TxnsCache};
3838
use crate::{TxnsCodec, TxnsCodecDefault};
@@ -132,12 +132,16 @@ where
132132
)
133133
.await
134134
.expect("schema shouldn't change");
135-
let () = snap.unblock_read(data_write).await;
135+
let empty_to = snap.unblock_read(data_write).await;
136+
debug!(
137+
"txns_progress({}) {} starting as_of={:?} empty_to={:?}",
138+
name, data_id, as_of, empty_to
139+
);
136140

137141
// We've ensured that the data shard's physical upper is past as_of, so
138142
// start by passing through data and frontier updates from the input
139143
// until it is past the as_of.
140-
let mut read_data_to = as_of.step_forward();
144+
let mut read_data_to = empty_to;
141145
let mut output_progress_exclusive = T::minimum();
142146
loop {
143147
// TODO(txn): Do something more specific when the input returns None
@@ -151,6 +155,7 @@ where
151155
// disconnected.
152156
Event::Data(_data_cap, data) => {
153157
for data in data.drain(..) {
158+
trace!("txns_progress({}) emitting data {:?}", name, data);
154159
passthrough_output.give(&cap, data).await;
155160
}
156161
}
@@ -166,9 +171,14 @@ where
166171
// frontier updates too.
167172
if &output_progress_exclusive < input_progress_exclusive {
168173
output_progress_exclusive.clone_from(input_progress_exclusive);
174+
trace!(
175+
"txns_progress({}) downgrading cap to {:?}",
176+
name,
177+
output_progress_exclusive
178+
);
169179
cap.downgrade(&output_progress_exclusive);
170180
}
171-
if read_data_to <= output_progress_exclusive {
181+
if read_data_to.less_than(&output_progress_exclusive) {
172182
break;
173183
}
174184
}
@@ -180,15 +190,9 @@ where
180190
// find out what to do next given our current progress.
181191
loop {
182192
txns_cache.update_ge(&output_progress_exclusive).await;
193+
txns_cache.compact_to(&output_progress_exclusive);
183194
let data_listen_next =
184195
txns_cache.data_listen_next(&data_id, output_progress_exclusive.clone());
185-
debug!(
186-
"txns_progress({}): data_listen_next {:.9} at {:?}: {:?}",
187-
name,
188-
data_id.to_string(),
189-
output_progress_exclusive,
190-
data_listen_next
191-
);
192196
match data_listen_next {
193197
// We've caught up to the txns upper and we have to wait for
194198
// it to advance before asking again.
@@ -204,7 +208,7 @@ where
204208
// The data shard got a write! Loop back above and pass
205209
// through data until we see it.
206210
DataListenNext::ReadDataTo(new_target) => {
207-
read_data_to = new_target;
211+
read_data_to = Antichain::from_elem(new_target);
208212
break;
209213
}
210214
// We know there are no writes in
@@ -213,9 +217,19 @@ where
213217
DataListenNext::EmitLogicalProgress(new_progress) => {
214218
assert!(output_progress_exclusive < new_progress);
215219
output_progress_exclusive = new_progress;
220+
debug!(
221+
"txns_progress({}) downgrading cap to {:?}",
222+
name, output_progress_exclusive
223+
);
216224
cap.downgrade(&output_progress_exclusive);
217225
continue;
218226
}
227+
DataListenNext::CompactedTo(since_ts) => {
228+
unreachable!(
229+
"internal logic error: {} unexpectedly compacted past {:?} to {:?}",
230+
data_id, output_progress_exclusive, since_ts
231+
)
232+
}
219233
}
220234
}
221235
}

0 commit comments

Comments
 (0)