Skip to content

Commit

Permalink
feat(v2): dlq recovery (#3595)
Browse files Browse the repository at this point in the history
  • Loading branch information
korniltsev authored Oct 2, 2024
1 parent 363d453 commit 3cc5bd8
Show file tree
Hide file tree
Showing 32 changed files with 1,029 additions and 186 deletions.
3 changes: 3 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ packages:
github.com/grafana/pyroscope/pkg/experiment/metastore/discovery:
interfaces:
Discovery:
github.com/grafana/pyroscope/pkg/experiment/metastore/dlq:
interfaces:
LocalServer:
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ $(BIN)/buf: Makefile

$(BIN)/golangci-lint: Makefile
@mkdir -p $(@D)
GOBIN=$(abspath $(@D)) $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.59.1
GOBIN=$(abspath $(@D)) $(GO) install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.61.0

$(BIN)/protoc-gen-go: Makefile go.mod
@mkdir -p $(@D)
Expand Down
40 changes: 17 additions & 23 deletions pkg/experiment/ingester/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"crypto/rand"
"fmt"
"path"
"runtime/pprof"
"slices"
"strings"
Expand All @@ -22,19 +21,14 @@ import (
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/ingester/memdb"
segmentstorage "github.com/grafana/pyroscope/pkg/experiment/ingester/storage"
"github.com/grafana/pyroscope/pkg/model"
pprofsplit "github.com/grafana/pyroscope/pkg/model/pprof_split"
pprofmodel "github.com/grafana/pyroscope/pkg/pprof"
"github.com/grafana/pyroscope/pkg/tenant"
"github.com/grafana/pyroscope/pkg/util/math"
"github.com/grafana/pyroscope/pkg/validation"
)

const pathSegments = "segments"
const pathDLQ = "dlq"
const pathAnon = tenant.DefaultTenantID
const pathBlock = "block.bin"

var ErrMetastoreDLQFailed = fmt.Errorf("failed to store block metadata in DLQ")

type shardKey uint32
Expand Down Expand Up @@ -193,17 +187,15 @@ func (sw *segmentsWriter) newShard(sk shardKey) *shard {
func (sw *segmentsWriter) newSegment(sh *shard, sk shardKey, sl log.Logger) *segment {
id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
sshard := fmt.Sprintf("%d", sk)
blockPath := path.Join(pathSegments, sshard, pathAnon, id.String(), pathBlock)
s := &segment{
l: log.With(sl, "segment-id", id.String()),
ulid: id,
heads: make(map[serviceKey]serviceHead),
sw: sw,
sh: sh,
shard: sk,
sshard: sshard,
blockPath: blockPath,
doneChan: make(chan struct{}, 0),
l: log.With(sl, "segment-id", id.String()),
ulid: id,
heads: make(map[serviceKey]serviceHead),
sw: sw,
sh: sh,
shard: sk,
sshard: sshard,
doneChan: make(chan struct{}, 0),
}
return s
}
Expand Down Expand Up @@ -234,7 +226,7 @@ func (s *segment) flush(ctx context.Context) (err error) {
return fmt.Errorf("failed to flush block %s: %w", s.ulid.String(), err)
}
// TODO(kolesnikovae): Add sane timeouts to all the operations.
if err = s.sw.uploadBlock(ctx, blockData, s); err != nil {
if err = s.sw.uploadBlock(ctx, blockData, blockMeta, s); err != nil {
return fmt.Errorf("failed to upload block %s: %w", s.ulid.String(), err)
}
if err = s.sw.storeMeta(ctx, blockMeta, s); err != nil {
Expand Down Expand Up @@ -406,7 +398,6 @@ type segment struct {
heads map[serviceKey]serviceHead
headsLock sync.RWMutex
sw *segmentsWriter
blockPath string
doneChan chan struct{}
flushErr error
flushErrMutex sync.Mutex
Expand Down Expand Up @@ -517,16 +508,19 @@ func (s *segment) headForIngest(k serviceKey) *memdb.Head {
return nh
}

func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, s *segment) error {
func (sw *segmentsWriter) uploadBlock(ctx context.Context, blockData []byte, meta *metastorev1.BlockMeta, s *segment) error {
t1 := time.Now()
defer func() {
sw.metrics.blockUploadDuration.WithLabelValues(s.sshard).Observe(time.Since(t1).Seconds())
}()
sw.metrics.segmentBlockSizeBytes.WithLabelValues(s.sshard).Observe(float64(len(blockData)))
if err := sw.bucket.Upload(ctx, s.blockPath, bytes.NewReader(blockData)); err != nil {

blockPath := segmentstorage.PathForSegment(meta)

if err := sw.bucket.Upload(ctx, blockPath, bytes.NewReader(blockData)); err != nil {
return err
}
sw.l.Log("msg", "uploaded block", "path", s.blockPath, "upload_duration", time.Since(t1))
sw.l.Log("msg", "uploaded block", "path", blockPath, "upload_duration", time.Since(t1))
return nil
}

Expand All @@ -549,7 +543,7 @@ func (sw *segmentsWriter) storeMetaDLQ(ctx context.Context, meta *metastorev1.Bl
sw.metrics.storeMetaDLQ.WithLabelValues(s.sshard, "err").Inc()
return err
}
fullPath := path.Join(pathDLQ, s.sshard, pathAnon, s.ulid.String(), "meta.pb")
fullPath := segmentstorage.PathForDLQ(meta)
if err = sw.bucket.Upload(ctx, fullPath, bytes.NewReader(metaBlob)); err != nil {
sw.metrics.storeMetaDLQ.WithLabelValues(s.sshard, "err").Inc()
return fmt.Errorf("%w, %w", ErrMetastoreDLQFailed, err)
Expand Down
101 changes: 96 additions & 5 deletions pkg/experiment/ingester/segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"bytes"
"context"
"fmt"
"github.com/grafana/dskit/flagext"
"github.com/grafana/pyroscope/pkg/experiment/metastore"
"github.com/grafana/pyroscope/pkg/test/mocks/mockdlq"
"io"
"math/rand"
"path/filepath"
Expand All @@ -14,14 +17,16 @@ import (
"time"

gprofile "github.com/google/pprof/profile"

profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1"
ingesterv1 "github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1"
"github.com/grafana/pyroscope/api/gen/proto/go/ingester/v1/ingesterv1connect"
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
typesv1 "github.com/grafana/pyroscope/api/gen/proto/go/types/v1"
"github.com/grafana/pyroscope/pkg/experiment/ingester/memdb"
testutil2 "github.com/grafana/pyroscope/pkg/experiment/ingester/memdb/testutil"
segmentstorage "github.com/grafana/pyroscope/pkg/experiment/ingester/storage"
"github.com/grafana/pyroscope/pkg/experiment/metastore/dlq"
metastoretest "github.com/grafana/pyroscope/pkg/experiment/metastore/test"
"github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/objstore/providers/filesystem"
"github.com/grafana/pyroscope/pkg/objstore/providers/memory"
Expand Down Expand Up @@ -226,10 +231,10 @@ func TestDLQFail(t *testing.T) {
l := testutil.NewLogger(t)
bucket := mockobjstore.NewMockBucket(t)
bucket.On("Upload", mock.Anything, mock.MatchedBy(func(name string) bool {
return strings.HasSuffix(name, pathBlock)
return segmentstorage.IsSegmentPath(name)
}), mock.Anything).Return(nil)
bucket.On("Upload", mock.Anything, mock.MatchedBy(func(name string) bool {
return strings.Contains(name, pathDLQ)
return segmentstorage.IsDLQPath(name)
}), mock.Anything).Return(fmt.Errorf("mock upload DLQ error"))
client := mockmetastorev1.NewMockMetastoreServiceClient(t)
client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
Expand Down Expand Up @@ -369,6 +374,92 @@ func TestQueryMultipleSeriesSingleTenant(t *testing.T) {
require.Equal(t, expectedCollapsed, actualCollapsed)
}

func TestDLQRecoveryMock(t *testing.T) {
chunk := inputChunk([]input{
{shard: 1, tenant: "tb", profile: cpuProfile(42, 239, "svc1", "kek", "foo", "bar")},
})

sw := newTestSegmentWriter(t, segmentWriterConfig{
segmentDuration: 100 * time.Millisecond,
})
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("mock metastore unavailable"))

_ = sw.ingestChunk(t, chunk, false)
allBlocks := sw.getMetadataDLQ()
assert.Len(t, allBlocks, 1)

recoveredMetas := make(chan *metastorev1.BlockMeta, 1)
srv := mockdlq.NewMockLocalServer(t)
srv.On("AddRecoveredBlock", mock.Anything, mock.Anything).
Once().
Run(func(args mock.Arguments) {
meta := args.Get(1).(*metastorev1.AddBlockRequest).Block
recoveredMetas <- meta
}).
Return(&metastorev1.AddBlockResponse{}, nil)
recovery := dlq.NewRecovery(dlq.RecoveryConfig{
Period: 100 * time.Millisecond,
}, testutil.NewLogger(t), srv, sw.bucket)
recovery.Start()
defer recovery.Stop()

meta := <-recoveredMetas
assert.Equal(t, allBlocks[0].Id, meta.Id)

clients := sw.createBlocksFromMetas(allBlocks)
inputs := groupInputs(t, chunk)
sw.queryInputs(clients, inputs)
}

func TestDLQRecovery(t *testing.T) {
const tenant = "tb"
const ts = 239
chunk := inputChunk([]input{
{shard: 1, tenant: tenant, profile: cpuProfile(42, ts, "svc1", "kek", "foo", "bar")},
})

sw := newTestSegmentWriter(t, segmentWriterConfig{
segmentDuration: 100 * time.Millisecond,
})
sw.client.On("AddBlock", mock.Anything, mock.Anything, mock.Anything).
Return(nil, fmt.Errorf("mock metastore unavailable"))

_ = sw.ingestChunk(t, chunk, false)

cfg := new(metastore.Config)
flagext.DefaultValues(cfg)
cfg.DLQRecoveryPeriod = 100 * time.Millisecond
m := metastoretest.NewMetastoreSet(t, cfg, 3, sw.bucket)
defer m.Close()

queryBlock := func() *metastorev1.BlockMeta {
res, err := m.Client.QueryMetadata(context.Background(), &metastorev1.QueryMetadataRequest{
TenantId: []string{tenant},
StartTime: ts - 1,
EndTime: ts + 1,
Query: "{service_name=~\"svc1\"}",
})
if err != nil {
return nil
}
if len(res.Blocks) == 1 {
return res.Blocks[0]
}
return nil
}
require.Eventually(t, func() bool {
return queryBlock() != nil
}, 10*time.Second, 100*time.Millisecond)

block := queryBlock()
require.NotNil(t, block)

clients := sw.createBlocksFromMetas([]*metastorev1.BlockMeta{block})
inputs := groupInputs(t, chunk)
sw.queryInputs(clients, inputs)
}

type sw struct {
*segmentsWriter
bucket *memory.InMemBucket
Expand Down Expand Up @@ -407,7 +498,7 @@ func defaultTestSegmentWriterConfig() segmentWriterConfig {
func (sw *sw) createBlocksFromMetas(blocks []*metastorev1.BlockMeta) tenantClients {
dir := sw.t.TempDir()
for _, meta := range blocks {
blobReader, err := sw.bucket.Get(context.Background(), fmt.Sprintf("%s/%d/%s/%s/%s", pathSegments, meta.Shard, pathAnon, meta.Id, pathBlock))
blobReader, err := sw.bucket.Get(context.Background(), segmentstorage.PathForSegment(meta))
require.NoError(sw.t, err)
blob, err := io.ReadAll(blobReader)
require.NoError(sw.t, err)
Expand Down Expand Up @@ -561,7 +652,7 @@ func (sw *sw) getMetadataDLQ() []*metastorev1.BlockMeta {
objects := sw.bucket.Objects()
dlqFiles := []string{}
for s := range objects {
if strings.HasPrefix(s, pathDLQ) {
if segmentstorage.IsDLQPath(s) {
dlqFiles = append(dlqFiles, s)
} else {
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/experiment/ingester/storage/path.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package storage

import (
"fmt"
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/tenant"
"path"
"strings"
)

const PathDLQ = "dlq"

const pathSegments = "segments"
const pathAnon = tenant.DefaultTenantID
const pathBlock = "block.bin"
const pathMetaPB = "meta.pb"

func PathForDLQ(meta *metastorev1.BlockMeta) string {
return path.Join(PathDLQ, fmt.Sprintf("%d", meta.Shard), pathAnon, meta.Id, pathMetaPB)
}

func PathForSegment(meta *metastorev1.BlockMeta) string {
return path.Join(pathSegments, fmt.Sprintf("%d", meta.Shard), pathAnon, meta.Id, pathBlock)
}

func IsDLQPath(p string) bool {
fs := strings.Split(p, "/")
return len(fs) == 5 && fs[0] == PathDLQ && fs[2] == pathAnon && fs[4] == pathMetaPB
}

func IsSegmentPath(p string) bool {
fs := strings.Split(p, "/")
return len(fs) == 5 && fs[0] == pathSegments && fs[2] == pathAnon && fs[4] == pathBlock
}
18 changes: 0 additions & 18 deletions pkg/experiment/metastore/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,21 +167,3 @@ func dial(address string, grpcClientConfig grpcclient.Config, _ log.Logger) (*gr
)
return grpc.Dial(address, options...)
}

const grpcServiceConfig = `{
"healthCheckConfig": {
"serviceName": "metastore.v1.MetastoreService.RaftLeader"
},
"loadBalancingPolicy":"round_robin",
"methodConfig": [{
"name": [{"service": "metastore.v1.MetastoreService"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 16,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]
}`
5 changes: 3 additions & 2 deletions pkg/experiment/metastore/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/grafana/dskit/grpcclient"
compactorv1 "github.com/grafana/pyroscope/api/gen/proto/go/compactor/v1"
metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/test"
"github.com/grafana/pyroscope/pkg/test/mocks/mockdiscovery"
"github.com/prometheus/prometheus/util/testutil"
"github.com/stretchr/testify/assert"
Expand All @@ -22,7 +23,7 @@ func TestUnavailable(t *testing.T) {
d.On("Subscribe", mock.Anything).Return()
l := testutil.NewLogger(t)
c := New(l, grpcclient.Config{}, d)
ports, err := getFreePorts(nServers)
ports, err := test.GetFreePorts(nServers)
assert.NoError(t, err)

d.On("ServerError", mock.Anything).Run(func(args mock.Arguments) {
Expand Down Expand Up @@ -87,7 +88,7 @@ func testRediscoverWrongLeader(t *testing.T, f func(c *Client)) {
config := &grpcclient.Config{}
flagext.DefaultValues(config)
c := New(l, *config, d)
ports, err := getFreePorts(nServers * 2)
ports, err := test.GetFreePorts(nServers * 2)
assert.NoError(t, err)

p1 := ports[:nServers]
Expand Down
16 changes: 0 additions & 16 deletions pkg/experiment/metastore/client/server_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,3 @@ func newMockServer(t *testing.T) *mockServer {
compactorv1.RegisterCompactionPlannerServer(res.srv, res)
return res
}

func getFreePorts(len int) (ports []int, err error) {
ports = make([]int, len)
for i := 0; i < len; i++ {
var a *net.TCPAddr
if a, err = net.ResolveTCPAddr("tcp", "127.0.0.1:0"); err == nil {
var l *net.TCPListener
if l, err = net.ListenTCP("tcp", a); err != nil {
return nil, err
}
ports[i] = l.Addr().(*net.TCPAddr).Port
l.Close()
}
}
return ports, nil
}
Loading

0 comments on commit 3cc5bd8

Please sign in to comment.