Skip to content

Commit

Permalink
Merge pull request #1 from koid/feature/use-buffered-client
Browse files Browse the repository at this point in the history
use Statsd BufferedClient
  • Loading branch information
koid authored Jun 25, 2018
2 parents b63f8c9 + 57bf13d commit f99d7a5
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,42 @@ import (
// Statsd is a statreceiver that writes stats to a statsd endpoint
type Statsd struct {
client *statsd.Client
tags []string
}

// New creates a new Statsd statreceiver with a new instance of a cactus statter
func New(addr string, tags []string) (*Statsd, error) {
sd, err := statsd.New(addr)
sd, err := statsd.NewBuffered(addr, 10000)
if err != nil {
return nil, err
}
sd.SkipErrors = true
sd.Tags = tags
return &Statsd{
client: sd,
tags: tags,
}, nil
}

// Checkpoint implementation that writes to statsd
func (s *Statsd) Checkpoint() {
_ = s.client.Incr("kinsumer.checkpoints", s.tags, 1.0)
_ = s.client.Incr("kinsumer.checkpoints", nil, 1.0)
}

// EventToClient implementation that writes to statsd metrics about a record
// that was consumed by the client
func (s *Statsd) EventToClient(inserted, retrieved time.Time) {
now := time.Now()

_ = s.client.Incr("kinsumer.consumed", s.tags, 1.0)
_ = s.client.Timing("kinsumer.in_stream", retrieved.Sub(inserted), s.tags, 1.0)
_ = s.client.Timing("kinsumer.end_to_end", now.Sub(inserted), s.tags, 1.0)
_ = s.client.Timing("kinsumer.in_kinsumer", now.Sub(retrieved), s.tags, 1.0)
_ = s.client.Incr("kinsumer.consumed", nil, 1.0)
_ = s.client.Timing("kinsumer.in_stream", retrieved.Sub(inserted), nil, 1.0)
_ = s.client.Timing("kinsumer.end_to_end", now.Sub(inserted), nil, 1.0)
_ = s.client.Timing("kinsumer.in_kinsumer", now.Sub(retrieved), nil, 1.0)
}

// EventsFromKinesis implementation that writes to statsd metrics about records that
// were retrieved from kinesis
func (s *Statsd) EventsFromKinesis(num int, shardID string, lag time.Duration) {
shardTag := fmt.Sprintf("shardId:%s", shardID)
_ = s.client.Timing("kinsumer.lag", lag, append(s.tags, shardTag), 1.0)
_ = s.client.Count("kinsumer.retrieved", int64(num), append(s.tags, shardTag), 1.0)
tags := []string{
fmt.Sprintf("shardId:%s", shardID),
}
_ = s.client.Timing("kinsumer.lag", lag, tags, 1.0)
_ = s.client.Count("kinsumer.retrieved", int64(num), tags, 1.0)
}

0 comments on commit f99d7a5

Please sign in to comment.