Skip to content

Commit

Permalink
支持websocket协议
Browse files Browse the repository at this point in the history
  • Loading branch information
vnt-dev committed Jun 30, 2024
1 parent b529abe commit 57b3a61
Show file tree
Hide file tree
Showing 11 changed files with 625 additions and 67 deletions.
64 changes: 60 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "vnts"
version = "1.2.9"
version = "1.2.11"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -22,6 +22,7 @@ ring = { version = "0.17", optional = true }
rand = "0.8"
sha2 = { version = "0.10", features = ["oid"] }
colored = "2.1"
anyhow = "1.0.82"

thiserror = "1"
chrono = "0.4"
Expand All @@ -36,6 +37,7 @@ socket2 = { version = "0.5", features = ["all"] }
actix-web = { version = "4.5", optional = true }
actix-files = { version = "0.6", optional = true }
actix-web-static-files = { version = "4.0.1", optional = true }
tokio-tungstenite = "0.23.1"

serde = { version = "1", features = ["derive"] }
crossbeam-utils = "0.8"
Expand Down
1 change: 1 addition & 0 deletions src/core/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ mod tcp;
mod udp;
#[cfg(feature = "web")]
mod web;
mod websocket;

pub async fn start(
udp: std::net::UdpSocket,
Expand Down
48 changes: 37 additions & 11 deletions src/core/server/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use tokio::net::tcp::OwnedReadHalf;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc::{channel, Sender};

const TCP_MAX_PACKET_SIZE: usize = (1 << 24) - 1;

pub async fn start(tcp: TcpListener, handler: PacketHandler) {
if let Err(e) = accept(tcp, handler).await {
log::error!("accept {:?}", e);
Expand All @@ -17,24 +19,47 @@ async fn accept(tcp: TcpListener, handler: PacketHandler) -> io::Result<()> {
loop {
let (stream, addr) = tcp.accept().await?;
let _ = stream.set_nodelay(true);
stream_handle(stream, addr, handler.clone()).await;
tokio::spawn(stream_handle(stream, addr, handler.clone()));
}
}

async fn stream_handle(stream: TcpStream, addr: SocketAddr, handler: PacketHandler) {
{
let mut buf = [0u8; 1];
match stream.peek(&mut buf).await {
Ok(len) => {
if len == 0 {
log::warn!("数据流读取失败 {}", addr);
return;
}
if buf[0] != 0 {
//可能是ws协议
crate::core::server::websocket::handle_websocket_connection(
stream, addr, handler,
)
.await;
return;
}
}
Err(e) => {
log::warn!("数据流读取失败 {:?} {}", e, addr);
return;
}
}
}

let (r, mut w) = stream.into_split();

let (sender, mut receiver) = channel::<Vec<u8>>(100);
tokio::spawn(async move {
while let Some(data) = receiver.recv().await {
let len = data.len();
if len > TCP_MAX_PACKET_SIZE {
log::warn!("超过了tcp的最大长度传输 地址{}", addr);
return;
}
if let Err(e) = w
.write_all(&[
(len >> 24) as u8,
(len >> 16) as u8,
(len >> 8) as u8,
len as u8,
])
.write_all(&[0, (len >> 16) as u8, (len >> 8) as u8, len as u8])
.await
{
log::info!("发送失败,链接终止:{:?},{:?}", addr, e);
Expand Down Expand Up @@ -65,10 +90,11 @@ async fn tcp_read(
let sender = Some(sender);
loop {
read.read_exact(&mut head).await?;
let len = ((head[0] as usize) << 24)
| ((head[1] as usize) << 16)
| ((head[2] as usize) << 8)
| head[3] as usize;
if head[0] != 0 {
log::warn!("tcp数据流错误 来源地址 {}", addr);
return Ok(());
}
let len = ((head[1] as usize) << 16) | ((head[2] as usize) << 8) | head[3] as usize;
if len < 12 || len > buf.len() {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
Expand Down
69 changes: 69 additions & 0 deletions src/core/server/websocket/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::core::service::PacketHandler;
use crate::protocol::NetPacket;
use anyhow::Context;
use futures_util::{SinkExt, StreamExt};
use std::net::SocketAddr;
use tokio::net::TcpStream;
use tokio::sync::mpsc::channel;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;

pub async fn handle_websocket_connection(
stream: TcpStream,
addr: SocketAddr,
handler: PacketHandler,
) {
tokio::spawn(async move {
if let Err(e) = handle_websocket_connection0(stream, addr, handler).await {
log::warn!("websocket err {:?} {}", e, addr);
}
});
}

async fn handle_websocket_connection0(
stream: TcpStream,
addr: SocketAddr,
handler: PacketHandler,
) -> anyhow::Result<()> {
let ws_stream = accept_async(stream)
.await
.with_context(|| format!("Error during WebSocket handshake {}", addr))?;

let (mut ws_write, mut ws_read) = ws_stream.split();

let (sender, mut receiver) = channel::<Vec<u8>>(100);
tokio::spawn(async move {
while let Some(data) = receiver.recv().await {
if let Err(e) = ws_write.send(Message::Binary(data)).await {
log::warn!("websocket err {:?} {}", e, addr);
break;
}
}
let _ = ws_write.close().await;
});
let sender = Some(sender);
while let Some(msg) = ws_read.next().await {
let msg = msg.with_context(|| format!("Error during WebSocket read {}", addr))?;
match msg {
Message::Text(txt) => log::info!("Received text message: {} {}", txt, addr),
Message::Binary(mut data) => {
let packet = NetPacket::new0(data.len(), &mut data)?;
if let Some(rs) = handler.handle(packet, addr, &sender).await {
if sender
.as_ref()
.unwrap()
.send(rs.buffer().to_vec())
.await
.is_err()
{
break;
}
}
}
Message::Ping(_) | Message::Pong(_) => (),
Message::Close(_) => break,
_ => {}
}
}
return Ok(());
}
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ async fn main() {
log::info!("监听udp端口: {:?}", port);
println!("监听udp端口: {:?}", port);
let tcp = create_tcp(port).unwrap();
log::info!("监听tcp端口: {:?}", port);
println!("监听tcp端口: {:?}", port);
log::info!("监听tcp/ws端口: {:?}", port);
println!("监听tcp/ws端口: {:?}", port);
#[cfg(feature = "web")]
let http = if web_port != 0 {
let http = create_tcp(web_port).unwrap();
Expand Down
Loading

0 comments on commit 57b3a61

Please sign in to comment.