Skip to content

Commit

Permalink
3.6.8
Browse files Browse the repository at this point in the history
  • Loading branch information
TenderIronh committed Mar 22, 2023
1 parent 791d910 commit 67e3a89
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 150 deletions.
6 changes: 3 additions & 3 deletions README-ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,12 @@ make
4. ~~建立网站,用户可以在网站管理所有P2PApp和设备。查看设备在线状态,升级,增删查改重启P2PApp等~~(100%)
5. 建立公众号,用户可在微信公众号管理所有P2PApp和设备
6. 客户端提供WebUI
7. 支持自有服务器,开源服务器程序
7. ~~支持自有服务器,开源服务器程序~~(100%)
8. 共享节点调度模型优化,对不同的运营商优化
9. 方便二次开发,提供API和lib
10. 应用层支持UDP协议,实现很简单,但UDP应用较少暂不急(100%)
10. ~~应用层支持UDP协议,实现很简单,但UDP应用较少暂不急~~(100%)
11. 底层通信支持KCP协议,目前仅支持Quic;KCP专门对延时优化,被游戏加速器广泛使用,可以牺牲一定的带宽降低延时
12. 支持Android系统,让旧手机焕发青春变成移动网关
12. ~~支持Android系统,让旧手机焕发青春变成移动网关~~(100%)
13. 支持Windows网上邻居共享文件
14. 内网直连优化,用处不大,估计就用户测试时用到
15. ~~支持UPNP~~(100%)
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,12 @@ Short-Term:
4. ~~Build website, users can manage all P2PApp and devices via it. View devices' online status, upgrade, restart or CURD P2PApp .~~(100%)
5. Provide wechat official account, user can manage P2PApp nodes and deivce as same as website.
6. Provide WebUI on client side.
7. Support private server, open source server program.
7. ~~Support private server, open source server program.~~(100%)
8. Optimize our share scheduling model for different network operators.
9. Provide REST APIs and libary for secondary development.
10. ~~Support UDP at application layer, it is easy to implement but not urgent due to only a few applicaitons using UDP protocol.~~(100%)
11. Support KCP protocol underlay, currently support Quic only. KCP focus on delay optimization,which has been widely used as game accelerator,it can sacrifice part of bandwidth to reduce timelag.
12. Support Android platform, let the phones to be mobile gateway.
12. ~~Support Android platform, let the phones to be mobile gateway.~~(100%)
13. Support SMB Windows neighborhood.
14. Direct connection on intranet, for testing.
15. ~~Support UPNP.~~(100%)
Expand Down
10 changes: 10 additions & 0 deletions core/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ func (c *Config) switchApp(app AppConfig, enabled int) {
}
}
}
func (c *Config) retryApp(peerNode string) {
c.mtx.Lock()
defer c.mtx.Unlock()
for i := 0; i < len(c.Apps); i++ {
if c.Apps[i].PeerNode == peerNode {
c.Apps[i].retryNum = 0
c.Apps[i].nextRetryTime = time.Now()
}
}
}

func (c *Config) add(app AppConfig, override bool) {
c.mtx.Lock()
Expand Down
17 changes: 9 additions & 8 deletions core/errorcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@ import (
var (
// ErrorS2S string = "s2s is not supported"
// ErrorHandshake string = "handshake error"
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrPeerOffline = errors.New("peer offline")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrorS2S = errors.New("s2s is not supported")
ErrorHandshake = errors.New("handshake error")
ErrorNewUser = errors.New("new user")
ErrorLogin = errors.New("user or password not correct")
ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters")
ErrPeerOffline = errors.New("peer offline")
ErrMsgFormat = errors.New("message format wrong")
ErrVersionNotCompatible = errors.New("version not compatible")
ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected")
)
12 changes: 11 additions & 1 deletion core/handlepush.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err)
return err
}
gLog.Printf(LvINFO, "%s is connecting...", req.From)
gLog.Printf(LvDEBUG, "%s is connecting...", req.From)
gLog.Println(LvDEBUG, "push connect response to ", req.From)
if compareVersion(req.Version, LeastSupportVersion) == LESS {
gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From)
Expand Down Expand Up @@ -274,6 +274,16 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error {
// disable APP
pn.DeleteApp(config)
}
case MsgPushDstNodeOnline:
gLog.Println(LvINFO, "MsgPushDstNodeOnline")
app := PushDstNodeOnline{}
err := json.Unmarshal(msg[openP2PHeaderSize:], &app)
if err != nil {
gLog.Printf(LvERROR, "wrong MsgPushDstNodeOnline:%s %s", err, string(msg[openP2PHeaderSize:]))
return err
}
gLog.Println(LvINFO, "retry peerNode ", app.Node)
gConf.retryApp(app.Node)
default:
pn.msgMapMtx.Lock()
ch := pn.msgMap[pushHead.From]
Expand Down
2 changes: 1 addition & 1 deletion core/nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string,
func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, hasIPvr int, hasUPNPorNATPMP int, err error) {
// the random local port may be used by other.
localPort := int(rand.Uint32()%15000 + 50000)
echoPort := P2PNetworkInstance(nil).config.TCPPort
echoPort := gConf.Network.TCPPort
ip1, port1, err := natTest(host, udp1, localPort)
if err != nil {
return "", 0, 0, 0, err
Expand Down
54 changes: 40 additions & 14 deletions core/overlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type overlayConn struct {
// for udp
connUDP *net.UDPConn
remoteAddr net.Addr
udpRelayData chan []byte
udpData chan []byte
lastReadUDPTs time.Time
}

Expand All @@ -44,15 +44,15 @@ func (oConn *overlayConn) run() {
defer gLog.Printf(LvDEBUG, "%d overlayConn run end", oConn.id)
oConn.running = true
oConn.lastReadUDPTs = time.Now()
buffer := make([]byte, ReadBuffLen+PaddingSize)
readBuf := buffer[:ReadBuffLen]
buffer := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
reuseBuff := buffer[:ReadBuffLen]
encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding
tunnelHead := new(bytes.Buffer)
relayHead := new(bytes.Buffer)
binary.Write(relayHead, binary.LittleEndian, oConn.rtid)
binary.Write(tunnelHead, binary.LittleEndian, oConn.id)
for oConn.running && oConn.tunnel.isRuning() {
buff, dataLen, err := oConn.Read(readBuf)
readBuff, dataLen, err := oConn.Read(reuseBuff)
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Timeout() {
continue
Expand All @@ -61,9 +61,9 @@ func (oConn *overlayConn) run() {
gLog.Printf(LvDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err)
break
}
payload := buff[:dataLen]
payload := readBuff[:dataLen]
if oConn.appKey != 0 {
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, buffer[:dataLen], dataLen)
payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, readBuff[:dataLen], dataLen)
}
writeBytes := append(tunnelHead.Bytes(), payload...)
if oConn.rtid == 0 {
Expand Down Expand Up @@ -98,39 +98,49 @@ func (oConn *overlayConn) run() {
}
}

func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) {
func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, dataLen int, err error) {
if !oConn.running {
err = ErrOverlayConnDisconnect
return
}
if oConn.connUDP != nil {
if time.Now().After(oConn.lastReadUDPTs.Add(time.Minute * 5)) {
err = errors.New("udp close")
return
}
if oConn.remoteAddr != nil { // as server
select {
case buff = <-oConn.udpRelayData:
n = len(buff)
case buff = <-oConn.udpData:
dataLen = len(buff) - PaddingSize
oConn.lastReadUDPTs = time.Now()
case <-time.After(time.Second * 10):
err = ErrDeadlineExceeded
}
} else { // as client
oConn.connUDP.SetReadDeadline(time.Now().Add(5 * time.Second))
n, _, err = oConn.connUDP.ReadFrom(reuseBuff)
dataLen, _, err = oConn.connUDP.ReadFrom(reuseBuff)
if err == nil {
oConn.lastReadUDPTs = time.Now()
}
buff = reuseBuff
}
return
}
oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5))
n, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
if oConn.connTCP != nil {
oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5))
dataLen, err = oConn.connTCP.Read(reuseBuff)
buff = reuseBuff
}

