Skip to content

Commit

Permalink
Simplify the passing of ingested batch
Browse files — browse the repository at this point in the history
  • Loading branch information
ctring committed Nov 14, 2023
1 parent d51cbf1 commit e593db1
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 33 deletions.
2 changes: 2 additions & 0 deletions pageserver/src/http/openapi_spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1051,6 +1051,8 @@ components:
type: string
image_creation_threshold:
type: integer
ingest_batch_size:
type: integer
walreceiver_connect_timeout:
type: string
lagging_wal_timeout:
Expand Down
26 changes: 7 additions & 19 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes};
use itertools::Itertools;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
Expand Down Expand Up @@ -1187,10 +1186,8 @@ impl<'a> DatadirModification<'a> {
}
}
}
// The right way to extend this is to also merge the values in the corresponding
// keys, but since pending_updates is guaranteed to be empty after the drain, this
// should also be fine.
self.pending_updates.extend(retained_pending_updates);

self.pending_updates = retained_pending_updates;

if pending_nblocks != 0 {
writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ));
Expand All @@ -1212,20 +1209,11 @@ impl<'a> DatadirModification<'a> {
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;

let pending_updates = self
.pending_updates
.drain()
.map(|(key, pending_updates)| {
pending_updates
.into_iter()
.map(|(lsn, value)| (key, lsn, value))
.collect::<Vec<_>>()
})
.concat();
writer.put_batch(&pending_updates).await?;

let pending_deletions: Vec<_> = self.pending_deletions.drain(..).collect();
writer.delete_batch(&pending_deletions).await?;
writer.put_batch(&self.pending_updates).await?;
self.pending_updates.clear();

writer.delete_batch(&self.pending_deletions).await?;
self.pending_deletions.clear();

writer.finish_write(lsn);

Expand Down
10 changes: 4 additions & 6 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4224,24 +4224,22 @@ mod tests {
let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap();
let mut blknum = 0;
for _ in 0..50 {
let mut batch = vec![];

let mut batch: HashMap<Key, Vec<(Lsn, Value)>> = HashMap::new();
let mut last_lsn = lsn;
for _ in 0..10000 {
test_key.field6 = blknum;

batch.push((
test_key,
batch.entry(test_key).or_default().push((
lsn,
Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))),
));

keyspace.add_key(test_key);
last_lsn = lsn;
lsn = Lsn(lsn.0 + 0x10);
blknum += 1;
}

let last_lsn = batch.last().unwrap().1;

let writer = tline.writer().await;
writer.put_batch(&batch).await?;
writer.finish_write(last_lsn);
Expand Down
8 changes: 5 additions & 3 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,13 @@ impl InMemoryLayer {
self.put_value_locked(&mut inner, key, lsn, val).await
}

pub async fn put_values(&self, values: &[(Key, Lsn, Value)]) -> Result<()> {
pub async fn put_values(&self, values: &HashMap<Key, Vec<(Lsn, Value)>>) -> Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
for (key, lsn, val) in values {
self.put_value_locked(&mut inner, *key, *lsn, val).await?;
for (key, vals) in values {
for (lsn, val) in vals {
self.put_value_locked(&mut inner, *key, *lsn, val).await?;
}
}
Ok(())
}
Expand Down
14 changes: 9 additions & 5 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2668,10 +2668,14 @@ impl Timeline {
Ok(())
}

async fn put_values(&self, values: &[(Key, Lsn, Value)]) -> anyhow::Result<()> {
if let Some((_, lsn, _)) = values.first() {
let layer = self.get_layer_for_write(*lsn).await?;
layer.put_values(values).await?;
async fn put_values(&self, values: &HashMap<Key, Vec<(Lsn, Value)>>) -> anyhow::Result<()> {
// Pick the first LSN in the batch to get the layer to write to.
for lsns in values.values() {
if let Some((lsn, _)) = lsns.first() {
let layer = self.get_layer_for_write(*lsn).await?;
layer.put_values(values).await?;
break;
}
}
Ok(())
}
Expand Down Expand Up @@ -4805,7 +4809,7 @@ impl<'a> TimelineWriter<'a> {
self.tl.put_value(key, lsn, value).await
}

pub async fn put_batch(&self, batch: &[(Key, Lsn, Value)]) -> anyhow::Result<()> {
pub async fn put_batch(&self, batch: &HashMap<Key, Vec<(Lsn, Value)>>) -> anyhow::Result<()> {
self.tl.put_values(batch).await
}

Expand Down

0 comments on commit e593db1

Please sign in to comment.