Skip to content

Commit

Permalink
Merge pull request #10 from castaneai/evaluator
Browse files Browse the repository at this point in the history
add Evaluator
  • Loading branch information
castaneai authored Dec 22, 2023
2 parents 6f6335f + 322e368 commit 07d836c
Show file tree
Hide file tree
Showing 12 changed files with 513 additions and 325 deletions.
70 changes: 16 additions & 54 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ It runs in a single process; there are no dependencies other than Go!
- [x] Create/Get/Watch/Delete ticket
- [ ] Backfill
- [x] Run match functions and propose matches
- [x] Evaluator

## Quickstart

Expand All @@ -47,15 +48,15 @@ func AssignGameServer(ctx context.Context, matches []*pb.Match) ([]*pb.Assignmen

func main() {
// Create minimatch instance with miniredis
mm, _ := minimatch.NewMiniMatchWithRedis()
mm, err := minimatch.NewMiniMatchWithRedis()

// Add backend (Match Profile, Match Function and Assigner)
mm.AddBackend(matchProfile, minimatch.MatchFunctionFunc(MakeMatches), minimatch.AssignerFunc(AssignGameServer))
// Add Match Function with Match Profile
mm.AddMatchFunction(matchProfile, minimatch.MatchFunctionFunc(MakeMatches))

// Start minimatch backend service with Director's interval
go func() { mm.StartBackend(context.Background(), 1*time.Second) }()
// Start minimatch backend service with Assigner and tick rate
go func() { mm.StartBackend(context.Background(), minimatch.AssignerFunc(AssignGameServer), 1*time.Second) }()

// Start minimatch frontend service
// Start minimatch frontend service with specific address
mm.StartFrontend(":50504")
}
```
Expand All @@ -64,64 +65,28 @@ See [examples](./examples) for more concrete examples.

## Use case

### Integration tests for matchmaking
### Testing matchmaking logic

Minimatch has Open Match Frontend compatible services.
Therefore, it can be used for integration testing of matchmaking and
can be tested with the same interface as Open Match without Kubernetes.
Therefore, it can be used for testing of matchmaking logic without Kubernetes.

minimatch has a helper function `RunTestServer` making it easy to write matchmaking tests.
See [examples/integration_test](./examples/integration_test/integration_test.go) for more specific examples.

```go
package xxx_test

import (
"context"
"testing"

"github.com/castaneai/minimatch"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"open-match.dev/open-match/pkg/pb"
)

var anyProfile = &pb.MatchProfile{
Name: "test-profile",
Pools: []*pb.Pool{
{Name: "test-pool"},
},
}

func TestSimpleMatch(t *testing.T) {
s := minimatch.RunTestServer(t, anyProfile, minimatch.MatchFunctionFunc(MakeMatches), minimatch.AssignerFunc(AssignGameServer))
c := s.DialFrontend(t)
ctx := context.Background()

t1 := mustCreateTicket(ctx, t, c, &pb.Ticket{})
t2 := mustCreateTicket(ctx, t, c, &pb.Ticket{})

// Trigger director's tick
require.NoError(t, s.TickBackend())

as1 := mustAssignment(ctx, t, c, t1.Id)
as2 := mustAssignment(ctx, t, c, t2.Id)
s := minimatch.RunTestServer(t, profile, minimatch.MatchFunctionFunc(MakeMatches), minimatch.AssignerFunc(AssignGameServer))
frontend := s.DialFrontend(t)

assert.Equal(t, as1.Connection, as2.Connection)
}

func mustCreateTicket(ctx context.Context, t *testing.T, c pb.FrontendServiceClient, ticket *pb.Ticket) *pb.Ticket {
t.Helper()
resp, err := c.CreateTicket(ctx, &pb.CreateTicketRequest{Ticket: ticket})
require.NoError(t, err)
require.NotEmpty(t, resp.Id)
require.NotNil(t, resp.CreateTime)
return resp
}

func mustAssignment(ctx context.Context, t *testing.T, c pb.FrontendServiceClient, ticketID string) *pb.Assignment {
t.Helper()
resp, err := c.GetTicket(ctx, &pb.GetTicketRequest{TicketId: ticketID})
require.NoError(t, err)
require.NotNil(t, resp.Assignment)
return resp.Assignment
// ...
}
```

Expand All @@ -133,10 +98,7 @@ you may want to reduce infrastructure costs for the development environment.
In such cases, minimatch can be installed instead of Open Match to create a minimum development environment.
minimatch has an Open Match compatible Frontend Service, so there is no need to change the API!


## Examples

- [Simple 1vs1 matchmaking server](./examples/simple1vs1/simple1vs1.go)
See [Simple 1vs1 matchmaking server](./examples/simple1vs1/simple1vs1.go) for examples.

## Differences from Open Match

Expand Down
234 changes: 218 additions & 16 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,250 @@ package minimatch

import (
"context"
"fmt"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"golang.org/x/sync/errgroup"
"open-match.dev/open-match/pkg/pb"

"github.com/castaneai/minimatch/pkg/mmlog"
"github.com/castaneai/minimatch/pkg/statestore"
)

const (
defaultFetchTicketsLimit int64 = 10000
)

type Backend struct {
directors map[string]*Director
store statestore.StateStore
mmfs map[*pb.MatchProfile]MatchFunction
assigner Assigner
options *backendOptions
metrics *backendMetrics
}

func NewBackend() *Backend {
return &Backend{
directors: map[string]*Director{},
type BackendOption interface {
apply(options *backendOptions)
}

type BackendOptionFunc func(options *backendOptions)

func (f BackendOptionFunc) apply(options *backendOptions) {
f(options)
}

type backendOptions struct {
evaluator Evaluator
fetchTicketsLimit int64
meterProvider metric.MeterProvider
}

func defaultBackendOptions() *backendOptions {
return &backendOptions{
evaluator: nil,
fetchTicketsLimit: defaultFetchTicketsLimit,
meterProvider: otel.GetMeterProvider(),
}
}

func (b *Backend) AddDirector(director *Director) {
b.directors[director.profile.Name] = director
func WithEvaluator(evaluator Evaluator) BackendOption {
return BackendOptionFunc(func(options *backendOptions) {
options.evaluator = evaluator
})
}

func WithBackendMeterProvider(provider metric.MeterProvider) BackendOption {
return BackendOptionFunc(func(options *backendOptions) {
options.meterProvider = provider
})
}

func WithFetchTicketsLimit(limit int64) BackendOption {
return BackendOptionFunc(func(options *backendOptions) {
options.fetchTicketsLimit = limit
})
}

func NewBackend(store statestore.StateStore, assigner Assigner, opts ...BackendOption) (*Backend, error) {
options := defaultBackendOptions()
for _, opt := range opts {
opt.apply(options)
}
metrics, err := newBackendMetrics(options.meterProvider)
if err != nil {
return nil, fmt.Errorf("failed to create backend metrics: %w", err)
}
return &Backend{
store: store,
mmfs: map[*pb.MatchProfile]MatchFunction{},
assigner: newAssignerWithMetrics(assigner, metrics),
options: options,
metrics: metrics,
}, nil
}

func (b *Backend) AddMatchFunction(profile *pb.MatchProfile, mmf MatchFunction) {
b.mmfs[profile] = newMatchFunctionWithMetrics(mmf, b.metrics)
}

func (b *Backend) Start(ctx context.Context, tickRate time.Duration) error {
ticker := time.NewTicker(tickRate)
defer ticker.Stop()

profiles := make([]string, 0, len(b.mmfs))
for profile := range b.mmfs {
profiles = append(profiles, profile.Name)
}
mmlog.Infof("minimatch backend started (matchProfile: %v, tickRate: %s)", profiles, tickRate)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
if err := b.Tick(ctx); err != nil {
return err
}
}
}
}

func (b *Backend) Tick(ctx context.Context) error {
activeTickets, err := b.fetchActiveTickets(ctx)
if err != nil {
return err
}
if len(activeTickets) == 0 {
return nil
}
matches, err := b.makeMatches(ctx, activeTickets)
if err != nil {
return err
}
if b.options.evaluator != nil {
ms, err := evaluateMatches(ctx, b.options.evaluator, matches)
if err != nil {
return err
}
matches = ms
}
unmatchedTicketIDs := filterUnmatchedTicketIDs(activeTickets, matches)
if len(unmatchedTicketIDs) > 0 {
if err := b.store.ReleaseTickets(ctx, unmatchedTicketIDs); err != nil {
return fmt.Errorf("failed to release unmatched tickets: %w", err)
}
}
if len(matches) > 0 {
if err := b.assign(ctx, matches); err != nil {
return err
}
}
return nil
}

func (b *Backend) fetchActiveTickets(ctx context.Context) ([]*pb.Ticket, error) {
tickets, err := b.store.GetActiveTickets(ctx, b.options.fetchTicketsLimit)
if err != nil {
return nil, fmt.Errorf("failed to fetch active tickets: %w", err)
}
b.metrics.recordTicketsFetched(ctx, int64(len(tickets)))
return tickets, nil
}

func (b *Backend) makeMatches(ctx context.Context, activeTickets []*pb.Ticket) ([]*pb.Match, error) {
resCh := make(chan []*pb.Match, len(b.mmfs))
eg, ctx := errgroup.WithContext(ctx)
for _, d := range b.directors {
dr := d
for profile, mmf := range b.mmfs {
profile := profile
mmf := mmf
eg.Go(func() error {
if err := dr.Run(ctx, tickRate); err != nil {
mmlog.Errorf("error occured in director: %+v", err)
// TODO: retryable?
poolTickets, err := filterTickets(profile, activeTickets)
if err != nil {
return err
}
matches, err := mmf.MakeMatches(ctx, profile, poolTickets)
if err != nil {
return err
}
resCh <- matches
return nil
})
}
return eg.Wait()
if err := eg.Wait(); err != nil {
return nil, err
}
close(resCh)
var totalMatches []*pb.Match
for matches := range resCh {
totalMatches = append(totalMatches, matches...)
}
return totalMatches, nil
}

func (b *Backend) Tick(ctx context.Context) error {
for _, d := range b.directors {
if err := d.Tick(ctx); err != nil {
return err
func (b *Backend) assign(ctx context.Context, matches []*pb.Match) error {
asgs, err := b.assigner.Assign(ctx, matches)
if err != nil {
return fmt.Errorf("failed to assign matches: %w", err)
}
if len(asgs) > 0 {
if err := b.store.AssignTickets(ctx, asgs); err != nil {
return fmt.Errorf("failed to assign tickets: %w", err)
}
}
return nil
}

func evaluateMatches(ctx context.Context, evaluator Evaluator, matches []*pb.Match) ([]*pb.Match, error) {
evaluatedMatches := make([]*pb.Match, 0, len(matches))
evaluatedMatchIDs, err := evaluator.Evaluate(ctx, matches)
if err != nil {
return nil, fmt.Errorf("failed to evaluate matches: %w", err)
}
evaluatedMap := map[string]struct{}{}
for _, emID := range evaluatedMatchIDs {
evaluatedMap[emID] = struct{}{}
}
for _, match := range matches {
if _, ok := evaluatedMap[match.MatchId]; ok {
evaluatedMatches = append(evaluatedMatches, match)
}
}
return evaluatedMatches, nil
}

func filterTickets(profile *pb.MatchProfile, tickets []*pb.Ticket) (map[string][]*pb.Ticket, error) {
poolTickets := map[string][]*pb.Ticket{}
for _, pool := range profile.Pools {
pf, err := newPoolFilter(pool)
if err != nil {
return nil, err
}
if _, ok := poolTickets[pool.Name]; !ok {
poolTickets[pool.Name] = nil
}
for _, ticket := range tickets {
if pf.In(ticket) {
poolTickets[pool.Name] = append(poolTickets[pool.Name], ticket)
}
}
}
return poolTickets, nil
}

func filterUnmatchedTicketIDs(allTickets []*pb.Ticket, matches []*pb.Match) []string {
matchedTickets := map[string]struct{}{}
for _, match := range matches {
for _, ticketID := range ticketIDs(match.Tickets) {
matchedTickets[ticketID] = struct{}{}
}
}

var unmatchedTicketIDs []string
for _, ticket := range allTickets {
if _, ok := matchedTickets[ticket.Id]; !ok {
unmatchedTicketIDs = append(unmatchedTicketIDs, ticket.Id)
}
}
return unmatchedTicketIDs
}
Loading

0 comments on commit 07d836c

Please sign in to comment.