diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index e1d9bea2538..d8527779a89 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -3319,7 +3319,7 @@ impl ChainStoreTrait for ChainStore { let mut last_log = Instant::now(); loop { - let current_size = batch_size.size; + let current_size = batch_size.size.max(1); let start = Instant::now(); let deleted = self .storage diff --git a/store/postgres/src/vid_batcher.rs b/store/postgres/src/vid_batcher.rs index 7ea633058f8..63c0070913b 100644 --- a/store/postgres/src/vid_batcher.rs +++ b/store/postgres/src/vid_batcher.rs @@ -24,6 +24,7 @@ const INITIAL_BATCH_SIZE: i64 = 10_000; /// arrays can be large and large arrays will slow down copying a lot. We /// therefore tread lightly in that case const INITIAL_BATCH_SIZE_LIST: i64 = 100; +const MIN_BATCH_SIZE: i64 = 1; /// Track the desired size of a batch in such a way that doing the next /// batch gets close to TARGET_DURATION for the time it takes to copy one @@ -35,9 +36,13 @@ pub(crate) struct AdaptiveBatchSize { } impl AdaptiveBatchSize { + fn clamp_size(size: i64) -> i64 { + size.max(MIN_BATCH_SIZE) + } + pub fn with_size(size: i64) -> Self { Self { - size, + size: Self::clamp_size(size), target: ENV_VARS.store.batch_target_duration, } } @@ -49,10 +54,7 @@ impl AdaptiveBatchSize { INITIAL_BATCH_SIZE }; - Self { - size, - target: ENV_VARS.store.batch_target_duration, - } + Self::with_size(size) } // adjust batch size by trying to extrapolate in such a way that we @@ -62,7 +64,7 @@ impl AdaptiveBatchSize { // Avoid division by zero let duration = duration.as_millis().max(1); let new_batch_size = self.size as f64 * self.target.as_millis() as f64 / duration as f64; - self.size = (2 * self.size).min(new_batch_size.round() as i64); + self.size = Self::clamp_size((2 * self.size).min(new_batch_size.round() as i64)); self.size } } @@ -186,7 +188,7 @@ impl VidBatcher { /// Explicitly set the batch size pub fn with_batch_size(mut self: VidBatcher, size: usize) -> Self { - self.batch_size.size = size as i64; + self.batch_size.size = AdaptiveBatchSize::clamp_size(size as i64); self } @@ -244,6 +246,7 @@ impl VidBatcher { } pub(crate) fn set_batch_size(&mut self, size: usize) { + let size = AdaptiveBatchSize::clamp_size(size as i64) as usize; self.batch_size.size = size as i64; self.end = match &self.ogive { Some(ogive) => ogive.next_point(self.start, size).unwrap(), @@ -464,6 +467,17 @@ mod tests { batcher.step(360, 359, S010).await; } + #[test] + fn adaptive_batch_size_never_shrinks_to_zero() { + let mut batch_size = AdaptiveBatchSize { + size: 100, + target: S100, + }; + + assert_eq!(batch_size.adapt(Duration::from_secs(20_001)), 1); + assert_eq!(batch_size.size, 1); + } + #[test] fn vid_batcher_adjusts_bounds() { // The first and last entry in `bounds` are estimats of the min and