Skip to content

Commit

Permalink
Chore: Replace Vote<C> with VoteOf<C>
Browse files Browse the repository at this point in the history
Introduce the type alias `VoteOf<C>` to simplify future migration from the
fixed type `Vote<C>` to an associated type `C::Vote`.

By defining `VoteOf<C> = Vote<C>` now,
and later updating it to `VoteOf<C> = C::Vote`,
the migration can be handled with minimal code changes, avoiding
large-scale modifications in the future.
  • Loading branch information
drmingdrmer committed Jan 3, 2025
1 parent 9bc0aef commit f8dd4d5
Show file tree
Hide file tree
Showing 31 changed files with 99 additions and 95 deletions.
14 changes: 7 additions & 7 deletions examples/memstore/src/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::fmt::Debug;
use std::ops::RangeBounds;
use std::sync::Arc;

use openraft::alias::VoteOf;
use openraft::storage::IOFlushed;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogId;
use openraft::RaftTypeConfig;
use openraft::StorageError;
use openraft::Vote;
use tokio::sync::Mutex;

/// RaftLogStore implementation with a in-memory storage
Expand All @@ -33,7 +33,7 @@ pub struct LogStoreInner<C: RaftTypeConfig> {
committed: Option<LogId<C>>,

/// The current granted vote.
vote: Option<Vote<C>>,
vote: Option<VoteOf<C>>,
}

