Skip to content

Commit 381849f

Browse files
committed
switch to debounce
1 parent a8ac735 commit 381849f

File tree

3 files changed

+21
-37
lines changed

3 files changed

+21
-37
lines changed

mcp/mcp_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,11 +1720,8 @@ func TestSynchronousNotifications(t *testing.T) {
17201720
})
17211721

17221722
t.Run("from server", func(t *testing.T) {
1723-
// Because server change notifications are batched, we must generate a lot of them.
1724-
for range maxPendingNotifications/2 + 1 {
1725-
server.RemoveTools("tool")
1726-
addTool(server)
1727-
}
1723+
// TODO: test that multiple changes result in a single notification.
1724+
time.Sleep(notificationDelay * 2) // Wait for delayed notification.
17281725
if _, err := ss.CreateMessage(context.Background(), new(CreateMessageParams)); err != nil {
17291726
t.Errorf("CreateMessage failed: %v", err)
17301727
}

mcp/server.go

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ type Server struct {
5151
sendingMethodHandler_ MethodHandler
5252
receivingMethodHandler_ MethodHandler
5353
resourceSubscriptions map[string]map[*ServerSession]bool // uri -> session -> bool
54-
pendingNotifications map[string]int // notification name -> count of unsent changes
54+
pendingNotifications map[string]*time.Timer // notification name -> timer for pending notification send
5555
}
5656

5757
// ServerOptions is used to configure behavior of the server.
@@ -150,7 +150,7 @@ func NewServer(impl *Implementation, options *ServerOptions) *Server {
150150
sendingMethodHandler_: defaultSendingMethodHandler[*ServerSession],
151151
receivingMethodHandler_: defaultReceivingMethodHandler[*ServerSession],
152152
resourceSubscriptions: make(map[string]map[*ServerSession]bool),
153-
pendingNotifications: make(map[string]int),
153+
pendingNotifications: make(map[string]*time.Timer),
154154
}
155155
}
156156

@@ -501,48 +501,34 @@ var changeNotificationParams = map[string]Params{
501501
notificationResourceListChanged: &ResourceListChangedParams{},
502502
}
503503

504-
// The maximum number of change notifications of a particular type (e.g. tools-changed)
505-
// that can be pending.
506-
const maxPendingNotifications = 10
507-
508504
// How long to wait before sending a change notification.
509-
var notificationDelay = 50 * time.Millisecond
505+
const notificationDelay = 10 * time.Millisecond
510506

511507
// changeAndNotify is called when a feature is added or removed.
512508
// It calls change, which should do the work and report whether a change actually occurred.
513-
// If there was a change, it notifies a snapshot of the sessions.
509+
// If there was a change, it sets a timer to send a notification.
510+
// This debounces change notifications: a single notification is sent after
511+
// multiple changes occur in close proximity.
514512
func (s *Server) changeAndNotify(notification string, change func() bool) {
515-
var sessions []*ServerSession
516-
send := false
517513
s.mu.Lock()
514+
defer s.mu.Unlock()
518515
if change() {
519-
pending := s.pendingNotifications[notification]
520-
if pending >= maxPendingNotifications {
521-
send = true
522-
pending = 0
523-
// Make a local copy of the session list so we can use it without holding the lock.
524-
sessions = slices.Clone(s.sessions)
525-
} else {
526-
pending++
527-
if pending == 1 {
528-
time.AfterFunc(notificationDelay, func() { s.sendNotification(notification) })
529-
}
516+
// Stop the outstanding delayed call, if any.
517+
if t := s.pendingNotifications[notification]; t != nil {
518+
t.Stop()
530519
}
531-
s.pendingNotifications[notification] = pending
532-
}
533-
s.mu.Unlock() // Don't hold lock during notifications.
534-
if send {
535-
notifySessions(sessions, notification, changeNotificationParams[notification])
520+
//
521+
s.pendingNotifications[notification] = time.AfterFunc(notificationDelay, func() { s.notifySessions(notification) })
536522
}
537523
}
538524

539-
// sendNotification is called asynchronously to ensure that notifications are sent
540-
// soon after they occur.
541-
func (s *Server) sendNotification(n string) {
525+
// notifySessions sends the notification n to all existing sessions.
526+
// It is called asynchronously by changeAndNotify.
527+
func (s *Server) notifySessions(n string) {
542528
s.mu.Lock()
543529
sessions := slices.Clone(s.sessions)
544-
s.pendingNotifications[n] = 0
545-
s.mu.Unlock()
530+
s.pendingNotifications[n] = nil
531+
s.mu.Unlock() // Don't hold the lock during notification: it causes deadlock.
546532
notifySessions(sessions, n, changeNotificationParams[n])
547533
}
548534

@@ -1103,7 +1089,6 @@ func (ss *ServerSession) Elicit(ctx context.Context, params *ElicitParams) (*Eli
11031089

11041090
resolved, err := schema.Resolve(nil)
11051091
if err != nil {
1106-
fmt.Printf(" resolve err: %s", err)
11071092
return nil, err
11081093
}
11091094
if err := resolved.Validate(res.Content); err != nil {

mcp/shared.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,8 @@ func notifySessions[S Session, P Params](sessions []S, method string, params P)
349349
if sessions == nil {
350350
return
351351
}
352+
// Notify with the background context, so the messages are sent on the
353+
// standalone stream.
352354
// TODO: make this timeout configurable, or call handleNotify asynchronously.
353355
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
354356
defer cancel()

0 commit comments

Comments
 (0)