fix: ClickHouse backup drain loop — batch merging and bug fixes#907

Open
fsodre wants to merge 2 commits into develop from fix/clickhouse-backup-drain-loop

Conversation


@fsodre fsodre commented Mar 23, 2026

Summary

  • Continuous drain: The backup drain loop now processes all backed-up batches in a single tick cycle instead of one batch per tick, fixing slow recovery after ClickHouse outages observed in production
  • Batch merging: Multiple small batches are merged into a single ClickHouse commit (up to 65,536 rows), reducing HTTP round-trips from N to ~N/batch_size
  • Disk key preservation: Fixed a bug where disk source keys were lost when a merged commit failed and was re-stored for retry, causing duplicate rows in ClickHouse after recovery
  • Memory pressure safety: Fixed a bug where merged retry batches (containing disk-sourced data) could be silently dropped under memory pressure, causing permanent data loss
  • Backoff correctness: The backoff timer is now only reset when a drain completes fully, not on each individual successful chunk within a drain cycle
  • Compilation fix: Added missing derive feature to strum dependency
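The continuous-drain shape described in the bullets above can be sketched as follows. This is a minimal stand-in, not the PR's actual code: `Batch`, `drain_tick`, and the closure-based commit are illustrative names.

```rust
use std::collections::VecDeque;

const MAX_ROWS_PER_COMMIT: usize = 65_536;

struct Batch {
    rows: Vec<u64>,
}

/// Returns true only when the backlog drained fully; per the backoff fix,
/// that is the only case in which the caller should reset its backoff timer.
fn drain_tick(
    backlog: &mut VecDeque<Batch>,
    commit: &mut impl FnMut(&[u64]) -> bool,
) -> bool {
    loop {
        // Gather batches until the row limit is reached (batch merging).
        let mut merged: Vec<u64> = Vec::new();
        while merged.len() < MAX_ROWS_PER_COMMIT {
            let Some(batch) = backlog.pop_front() else { break };
            merged.extend(batch.rows);
        }
        if merged.is_empty() {
            return true; // nothing left: drain completed fully
        }
        if !commit(&merged) {
            // Commit failed: re-store the merged rows for retry and keep
            // the current backoff interval.
            backlog.push_front(Batch { rows: merged });
            return false;
        }
        // Commit succeeded: keep draining within the same tick.
    }
}

fn main() {
    let mut backlog: VecDeque<Batch> =
        (0u64..3).map(|i| Batch { rows: vec![i; 10] }).collect();
    let mut commits = 0;
    let drained = drain_tick(&mut backlog, &mut |rows: &[u64]| {
        commits += 1;
        rows.len() <= MAX_ROWS_PER_COMMIT
    });
    println!("drained={drained} commits={commits}");
}
```

Three small batches end up in a single commit here, which is the round-trip reduction the summary describes.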

Test plan

  • 9 unit tests covering drain behavior, batch merging, row limits, and regression cases for the fixed bugs
  • 2 integration tests against real ClickHouse (Docker) verifying row counts and zero duplicates
  • Outage simulation test: stop ClickHouse mid-drain, accumulate batches, restart, verify full recovery with no duplicates
  • Run ./scripts/test-clickhouse-backup-drain.sh (starts Docker ClickHouse, runs all tests, verifies data, cleans up)
  • Run ./scripts/test-clickhouse-backup-drain.sh --basic to skip the outage simulation if Docker stop/start is slow

🤖 Generated with Claude Code

…and bug fixes

The backup drain loop previously processed one batch per timer tick, causing
very slow recovery after ClickHouse outages. This rewrites the drain to:

- Continuously drain all backed-up batches in a single tick cycle
- Merge multiple batches into a single ClickHouse commit (up to 65,536 rows)
  to reduce HTTP round-trips
- Preserve original disk source keys across failed retries, preventing
  duplicate rows in ClickHouse after recovery
- Prevent silent data loss when merged retry batches were dropped under
  memory pressure
- Only reset the backoff timer when a drain completes fully, not on each
  individual successful chunk

Also fixes a pre-existing compilation error (strum missing derive feature)
and adds unit + integration tests with a Docker-based test script.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@fsodre fsodre requested review from ZanCorDX and dvush as code owners March 23, 2026 11:19

claude bot commented Mar 23, 2026

Claude finished @fsodre's task in 2m 56s


Review: PR #907 — ClickHouse backup drain loop fixes

  • Read repo guidelines
  • Read changed files and PR diff
  • Analyze code for bugs, edge cases, security, and performance
  • Post review with inline comments

Overall: The core changes (continuous drain, batch merging, source preservation) are well-motivated and the logic is sound. The test coverage is good — regression tests directly target the bugs being fixed. A few issues worth addressing:

Issues

  1. last_cached clearing in test-only path (lines 601-604) — The use_only_memory_backup branch unconditionally clears last_cached on every backup() call when its sources are memory-only. Since last_cached holds data already removed from the memory deque, this is silent data loss on the retry path. See inline comment.

  2. Merge loop can overshoot max_rows_per_commit — A single batch larger than the limit (e.g., 100K rows) bypasses the 65,536 bound entirely because the while condition passes when merged_rows is empty. This is documented in tests as intentional, but defeats the purpose of the limit for request-size bounding. See inline comment.

  3. std::mem::forget(tmp) leaks temp directories — Each test run leaks a temp dir on disk. See inline comment.

  4. Mock server partial reads — The fixed 65KB buffer with a single read() may not consume the full request for large merged batches. See inline comment.

  5. new_disk_backup_key() collision risk (pre-existing) — Uses SystemTime::now().as_micros() as the key. Two saves within the same microsecond produce the same key, with the second silently overwriting the first. Now more relevant since batch merging increases throughput. Consider adding an AtomicU128 counter or appending a random suffix.
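One possible shape for that fix, as a sketch rather than the PR's code: std has no stable AtomicU128, so this packs a 16-bit atomic sequence into the low bits of the microsecond timestamp instead.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

static SEQ: AtomicU64 = AtomicU64::new(0);

fn new_disk_backup_key() -> u128 {
    let micros = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before epoch")
        .as_micros();
    // High bits carry the timestamp so keys still sort oldest-first; the low
    // 16 bits disambiguate saves that land in the same microsecond.
    (micros << 16) | u128::from(SEQ.fetch_add(1, Ordering::Relaxed) & 0xFFFF)
}

fn main() {
    let a = new_disk_backup_key();
    let b = new_disk_backup_key();
    assert_ne!(a, b); // same microsecond still yields distinct keys
    println!("a={a:#x} b={b:#x}");
}
```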

let mut merged_rows: Vec<T> = Vec::new();
let mut merged_quantities = Quantities::ZERO;

while merged_rows.len() < self.max_rows_per_commit {

Nit: The merge loop can overshoot max_rows_per_commit significantly. If a single batch has 100K rows, the while merged_rows.len() < self.max_rows_per_commit condition passes when merged_rows is empty, so all 100K rows are added in one shot — well beyond the 65,536 limit.

The tests document this as intentional ("the limit governs when to stop gathering, not a hard cap"), but in production this means a single large failed commit bypasses the request-size bound entirely. Consider either:

  • Splitting oversized individual batches before merging, or
  • Documenting this as a known limitation in the constant's doc comment
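The first option could look roughly like this; `split_to_limit` is a hypothetical helper, not part of the PR.

```rust
// Chunk an oversized batch to the limit before it enters the merge loop,
// so no single commit can exceed max_rows_per_commit.
fn split_to_limit<T>(rows: Vec<T>, max_rows: usize) -> Vec<Vec<T>> {
    assert!(max_rows > 0);
    let mut chunks = Vec::new();
    let mut iter = rows.into_iter().peekable();
    while iter.peek().is_some() {
        // Take at most max_rows elements per chunk.
        chunks.push(iter.by_ref().take(max_rows).collect());
    }
    chunks
}

fn main() {
    let chunks = split_to_limit((0u32..150_000).collect::<Vec<_>>(), 65_536);
    let sizes: Vec<usize> = chunks.iter().map(Vec::len).collect();
    println!("{sizes:?}"); // every chunk bounded by the limit
}
```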

let path = tmp.path().join("test_backup.db");
let db = redb::Database::create(&path).unwrap();
// Leak the tempdir so it stays alive for the test duration.
std::mem::forget(tmp);

Nit: std::mem::forget(tmp) leaks the temp directory (and its contents) on every test run. Consider storing the TempDir handle alongside the DiskBackup or in a static, so it's cleaned up when the test completes.
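A std-only sketch of that suggestion: keep the directory handle in the fixture and remove it on drop. In the real tests the handle would be a tempfile::TempDir held next to the redb::Database.

```rust
use std::path::PathBuf;

struct TestBackup {
    dir: PathBuf, // held for the fixture's lifetime
    // the redb::Database handle would live here too
}

impl TestBackup {
    fn new(name: &str) -> std::io::Result<Self> {
        let dir = std::env::temp_dir().join(name);
        std::fs::create_dir_all(&dir)?;
        Ok(Self { dir })
    }
}

impl Drop for TestBackup {
    fn drop(&mut self) {
        // Cleaned up when the test finishes, rather than leaked via mem::forget.
        let _ = std::fs::remove_dir_all(&self.dir);
    }
}

fn main() -> std::io::Result<()> {
    let fixture = TestBackup::new("drain_fixture_demo")?;
    assert!(fixture.dir.exists());
    // dropping `fixture` at end of scope removes the directory
    Ok(())
}
```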

for stream in listener.incoming() {
let Ok(mut stream) = stream else { break };
let mut buf = [0u8; 65536];
let _ = stream.read(&mut buf);

Minor: A single read() with a fixed 65,536-byte buffer may not consume the full HTTP request if the ClickHouse client sends data across multiple TCP segments (e.g., large merged batches). This could leave data in the socket buffer and cause the mock to misbehave on the next request. Consider reading until \r\n\r\n is found (end of headers) to be more robust, or at least looping reads until no more data is available.
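A sketch of the header-terminated read loop; it works on anything implementing Read, so the mock's TcpStream fits.

```rust
use std::io::Read;

// Read until the end-of-headers marker ("\r\n\r\n") appears, instead of
// relying on a single fixed-size read().
fn read_headers(stream: &mut impl Read) -> std::io::Result<Vec<u8>> {
    let mut data = Vec::new();
    let mut chunk = [0u8; 4096];
    while !data.windows(4).any(|w| w == b"\r\n\r\n") {
        let n = stream.read(&mut chunk)?;
        if n == 0 {
            break; // peer closed before finishing headers
        }
        data.extend_from_slice(&chunk[..n]);
    }
    Ok(data)
}

fn main() {
    let req = b"POST /insert HTTP/1.1\r\nContent-Length: 3\r\n\r\nabc".to_vec();
    let data = read_headers(&mut std::io::Cursor::new(req)).unwrap();
    assert!(data.windows(4).any(|w| w == b"\r\n\r\n"));
}
```

Rescanning the buffer each iteration is O(n²) in the worst case, which is fine for a test mock.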

.last_cached
.take()
- .filter(|cached| cached.source != BackupSource::Memory);
+ .filter(|cached| cached.sources.iter().any(|s| *s != BackupSource::Memory));

Bug (test-only path): This filter runs on every incoming failed commit when use_only_memory_backup is true. Since last_cached holds data already removed from memory_backup (via retrieve_oldest), clearing it here means those rows are permanently lost — not just deferred.

In the production drop_excess path (lines 628-631), this same filter is guarded by memory pressure, which is an acceptable trade-off. But here it fires unconditionally, so a failed merged commit waiting for retry gets silently discarded as soon as the next failure arrives.

Consider guarding this the same way, or removing it from the test-only path entirely.
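The suggested guard could look like this sketch; BackupSource, Cached, and over_memory_limit are illustrative stand-ins for the real types.

```rust
#[derive(PartialEq)]
#[allow(dead_code)]
enum BackupSource { Memory, Disk }

struct Cached { sources: Vec<BackupSource> }

// Only discard a memory-only last_cached when actual memory pressure
// requires it, mirroring the drop_excess path.
fn maybe_drop_cached(last_cached: &mut Option<Cached>, over_memory_limit: bool) {
    if over_memory_limit {
        *last_cached = last_cached
            .take()
            .filter(|c| c.sources.iter().any(|s| *s != BackupSource::Memory));
    }
}

fn main() {
    let mut last_cached = Some(Cached { sources: vec![BackupSource::Memory] });
    maybe_drop_cached(&mut last_cached, false);
    assert!(last_cached.is_some()); // no pressure: memory-only data is kept
    maybe_drop_cached(&mut last_cached, true);
    assert!(last_cached.is_none()); // under pressure: dropped, as in drop_excess
}
```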


fsodre commented Mar 23, 2026

Code review

Found 1 issue:

  1. Disk entries re-read repeatedly in the gather loop, causing duplicate rows in ClickHouse. DiskBackup::retrieve_oldest() is a read-only operation (takes &self, never calls delete). Once memory is exhausted and last_cached is empty, the inner while gather loop calls self.retrieve_oldest() repeatedly, and each call returns the same disk entry because it was never consumed. The loop extends merged_rows with the same rows over and over until max_rows_per_commit is reached, then commits all those duplicates to ClickHouse. The original code was safe because it processed one batch per tick and purged immediately after success. The new gather-then-commit pattern breaks that invariant.

while merged_rows.len() < self.max_rows_per_commit {
let Some(oldest) = self.retrieve_oldest() else {
break;
};
merged_quantities.bytes += oldest.commit.quantities.bytes;
merged_quantities.rows += oldest.commit.quantities.rows;
merged_quantities.transactions += oldest.commit.quantities.transactions;
merged_rows.extend(oldest.commit.rows);
sources.extend(oldest.sources);
}

🤖 Generated with Claude Code

- If this code review was useful, please react with 👍. Otherwise, react with 👎.

DiskBackup::retrieve_oldest() was read-only — it never deleted the entry
from disk. The gather loop called it repeatedly, getting the same disk
entry every time, filling merged_rows with duplicates.

Replace with take_oldest() that reads + deletes in a single redb write
transaction. This makes repeated calls return successive entries. Since
disk entries are now consumed on retrieval, purge_disk_backup and the
post-commit purge loop are no longer needed and have been removed.
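The take-and-delete semantics can be illustrated with a std BTreeMap standing in for the redb table (the real code does both steps inside one redb write transaction).

```rust
use std::collections::BTreeMap;

// Read the oldest entry and delete it in the same step, so repeated calls
// return successive entries instead of the same one.
fn take_oldest(table: &mut BTreeMap<u128, Vec<u8>>) -> Option<(u128, Vec<u8>)> {
    let key = *table.keys().next()?; // smallest key = oldest entry
    let value = table.remove(&key)?;
    Some((key, value))
}

fn main() {
    let mut table = BTreeMap::new();
    table.insert(2u128, vec![2u8]);
    table.insert(1u128, vec![1u8]);
    // Entries come back oldest-first and are consumed on retrieval.
    assert_eq!(take_oldest(&mut table), Some((1, vec![1])));
    assert_eq!(take_oldest(&mut table), Some((2, vec![2])));
    assert_eq!(take_oldest(&mut table), None);
}
```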

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

claude bot commented Mar 23, 2026

Claude finished @fsodre's task in 3m 18s


Review: PR #907 — ClickHouse backup drain loop fixes (re-review after sync)

  • Read repo guidelines
  • Read the updated diff against origin/develop
  • Analyze code changes for bugs, edge cases, security, and performance
  • Post review with inline comments

Overall: The second commit (42a3448) correctly addresses the critical duplicate-rows bug identified by @fsodre: retrieve_oldest is replaced with an atomic take_oldest that reads and deletes within a single write transaction. The fix is clean and the approach is sound.

Issues found

  1. take_oldest write-lock contention (inline) — Every call acquires a write lock even when the table is empty. The old code used a read transaction for the check. This blocks concurrent save() calls unnecessarily. Consider a read-first-then-write pattern, or document that contention is acceptable.

  2. Test-only last_cached clearing is unconditional data loss (inline) — In the use_only_memory_backup path (lines 583-587), last_cached with memory-only sources is cleared on every backup() call, not just under memory pressure. This silently drops rows that have no other copy. (Carried from previous review, still present.)

  3. std::mem::forget(tmp) leaks temp dirs (inline) — Minor, test-only. (Carried from previous review.)

  4. Mock server single-read buffer (inline) — Minor, test-only. (Carried from previous review.)

Notes

  • The Durability::Immediate setting from the old delete() was dropped in take_oldest(). This is consistent with save() which also doesn't set it, but worth noting: if the process crashes between take_oldest returning and the ClickHouse commit succeeding, those rows are lost. The old code had the same race window (just wider), so this is arguably an improvement.
  • The new_disk_backup_key() collision risk (pre-existing, SystemTime::now().as_micros()) becomes slightly more relevant with batch merging increasing throughput, but is unlikely to be a problem in practice since disk backup only triggers on ClickHouse failures.

Comment on lines +331 to +338
let writer = self.db.write().expect("not poisoned").begin_write()?;
let result = {
let mut table = writer.open_table(table_def)?;

// Read the oldest entry, extracting owned values before mutating.
let (key, commit) = {
let Some(entry_res) = table.iter()?.next() else {
return Ok(None);

Bug: Early return on empty table leaks the write transaction without committing. More importantly, writer.open_table() on a write transaction creates the table if it doesn't exist, so every call to take_oldest on a fresh database creates an empty table that's rolled back on drop — this is wasteful but harmless.

However, the bigger issue: taking a write lock (self.db.write()) just to check if the table is empty blocks all concurrent readers (including save() and get_table_stats()). The old code used a read transaction for the check. Consider either:

  1. Doing a quick read-check first and only taking the write lock when there's actually data, or
  2. Accepting the contention since this runs on a single-threaded tick loop (if that's the case, a comment would help).
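Option 1 might look like this sketch, with RwLock<BTreeMap> standing in for the database handle; the write path re-checks emptiness implicitly via pop_first.

```rust
use std::collections::BTreeMap;
use std::sync::RwLock;

fn take_oldest(db: &RwLock<BTreeMap<u128, Vec<u8>>>) -> Option<(u128, Vec<u8>)> {
    // Cheap read-only emptiness check: no writer contention on the hot
    // "nothing to drain" path.
    if db.read().expect("not poisoned").is_empty() {
        return None;
    }
    // Escalate to the write lock only when there is data. An entry seen by
    // the read check may already be gone; pop_first handles that by
    // returning None.
    db.write().expect("not poisoned").pop_first()
}

fn main() {
    let db = RwLock::new(BTreeMap::new());
    assert_eq!(take_oldest(&db), None); // empty: read lock only
    db.write().unwrap().insert(5u128, vec![9u8]);
    assert_eq!(take_oldest(&db), Some((5, vec![9])));
}
```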

let path = tmp.path().join("test_backup.db");
let db = redb::Database::create(&path).unwrap();
// Leak the tempdir so it stays alive for the test duration.
std::mem::forget(tmp);

Nit (carried over from previous review): std::mem::forget(tmp) leaks the temp directory on every test run. Consider storing the TempDir in the returned struct or using a wrapper, so it's cleaned up when the test completes.

for stream in listener.incoming() {
let Ok(mut stream) = stream else { break };
let mut buf = [0u8; 65536];
let _ = stream.read(&mut buf);

Nit (carried over): A single read() with a 64KB buffer may not consume the full HTTP request for large merged batches, potentially causing the mock to see stale data on the next request. Consider reading in a loop until EOF or at least draining the stream.

.last_cached
.take()
- .filter(|cached| cached.source != BackupSource::Memory);
+ .filter(|cached| cached.sources.iter().any(|s| *s != BackupSource::Memory));

Bug (test-only path, carried over): This block runs on every backup() call in test mode. It unconditionally clears last_cached if all sources are memory-only — but last_cached holds data already removed from the memory deque via retrieve_oldest. Those rows have no other copy, so clearing last_cached here is silent data loss.

The production drop_excess path (lines 610-613) has the same filter, but it's guarded by actual memory pressure. Here it fires unconditionally. Either remove the filter entirely from this path, or guard it with the same memory-pressure check.
