diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 56e9cd0fecbc..8011a8e8ae13 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4201,6 +4201,66 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_bulk_insert_with_batch() -> anyhow::Result<()> { + let (tenant, ctx) = TenantHarness::create("test_bulk_insert_with_batch")? + .load() + .await; + let tline = tenant + .create_test_timeline( + TIMELINE_ID, + Lsn(0x08), + DEFAULT_PG_VERSION, + RegionId(0), + &ctx, + ) + .await?; + + let mut lsn = Lsn(0x10); + + let mut keyspace = KeySpaceAccum::new(); + + let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap(); + let mut blknum = 0; + for _ in 0..50 { + let mut batch = vec![]; + + for _ in 0..10000 { + test_key.field6 = blknum; + + batch.push(( + test_key, + lsn, + Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), + )); + + keyspace.add_key(test_key); + lsn = Lsn(lsn.0 + 0x10); + blknum += 1; + } + + let last_lsn = batch.last().unwrap().1; + + let writer = tline.writer().await; + writer + .put_batch(batch.iter().map(|(k, l, v)| (*k, *l, v)).collect()) + .await?; + writer.finish_write(last_lsn); + drop(writer); + + let cutoff = tline.get_last_record_lsn(); + + tline + .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx) + .await?; + tline.freeze_and_flush().await?; + tline.compact(&CancellationToken::new(), &ctx).await?; + tline.gc().await?; + } + + Ok(()) + } + #[tokio::test] async fn test_random_updates() -> anyhow::Result<()> { let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await; diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index aa9d0884e000..6b748ed3ea6d 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -28,7 +28,7 @@ use utils::{ // while being able to use std::fmt::Write's methods use std::fmt::Write as _; use std::ops::Range; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, RwLockWriteGuard}; use super::{DeltaLayer, DeltaLayerWriter, Layer}; @@ -268,21 +268,40 @@ impl InMemoryLayer { /// Common subroutine of the public put_wal_record() and put_page_image() functions. /// Adds the page version to the in-memory tree pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> { - trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); let mut inner = self.inner.write().await; self.assert_writable(); + self.put_value_locked(&mut inner, key, lsn, val).await + } + + pub async fn put_values(&self, values: Vec<(Key, Lsn, &Value)>) -> Result<()> { + let mut inner = self.inner.write().await; + self.assert_writable(); + for (key, lsn, val) in values { + self.put_value_locked(&mut inner, key, lsn, val).await?; + } + Ok(()) + } + + async fn put_value_locked( + &self, + locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>, + key: Key, + lsn: Lsn, + val: &Value, + ) -> Result<()> { + trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn); let off = { SER_BUFFER.with(|x| -> Result<_> { let mut buf = x.borrow_mut(); buf.clear(); val.ser_into(&mut (*buf))?; - let off = inner.file.write_blob(&buf)?; + let off = locked_inner.file.write_blob(&buf)?; Ok(off) })? }; - let vec_map = inner.index.entry(key).or_default(); + let vec_map = locked_inner.index.entry(key).or_default(); let old = vec_map.append_or_update_last(lsn, off).unwrap().0; if old.is_some() { // We already had an entry for this LSN. That's odd.. @@ -298,6 +317,10 @@ impl InMemoryLayer { Ok(()) } + pub async fn put_tombstones(&self, _key_ranges: Vec<(Range, Lsn)>) -> Result<()> { + Ok(()) + } + /// Make the layer non-writeable. Only call once. /// Records the end_lsn for non-dropped layers. /// `end_lsn` is exclusive diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 9965356d2dbd..b7000e3d4b75 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2664,12 +2664,28 @@ impl Timeline { Ok(()) } + async fn put_values(&self, values: Vec<(Key, Lsn, &Value)>) -> anyhow::Result<()> { + if let Some((_, lsn, _)) = values.iter().next() { + let layer = self.get_layer_for_write(*lsn).await?; + layer.put_values(values).await?; + } + Ok(()) + } + async fn put_tombstone(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { let layer = self.get_layer_for_write(lsn).await?; layer.put_tombstone(key_range, lsn).await?; Ok(()) } + async fn put_tombstones(&self, tombstones: Vec<(Range, Lsn)>) -> anyhow::Result<()> { + if let Some((_, lsn)) = tombstones.iter().next() { + let layer = self.get_layer_for_write(*lsn).await?; + layer.put_tombstones(tombstones).await?; + } + Ok(()) + } + fn finish_write(&self, new_lsn: Lsn) { assert!(new_lsn.is_aligned()); @@ -4785,10 +4801,18 @@ impl<'a> TimelineWriter<'a> { self.tl.put_value(key, lsn, value).await } + pub async fn put_batch(&self, batch: Vec<(Key, Lsn, &Value)>) -> anyhow::Result<()> { + self.tl.put_values(batch).await + } + pub async fn delete(&self, key_range: Range, lsn: Lsn) -> anyhow::Result<()> { self.tl.put_tombstone(key_range, lsn).await } + pub async fn delete_batch(&self, batch: Vec<(Range, Lsn)>) -> anyhow::Result<()> { + self.tl.put_tombstones(batch).await + } + /// Track the end of the latest digested WAL record. /// Remember the (end of) last valid WAL record remembered in the timeline. ///