Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Clean up internals of the logstreams #897

Merged
merged 13 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/mtail/read_pipe_integration_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"golang.org/x/sys/unix"
)

func TestReadFromPipe(t *testing.T) {
func TestReadFromFifo(t *testing.T) {
testutil.SkipIfShort(t)
tmpDir := testutil.TestTempDir(t)

Expand Down
7 changes: 6 additions & 1 deletion internal/mtail/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ func TestMakeServer(tb testing.TB, patternWakers int, streamWakers int, options
tb: tb,
cancel: cancel,
}
ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams")
if streamWakers == 0 {
ts.streamWaker = waker.NewTestAlways()
ts.AwakenLogStreams = func(int, int) {}
} else {
ts.streamWaker, ts.AwakenLogStreams = waker.NewTest(ctx, streamWakers, "streams")
}
ts.patternWaker, ts.AwakenPatternPollers = waker.NewTest(ctx, patternWakers, "patterns")
options = append(options,
LogstreamPollWaker(ts.streamWaker),
Expand Down
24 changes: 24 additions & 0 deletions internal/tailer/logstream/base.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2024 Google Inc. All Rights Reserved.
// This file is available under the Apache license.

package logstream

import (
"time"

"github.com/google/mtail/internal/logline"
)

type streamBase struct {
sourcename string // human readable name of the logstream source

lines chan *logline.LogLine // outbound channel for lines

staleTimer *time.Timer // Expire the stream if no read in 24h.
}

// Lines returns the output log line channel for this stream. The stream is
// completed when this channel closes.
func (s *streamBase) Lines() <-chan *logline.LogLine {
return s.lines
}
10 changes: 9 additions & 1 deletion internal/tailer/logstream/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ import (
"github.com/golang/glog"
)

// ReadDeadliner has a SetReadDeadline function to be used for interrupting reads.
type ReadDeadliner interface {
SetReadDeadline(t time.Time) error
}

// SetReadDeadlineOnDone waits for the context to be done, and then sets an
// immediate read deadline on the flie descriptor `d`. This causes any blocked
// reads on that descriptor to return with an i/o timeout error.
func SetReadDeadlineOnDone(ctx context.Context, d ReadDeadliner) {
go func() {
<-ctx.Done()
Expand All @@ -25,7 +29,11 @@ func SetReadDeadlineOnDone(ctx context.Context, d ReadDeadliner) {
}()
}

func IsEndOrCancel(err error) bool {
// IsExitableError returns true if a stream should exit because of this error.
func IsExitableError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, io.EOF) {
return true
}
Expand Down
10 changes: 5 additions & 5 deletions internal/tailer/logstream/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
var logLines = expvar.NewMap("log_lines_total")

// decodeAndSend transforms the byte array `b` into unicode in `partial`, sending to the llp as each newline is decoded.
func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname string, n int, b []byte, partial *bytes.Buffer) int {
func (s *streamBase) decodeAndSend(ctx context.Context, n int, b []byte, partial *bytes.Buffer) int {
var (
r rune
width int
Expand Down Expand Up @@ -50,7 +50,7 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname
case r == '\r':
// nom
case r == '\n':
sendLine(ctx, pathname, partial, lines)
s.sendLine(ctx, partial)
default:
partial.WriteRune(r)
}
Expand All @@ -60,9 +60,9 @@ func decodeAndSend(ctx context.Context, lines chan<- *logline.LogLine, pathname
return count
}

func sendLine(ctx context.Context, pathname string, partial *bytes.Buffer, lines chan<- *logline.LogLine) {
func (s *streamBase) sendLine(ctx context.Context, partial *bytes.Buffer) {
glog.V(2).Infof("sendline")
logLines.Add(pathname, 1)
lines <- logline.New(ctx, pathname, partial.String())
logLines.Add(s.sourcename, 1)
s.lines <- logline.New(ctx, s.sourcename, partial.String())
partial.Reset()
}
56 changes: 26 additions & 30 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package logstream
import (
"bytes"
"context"
"fmt"
"net"
"sync"
"time"
Expand All @@ -16,25 +17,28 @@ import (
)

type dgramStream struct {
cancel context.CancelFunc
streamBase

lines chan *logline.LogLine
cancel context.CancelFunc

scheme string // Datagram scheme, either "unixgram" or "udp".
address string // Given name for the underlying socket path on the filesystem or hostport.

mu sync.RWMutex // protects following fields
lastReadTime time.Time // Last time a log line was read from this named pipe

staleTimer *time.Timer // Expire the stream if no read in 24h
}

func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) {
if address == "" {
return nil, ErrEmptySocketAddress
}
ctx, cancel := context.WithCancel(ctx)
ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)}
ss := &dgramStream{
cancel: cancel,
scheme: scheme,
address: address,
streamBase: streamBase{
sourcename: fmt.Sprintf("%s://%s", scheme, address),
lines: make(chan *logline.LogLine),
},
}
if err := ss.stream(ctx, wg, waker, oneShot); err != nil {
return nil, err
}
Expand All @@ -50,16 +54,16 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
logErrors.Add(ds.address, 1)
return err
}
glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ds.scheme, ds.address, c)
glog.V(2).Infof("stream(%s): opened new datagram socket %v", ds.sourcename, c)
b := make([]byte, datagramReadBufferSize)
partial := bytes.NewBufferString("")
var total int
wg.Add(1)
go func() {
defer wg.Done()
defer func() {
glog.V(2).Infof("stream(%s:%s): read total %d bytes", ds.scheme, ds.address, total)
glog.V(2).Infof("stream(%s:%s): closing connection", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): read total %d bytes", ds.sourcename, total)
glog.V(2).Infof("stream(%s): closing connection", ds.sourcename)
err := c.Close()
if err != nil {
logErrors.Add(ds.address, 1)
Expand All @@ -75,7 +79,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak

for {
n, _, err := c.ReadFrom(b)
glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ds.scheme, ds.address, n, err)
glog.V(2).Infof("stream(%s): read %d bytes, err is %v", ds.sourcename, n, err)

if ds.staleTimer != nil {
ds.staleTimer.Stop()
Expand All @@ -86,17 +90,17 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
// equivalent to an "EOF" in connection and file oriented streams.
if n == 0 {
if oneShot {
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): exiting because zero byte read and one shot", ds.sourcename)
if partial.Len() > 0 {
sendLine(ctx, ds.address, partial, ds.lines)
ds.sendLine(ctx, partial)
}
return
}
select {
case <-ctx.Done():
glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): exiting because zero byte read after cancellation", ds.sourcename)
if partial.Len() > 0 {
sendLine(ctx, ds.address, partial, ds.lines)
ds.sendLine(ctx, partial)
}
return
default:
Expand All @@ -106,10 +110,7 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
if n > 0 {
total += n
//nolint:contextcheck
decodeAndSend(ctx, ds.lines, ds.address, n, b[:n], partial)
ds.mu.Lock()
ds.lastReadTime = time.Now()
ds.mu.Unlock()
ds.decodeAndSend(ctx, n, b[:n], partial)
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)

// No error implies more to read, so restart the loop.
Expand All @@ -118,16 +119,16 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
}
}

if err != nil && IsEndOrCancel(err) {
if IsExitableError(err) {
if partial.Len() > 0 {
sendLine(ctx, ds.address, partial, ds.lines)
ds.sendLine(ctx, partial)
}
glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ds.scheme, ds.address, err)
glog.V(2).Infof("stream(%s): exiting, stream has error %s", ds.sourcename, err)
return
}

// Yield and wait
glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): waiting", ds.sourcename)
select {
case <-ctx.Done():
// Exit after next read attempt.
Expand All @@ -139,14 +140,9 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
glog.V(2).Infof("stream(%s): context cancelled, exiting after next zero byte read", ds.scheme, ds.address)
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address)
glog.V(2).Infof("stream(%s): Wake received", ds.sourcename)
}
}
}()
return nil
}

// Lines implements the LogStream interface, returning the output lines channel.
func (ds *dgramStream) Lines() <-chan *logline.LogLine {
return ds.lines
}
4 changes: 2 additions & 2 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
{Context: context.TODO(), Filename: sockName, Line: "1"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines())

Expand Down Expand Up @@ -99,7 +99,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
testutil.FatalIfErr(t, err)

expected := []*logline.LogLine{
{Context: context.TODO(), Filename: addr, Line: "1"},
{Context: context.TODO(), Filename: sockName, Line: "1"},
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines())

Expand Down
Loading
Loading