Skip to content

Commit 12a2bba

Browse files
committed
avoid runtime calls as much as possible
1 parent 9e85620 commit 12a2bba

File tree

2 files changed

+29
-11
lines changed

2 files changed

+29
-11
lines changed

src/connection.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
heartbeat::Heartbeat,
1010
internal_rpc::{InternalRPC, InternalRPCHandle},
1111
io_loop::IoLoop,
12+
reactor::FullReactor,
1213
runtime,
1314
socket_state::SocketState,
1415
tcp::{AMQPUriTcpExt, OwnedTLSConfig},
@@ -18,6 +19,7 @@ use crate::{
1819
};
1920
use amq_protocol::frame::{AMQPFrame, ProtocolVersion};
2021
use async_trait::async_trait;
22+
use executor_trait::FullExecutor;
2123
use std::{fmt, io, sync::Arc};
2224
use tracing::{Level, level_enabled, trace};
2325

@@ -181,7 +183,11 @@ impl Connection {
181183
pub async fn connector(
182184
uri: AMQPUri,
183185
connect: Box<
184-
dyn (Fn(AMQPUri) -> Box<dyn Future<Output = io::Result<AsyncTcpStream>> + Send>)
186+
dyn (Fn(
187+
AMQPUri,
188+
Arc<dyn FullExecutor + Send + Sync + 'static>,
189+
Arc<dyn FullReactor + Send + Sync + 'static>,
190+
) -> Box<dyn Future<Output = io::Result<AsyncTcpStream>> + Send>)
185191
+ Send
186192
+ Sync,
187193
>,
@@ -194,15 +200,15 @@ impl Connection {
194200
let frames = Frames::default();
195201
let socket_state = SocketState::default();
196202
let internal_rpc = InternalRPC::new(executor.clone(), socket_state.handle());
197-
let heartbeat = Heartbeat::new(status.clone(), executor.clone(), reactor);
203+
let heartbeat = Heartbeat::new(status.clone(), executor.clone(), reactor.clone());
198204
let channels = Channels::new(
199205
configuration.clone(),
200206
status.clone(),
201207
socket_state.handle(),
202208
internal_rpc.handle(),
203209
frames.clone(),
204210
heartbeat.clone(),
205-
executor,
211+
executor.clone(),
206212
uri.clone(),
207213
options.clone(),
208214
);
@@ -220,7 +226,7 @@ impl Connection {
220226
);
221227

222228
internal_rpc.start(conn.channels.clone());
223-
conn.io_loop.register(io_loop.start()?);
229+
conn.io_loop.register(io_loop.start(executor, reactor)?);
224230
conn.start(uri, options).await
225231
}
226232

@@ -288,12 +294,10 @@ impl Connect for AMQPUri {
288294
options: ConnectionProperties,
289295
config: OwnedTLSConfig,
290296
) -> Result<Connection> {
291-
let executor = runtime::executor()?;
292-
let reactor = runtime::reactor()?;
293297
let config = Arc::new(config);
294298
Connection::connector(
295299
self,
296-
Box::new(move |uri| {
300+
Box::new(move |uri, executor, reactor| {
297301
let config = config.clone();
298302
let reactor = reactor.clone();
299303
let executor = executor.clone();

src/io_loop.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,14 @@ use crate::{
99
internal_rpc::InternalRPCHandle,
1010
killswitch::KillSwitch,
1111
protocol::{self, AMQPError, AMQPHardError},
12+
reactor::FullReactor,
1213
socket_state::SocketState,
1314
thread::JoinHandle,
1415
types::FrameSize,
1516
uri::AMQPUri,
1617
};
1718
use amq_protocol::frame::{AMQPFrame, GenError, gen_frame, parse_frame};
19+
use executor_trait::FullExecutor;
1820
use futures_io::{AsyncRead, AsyncWrite};
1921
use std::{
2022
collections::VecDeque,
@@ -44,7 +46,11 @@ pub struct IoLoop {
4446
heartbeat: Heartbeat,
4547
socket_state: SocketState,
4648
connect: Arc<
47-
dyn (Fn(AMQPUri) -> Box<dyn Future<Output = io::Result<AsyncTcpStream>> + Send>)
49+
dyn (Fn(
50+
AMQPUri,
51+
Arc<dyn FullExecutor + Send + Sync + 'static>,
52+
Arc<dyn FullReactor + Send + Sync + 'static>,
53+
) -> Box<dyn Future<Output = io::Result<AsyncTcpStream>> + Send>)
4854
+ Send
4955
+ Sync,
5056
>,
@@ -65,7 +71,11 @@ impl IoLoop {
6571
frames: Frames,
6672
socket_state: SocketState,
6773
connect: Arc<
68-
dyn (Fn(AMQPUri) -> Box<dyn Future<Output = io::Result<AsyncTcpStream>> + Send>)
74+
dyn (Fn(
75+
AMQPUri,
76+
Arc<dyn FullExecutor + Send + Sync + 'static>,
77+
Arc<dyn FullReactor + Send + Sync + 'static>,
78+
) -> Box<dyn Future<Output = io::Result<AsyncTcpStream>> + Send>)
6979
+ Send
7080
+ Sync,
7181
>,
@@ -166,7 +176,11 @@ impl IoLoop {
166176
}
167177
}
168178

169-
pub(crate) fn start(mut self) -> Result<JoinHandle> {
179+
pub(crate) fn start(
180+
mut self,
181+
executor: Arc<dyn FullExecutor + Send + Sync + 'static>,
182+
reactor: Arc<dyn FullReactor + Send + Sync + 'static>,
183+
) -> Result<JoinHandle> {
170184
let waker = self.socket_state.handle();
171185
let current_span = tracing::Span::current();
172186
let handle = ThreadBuilder::new()
@@ -180,7 +194,7 @@ impl IoLoop {
180194
let mut writable_context = Context::from_waker(&writable_waker);
181195
let (mut stream, res) = loop {
182196
let (promise, resolver) = Promise::new();
183-
let connect = Box::into_pin((self.connect)(self.uri.clone()));
197+
let connect = Box::into_pin((self.connect)(self.uri.clone(), executor.clone(), reactor.clone()));
184198
self.internal_rpc.register_internal_future_with_resolver(async move { Ok(connect.await?) }, resolver);
185199
let mut stream = promise.wait().inspect_err(|err| {
186200
trace!("Poison connection attempt");

0 commit comments

Comments
 (0)