Skip to content

Commit 37d5467

Browse files
committed
rust: move PlainTransport to flatbuffers
1 parent 2ce0409 commit 37d5467

17 files changed

+661
-253
lines changed

node/src/PlainTransport.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ export class PlainTransport<PlainTransportAppData extends AppData = AppData>
356356
// Wait for response.
357357
const response = await this.channel.request(
358358
FbsRequest.Method.PLAINTRANSPORT_CONNECT,
359-
FbsRequest.Body.FBS_PlainTransport_ConnectRequest,
359+
FbsRequest.Body.PlainTransport_ConnectRequest,
360360
requestOffset,
361361
this.internal.transportId
362362
);

rust/src/data_structures.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,17 @@ pub enum Protocol {
146146
Udp,
147147
}
148148

149+
impl Protocol {
150+
// TODO: Use the Protocol FBS type.
151+
pub(crate) fn from_fbs(protocol: &str) -> Self {
152+
match protocol {
153+
"tcp" => Protocol::Tcp,
154+
"udp" => Protocol::Udp,
155+
_ => todo!(),
156+
}
157+
}
158+
}
159+
149160
/// ICE candidate
150161
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Deserialize, Serialize)]
151162
#[serde(rename_all = "camelCase")]
@@ -260,6 +271,28 @@ impl TransportTuple {
260271
None
261272
}
262273
}
274+
275+
pub(crate) fn from_fbs(tuple: &transport::Tuple) -> TransportTuple {
276+
match &tuple.remote_ip {
277+
Some(_remote_ip) => TransportTuple::WithRemote {
278+
local_ip: tuple.local_ip.parse().expect("Error parsing IP address"),
279+
local_port: tuple.local_port,
280+
remote_ip: tuple
281+
.remote_ip
282+
.as_ref()
283+
.unwrap()
284+
.parse()
285+
.expect("Error parsing IP address"),
286+
remote_port: tuple.remote_port,
287+
protocol: Protocol::from_fbs(tuple.protocol.as_str()),
288+
},
289+
None => TransportTuple::LocalOnly {
290+
local_ip: tuple.local_ip.parse().expect("Error parsing IP address"),
291+
local_port: tuple.local_port,
292+
protocol: Protocol::from_fbs(tuple.protocol.as_str()),
293+
},
294+
}
295+
}
263296
}
264297

265298
/// DTLS state.

rust/src/fbs.rs

Lines changed: 186 additions & 142 deletions
Large diffs are not rendered by default.

rust/src/messages.rs

