Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Large PR that introduces a v2 format alongside changes to support multiple serialization types. #8

Merged
merged 25 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ on: [pull_request]
jobs:
build:
runs-on: github-hosted-ubuntu-x64-large

steps:
- uses: actions/checkout@v4
- name: Setup Go
Expand Down
106 changes: 106 additions & 0 deletions e2e_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package walqueue

import (
"context"
"fmt"
"github.com/go-kit/log"
"github.com/golang/snappy"
prom "github.com/grafana/walqueue/implementations/prometheus"
"github.com/grafana/walqueue/types"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"
"time"
)

func TestV2E2E(b *testing.T) {
// BenchmarkV2E2E-20 2504 451484 ns/op
dir := b.TempDir()
totalSeries := atomic.NewInt32(0)
mut := sync.Mutex{}
set := make(map[float64]struct{})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mut.Lock()
defer mut.Unlock()
defer r.Body.Close()
data, err := io.ReadAll(r.Body)
require.NoError(b, err)
data, err = snappy.Decode(nil, data)
require.NoError(b, err)

var req prompb.WriteRequest
err = req.Unmarshal(data)
require.NoError(b, err)

for _, x := range req.GetTimeseries() {
totalSeries.Add(int32(len(x.Samples)))
for _, sample := range x.Samples {
_, found := set[sample.Value]
if found {
panic("found duplicate sample")
}
set[sample.Value] = struct{}{}
}
}
}))
cc := types.ConnectionConfig{
URL: srv.URL,
BatchCount: 10,
FlushInterval: 1 * time.Second,
Connections: 3,
Timeout: 10 * time.Second,
}
q, err := prom.NewQueue("test", cc, dir, 10000, 1*time.Second, 1*time.Hour, prometheus.NewRegistry(), "alloy", log.NewLogfmtLogger(os.Stderr))

require.NoError(b, err)
go q.Start()
defer q.Stop()

