Skip to content

Commit

Permalink
Commit ingested records in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Nov 5, 2023
1 parent ca3c177 commit c3d043c
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 25 deletions.
4 changes: 2 additions & 2 deletions pageserver/src/import_datadir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ async fn import_wal(
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true)
.await?;
last_lsn = lsn;

Expand Down Expand Up @@ -442,7 +442,7 @@ pub async fn import_wal_from_tar(
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true)
.await?;
last_lsn = lsn;

Expand Down
31 changes: 17 additions & 14 deletions pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,8 +678,8 @@ pub struct DatadirModification<'a> {
// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_updates: HashMap<Key, Value>,
pending_deletions: Vec<Range<Key>>,
pending_updates: HashMap<Key, (Lsn, Value)>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
}

Expand Down Expand Up @@ -1156,13 +1156,13 @@ impl<'a> DatadirModification<'a> {

// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::new();
for (key, value) in self.pending_updates.drain() {
for (key, (lsn, value)) in self.pending_updates.drain() {
if is_rel_block_key(key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, self.lsn, &value).await?;
writer.put(key, lsn, &value).await?;
} else {
retained_pending_updates.insert(key, value);
retained_pending_updates.insert(key, (lsn, value));
}
}
self.pending_updates.extend(retained_pending_updates);
Expand All @@ -1189,12 +1189,15 @@ impl<'a> DatadirModification<'a> {
let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;

for (key, value) in self.pending_updates.drain() {
writer.put(key, lsn, &value).await?;
}
for key_range in self.pending_deletions.drain(..) {
writer.delete(key_range, lsn).await?;
}
let pending_updates = self
.pending_updates
.drain()
.map(|(key, (lsn, value))| (key, lsn, value))
.collect();
writer.put_batch(pending_updates).await?;

let pending_deletions = self.pending_deletions.drain(..).collect();
writer.delete_batch(pending_deletions).await?;

writer.finish_write(lsn);

Expand All @@ -1213,7 +1216,7 @@ impl<'a> DatadirModification<'a> {
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(value) = self.pending_updates.get(&key) {
if let Some((_, value)) = self.pending_updates.get(&key) {
if let Value::Image(img) = value {
Ok(img.clone())
} else {
Expand All @@ -1233,12 +1236,12 @@ impl<'a> DatadirModification<'a> {
}

fn put(&mut self, key: Key, val: Value) {
self.pending_updates.insert(key, val);
self.pending_updates.insert(key, (self.lsn, val));
}

fn delete(&mut self, key_range: Range<Key>) {
trace!("DELETE {}-{}", key_range.start, key_range.end);
self.pending_deletions.push(key_range);
self.pending_deletions.push((key_range, self.lsn));
}
}

Expand Down
4 changes: 1 addition & 3 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4242,9 +4242,7 @@ mod tests {
let last_lsn = batch.last().unwrap().1;

let writer = tline.writer().await;
writer
.put_batch(batch.iter().map(|(k, l, v)| (*k, *l, v)).collect())
.await?;
writer.put_batch(batch).await?;
writer.finish_write(last_lsn);
drop(writer);

Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,11 +273,11 @@ impl InMemoryLayer {
self.put_value_locked(&mut inner, key, lsn, val).await
}

pub async fn put_values(&self, values: Vec<(Key, Lsn, &Value)>) -> Result<()> {
pub async fn put_values(&self, values: Vec<(Key, Lsn, Value)>) -> Result<()> {
let mut inner = self.inner.write().await;
self.assert_writable();
for (key, lsn, val) in values {
self.put_value_locked(&mut inner, key, lsn, val).await?;
self.put_value_locked(&mut inner, key, lsn, &val).await?;
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2664,7 +2664,7 @@ impl Timeline {
Ok(())
}

async fn put_values(&self, values: Vec<(Key, Lsn, &Value)>) -> anyhow::Result<()> {
async fn put_values(&self, values: Vec<(Key, Lsn, Value)>) -> anyhow::Result<()> {
if let Some((_, lsn, _)) = values.iter().next() {
let layer = self.get_layer_for_write(*lsn).await?;
layer.put_values(values).await?;
Expand Down Expand Up @@ -4801,7 +4801,7 @@ impl<'a> TimelineWriter<'a> {
self.tl.put_value(key, lsn, value).await
}

pub async fn put_batch(&self, batch: Vec<(Key, Lsn, &Value)>) -> anyhow::Result<()> {
pub async fn put_batch(&self, batch: Vec<(Key, Lsn, Value)>) -> anyhow::Result<()> {
self.tl.put_values(batch).await
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,14 +332,23 @@ pub(super) async fn handle_walreceiver_connection(
}

walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx)
.ingest_record(
recdata,
lsn,
&mut modification,
&mut decoded,
&ctx,
false,
)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;

fail_point!("walreceiver-after-ingest");

last_rec_lsn = lsn;
}

modification.commit().await?;
}

if !caught_up && endlsn >= end_of_wal {
Expand Down
5 changes: 4 additions & 1 deletion pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl<'a> WalIngest<'a> {
modification: &mut DatadirModification<'_>,
decoded: &mut DecodedWALRecord,
ctx: &RequestContext,
commit: bool,
) -> anyhow::Result<()> {
modification.lsn = lsn;
decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
Expand Down Expand Up @@ -358,7 +359,9 @@ impl<'a> WalIngest<'a> {

// Now that this record has been fully handled, including updating the
// checkpoint data, let the repository know that it is up-to-date to this LSN
modification.commit().await?;
if commit {
modification.commit().await?;
}

Ok(())
}
Expand Down

0 comments on commit c3d043c

Please sign in to comment.