This serializes all client writes. I'd like to know why it's there and whether it will be fixed in the future. Thanks!
Can the following change fix this issue? Thanks!

```diff
diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs
index 77870bfc..de034882 100644
--- a/openraft/src/core/raft_core.rs
+++ b/openraft/src/core/raft_core.rs
@@ -9,9 +9,10 @@ use std::sync::Arc;
 use std::time::Duration;

 use anyerror::AnyError;
-use futures::stream::FuturesUnordered;
-use futures::StreamExt;
-use futures::TryFutureExt;
+use futures::{
+    stream::{FuturesOrdered, FuturesUnordered},
+    StreamExt, TryFutureExt
+};
 use maplit::btreeset;
 use tokio::select;
 use tokio::sync::mpsc;
@@ -158,6 +159,10 @@ impl LeaderData {

 // TODO: remove SM
 /// The core type implementing the Raft protocol.
+
+type LogFLushResult = Result::NodeId>, std::io::Error>;
+type OneshotReceiver = <::AsyncRuntime as AsyncRuntime>::OneshotReceiver;
+
 pub struct RaftCore
 where
     C: RaftTypeConfig,
@@ -208,6 +213,8 @@ where

     pub(crate) span: Span,

+    pub(crate) stream_log_flushed: FuturesOrdered>>,
+
     pub(crate) _p: PhantomData,
 }

@@ -735,9 +742,8 @@ where
         let callback = LogFlushed::new(log_io_id, tx);
         self.log_store.append(entries, callback).await?;
-        rx.await
-            .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
-            .map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?;
+
+        self.stream_log_flushed.push_back(rx);

         Ok(())
     }

@@ -931,6 +937,28 @@ where
             // In each loop, the first step is blocking waiting for any message from any channel.
             // Then if there is any message, process as many as possible to maximize throughput.

+            let poll_log_flush = async {
+                if self.stream_log_flushed.is_empty() {
+                    return None;
+                }
+                while let Some(flush_res) = self.stream_log_flushed.next().await {
+                    let log_io_id = flush_res
+                        .map_err(|e| AnyError::error(e))
+                        .and_then(|r| r.map_err(|e| AnyError::error(e)));
+                    match log_io_id {
+                        Ok(log_io_id) => {
+                            // The leader may have changed.
+                            // But reporting to a different leader is not a problem.
+                            if let Ok(mut lh) = self.engine.leader_handler() {
+                                lh.replication_handler().update_local_progress(log_io_id.log_id);
+                            }
+                        }
+                        Err(err) => { return Some(Err(err)); }
+                    }
+                }
+                Some(Ok(()))
+            };
+
             select! {
                 // Check shutdown in each loop first so that a message flood in `tx_api` won't block shutting down.
                 // `select!` without `biased` provides a random fairness.
@@ -943,6 +971,10 @@ where
                     return Err(Fatal::Stopped);
                 }

+                Some(flush_res) = poll_log_flush => {
+                    flush_res.map_err(|e| Into::>::into(StorageIOError::write_logs(e)))?;
+                }
+
                 notify_res = self.rx_notify.recv() => {
                     match notify_res {
                         Some(notify) => self.handle_notify(notify)?,
@@ -1611,15 +1643,9 @@ where
             }
             Command::AppendInputEntries { vote, entries } => {
                 let last_log_id = *entries.last().unwrap().get_log_id();
-                tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);
-
+                tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries));
                 self.append_to_log(entries, vote, last_log_id).await?;

-                // The leader may have changed.
-                // But reporting to a different leader is not a problem.
-                if let Ok(mut lh) = self.engine.leader_handler() {
-                    lh.replication_handler().update_local_progress(Some(last_log_id));
-                }
             }
             Command::SaveVote { vote } => {
                 self.log_store.save_vote(&vote).await?;
diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs
index a540ef37..3aba1f22 100644
--- a/openraft/src/raft/mod.rs
+++ b/openraft/src/raft/mod.rs
@@ -300,6 +300,8 @@ where C: RaftTypeConfig
             command_state: CommandState::default(),
             span: core_span,

+            stream_log_flushed: futures::stream::FuturesOrdered::new(),
+
             _p: Default::default(),
         };

```
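The core idea of the diff is that `append_to_log` no longer awaits the flush receiver; it pushes it into a `FuturesOrdered`, which yields results in push order, and the main loop reports local progress as flushes complete. Below is a minimal std-only sketch of that idea under simplifying assumptions: std `mpsc` channels stand in for the runtime's oneshot channels, `try_recv` polling stands in for `FuturesOrdered`, and `PendingFlushes`, `FlushResult`, and `poll` are hypothetical names, not openraft APIs.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{self, Receiver, TryRecvError};

/// Hypothetical flush result: Ok(last durable log index) or an I/O error message.
type FlushResult = Result<u64, String>;

/// Pending flushes kept in submission order (the role `FuturesOrdered` plays
/// in the diff), polled without blocking on any single flush.
struct PendingFlushes {
    queue: VecDeque<Receiver<FlushResult>>,
    /// Highest log index known to be durable on this node.
    flushed: Option<u64>,
}

impl PendingFlushes {
    fn new() -> Self {
        Self { queue: VecDeque::new(), flushed: None }
    }

    /// Called where `append_to_log` used to `.await` the receiver.
    fn push(&mut self, rx: Receiver<FlushResult>) {
        self.queue.push_back(rx);
    }

    /// Drains completed flushes from the front, in order, and returns the
    /// updated progress. Stops at the first flush that is still in flight,
    /// so a later flush is never reported before an earlier one.
    fn poll(&mut self) -> Result<Option<u64>, String> {
        while let Some(rx) = self.queue.front() {
            match rx.try_recv() {
                Ok(Ok(index)) => {
                    self.flushed = Some(index);
                    self.queue.pop_front();
                }
                Ok(Err(e)) => return Err(e),
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    return Err("flush callback dropped".to_string())
                }
            }
        }
        Ok(self.flushed)
    }
}

fn main() {
    let mut pending = PendingFlushes::new();
    let mut senders = Vec::new();
    for _ in 0..3 {
        let (tx, rx) = mpsc::channel();
        pending.push(rx);
        senders.push(tx);
    }

    // Flushes 1 and 3 complete while flush 2 is still in flight.
    senders[0].send(Ok(1)).unwrap();
    senders[2].send(Ok(3)).unwrap();
    // Progress stops at 1: flush 3 cannot be reported before flush 2.
    assert_eq!(pending.poll(), Ok(Some(1)));

    senders[1].send(Ok(2)).unwrap();
    assert_eq!(pending.poll(), Ok(Some(3)));
    println!("flushed up to {:?}", pending.flushed);
}
```

Keeping the receivers ordered matters because local progress must be monotonic: acknowledging index 3 while index 2 is not yet durable would overstate what this node has persisted.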
Thank you for bringing attention to this issue and suggesting a solution. To make … To ensure that log I/O is asynchronous, all I/O operations in … A more effective storage API would handle all write I/O in a callback manner, allowing the application to serialize all I/O operations in a single internal queue or a similar structure. Currently, log I/O operations are blocking, which naturally serializes them in execution order. Therefore, to transition to non-blocking, asynchronous operations, the …
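The callback-style storage API described in the reply can be sketched as follows. This is a minimal sketch under stated assumptions, not openraft's actual storage trait: `LogStore`, `WriteRequest`, `Callback`, `append`, and `shutdown` are hypothetical names, an in-memory `Vec` stands in for the on-disk log, and a std thread plus an `mpsc` channel stand in for the async runtime.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical callback: receives the last durable log index, or an error.
type Callback = Box<dyn FnOnce(Result<u64, String>) + Send>;

/// One queued write: the entries to persist and the callback to fire.
struct WriteRequest {
    entries: Vec<(u64, String)>, // (log index, payload)
    callback: Callback,
}

/// Callback-style log store: `append` only enqueues the request, and a single
/// worker thread applies writes strictly in submission order.
struct LogStore {
    tx: Sender<WriteRequest>,
    worker: JoinHandle<Vec<(u64, String)>>,
}

impl LogStore {
    fn new() -> Self {
        let (tx, rx) = mpsc::channel::<WriteRequest>();
        let worker = thread::spawn(move || {
            // In-memory stand-in for the on-disk log.
            let mut log = Vec::new();
            // One consumer draining a FIFO channel: this is the single
            // internal queue that serializes all write I/O.
            for req in rx {
                let last = req.entries.last().map(|e| e.0);
                log.extend(req.entries);
                // A real store would write and fsync here before calling back.
                match last {
                    Some(index) => (req.callback)(Ok(index)),
                    None => (req.callback)(Err("empty append".to_string())),
                }
            }
            log
        });
        LogStore { tx, worker }
    }

    /// Non-blocking: returns as soon as the request is queued.
    fn append(&self, entries: Vec<(u64, String)>, callback: Callback) {
        self.tx
            .send(WriteRequest { entries, callback })
            .expect("worker thread has exited");
    }

    /// Closes the queue, waits for queued writes to finish, returns the log.
    fn shutdown(self) -> Vec<(u64, String)> {
        let LogStore { tx, worker } = self;
        drop(tx); // ends the worker's `for req in rx` loop
        worker.join().expect("worker panicked")
    }
}

fn main() {
    let store = LogStore::new();
    let (done_tx, done_rx) = mpsc::channel();
    for i in 1..=3u64 {
        let done = done_tx.clone();
        store.append(
            vec![(i, format!("entry-{i}"))],
            Box::new(move |res: Result<u64, String>| done.send(res).unwrap()),
        );
    }
    // `append` never blocked; acknowledgements arrive in submission order.
    let acks: Vec<_> = (0..3).map(|_| done_rx.recv().unwrap()).collect();
    assert_eq!(acks, vec![Ok(1), Ok(2), Ok(3)]);
    assert_eq!(store.shutdown().len(), 3);
}
```

With a single FIFO consumer, ordering comes for free: the caller can issue many appends without waiting, yet writes and their callbacks still happen in submission order, which is the property the blocking version obtained only by awaiting each write.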
Hi @ukernel,
thanks for looking into it.
Yes, the issue should ultimately be fixed to allow more dataflow-like processing. At first glance, your proposal looks like a workable solution (after some clean-up), but I don't know all the dependencies inside `RaftCore`, i.e., whether this change might cause other problems. In any case, feel free to post it as a PR; that's easier to review and comment on than a diff in an issue. Invite @drmingdrmer to review it: he's the main maintainer of the crate and knows the dependencies best.
The issue I see is that `FuturesOrdered` seems to be optimized for a use case with a large number of heavyweight futures with a large number of memory allocat…