metricCount := 100
sends := 2
metrics := make([]*types.Metric, 0)
for k := 0; k < metricCount; k++ {
lblsMap := make(map[string]string)
for j := 0; j < 10; j++ {
key := fmt.Sprintf("key_%d", j)
v := randString()
lblsMap[key] = v
}
m := &types.Metric{}
m.Labels = labels.FromMap(lblsMap)
m.TS = time.Now().UnixMilli()
metrics = append(metrics, m)
}
index := 1
for n := 0; n < sends; n++ {
app := q.Appender(context.Background())
for _, m := range metrics {
app.Append(0, m.Labels, m.TS, float64(index))
index++
}
app.Commit()
}
require.Eventually(b, func() bool {
return totalSeries.Load() == int32(metricCount*sends)
}, 50*time.Second, 50*time.Millisecond)
require.Truef(b, types.OutstandingIndividualMetrics.Load() == 0, "outstanding indicidual metrics are %d", types.OutstandingIndividualMetrics.Load())
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func randString() string {
b := make([]rune, rand.Intn(20))
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ go 1.23
toolchain go1.23.1

require (
github.com/dghubble/trie v0.1.0
github.com/dolthub/swiss v0.2.1
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3
github.com/go-kit/log v0.2.1
github.com/golang/protobuf v1.5.4
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/prometheus v0.55.1
Expand All @@ -24,9 +26,9 @@ require (
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.6.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand Down
10 changes: 6 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dennwc/varint v1.0.0 h1:kGNFFSSw8ToIy3obO/kKr8U9GZYUAxQEVuix4zfDWzE=
github.com/dennwc/varint v1.0.0/go.mod h1:hnItb35rvZvJrbTALZtY/iQfDs48JKRG1RPpgziApxA=
github.com/dghubble/trie v0.1.0 h1:kJnjBLFFElBwS60N4tkPvnLhnpcDxbBjIulgI8CpNGM=
github.com/dghubble/trie v0.1.0/go.mod h1:sOmnzfBNH7H92ow2292dDFWNsVQuh/izuD7otCYb1ak=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw=
github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8cUrh0=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws=
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
Expand All @@ -46,8 +52,6 @@ github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17w
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
Expand All @@ -58,8 +62,6 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gTgghdIA6Stxb52D5RnLI1SLyw=
github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA=
github.com/grafana/go-actor v0.0.0-20241115013232-9acfa54eede7 h1:8zr00qi9K0nZryw/03jW/Og9DAzAxWcBBXs6lGxsmJE=
github.com/grafana/go-actor v0.0.0-20241115013232-9acfa54eede7/go.mod h1:qcIpcfAXGBaWKyBtzyrhEhPjlKpw4nK83rbyZONymcg=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrRoT6yV5+wkrOpcszoIsO4+4ds248=
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
Expand Down
50 changes: 23 additions & 27 deletions implementations/prometheus/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package prometheus

import (
"context"
v1 "github.com/grafana/walqueue/types/v1"
v2 "github.com/grafana/walqueue/types/v2"
"github.com/prometheus/client_golang/prometheus"
"strconv"
"sync"
"time"

snappy "github.com/eapache/go-xerial-snappy"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/vladopajic/go-actor/actor"
)

var pool = sync.Pool{New: func() interface{} { return make([]byte, 0) }}
var _ storage.Appendable = (*queue)(nil)
var _ Queue = (*queue)(nil)

Expand Down Expand Up @@ -45,7 +48,6 @@ type queue struct {
incoming actor.Mailbox[types.DataHandle]
stats *PrometheusStats
metaStats *PrometheusStats
buf []byte
}

// NewQueue creates and returns a new Queue instance, initializing its components
Expand Down Expand Up @@ -99,7 +101,7 @@ func NewQueue(name string, cc types.ConnectionConfig, directory string, maxSigna
serial, err := serialization.NewSerializer(types.SerializerConfig{
MaxSignalsInBatch: maxSignalsToBatch,
FlushFrequency: flushInterval,
}, q.queue, stats.UpdateSerializer, logger)
}, q.queue, stats.UpdateSerializer, types.AlloyFileVersionV2, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -152,7 +154,10 @@ func (q *queue) Appender(ctx context.Context) storage.Appender {

func (q *queue) deserializeAndSend(ctx context.Context, meta map[string]string, buf []byte) {
var err error
q.buf, err = snappy.DecodeInto(q.buf, buf)
uncompressedBuf := pool.Get().([]byte)
defer pool.Put(uncompressedBuf)

uncompressedBuf, err = snappy.DecodeInto(uncompressedBuf, buf)
if err != nil {
level.Debug(q.logger).Log("msg", "error snappy decoding", "err", err)
return
Expand All @@ -165,40 +170,31 @@ func (q *queue) deserializeAndSend(ctx context.Context, meta map[string]string,
level.Error(q.logger).Log("msg", "version not found for deserialization")
return
}
if version != types.AlloyFileVersion {
var metrics *types.Metrics
var metadata *types.Metrics
switch types.FileFormat(version) {
case types.AlloyFileVersionV2:
s := v2.GetSerializer()
metrics, metadata, err = s.Deserialize(uncompressedBuf)
case types.AlloyFileVersionV1:
s := v1.GetSerializer()
metrics, metadata, err = s.Deserialize(uncompressedBuf)
default:
level.Error(q.logger).Log("msg", "invalid version found for deserialization", "version", version)
return
}
// Grab the amounts of each type and we can go ahead and alloc the space.
seriesCount, _ := strconv.Atoi(meta["series_count"])
metaCount, _ := strconv.Atoi(meta["meta_count"])
stringsCount, _ := strconv.Atoi(meta["strings_count"])
sg := &types.SeriesGroup{
Series: make([]*types.TimeSeriesBinary, seriesCount),
Metadata: make([]*types.TimeSeriesBinary, metaCount),
Strings: make([]types.ByteString, stringsCount),
}
// Prefill our series with items from the pool to limit allocs.
for i := 0; i < seriesCount; i++ {
sg.Series[i] = types.GetTimeSeriesFromPool()
}
for i := 0; i < metaCount; i++ {
sg.Metadata[i] = types.GetTimeSeriesFromPool()
}
sg, q.buf, err = types.DeserializeToSeriesGroup(sg, q.buf)
if err != nil {
level.Debug(q.logger).Log("msg", "error deserializing", "err", err)
return
level.Error(q.logger).Log("msg", "error deserializing", "err", err, "format", version)
}

for _, series := range sg.Series {
for _, series := range metrics.M {
// Check that the TTL.
seriesAge := time.Since(time.UnixMilli(series.TS))
// For any series that exceeds the time to live (ttl) based on its timestamp we do not want to push it to the networking layer
// but instead drop it here by continuing.
if seriesAge > q.ttl {
// Since we arent pushing the TS forward we should put it back into the pool.
types.PutTimeSeriesIntoPool(series)
types.PutMetricIntoPool(series)
q.stats.NetworkTTLDrops.Inc()
continue
}
Expand All @@ -208,7 +204,7 @@ func (q *queue) deserializeAndSend(ctx context.Context, meta map[string]string,
}
}

for _, md := range sg.Metadata {
for _, md := range metadata.M {
sendErr := q.network.SendMetadata(ctx, md)
if sendErr != nil {
level.Error(q.logger).Log("msg", "error sending metadata to write client", "err", sendErr)
Expand Down
2 changes: 1 addition & 1 deletion implementations/prometheus/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestQueue_Appender(t *testing.T) {
require.Eventually(t, func() bool {
return recordsFound.Load() == tt.metricCount
}, 10*time.Second, 100*time.Millisecond)
require.True(t, types.OutStandingTimeSeriesBinary.Load() == 0)
//require.True(t, v2.OutStandingTimeSeriesBinary.Load() == 0)
})
}
}
Expand Down
Loading
Loading