diff --git a/cache.go b/cache.go index 288af19..b285922 100644 --- a/cache.go +++ b/cache.go @@ -2,6 +2,7 @@ package diderot import ( "fmt" + "sync" "time" "github.com/linkedin/diderot/ads" @@ -216,24 +217,44 @@ func (c *cache[T]) IsSubscribedTo(name string, handler ads.SubscriptionHandler[T } func (c *cache[T]) Subscribe(name string, handler ads.SubscriptionHandler[T]) { + wait := func(wgs ...*sync.WaitGroup) { + for _, wg := range wgs { + wg.Wait() + } + } + if name == ads.WildcardSubscription { subscribedAt, version := c.wildcardSubscribers.Subscribe(handler) - c.EntryNames(func(name string) bool { + + var waitGroups []*sync.WaitGroup + for name := range c.EntryNames { // Cannot call c.Subscribe here because it always creates a backing watchableValue if it does not // already exist. For wildcard subscriptions, if the entry doesn't exist (or in this case has been // deleted), a subscription isn't necessary. If the entry reappears, it will be automatically // subscribed to. c.resources.ComputeIfPresent(name, func(name string, value *internal.WatchableValue[T]) { - value.NotifyHandlerAfterSubscription(handler, internal.WildcardSubscription, subscribedAt, version) + wg := value.NotifyHandlerAfterSubscription( + handler, + internal.WildcardSubscription, + subscribedAt, + version, + ) + if wg != nil { + waitGroups = append(waitGroups, wg) + } }) - return true - }) + } + wait(waitGroups...) } else if gcURL, err := ads.ParseGlobCollectionURL(name, c.trimmedTypeURL); err == nil { - c.globCollections.Subscribe(gcURL, handler) + wait(c.globCollections.Subscribe(gcURL, handler)...) } else { + var wg *sync.WaitGroup c.createOrModifyEntry(name, func(name string, value *internal.WatchableValue[T]) { - value.Subscribe(handler) + wg = value.Subscribe(handler) }) + if wg != nil { + wg.Wait() + } } } @@ -337,7 +358,11 @@ type cacheWithPriority[T proto.Message] struct { func (c *cacheWithPriority[T]) Clear(name string, clearedAt time.Time) { var shouldDelete bool c.resources.ComputeIfPresent(name, func(name string, value *internal.WatchableValue[T]) { - shouldDelete = value.Clear(c.p, clearedAt) && value.SubscriberSets[internal.ExplicitSubscription].Size() == 0 + isFullClear := value.Clear(c.p, clearedAt) + if gcURL, err := ads.ExtractGlobCollectionURLFromResourceURN(name, c.trimmedTypeURL); err == nil { + c.globCollections.RemoveValueFromCollection(gcURL, value) + } + shouldDelete = isFullClear && value.SubscriberSets[internal.ExplicitSubscription].Size() == 0 }) if shouldDelete { c.deleteEntryIfNilAndNoSubscribers(name) diff --git a/cache_test.go b/cache_test.go index 75ffa92..08e27db 100644 --- a/cache_test.go +++ b/cache_test.go @@ -3,9 +3,12 @@ package diderot_test import ( "fmt" "maps" + "math/rand/v2" + "slices" "sort" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -33,6 +36,8 @@ const ( name3 = "r3" ) +var globCollectionPrefix = ads.XDSTPScheme + "/" + diderot.TypeOf[*Timestamp]().TrimmedURL() + "/" + var noTime time.Time func TestCacheCrud(t *testing.T) { @@ -338,7 +343,7 @@ func TestNotifyMetadata(t *testing.T) { // loop. The loop should abort and not call the remaining subscribers. It should instead restart and run through each // subscriber with the updated value. func TestWatchableValueUpdateCancel(t *testing.T) { - prefix := ads.XDSTPScheme + "/" + diderot.TypeOf[*Timestamp]().TrimmedURL() + "/foo/" + prefix := globCollectionPrefix + "foo/" c := newCache() @@ -442,20 +447,20 @@ func TestCacheEntryDeletion(t *testing.T) { func TestCacheCollections(t *testing.T) { c := diderot.NewCache[*Timestamp]() - const prefix = "xdstp:///google.protobuf.Timestamp/" + const prefix = "xdstp:///google.protobuf.Timestamp/a/" h := make(testutils.ChanSubscriptionHandler[*Timestamp], 1) - c.Subscribe(prefix+"a/*", h) - h.WaitForDelete(t, prefix+"a/*") + c.Subscribe(prefix+"*", h) + h.WaitForDelete(t, prefix+"*") - c.Subscribe(prefix+"a/foo", h) - h.WaitForDelete(t, prefix+"a/foo") + c.Subscribe(prefix+"foo", h) + h.WaitForDelete(t, prefix+"foo") var updates []testutils.ExpectedNotification[*Timestamp] var deletes []testutils.ExpectedNotification[*Timestamp] for i := 0; i < 5; i++ { - name, v := prefix+"a/"+strconv.Itoa(i), strconv.Itoa(i) + name, v := prefix+""+strconv.Itoa(i), strconv.Itoa(i) updates = append(updates, testutils.ExpectUpdate(c.Set(name, v, Now(), noTime))) deletes = append(deletes, testutils.ExpectDelete[*Timestamp](name)) } @@ -468,7 +473,7 @@ func TestCacheCollections(t *testing.T) { h.WaitForNotifications(t, deletes...) - h.WaitForDelete(t, prefix+"a/*") + h.WaitForDelete(t, prefix+"*") } // TestCache raw validates that the various *Raw methods on the cache work as expected. Namely, raw @@ -798,3 +803,147 @@ func DisableTime(tb testing.TB) { internal.SetTimeProvider(time.Now) }) } + +// The following tests flex various critical sections in the way glob collections are handled (along +// with almost all the cache), to attempt to trigger a race condition. The tests must be run with +// -race for this to have any use. +func TestGlobRace(t *testing.T) { + prefix := globCollectionPrefix + "foo/" + + // This tests many writers all competing for writes against overlapping entries. + t.Run("update", func(t *testing.T) { + c := newCache() + + const ( + entries = 100 + writers = 100 + count = 100 + readers = 100 + + doneVersion = "done" + ) + + entryNames := make([]string, entries) + for i := range entryNames { + name := prefix + strconv.Itoa(i) + entryNames[i] = name + } + + var writesDone, readsDone sync.WaitGroup + writesDone.Add(writers) + readsDone.Add(writers * readers) + + for range readers { + h := testutils.NewSubscriptionHandler(func(name string, r *ads.Resource[*Timestamp], _ ads.SubscriptionMetadata) { + if r.Version == doneVersion { + readsDone.Done() + } + }) + c.Subscribe(ads.WildcardSubscription, h) + } + + for range writers { + names := slices.Clone(entryNames) + shuffle := func() []string { + rand.Shuffle(len(names), func(i, j int) { + names[i], names[j] = names[j], names[i] + }) + return names + } + go func() { + defer writesDone.Done() + for range count { + for _, name := range shuffle() { + c.Set(name, "1", new(Timestamp), time.Time{}) + } + } + }() + } + go func() { + writesDone.Wait() + for _, name := range entryNames { + c.Set(name, doneVersion, new(Timestamp), time.Time{}) + } + }() + + readsDone.Wait() + }) + // This tests subscribing to a collection that is currently being updated. + t.Run("subscribe", func(t *testing.T) { + c := newCache() + + stop := make(chan struct{}) + t.Cleanup(func() { close(stop) }) + + for i := range 1 { + name := prefix + strconv.Itoa(i) + go func() { + for { + select { + case <-stop: + return + default: + } + c.Set(name, "", new(Timestamp), noTime) + time.Sleep(time.Nanosecond) + c.Clear(name, noTime) + } + }() + } + h := testutils.NewSubscriptionHandler[*Timestamp](func(string, *ads.Resource[*Timestamp], ads.SubscriptionMetadata) {}) + + // 1000 chosen arbitrarily, can be increased to increase likelihood of race condition, but the test + // will take longer to run. + for range 1000 { + c.Subscribe(prefix+ads.WildcardSubscription, h) + } + }) + // This tests multiple subscribers all subscribing at once. + t.Run("concurrent subscriptions", func(t *testing.T) { + c := newCache() + + const ( + entries = 100 + subscribers = 100 + ) + + for i := range entries { + name := prefix + strconv.Itoa(i) + c.Set(name, "", Now(), noTime) + } + + var done sync.WaitGroup + done.Add(subscribers) + + // Set to true if inFlight is greater than one. + var multipleInFlight atomic.Bool + // Incremented when the Subscription loop starts, and decremented when it ends. If only one + // subscriber can go through the elements of a glob collection at once, then this will never be + // greater than once, and multipleInFlight will therefore never be set to true, failing the test. + var inFlight atomic.Int32 + + for range subscribers { + go func() { + defer done.Done() + var remainingEntries atomic.Int32 + remainingEntries.Add(entries) + handlerFunc := func(string, *ads.Resource[*Timestamp], ads.SubscriptionMetadata) { + switch remainingEntries.Add(-1) { + case entries - 1: + if inFlight.Add(1) > 1 { + multipleInFlight.Store(true) + } + case 0: + inFlight.Add(-1) + } + } + + c.Subscribe(prefix+ads.WildcardSubscription, testutils.NewSubscriptionHandler[*Timestamp](handlerFunc)) + }() + } + + done.Wait() + require.Equal(t, int32(0), inFlight.Load()) + require.True(t, multipleInFlight.Load()) + }) +} diff --git a/internal/cache/glob_collection.go b/internal/cache/glob_collection.go index 86db12d..39d6409 100644 --- a/internal/cache/glob_collection.go +++ b/internal/cache/glob_collection.go @@ -15,15 +15,12 @@ type globCollection[T proto.Message] struct { // GlobCollectionURL to avoid repeated redundant calls to GlobCollectionURL.String. url string - // Protects subscribers and values. - subscribersAndValuesLock sync.RWMutex // The current subscribers to this collection. subscribers SubscriberSet[T] + // Protects values and nonNilValueNames. + lock sync.RWMutex // The set of values in the collection, used by new subscribers to subscribe to all values. values utils.Set[*WatchableValue[T]] - - // Protects nonNilValueNames - nonNilValueNamesLock sync.Mutex // The set of all non-nil resource names in this collection. Used to track whether a collection is // empty. Note that a collection can be empty even if values is non-empty since values that are // explicitly subscribed to are kept in the collection/cache to track the subscription in case the @@ -31,62 +28,30 @@ type globCollection[T proto.Message] struct { nonNilValueNames utils.Set[string] } +func newGlobCollection[T proto.Message](url string) *globCollection[T] { + return &globCollection[T]{ + url: url, + values: make(utils.Set[*WatchableValue[T]]), + nonNilValueNames: make(utils.Set[string]), + } +} + func (g *globCollection[T]) hasNoValuesOrSubscribersNoLock() bool { return len(g.values) == 0 && g.subscribers.Size() == 0 } // hasNoValuesOrSubscribers returns true if the collection is empty and has no subscribers. func (g *globCollection[T]) hasNoValuesOrSubscribers() bool { - g.subscribersAndValuesLock.RLock() - defer g.subscribersAndValuesLock.RUnlock() + g.lock.RLock() + defer g.lock.RUnlock() return g.hasNoValuesOrSubscribersNoLock() } -// isSubscribed checks if the given handler is already subscribed to the collection. -func (g *globCollection[T]) isSubscribed(handler ads.SubscriptionHandler[T]) bool { - g.subscribersAndValuesLock.Lock() - defer g.subscribersAndValuesLock.Unlock() - - return g.subscribers.IsSubscribed(handler) -} - -// subscribe adds the given handler as a subscriber to the collection, and iterates through all the -// values in the collection, notifying the handler for each value. If the collection is empty, the -// handler will be notified that the resource is deleted. -func (g *globCollection[T]) subscribe(handler ads.SubscriptionHandler[T]) { - g.subscribersAndValuesLock.Lock() - defer g.subscribersAndValuesLock.Unlock() - - subscribedAt, version := g.subscribers.Subscribe(handler) - - if len(g.nonNilValueNames) == 0 { - handler.Notify(g.url, nil, ads.SubscriptionMetadata{ - SubscribedAt: subscribedAt, - ModifiedAt: time.Time{}, - CachedAt: time.Time{}, - }) - } else { - for v := range g.values { - v.NotifyHandlerAfterSubscription(handler, GlobSubscription, subscribedAt, version) - } - } -} - -// unsubscribe unsubscribes the given handler from the collection. Returns true if the collection has -// no subscribers and is empty. -func (g *globCollection[T]) unsubscribe(handler ads.SubscriptionHandler[T]) bool { - g.subscribersAndValuesLock.Lock() - defer g.subscribersAndValuesLock.Unlock() - - g.subscribers.Unsubscribe(handler) - return g.hasNoValuesOrSubscribersNoLock() -} - // resourceSet notifies the collection that the given resource has been created. func (g *globCollection[T]) resourceSet(name string) { - g.nonNilValueNamesLock.Lock() - defer g.nonNilValueNamesLock.Unlock() + g.lock.Lock() + defer g.lock.Unlock() g.nonNilValueNames.Add(name) } @@ -95,25 +60,24 @@ func (g *globCollection[T]) resourceSet(name string) { // remaining non-nil values in the collection (or no values at all), the subscribers are all notified // that the collection has been deleted. func (g *globCollection[T]) resourceCleared(name string) { - g.nonNilValueNamesLock.Lock() - defer g.nonNilValueNamesLock.Unlock() + g.lock.Lock() + defer g.lock.Unlock() g.nonNilValueNames.Remove(name) - if len(g.nonNilValueNames) == 0 { - g.subscribersAndValuesLock.Lock() - defer g.subscribersAndValuesLock.Unlock() + if len(g.nonNilValueNames) > 0 { + return + } - deletedAt := time.Now() + deletedAt := time.Now() - subscribers, _ := g.subscribers.Iterator() - for handler, subscribedAt := range subscribers { - handler.Notify(g.url, nil, ads.SubscriptionMetadata{ - SubscribedAt: subscribedAt, - ModifiedAt: deletedAt, - CachedAt: deletedAt, - GlobCollectionURL: g.url, - }) - } + subscribers, _ := g.subscribers.Iterator() + for handler, subscribedAt := range subscribers { + handler.Notify(g.url, nil, ads.SubscriptionMetadata{ + SubscribedAt: subscribedAt, + ModifiedAt: deletedAt, + CachedAt: deletedAt, + GlobCollectionURL: g.url, + }) } } diff --git a/internal/cache/glob_collections_map.go b/internal/cache/glob_collections_map.go index 979ee8f..aad53e7 100644 --- a/internal/cache/glob_collections_map.go +++ b/internal/cache/glob_collections_map.go @@ -2,9 +2,10 @@ package internal import ( "log/slog" + "sync" + "time" "github.com/linkedin/diderot/ads" - "github.com/linkedin/diderot/internal/utils" "google.golang.org/protobuf/proto" ) @@ -28,11 +29,7 @@ func (gcm *GlobCollectionsMap[T]) createOrModifyCollection( gcm.collections.Compute( gcURL, func(gcURL ads.GlobCollectionURL) *globCollection[T] { - gc := &globCollection[T]{ - url: gcURL.String(), - values: make(utils.Set[*WatchableValue[T]]), - nonNilValueNames: make(utils.Set[string]), - } + gc := newGlobCollection[T](gcURL.String()) slog.Debug("Created collection", "url", gcURL) return gc }, @@ -44,8 +41,8 @@ func (gcm *GlobCollectionsMap[T]) createOrModifyCollection( // value in it. func (gcm *GlobCollectionsMap[T]) PutValueInCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T]) { gcm.createOrModifyCollection(gcURL, func(gcURL ads.GlobCollectionURL, collection *globCollection[T]) { - collection.subscribersAndValuesLock.Lock() - defer collection.subscribersAndValuesLock.Unlock() + collection.lock.Lock() + defer collection.lock.Unlock() value.globCollection = collection collection.values.Add(value) @@ -58,8 +55,8 @@ func (gcm *GlobCollectionsMap[T]) PutValueInCollection(gcURL ads.GlobCollectionU func (gcm *GlobCollectionsMap[T]) RemoveValueFromCollection(gcURL ads.GlobCollectionURL, value *WatchableValue[T]) { var isEmpty bool gcm.collections.ComputeIfPresent(gcURL, func(gcURL ads.GlobCollectionURL, collection *globCollection[T]) { - collection.subscribersAndValuesLock.Lock() - defer collection.subscribersAndValuesLock.Unlock() + collection.lock.Lock() + defer collection.lock.Unlock() collection.values.Remove(value) @@ -71,11 +68,36 @@ func (gcm *GlobCollectionsMap[T]) RemoveValueFromCollection(gcURL ads.GlobCollec } // Subscribe creates or gets the corresponding collection for the given URL using -// createOrModifyCollection, then invokes globCollection.subscribe with the given handler. -func (gcm *GlobCollectionsMap[T]) Subscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) { +// createOrModifyCollection. It adds the given handler as a subscriber to the collection, then +// iterates through all the values in the collection, notifying the handler for each value. If the +// collection is empty, the handler will be notified that the resource is deleted. See the +// documentation on [WatchableValue.NotifyHandlerAfterSubscription] for more insight on the returned +// [sync.WaitGroup] slice. +func (gcm *GlobCollectionsMap[T]) Subscribe( + gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T], +) (wgs []*sync.WaitGroup) { gcm.createOrModifyCollection(gcURL, func(_ ads.GlobCollectionURL, collection *globCollection[T]) { - collection.subscribe(handler) + subscribedAt, version := collection.subscribers.Subscribe(handler) + + collection.lock.RLock() + defer collection.lock.RUnlock() + + if len(collection.nonNilValueNames) == 0 { + handler.Notify(collection.url, nil, ads.SubscriptionMetadata{ + SubscribedAt: subscribedAt, + ModifiedAt: time.Time{}, + CachedAt: time.Time{}, + }) + } else { + for v := range collection.values { + wg := v.NotifyHandlerAfterSubscription(handler, GlobSubscription, subscribedAt, version) + if wg != nil { + wgs = append(wgs, wg) + } + } + } }) + return wgs } // Unsubscribe invokes globCollection.unsubscribe on the collection for the given URL, if it exists. @@ -83,7 +105,11 @@ func (gcm *GlobCollectionsMap[T]) Subscribe(gcURL ads.GlobCollectionURL, handler func (gcm *GlobCollectionsMap[T]) Unsubscribe(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) { var isEmpty bool gcm.collections.ComputeIfPresent(gcURL, func(_ ads.GlobCollectionURL, collection *globCollection[T]) { - isEmpty = collection.unsubscribe(handler) + collection.lock.RLock() + defer collection.lock.RUnlock() + + collection.subscribers.Unsubscribe(handler) + isEmpty = collection.hasNoValuesOrSubscribersNoLock() }) if isEmpty { gcm.deleteCollectionIfEmpty(gcURL) @@ -105,7 +131,8 @@ func (gcm *GlobCollectionsMap[T]) deleteCollectionIfEmpty(gcURL ads.GlobCollecti // IsSubscribed checks if the given handler is subscribed to the collection. func (gcm *GlobCollectionsMap[T]) IsSubscribed(gcURL ads.GlobCollectionURL, handler ads.SubscriptionHandler[T]) (subscribed bool) { gcm.collections.ComputeIfPresent(gcURL, func(_ ads.GlobCollectionURL, collection *globCollection[T]) { - subscribed = collection.isSubscribed(handler) + // Locking is not required here, as SubscriberSet is safe for concurrent access. + subscribed = collection.subscribers.IsSubscribed(handler) }) return subscribed } diff --git a/internal/cache/watchable_value.go b/internal/cache/watchable_value.go index 3ee99fa..317c7ce 100644 --- a/internal/cache/watchable_value.go +++ b/internal/cache/watchable_value.go @@ -52,8 +52,12 @@ type WatchableValue[T proto.Message] struct { // loopStatus stores the current state of the loop, see startNotificationLoop for additional // details. loopStatus loopStatus - // loopWg is incremented every time the loop starts, and decremented whenever it completes. - loopWg sync.WaitGroup + // subscriberWg is used when NotifyHandlerAfterSubscription is invoked while the notification loop is + // running. It will be set by NotifyHandlerAfterSubscription (if not already set) to indicate to the + // notification loop that a subscriber is currently waiting for the loop to send the notification of + // the current value instead of NotifyHandlerAfterSubscription doing it directly. This is done to + // avoid double notifications. + subscriberWg *sync.WaitGroup // SubscriberSets is holds all the async.SubscriberSet instances relevant to this WatchableValue. SubscriberSets [subscriptionTypes]*SubscriberSet[T] // lastSeenSubscriberSetVersions stores the SubscriberSetVersion of the most-recently iterated @@ -82,9 +86,9 @@ func (v *WatchableValue[T]) IsSubscribed(handler ads.SubscriptionHandler[T]) boo return v.SubscriberSets[ExplicitSubscription].IsSubscribed(handler) } -func (v *WatchableValue[T]) Subscribe(handler ads.SubscriptionHandler[T]) { +func (v *WatchableValue[T]) Subscribe(handler ads.SubscriptionHandler[T]) *sync.WaitGroup { subscribedAt, version := v.SubscriberSets[ExplicitSubscription].Subscribe(handler) - v.NotifyHandlerAfterSubscription(handler, ExplicitSubscription, subscribedAt, version) + return v.NotifyHandlerAfterSubscription(handler, ExplicitSubscription, subscribedAt, version) } func (v *WatchableValue[T]) Unsubscribe(handler ads.SubscriptionHandler[T]) (empty bool) { @@ -198,9 +202,18 @@ const ( ) // NotifyHandlerAfterSubscription should be invoked by subscribers after subscribing to the -// corresponding SubscriberSet. This function is guaranteed to only return once the handler has been -// notified of the current value, since the xDS protocol spec explicitly states that an explicit -// subscription to an entry must always be respected by sending the current value: +// corresponding SubscriberSet. If the returned [sync.WaitGroup] is nil, the subscriber has been +// notified of the current value. Otherwise, the subscriber will only be guaranteed to have been +// notified once the WaitGroup is done (more details can be found in the inline comments of this +// function, but the high level reason for this is that the notification loop will handle the +// notification, which avoids double notifications). A WaitGroup is returned instead of this function +// blocking inline to avoid a number of deadlocks that can occur in glob collections. The WaitGroup +// should only be waited on *outside* of the glob collection critical sections such as +// [GlobCollectionsMap.Subscribe] to avoid these deadlocks. +// +// It is critical that the WaitGroup be waited on before the top-level Subscribe function in the root +// package returns, as it the API contract established by the cache. It also helps the implementation +// of the xDS protocol, since it explicitly states the following: // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#subscribing-to-resources // // A resource_names_subscribe field may contain resource names that the server believes the client is @@ -212,7 +225,7 @@ func (v *WatchableValue[T]) NotifyHandlerAfterSubscription( subType subscriptionType, subscribedAt time.Time, version SubscriberSetVersion, -) { +) *sync.WaitGroup { v.lock.Lock() value := v.readWithMetadataNoLock() @@ -227,19 +240,25 @@ func (v *WatchableValue[T]) NotifyHandlerAfterSubscription( // exit immediately. case v.loopStatus != running && v.lastSeenSubscriberSetVersions[subType] >= version: v.lock.Unlock() - // Since lastSeenSubscriberSetVersions is updated by the notification loop goroutine while holding the lock, - // it can be used to track whether the loop goroutine has already picked up the new + // Since lastSeenSubscriberSetVersions is updated by the notification loop goroutine while holding + // the lock, it can be used to track whether the loop goroutine has already picked up the new // SubscriptionHandler and will notify it as part of its ongoing execution. In this case, the - // subscriber goroutine simply waits for the loop to complete to guarantee that the handler has been - // notified. This is as opposed to notifying the handler directly even though the loop is running, - // potentially resulting in a double notification. + // subscriber goroutine should wait on the returned [sync.WaitGroup], which waits for the loop to + // complete to guarantee that the handler has been notified. This is as opposed to notifying the + // handler directly even though the loop is running, potentially resulting in a double notification. case v.loopStatus == initialized || (v.loopStatus == running && v.lastSeenSubscriberSetVersions[subType] >= version): + if v.subscriberWg == nil { + v.subscriberWg = new(sync.WaitGroup) + v.subscriberWg.Add(1) + } + wg := v.subscriberWg v.lock.Unlock() - v.loopWg.Wait() + return wg default: handler.Notify(v.name, value.resource, value.subscriptionMetadata(subscribedAt)) v.lock.Unlock() } + return nil } // startNotificationLoop spawns a goroutine that will notify all the subscribers to this entry of the @@ -256,10 +275,8 @@ func (v *WatchableValue[T]) startNotificationLoop() { } v.loopStatus = initialized - v.loopWg.Add(1) go func() { - defer v.loopWg.Done() for { v.lock.Lock() value := v.readWithMetadataNoLock() @@ -291,18 +308,24 @@ func (v *WatchableValue[T]) startNotificationLoop() { v.lock.Lock() done := v.valuesFromDifferentPrioritySources[v.currentIndex] == value.resource + var wg *sync.WaitGroup if done { // At this point, the most recent value was successfully pushed to all subscribers since it has not // changed from when it was initially read at the top of the loop. Since the lock is currently held, // setting loopRunning to false will signal to the next invocation of startNotificationLoop that the // loop routine is not running. v.loopStatus = notRunning + wg = v.subscriberWg + v.subscriberWg = nil } // Otherwise, if done isn't true then the value changed in between notifying the subscribers and // grabbing the lock. In this case the loop will restart, reusing the goroutine. v.lock.Unlock() if done { + if wg != nil { + wg.Done() + } return } } diff --git a/server.go b/server.go index 7f8054e..c3cbf19 100644 --- a/server.go +++ b/server.go @@ -418,7 +418,11 @@ type ResourceLocator interface { // Subscribe subscribes the given handler to the desired resource. The returned function should // execute the unsubscription to the resource. The desired behavior when a client resubscribes to a // resource is for the resource to be re-sent. To achieve this, the returned unsubscription function - // will be called, then [Subscribe] will be called again with the same parameters. + // will be called, then [Subscribe] will be called again with the same parameters. Additionally, this + // function should only return when the given subscription handler has received the notification(s) + // for the corresponding subscription(s). It allows the server implementation to correctly batch + // resources for more efficient responses, as well as skip the granular limiter (if enabled). This is + // especially important during the initial bulk subscriptions a client will emit at startup. // // Note: There is no clear provision in the protocol for what to do if a client sends a request for a // type that is unsupported by this server. Therefore, this is not explicitly handled by the Server.