Skip to content

Commit

Permalink
peer: Synchronize net.Conn with a mutex
Browse files Browse the repository at this point in the history
The access guarded by an atomic int32 was incorrect.  For example, access to
the p.conn could be performed as long as p.connected was non-zero, but
p.connected would be incremented before p.conn was ever assigned by
AssociateConnection.

While here, also add missing mutex locking protecting timeConnected and na.
  • Loading branch information
jrick authored and davecgh committed Mar 4, 2025
1 parent fc664c0 commit 77c9cee
Showing 1 changed file with 23 additions and 10 deletions.
33 changes: 23 additions & 10 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,10 @@ type Peer struct {
bytesSent uint64
lastRecv int64
lastSend int64
connected int32
disconnect int32

conn net.Conn
conn net.Conn
connMtx sync.Mutex

// blake256Hasher is the hash.Hash object that is used by readMessage
// to calculate the hash of read mixing messages. Every peer's hasher
Expand Down Expand Up @@ -1918,23 +1918,27 @@ func (p *Peer) QueueInventoryImmediate(invVect *wire.InvVect) {
//
// This function is safe for concurrent access.
func (p *Peer) Connected() bool {
return atomic.LoadInt32(&p.connected) != 0 &&
atomic.LoadInt32(&p.disconnect) == 0
p.connMtx.Lock()
defer p.connMtx.Unlock()

return p.conn != nil && atomic.LoadInt32(&p.disconnect) == 0
}

// Disconnect disconnects the peer by closing the connection. Calling this
// function when the peer is already disconnected or in the process of
// disconnecting will have no effect.
func (p *Peer) Disconnect() {
if atomic.AddInt32(&p.disconnect, 1) != 1 {
return
}
p.connMtx.Lock()
defer p.connMtx.Unlock()

log.Tracef("Disconnecting %s", p)
if atomic.LoadInt32(&p.connected) != 0 {
if p.conn != nil {
p.conn.Close()
}
close(p.quit)

if atomic.CompareAndSwapInt32(&p.disconnect, 0, 1) {
close(p.quit)
}
}

// readRemoteVersionMsg waits for the next message to arrive from the remote
Expand Down Expand Up @@ -2146,13 +2150,20 @@ func (p *Peer) start() error {
// Calling this function when the peer is already connected will
// have no effect.
func (p *Peer) AssociateConnection(conn net.Conn) {
p.connMtx.Lock()

// Already connected?
if !atomic.CompareAndSwapInt32(&p.connected, 0, 1) {
if p.conn != nil {
p.connMtx.Unlock()
return
}

p.conn = conn
p.connMtx.Unlock()

p.statsMtx.Lock()
p.timeConnected = time.Now()
p.statsMtx.Unlock()

if p.inbound {
p.addr = p.conn.RemoteAddr().String()
Expand All @@ -2166,7 +2177,9 @@ func (p *Peer) AssociateConnection(conn net.Conn) {
p.Disconnect()
return
}
p.flagsMtx.Lock()
p.na = na
p.flagsMtx.Unlock()
}

go func(peer *Peer) {
Expand Down

0 comments on commit 77c9cee

Please sign in to comment.