Skip to content

Commit

Permalink
Merge pull request #23 from castaneai/total-active-tickets-metrics
Browse files Browse the repository at this point in the history
Total active tickets metrics
  • Loading branch information
castaneai authored Mar 12, 2024
2 parents e2195ed + 0e763d2 commit 825088b
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 55 deletions.
13 changes: 9 additions & 4 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (b *Backend) Start(ctx context.Context, tickRate time.Duration) error {
}

func (b *Backend) Tick(ctx context.Context) error {
activeTickets, err := b.fetchActiveTickets(ctx)
activeTickets, err := b.fetchActiveTickets(ctx, b.options.fetchTicketsLimit)
if err != nil {
return err
}
Expand Down Expand Up @@ -155,15 +155,20 @@ func (b *Backend) Tick(ctx context.Context) error {
return nil
}

func (b *Backend) fetchActiveTickets(ctx context.Context) ([]*pb.Ticket, error) {
func (b *Backend) fetchActiveTickets(ctx context.Context, limit int64) ([]*pb.Ticket, error) {
start := time.Now()
activeTicketIDs, err := b.store.GetActiveTicketIDs(ctx, b.options.fetchTicketsLimit)
activeTicketIDs, err := b.store.GetActiveTicketIDs(ctx)
if err != nil {
return nil, fmt.Errorf("failed to fetch active ticket IDs: %w", err)
}
if len(activeTicketIDs) == 0 {
activeTicketCount := int64(len(activeTicketIDs))
b.metrics.recordTicketCountActive(ctx, activeTicketCount)
if activeTicketCount == 0 {
return nil, nil
}
if activeTicketCount > limit {
activeTicketIDs = activeTicketIDs[:limit]
}
tickets, err := b.store.GetTickets(ctx, activeTicketIDs)
if err != nil {
return nil, fmt.Errorf("failed to fetch active tickets: %w", err)
Expand Down
17 changes: 9 additions & 8 deletions docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ minimatch Backend exposes metrics in OpenTelemetry format to help monitor perfor

## Metrics list

| Metrics Name | Type | Description |
|:--------------------------------------------|:----------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `minimatch.backend.tickets_fetched` | Counter | Number of times Ticket has been fetched by backends. |
| `minimatch.backend.tickets_assigned` | Counter | Number of times match has been assigned to a Ticket by backends. If this value is extremely less than `minimatch.backend.tickets_fetched`, the matchmaking logic may be undesirable. |
| `minimatch.backend.fetch_tickets_latency` | Histogram | Latency of the time the Ticket has been fetched by backends. If this value is slow, you may have a Redis performance problem or a lock conflict with assign tickets or other backends. |
| `minimatch.backend.match_function_latency` | Histogram | Latency of Match Function calls. |
| `minimatch.backend.assigner_latency` | Histogram | Latency of Assigner calls. |
| `minimatch.backend.assign_to_redis_latency` | Histogram | Latency to write Assign results to Redis. If this value is slow, you may have a Redis performance problem or a lock conflict with tickets_fetched or other backends. |
| Metrics Name | Type | Description |
|:--------------------------------------------|:--------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `minimatch.backend.tickets.count` | UpDownCounter | Total number of tickets. **Do not sum** this counter across backends, as each backend already counts all tickets. |
| `minimatch.backend.tickets_fetched` | Counter | Number of times Ticket has been fetched by backends. |
| `minimatch.backend.tickets_assigned` | Counter | Number of times match has been assigned to a Ticket by backends. If this value is far lower than `minimatch.backend.tickets_fetched`, the matchmaking logic may be undesirable. |
| `minimatch.backend.fetch_tickets_latency` | Histogram | Latency of fetching Tickets by backends. If this value is high, you may have a Redis performance problem or a lock conflict with ticket assignment or other backends. |
| `minimatch.backend.match_function_latency` | Histogram | Latency of Match Function calls. |
| `minimatch.backend.assigner_latency` | Histogram | Latency of Assigner calls. |
| `minimatch.backend.assign_to_redis_latency` | Histogram | Latency of writing assignment results to Redis. If this value is high, you may have a Redis performance problem or a lock conflict with ticket fetching or other backends. |

## Meter provider

Expand Down
25 changes: 23 additions & 2 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package minimatch

import (
"context"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -18,16 +19,21 @@ var (
defaultHistogramBuckets = []float64{
.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10,
}
keyTicketStatus = attribute.Key("status")
attributeActiveTicket = keyTicketStatus.String("active")
)

type backendMetrics struct {
meter metric.Meter
ticketsFetched metric.Int64Counter
ticketsAssigned metric.Int64Counter
ticketCount metric.Int64ObservableUpDownCounter
fetchTicketsLatency metric.Float64Histogram
matchFunctionLatency metric.Float64Histogram
assignerLatency metric.Float64Histogram
assignToRedisLatency metric.Float64Histogram

ticketCountActive atomic.Int64
}

func newBackendMetrics(provider metric.MeterProvider) (*backendMetrics, error) {
Expand Down Expand Up @@ -64,15 +70,26 @@ func newBackendMetrics(provider metric.MeterProvider) (*backendMetrics, error) {
if err != nil {
return nil, err
}
return &backendMetrics{
metrics := &backendMetrics{
meter: meter,
ticketsFetched: ticketsFetched,
ticketsAssigned: ticketsAssigned,
fetchTicketsLatency: fetchTicketsLatency,
matchFunctionLatency: matchFunctionLatency,
assignerLatency: assignerLatency,
assignToRedisLatency: assignToRedisLatency,
}, nil
}
ticketCount, err := meter.Int64ObservableUpDownCounter("minimatch.backend.tickets.count",
metric.WithDescription("Total number of tickets. Do not sum this counter, as a single backend counts all tickets."),
metric.WithInt64Callback(func(ctx context.Context, o metric.Int64Observer) error {
o.Observe(metrics.ticketCountActive.Load(), metric.WithAttributes(attributeActiveTicket))
return nil
}))
if err != nil {
return nil, err
}
metrics.ticketCount = ticketCount
return metrics, nil
}

func (m *backendMetrics) recordMatchFunctionLatency(ctx context.Context, seconds float64, matchProfile *pb.MatchProfile) {
Expand All @@ -99,6 +116,10 @@ func (m *backendMetrics) recordAssignToRedisLatency(ctx context.Context, latency
m.assignToRedisLatency.Record(ctx, latency.Seconds())
}

// recordTicketCountActive stores the latest active-ticket count in the
// ticketCountActive atomic. Nothing is reported to the meter here; the
// "minimatch.backend.tickets.count" observable callback reads the stored
// value with Load. ctx is currently unused.
func (m *backendMetrics) recordTicketCountActive(ctx context.Context, count int64) {
m.ticketCountActive.Store(count)
}

type matchFunctionWithMetrics struct {
mmf MatchFunction
metrics *backendMetrics
Expand Down
10 changes: 5 additions & 5 deletions pkg/statestore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,10 @@ func (s *RedisStore) GetAssignment(ctx context.Context, ticketID string) (*pb.As
return s.getAssignment(ctx, redis, ticketID)
}

// The ActiveTicketIDs may still contain the ID of a ticket that was deleted by TTL.
// GetActiveTicketIDs may also return the IDs of tickets that have already been deleted by TTL.
// This is because the ticket index and Ticket data are stored in separate keys.
// The next `GetTicket` or `GetTickets` call will resolve this inconsistency.
func (s *RedisStore) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) {
func (s *RedisStore) GetActiveTicketIDs(ctx context.Context) ([]string, error) {
// Acquire a lock to prevent multiple backends from fetching the same Ticket.
// To avoid race conditions with other Ticket Index changes, getting tickets and setting them to the pending state must be done atomically.
lockedCtx, unlock, err := s.locker.WithContext(ctx, redisKeyFetchTicketsLock(s.opts.keyPrefix))
Expand All @@ -168,7 +168,7 @@ func (s *RedisStore) GetActiveTicketIDs(ctx context.Context, limit int64) ([]str
}
defer unlock()

allTicketIDs, err := s.getAllTicketIDs(lockedCtx, limit)
allTicketIDs, err := s.getAllTicketIDs(lockedCtx)
if err != nil {
return nil, fmt.Errorf("failed to get all ticket IDs: %w", err)
}
Expand All @@ -189,8 +189,8 @@ func (s *RedisStore) GetActiveTicketIDs(ctx context.Context, limit int64) ([]str
return activeTicketIDs, nil
}

func (s *RedisStore) getAllTicketIDs(ctx context.Context, limit int64) ([]string, error) {
resp := s.client.Do(ctx, s.client.B().Srandmember().Key(redisKeyTicketIndex(s.opts.keyPrefix)).Count(limit).Build())
func (s *RedisStore) getAllTicketIDs(ctx context.Context) ([]string, error) {
resp := s.client.Do(ctx, s.client.B().Smembers().Key(redisKeyTicketIndex(s.opts.keyPrefix)).Build())
if err := resp.Error(); err != nil {
if rueidis.IsRedisNil(err) {
return nil, nil
Expand Down
43 changes: 12 additions & 31 deletions pkg/statestore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ import (
"open-match.dev/open-match/pkg/pb"
)

const (
defaultFetchTicketsLimit int64 = 10000
)

func newTestRedisStore(t *testing.T, addr string, opts ...RedisOption) *RedisStore {
copt := rueidis.ClientOption{InitAddress: []string{addr}, DisableCache: true}
rc, err := rueidis.NewClient(copt)
Expand All @@ -41,18 +37,18 @@ func TestPendingRelease(t *testing.T) {

require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"}))
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"}))
activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test1", "test2"})

activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.Empty(t, activeTicketIDs)

// release one ticket
require.NoError(t, store.ReleaseTickets(ctx, []string{"test1"}))

activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test1"})
}
Expand All @@ -67,12 +63,12 @@ func TestPendingReleaseTimeout(t *testing.T) {
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test"}))

// get active tickets for proposal (active -> pending)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.Len(t, activeTicketIDs, 1)

// 0 active ticket
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.Len(t, activeTicketIDs, 0)

Expand All @@ -81,34 +77,19 @@ func TestPendingReleaseTimeout(t *testing.T) {
require.NoError(t, err)

// 1 active ticket
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.Len(t, activeTicketIDs, 1)
}

func TestGetActiveTicketIDsLimit(t *testing.T) {
mr := miniredis.RunT(t)
store := newTestRedisStore(t, mr.Addr())
ctx := context.Background()

for i := 0; i < 10; i++ {
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: fmt.Sprintf("test-%d", i)}))
}
for i := 0; i < 3; i++ {
activeTicketIDs, err := store.GetActiveTicketIDs(ctx, 4)
require.NoError(t, err)
require.LessOrEqual(t, len(activeTicketIDs), 4)
}
}

func TestAssignedDeleteTimeout(t *testing.T) {
mr := miniredis.RunT(t)
store := newTestRedisStore(t, mr.Addr())
ctx := context.Background()

require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test1"}))
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: "test2"}))
activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test1", "test2"})

Expand Down Expand Up @@ -169,7 +150,7 @@ func TestTicketTTL(t *testing.T) {
_, err = store.GetTicket(ctx, "test1")
require.Error(t, err, ErrTicketNotFound)

activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err := store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.NotContains(t, activeTicketIDs, "test1")

Expand All @@ -190,7 +171,7 @@ func TestTicketTTL(t *testing.T) {
// This is because the ticket index and Ticket data are stored in separate keys.

// In this example, "test2" and "test3" were deleted by TTL, but remain in the ticket index.
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test2", "test3", "test4"})
err = store.ReleaseTickets(ctx, []string{"test2", "test3", "test4"})
Expand All @@ -203,7 +184,7 @@ func TestTicketTTL(t *testing.T) {

// Because we called GetTickets, "test2" and "test3" which were deleted by TTL,
// were deleted from the ticket index as well.
activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultFetchTicketsLimit)
activeTicketIDs, err = store.GetActiveTicketIDs(ctx)
require.NoError(t, err)
require.ElementsMatch(t, activeTicketIDs, []string{"test4"})
}
Expand All @@ -225,7 +206,7 @@ func TestConcurrentFetchActiveTickets(t *testing.T) {
duplicateMap := map[string]struct{}{}
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
ticketIDs, err := store.GetActiveTicketIDs(ctx, 1000)
ticketIDs, err := store.GetActiveTicketIDs(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -262,7 +243,7 @@ func TestConcurrentFetchAndAssign(t *testing.T) {
eg, _ := errgroup.WithContext(ctx)
for i := 0; i < concurrency; i++ {
eg.Go(func() error {
ticketIDs, err := store.GetActiveTicketIDs(ctx, 1000)
ticketIDs, err := store.GetActiveTicketIDs(ctx)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/statestore/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type StateStore interface {
GetTicket(ctx context.Context, ticketID string) (*pb.Ticket, error)
GetTickets(ctx context.Context, ticketIDs []string) ([]*pb.Ticket, error)
GetAssignment(ctx context.Context, ticketID string) (*pb.Assignment, error)
GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error)
GetActiveTicketIDs(ctx context.Context) ([]string, error)
ReleaseTickets(ctx context.Context, ticketIDs []string) error
AssignTickets(ctx context.Context, asgs []*pb.AssignmentGroup) error
}
4 changes: 2 additions & 2 deletions pkg/statestore/ticketcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ func (s *StoreWithTicketCache) GetAssignment(ctx context.Context, ticketID strin
return s.origin.GetAssignment(ctx, ticketID)
}

func (s *StoreWithTicketCache) GetActiveTicketIDs(ctx context.Context, limit int64) ([]string, error) {
return s.origin.GetActiveTicketIDs(ctx, limit)
func (s *StoreWithTicketCache) GetActiveTicketIDs(ctx context.Context) ([]string, error) {
return s.origin.GetActiveTicketIDs(ctx)
}

func (s *StoreWithTicketCache) ReleaseTickets(ctx context.Context, ticketIDs []string) error {
Expand Down
8 changes: 6 additions & 2 deletions pkg/statestore/ticketcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,18 @@ func TestTicketCache(t *testing.T) {
require.NoError(t, err)
require.ElementsMatch(t, []string{"t2", "t3", "t4"}, getTicketIDs(ts))

// delete "t3" in redis
require.NoError(t, redisStore.DeleteTicket(ctx, "t3"))
ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4", "t5"})

// "t3" is still in cache
ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"t2", "t3", "t4"}, getTicketIDs(ts))

// wait for the "t3" cache entry to expire
time.Sleep(ttl + 10*time.Millisecond)

ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4", "t5"})
ts, err = store.GetTickets(ctx, []string{"t2", "t3", "t4"})
require.NoError(t, err)
require.ElementsMatch(t, []string{"t2", "t4"}, getTicketIDs(ts))
}

0 comments on commit 825088b

Please sign in to comment.