Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 234 additions & 40 deletions src/compute/src/sink/correction_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};
use std::fmt;
use std::rc::Rc;
use std::sync::{Mutex, OnceLock};

use columnar::{Columnar, Index, Len, Ref};
use mz_ore::cast::CastLossy;
use mz_ore::soft_assert_or_log;
use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta};
use mz_repr::{Diff, Timestamp};
use mz_timely_util::column_pager::{self, PagedColumn};
use mz_timely_util::columnar::Column;
use mz_timely_util::temporal::{Bucket, BucketChain};
use timely::PartialOrder;
Expand Down Expand Up @@ -853,20 +855,31 @@ impl<D: Data> Chain<D> {
/// All updates in the chunk must sort after all updates already in the chain, in
/// (time, data)-order, to ensure the chain remains sorted.
fn push_chunk(&mut self, chunk: Chunk<D>) {
debug_assert!(self.can_accept(chunk.first()));
debug_assert!(self.can_accept_chunk(&chunk));

self.update_count += chunk.len();
self.chunks.push(chunk);
}

/// Return whether the chain can accept the given update.
/// Return whether the chain can accept the given chunk at its end while preserving
/// (time, data)-order.
///
/// A chain can accept an update if pushing it at the end upholds the (time, data)-order.
fn can_accept<'a>(&'a self, update: Ref<'a, (D, Timestamp, Diff)>) -> bool {
self.last().is_none_or(|(dc, tc, _)| {
let (d, t, _) = update;
(tc, dc) < (t, d)
})
/// Uses the cached boundary times and only materializes the boundary chunks when the times
/// tie (a single timestamp straddling the chunk boundary), so the common
/// strictly-increasing-time case checks the invariant without paging chunks in.
fn can_accept_chunk(&self, chunk: &Chunk<D>) -> bool {
match self.chunks.last() {
None => true,
Some(last) => match last.last_time().cmp(&chunk.first_time()) {
Ordering::Less => true,
Ordering::Greater => false,
Ordering::Equal => {
let (dc, _, _) = last.last();
let (d, _, _) = chunk.first();
dc < d
}
},
}
}

/// Return the last update in the chain, if any.
Expand Down Expand Up @@ -942,9 +955,11 @@ impl<D: Data> Chain<D> {
};

for chunk in self.chunks.drain(..) {
if chunk.last().1 < time {
// Route whole chunks by cached boundary times so a chunk that lands entirely on one
// side is moved without paging it in; only a straddling chunk is materialized.
if chunk.last_time() < time {
lower.push_chunk(chunk);
} else if chunk.first().1 >= time {
} else if chunk.first_time() >= time {
upper.push_chunk(chunk);
} else {
// The chunk straddles `time`; copy its two halves.
Expand Down Expand Up @@ -1318,8 +1333,27 @@ impl<D: Data> Cursor<D> {
/// boundary (~2 MiB, matching the ship granularity used elsewhere in the codebase), so each
/// chunk corresponds to a single, predictably sized allocation.
struct Chunk<D: Data> {
/// The contained updates.
data: Column<(D, Timestamp, Diff)>,
/// The paged-out form, taken on first materialization.
///
/// A `Mutex` (not `RefCell`) keeps the chunk `Sync`: cursors hold chunks behind a shared
/// `Rc`, and the iterator returned by [`CorrectionV2::updates_before`] borrows them across
/// the persist writer's `await`, so `&Chunk` must be `Send`. The lock is taken once, at
/// materialization, and is otherwise uncontended (the sink runs single-threaded per worker).
paged: Mutex<Option<PagedColumn<(D, Timestamp, Diff)>>>,
/// The materialized form, populated lazily by [`Chunk::column`] on first access.
///
/// An `OnceLock` (not `OnceCell`) for the same `Sync` reason. Once set the slot is never
/// cleared, so its address is stable and [`Chunk::index`] can hand out `Ref<'_>` borrows tied
/// to `&self`. The allocation is freed when the chunk drops, which bounds resident memory to
/// the chunks under an active merge front.
resident: OnceLock<Column<(D, Timestamp, Diff)>>,
/// Number of updates, cached so `len` and chain bookkeeping never page the chunk in.
len: usize,
/// Time of the first update, cached so boundary checks (`split_at_time`, `can_accept`) route
/// a resting chunk without materializing it.
first_time: Timestamp,
/// Time of the last update, cached likewise.
last_time: Timestamp,
}

impl<D: Data> fmt::Debug for Chunk<D> {
Expand All @@ -1329,44 +1363,96 @@ impl<D: Data> fmt::Debug for Chunk<D> {
}

impl<D: Data> Chunk<D> {
/// Wrap the given column into a chunk.
fn from_column(data: Column<(D, Timestamp, Diff)>) -> Self {
Self { data }
/// Page the given non-empty column out into a chunk.
///
/// Reads the cached metadata (length, boundary times) while the column is still resident, then
/// hands it to the global column pager. The policy decides whether it actually spills; either
/// way the chunk is born paged and materializes lazily on first read.
///
/// # Panics
///
/// Panics if the column is empty. Chunks are non-empty by construction; [`ChunkBuilder`] only
/// ever builds a chunk from a populated column.
fn from_column(mut data: Column<(D, Timestamp, Diff)>) -> Self {
let (len, first_time, last_time) = {
let borrowed = data.borrow();
let len = borrowed.len();
assert!(len > 0, "chunks are non-empty");
(len, borrowed.get(0).1, borrowed.get(len - 1).1)
};

let paged = column_pager::global_pager().page(&mut data);
Self {
paged: Mutex::new(Some(paged)),
resident: OnceLock::new(),
len,
first_time,
last_time,
}
}

/// Materialize the chunk's column, paging it in on first access.
///
/// The returned reference is valid for as long as `&self`: the `OnceLock` slot is never
/// cleared once populated, so its contents have a stable address.
fn column(&self) -> &Column<(D, Timestamp, Diff)> {
self.resident.get_or_init(|| {
let paged = self
.paged
.lock()
.expect("pager mutex poisoned")
.take()
.expect("paged form present until materialized");
column_pager::global_pager().take(paged)
})
}

/// Return the number of updates in the chunk.
fn len(&self) -> usize {
self.data.borrow().len()
self.len
}

/// Return the update at the given index.
/// Return the update at the given index, paging the chunk in if necessary.
///
/// # Panics
///
/// Panics if the given index is not populated.
fn index(&self, idx: usize) -> Ref<'_, (D, Timestamp, Diff)> {
self.data.borrow().get(idx)
self.column().borrow().get(idx)
}

/// Return the first update in the chunk.
/// Return the first update in the chunk, paging the chunk in if necessary.
fn first(&self) -> Ref<'_, (D, Timestamp, Diff)> {
self.index(0)
}

/// Return the last update in the chunk.
/// Return the last update in the chunk, paging the chunk in if necessary.
fn last(&self) -> Ref<'_, (D, Timestamp, Diff)> {
self.index(self.len() - 1)
self.index(self.len - 1)
}

/// Return the time of the first update, without materializing the chunk.
fn first_time(&self) -> Timestamp {
self.first_time
}

/// Return the time of the last update, without materializing the chunk.
fn last_time(&self) -> Timestamp {
self.last_time
}

/// Return the index of the first update at a time greater than `time`, or `None` if no such
/// update exists.
///
/// The early-out uses the cached last time, so a chunk whose updates are all at or before
/// `time` is skipped without paging it in.
fn find_time_greater_than(&self, time: Timestamp) -> Option<usize> {
if self.last().1 <= time {
if self.last_time <= time {
return None;
}

let mut lower = 0;
let mut upper = self.len();
let mut upper = self.len;
while lower < upper {
let idx = (lower + upper) / 2;
if self.index(idx).1 > time {
Expand All @@ -1380,12 +1466,28 @@ impl<D: Data> Chunk<D> {
}

/// Return the size of the chunk, for use in metrics.
///
/// Reports resident bytes only: a chunk still spilled (on swap or in a pager file) is not part
/// of RSS and contributes nothing, matching the accounting in
/// [`mz_timely_util::columnar::merge_batcher`].
fn get_size(&self) -> SizeMetrics {
let bytes = self.data.length_in_bytes();
SizeMetrics {
size: bytes,
capacity: bytes,
allocations: 1,
let resident = |col: &Column<(D, Timestamp, Diff)>| {
let bytes = col.length_in_bytes();
SizeMetrics {
size: bytes,
capacity: bytes,
allocations: 1,
}
};

if let Some(col) = self.resident.get() {
return resident(col);
}
// Not yet materialized: a policy that kept the column resident still occupies RSS, so
// account for it; a genuinely spilled column does not.
match &*self.paged.lock().expect("pager mutex poisoned") {
Some(PagedColumn::Resident(col, _)) => resident(col),
_ => SizeMetrics::default(),
}
}
}
Expand Down Expand Up @@ -1444,10 +1546,17 @@ impl<D: Data> ChunkBuilder<D> {
// `ColumnBuilder::finish` flushes the in-progress container into the pending queue
// (as `Column::Typed`) and returns the first pending entry. Subsequent calls drain
// the rest until `None`. Translate that into an owning iterator.
//
// `finish` can hand back an empty column (e.g. when the last shipped chunk landed exactly
// on the boundary). Skip those: `Chunk::from_column` requires a non-empty column, and an
// empty chunk would needlessly engage the pager.
std::iter::from_fn(move || {
self.inner
.finish()
.map(|c| Chunk::from_column(std::mem::take(c)))
loop {
let col = std::mem::take(self.inner.finish()?);
if col.borrow().len() > 0 {
return Some(Chunk::from_column(col));
}
}
})
}
}
Expand Down Expand Up @@ -1756,16 +1865,14 @@ mod tests {
}
let chain = builder.finish();

// At least one chunk must have been minted into `Column::Align` form, otherwise the
// spill path wouldn't be exercised.
let align_chunks = chain
.chunks
.iter()
.filter(|c| matches!(c.data, mz_timely_util::columnar::Column::Align(_)))
.count();
// Crossing the mint boundary must have produced more than one chunk; otherwise the spill
// path (each minted chunk is paged out and read back through the pager) wouldn't be
// exercised. The chunk payload itself is now behind the pager (see [`Chunk`]), so we
// assert on chunk count rather than inspecting the column variant directly.
assert!(
align_chunks > 0,
"expected at least one spilled (Align) chunk, got chunks: {:?}",
chain.chunks.len() > 1,
"expected multiple minted chunks, got {} chunk(s): {:?}",
chain.chunks.len(),
chain.chunks,
);

Expand Down Expand Up @@ -1930,4 +2037,91 @@ mod tests {
vec![("a".to_owned(), Timestamp::from(10_u64), Diff::ONE)]
);
}

/// A [`PagingPolicy`] that always spills to the swap backend, uncompressed.
///
/// The default global pager keeps every chunk resident; installing this drives the actual
/// spill path so the tests exercise [`Chunk::column`]'s page-in through [`mz_ore::pager`].
///
/// [`PagingPolicy`]: column_pager::PagingPolicy
struct ForceSwap;

impl column_pager::PagingPolicy for ForceSwap {
fn decide(&self, _hint: column_pager::PageHint) -> column_pager::PageDecision {
column_pager::PageDecision::Page {
backend: mz_ore::pager::Backend::Swap,
codec: None,
}
}
fn record(&self, _event: column_pager::PageEvent) {}
}

/// Install a global pager that spills every chunk to swap for the duration of `f`, then
/// restore the default (disabled) pager. The global pager is process-wide; concurrent tests
/// only ever observe a correct round-trip regardless of backend, so racing on it is benign.
fn with_swap_pager<R>(f: impl FnOnce() -> R) -> R {
use std::sync::Arc;
column_pager::set_global_pager(column_pager::ColumnPager::new(Arc::new(ForceSwap)));
let result = f();
column_pager::set_global_pager(column_pager::ColumnPager::disabled());
result
}

/// Build a chain crossing the mint boundary while every chunk is spilled to swap, then assert
/// `iter()` (the read path behind `updates_before`) pages each chunk back in and roundtrips
/// values, order, and diffs.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // madvise on the swap backend is unsupported under miri
fn iter_roundtrips_through_swap_backend() {
let count = 200_000_u64;
with_swap_pager(|| {
let mut builder = ChainBuilder::<i64>::default();
for i in 0..count {
let d = i64::try_from(i).expect("fits");
builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
}
let chain = builder.finish();
assert!(chain.chunks.len() > 1, "expected multiple minted chunks");
assert_eq!(chain.update_count, usize::try_from(count).expect("fits"));

let mut expected = 0_u64;
for (d, t, r) in chain.iter() {
assert_eq!(d, i64::try_from(expected).expect("fits"));
assert_eq!(t, Timestamp::new(expected));
assert_eq!(r, Diff::ONE);
expected += 1;
}
assert_eq!(expected, count);
});
}

/// Drive a [`Cursor`] over a spilled, multi-chunk chain to completion (the access pattern
/// merges use). Each step pages the front chunk back in via [`Chunk::column`]; assert the
/// cursor yields every update in order.
#[mz_ore::test]
#[cfg_attr(miri, ignore)] // madvise on the swap backend is unsupported under miri
fn cursor_steps_through_swap_backend() {
let count = 200_000_u64;
with_swap_pager(|| {
let mut builder = ChainBuilder::<i64>::default();
for i in 0..count {
let d = i64::try_from(i).expect("fits");
builder.push_owned(&(d, Timestamp::new(i), Diff::ONE));
}
let chain = builder.finish();
assert!(chain.chunks.len() > 1, "expected multiple minted chunks");

let mut rest = chain.into_cursor();
let mut expected = 0_u64;
while let Some(cursor) = rest.take() {
let (d, t, r) = cursor.get();
assert_eq!(i64::into_owned(d), i64::try_from(expected).expect("fits"));
assert_eq!(t, Timestamp::new(expected));
assert_eq!(r, Diff::ONE);
expected += 1;
rest = cursor.step();
}
assert_eq!(expected, count);
});
}
}
Loading