Skip to content

Commit

Permalink
fix data race
Browse files Browse the repository at this point in the history
  • Loading branch information
castaneai committed Dec 25, 2023
1 parent ff6ba90 commit c27cf35
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ jobs:
- name: Get dependencies
run: go mod download
- name: Test
run: go test -v ./...
run: go test -v -race ./...
19 changes: 15 additions & 4 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package minimatch
import (
"context"
"fmt"
"sync"
"time"

"go.opentelemetry.io/otel"
Expand All @@ -20,6 +21,7 @@ const (
type Backend struct {
store statestore.StateStore
mmfs map[*pb.MatchProfile]MatchFunction
mmfMu sync.RWMutex
assigner Assigner
options *backendOptions
metrics *backendMetrics
Expand Down Expand Up @@ -79,22 +81,28 @@ func NewBackend(store statestore.StateStore, assigner Assigner, opts ...BackendO
return &Backend{
store: store,
mmfs: map[*pb.MatchProfile]MatchFunction{},
mmfMu: sync.RWMutex{},
assigner: newAssignerWithMetrics(assigner, metrics),
options: options,
metrics: metrics,
}, nil
}

func (b *Backend) AddMatchFunction(profile *pb.MatchProfile, mmf MatchFunction) {
b.mmfMu.Lock()
defer b.mmfMu.Unlock()
b.mmfs[profile] = newMatchFunctionWithMetrics(mmf, b.metrics)
}

func (b *Backend) Start(ctx context.Context, tickRate time.Duration) error {
ticker := time.NewTicker(tickRate)
defer ticker.Stop()

profiles := make([]string, 0, len(b.mmfs))
for profile := range b.mmfs {
b.mmfMu.RLock()
mmfs := b.mmfs
b.mmfMu.RUnlock()
profiles := make([]string, 0, len(mmfs))
for profile := range mmfs {
profiles = append(profiles, profile.Name)
}
for {
Expand Down Expand Up @@ -154,9 +162,12 @@ func (b *Backend) fetchActiveTickets(ctx context.Context) ([]*pb.Ticket, error)
}

func (b *Backend) makeMatches(ctx context.Context, activeTickets []*pb.Ticket) ([]*pb.Match, error) {
resCh := make(chan []*pb.Match, len(b.mmfs))
b.mmfMu.RLock()
mmfs := b.mmfs
b.mmfMu.RUnlock()
resCh := make(chan []*pb.Match, len(mmfs))
eg, ctx := errgroup.WithContext(ctx)
for profile, mmf := range b.mmfs {
for profile, mmf := range mmfs {
profile := profile
mmf := mmf
eg.Go(func() error {
Expand Down
16 changes: 12 additions & 4 deletions minimatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"sync"
"time"

"github.com/alicebob/miniredis/v2"
Expand All @@ -19,6 +20,7 @@ type MiniMatch struct {
store statestore.StateStore
mmfs map[*pb.MatchProfile]MatchFunction
backend *Backend
mu sync.RWMutex
}

func NewMiniMatchWithRedis(opts ...statestore.RedisOption) (*MiniMatch, error) {
Expand All @@ -45,6 +47,7 @@ func NewMiniMatch(store statestore.StateStore) *MiniMatch {
return &MiniMatch{
store: store,
mmfs: map[*pb.MatchProfile]MatchFunction{},
mu: sync.RWMutex{},
}
}

Expand All @@ -71,19 +74,24 @@ func (m *MiniMatch) StartBackend(ctx context.Context, assigner Assigner, tickRat
if err != nil {
return fmt.Errorf("failed to create minimatch backend: %w", err)
}
m.mu.Lock()
m.backend = backend
m.mu.Unlock()
for profile, mmf := range m.mmfs {
m.backend.AddMatchFunction(profile, mmf)
backend.AddMatchFunction(profile, mmf)
}
return m.backend.Start(ctx, tickRate)
return backend.Start(ctx, tickRate)
}

// for testing
func (m *MiniMatch) TickBackend(ctx context.Context) error {
if m.backend == nil {
m.mu.RLock()
backend := m.backend
m.mu.RUnlock()
if backend == nil {
return fmt.Errorf("backend has not been started")
}
return m.backend.Tick(ctx)
return backend.Tick(ctx)
}

var MatchFunctionSimple1vs1 = MatchFunctionFunc(func(ctx context.Context, profile *pb.MatchProfile, poolTickets map[string][]*pb.Ticket) ([]*pb.Match, error) {
Expand Down
37 changes: 37 additions & 0 deletions pkg/statestore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package statestore
import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/redis/rueidis"
"github.com/redis/rueidis/rueidislock"
"github.com/rs/xid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"open-match.dev/open-match/pkg/pb"
)

Expand Down Expand Up @@ -168,3 +171,37 @@ func ticketIDs(tickets []*pb.Ticket) []string {
}
return ids
}

func TestConcurrentFetchActiveTickets(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
mr := miniredis.RunT(t)
store := newTestRedisStore(t, mr.Addr())

for i := 0; i < 1000; i++ {
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: xid.New().String()}))
}

eg, _ := errgroup.WithContext(ctx)
var mu sync.Mutex
ticketIDs := map[string]struct{}{}
for i := 0; i < 1000; i++ {
eg.Go(func() error {
tickets, err := store.GetActiveTickets(ctx, 1000)
if err != nil {
return err
}
for _, ticket := range tickets {
mu.Lock()
if _, ok := ticketIDs[ticket.Id]; ok {
mu.Unlock()
return fmt.Errorf("duplicated! ticket id: %s", ticket.Id)
}
ticketIDs[ticket.Id] = struct{}{}
mu.Unlock()
}
return nil
})
}
require.NoError(t, eg.Wait())
}

0 comments on commit c27cf35

Please sign in to comment.