diff --git a/core/bandwidthLimit.go b/core/bandwidthLimit.go deleted file mode 100644 index b23be6f..0000000 --- a/core/bandwidthLimit.go +++ /dev/null @@ -1,45 +0,0 @@ -package openp2p - -import ( - "sync" - "time" -) - -// BandwidthLimiter ... -type BandwidthLimiter struct { - ts time.Time - bw int // mbps - freeBytes int // bytes - maxFreeBytes int // bytes - mtx sync.Mutex -} - -// mbps -func newBandwidthLimiter(bw int) *BandwidthLimiter { - return &BandwidthLimiter{ - bw: bw, - ts: time.Now(), - maxFreeBytes: bw * 1024 * 1024 / 8, - freeBytes: bw * 1024 * 1024 / 8, - } -} - -// Add ... -func (bl *BandwidthLimiter) Add(bytes int) { - if bl.bw <= 0 { - return - } - bl.mtx.Lock() - defer bl.mtx.Unlock() - // calc free flow 1000*1000/1024/1024=0.954; 1024*1024/1000/1000=1.048 - bl.freeBytes += int(time.Since(bl.ts) * time.Duration(bl.bw) / 8 / 954) - if bl.freeBytes > bl.maxFreeBytes { - bl.freeBytes = bl.maxFreeBytes - } - bl.freeBytes -= bytes - bl.ts = time.Now() - if bl.freeBytes < 0 { - // sleep for the overflow - time.Sleep(time.Millisecond * time.Duration(-bl.freeBytes/(bl.bw*1048/8))) - } -} diff --git a/core/errorcode.go b/core/errorcode.go index d47f486..60267fb 100644 --- a/core/errorcode.go +++ b/core/errorcode.go @@ -13,10 +13,16 @@ var ( ErrorNewUser = errors.New("new user") ErrorLogin = errors.New("user or password not correct") ErrNodeTooShort = errors.New("node name too short, it must >=8 charaters") + ErrReadDB = errors.New("read db error") + ErrNoUpdate = errors.New("there are currently no updates available") ErrPeerOffline = errors.New("peer offline") ErrNetwork = errors.New("network error") ErrMsgFormat = errors.New("message format wrong") ErrVersionNotCompatible = errors.New("version not compatible") ErrOverlayConnDisconnect = errors.New("overlay connection is disconnected") ErrConnectRelayNode = errors.New("connect relay node error") + ErrConnectPublicV4 = errors.New("connect public ipv4 error") + ErrMsgChannelNotFound = errors.New("message channel not found") + ErrRelayTunnelNotFound = errors.New("relay tunnel not found") + ErrSymmetricLimit = errors.New("symmetric limit") ) diff --git a/core/handlepush.go b/core/handlepush.go index 4644b2b..d65b53b 100644 --- a/core/handlepush.go +++ b/core/handlepush.go @@ -21,7 +21,7 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { } gLog.Printf(LvDEBUG, "handle push msg type:%d, push header:%+v", subType, pushHead) switch subType { - case MsgPushConnectReq: // TODO: handle a msg move to a new function + case MsgPushConnectReq: err = handleConnectReq(pn, subType, msg) case MsgPushRsp: rsp := PushRsp{} @@ -86,7 +86,6 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { } gConf.setNode(req.NewName) gConf.setShareBandwidth(req.Bandwidth) - // TODO: hot reload os.Exit(0) case MsgPushSwitchApp: gLog.Println(LvINFO, "MsgPushSwitchApp") @@ -112,10 +111,12 @@ func handlePush(pn *P2PNetwork, subType uint16, msg []byte) error { gLog.Println(LvINFO, "retry peerNode ", req.Node) gConf.retryApp(req.Node) default: - pn.msgMapMtx.Lock() - ch := pn.msgMap[pushHead.From] - pn.msgMapMtx.Unlock() - ch <- pushMsg{data: msg, ts: time.Now()} + i, ok := pn.msgMap.Load(pushHead.From) + if !ok { + return ErrMsgChannelNotFound + } + ch := i.(chan msgCtx) + ch <- msgCtx{data: msg, ts: time.Now()} } return err } @@ -145,9 +146,6 @@ func handleEditApp(pn *P2PNetwork, subType uint16, msg []byte) (err error) { gConf.add(newConf, false) pn.DeleteApp(oldConf) // DeleteApp may cost some times, execute at the end return nil - // autoReconnect will auto AddApp - // pn.AddApp(config) - // TODO: report result } func handleConnectReq(pn *P2PNetwork, subType uint16, msg []byte) (err error) { diff --git a/core/holepunch.go b/core/holepunch.go index e7e4661..4499734 100644 --- a/core/holepunch.go +++ b/core/holepunch.go @@ -24,13 +24,8 @@ func handshakeC2C(t *P2PTunnel) (err error) { } ra, head, _, _, err := UDPRead(conn, HandshakeTimeout) if err != nil { - time.Sleep(time.Millisecond * 200) - gLog.Println(LvDEBUG, err, ", return this error when ip was not reachable, retry read") - ra, head, _, _, err = UDPRead(conn, HandshakeTimeout) - if err != nil { - gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) - return err - } + gLog.Println(LvDEBUG, "handshakeC2C read MsgPunchHandshake error:", err) + return err } t.ra, _ = net.ResolveUDPAddr("udp", ra.String()) if head.MainType == MsgP2P && head.SubType == MsgPunchHandshake { @@ -57,6 +52,7 @@ func handshakeC2C(t *P2PTunnel) (err error) { func handshakeC2S(t *P2PTunnel) error { gLog.Printf(LvDEBUG, "handshakeC2S start") defer gLog.Printf(LvDEBUG, "handshakeC2S end") + startTime := time.Now() r := rand.New(rand.NewSource(time.Now().UnixNano())) randPorts := r.Perm(65532) conn, err := net.ListenUDP("udp", t.la) @@ -68,7 +64,6 @@ func handshakeC2S(t *P2PTunnel) error { go func() error { gLog.Printf(LvDEBUG, "send symmetric handshake to %s from %d:%d start", t.config.peerIP, t.coneLocalPort, t.coneNatPort) for i := 0; i < SymmetricHandshakeNum; i++ { - // TODO: auto calc cost time // time.Sleep(SymmetricHandshakeInterval) dst, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", t.config.peerIP, randPorts[i]+2)) if err != nil { @@ -124,19 +119,19 @@ func handshakeC2S(t *P2PTunnel) error { } else { gLog.Println(LvDEBUG, "handshakeS2C read msg but not MsgPunchHandshakeAck") } - gLog.Printf(LvINFO, "handshakeC2S ok") + gLog.Printf(LvINFO, "handshakeC2S ok. cost %d ms", time.Since(startTime)/time.Millisecond) return nil } func handshakeS2C(t *P2PTunnel) error { gLog.Printf(LvDEBUG, "handshakeS2C start") defer gLog.Printf(LvDEBUG, "handshakeS2C end") + startTime := time.Now() gotCh := make(chan *net.UDPAddr, 5) // sequencely udp send handshake, do not parallel send gLog.Printf(LvDEBUG, "send symmetric handshake to %s:%d start", t.config.peerIP, t.config.peerConeNatPort) gotIt := false for i := 0; i < SymmetricHandshakeNum; i++ { - // TODO: auto calc cost time // time.Sleep(SymmetricHandshakeInterval) go func(t *P2PTunnel) error { conn, err := net.ListenUDP("udp", nil) // TODO: system allocated port really random? @@ -197,7 +192,7 @@ func handshakeS2C(t *P2PTunnel) error { case la := <-gotCh: t.la = la gLog.Println(LvDEBUG, "symmetric handshake ok", la) - gLog.Printf(LvINFO, "handshakeS2C ok") + gLog.Printf(LvINFO, "handshakeS2C ok. cost %dms", time.Since(startTime)/time.Millisecond) } return nil } diff --git a/core/iptree.go b/core/iptree.go index 72b1f46..553efb3 100644 --- a/core/iptree.go +++ b/core/iptree.go @@ -76,6 +76,13 @@ func (iptree *IPTree) Contains(ipStr string) bool { return iptree.ContainsInt(ip) } +func IsLocalhost(ipStr string) bool { + if ipStr == "localhost" || ipStr == "127.0.0.1" || ipStr == "::1" { + return true + } + return false +} + func (iptree *IPTree) ContainsInt(ip uint32) bool { iptree.treeMtx.RLock() defer iptree.treeMtx.RUnlock() diff --git a/core/iptree_test.go b/core/iptree_test.go index 2391d49..25cef2c 100644 --- a/core/iptree_test.go +++ b/core/iptree_test.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "net" "testing" + "time" ) func wrapTestContains(t *testing.T, iptree *IPTree, ip string, result bool) { @@ -128,6 +129,7 @@ func BenchmarkBuildipTree20k(t *testing.B) { t.Logf("clear. ipTree size:%d\n", iptree.Size()) } func BenchmarkQuery(t *testing.B) { + ts := time.Now() iptree := NewIPTree("") iptree.Clear() iptree.Add("10.1.5.50", "10.1.5.100") @@ -145,7 +147,7 @@ func BenchmarkQuery(t *testing.B) { binary.Read(bytes.NewBuffer(net.ParseIP("10.1.1.1").To4()), binary.BigEndian, &minIP) // insert 10k block ip single - nodeNum := uint32(10000 * 100) + nodeNum := uint32(10000 * 1000) gap := uint32(10) for i := minIP; i < minIP+nodeNum*gap; i += gap { iptree.AddIntIP(i, i) @@ -156,8 +158,9 @@ func BenchmarkQuery(t *testing.B) { for i := minIP; i < minIP+nodeNum*gap; i += gap { iptree.AddIntIP(i, i+5) } - t.Logf("ipTree size:%d\n", iptree.Size()) - t.ResetTimer() + t.Logf("ipTree size:%d cost:%dms\n", iptree.Size(), time.Since(ts)/time.Millisecond) + ts = time.Now() + // t.ResetTimer() queryNum := 100 * 10000 for i := 0; i < queryNum; i++ { iptree.ContainsInt(minIP + uint32(i)) @@ -166,6 +169,6 @@ func BenchmarkQuery(t *testing.B) { wrapBenchmarkContains(t, iptree, "10.1.5.200", false) wrapBenchmarkContains(t, iptree, "200.1.1.1", false) } - t.Logf("query list:%d\n", queryNum*4) + t.Logf("query num:%d cost:%dms\n", queryNum*4, time.Since(ts)/time.Millisecond) } diff --git a/core/overlay.go b/core/overlay.go index 77089e1..7665e52 100644 --- a/core/overlay.go +++ b/core/overlay.go @@ -67,13 +67,13 @@ func (oConn *overlayConn) run() { writeBytes := append(tunnelHead.Bytes(), payload...) if oConn.rtid == 0 { oConn.tunnel.conn.WriteBytes(MsgP2P, MsgOverlayData, writeBytes) - gLog.Printf(LvDEBUG, "write overlay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) + gLog.Printf(LvDEBUG, "write overlay data to tid:%d,oid:%d bodylen=%d", oConn.tunnel.id, oConn.id, len(writeBytes)) } else { // write raley data all := append(relayHead.Bytes(), encodeHeader(MsgP2P, MsgOverlayData, uint32(len(writeBytes)))...) all = append(all, writeBytes...) oConn.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, all) - gLog.Printf(LvDEBUG, "write relay data to %d:%d bodylen=%d", oConn.rtid, oConn.id, len(writeBytes)) + gLog.Printf(LvDEBUG, "write relay data to tid:%d,rtid:%d,oid:%d bodylen=%d", oConn.tunnel.id, oConn.rtid, oConn.id, len(writeBytes)) } } if oConn.connTCP != nil { diff --git a/core/p2papp.go b/core/p2papp.go index 4a6b7ee..0a72e65 100644 --- a/core/p2papp.go +++ b/core/p2papp.go @@ -52,7 +52,11 @@ func (app *p2pApp) listenTCP() error { gLog.Printf(LvDEBUG, "tcp accept on port %d start", app.config.SrcPort) defer gLog.Printf(LvDEBUG, "tcp accept on port %d end", app.config.SrcPort) var err error - app.listener, err = net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", app.config.SrcPort)) // support tcp4 and tcp6 + listenAddr := "" + if IsLocalhost(app.config.Whitelist) { // not expose port + listenAddr = "127.0.0.1" + } + app.listener, err = net.Listen("tcp", fmt.Sprintf("%s:%d", listenAddr, app.config.SrcPort)) if err != nil { gLog.Printf(LvERROR, "listen error:%s", err) return err @@ -67,8 +71,8 @@ func (app *p2pApp) listenTCP() error { } // check white list if app.config.Whitelist != "" { - remoteIP := strings.Split(conn.RemoteAddr().String(), ":")[0] - if !app.iptree.Contains(remoteIP) { + remoteIP := conn.RemoteAddr().(*net.TCPAddr).IP.String() + if !app.iptree.Contains(remoteIP) && !IsLocalhost(remoteIP) { conn.Close() gLog.Printf(LvERROR, "%s not in whitelist, access denied", remoteIP) continue @@ -252,8 +256,8 @@ func (app *p2pApp) close() { func (app *p2pApp) relayHeartbeatLoop() { app.wg.Add(1) defer app.wg.Done() - gLog.Printf(LvDEBUG, "relayHeartbeat to %d start", app.rtid) - defer gLog.Printf(LvDEBUG, "relayHeartbeat to %d end", app.rtid) + gLog.Printf(LvDEBUG, "relayHeartbeat to rtid:%d start", app.rtid) + defer gLog.Printf(LvDEBUG, "relayHeartbeat to rtid%d end", app.rtid) relayHead := new(bytes.Buffer) binary.Write(relayHead, binary.LittleEndian, app.rtid) req := RelayHeartbeat{RelayTunnelID: app.tunnel.id, @@ -261,7 +265,12 @@ func (app *p2pApp) relayHeartbeatLoop() { msg, _ := newMessage(MsgP2P, MsgRelayHeartbeat, &req) msgWithHead := append(relayHead.Bytes(), msg...) for app.tunnel.isRuning() && app.running { - app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) + err := app.tunnel.conn.WriteBytes(MsgP2P, MsgRelayData, msgWithHead) + if err != nil { + gLog.Printf(LvERROR, "%d app write relay tunnel heartbeat error %s", app.rtid, err) + return + } + gLog.Printf(LvDEBUG, "%d app write relay tunnel heartbeat ok", app.rtid) time.Sleep(TunnelHeartbeatTime) } } diff --git a/core/p2pnetwork.go b/core/p2pnetwork.go index e13ab2b..58c32dc 100644 --- a/core/p2pnetwork.go +++ b/core/p2pnetwork.go @@ -20,8 +20,10 @@ import ( ) var ( - instance *P2PNetwork - once sync.Once + v4l *v4Listener + instance *P2PNetwork + once sync.Once + onceV4Listener sync.Once ) const ( @@ -32,7 +34,7 @@ const ( // golang not support float64 const var ( ma10 float64 = 1.0 / 10 - ma20 float64 = 1.0 / 20 + ma5 float64 = 1.0 / 5 ) type P2PNetwork struct { @@ -44,26 +46,23 @@ type P2PNetwork struct { writeMtx sync.Mutex hbTime time.Time // for sync server time - t1 int64 // nanoSeconds - dt int64 // client faster then server dt nanoSeconds - ddtma int64 - ddt int64 // differential of dt - // msgMap sync.Map - msgMap map[uint64]chan pushMsg //key: nodeID - msgMapMtx sync.Mutex + t1 int64 // nanoSeconds + dt int64 // client faster then server dt nanoSeconds + ddtma int64 + ddt int64 // differential of dt + msgMap sync.Map //key: nodeID + // msgMap map[uint64]chan pushMsg //key: nodeID config NetworkConfig allTunnels sync.Map apps sync.Map //key: protocol+srcport; value: p2pApp - limiter *BandwidthLimiter + limiter *SpeedLimiter } -type pushMsg struct { +type msgCtx struct { data []byte ts time.Time } -const msgExpiredTime = time.Minute - func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { if instance == nil { once.Do(func() { @@ -71,12 +70,11 @@ func P2PNetworkInstance(config *NetworkConfig) *P2PNetwork { restartCh: make(chan bool, 2), online: false, running: true, - msgMap: make(map[uint64]chan pushMsg), - limiter: newBandwidthLimiter(config.ShareBandwidth), + limiter: newSpeedLimiter(config.ShareBandwidth*1024*1024/8, 1), dt: 0, ddt: 0, } - instance.msgMap[0] = make(chan pushMsg) // for gateway + instance.msgMap.Store(uint64(0), make(chan msgCtx)) // for gateway if config != nil { instance.config = *config } @@ -183,7 +181,6 @@ func (pn *P2PNetwork) autorunApp() { func (pn *P2PNetwork) addRelayTunnel(config AppConfig) (*P2PTunnel, uint64, string, error) { gLog.Printf(LvINFO, "addRelayTunnel to %s start", config.PeerNode) defer gLog.Printf(LvINFO, "addRelayTunnel to %s end", config.PeerNode) - // request a relay node or specify manually(TODO) relayConfig := config relayMode := "private" if config.RelayNode == "" { @@ -265,8 +262,6 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error { peerNatType = t.config.peerNatType peerIP = t.config.peerIP } - // TODO: if tcp failed, should try udp punching, nattype should refactor also, when NATNONE and failed we don't know the peerNatType - if err != nil && err == ErrorHandshake { gLog.Println(LvERROR, "direct connect failed, try to relay") t, rtid, relayMode, err = pn.addRelayTunnel(config) @@ -303,7 +298,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error { AppID: appID, AppKey: appKey, } - gLog.Printf(LvINFO, "sync appkey to %s", config.PeerNode) + gLog.Printf(LvDEBUG, "sync appkey to %s", config.PeerNode) pn.push(config.PeerNode, MsgPushAPPKey, &req) } app := p2pApp{ @@ -317,7 +312,7 @@ func (pn *P2PNetwork) AddApp(config AppConfig) error { relayMode: relayMode, hbTime: time.Now()} pn.apps.Store(config.ID(), &app) - gLog.Printf(LvINFO, "%s use tunnel %d", app.config.AppName, app.tunnel.id) + gLog.Printf(LvDEBUG, "%s use tunnel %d", app.config.AppName, app.tunnel.id) if err == nil { go app.listen() } @@ -359,18 +354,18 @@ func (pn *P2PNetwork) findTunnel(config *AppConfig) (t *P2PTunnel) { } func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunnel, err error) { - gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) - defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort) + gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d tid:%d start", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort, tid) + defer gLog.Printf(LvDEBUG, "addDirectTunnel %s%d to %s:%s:%d tid:%d end", config.Protocol, config.SrcPort, config.PeerNode, config.DstHost, config.DstPort, tid) isClient := false // client side tid=0, assign random uint64 if tid == 0 { tid = rand.Uint64() isClient = true } + if _, ok := pn.msgMap.Load(nodeNameToID(config.PeerNode)); !ok { + pn.msgMap.Store(nodeNameToID(config.PeerNode), make(chan msgCtx, 50)) + } - pn.msgMapMtx.Lock() - pn.msgMap[nodeNameToID(config.PeerNode)] = make(chan pushMsg, 50) - pn.msgMapMtx.Unlock() // server side if !isClient { t, err = pn.newTunnel(config, tid, isClient) @@ -407,13 +402,15 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne } if t, err = pn.newTunnel(config, tid, isClient); err == nil { return t, nil + } else if config.hasIPv4 == 1 || config.hasUPNPorNATPMP == 1 { // peer has ipv4 no punching + return nil, ErrConnectPublicV4 } } // TODO: try UDP4 // try TCPPunch - for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries - if config.peerNatType == NATCone && pn.config.natType == NATCone { // TODO: support c2s + for i := 0; i < Cone2ConeTCPPunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries + if config.peerNatType == NATCone && pn.config.natType == NATCone { gLog.Println(LvINFO, "try TCP4 Punch") config.linkMode = LinkModeTCPPunch config.isUnderlayServer = 0 @@ -425,7 +422,7 @@ func (pn *P2PNetwork) addDirectTunnel(config AppConfig, tid uint64) (t *P2PTunne } // try UDPPunch - for i := 0; i < Cone2ConePunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries + for i := 0; i < Cone2ConeUDPPunchMaxRetry; i++ { // when both 2 nats has restrict firewall, simultaneous punching needs to be very precise, it takes a few tries if config.peerNatType == NATCone || pn.config.natType == NATCone { gLog.Println(LvINFO, "try UDP4 Punch") config.linkMode = LinkModeUDPPunch @@ -470,7 +467,7 @@ func (pn *P2PNetwork) newTunnel(config AppConfig, tid uint64, isClient bool) (t return } func (pn *P2PNetwork) init() error { - gLog.Println(LvINFO, "init start") + gLog.Println(LvINFO, "P2PNetwork start") pn.wgReconnect.Add(1) defer pn.wgReconnect.Done() var err error @@ -495,7 +492,13 @@ func (pn *P2PNetwork) init() error { gLog.Println(LvDEBUG, "detect NAT type error:", err) break } - gLog.Println(LvDEBUG, "detect NAT type:", pn.config.natType, " publicIP:", pn.config.publicIP) + if pn.config.hasIPv4 == 1 || pn.config.hasUPNPorNATPMP == 1 { + onceV4Listener.Do(func() { + v4l = &v4Listener{port: gConf.Network.TCPPort} + go v4l.start() + }) + } + gLog.Printf(LvINFO, "hasIPv4:%d, UPNP:%d, NAT type:%d, publicIP:%s", pn.config.hasIPv4, pn.config.hasUPNPorNATPMP, pn.config.natType, pn.config.publicIP) gatewayURL := fmt.Sprintf("%s:%d", pn.config.ServerHost, pn.config.ServerPort) uri := "/api/v1/login" caCertPool, err := x509.SystemCertPool() @@ -510,6 +513,7 @@ func (pn *P2PNetwork) init() error { RootCAs: caCertPool, InsecureSkipVerify: false} // let's encrypt root cert "DST Root CA X3" expired at 2021/09/29. many old system(windows server 2008 etc) will not trust our cert websocket.DefaultDialer.TLSClientConfig = &config + websocket.DefaultDialer.HandshakeTimeout = ClientAPITimeout u := url.URL{Scheme: "wss", Host: gatewayURL, Path: uri} q := u.Query() q.Add("node", pn.config.Node) @@ -521,6 +525,7 @@ func (pn *P2PNetwork) init() error { var ws *websocket.Conn ws, _, err = websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { + gLog.Println(LvERROR, "Dial error:", err) break } pn.online = true @@ -606,28 +611,27 @@ func (pn *P2PNetwork) handleMessage(t int, msg []byte) { dt := pn.t1 + rtt/2 - t2 if pn.dt != 0 { ddt := dt - pn.dt - if pn.ddtma > 0 && (ddt > (pn.ddtma+pn.ddtma/3) || ddt < (pn.ddtma-pn.ddtma/3)) { - newdt := pn.dt + pn.ddtma - gLog.Printf(LvDEBUG, "server time auto adjust dt=%.2fms to %.2fms", float64(dt)/float64(time.Millisecond), float64(newdt)/float64(time.Millisecond)) - dt = newdt - } pn.ddt = ddt if pn.ddtma == 0 { pn.ddtma = pn.ddt } else { pn.ddtma = int64(float64(pn.ddtma)*(1-ma10) + float64(pn.ddt)*ma10) // avoid int64 overflow + newdt := pn.dt + pn.ddtma + // gLog.Printf(LvDEBUG, "server time auto adjust dt=%.2fms to %.2fms", float64(dt)/float64(time.Millisecond), float64(newdt)/float64(time.Millisecond)) + dt = newdt } } pn.dt = dt - - gLog.Printf(LvDEBUG, "server time dt=%dms ddt=%dns ddtma=%dns rtt=%dms ", pn.dt/int64(time.Millisecond), pn.ddt, pn.ddtma, rtt/int64(time.Millisecond)) + gLog.Printf(LvDEBUG, "synctime dt=%dms ddt=%dns ddtma=%dns rtt=%dms ", pn.dt/int64(time.Millisecond), pn.ddt, pn.ddtma, rtt/int64(time.Millisecond)) case MsgPush: handlePush(pn, head.SubType, msg) default: - pn.msgMapMtx.Lock() - ch := pn.msgMap[0] - pn.msgMapMtx.Unlock() - ch <- pushMsg{data: msg, ts: time.Now()} + i, ok := pn.msgMap.Load(uint64(0)) + if ok { + ch := i.(chan msgCtx) + ch <- msgCtx{data: msg, ts: time.Now()} + } + return } } @@ -668,17 +672,20 @@ func (pn *P2PNetwork) write(mainType uint16, subType uint16, packet interface{}) } func (pn *P2PNetwork) relay(to uint64, body []byte) error { - gLog.Printf(LvDEBUG, "relay data to %d", to) i, ok := pn.allTunnels.Load(to) if !ok { - return nil + gLog.Printf(LvERROR, "relay to %d len=%d error:%s", to, len(body), ErrRelayTunnelNotFound) + return ErrRelayTunnelNotFound } tunnel := i.(*P2PTunnel) if tunnel.config.shareBandwidth > 0 { - pn.limiter.Add(len(body)) + pn.limiter.Add(len(body), true) } - tunnel.conn.WriteBuffer(body) - return nil + var err error + if err = tunnel.conn.WriteBuffer(body); err != nil { + gLog.Printf(LvERROR, "relay to %d len=%d error:%s", to, len(body), err) + } + return err } func (pn *P2PNetwork) push(to string, subType uint16, packet interface{}) error { @@ -717,28 +724,31 @@ func (pn *P2PNetwork) read(node string, mainType uint16, subType uint16, timeout } else { nodeID = nodeNameToID(node) } - pn.msgMapMtx.Lock() - ch := pn.msgMap[nodeID] - pn.msgMapMtx.Unlock() + i, ok := pn.msgMap.Load(nodeID) + if !ok { + return + } + ch := i.(chan msgCtx) for { select { case <-time.After(timeout): gLog.Printf(LvERROR, "wait msg%d:%d timeout", mainType, subType) return case msg := <-ch: - if msg.ts.Before(time.Now().Add(-msgExpiredTime)) { - gLog.Printf(LvDEBUG, "msg expired error %d:%d", head.MainType, head.SubType) - continue - } head = &openP2PHeader{} err := binary.Read(bytes.NewReader(msg.data[:openP2PHeaderSize]), binary.LittleEndian, head) if err != nil { gLog.Println(LvERROR, "read msg error:", err) break } + if time.Since(msg.ts) > ReadMsgTimeout { + gLog.Printf(LvDEBUG, "msg expired error %d:%d", head.MainType, head.SubType) + continue + } if head.MainType != mainType || head.SubType != subType { gLog.Printf(LvDEBUG, "read msg type error %d:%d, requeue it", head.MainType, head.SubType) ch <- msg + time.Sleep(time.Second) continue } if mainType == MsgPush { diff --git a/core/p2ptunnel.go b/core/p2ptunnel.go index 93a7b93..f5aae15 100644 --- a/core/p2ptunnel.go +++ b/core/p2ptunnel.go @@ -18,7 +18,6 @@ type P2PTunnel struct { conn underlay hbTime time.Time hbMtx sync.Mutex - hbTimeRelay time.Time config AppConfig la *net.UDPAddr // local hole address ra *net.UDPAddr // remote hole address @@ -35,11 +34,7 @@ type P2PTunnel struct { func (t *P2PTunnel) initPort() { t.running = true - t.hbMtx.Lock() - t.hbTime = time.Now() - t.hbMtx.Unlock() - t.hbTimeRelay = time.Now().Add(time.Second * 600) // TODO: test fake time - localPort := int(rand.Uint32()%15000 + 50000) // if the process has bug, will add many upnp port. use specify p2p port by param + localPort := int(rand.Uint32()%15000 + 50000) // if the process has bug, will add many upnp port. use specify p2p port by param if t.config.linkMode == LinkModeTCP6 || t.config.linkMode == LinkModeTCP4 { t.coneLocalPort = t.pn.config.TCPPort t.coneNatPort = t.pn.config.TCPPort // symmetric doesn't need coneNatPort @@ -83,7 +78,7 @@ func (t *P2PTunnel) connect() error { req.Token = t.pn.config.Token } t.pn.push(t.config.PeerNode, MsgPushConnectReq, req) - head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, HandshakeTimeout*3) + head, body := t.pn.read(t.config.PeerNode, MsgPush, MsgPushConnectRsp, UnderlayConnectTimeout*3) if head == nil { return errors.New("connect error") } @@ -184,13 +179,12 @@ func (t *P2PTunnel) handshake() error { if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS { gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) } else { - ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) + ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddtma*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) time.Sleep(ts) } gLog.Println(LvDEBUG, "handshake to ", t.config.PeerNode) var err error - // TODO: handle NATNone, nodes with public ip has no punching if t.pn.config.natType == NATCone && t.config.peerNatType == NATCone { err = handshakeC2C(t) } else if t.config.peerNatType == NATSymmetric && t.pn.config.natType == NATSymmetric { @@ -216,7 +210,7 @@ func (t *P2PTunnel) connectUnderlay() (err error) { case LinkModeTCP6: t.conn, err = t.connectUnderlayTCP6() case LinkModeTCP4: - t.conn, err = t.connectUnderlayTCP() // TODO: can not listen the same tcp port in pararell + t.conn, err = t.connectUnderlayTCP() case LinkModeTCPPunch: t.conn, err = t.connectUnderlayTCP() case LinkModeUDPPunch: @@ -238,30 +232,30 @@ func (t *P2PTunnel) connectUnderlay() (err error) { func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) { gLog.Println(LvINFO, "connectUnderlayQuic start") defer gLog.Println(LvINFO, "connectUnderlayQuic end") - var qConn *underlayQUIC + var ul *underlayQUIC if t.config.isUnderlayServer == 1 { time.Sleep(time.Millisecond * 10) // punching udp port will need some times in some env - qConn, err = listenQuic(t.la.String(), TunnelIdleTimeout) + ul, err = listenQuic(t.la.String(), TunnelIdleTimeout) if err != nil { gLog.Println(LvINFO, "listen quic error:", err, ", retry...") } t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) - err = qConn.Accept() + err = ul.Accept() if err != nil { - qConn.CloseListener() + ul.CloseListener() return nil, fmt.Errorf("accept quic error:%s", err) } - _, buff, err := qConn.ReadBuffer() + _, buff, err := ul.ReadBuffer() if err != nil { - qConn.listener.Close() + ul.listener.Close() return nil, fmt.Errorf("read start msg error:%s", err) } if buff != nil { gLog.Println(LvDEBUG, string(buff)) } - qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) + ul.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) gLog.Println(LvDEBUG, "quic connection ok") - return qConn, nil + return ul, nil } //else @@ -273,17 +267,17 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) { return nil, fmt.Errorf("quic listen error:%s", e) } } - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) gLog.Println(LvDEBUG, "quic dial to ", t.ra.String()) - qConn, e = dialQuic(conn, t.ra, TunnelIdleTimeout) + ul, e = dialQuic(conn, t.ra, TunnelIdleTimeout) if e != nil { return nil, fmt.Errorf("quic dial to %s error:%s", t.ra.String(), e) } handshakeBegin := time.Now() - qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) - _, buff, err := qConn.ReadBuffer() + ul.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) + _, buff, err := ul.ReadBuffer() if e != nil { - qConn.listener.Close() + ul.listener.Close() return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) } if buff != nil { @@ -293,101 +287,92 @@ func (t *P2PTunnel) connectUnderlayQuic() (c underlay, err error) { gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) gLog.Println(LvDEBUG, "quic connection ok") t.linkModeWeb = LinkModeUDPPunch - return qConn, nil + return ul, nil } // websocket func (t *P2PTunnel) connectUnderlayTCP() (c underlay, err error) { - gLog.Println(LvINFO, "connectUnderlayTCP start") - defer gLog.Println(LvINFO, "connectUnderlayTCP end") - var qConn *underlayTCP + gLog.Println(LvDEBUG, "connectUnderlayTCP start") + defer gLog.Println(LvDEBUG, "connectUnderlayTCP end") + var ul *underlayTCP if t.config.isUnderlayServer == 1 { - qConn, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode, t) + ul, err = listenTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode, t) if err != nil { return nil, fmt.Errorf("listen TCP error:%s", err) } - - _, buff, err := qConn.ReadBuffer() - if err != nil { - return nil, fmt.Errorf("read start msg error:%s", err) - } - if buff != nil { - gLog.Println(LvDEBUG, string(buff)) - } - qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) gLog.Println(LvINFO, "TCP connection ok") - return qConn, nil + return ul, nil } // client side if t.config.linkMode == LinkModeTCP4 { - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) } else { //tcp punch should sleep for punch the same time if compareVersion(t.config.peerVersion, SyncServerTimeVersion) == LESS { gLog.Printf(LvDEBUG, "peer version %s less than %s", t.config.peerVersion, SyncServerTimeVersion) } else { - ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddt*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) + ts := time.Duration(int64(t.punchTs) + t.pn.dt + t.pn.ddtma*int64(time.Since(t.pn.hbTime)+PunchTsDelay)/int64(NetworkHeartbeatTime) - time.Now().UnixNano()) gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) time.Sleep(ts) } } - - gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", t.coneLocalPort), "-->", fmt.Sprintf("%s:%d", t.config.peerIP, t.config.peerConeNatPort)) - qConn, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode) + ul, err = dialTCP(t.config.peerIP, t.config.peerConeNatPort, t.coneLocalPort, t.config.linkMode) if err != nil { return nil, fmt.Errorf("TCP dial to %s:%d error:%s", t.config.peerIP, t.config.peerConeNatPort, err) } handshakeBegin := time.Now() - qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) - _, buff, err := qConn.ReadBuffer() + tidBuff := new(bytes.Buffer) + binary.Write(tidBuff, binary.LittleEndian, t.id) + ul.WriteBytes(MsgP2P, MsgTunnelHandshake, tidBuff.Bytes()) // tunnelID + _, buff, err := ul.ReadBuffer() if err != nil { return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) } if buff != nil { - gLog.Println(LvDEBUG, string(buff)) + gLog.Println(LvDEBUG, "hello ", string(buff)) } gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) gLog.Println(LvINFO, "TCP connection ok") t.linkModeWeb = LinkModeIPv4 - return qConn, nil + return ul, nil } func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { gLog.Println(LvINFO, "connectUnderlayTCP6 start") defer gLog.Println(LvINFO, "connectUnderlayTCP6 end") - var qConn *underlayTCP6 + var ul *underlayTCP6 if t.config.isUnderlayServer == 1 { t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) - qConn, err = listenTCP6(t.coneNatPort, HandshakeTimeout) + ul, err = listenTCP6(t.coneNatPort, UnderlayConnectTimeout) if err != nil { return nil, fmt.Errorf("listen TCP6 error:%s", err) } - _, buff, err := qConn.ReadBuffer() + _, buff, err := ul.ReadBuffer() if err != nil { - qConn.listener.Close() + ul.listener.Close() return nil, fmt.Errorf("read start msg error:%s", err) } if buff != nil { gLog.Println(LvDEBUG, string(buff)) } - qConn.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) + ul.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, []byte("OpenP2P,hello2")) gLog.Println(LvDEBUG, "TCP6 connection ok") - return qConn, nil + return ul, nil } //else - t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, HandshakeTimeout*3) + t.pn.read(t.config.PeerNode, MsgPush, MsgPushUnderlayConnect, ReadMsgTimeout) gLog.Println(LvDEBUG, "TCP6 dial to ", t.config.peerIPv6) - qConn, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort) + ul, err = dialTCP6(t.config.peerIPv6, t.config.peerConeNatPort) if err != nil { return nil, fmt.Errorf("TCP6 dial to %s:%d error:%s", t.config.peerIPv6, t.config.peerConeNatPort, err) } handshakeBegin := time.Now() - qConn.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) - _, buff, err := qConn.ReadBuffer() + ul.WriteBytes(MsgP2P, MsgTunnelHandshake, []byte("OpenP2P,hello")) + _, buff, err := ul.ReadBuffer() if err != nil { - qConn.listener.Close() + ul.listener.Close() return nil, fmt.Errorf("read MsgTunnelHandshake error:%s", err) } if buff != nil { @@ -397,7 +382,7 @@ func (t *P2PTunnel) connectUnderlayTCP6() (c underlay, err error) { gLog.Println(LvINFO, "rtt=", time.Since(handshakeBegin)) gLog.Println(LvDEBUG, "TCP6 connection ok") t.linkModeWeb = LinkModeIPv6 - return qConn, nil + return ul, nil } func (t *P2PTunnel) readLoop() { @@ -413,6 +398,7 @@ func (t *P2PTunnel) readLoop() { break } if head.MainType != MsgP2P { + gLog.Printf(LvWARN, "%d head.MainType != MsgP2P", t.id) continue } switch head.SubType { @@ -427,6 +413,7 @@ func (t *P2PTunnel) readLoop() { gLog.Printf(LvDEBUG, "%d read tunnel heartbeat ack", t.id) case MsgOverlayData: if len(body) < overlayHeaderSize { + gLog.Printf(LvWARN, "%d len(body) < overlayHeaderSize", t.id) continue } overlayID := binary.LittleEndian.Uint64(body[:8]) @@ -451,19 +438,19 @@ func (t *P2PTunnel) readLoop() { gLog.Println(LvERROR, "overlay write error:", err) } case MsgRelayData: - gLog.Printf(LvDEBUG, "got relay data datalen=%d", head.DataLen) if len(body) < 8 { continue } tunnelID := binary.LittleEndian.Uint64(body[:8]) - t.pn.relay(tunnelID, body[8:]) + gLog.Printf(LvDEBUG, "relay data to %d, len=%d", tunnelID, head.DataLen-RelayHeaderSize) + t.pn.relay(tunnelID, body[RelayHeaderSize:]) case MsgRelayHeartbeat: req := RelayHeartbeat{} if err := json.Unmarshal(body, &req); err != nil { gLog.Printf(LvERROR, "wrong %v:%s", reflect.TypeOf(req), err) continue } - gLog.Printf(LvDEBUG, "got MsgRelayHeartbeat from %d:%d", req.RelayTunnelID, req.AppID) + gLog.Printf(LvDEBUG, "read MsgRelayHeartbeat from rtid:%d,appid:%d", req.RelayTunnelID, req.AppID) relayHead := new(bytes.Buffer) binary.Write(relayHead, binary.LittleEndian, req.RelayTunnelID) msg, _ := newMessage(MsgP2P, MsgRelayHeartbeatAck, &req) @@ -476,7 +463,7 @@ func (t *P2PTunnel) readLoop() { gLog.Printf(LvERROR, "wrong RelayHeartbeat:%s", err) continue } - gLog.Printf(LvDEBUG, "got MsgRelayHeartbeatAck to %d", req.AppID) + gLog.Printf(LvDEBUG, "read MsgRelayHeartbeatAck to appid:%d", req.AppID) t.pn.updateAppHeartbeat(req.AppID) case MsgOverlayConnectReq: req := OverlayConnectReq{} @@ -504,7 +491,7 @@ func (t *P2PTunnel) readLoop() { if req.Protocol == "udp" { oConn.connUDP, err = net.DialUDP("udp", nil, &net.UDPAddr{IP: net.ParseIP(req.DstIP), Port: req.DstPort}) } else { - oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), HandshakeTimeout) + oConn.connTCP, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", req.DstIP, req.DstPort), ReadMsgTimeout) } if err != nil { @@ -544,6 +531,9 @@ func (t *P2PTunnel) readLoop() { } func (t *P2PTunnel) heartbeatLoop() { + t.hbMtx.Lock() + t.hbTime = time.Now() // init + t.hbMtx.Unlock() tc := time.NewTicker(TunnelHeartbeatTime) defer tc.Stop() gLog.Printf(LvDEBUG, "%d tunnel heartbeatLoop start", t.id) diff --git a/core/protocol.go b/core/protocol.go index a9b8c8b..15410a7 100644 --- a/core/protocol.go +++ b/core/protocol.go @@ -10,11 +10,12 @@ import ( "time" ) -const OpenP2PVersion = "3.10.9" +const OpenP2PVersion = "3.12.0" const ProductName string = "openp2p" const LeastSupportVersion = "3.0.0" const SyncServerTimeVersion = "3.9.0" const SymmetricSimultaneouslySendVersion = "3.10.7" +const PublicIPVersion = "3.11.2" const ( IfconfigPort1 = 27180 @@ -39,6 +40,8 @@ type PushHeader struct { var PushHeaderSize = binary.Size(PushHeader{}) +const RelayHeaderSize = 8 + type overlayHeader struct { id uint64 } @@ -134,27 +137,29 @@ const ( ) const ( - ReadBuffLen = 4096 // for UDP maybe not enough - NetworkHeartbeatTime = time.Second * 30 // TODO: server no response hb, save flow + ReadBuffLen = 4096 // for UDP maybe not enough + NetworkHeartbeatTime = time.Second * 30 TunnelHeartbeatTime = time.Second * 10 // some nat udp session expired time less than 15s. change to 10s TunnelIdleTimeout = time.Minute SymmetricHandshakeNum = 800 // 0.992379 // SymmetricHandshakeNum = 1000 // 0.999510 SymmetricHandshakeInterval = time.Millisecond - HandshakeTimeout = time.Second * 10 - PeerAddRelayTimeount = time.Second * 30 // peer need times + HandshakeTimeout = time.Second * 7 + PunchTsDelay = time.Second * 3 + PeerAddRelayTimeount = time.Second * 30 // peer need times. S2C\TCP\TCP Punch\UDP Punch CheckActiveTimeout = time.Second * 5 + ReadMsgTimeout = time.Second * 5 PaddingSize = 16 AESKeySize = 16 MaxRetry = 10 - Cone2ConePunchMaxRetry = 1 + Cone2ConeTCPPunchMaxRetry = 1 + Cone2ConeUDPPunchMaxRetry = 1 PublicIPEchoTimeout = time.Second * 1 NatTestTimeout = time.Second * 5 UDPReadTimeout = time.Second * 5 ClientAPITimeout = time.Second * 10 UnderlayConnectTimeout = time.Second * 10 MaxDirectTry = 3 - PunchTsDelay = time.Second * 3 ) // NATNone has public ip diff --git a/core/speedlimiter.go b/core/speedlimiter.go new file mode 100644 index 0000000..8abdabd --- /dev/null +++ b/core/speedlimiter.go @@ -0,0 +1,51 @@ +package openp2p + +import ( + "fmt" + "sync" + "time" +) + +// SpeedLimiter ... +type SpeedLimiter struct { + lastUpdate time.Time + speed int // per second + precision int // seconds + freeCap int + maxFreeCap int + mtx sync.Mutex +} + +func newSpeedLimiter(speed int, precision int) *SpeedLimiter { + return &SpeedLimiter{ + speed: speed, + precision: precision, + lastUpdate: time.Now(), + maxFreeCap: speed * precision, + freeCap: speed * precision, + } +} + +// Add ... +func (sl *SpeedLimiter) Add(increment int, wait bool) bool { + if sl.speed <= 0 { + return true + } + sl.mtx.Lock() + defer sl.mtx.Unlock() + sl.freeCap += int(time.Since(sl.lastUpdate) * time.Duration(sl.speed) / time.Second) + if sl.freeCap > sl.maxFreeCap { + sl.freeCap = sl.maxFreeCap + } + if !wait && sl.freeCap < increment { + return false + } + sl.freeCap -= increment + sl.lastUpdate = time.Now() + if sl.freeCap < 0 { + // sleep for the overflow + fmt.Println("sleep ", time.Millisecond*time.Duration(-sl.freeCap*100)/time.Duration(sl.speed)) + time.Sleep(time.Millisecond * time.Duration(-sl.freeCap*1000) / time.Duration(sl.speed)) // sleep ms + } + return true +} diff --git a/core/speedlimiter_test.go b/core/speedlimiter_test.go new file mode 100644 index 0000000..a893137 --- /dev/null +++ b/core/speedlimiter_test.go @@ -0,0 +1,58 @@ +package openp2p + +import ( + "testing" + "time" +) + +func TestBandwidth(t *testing.T) { + speed := 10 * 1024 * 1024 / 8 // 10mbps + speedl := newSpeedLimiter(speed, 1) + oneBuffSize := 4096 + writeNum := 5000 + expectTime := oneBuffSize * writeNum / speed + startTs := time.Now() + for i := 0; i < writeNum; i++ { + speedl.Add(oneBuffSize, true) + } + t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) + if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second { + t.Error("error") + } +} + +func TestSymmetric(t *testing.T) { + speed := 20000 / 180 + speedl := newSpeedLimiter(speed, 180) + oneBuffSize := 300 + writeNum := 100 + expectTime := (oneBuffSize*writeNum - 20000) / speed + startTs := time.Now() + for i := 0; i < writeNum; i++ { + speedl.Add(oneBuffSize, true) + } + t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) + if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second { + t.Error("error") + } +} + +func TestSymmetric2(t *testing.T) { + speed := 30000 / 180 + speedl := newSpeedLimiter(speed, 180) + oneBuffSize := 800 + writeNum := 50 + expectTime := (oneBuffSize*writeNum - 30000) / speed + startTs := time.Now() + for i := 0; i < writeNum; { + if speedl.Add(oneBuffSize, true) { + i++ + } else { + time.Sleep(time.Millisecond) + } + } + t.Logf("cost %ds, expect %ds", time.Since(startTs)/time.Second, expectTime) + if time.Since(startTs) > time.Duration(expectTime+1)*time.Second || time.Since(startTs) < time.Duration(expectTime-1)*time.Second { + t.Error("error") + } +} diff --git a/core/underlay.go b/core/underlay.go index cf0067d..4ecac27 100644 --- a/core/underlay.go +++ b/core/underlay.go @@ -1,16 +1,62 @@ package openp2p import ( + "io" "time" ) type underlay interface { + Read([]byte) (int, error) + Write([]byte) (int, error) ReadBuffer() (*openP2PHeader, []byte, error) WriteBytes(uint16, uint16, []byte) error WriteBuffer([]byte) error WriteMessage(uint16, uint16, interface{}) error Close() error + WLock() + WUnlock() SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error Protocol() string } + +func DefaultReadBuffer(ul underlay) (*openP2PHeader, []byte, error) { + headBuf := make([]byte, openP2PHeaderSize) + _, err := io.ReadFull(ul, headBuf) + if err != nil { + return nil, nil, err + } + head, err := decodeHeader(headBuf) + if err != nil { + return nil, nil, err + } + dataBuf := make([]byte, head.DataLen) + _, err = io.ReadFull(ul, dataBuf) + return head, dataBuf, err +} + +func DefaultWriteBytes(ul underlay, mainType, subType uint16, data []byte) error { + writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) + ul.WLock() + _, err := ul.Write(writeBytes) + ul.WUnlock() + return err +} + +func DefaultWriteBuffer(ul underlay, data []byte) error { + ul.WLock() + _, err := ul.Write(data) + ul.WUnlock() + return err +} + +func DefaultWriteMessage(ul underlay, mainType uint16, subType uint16, packet interface{}) error { + writeBytes, err := newMessage(mainType, subType, packet) + if err != nil { + return err + } + ul.WLock() + _, err = ul.Write(writeBytes) + ul.WUnlock() + return err +} diff --git a/core/underlay_quic.go b/core/underlay_quic.go index 4b2be0f..95a6db1 100644 --- a/core/underlay_quic.go +++ b/core/underlay_quic.go @@ -6,10 +6,8 @@ import ( "crypto/rsa" "crypto/tls" "crypto/x509" - "encoding/json" "encoding/pem" "fmt" - "io" "math/big" "net" "sync" @@ -33,46 +31,19 @@ func (conn *underlayQUIC) Protocol() string { } func (conn *underlayQUIC) ReadBuffer() (*openP2PHeader, []byte, error) { - headBuf := make([]byte, openP2PHeaderSize) - _, err := io.ReadFull(conn, headBuf) - if err != nil { - return nil, nil, err - } - head, err := decodeHeader(headBuf) - if err != nil { - return nil, nil, err - } - dataBuf := make([]byte, head.DataLen) - _, err = io.ReadFull(conn, dataBuf) - return head, dataBuf, err + return DefaultReadBuffer(conn) } func (conn *underlayQUIC) WriteBytes(mainType uint16, subType uint16, data []byte) error { - writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) - conn.writeMtx.Lock() - _, err := conn.Write(writeBytes) - conn.writeMtx.Unlock() - return err + return DefaultWriteBytes(conn, mainType, subType, data) } func (conn *underlayQUIC) WriteBuffer(data []byte) error { - conn.writeMtx.Lock() - _, err := conn.Write(data) - conn.writeMtx.Unlock() - return err + return DefaultWriteBuffer(conn, data) } func (conn *underlayQUIC) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { - // TODO: call newMessage - data, err := json.Marshal(packet) - if err != nil { - return err - } - writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) - conn.writeMtx.Lock() - _, err = conn.Write(writeBytes) - conn.writeMtx.Unlock() - return err + return DefaultWriteMessage(conn, mainType, subType, packet) } func (conn *underlayQUIC) Close() error { @@ -80,6 +51,12 @@ func (conn *underlayQUIC) Close() error { conn.Connection.CloseWithError(0, "") return nil } +func (conn *underlayQUIC) WLock() { + conn.writeMtx.Lock() +} +func (conn *underlayQUIC) WUnlock() { + conn.writeMtx.Unlock() +} func (conn *underlayQUIC) CloseListener() { if conn.listener != nil { conn.listener.Close() diff --git a/core/underlay_tcp.go b/core/underlay_tcp.go index 5e14e5b..3967fa1 100644 --- a/core/underlay_tcp.go +++ b/core/underlay_tcp.go @@ -1,9 +1,8 @@ package openp2p import ( - "encoding/json" + "encoding/binary" "fmt" - "io" "net" "sync" "time" @@ -21,51 +20,30 @@ func (conn *underlayTCP) Protocol() string { } func (conn *underlayTCP) ReadBuffer() (*openP2PHeader, []byte, error) { - headBuf := make([]byte, openP2PHeaderSize) - _, err := io.ReadFull(conn, headBuf) - if err != nil { - return nil, nil, err - } - head, err := decodeHeader(headBuf) - if err != nil { - return nil, nil, err - } - dataBuf := make([]byte, head.DataLen) - _, err = io.ReadFull(conn, dataBuf) - return head, dataBuf, err + return DefaultReadBuffer(conn) } func (conn *underlayTCP) WriteBytes(mainType uint16, subType uint16, data []byte) error { - writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) - conn.writeMtx.Lock() - _, err := conn.Write(writeBytes) - conn.writeMtx.Unlock() - return err + return DefaultWriteBytes(conn, mainType, subType, data) } func (conn *underlayTCP) WriteBuffer(data []byte) error { - conn.writeMtx.Lock() - _, err := conn.Write(data) - conn.writeMtx.Unlock() - return err + return DefaultWriteBuffer(conn, data) } func (conn *underlayTCP) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { - // TODO: call newMessage - data, err := json.Marshal(packet) - if err != nil { - return err - } - writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) - conn.writeMtx.Lock() - _, err = conn.Write(writeBytes) - conn.writeMtx.Unlock() - return err + return DefaultWriteMessage(conn, mainType, subType, packet) } func (conn *underlayTCP) Close() error { return conn.Conn.Close() } +func (conn *underlayTCP) WLock() { + conn.writeMtx.Lock() +} +func (conn *underlayTCP) WUnlock() { + conn.writeMtx.Unlock() +} func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) (*underlayTCP, error) { if mode == LinkModeTCPPunch { @@ -76,34 +54,46 @@ func listenTCP(host string, port int, localPort int, mode string, t *P2PTunnel) gLog.Printf(LvDEBUG, "sleep %d ms", ts/time.Millisecond) time.Sleep(ts) } - gLog.Println(LvDEBUG, (time.Now().UnixNano()-t.pn.dt)/(int64)(time.Millisecond), " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) + gLog.Println(LvDEBUG, " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) c, err := reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) if err != nil { gLog.Println(LvDEBUG, "send tcp punch: ", err) return nil, err } - return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil + utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c} + _, buff, err := utcp.ReadBuffer() + if err != nil { + return nil, fmt.Errorf("read start msg error:%s", err) + } + if buff != nil { + gLog.Println(LvDEBUG, string(buff)) + } + utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff) + return utcp, nil } t.pn.push(t.config.PeerNode, MsgPushUnderlayConnect, nil) - addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", localPort)) - l, err := net.ListenTCP("tcp", addr) - if err != nil { - return nil, err + tid := t.id + if compareVersion(t.config.peerVersion, PublicIPVersion) == LESS { // old version + ipBytes := net.ParseIP(t.config.peerIP).To4() + tid = uint64(binary.BigEndian.Uint32(ipBytes)) + gLog.Println(LvDEBUG, "compatible with old client, use ip as key:", tid) } - l.SetDeadline(time.Now().Add(CheckActiveTimeout)) - c, err := l.Accept() - defer l.Close() - if err != nil { - return nil, err + utcp := v4l.getUnderlayTCP(tid) + if utcp == nil { + return nil, ErrConnectPublicV4 } - return &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c}, nil + return utcp, nil } func dialTCP(host string, port int, localPort int, mode string) (*underlayTCP, error) { var c net.Conn var err error if mode == LinkModeTCPPunch { - c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) + gLog.Println(LvDEBUG, " send tcp punch: ", fmt.Sprintf("0.0.0.0:%d", localPort), "-->", fmt.Sprintf("%s:%d", host, port)) + if c, err = reuse.DialTimeout("tcp", fmt.Sprintf("0.0.0.0:%d", localPort), fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout); err != nil { + gLog.Println(LvDEBUG, "send tcp punch: ", err) + } + } else { c, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", host, port), CheckActiveTimeout) } diff --git a/core/underlay_tcp6.go b/core/underlay_tcp6.go index eaa3d52..8bb7a03 100644 --- a/core/underlay_tcp6.go +++ b/core/underlay_tcp6.go @@ -1,9 +1,7 @@ package openp2p import ( - "encoding/json" "fmt" - "io" "net" "sync" "time" @@ -20,52 +18,30 @@ func (conn *underlayTCP6) Protocol() string { } func (conn *underlayTCP6) ReadBuffer() (*openP2PHeader, []byte, error) { - headBuf := make([]byte, openP2PHeaderSize) - _, err := io.ReadFull(conn, headBuf) - if err != nil { - return nil, nil, err - } - head, err := decodeHeader(headBuf) - if err != nil { - return nil, nil, err - } - dataBuf := make([]byte, head.DataLen) - _, err = io.ReadFull(conn, dataBuf) - return head, dataBuf, err + return DefaultReadBuffer(conn) } func (conn *underlayTCP6) WriteBytes(mainType uint16, subType uint16, data []byte) error { - writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) - conn.writeMtx.Lock() - _, err := conn.Write(writeBytes) - conn.writeMtx.Unlock() - return err + return DefaultWriteBytes(conn, mainType, subType, data) } func (conn *underlayTCP6) WriteBuffer(data []byte) error { - conn.writeMtx.Lock() - _, err := conn.Write(data) - conn.writeMtx.Unlock() - return err + return DefaultWriteBuffer(conn, data) } func (conn *underlayTCP6) WriteMessage(mainType uint16, subType uint16, packet interface{}) error { - // TODO: call newMessage - data, err := json.Marshal(packet) - if err != nil { - return err - } - writeBytes := append(encodeHeader(mainType, subType, uint32(len(data))), data...) - conn.writeMtx.Lock() - _, err = conn.Write(writeBytes) - conn.writeMtx.Unlock() - return err + return DefaultWriteMessage(conn, mainType, subType, packet) } func (conn *underlayTCP6) Close() error { return conn.Conn.Close() } - +func (conn *underlayTCP6) WLock() { + conn.writeMtx.Lock() +} +func (conn *underlayTCP6) WUnlock() { + conn.writeMtx.Unlock() +} func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) { addr, _ := net.ResolveTCPAddr("tcp6", fmt.Sprintf("[::]:%d", port)) l, err := net.ListenTCP("tcp6", addr) @@ -73,7 +49,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) { return nil, err } defer l.Close() - l.SetDeadline(time.Now().Add(HandshakeTimeout)) + l.SetDeadline(time.Now().Add(UnderlayConnectTimeout)) c, err := l.Accept() defer l.Close() if err != nil { @@ -83,7 +59,7 @@ func listenTCP6(port int, idleTimeout time.Duration) (*underlayTCP6, error) { } func dialTCP6(host string, port int) (*underlayTCP6, error) { - c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), HandshakeTimeout) + c, err := net.DialTimeout("tcp6", fmt.Sprintf("[%s]:%d", host, port), UnderlayConnectTimeout) if err != nil { gLog.Printf(LvERROR, "Dial %s:%d error:%s", host, port, err) return nil, err diff --git a/core/v4listener.go b/core/v4listener.go new file mode 100644 index 0000000..d29e070 --- /dev/null +++ b/core/v4listener.go @@ -0,0 +1,82 @@ +package openp2p + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + "time" +) + +type v4Listener struct { + conns sync.Map + port int + acceptCh chan bool +} + +func (vl *v4Listener) start() error { + v4l.acceptCh = make(chan bool, 10) + for { + vl.listen() + time.Sleep(time.Second * 5) + } +} + +func (vl *v4Listener) listen() error { + gLog.Printf(LvINFO, "listen %d start", vl.port) + defer gLog.Printf(LvINFO, "listen %d end", vl.port) + addr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", vl.port)) + l, err := net.ListenTCP("tcp", addr) + if err != nil { + gLog.Printf(LvERROR, "listen %d error:", vl.port, err) + return err + } + defer l.Close() + for { + c, err := l.Accept() + if err != nil { + break + } + go vl.handleConnection(c) + } + return nil +} +func (vl *v4Listener) handleConnection(c net.Conn) { + gLog.Println(LvDEBUG, "v4Listener accept connection: ", c.RemoteAddr().String()) + utcp := &underlayTCP{writeMtx: &sync.Mutex{}, Conn: c} + utcp.SetReadDeadline(time.Now().Add(time.Second * 5)) + _, buff, err := utcp.ReadBuffer() + if err != nil { + gLog.Printf(LvERROR, "utcp.ReadBuffer error:", err) + } + utcp.WriteBytes(MsgP2P, MsgTunnelHandshakeAck, buff) + var tid uint64 + if string(buff) == "OpenP2P,hello" { // old client + // save remoteIP as key + remoteAddr := c.RemoteAddr().(*net.TCPAddr).IP + ipBytes := remoteAddr.To4() + tid = uint64(binary.BigEndian.Uint32(ipBytes)) // bytes not enough for uint64 + gLog.Println(LvDEBUG, "hello ", string(buff)) + } else { + if len(buff) < 8 { + return + } + tid = binary.LittleEndian.Uint64(buff[:8]) + gLog.Println(LvDEBUG, "hello ", tid) + } + vl.conns.Store(tid, utcp) + vl.acceptCh <- true +} + +func (vl *v4Listener) getUnderlayTCP(tid uint64) *underlayTCP { + for i := 0; i < 100; i++ { + select { + case <-time.After(time.Millisecond * 50): + case <-vl.acceptCh: + } + if u, ok := vl.conns.LoadAndDelete(tid); ok { + return u.(*underlayTCP) + } + } + return nil +}