Skip to content

Commit 7366f71

Browse files
committed
feat: switch feature flags into RUSTFLAGS
1 parent f5478b9 commit 7366f71

File tree

9 files changed

+123
-97
lines changed

9 files changed

+123
-97
lines changed

Cargo.toml

Lines changed: 49 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,6 @@ edition = "2021"
77
[features]
88

99
# async_std
10-
full-ci = ["async-std-executor", "logging-utils", "channel-async-std"]
11-
12-
tokio-ci = ["tokio-executor", "logging-utils", "channel-tokio"]
13-
async-std-executor = ["dep:async-std"]
14-
tokio-executor = ["dep:tokio", "dep:tokio-stream", "dep:console-subscriber"]
15-
channel-flume = ["flume"]
16-
channel-tokio = ["dep:tokio", "dep:tokio-stream"]
17-
channel-async-std = ["dep:async-channel"]
1810
profiling = [
1911
"opentelemetry-jaeger",
2012
"tracing-opentelemetry",
@@ -27,38 +19,15 @@ logging-utils = ["tracing-subscriber"]
2719

2820
[dependencies]
2921
async-lock = "2.7"
30-
async-std = { version = "1.12", features = [
31-
"attributes",
32-
"unstable",
33-
], optional = true }
34-
async-channel = { version = "1.9.0", optional = true }
3522
async-trait = "0.1.71"
3623
color-eyre = "0.6.2"
37-
flume = { version = "0.10.14", optional = true }
3824
futures = "0.3.28"
39-
tokio = { version = "1", optional = true, features = [
40-
"fs",
41-
"io-util",
42-
"io-std",
43-
"macros",
44-
"net",
45-
"parking_lot",
46-
"process",
47-
"rt",
48-
"rt-multi-thread",
49-
"signal",
50-
"sync",
51-
"time",
52-
"tracing",
53-
] }
54-
tokio-stream = { version = "0.1.14", optional = true }
5525
tracing = "0.1.37"
5626
tracing-error = "0.2.0"
5727
tracing-subscriber = { version = "0.3.17", features = [
5828
"env-filter",
5929
"json",
6030
], optional = true }
61-
console-subscriber = { version = "0.1.10", optional = true }
6231
opentelemetry = { version = "0.19.0", features = [
6332
"rt-tokio-current-thread",
6433
"metrics",
@@ -69,3 +38,52 @@ opentelemetry-jaeger = { version = "0.17.0", features = [
6938
"rt-tokio-current-thread",
7039
], optional = true }
7140
opentelemetry-aws = { version = "0.7.0", features = ["trace"], optional = true }
41+
42+
[target.'cfg(all(async_executor_impl = "tokio"))'.dependencies]
43+
tokio = { version = "1", features = [
44+
"fs",
45+
"io-util",
46+
"io-std",
47+
"macros",
48+
"net",
49+
"parking_lot",
50+
"process",
51+
"rt",
52+
"rt-multi-thread",
53+
"signal",
54+
"sync",
55+
"time",
56+
"tracing",
57+
] }
58+
tokio-stream = { version = "0.1.14" }
59+
60+
[target.'cfg(all(async_executor_impl = "async-std"))'.dependencies]
61+
async-std = { version = "1.12", features = [
62+
"attributes",
63+
"unstable",
64+
]}
65+
66+
[target.'cfg(all(async_channel_impl = "tokio"))'.dependencies]
67+
console-subscriber = { version = "0.1.10" }
68+
tokio = { version = "1", features = [
69+
"fs",
70+
"io-util",
71+
"io-std",
72+
"macros",
73+
"net",
74+
"parking_lot",
75+
"process",
76+
"rt",
77+
"rt-multi-thread",
78+
"signal",
79+
"sync",
80+
"time",
81+
"tracing",
82+
] }
83+
tokio-stream = { version = "0.1.14" }
84+
85+
[target.'cfg(all(async_channel_impl = "async-std"))'.dependencies]
86+
async-channel = { version = "1.9.0" }
87+
88+
[target.'cfg(all(async_channel_impl = "flume"))'.dependencies]
89+
flume = { version = "0.10.14" }

README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,17 @@
22

33
This crate exports four things:
44

5-
- A compatibility/abstraction layer for writing async-executor agnostic code. We support two async executors: async-std and tokio. Each may be toggled with a feature flag.
6-
- A compatibility/abstraction layer for writing async channel agnostic code. We support three async channel implementations: async-std-channels. Each may be toggled with a feature flag.
5+
- A compatibility/abstraction layer for writing async-executor agnostic code. We support two async executors: async-std and tokio. Each may be toggled with a configuration flag.
6+
- A compatibility/abstraction layer for writing async channel agnostic code. We support three async channel implementations: async-std-channels. Each may be toggled with a configuration flag.
77
- A library exporting a bunch of useful async primitives.
88
- A tracing configuration layer optionally supporting console and opentelemetry integration.
99

10+
# Example usage
1011

12+
```bash
13+
RUSTFLAGS='--cfg async_executor_impl="tokio" --cfg async_channel_impl="tokio"' cargo build
14+
```
15+
16+
`async_executor_impl` may be either `tokio` or `async-std`. `async_channel_impl` may be either `tokio`, `async-std`, or `flume`. Note that using `tokio` channels requires `tokio` to be the runtime. Note that the async executor impl and async channel impl must be set in order for this crate to compile successfully.
1117

1218

src/async_primitives/subscribable_mutex.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ use futures::{stream::FuturesOrdered, Future, FutureExt};
55
use std::{fmt, time::Duration};
66
use tracing::warn;
77

8-
#[cfg(feature = "async-std-executor")]
8+
#[cfg(all(async_executor_impl = "async-std"))]
99
use async_std::prelude::StreamExt;
10-
#[cfg(feature = "tokio-executor")]
10+
#[cfg(all(async_executor_impl = "tokio"))]
1111
use tokio_stream::StreamExt;
12-
#[cfg(not(any(feature = "async-std-executor", feature = "tokio-executor")))]
13-
std::compile_error! {"Either feature \"async-std-executor\" or feature \"tokio-executor\" must be enabled for this crate."}
12+
#[cfg(not(any(async_executor_impl = "async-std", async_executor_impl = "tokio")))]
13+
std::compile_error! {"The cfg flag async_executor_impl must be set in rustflags to either \"async-std\" or \"tokio\" for this crate. Try adding `--cfg async_executor_impl=\"tokio\""}
1414

1515
/// A mutex that can register subscribers to be notified. This works in the same way as [`Mutex`], but has some additional functions:
1616
///
@@ -251,10 +251,10 @@ mod tests {
251251
use std::{sync::Arc, time::Duration};
252252

253253
#[cfg_attr(
254-
feature = "tokio-executor",
254+
async_executor_impl = "tokio",
255255
tokio::test(flavor = "multi_thread", worker_threads = 2)
256256
)]
257-
#[cfg_attr(feature = "async-std-executor", async_std::test)]
257+
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
258258
async fn test_wait_timeout_until() {
259259
let mutex: Arc<SubscribableMutex<usize>> = Arc::default();
260260
{
@@ -276,10 +276,10 @@ mod tests {
276276
}
277277

278278
#[cfg_attr(
279-
feature = "tokio-executor",
279+
async_executor_impl = "tokio",
280280
tokio::test(flavor = "multi_thread", worker_threads = 2)
281281
)]
282-
#[cfg_attr(feature = "async-std-executor", async_std::test)]
282+
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
283283
async fn test_wait_timeout_until_fail() {
284284
let mutex: Arc<SubscribableMutex<usize>> = Arc::default();
285285
{
@@ -300,10 +300,10 @@ mod tests {
300300
}
301301

302302
#[cfg_attr(
303-
feature = "tokio-executor",
303+
async_executor_impl = "tokio",
304304
tokio::test(flavor = "multi_thread", worker_threads = 2)
305305
)]
306-
#[cfg_attr(feature = "async-std-executor", async_std::test)]
306+
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
307307
async fn test_compare_and_set() {
308308
let mutex = SubscribableMutex::new(5usize);
309309
let subscriber = mutex.subscribe().await;
@@ -322,10 +322,10 @@ mod tests {
322322
}
323323

324324
#[cfg_attr(
325-
feature = "tokio-executor",
325+
async_executor_impl = "tokio",
326326
tokio::test(flavor = "multi_thread", worker_threads = 2)
327327
)]
328-
#[cfg_attr(feature = "async-std-executor", async_std::test)]
328+
#[cfg_attr(async_executor_impl = "async-std", async_std::test)]
329329
async fn test_subscriber() {
330330
let mutex = SubscribableMutex::new(5usize);
331331
let subscriber = mutex.subscribe().await;

src/channel.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ mod oneshot;
2121
/// Unbounded channels
2222
mod unbounded;
2323

24-
#[cfg(all(feature = "async-std-executor", feature = "channel-tokio"))]
25-
compile_error!("feature 'async-std-executor' and 'channel-tokio' cannot be used at the same time; 'channel-tokio' needs the tokio runtime");
24+
#[cfg(all(async_executor_impl = "async-std", async_channel_impl = "tokio"))]
25+
compile_error!("async_executor_impl = 'async-std-executor' and async_channel_impl = 'channel-tokio' cannot be used together; 'channel-tokio' needs the tokio runtime");
2626

2727
pub use bounded::{bounded, BoundedStream, Receiver, RecvError, SendError, Sender, TryRecvError};
2828
pub use oneshot::{oneshot, OneShotReceiver, OneShotRecvError, OneShotSender, OneShotTryRecvError};

src/channel/bounded.rs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::pin::Pin;
33
use futures::Stream;
44

55
/// inner module, used to group feature-specific imports
6-
#[cfg(feature = "channel-tokio")]
6+
#[cfg(all(async_channel_impl = "tokio"))]
77
mod inner {
88
pub use tokio::sync::mpsc::error::{SendError, TryRecvError};
99

@@ -45,7 +45,7 @@ mod inner {
4545
}
4646

4747
/// inner module, used to group feature-specific imports
48-
#[cfg(feature = "channel-flume")]
48+
#[cfg(all(async_channel_impl = "flume"))]
4949
mod inner {
5050
pub use flume::{RecvError, SendError, TryRecvError};
5151

@@ -75,7 +75,7 @@ mod inner {
7575
}
7676

7777
/// inner module, used to group feature-specific imports
78-
#[cfg(feature = "channel-async-std")]
78+
#[cfg(all(async_channel_impl = "async-std"))]
7979
mod inner {
8080
pub use async_channel::{RecvError, SendError, TryRecvError};
8181

@@ -114,9 +114,9 @@ impl<T> Sender<T> {
114114
///
115115
/// Will return an error if the receiver is dropped
116116
pub async fn send(&self, msg: T) -> Result<(), SendError<T>> {
117-
#[cfg(feature = "channel-flume")]
117+
#[cfg(all(async_channel_impl = "flume"))]
118118
let result = self.0.send_async(msg).await;
119-
#[cfg(not(feature = "channel-flume"))]
119+
#[cfg(not(all(async_channel_impl = "flume")))]
120120
let result = self.0.send(msg).await;
121121

122122
result
@@ -130,11 +130,11 @@ impl<T> Receiver<T> {
130130
///
131131
/// Will return an error if the sender is dropped
132132
pub async fn recv(&mut self) -> Result<T, RecvError> {
133-
#[cfg(feature = "channel-flume")]
133+
#[cfg(all(async_channel_impl = "flume"))]
134134
let result = self.0.recv_async().await;
135-
#[cfg(feature = "channel-tokio")]
135+
#[cfg(all(async_channel_impl = "tokio"))]
136136
let result = self.0.recv().await.ok_or(RecvError);
137-
#[cfg(feature = "channel-async-std")]
137+
#[cfg(all(async_channel_impl = "async-std"))]
138138
let result = self.0.recv().await;
139139

140140
result
@@ -144,11 +144,11 @@ impl<T> Receiver<T> {
144144
where
145145
T: 'static,
146146
{
147-
#[cfg(feature = "channel-async-std")]
147+
#[cfg(all(async_channel_impl = "async-std"))]
148148
let result = self.0;
149-
#[cfg(feature = "channel-tokio")]
149+
#[cfg(all(async_channel_impl = "tokio"))]
150150
let result = tokio_stream::wrappers::ReceiverStream::new(self.0);
151-
#[cfg(feature = "channel-flume")]
151+
#[cfg(all(async_channel_impl = "flume"))]
152152
let result = self.0.into_stream();
153153

154154
BoundedStream(result)
@@ -219,14 +219,14 @@ impl<T> Stream for BoundedStream<T> {
219219
mut self: std::pin::Pin<&mut Self>,
220220
cx: &mut std::task::Context<'_>,
221221
) -> std::task::Poll<Option<Self::Item>> {
222-
#[cfg(feature = "channel-flume")]
222+
#[cfg(all(async_channel_impl = "flume"))]
223223
return <flume::r#async::RecvStream<T>>::poll_next(Pin::new(&mut self.0), cx);
224-
#[cfg(feature = "channel-tokio")]
224+
#[cfg(all(async_channel_impl = "tokio"))]
225225
return <tokio_stream::wrappers::ReceiverStream<T> as Stream>::poll_next(
226226
Pin::new(&mut self.0),
227227
cx,
228228
);
229-
#[cfg(feature = "channel-async-std")]
229+
#[cfg(all(async_channel_impl = "async-std"))]
230230
return <async_channel::Receiver<T> as Stream>::poll_next(Pin::new(&mut self.0), cx);
231231
}
232232
}

src/channel/oneshot.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/// inner module, used to group feature-specific imports
2-
#[cfg(feature = "channel-tokio")]
2+
#[cfg(all(async_channel_impl = "tokio"))]
33
mod inner {
44
pub use tokio::sync::oneshot::{error::TryRecvError as OneShotTryRecvError, Receiver, Sender};
55

@@ -40,7 +40,7 @@ mod inner {
4040
}
4141

4242
/// inner module, used to group feature-specific imports
43-
#[cfg(feature = "channel-flume")]
43+
#[cfg(all(async_channel_impl = "flume"))]
4444
mod inner {
4545
use flume::{Receiver, Sender};
4646
pub use flume::{RecvError as OneShotRecvError, TryRecvError as OneShotTryRecvError};
@@ -61,7 +61,7 @@ mod inner {
6161
}
6262

6363
/// inner module, used to group feature-specific imports
64-
#[cfg(feature = "channel-async-std")]
64+
#[cfg(all(async_channel_impl = "async-std"))]
6565
mod inner {
6666
use async_std::channel::{Receiver, Sender};
6767
pub use async_std::channel::{
@@ -90,11 +90,11 @@ impl<T> OneShotSender<T> {
9090
///
9191
/// If this fails because the receiver is dropped, a warning will be printed.
9292
pub fn send(self, msg: T) {
93-
#[cfg(feature = "channel-async-std")]
93+
#[cfg(all(async_channel_impl = "async-std"))]
9494
if self.0.try_send(msg).is_err() {
9595
tracing::warn!("Could not send msg on OneShotSender, did the receiver drop?");
9696
}
97-
#[cfg(not(feature = "channel-async-std"))]
97+
#[cfg(not(all(async_channel_impl = "async-std")))]
9898
if self.0.send(msg).is_err() {
9999
tracing::warn!("Could not send msg on OneShotSender, did the receiver drop?");
100100
}
@@ -108,11 +108,11 @@ impl<T> OneShotReceiver<T> {
108108
///
109109
/// Will return an error if the sender channel is dropped
110110
pub async fn recv(self) -> Result<T, OneShotRecvError> {
111-
#[cfg(feature = "channel-tokio")]
111+
#[cfg(all(async_channel_impl = "tokio"))]
112112
let result = self.0.await.map_err(Into::into);
113-
#[cfg(feature = "channel-flume")]
113+
#[cfg(all(async_channel_impl = "flume"))]
114114
let result = self.0.recv_async().await;
115-
#[cfg(feature = "channel-async-std")]
115+
#[cfg(all(async_channel_impl = "async-std"))]
116116
let result = self.0.recv().await;
117117

118118
result

0 commit comments

Comments
 (0)