Skip to content

fix: realtime late join #6869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jul 23, 2025
26 changes: 18 additions & 8 deletions src/mimefactory.rs
Original file line number Diff line number Diff line change
@@ -5,7 +5,9 @@ use std::io::Cursor;

use anyhow::{Context as _, Result, bail, ensure};
use base64::Engine as _;
use data_encoding::BASE32_NOPAD;
use deltachat_contact_tools::sanitize_bidi_characters;
use iroh_gossip::proto::TopicId;
use mail_builder::headers::HeaderType;
use mail_builder::headers::address::{Address, EmailAddress};
use mail_builder::mime::MimePart;
@@ -21,14 +23,14 @@ use crate::contact::{Contact, ContactId, Origin};
use crate::context::Context;
use crate::e2ee::EncryptHelper;
use crate::ephemeral::Timer as EphemeralTimer;
use crate::key::self_fingerprint;
use crate::key::{DcKey, SignedPublicKey};
use crate::headerdef::HeaderDef;
use crate::key::{DcKey, SignedPublicKey, self_fingerprint};
use crate::location;
use crate::log::{info, warn};
use crate::message::{Message, MsgId, Viewtype};
use crate::mimeparser::{SystemMessage, is_hidden};
use crate::param::Param;
use crate::peer_channels::create_iroh_header;
use crate::peer_channels::{create_iroh_header, get_iroh_topic_for_msg};
use crate::simplify::escape_message_footer_marks;
use crate::stock_str;
use crate::tools::{
@@ -139,6 +141,9 @@ pub struct MimeFactory {

/// True if the avatar should be attached.
pub attach_selfavatar: bool,

/// This field is used to sustain the topic id of webxdcs needed for peer channels.
webxdc_topic: Option<TopicId>,
}

/// Result of rendering a message, ready to be submitted to a send job.
@@ -449,7 +454,7 @@ impl MimeFactory {
member_timestamps.is_empty()
|| to.len() + past_members.len() == member_timestamps.len()
);

let webxdc_topic = get_iroh_topic_for_msg(context, msg.id).await?;
let factory = MimeFactory {
from_addr,
from_displayname,
@@ -469,6 +474,7 @@ impl MimeFactory {
last_added_location_id: None,
sync_ids_to_delete: None,
attach_selfavatar,
webxdc_topic,
};
Ok(factory)
}
@@ -516,6 +522,7 @@ impl MimeFactory {
last_added_location_id: None,
sync_ids_to_delete: None,
attach_selfavatar: false,
webxdc_topic: None,
};

Ok(res)
@@ -1492,7 +1499,7 @@ impl MimeFactory {
}
SystemMessage::IrohNodeAddr => {
headers.push((
"Iroh-Node-Addr",
HeaderDef::IrohNodeAddr.into(),
mail_builder::headers::text::Text::new(serde_json::to_string(
&context
.get_or_try_init_peer_channel()
@@ -1673,10 +1680,13 @@ impl MimeFactory {
let json = msg.param.get(Param::Arg).unwrap_or_default();
parts.push(context.build_status_update_part(json));
} else if msg.viewtype == Viewtype::Webxdc {
let topic = self
.webxdc_topic
.map(|top| BASE32_NOPAD.encode(top.as_bytes()).to_ascii_lowercase())
.unwrap_or(create_iroh_header(context, msg.id).await?);
headers.push((
"Iroh-Gossip-Topic",
mail_builder::headers::raw::Raw::new(create_iroh_header(context, msg.id).await?)
.into(),
HeaderDef::IrohGossipTopic.get_headername(),
mail_builder::headers::raw::Raw::new(topic).into(),
));
if let (Some(json), _) = context
.render_webxdc_status_update_object(
132 changes: 107 additions & 25 deletions src/peer_channels.rs
Original file line number Diff line number Diff line change
@@ -48,13 +48,13 @@ use crate::mimeparser::SystemMessage;
const PUBLIC_KEY_LENGTH: usize = 32;
const PUBLIC_KEY_STUB: &[u8] = "static_string".as_bytes();

/// Store iroh peer channels for the context.
/// Store Iroh peer channels for the context.
#[derive(Debug)]
pub struct Iroh {
/// iroh router needed for iroh peer channels.
/// Iroh router needed for Iroh peer channels.
pub(crate) router: iroh::protocol::Router,

/// [Gossip] needed for iroh peer channels.
/// [Gossip] needed for Iroh peer channels.
pub(crate) gossip: Gossip,

/// Sequence numbers for gossip channels.
@@ -109,7 +109,7 @@ impl Iroh {

info!(
ctx,
"IROH_REALTIME: Joining gossip with peers: {:?}", node_ids,
"IROH_REALTIME: Joining gossip {topic} with peers: {:?}.", node_ids,
);

// Inform iroh of potentially new node addresses
@@ -138,17 +138,11 @@ impl Iroh {
Ok(Some(join_rx))
}

/// Add gossip peers to realtime channel if it is already active.
pub async fn maybe_add_gossip_peers(&self, topic: TopicId, peers: Vec<NodeAddr>) -> Result<()> {
/// Add gossip peer to realtime channel if it is already active.
pub async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if self.iroh_channels.read().await.get(&topic).is_some() {
for peer in &peers {
self.router.endpoint().add_node_addr(peer.clone())?;
}

self.gossip.subscribe_with_opts(
topic,
JoinOptions::with_bootstrap(peers.into_iter().map(|peer| peer.node_id)),
);
self.router.endpoint().add_node_addr(peer.clone())?;
self.gossip.subscribe(topic, vec![peer.node_id])?;
}
Ok(())
}
@@ -316,6 +310,17 @@ impl Context {
}
}
}

pub(crate) async fn maybe_add_gossip_peer(&self, topic: TopicId, peer: NodeAddr) -> Result<()> {
if let Some(iroh) = &*self.iroh.read().await {
info!(
self,
"Adding (maybe existing) peer with id {} to {topic}.", peer.node_id
);
iroh.maybe_add_gossip_peer(topic, peer).await?;
}
Ok(())
}
}

/// Cache a peers [NodeId] for one topic.
@@ -348,12 +353,13 @@ pub async fn add_gossip_peer_from_header(
return Ok(());
}

let node_addr =
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;

info!(
context,
"Adding iroh peer with address {node_addr:?} to the topic of {instance_id}."
"Adding iroh peer with node id {} to the topic of {instance_id}.", node_addr.node_id
);
let node_addr =
serde_json::from_str::<NodeAddr>(node_addr).context("Failed to parse node address")?;

context.emit_event(EventType::WebxdcRealtimeAdvertisementReceived {
msg_id: instance_id,
@@ -371,8 +377,7 @@ pub async fn add_gossip_peer_from_header(
let relay_server = node_addr.relay_url().map(|relay| relay.as_str());
iroh_add_peer_for_topic(context, instance_id, topic, node_id, relay_server).await?;

let iroh = context.get_or_try_init_peer_channel().await?;
iroh.maybe_add_gossip_peers(topic, vec![node_addr]).await?;
context.maybe_add_gossip_peer(topic, node_addr).await?;
Ok(())
}

@@ -555,9 +560,9 @@ mod tests {
use super::*;
use crate::{
EventType,
chat::send_msg,
chat::{self, ChatId, ProtectionStatus, add_contact_to_chat, resend_msgs, send_msg},
message::{Message, Viewtype},
test_utils::TestContextManager,
test_utils::{TestContext, TestContextManager},
};

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@@ -924,8 +929,30 @@ mod tests {
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;

let chat = alice.create_chat(bob).await.id;

let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
alice,
"minimal.xdc",
include_bytes!("../test-data/webxdc/minimal.xdc"),
None,
)
.unwrap();
connect_alice_bob(alice, chat, &mut instance, bob).await
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_webxdc_resend() {
let mut tcm = TestContextManager::new();
let alice = &mut tcm.alice().await;
let bob = &mut tcm.bob().await;
let group = chat::create_group_chat(alice, ProtectionStatus::Unprotected, "group chat")
.await
.unwrap();

// Alice sends webxdc to bob
let alice_chat = alice.create_chat(bob).await;
let mut instance = Message::new(Viewtype::File);
instance
.set_file_from_bytes(
@@ -935,7 +962,60 @@ mod tests {
None,
)
.unwrap();
send_msg(alice, alice_chat.id, &mut instance).await.unwrap();

add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(bob).await)
.await
.unwrap();

connect_alice_bob(alice, group, &mut instance, bob).await;

// fiona joins late
let fiona = &mut tcm.fiona().await;

add_contact_to_chat(alice, group, alice.add_or_lookup_contact_id(fiona).await)
.await
.unwrap();

resend_msgs(alice, &[instance.id]).await.unwrap();
let msg = alice.pop_sent_msg().await;
let fiona_instance = fiona.recv_msg(&msg).await;
fiona_instance.chat_id.accept(fiona).await.unwrap();
assert!(fiona.ctx.iroh.read().await.is_none());

let fiona_connect_future = send_webxdc_realtime_advertisement(fiona, fiona_instance.id)
.await
.unwrap()
.unwrap();
let fiona_advert = fiona.pop_sent_msg().await;
alice.recv_msg_trash(&fiona_advert).await;

fiona_connect_future.await.unwrap();
send_webxdc_realtime_data(alice, instance.id, b"alice -> bob & fiona".into())
.await
.unwrap();

loop {
let event = fiona.evtracker.recv().await.unwrap();
if let EventType::WebxdcRealtimeData { data, .. } = event.typ {
if data == b"alice -> bob & fiona" {
break;
} else {
panic!(
"Unexpected status update: {}",
String::from_utf8_lossy(&data)
);
}
}
}
}

async fn connect_alice_bob(
alice: &mut TestContext,
alice_chat_id: ChatId,
instance: &mut Message,
bob: &mut TestContext,
) {
send_msg(alice, alice_chat_id, instance).await.unwrap();
let alice_webxdc = alice.get_last_msg().await;

let webxdc = alice.pop_sent_msg().await;
@@ -952,17 +1032,19 @@ mod tests {
.unwrap();
let alice_advertisement = alice.pop_sent_msg().await;

send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
let bob_advertisement_future = send_webxdc_realtime_advertisement(bob, bob_webxdc.id)
.await
.unwrap()
.unwrap();
let bob_advertisement = bob.pop_sent_msg().await;

eprintln!("Receiving advertisements");
bob.recv_msg_trash(&alice_advertisement).await;
alice.recv_msg_trash(&bob_advertisement).await;

eprintln!("Alice waits for connection");
eprintln!("Alice and Bob wait for connection");
alice_advertisement_future.await.unwrap();
bob_advertisement_future.await.unwrap();

// Alice sends ephemeral message
eprintln!("Sending ephemeral message");
2 changes: 1 addition & 1 deletion src/receive_imf.rs
Original file line number Diff line number Diff line change
@@ -2141,7 +2141,7 @@ RETURNING id
created_db_entries.push(row_id);
}

// check all parts whether they contain a new logging webxdc
// Maybe set logging xdc and add gossip topics for webxdcs.
for (part, msg_id) in mime_parser.parts.iter().zip(&created_db_entries) {
// check if any part contains a webxdc topic id
if part.typ == Viewtype::Webxdc {