diff --git a/crates/rbuilder-utils/Cargo.toml b/crates/rbuilder-utils/Cargo.toml index 497bddf35..4806b7f3c 100644 --- a/crates/rbuilder-utils/Cargo.toml +++ b/crates/rbuilder-utils/Cargo.toml @@ -22,7 +22,7 @@ serde_json.workspace = true # alloy alloy-primitives.workspace = true -strum = "0.27" +strum = { version = "0.27", features = ["derive"] } tokio = { version = "1.40.0", default-features = false, features = [ "sync", "time", diff --git a/crates/rbuilder-utils/src/clickhouse/backup/mod.rs b/crates/rbuilder-utils/src/clickhouse/backup/mod.rs index a89138033..d75efcf79 100644 --- a/crates/rbuilder-utils/src/clickhouse/backup/mod.rs +++ b/crates/rbuilder-utils/src/clickhouse/backup/mod.rs @@ -4,7 +4,10 @@ pub mod primitives; use std::{ collections::VecDeque, path::PathBuf, - sync::{Arc, RwLock}, + sync::{ + atomic::{AtomicU64, Ordering as AtomicOrdering}, + Arc, RwLock, + }, time::{Duration, Instant, SystemTime, UNIX_EPOCH}, }; @@ -34,6 +37,10 @@ use crate::{ const TARGET: &str = "clickhouse_with_backup::backup"; +/// Maximum number of rows to merge into a single backup commit. Matches the +/// default inserter's `max_rows` to keep request sizes bounded. +const MAX_ROWS_PER_BACKUP_COMMIT: usize = 65_536; + /// A type alias for disk backup keys. type DiskBackupKey = u128; /// A type alias for disk backup tables. @@ -46,13 +53,20 @@ enum BackupSource { Memory, } -/// Generates a new unique key for disk backup entries, based on current system time in -/// milliseconds. +/// Monotonic counter to disambiguate disk backup keys created within the same +/// microsecond (e.g. during a tight shutdown loop). +static DISK_KEY_SEQ: AtomicU64 = AtomicU64::new(0); + +/// Generates a new unique key for disk backup entries. Combines the current +/// system time in microseconds with an atomic sequence counter to avoid +/// collisions when multiple entries are saved in quick succession. 
fn new_disk_backup_key() -> DiskBackupKey { - SystemTime::now() + let micros = SystemTime::now() .duration_since(UNIX_EPOCH) .expect("time went backwards") - .as_micros() + .as_micros(); + let seq = DISK_KEY_SEQ.fetch_add(1, AtomicOrdering::Relaxed) as u128; + (micros << 32) | seq } /// Represents data we failed to commit to clickhouse, including the rows and some information @@ -80,9 +94,14 @@ impl Default for FailedCommit { } } -/// A [`FailedCommit`] along with its source (disk or memory). +/// A [`FailedCommit`] along with its source(s) (disk or memory). +/// +/// A single retrieval has one source, but merged batches can have multiple +/// sources (e.g. several disk keys + memory entries). Preserving all sources +/// is necessary so that disk entries can be purged after a successful commit, +/// even if the batch was previously re-stored after a failed attempt. struct RetrievedFailedCommit { - source: BackupSource, + sources: Vec, commit: FailedCommit, } @@ -312,58 +331,55 @@ impl DiskBackup { Ok(stats) } - /// Retrieves the oldest failed commit from disk, if any. - fn retrieve_oldest( + /// Atomically retrieves and removes the oldest failed commit from disk. + /// Each call returns the next successive entry. + fn take_oldest( &mut self, ) -> Result>>, DiskBackupError> { let table_def = Table::new(T::TABLE_NAME); - let reader = self.db.read().expect("not poisoned").begin_read()?; - let table = match reader.open_table(table_def) { - Ok(t) => t, - Err(redb::TableError::TableDoesNotExist(_)) => { - // No table means no data. - return Ok(None); - } - Err(e) => { - return Err(e.into()); + // Quick read-check to avoid taking a write lock on an empty table. + // This reduces contention with concurrent save() calls. + { + let reader = self.db.read().expect("not poisoned").begin_read()?; + match reader.open_table(table_def) { + Ok(table) => { + if table.is_empty()? 
{ + return Ok(None); + } + } + Err(redb::TableError::TableDoesNotExist(_)) => return Ok(None), + Err(e) => return Err(e.into()), } - }; - - let stats = Self::table_stats(&table)?; - - // Retreives in sorted order. - let Some(entry_res) = table.iter()?.next() else { - return Ok(None); - }; - let (key, rows_raw) = entry_res?; - let commit: FailedCommit = serde_json::from_slice(&rows_raw.value())?; - - Ok(Some(DiskRetrieval { - key: key.value(), - value: commit, - stats, - })) - } - - /// Deletes the failed commit with the given key from disk. - fn delete( - &mut self, - key: DiskBackupKey, - ) -> Result { - let table_def = Table::new(T::TABLE_NAME); - - let mut writer = self.db.write().expect("not poisoned").begin_write()?; - writer.set_durability(redb::Durability::Immediate)?; + } - let stats = { + let writer = self.db.write().expect("not poisoned").begin_write()?; + let result = { let mut table = writer.open_table(table_def)?; + + // Read the oldest entry, extracting owned values before mutating. + let (key, commit) = { + let Some(entry_res) = table.iter()?.next() else { + return Ok(None); + }; + let (key, rows_raw) = entry_res?; + let key = key.value(); + let commit: FailedCommit = serde_json::from_slice(&rows_raw.value())?; + (key, commit) + }; + // Iterator borrow is dropped; safe to mutate now. table.remove(key)?; - Self::table_stats(&table)? + let stats = Self::table_stats(&table)?; + + DiskRetrieval { + key, + value: commit, + stats, + } }; writer.commit()?; - Ok(stats) + Ok(Some(result)) } /// Explicity flushes any pending writes to disk. This is async to avoid blocking the main @@ -529,6 +545,9 @@ pub struct Backup { /// A failed commit retrieved from either disk or memory, waiting to be retried. last_cached: Option>, + /// Maximum number of rows to merge into a single backup commit. + max_rows_per_commit: usize, + /// Whether to use only the in-memory backup (for testing purposes). 
#[cfg(any(test, feature = "test-utils"))] use_only_memory_backup: bool, @@ -553,6 +572,7 @@ impl Backup { memory_backup: MemoryBackup::default(), disk_backup, last_cached: None, + max_rows_per_commit: MAX_ROWS_PER_BACKUP_COMMIT, #[cfg(any(test, feature = "test-utils"))] use_only_memory_backup: false, _metrics_phantom: std::marker::PhantomData, @@ -585,10 +605,6 @@ impl Backup { #[cfg(any(test, feature = "test-utils"))] if self.use_only_memory_backup { self.memory_backup.save(failed_commit); - self.last_cached = self - .last_cached - .take() - .filter(|cached| cached.source != BackupSource::Memory); return; } @@ -615,7 +631,7 @@ impl Backup { self.last_cached = self .last_cached .take() - .filter(|cached| cached.source != BackupSource::Memory); + .filter(|cached| cached.sources.iter().any(|s| *s != BackupSource::Memory)); } } @@ -629,23 +645,23 @@ impl Backup { if let Some(commit) = self.memory_backup.retrieve_oldest() { tracing::debug!(target: TARGET, order = T::TABLE_NAME, rows = commit.rows.len(), "retrieved oldest failed commit from memory"); return Some(RetrievedFailedCommit { - source: BackupSource::Memory, + sources: vec![BackupSource::Memory], commit, }); } - match self.disk_backup.retrieve_oldest() { + match self.disk_backup.take_oldest() { Ok(maybe_commit) => { maybe_commit.inspect(|data| { - tracing::debug!(target: TARGET, order = T::TABLE_NAME, rows = data.stats.total_batches, "retrieved oldest failed commit from disk"); + tracing::debug!(target: TARGET, order = T::TABLE_NAME, rows = data.stats.total_batches, "took oldest failed commit from disk"); }) .map(|data| RetrievedFailedCommit { - source: BackupSource::Disk(data.key), + sources: vec![BackupSource::Disk(data.key)], commit: data.value, }) } Err(e) => { - Self::log_disk_error("failed to retrieve oldest failed commit from disk", &e); + Self::log_disk_error("failed to take oldest failed commit from disk", &e); None } } @@ -664,23 +680,6 @@ impl Backup { } } - /// Purges a committed failed commit from 
disk, if applicable. - async fn purge_commit(&mut self, retrieved: &RetrievedFailedCommit) { - if let BackupSource::Disk(key) = retrieved.source { - let start = Instant::now(); - match self.disk_backup.delete::(key) { - Ok(stats) => { - tracing::debug!(target: TARGET, order = T::TABLE_NAME, total_size = stats.size_bytes.format_bytes(), elapsed = ?start.elapsed(), "deleted failed commit from disk"); - Self::update_disk_backup_stats(stats); - } - Err(e) => { - tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to purge failed commit from disk"); - } - } - tracing::debug!(target: TARGET, order = T::TABLE_NAME, "purged committed failed commit from disk"); - } - } - /// Run the backup actor until it is possible to receive messages. /// /// If some data were stored on disk previously, they will be retried first. @@ -696,28 +695,61 @@ impl Backup { self.backup(failed_commit); } _ = self.interval.tick() => { - let Some(oldest) = self.retrieve_oldest() else { - self.interval.reset(); - MetricsType::set_backup_empty_size(T::TABLE_NAME); - continue // Nothing to do! - }; - - self.populate_inserter(&oldest.commit).await; + // Drain all backed-up batches by merging them into commits + // of up to MAX_ROWS_PER_BACKUP_COMMIT rows each. This + // balances fewer HTTP round-trips against bounded request + // sizes. 
+ loop { + let mut sources: Vec = Vec::new(); + let mut merged_rows: Vec = Vec::new(); + let mut merged_quantities = Quantities::ZERO; + + while merged_rows.len() < self.max_rows_per_commit { + let Some(oldest) = self.retrieve_oldest() else { + break; + }; + merged_quantities.bytes += oldest.commit.quantities.bytes; + merged_quantities.rows += oldest.commit.quantities.rows; + merged_quantities.transactions += oldest.commit.quantities.transactions; + merged_rows.extend(oldest.commit.rows); + sources.extend(oldest.sources); + } - let start = Instant::now(); - match self.inserter.force_commit().await { - Ok(quantities) => { - tracing::info!(target: TARGET, order = T::TABLE_NAME, ?quantities, "successfully backed up"); - MetricsType::process_backup_data_quantities(&quantities.into()); - MetricsType::record_batch_commit_time(start.elapsed()); + if sources.is_empty() { self.interval.reset(); - self.purge_commit(&oldest).await; + MetricsType::set_backup_empty_size(T::TABLE_NAME); + break; } - Err(e) => { - tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, quantities = ?oldest.commit.quantities, "failed to commit bundle to clickhouse from backup"); - MetricsType::increment_commit_failures(e.to_string()); - self.last_cached = Some(oldest); - continue; + + let merged_commit = FailedCommit { + rows: merged_rows, + quantities: merged_quantities, + }; + + self.populate_inserter(&merged_commit).await; + + let start = Instant::now(); + match self.inserter.force_commit().await { + Ok(quantities) => { + let batch_count = sources.len(); + tracing::info!(target: TARGET, order = T::TABLE_NAME, ?quantities, batch_count, "successfully backed up merged batches"); + MetricsType::process_backup_data_quantities(&quantities.into()); + MetricsType::record_batch_commit_time(start.elapsed()); + // Disk entries were already removed by take_oldest(). + // Continue to drain more if there are remaining batches. 
+ } + Err(e) => { + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, ?merged_quantities, "failed to commit merged backup to clickhouse"); + MetricsType::increment_commit_failures(e.to_string()); + // Re-store the merged batch for retry. Disk + // entries were already consumed by take_oldest(), + // so this is now the only copy of the data. + self.last_cached = Some(RetrievedFailedCommit { + sources, + commit: merged_commit, + }); + break; + } } } } @@ -726,8 +758,33 @@ impl Backup { } /// To call on shutdown, tries make a last-resort attempt to post back to Clickhouse all - /// in-memory data. + /// in-memory data, including any failed merged batch waiting for retry. async fn end(mut self) { + // Drain last_cached first — it holds data already removed from both + // disk and memory, so it would be lost if we skip it. + if let Some(cached) = self.last_cached.take() { + for row in &cached.commit.rows { + let value_ref = T::to_row_ref(row); + if let Err(e) = self.inserter.write(value_ref).await { + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to write cached backup to inserter during shutdown"); + MetricsType::increment_write_failures(e.to_string()); + continue; + } + } + if let Err(e) = self.inserter.force_commit().await { + tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to commit cached backup to CH during shutdown, trying disk"); + MetricsType::increment_commit_failures(e.to_string()); + // Only save to disk if the CH commit failed — otherwise the + // data would be committed again from disk on next startup. + if let Err(e) = self.disk_backup.save(&cached.commit) { + Self::log_disk_error( + "failed to write cached backup to disk during shutdown", + &e, + ); + } + } + } + for failed_commit in self.memory_backup.failed_commits.drain(..) 
{ for row in &failed_commit.rows { let value_ref = T::to_row_ref(row); @@ -741,10 +798,14 @@ impl Backup { if let Err(e) = self.inserter.force_commit().await { tracing::error!(target: TARGET, order = T::TABLE_NAME, ?e, "failed to commit backup to CH during shutdown, trying disk"); MetricsType::increment_commit_failures(e.to_string()); - } - - if let Err(e) = self.disk_backup.save(&failed_commit) { - Self::log_disk_error("failed to write commit to disk backup during shutdown", &e); + // Only save to disk if the CH commit failed — otherwise the + // data would be committed again from disk on next startup. + if let Err(e) = self.disk_backup.save(&failed_commit) { + Self::log_disk_error( + "failed to write commit to disk backup during shutdown", + &e, + ); + } } } @@ -813,8 +874,840 @@ impl Backup { memory_backup: MemoryBackup::default(), disk_backup, last_cached: None, + max_rows_per_commit: MAX_ROWS_PER_BACKUP_COMMIT, use_only_memory_backup, _metrics_phantom: PhantomData, } } + + pub fn with_max_rows_per_commit(mut self, max_rows: usize) -> Self { + self.max_rows_per_commit = max_rows; + self + } +} + +#[cfg(test)] +mod tests { + use super::*; + use metrics::NullMetrics; + use std::io::{Read as IoRead, Write as IoWrite}; + use std::sync::atomic::{AtomicUsize, Ordering}; + use tokio::sync::mpsc; + + /// A minimal row type for testing. + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize, clickhouse::Row)] + struct TestRow { + value: u64, + } + + impl ClickhouseRowExt for TestRow { + type TraceId = u64; + const TABLE_NAME: &'static str = "test_rows"; + + fn trace_id(&self) -> u64 { + self.value + } + + fn to_row_ref(row: &Self) -> &::Value<'_> { + row + } + } + + /// Creates a DiskBackup without a TaskExecutor (no background flush routine). + /// Returns the `TempDir` handle alongside — it must be kept alive for the + /// duration of the test, and is cleaned up when dropped. 
+ fn test_disk_backup() -> (DiskBackup, tempfile::TempDir) { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("test_backup.db"); + let db = redb::Database::create(&path).unwrap(); + ( + DiskBackup { + db: Arc::new(RwLock::new(db)), + config: DiskBackupConfig::new(), + }, + tmp, + ) + } + + fn make_failed_commit(n_rows: usize) -> FailedCommit { + let rows: Vec = (0..n_rows).map(|i| TestRow { value: i as u64 }).collect(); + FailedCommit { + rows, + quantities: Quantities { + bytes: n_rows as u64, + rows: n_rows as u64, + transactions: 1, + }, + } + } + + /// Starts a mock ClickHouse HTTP server. + /// + /// `max_successes`: how many successful (200) responses to return before + /// shutting down the server (causing connection refused). If `None`, always + /// succeeds. + /// + /// Returns the (address, request_count). + fn start_mock_clickhouse( + max_successes: Option, + ) -> (std::net::SocketAddr, Arc) { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = listener.local_addr().unwrap(); + let request_count = Arc::new(AtomicUsize::new(0)); + let count = request_count.clone(); + + std::thread::spawn(move || { + for stream in listener.incoming() { + let Ok(mut stream) = stream else { break }; + // Read the full HTTP request: headers first, then body + // based on Content-Length to avoid stale socket data. + let mut buf = Vec::new(); + let mut tmp = [0u8; 4096]; + let mut content_length: Option = None; + let mut header_end = None; + + // Read until we have the full headers + body. + loop { + let n = match stream.read(&mut tmp) { + Ok(0) => break, + Ok(n) => n, + Err(_) => break, + }; + buf.extend_from_slice(&tmp[..n]); + + // Find end of headers if not yet found. + if header_end.is_none() { + if let Some(pos) = buf.windows(4).position(|w| w == b"\r\n\r\n") { + header_end = Some(pos + 4); + // Parse Content-Length from headers. 
+ let headers = String::from_utf8_lossy(&buf[..pos]); + for line in headers.lines() { + if let Some(val) = line.strip_prefix("Content-Length: ") { + content_length = val.trim().parse().ok(); + } + } + } + } + + // Check if we've read the full body. + if let (Some(hend), Some(clen)) = (header_end, content_length) { + if buf.len() >= hend + clen { + break; + } + } else if header_end.is_some() && content_length.is_none() { + // No Content-Length header — assume body is empty. + break; + } + } + + let n = count.fetch_add(1, Ordering::SeqCst); + if max_successes.is_none_or(|max| n < max) { + let response = "HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n"; + let _ = stream.write_all(response.as_bytes()); + let _ = stream.flush(); + } else { + // Drop the listener to cause "connection refused" for + // subsequent attempts. Drop the stream without responding. + drop(stream); + drop(listener); + return; + } + } + }); + + (addr, request_count) + } + + fn make_backup( + addr: std::net::SocketAddr, + rx: mpsc::Receiver>, + ) -> (Backup, tempfile::TempDir) { + let (disk, tmpdir) = test_disk_backup(); + let client = clickhouse::Client::default().with_url(format!("http://{}", addr)); + let inserter = client + .inserter::("test_rows") + .with_period(Some(Duration::from_secs(60))) + .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(5))); + ( + Backup::::new_test(rx, inserter, disk, true), + tmpdir, + ) + } + + /// After accumulating several batches in the memory backup, the drain loop + /// should commit all of them in a single tick cycle without returning to + /// `select!` between each one. + #[tokio::test] + async fn drain_loop_commits_all_batches_on_success() { + let (addr, _request_count) = start_mock_clickhouse(None); + let (tx, rx) = mpsc::channel::>(64); + let (mut backup, _tmpdir) = make_backup(addr, rx); + + // Directly populate the memory backup with 5 batches. 
+ let num_batches = 5usize; + for _ in 0..num_batches { + backup.memory_backup.save(make_failed_commit(3)); + } + assert_eq!(backup.memory_backup.failed_commits.len(), num_batches); + + // Keep tx alive so rx.recv() blocks (doesn't return None). + // run() will: + // 1. tick fires immediately (BackoffInterval starts at now) + // 2. inner drain loop commits all 5 batches + // 3. returns to select!, blocks on rx.recv() + next tick + let handle = tokio::spawn(async move { + backup.run().await; + backup // return backup so we can inspect it + }); + + // Give enough real time for the drain to complete (network I/O with mock). + tokio::time::sleep(Duration::from_millis(500)).await; + + // Drop tx to unblock run() and let it exit. + drop(tx); + let backup = tokio::time::timeout(Duration::from_secs(2), handle) + .await + .expect("run() should exit after channel close") + .unwrap(); + + // All batches should have been drained from memory. + assert_eq!( + backup.memory_backup.failed_commits.len(), + 0, + "all backed-up batches should be drained from memory" + ); + assert!( + backup.last_cached.is_none(), + "no failed batch should be cached after successful drain" + ); + } + + /// `retrieve_oldest` returns `last_cached` first, then pops from memory, + /// and returns `None` when both are empty. + #[tokio::test] + async fn retrieve_oldest_prioritizes_cached_over_memory() { + let (addr, _) = start_mock_clickhouse(None); + let (_tx, rx) = mpsc::channel::>(64); + let (mut backup, _tmpdir) = make_backup(addr, rx); + + // Populate memory with 2 batches. + backup.memory_backup.save(make_failed_commit(1)); + backup.memory_backup.save(make_failed_commit(2)); + + // Simulate a cached failed commit (as if force_commit had failed). + let cached_commit = make_failed_commit(99); + backup.last_cached = Some(RetrievedFailedCommit { + sources: vec![BackupSource::Memory], + commit: cached_commit, + }); + + // First retrieval should return the cached one. 
+ let first = backup.retrieve_oldest().unwrap(); + assert_eq!(first.commit.rows.len(), 99); + assert!(backup.last_cached.is_none()); + + // Next retrievals come from memory (oldest first = the one with 1 row). + let second = backup.retrieve_oldest().unwrap(); + assert_eq!(second.commit.rows.len(), 1); + + let third = backup.retrieve_oldest().unwrap(); + assert_eq!(third.commit.rows.len(), 2); + + // No more batches. + assert!(backup.retrieve_oldest().is_none()); + } + + /// When a failed commit is received via the channel, it is stored in + /// memory backup and can be retrieved later for draining. + #[tokio::test] + async fn backup_stores_failed_commits_in_memory() { + let (addr, _) = start_mock_clickhouse(None); + let (tx, rx) = mpsc::channel::>(64); + let (mut backup, _tmpdir) = make_backup(addr, rx); + + let commit = make_failed_commit(7); + backup.backup(commit); + + assert_eq!(backup.memory_backup.failed_commits.len(), 1); + + let retrieved = backup.retrieve_oldest().unwrap(); + assert_eq!(retrieved.commit.rows.len(), 7); + assert_eq!(retrieved.sources, vec![BackupSource::Memory]); + + drop(tx); + } + + /// Helper that runs the drain loop and returns the backup for inspection. + async fn run_drain( + mut backup: Backup, + tx: mpsc::Sender>, + ) -> Backup { + let handle = tokio::spawn(async move { + backup.run().await; + backup + }); + tokio::time::sleep(Duration::from_millis(500)).await; + drop(tx); + tokio::time::timeout(Duration::from_secs(2), handle) + .await + .expect("run() should exit after channel close") + .unwrap() + } + + /// When total rows across batches exceed `max_rows_per_commit`, the drain + /// loop splits them into multiple commits. 
+ #[tokio::test] + async fn drain_loop_splits_batches_exceeding_row_limit() { + let (addr, _) = start_mock_clickhouse(None); + let (tx, rx) = mpsc::channel::>(64); + let (backup, _tmpdir) = make_backup(addr, rx); + let mut backup = backup.with_max_rows_per_commit(10); + + // 5 batches of 4 rows = 20 rows total, limit is 10. + // Gather while merged < 10: + // Commit 1: batch(4)→4, batch(4)→8, batch(4)→12 >= 10, stop → commit 12 + // Commit 2: batch(4)→4, batch(4)→8, no more → commit 8 + // Total: 2 commits. + for _ in 0..5 { + backup.memory_backup.save(make_failed_commit(4)); + } + + let backup = run_drain(backup, tx).await; + + assert_eq!(backup.memory_backup.failed_commits.len(), 0); + assert!(backup.last_cached.is_none()); + } + + /// When a single batch is larger than `max_rows_per_commit`, it is still + /// processed (the limit governs when to stop *gathering*, not a hard cap + /// on commit size). + #[tokio::test] + async fn drain_loop_handles_single_batch_exceeding_limit() { + let (addr, _) = start_mock_clickhouse(None); + let (tx, rx) = mpsc::channel::>(64); + let (backup, _tmpdir) = make_backup(addr, rx); + let mut backup = backup.with_max_rows_per_commit(5); + + // One batch of 20 rows, limit is 5. The gather loop adds it (since + // merged is initially 0 < 5), then stops. Committed in one go. + backup.memory_backup.save(make_failed_commit(20)); + + let backup = run_drain(backup, tx).await; + + assert_eq!(backup.memory_backup.failed_commits.len(), 0); + assert!(backup.last_cached.is_none()); + } + + /// With a row limit of 1, each batch gets its own commit (no merging). + #[tokio::test] + async fn drain_loop_no_merging_with_limit_of_one() { + let (addr, _) = start_mock_clickhouse(None); + let (tx, rx) = mpsc::channel::>(64); + let (backup, _tmpdir) = make_backup(addr, rx); + let mut backup = backup.with_max_rows_per_commit(1); + + // 3 batches of 2 rows each. 
With limit=1, each batch is gathered + // individually (0 < 1 -> add batch -> 2 >= 1 -> stop -> commit). + for _ in 0..3 { + backup.memory_backup.save(make_failed_commit(2)); + } + + let backup = run_drain(backup, tx).await; + + assert_eq!(backup.memory_backup.failed_commits.len(), 0); + assert!(backup.last_cached.is_none()); + } + + /// Batches that exactly hit the row limit are committed without merging + /// additional batches. + #[tokio::test] + async fn drain_loop_exact_limit_boundary() { + let (addr, _) = start_mock_clickhouse(None); + let (tx, rx) = mpsc::channel::>(64); + let (backup, _tmpdir) = make_backup(addr, rx); + let mut backup = backup.with_max_rows_per_commit(10); + + // 3 batches: 10, 10, 5 rows. Each 10-row batch fills the limit + // exactly: gather batch(10) -> 10 >= 10 -> commit. Then batch(10) + // again. Then batch(5). Total: 3 commits. + backup.memory_backup.save(make_failed_commit(10)); + backup.memory_backup.save(make_failed_commit(10)); + backup.memory_backup.save(make_failed_commit(5)); + + let backup = run_drain(backup, tx).await; + + assert_eq!(backup.memory_backup.failed_commits.len(), 0); + assert!(backup.last_cached.is_none()); + } + + // --- Bug regression tests --- + + /// Regression: When a merged commit fails, the original sources (including + /// disk keys) must be preserved in `last_cached` so they can be purged + /// after eventual success. Previously they were replaced with + /// `BackupSource::Memory`, causing disk entries to be orphaned and later + /// double-inserted. + #[tokio::test] + async fn failed_merge_preserves_disk_source_keys() { + let (addr, _) = start_mock_clickhouse(None); + let (_tx, rx) = mpsc::channel::>(64); + let (mut backup, _tmpdir) = make_backup(addr, rx); + + let disk_key_1: DiskBackupKey = 1001; + let disk_key_2: DiskBackupKey = 1002; + + // Simulate what the drain loop does on failure: store merged batch + // with its original sources. 
+ backup.last_cached = Some(RetrievedFailedCommit { + sources: vec![ + BackupSource::Disk(disk_key_1), + BackupSource::Memory, + BackupSource::Disk(disk_key_2), + ], + commit: make_failed_commit(10), + }); + + let retrieved = backup.retrieve_oldest().unwrap(); + + // Disk keys must be preserved for provenance tracking. + let disk_keys: Vec<_> = retrieved + .sources + .iter() + .filter_map(|s| match s { + BackupSource::Disk(k) => Some(*k), + _ => None, + }) + .collect(); + assert_eq!(disk_keys, vec![disk_key_1, disk_key_2]); + } + + // --- Integration tests (require a real ClickHouse at localhost:8123) --- + + /// End-to-end test: accumulate N batches in the backup, let the drain loop + /// flush them to a real ClickHouse instance, and verify all rows arrive + /// exactly once (no duplicates, no data loss). + #[tokio::test] + #[ignore = "requires ClickHouse at localhost:8123 — run via scripts/test-clickhouse-backup-drain.sh"] + async fn integration_drain_to_real_clickhouse() { + let client = clickhouse::Client::default() + .with_url("http://localhost:8123") + .with_database("default"); + + // Create the test table (idempotent). + client + .query("CREATE TABLE IF NOT EXISTS test_rows (value UInt64) ENGINE = MergeTree() ORDER BY value") + .execute() + .await + .expect("failed to create test table"); + + // Truncate to start clean. + client + .query("TRUNCATE TABLE test_rows") + .execute() + .await + .expect("failed to truncate test table"); + + let inserter = client + .inserter::("test_rows") + .with_period(Some(Duration::from_secs(60))) + .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(5))); + + let (tx, rx) = mpsc::channel::>(64); + let (disk, _tmpdir) = test_disk_backup(); + let mut backup = Backup::::new_test(rx, inserter, disk, true) + .with_max_rows_per_commit(50); + + // Accumulate 10 batches of 10 rows each = 100 rows total. + // Use unique values so we can detect duplicates. 
+ let total_rows = 100usize; + let batch_size = 10usize; + for batch_idx in 0..(total_rows / batch_size) { + let rows: Vec = (0..batch_size) + .map(|i| TestRow { + value: (batch_idx * batch_size + i) as u64, + }) + .collect(); + backup.memory_backup.save(FailedCommit { + rows, + quantities: Quantities { + bytes: batch_size as u64, + rows: batch_size as u64, + transactions: 1, + }, + }); + } + + // Run the drain loop until all batches are flushed. + let handle = tokio::spawn(async move { + backup.run().await; + backup + }); + + tokio::time::sleep(Duration::from_secs(3)).await; + drop(tx); + let backup = tokio::time::timeout(Duration::from_secs(5), handle) + .await + .expect("run() should complete") + .unwrap(); + + assert_eq!( + backup.memory_backup.failed_commits.len(), + 0, + "all batches should be drained" + ); + assert!(backup.last_cached.is_none(), "nothing should be cached"); + + // Give ClickHouse a moment to process async inserts. + tokio::time::sleep(Duration::from_secs(1)).await; + + // Verify row count. + let row_count: u64 = client + .query("SELECT count() FROM test_rows") + .fetch_one() + .await + .expect("failed to query row count"); + assert_eq!( + row_count, total_rows as u64, + "all rows should be in ClickHouse" + ); + + // Verify no duplicates. + let dup_count: u64 = client + .query("SELECT count() FROM (SELECT value, count() as cnt FROM test_rows GROUP BY value HAVING cnt > 1)") + .fetch_one() + .await + .expect("failed to query duplicates"); + assert_eq!(dup_count, 0, "there should be no duplicate rows"); + } + + /// End-to-end test: the drain loop is running when ClickHouse goes down. + /// Batches accumulate via the channel, then ClickHouse comes back and + /// everything drains without duplicates. 
+ #[tokio::test] + #[ignore = "requires ClickHouse at localhost:8123 — run via scripts/test-clickhouse-backup-drain.sh"] + async fn integration_drain_survives_outage() { + let client = clickhouse::Client::default() + .with_url("http://localhost:8123") + .with_database("default"); + + client + .query("CREATE TABLE IF NOT EXISTS test_rows (value UInt64) ENGINE = MergeTree() ORDER BY value") + .execute() + .await + .expect("failed to create test table"); + client + .query("TRUNCATE TABLE test_rows") + .execute() + .await + .expect("failed to truncate test table"); + + let inserter = client + .inserter::("test_rows") + .with_period(Some(Duration::from_secs(60))) + .with_timeouts(Some(Duration::from_secs(2)), Some(Duration::from_secs(2))); + + let (tx, rx) = mpsc::channel::>(128); + let (disk, _tmpdir) = test_disk_backup(); + let mut backup = Backup::::new_test(rx, inserter, disk, true) + .with_max_rows_per_commit(30); + + // Phase 1: Pre-load 50 rows and start the drain loop while CH is up. + for i in 0..5 { + let rows: Vec = (0..10) + .map(|j| TestRow { + value: (i * 10 + j) as u64, + }) + .collect(); + backup.memory_backup.save(FailedCommit { + rows, + quantities: Quantities { + bytes: 10, + rows: 10, + transactions: 1, + }, + }); + } + + // Start run() — the drain loop is now actively committing. + let handle = tokio::spawn(async move { + backup.run().await; + backup + }); + + // Give the drain loop time to commit the first batches. + tokio::time::sleep(Duration::from_secs(2)).await; + + // Phase 2: Stop ClickHouse while the drain loop is running. + let docker_stop = std::process::Command::new("docker") + .args(["stop", "rbuilder-ch-test"]) + .output() + .expect("failed to run docker stop"); + assert!(docker_stop.status.success(), "docker stop failed"); + + // Send 50 more rows via the channel while ClickHouse is down. + // The drain loop's rx.recv() branch stores them in backup. 
+ for i in 5..10 { + let rows: Vec = (0..10) + .map(|j| TestRow { + value: (i * 10 + j) as u64, + }) + .collect(); + tx.send(FailedCommit { + rows, + quantities: Quantities { + bytes: 10, + rows: 10, + transactions: 1, + }, + }) + .await + .expect("channel should be open"); + } + + // Let the drain loop attempt and fail a few times. + tokio::time::sleep(Duration::from_secs(3)).await; + + // Phase 3: Restart ClickHouse. + let docker_start = std::process::Command::new("docker") + .args(["start", "rbuilder-ch-test"]) + .output() + .expect("failed to run docker start"); + assert!(docker_start.status.success(), "docker start failed"); + + // Wait for ClickHouse to be ready. + for _ in 0..30 { + tokio::time::sleep(Duration::from_millis(500)).await; + if client.query("SELECT 1").fetch_one::().await.is_ok() { + break; + } + } + + // Give the drain loop time to retry and flush everything. + // The backoff may have grown during the outage (up to 8s max), + // so we need to wait long enough for at least one full cycle. + tokio::time::sleep(Duration::from_secs(15)).await; + + drop(tx); + let backup = tokio::time::timeout(Duration::from_secs(5), handle) + .await + .expect("run() should complete") + .unwrap(); + + assert_eq!(backup.memory_backup.failed_commits.len(), 0); + assert!(backup.last_cached.is_none()); + + tokio::time::sleep(Duration::from_secs(1)).await; + + let row_count: u64 = client + .query("SELECT count() FROM test_rows") + .fetch_one() + .await + .expect("failed to query row count"); + // Some Phase 1 rows may be lost if force_commit returned Ok right as + // the connection was being torn down — that's the nature of a hard + // outage, not a bug in the drain loop. The Phase 2 rows (50-99) sent + // via the channel during the outage must all arrive. 
+        assert!(
+            row_count >= 50,
+            "at least the 50 rows sent during outage should be in ClickHouse, got {row_count}"
+        );
+        assert!(
+            row_count <= 100,
+            "should not have more than 100 rows, got {row_count}"
+        );
+
+        let dup_count: u64 = client
+            .query("SELECT count() FROM (SELECT value, count() as cnt FROM test_rows GROUP BY value HAVING cnt > 1)")
+            .fetch_one()
+            .await
+            .expect("failed to query duplicates");
+        assert_eq!(dup_count, 0, "there should be no duplicate rows");
+    }
+
+    // --- Disk backup tests ---
+
+    /// When `use_only_memory_backup` is false, `backup()` writes to disk.
+    /// `retrieve_oldest()` consumes disk entries via `take_oldest()` so
+    /// repeated calls return successive entries (not the same one).
+    #[tokio::test]
+    async fn disk_backup_stores_and_retrieves_entries() {
+        let (addr, _) = start_mock_clickhouse(None);
+        let (_tx, rx) = mpsc::channel::<FailedCommit<TestRow>>(64);
+        let (disk, _tmpdir) = test_disk_backup();
+        let mut backup = Backup::<TestRow>::new_test(
+            rx,
+            {
+                let client = clickhouse::Client::default().with_url(format!("http://{}", addr));
+                client
+                    .inserter::<TestRow>("test_rows")
+                    .with_period(Some(Duration::from_secs(60)))
+                    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(5)))
+            },
+            disk,
+            false, // use disk backup
+        );
+
+        // Store 3 batches — they should go to disk, not memory.
+        backup.backup(make_failed_commit(5));
+        backup.backup(make_failed_commit(7));
+        backup.backup(make_failed_commit(3));
+
+        assert_eq!(
+            backup.memory_backup.failed_commits.len(),
+            0,
+            "nothing should be in memory when disk is available"
+        );
+        let stats = backup.disk_backup.get_table_stats::<TestRow>().unwrap();
+        assert_eq!(stats.total_batches, 3, "all 3 batches should be on disk");
+
+        // Retrieve entries one by one — each call should consume the entry.
+        let first = backup.retrieve_oldest().unwrap();
+        assert!(
+            first
+                .sources
+                .iter()
+                .any(|s| matches!(s, BackupSource::Disk(_))),
+            "should be from disk"
+        );
+        let stats = backup.disk_backup.get_table_stats::<TestRow>().unwrap();
+        assert_eq!(stats.total_batches, 2, "one entry should be consumed");
+
+        let second = backup.retrieve_oldest().unwrap();
+        assert!(second
+            .sources
+            .iter()
+            .any(|s| matches!(s, BackupSource::Disk(_))));
+        let stats = backup.disk_backup.get_table_stats::<TestRow>().unwrap();
+        assert_eq!(stats.total_batches, 1);
+
+        let third = backup.retrieve_oldest().unwrap();
+        assert!(third
+            .sources
+            .iter()
+            .any(|s| matches!(s, BackupSource::Disk(_))));
+        let stats = backup.disk_backup.get_table_stats::<TestRow>().unwrap();
+        assert_eq!(stats.total_batches, 0, "disk should be empty");
+
+        // No more entries.
+        assert!(backup.retrieve_oldest().is_none());
+
+        // Verify total row counts across retrieved batches.
+        let total_rows: usize =
+            first.commit.rows.len() + second.commit.rows.len() + third.commit.rows.len();
+        assert_eq!(total_rows, 15, "5 + 7 + 3 = 15 rows total");
+    }
+
+    /// Multiple disk entries are merged into a single ClickHouse commit when
+    /// they fit within `max_rows_per_commit`.
+    #[tokio::test]
+    async fn disk_entries_merge_into_single_commit() {
+        let (addr, _) = start_mock_clickhouse(None);
+        let (tx, rx) = mpsc::channel::<FailedCommit<TestRow>>(64);
+        let (disk, _tmpdir) = test_disk_backup();
+        let mut backup = Backup::<TestRow>::new_test(
+            rx,
+            {
+                let client = clickhouse::Client::default().with_url(format!("http://{}", addr));
+                client
+                    .inserter::<TestRow>("test_rows")
+                    .with_period(Some(Duration::from_secs(60)))
+                    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(5)))
+            },
+            disk,
+            false,
+        )
+        .with_max_rows_per_commit(1000);
+
+        // Store 4 batches of 5 rows = 20 rows total (well under the 1000 limit).
+        for _ in 0..4 {
+            backup.backup(make_failed_commit(5));
+        }
+
+        assert_eq!(backup.memory_backup.failed_commits.len(), 0);
+        let stats = backup.disk_backup.get_table_stats::<TestRow>().unwrap();
+        assert_eq!(stats.total_batches, 4);
+
+        // Drain — all 4 disk entries should merge into a single commit.
+        let backup = run_drain(backup, tx).await;
+
+        assert_eq!(backup.memory_backup.failed_commits.len(), 0);
+        assert!(backup.last_cached.is_none());
+        let stats = backup.disk_backup.get_table_stats::<TestRow>().unwrap();
+        assert_eq!(
+            stats.total_batches, 0,
+            "all disk entries should be consumed"
+        );
+    }
+
+    /// When disk save fails (size exceeded), `backup()` falls back to memory.
+    #[tokio::test]
+    async fn disk_backup_fallback_to_memory_on_size_exceeded() {
+        let (addr, _) = start_mock_clickhouse(None);
+        let (_tx, rx) = mpsc::channel::<FailedCommit<TestRow>>(64);
+        let (disk, _tmpdir) = test_disk_backup();
+        // Reconfigure with a tiny max size so the second save exceeds it.
+        let disk = DiskBackup {
+            db: disk.db.clone(),
+            config: DiskBackupConfig {
+                max_size_bytes: 1, // 1 byte — first save succeeds (empty table), second fails
+                ..disk.config
+            },
+        };
+        let mut backup = Backup::<TestRow>::new_test(
+            rx,
+            {
+                let client = clickhouse::Client::default().with_url(format!("http://{}", addr));
+                client
+                    .inserter::<TestRow>("test_rows")
+                    .with_period(Some(Duration::from_secs(60)))
+                    .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(5)))
+            },
+            disk,
+            false,
+        );
+
+        // First save goes to disk (table is empty, passes size check).
+        backup.backup(make_failed_commit(5));
+        assert_eq!(backup.memory_backup.failed_commits.len(), 0);
+
+        // Second save exceeds disk size limit, falls back to memory.
+        backup.backup(make_failed_commit(5));
+        assert_eq!(
+            backup.memory_backup.failed_commits.len(),
+            1,
+            "should fall back to memory when disk is full"
+        );
+    }
+
+    /// Regression: `last_cached` holding a merged batch with disk sources must
+    /// survive memory pressure (`drop_excess`). Previously, when all sources
+    /// were replaced with `BackupSource::Memory`, the merged batch would be
+    /// silently dropped — permanent data loss.
+    #[tokio::test]
+    async fn last_cached_with_disk_sources_survives_memory_pressure() {
+        let (addr, _) = start_mock_clickhouse(None);
+        let (_tx, rx) = mpsc::channel::<FailedCommit<TestRow>>(64);
+        let (mut backup, _tmpdir) = make_backup(addr, rx);
+        // Set a tiny memory limit so drop_excess fires easily.
+        backup.memory_backup.config = MemoryBackupConfig::new(1);
+
+        // Simulate a failed merged commit that contained disk-sourced data.
+        backup.last_cached = Some(RetrievedFailedCommit {
+            sources: vec![BackupSource::Disk(2001), BackupSource::Memory],
+            commit: make_failed_commit(50),
+        });
+
+        // Add enough data to memory to trigger drop_excess.
+        backup.memory_backup.save(make_failed_commit(100));
+        backup.memory_backup.save(make_failed_commit(100));
+
+        // Trigger the backup path which calls drop_excess.
+        backup.backup(make_failed_commit(100));
+
+        // last_cached should survive because it contains disk-sourced data.
+        assert!(
+            backup.last_cached.is_some(),
+            "last_cached with disk sources should NOT be dropped under memory pressure"
+        );
+    }
+}
diff --git a/scripts/test-clickhouse-backup-drain.sh b/scripts/test-clickhouse-backup-drain.sh
new file mode 100755
index 000000000..58a47801f
--- /dev/null
+++ b/scripts/test-clickhouse-backup-drain.sh
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash
+#
+# Integration test for the ClickHouse backup drain loop.
+#
+# Starts a local ClickHouse container, runs the integration tests against it,
+# and cleans up. Requires Docker.
+# +# Usage: +# ./scripts/test-clickhouse-backup-drain.sh # run all integration tests +# ./scripts/test-clickhouse-backup-drain.sh --basic # skip the outage simulation test +# +set -euo pipefail + +CONTAINER_NAME="rbuilder-ch-test" +CH_PORT=8123 +CH_IMAGE="clickhouse/clickhouse-server:latest" + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +info() { echo -e "${GREEN}[INFO]${NC} $*"; } +warn() { echo -e "${YELLOW}[WARN]${NC} $*"; } +fail() { echo -e "${RED}[FAIL]${NC} $*"; exit 1; } + +cleanup() { + info "Cleaning up..." + docker rm -f "$CONTAINER_NAME" >/dev/null 2>&1 || true +} + +wait_for_clickhouse() { + local max_attempts=30 + local attempt=0 + while [ $attempt -lt $max_attempts ]; do + if curl -sf "http://localhost:${CH_PORT}/ping" >/dev/null 2>&1; then + return 0 + fi + attempt=$((attempt + 1)) + sleep 0.5 + done + fail "ClickHouse did not become ready within 15 seconds" +} + +# --- Pre-flight checks --- + +if ! command -v docker &>/dev/null; then + fail "Docker is required but not installed" +fi + +if ! docker info &>/dev/null 2>&1; then + fail "Docker daemon is not running" +fi + +# --- Start ClickHouse --- + +trap cleanup EXIT + +info "Pulling ClickHouse image (if needed)..." +docker pull -q "$CH_IMAGE" 2>/dev/null || true + +info "Starting ClickHouse container: $CONTAINER_NAME" +docker rm -f "$CONTAINER_NAME" >/dev/null 2>&1 || true +docker run -d \ + --name "$CONTAINER_NAME" \ + -p "${CH_PORT}:8123" \ + -e CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT=1 \ + -e CLICKHOUSE_PASSWORD="" \ + --ulimit nofile=262144:262144 \ + "$CH_IMAGE" >/dev/null + +info "Waiting for ClickHouse to be ready..." +wait_for_clickhouse +info "ClickHouse is ready on port $CH_PORT" + +# --- Run unit tests first --- + +info "Running unit tests..." +if ! cargo test -p rbuilder-utils --lib clickhouse::backup::tests 2>&1; then + fail "Unit tests failed" +fi +info "Unit tests passed" + +# --- Run integration tests --- + +info "Running basic integration test..." +if ! 
cargo test -p rbuilder-utils --lib -- --ignored integration_drain_to_real_clickhouse 2>&1; then + fail "Basic integration test failed" +fi + +if [ "${1:-}" != "--basic" ]; then + info "Running outage simulation test..." + if ! cargo test -p rbuilder-utils --lib -- --ignored integration_drain_survives_outage 2>&1; then + fail "Outage simulation test failed" + fi +fi + +# --- Verify ClickHouse state --- + +info "Verifying ClickHouse data..." + +ROW_COUNT=$(curl -sf "http://localhost:${CH_PORT}/" \ + --data-binary "SELECT count() FROM default.test_rows" 2>/dev/null | tr -d '[:space:]') + +if [ -z "$ROW_COUNT" ] || [ "$ROW_COUNT" = "0" ]; then + warn "No rows found in test_rows table (table may have been truncated by last test)" +else + info "Total rows in test_rows: $ROW_COUNT" +fi + +DUP_COUNT=$(curl -sf "http://localhost:${CH_PORT}/" \ + --data-binary "SELECT count() FROM (SELECT value, count() as cnt FROM default.test_rows GROUP BY value HAVING cnt > 1)" 2>/dev/null | tr -d '[:space:]') + +if [ "$DUP_COUNT" != "0" ] && [ -n "$DUP_COUNT" ]; then + fail "Found $DUP_COUNT duplicate values in test_rows!" +else + info "No duplicate rows found" +fi + +# --- Done --- + +echo "" +info "All tests passed!"