return
}

// calling by p2pTunnel
func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
// add mutex when multi-thread calling
if !oConn.running {
return 0, ErrOverlayConnDisconnect
}
if oConn.connUDP != nil {
if oConn.remoteAddr == nil {
n, err = oConn.connUDP.Write(buff)
Expand All @@ -142,9 +152,25 @@ func (oConn *overlayConn) Write(buff []byte) (n int, err error) {
}
return
}
n, err = oConn.connTCP.Write(buff)
if oConn.connTCP != nil {
n, err = oConn.connTCP.Write(buff)
}

if err != nil {
oConn.running = false
}
return
}

func (oConn *overlayConn) Close() (err error) {
oConn.running = false
if oConn.connTCP != nil {
oConn.connTCP.Close()
oConn.connTCP = nil
}
if oConn.connUDP != nil {
oConn.connUDP.Close()
oConn.connUDP = nil
}
return nil
}
30 changes: 15 additions & 15 deletions core/p2papp.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (app *p2pApp) listenUDP() error {
gLog.Printf(LvERROR, "listen error:%s", err)
return err
}
buffer := make([]byte, 64*1024)
buffer := make([]byte, 64*1024+PaddingSize)
udpID := make([]byte, 8)
for {
app.listenerUDP.SetReadDeadline(time.Now().Add(time.Second * 10))
Expand All @@ -127,8 +127,8 @@ func (app *p2pApp) listenUDP() error {
break
}
} else {
b := bytes.Buffer{}
b.Write(buffer[:len])
dupData := bytes.Buffer{} // should uses memory pool
dupData.Write(buffer[:len+PaddingSize])
// load from app.tunnel.overlayConns by remoteAddr error, new udp connection
remoteIP := strings.Split(remoteAddr.String(), ":")[0]
port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1])
Expand All @@ -139,19 +139,19 @@ func (app *p2pApp) listenUDP() error {
udpID[3] = a[3]
udpID[4] = byte(port)
udpID[5] = byte(port >> 8)
id := binary.LittleEndian.Uint64(udpID)
id := binary.LittleEndian.Uint64(udpID) // convert remoteIP:port to uint64
s, ok := app.tunnel.overlayConns.Load(id)
if !ok {
oConn := overlayConn{
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpRelayData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
tunnel: app.tunnel,
connUDP: app.listenerUDP,
remoteAddr: remoteAddr,
udpData: make(chan []byte, 1000),
id: id,
isClient: true,
rtid: app.rtid,
appID: app.id,
appKey: app.key,
}
// calc key bytes for encrypt
if oConn.appKey != 0 {
Expand Down Expand Up @@ -181,15 +181,15 @@ func (app *p2pApp) listenUDP() error {
app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead)
}
go oConn.run()
oConn.udpRelayData <- b.Bytes()
oConn.udpData <- dupData.Bytes()
}

// load from app.tunnel.overlayConns by remoteAddr ok, write relay data
overlayConn, ok := s.(*overlayConn)
if !ok {
continue
}
overlayConn.udpRelayData <- b.Bytes()
overlayConn.udpData <- dupData.Bytes()
}
}
return nil
Expand Down
Loading

0 comments on commit 67e3a89

Please sign in to comment.