Skip to content

Commit

Permalink
Merge pull request #35 from castaneai/deindex-ticket-api
Browse files Browse the repository at this point in the history
frontend: DeindexTicket API
  • Loading branch information
castaneai authored Nov 8, 2024
2 parents 30c4416 + 1741354 commit 3eacddc
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 79 deletions.
12 changes: 12 additions & 0 deletions api/openmatch/frontend.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ message UpdateBackfillRequest {
Backfill backfill = 1;
}

message DeindexTicketRequest {
string ticket_id = 1;
}

message DeindexTicketResponse {
}

service FrontendService {
rpc CreateTicket(CreateTicketRequest) returns (Ticket);
rpc DeleteTicket(DeleteTicketRequest) returns (google.protobuf.Empty);
Expand All @@ -61,4 +68,9 @@ service FrontendService {
rpc DeleteBackfill(DeleteBackfillRequest) returns (google.protobuf.Empty);
rpc GetBackfill(GetBackfillRequest) returns (Backfill);
rpc UpdateBackfill(UpdateBackfillRequest) returns (Backfill);

// DeindexTickets removes the ticket from the matching candidates.
// unlike DeleteTicket, it does not delete the ticket body;
// you can still get the Assignment with GetTicket after Deindex.
rpc DeindexTicket(DeindexTicketRequest) returns (DeindexTicketResponse);
}
12 changes: 12 additions & 0 deletions docs/differences.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,15 @@ minimatch stores Ticket and Assignment in separate keys.

If the Ticket ID is `abc`, the Ticket key is `abc` and the Assignement key is `assign:abc`.
See also [Scalable minimatch](./scalable.md) for how to store each in a different Redis instance.

## DeindexTicket API

The minimatch Frontend has a DeindexTicket API that the Open Match Frontend does not have.

DeindexTickets removes the ticket from the matching candidates.
unlike DeleteTicket, it does not delete the ticket body;
you can still get the Assignment with GetTicket after Deindex.

To use the Frontend API added by minimatch,
you need to use [connect-go](https://github.com/connectrpc/connect-go) and `github.com/castaneai/minimatch/gen/openmatch` instead of package `open-match.dev/open-match`.
Please see [examples/frontendclient](../examples/frontendclient/frontendclient.go).
34 changes: 34 additions & 0 deletions examples/frontendclient/frontendclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import (
"context"
"log"
"net/http"
"os/signal"
"syscall"

"connectrpc.com/connect"

pb "github.com/castaneai/minimatch/gen/openmatch"
"github.com/castaneai/minimatch/gen/openmatch/openmatchconnect"
)

func main() {
ctx, shutdown := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer shutdown()

client := openmatchconnect.NewFrontendServiceClient(http.DefaultClient, "http://localhost:50504", connect.WithGRPC())
resp, err := client.CreateTicket(ctx, connect.NewRequest(&pb.CreateTicketRequest{Ticket: &pb.Ticket{}}))
if err != nil {
log.Printf("failed to create ticket: %+v", err)
return
}
ticket := resp.Msg
log.Printf("ticket created: %s", ticket.Id)

if _, err := client.DeindexTicket(ctx, connect.NewRequest(&pb.DeindexTicketRequest{TicketId: ticket.Id})); err != nil {
log.Printf("failed to deindex ticket: %+v", err)
return
}
log.Printf("ticket deindexed: %s", ticket.Id)
}
10 changes: 10 additions & 0 deletions frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ func (s *FrontendService) UpdateBackfill(ctx context.Context, request *connect.R
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("not implemented"))
}

func (s *FrontendService) DeindexTicket(ctx context.Context, req *connect.Request[pb.DeindexTicketRequest]) (*connect.Response[pb.DeindexTicketResponse], error) {
if req.Msg.TicketId == "" {
return nil, connect.NewError(connect.CodeInvalidArgument, errors.New("invalid ticket_id"))
}
if err := s.store.DeindexTicket(ctx, req.Msg.TicketId); err != nil {
return nil, err
}
return connect.NewResponse(&pb.DeindexTicketResponse{}), nil
}

func newWatchAssignmentBackoff() retry.Backoff {
return retry.NewConstant(watchAssignmentInterval)
}
281 changes: 202 additions & 79 deletions gen/openmatch/frontend.pb.go

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions gen/openmatch/openmatchconnect/frontend.connect.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/statestore/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ type FrontendStore interface {
DeleteTicket(ctx context.Context, ticketID string) error
GetTicket(ctx context.Context, ticketID string) (*pb.Ticket, error)
GetAssignment(ctx context.Context, ticketID string) (*pb.Assignment, error)
DeindexTicket(ctx context.Context, ticketID string) error
}
4 changes: 4 additions & 0 deletions pkg/statestore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,10 @@ func (s *RedisStore) GetTicketCount(ctx context.Context) (int64, error) {
return count, nil
}

func (s *RedisStore) DeindexTicket(ctx context.Context, ticketID string) error {
return s.deIndexTickets(ctx, []string{ticketID})
}

func (s *RedisStore) getTicket(ctx context.Context, client rueidis.Client, ticketID string) (*pb.Ticket, error) {
resp := client.Do(ctx, client.B().Get().Key(redisKeyTicketData(s.opts.keyPrefix, ticketID)).Build())
if err := resp.Error(); err != nil {
Expand Down
54 changes: 54 additions & 0 deletions pkg/statestore/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,60 @@ func TestReadReplica(t *testing.T) {
require.Equal(t, "replica", t2.SearchFields.Tags[0])
}

func TestDeindexTicket(t *testing.T) {
mr := miniredis.RunT(t)
ticketTTL := 5 * time.Second
store := newTestRedisStore(t, mr.Addr())
ctx := context.Background()

mustCreateTicket := func(id string) {
require.NoError(t, store.CreateTicket(ctx, &pb.Ticket{Id: id}, ticketTTL))
ticket, err := store.GetTicket(ctx, id)
require.NoError(t, err)
require.Equal(t, id, ticket.Id)
}

mustCreateTicket("t1")
mustCreateTicket("t2")

activeTicketIDs, err := store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.ElementsMatch(t, []string{"t1", "t2"}, activeTicketIDs)

t1, err := store.GetTicket(ctx, "t1")
require.NoError(t, err)
require.Equal(t, "t1", t1.Id)
t2, err := store.GetTicket(ctx, "t2")
require.NoError(t, err)
require.Equal(t, "t2", t2.Id)

err = store.ReleaseTickets(ctx, activeTicketIDs)
require.NoError(t, err)

err = store.DeindexTicket(ctx, "t1")
require.NoError(t, err)

activeTicketIDs, err = store.GetActiveTicketIDs(ctx, defaultGetTicketLimit)
require.NoError(t, err)
require.ElementsMatch(t, []string{"t2"}, activeTicketIDs)

t1, err = store.GetTicket(ctx, "t1")
require.NoError(t, err)
require.Equal(t, "t1", t1.Id)
t2, err = store.GetTicket(ctx, "t2")
require.NoError(t, err)
require.Equal(t, "t2", t2.Id)

err = store.ReleaseTickets(ctx, activeTicketIDs)
require.NoError(t, err)

err = store.DeleteTicket(ctx, "t1")
require.NoError(t, err)

_, err = store.GetTicket(ctx, "t1")
require.ErrorIs(t, err, ErrTicketNotFound)
}

// https://stackoverflow.com/a/72408490
func chunkBy[T any](items []T, chunkSize int) (chunks [][]T) {
for chunkSize < len(items) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/statestore/ticketcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (s *FrontendStoreWithTicketCache) GetAssignment(ctx context.Context, ticket
return s.origin.GetAssignment(ctx, ticketID)
}

func (s *FrontendStoreWithTicketCache) DeindexTicket(ctx context.Context, ticketID string) error {
return s.origin.DeindexTicket(ctx, ticketID)
}

// BackendStoreWithTicketCache caches GetTickets results in-memory with TTL
type BackendStoreWithTicketCache struct {
origin BackendStore
Expand Down

0 comments on commit 3eacddc

Please sign in to comment.