@@ -5,131 +5,133 @@ import (
55 "errors"
66 "sync"
77 "sync/atomic"
8-
9- "github.com/celestiaorg/go-header"
108)
119
1210// errElapsedHeight is thrown when a requested height was already provided to heightSub.
1311var errElapsedHeight = errors .New ("elapsed height" )
1412
1513// heightSub provides a minimalistic mechanism to wait till header for a height becomes available.
16- type heightSub [ H header. Header [ H ]] struct {
14+ type heightSub struct {
1715 // height refers to the latest locally available header height
1816 // that has been fully verified and inserted into the subjective chain
1917 height atomic.Uint64
20- heightReqsLk sync.Mutex
21- heightReqs map [uint64 ]map [chan H ]struct {}
18+ heightSubsLk sync.Mutex
19+ heightSubs map [uint64 ]* sub
20+ }
21+
22+ type sub struct {
23+ signal chan struct {}
24+ count int
2225}
2326
2427// newHeightSub instantiates new heightSub.
25- func newHeightSub [H header.Header [H ]]() * heightSub [H ] {
26- return & heightSub [H ]{
27- heightReqs : make (map [uint64 ]map [chan H ]struct {}),
28+ func newHeightSub () * heightSub {
29+ return & heightSub {
30+ heightSubs : make (map [uint64 ]* sub ),
31+ }
32+ }
33+
34+ // Init the heightSub with a given height.
35+ // Notifies all awaiting [Wait] calls lower than height.
36+ func (hs * heightSub ) Init (height uint64 ) {
37+ hs .height .Store (height )
38+
39+ hs .heightSubsLk .Lock ()
40+ defer hs .heightSubsLk .Unlock ()
41+
42+ for h := range hs .heightSubs {
43+ if h < height {
44+ hs .notify (h , true )
45+ }
2846 }
2947}
3048
3149// Height reports current height.
32- func (hs * heightSub [ H ] ) Height () uint64 {
50+ func (hs * heightSub ) Height () uint64 {
3351 return hs .height .Load ()
3452}
3553
3654// SetHeight sets the new head height for heightSub.
37- func (hs * heightSub [H ]) SetHeight (height uint64 ) {
38- hs .height .Store (height )
55+ // Notifies all awaiting [Wait] calls in range from [heightSub.Height] to height.
56+ func (hs * heightSub ) SetHeight (height uint64 ) {
57+ for {
58+ curr := hs .height .Load ()
59+ if curr >= height {
60+ return
61+ }
62+ if ! hs .height .CompareAndSwap (curr , height ) {
63+ continue
64+ }
65+
66+ hs .heightSubsLk .Lock ()
67+ defer hs .heightSubsLk .Unlock () //nolint:gocritic // we have a return below
68+
69+ for ; curr <= height ; curr ++ {
70+ hs .notify (curr , true )
71+ }
72+ return
73+ }
3974}
4075
41- // Sub subscribes for a header of a given height .
42- // It can return errElapsedHeight, which means a requested header was already provided
76+ // Wait for a given height to be published .
77+ // It can return errElapsedHeight, which means a requested height was already seen
4378// and caller should get it elsewhere.
44- func (hs * heightSub [H ]) Sub (ctx context.Context , height uint64 ) (H , error ) {
45- var zero H
79+ func (hs * heightSub ) Wait (ctx context.Context , height uint64 ) error {
4680 if hs .Height () >= height {
47- return zero , errElapsedHeight
81+ return errElapsedHeight
4882 }
4983
50- hs .heightReqsLk .Lock ()
84+ hs .heightSubsLk .Lock ()
5185 if hs .Height () >= height {
5286 // This is a rare case we have to account for.
5387 // The lock above can park a goroutine long enough for hs.height to change for a requested height,
5488 // leaving the request never fulfilled and the goroutine deadlocked.
55- hs .heightReqsLk .Unlock ()
56- return zero , errElapsedHeight
89+ hs .heightSubsLk .Unlock ()
90+ return errElapsedHeight
5791 }
58- resp := make ( chan H , 1 )
59- reqs , ok := hs .heightReqs [height ]
92+
93+ sac , ok := hs .heightSubs [height ]
6094 if ! ok {
61- reqs = make (map [chan H ]struct {})
62- hs .heightReqs [height ] = reqs
95+ sac = & sub {
96+ signal : make (chan struct {}, 1 ),
97+ }
98+ hs .heightSubs [height ] = sac
6399 }
64- reqs [ resp ] = struct {}{}
65- hs .heightReqsLk .Unlock ()
100+ sac . count ++
101+ hs .heightSubsLk .Unlock ()
66102
67103 select {
68- case resp := <- resp :
69- return resp , nil
104+ case <- sac . signal :
105+ return nil
70106 case <- ctx .Done ():
71107 // no need to keep the request, if the op has canceled
72- hs .heightReqsLk .Lock ()
73- delete (reqs , resp )
74- if len (reqs ) == 0 {
75- delete (hs .heightReqs , height )
76- }
77- hs .heightReqsLk .Unlock ()
78- return zero , ctx .Err ()
108+ hs .heightSubsLk .Lock ()
109+ hs .notify (height , false )
110+ hs .heightSubsLk .Unlock ()
111+ return ctx .Err ()
79112 }
80113}
81114
82- // Pub processes all the outstanding subscriptions matching the given headers.
83- // Pub is only safe when called from one goroutine.
84- // For Pub to work correctly, heightSub has to be initialized with SetHeight
85- // so that given headers are contiguous to the height on heightSub.
86- func (hs * heightSub [H ]) Pub (headers ... H ) {
87- ln := len (headers )
88- if ln == 0 {
89- return
90- }
115+ // Notify and release the waiters in [Wait].
116+ // Note: do not advance heightSub's height.
117+ func (hs * heightSub ) Notify (heights ... uint64 ) {
118+ hs .heightSubsLk .Lock ()
119+ defer hs .heightSubsLk .Unlock ()
91120
92- height := hs .Height ()
93- from , to := headers [0 ].Height (), headers [ln - 1 ].Height ()
94- if height + 1 != from &&
95- height != 0 { // height != 0 is needed to enable init from any height and not only 1
96- log .Fatalf (
97- "PLEASE FILE A BUG REPORT: headers given to the heightSub are in the wrong order: expected %d, got %d" ,
98- height + 1 ,
99- from ,
100- )
101- return
121+ for _ , h := range heights {
122+ hs .notify (h , true )
102123 }
103- hs .SetHeight (to )
104-
105- hs .heightReqsLk .Lock ()
106- defer hs .heightReqsLk .Unlock ()
107-
108- // there is a common case where we Pub only header
109- // in this case, we shouldn't loop over each heightReqs
110- // and instead read from the map directly
111- if ln == 1 {
112- reqs , ok := hs .heightReqs [from ]
113- if ok {
114- for req := range reqs {
115- req <- headers [0 ] // reqs must always be buffered, so this won't block
116- }
117- delete (hs .heightReqs , from )
118- }
124+ }
125+
126+ func (hs * heightSub ) notify (height uint64 , all bool ) {
127+ sac , ok := hs .heightSubs [height ]
128+ if ! ok {
119129 return
120130 }
121131
122- // instead of looping over each header in 'headers', we can loop over each request
123- // which will drastically decrease idle iterations, as there will be less requests than headers
124- for height , reqs := range hs .heightReqs {
125- // then we look if any of the requests match the given range of headers
126- if height >= from && height <= to {
127- // and if so, calculate its position and fulfill requests
128- h := headers [height - from ]
129- for req := range reqs {
130- req <- h // reqs must always be buffered, so this won't block
131- }
132- delete (hs .heightReqs , height )
133- }
132+ sac .count --
133+ if all || sac .count == 0 {
134+ close (sac .signal )
135+ delete (hs .heightSubs , height )
134136 }
135137}
0 commit comments