diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 84d70f6..98f0128 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -24,4 +24,4 @@ jobs: - run: mkdir zookeeper - run: tar -zxvf zookeeper.tar.gz -C zookeeper --strip-components 1 - run: ./scripts/ci-start-zookeeper - - run: cargo test + - run: cargo test -F leader_recipe diff --git a/Cargo.lock b/Cargo.lock index 4f42afe..810141a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,32 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + +[[package]] +name = "anyhow" +version = "1.0.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" + +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", + "gloo-timers", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.75" @@ -38,6 +64,12 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "bumpalo" +version = "3.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d20789868f4b01b2f2caec9f5c4e0213b41e3e5702a50157d699ae31ced2fcb" + [[package]] name = "byteorder" version = "1.5.0" @@ -50,6 +82,24 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" +[[package]] +name = "equivalent" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" + +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "futures" version = "0.3.31" @@ -139,18 +189,76 @@ dependencies = [ "slab", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasip2", + "wasip3", +] + [[package]] name = "gimli" version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" +dependencies = [ + "foldhash", +] + +[[package]] +name = "hashbrown" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" + [[package]] name = "heck" version = "0.5.0" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" +[[package]] +name = "id-arena" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d3067d79b975e8844ca9eb072e16b31c3c1c36928edf9c6789548c524d0d954" + +[[package]] +name = "indexmap" +version = "2.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" +dependencies = [ + "equivalent", + "hashbrown 0.17.0", + "serde", + "serde_core", +] + [[package]] name = "io-uring" version = "0.7.9" @@ -162,12 +270,36 @@ dependencies = [ "libc", ] +[[package]] +name = "itoa" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" + +[[package]] +name = "js-sys" +version = "0.3.97" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1840c94c045fbcf8ba2812c95db44499f7c64910a912551aaaa541decebcacf" +dependencies = [ + "cfg-if", + "futures-util", + "once_cell", + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "libc" version = "0.2.174" @@ -180,6 +312,15 @@ version = "0.4.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" +[[package]] +name = "matchers" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +dependencies = [ + "regex-automata 0.1.10", +] + [[package]] name = "memchr" version = "2.7.5" @@ -269,6 +410,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -287,12 +438,116 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + +[[package]] +name = "regex" +version = "1.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e10754a14b9137dd7b1e3e5b0493cc9171fdd105e0ab477f51b72e7f3ac0e276" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata 0.4.14", + "regex-syntax 0.8.10", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax 0.6.29", +] + +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax 0.8.10", +] + +[[package]] +name = "regex-syntax" +version = "0.6.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" + +[[package]] +name = "regex-syntax" +version = "0.8.10" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" + [[package]] name = "rustc-demangle" version = "0.1.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" +[[package]] +name = "rustversion" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" + +[[package]] +name = "semver" +version = "1.0.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" + +[[package]] +name = "serde" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" +dependencies = [ + "serde_core", +] + +[[package]] +name = "serde_core" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.149" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" +dependencies = [ + "itoa", + "memchr", + "serde", + "serde_core", + "zmij", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -347,9 +602,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.104" +version = "2.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" +checksum = "e665b8803e7b1d2a727f4023456bbbbe74da67099c585258af0ad9c5013b9b99" dependencies = [ "proc-macro2", "quote", @@ -397,6 +652,7 @@ dependencies = [ name = "tokio-zookeeper" version = "0.4.0" dependencies = [ + "backon", "byteorder", "futures", "pin-project", @@ -404,6 +660,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] @@ -455,10 +712,14 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ + "matchers", "nu-ansi-term", + "once_cell", + "regex", "sharded-slab", "smallvec", "thread_local", + "tracing", "tracing-core", "tracing-log", ] @@ -469,6 +730,23 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + +[[package]] +name = "uuid" +version = "1.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd74a9687298c6858e9b88ec8935ec45d22e8fd5e6394fa1bd4e99a87789c76" +dependencies = [ + "getrandom", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "valuable" version = "0.1.1" @@ -481,6 +759,103 @@ version = "0.11.1+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" +[[package]] +name = "wasip2" +version = "1.0.3+wasi-0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20064672db26d7cdc89c7798c48a0fdfac8213434a1186e5ef29fd560ae223d6" +dependencies = [ + "wit-bindgen 0.57.1", +] + +[[package]] +name = "wasip3" +version = 
"0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df52b6d9b87e0c74c9edfa1eb2d9bf85e5d63515474513aa50fa181b3c4f5db1" +dependencies = [ + "cfg-if", + "once_cell", + "rustversion", + "wasm-bindgen-macro", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78b1041f495fb322e64aca85f5756b2172e35cd459376e67f2a6c9dffcedb103" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9dcd0ff20416988a18ac686d4d4d0f6aae9ebf08a389ff5d29012b05af2a1b41" +dependencies = [ + "bumpalo", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.120" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49757b3c82ebf16c57d69365a142940b384176c24df52a087fb748e2085359ea" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder", + "wasmparser", +] + +[[package]] +name = "wasmparser" +version = "0.244.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags", + "hashbrown 0.15.5", + "indexmap", + "semver", +] + [[package]] name = "winapi" version = "0.3.9" @@ -575,3 +950,103 @@ name = "windows_x86_64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + +[[package]] +name = "zmij" +version = "1.0.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8848ee67ecc8aedbaf3e4122217aff892639231befc6a1b58d29fff4c2cabaa" diff --git a/Cargo.toml b/Cargo.toml index 6034d04..b5446ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,15 +21,18 @@ edition = "2024" maintenance = { status = "experimental" } [dependencies] +backon = { version = "1.6.0", optional = true } byteorder = "1.5.0" futures = "0.3.31" pin-project = "1.1.10" snafu = "0.8.6" tokio = { version = "1.47.1", features = ["net", "rt", "time"] } tracing = "0.1.41" +uuid = { version = "1.23.1", features = ["v4"], optional = true } [dev-dependencies] -tokio = { version = "1.47.1", features = ["macros"] } -tracing-subscriber = "0.3.19" +tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread"] } +tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } [features] +leader_recipe = ["dep:backon", "dep:uuid", "tokio/sync"] diff --git a/src/lib.rs b/src/lib.rs index 69720c4..fe60c0d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -194,8 +194,8 @@ #![deny(missing_copy_implementations)] use error::Error; -use futures::{Stream, channel::oneshot}; -use snafu::{ResultExt, whatever as bail}; +use futures::{channel::oneshot, Stream}; +use snafu::{whatever as 
bail, ResultExt}; use std::borrow::Cow; use std::net::SocketAddr; use std::time; @@ -204,6 +204,8 @@ use tracing::{debug, instrument, trace}; /// Per-operation ZooKeeper error types. pub mod error; mod proto; +/// Recipes from the ZooKeeper docs +pub mod recipes; mod transform; mod types; diff --git a/src/recipes/leader/mod.rs b/src/recipes/leader/mod.rs new file mode 100644 index 0000000..b9dc3b9 --- /dev/null +++ b/src/recipes/leader/mod.rs @@ -0,0 +1,549 @@ +use snafu::Snafu; +use snafu::{OptionExt, ResultExt, Whatever, whatever}; +use tokio::task::AbortHandle; +use tracing::{Instrument, debug, error, info, trace_span, warn}; +use uuid::Uuid; + +use crate::WatchedEventType::{self, NodeDeleted}; +use crate::ZooKeeper; +use crate::{Acl, WatchedEvent}; +use crate::{CreateMode, KeeperState}; + +use tokio::sync::watch; + +use backon::ExponentialBuilder; +use backon::Retryable; + +/// Participate in leader election through this struct +#[derive(Debug)] +pub struct LeaderElection { + /// ZNode under which volunteers are registered + election_prefix: String, + zk: ZooKeeper, + backon_builder: ExponentialBuilder, + acl: Vec, +} + +/// Represents the current state this node is in +#[derive(Debug, Clone, Copy)] +pub enum LeadershipState { + /// This node is a follower + Follower, + /// This node is the leader + Leader, + /// Leadership participation procedure has not yet started + Uninitialized, + /// An error has occurred in the leader election procedure; terminal state + Error, +} + +impl LeaderElection { + /// Create a new leader election struct + pub fn new(zk: ZooKeeper, election_node: &str, acl: Vec) -> Self { + let backon_builder = ExponentialBuilder::default() + .with_jitter() + .with_min_delay(core::time::Duration::from_millis(100)) + .with_max_delay(core::time::Duration::from_millis(5000)) + .with_max_times(5); + + Self { + election_prefix: election_node.to_string(), + zk, + backon_builder, + acl, + } + } + /// Participate in [leader 
election](https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection) + /// + /// # Returns + /// + /// - A [tokio::sync::watch::Receiver] that tracks this node's current + /// [LeadershipState]. To stop participating, drop the underlying ZooKeeper + /// connection, so that the underlying ephemeral znodes are removed. + /// - A [tokio::task::AbortHandle]. Call `.abort()` to stop + /// participating in leader election. If a connection to ZooKeeper is + /// kept alive after this call, the ephemeral nodes are not removed, making + /// it seem like you're still participating. + /// + /// Upon receiving from this receiver applications may consider creating a + /// separate znode to acknowledge that the leader has executed the leader + /// procedure. + /// + /// Does not automatically re-create ephemeral nodes for participation but + /// sends an error whenever the session expires or other unexpected events or + /// errors occur in the process. Only these transitions are possible: + /// - Uninitialized -> Leader -> Error + /// - Uninitialized -> Follower -> Error + /// - Uninitialized -> Follower -> Leader -> Error + /// + /// Here is an example of how this might be used: + /// + /// ```no_run + /// use tokio::select; + /// use tokio_zookeeper::{Acl, ZooKeeper, recipes::leader::*}; + /// + /// fn init_tracing_subscriber() { + /// let _ = tracing_subscriber::fmt() + /// .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + /// .init(); + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// init_tracing_subscriber(); + /// let binding = "127.0.0.1:2181".parse().unwrap(); + /// let (zk, _default_watcher) = ZooKeeper::connect(&binding).await.unwrap(); + /// + /// let leader_election = LeaderElection::new(zk, "/election", Acl::open_unsafe().to_vec()); + /// let (mut leader_receiver, _abort_handle) = leader_election.volunteer().await.unwrap(); + /// loop { + /// let state = *leader_receiver.borrow_and_update(); + /// match state {
+ /// LeadershipState::Leader => { + /// select! { + /// _ = leader_receiver.changed() => { println!("changed"); } + /// _ = async { + /// loop { + /// println!("doing leader work..."); + /// tokio::time::sleep(tokio::time::Duration::from_secs(1_000)).await + /// } + /// } => {} + /// } + /// } + /// LeadershipState::Follower => { + /// select! { + /// _ = leader_receiver.changed() => { println!("changed"); } + /// _ = async { + /// loop { + /// println!("doing follower work..."); + /// tokio::time::sleep(tokio::time::Duration::from_secs(1_000)).await + /// } + /// } => unreachable!("select should cancel the future") + /// } + /// } + /// LeadershipState::Uninitialized => { + /// println!("uninitialized"); + /// _ = leader_receiver.changed().await; + /// } + /// LeadershipState::Error => { + /// eprintln!("error"); + /// return; + /// } + /// } + /// } + /// } + /// ``` + pub async fn volunteer( + self, + ) -> Result<(watch::Receiver, AbortHandle), Whatever> { + info!("volunteering for leader election"); + + // Error handling with guids: + // https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_GuidNote + // "If a recoverable error occurs calling create() the client should + // call getChildren() and check for a node containing the guid used in the path + // name. This handles the case [...] of the create() succeeding on the + // server but the server crashing before returning the name of the new node." + let guid = Uuid::new_v4(); + let path = match self + .zk + .create( + &format!("{}/{}-n_", self.election_prefix, guid), + &b""[..], + self.acl, + CreateMode::EphemeralSequential, + ) + .await + { + Ok(create_res) => { + // if this is an err, it is "unrecoverable": no parent node, node already exists, etc. + create_res.whatever_context("can't create ephemeral node, unrecoverable error")? + } + Err(e) => { + warn!( + "can't create ephemeral node : {e}, recoverable error, will now try to find my guid again..." 
+ ); + (|| async { + get_children(&self.zk, &self.election_prefix) + .await + .whatever_context( + "can't get leader election nodes, retry failed".to_string(), + ) + }) + .retry(self.backon_builder) + .notify(|err, dur| { + warn!("retrying {:?} after {:?}", err, dur); + }) + .await? + .into_iter() + .find(|child| child.guid == guid) + .map(|child| child.full_path()) + .whatever_context("can't find a znode with my guid")? + } + }; + + let node = ElectionChild::try_from_full_path(&path, &self.election_prefix) + .whatever_context("wrong format of the election child node")?; + + let (leader_sender, leader_receiver) = watch::channel(LeadershipState::Uninitialized); + let candidate = Candidate::new(node, self.zk, self.backon_builder); + + let jh = tokio::spawn( + candidate + .observe(leader_sender) + .instrument(trace_span!("election_observer", my_path = %path)), + ); + + Ok((leader_receiver, jh.abort_handle())) + } +} + +#[derive(Debug)] +struct Candidate { + zk: ZooKeeper, + node: ElectionChild, + backon_builder: ExponentialBuilder, +} + +impl Candidate { + fn new(node: ElectionChild, zk: ZooKeeper, backon_builder: ExponentialBuilder) -> Self { + Self { + zk, + node, + backon_builder, + } + } + + async fn observe(self, leader_sender: watch::Sender) { + loop { + match (|| async { self.observe_once(&leader_sender).await }) + .retry(self.backon_builder) + .notify(|err, dur| { + warn!("retrying {:?} after {:?}", err, dur); + }) + .when(|e| matches!(e, LeaderElectionError::ZnodeNotFound)) + .await + { + Ok(_) => continue, + Err(e) => { + error!("{e}"); + _ = leader_sender.send(LeadershipState::Error); + return; + } + } + } + } + + async fn observe_once( + &self, + leader_sender: &watch::Sender, + ) -> Result<(), LeaderElectionError> { + let mut children: Vec = + get_children(&self.zk, &self.node.election_prefix).await?; + children.sort_unstable(); + + debug!(participants = ?children, "got leader election participants"); + + match children.iter().position(|node| node == 
&self.node) { + Some(0) => { + self.observe_leader(leader_sender).await?; + } + Some(index) => { + self.observe_follower(&children[index - 1], leader_sender) + .await?; + } + None => return Err(LeaderElectionError::ZnodeNotFound), + }; + + Ok(()) + } + + async fn observe_leader( + &self, + leader_sender: &watch::Sender, + ) -> Result<(), LeaderElectionError> { + // start watching my own ephemeral node + let (rx, stat) = self + .zk + .with_watcher() + .exists(&self.node.full_path()) + .await?; + if stat.is_none() { + return Err(LeaderElectionError::ZnodeNotFound); + } + + info!("i am the leader"); + leader_sender + .send(LeadershipState::Leader) + .map_err(|e| LeaderElectionError::SendError { source: e })?; + + let event = rx + .await + .map_err(|e| LeaderElectionError::Canceled { source: e })?; + match (event.event_type, event.keeper_state) { + (NodeDeleted, _) => { + error!("Leader's ephemeral node was deleted"); + Err(LeaderElectionError::UnexpectedEvent { change: event }) + } + (WatchedEventType::None, KeeperState::Expired | KeeperState::AuthFailed) => { + Err(LeaderElectionError::UnexpectedEvent { change: event }) + } + _ => { + // transient disconnects, SyncConnected, SaslAuthenticated, NodeDataChanged events etc. 
+ debug!(?event, "retryable event, will retry"); + Ok(()) + } + } + } + + async fn observe_follower( + &self, + preceding_node: &ElectionChild, + leader_sender: &watch::Sender, + ) -> Result<(), LeaderElectionError> { + let path = preceding_node.full_path(); + debug!(?path, "setting the watch for the preceding node"); + + let (rx, stat) = self.zk.with_watcher().exists(&path).await?; + if stat.is_none() { + debug!("preceding node already gone, re-evaluating"); + return Ok(()); + } + + leader_sender + .send(LeadershipState::Follower) + .map_err(|e| LeaderElectionError::SendError { source: e })?; + + let event = rx + .await + .map_err(|e| LeaderElectionError::Canceled { source: e })?; + match (event.event_type, event.keeper_state) { + (NodeDeleted, _) => { + debug!(?event, "the preceding node was removed"); + Ok(()) + } + (WatchedEventType::None, KeeperState::Expired | KeeperState::AuthFailed) => { + Err(LeaderElectionError::UnexpectedEvent { change: event }) + } + _ => { + // transient disconnects, SyncConnected, SaslAuthenticated, NodeDataChanged events, etc. 
+ debug!(?event, "retryable event, will retry"); + Ok(()) + } + } + } +} + +#[derive(PartialEq, Eq, Debug)] +struct ElectionChild { + election_prefix: String, + path: String, + guid: Uuid, + seq: u32, +} + +impl ElectionChild { + fn try_from_full_path(full_path: &str, election_prefix: &str) -> Result { + if !full_path.starts_with(election_prefix) { + whatever!( + "wrong format of a child node; must start with election prefix {election_prefix}" + ); + } + + let Some(path) = full_path.strip_prefix(&format!("{}/", election_prefix)) else { + whatever!("wrong format of a child node; must be `/prefix/path`"); + }; + + ElectionChild::try_from_parts(election_prefix, path) + } + + fn try_from_parts(prefix: &str, path: &str) -> Result { + let path_parts = path.split("-n_").collect::>(); + if path_parts.len() != 2 { + whatever!("wrong format of a child node's path; must `-n_`"); + } + let guid = Uuid::parse_str(path_parts[0]).whatever_context("Can't parse node's guid")?; + let seq = path_parts[1].parse::().whatever_context(format!( + "cant parse node's sequential number as u32 from prefix `{prefix}` and path `{path}`" + ))?; + + Ok(Self { + election_prefix: prefix.to_string(), + path: path.to_string(), + guid, + seq, + }) + } + + fn full_path(&self) -> String { + format!("{}/{}", self.election_prefix, self.path) + } +} + +impl PartialOrd for ElectionChild { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ElectionChild { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.seq.cmp(&other.seq) + } +} + +#[derive(Debug, Snafu)] +enum LeaderElectionError { + #[snafu( + display("Watch receiver canceled, server disconnected?"), + context(false) + )] + Canceled { + source: futures::channel::oneshot::Canceled, + }, + #[snafu(display("ZooKeeper error: {source}"), context(false))] + ZkError { source: crate::Error }, + #[snafu(display("Error while sending leadership state update"), context(false))] + SendError { + source: 
watch::error::SendError, + }, + #[snafu(display("Ephemeral znode for leader election not found"))] + ZnodeNotFound, + #[snafu(display("Unexpected event received: {change:?}"))] + UnexpectedEvent { change: WatchedEvent }, +} + +async fn get_children( + zk: &ZooKeeper, + election_node: &str, +) -> Result, crate::Error> { + Ok(zk + .get_children(election_node) + .await? + .unwrap_or_default() + .into_iter() + .filter_map(|s| { + ElectionChild::try_from_parts(election_node, &s) + .inspect_err(|e| warn!("skipping malformed election child {s:?}: {e}")) + .ok() + }) + .collect()) +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::{ZooKeeperBuilder, error::Create}; + + #[test] + fn parse_path() { + init_tracing_subscriber(); + let guid = Uuid::new_v4(); + let ok_path = format!("/election/{}-n_000123", guid); + let c = ElectionChild::try_from_full_path(&ok_path, "/election").unwrap(); + assert_eq!(c.election_prefix, "/election"); + assert_eq!(c.path, format!("{}-n_000123", guid)); + assert_eq!(c.guid, guid); + assert_eq!(c.full_path(), format!("/election/{}-n_000123", guid)); + + let bad_paths = vec![ + format!("/election//{guid}-n_00123"), + format!("/election_bad/{guid}-n_00123"), + "/election/bad_path".to_string(), + "/election/bad-guid-n_00123".to_string(), + format!("/election/{guid}-n_badnumber"), + ]; + + for path in bad_paths { + assert!(ElectionChild::try_from_full_path(&path, "/election").is_err()); + } + } + + #[tokio::test] + async fn election_works() { + let builder = ZooKeeperBuilder::default(); + let connect_addr = "127.0.0.1:2181".parse().unwrap(); + + init_tracing_subscriber(); + + let (zk1, _w) = builder.connect(&connect_addr).await.unwrap(); + create_election_node(&zk1).await; + let leader_election1 = LeaderElection::new(zk1, "/election", Acl::open_unsafe().to_vec()); + let (mut rx1, jh1) = leader_election1.volunteer().await.unwrap(); + tokio::time::timeout(Duration::from_secs(10), wait_for_leadership(&mut rx1)) + 
.await + .expect("the first participant should be the leader"); + + let (zk2, _w) = builder.connect(&connect_addr).await.unwrap(); + let leader_election2 = LeaderElection::new(zk2, "/election", Acl::open_unsafe().to_vec()); + let (mut rx2, _jh2) = leader_election2.volunteer().await.unwrap(); + + tokio::time::timeout(Duration::from_secs(10), wait_for_follower(&mut rx2)) + .await + .expect("the second participant should become follower"); + + jh1.abort(); + + tokio::time::timeout(Duration::from_secs(10), wait_for_leadership(&mut rx2)) + .await + .expect("the second participant should now become the leader"); + } + + fn init_tracing_subscriber() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::DEBUG) + .try_init(); + } + + async fn create_election_node(zk: &ZooKeeper) { + let res = zk + .create( + "/election", + &b""[..], + Acl::open_unsafe(), + CreateMode::Persistent, + ) + .await + .unwrap(); + + match res { + Ok(_) => {} + Err(e) if e == Create::NodeExists => {} + Err(e) => panic!("{e}"), + }; + } + + async fn wait_for_leadership(rx: &mut tokio::sync::watch::Receiver) { + debug!("waiting for leadership"); + loop { + let state = *rx.borrow_and_update(); + match state { + LeadershipState::Leader => { + return; + } + LeadershipState::Uninitialized | LeadershipState::Follower => { + rx.changed().await.unwrap() + } + _ => panic!("wait for leadership error {state:?}"), + } + } + } + + async fn wait_for_follower(rx: &mut tokio::sync::watch::Receiver) { + debug!("waiting for follower"); + loop { + let state = *rx.borrow_and_update(); + match state { + LeadershipState::Leader | LeadershipState::Uninitialized => { + rx.changed().await.unwrap() + } + LeadershipState::Follower => { + return; + } + _ => panic!("wait for follower error {state:?}"), + } + } + } +} diff --git a/src/recipes/mod.rs b/src/recipes/mod.rs new file mode 100644 index 0000000..6321959 --- /dev/null +++ b/src/recipes/mod.rs @@ -0,0 +1,3 @@ +/// A recipe for the leader election 
procedure ([ZooKeeper docs](https://zookeeper.apache.org/doc/current/recipes.html#sc_leaderElection)) +#[cfg(feature = "leader_recipe")] +pub mod leader;