Skip to content

Commit

Permalink
enable applied index optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
kolesnikovae committed Nov 20, 2024
1 parent 2628645 commit 6472024
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 11 deletions.
11 changes: 5 additions & 6 deletions pkg/experiment/metastore/fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,11 @@ func (fsm *FSM) applyCommand(cmd *raft.Log) any {
if err := e.UnmarshalBinary(cmd.Data); err != nil {
return errResponse(cmd, err)
}
// TODO(kolesnikovae): Figure out why skipping applied commands causes issues.
// if cmd.Index <= fsm.appliedIndex {
// // Skip already applied commands at WAL restore.
// // Note that the 0 index is a noop and is never applied to FSM.
// return Response{}
// }
if cmd.Index <= fsm.appliedIndex {
// Skip already applied commands at WAL restore.
// Note that the 0 index is a noop and is never applied to FSM.
return Response{}
}

cmdType := strconv.FormatUint(uint64(e.Type), 10)
fsm.db.metrics.fsmApplyCommandSize.WithLabelValues(cmdType).Observe(float64(len(cmd.Data)))
Expand Down
67 changes: 67 additions & 0 deletions pkg/experiment/metastore/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"go.etcd.io/bbolt"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/experiment/metastore/index"
Expand Down Expand Up @@ -312,3 +313,69 @@ func createBlock(key string, offset time.Duration) *metastorev1.BlockMeta {
TenantId: "tenant-1",
}
}

func TestReplaceBlocks_Persistence(t *testing.T) {
db := test.BoltDB(t)
c := &index.Config{
PartitionDuration: 24 * time.Hour,
PartitionCacheSize: 7,
QueryLookaroundPeriod: time.Hour,
}
md1 := &metastorev1.BlockMeta{
Id: test.ULID("2024-09-22T08:00:00.123Z"),
Shard: 3,
CompactionLevel: 0,
TenantId: "",
}
md2 := &metastorev1.BlockMeta{
Id: test.ULID("2024-09-22T08:01:00.123Z"),
Shard: 3,
CompactionLevel: 0,
TenantId: "",
}
md3 := &metastorev1.BlockMeta{
Id: test.ULID("2024-09-25T09:00:00.123Z"),
Shard: 3,
CompactionLevel: 1,
TenantId: "x1",
}
md4 := &metastorev1.BlockMeta{
Id: test.ULID("2024-09-25T09:01:00.123Z"),
Shard: 3,
CompactionLevel: 1,
TenantId: "x2",
}

x := index.NewIndex(util.Logger, index.NewStore(), c)
require.NoError(t, db.Update(x.Init))
require.NoError(t, db.View(x.Restore))

require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
return x.InsertBlock(tx, md1)
}))
require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
return x.InsertBlock(tx, md2)
}))

require.NoError(t, db.Update(func(tx *bbolt.Tx) error {
return x.ReplaceBlocks(tx, &metastorev1.CompactedBlocks{
NewBlocks: []*metastorev1.BlockMeta{md3, md4},
SourceBlocks: &metastorev1.BlockList{
Tenant: md1.TenantId,
Shard: md1.Shard,
Blocks: []string{md1.Id, md2.Id},
},
})
}))

x = index.NewIndex(util.Logger, index.NewStore(), c)
require.NoError(t, db.Update(x.Init))
require.NoError(t, db.View(x.Restore))
require.NoError(t, db.View(func(tx *bbolt.Tx) error {
require.Nil(t, x.FindBlock(tx, md1.Shard, md1.TenantId, md1.Id))
require.Nil(t, x.FindBlock(tx, md2.Shard, md2.TenantId, md2.Id))
require.NotNil(t, x.FindBlock(tx, md3.Shard, md3.TenantId, md3.Id))
require.NotNil(t, x.FindBlock(tx, md4.Shard, md4.TenantId, md4.Id))
return nil
}))
}
4 changes: 3 additions & 1 deletion pkg/experiment/metastore/index/store/index_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ func (m *IndexStore) DeleteBlockList(tx *bbolt.Tx, pk PartitionKey, list *metast
return nil
}
for _, b := range list.Blocks {
return tenant.Delete([]byte(b))
if err := tenant.Delete([]byte(b)); err != nil {
return err
}
}
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/experiment/metastore/raftnode/node_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,11 @@ func (n *Node) readIndex() (ReadIndex, error) {
// See the "runLeader" and "dispatchLogs" implementation (hashicorp raft)
// for details: when the leader is elected, it issues a noop, we only need
// to ensure that the entry is committed before we access the current
// commit index (which may be greater than the current last index).
// We also keep track of the current term to ensure that the leader has not
// changed while we were waiting for the noop to be committed and heartbeat
// messages to be exchanged.
// commit index. This may incur substantial latency, if replicas are slow,
// but it's the only way to ensure that the leader has all committed
// entries. We also keep track of the current term to ensure that the
// leader has not changed while we were waiting for the noop to be
// committed and heartbeat messages to be exchanged.
if err := n.waitLastIndexCommitted(); err != nil {
return ReadIndex{}, err
}
Expand Down

0 comments on commit 6472024

Please sign in to comment.