Lines changed: 211 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use crate::data_structures::{
1111
ListenInfo, SctpState, TransportTuple,
1212
};
1313
use crate::direct_transport::DirectTransportOptions;
14-
use crate::fbs::{direct_transport, message, request, response, router, transport, worker};
14+
use crate::fbs::{
15+
direct_transport, message, plain_transport, request, response, router, transport, worker,
16+
};
1517
use crate::ortc::RtpMapping;
1618
use crate::pipe_transport::PipeTransportOptions;
1719
use crate::plain_transport::PlainTransportOptions;
@@ -735,7 +737,7 @@ impl RequestFbs for RouterDumpRequest {
735737
pub(crate) struct RouterCreateDirectTransportData {
736738
transport_id: TransportId,
737739
direct: bool,
738-
max_message_size: usize,
740+
max_message_size: u32,
739741
}
740742

741743
impl RouterCreateDirectTransportData {
@@ -754,7 +756,7 @@ impl RouterCreateDirectTransportData {
754756
direct_transport::DirectTransportOptions {
755757
base: Box::new(transport::Options {
756758
direct: true,
757-
max_message_size: u32::try_from(self.max_message_size).unwrap(),
759+
max_message_size: self.max_message_size,
758760
initial_available_outgoing_bitrate: 0,
759761
enable_sctp: false,
760762
num_sctp_streams: None,
@@ -896,6 +898,7 @@ impl Request for RouterCreateWebrtcTransportRequest {
896898
pub(crate) struct RouterCreatePlainTransportData {
897899
transport_id: TransportId,
898900
listen_info: ListenInfo,
901+
rtcp_listen_info: Option<ListenInfo>,
899902
rtcp_mux: bool,
900903
comedia: bool,
901904
enable_sctp: bool,
@@ -915,6 +918,7 @@ impl RouterCreatePlainTransportData {
915918
Self {
916919
transport_id,
917920
listen_info: plain_transport_options.listen_info,
921+
rtcp_listen_info: plain_transport_options.rtcp_listen_info,
918922
rtcp_mux: plain_transport_options.rtcp_mux,
919923
comedia: plain_transport_options.comedia,
920924
enable_sctp: plain_transport_options.enable_sctp,
@@ -926,26 +930,102 @@ impl RouterCreatePlainTransportData {
926930
is_data_channel: false,
927931
}
928932
}
933+
934+
pub(crate) fn to_fbs(&self) -> plain_transport::PlainTransportOptions {
935+
plain_transport::PlainTransportOptions {
936+
base: Box::new(transport::Options {
937+
direct: false,
938+
initial_available_outgoing_bitrate: 0,
939+
enable_sctp: self.enable_sctp,
940+
num_sctp_streams: Some(Box::new(self.num_sctp_streams.to_fbs())),
941+
max_sctp_message_size: self.max_sctp_message_size,
942+
sctp_send_buffer_size: self.sctp_send_buffer_size,
943+
is_data_channel: self.is_data_channel,
944+
max_message_size: self.max_sctp_message_size,
945+
}),
946+
listen_info: Box::new(self.listen_info.to_fbs()),
947+
rtcp_listen_info: self
948+
.rtcp_listen_info
949+
.map(|listen_info| Box::new(listen_info.to_fbs())),
950+
rtcp_mux: self.rtcp_mux,
951+
comedia: self.comedia,
952+
enable_srtp: self.enable_srtp,
953+
srtp_crypto_suite: Some(self.srtp_crypto_suite.to_string()),
954+
}
955+
}
929956
}
930957

931-
request_response!(
932-
RouterId,
933-
"router.createPlainTransport",
934-
RouterCreatePlainTransportRequest {
935-
#[serde(flatten)]
936-
data: RouterCreatePlainTransportData,
937-
},
938-
PlainTransportData {
939-
// The following fields are present, but unused
940-
// rtcp_mux: bool,
941-
// comedia: bool,
942-
tuple: Mutex<TransportTuple>,
943-
rtcp_tuple: Mutex<Option<TransportTuple>>,
944-
sctp_parameters: Option<SctpParameters>,
945-
sctp_state: Mutex<Option<SctpState>>,
946-
srtp_parameters: Mutex<Option<SrtpParameters>>,
947-
},
948-
);
958+
#[derive(Debug)]
959+
pub(crate) struct RouterCreatePlainTransportRequest {
960+
pub(crate) data: RouterCreatePlainTransportData,
961+
}
962+
963+
impl RequestFbs for RouterCreatePlainTransportRequest {
964+
const METHOD: request::Method = request::Method::RouterCreatePlaintransport;
965+
type HandlerId = RouterId;
966+
type Response = PlainTransportData;
967+
968+
fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
969+
let mut builder = Builder::new();
970+
let data = router::CreatePlainTransportRequest::create(
971+
&mut builder,
972+
self.data.transport_id.to_string(),
973+
self.data.to_fbs(),
974+
);
975+
let request_body = request::Body::create_create_plain_transport_request(&mut builder, data);
976+
let request = request::Request::create(
977+
&mut builder,
978+
id,
979+
Self::METHOD,
980+
handler_id.to_string(),
981+
Some(request_body),
982+
);
983+
let message_body = message::Body::create_request(&mut builder, request);
984+
let message = message::Message::create(&mut builder, message::Type::Request, message_body);
985+
986+
builder.finish(message, None).to_vec()
987+
}
988+
989+
fn convert_response(
990+
response: Option<response::Body>,
991+
) -> Result<Self::Response, Box<dyn Error>> {
992+
let Some(response::Body::FbsPlainTransportDumpResponse(data)) = response else {
993+
panic!("Wrong message from worker: {response:?}");
994+
};
995+
996+
Ok(PlainTransportData {
997+
tuple: Mutex::new(TransportTuple::from_fbs(data.tuple.as_ref())),
998+
rtcp_tuple: Mutex::new(
999+
data.rtcp_tuple
1000+
.map(|tuple| TransportTuple::from_fbs(tuple.as_ref())),
1001+
),
1002+
sctp_parameters: data
1003+
.base
1004+
.sctp_parameters
1005+
.map(|parameters| SctpParameters::from_fbs(parameters.as_ref())),
1006+
sctp_state: Mutex::new(
1007+
data.base
1008+
.sctp_state
1009+
.map(|state| SctpState::from_fbs(&state)),
1010+
),
1011+
srtp_parameters: Mutex::new(
1012+
data.srtp_parameters
1013+
.map(|parameters| SrtpParameters::from_fbs(parameters.as_ref())),
1014+
),
1015+
})
1016+
}
1017+
}
1018+
1019+
pub(crate) struct PlainTransportData {
1020+
// The following fields are present, but unused
1021+
// rtcp_mux: bool,
1022+
// comedia: bool,
1023+
pub(crate) tuple: Mutex<TransportTuple>,
1024+
pub(crate) rtcp_tuple: Mutex<Option<TransportTuple>>,
1025+
pub(crate) sctp_parameters: Option<SctpParameters>,
1026+
pub(crate) sctp_state: Mutex<Option<SctpState>>,
1027+
pub(crate) srtp_parameters: Mutex<Option<SrtpParameters>>,
1028+
}
9491029

9501030
#[derive(Debug, Serialize)]
9511031
#[serde(rename_all = "camelCase")]
@@ -1074,9 +1154,9 @@ request_response!(
10741154
);
10751155

10761156
#[derive(Debug)]
1077-
pub(crate) struct TransportDumpRequestNew {}
1157+
pub(crate) struct TransportDumpRequestFbs {}
10781158

1079-
impl RequestFbs for TransportDumpRequestNew {
1159+
impl RequestFbs for TransportDumpRequestFbs {
10801160
const METHOD: request::Method = request::Method::TransportDump;
10811161
type HandlerId = TransportId;
10821162
type Response = response::Body;
@@ -1116,9 +1196,9 @@ request_response!(
11161196
);
11171197

11181198
#[derive(Debug)]
1119-
pub(crate) struct TransportGetStatsRequestNew {}
1199+
pub(crate) struct TransportGetStatsRequestFbs {}
11201200

1121-
impl RequestFbs for TransportGetStatsRequestNew {
1201+
impl RequestFbs for TransportGetStatsRequestFbs {
11221202
const METHOD: request::Method = request::Method::TransportGetStats;
11231203
type HandlerId = TransportId;
11241204
type Response = response::Body;
@@ -1151,6 +1231,42 @@ impl RequestFbs for TransportGetStatsRequestNew {
11511231
}
11521232
}
11531233

1234+
#[derive(Debug)]
1235+
pub(crate) struct TransportCloseRequestFbs {
1236+
pub(crate) transport_id: TransportId,
1237+
}
1238+
1239+
impl RequestFbs for TransportCloseRequestFbs {
1240+
const METHOD: request::Method = request::Method::RouterCloseTransport;
1241+
type HandlerId = RouterId;
1242+
type Response = ();
1243+
1244+
fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
1245+
let mut builder = Builder::new();
1246+
let data =
1247+
router::CloseTransportRequest::create(&mut builder, self.transport_id.to_string());
1248+
let request_body = request::Body::create_close_transport_request(&mut builder, data);
1249+
1250+
let request = request::Request::create(
1251+
&mut builder,
1252+
id,
1253+
Self::METHOD,
1254+
handler_id.to_string(),
1255+
Some(request_body),
1256+
);
1257+
let message_body = message::Body::create_request(&mut builder, request);
1258+
let message = message::Message::create(&mut builder, message::Type::Request, message_body);
1259+
1260+
builder.finish(message, None).to_vec()
1261+
}
1262+
1263+
fn convert_response(
1264+
_response: Option<response::Body>,
1265+
) -> Result<Self::Response, Box<dyn Error>> {
1266+
Ok(())
1267+
}
1268+
}
1269+
11541270
request_response!(
11551271
TransportId,
11561272
"transport.connect",
@@ -1176,25 +1292,76 @@ request_response!(
11761292
},
11771293
);
11781294

1179-
request_response!(
1180-
TransportId,
1181-
"transport.connect",
1182-
TransportConnectPlainRequest {
1183-
#[serde(skip_serializing_if = "Option::is_none")]
1184-
ip: Option<IpAddr>,
1185-
#[serde(skip_serializing_if = "Option::is_none")]
1186-
port: Option<u16>,
1187-
#[serde(skip_serializing_if = "Option::is_none")]
1188-
rtcp_port: Option<u16>,
1189-
#[serde(skip_serializing_if = "Option::is_none")]
1190-
srtp_parameters: Option<SrtpParameters>,
1191-
},
1192-
TransportConnectResponsePlain {
1193-
tuple: Option<TransportTuple>,
1194-
rtcp_tuple: Option<TransportTuple>,
1195-
srtp_parameters: Option<SrtpParameters>,
1196-
},
1197-
);
1295+
#[derive(Debug)]
1296+
pub(crate) struct PlainTransportConnectRequestData {
1297+
pub(crate) ip: Option<IpAddr>,
1298+
pub(crate) port: Option<u16>,
1299+
pub(crate) rtcp_port: Option<u16>,
1300+
pub(crate) srtp_parameters: Option<SrtpParameters>,
1301+
}
1302+
1303+
#[derive(Debug)]
1304+
pub(crate) struct PlainTransportConnectResponseData {
1305+
pub(crate) tuple: TransportTuple,
1306+
pub(crate) rtcp_tuple: Option<TransportTuple>,
1307+
pub(crate) srtp_parameters: Option<SrtpParameters>,
1308+
}
1309+
1310+
#[derive(Debug)]
1311+
pub(crate) struct TransportConnectPlainRequest {
1312+
pub(crate) ip: Option<IpAddr>,
1313+
pub(crate) port: Option<u16>,
1314+
pub(crate) rtcp_port: Option<u16>,
1315+
pub(crate) srtp_parameters: Option<SrtpParameters>,
1316+
}
1317+
1318+
impl RequestFbs for TransportConnectPlainRequest {
1319+
const METHOD: request::Method = request::Method::PlaintransportConnect;
1320+
type HandlerId = TransportId;
1321+
type Response = PlainTransportConnectResponseData;
1322+
1323+
fn into_bytes(self, id: u32, handler_id: Self::HandlerId) -> Vec<u8> {
1324+
let mut builder = Builder::new();
1325+
let data = plain_transport::ConnectRequest::create(
1326+
&mut builder,
1327+
self.ip.map(|ip| ip.to_string()),
1328+
self.port,
1329+
self.rtcp_port,
1330+
self.srtp_parameters.map(|parameters| parameters.to_fbs()),
1331+
);
1332+
let request_body =
1333+
request::Body::create_plain_transport_connect_request(&mut builder, data);
1334+
let request = request::Request::create(
1335+
&mut builder,
1336+
id,
1337+
Self::METHOD,
1338+
handler_id.to_string(),
1339+
Some(request_body),
1340+
);
1341+
let message_body = message::Body::create_request(&mut builder, request);
1342+
let message = message::Message::create(&mut builder, message::Type::Request, message_body);
1343+
1344+
builder.finish(message, None).to_vec()
1345+
}
1346+
1347+
fn convert_response(
1348+
response: Option<response::Body>,
1349+
) -> Result<Self::Response, Box<dyn Error>> {
1350+
let Some(response::Body::FbsPlainTransportConnectResponse(data)) = response else {
1351+
panic!("Wrong message from worker: {response:?}");
1352+
};
1353+
1354+
Ok(PlainTransportConnectResponseData {
1355+
tuple: TransportTuple::from_fbs(data.tuple.as_ref()),
1356+
rtcp_tuple: data
1357+
.rtcp_tuple
1358+
.map(|tuple| TransportTuple::from_fbs(tuple.as_ref())),
1359+
srtp_parameters: data
1360+
.srtp_parameters
1361+
.map(|parameters| SrtpParameters::from_fbs(parameters.as_ref())),
1362+
})
1363+
}
1364+
}
11981365

11991366
request_response!(
12001367
TransportId,

rust/src/router.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ impl Router {
760760
let data = self
761761
.inner
762762
.channel
763-
.request(
763+
.request_fbs(
764764
self.inner.id,
765765
RouterCreatePlainTransportRequest {
766766
data: RouterCreatePlainTransportData::from_options(

0 commit comments

Comments
 (0)