Skip to content

Commit

Permalink
rust: move PlainTransport to flatbuffers
Browse files Browse the repository at this point in the history
  • Loading branch information
jmillan committed Aug 30, 2023
1 parent 2ce0409 commit e469b4e
Show file tree
Hide file tree
Showing 17 changed files with 653 additions and 253 deletions.
2 changes: 1 addition & 1 deletion node/src/PlainTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ export class PlainTransport<PlainTransportAppData extends AppData = AppData>
// Wait for response.
const response = await this.channel.request(
FbsRequest.Method.PLAINTRANSPORT_CONNECT,
FbsRequest.Body.FBS_PlainTransport_ConnectRequest,
FbsRequest.Body.PlainTransport_ConnectRequest,
requestOffset,
this.internal.transportId
);
Expand Down
33 changes: 33 additions & 0 deletions rust/src/data_structures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,17 @@ pub enum Protocol {
Udp,
}

impl Protocol {
// TODO: Use the Protocol FBS type.
pub(crate) fn from_fbs(protocol: &str) -> Self {
match protocol {
"tcp" => Protocol::Tcp,
"udp" => Protocol::Udp,
_ => todo!(),
}
}
}

/// ICE candidate
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -260,6 +271,28 @@ impl TransportTuple {
None
}
}

pub(crate) fn from_fbs(tuple: &transport::Tuple) -> TransportTuple {
match &tuple.remote_ip {
Some(_remote_ip) => TransportTuple::WithRemote {
local_ip: tuple.local_ip.parse().expect("Error parsing IP address"),
local_port: tuple.local_port,
remote_ip: tuple
.remote_ip
.as_ref()
.unwrap()
.parse()
.expect("Error parsing IP address"),
remote_port: tuple.remote_port,
protocol: Protocol::from_fbs(tuple.protocol.as_str()),
},
None => TransportTuple::LocalOnly {
local_ip: tuple.local_ip.parse().expect("Error parsing IP address"),
local_port: tuple.local_port,
protocol: Protocol::from_fbs(tuple.protocol.as_str()),
},
}
}
}

/// DTLS state.
Expand Down
328 changes: 186 additions & 142 deletions rust/src/fbs.rs

Large diffs are not rendered by default.

