From e593db1f5ab2505eb176c9faaf2e9b9ba36cb2c4 Mon Sep 17 00:00:00 2001 From: Cuong Nguyen Date: Mon, 13 Nov 2023 23:48:44 -0500 Subject: [PATCH] Simplify the passing of ingested batch --- pageserver/src/http/openapi_spec.yml | 2 ++ pageserver/src/pgdatadir_mapping.rs | 26 +++++-------------- pageserver/src/tenant.rs | 10 +++---- .../tenant/storage_layer/inmemory_layer.rs | 8 +++--- pageserver/src/tenant/timeline.rs | 14 ++++++---- 5 files changed, 27 insertions(+), 33 deletions(-) diff --git a/pageserver/src/http/openapi_spec.yml b/pageserver/src/http/openapi_spec.yml index 38e07f172d3d..d7b0568ec54a 100644 --- a/pageserver/src/http/openapi_spec.yml +++ b/pageserver/src/http/openapi_spec.yml @@ -1051,6 +1051,8 @@ components: type: string image_creation_threshold: type: integer + ingest_batch_size: + type: integer walreceiver_connect_timeout: type: string lagging_wal_timeout: diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs index be25bfbd09b0..c1f10a6ab9f4 100644 --- a/pageserver/src/pgdatadir_mapping.rs +++ b/pageserver/src/pgdatadir_mapping.rs @@ -13,7 +13,6 @@ use crate::repository::*; use crate::walrecord::NeonWalRecord; use anyhow::{ensure, Context}; use bytes::{Buf, Bytes}; -use itertools::Itertools; use pageserver_api::reltag::{RelTag, SlruKind}; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM}; use postgres_ffi::BLCKSZ; @@ -1187,10 +1186,8 @@ impl<'a> DatadirModification<'a> { } } } - // The right way to extend this is to also merge the values in the corresponding - // keys, but since pending_updates is guaranteed to be empty after the drain, this - // should also be fine. 
- self.pending_updates.extend(retained_pending_updates); + + self.pending_updates = retained_pending_updates; if pending_nblocks != 0 { writer.update_current_logical_size(pending_nblocks * i64::from(BLCKSZ)); @@ -1212,20 +1209,11 @@ impl<'a> DatadirModification<'a> { let pending_nblocks = self.pending_nblocks; self.pending_nblocks = 0; - let pending_updates = self - .pending_updates - .drain() - .map(|(key, pending_updates)| { - pending_updates - .into_iter() - .map(|(lsn, value)| (key, lsn, value)) - .collect::<Vec<_>>() - }) - .concat(); - writer.put_batch(&pending_updates).await?; - - let pending_deletions: Vec<_> = self.pending_deletions.drain(..).collect(); - writer.delete_batch(&pending_deletions).await?; + writer.put_batch(&self.pending_updates).await?; + self.pending_updates.clear(); + + writer.delete_batch(&self.pending_deletions).await?; + self.pending_deletions.clear(); writer.finish_write(lsn); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index d6a5495f3325..ad18a4815063 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -4224,24 +4224,22 @@ mod tests { let mut test_key = Key::from_hex("012222222233333333444444445500000000").unwrap(); let mut blknum = 0; for _ in 0..50 { - let mut batch = vec![]; - + let mut batch: HashMap<Key, Vec<(Lsn, Value)>> = HashMap::new(); + let mut last_lsn = lsn; for _ in 0..10000 { test_key.field6 = blknum; - batch.push(( - test_key, + batch.entry(test_key).or_default().push(( lsn, Value::Image(TEST_IMG(&format!("{} at {}", blknum, lsn))), )); keyspace.add_key(test_key); + last_lsn = lsn; lsn = Lsn(lsn.0 + 0x10); blknum += 1; } - let last_lsn = batch.last().unwrap().1; - let writer = tline.writer().await; writer.put_batch(&batch).await?; writer.finish_write(last_lsn); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 303a9a3708d1..8fb93b2fcea9 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ 
b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -273,11 +273,13 @@ impl InMemoryLayer { self.put_value_locked(&mut inner, key, lsn, val).await } - pub async fn put_values(&self, values: &[(Key, Lsn, Value)]) -> Result<()> { + pub async fn put_values(&self, values: &HashMap<Key, Vec<(Lsn, Value)>>) -> Result<()> { let mut inner = self.inner.write().await; self.assert_writable(); - for (key, lsn, val) in values { - self.put_value_locked(&mut inner, *key, *lsn, val).await?; + for (key, vals) in values { + for (lsn, val) in vals { + self.put_value_locked(&mut inner, *key, *lsn, val).await?; + } } Ok(()) } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 2dda4c5b68c8..f826003c6d1a 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2668,10 +2668,14 @@ impl Timeline { Ok(()) } - async fn put_values(&self, values: &[(Key, Lsn, Value)]) -> anyhow::Result<()> { - if let Some((_, lsn, _)) = values.first() { - let layer = self.get_layer_for_write(*lsn).await?; - layer.put_values(values).await?; + async fn put_values(&self, values: &HashMap<Key, Vec<(Lsn, Value)>>) -> anyhow::Result<()> { + // Pick the first LSN in the batch to get the layer to write to. + for lsns in values.values() { + if let Some((lsn, _)) = lsns.first() { + let layer = self.get_layer_for_write(*lsn).await?; + layer.put_values(values).await?; + break; + } } Ok(()) } @@ -4805,7 +4809,7 @@ impl<'a> TimelineWriter<'a> { self.tl.put_value(key, lsn, value).await } - pub async fn put_batch(&self, batch: &[(Key, Lsn, Value)]) -> anyhow::Result<()> { + pub async fn put_batch(&self, batch: &HashMap<Key, Vec<(Lsn, Value)>>) -> anyhow::Result<()> { self.tl.put_values(batch).await }