Skip to content

Commit 1181d39

Browse files
committed
refactor(store): synchronize Store writes
1 parent 996923f commit 1181d39

File tree

2 files changed

+88
-70
lines changed

2 files changed

+88
-70
lines changed

store/metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ func newMetrics() (m *metrics, err error) {
5252
}
5353
m.writesQueueBlockedInst, err = meter.Int64Counter(
5454
"hdr_store_writes_blocked_counter",
55-
metric.WithDescription("header store writes blocked counter"),
55+
metric.WithDescription("header store writesCh blocked counter"),
5656
)
5757
if err != nil {
5858
return nil, err

store/store.go

Lines changed: 87 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"sync/atomic"
7+
"sync"
88
"time"
99

1010
lru "github.com/hashicorp/golang-lru/v2"
@@ -45,14 +45,13 @@ type Store[H header.Header[H]] struct {
4545

4646
// writing to datastore
4747
//
48-
// queue of headers to be written
49-
writes chan []H
48+
writesMu sync.Mutex
49+
// writesPending keeps headers pending to be written in one batch
50+
writesPending *batch[H]
51+
// queue of batches to be written
52+
writesCh chan *batch[H]
5053
// signals when writes are finished
5154
writesDn chan struct{}
52-
// writeHead maintains the current write head
53-
writeHead atomic.Pointer[H]
54-
// pending keeps headers pending to be written in one batch
55-
pending *batch[H]
5655

5756
Params Parameters
5857
}
@@ -99,15 +98,15 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
9998
}
10099

101100
return &Store[H]{
102-
ds: wrappedStore,
103-
cache: cache,
104-
metrics: metrics,
105-
heightIndex: index,
106-
heightSub: newHeightSub[H](),
107-
writes: make(chan []H, 16),
108-
writesDn: make(chan struct{}),
109-
pending: newBatch[H](params.WriteBatchSize),
110-
Params: params,
101+
ds: wrappedStore,
102+
cache: cache,
103+
metrics: metrics,
104+
heightIndex: index,
105+
heightSub: newHeightSub[H](),
106+
writesCh: make(chan *batch[H], 4),
107+
writesDn: make(chan struct{}),
108+
writesPending: newBatch[H](params.WriteBatchSize),
109+
Params: params,
111110
}, nil
112111
}
113112

@@ -126,31 +125,41 @@ func (s *Store[H]) Init(ctx context.Context, initial H) error {
126125
return nil
127126
}
128127

128+
// Start starts or restarts the Store.
129129
func (s *Store[H]) Start(context.Context) error {
130130
// closed s.writesDn means that store was stopped before, recreate chan.
131131
select {
132132
case <-s.writesDn:
133+
s.writesCh = make(chan *batch[H], 4)
133134
s.writesDn = make(chan struct{})
135+
s.writesPending = newBatch[H](s.Params.WriteBatchSize)
134136
default:
135137
}
136138

137139
go s.flushLoop()
138140
return nil
139141
}
140142

