Skip to content

Commit

Permalink
Eliminate busy loop when receiving messages from non-blocking socket
Browse files Browse the repository at this point in the history
For a non-blocking socket, it should wait for events first before
receiving messages from the socket, otherwise it would receive empty
message and run into a busy loop.

Signed-off-by: Quan Tian <[email protected]>
  • Loading branch information
tnqn committed May 23, 2024
1 parent 856e190 commit 626063b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
27 changes: 24 additions & 3 deletions nl/nl_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,10 @@ type NetlinkSocket struct {
fd int32
lsa unix.SockaddrNetlink
sync.Mutex

// pfd is non nil when the socket is in non-blocking mode, and is used to wait for events on the socket.
pfd *unix.PollFd
pollTimeout int64
}

func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
Expand Down Expand Up @@ -728,17 +732,22 @@ func Subscribe(protocol int, groups ...uint) (*NetlinkSocket, error) {
return nil, err
}

var pfd *unix.PollFd
// Sometimes (socket_linux.go:SocketGet), Subscribe is used to create a socket
// that subscirbed to no groups. So we don't need to set nonblock there.
// that subscribes to no groups. So we don't need to set nonblock there.
if len(groups) > 0 {
if err := unix.SetNonblock(fd, true); err != nil {
unix.Close(fd)
return nil, err
}
pfd = &unix.PollFd{Fd: int32(fd), Events: unix.POLLIN}
}

s := &NetlinkSocket{
fd: int32(fd),
fd: int32(fd),
pfd: pfd,
// poll blocks infinitely by default.
pollTimeout: -1,
}
s.lsa.Family = unix.AF_NETLINK

Expand Down Expand Up @@ -791,6 +800,13 @@ func (s *NetlinkSocket) Receive() ([]syscall.NetlinkMessage, *unix.SockaddrNetli
if fd < 0 {
return nil, nil, fmt.Errorf("Receive called on a closed socket")
}
// The socket is in non-blocking mode.
if s.pfd != nil {
if _, err := unix.Poll([]unix.PollFd{*s.pfd}, int(atomic.LoadInt64(&s.pollTimeout))); err != nil {
return nil, nil, fmt.Errorf("Error polling the socket: %w", err)
}
}

var fromAddr *unix.SockaddrNetlink
var rb [RECEIVE_BUFFER_SIZE]byte
nr, from, err := unix.Recvfrom(fd, rb[:], 0)
Expand Down Expand Up @@ -825,7 +841,12 @@ func (s *NetlinkSocket) SetSendTimeout(timeout *unix.Timeval) error {
func (s *NetlinkSocket) SetReceiveTimeout(timeout *unix.Timeval) error {
// Set a read timeout of SOCKET_READ_TIMEOUT, this will allow the Read to periodically unblock and avoid that a routine
// remains stuck on a recvmsg on a closed fd
return unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout)
if err := unix.SetsockoptTimeval(int(s.fd), unix.SOL_SOCKET, unix.SO_RCVTIMEO, timeout); err != nil {
return err
}
// Set poll timeout to the same value to allow it to unblock upon timeout.
atomic.StoreInt64(&s.pollTimeout, timeout.Sec*1000+timeout.Usec/1000)
return nil
}

// SetReceiveBufferSize allows to set a receive buffer size on the socket
Expand Down
2 changes: 1 addition & 1 deletion nl/nl_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestIfSocketCloses(t *testing.T) {
for {
_, _, err := sk.Receive()
// Receive returned because of a timeout and the FD == -1 means that the socket got closed
if err == unix.EAGAIN && nlSock.GetFd() == -1 {
if nlSock.GetFd() == -1 {
endCh <- err
return
}
Expand Down

0 comments on commit 626063b

Please sign in to comment.