Skip to content

Commit

Permalink
add Gateway handler (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Jan 16, 2024
1 parent 22f0ca7 commit 2c9540e
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 6 deletions.
7 changes: 6 additions & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use retty::channel::Pipeline;
use retty::executor::LocalExecutorBuilder;
use retty::transport::{AsyncTransport, AsyncTransportWrite, TaggedBytesMut};
use sfu::handlers::demuxer::DemuxerHandler;
use sfu::handlers::gateway::GatewayHandler;
use sfu::handlers::stun::StunHandler;
use sfu::server::certificate::RTCCertificate;
use sfu::server::config::ServerConfig;
Expand Down Expand Up @@ -131,7 +132,7 @@ fn main() -> anyhow::Result<()> {

info!("listening {}:{}...", host, port);

let _server_states_moved = server_states.clone();
let server_states_moved = server_states.clone();
let mut bootstrap = BootstrapUdpServer::new();
bootstrap.pipeline(Box::new(
move |writer: AsyncTransportWrite<TaggedBytesMut>| {
Expand All @@ -140,10 +141,14 @@ fn main() -> anyhow::Result<()> {
let async_transport_handler = AsyncTransport::new(writer);
let demuxer_handler = DemuxerHandler::new();
let stun_handler = StunHandler::new();
//TODO: add DTLS and RTP handlers
let gateway_handler = GatewayHandler::new(Rc::clone(&server_states_moved));

pipeline.add_back(async_transport_handler);
pipeline.add_back(demuxer_handler);
pipeline.add_back(stun_handler);
//TODO: add DTLS and RTP handlers
pipeline.add_back(gateway_handler);

pipeline.finalize()
},
Expand Down
50 changes: 45 additions & 5 deletions src/handlers/gateway/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use crate::server::states::ServerStates;
use crate::shared::messages::TaggedMessageEvent;
use crate::shared::messages::{MessageEvent, STUNMessageEvent, TaggedMessageEvent};
use log::warn;
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};
use retty::transport::TransportContext;
use shared::error::Result;
use std::rc::Rc;
use std::time::Instant;

struct GatewayInbound {
server_states: Rc<ServerStates>,
}
struct GatewayOutbound;
struct GatewayOutbound {
server_states: Rc<ServerStates>,
}

pub struct GatewayHandler {
gateway_inbound: GatewayInbound,
Expand All @@ -16,8 +22,10 @@ pub struct GatewayHandler {
impl GatewayHandler {
pub fn new(server_states: Rc<ServerStates>) -> Self {
GatewayHandler {
gateway_inbound: GatewayInbound { server_states },
gateway_outbound: GatewayOutbound {},
gateway_inbound: GatewayInbound {
server_states: Rc::clone(&server_states),
},
gateway_outbound: GatewayOutbound { server_states },
}
}
}
Expand All @@ -26,7 +34,27 @@ impl InboundHandler for GatewayInbound {
type Rin = TaggedMessageEvent;
type Rout = Self::Rin;

fn read(&mut self, _ctx: &InboundContext<Self::Rin, Self::Rout>, _msg: Self::Rin) {}
fn read(&mut self, ctx: &InboundContext<Self::Rin, Self::Rout>, msg: Self::Rin) {
let try_read = || -> Result<()> {
match msg.message {
MessageEvent::STUN(STUNMessageEvent::STUN(message)) => {
self.handle_stun_message(ctx, msg.now, msg.transport, message)
}
_ => {
warn!(
"drop unsupported message {:?} from {}",
msg.message, msg.transport.peer_addr
);
Ok(())
}
}
};

if let Err(err) = try_read() {
warn!("try_read got error {}", err);
ctx.fire_read_exception(Box::new(err));
}
}
}

impl OutboundHandler for GatewayOutbound {
Expand Down Expand Up @@ -60,3 +88,15 @@ impl Handler for GatewayHandler {
)
}
}

impl GatewayInbound {
fn handle_stun_message(
&mut self,
_ctx: &InboundContext<TaggedMessageEvent, TaggedMessageEvent>,
_now: Instant,
_transport: TransportContext,
_request: stun::message::Message,
) -> Result<()> {
Ok(())
}
}

0 comments on commit 2c9540e

Please sign in to comment.