247 changes: 203 additions & 44 deletions rust/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ use crate::data_structures::{
ListenInfo, SctpState, TransportTuple,
};
use crate::direct_transport::DirectTransportOptions;
use crate::fbs::{direct_transport, message, request, response, router, transport, worker};
use crate::fbs::{
direct_transport, message, plain_transport, request, response, router, transport, worker,
};
use crate::ortc::RtpMapping;
use crate::pipe_transport::PipeTransportOptions;
use crate::plain_transport::PlainTransportOptions;
Expand Down Expand Up @@ -735,7 +737,7 @@ impl RequestFbs for RouterDumpRequest {
pub(crate) struct RouterCreateDirectTransportData {
transport_id: TransportId,
direct: bool,
max_message_size: usize,
max_message_size: u32,
}

impl RouterCreateDirectTransportData {
Expand All @@ -754,7 +756,7 @@ impl RouterCreateDirectTransportData {
direct_transport::DirectTransportOptions {
base: Box::new(transport::Options {
direct: true,
max_message_size: u32::try_from(self.max_message_size).unwrap(),
max_message_size: self.max_message_size,
initial_available_outgoing_bitrate: 0,
enable_sctp: false,
num_sctp_streams: None,
Expand Down Expand Up @@ -896,6 +898,7 @@ impl Request for RouterCreateWebrtcTransportRequest {
pub(crate) struct RouterCreatePlainTransportData {
transport_id: TransportId,
listen_info: ListenInfo,
rtcp_listen_info: Option<ListenInfo>,
rtcp_mux: bool,
comedia: bool,
enable_sctp: bool,
Expand All @@ -915,6 +918,7 @@ impl RouterCreatePlainTransportData {
Self {
transport_id,
listen_info: plain_transport_options.listen_info,
rtcp_listen_info: plain_transport_options.rtcp_listen_info,
rtcp_mux: plain_transport_options.rtcp_mux,
comedia: plain_transport_options.comedia,
enable_sctp: plain_transport_options.enable_sctp,
Expand All @@ -926,26 +930,102 @@ impl RouterCreatePlainTransportData {
is_data_channel: false,
}
}

pub(crate) fn to_fbs(&self) -> plain_transport::PlainTransportOptions {
plain_transport::PlainTransportOptions {
base: Box::new(transport::Options {
direct: false,
initial_available_outgoing_bitrate: 0,
enable_sctp: self.enable_sctp,
num_sctp_streams: Some(Box::new(self.num_sctp_streams.to_fbs())),
max_sctp_message_size: self.max_sctp_message_size,
sctp_send_buffer_size: self.sctp_send_buffer_size,
is_data_channel: self.is_data_channel,
max_message_size: self.max_sctp_message_size,
}),
listen_info: Box::new(self.listen_info.to_fbs()),
rtcp_listen_info: self
.rtcp_listen_info
.map(|listen_info| Box::new(listen_info.to_fbs())),
rtcp_mux: self.rtcp_mux,
comedia: self.comedia,
enable_srtp: self.enable_srtp,
srtp_crypto_suite: Some(self.srtp_crypto_suite.to_string()),
}
}
}

request_response!(
RouterId,
"router.createPlainTransport",
RouterCreatePlainTransportRequest {
#[serde(flatten)]
data: RouterCreatePlainTransportData,
},
PlainTransportData {
// The following fields are present, but unused
// rtcp_mux: bool,
// comedia: bool,
tuple: Mutex<TransportTuple>,
rtcp_tuple: Mutex<Option<TransportTuple>>,
sctp_parameters: Option<SctpParameters>,
sctp_state: Mutex<Option<SctpState>>,
srtp_parameters: Mutex<Option<SrtpParameters>>,
},
);
#[derive(Debug)]
pub(crate) struct RouterCreatePlainTransportRequest {
pub(crate) data: RouterCreatePlainTransportData,
}

impl RequestFbs for RouterCreatePlainTransportRequest {
const METHOD: request::Method = request::Method::RouterCreatePlaintransport;
type HandlerId = RouterId;
type Response = PlainTransportData;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();
let data = router::CreatePlainTransportRequest::create(
&mut builder,
self.data.transport_id.to_string(),
self.data.to_fbs(),
);
let request_body = request::Body::create_create_plain_transport_request(&mut builder, data);
let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message::Type::Request, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::Body>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::Body::FbsPlainTransportDumpResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

Ok(PlainTransportData {
tuple: Mutex::new(TransportTuple::from_fbs(data.tuple.as_ref())),
rtcp_tuple: Mutex::new(
data.rtcp_tuple
.map(|tuple| TransportTuple::from_fbs(tuple.as_ref())),
),
sctp_parameters: data
.base
.sctp_parameters
.map(|parameters| SctpParameters::from_fbs(parameters.as_ref())),
sctp_state: Mutex::new(
data.base
.sctp_state
.map(|state| SctpState::from_fbs(&state)),
),
srtp_parameters: Mutex::new(
data.srtp_parameters
.map(|parameters| SrtpParameters::from_fbs(parameters.as_ref())),
),
})
}
}

pub(crate) struct PlainTransportData {
// The following fields are present, but unused
// rtcp_mux: bool,
// comedia: bool,
pub(crate) tuple: Mutex<TransportTuple>,
pub(crate) rtcp_tuple: Mutex<Option<TransportTuple>>,
pub(crate) sctp_parameters: Option<SctpParameters>,
pub(crate) sctp_state: Mutex<Option<SctpState>>,
pub(crate) srtp_parameters: Mutex<Option<SrtpParameters>>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -1074,9 +1154,9 @@ request_response!(
);

#[derive(Debug)]
pub(crate) struct TransportDumpRequestNew {}
pub(crate) struct TransportDumpRequestFbs {}

impl RequestFbs for TransportDumpRequestNew {
impl RequestFbs for TransportDumpRequestFbs {
const METHOD: request::Method = request::Method::TransportDump;
type HandlerId = TransportId;
type Response = response::Body;
Expand Down Expand Up @@ -1116,9 +1196,9 @@ request_response!(
);

#[derive(Debug)]
pub(crate) struct TransportGetStatsRequestNew {}
pub(crate) struct TransportGetStatsRequestFbs {}

impl RequestFbs for TransportGetStatsRequestNew {
impl RequestFbs for TransportGetStatsRequestFbs {
const METHOD: request::Method = request::Method::TransportGetStats;
type HandlerId = TransportId;
type Response = response::Body;
Expand Down Expand Up @@ -1151,6 +1231,42 @@ impl RequestFbs for TransportGetStatsRequestNew {
}
}

#[derive(Debug)]
pub(crate) struct TransportCloseRequestFbs {
pub(crate) transport_id: TransportId,
}

impl RequestFbs for TransportCloseRequestFbs {
const METHOD: request::Method = request::Method::RouterCloseTransport;
type HandlerId = RouterId;
type Response = ();

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();
let data =
router::CloseTransportRequest::create(&mut builder, self.transport_id.to_string());
let request_body = request::Body::create_close_transport_request(&mut builder, data);

let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message::Type::Request, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
_response: Option<response::Body>,
) -> Result<Self::Response, Box<dyn Error>> {
Ok(())
}
}

request_response!(
TransportId,
"transport.connect",
Expand All @@ -1176,25 +1292,68 @@ request_response!(
},
);

request_response!(
TransportId,
"transport.connect",
TransportConnectPlainRequest {
#[serde(skip_serializing_if = "Option::is_none")]
ip: Option<IpAddr>,
#[serde(skip_serializing_if = "Option::is_none")]
port: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
rtcp_port: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
srtp_parameters: Option<SrtpParameters>,
},
TransportConnectResponsePlain {
tuple: Option<TransportTuple>,
rtcp_tuple: Option<TransportTuple>,
srtp_parameters: Option<SrtpParameters>,
},
);
#[derive(Debug)]
pub(crate) struct PlainTransportConnectResponse {
pub(crate) tuple: TransportTuple,
pub(crate) rtcp_tuple: Option<TransportTuple>,
pub(crate) srtp_parameters: Option<SrtpParameters>,
}

#[derive(Debug)]
pub(crate) struct TransportConnectPlainRequest {
pub(crate) ip: Option<IpAddr>,
pub(crate) port: Option<u16>,
pub(crate) rtcp_port: Option<u16>,
pub(crate) srtp_parameters: Option<SrtpParameters>,
}

impl RequestFbs for TransportConnectPlainRequest {
const METHOD: request::Method = request::Method::PlaintransportConnect;
type HandlerId = TransportId;
type Response = PlainTransportConnectResponse;

fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
let mut builder = Builder::new();
let data = plain_transport::ConnectRequest::create(
&mut builder,
self.ip.map(|ip| ip.to_string()),
self.port,
self.rtcp_port,
self.srtp_parameters.map(|parameters| parameters.to_fbs()),
);
let request_body =
request::Body::create_plain_transport_connect_request(&mut builder, data);
let request = request::Request::create(
&mut builder,
id,
Self::METHOD,
handler_id.to_string(),
Some(request_body),
);
let message_body = message::Body::create_request(&mut builder, request);
let message = message::Message::create(&mut builder, message::Type::Request, message_body);

builder.finish(message, None).to_vec()
}

fn convert_response(
response: Option<response::Body>,
) -> Result<Self::Response, Box<dyn Error>> {
let Some(response::Body::FbsPlainTransportConnectResponse(data)) = response else {
panic!("Wrong message from worker: {response:?}");
};

Ok(PlainTransportConnectResponse {
tuple: TransportTuple::from_fbs(data.tuple.as_ref()),
rtcp_tuple: data
.rtcp_tuple
.map(|tuple| TransportTuple::from_fbs(tuple.as_ref())),
srtp_parameters: data
.srtp_parameters
.map(|parameters| SrtpParameters::from_fbs(parameters.as_ref())),
})
}
}

request_response!(
TransportId,
Expand Down
2 changes: 1 addition & 1 deletion rust/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ impl Router {
let data = self
.inner
.channel
.request(
.request_fbs(
self.inner.id,
RouterCreatePlainTransportRequest {
data: RouterCreatePlainTransportData::from_options(
Expand Down
Loading

0 comments on commit e469b4e

Please sign in to comment.