Skip to content

Commit

Permalink
api: Create a new Go module for API; with remote write client and han…
Browse files Browse the repository at this point in the history
…dler.

Lot's of assumptions here:
* We want separate module (we don't have processes on how to version it and release).
* We want to expose generated protos, without gogo.

Signed-off-by: bwplotka <[email protected]>
  • Loading branch information
bwplotka committed Oct 18, 2024
1 parent e1675ce commit 07d1ec9
Show file tree
Hide file tree
Showing 23 changed files with 8,996 additions and 6 deletions.
16 changes: 11 additions & 5 deletions .bingo/Variables.mk
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Auto generated binary variables helper managed by https://github.com/bwplotka/bingo v0.8. DO NOT EDIT.
# Auto generated binary variables helper managed by https://github.com/bwplotka/bingo v0.9. DO NOT EDIT.
# All tools are designed to be build inside $GOBIN.
BINGO_DIR := $(dir $(lastword $(MAKEFILE_LIST)))
GOPATH ?= $(shell go env GOPATH)
Expand All @@ -7,16 +7,22 @@ GO ?= $(shell which go)

# Below generated variables ensure that every time a tool under each variable is invoked, the correct version
# will be used; reinstalling only if needed.
# For example for goimports variable:
# For example for buf variable:
#
# In your main Makefile (for non array binaries):
#
#include .bingo/Variables.mk # Assuming -dir was set to .bingo .
#
#command: $(GOIMPORTS)
# @echo "Running goimports"
# @$(GOIMPORTS) <flags/args..>
#command: $(BUF)
# @echo "Running buf"
# @$(BUF) <flags/args..>
#
BUF := $(GOBIN)/buf-v1.39.0
$(BUF): $(BINGO_DIR)/buf.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
@echo "(re)installing $(GOBIN)/buf-v1.39.0"
@cd $(BINGO_DIR) && GOWORK=off $(GO) build -mod=mod -modfile=buf.mod -o=$(GOBIN)/buf-v1.39.0 "github.com/bufbuild/buf/cmd/buf"

GOIMPORTS := $(GOBIN)/goimports-v0.9.3
$(GOIMPORTS): $(BINGO_DIR)/goimports.mod
@# Install binary/ries using Go 1.14+ build command. This is using bwplotka/bingo-controlled, separate go module with pinned dependencies.
Expand Down
5 changes: 5 additions & 0 deletions .bingo/buf.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module _ // Auto generated by https://github.com/bwplotka/bingo. DO NOT EDIT

go 1.22.6

require github.com/bufbuild/buf v1.39.0 // cmd/buf
336 changes: 336 additions & 0 deletions .bingo/buf.sum

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion .bingo/variables.env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Auto generated binary variables helper managed by https://github.com/bwplotka/bingo v0.8. DO NOT EDIT.
# Auto generated binary variables helper managed by https://github.com/bwplotka/bingo v0.9. DO NOT EDIT.
# All tools are designed to be build inside $GOBIN.
# Those variables will work only until 'bingo get' was invoked, or if tools were installed via Makefile's Variables.mk.
GOBIN=${GOBIN:=$(go env GOBIN)}
Expand All @@ -8,5 +8,7 @@ if [ -z "$GOBIN" ]; then
fi


BUF="${GOBIN}/buf-v1.39.0"

GOIMPORTS="${GOBIN}/goimports-v0.9.3"

7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,10 @@ generate-go-collector-test-files:
.PHONY: fmt
fmt: common-format
$(GOIMPORTS) -local github.com/prometheus/client_golang -w .

RWMODULE = api/remotewrite

.PHONY: proto
proto: ## Regenerate Go from proto.
proto: $(BUF)
@$(MAKE) -C api/remotewrite proto BUF=$(BUF)
11 changes: 11 additions & 0 deletions api/remotewrite/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

.PHONY: proto
proto: ## Regenerate Go from proto.
proto: $(BUF)
@echo ">> regenerating Prometheus proto"
@$(BUF) generate
# TODO(bwplotka): Is there a way to configure buf for this?
@find genproto/ -type f -exec sed -i '' 's/package prompb/package writev1/g' {} \;
# For some reasons buf generates this unused import, kill it manually for now and reformat.
@find genproto/ -type f -exec sed -i '' 's/_ "github.com\/gogo\/protobuf\/gogoproto"//g' {} \;
@go fmt ./genproto/...
26 changes: 26 additions & 0 deletions api/remotewrite/buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# buf.gen.yaml
version: v2

plugins:
- remote: buf.build/protocolbuffers/go:v1.31.0
out: .
opt:
- Mio/prometheus/write/v2/types.proto=genproto/v2
- Mtypes.proto=genproto/v1
- Mremote.proto=genproto/v1

# vtproto for efficiency utilities like pooling etc.
# https://buf.build/community/planetscale-vtprotobuf?version=v0.6.0
- remote: buf.build/community/planetscale-vtprotobuf:v0.6.0
out: .
opt:
- Mio/prometheus/write/v2/types.proto=genproto/v2
- Mtypes.proto=genproto/v1
- Mremote.proto=genproto/v1
- features=marshal+unmarshal+size+clone

inputs:
- module: buf.build/prometheus/prometheus:5b212ab78fb7460e831cf7ff2d83e385
types:
- "io.prometheus.write.v2.Request"
- "prometheus.WriteRequest"
266 changes: 266 additions & 0 deletions api/remotewrite/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Copyright 2024 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package remotewrite

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"strconv"
"time"

"github.com/efficientgo/core/backoff"
"github.com/klauspost/compress/snappy"
writev1 "github.com/prometheus/client_golang/api/remotewrite/genproto/v1"
writev2 "github.com/prometheus/client_golang/api/remotewrite/genproto/v2"
)

