Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fifocache: make queue1 size configurable #21301

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/fileservice/disk_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func NewDiskCache(
)
}
},
func(capacity int64) int64 {
// use half space for queue1
return capacity / 2
},
),
}
ret.updatingPaths.Cond = sync.NewCond(new(sync.Mutex))
Expand Down Expand Up @@ -160,6 +164,9 @@ func (d *DiskCache) loadCache(ctx context.Context) {
}

d.cache.Set(ctx, work.Path, struct{}{}, int64(fileSize(info)))
// get 2 times to prevent too early eviction
d.cache.Get(ctx, work.Path)
d.cache.Get(ctx, work.Path)
}
}()
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/fileservice/fifocache/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
func BenchmarkSequentialSet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})
nElements := size * 16
b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -37,7 +39,9 @@ func BenchmarkSequentialSet(b *testing.B) {
func BenchmarkParallelSet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})
nElements := size * 16
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -50,7 +54,9 @@ func BenchmarkParallelSet(b *testing.B) {
func BenchmarkGet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})
nElements := size * 16
for i := 0; i < nElements; i++ {
cache.Set(ctx, i, i, int64(1+i%3))
Expand All @@ -64,7 +70,9 @@ func BenchmarkGet(b *testing.B) {
func BenchmarkParallelGet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})
nElements := size * 16
for i := 0; i < nElements; i++ {
cache.Set(ctx, i, i, int64(1+i%3))
Expand All @@ -80,7 +88,9 @@ func BenchmarkParallelGet(b *testing.B) {
func BenchmarkParallelGetOrSet(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})
nElements := size * 16
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand All @@ -97,7 +107,9 @@ func BenchmarkParallelGetOrSet(b *testing.B) {
func BenchmarkParallelEvict(b *testing.B) {
ctx := context.Background()
size := 65536
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(int64(size)), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})
nElements := size * 16
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/fileservice/fifocache/data_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ func NewDataCache(
postEvict func(ctx context.Context, key fscache.CacheKey, value fscache.Data, size int64),
) *DataCache {
return &DataCache{
fifo: New(capacity, shardCacheKey, postSet, postGet, postEvict),
fifo: New(capacity, shardCacheKey, postSet, postGet, postEvict, func(capacity int64) int64 {
return capacity / 10
}),
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/fileservice/fifocache/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,12 @@ func New[K comparable, V any](
postSet func(ctx context.Context, key K, value V, size int64),
postGet func(ctx context.Context, key K, value V, size int64),
postEvict func(ctx context.Context, key K, value V, size int64),
capacity1Func func(capacity int64) int64,
) *Cache[K, V] {
ret := &Cache[K, V]{
capacity: capacity,
capacity1: func() int64 {
return capacity() / 10
return capacity1Func(capacity())
},
itemQueue: make(chan *_CacheItem[K, V], runtime.GOMAXPROCS(0)*2),
queue1: *NewQueue[*_CacheItem[K, V]](),
Expand Down
15 changes: 12 additions & 3 deletions pkg/fileservice/fifocache/fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

func TestCacheSetGet(t *testing.T) {
ctx := context.Background()
cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})

cache.Set(ctx, 1, 1, 1)
n, ok := cache.Get(ctx, 1)
Expand All @@ -42,7 +44,9 @@ func TestCacheSetGet(t *testing.T) {

func TestCacheEvict(t *testing.T) {
ctx := context.Background()
cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(8), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})
for i := 0; i < 64; i++ {
cache.Set(ctx, i, i, 1)
if cache.used1+cache.used2 > cache.capacity() {
Expand All @@ -53,7 +57,9 @@ func TestCacheEvict(t *testing.T) {

func TestCacheEvict2(t *testing.T) {
ctx := context.Background()
cache := New[int, int](fscache.ConstCapacity(2), ShardInt[int], nil, nil, nil)
cache := New[int, int](fscache.ConstCapacity(2), ShardInt[int], nil, nil, nil, func(capacity int64) int64 {
return capacity / 10
})
cache.Set(ctx, 1, 1, 1)
cache.Set(ctx, 2, 2, 1)

Expand Down Expand Up @@ -94,6 +100,9 @@ func TestCacheEvict3(t *testing.T) {
func(_ context.Context, _ int, _ bool, _ int64) {
nEvict++
},
func(capacity int64) int64 {
return capacity / 10
},
)
for i := 0; i < 1024; i++ {
cache.Set(ctx, i, true, 1)
Expand Down
6 changes: 5 additions & 1 deletion pkg/objectio/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ func newMetaCache(capacity fscache.CapacityFunc) *fifocache.Cache[mataCacheKey,
func(_ context.Context, _ mataCacheKey, _ []byte, size int64) { // postEvict
inuseBytes.Add(float64(-size))
capacityBytes.Set(float64(capacity()))
})
},
func(capacity int64) int64 {
return capacity / 10
},
)
}

func EvictCache(ctx context.Context) (target int64) {
Expand Down
Loading