diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index bef8d2ca95ab..b61d825266b8 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -659,31 +659,37 @@ impl Catalog { let metrics_registry = &MetricsRegistry::new(); let active_connection_count = Arc::new(std::sync::Mutex::new(ConnectionCounter::new(0))); let secrets_reader = Arc::new(InMemorySecretsController::new()); - let (catalog, _, _, _) = Catalog::open(Config { - storage, - metrics_registry, - // when debugging, no reaping - storage_usage_retention_period: None, - state: StateConfig { - unsafe_mode: true, - all_features: false, - build_info: &DUMMY_BUILD_INFO, - environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()), - now, - skip_migrations: true, - cluster_replica_sizes: Default::default(), - builtin_cluster_replica_size: "1".into(), - system_parameter_defaults: Default::default(), - remote_system_parameters: None, - availability_zones: vec![], - egress_ips: vec![], - aws_principal_context: None, - aws_privatelink_availability_zones: None, - http_host_name: None, - connection_context: ConnectionContext::for_tests(secrets_reader), - active_connection_count, + // Used as a lower boundary of the boot_ts, but it's ok to use now() for + // debugging/testing. + let previous_ts = now().into(); + let (catalog, _, _, _) = Catalog::open( + Config { + storage, + metrics_registry, + // when debugging, no reaping + storage_usage_retention_period: None, + state: StateConfig { + unsafe_mode: true, + all_features: false, + build_info: &DUMMY_BUILD_INFO, + environment_id: environment_id.unwrap_or(EnvironmentId::for_tests()), + now, + skip_migrations: true, + cluster_replica_sizes: Default::default(), + builtin_cluster_replica_size: "1".into(), + system_parameter_defaults: Default::default(), + remote_system_parameters: None, + availability_zones: vec![], + egress_ips: vec![], + aws_principal_context: None, + aws_privatelink_availability_zones: None, + http_host_name: None, + connection_context: ConnectionContext::for_tests(secrets_reader), + active_connection_count, + }, }, - }) + previous_ts, + ) .await?; Ok(catalog) } diff --git a/src/adapter/src/catalog/open.rs b/src/adapter/src/catalog/open.rs index d8357fac2f1d..c91d5d7a6988 100644 --- a/src/adapter/src/catalog/open.rs +++ b/src/adapter/src/catalog/open.rs @@ -182,11 +182,15 @@ impl Catalog { /// Initializes a CatalogState. Separate from [`Catalog::open`] to avoid depending on state /// external to a [mz_catalog::durable::DurableCatalogState] (for example: no [mz_secrets::SecretsReader]). /// + /// The passed in `previous_ts` must be the highest read timestamp for + /// [Timeline::EpochMilliseconds] known across all timestamp oracles. + /// /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 7.5KB. This would /// get stored on the stack which is bad for runtime performance, and blow up our stack usage. /// Because of that we purposefully move this Future onto the heap (i.e. Box it). pub fn initialize_state<'a>( config: StateConfig, + previous_ts: mz_repr::Timestamp, storage: &'a mut Box, ) -> BoxFuture< 'a, @@ -266,13 +270,11 @@ impl Catalog { // This time is usually the current system time, but with protection // against backwards time jumps, even across restarts. let boot_ts = { - let previous_ts = txn - .get_timestamp(&Timeline::EpochMilliseconds) - .expect("missing EpochMilliseconds timeline"); let boot_ts = catalog_oracle::monotonic_now( config.now.clone(), previous_ts, ); + info!(%previous_ts, %boot_ts, "initialize_state"); if !is_read_only { // IMPORTANT: we durably record the new timestamp before using it. txn.set_timestamp(Timeline::EpochMilliseconds, boot_ts)?; @@ -840,6 +842,9 @@ impl Catalog { /// Opens or creates a catalog that stores data at `path`. /// + /// The passed in `previous_ts` must be the highest read timestamp for + /// [Timeline::EpochMilliseconds] known across all timestamp oracles. + /// /// Returns the catalog, metadata about builtin objects that have changed /// schemas since last restart, a list of updates to builtin tables that /// describe the initial state of the catalog, and the version of the @@ -850,6 +855,7 @@ impl Catalog { /// Because of that we purposefully move this Future onto the heap (i.e. Box it). pub fn open( config: Config<'_>, + previous_ts: mz_repr::Timestamp, ) -> BoxFuture< 'static, Result< @@ -865,7 +871,7 @@ impl Catalog { async move { let mut storage = config.storage; let (state, boot_ts, builtin_migration_metadata, last_seen_version) = - Self::initialize_state(config.state, &mut storage).await?; + Self::initialize_state(config.state, previous_ts, &mut storage).await?; let mut catalog = Catalog { state, diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 9eae5bcd7924..92fa90594371 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -163,7 +163,7 @@ use crate::util::{ClientTransmitter, CompletedClientTransmitter, ComputeSinkId, use crate::webhook::{WebhookAppenderInvalidator, WebhookConcurrencyLimiter}; use crate::{flags, AdapterNotice, TimestampProvider}; use mz_catalog::builtin::BUILTINS; -use mz_catalog::durable::OpenableDurableCatalogState; +use mz_catalog::durable::{DurableCatalogState, OpenableDurableCatalogState}; use mz_expr::refresh_schedule::RefreshSchedule; use mz_ore::future::TimeoutError; use mz_timestamp_oracle::postgres_oracle::{ @@ -2756,7 +2756,7 @@ pub fn serve( controller_config, controller_envd_epoch, controller_persist_txn_tables, - storage, + mut storage, timestamp_oracle_url, unsafe_mode, all_features, @@ -2813,32 +2813,47 @@ pub fn serve( let aws_privatelink_availability_zones = aws_privatelink_availability_zones .map(|azs_vec| BTreeSet::from_iter(azs_vec.iter().cloned())); + let pg_timestamp_oracle_config = timestamp_oracle_url + .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry)); + let initial_timestamps = + get_initial_oracle_timestamps(&mut storage, &pg_timestamp_oracle_config).await?; + + // A candidate for the boot_ts. Catalog::open will further advance this, + // based on the "now" timestamp, if/when needed. + let previous_ts = initial_timestamps + .get(&Timeline::EpochMilliseconds) + .expect("missing EpochMillisseconds timestamp") + .clone(); + info!("coordinator init: opening catalog"); let (catalog, builtin_migration_metadata, builtin_table_updates, _last_catalog_version) = - Catalog::open(mz_catalog::config::Config { - storage, - metrics_registry: &metrics_registry, - storage_usage_retention_period, - state: mz_catalog::config::StateConfig { - unsafe_mode, - all_features, - build_info, - environment_id: environment_id.clone(), - now: now.clone(), - skip_migrations: false, - cluster_replica_sizes, - builtin_cluster_replica_size, - system_parameter_defaults, - remote_system_parameters, - availability_zones, - egress_ips, - aws_principal_context, - aws_privatelink_availability_zones, - connection_context, - active_connection_count, - http_host_name, + Catalog::open( + mz_catalog::config::Config { + storage, + metrics_registry: &metrics_registry, + storage_usage_retention_period, + state: mz_catalog::config::StateConfig { + unsafe_mode, + all_features, + build_info, + environment_id: environment_id.clone(), + now: now.clone(), + skip_migrations: false, + cluster_replica_sizes, + builtin_cluster_replica_size, + system_parameter_defaults, + remote_system_parameters, + availability_zones, + egress_ips, + aws_principal_context, + aws_privatelink_availability_zones, + connection_context, + active_connection_count, + http_host_name, + }, }, - }) + previous_ts, + ) .await?; let session_id = catalog.config().session_id; let start_instant = catalog.config().start_instant; @@ -2861,8 +2876,6 @@ pub fn serve( // use the same impl! let timestamp_oracle_impl = catalog.system_config().timestamp_oracle_impl(); - let pg_timestamp_oracle_config = timestamp_oracle_url - .map(|pg_url| PostgresTimestampOracleConfig::new(&pg_url, &metrics_registry)); if let Some(config) = pg_timestamp_oracle_config.as_ref() { // Apply settings from system vars as early as possible because some // of them are locked in right when an oracle is first opened! @@ -2871,9 +2884,6 @@ pub fn serve( pg_timestamp_oracle_params.apply(config); } - let initial_timestamps = - get_initial_oracle_timestamps(&catalog, &pg_timestamp_oracle_config).await?; - let thread = thread::Builder::new() // The Coordinator thread tends to keep a lot of data on its stack. To // prevent a stack overflow we allocate a stack three times as big as the default @@ -3014,10 +3024,15 @@ pub fn serve( // initializes oracles on bootstrap, once we have fully migrated to the new // postgres/crdb-backed oracle. async fn get_initial_oracle_timestamps( - catalog: &Catalog, + storage: &mut Box, pg_timestamp_oracle_config: &Option, ) -> Result, AdapterError> { - let catalog_oracle_timestamps = catalog.get_all_persisted_timestamps().await?; + let catalog_oracle_timestamps: BTreeMap<_, _> = storage + .get_timestamps() + .await? + .into_iter() + .map(|mz_catalog::durable::TimelineTimestamp { timeline, ts }| (timeline, ts)) + .collect(); let debug_msg = || { catalog_oracle_timestamps .iter() diff --git a/src/catalog-debug/src/main.rs b/src/catalog-debug/src/main.rs index 2c5c84d9118c..a2d85646873b 100644 --- a/src/catalog-debug/src/main.rs +++ b/src/catalog-debug/src/main.rs @@ -413,6 +413,10 @@ async fn upgrade_check( ) .await?; + // Used as a lower boundary of the boot_ts, but it's ok to use now() for + // debugging/testing/inspecting. + let previous_ts = now().into(); + let (_catalog, _, _, last_catalog_version) = Catalog::initialize_state( StateConfig { unsafe_mode: true, @@ -435,6 +439,7 @@ async fn upgrade_check( )), active_connection_count: Arc::new(Mutex::new(ConnectionCounter::new(0))), }, + previous_ts, &mut storage, ) .await?; diff --git a/src/environmentd/tests/server.rs b/src/environmentd/tests/server.rs index ed3b7248c9aa..746e7f94ba37 100644 --- a/src/environmentd/tests/server.rs +++ b/src/environmentd/tests/server.rs @@ -1422,7 +1422,7 @@ fn test_old_storage_usage_records_are_reaped_on_restart() { *now.lock().expect("lock poisoned") = u64::try_from(initial_timestamp) .expect("negative timestamps are impossible") + u64::try_from(retention_period.as_millis()).expect("known to fit") - + 1; + + 200; { let server = harness.start_blocking(); diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 3be562ce04fd..1caa8fd791bf 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -607,7 +607,7 @@ pub const PERSIST_TXN_TABLES: ServerVar = ServerVar { const TIMESTAMP_ORACLE_IMPL: ServerVar = ServerVar { name: UncasedStr::new("timestamp_oracle"), - value: TimestampOracleImpl::Catalog, + value: TimestampOracleImpl::Postgres, description: "Backing implementation of TimestampOracle.", internal: true, }; diff --git a/src/stash-debug/src/main.rs b/src/stash-debug/src/main.rs index 20a59dfeccd4..84f8cf734b65 100644 --- a/src/stash-debug/src/main.rs +++ b/src/stash-debug/src/main.rs @@ -452,6 +452,10 @@ impl Usage { ) .await?; + // Used as a lower boundary of the boot_ts, but it's ok to use now() for + // debugging/testing/inspecting. + let previous_ts = now().into(); + match Catalog::initialize_state( StateConfig { unsafe_mode: true, @@ -474,6 +478,7 @@ impl Usage { )), active_connection_count: Arc::new(Mutex::new(ConnectionCounter::new(0))), }, + previous_ts, &mut storage, ) .await