diff --git a/src/compute/src/sink/correction_v2.rs b/src/compute/src/sink/correction_v2.rs index 5a93b6c4d64c9..3dbc39c2fa076 100644 --- a/src/compute/src/sink/correction_v2.rs +++ b/src/compute/src/sink/correction_v2.rs @@ -147,12 +147,14 @@ use std::cmp::Ordering; use std::collections::{BinaryHeap, VecDeque}; use std::fmt; use std::rc::Rc; +use std::sync::{Mutex, OnceLock}; use columnar::{Columnar, Index, Len, Ref}; use mz_ore::cast::CastLossy; use mz_ore::soft_assert_or_log; use mz_persist_client::metrics::{SinkMetrics, SinkWorkerMetrics, UpdateDelta}; use mz_repr::{Diff, Timestamp}; +use mz_timely_util::column_pager::{self, PagedColumn}; use mz_timely_util::columnar::Column; use mz_timely_util::temporal::{Bucket, BucketChain}; use timely::PartialOrder; @@ -853,20 +855,31 @@ impl Chain { /// All updates in the chunk must sort after all updates already in the chain, in /// (time, data)-order, to ensure the chain remains sorted. fn push_chunk(&mut self, chunk: Chunk) { - debug_assert!(self.can_accept(chunk.first())); + debug_assert!(self.can_accept_chunk(&chunk)); self.update_count += chunk.len(); self.chunks.push(chunk); } - /// Return whether the chain can accept the given update. + /// Return whether the chain can accept the given chunk at its end while preserving + /// (time, data)-order. /// - /// A chain can accept an update if pushing it at the end upholds the (time, data)-order. - fn can_accept<'a>(&'a self, update: Ref<'a, (D, Timestamp, Diff)>) -> bool { - self.last().is_none_or(|(dc, tc, _)| { - let (d, t, _) = update; - (tc, dc) < (t, d) - }) + /// Uses the cached boundary times and only materializes the boundary chunks when the times + /// tie (a single timestamp straddling the chunk boundary), so the common + /// strictly-increasing-time case checks the invariant without paging chunks in. 
+ fn can_accept_chunk(&self, chunk: &Chunk) -> bool { + match self.chunks.last() { + None => true, + Some(last) => match last.last_time().cmp(&chunk.first_time()) { + Ordering::Less => true, + Ordering::Greater => false, + Ordering::Equal => { + let (dc, _, _) = last.last(); + let (d, _, _) = chunk.first(); + dc < d + } + }, + } } /// Return the last update in the chain, if any. @@ -942,9 +955,11 @@ impl Chain { }; for chunk in self.chunks.drain(..) { - if chunk.last().1 < time { + // Route whole chunks by cached boundary times so a chunk that lands entirely on one + // side is moved without paging it in; only a straddling chunk is materialized. + if chunk.last_time() < time { lower.push_chunk(chunk); - } else if chunk.first().1 >= time { + } else if chunk.first_time() >= time { upper.push_chunk(chunk); } else { // The chunk straddles `time`; copy its two halves. @@ -1318,8 +1333,27 @@ impl Cursor { /// boundary (~2 MiB, matching the ship granularity used elsewhere in the codebase), so each /// chunk corresponds to a single, predictably sized allocation. struct Chunk { - /// The contained updates. - data: Column<(D, Timestamp, Diff)>, + /// The paged-out form, taken on first materialization. + /// + /// A `Mutex` (not `RefCell`) keeps the chunk `Sync`: cursors hold chunks behind a shared + /// `Rc`, and the iterator returned by [`CorrectionV2::updates_before`] borrows them across + /// the persist writer's `await`, so `&Chunk` must be `Send`. The lock is taken once, at + /// materialization, and is otherwise uncontended (the sink runs single-threaded per worker). + paged: Mutex>>, + /// The materialized form, populated lazily by [`Chunk::column`] on first access. + /// + /// An `OnceLock` (not `OnceCell`) for the same `Sync` reason. Once set the slot is never + /// cleared, so its address is stable and [`Chunk::index`] can hand out `Ref<'_>` borrows tied + /// to `&self`. 
The allocation is freed when the chunk drops, which bounds resident memory to + /// the chunks under an active merge front. + resident: OnceLock>, + /// Number of updates, cached so `len` and chain bookkeeping never page the chunk in. + len: usize, + /// Time of the first update, cached so boundary checks (`split_at_time`, `can_accept_chunk`) + /// route a resting chunk without materializing it. + first_time: Timestamp, + /// Time of the last update, cached likewise. + last_time: Timestamp, } impl fmt::Debug for Chunk { @@ -1329,44 +1363,96 @@ impl fmt::Debug for Chunk { } impl Chunk { - /// Wrap the given column into a chunk. - fn from_column(data: Column<(D, Timestamp, Diff)>) -> Self { - Self { data } + /// Page the given non-empty column out into a chunk. + /// + /// Reads the cached metadata (length, boundary times) while the column is still resident, then + /// hands it to the global column pager. The policy decides whether it actually spills; either + /// way the chunk is born paged and materializes lazily on first read. + /// + /// # Panics + /// + /// Panics if the column is empty. Chunks are non-empty by construction; [`ChunkBuilder`] only + /// ever builds a chunk from a populated column. + fn from_column(mut data: Column<(D, Timestamp, Diff)>) -> Self { + let (len, first_time, last_time) = { + let borrowed = data.borrow(); + let len = borrowed.len(); + assert!(len > 0, "chunks are non-empty"); + (len, borrowed.get(0).1, borrowed.get(len - 1).1) + }; + + let paged = column_pager::global_pager().page(&mut data); + Self { + paged: Mutex::new(Some(paged)), + resident: OnceLock::new(), + len, + first_time, + last_time, + } + } + + /// Materialize the chunk's column, paging it in on first access. + /// + /// The returned reference is valid for as long as `&self`: the `OnceLock` slot is never + /// cleared once populated, so its contents have a stable address.
+ fn column(&self) -> &Column<(D, Timestamp, Diff)> { + self.resident.get_or_init(|| { + let paged = self + .paged + .lock() + .expect("pager mutex poisoned") + .take() + .expect("paged form present until materialized"); + column_pager::global_pager().take(paged) + }) } /// Return the number of updates in the chunk. fn len(&self) -> usize { - self.data.borrow().len() + self.len } - /// Return the update at the given index. + /// Return the update at the given index, paging the chunk in if necessary. /// /// # Panics /// /// Panics if the given index is not populated. fn index(&self, idx: usize) -> Ref<'_, (D, Timestamp, Diff)> { - self.data.borrow().get(idx) + self.column().borrow().get(idx) } - /// Return the first update in the chunk. + /// Return the first update in the chunk, paging the chunk in if necessary. fn first(&self) -> Ref<'_, (D, Timestamp, Diff)> { self.index(0) } - /// Return the last update in the chunk. + /// Return the last update in the chunk, paging the chunk in if necessary. fn last(&self) -> Ref<'_, (D, Timestamp, Diff)> { - self.index(self.len() - 1) + self.index(self.len - 1) + } + + /// Return the time of the first update, without materializing the chunk. + fn first_time(&self) -> Timestamp { + self.first_time + } + + /// Return the time of the last update, without materializing the chunk. + fn last_time(&self) -> Timestamp { + self.last_time } /// Return the index of the first update at a time greater than `time`, or `None` if no such /// update exists. + /// + /// The early-out uses the cached last time, so a chunk whose updates are all at or before + /// `time` is skipped without paging it in. 
fn find_time_greater_than(&self, time: Timestamp) -> Option { - if self.last().1 <= time { + if self.last_time <= time { return None; } let mut lower = 0; - let mut upper = self.len(); + let mut upper = self.len; while lower < upper { let idx = (lower + upper) / 2; if self.index(idx).1 > time { @@ -1380,12 +1466,28 @@ impl Chunk { } /// Return the size of the chunk, for use in metrics. + /// + /// Reports resident bytes only: a chunk still spilled (on swap or in a pager file) is not part + /// of RSS and contributes nothing, matching the accounting in + /// [`mz_timely_util::columnar::merge_batcher`]. fn get_size(&self) -> SizeMetrics { - let bytes = self.data.length_in_bytes(); - SizeMetrics { - size: bytes, - capacity: bytes, - allocations: 1, + let resident = |col: &Column<(D, Timestamp, Diff)>| { + let bytes = col.length_in_bytes(); + SizeMetrics { + size: bytes, + capacity: bytes, + allocations: 1, + } + }; + + if let Some(col) = self.resident.get() { + return resident(col); + } + // Not yet materialized: a policy that kept the column resident still occupies RSS, so + // account for it; a genuinely spilled column does not. + match &*self.paged.lock().expect("pager mutex poisoned") { + Some(PagedColumn::Resident(col, _)) => resident(col), + _ => SizeMetrics::default(), } } } @@ -1444,10 +1546,17 @@ impl ChunkBuilder { // `ColumnBuilder::finish` flushes the in-progress container into the pending queue // (as `Column::Typed`) and returns the first pending entry. Subsequent calls drain // the rest until `None`. Translate that into an owning iterator. + // + // `finish` can hand back an empty column (e.g. when the last shipped chunk landed exactly + // on the boundary). Skip those: `Chunk::from_column` requires a non-empty column, and an + // empty chunk would needlessly engage the pager. 
std::iter::from_fn(move || { - self.inner - .finish() - .map(|c| Chunk::from_column(std::mem::take(c))) + loop { + let col = std::mem::take(self.inner.finish()?); + if col.borrow().len() > 0 { + return Some(Chunk::from_column(col)); + } + } }) } } @@ -1756,16 +1865,14 @@ mod tests { } let chain = builder.finish(); - // At least one chunk must have been minted into `Column::Align` form, otherwise the - // spill path wouldn't be exercised. - let align_chunks = chain - .chunks - .iter() - .filter(|c| matches!(c.data, mz_timely_util::columnar::Column::Align(_))) - .count(); + // Crossing the mint boundary must have produced more than one chunk; otherwise the spill + // path (each minted chunk is paged out and read back through the pager) wouldn't be + // exercised. The chunk payload itself is now behind the pager (see [`Chunk`]), so we + // assert on chunk count rather than inspecting the column variant directly. assert!( - align_chunks > 0, - "expected at least one spilled (Align) chunk, got chunks: {:?}", + chain.chunks.len() > 1, + "expected multiple minted chunks, got {} chunk(s): {:?}", + chain.chunks.len(), chain.chunks, ); @@ -1930,4 +2037,91 @@ mod tests { vec![("a".to_owned(), Timestamp::from(10_u64), Diff::ONE)] ); } + + /// A [`PagingPolicy`] that always spills to the swap backend, uncompressed. + /// + /// The default global pager keeps every chunk resident; installing this drives the actual + /// spill path so the tests exercise [`Chunk::column`]'s page-in through [`mz_ore::pager`]. 
+ /// + /// [`PagingPolicy`]: column_pager::PagingPolicy + struct ForceSwap; + + impl column_pager::PagingPolicy for ForceSwap { + fn decide(&self, _hint: column_pager::PageHint) -> column_pager::PageDecision { + column_pager::PageDecision::Page { + backend: mz_ore::pager::Backend::Swap, + codec: None, + } + } + fn record(&self, _event: column_pager::PageEvent) {} + } + + /// Install a global pager that spills every chunk to swap for the duration of `f`, then + /// restore the default (disabled) pager. The global pager is process-wide; concurrent tests + /// only ever observe a correct round-trip regardless of backend, so racing on it is benign. + fn with_swap_pager(f: impl FnOnce() -> R) -> R { + use std::sync::Arc; + column_pager::set_global_pager(column_pager::ColumnPager::new(Arc::new(ForceSwap))); + let result = f(); + column_pager::set_global_pager(column_pager::ColumnPager::disabled()); + result + } + + /// Build a chain crossing the mint boundary while every chunk is spilled to swap, then assert + /// `iter()` (the read path behind `updates_before`) pages each chunk back in and roundtrips + /// values, order, and diffs. 
+ #[mz_ore::test] + #[cfg_attr(miri, ignore)] // madvise on the swap backend is unsupported under miri + fn iter_roundtrips_through_swap_backend() { + let count = 200_000_u64; + with_swap_pager(|| { + let mut builder = ChainBuilder::::default(); + for i in 0..count { + let d = i64::try_from(i).expect("fits"); + builder.push_owned(&(d, Timestamp::new(i), Diff::ONE)); + } + let chain = builder.finish(); + assert!(chain.chunks.len() > 1, "expected multiple minted chunks"); + assert_eq!(chain.update_count, usize::try_from(count).expect("fits")); + + let mut expected = 0_u64; + for (d, t, r) in chain.iter() { + assert_eq!(d, i64::try_from(expected).expect("fits")); + assert_eq!(t, Timestamp::new(expected)); + assert_eq!(r, Diff::ONE); + expected += 1; + } + assert_eq!(expected, count); + }); + } + + /// Drive a [`Cursor`] over a spilled, multi-chunk chain to completion (the access pattern + /// merges use). Each step pages the front chunk back in via [`Chunk::column`]; assert the + /// cursor yields every update in order. + #[mz_ore::test] + #[cfg_attr(miri, ignore)] // madvise on the swap backend is unsupported under miri + fn cursor_steps_through_swap_backend() { + let count = 200_000_u64; + with_swap_pager(|| { + let mut builder = ChainBuilder::::default(); + for i in 0..count { + let d = i64::try_from(i).expect("fits"); + builder.push_owned(&(d, Timestamp::new(i), Diff::ONE)); + } + let chain = builder.finish(); + assert!(chain.chunks.len() > 1, "expected multiple minted chunks"); + + let mut rest = chain.into_cursor(); + let mut expected = 0_u64; + while let Some(cursor) = rest.take() { + let (d, t, r) = cursor.get(); + assert_eq!(i64::into_owned(d), i64::try_from(expected).expect("fits")); + assert_eq!(t, Timestamp::new(expected)); + assert_eq!(r, Diff::ONE); + expected += 1; + rest = cursor.step(); + } + assert_eq!(expected, count); + }); + } }