From b72ede9a6a9cf26b54c23aff2e0bcc6eb0142a11 Mon Sep 17 00:00:00 2001 From: TenderIronh Date: Mon, 4 Sep 2023 23:11:42 +0800 Subject: [PATCH] optimize hole punching --- USAGE-ZH.md | 4 +- core/config.go | 15 +++- core/errorcode.go | 1 + core/handlepush.go | 16 +--- core/holepunch.go | 85 +++++++++++-------- core/iptree_test.go | 14 ++-- core/openp2p.go | 10 --- core/p2pnetwork.go | 189 ++++++++++++++++++++++++++----------------- core/p2ptunnel.go | 22 +++-- core/protocol.go | 96 +++++++++++++++++++++- core/udp.go | 3 +- core/underlay_tcp.go | 8 +- core/update.go | 56 ++++++++++--- docker/Dockerfile | 2 +- docker/get-client.sh | 4 +- 15 files changed, 349 insertions(+), 176 deletions(-) diff --git a/USAGE-ZH.md b/USAGE-ZH.md index a307e4f..052ad09 100644 --- a/USAGE-ZH.md +++ b/USAGE-ZH.md @@ -101,7 +101,7 @@ sudo /usr/local/openp2p/openp2p uninstall ## Docker运行 ``` # 把YOUR-TOKEN和YOUR-NODE-NAME替换成自己的 -docker run -d --net host --name openp2p-client -e OPENP2P_TOKEN=YOUR-TOKEN -e OPENP2P_NODE=YOUR-NODE-NAME openp2pcn/openp2p-client:latest +docker run -d --restart=always --net host --name openp2p-client -e OPENP2P_TOKEN=YOUR-TOKEN -e OPENP2P_NODE=YOUR-NODE-NAME openp2pcn/openp2p-client:latest OR -docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME +docker run -d --restart=always --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME ``` diff --git a/core/config.go b/core/config.go index c82756c..935f696 100644 --- a/core/config.go +++ b/core/config.go @@ -23,6 +23,7 @@ type AppConfig struct { DstPort int DstHost string PeerUser string + RelayNode string Enabled int // default:1 // runtime info peerVersion string @@ -127,7 +128,7 @@ func (c *Config) save() { } func init() { - gConf.LogLevel = 1 + gConf.LogLevel = int(LvINFO) gConf.Network.ShareBandwidth = 10 gConf.Network.ServerHost = "api.openp2p.cn" gConf.Network.ServerPort = WsPort @@ -176,6 +177,16 @@ func (c *Config) setShareBandwidth(bw int) { defer c.save() c.Network.ShareBandwidth = bw } +func (c *Config) setIPv6(v6 string) { + c.mtx.Lock() + defer c.mtx.Unlock() + c.Network.publicIPv6 = v6 +} +func (c *Config) IPv6() string { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.Network.publicIPv6 +} type NetworkConfig struct { // local info @@ -214,6 +225,7 @@ func parseParams(subCommand string) { tcpPort := fset.Int("tcpport", 0, "tcp port for upnp or publicip") protocol := fset.String("protocol", "tcp", "tcp or udp") appName := fset.String("appname", "", "app name") + relayNode := fset.String("relaynode", "", "relaynode") shareBandwidth := fset.Int("sharebandwidth", 10, "N mbps share bandwidth limit, private network no limit") daemonMode := fset.Bool("d", false, "daemonMode") notVerbose := fset.Bool("nv", false, "not log console") @@ -233,6 +245,7 @@ func parseParams(subCommand string) { config.SrcPort = *srcPort config.Protocol = *protocol config.AppName = *appName + config.RelayNode = *relayNode if !*newconfig { gConf.load() // load old config. otherwise will clear all apps } diff --git a/core/errorcode.go b/core/errorcode.go index 1bb7480..d47f486 100644 --- a/core/errorcode.go +++ b/core/errorcode.go @@ -18,4 +18,5 @@ var ( ErrMsgFormat = errors.New("message format wrong") ErrVersionNotCompatible = errors.New("version not compatible") ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected") + ErrConnectRelayNode = errors.New("connect relay node error") ) diff --git a/core/handlepush.go b/core/handlepush.go index edf2a2c..4644b2b 100644 --- a/core/handlepush.go +++ b/core/handlepush.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "os" - "os/exec" "path/filepath" "reflect" "time" @@ -50,6 +49,8 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { // notify peer relay ready msg := TunnelMsg{ID: t.id} pn.push(r.From, MsgPushAddRelayTunnelRsp, msg) + } else { + pn.push(r.From, MsgPushAddRelayTunnelRsp, "error") // compatible with old version client, trigger unmarshal error } }(req) case MsgPushAPPKey: @@ -61,16 +62,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { SaveKey(req.AppID, req.AppKey) case MsgPushUpdate: gLog.Println(LvINFO, "MsgPushUpdate") - update(pn.config.ServerHost, pn.config.ServerPort) // download new version first, then exec ./openp2p update - targetPath := filepath.Join(defaultInstallPath, defaultBinName) - args := []string{"update"} - env := os.Environ() - cmd := exec.Command(targetPath, args...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Stdin = os.Stdin - cmd.Env = env - err := cmd.Run() + err := update(pn.config.ServerHost, pn.config.ServerPort) if err == nil { os.Exit(0) } @@ -123,7 +115,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { pn.msgMapMtx.Lock() ch := pn.msgMap[pushHead.From] pn.msgMapMtx.Unlock() - ch <- msg + ch <- pushMsg{data: msg, ts: time.Now()} } return err } diff --git a/core/holepunch.go b/core/holepunch.go index 10c213b..e7e4661 100644 --- a/core/holepunch.go +++ b/core/holepunch.go @@ -6,7 +6,6 @@ import ( "fmt" "math/rand" "net" - "sync" "time" ) @@ -34,7 +33,6 @@ func handshakeC2C(t *P2PTunnel) (err error) { } } t.ra, _ = net.ResolveUDPAddr("udp", ra.String()) - // cone server side if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { gLog.Printf(LvDEBUG, "read %d handshake ", t.id) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) @@ -43,13 +41,7 @@ func handshakeC2C(t *P2PTunnel) (err error) { gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) return err } - if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LvDEBUG, "read %d handshake ack ", t.id) - gLog.Printf(LvINFO, "handshakeC2C ok") - return nil - } } - // cone client side will only read handshake ack if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { gLog.Printf(LvDEBUG, "read %d handshake ack ", t.id) _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) @@ -65,8 +57,6 @@ func handshakeC2C(t *P2PTunnel) (err error) { func handshakeC2S(t *P2PTunnel) error { gLog.Printf(LvDEBUG, "handshakeC2S start") defer gLog.Printf(LvDEBUG, "handshakeC2S end") - // even if read timeout, continue handshake - t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, HandshakeTimeout) r := rand.New(rand.NewSource(time.Now().UnixNano())) randPorts := r.Perm(65532) conn, err := net.ListenUDP("udp", t.la) @@ -74,11 +64,12 @@ func handshakeC2S(t *P2PTunnel) error { return err } defer conn.Close() + go func() error { gLog.Printf(LvDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort) for i := 0; i < SymmetricHandshakeNum; i++ { // TODO: auto calc cost time - time.Sleep(SymmetricHandshakeInterval) + // time.Sleep(SymmetricHandshakeInterval) dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2)) if err != nil { return err @@ -92,8 +83,7 @@ func handshakeC2S(t *P2PTunnel) error { gLog.Println(LvDEBUG, "send symmetric handshake end") return nil }() - deadline := time.Now().Add(HandshakeTimeout) - err = conn.SetReadDeadline(deadline) + err = conn.SetReadDeadline(time.Now().Add(HandshakeTimeout)) if err != nil { gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error") return err @@ -112,10 +102,27 @@ func handshakeC2S(t *P2PTunnel) error { return err } t.ra, _ = net.ResolveUDPAddr("udp", dst.String()) + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { + gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ", t.id) + UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + for { + _, head, _, _, err = UDPRead(conn, HandshakeTimeout) + if err != nil { + gLog.Println(LvDEBUG, "handshakeC2S handshake error") + return err + } + // waiting ack + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { + break + } + } + } if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ack %s", t.id, dst.String()) - _, err = UDPWrite(conn, dst, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + gLog.Printf(LvDEBUG, "handshakeC2S read %d handshake ack %s", t.id, t.ra.String()) + _, err = UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) return err + } else { + gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck") } gLog.Printf(LvINFO, "handshakeC2S ok") return nil @@ -128,12 +135,11 @@ func handshakeS2C(t *P2PTunnel) error { // sequencely udp send handshake, do not parallel send gLog.Printf(LvDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort) gotIt := false - gotMtx := sync.Mutex{} for i := 0; i < SymmetricHandshakeNum; i++ { // TODO: auto calc cost time - time.Sleep(SymmetricHandshakeInterval) + // time.Sleep(SymmetricHandshakeInterval) go func(t *P2PTunnel) error { - conn, err := net.ListenUDP("udp", nil) + conn, err := net.ListenUDP("udp", nil) // TODO: system allocated port really random? if err != nil { gLog.Printf(LvDEBUG, "listen error") return err @@ -145,38 +151,51 @@ func handshakeS2C(t *P2PTunnel) error { // gLog.Println(LevelDEBUG, "one of the handshake error:", err) return err } - gotMtx.Lock() - defer gotMtx.Unlock() if gotIt { return nil } - gotIt = true - t.la, _ = net.ResolveUDPAddr("udp", conn.LocalAddr().String()) + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) - _, head, _, _, err = UDPRead(conn, HandshakeTimeout) - if err != nil { - gLog.Println(LvDEBUG, "handshakeS2C handshake error") - return err - } - if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { - gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String()) - gotCh <- t.la - return nil + // may read sereral MsgPunchHandshake + for { + _, head, _, _, err = UDPRead(conn, HandshakeTimeout) + if err != nil { + gLog.Println(LvDEBUG, "handshakeS2C handshake error") + return err + } + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { + break + } else { + gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck") + } } } + if head.MainType == MsgP2P && head.SubType == MsgPunchHandshakeAck { + gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ack %s", t.id, conn.LocalAddr().String()) + UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) + gotIt = true + la, _ := net.ResolveUDPAddr("udp", conn.LocalAddr().String()) + gotCh <- la + return nil + } else { + gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck") + } return nil }(t) } gLog.Printf(LvDEBUG, "send symmetric handshake end") - gLog.Println(LvDEBUG, "handshakeS2C ready, notify peer connect") - t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) + if compareVersion(t.config.peerVersion, SymmetricSimultaneouslySendVersion) == LESS { // compatible with old client + gLog.Println(LvDEBUG, "handshakeS2C ready, notify peer connect") + t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) + } select { case <-time.After(HandshakeTimeout): return fmt.Errorf("wait handshake failed") case la := <-gotCh: + t.la = la gLog.Println(LvDEBUG, "symmetric handshake ok", la) gLog.Printf(LvINFO, "handshakeS2C ok") } diff --git a/core/iptree_test.go b/core/iptree_test.go index f477e44..2391d49 100644 --- a/core/iptree_test.go +++ b/core/iptree_test.go @@ -71,7 +71,7 @@ func TestSegment2(t *testing.T) { iptree.Print() iptree.Add("10.1.1.90", "10.1.1.110") // interset iptree.Print() - t.Logf("blocklist size:%d\n", iptree.Size()) + t.Logf("ipTree size:%d\n", iptree.Size()) wrapTestContains(t, iptree, "10.1.1.40", true) wrapTestContains(t, iptree, "10.1.5.50", true) wrapTestContains(t, iptree, "10.1.6.50", true) @@ -94,7 +94,7 @@ func TestSegment2(t *testing.T) { } -func BenchmarkBuildBlockList20k(t *testing.B) { +func BenchmarkBuildipTree20k(t *testing.B) { iptree := NewIPTree("") iptree.Clear() iptree.Add("10.1.5.50", "10.1.5.100") @@ -116,16 +116,16 @@ func BenchmarkBuildBlockList20k(t *testing.B) { gap := uint32(10) for i := minIP; i < minIP+nodeNum*gap; i += gap { iptree.AddIntIP(i, i) - // t.Logf("blocklist size:%d\n", iptree.Size()) + // t.Logf("ipTree size:%d\n", iptree.Size()) } binary.Read(bytes.NewBuffer(net.ParseIP("100.1.1.1").To4()), binary.BigEndian, &minIP) // insert 100k block ip segment for i := minIP; i < minIP+nodeNum*gap; i += gap { iptree.AddIntIP(i, i+5) } - t.Logf("blocklist size:%d\n", iptree.Size()) + t.Logf("ipTree size:%d\n", iptree.Size()) iptree.Clear() - t.Logf("clear. blocklist size:%d\n", iptree.Size()) + t.Logf("clear. ipTree size:%d\n", iptree.Size()) } func BenchmarkQuery(t *testing.B) { iptree := NewIPTree("") @@ -149,14 +149,14 @@ func BenchmarkQuery(t *testing.B) { gap := uint32(10) for i := minIP; i < minIP+nodeNum*gap; i += gap { iptree.AddIntIP(i, i) - // t.Logf("blocklist size:%d\n", iptree.Size()) + // t.Logf("ipTree size:%d\n", iptree.Size()) } binary.Read(bytes.NewBuffer(net.ParseIP("100.1.1.1").To4()), binary.BigEndian, &minIP) // insert 100k block ip segment for i := minIP; i < minIP+nodeNum*gap; i += gap { iptree.AddIntIP(i, i+5) } - t.Logf("blocklist size:%d\n", iptree.Size()) + t.Logf("ipTree size:%d\n", iptree.Size()) t.ResetTimer() queryNum := 100 * 10000 for i := 0; i < queryNum; i++ { diff --git a/core/openp2p.go b/core/openp2p.go index e3770fa..0c414e8 100644 --- a/core/openp2p.go +++ b/core/openp2p.go @@ -20,16 +20,6 @@ func Run() { case "version", "-v", "--version": fmt.Println(OpenP2PVersion) return - case "update": - targetPath := filepath.Join(defaultInstallPath, defaultBinName) - d := daemon{} - err := d.Control("restart", targetPath, nil) - if err != nil { - gLog.Println(LvERROR, "restart service error:", err) - } else { - gLog.Println(LvINFO, "restart service ok.") - } - return case "install": install() return diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index 7ae0633..e13ab2b 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -27,8 +27,12 @@ var ( const ( retryLimit = 20 retryInterval = 10 * time.Second - dtma = 20 - ddtma = 5 +) + +// golang not support float64 const +var ( + ma10 float64 = 1.0 / 10 + ma20 float64 = 1.0 / 20 ) type P2PNetwork struct { @@ -40,12 +44,12 @@ type P2PNetwork struct { writeMtx sync.Mutex hbTime time.Time // for sync server time - t1 int64 // nanoSeconds - dt int64 // client faster then server dt nanoSeconds - dtma int64 - ddt int64 // differential of dt + t1 int64 // nanoSeconds + dt int64 // client faster then server dt nanoSeconds + ddtma int64 + ddt int64 // differential of dt // msgMap sync.Map - msgMap map[uint64]chan []byte //key: nodeID + msgMap map[uint64]chan pushMsg //key: nodeID msgMapMtx sync.Mutex config NetworkConfig allTunnels sync.Map @@ -53,6 +57,13 @@ type P2PNetwork struct { limiter *BandwidthLimiter } +type pushMsg struct { + data []byte + ts time.Time +} + +const msgExpiredTime = time.Minute + func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { if instance == nil { once.Do(func() { @@ -60,17 +71,24 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { restartCh: make(chan bool, 2), online: false, running: true, - msgMap: make(map[uint64]chan []byte), + msgMap: make(map[uint64]chan pushMsg), limiter: newBandwidthLimiter(config.ShareBandwidth), dt: 0, ddt: 0, } - instance.msgMap[0] = make(chan []byte) // for gateway + instance.msgMap[0] = make(chan pushMsg) // for gateway if config != nil { instance.config = *config } instance.init() go instance.run() + go func() { + for { + instance.refreshIPv6(false) + time.Sleep(time.Hour) + } + }() + cleanTempFiles() }) } return instance @@ -166,49 +184,56 @@ func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, stri gLog.Printf(LvINFO, "addRelayTunnel to %s start", config.PeerNode) defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode) // request a relay node or specify manually(TODO) - pn.write(MsgRelay, MsgRelayNodeReq, &RelayNodeReq{config.PeerNode}) - head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, ClientAPITimeout) - if head == nil { - return nil, 0, "", errors.New("read MsgRelayNodeRsp error") - } - rsp := RelayNodeRsp{} - if err := json.Unmarshal(body, &rsp); err != nil { - return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") - } - if rsp.RelayName == "" || rsp.RelayToken == 0 { - gLog.Printf(LvERROR, "MsgRelayNodeReq error") - return nil, 0, "", errors.New("MsgRelayNodeReq error") - } - gLog.Printf(LvINFO, "got relay node:%s", rsp.RelayName) relayConfig := config - relayConfig.PeerNode = rsp.RelayName - relayConfig.peerToken = rsp.RelayToken + relayMode := "private" + if config.RelayNode == "" { + pn.write(MsgRelay, MsgRelayNodeReq, &RelayNodeReq{config.PeerNode}) + head, body := pn.read("", MsgRelay, MsgRelayNodeRsp, ClientAPITimeout) + if head == nil { + return nil, 0, "", errors.New("read MsgRelayNodeRsp error") + } + rsp := RelayNodeRsp{} + if err := json.Unmarshal(body, &rsp); err != nil { + return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") + } + if rsp.RelayName == "" || rsp.RelayToken == 0 { + gLog.Printf(LvERROR, "MsgRelayNodeReq error") + return nil, 0, "", errors.New("MsgRelayNodeReq error") + } + gLog.Printf(LvINFO, "got relay node:%s", rsp.RelayName) + + relayConfig.PeerNode = rsp.RelayName + relayConfig.peerToken = rsp.RelayToken + relayMode = rsp.Mode + } else { + relayConfig.PeerNode = config.RelayNode + } /// t, err := pn.addDirectTunnel(relayConfig, 0) if err != nil { gLog.Println(LvERROR, "direct connect error:", err) - return nil, 0, "", err + return nil, 0, "", ErrConnectRelayNode // relay offline will stop retry } // notify peer addRelayTunnel req := AddRelayTunnelReq{ From: pn.config.Node, - RelayName: rsp.RelayName, - RelayToken: rsp.RelayToken, + RelayName: relayConfig.PeerNode, + RelayToken: relayConfig.peerToken, } - gLog.Printf(LvINFO, "push relay %s---------%s", config.PeerNode, rsp.RelayName) + gLog.Printf(LvINFO, "push relay %s---------%s", config.PeerNode, relayConfig.PeerNode) pn.push(config.PeerNode, MsgPushAddRelayTunnelReq, &req) // wait relay ready - head, body = pn.read(config.PeerNode, MsgPush, MsgPushAddRelayTunnelRsp, PeerAddRelayTimeount) // TODO: const value + head, body := pn.read(config.PeerNode, MsgPush, MsgPushAddRelayTunnelRsp, PeerAddRelayTimeount) if head == nil { gLog.Printf(LvERROR, "read MsgPushAddRelayTunnelRsp error") return nil, 0, "", errors.New("read MsgPushAddRelayTunnelRsp error") } rspID := TunnelMsg{} if err = json.Unmarshal(body, &rspID); err != nil { - return nil, 0, "", errors.New("unmarshal MsgRelayNodeRsp error") + return nil, 0, "", errors.New("peer connect relayNode error") } - return t, rspID.ID, rsp.Mode, err + return t, rspID.ID, relayMode, err } // use *AppConfig to save status @@ -343,13 +368,8 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne isClient = true } - if t = pn.findTunnel(&config); t != nil { - return t, nil - } - // create tunnel if not exist - pn.msgMapMtx.Lock() - pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan []byte, 50) + pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan pushMsg, 50) pn.msgMapMtx.Unlock() // server side if !isClient { @@ -365,7 +385,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne return nil, initErr } // try TCP6 - if IsIPv6(config.peerIPv6) && IsIPv6(pn.config.publicIPv6) { + if IsIPv6(config.peerIPv6) && IsIPv6(gConf.IPv6()) { gLog.Println(LvINFO, "try TCP6") config.linkMode = LinkModeTCP6 config.isUnderlayServer = 0 @@ -422,9 +442,12 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne } func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t *P2PTunnel, err error) { - if existTunnel := pn.findTunnel(&config); existTunnel != nil { - return existTunnel, nil + if isClient { // only client side find existing tunnel + if existTunnel := pn.findTunnel(&config); existTunnel != nil { + return existTunnel, nil + } } + t = &P2PTunnel{pn: pn, config: config, id: tid, @@ -475,9 +498,14 @@ func (pn *P2PNetwork) init() error { gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP) gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort) uri := "/api/v1/login" - caCertPool := x509.NewCertPool() + caCertPool, err := x509.SystemCertPool() + if err != nil { + gLog.Println(LvERROR, "Failed to load system root CAs:", err) + } else { + caCertPool = x509.NewCertPool() + } caCertPool.AppendCertsFromPEM([]byte(rootCA)) - + caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1)) config := tls.Config{ RootCAs: caCertPool, InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert @@ -520,13 +548,13 @@ func (pn *P2PNetwork) init() error { gLog.Println(LvDEBUG, "netinfo:", rsp) if rsp != nil && rsp.Country != "" { if IsIPv6(rsp.IP.String()) { - pn.config.publicIPv6 = rsp.IP.String() + gConf.setIPv6(rsp.IP.String()) } req.NetInfo = *rsp } else { pn.refreshIPv6(true) } - req.IPv6 = pn.config.publicIPv6 + req.IPv6 = gConf.IPv6() pn.write(MsgReport, MsgReportBasic, &req) }() go pn.autorunApp() @@ -576,27 +604,30 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { rtt := pn.hbTime.UnixNano() - pn.t1 t2 := int64(binary.LittleEndian.Uint64(msg[openP2PHeaderSize : openP2PHeaderSize+8])) dt := pn.t1 + rtt/2 - t2 - if pn.dtma == 0 { - pn.dtma = dt - } else { + if pn.dt != 0 { ddt := dt - pn.dt - // if pn.ddt == 0 { + if pn.ddtma > 0 && (ddt > (pn.ddtma+pn.ddtma/3) || ddt < (pn.ddtma-pn.ddtma/3)) { + newdt := pn.dt + pn.ddtma + gLog.Printf(LvDEBUG, "server time auto adjust dt=%.2fms to %.2fms", float64(dt)/float64(time.Millisecond), float64(newdt)/float64(time.Millisecond)) + dt = newdt + } pn.ddt = ddt - // } else { - // pn.ddt = pn.ddt/ddtma*(ddtma-1) + ddt/ddtma // avoid int64 overflow - // } - - pn.dtma = pn.dtma/dtma*(dtma-1) + dt/dtma // avoid int64 overflow + if pn.ddtma == 0 { + pn.ddtma = pn.ddt + } else { + pn.ddtma = int64(float64(pn.ddtma)*(1-ma10) + float64(pn.ddt)*ma10) // avoid int64 overflow + } } pn.dt = dt - gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns rtt=%dms", pn.dt/int64(time.Millisecond), pn.ddt, rtt/int64(time.Millisecond)) + + gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns ddtma=%dns rtt=%dms ", pn.dt/int64(time.Millisecond), pn.ddt, pn.ddtma, rtt/int64(time.Millisecond)) case MsgPush: handlePush(pn, head.SubType, msg) default: pn.msgMapMtx.Lock() ch := pn.msgMap[0] pn.msgMapMtx.Unlock() - ch <- msg + ch <- pushMsg{data: msg, ts: time.Now()} return } } @@ -695,19 +726,25 @@ func (pn *P2PNetwork) read(node string, mainType uint16, subType uint16, timeout gLog.Printf(LvERROR, "wait msg%d:%d timeout", mainType, subType) return case msg := <-ch: + if msg.ts.Before(time.Now().Add(-msgExpiredTime)) { + gLog.Printf(LvDEBUG, "msg expired error %d:%d", head.MainType, head.SubType) + continue + } head = &openP2PHeader{} - err := binary.Read(bytes.NewReader(msg[:openP2PHeaderSize]), binary.LittleEndian, head) + err := binary.Read(bytes.NewReader(msg.data[:openP2PHeaderSize]), binary.LittleEndian, head) if err != nil { gLog.Println(LvERROR, "read msg error:", err) break } if head.MainType != mainType || head.SubType != subType { + gLog.Printf(LvDEBUG, "read msg type error %d:%d, requeue it", head.MainType, head.SubType) + ch <- msg continue } if mainType == MsgPush { - body = msg[openP2PHeaderSize+PushHeaderSize:] + body = msg.data[openP2PHeaderSize+PushHeaderSize:] } else { - body = msg[openP2PHeaderSize:] + body = msg.data[openP2PHeaderSize:] } return } @@ -727,29 +764,33 @@ func (pn *P2PNetwork) updateAppHeartbeat(appID uint64) { // ipv6 will expired need to refresh. func (pn *P2PNetwork) refreshIPv6(force bool) { - if !force && !IsIPv6(pn.config.publicIPv6) { // not support ipv6, not refresh + if !force && !IsIPv6(gConf.IPv6()) { // not support ipv6, not refresh return } - client := &http.Client{Timeout: time.Second * 10} - r, err := client.Get("http://6.ipw.cn") - if err != nil { - gLog.Println(LvDEBUG, "refreshIPv6 error:", err) - return - } - defer r.Body.Close() - buf := make([]byte, 1024) - n, err := r.Body.Read(buf) - if n <= 0 { - gLog.Println(LvINFO, "refreshIPv6 error:", err, n) - return + for i := 0; i < 3; i++ { + client := &http.Client{Timeout: time.Second * 10} + r, err := client.Get("http://6.ipw.cn") + if err != nil { + gLog.Println(LvDEBUG, "refreshIPv6 error:", err) + continue + } + defer r.Body.Close() + buf := make([]byte, 1024) + n, err := r.Body.Read(buf) + if n <= 0 { + gLog.Println(LvINFO, "refreshIPv6 error:", err, n) + continue + } + gConf.setIPv6(string(buf[:n])) + break } - pn.config.publicIPv6 = string(buf[:n]) + } func (pn *P2PNetwork) requestPeerInfo(config *AppConfig) error { // request peer info pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{config.peerToken, config.PeerNode}) - head, body := pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout) + head, body := pn.read("", MsgQuery, MsgQueryPeerInfoRsp, ClientAPITimeout) if head == nil { return ErrNetwork // network error, should not be ErrPeerOffline } diff --git a/core/p2ptunnel.go b/core/p2ptunnel.go index 912009f..93a7b93 100644 --- a/core/p2ptunnel.go +++ b/core/p2ptunnel.go @@ -40,9 +40,6 @@ func (t *P2PTunnel) initPort() { t.hbMtx.Unlock() t.hbTimeRelay = time.Now().Add(time.Second * 600) // TODO: test fake time localPort := int(rand.Uint32()%15000 + 50000) // if the process has bug, will add many upnp port. use specify p2p port by param - if t.config.linkMode == LinkModeTCP6 { - t.pn.refreshIPv6(false) - } if t.config.linkMode == LinkModeTCP6 || t.config.linkMode == LinkModeTCP4 { t.coneLocalPort = t.pn.config.TCPPort t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort @@ -74,7 +71,7 @@ func (t *P2PTunnel) connect() error { ConeNatPort: t.coneNatPort, NatType: t.pn.config.natType, HasIPv4: t.pn.config.hasIPv4, - IPv6: t.pn.config.publicIPv6, + IPv6: gConf.IPv6(), HasUPNPorNATPMP: t.pn.config.hasUPNPorNATPMP, ID: t.id, AppKey: appKey, @@ -86,7 +83,7 @@ func (t *P2PTunnel) connect() error { req.Token = t.pn.config.Token } t.pn.push(t.config.PeerNode, MsgPushConnectReq, req) - head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, ClientAPITimeout) + head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, HandshakeTimeout*3) if head == nil { return errors.New("connect error") } @@ -128,16 +125,16 @@ func (t *P2PTunnel) setRun(running bool) { } func (t *P2PTunnel) isActive() bool { - if !t.isRuning() { + if !t.isRuning() || t.conn == nil { return false } t.hbMtx.Lock() defer t.hbMtx.Unlock() - return time.Now().Before(t.hbTime.Add(TunnelIdleTimeout)) + return time.Now().Before(t.hbTime.Add(TunnelHeartbeatTime * 2)) } func (t *P2PTunnel) checkActive() bool { - if t.conn == nil { + if !t.isActive() { return false } hbt := time.Now() @@ -276,7 +273,7 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) { return nil, fmt.Errorf("quic listen error:%s", e) } } - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) gLog.Println(LvDEBUG, "quic dial to ", t.ra.String()) qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout) if e != nil { @@ -324,7 +321,7 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { // client side if t.config.linkMode == LinkModeTCP4 { - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) } else { //tcp punch should sleep for punch the same time if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS { gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) @@ -380,7 +377,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { } //else - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6) qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort) if err != nil { @@ -586,8 +583,7 @@ func (t *P2PTunnel) listen() error { t.punchTs = rsp.PunchTs // only private node set ipv6 if t.config.fromToken == t.pn.config.Token { - t.pn.refreshIPv6(false) - rsp.IPv6 = t.pn.config.publicIPv6 + rsp.IPv6 = gConf.IPv6() } t.pn.push(t.config.PeerNode, MsgPushConnectRsp, rsp) diff --git a/core/protocol.go b/core/protocol.go index 55fde16..a9b8c8b 100644 --- a/core/protocol.go +++ b/core/protocol.go @@ -10,10 +10,11 @@ import ( "time" ) -const OpenP2PVersion = "3.10.3" +const OpenP2PVersion = "3.10.9" const ProductName string = "openp2p" const LeastSupportVersion = "3.0.0" const SyncServerTimeVersion = "3.9.0" +const SymmetricSimultaneouslySendVersion = "3.10.7" const ( IfconfigPort1 = 27180 @@ -140,21 +141,20 @@ const ( SymmetricHandshakeNum = 800 // 0.992379 // SymmetricHandshakeNum = 1000 // 0.999510 SymmetricHandshakeInterval = time.Millisecond - HandshakeTimeout = time.Second * 5 + HandshakeTimeout = time.Second * 10 PeerAddRelayTimeount = time.Second * 30 // peer need times CheckActiveTimeout = time.Second * 5 PaddingSize = 16 AESKeySize = 16 MaxRetry = 10 Cone2ConePunchMaxRetry = 1 - RetryInterval = time.Second * 30 PublicIPEchoTimeout = time.Second * 1 NatTestTimeout = time.Second * 5 UDPReadTimeout = time.Second * 5 ClientAPITimeout = time.Second * 10 UnderlayConnectTimeout = time.Second * 10 MaxDirectTry = 3 - PunchTsDelay = time.Second * 2 + PunchTsDelay = time.Second * 3 ) // NATNone has public ip @@ -463,3 +463,91 @@ GB18jw+G7o4U3rGX8agHqVGQEd06gk1ZaprASpTGwSsv4A5ehosjT1d7re8Z5eD4 RVtXS+DplMClQ5QSlv3StwcWOsjyiAimNfLEU5xoEfq17yOJUTU1OTL4YOt16QUc C1tnzFr3k/ioqFR7cnyzNrbjlfPOmO9l2WReEbMP3bvaSHm6EcpJKS8= -----END CERTIFICATE-----` + +const ISRGRootX1 = `-----BEGIN CERTIFICATE----- +MIIEJjCCAw6gAwIBAgISAztStWq026ej0RCsk3ErbUdPMA0GCSqGSIb3DQEBCwUA +MDIxCzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MQswCQYDVQQD +EwJSMzAeFw0yMzA4MDQwODUyMjlaFw0yMzExMDIwODUyMjhaMBcxFTATBgNVBAMM +DCoub3BlbnAycC5jbjBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABPRdkgLV2FA+ +3g/GjcA9UcfDfIFYgofSTNbOCQFIiQVMXrTgAToF1/tWaS2LOuysZcCX6OE7SCeG +lQ+0g+L2qvujggIaMIICFjAOBgNVHQ8BAf8EBAMCB4AwHQYDVR0lBBYwFAYIKwYB +BQUHAwEGCCsGAQUFBwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFIdL5LNQC+X4 +8r6u+3NlM238Vmk5MB8GA1UdIwQYMBaAFBQusxe3WFbLrlAJQOYfr52LFMLGMFUG +CCsGAQUFBwEBBEkwRzAhBggrBgEFBQcwAYYVaHR0cDovL3IzLm8ubGVuY3Iub3Jn +MCIGCCsGAQUFBzAChhZodHRwOi8vcjMuaS5sZW5jci5vcmcvMCMGA1UdEQQcMBqC +DCoub3BlbnAycC5jboIKb3BlbnAycC5jbjATBgNVHSAEDDAKMAgGBmeBDAECATCC +AQQGCisGAQQB1nkCBAIEgfUEgfIA8AB2AHoyjFTYty22IOo44FIe6YQWcDIThU07 +0ivBOlejUutSAAABib/2fCgAAAQDAEcwRQIhAJzf9XNe0cu9CNYLLqtDCZZMqI6u +qsHrnnXcFQW23ioZAiAgwKp5DwZw9RmF19KOjD6lYJfTxc+anJUuWAlMwu1HYQB2 +AK33vvp8/xDIi509nB4+GGq0Zyldz7EMJMqFhjTr3IKKAAABib/2fEEAAAQDAEcw +RQIgKeI7DopyzFXPdRQZKZrHVqfXQ8OipvlKXd5xRnKFjH4CIQDMM+TU+LOux8xK +1NlTiSs9DhQI/eU3ZXKxSQAqF50RnTANBgkqhkiG9w0BAQsFAAOCAQEATqZ+H2NT +cv4FzArD/Krlnur1OTitvpubRWM+ClB9Cr6pvPVB7Dp0/ALxu35ZmCtrzdJWTfmp +lHxU4nPXRPVjuPRNXooSyH//KTfHyf32919PQOi/qc/QEAuIzkGLJg0dIPKLxaNK +CiTWU+2iAYSHBgCWulfLX/RYNbBZQ9w0xIm3XhuMjCF/omG8ofuz1DmiRVR+17JA +nuDXQkxm7KhmbxSA4PsLwzvIWA8Wk44ZK7uncgRY3WIUXcVRELSFA5LuH67TOwag +al6iG56KW1N2Yy9YmeG27SYvHZYkjmuJ8NEy7Ku+Mi6gwO4hs0CYr2wtUacPfjKF +aYTGWSt6Pt8kmw== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFFjCCAv6gAwIBAgIRAJErCErPDBinU/bWLiWnX1owDQYJKoZIhvcNAQELBQAw +TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh +cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMjAwOTA0MDAwMDAw +WhcNMjUwOTE1MTYwMDAwWjAyMQswCQYDVQQGEwJVUzEWMBQGA1UEChMNTGV0J3Mg +RW5jcnlwdDELMAkGA1UEAxMCUjMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK +AoIBAQC7AhUozPaglNMPEuyNVZLD+ILxmaZ6QoinXSaqtSu5xUyxr45r+XXIo9cP +R5QUVTVXjJ6oojkZ9YI8QqlObvU7wy7bjcCwXPNZOOftz2nwWgsbvsCUJCWH+jdx +sxPnHKzhm+/b5DtFUkWWqcFTzjTIUu61ru2P3mBw4qVUq7ZtDpelQDRrK9O8Zutm +NHz6a4uPVymZ+DAXXbpyb/uBxa3Shlg9F8fnCbvxK/eG3MHacV3URuPMrSXBiLxg +Z3Vms/EY96Jc5lP/Ooi2R6X/ExjqmAl3P51T+c8B5fWmcBcUr2Ok/5mzk53cU6cG +/kiFHaFpriV1uxPMUgP17VGhi9sVAgMBAAGjggEIMIIBBDAOBgNVHQ8BAf8EBAMC +AYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMBIGA1UdEwEB/wQIMAYB +Af8CAQAwHQYDVR0OBBYEFBQusxe3WFbLrlAJQOYfr52LFMLGMB8GA1UdIwQYMBaA +FHm0WeZ7tuXkAXOACIjIGlj26ZtuMDIGCCsGAQUFBwEBBCYwJDAiBggrBgEFBQcw +AoYWaHR0cDovL3gxLmkubGVuY3Iub3JnLzAnBgNVHR8EIDAeMBygGqAYhhZodHRw +Oi8veDEuYy5sZW5jci5vcmcvMCIGA1UdIAQbMBkwCAYGZ4EMAQIBMA0GCysGAQQB +gt8TAQEBMA0GCSqGSIb3DQEBCwUAA4ICAQCFyk5HPqP3hUSFvNVneLKYY611TR6W +PTNlclQtgaDqw+34IL9fzLdwALduO/ZelN7kIJ+m74uyA+eitRY8kc607TkC53wl +ikfmZW4/RvTZ8M6UK+5UzhK8jCdLuMGYL6KvzXGRSgi3yLgjewQtCPkIVz6D2QQz +CkcheAmCJ8MqyJu5zlzyZMjAvnnAT45tRAxekrsu94sQ4egdRCnbWSDtY7kh+BIm +lJNXoB1lBMEKIq4QDUOXoRgffuDghje1WrG9ML+Hbisq/yFOGwXD9RiX8F6sw6W4 +avAuvDszue5L3sz85K+EC4Y/wFVDNvZo4TYXao6Z0f+lQKc0t8DQYzk1OXVu8rp2 +yJMC6alLbBfODALZvYH7n7do1AZls4I9d1P4jnkDrQoxB3UqQ9hVl3LEKQ73xF1O +yK5GhDDX8oVfGKF5u+decIsH4YaTw7mP3GFxJSqv3+0lUFJoi5Lc5da149p90Ids +hCExroL1+7mryIkXPeFM5TgO9r0rvZaBFOvV2z0gp35Z0+L4WPlbuEjN/lxPFin+ +HlUjr8gRsI3qfJOQFy/9rKIJR0Y/8Omwt/8oTWgy1mdeHmmjk7j1nYsvC9JSQ6Zv +MldlTTKB3zhThV1+XWYp6rjd5JW1zbVWEkLNxE7GJThEUG3szgBVGP7pSWTUTsqX +nLRbwHOoq7hHwg== +-----END CERTIFICATE----- +-----BEGIN CERTIFICATE----- +MIIFazCCA1OgAwIBAgIRAIIQz7DSQONZRGPgu2OCiwAwDQYJKoZIhvcNAQELBQAw +TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh +cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMTUwNjA0MTEwNDM4 +WhcNMzUwNjA0MTEwNDM4WjBPMQswCQYDVQQGEwJVUzEpMCcGA1UEChMgSW50ZXJu +ZXQgU2VjdXJpdHkgUmVzZWFyY2ggR3JvdXAxFTATBgNVBAMTDElTUkcgUm9vdCBY +MTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAK3oJHP0FDfzm54rVygc +h77ct984kIxuPOZXoHj3dcKi/vVqbvYATyjb3miGbESTtrFj/RQSa78f0uoxmyF+ +0TM8ukj13Xnfs7j/EvEhmkvBioZxaUpmZmyPfjxwv60pIgbz5MDmgK7iS4+3mX6U +A5/TR5d8mUgjU+g4rk8Kb4Mu0UlXjIB0ttov0DiNewNwIRt18jA8+o+u3dpjq+sW +T8KOEUt+zwvo/7V3LvSye0rgTBIlDHCNAymg4VMk7BPZ7hm/ELNKjD+Jo2FR3qyH +B5T0Y3HsLuJvW5iB4YlcNHlsdu87kGJ55tukmi8mxdAQ4Q7e2RCOFvu396j3x+UC +B5iPNgiV5+I3lg02dZ77DnKxHZu8A/lJBdiB3QW0KtZB6awBdpUKD9jf1b0SHzUv +KBds0pjBqAlkd25HN7rOrFleaJ1/ctaJxQZBKT5ZPt0m9STJEadao0xAH0ahmbWn +OlFuhjuefXKnEgV4We0+UXgVCwOPjdAvBbI+e0ocS3MFEvzG6uBQE3xDk3SzynTn +jh8BCNAw1FtxNrQHusEwMFxIt4I7mKZ9YIqioymCzLq9gwQbooMDQaHWBfEbwrbw +qHyGO0aoSCqI3Haadr8faqU9GY/rOPNk3sgrDQoo//fb4hVC1CLQJ13hef4Y53CI +rU7m2Ys6xt0nUW7/vGT1M0NPAgMBAAGjQjBAMA4GA1UdDwEB/wQEAwIBBjAPBgNV +HRMBAf8EBTADAQH/MB0GA1UdDgQWBBR5tFnme7bl5AFzgAiIyBpY9umbbjANBgkq +hkiG9w0BAQsFAAOCAgEAVR9YqbyyqFDQDLHYGmkgJykIrGF1XIpu+ILlaS/V9lZL +ubhzEFnTIZd+50xx+7LSYK05qAvqFyFWhfFQDlnrzuBZ6brJFe+GnY+EgPbk6ZGQ +3BebYhtF8GaV0nxvwuo77x/Py9auJ/GpsMiu/X1+mvoiBOv/2X/qkSsisRcOj/KK +NFtY2PwByVS5uCbMiogziUwthDyC3+6WVwW6LLv3xLfHTjuCvjHIInNzktHCgKQ5 +ORAzI4JMPJ+GslWYHb4phowim57iaztXOoJwTdwJx4nLCgdNbOhdjsnvzqvHu7Ur +TkXWStAmzOVyyghqpZXjFaH3pO3JLF+l+/+sKAIuvtd7u+Nxe5AW0wdeRlN8NwdC +jNPElpzVmbUq4JUagEiuTDkHzsxHpFKVK7q4+63SM1N95R1NbdWhscdCb+ZAJzVc +oyi3B43njTOQ5yOf+1CceWxG1bQVs5ZufpsMljq4Ui0/1lvh+wjChP4kqKOJ2qxq +4RgqsahDYVvTH9w7jXbyLeiNdd8XM2w9U/t7y0Ff/9yi0GE44Za4rF2LN9d11TPA +mRGunUHBcnWEvgJBQl9nJEiU0Zsnvgc/ubhPgXRR4Xq37Z0j4r7g1SgEEzwxA57d +emyPxgcYxn/eR44/KJ4EBs+lVDR3veyJm+kXQ99b21/+jh5Xos1AnX5iItreGCc= +-----END CERTIFICATE----- +` diff --git a/core/udp.go b/core/udp.go index c2a8281..8eb3b29 100644 --- a/core/udp.go +++ b/core/udp.go @@ -20,8 +20,7 @@ func UDPWrite(conn *net.UDPConn, dst net.Addr, mainType uint16, subType uint16, func UDPRead(conn *net.UDPConn, timeout time.Duration) (ra net.Addr, head *openP2PHeader, result []byte, len int, err error) { if timeout > 0 { - deadline := time.Now().Add(timeout) - err = conn.SetReadDeadline(deadline) + err = conn.SetReadDeadline(time.Now().Add(timeout)) if err != nil { gLog.Println(LvERROR, "SetReadDeadline error") return nil, nil, nil, 0, err diff --git a/core/underlay_tcp.go b/core/underlay_tcp.go index 8f0bdc2..5e14e5b 100644 --- a/core/underlay_tcp.go +++ b/core/underlay_tcp.go @@ -77,7 +77,7 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) time.Sleep(ts) } gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) - c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout) + c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) if err != nil { gLog.Println(LvDEBUG, "send tcp punch: ", err) return nil, err @@ -90,7 +90,7 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) if err != nil { return nil, err } - l.SetDeadline(time.Now().Add(HandshakeTimeout)) + l.SetDeadline(time.Now().Add(CheckActiveTimeout)) c, err := l.Accept() defer l.Close() if err != nil { @@ -103,9 +103,9 @@ func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, e var c net.Conn var err error if mode == LinkModeTCPPunch { - c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout) + c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) } else { - c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), HandshakeTimeout) + c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) } if err != nil { diff --git a/core/update.go b/core/update.go index 54982b4..95a271e 100644 --- a/core/update.go +++ b/core/update.go @@ -17,11 +17,17 @@ import ( "time" ) -func update(host string, port int) { +func update(host string, port int) error { gLog.Println(LvINFO, "update start") defer gLog.Println(LvINFO, "update end") - caCertPool := x509.NewCertPool() + caCertPool, err := x509.SystemCertPool() + if err != nil { + gLog.Println(LvERROR, "Failed to load system root CAs:", err) + } else { + caCertPool = x509.NewCertPool() + } caCertPool.AppendCertsFromPEM([]byte(rootCA)) + caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1)) c := http.Client{ Transport: &http.Transport{ @@ -35,32 +41,33 @@ func update(host string, port int) { rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s&user=%s&node=%s", host, port, OpenP2PVersion, goos, goarch, gConf.Network.User, gConf.Network.Node)) if err != nil { gLog.Println(LvERROR, "update:query update list failed:", err) - return + return err } defer rsp.Body.Close() if rsp.StatusCode != http.StatusOK { gLog.Println(LvERROR, "get update info error:", rsp.Status) - return + return err } rspBuf, err := ioutil.ReadAll(rsp.Body) if err != nil { gLog.Println(LvERROR, "update:read update list failed:", err) - return + return err } updateInfo := UpdateInfo{} if err = json.Unmarshal(rspBuf, &updateInfo); err != nil { gLog.Println(LvERROR, rspBuf, " update info decode error:", err) - return + return err } if updateInfo.Error != 0 { gLog.Println(LvERROR, "update error:", updateInfo.Error, updateInfo.ErrorDetail) - return + return err } err = updateFile(updateInfo.Url, "", "openp2p") if err != nil { gLog.Println(LvERROR, "update: download failed:", err) - return + return err } + return nil } // todo rollback on error @@ -72,8 +79,18 @@ func updateFile(url string, checksum string, dst string) error { gLog.Printf(LvERROR, "OpenFile %s error:%s", tmpFile, err) return err } + caCertPool, err := x509.SystemCertPool() + if err != nil { + gLog.Println(LvERROR, "Failed to load system root CAs:", err) + } else { + caCertPool = x509.NewCertPool() + } + caCertPool.AppendCertsFromPEM([]byte(rootCA)) + caCertPool.AppendCertsFromPEM([]byte(ISRGRootX1)) tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: false}, + TLSClientConfig: &tls.Config{ + RootCAs: caCertPool, + InsecureSkipVerify: false}, } client := &http.Client{Transport: tr} response, err := client.Get(url) @@ -94,9 +111,13 @@ func updateFile(url string, checksum string, dst string) error { gLog.Println(LvINFO, "download ", url, " ok") gLog.Printf(LvINFO, "size: %d bytes", n) - err = os.Rename(os.Args[0], os.Args[0]+"0") - if err != nil && os.IsExist(err) { - gLog.Printf(LvINFO, " rename %s error:%s", os.Args[0], err) + err = os.Rename(os.Args[0], os.Args[0]+"0") // the old daemon process was using the 0 file, so it will prevent override it + if err != nil { + gLog.Printf(LvINFO, " rename %s error:%s, retry 1", os.Args[0], err) + err = os.Rename(os.Args[0], os.Args[0]+"1") + if err != nil { + gLog.Printf(LvINFO, " rename %s error:%s", os.Args[0], err) + } } // extract gLog.Println(LvINFO, "extract files") @@ -195,3 +216,14 @@ func extractTgz(dst, src string) error { } return nil } + +func cleanTempFiles() { + err := os.Remove(os.Args[0] + "0") + if err != nil { + gLog.Printf(LvDEBUG, " remove %s error:%s", os.Args[0]+"0", err) + } + err = os.Remove(os.Args[0] + "1") + if err != nil { + gLog.Printf(LvDEBUG, " remove %s error:%s", os.Args[0]+"0", err) + } +} diff --git a/docker/Dockerfile b/docker/Dockerfile index 7d75028..c92ef33 100755 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -6,7 +6,7 @@ RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/* COPY get-client.sh / - +ARG DOCKER_VER="latest" RUN echo $TARGETPLATFORM && chmod +x /get-client.sh && ./get-client.sh ENTRYPOINT ["/openp2p"] diff --git a/docker/get-client.sh b/docker/get-client.sh index 95e6bc5..687642e 100755 --- a/docker/get-client.sh +++ b/docker/get-client.sh @@ -1,5 +1,7 @@ #!/bin/sh + +echo "Building version:${DOCKER_VER}" echo "Running on platform: $TARGETPLATFORM" # TARGETPLATFORM=$(echo $TARGETPLATFORM | tr ',' '/') echo "Running on platform: $TARGETPLATFORM" @@ -23,7 +25,7 @@ sysType="linux-amd64" sysType="linux-mipsbe" fi fi -url="https://openp2p.cn/download/v1/latest/openp2p-latest.$sysType.tar.gz" +url="https://openp2p.cn/download/v1/${DOCKER_VER}/openp2p-latest.$sysType.tar.gz" echo "download $url start" if [ -f /usr/bin/curl ]; then