Skip to content

Commit

Permalink
Make Store.Flush() guarantee immediate flush on sinks with that suppo…
Browse files Browse the repository at this point in the history
…rt (#40)

* Make Store.Flush() guarantee immediate flush on sinks with that support

Signed-off-by: James Sedgwick <[email protected]>

* blockflush

Signed-off-by: James Sedgwick <[email protected]>

* jsedgwick_is_dumb

Signed-off-by: James Sedgwick <[email protected]>

* flush all bufs

Signed-off-by: James Sedgwick <[email protected]>
  • Loading branch information
James Sedgwick authored Aug 7, 2018
1 parent 1d8f034 commit f60c0ca
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 17 deletions.
6 changes: 5 additions & 1 deletion logging_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import logger "github.com/sirupsen/logrus"
type loggingSink struct{}

// NewLoggingSink returns a Sink that flushes stats to os.StdErr.
func NewLoggingSink() Sink {
func NewLoggingSink() FlushableSink {
return &loggingSink{}
}

Expand All @@ -20,3 +20,7 @@ func (s *loggingSink) FlushGauge(name string, value uint64) {
func (s *loggingSink) FlushTimer(name string, value float64) {
logger.Debugf("[gostats] flushing time %s: %f", name, value)
}

func (s *loggingSink) Flush() {
logger.Debugf("[gostats] Flush() called, all stats would be flushed")
}
8 changes: 8 additions & 0 deletions sink.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
package stats

// A Sink is used by a Store to flush its data.
// These functions may buffer the given data.
type Sink interface {
FlushCounter(name string, value uint64)
FlushGauge(name string, value uint64)
FlushTimer(name string, value float64)
}

// FlushableSink is an extension of Sink that provides a Flush() function that
// will flush any buffered stats to the underlying store.
type FlushableSink interface {
Sink
Flush()
}
5 changes: 5 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,11 @@ func (s *statStore) Flush() {
s.sink.FlushGauge(name, value)
}
s.gaugesMtx.RUnlock()

flushableSink, ok := s.sink.(FlushableSink)
if ok {
flushableSink.Flush()
}
}

func (s *statStore) Start(ticker *time.Ticker) {
Expand Down
66 changes: 50 additions & 16 deletions tcp_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,23 @@ const (
chanSize = approxMaxMemBytes / defaultBufferSize
)

// NewTCPStatsdSink returns a Sink that is backed by a buffered writer and a separate goroutine
// that flushes those buffers to a statsd connection.
func NewTCPStatsdSink() Sink {
// NewTCPStatsdSink returns a FlushableSink that is backed by a buffered writer
// and a separate goroutine that flushes those buffers to a statsd connection.
func NewTCPStatsdSink() FlushableSink {
outc := make(chan *bytes.Buffer, chanSize) // TODO(btc): parameterize
writer := sinkWriter{
outc: outc,
}
bufWriter := bufio.NewWriterSize(&writer, defaultBufferSize) // TODO(btc): parameterize size
pool := newBufferPool(defaultBufferSize)
mu := &sync.Mutex{}
flushCond := sync.NewCond(mu)
s := &tcpStatsdSink{
outc: outc,
bufWriter: bufWriter,
pool: pool,
mu: mu,
flushCond: flushCond,
}
writer.pool = s.pool
go s.run()
Expand All @@ -48,9 +52,11 @@ type tcpStatsdSink struct {
outc chan *bytes.Buffer
pool *bufferpool

mu sync.Mutex
droppedBytes uint64
bufWriter *bufio.Writer
mu *sync.Mutex
droppedBytes uint64
bufWriter *bufio.Writer
flushCond *sync.Cond
lastFlushTime time.Time
}

type sinkWriter struct {
Expand All @@ -70,7 +76,31 @@ func (w *sinkWriter) Write(p []byte) (int, error) {
}
}

func (s *tcpStatsdSink) flush(f string, args ...interface{}) {
func (s *tcpStatsdSink) Flush() {
now := time.Now()
if err := s.flush(); err != nil {
// Not much we can do here; we don't know how/why we failed.
return
}
s.mu.Lock()
for now.After(s.lastFlushTime) {
s.flushCond.Wait()
}
s.mu.Unlock()
}

func (s *tcpStatsdSink) flush() error {
s.mu.Lock()
defer s.mu.Unlock()
err := s.bufWriter.Flush()
if err != nil {
s.handleFlushError(err, s.bufWriter.Buffered())
return err
}
return nil
}

func (s *tcpStatsdSink) flushString(f string, args ...interface{}) {
s.mu.Lock()
_, err := fmt.Fprintf(s.bufWriter, f, args...)
if err != nil {
Expand All @@ -96,15 +126,15 @@ func (s *tcpStatsdSink) handleFlushError(err error, droppedBytes int) {
}

func (s *tcpStatsdSink) FlushCounter(name string, value uint64) {
s.flush("%s:%d|c\n", name, value)
s.flushString("%s:%d|c\n", name, value)
}

func (s *tcpStatsdSink) FlushGauge(name string, value uint64) {
s.flush("%s:%d|g\n", name, value)
s.flushString("%s:%d|g\n", name, value)
}

func (s *tcpStatsdSink) FlushTimer(name string, value float64) {
s.flush("%s:%f|ms\n", name, value)
s.flushString("%s:%f|ms\n", name, value)
}

func (s *tcpStatsdSink) run() {
Expand All @@ -125,20 +155,24 @@ func (s *tcpStatsdSink) run() {

select {
case <-t.C:
s.mu.Lock()
if err := s.bufWriter.Flush(); err != nil {
s.handleFlushError(err, s.bufWriter.Buffered())
}
s.mu.Unlock()
s.flush()
case buf, ok := <-s.outc: // Receive from the channel and check if the channel has been closed
if !ok {
logger.Warnf("Closing statsd client")
s.conn.Close()
return
}

lenbuf := len(buf.Bytes())
n, err := s.conn.Write(buf.Bytes())

if len(s.outc) == 0 {
// We've at least tried to write all the data we have. Wake up anyone waiting on flush.
s.mu.Lock()
s.lastFlushTime = time.Now()
s.mu.Unlock()
s.flushCond.Broadcast()
}

if err != nil || n < lenbuf {
s.mu.Lock()
if err != nil {
Expand Down

0 comments on commit f60c0ca

Please sign in to comment.