From c3d043c6d51a3718c0e0cdb08ace36db07855612 Mon Sep 17 00:00:00 2001 From: Cuong Nguyen Date: Sat, 4 Nov 2023 23:54:23 -0400 Subject: [PATCH] Commit ingested records in batches --- pageserver/src/import_datadir.rs | 4 +-- pageserver/src/pgdatadir_mapping.rs | 31 ++++++++++--------- pageserver/src/tenant.rs | 4 +-- .../tenant/storage_layer/inmemory_layer.rs | 4 +-- pageserver/src/tenant/timeline.rs | 4 +-- .../walreceiver/walreceiver_connection.rs | 11 ++++++- pageserver/src/walingest.rs | 5 ++- 7 files changed, 38 insertions(+), 25 deletions(-) diff --git a/pageserver/src/import_datadir.rs b/pageserver/src/import_datadir.rs index 3838d4669d33..7789d98c97b0 100644 --- a/pageserver/src/import_datadir.rs +++ b/pageserver/src/import_datadir.rs @@ -306,7 +306,7 @@ async fn import_wal( while last_lsn <= endpoint { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { walingest - .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx) + .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true) .await?; last_lsn = lsn; @@ -442,7 +442,7 @@ pub async fn import_wal_from_tar( while last_lsn <= end_lsn { if let Some((lsn, recdata)) = waldecoder.poll_decode()? { walingest - .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx) + .ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx, true) .await?; last_lsn = lsn; diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index 4b72ed5fba59..b66399a675ee 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -678,8 +678,8 @@ pub struct DatadirModification<'a> { // The modifications are not applied directly to the underlying key-value store. // The put-functions add the modifications here, and they are flushed to the // underlying key-value store by the 'finish' function. - pending_updates: HashMap, - pending_deletions: Vec>, + pending_updates: HashMap, + pending_deletions: Vec<(Range, Lsn)>, pending_nblocks: i64, } @@ -1156,13 +1156,13 @@ impl<'a> DatadirModification<'a> { // Flush relation and SLRU data blocks, keep metadata. let mut retained_pending_updates = HashMap::new(); - for (key, value) in self.pending_updates.drain() { + for (key, (lsn, value)) in self.pending_updates.drain() { if is_rel_block_key(key) || is_slru_block_key(key) { // This bails out on first error without modifying pending_updates. // That's Ok, cf this function's doc comment. - writer.put(key, self.lsn, &value).await?; + writer.put(key, lsn, &value).await?; } else { - retained_pending_updates.insert(key, value); + retained_pending_updates.insert(key, (lsn, value)); } } self.pending_updates.extend(retained_pending_updates); @@ -1189,12 +1189,15 @@ impl<'a> DatadirModification<'a> { let pending_nblocks = self.pending_nblocks; self.pending_nblocks = 0; - for (key, value) in self.pending_updates.drain() { - writer.put(key, lsn, &value).await?; - } - for key_range in self.pending_deletions.drain(..) { - writer.delete(key_range, lsn).await?; - } + let pending_updates = self + .pending_updates + .drain() + .map(|(key, (lsn, value))| (key, lsn, value)) + .collect(); + writer.put_batch(pending_updates).await?; + + let pending_deletions = self.pending_deletions.drain(..).collect(); + writer.delete_batch(pending_deletions).await?; writer.finish_write(lsn); @@ -1213,7 +1216,7 @@ impl<'a> DatadirModification<'a> { // // Note: we don't check pending_deletions. It is an error to request a // value that has been removed, deletion only avoids leaking storage. - if let Some(value) = self.pending_updates.get(&key) { + if let Some((_, value)) = self.pending_updates.get(&key) { if let Value::Image(img) = value { Ok(img.clone()) } else { @@ -1233,12 +1236,12 @@ impl<'a> DatadirModification<'a> { } fn put(&mut self, key: Key, val: Value) { - self.pending_updates.insert(key, val); + self.pending_updates.insert(key, (self.lsn, val)); } fn delete(&mut self, key_range: Range) { trace!("DELETE {}-{}", key_range.start, key_range.end); - self.pending_deletions.push(key_range); + self.pending_deletions.push((key_range, self.lsn)); } } diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 8011a8e8ae13..16c770e5c8ec 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4242,9 +4242,7 @@ mod tests { let last_lsn = batch.last().unwrap().1; let writer = tline.writer().await; - writer - .put_batch(batch.iter().map(|(k, l, v)| (*k, *l, v)).collect()) - .await?; + writer.put_batch(batch).await?; writer.finish_write(last_lsn); drop(writer); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 6b748ed3ea6d..b756db6bf5a9 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -273,11 +273,11 @@ impl InMemoryLayer { self.put_value_locked(&mut inner, key, lsn, val).await } - pub async fn put_values(&self, values: Vec<(Key, Lsn, &Value)>) -> Result<()> { + pub async fn put_values(&self, values: Vec<(Key, Lsn, Value)>) -> Result<()> { let mut inner = self.inner.write().await; self.assert_writable(); for (key, lsn, val) in values { - self.put_value_locked(&mut inner, key, lsn, val).await?; + self.put_value_locked(&mut inner, key, lsn, &val).await?; } Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b7000e3d4b75..8ed84f8d1caa 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2664,7 +2664,7 @@ impl Timeline { Ok(()) } - async fn put_values(&self, values: Vec<(Key, Lsn, &Value)>) -> anyhow::Result<()> { + async fn put_values(&self, values: Vec<(Key, Lsn, Value)>) -> anyhow::Result<()> { if let Some((_, lsn, _)) = values.iter().next() { let layer = self.get_layer_for_write(*lsn).await?; layer.put_values(values).await?; @@ -4801,7 +4801,7 @@ impl<'a> TimelineWriter<'a> { self.tl.put_value(key, lsn, value).await } - pub async fn put_batch(&self, batch: Vec<(Key, Lsn, &Value)>) -> anyhow::Result<()> { + pub async fn put_batch(&self, batch: Vec<(Key, Lsn, Value)>) -> anyhow::Result<()> { self.tl.put_values(batch).await } diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index e22f7a1a7361..da1befecabca 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -332,7 +332,14 @@ pub(super) async fn handle_walreceiver_connection( } walingest - .ingest_record(recdata, lsn, &mut modification, &mut decoded, &ctx) + .ingest_record( + recdata, + lsn, + &mut modification, + &mut decoded, + &ctx, + false, + ) .await .with_context(|| format!("could not ingest record at {lsn}"))?; @@ -340,6 +347,8 @@ pub(super) async fn handle_walreceiver_connection( last_rec_lsn = lsn; } + + modification.commit().await?; } if !caught_up && endlsn >= end_of_wal { diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs index 211f35dbf70d..ebaaabd75687 100644 --- a/pageserver/src/walingest.rs +++ b/pageserver/src/walingest.rs @@ -88,6 +88,7 @@ impl<'a> WalIngest<'a> { modification: &mut DatadirModification<'_>, decoded: &mut DecodedWALRecord, ctx: &RequestContext, + commit: bool, ) -> anyhow::Result<()> { modification.lsn = lsn; decode_wal_record(recdata, decoded, self.timeline.pg_version)?; @@ -358,7 +359,9 @@ impl<'a> WalIngest<'a> { // Now that this record has been fully handled, including updating the // checkpoint data, let the repository know that it is up-to-date to this LSN - modification.commit().await?; + if commit { + modification.commit().await?; + } Ok(()) }