Skip to content

Commit

Permalink
Merge pull request #25014 from aljoscha/adapter-crdb-oracle-default
Browse files Browse the repository at this point in the history
coord: make timestamp_oracle=Postgres the default in code
  • Loading branch information
aljoscha authored Feb 8, 2024
2 parents 7a402c1 + 5ca194a commit f44a426
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 62 deletions.
54 changes: 30 additions & 24 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,31 +659,37 @@ impl Catalog {
let metrics_registry = &MetricsRegistry::new();
let active_connection_count = Arc::new(std::sync::Mutex::new(ConnectionCounter::new(0)));
let secrets_reader = Arc::new(InMemorySecretsController::new());
let (catalog, _, _, _) = Catalog::open(Config {
storage,
metrics_registry,
// when debugging, no reaping
storage_usage_retention_period: None,
state: StateConfig {
unsafe_mode: true,
all_features: false,
build_info: &DUMMY_BUILD_INFO,
environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()),
now,
skip_migrations: true,
cluster_replica_sizes: Default::default(),
builtin_cluster_replica_size: "1".into(),
system_parameter_defaults: Default::default(),
remote_system_parameters: None,
availability_zones: vec![],
egress_ips: vec![],
aws_principal_context: None,
aws_privatelink_availability_zones: None,
http_host_name: None,
connection_context: ConnectionContext::for_tests(secrets_reader),
active_connection_count,
// Used as a lower boundary of the boot_ts, but it's ok to use now() for
// debugging/testing.
let previous_ts = now().into();
let (catalog, _, _, _) = Catalog::open(
Config {
storage,
metrics_registry,
// when debugging, no reaping
storage_usage_retention_period: None,
state: StateConfig {
unsafe_mode: true,
all_features: false,
build_info: &DUMMY_BUILD_INFO,
environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()),
now,
skip_migrations: true,
cluster_replica_sizes: Default::default(),
builtin_cluster_replica_size: "1".into(),
system_parameter_defaults: Default::default(),
remote_system_parameters: None,
availability_zones: vec![],
egress_ips: vec![],
aws_principal_context: None,
aws_privatelink_availability_zones: None,
http_host_name: None,
connection_context: ConnectionContext::for_tests(secrets_reader),
active_connection_count,
},
},
})
previous_ts,
)
.await?;
Ok(catalog)
}
Expand Down
14 changes: 10 additions & 4 deletions src/adapter/src/catalog/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,15 @@ impl Catalog {
/// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state
/// external to a [mz_catalog::durable::DurableCatalogState] (for example: no [mz_secrets::SecretsReader]).
///
/// The passed in `previous_ts` must be the highest read timestamp for
/// [Timeline::EpochMilliseconds] known across all timestamp oracles.
///
/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would
/// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
pub fn initialize_state<'a>(
config: StateConfig,
previous_ts: mz_repr::Timestamp,
storage: &'a mut Box<dyn mz_catalog::durable::DurableCatalogState>,
) -> BoxFuture<
'a,
Expand Down Expand Up @@ -266,13 +270,11 @@ impl Catalog {
// This time is usually the current system time, but with protection
// against backwards time jumps, even across restarts.
let boot_ts = {
let previous_ts = txn
.get_timestamp(&Timeline::EpochMilliseconds)
.expect("missing EpochMilliseconds timeline");
let boot_ts = catalog_oracle::monotonic_now(
config.now.clone(),
previous_ts,
);
info!(%previous_ts, %boot_ts, "initialize_state");
if !is_read_only {
// IMPORTANT: we durably record the new timestamp before using it.
txn.set_timestamp(Timeline::EpochMilliseconds, boot_ts)?;
Expand Down Expand Up @@ -840,6 +842,9 @@ impl Catalog {

/// Opens or creates a catalog that stores data at `path`.
///
/// The passed in `previous_ts` must be the highest read timestamp for
/// [Timeline::EpochMilliseconds] known across all timestamp oracles.
///
/// Returns the catalog, metadata about builtin objects that have changed
/// schemas since last restart, a list of updates to builtin tables that
/// describe the initial state of the catalog, and the version of the
Expand All @@ -850,6 +855,7 @@ impl Catalog {
/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
pub fn open(
config: Config<'_>,
previous_ts: mz_repr::Timestamp,
) -> BoxFuture<
'static,
Result<
Expand All @@ -865,7 +871,7 @@ impl Catalog {
async move {
let mut storage = config.storage;
let (state, boot_ts, builtin_migration_metadata, last_seen_version) =
Self::initialize_state(config.state, &mut storage).await?;
Self::initialize_state(config.state, previous_ts, &mut storage).await?;

let mut catalog = Catalog {
state,
Expand Down
79 changes: 47 additions & 32 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ use crate::util::{ClientTransmitter, CompletedClientTransmitter, ComputeSinkId,
use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter};
use crate::{flags, AdapterNotice, TimestampProvider};
use mz_catalog::builtin::BUILTINS;
use mz_catalog::durable::OpenableDurableCatalogState;
use mz_catalog::durable::{DurableCatalogState, OpenableDurableCatalogState};
use mz_expr::refresh_schedule::RefreshSchedule;
use mz_ore::future::TimeoutError;
use mz_timestamp_oracle::postgres_oracle::{
Expand Down Expand Up @@ -2756,7 +2756,7 @@ pub fn serve(
controller_config,
controller_envd_epoch,
controller_persist_txn_tables,
storage,
mut storage,
timestamp_oracle_url,
unsafe_mode,
all_features,
Expand Down Expand Up @@ -2813,32 +2813,47 @@ pub fn serve(
let aws_privatelink_availability_zones = aws_privatelink_availability_zones
.map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned()));

let pg_timestamp_oracle_config = timestamp_oracle_url
.map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
let initial_timestamps =
get_initial_oracle_timestamps(&mut storage, &pg_timestamp_oracle_config).await?;

// A candidate for the boot_ts. Catalog::open will further advance this,
// based on the "now" timestamp, if/when needed.
let previous_ts = initial_timestamps
.get(&Timeline::EpochMilliseconds)
.expect("missing EpochMillisseconds timestamp")
.clone();

info!("coordinator init: opening catalog");
let (catalog, builtin_migration_metadata, builtin_table_updates, _last_catalog_version) =
Catalog::open(mz_catalog::config::Config {
storage,
metrics_registry: &metrics_registry,
storage_usage_retention_period,
state: mz_catalog::config::StateConfig {
unsafe_mode,
all_features,
build_info,
environment_id: environment_id.clone(),
now: now.clone(),
skip_migrations: false,
cluster_replica_sizes,
builtin_cluster_replica_size,
system_parameter_defaults,
remote_system_parameters,
availability_zones,
egress_ips,
aws_principal_context,
aws_privatelink_availability_zones,
connection_context,
active_connection_count,
http_host_name,
Catalog::open(
mz_catalog::config::Config {
storage,
metrics_registry: &metrics_registry,
storage_usage_retention_period,
state: mz_catalog::config::StateConfig {
unsafe_mode,
all_features,
build_info,
environment_id: environment_id.clone(),
now: now.clone(),
skip_migrations: false,
cluster_replica_sizes,
builtin_cluster_replica_size,
system_parameter_defaults,
remote_system_parameters,
availability_zones,
egress_ips,
aws_principal_context,
aws_privatelink_availability_zones,
connection_context,
active_connection_count,
http_host_name,
},
},
})
previous_ts,
)
.await?;
let session_id = catalog.config().session_id;
let start_instant = catalog.config().start_instant;
Expand All @@ -2861,8 +2876,6 @@ pub fn serve(
// use the same impl!
let timestamp_oracle_impl = catalog.system_config().timestamp_oracle_impl();

let pg_timestamp_oracle_config = timestamp_oracle_url
.map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry));
if let Some(config) = pg_timestamp_oracle_config.as_ref() {
// Apply settings from system vars as early as possible because some
// of them are locked in right when an oracle is first opened!
Expand All @@ -2871,9 +2884,6 @@ pub fn serve(
pg_timestamp_oracle_params.apply(config);
}

let initial_timestamps =
get_initial_oracle_timestamps(&catalog, &pg_timestamp_oracle_config).await?;

let thread = thread::Builder::new()
// The Coordinator thread tends to keep a lot of data on its stack. To
// prevent a stack overflow we allocate a stack three times as big as the default
Expand Down Expand Up @@ -3014,10 +3024,15 @@ pub fn serve(
// initializes oracles on bootstrap, once we have fully migrated to the new
// postgres/crdb-backed oracle.
async fn get_initial_oracle_timestamps(
catalog: &Catalog,
storage: &mut Box<dyn DurableCatalogState>,
pg_timestamp_oracle_config: &Option<PostgresTimestampOracleConfig>,
) -> Result<BTreeMap<Timeline, Timestamp>, AdapterError> {
let catalog_oracle_timestamps = catalog.get_all_persisted_timestamps().await?;
let catalog_oracle_timestamps: BTreeMap<_, _> = storage
.get_timestamps()
.await?
.into_iter()
.map(|mz_catalog::durable::TimelineTimestamp { timeline, ts }| (timeline, ts))
.collect();
let debug_msg = || {
catalog_oracle_timestamps
.iter()
Expand Down
5 changes: 5 additions & 0 deletions src/catalog-debug/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ async fn upgrade_check(
)
.await?;

// Used as a lower boundary of the boot_ts, but it's ok to use now() for
// debugging/testing/inspecting.
let previous_ts = now().into();

let (_catalog, _, _, last_catalog_version) = Catalog::initialize_state(
StateConfig {
unsafe_mode: true,
Expand All @@ -435,6 +439,7 @@ async fn upgrade_check(
)),
active_connection_count: Arc::new(Mutex::new(ConnectionCounter::new(0))),
},
previous_ts,
&mut storage,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ fn test_old_storage_usage_records_are_reaped_on_restart() {
*now.lock().expect("lock poisoned") = u64::try_from(initial_timestamp)
.expect("negative timestamps are impossible")
+ u64::try_from(retention_period.as_millis()).expect("known to fit")
+ 1;
+ 200;

{
let server = harness.start_blocking();
Expand Down
2 changes: 1 addition & 1 deletion src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ pub const PERSIST_TXN_TABLES: ServerVar<PersistTxnTablesImpl> = ServerVar {

const TIMESTAMP_ORACLE_IMPL: ServerVar<TimestampOracleImpl> = ServerVar {
name: UncasedStr::new("timestamp_oracle"),
value: TimestampOracleImpl::Catalog,
value: TimestampOracleImpl::Postgres,
description: "Backing implementation of TimestampOracle.",
internal: true,
};
Expand Down
5 changes: 5 additions & 0 deletions src/stash-debug/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,10 @@ impl Usage {
)
.await?;

// Used as a lower boundary of the boot_ts, but it's ok to use now() for
// debugging/testing/inspecting.
let previous_ts = now().into();

match Catalog::initialize_state(
StateConfig {
unsafe_mode: true,
Expand All @@ -474,6 +478,7 @@ impl Usage {
)),
active_connection_count: Arc::new(Mutex::new(ConnectionCounter::new(0))),
},
previous_ts,
&mut storage,
)
.await
Expand Down

0 comments on commit f44a426

Please sign in to comment.