Skip to content

Commit

Permalink
add signaling message handle functions
Browse files Browse the repository at this point in the history
  • Loading branch information
yngrtc committed Jan 7, 2024
1 parent c481f4a commit ecd77d7
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 36 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ data = { path = "rtc/data"}
[dev-dependencies]
futures = "0.3.30"
smol = "2.0.0"
tokio = { version = "1.24.1", features = ["rt-multi-thread", "macros", "signal"] }
tokio = { version = "1.24.1", features = ["fs", "rt-multi-thread", "macros", "signal"] }
tokio-util = "0.7.10"
hyper = { version = "0.14.16", features = ["full"] }
chrono = "0.4.31"
env_logger = "0.10.1"
Expand Down
10 changes: 10 additions & 0 deletions examples/chat.html
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@
byId('join').disabled = false;
byId('leave').disabled = true;
rtc.close();

let path = '/leave/'+byId("room").value+'/'+endpointId;
const res = await fetch(path, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
}
});

rtc = new RTCPeerConnection();
}
</script>
</body>
Expand Down
18 changes: 10 additions & 8 deletions examples/chat.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::signal::{SignalingMessage, SignalingServer};
use crate::signal::{handle_signaling_message, SignalingMessage, SignalingServer};
use async_broadcast::broadcast;
use clap::Parser;
use dtls::extension::extension_use_srtp::SrtpProtectionProfile;
Expand Down Expand Up @@ -86,7 +86,7 @@ fn main() -> anyhow::Result<()> {
}

println!(
"listening {}@{}(signal)/[{}-{}](media)...",
"listening {}:{}(signal)/[{}-{}](media)...",
cli.host, cli.signal_port, cli.media_port_min, cli.media_port_max
);

