Skip to content

Commit

Permalink
Merge pull request #20 from castaneai/handling-assigner-error
Browse files Browse the repository at this point in the history
backend: release a pending tickets when assigner returns error
  • Loading branch information
castaneai authored Jan 17, 2024
2 parents 5de0d3a + 92a31bc commit a55a507
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 25 deletions.
14 changes: 14 additions & 0 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (b *Backend) Tick(ctx context.Context) error {
}
if len(matches) > 0 {
if err := b.assign(ctx, matches); err != nil {
unmatchedTicketIDs = ticketIDsFromMatches(matches)
if err := b.store.ReleaseTickets(ctx, unmatchedTicketIDs); err != nil {
return fmt.Errorf("failed to release unmatched tickets: %w", err)
}
return err
}
}
Expand Down Expand Up @@ -263,3 +267,13 @@ func filterUnmatchedTicketIDs(allTickets []*pb.Ticket, matches []*pb.Match) []st
}
return unmatchedTicketIDs
}

func ticketIDsFromMatches(matches []*pb.Match) []string {
var ticketIDs []string
for _, match := range matches {
for _, ticket := range match.Tickets {
ticketIDs = append(ticketIDs, ticket.Id)
}
}
return ticketIDs
}
74 changes: 49 additions & 25 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
)

type TestServer struct {
mm *MiniMatch
frontendAddr string
options *testServerOptions
mm *MiniMatch
frontend *TestFrontendServer
options *testServerOptions
}

type TestServerOption interface {
Expand Down Expand Up @@ -64,18 +64,51 @@ func defaultTestServerOpts() *testServerOptions {
}
}

