From b7ddac4f0048f0351765bd2372b35d9d5e1076bc Mon Sep 17 00:00:00 2001 From: PapaCharlie Date: Tue, 11 Feb 2025 10:12:24 -0800 Subject: [PATCH] Fix race condition in glob collections When subscribing to a glob collection that is being updated, the `nonNilValueNames` set was being accessed in an unprotected way. This was generally safe as it was only used by new subscribers when they subscribed to empty collections. However, it was still potentially problematic. This fix not only addresses this race condition, but by refactoring `NotifyHandlerAfterSubscription`, it also allows multiple subscribers to subscribe to the glob collection concurrently. This will hopefully speed up the processing of new clients subscribing to many large glob collections all at once (which can happen during startup). --- cache.go | 39 ++++-- cache_test.go | 165 +++++++++++++++++++++++-- internal/cache/glob_collection.go | 92 +++++--------- internal/cache/glob_collections_map.go | 57 ++++++--- internal/cache/watchable_value.go | 55 ++++++--- server.go | 6 +- 6 files changed, 303 insertions(+), 111 deletions(-) diff --git a/cache.go b/cache.go index 288af19..b285922 100644 --- a/cache.go +++ b/cache.go @@ -2,6 +2,7 @@ package diderot import ( "fmt" + "sync" "time" "github.com/linkedin/diderot/ads" @@ -216,24 +217,44 @@ func (c *cache[T]) IsSubscribedTo(name string, handler ads.SubscriptionHandler[T } func (c *cache[T]) Subscribe(name string, handler ads.SubscriptionHandler[T]) { + wait := func(wgs ...*sync.WaitGroup) { + for _, wg := range wgs { + wg.Wait() + } + } + if name == ads.WildcardSubscription { subscribedAt, version := c.wildcardSubscribers.Subscribe(handler) - c.EntryNames(func(name string) bool { + + var waitGroups []*sync.WaitGroup + for name := range c.EntryNames { // Cannot call c.Subscribe here because it always creates a backing watchableValue if it does not // already exist. For wildcard subscriptions, if the entry doesn't exist (or in this case has been // deleted), a subscription isn't necessary. If the entry reappears, it will be automatically // subscribed to. c.resources.ComputeIfPresent(name, func(name string, value *internal.WatchableValue[T]) { - value.NotifyHandlerAfterSubscription(handler, internal.WildcardSubscription, subscribedAt, version) + wg := value.NotifyHandlerAfterSubscription( + handler, + internal.WildcardSubscription, + subscribedAt, + version, + ) + if wg != nil { + waitGroups = append(waitGroups, wg) + } }) - return true - }) + } + wait(waitGroups...) } else if gcURL, err := ads.ParseGlobCollectionURL(name, c.trimmedTypeURL); err == nil { - c.globCollections.Subscribe(gcURL, handler) + wait(c.globCollections.Subscribe(gcURL, handler)...) } else { + var wg *sync.WaitGroup c.createOrModifyEntry(name, func(name string, value *internal.WatchableValue[T]) { - value.Subscribe(handler) + wg = value.Subscribe(handler) }) + if wg != nil { + wg.Wait() + } } } @@ -337,7 +358,11 @@ type cacheWithPriority[T proto.Message] struct { func (c *cacheWithPriority[T]) Clear(name string, clearedAt time.Time) { var shouldDelete bool c.resources.ComputeIfPresent(name, func(name string, value *internal.WatchableValue[T]) { - shouldDelete = value.Clear(c.p, clearedAt) && value.SubscriberSets[internal.ExplicitSubscription].Size() == 0 + isFullClear := value.Clear(c.p, clearedAt) + if gcURL, err := ads.ExtractGlobCollectionURLFromResourceURN(name, c.trimmedTypeURL); err == nil { + c.globCollections.RemoveValueFromCollection(gcURL, value) + } + shouldDelete = isFullClear && value.SubscriberSets[internal.ExplicitSubscription].Size() == 0 }) if shouldDelete { c.deleteEntryIfNilAndNoSubscribers(name) diff --git a/cache_test.go b/cache_test.go index 75ffa92..08e27db 100644 --- a/cache_test.go +++ b/cache_test.go @@ -3,9 +3,12 @@ package diderot_test import ( "fmt" "maps" + "math/rand/v2" + "slices" "sort" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -33,6 +36,8 @@ const ( name3 = "r3" ) +var globCollectionPrefix = ads.XDSTPScheme + "/" + diderot.TypeOf[*Timestamp]().TrimmedURL() + "/" + var noTime time.Time func TestCacheCrud(t *testing.T) { @@ -338,7 +343,7 @@ func TestNotifyMetadata(t *testing.T) { // loop. The loop should abort and not call the remaining subscribers. It should instead restart and run through each // subscriber with the updated value. func TestWatchableValueUpdateCancel(t *testing.T) { - prefix := ads.XDSTPScheme + "/" + diderot.TypeOf[*Timestamp]().TrimmedURL() + "/foo/" + prefix := globCollectionPrefix + "foo/" c := newCache() @@ -442,20 +447,20 @@ func TestCacheEntryDeletion(t *testing.T) { func TestCacheCollections(t *testing.T) { c := diderot.NewCache[*Timestamp]() - const prefix = "xdstp:///google.protobuf.Timestamp/" + const prefix = "xdstp:///google.protobuf.Timestamp/a/" h := make(testutils.ChanSubscriptionHandler[*Timestamp], 1) - c.Subscribe(prefix+"a/*", h) - h.WaitForDelete(t, prefix+"a/*") + c.Subscribe(prefix+"*", h) + h.WaitForDelete(t, prefix+"*") - c.Subscribe(prefix+"a/foo", h) - h.WaitForDelete(t, prefix+"a/foo") + c.Subscribe(prefix+"foo", h) + h.WaitForDelete(t, prefix+"foo") var updates []testutils.ExpectedNotification[*Timestamp] var deletes []testutils.ExpectedNotification[*Timestamp] for i := 0; i < 5; i++ { - name, v := prefix+"a/"+strconv.Itoa(i), strconv.Itoa(i) + name, v := prefix+""+strconv.Itoa(i), strconv.Itoa(i) updates = append(updates, testutils.ExpectUpdate(c.Set(name, v, Now(), noTime))) deletes = append(deletes, testutils.ExpectDelete[*Timestamp](name)) } @@ -468,7 +473,7 @@ func TestCacheCollections(t *testing.T) { h.WaitForNotifications(t, deletes...) - h.WaitForDelete(t, prefix+"a/*") + h.WaitForDelete(t, prefix+"*") } // TestCache raw validates that the various *Raw methods on the cache work as expected. Namely, raw @@ -798,3 +803,147 @@ func DisableTime(tb testing.TB) { internal.SetTimeProvider(time.Now) }) } + +// The following tests flex various critical sections in the way glob collections are handled (along +// with almost all the cache), to attempt to trigger a race condition. The tests must be run with +// -race for this to have any use. +func TestGlobRace(t *testing.T) { + prefix := globCollectionPrefix + "foo/" + + // This tests many writers all competing for writes against overlapping entries. + t.Run("update", func(t *testing.T) { + c := newCache() + + const ( + entries = 100 + writers = 100 + count = 100 + readers = 100 + + doneVersion = "done" + ) + + entryNames := make([]string, entries) + for i := range entryNames { + name := prefix + strconv.Itoa(i) + entryNames[i] = name + } + + var writesDone, readsDone sync.WaitGroup + writesDone.Add(writers) + readsDone.Add(writers * readers) + + for range readers { + h := testutils.NewSubscriptionHandler(func(name string, r *ads.Resource[*Timestamp], _ ads.SubscriptionMetadata) { + if r.Version == doneVersion { + readsDone.Done() + } + }) + c.Subscribe(ads.WildcardSubscription, h) + } + + for range writers { + names := slices.Clone(entryNames) + shuffle := func() []string { + rand.Shuffle(len(names), func(i, j int) { + names[i], names[j] = names[j], names[i] + }) + return names + } + go func() { + defer writesDone.Done() + for range count { + for _, name := range shuffle() { + c.Set(name, "1", new(Timestamp), time.Time{}) + } + } + }() + } + go func() { + writesDone.Wait() + for _, name := range entryNames { + c.Set(name, doneVersion, new(Timestamp), time.Time{}) + } + }() + + readsDone.Wait() + }) + // This tests subscribing to a collection that is currently being updated. + t.Run("subscribe", func(t *testing.T) { + c := newCache() + + stop := make(chan struct{}) + t.Cleanup(func() { close(stop) }) + + for i := range 1 { + name := prefix + strconv.Itoa(i) + go func() { + for { + select { + case <-stop: + return + default: + } + c.Set(name, "", new(Timestamp), noTime) + time.Sleep(time.Nanosecond) + c.Clear(name, noTime) + } + }() + } + h := testutils.NewSubscriptionHandler[*Timestamp](func(string, *ads.Resource[*Timestamp], ads.SubscriptionMetadata) {}) + + // 1000 chosen arbitrarily, can be increased to increase likelihood of race condition, but the test + // will take longer to run. + for range 1000 { + c.Subscribe(prefix+ads.WildcardSubscription, h) + } + }) + // This tests multiple subscribers all subscribing at once. + t.Run("concurrent subscriptions", func(t *testing.T) { + c := newCache() + + const ( + entries = 100 + subscribers = 100 + ) + + for i := range entries { + name := prefix + strconv.Itoa(i) + c.Set(name, "", Now(), noTime) + } + + var done sync.WaitGroup + done.Add(subscribers) + + // Set to true if inFlight is greater than one. + var multipleInFlight atomic.Bool + // Incremented when the Subscription loop starts, and decremented when it ends. If only one + // subscriber can go through the elements of a glob collection at once, then this will never be + // greater than once, and multipleInFlight will therefore never be set to true, failing the test. + var inFlight atomic.Int32 + + for range subscribers { + go func() { + defer done.Done() + var remainingEntries atomic.Int32 + remainingEntries.Add(entries) + handlerFunc := func(string, *ads.Resource[*Timestamp], ads.SubscriptionMetadata) { + switch remainingEntries.Add(-1) { + case entries - 1: + if inFlight.Add(1) > 1 { + multipleInFlight.Store(true) + } + case 0: + inFlight.Add(-1) + } + } + + c.Subscribe(prefix+ads.WildcardSubscription, testutils.NewSubscriptionHandler[*Timestamp](handlerFunc)) + }() + } + + done.Wait() + require.Equal(t, int32(0), inFlight.Load()) + require.True(t, multipleInFlight.Load()) + }) +} diff --git a/internal/cache/glob_collection.go b/internal/cache/glob_collection.go index 86db12d..39d6409 100644 --- a/internal/cache/glob_collection.go +++ b/internal/cache/glob_collection.go @@ -15,15 +15,12 @@ type globCollection[T proto.Message] struct { // GlobCollectionURL to avoid repeated redundant calls to GlobCollectionURL.String. url string - // Protects subscribers and values. - subscribersAndValuesLock sync.RWMutex // The current subscribers to this collection. subscribers SubscriberSet[T] + // Protects values and nonNilValueNames. + lock sync.RWMutex // The set of values in the collection, used by new subscribers to subscribe to all values. values utils.Set[*WatchableValue[T]] - - // Protects nonNilValueNames - nonNilValueNamesLock sync.Mutex // The set of all non-nil resource names in this collection. Used to track whether a collection is // empty. Note that a collection can be empty even if values is non-empty since values that are // explicitly subscribed to are kept in the collection/cache to track the subscription in case the @@ -31,62 +28,30 @@ type globCollection[T proto.Message] struct { nonNilValueNames utils.Set[string] } +func newGlobCollection[T proto.Message](url string) *globCollection[T] { + return &globCollection[T]{ + url: url, + values: make(utils.Set[*WatchableValue[T]]), + nonNilValueNames: make(utils.Set[string]), + } +} + func (g *globCollection[T]) hasNoValuesOrSubscribersNoLock() bool { return len(g.values) == 0 && g.subscribers.Size() == 0 } // hasNoValuesOrSubscribers returns true if the collection is empty and has no subscribers. func (g *globCollection[T]) hasNoValuesOrSubscribers() bool { - g.subscribersAndValuesLock.RLock() - defer g.subscribersAndValuesLock.RUnlock() + g.lock.RLock() + defer g.lock.RUnlock() return g.hasNoValuesOrSubscribersNoLock() } -// isSubscribed checks if the given handler is already subscribed to the collection. -func (g *globCollection[T]) isSubscribed(handler ads.SubscriptionHandler[T]) bool { - g.subscribersAndValuesLock.Lock() - defer g.subscribersAndValuesLock.Unlock() - - return g.subscribers.IsSubscribed(handler) -} - -// subscribe adds the given handler as a subscriber to the collection, and iterates through all the -// values in the collection, notifying the handler for each value. If the collection is empty, the -// handler will be notified that the resource is deleted. -func (g *globCollection[T]) subscribe(handler ads.SubscriptionHandler[T]) { - g.subscribersAndValuesLock.Lock() - defer g.subscribersAndValuesLock.Unlock() - - subscribedAt, version := g.subscribers.Subscribe(handler) - - if len(g.nonNilValueNames) == 0 { - handler.Notify(g.url, nil, ads.SubscriptionMetadata{ - SubscribedAt: subscribedAt, - ModifiedAt: time.Time{}, - CachedAt: time.Time{}, - }) - } else { - for v := range g.values { - v.NotifyHandlerAfterSubscription(handler, GlobSubscription, subscribedAt, version) - } - } -} - -// unsubscribe unsubscribes the given handler from the collection. Returns true if the collection has -// no subscribers and is empty. -func (g *globCollection[T]) unsubscribe(handler ads.SubscriptionHandler[T]) bool { - g.subscribersAndValuesLock.Lock() - defer g.subscribersAndValuesLock.Unlock() - - g.subscribers.Unsubscribe(handler) - return g.hasNoValuesOrSubscribersNoLock() -} - // resourceSet notifies the collection that the given resource has been created. func (g *globCollection[T]) resourceSet(name string) { - g.nonNilValueNamesLock.Lock() - defer g.nonNilValueNamesLock.Unlock() + g.lock.Lock() + defer g.lock.Unlock() g.nonNilValueNames.Add(name) } @@ -95,25 +60,24 @@ func (g *globCollection[T]) resourceSet(name string) { // remaining non-nil values in the collection (or no values at all), the subscribers are all notified // that the collection has been deleted. func (g *globCollection[T]) resourceCleared(name string) { - g.nonNilValueNamesLock.Lock() - defer g.nonNilValueNamesLock.Unlock() + g.lock.Lock() + defer g.lock.Unlock() g.nonNilValueNames.Remove(name) - if len(g.nonNilValueNames) == 0 { - g.subscribersAndValuesLock.Lock() - defer g.subscribersAndValuesLock.Unlock() + if len(g.nonNilValueNames) > 0 { + return + } - deletedAt := time.Now() + deletedAt := time.Now() - subscribers, _ := g.subscribers.Iterator() - for handler, subscribedAt := range subscribers { - handler.Notify(g.url, nil, ads.SubscriptionMetadata{ - SubscribedAt: subscribedAt, - ModifiedAt: deletedAt, - CachedAt: deletedAt, - GlobCollectionURL: g.url, - }) - } + subscribers, _ := g.subscribers.Iterator() + for handler, subscribedAt := range subscribers { + handler.Notify(g.url, nil, ads.SubscriptionMetadata{ + SubscribedAt: subscribedAt, + ModifiedAt: deletedAt, + CachedAt: deletedAt, + GlobCollectionURL: g.url, + }) } } diff --git a/internal/cache/glob_collections_map.go b/internal/cache/glob_collections_map.go index 979ee8f..aad53e7 100644 --- a/internal/cache/glob_collections_map.go +++ b/internal/cache/glob_collections_map.go @@ -2,9 +2,10 @@ package internal import ( "log/slog" + "sync" + "time" "github.com/linkedin/diderot/ads" - "github.com/linkedin/diderot/internal/utils" "google.golang.org/protobuf/proto" ) @@ -28,11 +29,7 @@ func (gcm *GlobCollectionsMap[T]) createOrModifyCollection( gcm.collections.Compute( gcURL, func(gcURL ads.GlobCollectionURL) *globCollection[T] { - gc := &globCollection[T]{ - url: gcURL.String(), - values: make(utils.Set[*WatchableValue[T]]), - nonNilValueNames: make(utils.Set[string]), - } + gc := newGlobCollection[T](gcURL.String()) slog.Debug("Created collection", "url", gcURL) return gc }, @@ -44,8 +41,8 @@ func (gcm *GlobCollectionsMap[T]) createOrModifyCollection( // value in it. func (gcm *GlobCollectionsMap[T]) PutValueInCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T]) { gcm.createOrModifyCollection(gcURL, func(gcURL ads.GlobCollectionURL, collection *globCollection[T]) { - collection.subscribersAndValuesLock.Lock() - defer collection.subscribersAndValuesLock.Unlock() + collection.lock.Lock() + defer collection.lock.Unlock() value.globCollection = collection collection.values.Add(value) @@ -58,8 +55,8 @@ func (gcm *GlobCollectionsMap[T]) PutValueInCollection(gcURL ads.GlobCollectionU func (gcm *GlobCollectionsMap[T]) RemoveValueFromCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T]) { var isEmpty bool gcm.collections.ComputeIfPresent(gcURL, func(gcURL ads.GlobCollectionURL, collection *globCollection[T]) { - collection.subscribersAndValuesLock.Lock() - defer collection.subscribersAndValuesLock.Unlock() + collection.lock.Lock() + defer collection.lock.Unlock() collection.values.Remove(value) @@ -71,11 +68,36 @@ func (gcm *GlobCollectionsMap[T]) RemoveValueFromCollection(gcURL ads.GlobCollec } // Subscribe creates or gets the corresponding collection for the given URL using -// createOrModifyCollection, then invokes globCollection.subscribe with the given handler. -func (gcm *GlobCollectionsMap[T]) Subscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) { +// createOrModifyCollection. It adds the given handler as a subscriber to the collection, then +// iterates through all the values in the collection, notifying the handler for each value. If the +// collection is empty, the handler will be notified that the resource is deleted. See the +// documentation on [WatchableValue.NotifyHandlerAfterSubscription] for more insight on the returned +// [sync.WaitGroup] slice. +func (gcm *GlobCollectionsMap[T]) Subscribe( + gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T], +) (wgs []*sync.WaitGroup) { gcm.createOrModifyCollection(gcURL, func(_ ads.GlobCollectionURL, collection *globCollection[T]) { - collection.subscribe(handler) + subscribedAt, version := collection.subscribers.Subscribe(handler) + + collection.lock.RLock() + defer collection.lock.RUnlock() + + if len(collection.nonNilValueNames) == 0 { + handler.Notify(collection.url, nil, ads.SubscriptionMetadata{ + SubscribedAt: subscribedAt, + ModifiedAt: time.Time{}, + CachedAt: time.Time{}, + }) + } else { + for v := range collection.values { + wg := v.NotifyHandlerAfterSubscription(handler, GlobSubscription, subscribedAt, version) + if wg != nil { + wgs = append(wgs, wg) + } + } + } }) + return wgs } // Unsubscribe invokes globCollection.unsubscribe on the collection for the given URL, if it exists. @@ -83,7 +105,11 @@ func (gcm *GlobCollectionsMap[T]) Subscribe(gcURL ads.GlobCollectionURL, handler func (gcm *GlobCollectionsMap[T]) Unsubscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) { var isEmpty bool gcm.collections.ComputeIfPresent(gcURL, func(_ ads.GlobCollectionURL, collection *globCollection[T]) { - isEmpty = collection.unsubscribe(handler) + collection.lock.RLock() + defer collection.lock.RUnlock() + + collection.subscribers.Unsubscribe(handler) + isEmpty = collection.hasNoValuesOrSubscribersNoLock() }) if isEmpty { gcm.deleteCollectionIfEmpty(gcURL) @@ -105,7 +131,8 @@ func (gcm *GlobCollectionsMap[T]) deleteCollectionIfEmpty(gcURL ads.GlobCollecti // IsSubscribed checks if the given handler is subscribed to the collection. func (gcm *GlobCollectionsMap[T]) IsSubscribed(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) (subscribed bool) { gcm.collections.ComputeIfPresent(gcURL, func(_ ads.GlobCollectionURL, collection *globCollection[T]) { - subscribed = collection.isSubscribed(handler) + // Locking is not required here, as SubscriberSet is safe for concurrent access. + subscribed = collection.subscribers.IsSubscribed(handler) }) return subscribed } diff --git a/internal/cache/watchable_value.go b/internal/cache/watchable_value.go index 3ee99fa..317c7ce 100644 --- a/internal/cache/watchable_value.go +++ b/internal/cache/watchable_value.go @@ -52,8 +52,12 @@ type WatchableValue[T proto.Message] struct { // loopStatus stores the current state of the loop, see startNotificationLoop for additional // details. loopStatus loopStatus - // loopWg is incremented every time the loop starts, and decremented whenever it completes. - loopWg sync.WaitGroup + // subscriberWg is used when NotifyHandlerAfterSubscription is invoked while the notification loop is + // running. It will be set by NotifyHandlerAfterSubscription (if not already set) to indicate to the + // notification loop that a subscriber is currently waiting for the loop to send the notification of + // the current value instead of NotifyHandlerAfterSubscription doing it directly. This is done to + // avoid double notifications. + subscriberWg *sync.WaitGroup // SubscriberSets is holds all the async.SubscriberSet instances relevant to this WatchableValue. SubscriberSets [subscriptionTypes]*SubscriberSet[T] // lastSeenSubscriberSetVersions stores the SubscriberSetVersion of the most-recently iterated @@ -82,9 +86,9 @@ func (v *WatchableValue[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) boo return v.SubscriberSets[ExplicitSubscription].IsSubscribed(handler) } -func (v *WatchableValue[T]) Subscribe(handler ads.SubscriptionHandler[T]) { +func (v *WatchableValue[T]) Subscribe(handler ads.SubscriptionHandler[T]) *sync.WaitGroup { subscribedAt, version := v.SubscriberSets[ExplicitSubscription].Subscribe(handler) - v.NotifyHandlerAfterSubscription(handler, ExplicitSubscription, subscribedAt, version) + return v.NotifyHandlerAfterSubscription(handler, ExplicitSubscription, subscribedAt, version) } func (v *WatchableValue[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool) { @@ -198,9 +202,18 @@ const ( ) // NotifyHandlerAfterSubscription should be invoked by subscribers after subscribing to the -// corresponding SubscriberSet. This function is guaranteed to only return once the handler has been -// notified of the current value, since the xDS protocol spec explicitly states that an explicit -// subscription to an entry must always be respected by sending the current value: +// corresponding SubscriberSet. If the returned [sync.WaitGroup] is nil, the subscriber has been +// notified of the current value. Otherwise, the subscriber will only be guaranteed to have been +// notified once the WaitGroup is done (more details can be found in the inline comments of this +// function, but the high level reason for this is that the notification loop will handle the +// notification, which avoids double notifications). A WaitGroup is returned instead of this function +// blocking inline to avoid a number of deadlocks that can occur in glob collections. The WaitGroup +// should only be waited on *outside* of the glob collection critical sections such as +// [GlobCollectionsMap.Subscribe] to avoid these deadlocks. +// +// It is critical that the WaitGroup be waited on before the top-level Subscribe function in the root +// package returns, as it the API contract established by the cache. It also helps the implementation +// of the xDS protocol, since it explicitly states the following: // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#subscribing-to-resources // // A resource_names_subscribe field may contain resource names that the server believes the client is @@ -212,7 +225,7 @@ func (v *WatchableValue[T]) NotifyHandlerAfterSubscription( subType subscriptionType, subscribedAt time.Time, version SubscriberSetVersion, -) { +) *sync.WaitGroup { v.lock.Lock() value := v.readWithMetadataNoLock() @@ -227,19 +240,25 @@ func (v *WatchableValue[T]) NotifyHandlerAfterSubscription( // exit immediately. case v.loopStatus != running && v.lastSeenSubscriberSetVersions[subType] >= version: v.lock.Unlock() - // Since lastSeenSubscriberSetVersions is updated by the notification loop goroutine while holding the lock, - // it can be used to track whether the loop goroutine has already picked up the new + // Since lastSeenSubscriberSetVersions is updated by the notification loop goroutine while holding + // the lock, it can be used to track whether the loop goroutine has already picked up the new // SubscriptionHandler and will notify it as part of its ongoing execution. In this case, the - // subscriber goroutine simply waits for the loop to complete to guarantee that the handler has been - // notified. This is as opposed to notifying the handler directly even though the loop is running, - // potentially resulting in a double notification. + // subscriber goroutine should wait on the returned [sync.WaitGroup], which waits for the loop to + // complete to guarantee that the handler has been notified. This is as opposed to notifying the + // handler directly even though the loop is running, potentially resulting in a double notification. case v.loopStatus == initialized || (v.loopStatus == running && v.lastSeenSubscriberSetVersions[subType] >= version): + if v.subscriberWg == nil { + v.subscriberWg = new(sync.WaitGroup) + v.subscriberWg.Add(1) + } + wg := v.subscriberWg v.lock.Unlock() - v.loopWg.Wait() + return wg default: handler.Notify(v.name, value.resource, value.subscriptionMetadata(subscribedAt)) v.lock.Unlock() } + return nil } // startNotificationLoop spawns a goroutine that will notify all the subscribers to this entry of the @@ -256,10 +275,8 @@ func (v *WatchableValue[T]) startNotificationLoop() { } v.loopStatus = initialized - v.loopWg.Add(1) go func() { - defer v.loopWg.Done() for { v.lock.Lock() value := v.readWithMetadataNoLock() @@ -291,18 +308,24 @@ func (v *WatchableValue[T]) startNotificationLoop() { v.lock.Lock() done := v.valuesFromDifferentPrioritySources[v.currentIndex] == value.resource + var wg *sync.WaitGroup if done { // At this point, the most recent value was successfully pushed to all subscribers since it has not // changed from when it was initially read at the top of the loop. Since the lock is currently held, // setting loopRunning to false will signal to the next invocation of startNotificationLoop that the // loop routine is not running. v.loopStatus = notRunning + wg = v.subscriberWg + v.subscriberWg = nil } // Otherwise, if done isn't true then the value changed in between notifying the subscribers and // grabbing the lock. In this case the loop will restart, reusing the goroutine. v.lock.Unlock() if done { + if wg != nil { + wg.Done() + } return } } diff --git a/server.go b/server.go index 7f8054e..c3cbf19 100644 --- a/server.go +++ b/server.go @@ -418,7 +418,11 @@ type ResourceLocator interface { // Subscribe subscribes the given handler to the desired resource. The returned function should // execute the unsubscription to the resource. The desired behavior when a client resubscribes to a // resource is for the resource to be re-sent. To achieve this, the returned unsubscription function - // will be called, then [Subscribe] will be called again with the same parameters. + // will be called, then [Subscribe] will be called again with the same parameters. Additionally, this + // function should only return when the given subscription handler has received the notification(s) + // for the corresponding subscription(s). It allows the server implementation to correctly batch + // resources for more efficient responses, as well as skip the granular limiter (if enabled). This is + // especially important during the initial bulk subscriptions a client will emit at startup. // // Note: There is no clear provision in the protocol for what to do if a client sends a request for a // type that is unsupported by this server. Therefore, this is not explicitly handled by the Server.