Expand All @@ -105,8 +105,8 @@ fn main() -> anyhow::Result<()> {
let worker = wait_group.worker();
let host = cli.host.clone();
let mut stop_rx = stop_rx.clone();
let (signal_tx, signal_rx) = smol::channel::unbounded::<SignalingMessage>();
media_port_thread_map.insert(port, signal_tx);
let (signaling_tx, signaling_rx) = smol::channel::unbounded::<SignalingMessage>();
media_port_thread_map.insert(port, signaling_tx);

let server_config = server_config.clone();
LocalExecutorBuilder::new()
Expand All @@ -124,7 +124,7 @@ fn main() -> anyhow::Result<()> {
.with_extended_master_secret(dtls::config::ExtendedMasterSecretType::Require)
.build(false, None)
.unwrap();
let _server_states = Rc::new(ServerStates::new(server_config));
let server_states = Rc::new(ServerStates::new(server_config));

info!("listening {}:{}...", host, port);
let mut bootstrap = BootstrapUdpServer::new();
Expand Down Expand Up @@ -152,10 +152,12 @@ fn main() -> anyhow::Result<()> {
info!("media server on {}:{} receives stop signal", host, port);
break;
}
recv = signal_rx.recv() => {
recv = signaling_rx.recv() => {
match recv {
Ok(_msg) => {
//TODO: receive signal msg
Ok(signaling_msg) => {
if let Err(err) = handle_signaling_message(&server_states, signaling_msg) {
error!("handle_signaling_message error: {}", err);
}
}
Err(err) => {
error!("signal_rx recv error: {}", err);
Expand Down
181 changes: 179 additions & 2 deletions examples/signal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ use bytes::Bytes;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server, StatusCode};
use log::{debug, error, info};
use sfu::server::states::ServerStates;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::Arc;
use tokio::fs::File;
use tokio_util::codec::{BytesCodec, FramedRead};

pub enum SignalingProtocolMessage {
Ok {
Expand All @@ -18,7 +22,7 @@ pub enum SignalingProtocolMessage {
Err {
room_id: u64,
endpoint_id: u64,
reason: String,
reason: Bytes,
},
Join {
room_id: u64,
Expand Down Expand Up @@ -84,7 +88,11 @@ impl SignalingServer {
}
});
let server = Server::bind(&signal_addr).serve(service);
info!("signaling server is running...");
println!(
"signaling server http://{}:{} is running...",
signal_addr.ip(),
signal_addr.port()
);
let graceful = server.with_graceful_shutdown(async {
let _ = stop_rx.recv().await;
info!("signaling server receives stop signal");
Expand All @@ -106,6 +114,23 @@ async fn remote_handler(
req: Request<Body>,
media_port_thread_map: Arc<HashMap<u16, smol::channel::Sender<SignalingMessage>>>,
) -> Result<Response<Body>, hyper::Error> {
match (req.method(), req.uri().path()) {
(&Method::GET, "/") | (&Method::GET, "/index.html") => {
// Open file for reading
if let Ok(file) = File::open("examples/chat.html").await {
let stream = FramedRead::new(file, BytesCodec::new());
let body = Body::wrap_stream(stream);
return Ok(Response::new(body));
} else {
eprintln!("ERROR: Unable to open file.");
let mut not_found = Response::default();
*not_found.status_mut() = StatusCode::NOT_FOUND;
return Ok(not_found);
}
}
_ => {}
};

let path: Vec<&str> = req.uri().path().split('/').collect();
if path.len() < 3
|| path[2].parse::<u64>().is_err()
Expand Down Expand Up @@ -299,3 +324,155 @@ async fn remote_handler(
*response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(response)
}

pub fn handle_signaling_message(
server_states: &Rc<ServerStates>,
signaling_msg: SignalingMessage,
) -> Result<()> {
match signaling_msg.request {
SignalingProtocolMessage::Join { room_id } => {
let endpoint_id: u64 = rand::random();
Ok(signaling_msg
.response_tx
.send(SignalingProtocolMessage::Ok {
room_id,
endpoint_id,
})
.map_err(|_| {
shared::error::Error::Other(
"failed to send back signaling message response".to_string(),
)
})?)
}
SignalingProtocolMessage::Offer {
room_id,
endpoint_id,
offer_sdp,
} => handle_offer_message(
server_states,
room_id,
endpoint_id,
offer_sdp,
signaling_msg.response_tx,
),
SignalingProtocolMessage::Answer {
room_id,
endpoint_id,
answer_sdp,
} => handle_answer_message(
server_states,
room_id,
endpoint_id,
answer_sdp,
signaling_msg.response_tx,
),
SignalingProtocolMessage::Leave {
room_id,
endpoint_id,
} => handle_leave_message(
server_states,
room_id,
endpoint_id,
signaling_msg.response_tx,
),
SignalingProtocolMessage::Ok {
room_id,
endpoint_id,
}
| SignalingProtocolMessage::Err {
room_id,
endpoint_id,
reason: _,
}
| SignalingProtocolMessage::Trickle {
room_id,
endpoint_id,
trickle_sdp: _,
} => Ok(signaling_msg
.response_tx
.send(SignalingProtocolMessage::Err {
room_id,
endpoint_id,
reason: Bytes::from("Invalid Request"),
})
.map_err(|_| {
shared::error::Error::Other(
"failed to send back signaling message response".to_string(),
)
})?),
}
}

fn handle_offer_message(
_server_states: &Rc<ServerStates>,
room_id: u64,
endpoint_id: u64,
offer_sdp: Bytes,
response_tx: futures::channel::oneshot::Sender<SignalingProtocolMessage>,
) -> Result<()> {
info!(
"handle_offer_message: {}/{}/{}",
room_id,
endpoint_id,
String::from_utf8(offer_sdp.to_vec())?
);

let answer_sdp = offer_sdp;

Ok(response_tx
.send(SignalingProtocolMessage::Answer {
room_id,
endpoint_id,
answer_sdp,
})
.map_err(|_| {
shared::error::Error::Other(
"failed to send back signaling message response".to_string(),
)
})?)
}

fn handle_answer_message(
_server_states: &Rc<ServerStates>,
room_id: u64,
endpoint_id: u64,
answer_sdp: Bytes,
response_tx: futures::channel::oneshot::Sender<SignalingProtocolMessage>,
) -> Result<()> {
info!(
"handle_answer_message: {}/{}/{}",
room_id,
endpoint_id,
String::from_utf8(answer_sdp.to_vec())?
);

Ok(response_tx
.send(SignalingProtocolMessage::Ok {
room_id,
endpoint_id,
})
.map_err(|_| {
shared::error::Error::Other(
"failed to send back signaling message response".to_string(),
)
})?)
}

fn handle_leave_message(
_server_states: &Rc<ServerStates>,
room_id: u64,
endpoint_id: u64,
response_tx: futures::channel::oneshot::Sender<SignalingProtocolMessage>,
) -> Result<()> {
info!("handle_leave_message: {}/{}", room_id, endpoint_id,);
Ok(response_tx
.send(SignalingProtocolMessage::Ok {
room_id,
endpoint_id,
})
.map_err(|_| {
shared::error::Error::Other(
"failed to send back signaling message response".to_string(),
)
})?)
}
25 changes: 0 additions & 25 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,4 @@
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

pub mod certificate;
pub mod config;
pub mod room;
pub mod states;

use crate::shared::types::RoomId;
use room::Room;

pub struct ServerStates {
rooms: RefCell<HashMap<RoomId, Rc<Room>>>,
}

impl Default for ServerStates {
fn default() -> Self {
Self::new()
}
}

impl ServerStates {
pub fn new() -> Self {
Self {
rooms: RefCell::new(HashMap::new()),
}
}
}

0 comments on commit ecd77d7

Please sign in to comment.