impl<C: RaftTypeConfig> Default for LogStoreInner<C> {
Expand Down Expand Up @@ -84,12 +84,12 @@ impl<C: RaftTypeConfig> LogStoreInner<C> {
Ok(self.committed.clone())
}

async fn save_vote(&mut self, vote: &Vote<C>) -> Result<(), StorageError<C>> {
async fn save_vote(&mut self, vote: &VoteOf<C>) -> Result<(), StorageError<C>> {
self.vote = Some(vote.clone());
Ok(())
}

async fn read_vote(&mut self) -> Result<Option<Vote<C>>, StorageError<C>> {
async fn read_vote(&mut self) -> Result<Option<VoteOf<C>>, StorageError<C>> {
Ok(self.vote.clone())
}

Expand Down Expand Up @@ -135,14 +135,14 @@ mod impl_log_store {
use std::fmt::Debug;
use std::ops::RangeBounds;

use openraft::alias::VoteOf;
use openraft::storage::IOFlushed;
use openraft::storage::RaftLogStorage;
use openraft::LogId;
use openraft::LogState;
use openraft::RaftLogReader;
use openraft::RaftTypeConfig;
use openraft::StorageError;
use openraft::Vote;

use crate::log_store::LogStore;

Expand All @@ -157,7 +157,7 @@ mod impl_log_store {
inner.try_get_log_entries(range).await
}

async fn read_vote(&mut self) -> Result<Option<Vote<C>>, StorageError<C>> {
async fn read_vote(&mut self) -> Result<Option<VoteOf<C>>, StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.read_vote().await
}
Expand All @@ -183,7 +183,7 @@ mod impl_log_store {
inner.read_committed().await
}

async fn save_vote(&mut self, vote: &Vote<C>) -> Result<(), StorageError<C>> {
async fn save_vote(&mut self, vote: &VoteOf<C>) -> Result<(), StorageError<C>> {
let mut inner = self.inner.lock().await;
inner.save_vote(vote).await
}
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use crate::raft_state::IOId;
use crate::replication;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::VoteOf;
use crate::vote::committed::CommittedVote;
use crate::vote::non_committed::NonCommittedVote;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Vote;

/// A message coming from the internal components.
pub(crate) enum Notification<C>
Expand All @@ -33,7 +33,7 @@ where C: RaftTypeConfig
target: C::NodeId,

/// The higher vote observed.
higher: Vote<C>,
higher: VoteOf<C>,

/// The Leader that sent replication request.
leader_vote: CommittedVote<C>,
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::VoteOf;
use crate::ChangeMembers;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::Vote;

pub(crate) mod external_command;

Expand Down Expand Up @@ -52,7 +52,7 @@ where C: RaftTypeConfig
},

InstallFullSnapshot {
vote: Vote<C>,
vote: VoteOf<C>,
snapshot: Snapshot<C>,
tx: ResultSender<C, SnapshotResponse<C>>,
},
Expand Down Expand Up @@ -101,7 +101,7 @@ where C: RaftTypeConfig
/// Otherwise, just reset Leader lease so that the node `to` can become Leader.
HandleTransferLeader {
/// The vote of the Leader that is transferring the leadership.
from: Vote<C>,
from: VoteOf<C>,
/// The assigned node to be the next Leader.
to: C::NodeId,
},
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ use crate::raft_state::IOId;
use crate::replication::request::Replicate;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::VoteOf;
use crate::vote::committed::CommittedVote;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::Vote;

/// Commands to send to `RaftRuntime` to execute, to update the application state.
#[derive(Debug)]
Expand Down Expand Up @@ -112,7 +112,7 @@ where C: RaftTypeConfig
},

/// Save vote to storage
SaveVote { vote: Vote<C> },
SaveVote { vote: VoteOf<C> },

/// Send vote to all other members
SendVote { vote_req: VoteRequest<C> },
Expand Down
9 changes: 5 additions & 4 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::storage::Snapshot;
use crate::storage::SnapshotMeta;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::vote::RaftLeaderId;
use crate::vote::RaftTerm;
Expand Down Expand Up @@ -113,7 +114,7 @@ where C: RaftTypeConfig
///
/// The candidate `last_log_id` is initialized with the attributes of Acceptor part:
/// [`RaftState`]
pub(crate) fn new_candidate(&mut self, vote: Vote<C>) -> &mut Candidate<C, LeaderQuorumSet<C>> {
pub(crate) fn new_candidate(&mut self, vote: VoteOf<C>) -> &mut Candidate<C, LeaderQuorumSet<C>> {
let now = C::now();
let last_log_id = self.state.last_log_id().cloned();

Expand Down Expand Up @@ -380,7 +381,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_append_entries(
&mut self,
vote: &Vote<C>,
vote: &VoteOf<C>,
prev_log_id: Option<LogId<C>>,
entries: Vec<C::Entry>,
tx: Option<AppendEntriesTx<C>>,
Expand Down Expand Up @@ -419,7 +420,7 @@ where C: RaftTypeConfig

pub(crate) fn append_entries(
&mut self,
vote: &Vote<C>,
vote: &VoteOf<C>,
prev_log_id: Option<LogId<C>>,
entries: Vec<C::Entry>,
) -> Result<(), RejectAppendEntries<C>> {
Expand Down Expand Up @@ -453,7 +454,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn handle_install_full_snapshot(
&mut self,
vote: Vote<C>,
vote: VoteOf<C>,
snapshot: Snapshot<C>,
tx: ResultSender<C, SnapshotResponse<C>>,
) {
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ use crate::proposer::CandidateState;
use crate::proposer::LeaderState;
use crate::raft_state::IOId;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::vote::RaftLeaderId;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftState;
use crate::RaftTypeConfig;
use crate::Vote;

#[cfg(test)]
mod accept_vote_test;
Expand Down Expand Up @@ -60,7 +60,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn accept_vote<T, E, F>(
&mut self,
vote: &Vote<C>,
vote: &VoteOf<C>,
tx: ResultSender<C, T, E>,
f: F,
) -> Option<ResultSender<C, T, E>>
Expand Down Expand Up @@ -99,7 +99,7 @@ where C: RaftTypeConfig
/// Note: This method does not check last-log-id. handle-vote-request has to deal with
/// last-log-id itself.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_vote(&mut self, vote: &Vote<C>) -> Result<(), RejectVoteRequest<C>> {
pub(crate) fn update_vote(&mut self, vote: &VoteOf<C>) -> Result<(), RejectVoteRequest<C>> {
// Partial ord compare:
// Vote does not have to be total ord.
// `!(a >= b)` does not imply `a < b`.
Expand Down
12 changes: 6 additions & 6 deletions openraft/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use crate::network::RPCTypes;
use crate::raft::AppendEntriesResponse;
use crate::raft_types::SnapshotSegmentId;
use crate::try_as_ref::TryAsRef;
use crate::type_config::alias::VoteOf;
use crate::LogId;
use crate::Membership;
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::Vote;

/// RaftError is returned by API methods of `Raft`.
///
Expand Down Expand Up @@ -346,8 +346,8 @@ where
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[error("seen a higher vote: {higher} GT mine: {sender_vote}")]
pub(crate) struct HigherVote<C: RaftTypeConfig> {
pub(crate) higher: Vote<C>,
pub(crate) sender_vote: Vote<C>,
pub(crate) higher: VoteOf<C>,
pub(crate) sender_vote: VoteOf<C>,
}

/// Error that indicates a **temporary** network error and when it is returned, Openraft will retry
Expand Down Expand Up @@ -603,7 +603,7 @@ pub struct LearnerNotFound<C: RaftTypeConfig> {
#[error("not allowed to initialize due to current raft state: last_log_id: {last_log_id:?} vote: {vote}")]
pub struct NotAllowed<C: RaftTypeConfig> {
pub last_log_id: Option<LogId<C>>,
pub vote: Vote<C>,
pub vote: VoteOf<C>,
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
Expand Down Expand Up @@ -636,7 +636,7 @@ pub enum NoForward {}
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub(crate) enum RejectVoteRequest<C: RaftTypeConfig> {
#[error("reject vote request by a greater vote: {0}")]
ByVote(Vote<C>),
ByVote(VoteOf<C>),

#[allow(dead_code)]
#[error("reject vote request by a greater last-log-id: {0:?}")]
Expand All @@ -659,7 +659,7 @@ where C: RaftTypeConfig
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub(crate) enum RejectAppendEntries<C: RaftTypeConfig> {
#[error("reject AppendEntries by a greater vote: {0}")]
ByVote(Vote<C>),
ByVote(VoteOf<C>),

#[error("reject AppendEntries because of conflicting log-id: {local:?}; expect to be: {expect:?}")]
ByConflictingLogId { expect: LogId<C>, local: Option<LogId<C>> },
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/metrics/metric.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::cmp::Ordering;

use crate::metrics::metric_display::MetricDisplay;
use crate::type_config::alias::VoteOf;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::RaftMetrics;
use crate::RaftTypeConfig;
use crate::Vote;

/// A metric entry of a Raft node.
///
Expand All @@ -15,7 +15,7 @@ pub enum Metric<C>
where C: RaftTypeConfig
{
Term(C::Term),
Vote(Vote<C>),
Vote(VoteOf<C>),
LastLogIndex(Option<u64>),
Applied(Option<LogId<C>>),
AppliedIndex(Option<u64>),
Expand Down
5 changes: 3 additions & 2 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::metrics::ReplicationMetrics;
use crate::metrics::SerdeInstant;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::SerdeInstantOf;
use crate::type_config::alias::VoteOf;
use crate::Instant;
use crate::LogId;
use crate::RaftTypeConfig;
Expand All @@ -32,7 +33,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
pub current_term: C::Term,

/// The last flushed vote.
pub vote: Vote<C>,
pub vote: VoteOf<C>,

/// The last log index has been appended to this Raft node's log.
pub last_log_index: Option<u64>,
Expand Down Expand Up @@ -280,7 +281,7 @@ where C: RaftTypeConfig
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
pub struct RaftServerMetrics<C: RaftTypeConfig> {
pub id: C::NodeId,
pub vote: Vote<C>,
pub vote: VoteOf<C>,
pub state: ServerState,
pub current_leader: Option<C::NodeId>,

Expand Down
4 changes: 2 additions & 2 deletions openraft/src/metrics/wait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ use crate::core::ServerState;
use crate::metrics::Condition;
use crate::metrics::Metric;
use crate::metrics::RaftMetrics;
use crate::type_config::alias::VoteOf;
use crate::type_config::alias::WatchReceiverOf;
use crate::type_config::TypeConfigExt;
use crate::LogId;
use crate::OptionalSend;
use crate::RaftTypeConfig;
use crate::Vote;

// Error variants related to metrics.
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -93,7 +93,7 @@ where C: RaftTypeConfig

/// Wait for `vote` to become `want` or timeout.
#[tracing::instrument(level = "trace", skip(self), fields(msg=msg.to_string().as_str()))]
pub async fn vote(&self, want: Vote<C>, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
pub async fn vote(&self, want: VoteOf<C>, msg: impl ToString) -> Result<RaftMetrics<C>, WaitError> {
self.eq(Metric::Vote(want), msg).await
}

Expand Down
8 changes: 4 additions & 4 deletions openraft/src/network/snapshot_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod tokio_rt {
use crate::raft::InstallSnapshotRequest;
use crate::raft::SnapshotResponse;
use crate::storage::Snapshot;
use crate::type_config::alias::VoteOf;
use crate::type_config::TypeConfigExt;
use crate::ErrorSubject;
use crate::ErrorVerb;
Expand All @@ -37,15 +38,14 @@ mod tokio_rt {
use crate::RaftTypeConfig;
use crate::StorageError;
use crate::ToStorageResult;
use crate::Vote;

/// This chunk based implementation requires `SnapshotData` to be `AsyncRead + AsyncSeek`.
impl<C: RaftTypeConfig> SnapshotTransport<C> for Chunked
where C::SnapshotData: tokio::io::AsyncRead + tokio::io::AsyncWrite + tokio::io::AsyncSeek + Unpin
{
async fn send_snapshot<Net>(
net: &mut Net,
vote: Vote<C>,
vote: VoteOf<C>,
mut snapshot: Snapshot<C>,
mut cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down Expand Up @@ -272,12 +272,12 @@ use crate::network::RPCOption;
use crate::raft::InstallSnapshotRequest;
use crate::raft::SnapshotResponse;
use crate::storage::Snapshot;
use crate::type_config::alias::VoteOf;
use crate::OptionalSend;
use crate::Raft;
use crate::RaftNetwork;
use crate::RaftTypeConfig;
use crate::SnapshotId;
use crate::Vote;

/// Send and Receive snapshot by chunks.
pub struct Chunked {}
Expand All @@ -299,7 +299,7 @@ pub trait SnapshotTransport<C: RaftTypeConfig> {
// TODO: consider removing dependency on RaftNetwork
async fn send_snapshot<Net>(
net: &mut Net,
vote: Vote<C>,
vote: VoteOf<C>,
snapshot: Snapshot<C>,
cancel: impl Future<Output = ReplicationClosed> + OptionalSend + 'static,
option: RPCOption,
Expand Down
Loading

0 comments on commit f8dd4d5

Please sign in to comment.