Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use context cancellation instead of Stop to stop streams. #879

Merged
merged 11 commits into from
Jun 11, 2024
60 changes: 31 additions & 29 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ import (
)

type dgramStream struct {
ctx context.Context
cancel context.CancelFunc

lines chan<- *logline.LogLine

scheme string // Datagram scheme, either "unixgram" or "udp".
Expand All @@ -25,17 +26,15 @@ type dgramStream struct {
mu sync.RWMutex // protects following fields
completed bool // This pipestream is completed and can no longer be used.
lastReadTime time.Time // Last time a log line was read from this named pipe

stopOnce sync.Once // Ensure stopChan only closed once.
stopChan chan struct{} // Close to start graceful shutdown.
}

func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine) (LogStream, error) {
func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) {
if address == "" {
return nil, ErrEmptySocketAddress
}
ss := &dgramStream{ctx: ctx, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines, stopChan: make(chan struct{})}
if err := ss.stream(ctx, wg, waker); err != nil {
ctx, cancel := context.WithCancel(ctx)
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines}
if err := ss.stream(ctx, wg, waker, oneShot); err != nil {
return nil, err
}
return ss, nil
Expand All @@ -50,22 +49,22 @@ func (ss *dgramStream) LastReadTime() time.Time {
// The read buffer size for datagrams.
const datagramReadBufferSize = 131072

func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker) error {
func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error {
c, err := net.ListenPacket(ss.scheme, ss.address)
if err != nil {
logErrors.Add(ss.address, 1)
return err
}
glog.V(2).Infof("opened new datagram socket %v", c)
glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ss.scheme, ss.address, c)
b := make([]byte, datagramReadBufferSize)
partial := bytes.NewBufferString("")
var total int
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
glog.V(2).Infof("%v: read total %d bytes from %s", c, total, ss.address)
glog.V(2).Infof("%v: closing connection", c)
glog.V(2).Infof("stream(%s:%s): read total %d bytes", ss.scheme, ss.address, total)
glog.V(2).Infof("stream(%s:%s): closing connection", ss.scheme, ss.address)
err := c.Close()
if err != nil {
logErrors.Add(ss.address, 1)
Expand All @@ -83,15 +82,25 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak

for {
n, _, err := c.ReadFrom(b)
glog.V(2).Infof("%v: read %d bytes, err is %v", c, n, err)
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err)

// This is a test-only trick that says if we've already put this
// logstream in graceful shutdown, then a zero-byte read is
// equivalent to an "EOF" in connection and file oriented streams.
if n == 0 {
if oneShot {
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ss.scheme, ss.address)
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
}
return
}
select {
case <-ss.stopChan:
glog.V(2).Infof("%v: exiting because zero byte read after Stop", c)
case <-ctx.Done():
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ss.scheme, ss.address)
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
}
return
default:
}
Expand All @@ -100,7 +109,7 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
if n > 0 {
total += n
//nolint:contextcheck
decodeAndSend(ss.ctx, ss.lines, ss.address, n, b[:n], partial)
decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial)
ss.mu.Lock()
ss.lastReadTime = time.Now()
ss.mu.Unlock()
Expand All @@ -110,28 +119,23 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
if partial.Len() > 0 {
sendLine(ctx, ss.address, partial, ss.lines)
}
glog.V(2).Infof("%v: exiting, stream has error %s", c, err)
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ss.scheme, ss.address, err)
return
}

// Yield and wait
glog.V(2).Infof("%v: waiting", c)
glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address)
select {
case <-ss.stopChan:
case <-ctx.Done():
// We may have started waiting here when the stop signal
// arrives, but since that wait the file may have been
// written to. The file is not technically yet at EOF so
// we need to go back and try one more read. We'll exit
// the stream in the zero byte handler above.
glog.V(2).Infof("%v: Stopping after next zero byte read", c)
case <-ctx.Done():
// Exit immediately; a cancelled context will set an immediate
// deadline on the next read which will cause us to exit then,
// so don't bother going around the loop again.
return
glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ss.scheme, ss.address)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("%v: Wake received", c)
glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address)
}
}
}()
Expand All @@ -145,8 +149,6 @@ func (ss *dgramStream) IsComplete() bool {
}

func (ss *dgramStream) Stop() {
glog.V(2).Infof("Stop received on datagram stream.")
ss.stopOnce.Do(func() {
close(ss.stopChan)
})
glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ss.scheme, ss.address)
ss.cancel()
}
10 changes: 4 additions & 6 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
waker, awaken := waker.NewTest(ctx, 1, "stream")

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled)
ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled)
testutil.FatalIfErr(t, err)

s, err := net.Dial(scheme, addr)
Expand All @@ -54,9 +54,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {

awaken(0, 0) // sync past read

ss.Stop()

// "Close" the socket by sending zero bytes, which after Stop tells the stream to act as if we're done.
// "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done.
_, err = s.Write([]byte{})
testutil.FatalIfErr(t, err)

Expand All @@ -65,7 +63,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {

received := testutil.LinesReceived(lines)
expected := []*logline.LogLine{
{context.TODO(), addr, "1"},
{Context: context.TODO(), Filename: addr, Line: "1"},
}
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))

Expand Down Expand Up @@ -118,7 +116,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {

received := testutil.LinesReceived(lines)
expected := []*logline.LogLine{
{context.TODO(), addr, "1"},
{Context: context.TODO(), Filename: addr, Line: "1"},
}
testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context"))

Expand Down
Loading
Loading