
recipes: Leader election #60

Open
rodio wants to merge 22 commits into stackabletech:main from rodio:leader-election

Conversation


@rodio rodio commented Apr 23, 2026

I am implementing the leader election recipe from #10 and would appreciate early feedback to see if it makes sense.

I thought that a nice API for leader election would be similar to watches: the leader election function would return the receiving end of a oneshot channel, which users could use to check whether their node is the leader. There would be no way to "withdraw" from the leader election process once you've volunteered, because I don't think anyone needs that in practice.

So now that I have a somewhat working draft, I'd appreciate feedback on this API and the function signatures, or really anything. Eventually I would like to resolve all the TODOs and hide this recipe behind a feature flag.

To test it:

  • run a ZooKeeper instance on localhost:2181
  • create a node: ./bin/zkCli.sh -server localhost:2181 create /election ""
  • run the test with cargo test election_works -- --nocapture
  • or run multiple copies of this:
use std::{io, time::Duration};

use tokio_zookeeper::{recipes::LeaderElection, *};

fn init_tracing_subscriber() {
    tracing_subscriber::fmt()
        .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
        .init();
}

#[tokio::main]
async fn main() {
    init_tracing_subscriber();
    let addr = "127.0.0.1:2181".parse().unwrap();
    let mut zk_builder = ZooKeeperBuilder::default();
    zk_builder.set_timeout(Duration::from_millis(10_000));
    // Connect through the builder so the configured timeout applies.
    let (zk, _default_watcher) = zk_builder.connect(&addr).await.unwrap();

    let leader_election = LeaderElection::new(zk.clone(), "/election");
    let mut leader_receiver = leader_election.volunteer().await.unwrap();

    loop {
        match leader_receiver.try_recv() {
            Ok(Some(_)) => {
                println!("I am the leader");
                println!("doing leader stuff...");
                _ = dbg!(zk.exists("/leader").await);
                _ = io::stdin().read_line(&mut String::new());
                println!("good bye!");
                break;
            }
            Ok(None) => {
                println!("I am a follower");
                println!("doing follower stuff...");
                _ = dbg!(zk.exists("/follower").await);
                tokio::time::sleep(Duration::from_secs(1)).await;
                println!("now let me check if I am the leader yet...");
            }
            Err(_) => unreachable!("closed channel"),
        }
    }
}

rodio added 4 commits April 21, 2026 18:58

stackable-cla Bot commented Apr 23, 2026

CLA assistant check
All committers have signed the CLA.

@rodio rodio marked this pull request as ready for review April 25, 2026 10:42

rodio commented Apr 25, 2026

I've marked it as ready for review because apparently no one receives notifications about drafts?

maltesander (Member) commented

Hi @rodio,

thank you for the contribution. I had a look; I think the general approach looks good.

I see two problems (not sure if they are draft-related):

  • The oneshot is wrong and cannot model all the cases from https://zookeeper.apache.org/doc/current/recipes.html. Leadership is an ongoing state, not a one-shot event; that matters for session-loss semantics and the acknowledgment-znode pattern. I would go with some sort of watch::Receiver<LeaderState> for that.
  • Furthermore, I would split LeaderElection like this:
use tokio::sync::watch;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeaderState {
    /// Volunteered, watching the predecessor
    Pending,
    /// At index 0
    Leader,
    /// Session/candidacy ended; terminal
    Resigned,
}

/// Configuration for participating in a leader election.
#[derive(Debug, Clone)]
pub struct LeaderElection {
    election_node: String,
    zk: ZooKeeper,
}

/// An active candidacy. Drop to stop observing (the ephemeral znode
/// is still only removed when the underlying session ends).
#[derive(Debug)]
pub struct Candidate {
    my_path: String,
    state: watch::Receiver<LeaderState>,
    ...
}

impl Candidate {
    pub fn path(&self) -> &str { &self.my_path }
    pub fn state(&self) -> LeaderState { self.state.borrow().clone() }

    /// Resolves once we become leader, or returns `Err` if we lose
    /// the session before that happens.
    pub async fn wait_for_leadership(&mut self) -> Result<(), ElectionError> {
        loop {
            match *self.state.borrow_and_update() {
                LeaderState::Leader => return Ok(()),
                // Should probably even be split into something like `SessionLost` and `Withdrawn`
                LeaderState::Resigned => return Err(ElectionError::SessionLost),
                LeaderState::Pending => {}
            }
            self.state.changed().await.map_err(|_| ElectionError::SessionLost)?;
        }
    }
}

With a volunteer impl like:

pub async fn volunteer(&self) -> Result<Candidate, ElectionError> {...}

So the candidate only exists after you've volunteered, and my_path is never an Option.

This will help with a couple of your TODOs:

  • The TOCTOU fix is just if stat.is_none() { continue; } (it would block otherwise; see the sketch below)
  • the TODO could be ActiveLeaderElection?
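What I mean is a loop shaped roughly like this (just a sketch: get_children() is real, but predecessor_of() and exists_watched() are hypothetical helpers, and I'm glossing over the exact return types):

loop {
    let children = zk.get_children(&election_node).await?;
    // Hypothetical helper: find the candidate immediately before us in sequence order.
    let Some(predecessor) = predecessor_of(&my_path, &children) else {
        // No predecessor: we hold the lowest sequence number, so we are the leader.
        break;
    };
    // Hypothetical helper: set a watch on the predecessor and return its current Stat.
    let (stat, watch) = exists_watched(&zk, &predecessor).await?;
    if stat.is_none() {
        // The predecessor vanished between get_children() and exists():
        // re-list instead of blocking on a watch that may never fire.
        continue;
    }
    // Wait for NodeDeleted on the predecessor, then re-evaluate.
    watch.await;
}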

Some other things:

  • proper GUID handling
  • sort children by the sequence number suffix, not the full string (if the GUIDs differ, you otherwise won't find the largest j < i) — see the sketch below
  • "can't find myself" -> probe your own path, retry on lag, and transition to the terminal state if the ephemeral node is gone

Malte

@maltesander maltesander self-requested a review April 26, 2026 16:32

rodio commented Apr 27, 2026

Hi, @maltesander! Thank you for the thorough review!

I'd like to argue a bit here and defend the oneshot approach.

The "Resigned" state that you mentioned in your code snippet is not a valid state at all and should not be exposed to users. You have the reason in your comment there: Drop to stop observing (the ephemeral znode is still only removed when the underlying session ends). So resigning from the leader election process without dropping the connection would leave other nodes with an "impression" that you're still participating. When your node becomes the node with the lowest ID everyone would just be sitting there assuming you're the leader, but you don't care. You've resigned but but no one knows about it. So it puts the whole system into an invalid state where it is permanently with a leader that does not perform its "duties".

In fact, if users really want to stop participating, they should drop their connection, like I did in the test. The connection and the participation should always be coupled; it should not be possible to drop one without dropping the other. That way there will be no "orphan" ephemeral nodes. And the way to get there is the one-shot approach: you are either the leader, or you wait for the leadership, or you drop the whole connection.

maltesander (Member) commented

Hi @rodio,

yeah, I agree that "Resigned" is wrong and we shouldn't expose "Withdrawal".

I'd still argue about involuntary withdrawal, though? E.g. session loss (which the candidate did not choose)?

Consider: a candidate becomes leader and fires Leader on the oneshot. The application starts doing leader work. A little later, the ZK session expires (network partition, GC pause, ZK server failover, whatever). The ephemeral znode is gone. Some other node is now the leader. The original application is still doing leader work, because nobody told it otherwise. The oneshot already fired and can't fire again.

Without withdrawal, if we go with the oneshot, we should document explicitly that the application has to watch connect() for session expiry. It should then treat itself as no longer Leader, regardless of the oneshot?
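Roughly like this (just a sketch; I'm assuming the default watcher returned by connect() is a futures Stream of WatchedEvent, which is what your example destructures and discards):

use futures::StreamExt;

let (zk, mut default_watcher) = ZooKeeper::connect(&addr).await?;
tokio::spawn(async move {
    while let Some(event) = default_watcher.next().await {
        if matches!(event.keeper_state, KeeperState::Expired) {
            // The session is gone: treat ourselves as no longer leader,
            // regardless of what the oneshot said earlier.
            break;
        }
    }
});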

I'm fine with both approaches.

Malte


rodio commented Apr 27, 2026

I think you're right that we need to monitor for session loss after becoming leader, and I think it is better to hide that inside the implementation of the recipe, because it is probably something almost everyone needs. I'll think more about it later and we'll see what I can come up with. Thanks!

rodio added 10 commits May 1, 2026 12:55

rodio commented May 4, 2026

@maltesander I've pushed what I've come up with. volunteer() now returns the receiver part of a tokio::sync::watch channel and an abort handle to abort leader election. Unfortunately I was not able to guarantee that the ephemeral nodes are dropped when it is aborted, but this is mentioned in the docs.
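From the caller's side it looks roughly like this (a sketch; the exact LeadershipState variants are the ones discussed in the review below and may not match the final code):

let (mut leader_rx, abort_handle) = leader_election.volunteer().await?;
while leader_rx.changed().await.is_ok() {
    match *leader_rx.borrow() {
        LeadershipState::Leader => { /* do leader work */ }
        LeadershipState::Error => break, // terminal, see below
        _ => {}                          // keep waiting
    }
}
// Stops participating without dropping the ZooKeeper connection
// (the ephemeral znode may linger, as mentioned above):
abort_handle.abort();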


@maltesander maltesander left a comment


Hi @rodio, thank you for the fixes. Some proposals and questions.

Comment thread src/recipes/leader/mod.rs Outdated

impl LeaderElection {
/// Create a new leader election struct
pub fn new(zk: ZooKeeper, election_node: &str, acl: &'static [Acl]) -> Self {
maltesander (Member):

This won't allow constructing ACLs at runtime. What about just a Vec<Acl>?
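i.e. something like (sketch):

pub fn new(zk: ZooKeeper, election_node: &str, acl: Vec<Acl>) -> Self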

Comment thread src/recipes/leader/mod.rs Outdated
.notify(|err, dur| {
warn!("retrying {:?} after {:?}", err, dur);
})
.when(|e| e.to_string() == ZNODE_NOT_FOUND_ERROR_MSG)
maltesander (Member):

This should use proper error variants, not a string match:
enum ObserveError { ZnodeNotFound, ... } and .when(|e| matches!(e, ObserveError::ZnodeNotFound))

Comment thread src/recipes/leader/mod.rs Outdated
Comment on lines +16 to +18
static CANT_SEND_ERROR_MSG: &str = "Can't send leadership state update";
static RX_CANCELLED_ERROR_MSG: &str = "Watch receiver canceled, server disconnected?";
static ZNODE_NOT_FOUND_ERROR_MSG: &str = "The ephemeral znode of the participant was not found";
maltesander (Member):

These should be proper error variants, replacing the whatever! calls:

#[derive(Debug, Snafu)]
pub enum LeaderElectionError {
    #[snafu(display("ephemeral znode of the participant was not found"))]
    ZnodeNotFound,
    #[snafu(display("zookeeper error"))]
    Zk { source: crate::Error },
    // ...
}

and .when(|e| matches!(e, LeaderElectionError::ZnodeNotFound))

Comment thread src/recipes/leader/mod.rs Outdated
Comment on lines +277 to +281
error!(?event, "leader's ephemeral node changed");
whatever!(
"Unexpected change to the leader's node: {:?}",
event.event_type
);
maltesander (Member):

I would not log error! or "Unexpected" when the leader's session ends. When the leader's ephemeral znode disappears because the ZK session expired or was reset, that is the expected terminal condition for a leader losing its session. The error! could be downgraded to info/warn?

Furthermore, this collapses into LeadershipState::Error in observe(), which hides the session-induced loss of leadership?

Should we add a LeadershipState::Lost variant so callers can react appropriately (fail over, reconnect) versus genuine error conditions?

rodio (Author):

I do not understand the difference between session expiration and session reset yet, but just in case: whenever the ZK server is killed, for example, we get Canceled on line 275, so that is the expected terminal condition I've seen happening. I understand that we should not collapse the Canceled case; I will add another LeadershipState variant, but I think it would still make sense to log these unexpected changes as errors here.

With the log here on line 277 I wanted to indicate that something unexpected happened to the znode itself, for example NodeDataChanged or DataWatchRemoved. I do not know what to do when we receive such kinds of events in this leader election context, so I just wanted to hard-fail here.

But I'm guessing we could potentially also receive a WatchedEvent with only the KeeperState field set here on line 275, which could be any of these:

pub enum KeeperState {
    /// The client is in the disconnected state - it is not connected to any server in the ensemble.
    Disconnected = 0,
    /// The client is in the connected state - it is connected to a server in the ensemble (one of
    /// the servers specified in the host connection parameter during ZooKeeper client creation).
    SyncConnected = 3,
    /// Authentication has failed -- connection requires a new `ZooKeeper` instance.
    AuthFailed = 4,
    /// The client is connected to a read-only server, that is the server which is not currently
    /// connected to the majority. The only operations allowed after receiving this state is read
    /// operations. This state is generated for read-only clients only since read/write clients
    /// aren't allowed to connect to read-only servers.
    ConnectedReadOnly = 5,
    /// Used to notify clients that they are SASL-authenticated, so that they can perform ZooKeeper
    /// actions with their SASL-authorized permissions.
    SaslAuthenticated = 6,
    /// The serving cluster has expired this session. The ZooKeeper client connection (the session)
    /// is no longer valid. You must create a new client connection (instantiate a new `ZooKeeper`
    /// instance) if you wish to access the ensemble.
    Expired = -112,
}
So in this case I'm not even sure what to do if, for example, we somehow receive a SaslAuthenticated event here... Should I just unpack all these possible values, ignore SaslAuthenticated and SyncConnected, and send something like LeadershipState::Lost in case we get Expired or Disconnected?

maltesander (Member):

There is no difference between session expiration and session reset from ZooKeeper's perspective: the cluster decides a session is dead because the heartbeats stopped.

Yes, I think we should handle the potential WatchedEvent. What do you think about something like this (not complete, and using whatever! for brevity)?

match (event.event_type, event.keeper_state) {
    // expected leadership loss paths
    (NodeDeleted, _) => {
        info!("leader's ephemeral was deleted, session likely expired");
        leader_sender.send(LeadershipState::Lost).ok();
        Ok(())
    }
    (None, KeeperState::Expired) => {
        info!("session expired while leader");
        leader_sender.send(LeadershipState::Lost).ok();
        Ok(())
    }
    (None, KeeperState::AuthFailed) => {
        // genuinely unrecoverable.
        whatever!("auth failed while leader")
    }

    // transient, don't transition, re-await
    (None, KeeperState::Disconnected) => {
        // Session still alive; we're just temporarily unable to reach the cluster.
        // The whole point of session timeouts is to ride this out without losing leadership.
        debug!("transient disconnect while leader, continuing to wait");
        // Re-run observe, the watch is gone with the connection, we'll re-set it.
        Ok(())
    }
    (None, KeeperState::SyncConnected | KeeperState::SaslAuthenticated | KeeperState::ConnectedReadOnly) => {
        // not sure about this one
        debug!(?event.keeper_state, "benign state event while leader, continuing to wait");
        Ok(())
    }

    // genuinely unexpected, keep hard-fail
    (NodeDataChanged | NodeChildrenChanged | DataWatchRemoved | NodeCreated, _) => {
        // this can be error logged but will appear via snafu as well
        whatever!("unexpected change to leader's node: {:?}", event.event_type)
    }
}

rodio (Author):

I'm having second thoughts about having a separate LeadershipState::Lost, because I'm struggling to understand why callers would need to react differently in different situations. For example: the TCP connection dies, we get the rx Canceled on line 275, we send an error to the watcher, and the caller needs to reconnect; we receive KeeperState::Expired, and the client still needs to reconnect, so it might as well be the same error. Do you have a specific example in mind where we might need to differentiate?

maltesander (Member):

Yeah, it might be overengineered. But I think that just because the possible reaction of a caller might be the same, we should not hide any error state? Then again, if they are 100% equal, we don't have to split them either...

Thinking about it, I don't really have strong examples for not collapsing it for now.

I can think of observability (e.g. operators care why leadership was lost: network vs. quorum lost -> infrastructure) and the mentioned race condition, i.e. the difference between:

  • you lost leadership and know it
  • you lost leadership and a successor might already be acting

If I want to react differently to a suspicion of loss vs. a confirmed loss, we would lose that.

But I'd rather go without that distinction for now than make it more complicated than it has to be.

rodio (Author):

I've made changes to the error handling here that are similar to your suggestion. I'm not sure the retrying on "benign" errors is going to make a lot of sense, because right on the next iteration of the loop we're doing get_children() and I think it would fail as well; but it is not possible to retry get_children() in a meaningful way for now because it returns a generic crate::Error.
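If we ever want that, the wrapping would look something like this (a sketch, assuming snafu 0.7-style context selectors and the Zk variant from the enum you sketched above):

use snafu::ResultExt;

let children = zk
    .get_children(&self.election_node)
    .await
    // Now a LeaderElectionError that a retry predicate can match on.
    .context(ZkSnafu)?;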

Comment thread src/recipes/leader/mod.rs Outdated
}
Err(e) => {
warn!(
"can't get children: {e}, recoverable error, will now try to find my guid again..."
maltesander (Member):

This message is from the create() call:

Suggested change
"can't get children: {e}, recoverable error, will now try to find my guid again..."
"can't create ephemeral node: {e}, recoverable error, will now try to find my guid again..."

Comment thread src/recipes/leader/mod.rs Outdated
Comment on lines +39 to +40
/// An error has occured in the leader election procedure
Error,
maltesander (Member):

Can we document Error as terminal? Once observe (line 222) sends the Error and returns, the spawned task is done. There is no path back without constructing a fresh LeaderElection and volunteering again.
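E.g. something like:

/// An error has occurred in the leader election procedure.
///
/// This state is terminal: the spawned election task exits after
/// sending it. Construct a fresh `LeaderElection` and volunteer
/// again to rejoin the election.
Error,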

Comment thread src/recipes/leader/mod.rs Outdated
Leader,
/// Leadership participation procedure has not yet started
Uninitialized,
/// An error has occured in the leader election procedure
maltesander (Member):

Suggested change
/// An error has occured in the leader election procedure
/// An error has occurred in the leader election procedure

Comment thread src/recipes/leader/mod.rs Outdated
/// leader. To stop participating, drop the underlying ZooKeeper connection,
/// so that the underlying ephemeral znodes are removed.
/// - A [tokio::runtime::task::abort::AbortHandle]. Call .abort() to stop
/// participating in leader election. If a connection to ZooKeeper is kept
maltesander (Member):

Suggested change
/// participating in leader election. If a connection to ZooKeeper is kept
/// participating in leader election. If a connection to ZooKeeper is kept

rodio added 6 commits May 5, 2026 18:45

rodio commented May 11, 2026

@maltesander I've applied your suggestions, please have a look whenever you have time. Thanks!

