From 1a8a4638f388df3e2337760c078fde17f3f62093 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 4 Sep 2024 15:58:13 -0700 Subject: [PATCH 1/3] feat: Detect TCP Simopen and retry or backoff --- core/network/network.go | 3 + core/sec/security.go | 2 + p2p/net/swarm/dial_sync.go | 24 +++++++- p2p/net/swarm/dial_worker.go | 51 +++++++++++++--- p2p/net/swarm/swarm.go | 13 ++++ p2p/net/swarm/swarm_dial.go | 4 +- p2p/net/upgrader/upgrader.go | 14 +++++ p2p/security/noise/transport.go | 8 +++ p2p/security/tls/transport.go | 8 +++ tcpsimopen_test.go | 102 ++++++++++++++++++++++++++++++++ 10 files changed, 217 insertions(+), 12 deletions(-) create mode 100644 tcpsimopen_test.go diff --git a/core/network/network.go b/core/network/network.go index d2e2bc818d..20dc838356 100644 --- a/core/network/network.go +++ b/core/network/network.go @@ -204,6 +204,9 @@ type Dialer interface { type AddrDelay struct { Addr ma.Multiaddr Delay time.Duration + // ForceDelay forces the dialer to wait for the full delay before dialing, + // even if there are no other dials in flight. + ForceDelay bool } // DialRanker provides a schedule of dialing the provided addresses diff --git a/core/sec/security.go b/core/sec/security.go index d9e9183298..522c628fe7 100644 --- a/core/sec/security.go +++ b/core/sec/security.go @@ -41,3 +41,5 @@ func (e ErrPeerIDMismatch) Error() string { } var _ error = (*ErrPeerIDMismatch)(nil) + +var ErrSimOpen = fmt.Errorf("TCP simultaneous open caused security handshake to fail") diff --git a/p2p/net/swarm/dial_sync.go b/p2p/net/swarm/dial_sync.go index 3cc8547281..0e7cdfe364 100644 --- a/p2p/net/swarm/dial_sync.go +++ b/p2p/net/swarm/dial_sync.go @@ -10,7 +10,7 @@ import ( ) // dialWorkerFunc is used by dialSync to spawn a new dial worker -type dialWorkerFunc func(peer.ID, <-chan dialRequest) +type dialWorkerFunc func(context.Context, peer.ID, <-chan dialRequest) // errConcurrentDialSuccessful is used to signal that a concurrent dial succeeded var errConcurrentDialSuccessful = errors.New("concurrent dial successful") @@ -55,6 +55,8 @@ func (ad *activeDial) dial(ctx context.Context) (*Conn, error) { case ad.reqch <- dialRequest{ctx: dialCtx, resch: resch}: case <-ctx.Done(): return nil, ctx.Err() + case <-dialCtx.Done(): + return nil, dialCtx.Err() } select { @@ -62,6 +64,8 @@ func (ad *activeDial) dial(ctx context.Context) (*Conn, error) { return res.conn, res.err case <-ctx.Done(): return nil, ctx.Err() + case <-dialCtx.Done(): + return nil, dialCtx.Err() } } @@ -79,7 +83,7 @@ func (ds *dialSync) getActiveDial(p peer.ID) (*activeDial, error) { cancelCause: cancel, reqch: make(chan dialRequest), } - go ds.dialWorker(p, actd.reqch) + go ds.dialWorker(ctx, p, actd.reqch) ds.dials[p] = actd } // increase ref count before dropping mutex @@ -96,6 +100,12 @@ func (ds *dialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) { } conn, err := ad.dial(ctx) + if cause := context.Cause(ad.ctx); cause != nil { + var haveInboundConn errHaveInboundConn + if errors.As(cause, &haveInboundConn) { + conn, err = haveInboundConn.c, nil + } + } ds.mutex.Lock() defer ds.mutex.Unlock() @@ -113,3 +123,13 @@ func (ds *dialSync) Dial(ctx context.Context, p peer.ID) (*Conn, error) { return conn, err } + +func (ds *dialSync) CancelActiveDial(p peer.ID, cause error) { + ds.mutex.Lock() + ad, ok := ds.dials[p] + ds.mutex.Unlock() + if !ok { + return + } + ad.cancelCause(cause) +} diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index 360a99e2ab..0e928ad5b4 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -2,13 +2,18 @@ package swarm import ( "context" + "errors" + "fmt" "math" + "strings" "sync" "time" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/sec" tpt "github.com/libp2p/go-libp2p/core/transport" + "golang.org/x/exp/rand" ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" @@ -66,11 +71,13 @@ type addrDial struct { dialRankingDelay time.Duration // expectedTCPUpgradeTime is the expected time by which security upgrade will complete expectedTCPUpgradeTime time.Time + retried bool } // dialWorker synchronises concurrent dials to a peer. It ensures that we make at most one dial to a // peer's address type dialWorker struct { + ctx context.Context s *Swarm peer peer.ID // reqch is used to send dial requests to the worker. close reqch to end the worker loop @@ -78,7 +85,7 @@ type dialWorker struct { // pendingRequests is the set of pendingRequests pendingRequests map[*pendRequest]struct{} // trackedDials tracks dials to the peer's addresses. An entry here is used to ensure that - // we dial an address at most once + // we dial an address at most once. Maybe twice in the case of an error from TCP simultaneous open trackedDials map[string]*addrDial // resch is used to receive response for dials to the peers addresses. resch chan tpt.DialUpdate @@ -90,11 +97,12 @@ type dialWorker struct { cl Clock } -func newDialWorker(s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dialWorker { +func newDialWorker(ctx context.Context, s *Swarm, p peer.ID, reqch <-chan dialRequest, cl Clock) *dialWorker { if cl == nil { cl = RealClock{} } return &dialWorker{ + ctx: ctx, s: s, peer: p, reqch: reqch, @@ -130,7 +138,8 @@ func (w *dialWorker) loop() { } timerRunning = false if dq.Len() > 0 { - if dialsInFlight == 0 && !w.connected { + top := dq.top() + if !top.ForceDelay && dialsInFlight == 0 && !w.connected { // if there are no dials in flight, trigger the next dials immediately dialTimer.Reset(startTime) } else { @@ -159,6 +168,8 @@ loop: // interested in dials on this address. select { + case <-w.ctx.Done(): + return case req, ok := <-w.reqch: if !ok { if w.s.metricsTracer != nil { @@ -368,13 +379,37 @@ loop: // it must be an error -- add backoff if applicable and dispatch // ErrDialRefusedBlackHole shouldn't end up here, just a safety check - if res.Err != ErrDialRefusedBlackHole && res.Err != context.Canceled && !w.connected { - // we only add backoff if there has not been a successful connection - // for consistency with the old dialer behavior. - w.s.backf.AddBackoff(w.peer, res.Addr) - } else if res.Err == ErrDialRefusedBlackHole { + switch { + case res.Err == ErrDialRefusedBlackHole: log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s", w.peer, res.Addr) + case res.Err == context.Canceled: + case !ad.retried && errors.Is(res.Err, sec.ErrSimOpen): + now := time.Now() + // these are new addresses, track them and add them to dq + w.trackedDials[string(ad.addr.Bytes())] = &addrDial{ + addr: ad.addr, + ctx: ad.ctx, + createdAt: now, + retried: true, + } + // This is an error due to simultaneous open. Let the "smaller" + // peer try again first, otherwise we'll try again after a delay + var delay time.Duration + if strings.Compare(string(w.peer), string(w.s.LocalPeer())) == -1 { + // Random delay to avoid the other side being able to predict when we'll try again + delay = 50*time.Millisecond + time.Duration(rand.Intn(100))*time.Millisecond + } + dq.UpdateOrAdd(network.AddrDelay{Addr: ad.addr, Delay: delay, ForceDelay: true}) + scheduleNextDial() + continue loop + default: + if !w.connected { + // we only add backoff if there has not been a successful connection + // for consistency with the old dialer behavior. + w.s.backf.AddBackoff(w.peer, res.Addr) + } + } w.dispatchError(ad, res.Err) diff --git a/p2p/net/swarm/swarm.go b/p2p/net/swarm/swarm.go index 0127555552..bd25c0b727 100644 --- a/p2p/net/swarm/swarm.go +++ b/p2p/net/swarm/swarm.go @@ -357,6 +357,14 @@ func (s *Swarm) close() { wg.Wait() } +type errHaveInboundConn struct { + c *Conn +} + +func (e errHaveInboundConn) Error() string { + return "have inbound connection" +} + func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) { var ( p = tc.RemotePeer() @@ -401,6 +409,11 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, // Clear any backoffs s.backf.Clear(p) + // Cancel the pending dial if it exists. + if dir == network.DirInbound { + s.dsync.CancelActiveDial(p, errHaveInboundConn{c}) + } + // Finally, add the peer. s.conns.Lock() // Check if we're still online diff --git a/p2p/net/swarm/swarm_dial.go b/p2p/net/swarm/swarm_dial.go index 446ece4504..cc79327976 100644 --- a/p2p/net/swarm/swarm_dial.go +++ b/p2p/net/swarm/swarm_dial.go @@ -290,8 +290,8 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) { } // dialWorkerLoop synchronizes and executes concurrent dials to a single peer -func (s *Swarm) dialWorkerLoop(p peer.ID, reqch <-chan dialRequest) { - w := newDialWorker(s, p, reqch, nil) +func (s *Swarm) dialWorkerLoop(ctx context.Context, p peer.ID, reqch <-chan dialRequest) { + w := newDialWorker(ctx, s, p, reqch, nil) w.loop() } diff --git a/p2p/net/upgrader/upgrader.go b/p2p/net/upgrader/upgrader.go index 3a6f8b9f52..696600cd6a 100644 --- a/p2p/net/upgrader/upgrader.go +++ b/p2p/net/upgrader/upgrader.go @@ -328,6 +328,20 @@ func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, ser select { case r := <-done: if r.err != nil { + var errUnrecognized mss.ErrUnrecognizedResponse[protocol.ID] + if errors.As(r.err, &errUnrecognized) && len(u.securityIDs) > 0 { + // We tried to open a connection with our first security ID, and + // the other side gave us something we didn't expect that looks + // like a security ID. + // + // This hints that the error is due to TCP Simultaneous Open. + // Both sides think they are the client and suggest a protocol + // at the beginning. We fail because they suggested a protocol + // that we didn't suggest. + if errUnrecognized.Expected == u.securityIDs[0] && len(errUnrecognized.Actual) > 0 && errUnrecognized.Actual[0] == '/' { + return nil, sec.ErrSimOpen + } + } return nil, r.err } if s := u.getSecurityByID(r.proto); s != nil { diff --git a/p2p/security/noise/transport.go b/p2p/security/noise/transport.go index e42cea1bf7..30afd2a248 100644 --- a/p2p/security/noise/transport.go +++ b/p2p/security/noise/transport.go @@ -63,11 +63,19 @@ func (t *Transport) SecureInbound(ctx context.Context, insecure net.Conn, p peer return SessionWithConnState(c, responderEDH.MatchMuxers(false)), err } +// Detect if the error is due to TCP Simultaneous Open. In that case, both sides will believe they are the client. +func errIsSimOpen(err error) bool { + return err.Error() == "error reading handshake message: noise: message is too short" +} + // SecureOutbound runs the Noise handshake as the initiator. func (t *Transport) SecureOutbound(ctx context.Context, insecure net.Conn, p peer.ID) (sec.SecureConn, error) { initiatorEDH := newTransportEDH(t) c, err := newSecureSession(t, ctx, insecure, p, nil, initiatorEDH, nil, true, true) if err != nil { + if errIsSimOpen(err) { + return nil, sec.ErrSimOpen + } return c, err } return SessionWithConnState(c, initiatorEDH.MatchMuxers(true)), err diff --git a/p2p/security/tls/transport.go b/p2p/security/tls/transport.go index 0c494a7fdc..4e66d74091 100644 --- a/p2p/security/tls/transport.go +++ b/p2p/security/tls/transport.go @@ -100,6 +100,11 @@ func (t *Transport) SecureInbound(ctx context.Context, insecure net.Conn, p peer return cs, err } +// Detect if the error is due to TCP Simultaneous Open. In that case, both sides will believe they are the client. +func errIsSimOpen(err error) bool { + return err.Error() == "tls: received unexpected handshake message of type *tls.clientHelloMsg when waiting for *tls.serverHelloMsg" +} + // SecureOutbound runs the TLS handshake as a client. // Note that SecureOutbound will not return an error if the server doesn't // accept the certificate. This is due to the fact that in TLS 1.3, the client @@ -118,6 +123,9 @@ func (t *Transport) SecureOutbound(ctx context.Context, insecure net.Conn, p pee cs, err := t.handshake(ctx, tls.Client(insecure, config), keyCh) if err != nil { insecure.Close() + if errIsSimOpen(err) { + return nil, sec.ErrSimOpen + } } return cs, err } diff --git a/tcpsimopen_test.go b/tcpsimopen_test.go new file mode 100644 index 0000000000..e6c663a787 --- /dev/null +++ b/tcpsimopen_test.go @@ -0,0 +1,102 @@ +package libp2p_test + +import ( + "context" + "errors" + "fmt" + "reflect" + "testing" + + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/security/noise" + tls "github.com/libp2p/go-libp2p/p2p/security/tls" + "github.com/stretchr/testify/require" +) + +func TestSimOpen(t *testing.T) { + tls := libp2p.Security(tls.ID, tls.New) + noise := libp2p.Security(noise.ID, noise.New) + + type testCase struct { + name string + securityOptions []libp2p.Option + } + + testCasesPerHost := []testCase{ + {"TLS,Noise", []libp2p.Option{tls, noise}}, + {"Noise,TLS", []libp2p.Option{noise, tls}}, + {"TLS", []libp2p.Option{tls}}, + {"Noise", []libp2p.Option{noise}}, + } + + noIntersection := func(a, b []libp2p.Option) bool { + for _, aOpt := range a { + for _, bOpt := range b { + if reflect.ValueOf(aOpt).Pointer() == reflect.ValueOf(bOpt).Pointer() { + return false + } + } + } + return true + } + + for _, tc1 := range testCasesPerHost { + for _, tc2 := range testCasesPerHost { + t.Run(fmt.Sprintf("h1(%s)<->h2(%s)", tc1.name, tc2.name), func(t *testing.T) { + newHosts := func() (host.Host, host.Host) { + h1, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), libp2p.ChainOptions(tc1.securityOptions...)) + require.NoError(t, err) + h2, err := libp2p.New(libp2p.ListenAddrStrings("/ip4/127.0.0.1/tcp/0"), libp2p.ChainOptions(tc2.securityOptions...)) + require.NoError(t, err) + return h1, h2 + } + closeHosts := func(hs ...host.Host) { + for _, h := range hs { + require.NoError(t, h.Close()) + } + } + + simConnect := func(ctx context.Context, h1, h2 host.Host) error { + errs := make(chan error, 2) + go func() { + errs <- h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}) + }() + go func() { + errs <- h2.Connect(ctx, peer.AddrInfo{ID: h1.ID(), Addrs: h1.Addrs()}) + }() + return errors.Join(<-errs, <-errs) + } + closeConns := func(hs ...host.Host) { + for _, h := range hs { + for _, c := range h.Network().Conns() { + require.NoError(t, c.Close()) + } + } + } + + if noIntersection(tc1.securityOptions, tc2.securityOptions) { + // This is going to fail because there is no common security protocol + for i := 0; i < 3; i++ { + h1, h2 := newHosts() + ctx, cancel := context.WithCancel(context.Background()) + require.Error(t, simConnect(ctx, h1, h2), "iteration %d", i) + cancel() + closeHosts(h1, h2) + } + } else { + for i := 0; i < 100; i++ { + h1, h2 := newHosts() + ctx, cancel := context.WithCancel(context.Background()) + require.NoError(t, simConnect(ctx, h1, h2), "iteration %d", i) + cancel() + closeConns(h1, h2) + closeHosts(h1, h2) + } + } + + }) + } + } +} From 7773406f79f34415adf12d116d1ff4db32f2fb33 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 4 Sep 2024 15:58:26 -0700 Subject: [PATCH 2/3] Use prerelease of go-multistream --- go.mod | 3 +++ go.sum | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 684f354427..5e42650ee3 100644 --- a/go.mod +++ b/go.mod @@ -130,3 +130,6 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.3.0 // indirect ) + +// TODO remove once we have a go-multistream release +replace github.com/multiformats/go-multistream => github.com/multiformats/go-multistream v0.5.1-0.20240903234121-990321dd2b7f diff --git a/go.sum b/go.sum index 634d497c4b..36622157d1 100644 --- a/go.sum +++ b/go.sum @@ -248,8 +248,8 @@ github.com/multiformats/go-multicodec v0.9.0/go.mod h1:L3QTQvMIaVBkXOXXtVmYE+LI1 github.com/multiformats/go-multihash v0.0.8/go.mod h1:YSLudS+Pi8NHE7o6tb3D8vrpKa63epEDmG8nTduyAew= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= -github.com/multiformats/go-multistream v0.5.0 h1:5htLSLl7lvJk3xx3qT/8Zm9J4K8vEOf/QGkvOGQAyiE= -github.com/multiformats/go-multistream v0.5.0/go.mod h1:n6tMZiwiP2wUsR8DgfDWw1dydlEqV3l6N3/GBsX6ILA= +github.com/multiformats/go-multistream v0.5.1-0.20240903234121-990321dd2b7f h1:M6Yt9ZHP5HWoV9FwzzwBSvrrR0vPNzzsp+cCmIF4gVY= +github.com/multiformats/go-multistream v0.5.1-0.20240903234121-990321dd2b7f/go.mod h1:MOyoG5otO24cHIg8kf9QW2/NozURlkP/rvi2FQJyCPg= github.com/multiformats/go-varint v0.0.1/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXSrVKRY101jdMZYE= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= From 464835023c6ebe5db2008d8242da813f0552eec2 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 4 Sep 2024 16:12:36 -0700 Subject: [PATCH 3/3] Retry a couple of times --- p2p/net/swarm/dial_worker.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/p2p/net/swarm/dial_worker.go b/p2p/net/swarm/dial_worker.go index 0e928ad5b4..0980494ff6 100644 --- a/p2p/net/swarm/dial_worker.go +++ b/p2p/net/swarm/dial_worker.go @@ -19,6 +19,8 @@ import ( manet "github.com/multiformats/go-multiaddr/net" ) +const maxTCPSimOpenRetries = 3 + // dialRequest is structure used to request dials to the peer associated with a // worker loop type dialRequest struct { @@ -71,7 +73,7 @@ type addrDial struct { dialRankingDelay time.Duration // expectedTCPUpgradeTime is the expected time by which security upgrade will complete expectedTCPUpgradeTime time.Time - retried bool + retryCount uint8 } // dialWorker synchronises concurrent dials to a peer. It ensures that we make at most one dial to a @@ -384,21 +386,21 @@ loop: log.Errorf("SWARM BUG: unexpected ErrDialRefusedBlackHole while dialing peer %s to addr %s", w.peer, res.Addr) case res.Err == context.Canceled: - case !ad.retried && errors.Is(res.Err, sec.ErrSimOpen): + case ad.retryCount < maxTCPSimOpenRetries && errors.Is(res.Err, sec.ErrSimOpen): now := time.Now() // these are new addresses, track them and add them to dq w.trackedDials[string(ad.addr.Bytes())] = &addrDial{ - addr: ad.addr, - ctx: ad.ctx, - createdAt: now, - retried: true, + addr: ad.addr, + ctx: ad.ctx, + createdAt: now, + retryCount: ad.retryCount + 1, } // This is an error due to simultaneous open. Let the "smaller" // peer try again first, otherwise we'll try again after a delay var delay time.Duration if strings.Compare(string(w.peer), string(w.s.LocalPeer())) == -1 { // Random delay to avoid the other side being able to predict when we'll try again - delay = 50*time.Millisecond + time.Duration(rand.Intn(100))*time.Millisecond + delay = time.Duration((50+rand.Intn(100))<