Skip to content

Commit

Permalink
backend: graceful shutdown
Browse files Browse the repository at this point in the history
The processing tick is not interrupted even if the context is canceled.
However, the next tick will not be executed, which is a graceful shutdown process.
  • Loading branch information
castaneai committed Oct 31, 2024
1 parent 2c89edb commit fe2d797
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
6 changes: 5 additions & 1 deletion backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func (b *Backend) AddMatchFunction(profile *pb.MatchProfile, mmf MatchFunction)
b.mmfs[profile] = newMatchFunctionWithMetrics(mmf, b.metrics)
}

// ctx is used to stop the backend, preferably one triggered by SIGTERM.
// After stopping, it returns a context.Canceled error.
func (b *Backend) Start(ctx context.Context, tickRate time.Duration) error {
ticker := time.NewTicker(tickRate)
defer ticker.Stop()
Expand All @@ -123,7 +125,9 @@ func (b *Backend) Start(ctx context.Context, tickRate time.Duration) error {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := b.Tick(ctx); err != nil {
// The processing tick is not interrupted even if the context is canceled.
// However, the next tick will not be executed, which is a graceful shutdown process.
if err := b.Tick(context.Background()); err != nil {
b.options.logger.With("error", err).Error(fmt.Sprintf("failed to tick backend: %+v", err))
}
}
Expand Down
38 changes: 38 additions & 0 deletions backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"log"
"testing"
"time"

"github.com/bojand/hri"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"open-match.dev/open-match/pkg/pb"

"github.com/castaneai/minimatch/pkg/statestore"
Expand Down Expand Up @@ -94,6 +96,42 @@ func TestValidateTicketExistenceBeforeAssign(t *testing.T) {
})
}

func TestGracefulShutdown(t *testing.T) {
frontStore, backStore, _ := NewStateStoreWithMiniRedis(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

backend, err := NewBackend(backStore, AssignerFunc(func(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) {
time.Sleep(500 * time.Millisecond)
return dummyAssign(ctx, matches)
}))
require.NoError(t, err)
backend.AddMatchFunction(anyProfile, MatchFunctionSimple1vs1)

err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t1"}, defaultTicketTTL)
require.NoError(t, err)
err = frontStore.CreateTicket(ctx, &pb.Ticket{Id: "t2"}, defaultTicketTTL)
require.NoError(t, err)

eg, egCtx := errgroup.WithContext(ctx)
eg.Go(func() error {
return backend.Start(egCtx, 10*time.Millisecond)
})
time.Sleep(50 * time.Millisecond)
cancel() // stop backend
require.ErrorIs(t, eg.Wait(), context.Canceled)

// Even if backend stops, the tick being processed will be completed.
ctx = context.Background()
as1, err := frontStore.GetAssignment(ctx, "t1")
require.NoError(t, err)
require.NotNil(t, as1)
as2, err := frontStore.GetAssignment(ctx, "t2")
require.NoError(t, err)
require.NotNil(t, as2)
require.Equal(t, as1.Connection, as2.Connection)
}

var anyProfile = &pb.MatchProfile{
Name: "test-profile",
Pools: []*pb.Pool{
Expand Down

0 comments on commit fe2d797

Please sign in to comment.