Skip to content

Commit

Permalink
Improve statsd sink performance (#32)
Browse files Browse the repository at this point in the history
* initial

Signed-off-by: James Sedgwick <[email protected]>

* fix

Signed-off-by: James Sedgwick <[email protected]>

* lint

Signed-off-by: James Sedgwick <[email protected]>

* use buffer pool

Signed-off-by: James Sedgwick <[email protected]>

* check

Signed-off-by: James Sedgwick <[email protected]>

* update travis config

* sync.Pool([]byte) -> (*bytes.Buffer), etc (#33)
  • Loading branch information
James Sedgwick authored Jul 12, 2018
1 parent 942830d commit bb587bd
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 58 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go:
- 1.7
- 1.8
- 1.9
- "1.10"

before_install: mkdir -p $GOPATH/bin
install: make install
Expand Down
185 changes: 127 additions & 58 deletions tcp_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,107 +2,176 @@ package stats

import (
"bufio"
"bytes"
"fmt"
"net"
"sync/atomic"
"sync"
"time"

logger "github.com/sirupsen/logrus"
)

// TODO(btc): add constructor that accepts functional options in order to allow
// users to choose the constants that work best for them. (Leave the existing
// c'tor for backwards compatibility)
// e.g. `func NewTCPStatsdSinkWithOptions(opts ...Option) Sink`

// Tuning constants for the TCP statsd sink.
const (
// Number of dropped records between log lines in the per-metric drop counters.
logOnEveryNDropped = 1000
// Interval handed to the timer that run uses to flush the buffered writer.
flushInterval = time.Second
logOnEveryNDroppedBytes = 1 << 15 // Log once per 32kb of dropped stats
// 64 KiB: size of the bufio.Writer and initial capacity of each pooled buffer.
defaultBufferSize = 1 << 16
// 4 MiB: only used to derive chanSize below.
approxMaxMemBytes = 1 << 22
// Channel capacity in buffers: 4 MiB / 64 KiB = 64.
chanSize = approxMaxMemBytes / defaultBufferSize
)

// NewTCPStatsdSink returns a Sink that is backed by a go channel with a limit of 1000 messages.
// NewTCPStatsdSink returns a Sink that is backed by a buffered writer and a separate goroutine
// that flushes those buffers to a statsd connection.
// NOTE(review): this span appears to interleave the pre-change and post-change
// bodies of the constructor (diff markers were lost) — confirm against the
// repository before editing the code itself.
func NewTCPStatsdSink() Sink {
sink := &tcpStatsdSink{
outc: make(chan string, 1000),
// Filled buffers travel over this channel to the run goroutine.
outc := make(chan *bytes.Buffer, chanSize) // TODO(btc): parameterize
// writer is the io.Writer behind bufWriter; its pool field is filled in below.
writer := sinkWriter{
outc: outc,
}
bufWriter := bufio.NewWriterSize(&writer, defaultBufferSize) // TODO(btc): parameterize size
pool := newBufferPool(defaultBufferSize)
s := &tcpStatsdSink{
outc: outc,
bufWriter: bufWriter,
pool: pool,
}
go sink.run()
return sink
// bufWriter holds &writer, so setting the pool here is visible to it.
writer.pool = s.pool
// Start the goroutine that drains outc.
go s.run()
return s
}

// tcpStatsdSink is the concrete type returned by NewTCPStatsdSink.
// NOTE(review): conn and outc are each listed twice here, which suggests the
// pre-change and post-change field lists of the diff are interleaved — confirm
// against the repository.
type tcpStatsdSink struct {
conn net.Conn
outc chan string
droppedTimers uint64
droppedCounters uint64
droppedGauges uint64
conn net.Conn // connection dialed and written to by run
outc chan *bytes.Buffer // filled buffers waiting to be written to conn
pool *bufferpool // supplies the buffers sent on outc

// mu is held around writes/flushes of bufWriter and around handleFlushError,
// which updates droppedBytes.
mu sync.Mutex
droppedBytes uint64
bufWriter *bufio.Writer
}

func (s *tcpStatsdSink) FlushCounter(name string, value uint64) {
// sinkWriter is the io.Writer placed behind the sink's bufio.Writer: its Write
// method copies each chunk into a buffer from pool and sends it on outc.
type sinkWriter struct {
pool *bufferpool // source of destination buffers
outc chan<- *bytes.Buffer // send-only end of the sink's buffer channel
}

// Write copies p into a buffer taken from the pool and offers that buffer to
// the output channel without blocking.
//
// It returns len(p) and a nil error when the buffer was queued. When the
// channel is full the data is dropped, the buffer is handed back to the pool
// so it can be reused, and (0, error) is returned.
func (w *sinkWriter) Write(p []byte) (int, error) {
	n := len(p)
	dest := w.pool.Get()
	// bytes.Buffer.Write always returns a nil error, so it is not checked.
	dest.Write(p)
	select {
	case w.outc <- dest:
		return n, nil
	default:
		// Nobody else holds dest on this path; recycle it instead of leaving
		// it for the garbage collector.
		w.pool.Put(dest)
		return 0, fmt.Errorf("statsd channel full, dropping stats buffer with %d bytes", n)
	}
}

// flush formats a single metric line with f and args into the sink's buffered
// writer while holding s.mu. If the write fails, the bytes still sitting in the
// buffered writer are reported to handleFlushError as dropped.
func (s *tcpStatsdSink) flush(f string, args ...interface{}) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, writeErr := fmt.Fprintf(s.bufWriter, f, args...); writeErr != nil {
		s.handleFlushError(writeErr, s.bufWriter.Buffered())
	}
}

func (s *tcpStatsdSink) FlushGauge(name string, value uint64) {
select {
case s.outc <- fmt.Sprintf("%s:%d|g\n", name, value):
default:
new := atomic.AddUint64(&s.droppedGauges, 1)
if new%logOnEveryNDropped == 0 {
logger.WithField("total_dropped_records", new).
WithField("gauge", name).
Error("statsd channel full, discarding gauge flush value")
}
// handleFlushError accounts for droppedBytes lost bytes, logs err at most once
// each time the running total crosses a multiple of logOnEveryNDroppedBytes,
// and resets the buffered writer onto a fresh sinkWriter so later writes are
// not rejected because of the stored error.
//
// s.mu should be held
func (s *tcpStatsdSink) handleFlushError(err error, droppedBytes int) {
	d := uint64(droppedBytes)
	total := s.droppedBytes + d
	// Compare quotients, not remainders: the quotient only grows when the total
	// passes a logOnEveryNDroppedBytes boundary, which is the one moment we
	// want to log. (A remainder comparison with '>' is true on nearly every
	// call and false exactly when a boundary is crossed.)
	if total/logOnEveryNDroppedBytes > s.droppedBytes/logOnEveryNDroppedBytes {
		logger.WithField("total_dropped_bytes", total).
			WithField("dropped_bytes", d).
			Error(err)
	}
	s.droppedBytes = total

	// Reset discards unflushed data and clears the writer's sticky error.
	s.bufWriter.Reset(&sinkWriter{
		pool: s.pool,
		outc: s.outc,
	})
}

// FlushCounter writes the counter as a "name:value|c" line through flush.
func (s *tcpStatsdSink) FlushCounter(name string, value uint64) {
	const counterLine = "%s:%d|c\n"
	s.flush(counterLine, name, value)
}

// FlushGauge writes the gauge as a "name:value|g" line through flush.
func (s *tcpStatsdSink) FlushGauge(name string, value uint64) {
	const gaugeLine = "%s:%d|g\n"
	s.flush(gaugeLine, name, value)
}

// FlushTimer emits the timer as a "name:value|ms" line.
// NOTE(review): this span appears to contain both the pre-change body (the
// non-blocking select with a dropped-timer counter) and the post-change body
// (the single s.flush call) of the diff — confirm against the repository.
func (s *tcpStatsdSink) FlushTimer(name string, value float64) {
select {
case s.outc <- fmt.Sprintf("%s:%f|ms\n", name, value):
default:
new := atomic.AddUint64(&s.droppedTimers, 1)
if new%logOnEveryNDropped == 0 {
logger.WithField("total_dropped_records", new).
WithField("timer", name).
Error("statsd channel full, discarding timer flush value")
}
}
// Format the line into the shared buffered writer.
s.flush("%s:%f|ms\n", name, value)
}

func (s *tcpStatsdSink) run() {
settings := GetSettings()
var writer *bufio.Writer
var err error
t := time.NewTimer(flushInterval)
defer t.Stop()
for {
if s.conn == nil {
s.conn, err = net.Dial("tcp", fmt.Sprintf("%s:%d", settings.StatsdHost,
conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", settings.StatsdHost,
settings.StatsdPort))
if err != nil {
logger.Warnf("statsd connection error: %s", err)
time.Sleep(3 * time.Second)
continue
}
writer = bufio.NewWriter(s.conn)
s.conn = conn
}

// Receive from the channel and check if the channel has been closed
metric, ok := <-s.outc
if !ok {
logger.Warnf("Closing statsd client")
s.conn.Close()
return
select {
case <-t.C:
s.mu.Lock()
if err := s.bufWriter.Flush(); err != nil {
s.handleFlushError(err, s.bufWriter.Buffered())
}
s.mu.Unlock()
case buf, ok := <-s.outc: // Receive from the channel and check if the channel has been closed
if !ok {
logger.Warnf("Closing statsd client")
s.conn.Close()
return
}

lenbuf := len(buf.Bytes())
n, err := s.conn.Write(buf.Bytes())
if err != nil || n < lenbuf {
s.mu.Lock()
if err != nil {
s.handleFlushError(err, lenbuf)
} else {
s.handleFlushError(fmt.Errorf("short write to statsd, resetting connection"), lenbuf-n)
}
s.mu.Unlock()
_ = s.conn.Close() // Ignore close failures
s.conn = nil
}
s.pool.Put(buf)
}
}
}

writer.WriteString(metric)
err = writer.Flush()
// bufferpool wraps a sync.Pool whose items are *bytes.Buffer values.
type bufferpool struct {
pool sync.Pool
}

if err != nil {
logger.Warnf("Writing to statsd failed: %s", err)
_ = s.conn.Close() // Ignore close failures
s.conn = nil
writer = nil
}
// newBufferPool builds a bufferpool whose freshly allocated buffers are empty
// and start with defaultSizeBytes of capacity.
func newBufferPool(defaultSizeBytes int) *bufferpool {
	alloc := func() interface{} {
		backing := make([]byte, 0, defaultSizeBytes)
		return bytes.NewBuffer(backing)
	}
	return &bufferpool{pool: sync.Pool{New: alloc}}
}

// Put empties b and returns it to the pool for reuse. The caller must not use
// b after this call.
func (p *bufferpool) Put(b *bytes.Buffer) {
// Reset first so the next Get never sees stale contents.
b.Reset()
p.pool.Put(b)
}

// Get hands out a buffer from the pool, allocating one through the pool's New
// function when none is available.
func (p *bufferpool) Get() *bytes.Buffer {
	item := p.pool.Get()
	return item.(*bytes.Buffer)
}

0 comments on commit bb587bd

Please sign in to comment.