Switch the last Persist dynamic configs to dyncfg
These weren't actually configurable without a code change!
bkirwi committed Nov 21, 2024
1 parent 077ce5c commit 7f3d7e5
Showing 10 changed files with 124 additions and 179 deletions.
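The mechanical change is the same in every file: each knob moves from a hand-rolled AtomicUsize on DynamicConfig to a registered mz_dyncfg Config constant. Below is a minimal sketch of the lifecycle, using only the API as it appears in this diff (Config::new, ConfigSet::add, Config::get); the demo function and assertion are illustrative, not part of the commit:

    use mz_dyncfg::{Config, ConfigSet};

    // Define: a stable name, a default value, and a description.
    pub const BATCH_BUILDER_MAX_OUTSTANDING_PARTS: Config<usize> = Config::new(
        "persist_batch_builder_max_outstanding_parts",
        2,
        "The number of writes a batch builder can have outstanding before we slow down the writer.",
    );

    fn demo() {
        // Register: a config takes effect once added to the process's ConfigSet
        // (Persist does this for all of its configs in all_dyncfgs, below).
        let configs = ConfigSet::default().add(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS);
        // Read: callsites fetch the current value on every use rather than caching
        // it, so runtime updates are observed without a restart.
        assert_eq!(BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(&configs), 2);
    }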
11 changes: 6 additions & 5 deletions src/persist-client/src/batch.rs
@@ -48,7 +48,7 @@ use timely::PartialOrder;
 use tracing::{debug_span, trace_span, warn, Instrument};
 
 use crate::async_runtime::IsolatedRuntime;
-use crate::cfg::MiB;
+use crate::cfg::{MiB, BATCH_BUILDER_MAX_OUTSTANDING_PARTS};
 use crate::error::InvalidUsage;
 use crate::internal::compact::{CompactConfig, Compactor};
 use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
@@ -480,9 +480,7 @@ impl BatchBuilderConfig {
             writer_key,
             blob_target_size: BLOB_TARGET_SIZE.get(value).clamp(1, usize::MAX),
             batch_delete_enabled: BATCH_DELETE_ENABLED.get(value),
-            batch_builder_max_outstanding_parts: value
-                .dynamic
-                .batch_builder_max_outstanding_parts(),
+            batch_builder_max_outstanding_parts: BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(value),
             batch_columnar_format,
             batch_columnar_format_percent,
             inline_writes_single_max_bytes: INLINE_WRITES_SINGLE_MAX_BYTES.get(value),
@@ -1644,6 +1642,7 @@ mod tests {
     use timely::order::Product;
 
     use crate::cache::PersistClientCache;
+    use crate::cfg::BATCH_BUILDER_MAX_OUTSTANDING_PARTS;
     use crate::internal::paths::{BlobKey, PartialBlobKey};
     use crate::tests::{all_ok, new_test_client};
     use crate::PersistLocation;
@@ -1667,7 +1666,9 @@
         // edge cases below.
         cache.cfg.set_config(&BLOB_TARGET_SIZE, 0);
         cache.cfg.set_config(&MAX_RUNS, 3);
-        cache.cfg.dynamic.set_batch_builder_max_outstanding_parts(2);
+        cache
+            .cfg
+            .set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);
 
         let client = cache
             .open(PersistLocation::new_in_mem())
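The test now goes through the same generic set_config entry point as every other knob, rather than a bespoke setter. A usage sketch (assuming, as the .get(value) calls above suggest, that a PersistConfig can be read anywhere a ConfigSet is expected):

    // Override: set_config updates the shared config set in place; any component
    // reading through it sees the new value on its next get().
    cache.cfg.set_config(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS, 2);
    assert_eq!(BATCH_BUILDER_MAX_OUTSTANDING_PARTS.get(&cache.cfg), 2);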
221 changes: 83 additions & 138 deletions src/persist-client/src/cfg.rs
@@ -11,7 +11,6 @@
 
 //! The tunable knobs for persist.
 
-use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};

Expand Down Expand Up @@ -107,8 +106,6 @@ pub struct PersistConfig {
/// Indicates whether `configs` has been synced at least once with an
/// upstream source.
configs_synced_once: Arc<watch::Sender<bool>>,
/// Configurations that can be dynamically updated.
pub dynamic: Arc<DynamicConfig>,
/// Whether to physically and logically compact batches in blob storage.
pub compaction_enabled: bool,
/// In Compactor::compact_and_apply_background, the maximum number of concurrent
@@ -182,18 +179,6 @@ impl PersistConfig {
             now,
             configs: Arc::new(configs),
             configs_synced_once: Arc::new(configs_synced_once),
-            dynamic: Arc::new(DynamicConfig {
-                batch_builder_max_outstanding_parts: AtomicUsize::new(2),
-                compaction_heuristic_min_inputs: AtomicUsize::new(8),
-                compaction_heuristic_min_parts: AtomicUsize::new(8),
-                compaction_heuristic_min_updates: AtomicUsize::new(1024),
-                compaction_memory_bound_bytes: AtomicUsize::new(1024 * MiB),
-                gc_blob_delete_concurrency_limit: AtomicUsize::new(32),
-                state_versions_recent_live_diffs_limit: AtomicUsize::new(
-                    30 * ROLLUP_THRESHOLD.default(),
-                ),
-                usage_state_fetch_concurrency_limit: AtomicUsize::new(8),
-            }),
             compaction_enabled: !compaction_disabled,
             compaction_concurrency_limit: 5,
             compaction_queue_size: 20,
@@ -328,6 +313,14 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&crate::cfg::USE_CRITICAL_SINCE_SOURCE)
         .add(&crate::cfg::USE_CRITICAL_SINCE_SNAPSHOT)
         .add(&crate::cfg::USE_GLOBAL_TXN_CACHE_SOURCE)
+        .add(&BATCH_BUILDER_MAX_OUTSTANDING_PARTS)
+        .add(&COMPACTION_HEURISTIC_MIN_INPUTS)
+        .add(&COMPACTION_HEURISTIC_MIN_PARTS)
+        .add(&COMPACTION_HEURISTIC_MIN_UPDATES)
+        .add(&COMPACTION_MEMORY_BOUND_BYTES)
+        .add(&GC_BLOB_DELETE_CONCURRENCY_LIMIT)
+        .add(&STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT)
+        .add(&USAGE_STATE_FETCH_CONCURRENCY_LIMIT)
         .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_FUEL)
         .add(&crate::cli::admin::CATALOG_FORCE_COMPACTION_WAIT)
         .add(&crate::cli::admin::EXPRESSION_CACHE_FORCE_COMPACTION_FUEL)
@@ -367,14 +360,6 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
 
 impl PersistConfig {
     pub(crate) const DEFAULT_FALLBACK_ROLLUP_THRESHOLD_MULTIPLIER: usize = 3;
-
-    // TODO: Get rid of this in favor of using dyncfgs at the
-    // relevant callsites.
-    pub fn set_state_versions_recent_live_diffs_limit(&self, val: usize) {
-        self.dynamic
-            .state_versions_recent_live_diffs_limit
-            .store(val, DynamicConfig::STORE_ORDERING);
-    }
 }
 
 /// The minimum TTL of a connection to Postgres/CRDB before it is proactively
@@ -456,6 +441,81 @@ pub const USE_GLOBAL_TXN_CACHE_SOURCE: Config<bool> = Config::new(
     "Use the process global txn cache (instead of an operator local one) in the Persist source.",
 );
 
+/// The maximum number of parts (s3 blobs) that [crate::batch::BatchBuilder]
+/// will pipeline before back-pressuring [crate::batch::BatchBuilder::add]
+/// calls on previous ones finishing.
+pub const BATCH_BUILDER_MAX_OUTSTANDING_PARTS: Config<usize> = Config::new(
+    "persist_batch_builder_max_outstanding_parts",
+    2,
+    "The number of writes a batch builder can have outstanding before we slow down the writer.",
+);
+
+/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
+/// if the number of inputs is at least this many. Compaction is performed
+/// if any of the heuristic criteria are met (they are OR'd).
+pub const COMPACTION_HEURISTIC_MIN_INPUTS: Config<usize> = Config::new(
+    "persist_compaction_heuristic_min_inputs",
+    8,
+    "Don't skip compaction if we have more than this many hollow batches as input.",
+);
+
+/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
+/// if the number of batch parts is at least this many. Compaction is performed
+/// if any of the heuristic criteria are met (they are OR'd).
+pub const COMPACTION_HEURISTIC_MIN_PARTS: Config<usize> = Config::new(
+    "persist_compaction_heuristic_min_parts",
+    8,
+    "Don't skip compaction if we have more than this many parts as input.",
+);
+
+/// In Compactor::compact_and_apply, we do the compaction (don't skip it)
+/// if the number of updates is at least this many. Compaction is performed
+/// if any of the heuristic criteria are met (they are OR'd).
+pub const COMPACTION_HEURISTIC_MIN_UPDATES: Config<usize> = Config::new(
+    "persist_compaction_heuristic_min_updates",
+    1024,
+    "Don't skip compaction if we have more than this many updates as input.",
+);
+
+/// The upper bound on compaction's memory consumption. The value must be at
+/// least 4*`blob_target_size`. Increasing this value beyond the minimum allows
+/// compaction to merge together more runs at once, providing greater
+/// consolidation of updates, at the cost of greater memory usage.
+pub const COMPACTION_MEMORY_BOUND_BYTES: Config<usize> = Config::new(
+    "persist_compaction_memory_bound_bytes",
+    1024 * MiB,
+    "Attempt to limit compaction to this amount of memory.",
+);
+
+/// The maximum number of concurrent blob deletes during garbage collection.
+pub const GC_BLOB_DELETE_CONCURRENCY_LIMIT: Config<usize> = Config::new(
+    "persist_gc_blob_delete_concurrency_limit",
+    32,
+    "Limit the number of concurrent deletes GC can perform to this threshold.",
+);
+
+/// The # of diffs to initially scan when fetching the latest consensus state, to
+/// determine which requests go down the fast vs slow path. Should be large enough
+/// to fetch all live diffs in the steady-state, and small enough to query Consensus
+/// at high volume. Steady-state usage should accommodate readers that require
+/// seqno-holds for reasonable amounts of time, which to start we say is 10s of minutes.
+///
+/// This value ought to be defined in terms of `NEED_ROLLUP_THRESHOLD` to approximate
+/// when we expect rollups to be written and therefore when old states will be truncated
+/// by GC.
+pub const STATE_VERSIONS_RECENT_LIVE_DIFFS_LIMIT: Config<usize> = Config::new(
+    "persist_state_versions_recent_live_diffs_limit",
+    30 * 128,
+    "Fetch this many diffs when fetching recent diffs.",
+);
+
+/// The maximum number of concurrent state fetches during usage computation.
+pub const USAGE_STATE_FETCH_CONCURRENCY_LIMIT: Config<usize> = Config::new(
+    "persist_usage_state_fetch_concurrency_limit",
+    8,
+    "Limit the concurrency of fetching in the periodic Persist-storage-usage calculation.",
+);
+
 impl PostgresClientKnobs for PersistConfig {
     fn connection_pool_max_size(&self) -> usize {
         self.consensus_connection_pool_max_size
@@ -482,37 +542,6 @@ impl PostgresClientKnobs for PersistConfig {
     }
 }
 
-/// Persist configurations that can be dynamically updated.
-///
-/// Persist is expected to react to each of these such that updating the value
-/// returned by the function takes effect in persist (i.e. no caching it). This
-/// should happen "as promptly as reasonably possible" where that's defined by
-/// the tradeoffs of complexity vs promptness. For example, we might use a
-/// consistent version of `BLOB_TARGET_SIZE` for the entirety of a single
-/// compaction call. However, it should _never_ require a process restart for an
-/// update of these to take effect.
-///
-/// These are hooked up to LaunchDarkly. Specifically, LaunchDarkly configs are
-/// serialized into a [mz_dyncfg::ConfigUpdates]. In environmentd, these are applied
-/// directly via [mz_dyncfg::ConfigUpdates::apply] to the [PersistConfig] in
-/// [crate::cache::PersistClientCache]. There is one `PersistClientCache` per
-/// process, and every `PersistConfig` shares the same `Arc<DynamicConfig>`, so
-/// this affects all [DynamicConfig] usage in the process. The
-/// `ConfigUpdates` is also sent via the compute and storage command
-/// streams, which then apply it to all computed/storaged/clusterd processes as
-/// well.
-#[derive(Debug)]
-pub struct DynamicConfig {
-    batch_builder_max_outstanding_parts: AtomicUsize,
-    compaction_heuristic_min_inputs: AtomicUsize,
-    compaction_heuristic_min_parts: AtomicUsize,
-    compaction_heuristic_min_updates: AtomicUsize,
-    compaction_memory_bound_bytes: AtomicUsize,
-    gc_blob_delete_concurrency_limit: AtomicUsize,
-    state_versions_recent_live_diffs_limit: AtomicUsize,
-    usage_state_fetch_concurrency_limit: AtomicUsize,
-}
-
 #[derive(Copy, Clone, Debug, Eq, PartialEq, Arbitrary, Serialize, Deserialize)]
 pub struct RetryParameters {
     pub fixed_sleep: Duration,
@@ -536,90 +565,6 @@ impl RetryParameters {
     }
 }
 
-impl DynamicConfig {
-    // TODO: Decide if we can relax these.
-    const LOAD_ORDERING: Ordering = Ordering::SeqCst;
-    const STORE_ORDERING: Ordering = Ordering::SeqCst;
-
-    /// The maximum number of parts (s3 blobs) that [crate::batch::BatchBuilder]
-    /// will pipeline before back-pressuring [crate::batch::BatchBuilder::add]
-    /// calls on previous ones finishing.
-    pub fn batch_builder_max_outstanding_parts(&self) -> usize {
-        self.batch_builder_max_outstanding_parts
-            .load(Self::LOAD_ORDERING)
-    }
-
-    /// In Compactor::compact_and_apply, we do the compaction (don't skip it)
-    /// if the number of inputs is at least this many. Compaction is performed
-    /// if any of the heuristic criteria are met (they are OR'd).
-    pub fn compaction_heuristic_min_inputs(&self) -> usize {
-        self.compaction_heuristic_min_inputs
-            .load(Self::LOAD_ORDERING)
-    }
-
-    /// In Compactor::compact_and_apply, we do the compaction (don't skip it)
-    /// if the number of batch parts is at least this many. Compaction is performed
-    /// if any of the heuristic criteria are met (they are OR'd).
-    pub fn compaction_heuristic_min_parts(&self) -> usize {
-        self.compaction_heuristic_min_parts
-            .load(Self::LOAD_ORDERING)
-    }
-
-    /// In Compactor::compact_and_apply, we do the compaction (don't skip it)
-    /// if the number of updates is at least this many. Compaction is performed
-    /// if any of the heuristic criteria are met (they are OR'd).
-    pub fn compaction_heuristic_min_updates(&self) -> usize {
-        self.compaction_heuristic_min_updates
-            .load(Self::LOAD_ORDERING)
-    }
-
-    /// The upper bound on compaction's memory consumption. The value must be at
-    /// least 4*`blob_target_size`. Increasing this value beyond the minimum allows
-    /// compaction to merge together more runs at once, providing greater
-    /// consolidation of updates, at the cost of greater memory usage.
-    pub fn compaction_memory_bound_bytes(&self) -> usize {
-        self.compaction_memory_bound_bytes.load(Self::LOAD_ORDERING)
-    }
-
-    /// The maximum number of concurrent blob deletes during garbage collection.
-    pub fn gc_blob_delete_concurrency_limit(&self) -> usize {
-        self.gc_blob_delete_concurrency_limit
-            .load(Self::LOAD_ORDERING)
-    }
-
-    /// The # of diffs to initially scan when fetching the latest consensus state, to
-    /// determine which requests go down the fast vs slow path. Should be large enough
-    /// to fetch all live diffs in the steady-state, and small enough to query Consensus
-    /// at high volume. Steady-state usage should accommodate readers that require
-    /// seqno-holds for reasonable amounts of time, which to start we say is 10s of minutes.
-    ///
-    /// This value ought to be defined in terms of `NEED_ROLLUP_THRESHOLD` to approximate
-    /// when we expect rollups to be written and therefore when old states will be truncated
-    /// by GC.
-    pub fn state_versions_recent_live_diffs_limit(&self) -> usize {
-        self.state_versions_recent_live_diffs_limit
-            .load(Self::LOAD_ORDERING)
-    }
-
-    /// The maximum number of concurrent state fetches during usage computation.
-    pub fn usage_state_fetch_concurrency_limit(&self) -> usize {
-        self.usage_state_fetch_concurrency_limit
-            .load(Self::LOAD_ORDERING)
-    }
-
-    // TODO: Get rid of these in favor of using dyncfgs at the
-    // relevant callsites.
-    #[cfg(test)]
-    pub fn set_batch_builder_max_outstanding_parts(&self, val: usize) {
-        self.batch_builder_max_outstanding_parts
-            .store(val, Self::LOAD_ORDERING);
-    }
-    pub fn set_compaction_memory_bound_bytes(&self, val: usize) {
-        self.compaction_memory_bound_bytes
-            .store(val, Self::LOAD_ORDERING);
-    }
-}
-
 // TODO: Replace with dynamic values when PersistConfig is integrated with LD
 impl BlobKnobs for PersistConfig {
     fn operation_timeout(&self) -> Duration {
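The deleted DynamicConfig doc comment still describes how these values arrive at runtime: LaunchDarkly configs are serialized into an mz_dyncfg::ConfigUpdates, applied via ConfigUpdates::apply to the PersistConfig in the per-process PersistClientCache, and also forwarded over the compute and storage command streams to clusterd processes. A sketch of the receiving side, assuming apply takes the target ConfigSet as those docs state:

    use mz_dyncfg::{ConfigSet, ConfigUpdates};

    // Apply a batch of upstream flag values to the process-wide config set.
    // Every later Config::get against this set observes the new values; no
    // process restart is required.
    fn on_flag_sync(configs: &ConfigSet, updates: &ConfigUpdates) {
        updates.apply(configs);
    }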
11 changes: 6 additions & 5 deletions src/persist-client/src/cli/admin.rs
@@ -33,7 +33,7 @@ use tracing::{info, warn};
 
 use crate::async_runtime::IsolatedRuntime;
 use crate::cache::StateCache;
-use crate::cfg::all_dyncfgs;
+use crate::cfg::{all_dyncfgs, COMPACTION_MEMORY_BOUND_BYTES};
 use crate::cli::args::{make_blob, make_consensus, StateArgs, StoreArgs};
 use crate::cli::inspect::FAKE_OPAQUE_CODEC;
 use crate::internal::compact::{CompactConfig, CompactReq, Compactor};
@@ -131,10 +131,11 @@ pub async fn run(command: AdminArgs) -> Result<(), anyhow::Error> {
     let configs = all_dyncfgs(ConfigSet::default());
     // TODO: Fetch the latest values of these configs from Launch Darkly.
     let cfg = PersistConfig::new(&BUILD_INFO, SYSTEM_TIME.clone(), configs);
-    if args.compaction_memory_bound_bytes > 0 {
-        cfg.dynamic
-            .set_compaction_memory_bound_bytes(args.compaction_memory_bound_bytes);
-    }
+    cfg.set_config(
+        &COMPACTION_MEMORY_BOUND_BYTES,
+        args.compaction_memory_bound_bytes,
+    );
+
     let metrics_registry = MetricsRegistry::new();
     let expected_version = command
         .expected_version
20 changes: 10 additions & 10 deletions src/persist-client/src/internal/compact.rs
@@ -32,7 +32,11 @@ use tracing::{debug, debug_span, error, trace, warn, Instrument, Span};
 
 use crate::async_runtime::IsolatedRuntime;
 use crate::batch::{BatchBuilderConfig, BatchBuilderInternal, BatchParts, PartDeletes};
-use crate::cfg::MiB;
+use crate::cfg::{
+    MiB, COMPACTION_HEURISTIC_MIN_INPUTS, COMPACTION_HEURISTIC_MIN_PARTS,
+    COMPACTION_HEURISTIC_MIN_UPDATES, COMPACTION_MEMORY_BOUND_BYTES,
+    GC_BLOB_DELETE_CONCURRENCY_LIMIT,
+};
 use crate::fetch::FetchBatchFilter;
 use crate::internal::encoding::Schemas;
 use crate::internal::gc::GarbageCollector;
@@ -81,7 +85,7 @@ impl CompactConfig {
     /// Initialize the compaction config from Persist configuration.
     pub fn new(value: &PersistConfig, shard_id: ShardId) -> Self {
         CompactConfig {
-            compaction_memory_bound_bytes: value.dynamic.compaction_memory_bound_bytes(),
+            compaction_memory_bound_bytes: COMPACTION_MEMORY_BOUND_BYTES.get(value),
             compaction_yield_after_n_updates: value.compaction_yield_after_n_updates,
             version: value.build_version.clone(),
             batch: BatchBuilderConfig::new(value, shard_id),
@@ -242,11 +246,11 @@ where
         // were just written, but it does result in non-trivial blob traffic
         // (especially in aggregate). This heuristic is something we'll need to
         // tune over time.
-        let should_compact = req.inputs.len() >= self.cfg.dynamic.compaction_heuristic_min_inputs()
+        let should_compact = req.inputs.len() >= COMPACTION_HEURISTIC_MIN_INPUTS.get(&self.cfg)
             || req.inputs.iter().map(|x| x.part_count()).sum::<usize>()
-                >= self.cfg.dynamic.compaction_heuristic_min_parts()
+                >= COMPACTION_HEURISTIC_MIN_PARTS.get(&self.cfg)
             || req.inputs.iter().map(|x| x.len).sum::<usize>()
-                >= self.cfg.dynamic.compaction_heuristic_min_updates();
+                >= COMPACTION_HEURISTIC_MIN_UPDATES.get(&self.cfg);
         if !should_compact {
             self.metrics.compaction.skipped.inc();
             return None;
@@ -409,11 +413,7 @@ where
                 .delete(
                     machine.applier.state_versions.blob.as_ref(),
                     machine.shard_id(),
-                    machine
-                        .applier
-                        .cfg
-                        .dynamic
-                        .gc_blob_delete_concurrency_limit(),
+                    GC_BLOB_DELETE_CONCURRENCY_LIMIT.get(&machine.applier.cfg),
                     &*metrics,
                     &metrics.retries.external.compaction_noop_delete,
                 )
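For reference, the compaction gate these knobs feed is a plain OR of three thresholds. A standalone restatement (hypothetical free function; the real check lives in Compactor::compact_and_apply and reads each threshold via Config::get, as above):

    // Compaction runs if ANY one of the three criteria is met; otherwise the
    // request is skipped and the skipped-compaction metric is incremented.
    fn should_compact(
        num_inputs: usize,    // hollow batches in the request
        total_parts: usize,   // sum of part counts across the inputs
        total_updates: usize, // sum of update counts across the inputs
        min_inputs: usize,    // COMPACTION_HEURISTIC_MIN_INPUTS
        min_parts: usize,     // COMPACTION_HEURISTIC_MIN_PARTS
        min_updates: usize,   // COMPACTION_HEURISTIC_MIN_UPDATES
    ) -> bool {
        num_inputs >= min_inputs || total_parts >= min_parts || total_updates >= min_updates
    }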
[Diffs for the remaining 6 changed files were not loaded.]