diff --git a/backend.go b/backend.go index 03dc9ec..b994880 100644 --- a/backend.go +++ b/backend.go @@ -119,7 +119,7 @@ func (b *Backend) Start(ctx context.Context, tickRate time.Duration) error { } func (b *Backend) Tick(ctx context.Context) error { - activeTickets, err := b.fetchActiveTickets(ctx) + activeTickets, err := b.fetchActiveTickets(ctx, b.options.fetchTicketsLimit) if err != nil { return err } @@ -155,15 +155,20 @@ func (b *Backend) Tick(ctx context.Context) error { return nil } -func (b *Backend) fetchActiveTickets(ctx context.Context) ([]*pb.Ticket, error) { +func (b *Backend) fetchActiveTickets(ctx context.Context, limit int64) ([]*pb.Ticket, error) { start := time.Now() - activeTicketIDs, err := b.store.GetActiveTicketIDs(ctx, b.options.fetchTicketsLimit) + activeTicketIDs, err := b.store.GetActiveTicketIDs(ctx) if err != nil { return nil, fmt.Errorf("failed to fetch active ticket IDs: %w", err) } - if len(activeTicketIDs) == 0 { + activeTicketCount := int64(len(activeTicketIDs)) + b.metrics.recordTicketCountActive(ctx, activeTicketCount) + if activeTicketCount == 0 { return nil, nil } + if activeTicketCount > limit { + activeTicketIDs = activeTicketIDs[:limit] + } tickets, err := b.store.GetTickets(ctx, activeTicketIDs) if err != nil { return nil, fmt.Errorf("failed to fetch active tickets: %w", err) diff --git a/docs/metrics.md b/docs/metrics.md index 2b1bbd8..253e796 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -4,14 +4,15 @@ minimatch Backend exposes metrics in OpenTelemetry format to help monitor perfor ## Metrics list -| Metrics Name | Type | Description | -|:--------------------------------------------|:----------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `minimatch.backend.tickets_fetched` | Counter | Number of times Ticket has been fetched by backends. | -| `minimatch.backend.tickets_assigned` | Counter | Number of times match has been assigned to a Ticket by backends. If this value is extremely less than `minimatch.backend.tickets_fetched`, the matchmaking logic may be undesirable. | -| `minimatch.backend.fetch_tickets_latency` | Histogram | Latency of the time the Ticket has been fetched by backends. If this value is slow, you may have a Redis performance problem or a lock conflict with assign tickets or other backends. | -| `minimatch.backend.match_function_latency` | Histogram | Latency of Match Function calls. | -| `minimatch.backend.assigner_latency` | Histogram | Latency of Assigner calls. | -| `minimatch.backend.assign_to_redis_latency` | Histogram | Latency to write Assign results to Redis. If this value is slow, you may have a Redis performance problem or a lock conflict with tickets_fetched or other backends. | +| Metrics Name | Type | Description | +|:--------------------------------------------|:--------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `minimatch.backend.tickets.count` | UpDownCounter | Total number of tickets. **Do not sum** this counter, as a single backend counts all tickets. | +| `minimatch.backend.tickets_fetched` | Counter | Number of times Ticket has been fetched by backends. | +| `minimatch.backend.tickets_assigned` | Counter | Number of times match has been assigned to a Ticket by backends. If this value is extremely less than `minimatch.backend.tickets_fetched`, the matchmaking logic may be undesirable. | +| `minimatch.backend.fetch_tickets_latency` | Histogram | Latency of the time the Ticket has been fetched by backends. If this value is slow, you may have a Redis performance problem or a lock conflict with assign tickets or other backends. | +| `minimatch.backend.match_function_latency` | Histogram | Latency of Match Function calls. | +| `minimatch.backend.assigner_latency` | Histogram | Latency of Assigner calls. | +| `minimatch.backend.assign_to_redis_latency` | Histogram | Latency to write Assign results to Redis. If this value is slow, you may have a Redis performance problem or a lock conflict with tickets_fetched or other backends. | ## Meter provider diff --git a/metrics.go b/metrics.go index 5c9865f..2df9603 100644 --- a/metrics.go +++ b/metrics.go @@ -2,6 +2,7 @@ package minimatch import ( "context" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -18,16 +19,21 @@ var ( defaultHistogramBuckets = []float64{ .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10, } + keyTicketStatus = attribute.Key("status") + attributeActiveTicket = keyTicketStatus.String("active") ) type backendMetrics struct { meter metric.Meter ticketsFetched metric.Int64Counter ticketsAssigned metric.Int64Counter + ticketCount metric.Int64ObservableUpDownCounter fetchTicketsLatency metric.Float64Histogram matchFunctionLatency metric.Float64Histogram assignerLatency metric.Float64Histogram assignToRedisLatency metric.Float64Histogram + + ticketCountActive atomic.Int64 } func newBackendMetrics(provider metric.MeterProvider) (*backendMetrics, error) { @@ -64,7 +70,7 @@ func newBackendMetrics(provider metric.MeterProvider) (*backendMetrics, error) { if err != nil { return nil, err } - return &backendMetrics{ + metrics := &backendMetrics{ meter: meter, ticketsFetched: ticketsFetched, ticketsAssigned: ticketsAssigned, @@ -72,7 +78,18 @@ func newBackendMetrics(provider metric.MeterProvider) (*backendMetrics, error) { matchFunctionLatency: matchFunctionLatency, assignerLatency: assignerLatency, assignToRedisLatency: assignToRedisLatency, - }, nil + } + ticketCount, err := meter.Int64ObservableUpDownCounter("minimatch.backend.tickets.count", + metric.WithDescription("Total number of tickets. Do not sum this counter, as a single backend counts all tickets."), + metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error { + o.Observe(metrics.ticketCountActive.Load(), metric.WithAttributes(attributeActiveTicket)) + return nil + })) + if err != nil { + return nil, err + } + metrics.ticketCount = ticketCount + return metrics, nil } func (m *backendMetrics) recordMatchFunctionLatency(ctx context.Context, seconds float64, matchProfile *pb.MatchProfile) { @@ -99,6 +116,10 @@ func (m *backendMetrics) recordAssignToRedisLatency(ctx context.Context, latency m.assignToRedisLatency.Record(ctx, latency.Seconds()) } +func (m *backendMetrics) recordTicketCountActive(ctx context.Context, count int64) { + m.ticketCountActive.Store(count) +} + type matchFunctionWithMetrics struct { mmf MatchFunction metrics *backendMetrics diff --git a/pkg/statestore/redis.go b/pkg/statestore/redis.go index d05e016..1d1f349 100644 --- a/pkg/statestore/redis.go +++ b/pkg/statestore/redis.go @@ -156,10 +156,10 @@ func (s *RedisStore) GetAssignment(ctx context.Context, ticketID string) (*pb.As return s.getAssignment(ctx, redis, ticketID) } -// The ActiveTicketIDs may still contain the ID of a ticket that was deleted by TTL. +// GetActiveTicketIDs may also retrieve tickets deleted by TTL. // This is because the ticket index and Ticket data are stored in separate keys. // The next `GetTicket` or `GetTickets` call will resolve this inconsistency. -func (s *RedisStore) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) { +func (s *RedisStore) GetActiveTicketIDs(ctx context.Context) ([]string, error) { // Acquire a lock to prevent multiple backends from fetching the same Ticket. // In order to avoid race conditions with other Ticket Index changes, get tickets and set them to pending state should be done atomically. lockedCtx, unlock, err := s.locker.WithContext(ctx, redisKeyFetchTicketsLock(s.opts.keyPrefix)) @@ -168,7 +168,7 @@ func (s *RedisStore) GetActiveTicketIDs(ctx context.Context, limit int64) ([]str } defer unlock() - allTicketIDs, err := s.getAllTicketIDs(lockedCtx, limit) + allTicketIDs, err := s.getAllTicketIDs(lockedCtx) if err != nil { return nil, fmt.Errorf("failed to get all ticket IDs: %w", err) } @@ -189,8 +189,8 @@ func (s *RedisStore) GetActiveTicketIDs(ctx context.Context, limit int64) ([]str return activeTicketIDs, nil } -func (s *RedisStore) getAllTicketIDs(ctx context.Context, limit int64) ([]string, error) { - resp := s.client.Do(ctx, s.client.B().Srandmember().Key(redisKeyTicketIndex(s.opts.keyPrefix)).Count(limit).Build()) +func (s *RedisStore) getAllTicketIDs(ctx context.Context) ([]string, error) { + resp := s.client.Do(ctx, s.client.B().Smembers().Key(redisKeyTicketIndex(s.opts.keyPrefix)).Build()) if err := resp.Error(); err != nil { if rueidis.IsRedisNil(err) { return nil, nil diff --git a/pkg/statestore/redis_test.go b/pkg/statestore/redis_test.go index f0b79d1..f8a675b 100644 --- a/pkg/statestore/redis_test.go +++ b/pkg/statestore/redis_test.go @@ -17,10 +17,6 @@ import ( "open-match.dev/open-match/pkg/pb" ) -const ( - defaultFetchTicketsLimit int64 = 10000 -) - func newTestRedisStore(t *testing.T, addr string, opts ...RedisOption) *RedisStore { copt := rueidis.ClientOption{InitAddress: []string{addr}, DisableCache: true} rc, err := rueidis.NewClient(copt) @@ -41,18 +37,18 @@ func TestPendingRelease(t *testing.T) { require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"})) require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"})) - activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err := store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.ElementsMatch(t, activeTicketIDs, []string{"test1", "test2"}) - activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err = store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.Empty(t, activeTicketIDs) // release one ticket require.NoError(t, store.ReleaseTickets(ctx, []string{"test1"})) - activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err = store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.ElementsMatch(t, activeTicketIDs, []string{"test1"}) } @@ -67,12 +63,12 @@ func TestPendingReleaseTimeout(t *testing.T) { require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test"})) // get active tickets for proposal (active -> pending) - activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err := store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.Len(t, activeTicketIDs, 1) // 0 active ticket - activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err = store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.Len(t, activeTicketIDs, 0) @@ -81,26 +77,11 @@ func TestPendingReleaseTimeout(t *testing.T) { require.NoError(t, err) // 1 active ticket - activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err = store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.Len(t, activeTicketIDs, 1) } -func TestGetActiveTicketIDsLimit(t *testing.T) { - mr := miniredis.RunT(t) - store := newTestRedisStore(t, mr.Addr()) - ctx := context.Background() - - for i := 0; i < 10; i++ { - require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: fmt.Sprintf("test-%d", i)})) - } - for i := 0; i < 3; i++ { - activeTicketIDs, err := store.GetActiveTicketIDs(ctx, 4) - require.NoError(t, err) - require.LessOrEqual(t, len(activeTicketIDs), 4) - } -} - func TestAssignedDeleteTimeout(t *testing.T) { mr := miniredis.RunT(t) store := newTestRedisStore(t, mr.Addr()) @@ -108,7 +89,7 @@ func TestAssignedDeleteTimeout(t *testing.T) { require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"})) require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"})) - activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err := store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.ElementsMatch(t, activeTicketIDs, []string{"test1", "test2"}) @@ -169,7 +150,7 @@ func TestTicketTTL(t *testing.T) { _, err = store.GetTicket(ctx, "test1") require.Error(t, err, ErrTicketNotFound) - activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err := store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.NotContains(t, activeTicketIDs, "test1") @@ -190,7 +171,7 @@ func TestTicketTTL(t *testing.T) { // This is because the ticket index and Ticket data are stored in separate keys. // In this example, "test2" and "test3" were deleted by TTL, but remain in the ticket index. - activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err = store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.ElementsMatch(t, activeTicketIDs, []string{"test2", "test3", "test4"}) err = store.ReleaseTickets(ctx, []string{"test2", "test3", "test4"}) @@ -203,7 +184,7 @@ func TestTicketTTL(t *testing.T) { // Because we called GetTickets, "test2" and "test3" which were deleted by TTL, // were deleted from the ticket index as well. - activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit) + activeTicketIDs, err = store.GetActiveTicketIDs(ctx) require.NoError(t, err) require.ElementsMatch(t, activeTicketIDs, []string{"test4"}) } @@ -225,7 +206,7 @@ func TestConcurrentFetchActiveTickets(t *testing.T) { duplicateMap := map[string]struct{}{} for i := 0; i < concurrency; i++ { eg.Go(func() error { - ticketIDs, err := store.GetActiveTicketIDs(ctx, 1000) + ticketIDs, err := store.GetActiveTicketIDs(ctx) if err != nil { return err } @@ -262,7 +243,7 @@ func TestConcurrentFetchAndAssign(t *testing.T) { eg, _ := errgroup.WithContext(ctx) for i := 0; i < concurrency; i++ { eg.Go(func() error { - ticketIDs, err := store.GetActiveTicketIDs(ctx, 1000) + ticketIDs, err := store.GetActiveTicketIDs(ctx) if err != nil { return err } diff --git a/pkg/statestore/statestore.go b/pkg/statestore/statestore.go index 7d1bdcd..a32d90a 100644 --- a/pkg/statestore/statestore.go +++ b/pkg/statestore/statestore.go @@ -18,7 +18,7 @@ type StateStore interface { GetTicket(ctx context.Context, ticketID string) (*pb.Ticket, error) GetTickets(ctx context.Context, ticketIDs []string) ([]*pb.Ticket, error) GetAssignment(ctx context.Context, ticketID string) (*pb.Assignment, error) - GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) + GetActiveTicketIDs(ctx context.Context) ([]string, error) ReleaseTickets(ctx context.Context, ticketIDs []string) error AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) error } diff --git a/pkg/statestore/ticketcache.go b/pkg/statestore/ticketcache.go index 898aaa5..2fb695a 100644 --- a/pkg/statestore/ticketcache.go +++ b/pkg/statestore/ticketcache.go @@ -101,8 +101,8 @@ func (s *StoreWithTicketCache) GetAssignment(ctx context.Context, ticketID strin return s.origin.GetAssignment(ctx, ticketID) } -func (s *StoreWithTicketCache) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) { - return s.origin.GetActiveTicketIDs(ctx, limit) +func (s *StoreWithTicketCache) GetActiveTicketIDs(ctx context.Context) ([]string, error) { + return s.origin.GetActiveTicketIDs(ctx) } func (s *StoreWithTicketCache) ReleaseTickets(ctx context.Context, ticketIDs []string) error { diff --git a/pkg/statestore/ticketcache_test.go b/pkg/statestore/ticketcache_test.go index f7c66cb..c6786dc 100644 --- a/pkg/statestore/ticketcache_test.go +++ b/pkg/statestore/ticketcache_test.go @@ -51,14 +51,18 @@ func TestTicketCache(t *testing.T) { require.NoError(t, err) require.ElementsMatch(t, []string{"t2", "t3", "t4"}, getTicketIDs(ts)) + // delete "t3" in redis require.NoError(t, redisStore.DeleteTicket(ctx, "t3")) - ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4", "t5"}) + + // "t3" is still in cache + ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4"}) require.NoError(t, err) require.ElementsMatch(t, []string{"t2", "t3", "t4"}, getTicketIDs(ts)) + // expires "t3" cache time.Sleep(ttl + 10*time.Millisecond) - ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4", "t5"}) + ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4"}) require.NoError(t, err) require.ElementsMatch(t, []string{"t2", "t4"}, getTicketIDs(ts)) }