Skip to content

Commit d55da90

Browse files
author
Ambient Code Bot
committed
fix: timer-based cleanup of listenQueues after transient exporter disconnect
When an exporter's Listen() gRPC stream fails with a transient error the queue is no longer deleted immediately. Instead a cleanup timer (default 2 min) is scheduled. If the exporter reconnects before the timer fires, Listen() cancels the timer and inherits the existing queue — ensuring that any router token already buffered there by a concurrent Dial() call is delivered to the reconnected exporter. On clean shutdown (ctx.Done() — lease ended or server stopping) the timer is cancelled and the queue is removed straight away, so there is no memory leak for the normal lifecycle. Fixes #414
1 parent 9fdd1ad commit d55da90

File tree

2 files changed

+133
-1
lines changed

2 files changed

+133
-1
lines changed

controller/internal/service/controller_service.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ import (
6969
"google.golang.org/protobuf/proto"
7070
)
7171

72+
// listenQueueCleanupDelay is the grace period between an exporter's Listen()
// stream failing on Send and the lease's listen queue being discarded.
//
// Trade-off:
//   - long enough that the exporter can reconnect and drain any Dial token
//     that is already sitting in the queue;
//   - short enough that queues whose exporter never comes back do not
//     accumulate in memory.
//
// The exporter's default retry window (5 retries × ~1 s backoff) is well under
// 30 s, which makes 2 minutes a conservative upper bound.
//
// This is a package-level var rather than a const so that tests can shorten it
// without rebuilding.
var listenQueueCleanupDelay = 2 * time.Minute
80+
7281
// ControllerService exposes a gRPC service
7382
type ControllerService struct {
7483
pb.UnimplementedControllerServiceServer
@@ -79,7 +88,8 @@ type ControllerService struct {
7988
Attr authorization.ContextAttributesGetter
8089
ServerOptions []grpc.ServerOption
8190
Router config.Router
82-
listenQueues sync.Map
91+
listenQueues sync.Map // key: leaseName, value: chan *pb.ListenResponse
92+
listenTimers sync.Map // key: leaseName, value: *time.Timer — deferred cleanup after transient disconnect
8393
}
8494

8595
type wrappedStream struct {
@@ -439,13 +449,34 @@ func (s *ControllerService) Listen(req *pb.ListenRequest, stream pb.ControllerSe
439449
return err
440450
}
441451

452+
// Cancel any pending cleanup timer — the exporter is reconnecting and
453+
// should inherit the existing queue (which may hold a buffered Dial token).
454+
if t, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
455+
t.(*time.Timer).Stop()
456+
}
457+
442458
queue, _ := s.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
443459
for {
444460
select {
445461
case <-ctx.Done():
462+
// Clean shutdown (lease ended / server stopping): cancel any timer
463+
// and remove the queue immediately.
464+
if t, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
465+
t.(*time.Timer).Stop()
466+
}
467+
s.listenQueues.Delete(leaseName)
446468
return nil
447469
case msg := <-queue.(chan *pb.ListenResponse):
448470
if err := stream.Send(msg); err != nil {
471+
// Transient stream error: schedule deferred cleanup so a
472+
// reconnecting exporter can still inherit the queue and consume
473+
// any Dial token that was buffered between the error and now.
474+
// listenQueueCleanupDelay gives the exporter time to reconnect.
475+
t := time.AfterFunc(listenQueueCleanupDelay, func() {
476+
s.listenQueues.Delete(leaseName)
477+
s.listenTimers.Delete(leaseName)
478+
})
479+
s.listenTimers.Store(leaseName, t)
449480
return err
450481
}
451482
}

controller/internal/service/controller_service_test.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package service
1818

1919
import (
2020
"testing"
21+
"time"
2122

2223
jumpstarterdevv1alpha1 "github.com/jumpstarter-dev/jumpstarter-controller/api/v1alpha1"
2324
pb "github.com/jumpstarter-dev/jumpstarter-controller/internal/protocol/jumpstarter/v1"
@@ -299,6 +300,106 @@ func TestSyncOnlineConditionWithStatus(t *testing.T) {
299300
}
300301
}
301302

303+
// TestListenQueueTimerCleanup verifies that the listen queue is NOT removed
304+
// immediately when a transient stream error occurs, giving a reconnecting
305+
// exporter time to inherit the queue (and any buffered Dial token), and that
306+
// the queue IS removed once the cleanup timer fires.
307+
func TestListenQueueTimerCleanup(t *testing.T) {
308+
// Shorten the delay so the test completes quickly.
309+
original := listenQueueCleanupDelay
310+
listenQueueCleanupDelay = 50 * time.Millisecond
311+
t.Cleanup(func() { listenQueueCleanupDelay = original })
312+
313+
svc := &ControllerService{}
314+
leaseName := "test-lease"
315+
316+
// Seed the queue as Listen() would via LoadOrStore.
317+
ch := make(chan *pb.ListenResponse, 8)
318+
svc.listenQueues.Store(leaseName, ch)
319+
320+
// Simulate the stream-error path: schedule deferred cleanup.
321+
t.Run("queue survives transient error", func(t *testing.T) {
322+
timer := time.AfterFunc(listenQueueCleanupDelay, func() {
323+
svc.listenQueues.Delete(leaseName)
324+
svc.listenTimers.Delete(leaseName)
325+
})
326+
svc.listenTimers.Store(leaseName, timer)
327+
328+
// Queue must still be present immediately after the error.
329+
if _, ok := svc.listenQueues.Load(leaseName); !ok {
330+
t.Fatal("listen queue was removed immediately after stream error — Dial token would be lost")
331+
}
332+
})
333+
334+
t.Run("reconnecting exporter cancels cleanup timer", func(t *testing.T) {
335+
// Simulate Listen() reconnect: cancel the timer and call LoadOrStore.
336+
if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
337+
raw.(*time.Timer).Stop()
338+
}
339+
got, _ := svc.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
340+
if got != ch {
341+
t.Fatal("reconnecting Listen() did not inherit the existing queue")
342+
}
343+
344+
// Wait well past the original delay — the queue must still be present
345+
// because the timer was stopped.
346+
time.Sleep(listenQueueCleanupDelay * 4)
347+
if _, ok := svc.listenQueues.Load(leaseName); !ok {
348+
t.Fatal("listen queue was removed even though cleanup timer was cancelled")
349+
}
350+
})
351+
352+
t.Run("timer fires and removes queue when exporter does not reconnect", func(t *testing.T) {
353+
// Re-arm the timer without cancelling it this time.
354+
timer := time.AfterFunc(listenQueueCleanupDelay, func() {
355+
svc.listenQueues.Delete(leaseName)
356+
svc.listenTimers.Delete(leaseName)
357+
})
358+
svc.listenTimers.Store(leaseName, timer)
359+
360+
// Wait for the timer to fire.
361+
time.Sleep(listenQueueCleanupDelay * 4)
362+
if _, ok := svc.listenQueues.Load(leaseName); ok {
363+
t.Fatal("listen queue was not removed after cleanup timer fired")
364+
}
365+
})
366+
}
367+
368+
// TestListenQueueCleanShutdown verifies that a clean context cancellation
369+
// (lease end / server stop) removes the queue immediately without waiting for
370+
// the cleanup timer.
371+
func TestListenQueueCleanShutdown(t *testing.T) {
372+
original := listenQueueCleanupDelay
373+
listenQueueCleanupDelay = 2 * time.Minute // keep long — must NOT fire during test
374+
t.Cleanup(func() { listenQueueCleanupDelay = original })
375+
376+
svc := &ControllerService{}
377+
leaseName := "test-lease-shutdown"
378+
379+
ch := make(chan *pb.ListenResponse, 8)
380+
svc.listenQueues.Store(leaseName, ch)
381+
382+
// Arm a timer that should be cancelled before it fires.
383+
timer := time.AfterFunc(listenQueueCleanupDelay, func() {
384+
svc.listenQueues.Delete(leaseName)
385+
svc.listenTimers.Delete(leaseName)
386+
})
387+
svc.listenTimers.Store(leaseName, timer)
388+
389+
// Simulate the ctx.Done() path in Listen().
390+
if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
391+
raw.(*time.Timer).Stop()
392+
}
393+
svc.listenQueues.Delete(leaseName)
394+
395+
if _, ok := svc.listenQueues.Load(leaseName); ok {
396+
t.Fatal("listen queue was not removed on clean shutdown")
397+
}
398+
if _, ok := svc.listenTimers.Load(leaseName); ok {
399+
t.Fatal("cleanup timer was not cancelled on clean shutdown")
400+
}
401+
}
402+
302403
// contains checks if substr is contained in s
303404
func contains(s, substr string) bool {
304405
return len(s) >= len(substr) && (s == substr || len(substr) == 0 ||

0 commit comments

Comments
 (0)