Implementation for batch ingestion
ctring committed Nov 5, 2023
1 parent c3d043c commit 94fa622
Showing 12 changed files with 124 additions and 39 deletions.
10 changes: 10 additions & 0 deletions control_plane/src/pageserver.rs
@@ -380,6 +380,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
batch_ingest: settings
.remove("batch_ingest")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'batch_ingest' as bool")?,
};

// If tenant ID was not specified, generate one
@@ -479,6 +484,11 @@ impl PageServerNode {
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'gc_feedback' as bool")?,
batch_ingest: settings
.remove("batch_ingest")
.map(|x| x.parse::<bool>())
.transpose()
.context("Failed to parse 'batch_ingest' as bool")?,
}
};

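Both hunks above repeat the same optional-setting parse pattern. As a standalone sketch (hypothetical parse_batch_ingest helper, with a plain HashMap<String, String> standing in for the real settings map), it behaves like this:

use std::collections::HashMap;
use anyhow::Context;

// remove() yields Option<String>; map + transpose turns a parse failure into
// an error while leaving an absent key as Ok(None).
fn parse_batch_ingest(settings: &mut HashMap<String, String>) -> anyhow::Result<Option<bool>> {
    settings
        .remove("batch_ingest")
        .map(|x| x.parse::<bool>())
        .transpose()
        .context("Failed to parse 'batch_ingest' as bool")
}

fn main() -> anyhow::Result<()> {
    let mut settings = HashMap::from([("batch_ingest".to_string(), "true".to_string())]);
    assert_eq!(parse_batch_ingest(&mut settings)?, Some(true));
    // The key was consumed by remove(), so a second call sees it as unset.
    assert_eq!(parse_batch_ingest(&mut settings)?, None);
    Ok(())
}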
2 changes: 2 additions & 0 deletions libs/pageserver_api/src/models.rs
@@ -232,6 +232,7 @@ pub struct TenantConfig {
pub min_resident_size_override: Option<u64>,
pub evictions_low_residence_duration_metric_threshold: Option<String>,
pub gc_feedback: Option<bool>,
pub batch_ingest: Option<bool>,
}

#[serde_as]
@@ -291,6 +292,7 @@ impl TenantConfigRequest {
min_resident_size_override: None,
evictions_low_residence_duration_metric_threshold: None,
gc_feedback: None,
batch_ingest: None,
};
TenantConfigRequest { tenant_id, config }
}
9 changes: 9 additions & 0 deletions pageserver/src/config.rs
@@ -114,6 +114,7 @@ pub mod defaults {
#min_resident_size_override = .. # in bytes
#evictions_low_residence_duration_metric_threshold = '{DEFAULT_EVICTIONS_LOW_RESIDENCE_DURATION_METRIC_THRESHOLD}'
#gc_feedback = false
#batch_ingest = true
[remote_storage]
@@ -884,6 +885,14 @@ impl PageServerConf {
);
}

if let Some(batch_ingest) = item.get("batch_ingest") {
t_conf.batch_ingest = Some(
batch_ingest
.as_bool()
.with_context(|| "configure option batch_ingest is not a bool".to_string())?,
);
}

Ok(t_conf)
}

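For illustration only, the lookup above reduced to a minimal sketch using the toml crate (the pageserver's item is its own parsed TOML value; this just mirrors the get/as_bool/context shape):

use anyhow::Context;

fn main() -> anyhow::Result<()> {
    let doc: toml::Value = "batch_ingest = true".parse()?;
    let batch_ingest = match doc.get("batch_ingest") {
        // as_bool() returns None for non-boolean values, which context()
        // converts into the error message used in the hunk above.
        Some(v) => Some(v.as_bool().context("configure option batch_ingest is not a bool")?),
        None => None,
    };
    assert_eq!(batch_ingest, Some(true));
    Ok(())
}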
98 changes: 69 additions & 29 deletions pageserver/src/pgdatadir_mapping.rs
@@ -12,8 +12,9 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::metrics::WAL_COMMIT_WRITER_LOCK_WAIT_TIME;
use crate::repository::*;
use crate::walrecord::NeonWalRecord;
use anyhow::Context;
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes};
use itertools::Itertools;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
@@ -673,17 +674,34 @@ pub struct DatadirModification<'a> {
pub tline: &'a Timeline,

/// Lsn assigned by begin_modification
pub lsn: Lsn,
lsn: Lsn,

// The modifications are not applied directly to the underlying key-value store.
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_updates: HashMap<Key, (Lsn, Value)>,
pending_updates: HashMap<Key, Vec<(Lsn, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,
}

impl<'a> DatadirModification<'a> {
/// Get the current lsn
pub fn get_lsn(&self) -> Lsn {
self.lsn
}

/// Set the current lsn
pub fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
lsn >= self.lsn,
"setting an older lsn {} than {} is not allowed",
lsn,
self.lsn
);
self.lsn = lsn;
Ok(())
}

/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -1155,16 +1173,24 @@ impl<'a> DatadirModification<'a> {
let writer = self.tline.writer().await;

// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::new();
for (key, (lsn, value)) in self.pending_updates.drain() {
if is_rel_block_key(key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, lsn, &value).await?;
} else {
retained_pending_updates.insert(key, (lsn, value));
let mut retained_pending_updates: HashMap<_, Vec<_>> = HashMap::new();
for (key, values) in self.pending_updates.drain() {
for (lsn, value) in values {
if is_rel_block_key(key) || is_slru_block_key(key) {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put(key, lsn, &value).await?;
} else {
retained_pending_updates
.entry(key)
.or_default()
.push((lsn, value));
}
}
}
// Strictly speaking, extend() should merge the value lists of keys present
// in both maps, but pending_updates is guaranteed to be empty after the
// drain above, so a plain extend is fine here.
self.pending_updates.extend(retained_pending_updates);

if pending_nblocks != 0 {
@@ -1192,8 +1218,13 @@
let pending_updates = self
.pending_updates
.drain()
.map(|(key, (lsn, value))| (key, lsn, value))
.collect();
.map(|(key, pending_updates)| {
pending_updates
.into_iter()
.map(|(lsn, value)| (key, lsn, value))
.collect::<Vec<_>>()
})
.concat();
writer.put_batch(pending_updates).await?;

let pending_deletions = self.pending_deletions.drain(..).collect();
@@ -1216,27 +1247,36 @@
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some((_, value)) = self.pending_updates.get(&key) {
if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::from(anyhow::anyhow!(
"unexpected pending WAL record"
)))
if let Some(values) = self.pending_updates.get(&key) {
if let Some((_, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::from(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
}
} else {
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn, ctx).await
}
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn, ctx).await
}

fn put(&mut self, key: Key, val: Value) {
self.pending_updates.insert(key, (self.lsn, val));
let values = self.pending_updates.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
if let Some((last_lsn, last_value)) = values.last_mut() {
if *last_lsn == self.lsn {
*last_value = val;
return;
}
}
values.push((self.lsn, val));
}

fn delete(&mut self, key_range: Range<Key>) {
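The core change in this file is that pending_updates now buffers a Vec of (Lsn, Value) pairs per key, so several WAL records can accumulate before one commit. A reduced sketch of the put/get behavior (toy types, not the real Key/Lsn/Value):

use std::collections::HashMap;

type Lsn = u64;

#[derive(Default)]
struct Pending {
    lsn: Lsn,
    updates: HashMap<&'static str, Vec<(Lsn, String)>>,
}

impl Pending {
    fn put(&mut self, key: &'static str, val: String) {
        let values = self.updates.entry(key).or_default();
        // Same-lsn puts overwrite the last value, mirroring DatadirModification::put.
        if let Some((last_lsn, last_val)) = values.last_mut() {
            if *last_lsn == self.lsn {
                *last_val = val;
                return;
            }
        }
        values.push((self.lsn, val));
    }

    // Reads see the most recent pending value, like the last() lookup in get().
    fn get(&self, key: &str) -> Option<&str> {
        self.updates.get(key).and_then(|v| v.last()).map(|(_, v)| v.as_str())
    }
}

fn main() {
    let mut p = Pending::default();
    p.put("k", "a".into());
    p.put("k", "b".into()); // overwrites "a": same lsn
    p.lsn = 1;
    p.put("k", "c".into()); // appends: newer lsn
    assert_eq!(p.get("k"), Some("c"));
    assert_eq!(p.updates["k"], vec![(0, "b".to_string()), (1, "c".to_string())]);
}

flush() keeps the same split as before: rel/SLRU block values go straight to the timeline writer, while everything else stays buffered per key until commit().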
1 change: 1 addition & 0 deletions pageserver/src/tenant.rs
@@ -3351,6 +3351,7 @@ pub mod harness {
tenant_conf.evictions_low_residence_duration_metric_threshold,
),
gc_feedback: Some(tenant_conf.gc_feedback),
batch_ingest: Some(tenant_conf.batch_ingest),
}
}
}
9 changes: 9 additions & 0 deletions pageserver/src/tenant/config.rs
@@ -100,6 +100,7 @@ pub struct TenantConf {
#[serde(with = "humantime_serde")]
pub evictions_low_residence_duration_metric_threshold: Duration,
pub gc_feedback: bool,
pub batch_ingest: bool,
}

/// Same as TenantConf, but this struct preserves the information about
@@ -180,6 +181,10 @@ pub struct TenantConfOpt {
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub gc_feedback: Option<bool>,

#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
pub batch_ingest: Option<bool>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -248,6 +253,7 @@ impl TenantConfOpt {
.evictions_low_residence_duration_metric_threshold
.unwrap_or(global_conf.evictions_low_residence_duration_metric_threshold),
gc_feedback: self.gc_feedback.unwrap_or(global_conf.gc_feedback),
batch_ingest: self.batch_ingest.unwrap_or(global_conf.batch_ingest),
}
}
}
@@ -285,6 +291,7 @@ impl Default for TenantConf {
)
.expect("cannot parse default evictions_low_residence_duration_metric_threshold"),
gc_feedback: false,
batch_ingest: true,
}
}
}
@@ -381,6 +388,8 @@ impl TryFrom<&'_ models::TenantConfig> for TenantConfOpt {
}
tenant_conf.gc_feedback = request_data.gc_feedback;

tenant_conf.batch_ingest = request_data.batch_ingest;

Ok(tenant_conf)
}
}
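The merge above follows the usual Opt-overlay pattern: each Option field in TenantConfOpt falls back to the global TenantConf default when unset. Reduced to the new field only (toy struct names):

#[derive(Clone, Copy)]
struct Conf { batch_ingest: bool }

#[derive(Default)]
struct ConfOpt { batch_ingest: Option<bool> }

impl ConfOpt {
    // unwrap_or picks the per-tenant override when present, else the global default.
    fn merge(&self, global: Conf) -> Conf {
        Conf { batch_ingest: self.batch_ingest.unwrap_or(global.batch_ingest) }
    }
}

fn main() {
    let global = Conf { batch_ingest: true };
    assert!(ConfOpt::default().merge(global).batch_ingest);
    assert!(!ConfOpt { batch_ingest: Some(false) }.merge(global).batch_ingest);
}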
4 changes: 4 additions & 0 deletions pageserver/src/tenant/timeline.rs
@@ -1567,6 +1567,9 @@ impl Timeline {
let max_lsn_wal_lag = tenant_conf_guard
.max_lsn_wal_lag
.unwrap_or(self.conf.default_tenant_conf.max_lsn_wal_lag);
let batch_ingest = tenant_conf_guard
.batch_ingest
.unwrap_or(self.conf.default_tenant_conf.batch_ingest);
drop(tenant_conf_guard);

let mut guard = self.walreceiver.lock().unwrap();
@@ -1582,6 +1585,7 @@
max_lsn_wal_lag,
auth_token: crate::config::SAFEKEEPER_AUTH_TOKEN.get().cloned(),
availability_zone: self.conf.availability_zone.clone(),
batch_ingest,
},
broker_client,
ctx,
1 change: 1 addition & 0 deletions pageserver/src/tenant/timeline/walreceiver.rs
@@ -57,6 +57,7 @@ pub struct WalReceiverConf {
pub max_lsn_wal_lag: NonZeroU64,
pub auth_token: Option<Arc<String>>,
pub availability_zone: Option<String>,
pub batch_ingest: bool,
}

pub struct WalReceiver {
@@ -399,6 +399,7 @@ impl ConnectionManagerState {

let node_id = new_sk.safekeeper_id;
let connect_timeout = self.conf.wal_connect_timeout;
let batch_ingest = self.conf.batch_ingest;
let timeline = Arc::clone(&self.timeline);
let ctx = ctx.detached_child(
TaskKind::WalReceiverConnectionHandler,
@@ -418,6 +419,7 @@
connect_timeout,
ctx,
node_id,
batch_ingest,
)
.await;

@@ -1358,6 +1360,7 @@ mod tests {
max_lsn_wal_lag: NonZeroU64::new(1024 * 1024).unwrap(),
auth_token: None,
availability_zone: None,
batch_ingest: true,
},
wal_connection: None,
wal_stream_candidates: HashMap::new(),
@@ -117,6 +117,7 @@ pub(super) async fn handle_walreceiver_connection(
connect_timeout: Duration,
ctx: RequestContext,
node: NodeId,
batch_ingest: bool,
) -> Result<(), WalReceiverError> {
debug_assert_current_span_has_tenant_and_timeline_id();

@@ -322,7 +323,7 @@

{
let mut decoded = DecodedWALRecord::default();
let mut modification = timeline.begin_modification(endlsn);
let mut modification = timeline.begin_modification(startlsn);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
@@ -338,7 +339,7 @@
&mut modification,
&mut decoded,
&ctx,
false,
!batch_ingest,
)
.await
.with_context(|| format!("could not ingest record at {lsn}"))?;
@@ -347,8 +348,10 @@

last_rec_lsn = lsn;
}

modification.commit().await?;
if batch_ingest {
trace!("batch commit ingested WAL up to {}", last_rec_lsn);
modification.commit().await?;
}
}

if !caught_up && endlsn >= end_of_wal {
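Net effect of these hunks: the modification now starts at startlsn, advances per record, and with batch_ingest enabled commit() runs once per received WAL chunk instead of once per record. A schematic sketch (toy Modification type; assumes the boolean passed to the record-ingest call controls per-record flushing):

struct Modification { lsn: u64, commits: usize }

impl Modification {
    fn new(lsn: u64) -> Self { Self { lsn, commits: 0 } }
    // Mirrors DatadirModification::set_lsn: the lsn may only move forward.
    fn set_lsn(&mut self, lsn: u64) { assert!(lsn >= self.lsn, "lsn must not move backwards"); self.lsn = lsn; }
    fn commit(&mut self) { self.commits += 1; }
}

fn ingest_chunk(records: &[u64], batch_ingest: bool) -> usize {
    let mut m = Modification::new(*records.first().unwrap());
    for &lsn in records {
        m.set_lsn(lsn);
        // ... decode and apply the record at `lsn` ...
        if !batch_ingest {
            m.commit(); // old behavior: flush after every record
        }
    }
    if batch_ingest {
        m.commit(); // new behavior: one flush for the whole chunk
    }
    m.commits
}

fn main() {
    assert_eq!(ingest_chunk(&[10, 20, 30], false), 3);
    assert_eq!(ingest_chunk(&[10, 20, 30], true), 1);
}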