const (
defaultBackoff = 0
maxErrMsgLen = 1024
)

type Client struct {
logger *slog.Logger
url string
client *http.Client

userAgent string
retryOnRateLimit bool

compr Compression
comprBuf []byte

b *backoff.Backoff
}

type EncodingClient struct {
client *Client

buf []byte
}

func NewEncodingClient(client *Client) *EncodingClient {
return &EncodingClient{client: client}
}

func (c *EncodingClient) WriteV1(ctx context.Context, req *writev1.WriteRequest, opts *ClientWriteOpts) (WriteResponseStats, error) {
size := req.SizeVT()
if len(c.buf) < size {
c.buf = make([]byte, size)
}
if _, err := req.MarshalToSizedBufferVT(c.buf[:size]); err != nil {
return WriteResponseStats{}, fmt.Errorf("encoding v1 request %w", err)
}
return c.client.Write(ctx, ProtoMsgV1, c.buf[:size], opts)
}

func (c *EncodingClient) WriteV2(ctx context.Context, req *writev2.Request, opts *ClientWriteOpts) (WriteResponseStats, error) {
size := req.SizeVT()
if len(c.buf) < size {
c.buf = make([]byte, size)
}
if _, err := req.MarshalToSizedBufferVT(c.buf[:size]); err != nil {
return WriteResponseStats{}, fmt.Errorf("encoding v2 request %w", err)
}
stats, err := c.client.Write(ctx, ProtoMsgV2, c.buf[:size], opts)
if err != nil {
return stats, err
}

// Check the case mentioned in PRW 2.0.
// https://prometheus.io/docs/specs/remote_write_spec_2_0/#required-written-response-headers.
if !stats.Confirmed && stats.NoDataWritten() {
cStats := WriteResponseStats{}.AddV2(req)
if !cStats.NoDataWritten() {
return stats, fmt.Errorf("sent v2 request with %v samples %v histograms %v exemplars; "+
"got 2xx, but PRW 2.0 response header statistics indicate %v samples, %v histograms "+
"and %v exemplars were accepted; assumining failure e.g. the target only supports "+
"PRW 1.0 prometheus.WriteRequest, but does not check the Content-Type header correctly",
cStats.Samples, cStats.Histograms, cStats.Exemplars,
stats.Samples, stats.Histograms, stats.Exemplars,
)
}
}
return stats, nil
}

// NewClient returns client.
// TODO(bwplotka): Add variadic options.
func NewClient(logger *slog.Logger, url string, hc *http.Client, compr Compression, ua string, retryOnRateLimit bool) *Client {
if hc == nil {
hc = &http.Client{Timeout: 1 * time.Minute}
}
return &Client{
logger: logger,
url: url,
client: hc,
compr: compr,
userAgent: ua,
retryOnRateLimit: retryOnRateLimit,
}
}

type RetryableError struct {
error
retryAfter time.Duration
}

func (r RetryableError) RetryAfter() time.Duration {
return r.retryAfter
}

type ClientWriteOpts struct {
Backoff backoff.Config
}

var defaultOpts = &ClientWriteOpts{
Backoff: backoff.Config{
Min: 1 * time.Second,
Max: 10 * time.Second,
MaxRetries: 10,
},
}

