From 94fa622767a20e433406cc97983cff9d2b5b2f3d Mon Sep 17 00:00:00 2001
From: Cuong Nguyen
Date: Sun, 5 Nov 2023 13:16:50 -0500
Subject: [PATCH] Implementation for batch ingestion
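
Add a per-tenant `batch_ingest` flag (default: true). When enabled, the
walreceiver begins the modification at `startlsn`, advances the LSN per
record via `set_lsn()`, and commits once per received WAL batch; when
disabled, `ingest_record()` is asked to commit each record individually
(its `commit` argument becomes `!batch_ingest`).

To support this, `DatadirModification` now buffers multiple values per
key, so one batch can carry several updates to the same page at
different LSNs:

    pending_updates: HashMap<Key, Vec<(Lsn, Value)>>

Its `lsn` field becomes private behind `get_lsn()`/`set_lsn()`, where
`set_lsn()` refuses to move the LSN backwards. `get()` reads back the
latest buffered value for a key, and `put()` overwrites the previous
value when it was written at the same LSN. The flag is wired through
the usual tenant config machinery, so it can be overridden per tenant
like any other option (e.g. `batch_ingest = true`).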
---
 control_plane/src/pageserver.rs               | 10 ++
 libs/pageserver_api/src/models.rs             |  2 +
 pageserver/src/config.rs                      |  9 ++
 pageserver/src/pgdatadir_mapping.rs           | 98 +++++++++++++------
 pageserver/src/tenant.rs                      |  1 +
 pageserver/src/tenant/config.rs               |  9 ++
 pageserver/src/tenant/timeline.rs             |  4 +
 pageserver/src/tenant/timeline/walreceiver.rs |  1 +
 .../walreceiver/connection_manager.rs         |  3 +
 .../walreceiver/walreceiver_connection.rs     | 11 ++-
 pageserver/src/walingest.rs                   | 14 +--
 .../regress/test_attach_tenant_config.py      |  1 +
 12 files changed, 124 insertions(+), 39 deletions(-)

diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs
index 1c7fd127a42b..1d865ae4adb2 100644
--- a/control_plane/src/pageserver.rs
+++ b/control_plane/src/pageserver.rs
@@ -380,6 +380,11 @@ impl PageServerNode {
                 .map(|x| x.parse::<bool>())
                 .transpose()
                 .context("Failed to parse 'gc_feedback' as bool")?,
+            batch_ingest: settings
+                .remove("batch_ingest")
+                .map(|x| x.parse::<bool>())
+                .transpose()
+                .context("Failed to parse 'batch_ingest' as bool")?,
         };
 
         // If tenant ID was not specified, generate one
@@ -479,6 +484,11 @@ impl PageServerNode {
                 .map(|x| x.parse::<bool>())
                 .transpose()
                 .context("Failed to parse 'gc_feedback' as bool")?,
+            batch_ingest: settings
+                .remove("batch_ingest")
+                .map(|x| x.parse::<bool>())
+                .transpose()
+                .context("Failed to parse 'batch_ingest' as bool")?,
             }
         };
diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 9959e7edf8d6..b4e6b9b1d921 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -232,6 +232,7 @@ pub struct TenantConfig {
     pub min_resident_size_override: Option<u64>,
     pub evictions_low_residence_duration_metric_threshold: Option<String>,
     pub gc_feedback: Option<bool>,
+    pub batch_ingest: Option<bool>,
 }
 
 #[serde_as]
@@ -291,6 +292,7 @@ impl TenantConfigRequest {
             min_resident_size_override: None,
             evictions_low_residence_duration_metric_threshold: None,
             gc_feedback: None,
+            batch_ingest: None,
         };
         TenantConfigRequest { tenant_id, config }
     }
diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs
index f2aa2f365eb4..4abefda32eea 100644
--- a/pageserver/src/config.rs
+++ b/pageserver/src/config.rs
@@ -114,6 +114,7 @@ pub mod defaults {
 #min_resident_size_override = .. # in bytes
 #evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
 #gc_feedback = false
+#batch_ingest = true
 
 [remote_storage]
@@ -884,6 +885,14 @@ impl PageServerConf {
             );
         }
 
+        if let Some(batch_ingest) = item.get("batch_ingest") {
+            t_conf.batch_ingest = Some(
+                batch_ingest
+                    .as_bool()
+                    .with_context(|| "configure option batch_ingest is not a bool".to_string())?,
+            );
+        }
+
         Ok(t_conf)
     }
diff --git a/pageserver/src/pgdatadir_mapping.rs b/pageserver/src/pgdatadir_mapping.rs
index b66399a675ee..ccb598b88e6f 100644
--- a/pageserver/src/pgdatadir_mapping.rs
+++ b/pageserver/src/pgdatadir_mapping.rs
@@ -12,8 +12,9 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
 use crate::metrics::WAL_COMMIT_WRITER_LOCK_WAIT_TIME;
 use crate::repository::*;
 use crate::walrecord::NeonWalRecord;
-use anyhow::Context;
+use anyhow::{ensure, Context};
 use bytes::{Buf, Bytes};
+use itertools::Itertools;
 use pageserver_api::reltag::{RelTag, SlruKind};
 use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
 use postgres_ffi::BLCKSZ;
@@ -673,17 +674,34 @@ pub struct DatadirModification<'a> {
     pub tline: &'a Timeline,
 
     /// Lsn assigned by begin_modification
-    pub lsn: Lsn,
+    lsn: Lsn,
 
     // The modifications are not applied directly to the underlying key-value store.
     // The put-functions add the modifications here, and they are flushed to the
     // underlying key-value store by the 'finish' function.
-    pending_updates: HashMap<Key, (Lsn, Value)>,
+    pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
     pending_deletions: Vec<(Range<Key>, Lsn)>,
     pending_nblocks: i64,
 }
 
 impl<'a> DatadirModification<'a> {
+    /// Get the current lsn
+    pub fn get_lsn(&self) -> Lsn {
+        self.lsn
+    }
+
+    /// Set the current lsn
+    pub fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
+        ensure!(
+            lsn >= self.lsn,
+            "setting an older lsn {} than {} is not allowed",
+            lsn,
+            self.lsn
+        );
+        self.lsn = lsn;
+        Ok(())
+    }
+
     /// Initialize a completely new repository.
     ///
     /// This inserts the directory metadata entries that are assumed to
@@ -1155,16 +1173,24 @@ impl<'a> DatadirModification<'a> {
         let writer = self.tline.writer().await;
 
         // Flush relation and SLRU data blocks, keep metadata.
-        let mut retained_pending_updates = HashMap::new();
-        for (key, (lsn, value)) in self.pending_updates.drain() {
-            if is_rel_block_key(key) || is_slru_block_key(key) {
-                // This bails out on first error without modifying pending_updates.
-                // That's Ok, cf this function's doc comment.
-                writer.put(key, lsn, &value).await?;
-            } else {
-                retained_pending_updates.insert(key, (lsn, value));
+        let mut retained_pending_updates: HashMap<_, Vec<_>> = HashMap::new();
+        for (key, values) in self.pending_updates.drain() {
+            for (lsn, value) in values {
+                if is_rel_block_key(key) || is_slru_block_key(key) {
+                    // This bails out on first error without modifying pending_updates.
+                    // That's Ok, cf this function's doc comment.
+                    writer.put(key, lsn, &value).await?;
+                } else {
+                    retained_pending_updates
+                        .entry(key)
+                        .or_default()
+                        .push((lsn, value));
+                }
             }
         }
+        // The right way to extend this is to also merge the values in the corresponding
+        // keys, but since pending_updates is guaranteed to be empty after the drain, this
+        // should also be fine.
         self.pending_updates.extend(retained_pending_updates);
 
         if pending_nblocks != 0 {
@@ -1192,8 +1218,13 @@ impl<'a> DatadirModification<'a> {
         let pending_updates = self
             .pending_updates
             .drain()
-            .map(|(key, (lsn, value))| (key, lsn, value))
-            .collect();
+            .map(|(key, pending_updates)| {
+                pending_updates
+                    .into_iter()
+                    .map(|(lsn, value)| (key, lsn, value))
+                    .collect::<Vec<_>>()
+            })
+            .concat();
         writer.put_batch(pending_updates).await?;
 
         let pending_deletions = self.pending_deletions.drain(..).collect();
@@ -1216,27 +1247,36 @@ impl<'a> DatadirModification<'a> {
         //
         // Note: we don't check pending_deletions. It is an error to request a
         // value that has been removed, deletion only avoids leaking storage.
-        if let Some((_, value)) = self.pending_updates.get(&key) {
-            if let Value::Image(img) = value {
-                Ok(img.clone())
-            } else {
-                // Currently, we never need to read back a WAL record that we
-                // inserted in the same "transaction". All the metadata updates
-                // work directly with Images, and we never need to read actual
-                // data pages. We could handle this if we had to, by calling
-                // the walredo manager, but let's keep it simple for now.
-                Err(PageReconstructError::from(anyhow::anyhow!(
-                    "unexpected pending WAL record"
-                )))
+        if let Some(values) = self.pending_updates.get(&key) {
+            if let Some((_, value)) = values.last() {
+                return if let Value::Image(img) = value {
+                    Ok(img.clone())
+                } else {
+                    // Currently, we never need to read back a WAL record that we
+                    // inserted in the same "transaction". All the metadata updates
+                    // work directly with Images, and we never need to read actual
+                    // data pages. We could handle this if we had to, by calling
+                    // the walredo manager, but let's keep it simple for now.
+                    Err(PageReconstructError::from(anyhow::anyhow!(
+                        "unexpected pending WAL record"
+                    )))
+                };
             }
-        } else {
-            let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
-            self.tline.get(key, lsn, ctx).await
         }
+        let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
+        self.tline.get(key, lsn, ctx).await
     }
 
     fn put(&mut self, key: Key, val: Value) {
-        self.pending_updates.insert(key, (self.lsn, val));
+        let values = self.pending_updates.entry(key).or_default();
+        // Replace the previous value if it exists at the same lsn
+        if let Some((last_lsn, last_value)) = values.last_mut() {
+            if *last_lsn == self.lsn {
+                *last_value = val;
+                return;
+            }
+        }
+        values.push((self.lsn, val));
     }
 
     fn delete(&mut self, key_range: Range<Key>) {
diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs
index 16c770e5c8ec..9fc87ff254f1 100644
--- a/pageserver/src/tenant.rs
+++ b/pageserver/src/tenant.rs
@@ -3351,6 +3351,7 @@ pub mod harness {
                 tenant_conf.evictions_low_residence_duration_metric_threshold,
             ),
             gc_feedback: Some(tenant_conf.gc_feedback),
+            batch_ingest: Some(tenant_conf.batch_ingest),
         }
     }
 }
diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs
index ffe2c5eab651..b89bd6d41745 100644
--- a/pageserver/src/tenant/config.rs
+++ b/pageserver/src/tenant/config.rs
@@ -100,6 +100,7 @@ pub struct TenantConf {
     #[serde(with = "humantime_serde")]
     pub evictions_low_residence_duration_metric_threshold: Duration,
     pub gc_feedback: bool,
+    pub batch_ingest: bool,
 }
 
 /// Same as TenantConf, but this struct preserves the information about
@@ -180,6 +181,10 @@ pub struct TenantConfOpt {
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(default)]
     pub gc_feedback: Option<bool>,
+
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(default)]
+    pub batch_ingest: Option<bool>,
 }
 
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -248,6 +253,7 @@ impl TenantConfOpt {
                 .evictions_low_residence_duration_metric_threshold
                 .unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
             gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
+            batch_ingest: self.batch_ingest.unwrap_or(global_conf.batch_ingest),
         }
     }
 }
@@ -285,6 +291,7 @@ impl Default for TenantConf {
             )
             .expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
             gc_feedback: false,
+            batch_ingest: true,
         }
     }
 }
@@ -381,6 +388,8 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
         }
         tenant_conf.gc_feedback = request_data.gc_feedback;
 
+        tenant_conf.batch_ingest = request_data.batch_ingest;
+
         Ok(tenant_conf)
     }
 }
diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs
index 8ed84f8d1caa..a1e1c9fb83b3 100644
--- a/pageserver/src/tenant/timeline.rs
+++ b/pageserver/src/tenant/timeline.rs
@@ -1567,6 +1567,9 @@ impl Timeline {
         let max_lsn_wal_lag = tenant_conf_guard
             .max_lsn_wal_lag
             .unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
+        let batch_ingest = tenant_conf_guard
+            .batch_ingest
+            .unwrap_or(self.conf.default_tenant_conf.batch_ingest);
         drop(tenant_conf_guard);
 
         let mut guard = self.walreceiver.lock().unwrap();
@@ -1582,6 +1585,7 @@ impl Timeline {
                 max_lsn_wal_lag,
                 auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
                 availability_zone: self.conf.availability_zone.clone(),
+                batch_ingest,
             },
             broker_client,
             ctx,
diff --git a/pageserver/src/tenant/timeline/walreceiver.rs b/pageserver/src/tenant/timeline/walreceiver.rs
index ccff735c3c6b..7e0c027382ec 100644
--- a/pageserver/src/tenant/timeline/walreceiver.rs
+++ b/pageserver/src/tenant/timeline/walreceiver.rs
@@ -57,6 +57,7 @@ pub struct WalReceiverConf {
     pub max_lsn_wal_lag: NonZeroU64,
     pub auth_token: Option<Arc<String>>,
     pub availability_zone: Option<String>,
+    pub batch_ingest: bool,
 }
 
 pub struct WalReceiver {
diff --git a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
index 87b77d2fa0d7..72061195d4cf 100644
--- a/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/connection_manager.rs
@@ -399,6 +399,7 @@ impl ConnectionManagerState {
         let node_id = new_sk.safekeeper_id;
         let connect_timeout = self.conf.wal_connect_timeout;
+        let batch_ingest = self.conf.batch_ingest;
         let timeline = Arc::clone(&self.timeline);
         let ctx = ctx.detached_child(
             TaskKind::WalReceiverConnectionHandler,
@@ -418,6 +419,7 @@ impl ConnectionManagerState {
                         connect_timeout,
                         ctx,
                         node_id,
+                        batch_ingest,
                     )
                     .await;
@@ -1358,6 +1360,7 @@ mod tests {
                 max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
                 auth_token: None,
                 availability_zone: None,
+                batch_ingest: true,
             },
             wal_connection: None,
             wal_stream_candidates: HashMap::new(),
diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
index da1befecabca..3875bd747c6b 100644
--- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
+++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs
@@ -117,6 +117,7 @@ pub(super) async fn handle_walreceiver_connection(
     connect_timeout: Duration,
     ctx: RequestContext,
     node: NodeId,
+    batch_ingest: bool,
 ) -> Result<(), WalReceiverError> {
     debug_assert_current_span_has_tenant_and_timeline_id();
 
@@ -322,7 +323,7 @@ pub(super) async fn handle_walreceiver_connection(
         {
             let mut decoded = DecodedWALRecord::default();
-            let mut modification = timeline.begin_modification(endlsn);
+            let mut modification = timeline.begin_modification(startlsn);
             while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
                 // It is important to deal with the aligned records as lsn in getPage@LSN is
                 // aligned and can be several bytes bigger. Without this alignment we are
@@ -338,7 +339,7 @@ pub(super) async fn handle_walreceiver_connection(
                     &mut modification,
                     &mut decoded,
                     &ctx,
-                    false,
+                    !batch_ingest,
                 )
                 .await
                 .with_context(|| format!("could not ingest record at {lsn}"))?;
@@ -347,8 +348,10 @@ pub(super) async fn handle_walreceiver_connection(
 
                 last_rec_lsn = lsn;
             }
-
-            modification.commit().await?;
+            if batch_ingest {
+                trace!("batch commit ingested WAL up to {}", last_rec_lsn);
+                modification.commit().await?;
+            }
         }
 
         if !caught_up && endlsn >= end_of_wal {
diff --git a/pageserver/src/walingest.rs b/pageserver/src/walingest.rs
index ebaaabd75687..018b78945b07 100644
--- a/pageserver/src/walingest.rs
+++ b/pageserver/src/walingest.rs
@@ -90,7 +90,7 @@ impl<'a> WalIngest<'a> {
         ctx: &RequestContext,
         commit: bool,
     ) -> anyhow::Result<()> {
-        modification.lsn = lsn;
+        modification.set_lsn(lsn)?;
         decode_wal_record(recdata, decoded, self.timeline.pg_version)?;
 
         let mut buf = decoded.record.clone();
@@ -513,7 +513,9 @@ impl<'a> WalIngest<'a> {
         // replaying it would fail to find the previous image of the page, because
         // it doesn't exist. So check if the VM page(s) exist, and skip the WAL
         // record if it doesn't.
-        let vm_size = self.get_relsize(vm_rel, modification.lsn, ctx).await?;
+        let vm_size = self
+            .get_relsize(vm_rel, modification.get_lsn(), ctx)
+            .await?;
         if let Some(blknum) = new_vm_blk {
             if blknum >= vm_size {
                 new_vm_blk = None;
@@ -711,7 +713,7 @@ impl<'a> WalIngest<'a> {
             modification.put_rel_page_image(rel, fsm_physical_page_no, ZERO_PAGE.clone())?;
             fsm_physical_page_no += 1;
         }
-        let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
+        let nblocks = self.get_relsize(rel, modification.get_lsn(), ctx).await?;
         if nblocks > fsm_physical_page_no {
             // check if something to do: FSM is larger than truncate position
             self.put_rel_truncation(modification, rel, fsm_physical_page_no, ctx)
@@ -733,7 +735,7 @@ impl<'a> WalIngest<'a> {
             modification.put_rel_page_image(rel, vm_page_no, ZERO_PAGE.clone())?;
             vm_page_no += 1;
         }
-        let nblocks = self.get_relsize(rel, modification.lsn, ctx).await?;
+        let nblocks = self.get_relsize(rel, modification.get_lsn(), ctx).await?;
         if nblocks > vm_page_no {
             // check if something to do: VM is larger than truncate position
             self.put_rel_truncation(modification, rel, vm_page_no, ctx)
@@ -806,7 +808,7 @@ impl<'a> WalIngest<'a> {
         let mut csn_segno = csn_pageno / pg_constants::SLRU_PAGES_PER_SEGMENT;
         let mut csn_rpageno = csn_pageno % pg_constants::SLRU_PAGES_PER_SEGMENT;
         let mut csn_page_xids: Vec<TransactionId> = vec![parsed.xid];
-        let lsn: XidCSN = modification.lsn.0;
+        let lsn: XidCSN = modification.get_lsn().0;
         for subxact in &parsed.subxacts {
             let csn_subxact_pageno = ((subxact / pg_constants::CSN_LOG_XACTS_PER_PAGE)
@@ -1212,7 +1214,7 @@ impl<'a> WalIngest<'a> {
         // Check if the relation exists. We implicitly create relations on first
         // record.
         // TODO: would be nice if to be more explicit about it
-        let last_lsn = modification.lsn;
+        let last_lsn = modification.get_lsn();
         let old_nblocks = if !self
             .timeline
             .get_rel_exists(rel, last_lsn, true, ctx)
diff --git a/test_runner/regress/test_attach_tenant_config.py b/test_runner/regress/test_attach_tenant_config.py
index bc6afa84a1ab..51f45a905cc7 100644
--- a/test_runner/regress/test_attach_tenant_config.py
+++ b/test_runner/regress/test_attach_tenant_config.py
@@ -163,6 +163,7 @@ def test_fully_custom_config(positive_env: NeonEnv):
         },
         "evictions_low_residence_duration_metric_threshold": "2days",
         "gc_feedback": True,
+        "batch_ingest": True,
         "gc_horizon": 23 * (1024 * 1024),
         "gc_period": "2h 13m",
         "image_creation_threshold": 7,