Skip to content

Commit 9a62cf4

Browse files
committed
feat(store): Sync (#287)
Adds a Sync method which ensures all pending writes are processed. It blocks until the operation completes or fails. Also, the method gets used in `DeleteTo`, ensuring deletion always operates on the latest state. This is technically a hack for until #241 and #263 come
1 parent d1855b7 commit 9a62cf4

File tree

2 files changed

+86
-6
lines changed

2 files changed

+86
-6
lines changed

store/store.go

Lines changed: 60 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ type Store[H header.Header[H]] struct {
5555
contiguousHead atomic.Pointer[H]
5656
// pending keeps headers pending to be written in one batch
5757
pending *batch[H]
58+
// syncCh is a channel used to synchronize writes
59+
syncCh chan chan struct{}
5860

5961
Params Parameters
6062
}
@@ -109,6 +111,7 @@ func newStore[H header.Header[H]](ds datastore.Batching, opts ...Option) (*Store
109111
writes: make(chan []H, 16),
110112
writesDn: make(chan struct{}),
111113
pending: newBatch[H](params.WriteBatchSize),
114+
syncCh: make(chan chan struct{}),
112115
Params: params,
113116
}, nil
114117
}
@@ -153,6 +156,28 @@ func (s *Store[H]) Stop(ctx context.Context) error {
153156
return s.metrics.Close()
154157
}
155158

159+
// Sync ensures all pending writes are synchronized. It blocks until the operation completes or fails.
160+
func (s *Store[H]) Sync(ctx context.Context) error {
161+
waitCh := make(chan struct{})
162+
select {
163+
case s.syncCh <- waitCh:
164+
case <-s.writesDn:
165+
return errStoppedStore
166+
case <-ctx.Done():
167+
return ctx.Err()
168+
}
169+
170+
select {
171+
case <-waitCh:
172+
case <-s.writesDn:
173+
return errStoppedStore
174+
case <-ctx.Done():
175+
return ctx.Err()
176+
}
177+
178+
return nil
179+
}
180+
156181
func (s *Store[H]) Height() uint64 {
157182
return s.heightSub.Height()
158183
}
@@ -305,6 +330,12 @@ func (s *Store[H]) HasAt(_ context.Context, height uint64) bool {
305330

306331
// DeleteTo implements [header.Store] interface.
307332
func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error {
333+
// ensure all the pending headers are synchronized
334+
err := s.Sync(ctx)
335+
if err != nil {
336+
return err
337+
}
338+
308339
head, err := s.Head(ctx)
309340
if err != nil {
310341
return fmt.Errorf("header/store: reading head: %w", err)
@@ -468,7 +499,8 @@ func (s *Store[H]) Append(ctx context.Context, headers ...H) error {
468499
func (s *Store[H]) flushLoop() {
469500
defer close(s.writesDn)
470501
ctx := context.Background()
471-
for headers := range s.writes {
502+
503+
flush := func(headers []H) {
472504
s.ensureInit(headers)
473505
// add headers to the pending and ensure they are accessible
474506
s.pending.Append(headers...)
@@ -482,7 +514,7 @@ func (s *Store[H]) flushLoop() {
482514
// don't flush and continue if pending batch is not grown enough,
483515
// and Store is not stopping(headers == nil)
484516
if s.pending.Len() < s.Params.WriteBatchSize && headers != nil {
485-
continue
517+
return
486518
}
487519

488520
startTime := time.Now()
@@ -506,15 +538,37 @@ func (s *Store[H]) flushLoop() {
506538
s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
507539
// reset pending
508540
s.pending.Reset()
541+
}
509542

510-
if headers == nil {
511-
// a signal to stop
512-
return
543+
for {
544+
select {
545+
case dn := <-s.syncCh:
546+
for {
547+
select {
548+
case headers := <-s.writes:
549+
flush(headers)
550+
if headers == nil {
551+
// a signal to stop
552+
return
553+
}
554+
continue
555+
default:
556+
}
557+
558+
close(dn)
559+
break
560+
}
561+
case headers := <-s.writes:
562+
flush(headers)
563+
if headers == nil {
564+
// a signal to stop
565+
return
566+
}
513567
}
514568
}
515569
}
516570

517-
// flush writes the given batch to datastore.
571+
// flush writes given headers to datastore
518572
func (s *Store[H]) flush(ctx context.Context, headers ...H) error {
519573
ln := len(headers)
520574
if ln == 0 {

store/store_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,32 @@ func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) {
675675
assert.Equal(t, suite.Head().Height(), head.Height())
676676
}
677677

678+
func TestStore_DeleteTo_Synchronized(t *testing.T) {
679+
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
680+
t.Cleanup(cancel)
681+
682+
suite := headertest.NewTestSuite(t)
683+
684+
ds := sync.MutexWrap(datastore.NewMapDatastore())
685+
store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10))
686+
687+
err := store.Append(ctx, suite.GenDummyHeaders(50)...)
688+
require.NoError(t, err)
689+
690+
err = store.Append(ctx, suite.GenDummyHeaders(50)...)
691+
require.NoError(t, err)
692+
693+
err = store.Append(ctx, suite.GenDummyHeaders(50)...)
694+
require.NoError(t, err)
695+
696+
err = store.DeleteTo(ctx, 100)
697+
require.NoError(t, err)
698+
699+
tail, err := store.Tail(ctx)
700+
require.NoError(t, err)
701+
require.EqualValues(t, 100, tail.Height())
702+
}
703+
678704
func TestStorePendingCacheMiss(t *testing.T) {
679705
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
680706
t.Cleanup(cancel)

0 commit comments

Comments
 (0)