func (ts *TestServer) setupFrontendServer(t *testing.T) (*grpc.Server, net.Listener) {
type TestFrontendServer struct {
sv *grpc.Server
lis net.Listener
}

func (ts *TestFrontendServer) Addr() string {
return ts.lis.Addr().String()
}

func (ts *TestFrontendServer) Dial(t *testing.T) pb.FrontendServiceClient {
cc, err := grpc.Dial(ts.Addr(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial to TestFrontendServer: %+v", err)
}
return pb.NewFrontendServiceClient(cc)
}

func (ts *TestFrontendServer) Start(t *testing.T) {
go func() {
if err := ts.sv.Serve(ts.lis); err != nil {
t.Logf("failed to serve minimatch frontend: %+v", err)
}
}()
waitForTCPServerReady(t, ts.lis.Addr().String(), 10*time.Second)
}

func (ts *TestFrontendServer) Stop() {
ts.sv.Stop()
}

func NewTestFrontendServer(t *testing.T, store statestore.StateStore, addr string) *TestFrontendServer {
// start frontend
lis, err := net.Listen("tcp", ts.options.frontendListenAddr)
lis, err := net.Listen("tcp", addr)
if err != nil {
t.Fatalf("failed to listen test frontend server: %+v", err)
}
ts.frontendAddr = lis.Addr().String()
t.Cleanup(func() { _ = lis.Close() })
sv := grpc.NewServer()
pb.RegisterFrontendServiceServer(sv, ts.mm.FrontendService())
t.Cleanup(func() { sv.Stop() })
return sv, lis
pb.RegisterFrontendServiceServer(sv, NewFrontendService(store))
ts := &TestFrontendServer{
sv: sv,
lis: lis,
}
t.Cleanup(func() { ts.Stop() })
return ts
}

// RunTestServer helps with integration tests using Open Match.
Expand All @@ -85,13 +118,14 @@ func RunTestServer(t *testing.T, matchFunctions map[*pb.MatchProfile]MatchFuncti
for _, o := range opts {
o.apply(options)
}
store, _ := newStateStoreWithMiniRedis(t)
store, _ := NewStateStoreWithMiniRedis(t)
mm := NewMiniMatch(store)
for profile, mmf := range matchFunctions {
mm.AddMatchFunction(profile, mmf)
}

ts := &TestServer{mm: mm, frontendAddr: options.frontendListenAddr, options: options}
frontend := NewTestFrontendServer(t, store, options.frontendListenAddr)
ts := &TestServer{mm: mm, frontend: frontend, options: options}

// start backend
go func() {
Expand All @@ -101,22 +135,12 @@ func RunTestServer(t *testing.T, matchFunctions map[*pb.MatchProfile]MatchFuncti
}()

// start frontend
sv, lis := ts.setupFrontendServer(t)
go func() {
if err := sv.Serve(lis); err != nil {
t.Logf("failed to serve minimatch frontend: %+v", err)
}
}()
waitForTCPServerReady(t, lis.Addr().String(), 10*time.Second)
frontend.Start(t)
return ts
}

func (ts *TestServer) DialFrontend(t *testing.T) pb.FrontendServiceClient {
cc, err := grpc.Dial(ts.FrontendAddr(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial to minimatch test server: %+v", err)
}
return pb.NewFrontendServiceClient(cc)
return ts.frontend.Dial(t)
}

// TickBackend triggers a Director's Tick, which immediately calls Match Function and Assigner.
Expand All @@ -127,7 +151,7 @@ func (ts *TestServer) TickBackend() error {

// FrontendAddr returns the address listening as frontend.
func (ts *TestServer) FrontendAddr() string {
return ts.frontendAddr
return ts.frontend.Addr()
}

func waitForTCPServerReady(t *testing.T, addr string, timeout time.Duration) {
Expand Down Expand Up @@ -170,7 +194,7 @@ func waitForTCPServerReady(t *testing.T, addr string, timeout time.Duration) {
}
}

func newStateStoreWithMiniRedis(t *testing.T) (statestore.StateStore, *miniredis.Miniredis) {
func NewStateStoreWithMiniRedis(t *testing.T) (statestore.StateStore, *miniredis.Miniredis) {
mr := miniredis.RunT(t)
copt := rueidis.ClientOption{InitAddress: []string{mr.Addr()}, DisableCache: true}
redis, err := rueidis.NewClient(copt)
Expand Down
35 changes: 35 additions & 0 deletions tests/intergration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,41 @@ func TestEvaluator(t *testing.T) {
require.Equal(t, as3.Connection, as4.Connection)
}

func TestAssignerError(t *testing.T) {
store, _ := minimatch.NewStateStoreWithMiniRedis(t)
invalidAssigner := minimatch.AssignerFunc(func(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) {
return nil, errors.New("error")
})
invalidBackend, err := minimatch.NewBackend(store, invalidAssigner)
require.NoError(t, err)
invalidBackend.AddMatchFunction(anyProfile, minimatch.MatchFunctionSimple1vs1)

validAssigner := minimatch.AssignerFunc(dummyAssign)
validBackend, err := minimatch.NewBackend(store, validAssigner)
require.NoError(t, err)
validBackend.AddMatchFunction(anyProfile, minimatch.MatchFunctionSimple1vs1)

ctx := context.Background()
frontend := minimatch.NewTestFrontendServer(t, store, "127.0.0.1:0")
frontend.Start(t)
fc := frontend.Dial(t)
t1, err := fc.CreateTicket(ctx, &pb.CreateTicketRequest{Ticket: &pb.Ticket{}})
require.NoError(t, err)
t2, err := fc.CreateTicket(ctx, &pb.CreateTicketRequest{Ticket: &pb.Ticket{}})
require.NoError(t, err)

require.Error(t, invalidBackend.Tick(ctx))
mustNotAssignment(ctx, t, fc, t1.Id)
mustNotAssignment(ctx, t, fc, t2.Id)

// If the Assigner returns an error,
// the ticket in Pending status is released and can be immediately fetched from another backend.
require.NoError(t, validBackend.Tick(ctx))
as1 := mustAssignment(ctx, t, fc, t1.Id)
as2 := mustAssignment(ctx, t, fc, t2.Id)
assert.Equal(t, as1.Connection, as2.Connection)
}

func mustCreateTicket(ctx context.Context, t *testing.T, c pb.FrontendServiceClient, ticket *pb.Ticket) *pb.Ticket {
t.Helper()
resp, err := c.CreateTicket(ctx, &pb.CreateTicketRequest{Ticket: ticket})
Expand Down

0 comments on commit a55a507

Please sign in to comment.