diff --git a/USAGE-ZH.md b/USAGE-ZH.md index df7fe67..6bc54a0 100644 --- a/USAGE-ZH.md +++ b/USAGE-ZH.md @@ -96,4 +96,9 @@ firewall-cmd --state C:\Program Files\OpenP2P\openp2p.exe uninstall # linux,macos sudo /usr/local/openp2p/openp2p uninstall -``` \ No newline at end of file +``` + +## Docker运行 +``` +docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME +``` diff --git a/USAGE.md b/USAGE.md index 78194da..f3190e2 100644 --- a/USAGE.md +++ b/USAGE.md @@ -98,4 +98,9 @@ firewall-cmd --state C:\Program Files\OpenP2P\openp2p.exe uninstall # linux,macos sudo /usr/local/openp2p/openp2p uninstall +``` + +## Run with Docker +``` +docker run -d --net host --name openp2p-client openp2pcn/openp2p-client:latest -token YOUR-TOKEN -node YOUR-NODE-NAME ``` \ No newline at end of file diff --git a/cmd/openp2p.go b/cmd/openp2p.go index 2c1863b..3adecd2 100644 --- a/cmd/openp2p.go +++ b/cmd/openp2p.go @@ -1,7 +1,9 @@ package main -import openp2p "openp2p/core" +import ( + core "openp2p/core" +) func main() { - openp2p.Run() + core.Run() } diff --git a/core/handlepush.go b/core/handlepush.go index a50d972..9b55a9f 100644 --- a/core/handlepush.go +++ b/core/handlepush.go @@ -43,8 +43,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { } // verify totp token or token t := totp.TOTP{Step: totp.RelayTOTPStep} - if t.Verify(req.Token, pn.config.Token, time.Now().Unix()+(pn.serverTs-pn.localTs)) || // localTs may behind, auto adjust ts - t.Verify(req.Token, pn.config.Token, time.Now().Unix()) { + if t.Verify(req.Token, pn.config.Token, time.Now().Unix()-pn.dt) { // localTs may behind, auto adjust ts gLog.Printf(LvINFO, "Access Granted\n") config := AppConfig{} config.peerNatType = req.NatType diff --git a/core/holepunch.go b/core/holepunch.go index 07774cb..10c213b 100644 --- a/core/holepunch.go +++ b/core/holepunch.go @@ -23,11 +23,11 @@ func handshakeC2C(t *P2PTunnel) (err error) { gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshake error:", err) return err } - ra, head, _, _, err := UDPRead(conn, SymmetricHandshakeAckTimeout) + ra, head, _, _, err := UDPRead(conn, HandshakeTimeout) if err != nil { time.Sleep(time.Millisecond * 200) gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read") - ra, head, _, _, err = UDPRead(conn, SymmetricHandshakeAckTimeout) + ra, head, _, _, err = UDPRead(conn, HandshakeTimeout) if err != nil { gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) return err @@ -38,7 +38,7 @@ func handshakeC2C(t *P2PTunnel) (err error) { if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { gLog.Printf(LvDEBUG, "read %d handshake ", t.id) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) - _, head, _, _, err = UDPRead(conn, SymmetricHandshakeAckTimeout) + _, head, _, _, err = UDPRead(conn, HandshakeTimeout) if err != nil { gLog.Println(LvDEBUG, "handshakeC2C write MsgPunchHandshakeAck error", err) return err @@ -66,7 +66,7 @@ func handshakeC2S(t *P2PTunnel) error { gLog.Printf(LvDEBUG, "handshakeC2S start") defer gLog.Printf(LvDEBUG, "handshakeC2S end") // even if read timeout, continue handshake - t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, SymmetricHandshakeAckTimeout) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushHandshakeStart, HandshakeTimeout) r := rand.New(rand.NewSource(time.Now().UnixNano())) randPorts := r.Perm(65532) conn, err := net.ListenUDP("udp", t.la) @@ -92,7 +92,7 @@ func handshakeC2S(t *P2PTunnel) error { gLog.Println(LvDEBUG, "send symmetric handshake end") return nil }() - deadline := time.Now().Add(SymmetricHandshakeAckTimeout) + deadline := time.Now().Add(HandshakeTimeout) err = conn.SetReadDeadline(deadline) if err != nil { gLog.Println(LvERROR, "SymmetricHandshakeAckTimeout SetReadDeadline error") @@ -140,7 +140,7 @@ func handshakeS2C(t *P2PTunnel) error { } defer conn.Close() UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshake, P2PHandshakeReq{ID: t.id}) - _, head, _, _, err := UDPRead(conn, SymmetricHandshakeAckTimeout) + _, head, _, _, err := UDPRead(conn, HandshakeTimeout) if err != nil { // gLog.Println(LevelDEBUG, "one of the handshake error:", err) return err @@ -155,7 +155,7 @@ func handshakeS2C(t *P2PTunnel) error { if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { gLog.Printf(LvDEBUG, "handshakeS2C read %d handshake ", t.id) UDPWrite(conn, t.ra, MsgP2P, MsgPunchHandshakeAck, P2PHandshakeReq{ID: t.id}) - _, head, _, _, err = UDPRead(conn, SymmetricHandshakeAckTimeout) + _, head, _, _, err = UDPRead(conn, HandshakeTimeout) if err != nil { gLog.Println(LvDEBUG, "handshakeS2C handshake error") return err @@ -174,7 +174,7 @@ func handshakeS2C(t *P2PTunnel) error { t.pn.push(t.config.PeerNode, MsgPushHandshakeStart, TunnelMsg{ID: t.id}) select { - case <-time.After(SymmetricHandshakeAckTimeout): + case <-time.After(HandshakeTimeout): return fmt.Errorf("wait handshake failed") case la := <-gotCh: gLog.Println(LvDEBUG, "symmetric handshake ok", la) diff --git a/core/log.go b/core/log.go index c744981..dd4f730 100644 --- a/core/log.go +++ b/core/log.go @@ -51,6 +51,7 @@ type logger struct { pid int maxLogSize int64 mode int + stdLogger *log.Logger } func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, mode int) *logger { @@ -73,7 +74,7 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, } os.Chmod(logFilePath, 0644) logfiles[lv] = f - loggers[lv] = log.New(f, "", log.LstdFlags) + loggers[lv] = log.New(f, "", log.LstdFlags|log.Lmicroseconds) } var le string if runtime.GOOS == "windows" { @@ -81,7 +82,8 @@ func NewLogger(path string, filePrefix string, level LogLevel, maxLogSize int64, } else { le = "\n" } - pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode} + pLog := &logger{loggers, logfiles, level, logdir, &sync.Mutex{}, le, os.Getpid(), maxLogSize, mode, log.New(os.Stdout, "", 0)} + pLog.stdLogger.SetFlags(log.LstdFlags | log.Lmicroseconds) go pLog.checkFile() return pLog } @@ -142,7 +144,7 @@ func (l *logger) Printf(level LogLevel, format string, params ...interface{}) { l.loggers[0].Printf("%d %s "+format+l.lineEnding, params...) } if l.mode == LogConsole || l.mode == LogFileAndConsole { - log.Printf("%d %s "+format+l.lineEnding, params...) + l.stdLogger.Printf("%d %s "+format+l.lineEnding, params...) } } @@ -159,6 +161,6 @@ func (l *logger) Println(level LogLevel, params ...interface{}) { l.loggers[0].Print(params...) } if l.mode == LogConsole || l.mode == LogFileAndConsole { - log.Print(params...) + l.stdLogger.Print(params...) } } diff --git a/core/nat.go b/core/nat.go index 198b545..6912e0c 100644 --- a/core/nat.go +++ b/core/nat.go @@ -3,7 +3,6 @@ package openp2p import ( "encoding/json" "fmt" - "log" "math/rand" "net" "strconv" @@ -14,20 +13,22 @@ import ( reuse "github.com/openp2p-cn/go-reuseport" ) -func natTCP(serverHost string, serverPort int, localPort int) (publicIP string, publicPort int) { +func natTCP(serverHost string, serverPort int) (publicIP string, publicPort int, localPort int) { // dialer := &net.Dialer{ // LocalAddr: &net.TCPAddr{ // IP: net.ParseIP("0.0.0.0"), // Port: localPort, // }, // } - conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", localPort), fmt.Sprintf("%s:%d", serverHost, serverPort), time.Second*5) + conn, err := reuse.DialTimeout("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", 0), fmt.Sprintf("%s:%d", serverHost, serverPort), NatTestTimeout) // conn, err := net.Dial("tcp4", fmt.Sprintf("%s:%d", serverHost, serverPort)) + // log.Println(LvINFO, conn.LocalAddr()) if err != nil { fmt.Printf("Dial tcp4 %s:%d error:%s", serverHost, serverPort, err) return } defer conn.Close() + localPort, _ = strconv.Atoi(strings.Split(conn.LocalAddr().String(), ":")[1]) _, wrerr := conn.Write([]byte("1")) if wrerr != nil { fmt.Printf("Write error: %s\n", wrerr) @@ -151,14 +152,14 @@ func publicIPTest(publicIP string, echoPort int) (hasPublicIP int, hasUPNPorNATP gLog.Println(LvDEBUG, "could not perform UPNP external address:", err) break } - log.Println("PublicIP:", ext) + gLog.Println(LvINFO, "PublicIP:", ext) - externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 604800) // 7 days, upnp will perform failed when os start + externalPort, err := nat.AddPortMapping("udp", echoPort, echoPort, "openp2p", 30) // 30 seconds fot upnp testing if err != nil { gLog.Println(LvDEBUG, "could not add udp UPNP port mapping", externalPort) break } else { - nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800) // 7 days + nat.AddPortMapping("tcp", echoPort, echoPort, "openp2p", 604800) // 7 days for tcp connection } } gLog.Printf(LvDEBUG, "public ip test start %s:%d", publicIP, echoPort) diff --git a/core/overlay.go b/core/overlay.go index c8908f8..77089e1 100644 --- a/core/overlay.go +++ b/core/overlay.go @@ -84,16 +84,14 @@ func (oConn *overlayConn) run() { } oConn.tunnel.overlayConns.Delete(oConn.id) // notify peer disconnect - if oConn.isClient { - req := OverlayDisconnectReq{ID: oConn.id} - if oConn.rtid == 0 { - oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req) - } else { - // write relay data - msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req) - msgWithHead := append(relayHead.Bytes(), msg...) - oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) - } + req := OverlayDisconnectReq{ID: oConn.id} + if oConn.rtid == 0 { + oConn.tunnel.conn.WriteMessage(MsgP2P, MsgOverlayDisconnectReq, &req) + } else { + // write relay data + msg, _ := newMessage(MsgP2P, MsgOverlayDisconnectReq, &req) + msgWithHead := append(relayHead.Bytes(), msg...) + oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) } } diff --git a/core/p2papp.go b/core/p2papp.go index e7d33c5..2574df8 100644 --- a/core/p2papp.go +++ b/core/p2papp.go @@ -82,7 +82,7 @@ func (app *p2pApp) listenTCP() error { oConn.appKeyBytes = encryptKey } app.tunnel.overlayConns.Store(oConn.id, &oConn) - gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d", oConn.id) + gLog.Printf(LvDEBUG, "Accept TCP overlayID:%d, %s", oConn.id, oConn.connTCP.RemoteAddr()) // tell peer connect req := OverlayConnectReq{ID: oConn.id, Token: app.tunnel.pn.config.Token, diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index b42a215..7945a9f 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -25,6 +25,8 @@ var ( const ( retryLimit = 20 retryInterval = 10 * time.Second + dtma = 20 + ddtma = 5 ) type P2PNetwork struct { @@ -34,9 +36,12 @@ type P2PNetwork struct { restartCh chan bool wgReconnect sync.WaitGroup writeMtx sync.Mutex - serverTs int64 - localTs int64 hbTime time.Time + // for sync server time + t1 int64 // nanoSeconds + dt int64 // client faster then server dt nanoSeconds + dtma int64 + ddt int64 // differential of dt // msgMap sync.Map msgMap map[uint64]chan []byte //key: nodeID msgMapMtx sync.Mutex @@ -55,6 +60,8 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { running: true, msgMap: make(map[uint64]chan []byte), limiter: newBandwidthLimiter(config.ShareBandwidth), + dt: 0, + ddt: 0, } instance.msgMap[0] = make(chan []byte) // for gateway if config != nil { @@ -69,11 +76,13 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { func (pn *P2PNetwork) run() { heartbeatTimer := time.NewTicker(NetworkHeartbeatTime) + pn.t1 = time.Now().UnixNano() + pn.write(MsgHeartbeat, 0, "") for pn.running { select { case <-heartbeatTimer.C: + pn.t1 = time.Now().UnixNano() pn.write(MsgHeartbeat, 0, "") - case <-pn.restartCh: pn.online = false pn.wgReconnect.Wait() // wait read/autorunapp goroutine end @@ -87,7 +96,7 @@ func (pn *P2PNetwork) run() { } func (pn *P2PNetwork) Connect(timeout int) bool { - // waiting for login response + // waiting for heartbeat for i := 0; i < (timeout / 1000); i++ { if pn.hbTime.After(time.Now().Add(-NetworkHeartbeatTime)) { return true @@ -552,8 +561,6 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { gLog.Printf(LvERROR, "login error:%d, detail:%s", rsp.Error, rsp.Detail) pn.running = false } else { - pn.serverTs = rsp.Ts - pn.hbTime = time.Now() pn.config.Token = rsp.Token pn.config.User = rsp.User gConf.setToken(rsp.Token) @@ -562,12 +569,28 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { gConf.setNode(rsp.Node) pn.config.Node = rsp.Node } - pn.localTs = time.Now().Unix() - gLog.Printf(LvINFO, "login ok. user=%s,node=%s,Server ts=%d, local ts=%d", rsp.User, rsp.Node, rsp.Ts, pn.localTs) + gLog.Printf(LvINFO, "login ok. user=%s,node=%s", rsp.User, rsp.Node) } case MsgHeartbeat: gLog.Printf(LvDEBUG, "P2PNetwork heartbeat ok") pn.hbTime = time.Now() + rtt := pn.hbTime.UnixNano() - pn.t1 + t2 := int64(binary.LittleEndian.Uint64(msg[openP2PHeaderSize : openP2PHeaderSize+8])) + dt := pn.t1 + rtt/2 - t2 + if pn.dtma == 0 { + pn.dtma = dt + } else { + ddt := dt - pn.dt + // if pn.ddt == 0 { + pn.ddt = ddt + // } else { + // pn.ddt = pn.ddt/ddtma*(ddtma-1) + ddt/ddtma // avoid int64 overflow + // } + + pn.dtma = pn.dtma/dtma*(dtma-1) + dt/dtma // avoid int64 overflow + } + pn.dt = dt + gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns rtt=%dms", pn.dt/int64(time.Millisecond), pn.ddt, rtt/int64(time.Millisecond)) case MsgPush: handlePush(pn, head.SubType, msg) default: diff --git a/core/p2ptunnel.go b/core/p2ptunnel.go index 6ac1ded..640c532 100644 --- a/core/p2ptunnel.go +++ b/core/p2ptunnel.go @@ -29,12 +29,13 @@ type P2PTunnel struct { coneLocalPort int coneNatPort int linkModeWeb string // use config.linkmode + punchTs uint64 } func (t *P2PTunnel) requestPeerInfo() error { // request peer info t.pn.write(MsgQuery, MsgQueryPeerInfoReq, &QueryPeerInfoReq{t.config.peerToken, t.config.PeerNode}) - head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, time.Second*10) + head, body := t.pn.read("", MsgQuery, MsgQueryPeerInfoRsp, UnderlayConnectTimeout) if head == nil { return ErrNetwork // network error, should not be ErrPeerOffline } @@ -74,15 +75,15 @@ func (t *P2PTunnel) initPort() { t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort } if t.config.linkMode == LinkModeUDPPunch { - // prepare one random cone hole + // prepare one random cone hole manually _, natPort, _ := natTest(t.pn.config.ServerHost, t.pn.config.UDPPort1, localPort) t.coneLocalPort = localPort t.coneNatPort = natPort } if t.config.linkMode == LinkModeTCPPunch { - // prepare one random cone hole - _, natPort := natTCP(t.pn.config.ServerHost, IfconfigPort1, localPort) - t.coneLocalPort = localPort + // prepare one random cone hole by system automatically + _, natPort, localPort2 := natTCP(t.pn.config.ServerHost, IfconfigPort1) + t.coneLocalPort = localPort2 t.coneNatPort = natPort } t.la = &net.UDPAddr{IP: net.ParseIP(t.pn.config.localIP), Port: t.coneLocalPort} @@ -112,7 +113,7 @@ func (t *P2PTunnel) connect() error { req.Token = t.pn.config.Token } t.pn.push(t.config.PeerNode, MsgPushConnectReq, req) - head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, time.Second*10) + head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, ClientAPITimeout) if head == nil { return errors.New("connect error") } @@ -133,6 +134,7 @@ func (t *P2PTunnel) connect() error { t.config.peerVersion = rsp.Version t.config.peerConeNatPort = rsp.ConeNatPort t.config.peerIP = rsp.FromIP + t.punchTs = rsp.PunchTs err = t.start() if err != nil { gLog.Println(LvERROR, "handshake error:", err) @@ -213,6 +215,13 @@ func (t *P2PTunnel) handshake() error { return err } } + if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS { + gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) + } else { + ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) + gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) + time.Sleep(ts) + } gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode) var err error // TODO: handle NATNone, nodes with public ip has no punching @@ -241,7 +250,7 @@ func (t *P2PTunnel) connectUnderlay() (err error) { case LinkModeTCP6: t.conn, err = t.connectUnderlayTCP6() case LinkModeTCP4: - t.conn, err = t.connectUnderlayTCP() + t.conn, err = t.connectUnderlayTCP() // TODO: can not listen the same tcp port in pararell case LinkModeTCPPunch: t.conn, err = t.connectUnderlayTCP() case LinkModeUDPPunch: @@ -298,7 +307,7 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) { return nil, fmt.Errorf("quic listen error:%s", e) } } - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout) gLog.Println(LvDEBUG, "quic dial to ", t.ra.String()) qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout) if e != nil { @@ -327,8 +336,7 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { defer gLog.Println(LvINFO, "connectUnderlayTCP end") var qConn *underlayTCP if t.config.isUnderlayServer == 1 { - t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) - qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode) + qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode, t) if err != nil { return nil, fmt.Errorf("listen TCP error:%s", err) } @@ -345,9 +353,20 @@ func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { return qConn, nil } - //else - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) - gLog.Println(LvDEBUG, "TCP dial to ", t.config.peerIP, ":", t.config.peerConeNatPort) + // client side + if t.config.linkMode == LinkModeTCP4 { + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout) + } else { //tcp punch should sleep for punch the same time + if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS { + gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) + } else { + ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) + gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) + time.Sleep(ts) + } + } + + gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", t.coneLocalPort), "-->", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort)) qConn, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode) if err != nil { return nil, fmt.Errorf("TCP dial to %s:%d error:%s", t.config.peerIP, t.config.peerConeNatPort, err) @@ -374,7 +393,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { var qConn *underlayTCP6 if t.config.isUnderlayServer == 1 { t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) - qConn, err = listenTCP6(t.coneNatPort, TunnelIdleTimeout) + qConn, err = listenTCP6(t.coneNatPort, HandshakeTimeout) if err != nil { return nil, fmt.Errorf("listen TCP6 error:%s", err) } @@ -392,7 +411,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { } //else - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, time.Second*5) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout) gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6) qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort) if err != nil { @@ -507,7 +526,7 @@ func (t *P2PTunnel) readLoop() { } overlayID := req.ID - gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %+v", req.AppID, overlayID, req) + gLog.Printf(LvDEBUG, "App:%d overlayID:%d connect %s:%d", req.AppID, overlayID, req.DstIP, req.DstPort) oConn := overlayConn{ tunnel: t, id: overlayID, @@ -520,7 +539,8 @@ func (t *P2PTunnel) readLoop() { if req.Protocol == "udp" { oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort}) } else { - oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), time.Second*5) + oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), HandshakeTimeout) + } if err != nil { gLog.Println(LvERROR, err) @@ -593,8 +613,10 @@ func (t *P2PTunnel) listen() error { FromIP: t.pn.config.publicIP, ConeNatPort: t.coneNatPort, ID: t.id, + PunchTs: uint64(time.Now().UnixNano() + int64(PunchTsDelay) - t.pn.dt), Version: OpenP2PVersion, } + t.punchTs = rsp.PunchTs // only private node set ipv6 if t.config.fromToken == t.pn.config.Token { t.pn.refreshIPv6(false) diff --git a/core/protocol.go b/core/protocol.go index 184aba0..3e9221f 100644 --- a/core/protocol.go +++ b/core/protocol.go @@ -10,9 +10,10 @@ import ( "time" ) -const OpenP2PVersion = "3.8.0" +const OpenP2PVersion = "3.9.1" const ProductName string = "openp2p" const LeastSupportVersion = "3.0.0" +const SyncServerTimeVersion = "3.9.0" const ( IfconfigPort1 = 27180 @@ -138,19 +139,21 @@ const ( TunnelIdleTimeout = time.Minute SymmetricHandshakeNum = 800 // 0.992379 // SymmetricHandshakeNum = 1000 // 0.999510 - SymmetricHandshakeInterval = time.Millisecond - SymmetricHandshakeAckTimeout = time.Second * 5 - PeerAddRelayTimeount = time.Second * 20 - CheckActiveTimeout = time.Second * 5 - PaddingSize = 16 - AESKeySize = 16 - MaxRetry = 10 - RetryInterval = time.Second * 30 - PublicIPEchoTimeout = time.Second * 1 - NatTestTimeout = time.Second * 5 - UDPReadTimeout = time.Second * 5 - ClientAPITimeout = time.Second * 10 - MaxDirectTry = 3 + SymmetricHandshakeInterval = time.Millisecond + HandshakeTimeout = time.Second * 5 + PeerAddRelayTimeount = HandshakeTimeout * 2 + CheckActiveTimeout = time.Second * 5 + PaddingSize = 16 + AESKeySize = 16 + MaxRetry = 10 + RetryInterval = time.Second * 30 + PublicIPEchoTimeout = time.Second * 1 + NatTestTimeout = time.Second * 5 + UDPReadTimeout = time.Second * 5 + ClientAPITimeout = time.Second * 10 + UnderlayConnectTimeout = time.Second * 10 + MaxDirectTry = 3 + PunchTsDelay = time.Second * 2 ) // NATNone has public ip @@ -240,6 +243,7 @@ type PushConnectRsp struct { ConeNatPort int `json:"coneNatPort,omitempty"` //it's not only cone, but also upnp or nat-pmp hole FromIP string `json:"fromIP,omitempty"` ID uint64 `json:"id,omitempty"` + PunchTs uint64 `json:"punchts,omitempty"` // server timestamp Version string `json:"version,omitempty"` } type PushRsp struct { diff --git a/core/underlay_quic.go b/core/underlay_quic.go index a3476ea..4b2be0f 100644 --- a/core/underlay_quic.go +++ b/core/underlay_quic.go @@ -15,10 +15,10 @@ import ( "sync" "time" - "github.com/lucas-clemente/quic-go" + "github.com/quic-go/quic-go" ) -//quic.DialContext do not support version 44,disable it +// quic.DialContext do not support version 44,disable it var quicVersion []quic.VersionNumber type underlayQUIC struct { @@ -87,7 +87,7 @@ func (conn *underlayQUIC) CloseListener() { } func (conn *underlayQUIC) Accept() error { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + ctx, cancel := context.WithTimeout(context.Background(), UnderlayConnectTimeout) defer cancel() sess, err := conn.listener.Accept(ctx) if err != nil { diff --git a/core/underlay_tcp.go b/core/underlay_tcp.go index af9d3e5..8f0bdc2 100644 --- a/core/underlay_tcp.go +++ b/core/underlay_tcp.go @@ -67,21 +67,30 @@ func (conn *underlayTCP) Close() error { return conn.Conn.Close() } -func listenTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) { +func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) { if mode == LinkModeTCPPunch { - c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) + if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS { + gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) + } else { + ts := time.Duration(int64(t.punchTs) + t.pn.dt - time.Now().UnixNano()) + gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) + time.Sleep(ts) + } + gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) + c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout) if err != nil { gLog.Println(LvDEBUG, "send tcp punch: ", err) return nil, err } return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil } + t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", localPort)) l, err := net.ListenTCP("tcp", addr) if err != nil { return nil, err } - l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout)) + l.SetDeadline(time.Now().Add(HandshakeTimeout)) c, err := l.Accept() defer l.Close() if err != nil { @@ -94,9 +103,9 @@ func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, e var c net.Conn var err error if mode == LinkModeTCPPunch { - c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) + c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), HandshakeTimeout) } else { - c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), SymmetricHandshakeAckTimeout) + c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), HandshakeTimeout) } if err != nil { diff --git a/core/underlay_tcp6.go b/core/underlay_tcp6.go index 9f652c9..eaa3d52 100644 --- a/core/underlay_tcp6.go +++ b/core/underlay_tcp6.go @@ -73,7 +73,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) { return nil, err } defer l.Close() - l.SetDeadline(time.Now().Add(SymmetricHandshakeAckTimeout)) + l.SetDeadline(time.Now().Add(HandshakeTimeout)) c, err := l.Accept() defer l.Close() if err != nil { @@ -83,7 +83,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) { } func dialTCP6(host string, port int) (*underlayTCP6, error) { - c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), SymmetricHandshakeAckTimeout) + c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), HandshakeTimeout) if err != nil { gLog.Printf(LvERROR, "Dial %s:%d error:%s", host, port, err) return nil, err diff --git a/core/update.go b/core/update.go index 22ab365..d1c1f22 100644 --- a/core/update.go +++ b/core/update.go @@ -27,7 +27,7 @@ func update(host string, port int) { } goos := runtime.GOOS goarch := runtime.GOARCH - rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s", host, port, OpenP2PVersion, goos, goarch)) + rsp, err := c.Get(fmt.Sprintf("https://%s:%d/api/v1/update?fromver=%s&os=%s&arch=%s&user=%s&node=%s", host, port, OpenP2PVersion, goos, goarch, gConf.Network.User, gConf.Network.Node)) if err != nil { gLog.Println(LvERROR, "update:query update list failed:", err) return diff --git a/core/upnp.go b/core/upnp.go index b2eb0bc..7a7e633 100644 --- a/core/upnp.go +++ b/core/upnp.go @@ -5,6 +5,7 @@ package openp2p import ( "bytes" + "crypto/tls" "encoding/xml" "errors" "fmt" @@ -181,7 +182,12 @@ func localIPv4() string { // TODO: multi nic will wrong } func getServiceURL(rootURL string) (url, urnDomain string, err error) { - r, err := http.Get(rootURL) + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Timeout: time.Second * 3} + r, err := client.Get(rootURL) if err != nil { return } diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100755 index 0000000..7d75028 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,12 @@ +FROM alpine:3.18.2 + +# Replace the default Alpine repositories with Aliyun mirrors +RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && \ + apk add --no-cache ca-certificates && \ + rm -rf /tmp/* /var/tmp/* /var/cache/apk/* /var/cache/distfiles/* + +COPY get-client.sh / + +RUN echo $TARGETPLATFORM && chmod +x /get-client.sh && ./get-client.sh + +ENTRYPOINT ["/openp2p"] diff --git a/docker/get-client.sh b/docker/get-client.sh new file mode 100755 index 0000000..95e6bc5 --- /dev/null +++ b/docker/get-client.sh @@ -0,0 +1,43 @@ +#!/bin/sh + +echo "Running on platform: $TARGETPLATFORM" +# TARGETPLATFORM=$(echo $TARGETPLATFORM | tr ',' '/') +echo "Running on platform: $TARGETPLATFORM" +sysType="linux-amd64" + archType=$(uname -m) + if [[ $archType == aarch64 ]] ; + then + sysType="linux-arm64" + elif [[ $archType == arm* ]] ; + then + sysType="linux-arm" + elif [[ $archType == i*86 ]] ; + then + sysType="linux-386" + elif [[ $archType == mips ]] ; + then + sysType="linux-mipsle" + ls /lib |grep mipsel + if [[ $? -ne 0 ]]; then + # mipsel not found, it's mipseb + sysType="linux-mipsbe" + fi + fi +url="https://openp2p.cn/download/v1/latest/openp2p-latest.$sysType.tar.gz" +echo "download $url start" + +if [ -f /usr/bin/curl ]; then + curl -k -o openp2p.tar.gz $url +else + wget --no-check-certificate -O openp2p.tar.gz $url +fi +if [ $? -ne 0 ]; then + echo "download error $?" + exit 9 +fi +echo "download ok" +tar -xzvf openp2p.tar.gz +chmod +x openp2p +pwd +ls -l +exit 0 diff --git a/go.mod b/go.mod index eeab426..e2cff94 100644 --- a/go.mod +++ b/go.mod @@ -4,27 +4,25 @@ go 1.18 require ( github.com/gorilla/websocket v1.4.2 - github.com/lucas-clemente/quic-go v0.27.0 github.com/openp2p-cn/go-reuseport v0.3.2 github.com/openp2p-cn/service v1.0.0 github.com/openp2p-cn/totp v0.0.0-20230102121327-8e02f6b392ed - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f + github.com/quic-go/quic-go v0.34.0 + golang.org/x/sys v0.5.0 ) require ( - github.com/cheekybits/genny v1.0.0 // indirect - github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 // indirect + github.com/golang/mock v1.6.0 // indirect + github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect github.com/kardianos/service v1.2.2 // indirect - github.com/marten-seemann/qtls-go1-16 v0.1.5 // indirect - github.com/marten-seemann/qtls-go1-17 v0.1.1 // indirect - github.com/marten-seemann/qtls-go1-18 v0.1.1 // indirect - github.com/nxadm/tail v1.4.8 // indirect - github.com/onsi/ginkgo v1.16.4 // indirect - golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect - golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect - golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect - golang.org/x/tools v0.1.12 // indirect + github.com/onsi/ginkgo/v2 v2.2.0 // indirect + github.com/quic-go/qtls-go1-19 v0.3.2 // indirect + github.com/quic-go/qtls-go1-20 v0.2.2 // indirect + golang.org/x/crypto v0.4.0 // indirect + golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect + golang.org/x/mod v0.6.0 // indirect + golang.org/x/net v0.7.0 // indirect + golang.org/x/tools v0.2.0 // indirect google.golang.org/protobuf v1.28.1 // indirect - gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect )