Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
koid committed Jun 21, 2018
0 parents commit 6ad4cfd
Show file tree
Hide file tree
Showing 9 changed files with 446 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
vendor
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM golang:1.10.3-alpine AS builder
RUN apk add --update git
RUN go get -u github.com/golang/dep/cmd/dep
ENV GOPATH /go
ENV GOOS linux
ENV GOARCH amd64
WORKDIR /go/src/github.com/koid/kinforward
COPY . ./
RUN dep ensure
RUN go build -o kinforward

FROM alpine:latest
RUN apk add --update --no-cache ca-certificates && update-ca-certificates
COPY --from=builder /go/src/github.com/koid/kinforward/kinforward /bin/kinforward
96 changes: 96 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Gopkg.toml example
#
# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html
# for detailed Gopkg.toml documentation.
#
# required = ["github.com/user/thing/cmd/thing"]
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
#
# [[constraint]]
# name = "github.com/user/project"
# version = "1.0.0"
#
# [[constraint]]
# name = "github.com/user/project2"
# branch = "dev"
# source = "github.com/myfork/project2"
#
# [[override]]
# name = "github.com/x/y"
# version = "2.4.0"
#
# [prune]
# non-go = false
# go-tests = true
# unused-packages = true


[[constraint]]
name = "github.com/aws/aws-sdk-go"
version = "1.14.7"

[[constraint]]
name = "github.com/fluent/fluent-logger-golang"
version = "1.3.0"

[[constraint]]
name = "github.com/twinj/uuid"
version = "1.0.0"

[[constraint]]
branch = "master"
name = "github.com/twitchscience/kinsumer"

[prune]
go-tests = true
unused-packages = true
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# kinforward

An implementation of [kinsumer](https://github.com/twitchscience/kinsumer) for forwarding to fluentd.

29 changes: 29 additions & 0 deletions docker-compose.example.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
version: '3'
services:
kinforward:
build: .
command: kinforward
environment:
AWS_REGION: us-east-1
AWS_ACCESS_KEY_ID:
AWS_SECRET_ACCESS_KEY:
AWS_SESSION_TOKEN:
KINESIS_STREAM_NAME:
CHECKPOINT_TABLE_PREFIX:
TAG_KEY:
FLUENT_HOST: fluentd
FLUENT_PORT: 24224
DOGSTATSD_HOST_PORT: dogstatsd:8125
depends_on:
- fluentd
- dogstatsd
fluentd:
image: fluent/fluentd
environment:
FLUENTD_CONF: fluentd.example.conf
volumes:
- .:/fluentd/etc
dogstatsd:
image: datadog/dogstatsd:6.2.1
environment:
DD_API_KEY:
50 changes: 50 additions & 0 deletions dogstatsd/dogstatsd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package dogstatsd

import (
"fmt"
"time"

"github.com/DataDog/datadog-go/statsd"
)

// Statsd is a statreceiver that writes stats to a statsd endpoint
type Statsd struct {
client *statsd.Client
tags []string
}

// New creates a new Statsd statreceiver with a new instance of a cactus statter
func New(addr string, tags []string) (*Statsd, error) {
sd, err := statsd.New(addr)
if err != nil {
return nil, err
}
return &Statsd{
client: sd,
tags: tags,
}, nil
}

// Checkpoint implementation that writes to statsd
func (s *Statsd) Checkpoint() {
_ = s.client.Incr("kinsumer.checkpoints", s.tags, 1.0)
}

// EventToClient implementation that writes to statsd metrics about a record
// that was consumed by the client
func (s *Statsd) EventToClient(inserted, retrieved time.Time) {
now := time.Now()

_ = s.client.Incr("kinsumer.consumed", s.tags, 1.0)
_ = s.client.Timing("kinsumer.in_stream", retrieved.Sub(inserted), s.tags, 1.0)
_ = s.client.Timing("kinsumer.end_to_end", now.Sub(inserted), s.tags, 1.0)
_ = s.client.Timing("kinsumer.in_kinsumer", now.Sub(retrieved), s.tags, 1.0)
}

// EventsFromKinesis implementation that writes to statsd metrics about records that
// were retrieved from kinesis
func (s *Statsd) EventsFromKinesis(num int, shardID string, lag time.Duration) {
shardTag := fmt.Sprintf("shardId:%s", shardID)
_ = s.client.Timing("kinsumer.lag", lag, append(s.tags, shardTag), 1.0)
_ = s.client.Count("kinsumer.retrieved", int64(num), append(s.tags, shardTag), 1.0)
}
6 changes: 6 additions & 0 deletions fluentd.example.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<source>
@type forward
</source>
<match **>
@type stdout
</match>
Loading

0 comments on commit 6ad4cfd

Please sign in to comment.