From ddc927c4e50bbd1c6a113500785e04571762a805 Mon Sep 17 00:00:00 2001 From: llz Date: Wed, 6 Jul 2022 09:56:29 +0800 Subject: [PATCH 1/2] Upgrade to timely and DD 0.12.0 --- connect/Cargo.toml | 2 +- tdiag/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/connect/Cargo.toml b/connect/Cargo.toml index 52be8f6..f8fa622 100644 --- a/connect/Cargo.toml +++ b/connect/Cargo.toml @@ -10,4 +10,4 @@ license = "MIT" edition = "2018" [dependencies] -timely = "^0.11" +timely = "^0.12.0" diff --git a/tdiag/Cargo.toml b/tdiag/Cargo.toml index c01fbfe..d42b399 100644 --- a/tdiag/Cargo.toml +++ b/tdiag/Cargo.toml @@ -10,8 +10,8 @@ license = "MIT" edition = "2018" [dependencies] -timely = "^0.11" -differential-dataflow = "^0.11" +timely = "^0.12.0" +differential-dataflow = "^0.12.0" clap = "^2.33" # tdiag-connect = "^0.2" tdiag-connect = { path = "../connect" } From e280d38bce4037b82ca78b9f9e130aab58444aee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A8=8A=E5=AE=81?= <2939747187@qq.com> Date: Wed, 6 Jul 2022 10:28:39 +0800 Subject: [PATCH 2/2] Upgrade to timely and DD 0.12.0 --- connect/src/receive/replaywithshutdown.rs | 4 ++-- tdiag/src/commands/arrangements.rs | 2 +- tdiag/src/commands/graph.rs | 2 +- tdiag/src/commands/profile.rs | 5 +++-- tdiag/src/main.rs | 4 ++-- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/connect/src/receive/replaywithshutdown.rs b/connect/src/receive/replaywithshutdown.rs index 0b66c87..a1f660c 100644 --- a/connect/src/receive/replaywithshutdown.rs +++ b/connect/src/receive/replaywithshutdown.rs @@ -64,8 +64,8 @@ where I : IntoIterator, // The first thing we do is modify our capabilities to match the number of streams we manage. // This should be a simple change of `self.event_streams.len() - 1`. We only do this once, as // our very first action. - progress.internals[0].update(Default::default(), (event_streams.len() as i64) - 1); - antichain.update_iter(Some((Default::default(), (event_streams.len() as i64) - 1)).into_iter()); + progress.internals[0].update(T::minimum(), (event_streams.len() as i64) - 1); + antichain.update_iter(Some((T::minimum(), (event_streams.len() as i64) - 1)).into_iter()); started = true; } diff --git a/tdiag/src/commands/arrangements.rs b/tdiag/src/commands/arrangements.rs index a2ebb3a..42e402a 100644 --- a/tdiag/src/commands/arrangements.rs +++ b/tdiag/src/commands/arrangements.rs @@ -26,7 +26,7 @@ use tdiag_connect::receive::ReplayWithShutdown; /// compaction events and derive number of tuples for each trace; /// 3. prints the current size alongside arrangement names; pub fn listen( - timely_configuration: timely::Configuration, + timely_configuration: timely::Config, timely_sockets: Vec>, differential_sockets: Vec>, output_interval_ms: u64, diff --git a/tdiag/src/commands/graph.rs b/tdiag/src/commands/graph.rs index 5e5acc5..7b8b587 100644 --- a/tdiag/src/commands/graph.rs +++ b/tdiag/src/commands/graph.rs @@ -26,7 +26,7 @@ static GRAPH_HTML: &str = include_str!("graph/dataflow-graph.html"); /// /// This module includes `graph/dataflow-graph.html` as a static resource. pub fn listen_and_render( - timely_configuration: timely::Configuration, + timely_configuration: timely::Config, sockets: Vec>, output_path: &std::path::Path) -> Result<(), crate::DiagError> { diff --git a/tdiag/src/commands/profile.rs b/tdiag/src/commands/profile.rs index d8aef1a..065ed85 100644 --- a/tdiag/src/commands/profile.rs +++ b/tdiag/src/commands/profile.rs @@ -14,6 +14,7 @@ use differential_dataflow::operators::{Join, reduce::Threshold, Consolidate, arr use timely::logging::TimelyEvent::{Operates, Schedule}; use tdiag_connect::receive::ReplayWithShutdown; +use timely::progress::frontier::AntichainRef; /// Prints aggregate time spent in each scope/operator. /// @@ -24,7 +25,7 @@ use tdiag_connect::receive::ReplayWithShutdown; /// 3. prints the resulting measurements alongside operator names and /// scope names; pub fn listen_and_profile( - timely_configuration: timely::Configuration, + timely_configuration: timely::Config, sockets: Vec>) -> Result<(), crate::DiagError> { let sockets = Arc::new(Mutex::new(sockets)); @@ -119,7 +120,7 @@ pub fn listen_and_profile( let mut profile_trace = profile_trace; - profile_trace.distinguish_since(&[]); + profile_trace.set_physical_compaction(AntichainRef::new(&[])); let (mut cursor, storage) = profile_trace.cursor(); diff --git a/tdiag/src/main.rs b/tdiag/src/main.rs index e4260e6..4ac709b 100644 --- a/tdiag/src/main.rs +++ b/tdiag/src/main.rs @@ -106,8 +106,8 @@ variable pointing to tdiag's differential port (51318 by default). .parse().map_err(|e| DiagError(format!("Invalid --diag-workers: {}", e)))?; let timely_configuration = match diag_workers { - 1 => timely::Configuration::Thread, - n => timely::Configuration::Process(n), + 1 => timely::Config::thread(), + n => timely::Config::process(n), }; match args.subcommand() {