Skip to content

Commit 40fc428

Browse files
authored
refactor: provide topic in pubsub event (#337)
1 parent 66f9909 commit 40fc428

File tree

6 files changed

+249
-51
lines changed

6 files changed

+249
-51
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# 0.13.0
2+
- refactor: provide topic in pubsub event. If a topic filter is supplied, the topic will be excluded from the event. [PR 337](https://github.com/dariusc93/rust-ipfs/pull/337)
3+
14
# 0.12.2
25
- feat: Reimplement ConnectionEvents and PeerConnectionEvents stream via `Ipfs::{connection_events, peer_connection_events}`. [PR 320](https://github.com/dariusc93/rust-ipfs/pull/320)
36

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ name = "rust-ipfs"
77
readme = "README.md"
88
repository = "https://github.com/dariusc93/rust-ipfs"
99
description = "IPFS node implementation"
10-
version = "0.12.2"
10+
version = "0.13.0"
1111

1212
[features]
1313
default = []

examples/pubsub.rs

Lines changed: 132 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use clap::Parser;
2-
use futures::{pin_mut, FutureExt};
2+
use futures::FutureExt;
33
use ipld_core::ipld;
44
use libp2p::futures::StreamExt;
55
use libp2p::Multiaddr;
66
use rust_ipfs::p2p::MultiaddrExt;
7-
use rust_ipfs::{Ipfs, Keypair, PubsubEvent, UninitializedIpfs};
7+
use rust_ipfs::{ConnectionEvents, Ipfs, Keypair, PubsubEvent, UninitializedIpfs};
88

9+
use parking_lot::Mutex;
10+
use pollable_map::stream::StreamMap;
911
use rustyline_async::Readline;
1012
use std::time::Duration;
1113
use std::{io::Write, sync::Arc};
@@ -41,6 +43,8 @@ async fn main() -> anyhow::Result<()> {
4143

4244
let topic = opt.topic.unwrap_or_else(|| String::from("ipfs-chat"));
4345

46+
let main_topic = Arc::new(Mutex::new(topic.clone()));
47+
4448
let keypair = Keypair::generate_ed25519();
4549

4650
let peer_id = keypair.public().to_peer_id();
@@ -95,6 +99,16 @@ async fn main() -> anyhow::Result<()> {
9599

96100
let mut st = ipfs.connection_events().await?;
97101

102+
let mut main_events = StreamMap::new();
103+
104+
let mut listener_st = StreamMap::new();
105+
106+
let mut main_event_st = ipfs.pubsub_events(None).await?;
107+
108+
let stream = ipfs.pubsub_subscribe(topic.clone()).await?;
109+
110+
listener_st.insert(topic.clone(), stream);
111+
98112
for addr in opt.connect {
99113
let Some(peer_id) = addr.peer_id() else {
100114
writeln!(stdout, ">{addr} does not contain a p2p protocol. skipping")?;
@@ -109,41 +123,138 @@ async fn main() -> anyhow::Result<()> {
109123
writeln!(stdout, "Connected to {}", peer_id)?;
110124
}
111125

112-
let mut event_stream = ipfs.pubsub_events(&topic).await?;
113-
114-
let stream = ipfs.pubsub_subscribe(&topic).await?;
115-
116-
pin_mut!(stream);
117-
118-
tokio::spawn(topic_discovery(ipfs.clone(), topic.clone()));
126+
let owned_topic = topic.to_string();
127+
tokio::spawn(topic_discovery(ipfs.clone(), owned_topic));
119128

120129
tokio::task::yield_now().await;
121130

122131
loop {
123132
tokio::select! {
124-
data = stream.next() => {
125-
if let Some(msg) = data {
126-
writeln!(stdout, "{}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?;
133+
Some((topic, msg)) = listener_st.next() => {
134+
writeln!(stdout, "> {topic}: {}: {}", msg.source.expect("Message should contain a source peer_id"), String::from_utf8_lossy(&msg.data))?;
135+
}
136+
Some(conn_ev) = st.next() => {
137+
match conn_ev {
138+
ConnectionEvents::IncomingConnection{ peer_id, .. } => {
139+
writeln!(stdout, "> {peer_id} connected")?;
140+
}
141+
ConnectionEvents::OutgoingConnection{ peer_id, .. } => {
142+
writeln!(stdout, "> {peer_id} connected")?;
143+
}
144+
ConnectionEvents::ClosedConnection{ peer_id, .. } => {
145+
writeln!(stdout, "> {peer_id} disconnected")?;
146+
}
127147
}
128148
}
129-
conn_ev = st.next() => {
130-
if let Some(ev) = conn_ev {
131-
writeln!(stdout, "connection event: {ev:?}")?;
149+
Some(event) = main_event_st.next() => {
150+
match event {
151+
PubsubEvent::Subscribe { peer_id, topic: Some(topic) } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?,
152+
PubsubEvent::Unsubscribe { peer_id, topic: Some(topic) } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?,
153+
_ => unreachable!(),
132154
}
133155
}
134-
Some(event) = event_stream.next() => {
156+
Some((topic, event)) = main_events.next() => {
135157
match event {
136-
PubsubEvent::Subscribe { peer_id } => writeln!(stdout, "{} subscribed", peer_id)?,
137-
PubsubEvent::Unsubscribe { peer_id } => writeln!(stdout, "{} unsubscribed", peer_id)?,
158+
PubsubEvent::Subscribe { peer_id, topic: None } => writeln!(stdout, "{} subscribed to {}", peer_id, topic)?,
159+
PubsubEvent::Unsubscribe { peer_id, topic: None } => writeln!(stdout, "{} unsubscribed from {}", peer_id, topic)?,
160+
_ => unreachable!()
138161
}
139162
}
140163
line = rl.readline().fuse() => match line {
141164
Ok(rustyline_async::ReadlineEvent::Line(line)) => {
142-
if let Err(e) = ipfs.pubsub_publish(topic.clone(), line.as_bytes().to_vec()).await {
143-
writeln!(stdout, "Error publishing message: {e}")?;
165+
let line = line.trim();
166+
if !line.starts_with('/') {
167+
if !line.is_empty() {
168+
let topic_to_publish = &*main_topic.lock();
169+
if let Err(e) = ipfs.pubsub_publish(topic_to_publish.clone(), line.as_bytes().to_vec()).await {
170+
writeln!(stdout, "> error publishing message: {e}")?;
171+
continue;
172+
}
173+
writeln!(stdout, "{peer_id}: {line}")?;
174+
}
144175
continue;
145176
}
146-
writeln!(stdout, "{peer_id}: {line}")?;
177+
178+
let mut command = line.split(' ');
179+
180+
match command.next() {
181+
Some("/subscribe") => {
182+
let topic = match command.next() {
183+
Some(topic) => topic.to_string(),
184+
None => {
185+
writeln!(stdout, "> topic must be provided")?;
186+
continue;
187+
}
188+
};
189+
let event_st = ipfs.pubsub_events(topic.clone()).await?;
190+
let Ok(st) = ipfs.pubsub_subscribe(topic.clone()).await else {
191+
writeln!(stdout, "> already subscribed to topic")?;
192+
continue;
193+
};
194+
195+
listener_st.insert(topic.clone(), st);
196+
main_events.insert(topic.clone(), event_st);
197+
writeln!(stdout, "> subscribed to {}", topic)?;
198+
*main_topic.lock() = topic;
199+
continue;
200+
}
201+
Some("/unsubscribe") => {
202+
let topic = match command.next() {
203+
Some(topic) => topic.to_string(),
204+
None => main_topic.lock().clone()
205+
};
206+
207+
listener_st.remove(&topic);
208+
main_events.remove(&topic);
209+
210+
if !ipfs.pubsub_unsubscribe(&topic).await.unwrap_or_default() {
211+
writeln!(stdout, "> unable to unsubscribe from {}", topic)?;
212+
continue;
213+
}
214+
215+
writeln!(stdout, "> unsubscribe from {}", topic)?;
216+
if let Some(some_topic) = main_events.keys().next() {
217+
*main_topic.lock() = some_topic.clone();
218+
writeln!(stdout, "> setting current topic to {}", some_topic)?;
219+
}
220+
continue;
221+
}
222+
Some("/list-topics") => {
223+
let topics = ipfs.pubsub_subscribed().await.unwrap_or_default();
224+
if topics.is_empty() {
225+
writeln!(stdout, "> not subscribed to any topics")?;
226+
continue;
227+
}
228+
229+
let current_topic = main_topic.lock().clone();
230+
231+
writeln!(stdout, "> list of topics")?;
232+
for topic in topics {
233+
writeln!(stdout, "\t{topic} {}", if current_topic == topic { "- current" } else { "" } )?;
234+
}
235+
}
236+
Some("/set-current-topic") => {
237+
let topic = match command.next() {
238+
Some(topic) if !topic.is_empty() => topic.to_string(),
239+
None | _ => {
240+
writeln!(stdout, "> topic must be provided")?;
241+
continue;
242+
}
243+
};
244+
245+
let topics = ipfs.pubsub_subscribed().await.unwrap_or_default();
246+
if topics.is_empty() || !topics.contains(&topic) {
247+
writeln!(stdout, "> not subscribed to topic \"{topic}\"")?;
248+
continue;
249+
}
250+
251+
*main_topic.lock() = topic.clone();
252+
253+
writeln!(stdout, "> topic set to {topic}")?;
254+
}
255+
_ => continue
256+
}
257+
147258
}
148259
Ok(rustyline_async::ReadlineEvent::Eof) => {
149260
cancel.notify_one();

src/lib.rs

Lines changed: 41 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -449,13 +449,19 @@ impl From<DhtMode> for Option<Mode> {
449449
}
450450
}
451451

452-
#[derive(Debug, Clone)]
452+
#[derive(Debug, Clone, Eq, PartialEq)]
453453
pub enum PubsubEvent {
454454
/// Subscription event to a given topic
455-
Subscribe { peer_id: PeerId },
455+
Subscribe {
456+
peer_id: PeerId,
457+
topic: Option<String>,
458+
},
456459

457460
/// Unsubscribing event to a given topic
458-
Unsubscribe { peer_id: PeerId },
461+
Unsubscribe {
462+
peer_id: PeerId,
463+
topic: Option<String>,
464+
},
459465
}
460466

461467
#[derive(Debug, Clone)]
@@ -467,15 +473,6 @@ pub(crate) enum InnerPubsubEvent {
467473
Unsubscribe { topic: String, peer_id: PeerId },
468474
}
469475

470-
impl From<InnerPubsubEvent> for PubsubEvent {
471-
fn from(event: InnerPubsubEvent) -> Self {
472-
match event {
473-
InnerPubsubEvent::Subscribe { peer_id, .. } => PubsubEvent::Subscribe { peer_id },
474-
InnerPubsubEvent::Unsubscribe { peer_id, .. } => PubsubEvent::Unsubscribe { peer_id },
475-
}
476-
}
477-
}
478-
479476
type TSwarmEvent<C> = <TSwarm<C> as Stream>::Item;
480477
type TSwarmEventFn<C> = Arc<dyn Fn(&mut TSwarm<C>, &TSwarmEvent<C>) + Sync + Send>;
481478
type TTransportFn = Box<
@@ -1527,38 +1524,55 @@ impl Ipfs {
15271524
.await
15281525
}
15291526

1530-
/// Stream that returns [`PubsubEvent`] for a given topic
1527+
/// Stream that returns [`PubsubEvent`] for a given topic. if a topic is not supplied, it will provide all events emitted for any topic.
15311528
pub async fn pubsub_events(
15321529
&self,
1533-
topic: impl Into<String>,
1530+
topic: impl Into<Option<String>>,
15341531
) -> Result<BoxStream<'static, PubsubEvent>, Error> {
15351532
async move {
1536-
let topic = topic.into();
15371533
let (tx, rx) = oneshot_channel();
15381534

15391535
self.to_task
15401536
.clone()
15411537
.send(IpfsEvent::PubsubEventStream(tx))
15421538
.await?;
15431539

1544-
let mut receiver = rx
1545-
.await?;
1540+
let receiver = rx.await?;
15461541

1547-
let defined_topic = topic.to_string();
1542+
let defined_topic = topic.into();
15481543

1549-
let stream = async_stream::stream! {
1550-
while let Some(event) = receiver.next().await {
1551-
match &event {
1552-
InnerPubsubEvent::Subscribe { topic, .. } | InnerPubsubEvent::Unsubscribe { topic, .. } if topic.eq(&defined_topic) => yield event.into(),
1553-
_ => {}
1554-
}
1544+
let stream = receiver.filter_map(move |event| {
1545+
let defined_topic = defined_topic.clone();
1546+
async move {
1547+
let ev = match event {
1548+
InnerPubsubEvent::Subscribe { topic, peer_id } => {
1549+
let topic = match defined_topic {
1550+
Some(defined_topic) if defined_topic.eq(&topic) => None,
1551+
Some(defined_topic) if defined_topic.ne(&topic) => return None,
1552+
Some(_) => return None,
1553+
None => Some(topic),
1554+
};
1555+
PubsubEvent::Subscribe { peer_id, topic }
1556+
}
1557+
InnerPubsubEvent::Unsubscribe { topic, peer_id } => {
1558+
let topic = match defined_topic {
1559+
Some(defined_topic) if defined_topic.eq(&topic) => None,
1560+
Some(defined_topic) if defined_topic.ne(&topic) => return None,
1561+
Some(_) => return None,
1562+
None => Some(topic),
1563+
};
1564+
PubsubEvent::Unsubscribe { peer_id, topic }
1565+
}
1566+
};
1567+
1568+
Some(ev)
15551569
}
1556-
};
1570+
});
15571571

15581572
Ok(stream.boxed())
15591573
}
1560-
.instrument(self.span.clone())
1561-
.await
1574+
.instrument(self.span.clone())
1575+
.await
15621576
}
15631577

15641578
/// Publishes to the topic which may have been subscribed to earlier

0 commit comments

Comments
 (0)