Commit 43f5d2e

malloc: refactor mmap allocator
1 parent fa2ec45 · commit 43f5d2e

4 files changed: +275 −88 lines changed


pkg/common/malloc/fixed_size_mmap_allocator.go

Lines changed: 162 additions & 78 deletions

@@ -15,41 +15,38 @@
 package malloc
 
 import (
+	"math/bits"
+	"slices"
+	"sync"
+	"sync/atomic"
 	"unsafe"
 
 	"golang.org/x/sys/unix"
 )
 
-const (
-	// Classes with smaller size than smallClassCap will buffer min((smallClassCap/size), maxBuffer1Cap) objects in buffer1
-	smallClassCap = 1 * MB
-	maxBuffer1Cap = 256
-
-	// objects in buffer2 will be MADV_DONTNEED-advised and will not occupy RSS, so it's safe to use a large number
-	buffer2Cap = 1024
-)
-
 type fixedSizeMmapAllocator struct {
 	size uint64
-	// buffer1 buffers objects
-	buffer1 chan unsafe.Pointer
-	// buffer2 buffers MADV_DONTNEED objects
-	buffer2 chan unsafe.Pointer
+
+	mu           sync.Mutex
+	slabs        []*_Slab
+	maxSlabs     int
+	freeSlabs    []*_Slab
+	maxFreeSlabs int
 
 	deallocatorPool *ClosureDeallocatorPool[fixedSizeMmapDeallocatorArgs, *fixedSizeMmapDeallocatorArgs]
 }
 
 type fixedSizeMmapDeallocatorArgs struct {
-	length uint64
-	ptr    unsafe.Pointer
+	slab *_Slab
+	ptr  unsafe.Pointer
 }
 
 func (f fixedSizeMmapDeallocatorArgs) As(trait Trait) bool {
-	if info, ok := trait.(*MmapInfo); ok {
-		info.Addr = f.ptr
-		info.Length = f.length
-		return true
-	}
+	//if info, ok := trait.(*MmapInfo); ok {
+	//	info.Addr = f.ptr
+	//	info.Length = f.length
+	//	return true
+	//}
 	return false
 }
 
@@ -60,97 +57,184 @@ type MmapInfo struct {
 
 func (*MmapInfo) IsTrait() {}
 
+const (
+	maxActiveSlabs  = 256
+	maxActiveBytes  = 1 * MB
+	maxStandbySlabs = 1024
+	maxStandbyBytes = 128 * MB
+)
+
 func NewFixedSizeMmapAllocator(
 	size uint64,
 ) (ret *fixedSizeMmapAllocator) {
 
-	// if size is larger than smallClassCap, num1 will be zero, buffer1 will be empty
-	num1 := smallClassCap / size
-	if num1 > maxBuffer1Cap {
-		// don't buffer too much, since chans with larger buffer consume more memory
-		num1 = maxBuffer1Cap
-	}
-
 	ret = &fixedSizeMmapAllocator{
-		size:    size,
-		buffer1: make(chan unsafe.Pointer, num1),
-		buffer2: make(chan unsafe.Pointer, buffer2Cap),
+		size: size,
+		maxSlabs: min(
+			maxActiveSlabs,
+			maxActiveBytes/(int(size)*slabCapacity),
+		),
+		maxFreeSlabs: min(
+			maxStandbySlabs,
+			maxStandbyBytes/(int(size)*slabCapacity),
+		),
 
 		deallocatorPool: NewClosureDeallocatorPool(
 			func(hints Hints, args *fixedSizeMmapDeallocatorArgs) {
-
-				if hints&DoNotReuse > 0 {
-					ret.freeMem(args.ptr)
-					return
+				empty := args.slab.free(args.ptr)
+				if empty {
+					ret.freeSlab(args.slab)
 				}
+			},
+		),
+	}
 
-				select {
+	return ret
+}
 
-				case ret.buffer1 <- args.ptr:
-					// buffer in buffer1
+var _ FixedSizeAllocator = new(fixedSizeMmapAllocator)
 
-				default:
+func (f *fixedSizeMmapAllocator) Allocate(hints Hints, clearSize uint64) ([]byte, Deallocator, error) {
+	slab, ptr, err := f.allocate()
+	if err != nil {
+		return nil, nil, err
+	}
 
-					ret.freeMem(args.ptr)
+	slice := unsafe.Slice(
+		(*byte)(ptr),
+		f.size,
+	)
+	if hints&NoClear == 0 {
+		clear(slice[:min(clearSize, f.size)])
+	}
 
-					select {
+	return slice, f.deallocatorPool.Get(fixedSizeMmapDeallocatorArgs{
+		slab: slab,
+		ptr:  ptr,
+	}), nil
+}
 
-					case ret.buffer2 <- args.ptr:
-						// buffer in buffer2
+func (f *fixedSizeMmapAllocator) allocate() (*_Slab, unsafe.Pointer, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
 
-					default:
+	// from existing
+	for _, slab := range f.slabs {
+		ptr, ok := slab.allocate()
+		if ok {
+			return slab, ptr, nil
+		}
+	}
 
-					}
+	// empty or all full
+	// from freeSlabs
+	if len(f.freeSlabs) > 0 {
+		slab := f.freeSlabs[len(f.freeSlabs)-1]
+		f.freeSlabs = f.freeSlabs[:len(f.freeSlabs)-1]
+		reuseMem(slab.base, slab.objectSize*slabCapacity)
+		f.slabs = append(f.slabs, slab)
+		ptr, _ := slab.allocate()
+		return slab, ptr, nil
+	}
 
-				}
+	// allocate new
+	slice, err := unix.Mmap(
+		-1, 0,
+		int(f.size*slabCapacity),
+		unix.PROT_READ|unix.PROT_WRITE,
+		unix.MAP_PRIVATE|unix.MAP_ANONYMOUS,
+	)
+	if err != nil {
+		return nil, nil, err
+	}
 
-			},
-		),
+	base := unsafe.Pointer(unsafe.SliceData(slice))
+	slab := &_Slab{
+		base:       base,
+		objectSize: int(f.size),
 	}
+	f.slabs = append(f.slabs, slab)
 
-	return ret
+	ptr, _ := slab.allocate()
+	return slab, ptr, nil
 }
 
-var _ FixedSizeAllocator = new(fixedSizeMmapAllocator)
+func (f *fixedSizeMmapAllocator) freeSlab(slab *_Slab) {
+	f.mu.Lock() // to prevent new allocation
+	defer f.mu.Unlock()
 
-func (f *fixedSizeMmapAllocator) Allocate(hints Hints, clearSize uint64) (slice []byte, dec Deallocator, err error) {
+	if len(f.slabs) < f.maxSlabs {
+		return
+	}
 
-	select {
+	if slab.mask.Load() != 0 {
+		// has new allocation
+		return
+	}
 
-	case ptr := <-f.buffer1:
-		// from buffer1
-		slice = unsafe.Slice((*byte)(ptr), f.size)
-		if hints&NoClear == 0 {
-			clear(slice[:clearSize])
+	offset := -1
+	for i, s := range f.slabs {
+		if s == slab {
+			offset = i
+			break
 		}
+	}
+	if offset == -1 {
+		// already moved
+		return
+	}
 
-	default:
+	// free slab memory
+	freeMem(slab.base, slab.objectSize*slabCapacity)
+
+	// move to freeSlabs
+	f.slabs = slices.Delete(f.slabs, offset, offset+1)
+	f.freeSlabs = append(f.freeSlabs, slab)
+
+	for len(f.freeSlabs) > f.maxFreeSlabs {
+		slab := f.freeSlabs[len(f.freeSlabs)-1]
+		f.freeSlabs = f.freeSlabs[:len(f.freeSlabs)-1]
+		unix.Munmap(
+			unsafe.Slice(
+				(*byte)(slab.base),
+				slab.objectSize*slabCapacity,
+			),
+		)
+	}
 
-		select {
+}
 
-		case ptr := <-f.buffer2:
-			// from buffer2
-			f.reuseMem(ptr, hints, clearSize)
-			slice = unsafe.Slice((*byte)(ptr), f.size)
+const slabCapacity = 64 // uint64 masked
 
-		default:
-			// allocate new
-			slice, err = unix.Mmap(
-				-1, 0,
-				int(f.size),
-				unix.PROT_READ|unix.PROT_WRITE,
-				unix.MAP_PRIVATE|unix.MAP_ANONYMOUS,
-			)
-			if err != nil {
-				return nil, nil, err
-			}
+type _Slab struct {
+	base       unsafe.Pointer
+	objectSize int
+	mask       atomic.Uint64
+}
 
+func (s *_Slab) allocate() (unsafe.Pointer, bool) {
+	for {
+		mask := s.mask.Load()
+		reverse := ^mask
+		if reverse == 0 {
+			// full
+			return nil, false
+		}
+		offset := bits.TrailingZeros64(reverse)
+		addr := unsafe.Add(s.base, offset*s.objectSize)
+		if s.mask.CompareAndSwap(mask, mask|(1<<offset)) {
+			return addr, true
 		}
-
 	}
+}
 
-	return slice, f.deallocatorPool.Get(fixedSizeMmapDeallocatorArgs{
-		ptr:    unsafe.Pointer(unsafe.SliceData(slice)),
-		length: f.size,
-	}), nil
+func (s *_Slab) free(ptr unsafe.Pointer) bool {
+	offset := (uintptr(ptr) - uintptr(s.base)) / uintptr(s.objectSize)
+	for {
+		mask := s.mask.Load()
+		newMask := mask & ^(uint64(1) << offset)
+		if s.mask.CompareAndSwap(mask, newMask) {
+			return newMask == 0
+		}
+	}
 }
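
To put the new caps in concrete numbers (the class sizes here are illustrative; the package's actual size classes may differ): every slab holds slabCapacity = 64 objects, so a 1 KB class uses 64 KB slabs, giving maxSlabs = min(256, 1 MB / 64 KB) = 16 active slabs and maxFreeSlabs = min(1024, 128 MB / 64 KB) = 1024 standby slabs. A 16 KB class uses 1 MB slabs, so at most min(256, 1 MB / 1 MB) = 1 slab stays active and min(1024, 128 MB / 1 MB) = 128 stay on standby. For classes above 16 KB the integer division drives maxSlabs to zero, so every slab that empties moves straight to the standby list.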

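The heart of the refactor is the _Slab bitmap: one uint64 tracks all 64 slots, allocate claims the lowest zero bit in a CAS loop, and free reports when the mask drops back to zero so the allocator can retire the slab. The sketch below shows the same scheme in isolation; the type and method names are invented for the demo, and plain slot indices stand in for mmap'd memory.

package main

import (
	"fmt"
	"math/bits"
	"sync/atomic"
)

// bitmapSlab mirrors _Slab's bookkeeping: bit i of mask set means slot i is in use.
type bitmapSlab struct {
	mask atomic.Uint64
}

// claim reserves the lowest free slot, or reports false when all 64 are taken.
func (s *bitmapSlab) claim() (int, bool) {
	for {
		mask := s.mask.Load()
		free := ^mask
		if free == 0 {
			return 0, false // every bit set: slab is full
		}
		i := bits.TrailingZeros64(free) // lowest zero bit = lowest free slot
		if s.mask.CompareAndSwap(mask, mask|(1<<i)) {
			return i, true
		}
		// CAS lost a race with a concurrent claim or release; retry with a fresh mask
	}
}

// release frees slot i and reports whether the slab is now empty,
// the same signal the deallocator closure uses to call freeSlab.
func (s *bitmapSlab) release(i int) bool {
	for {
		mask := s.mask.Load()
		newMask := mask &^ (1 << i)
		if s.mask.CompareAndSwap(mask, newMask) {
			return newMask == 0
		}
	}
}

func main() {
	var s bitmapSlab
	a, _ := s.claim()
	b, _ := s.claim()
	fmt.Println(a, b)         // 0 1
	fmt.Println(s.release(a)) // false: slot 1 is still live
	fmt.Println(s.release(b)) // true: slab empty, eligible for retirement
}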
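freeMem and reuseMem are not defined in this file; they presumably come from one of the other three changed files. Going by the deleted buffer2 comment (MADV_DONTNEED-advised objects do not occupy RSS), a plausible sketch of the pair is below. The madvise flags are assumptions, not the committed code.

package malloc

import (
	"unsafe"

	"golang.org/x/sys/unix"
)

// freeMem plausibly drops a standby slab's physical pages while keeping the
// mapping itself, so the slab costs address space but no RSS (the behavior
// the old buffer2 comment attributed to MADV_DONTNEED).
func freeMem(base unsafe.Pointer, length int) {
	_ = unix.Madvise(
		unsafe.Slice((*byte)(base), length),
		unix.MADV_DONTNEED,
	)
}

// reuseMem would be the matching hook when a standby slab is reactivated.
// After MADV_DONTNEED the kernel refaults zero-filled pages on first touch,
// so prefetching with MADV_WILLNEED, or doing nothing at all, both work.
func reuseMem(base unsafe.Pointer, length int) {
	_ = unix.Madvise(
		unsafe.Slice((*byte)(base), length),
		unix.MADV_WILLNEED,
	)
}

Under this reading, memory revived from freeSlabs comes back zero-filled by the kernel, while a slot recycled inside an active slab still holds its old bytes, which is why Allocate keeps clearing up to clearSize unless the NoClear hint is set.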