// TODO(bwplotka): Support variadic options allowing too old sample handling, tracing, metrics
func (c *Client) Write(ctx context.Context, proto ProtoMsg, serializedRequest []byte, opts *ClientWriteOpts) (WriteResponseStats, error) {
o := *defaultOpts
if opts != nil {
o = *opts
}
payload, err := compressPayload(&c.comprBuf, c.compr, serializedRequest)
if err != nil {
return WriteResponseStats{}, fmt.Errorf("compressing %w", err)
}

// Since we retry writes we need to track the total amount of accepted data
// across the various attempts.
accumulatedStats := WriteResponseStats{}

b := backoff.New(ctx, o.Backoff)
for {
rs, err := c.write(ctx, proto, payload, b.NumRetries())
accumulatedStats = accumulatedStats.Add(rs)
if err == nil {
// Success!
// TODO(bwplotka): Debug log with retry summary?
return accumulatedStats, nil
}

var retryableErr RetryableError
if !errors.As(err, &retryableErr) {
// TODO(bwplotka): More context in the error e.g. about retries.
return accumulatedStats, err
}

if !b.Ongoing() {
// TODO(bwplotka): More context in the error e.g. about retries.
return accumulatedStats, err
}

backoffDelay := b.NextDelay() + retryableErr.RetryAfter()
c.logger.Error("failed to send remote write request; retrying after backoff", "err", err, "backoff", backoffDelay)
select {
case <-ctx.Done():
// TODO(bwplotka): More context in the error e.g. about retries.
return WriteResponseStats{}, ctx.Err()
case <-time.After(backoffDelay):
// Retry.
}
}
}

func compressPayload(tmpbuf *[]byte, enc Compression, inp []byte) (compressed []byte, _ error) {
switch enc {
case SnappyBlockCompression:
compressed = snappy.Encode(*tmpbuf, inp)
if n := snappy.MaxEncodedLen(len(inp)); n > len(*tmpbuf) {
// grow the buffer for the next time.
*tmpbuf = make([]byte, n)
}
return compressed, nil
default:
return compressed, fmt.Errorf("Unknown compression scheme [%v]", enc)
}
}

func (c *Client) write(ctx context.Context, proto ProtoMsg, payload []byte, attempt int) (WriteResponseStats, error) {
httpReq, err := http.NewRequest(http.MethodPost, c.url, bytes.NewReader(payload))
if err != nil {
// Errors from NewRequest are from unparsable URLs, so are not
// recoverable.
return WriteResponseStats{}, err
}

httpReq.Header.Add("Content-Encoding", string(c.compr))
httpReq.Header.Set("Content-Type", ContentTypeHeader(proto))
httpReq.Header.Set("User-Agent", c.userAgent)
if proto == ProtoMsgV1 {
// Compatibility mode for 1.0.
httpReq.Header.Set(VersionHeader, Version1HeaderValue)
} else {
httpReq.Header.Set(VersionHeader, Version20HeaderValue)
}

if attempt > 0 {
httpReq.Header.Set("Retry-Attempt", strconv.Itoa(attempt))
}

httpResp, err := c.client.Do(httpReq.WithContext(ctx))
if err != nil {
// Errors from Client.Do are likely network errors, so recoverable.
return WriteResponseStats{}, RetryableError{err, defaultBackoff}
}
defer func() {
_, _ = io.Copy(io.Discard, httpResp.Body)
_ = httpResp.Body.Close()
}()

rs, err := parseWriteResponseStats(httpResp)
if err != nil {
c.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err)
}

if httpResp.StatusCode/100 == 2 {
return rs, nil
}

body, err := io.ReadAll(io.LimitReader(httpResp.Body, maxErrMsgLen))
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, body)

if httpResp.StatusCode/100 == 5 ||
(c.retryOnRateLimit && httpResp.StatusCode == http.StatusTooManyRequests) {
return rs, RetryableError{err, retryAfterDuration(httpResp.Header.Get("Retry-After"))}
}
return rs, err
}

// retryAfterDuration returns the duration for the Retry-After header. In case of any errors, it
// returns 0 as if the header was never supplied.
func retryAfterDuration(t string) time.Duration {
parsedDuration, err := time.Parse(http.TimeFormat, t)
if err == nil {
return time.Until(parsedDuration)
}
// The duration can be in seconds.
d, err := strconv.Atoi(t)
if err != nil {
return 0
}
return time.Duration(d) * time.Second
}
Loading

0 comments on commit 07d1ec9

Please sign in to comment.