Skip to content

Commit

Permalink
add data channel handler
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Jan 20, 2024
1 parent 410a996 commit 8f466db
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 3 deletions.
3 changes: 3 additions & 0 deletions examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use retty::bootstrap::BootstrapUdpServer;
use retty::channel::Pipeline;
use retty::executor::LocalExecutorBuilder;
use retty::transport::{AsyncTransport, AsyncTransportWrite, TaggedBytesMut};
use sfu::handlers::data::DataChannelHandler;
use sfu::handlers::demuxer::DemuxerHandler;
use sfu::handlers::dtls::DtlsHandler;
use sfu::handlers::gateway::GatewayHandler;
Expand Down Expand Up @@ -149,6 +150,7 @@ fn main() -> anyhow::Result<()> {
let stun_handler = StunHandler::new();
let dtls_handler = DtlsHandler::new(Rc::clone(&server_states_moved), dtls_handshake_config_moved.clone());
let sctp_handler = SctpHandler::new(Rc::clone(&server_states_moved), sctp_endpoint_config_moved.clone());
let data_channel_handler = DataChannelHandler::new();
//TODO: add DTLS and RTP handlers
let gateway_handler = GatewayHandler::new(Rc::clone(&server_states_moved));

Expand All @@ -157,6 +159,7 @@ fn main() -> anyhow::Result<()> {
pipeline.add_back(stun_handler);
pipeline.add_back(dtls_handler);
pipeline.add_back(sctp_handler);
pipeline.add_back(data_channel_handler);
//TODO: add DTLS and RTP handlers
pipeline.add_back(gateway_handler);

Expand Down
2 changes: 1 addition & 1 deletion rtc
174 changes: 174 additions & 0 deletions src/handlers/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,175 @@
use crate::messages::{
ApplicationMessage, DTLSMessageEvent, DataChannelMessage, DataChannelMessageParams,
DataChannelMessageType, MessageEvent, TaggedMessageEvent,
};
use data::message::{message_channel_ack::*, message_channel_open::*, message_type::*, *};
use log::debug;
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};
use shared::error::{Error, Result};
use shared::marshal::*;

#[derive(Default)]
struct DataChannelInbound;
#[derive(Default)]
struct DataChannelOutbound;
#[derive(Default)]
pub struct DataChannelHandler {
data_channel_inbound: DataChannelInbound,
data_channel_outbound: DataChannelOutbound,
}

impl DataChannelHandler {
pub fn new() -> Self {
DataChannelHandler::default()
}
}

impl InboundHandler for DataChannelInbound {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;

fn read(&mut self, ctx: &InboundContext<Self::Rin, Self::Rout>, msg: Self::Rin) {
if let MessageEvent::DTLS(DTLSMessageEvent::SCTP(message)) = msg.message {
debug!(
"recv SCTP DataChannelMessage {:?} with {:?}",
msg.transport.peer_addr, message
);
let try_read =
|| -> Result<(Option<ApplicationMessage>, Option<DataChannelMessage>)> {
if message.data_message_type == DataChannelMessageType::Control {
let mut buf = &message.payload[..];
if MessageType::unmarshal(&mut buf)? == MessageType::DataChannelOpen {
debug!("DataChannelOpen for association_handle {} and stream_id {} and data_message_type {:?}",
message.association_handle,
message.stream_id,
message.data_message_type);

let _ = DataChannelOpen::unmarshal(&mut buf)?;

let payload = Message::DataChannelAck(DataChannelAck {}).marshal()?;
Ok((
None,
Some(DataChannelMessage {
association_handle: message.association_handle,
stream_id: message.stream_id,
data_message_type: DataChannelMessageType::Control,
params: DataChannelMessageParams::Outbound {
ordered: true,
reliable: true,
max_rtx_count: 0,
max_rtx_millis: 0,
},
payload,
}),
))
} else {
Ok((None, None))
}
} else if message.data_message_type == DataChannelMessageType::Binary {
Ok((
Some(ApplicationMessage {
association_handle: message.association_handle,
stream_id: message.stream_id,
payload: message.payload,
}),
None,
))
} else {
Err(Error::UnknownProtocol)
}
};

match try_read() {
Ok((inbound_message, outbound_message)) => {
if let Some(application_message) = inbound_message {
debug!("recv application message {:?}", msg.transport.peer_addr);
ctx.fire_read(TaggedMessageEvent {
now: msg.now,
transport: msg.transport,
message: MessageEvent::DTLS(DTLSMessageEvent::APPLICATION(
application_message,
)),
})
}
if let Some(data_channel_message) = outbound_message {
debug!("send DataChannelAck message {:?}", msg.transport.peer_addr);
ctx.fire_write(TaggedMessageEvent {
now: msg.now,
transport: msg.transport,
message: MessageEvent::DTLS(DTLSMessageEvent::SCTP(
data_channel_message,
)),
});
}
}
Err(err) => ctx.fire_read_exception(Box::new(err)),
};
} else {
// Bypass
debug!("bypass DataChannel read {:?}", msg.transport.peer_addr);
ctx.fire_read(msg);
}
}
}

impl OutboundHandler for DataChannelOutbound {
type Win = TaggedMessageEvent;
type Wout = Self::Win;

fn write(&mut self, ctx: &OutboundContext<Self::Win, Self::Wout>, msg: Self::Win) {
if let MessageEvent::DTLS(DTLSMessageEvent::APPLICATION(message)) = msg.message {
debug!(
"send application message {:?} with {:?}",
msg.transport.peer_addr, message
);

ctx.fire_write(TaggedMessageEvent {
now: msg.now,
transport: msg.transport,
message: MessageEvent::DTLS(DTLSMessageEvent::SCTP(DataChannelMessage {
association_handle: message.association_handle,
stream_id: message.stream_id,
data_message_type: DataChannelMessageType::Binary,
params: DataChannelMessageParams::Outbound {
ordered: true,
reliable: true,
max_rtx_count: 0,
max_rtx_millis: 0,
},
payload: message.payload,
})),
});
} else {
// Bypass
debug!("bypass DataChannel write {:?}", msg.transport.peer_addr);
ctx.fire_write(msg);
}
}

fn close(&mut self, ctx: &OutboundContext<Self::Win, Self::Wout>) {
ctx.fire_close();
}
}

impl Handler for DataChannelHandler {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;
type Win = TaggedMessageEvent;
type Wout = Self::Win;

fn name(&self) -> &str {
"DataChannelHandler"
}

fn split(
self,
) -> (
Box<dyn InboundHandler<Rin = Self::Rin, Rout = Self::Rout>>,
Box<dyn OutboundHandler<Win = Self::Win, Wout = Self::Wout>>,
) {
(
Box::new(self.data_channel_inbound),
Box::new(self.data_channel_outbound),
)
}
}
4 changes: 3 additions & 1 deletion src/handlers/sctp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ impl InboundHandler for SctpInbound {
association_handle: ch.0,
stream_id: id,
data_message_type: to_data_message_type(chunks.ppi),
params: DataChannelMessageParams::Inbound { seq_num: 0 },
params: DataChannelMessageParams::Inbound {
seq_num: chunks.ssn,
},
payload: BytesMut::from(&self.internal_buffer[0..n]),
});
}
Expand Down
9 changes: 8 additions & 1 deletion src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ pub struct DataChannelMessage {
pub(crate) payload: BytesMut,
}

#[derive(Debug)]
pub struct ApplicationMessage {
pub(crate) association_handle: usize,
pub(crate) stream_id: u16,
pub(crate) payload: BytesMut,
}

#[derive(Debug)]
pub enum STUNMessageEvent {
RAW(BytesMut),
Expand All @@ -42,7 +49,7 @@ pub enum STUNMessageEvent {
pub enum DTLSMessageEvent {
RAW(BytesMut),
SCTP(DataChannelMessage),
APPLICATION(BytesMut),
APPLICATION(ApplicationMessage),
}

#[derive(Debug)]
Expand Down

0 comments on commit 8f466db

Please sign in to comment.