Skip to content

Commit

Permalink
Batch API for TimelineWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Nov 5, 2023
1 parent 8914088 commit ca3c177
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 4 deletions.
60 changes: 60 additions & 0 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4201,6 +4201,66 @@ mod tests {
Ok(())
}

/// Ingests 50 batches of 10,000 page images through `put_batch`, then runs
/// flush, compaction and GC on the timeline after every batch.
#[tokio::test]
async fn test_bulk_insert_with_batch() -> anyhow::Result<()> {
    let (tenant, ctx) = TenantHarness::create("test_bulk_insert_with_batch")?
        .load()
        .await;
    let tline = tenant
        .create_test_timeline(TIMELINE_ID, Lsn(0x08), DEFAULT_PG_VERSION, RegionId(0), &ctx)
        .await?;

    let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
    let mut keyspace = KeySpaceAccum::new();
    let mut lsn = Lsn(0x10);
    let mut blknum = 0;

    for _round in 0..50 {
        // Every entry in the batch gets its own block number and its own LSN.
        let mut batch = Vec::new();
        for _ in 0..10000 {
            test_key.field6 = blknum;

            let image = Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn)));
            batch.push((test_key, lsn, image));
            keyspace.add_key(test_key);

            blknum += 1;
            lsn = Lsn(lsn.0 + 0x10);
        }

        let last_lsn = batch.last().unwrap().1;
        // `put_batch` takes the values by reference, so hand it a borrowed view of the batch.
        let borrowed: Vec<_> = batch
            .iter()
            .map(|(key, at, value)| (*key, *at, value))
            .collect();

        {
            // The writer is released at the end of this scope, before flushing.
            let writer = tline.writer().await;
            writer.put_batch(borrowed).await?;
            writer.finish_write(last_lsn);
        }

        let cutoff = tline.get_last_record_lsn();
        tline
            .update_gc_info(Vec::new(), cutoff, Duration::ZERO, &ctx)
            .await?;

        tline.freeze_and_flush().await?;
        tline.compact(&CancellationToken::new(), &ctx).await?;
        tline.gc().await?;
    }

    Ok(())
}