143+
// Stop stops the store and cleans up resources.
144+
// Canceling context while stopping may leave the store in an inconsistent state.
141145
func (s *Store[H]) Stop(ctx context.Context) error {
146+
s.writesMu.Lock()
147+
defer s.writesMu.Unlock()
148+
// check if store was already stopped
142149
select {
143150
case <-s.writesDn:
144151
return errStoppedStore
145152
default:
146153
}
147-
// signal to prevent further writes to Store
154+
// write the pending leftover
148155
select {
149-
case s.writes <- nil:
156+
case s.writesCh <- s.writesPending:
157+
// signal closing to flushLoop
158+
close(s.writesCh)
150159
case <-ctx.Done():
151160
return ctx.Err()
152161
}
153-
// wait till it is done writing
162+
// wait till flushLoop is done writing
154163
select {
155164
case <-s.writesDn:
156165
case <-ctx.Done():
@@ -193,7 +202,7 @@ func (s *Store[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
193202
return v, nil
194203
}
195204
// check if the requested header is not yet written on disk
196-
if h := s.pending.Get(hash); !h.IsZero() {
205+
if h := s.writesPending.Get(hash); !h.IsZero() {
197206
return h, nil
198207
}
199208

@@ -227,7 +236,8 @@ func (s *Store[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
227236
// which means the requested 'height' should be present
228237
//
229238
// check if the requested header is not yet written on disk
230-
if h := s.pending.GetByHeight(height); !h.IsZero() {
239+
// TODO: Synchronize with prepareWrite?
240+
if h := s.writesPending.GetByHeight(height); !h.IsZero() {
231241
return h, nil
232242
}
233243

@@ -287,7 +297,7 @@ func (s *Store[H]) Has(ctx context.Context, hash header.Hash) (bool, error) {
287297
return ok, nil
288298
}
289299
// check if the requested header is not yet written on disk
290-
if ok := s.pending.Has(hash); ok {
300+
if ok := s.writesPending.Has(hash); ok {
291301
return ok, nil
292302
}
293303

@@ -304,23 +314,15 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
304314
return nil
305315
}
306316

307-
var err error
308317
// take current write head to verify headers against
309-
var head H
310-
headPtr := s.writeHead.Load()
311-
if headPtr == nil {
312-
head, err = s.Head(ctx)
313-
if err != nil {
314-
return err
315-
}
316-
} else {
317-
head = *headPtr
318+
head, err := s.Head(ctx)
319+
if err != nil {
320+
return err
318321
}
319322

320323
// collect valid headers
321324
verified := make([]H, 0, lh)
322325
for i, h := range headers {
323-
324326
err = head.Verify(h)
325327
if err != nil {
326328
var verErr *header.VerifyError
@@ -344,27 +346,27 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
344346
head = h
345347
}
346348

347-
onWrite := func() {
348-
newHead := verified[len(verified)-1]
349-
s.writeHead.Store(&newHead)
350-
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
351-
s.metrics.newHead(newHead.Height())
349+
// prepare headers to be written
350+
toWrite, err := s.prepareWrite(verified)
351+
switch {
352+
case err != nil:
353+
return err
354+
case toWrite == nil:
355+
return nil
352356
}
353357

354358
// queue headers to be written on disk
355359
select {
356-
case s.writes <- verified:
360+
case s.writesCh <- toWrite:
357361
// we return an error here after writing,
358362
// as there might be an invalid header in between of a given range
359-
onWrite()
360363
return err
361364
default:
362365
s.metrics.writesQueueBlocked(ctx)
363366
}
364-
// if the writes queue is full, we block until it is not
367+
// if the writesCh queue is full, block until the send succeeds or the store stops
365368
select {
366-
case s.writes <- verified:
367-
onWrite()
369+
case s.writesCh <- toWrite:
368370
return err
369371
case <-s.writesDn:
370372
return errStoppedStore
@@ -373,28 +375,50 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
373375
}
374376
}
375377

378+
func (s *Store[H]) prepareWrite(headers []H) (*batch[H], error) {
379+
s.writesMu.Lock()
380+
defer s.writesMu.Unlock()
381+
// check if store was stopped
382+
select {
383+
case <-s.writesDn:
384+
return nil, errStoppedStore
385+
default:
386+
}
387+
388+
// keep verified headers as pending writes and ensure they are accessible for reads
389+
s.writesPending.Append(headers...)
390+
// notify waiters if any
391+
// it is important to do Pub after updating pending
392+
// so pending is consistent with atomic Height counter on the heightSub
393+
s.heightSub.Pub(headers...)
394+
395+
// TODO: Head advancing
396+
// announce our new head
397+
newHead := headers[len(headers)-1]
398+
s.metrics.newHead(newHead.Height())
399+
log.Infow("new head", "height", newHead.Height(), "hash", newHead.Hash())
400+
401+
// don't flush and continue if the pending write batch has not grown large enough
402+
if s.writesPending.Len() < s.Params.WriteBatchSize {
403+
return nil, nil
404+
}
405+
406+
toWrite := s.writesPending
407+
s.writesPending = newBatch[H](s.Params.WriteBatchSize)
408+
return toWrite, nil
409+
}
410+
376411
// flushLoop performs writing task to the underlying datastore in a separate routine
377-
// This way writes are controlled and manageable from one place allowing
378-
// (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
379-
// (2) Batching header writes
412+
// This way writes are controlled and manageable from one place allowing
413+
// (1) Appends not to be blocked on long disk IO writes and underlying DB compactions
414+
// (2) Batching header writes
380415
func (s *Store[H]) flushLoop() {
381416
defer close(s.writesDn)
382417
ctx := context.Background()
383-
for headers := range s.writes {
384-
// add headers to the pending and ensure they are accessible
385-
s.pending.Append(headers...)
386-
// and notify waiters if any + increase current read head height
387-
// it is important to do Pub after updating pending
388-
// so pending is consistent with atomic Height counter on the heightSub
389-
s.heightSub.Pub(headers...)
390-
// don't flush and continue if pending batch is not grown enough,
391-
// and Store is not stopping(headers == nil)
392-
if s.pending.Len() < s.Params.WriteBatchSize && headers != nil {
393-
continue
394-
}
395418

419+
for headers := range s.writesCh {
396420
startTime := time.Now()
397-
toFlush := s.pending.GetAll()
421+
toFlush := headers.GetAll()
398422

399423
for i := 0; ; i++ {
400424
err := s.flush(ctx, toFlush...)
@@ -404,25 +428,19 @@ func (s *Store[H]) flushLoop() {
404428

405429
from, to := toFlush[0].Height(), toFlush[len(toFlush)-1].Height()
406430
log.Errorw("writing header batch", "try", i+1, "from", from, "to", to, "err", err)
407-
s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)
431+
s.metrics.flush(ctx, time.Since(startTime), s.writesPending.Len(), true)
408432

409433
const maxRetrySleep = time.Second
410434
sleep := min(10*time.Duration(i+1)*time.Millisecond, maxRetrySleep)
411435
time.Sleep(sleep)
412436
}
413437

414-
s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
415-
// reset pending
416-
s.pending.Reset()
417-
418-
if headers == nil {
419-
// a signal to stop
420-
return
421-
}
438+
s.metrics.flush(ctx, time.Since(startTime), s.writesPending.Len(), false)
439+
headers.Reset()
422440
}
423441
}
424442

425-
// flush writes the given batch to datastore.
443+
// flush writes the given batch to datastore.
426444
func (s *Store[H]) flush(ctx context.Context, headers ...H) error {
427445
ln := len(headers)
428446
if ln == 0 {

0 commit comments

Comments
 (0)