From 67e3a8915a0446ab60c31fe859ff53b4cda25245 Mon Sep 17 00:00:00 2001 From: TenderIronh Date: Wed, 22 Mar 2023 23:11:38 +0800 Subject: [PATCH] 3.6.8 --- README-ZH.md | 6 +++--- README.md | 4 ++-- core/config.go | 10 +++++++++ core/errorcode.go | 17 ++++++++------- core/handlepush.go | 12 ++++++++++- core/nat.go | 2 +- core/overlay.go | 54 ++++++++++++++++++++++++++++++++++------------ core/p2papp.go | 30 +++++++++++++------------- core/p2pnetwork.go | 53 ++++++++++++++++++++++++--------------------- core/p2ptunnel.go | 13 +++-------- core/protocol.go | 6 +++++- core/totp.go | 35 ------------------------------ core/totp_test.go | 36 ------------------------------- 13 files changed, 128 insertions(+), 150 deletions(-) delete mode 100644 core/totp.go delete mode 100644 core/totp_test.go diff --git a/README-ZH.md b/README-ZH.md index 86bf2d6..7bb9b1c 100644 --- a/README-ZH.md +++ b/README-ZH.md @@ -110,12 +110,12 @@ make 4. ~~建立网站,用户可以在网站管理所有P2PApp和设备。查看设备在线状态,升级,增删查改重启P2PApp等~~(100%) 5. 建立公众号,用户可在微信公众号管理所有P2PApp和设备 6. 客户端提供WebUI -7. 支持自有服务器,开源服务器程序 +7. ~~支持自有服务器,开源服务器程序~~(100%) 8. 共享节点调度模型优化,对不同的运营商优化 9. 方便二次开发,提供API和lib -10. 应用层支持UDP协议,实现很简单,但UDP应用较少暂不急(100%) +10. ~~应用层支持UDP协议,实现很简单,但UDP应用较少暂不急~~(100%) 11. 底层通信支持KCP协议,目前仅支持Quic;KCP专门对延时优化,被游戏加速器广泛使用,可以牺牲一定的带宽降低延时 -12. 支持Android系统,让旧手机焕发青春变成移动网关 +12. ~~支持Android系统,让旧手机焕发青春变成移动网关~~(100%) 13. 支持Windows网上邻居共享文件 14. 内网直连优化,用处不大,估计就用户测试时用到 15. ~~支持UPNP~~(100%) diff --git a/README.md b/README.md index 4668c58..914625e 100644 --- a/README.md +++ b/README.md @@ -117,12 +117,12 @@ Short-Term: 4. ~~Build website, users can manage all P2PApp and devices via it. View devices' online status, upgrade, restart or CURD P2PApp .~~(100%) 5. Provide wechat official account, user can manage P2PApp nodes and deivce as same as website. 6. Provide WebUI on client side. -7. Support private server, open source server program. +7. ~~Support private server, open source server program.~~(100%) 8. Optimize our share scheduling model for different network operators. 9. Provide REST APIs and libary for secondary development. 10. ~~Support UDP at application layer, it is easy to implement but not urgent due to only a few applicaitons using UDP protocol.~~(100%) 11. Support KCP protocol underlay, currently support Quic only. KCP focus on delay optimization,which has been widely used as game accelerator,it can sacrifice part of bandwidth to reduce timelag. -12. Support Android platform, let the phones to be mobile gateway. +12. ~~Support Android platform, let the phones to be mobile gateway.~~(100%) 13. Support SMB Windows neighborhood. 14. Direct connection on intranet, for testing. 15. ~~Support UPNP.~~(100%) diff --git a/core/config.go b/core/config.go index 6eb48f7..8af489c 100644 --- a/core/config.go +++ b/core/config.go @@ -62,6 +62,16 @@ func (c *Config) switchApp(app AppConfig, enabled int) { } } } +func (c *Config) retryApp(peerNode string) { + c.mtx.Lock() + defer c.mtx.Unlock() + for i := 0; i < len(c.Apps); i++ { + if c.Apps[i].PeerNode == peerNode { + c.Apps[i].retryNum = 0 + c.Apps[i].nextRetryTime = time.Now() + } + } +} func (c *Config) add(app AppConfig, override bool) { c.mtx.Lock() diff --git a/core/errorcode.go b/core/errorcode.go index 158621f..6ff3f03 100644 --- a/core/errorcode.go +++ b/core/errorcode.go @@ -8,12 +8,13 @@ import ( var ( // ErrorS2S string = "s2s is not supported" // ErrorHandshake string = "handshake error" - ErrorS2S = errors.New("s2s is not supported") - ErrorHandshake = errors.New("handshake error") - ErrorNewUser = errors.New("new user") - ErrorLogin = errors.New("user or password not correct") - ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters") - ErrPeerOffline = errors.New("peer offline") - ErrMsgFormat = errors.New("message format wrong") - ErrVersionNotCompatible = errors.New("version not compatible") + ErrorS2S = errors.New("s2s is not supported") + ErrorHandshake = errors.New("handshake error") + ErrorNewUser = errors.New("new user") + ErrorLogin = errors.New("user or password not correct") + ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters") + ErrPeerOffline = errors.New("peer offline") + ErrMsgFormat = errors.New("message format wrong") + ErrVersionNotCompatible = errors.New("version not compatible") + ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected") ) diff --git a/core/handlepush.go b/core/handlepush.go index 9a900a6..9447ed3 100644 --- a/core/handlepush.go +++ b/core/handlepush.go @@ -28,7 +28,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { gLog.Printf(LvERROR, "wrong MsgPushConnectReq:%s", err) return err } - gLog.Printf(LvINFO, "%s is connecting...", req.From) + gLog.Printf(LvDEBUG, "%s is connecting...", req.From) gLog.Println(LvDEBUG, "push connect response to ", req.From) if compareVersion(req.Version, LeastSupportVersion) == LESS { gLog.Println(LvERROR, ErrVersionNotCompatible.Error(), ":", req.From) @@ -274,6 +274,16 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { // disable APP pn.DeleteApp(config) } + case MsgPushDstNodeOnline: + gLog.Println(LvINFO, "MsgPushDstNodeOnline") + app := PushDstNodeOnline{} + err := json.Unmarshal(msg[openP2PHeaderSize:], &app) + if err != nil { + gLog.Printf(LvERROR, "wrong MsgPushDstNodeOnline:%s %s", err, string(msg[openP2PHeaderSize:])) + return err + } + gLog.Println(LvINFO, "retry peerNode ", app.Node) + gConf.retryApp(app.Node) default: pn.msgMapMtx.Lock() ch := pn.msgMap[pushHead.From] diff --git a/core/nat.go b/core/nat.go index 0974c92..4e93aba 100644 --- a/core/nat.go +++ b/core/nat.go @@ -91,7 +91,7 @@ func natTest(serverHost string, serverPort int, localPort int) (publicIP string, func getNATType(host string, udp1 int, udp2 int) (publicIP string, NATType int, hasIPvr int, hasUPNPorNATPMP int, err error) { // the random local port may be used by other. localPort := int(rand.Uint32()%15000 + 50000) - echoPort := P2PNetworkInstance(nil).config.TCPPort + echoPort := gConf.Network.TCPPort ip1, port1, err := natTest(host, udp1, localPort) if err != nil { return "", 0, 0, 0, err diff --git a/core/overlay.go b/core/overlay.go index fa2e657..296575a 100644 --- a/core/overlay.go +++ b/core/overlay.go @@ -35,7 +35,7 @@ type overlayConn struct { // for udp connUDP *net.UDPConn remoteAddr net.Addr - udpRelayData chan []byte + udpData chan []byte lastReadUDPTs time.Time } @@ -44,15 +44,15 @@ func (oConn *overlayConn) run() { defer gLog.Printf(LvDEBUG, "%d overlayConn run end", oConn.id) oConn.running = true oConn.lastReadUDPTs = time.Now() - buffer := make([]byte, ReadBuffLen+PaddingSize) - readBuf := buffer[:ReadBuffLen] + buffer := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding + reuseBuff := buffer[:ReadBuffLen] encryptData := make([]byte, ReadBuffLen+PaddingSize) // 16 bytes for padding tunnelHead := new(bytes.Buffer) relayHead := new(bytes.Buffer) binary.Write(relayHead, binary.LittleEndian, oConn.rtid) binary.Write(tunnelHead, binary.LittleEndian, oConn.id) for oConn.running && oConn.tunnel.isRuning() { - buff, dataLen, err := oConn.Read(readBuf) + readBuff, dataLen, err := oConn.Read(reuseBuff) if err != nil { if ne, ok := err.(net.Error); ok && ne.Timeout() { continue @@ -61,9 +61,9 @@ func (oConn *overlayConn) run() { gLog.Printf(LvDEBUG, "overlayConn %d read error:%s,close it", oConn.id, err) break } - payload := buff[:dataLen] + payload := readBuff[:dataLen] if oConn.appKey != 0 { - payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, buffer[:dataLen], dataLen) + payload, _ = encryptBytes(oConn.appKeyBytes, encryptData, readBuff[:dataLen], dataLen) } writeBytes := append(tunnelHead.Bytes(), payload...) if oConn.rtid == 0 { @@ -98,7 +98,11 @@ func (oConn *overlayConn) run() { } } -func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) { +func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, dataLen int, err error) { + if !oConn.running { + err = ErrOverlayConnDisconnect + return + } if oConn.connUDP != nil { if time.Now().After(oConn.lastReadUDPTs.Add(time.Minute * 5)) { err = errors.New("udp close") @@ -106,15 +110,15 @@ func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) } if oConn.remoteAddr != nil { // as server select { - case buff = <-oConn.udpRelayData: - n = len(buff) + case buff = <-oConn.udpData: + dataLen = len(buff) - PaddingSize oConn.lastReadUDPTs = time.Now() case <-time.After(time.Second * 10): err = ErrDeadlineExceeded } } else { // as client oConn.connUDP.SetReadDeadline(time.Now().Add(5 * time.Second)) - n, _, err = oConn.connUDP.ReadFrom(reuseBuff) + dataLen, _, err = oConn.connUDP.ReadFrom(reuseBuff) if err == nil { oConn.lastReadUDPTs = time.Now() } @@ -122,15 +126,21 @@ func (oConn *overlayConn) Read(reuseBuff []byte) (buff []byte, n int, err error) } return } - oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5)) - n, err = oConn.connTCP.Read(reuseBuff) - buff = reuseBuff + if oConn.connTCP != nil { + oConn.connTCP.SetReadDeadline(time.Now().Add(time.Second * 5)) + dataLen, err = oConn.connTCP.Read(reuseBuff) + buff = reuseBuff + } + return } // calling by p2pTunnel func (oConn *overlayConn) Write(buff []byte) (n int, err error) { // add mutex when multi-thread calling + if !oConn.running { + return 0, ErrOverlayConnDisconnect + } if oConn.connUDP != nil { if oConn.remoteAddr == nil { n, err = oConn.connUDP.Write(buff) @@ -142,9 +152,25 @@ func (oConn *overlayConn) Write(buff []byte) (n int, err error) { } return } - n, err = oConn.connTCP.Write(buff) + if oConn.connTCP != nil { + n, err = oConn.connTCP.Write(buff) + } + if err != nil { oConn.running = false } return } + +func (oConn *overlayConn) Close() (err error) { + oConn.running = false + if oConn.connTCP != nil { + oConn.connTCP.Close() + oConn.connTCP = nil + } + if oConn.connUDP != nil { + oConn.connUDP.Close() + oConn.connUDP = nil + } + return nil +} diff --git a/core/p2papp.go b/core/p2papp.go index 8056ef7..ca3b021 100644 --- a/core/p2papp.go +++ b/core/p2papp.go @@ -114,7 +114,7 @@ func (app *p2pApp) listenUDP() error { gLog.Printf(LvERROR, "listen error:%s", err) return err } - buffer := make([]byte, 64*1024) + buffer := make([]byte, 64*1024+PaddingSize) udpID := make([]byte, 8) for { app.listenerUDP.SetReadDeadline(time.Now().Add(time.Second * 10)) @@ -127,8 +127,8 @@ func (app *p2pApp) listenUDP() error { break } } else { - b := bytes.Buffer{} - b.Write(buffer[:len]) + dupData := bytes.Buffer{} // should uses memory pool + dupData.Write(buffer[:len+PaddingSize]) // load from app.tunnel.overlayConns by remoteAddr error, new udp connection remoteIP := strings.Split(remoteAddr.String(), ":")[0] port, _ := strconv.Atoi(strings.Split(remoteAddr.String(), ":")[1]) @@ -139,19 +139,19 @@ func (app *p2pApp) listenUDP() error { udpID[3] = a[3] udpID[4] = byte(port) udpID[5] = byte(port >> 8) - id := binary.LittleEndian.Uint64(udpID) + id := binary.LittleEndian.Uint64(udpID) // convert remoteIP:port to uint64 s, ok := app.tunnel.overlayConns.Load(id) if !ok { oConn := overlayConn{ - tunnel: app.tunnel, - connUDP: app.listenerUDP, - remoteAddr: remoteAddr, - udpRelayData: make(chan []byte, 1000), - id: id, - isClient: true, - rtid: app.rtid, - appID: app.id, - appKey: app.key, + tunnel: app.tunnel, + connUDP: app.listenerUDP, + remoteAddr: remoteAddr, + udpData: make(chan []byte, 1000), + id: id, + isClient: true, + rtid: app.rtid, + appID: app.id, + appKey: app.key, } // calc key bytes for encrypt if oConn.appKey != 0 { @@ -181,7 +181,7 @@ func (app *p2pApp) listenUDP() error { app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) } go oConn.run() - oConn.udpRelayData <- b.Bytes() + oConn.udpData <- dupData.Bytes() } // load from app.tunnel.overlayConns by remoteAddr ok, write relay data @@ -189,7 +189,7 @@ func (app *p2pApp) listenUDP() error { if !ok { continue } - overlayConn.udpRelayData <- b.Bytes() + overlayConn.udpData <- dupData.Bytes() } } return nil diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index c9c6b83..3538945 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -22,6 +22,11 @@ var ( once sync.Once ) +const ( + retryLimit = 20 + retryInterval = 10 * time.Second +) + type P2PNetwork struct { conn *websocket.Conn online bool @@ -63,7 +68,6 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { } func (pn *P2PNetwork) run() { - go pn.autorunApp() heartbeatTimer := time.NewTicker(NetworkHeartbeatTime) for pn.running { select { @@ -72,7 +76,8 @@ func (pn *P2PNetwork) run() { case <-pn.restartCh: pn.online = false - pn.wgReconnect.Wait() // wait read/write goroutine end + pn.wgReconnect.Wait() // wait read/autorunapp goroutine end + time.Sleep(NatTestTimeout) err := pn.init() if err != nil { gLog.Println(LvERROR, "P2PNetwork init error:", err) @@ -119,37 +124,38 @@ func (pn *P2PNetwork) runAll() { if appExist { pn.DeleteApp(*config) } - if config.retryNum > 0 { - gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum) - if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // normal lasts 15min - config.retryNum = 0 - } + if config.retryNum >= retryLimit { + continue } + + if time.Now().Add(-time.Minute * 15).After(config.retryTime) { // run normally 15min, reset retrynum + config.retryNum = 0 + } + config.retryNum++ + gLog.Printf(LvINFO, "detect app %s disconnect, reconnecting the %d times...", config.AppName, config.retryNum) config.retryTime = time.Now() - if config.retryNum > 20 { - config.Enabled = 0 - gLog.Printf(LvWARN, "app %s has stopped retry, manually enable it on Web console", config.AppName) - continue - } - config.nextRetryTime = time.Now().Add(time.Second * 10) + config.nextRetryTime = time.Now().Add(retryInterval) config.connectTime = time.Now() config.peerToken = pn.config.Token - gConf.mtx.Unlock() // AddApp will take a period of time + gConf.mtx.Unlock() // AddApp will take a period of time, let outside modify gConf err := pn.AddApp(*config) gConf.mtx.Lock() if err != nil { config.errMsg = err.Error() + if err == ErrPeerOffline { // stop retry, waiting for online + config.retryNum = retryLimit + gLog.Printf(LvINFO, " %s offline, it will auto reconnect when peer node online", config.PeerNode) + } } } } func (pn *P2PNetwork) autorunApp() { gLog.Println(LvINFO, "autorunApp start") - for pn.running { + pn.wgReconnect.Add(1) + defer pn.wgReconnect.Done() + for pn.running && pn.online { time.Sleep(time.Second) - if !pn.online { - continue - } pn.runAll() } gLog.Println(LvINFO, "autorunApp end") @@ -364,6 +370,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (*P2PTunnel, initErr := t.requestPeerInfo() if initErr != nil { gLog.Println(LvERROR, "init error:", initErr) + return nil, initErr } var err error @@ -436,10 +443,7 @@ func (pn *P2PNetwork) newTunnel(t *P2PTunnel, tid uint64, isClient bool) error { func (pn *P2PNetwork) init() error { gLog.Println(LvINFO, "init start") pn.wgReconnect.Add(1) - go func() { //reconnect at least 5s - time.Sleep(NatTestTimeout) - pn.wgReconnect.Done() - }() + defer pn.wgReconnect.Done() var err error for { // detect nat type @@ -449,12 +453,14 @@ func (pn *P2PNetwork) init() error { pn.config.natType = NATSymmetric pn.config.hasIPv4 = 0 pn.config.hasUPNPorNATPMP = 0 + gLog.Println(LvINFO, "openp2pS2STest debug") } if strings.Contains(pn.config.Node, "openp2pC2CTest") { pn.config.natType = NATCone pn.config.hasIPv4 = 0 pn.config.hasUPNPorNATPMP = 0 + gLog.Println(LvINFO, "openp2pC2CTest debug") } if err != nil { gLog.Println(LvDEBUG, "detect NAT type error:", err) @@ -512,6 +518,7 @@ func (pn *P2PNetwork) init() error { req.IPv6 = pn.config.publicIPv6 pn.write(MsgReport, MsgReportBasic, &req) }() + go pn.autorunApp() gLog.Println(LvDEBUG, "P2PNetwork init ok") break } @@ -601,7 +608,6 @@ func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{}) defer pn.writeMtx.Unlock() if err = pn.conn.WriteMessage(websocket.BinaryMessage, msg); err != nil { gLog.Printf(LvERROR, "write msgType %d,%d error:%s", mainType, subType, err) - pn.conn.Close() } return err } @@ -644,7 +650,6 @@ func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error defer pn.writeMtx.Unlock() if err = pn.conn.WriteMessage(websocket.BinaryMessage, pushMsg); err != nil { gLog.Printf(LvERROR, "push to %s error:%s", to, err) - pn.conn.Close() } return err } diff --git a/core/p2ptunnel.go b/core/p2ptunnel.go index d27b661..477659c 100644 --- a/core/p2ptunnel.go +++ b/core/p2ptunnel.go @@ -528,7 +528,7 @@ func (t *P2PTunnel) readLoop() { // calc key bytes for encrypt if oConn.appKey != 0 { - encryptKey := make([]byte, 16) + encryptKey := make([]byte, AESKeySize) binary.LittleEndian.PutUint64(encryptKey, oConn.appKey) binary.LittleEndian.PutUint64(encryptKey[8:], oConn.appKey) oConn.appKeyBytes = encryptKey @@ -548,7 +548,7 @@ func (t *P2PTunnel) readLoop() { i, ok := t.overlayConns.Load(overlayID) if ok { oConn := i.(*overlayConn) - oConn.running = false + oConn.Close() } default: } @@ -610,14 +610,7 @@ func (t *P2PTunnel) closeOverlayConns(appID uint64) { t.overlayConns.Range(func(_, i interface{}) bool { oConn := i.(*overlayConn) if oConn.appID == appID { - if oConn.connTCP != nil { - oConn.connTCP.Close() - oConn.connTCP = nil - } - if oConn.connUDP != nil { - oConn.connUDP.Close() - oConn.connUDP = nil - } + oConn.Close() } return true }) diff --git a/core/protocol.go b/core/protocol.go index a4c3ee4..459d026 100644 --- a/core/protocol.go +++ b/core/protocol.go @@ -10,7 +10,7 @@ import ( "time" ) -const OpenP2PVersion = "3.6.5" +const OpenP2PVersion = "3.6.8" const ProductName string = "openp2p" const LeastSupportVersion = "3.0.0" @@ -96,6 +96,7 @@ const ( MsgPushEditNode = 12 MsgPushAPPKey = 13 MsgPushReportLog = 14 + MsgPushDstNodeOnline = 15 ) // MsgP2P sub type message @@ -223,6 +224,9 @@ type PushConnectReq struct { LinkMode string `json:"linkMode,omitempty"` IsUnderlayServer int `json:"isServer,omitempty"` // Requset spec peer is server } +type PushDstNodeOnline struct { + Node string `json:"node,omitempty"` +} type PushConnectRsp struct { Error int `json:"error,omitempty"` From string `json:"from,omitempty"` diff --git a/core/totp.go b/core/totp.go deleted file mode 100644 index fb40a0e..0000000 --- a/core/totp.go +++ /dev/null @@ -1,35 +0,0 @@ -// Time-based One-time Password -package openp2p - -import ( - "crypto/hmac" - "crypto/sha256" - "encoding/binary" -) - -const TOTPStep = 30 // 30s -func GenTOTP(token uint64, ts int64) uint64 { - step := ts / TOTPStep - tbuff := make([]byte, 8) - binary.LittleEndian.PutUint64(tbuff, token) - mac := hmac.New(sha256.New, tbuff) - b := make([]byte, 8) - binary.LittleEndian.PutUint64(b, uint64(step)) - mac.Write(b) - num := binary.LittleEndian.Uint64(mac.Sum(nil)[:8]) - // fmt.Printf("%x\n", mac.Sum(nil)) - return num -} - -func VerifyTOTP(code uint64, token uint64, ts int64) bool { - if code == 0 { - return false - } - if code == token { - return true - } - if code == GenTOTP(token, ts) || code == GenTOTP(token, ts-TOTPStep) || code == GenTOTP(token, ts+TOTPStep) { - return true - } - return false -} diff --git a/core/totp_test.go b/core/totp_test.go deleted file mode 100644 index 720c730..0000000 --- a/core/totp_test.go +++ /dev/null @@ -1,36 +0,0 @@ -// Time-based One-time Password -package openp2p - -import ( - "testing" - "time" -) - -func TestTOTP(t *testing.T) { - for i := 0; i < 20; i++ { - ts := time.Now().Unix() - code := GenTOTP(13666999958022769123, ts) - t.Log(code) - if !VerifyTOTP(code, 13666999958022769123, ts) { - t.Error("TOTP error") - } - if !VerifyTOTP(code, 13666999958022769123, ts-10) { - t.Error("TOTP error") - } - if !VerifyTOTP(code, 13666999958022769123, ts+10) { - t.Error("TOTP error") - } - if VerifyTOTP(code, 13666999958022769123, ts+60) { - t.Error("TOTP error") - } - if VerifyTOTP(code, 13666999958022769124, ts+1) { - t.Error("TOTP error") - } - if VerifyTOTP(code, 13666999958022769125, ts+1) { - t.Error("TOTP error") - } - time.Sleep(time.Second) - t.Log("round", i, " ", ts, " test ok") - } - -}