fix: ClickHouse backup drain loop — batch merging and bug fixes #907
Conversation
The backup drain loop previously processed one batch per timer tick, causing very slow recovery after ClickHouse outages. This rewrites the drain to:

- Continuously drain all backed-up batches in a single tick cycle
- Merge multiple batches into a single ClickHouse commit (up to 65,536 rows) to reduce HTTP round-trips
- Preserve original disk source keys across failed retries, preventing duplicate rows in ClickHouse after recovery
- Prevent silent data loss when merged retry batches were dropped under memory pressure
- Only reset the backoff timer when a drain completes fully, not on each individual successful chunk

Also fixes a pre-existing compilation error (the `strum` dependency was missing its `derive` feature) and adds unit and integration tests with a Docker-based test script.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Claude finished @fsodre's task in 2m 56s — Review: PR #907 — ClickHouse backup drain loop fixes
Overall: The core changes (continuous drain, batch merging, source preservation) are well-motivated and the logic is sound. The test coverage is good — regression tests directly target the bugs being fixed. A few issues worth addressing:

Issues
```rust
let mut merged_rows: Vec<T> = Vec::new();
let mut merged_quantities = Quantities::ZERO;

while merged_rows.len() < self.max_rows_per_commit {
```
Nit: The merge loop can overshoot max_rows_per_commit significantly. If a single batch has 100K rows, the while merged_rows.len() < self.max_rows_per_commit condition passes when merged_rows is empty, so all 100K rows are added in one shot — well beyond the 65,536 limit.
The tests document this as intentional ("the limit governs when to stop gathering, not a hard cap"), but in production this means a single large failed commit bypasses the request-size bound entirely. Consider either:
- Splitting oversized individual batches before merging, or
- Documenting this as a known limitation in the constant's doc comment
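The first option can be sketched in a few lines. This is a hypothetical helper (the name `split_oversized` and the free-function shape are assumptions, not the PR's actual code): it splits any batch larger than the cap into `max_rows`-sized chunks before the merge loop sees it, so a single huge failed commit can no longer bypass the request-size bound.

```rust
/// Hypothetical sketch: cap individual batches at `max_rows` before
/// they enter the merge loop. Batches at or under the cap pass through
/// unchanged; oversized ones are split into bounded chunks.
fn split_oversized<T: Clone>(batch: Vec<T>, max_rows: usize) -> Vec<Vec<T>> {
    if batch.len() <= max_rows {
        return vec![batch];
    }
    batch
        .chunks(max_rows)
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    // A 100_000-row batch with a 65_536-row cap yields two bounded chunks.
    let batch: Vec<u32> = (0..100_000).collect();
    let chunks = split_oversized(batch, 65_536);
    assert_eq!(chunks.len(), 2);
    assert_eq!(chunks[0].len(), 65_536);
    assert_eq!(chunks[1].len(), 34_464);
    println!("chunk sizes: {} and {}", chunks[0].len(), chunks[1].len());
}
```

The `Clone` bound and the copy in `chunk.to_vec()` are a cost of this sketch; a production version could instead drain the original `Vec` to avoid duplicating rows.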
```rust
let path = tmp.path().join("test_backup.db");
let db = redb::Database::create(&path).unwrap();
// Leak the tempdir so it stays alive for the test duration.
std::mem::forget(tmp);
```
Nit: std::mem::forget(tmp) leaks the temp directory (and its contents) on every test run. Consider storing the TempDir handle alongside the DiskBackup or in a static, so it's cleaned up when the test completes.
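The suggested fix is the standard RAII pattern: keep the directory's guard alive inside the struct that uses it, so cleanup runs when the struct is dropped. A stdlib-only sketch of the idea — `DirGuard` here is a hypothetical stand-in for `tempfile::TempDir`, and `TestBackup` for whatever struct the test returns:

```rust
use std::fs;
use std::path::PathBuf;

/// Hypothetical stand-in for `tempfile::TempDir`: removes the
/// directory when dropped.
struct DirGuard(PathBuf);

impl Drop for DirGuard {
    fn drop(&mut self) {
        let _ = fs::remove_dir_all(&self.0);
    }
}

/// Holding the guard next to the resource ties their lifetimes
/// together, instead of `std::mem::forget`-ing the guard and
/// leaking the directory on every test run.
struct TestBackup {
    #[allow(dead_code)]
    db_path: PathBuf,
    _dir: DirGuard, // cleaned up together with TestBackup
}

fn main() {
    let dir = std::env::temp_dir().join(format!("backup_demo_{}", std::process::id()));
    fs::create_dir_all(&dir).unwrap();
    let backup = TestBackup {
        db_path: dir.join("test_backup.db"),
        _dir: DirGuard(dir.clone()),
    };
    assert!(dir.exists());
    drop(backup); // DirGuard::drop removes the directory
    assert!(!dir.exists());
    println!("cleaned up: {}", !dir.exists());
}
```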
```rust
for stream in listener.incoming() {
    let Ok(mut stream) = stream else { break };
    let mut buf = [0u8; 65536];
    let _ = stream.read(&mut buf);
```
Minor: A single read() with a fixed 65,536-byte buffer may not consume the full HTTP request if the ClickHouse client sends data across multiple TCP segments (e.g., large merged batches). This could leave data in the socket buffer and cause the mock to misbehave on the next request. Consider reading until \r\n\r\n is found (end of headers) to be more robust, or at least looping reads until no more data is available.
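A sketch of the read-until-end-of-headers approach (the function name is hypothetical, and the byte-at-a-time loop is deliberately simple — a real mock would buffer reads and also drain any body indicated by `Content-Length`):

```rust
use std::io::Read;

/// Read from `stream` until the end-of-headers marker `\r\n\r\n` is
/// seen (or EOF), so a request split across TCP segments is fully
/// consumed before the mock answers.
fn read_headers<R: Read>(stream: &mut R) -> std::io::Result<Vec<u8>> {
    let mut data = Vec::new();
    let mut byte = [0u8; 1];
    while !data.ends_with(b"\r\n\r\n") {
        if stream.read(&mut byte)? == 0 {
            break; // EOF before the marker: return what we have
        }
        data.push(byte[0]);
    }
    Ok(data)
}

fn main() {
    // A cursor stands in for the TcpStream in this sketch.
    let request = b"POST / HTTP/1.1\r\nHost: x\r\n\r\nbody".to_vec();
    let mut cursor = std::io::Cursor::new(request);
    let headers = read_headers(&mut cursor).unwrap();
    assert!(headers.ends_with(b"\r\n\r\n"));
    println!("read {} header bytes", headers.len());
}
```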
```diff
 .last_cached
 .take()
-.filter(|cached| cached.source != BackupSource::Memory);
+.filter(|cached| cached.sources.iter().any(|s| *s != BackupSource::Memory));
```
Bug (test-only path): This filter runs on every incoming failed commit when use_only_memory_backup is true. Since last_cached holds data already removed from memory_backup (via retrieve_oldest), clearing it here means those rows are permanently lost — not just deferred.
In the production drop_excess path (line 628-631), this same filter is guarded by memory pressure, which is an acceptable trade-off. But here it fires unconditionally, so a failed merged commit waiting for retry gets silently discarded as soon as the next failure arrives.
Consider guarding this the same way, or removing it from the test-only path entirely.
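A minimal sketch of the suggested guard, using hypothetical names (`maybe_drop_cached`, `under_memory_pressure`, and a simplified `CachedBatch`): the memory-source filter only fires when pressure actually forces a drop, mirroring the production `drop_excess` path instead of running on every failure.

```rust
#[derive(Clone, Copy, PartialEq)]
enum BackupSource {
    Memory,
    Disk,
}

struct CachedBatch {
    sources: Vec<BackupSource>,
}

/// Clear `last_cached` only when memory pressure forces it; otherwise
/// keep the cached retry batch, since it may be the only copy of rows
/// already removed from the in-memory deque.
fn maybe_drop_cached(last_cached: &mut Option<CachedBatch>, under_memory_pressure: bool) {
    if under_memory_pressure {
        *last_cached = last_cached
            .take()
            .filter(|c| c.sources.iter().any(|s| *s != BackupSource::Memory));
    }
}

fn main() {
    let mut cached = Some(CachedBatch { sources: vec![BackupSource::Memory] });
    maybe_drop_cached(&mut cached, false);
    assert!(cached.is_some()); // no pressure: the retry batch survives

    maybe_drop_cached(&mut cached, true);
    assert!(cached.is_none()); // memory-only batch dropped under pressure

    let mut mixed = Some(CachedBatch {
        sources: vec![BackupSource::Memory, BackupSource::Disk],
    });
    maybe_drop_cached(&mut mixed, true);
    assert!(mixed.is_some()); // a disk-backed copy exists, so keep it
    println!("guarded drop ok");
}
```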
Code review

Found 1 issue:
rbuilder/crates/rbuilder-utils/src/clickhouse/backup/mod.rs Lines 721 to 732 in 40ce046

🤖 Generated with Claude Code
`DiskBackup::retrieve_oldest()` was read-only — it never deleted the entry from disk. The gather loop called it repeatedly, getting the same disk entry every time and filling `merged_rows` with duplicates.

Replace it with `take_oldest()`, which reads and deletes in a single redb write transaction, so repeated calls return successive entries. Since disk entries are now consumed on retrieval, `purge_disk_backup` and the post-commit purge loop are no longer needed and have been removed.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
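The behavioral difference can be illustrated with a stdlib analogy — a `BTreeMap` standing in for the redb table (this is a model of the semantics, not the actual `DiskBackup` code): a read-only peek returns the same oldest entry forever, while a read-and-delete advances through successive entries.

```rust
use std::collections::BTreeMap;

fn main() {
    // Keys stand in for the disk backup's ordered source keys.
    let mut backup: BTreeMap<u64, &str> = BTreeMap::new();
    backup.insert(1, "batch-a");
    backup.insert(2, "batch-b");

    // retrieve_oldest-style (read-only): every call sees the same entry,
    // so a gather loop fills up with duplicates.
    let peek1 = backup.iter().next().map(|(k, _)| *k);
    let peek2 = backup.iter().next().map(|(k, _)| *k);
    assert_eq!(peek1, peek2);

    // take_oldest-style (read + delete): successive calls advance,
    // so the gather loop drains the backlog.
    let first = backup.pop_first().map(|(k, _)| k);
    let second = backup.pop_first().map(|(k, _)| k);
    assert_eq!(first, Some(1));
    assert_eq!(second, Some(2));
    assert!(backup.is_empty());
    println!("take-style drain consumed both entries");
}
```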
Claude finished @fsodre's task in 3m 18s — Review: PR #907 — ClickHouse backup drain loop fixes (re-review after sync)
Overall: The second commit (…)

Issues found
Notes
```rust
let writer = self.db.write().expect("not poisoned").begin_write()?;
let result = {
    let mut table = writer.open_table(table_def)?;

    // Read the oldest entry, extracting owned values before mutating.
    let (key, commit) = {
        let Some(entry_res) = table.iter()?.next() else {
            return Ok(None);
```
Bug: Early return on empty table leaks the write transaction without committing. More importantly, writer.open_table() on a write transaction creates the table if it doesn't exist, so every call to take_oldest on a fresh database creates an empty table that's rolled back on drop — this is wasteful but harmless.
However, the bigger issue: taking a write lock (self.db.write()) just to check if the table is empty blocks all concurrent readers (including save() and get_table_stats()). The old code used a read transaction for the check. Consider either:
- Doing a quick read-check first and only taking the write lock when there's actually data, or
- Accepting the contention since this runs on a single-threaded tick loop (if that's the case, a comment would help).
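The first option can be sketched with a stdlib model — an `RwLock<BTreeMap>` standing in for the locked redb database handle (names and types here are illustrative, not the PR's code). The key detail is that a check-then-act gap exists between the two locks, so the write path must re-check emptiness rather than trust the read-lock result.

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

/// Sketch: avoid blocking concurrent readers when the backup table is
/// empty by checking under a cheap read lock first, escalating to the
/// write lock only when there is something to take. Because another
/// thread could drain the map between the two locks, the write path
/// re-checks via `pop_first` returning `None`.
fn take_oldest(db: &RwLock<BTreeMap<u64, String>>) -> Option<(u64, String)> {
    // Common empty case: read lock only, no writer contention.
    if db.read().expect("not poisoned").is_empty() {
        return None;
    }
    // Escalate to the write lock; pop_first re-checks emptiness.
    db.write().expect("not poisoned").pop_first()
}

fn main() {
    let db = RwLock::new(BTreeMap::new());
    assert!(take_oldest(&db).is_none()); // empty: never took the write lock

    db.write().unwrap().insert(7, "batch".to_string());
    assert_eq!(take_oldest(&db), Some((7, "batch".to_string())));
    assert!(take_oldest(&db).is_none());
    println!("two-phase check ok");
}
```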
```rust
let path = tmp.path().join("test_backup.db");
let db = redb::Database::create(&path).unwrap();
// Leak the tempdir so it stays alive for the test duration.
std::mem::forget(tmp);
```
Nit (carried over from previous review): std::mem::forget(tmp) leaks the temp directory on every test run. Consider storing the TempDir in the returned struct or using a wrapper, so it's cleaned up when the test completes.
```rust
for stream in listener.incoming() {
    let Ok(mut stream) = stream else { break };
    let mut buf = [0u8; 65536];
    let _ = stream.read(&mut buf);
```
Nit (carried over): A single read() with a 64KB buffer may not consume the full HTTP request for large merged batches, potentially causing the mock to see stale data on the next request. Consider reading in a loop until EOF or at least draining the stream.
```diff
 .last_cached
 .take()
-.filter(|cached| cached.source != BackupSource::Memory);
+.filter(|cached| cached.sources.iter().any(|s| *s != BackupSource::Memory));
```
Bug (test-only path, carried over): This block runs on every backup() call in test mode. It unconditionally clears last_cached if all sources are memory-only — but last_cached holds data already removed from the memory deque via retrieve_oldest. Those rows have no other copy, so clearing last_cached here is silent data loss.
The production drop_excess path (line 610-613) has the same filter, but it's guarded by actual memory pressure. Here it fires unconditionally. Either remove the filter entirely from this path, or guard it with the same memory-pressure check.
Summary
- Added the `derive` feature to the `strum` dependency

Test plan
- `./scripts/test-clickhouse-backup-drain.sh` (starts Docker ClickHouse, runs all tests, verifies data, cleans up)
- `./scripts/test-clickhouse-backup-drain.sh --basic` to skip the outage simulation if Docker stop/start is slow