#[tokio::test]
async fn test_random_updates() -> anyhow::Result<()> {
let (tenant, ctx) = TenantHarness::create("test_random_updates")?.load().await;
Expand Down
31 changes: 27 additions & 4 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use utils::{
// while being able to use std::fmt::Write's methods
use std::fmt::Write as _;
use std::ops::Range;
use tokio::sync::RwLock;
use tokio::sync::{RwLock, RwLockWriteGuard};

use super::{DeltaLayer, DeltaLayerWriter, Layer};

Expand Down Expand Up @@ -268,21 +268,40 @@ impl InMemoryLayer {
/// Common subroutine of the public put_wal_record() and put_page_image() functions.
/// Adds the page version to the in-memory tree.
///
/// Takes the write lock on `inner`, asserts that the layer is still writable,
/// and then delegates the actual write to `put_value_locked`.
pub async fn put_value(&self, key: Key, lsn: Lsn, val: &Value) -> Result<()> {
    let mut inner = self.inner.write().await;
    self.assert_writable();
    // `put_value_locked` already emits the "put_value key ..." trace line, so
    // tracing here as well would log every single write twice.
    self.put_value_locked(&mut inner, key, lsn, val).await
}

/// Adds a batch of page versions to the in-memory tree.
///
/// The write lock on `inner` is acquired once and held for the whole batch;
/// entries are written in the order given. The first failing entry aborts the
/// batch and its error is returned; entries written before it are not undone here.
pub async fn put_values(&self, values: Vec<(Key, Lsn, &Value)>) -> Result<()> {
    let mut guard = self.inner.write().await;
    self.assert_writable();

    for entry in values {
        let (key, lsn, val) = entry;
        self.put_value_locked(&mut guard, key, lsn, val).await?;
    }

    Ok(())
}

async fn put_value_locked(
&self,
locked_inner: &mut RwLockWriteGuard<'_, InMemoryLayerInner>,
key: Key,
lsn: Lsn,
val: &Value,
) -> Result<()> {
trace!("put_value key {} at {}/{}", key, self.timeline_id, lsn);

let off = {
SER_BUFFER.with(|x| -> Result<_> {
let mut buf = x.borrow_mut();
buf.clear();
val.ser_into(&mut (*buf))?;
let off = inner.file.write_blob(&buf)?;
let off = locked_inner.file.write_blob(&buf)?;
Ok(off)
})?
};

let vec_map = inner.index.entry(key).or_default();
let vec_map = locked_inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, off).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
Expand All @@ -298,6 +317,10 @@ impl InMemoryLayer {
Ok(())
}

/// Records a batch of tombstones, each a key range deleted at the given LSN.
///
/// Every entry is forwarded, in order, to the single-range `put_tombstone`,
/// so the batch and per-range paths cannot diverge. The first error aborts
/// the batch and is returned; an empty batch is a no-op.
///
/// NOTE(review): `put_tombstone` is defined outside the visible part of this
/// file; this assumes it takes `&self` and may be called without holding the
/// `inner` lock — confirm.
pub async fn put_tombstones(&self, key_ranges: Vec<(Range<Key>, Lsn)>) -> Result<()> {
    for (key_range, lsn) in key_ranges {
        self.put_tombstone(key_range, lsn).await?;
    }
    Ok(())
}

/// Make the layer non-writeable. Only call once.
/// Records the end_lsn for non-dropped layers.
/// `end_lsn` is exclusive
Expand Down
24 changes: 24 additions & 0 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2664,12 +2664,28 @@ impl Timeline {
Ok(())
}

/// Writes a batch of values into the layer that is open for writing.
///
/// The target layer is looked up once, using the LSN of the first entry only,
/// and the whole batch is handed to that layer. An empty batch does nothing.
///
/// NOTE(review): presumably every entry of a batch belongs in the layer chosen
/// for the first LSN — confirm against callers.
async fn put_values(&self, values: Vec<(Key, Lsn, &Value)>) -> anyhow::Result<()> {
    // `first()` states the intent directly, without building an iterator.
    if let Some((_, first_lsn, _)) = values.first() {
        let layer = self.get_layer_for_write(*first_lsn).await?;
        layer.put_values(values).await?;
    }
    Ok(())
}

/// Records the deletion of `key_range` at `lsn` in the layer that is
/// currently open for writing at that LSN.
async fn put_tombstone(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
    let open_layer = self.get_layer_for_write(lsn).await?;
    let outcome = open_layer.put_tombstone(key_range, lsn).await;
    outcome?;
    Ok(())
}

/// Records a batch of tombstones (deleted key ranges with their LSNs).
///
/// The target layer is looked up once, using the LSN of the first entry only,
/// and the whole batch is handed to that layer. An empty batch does nothing.
///
/// NOTE(review): presumably every entry of a batch belongs in the layer chosen
/// for the first LSN — confirm against callers.
async fn put_tombstones(&self, tombstones: Vec<(Range<Key>, Lsn)>) -> anyhow::Result<()> {
    // `first()` states the intent directly, without building an iterator.
    if let Some((_, first_lsn)) = tombstones.first() {
        let layer = self.get_layer_for_write(*first_lsn).await?;
        layer.put_tombstones(tombstones).await?;
    }
    Ok(())
}

fn finish_write(&self, new_lsn: Lsn) {
assert!(new_lsn.is_aligned());

Expand Down Expand Up @@ -4785,10 +4801,18 @@ impl<'a> TimelineWriter<'a> {
self.tl.put_value(key, lsn, value).await
}

/// Writes a batch of `(key, lsn, value)` entries to the timeline.
///
/// Forwards the batch unchanged to the timeline's `put_values` and returns
/// its result.
pub async fn put_batch(&self, batch: Vec<(Key, Lsn, &Value)>) -> anyhow::Result<()> {
self.tl.put_values(batch).await
}

/// Records the deletion of `key_range` at `lsn`.
///
/// Forwards to the timeline's `put_tombstone` and returns its result.
pub async fn delete(&self, key_range: Range<Key>, lsn: Lsn) -> anyhow::Result<()> {
self.tl.put_tombstone(key_range, lsn).await
}

/// Records a batch of deletions, each given as a key range and the LSN it applies at.
///
/// Forwards the batch unchanged to the timeline's `put_tombstones` and
/// returns its result.
pub async fn delete_batch(&self, batch: Vec<(Range<Key>, Lsn)>) -> anyhow::Result<()> {
self.tl.put_tombstones(batch).await
}

/// Track the end of the latest digested WAL record.
/// Remember the (end of) last valid WAL record remembered in the timeline.
///
Expand Down

0 comments on commit ca3c177

Please sign in to comment.