Skip to content

Commit

Permalink
Merge pull request #33 from castaneai/loadtest/add-connection-check
Browse files Browse the repository at this point in the history
loadtest: add connection check
  • Loading branch information
castaneai authored Nov 6, 2024
2 parents c4b6628 + 04d301b commit ac28fb7
Show file tree
Hide file tree
Showing 12 changed files with 586 additions and 87 deletions.
5 changes: 4 additions & 1 deletion charts/minimatch-scaled/templates/backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ spec:
- name: {{ .Chart.Name }}-backend
image: {{ .Values.backend.deployment.image }}
env:
{{- toYaml .Values.backend.deployment.env | nindent 12 }}
{{- range .Values.backend.deployment.env }}
- name: {{ .name }}
value: "{{ .value }}"
{{- end }}
resources:
{{- toYaml .Values.backend.deployment.resources | nindent 12 }}
ports:
Expand Down
5 changes: 4 additions & 1 deletion charts/minimatch-scaled/templates/frontend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ spec:
- name: {{ .Chart.Name }}-frontend
image: {{ .Values.frontend.deployment.image }}
env:
{{- toYaml .Values.frontend.deployment.env | nindent 12 }}
{{- range .Values.frontend.deployment.env }}
- name: {{ .name }}
value: "{{ .value }}"
{{- end }}
resources:
{{- toYaml .Values.frontend.deployment.resources | nindent 12 }}
ports:
Expand Down
2 changes: 1 addition & 1 deletion loadtest/aqua.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# - all
registries:
- type: standard
ref: v4.246.0 # renovate: depName=aquaproj/aqua-registry
ref: v4.247.0 # renovate: depName=aquaproj/aqua-registry
packages:
- name: ko-build/[email protected]
- name: helmfile/[email protected]
Expand Down
6 changes: 5 additions & 1 deletion loadtest/charts/attacker/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ spec:
containers:
- name: {{ .Chart.Name }}
image: "{{ .Values.image }}"
args: ["-rps", "{{ .Values.rps }}", "-addr", "{{ .Values.frontendAddr }}", "-timeout", "{{ .Values.matchTimeout }}"]
args: ["-rps", "{{ .Values.rps }}",
"-addr", "{{ .Values.frontendAddr }}",
"-timeout", "{{ .Values.matchTimeout }}",
"-redis_addr", "{{ .Values.redisAddr }}",
"-connect_timeout", "{{ .Values.connectTimeout }}"]
ports:
- name: metrics
containerPort: 2112
Expand Down
2 changes: 2 additions & 0 deletions loadtest/charts/attacker/values.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
rps: "1"
frontendAddr:
redisAddr:
matchTimeout: "10s"
connectTimeout: "3s"
replicas: 1
image:
podAnnotations: {}
Expand Down
123 changes: 110 additions & 13 deletions loadtest/cmd/attacker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/rueidis"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -28,6 +29,12 @@ const (
matchStatusAssigned = "assigned"
matchStatusTimeout = "timeout"
matchStatusError = "error"

connectStatusOk = "ok"
connectStatusTimeout = "timeout"
connectStatusError = "error"

ticketsPerMatch = 2
)

var (
Expand All @@ -40,20 +47,41 @@ var (
Namespace: metricsNamespace,
Buckets: []float64{.25, .5, 1, 2, 4, 8, 16, 30},
})
connectFinishedTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "connect_finished_total",
Namespace: metricsNamespace,
}, []string{"status"})
connectedDuration = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "connected_duration_seconds",
Namespace: metricsNamespace,
Buckets: []float64{.25, .5, 1, 2, 4, 8, 16, 30},
})
errWatchTicketTimeout = errors.New("watch ticket timed out")
errWatchAssignmentTimeout = errors.New("watch assignment timed out")
)

func main() {
var (
rps float64
frontendAddr string
matchTimeout time.Duration
rps float64
frontendAddr string
redisAddr string
matchTimeout time.Duration
connectTimeout time.Duration
)
flag.Float64Var(&rps, "rps", 1.0, "RPS (request per second)")
flag.StringVar(&frontendAddr, "addr", "localhost:50504", "An address of Open Match frontend")
flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "An address of redis instance for connection check")
flag.DurationVar(&matchTimeout, "timeout", 10*time.Second, "Matching timeout")
flag.DurationVar(&connectTimeout, "connect_timeout", 5*time.Second, "Connecting timeout")
flag.Parse()

log.Printf("minimatch load-testing (rps: %.2f, frontend: %s, timeout: %s)", rps, frontendAddr, matchTimeout)
log.Printf("minimatch load-testing (rps: %.2f, frontend: %s, match_timeout: %s, redis: %s, connect_timeout: %s)",
rps, frontendAddr, matchTimeout, redisAddr, connectTimeout)

redis, err := newRedisClient(redisAddr)
if err != nil {
log.Fatalf("failed to create redis client: %+v", err)
}

ctx, shutdown := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer shutdown()
Expand Down Expand Up @@ -81,12 +109,12 @@ func main() {
log.Printf("shutting down attacker...")
return
case <-ticker.C:
go createAndWatchTicket(context.Background(), frontendAddr, matchTimeout)
go createAndWatchTicket(context.Background(), frontendAddr, redis, matchTimeout, connectTimeout)
}
}
}

func createAndWatchTicket(ctx context.Context, omFrontendAddr string, timeout time.Duration) {
func createAndWatchTicket(ctx context.Context, omFrontendAddr string, redis rueidis.Client, matchTimeout, connectTimeout time.Duration) {
frontendClient, closer, err := newOMFrontendClient(omFrontendAddr)
if err != nil {
log.Printf("failed to create frontend client: %+v", err)
Expand All @@ -101,18 +129,24 @@ func createAndWatchTicket(ctx context.Context, omFrontendAddr string, timeout ti
log.Printf("failed to create ticket: %+v", err)
return
}
watchTickets(ctx, frontendClient, ticket, timeout)
as, err := watchTickets(ctx, frontendClient, ticket, matchTimeout)
if err != nil && !errors.Is(err, errWatchTicketTimeout) {
log.Printf("failed to watch tickets: %+v", err)
}
if err := waitConnection(ctx, redis, as, ticket, connectTimeout); err != nil && !errors.Is(err, errWatchAssignmentTimeout) {
log.Printf("failed to wait connection: %+v", err)
}
}

func watchTickets(ctx context.Context, omFrontend pb.FrontendServiceClient, ticket *pb.Ticket, timeout time.Duration) {
func watchTickets(ctx context.Context, omFrontend pb.FrontendServiceClient, ticket *pb.Ticket, timeout time.Duration) (*pb.Assignment, error) {
started := time.Now()
ctx, cancel := context.WithCancel(ctx)
defer cancel()

stream, err := omFrontend.WatchAssignments(ctx, &pb.WatchAssignmentsRequest{TicketId: ticket.Id})
if err != nil {
log.Printf("failed to open watch assignments stream: %+v", err)
return
return nil, err
}

respCh := make(chan *pb.Assignment)
Expand Down Expand Up @@ -144,16 +178,63 @@ func watchTickets(ctx context.Context, omFrontend pb.FrontendServiceClient, tick

select {
case <-ctx.Done():
return
return nil, ctx.Err()
case <-time.After(timeout):
matchFinishedTotal.With(prometheus.Labels{"status": matchStatusTimeout}).Inc()
return
case <-respCh:
return nil, errWatchTicketTimeout
case as := <-respCh:
matchFinishedTotal.With(prometheus.Labels{"status": matchStatusAssigned}).Inc()
matchAssignedDuration.Observe(time.Since(started).Seconds())
return as, nil
case err := <-errCh:
matchFinishedTotal.With(prometheus.Labels{"status": matchStatusError}).Inc()
log.Printf("failed to watch assignment: %+v", err)
return nil, err
}
}

func waitConnection(ctx context.Context, redis rueidis.Client, as *pb.Assignment, ticket *pb.Ticket, connectTimeout time.Duration) error {
key := redisKeyAssignment(as)
cmds := []rueidis.Completed{
redis.B().Sadd().Key(key).Member(ticket.Id).Build(),
redis.B().Expire().Key(key).Seconds(int64(connectTimeout.Seconds())).Nx().Build(),
}
resps := redis.DoMulti(ctx, cmds...)
for _, resp := range resps {
if err := resp.Error(); err != nil {
return err
}
}

ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
timeout := time.After(connectTimeout)
started := time.Now()
for {
select {
case <-ctx.Done():
connectFinishedTotal.With(prometheus.Labels{"status": connectStatusError}).Inc()
return ctx.Err()
case <-ticker.C:
cmd := redis.B().Scard().Key(key).Build()
resp := redis.Do(ctx, cmd)
if err := resp.Error(); err != nil {
connectFinishedTotal.With(prometheus.Labels{"status": connectStatusError}).Inc()
return fmt.Errorf("failed to SCARD assignment: %w", err)
}
cnt, err := resp.AsInt64()
if err != nil {
connectFinishedTotal.With(prometheus.Labels{"status": connectStatusError}).Inc()
return fmt.Errorf("failed to decode SCARD response as int: %w", err)
}
if cnt >= ticketsPerMatch {
connectFinishedTotal.With(prometheus.Labels{"status": connectStatusOk}).Inc()
connectedDuration.Observe(time.Since(started).Seconds())
return nil
}
case <-timeout:
connectFinishedTotal.With(prometheus.Labels{"status": connectStatusTimeout}).Inc()
return errWatchAssignmentTimeout
}
}
}

Expand All @@ -167,3 +248,19 @@ func newOMFrontendClient(addr string) (pb.FrontendServiceClient, io.Closer, erro
}
return pb.NewFrontendServiceClient(cc), cc, nil
}

func newRedisClient(addr string) (rueidis.Client, error) {
opt := rueidis.ClientOption{
InitAddress: []string{addr},
DisableCache: true,
}
client, err := rueidis.NewClient(opt)
if err != nil {
return nil, fmt.Errorf("failed to create redis client: %w", err)
}
return client, nil
}

func redisKeyAssignment(as *pb.Assignment) string {
return fmt.Sprintf("attacker:as:%s", as.Connection)
}
3 changes: 2 additions & 1 deletion loadtest/cmd/backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type config struct {
TickRate time.Duration `envconfig:"TICK_RATE" default:"1s"`
TicketCacheTTL time.Duration `envconfig:"TICKET_CACHE_TTL" default:"10s"`
OverlappingCheckRedisAddr string `envconfig:"OVERLAPPING_CHECK_REDIS_ADDR"`
TicketValidationEnabled bool `envconfig:"TICKET_VALIDATION_ENABLED" default:"true"`
}

var matchProfile = &pb.MatchProfile{
Expand Down Expand Up @@ -69,7 +70,7 @@ func main() {
store := statestore.NewBackendStoreWithTicketCache(redisStore, ticketCache,
statestore.WithTicketCacheTTL(conf.TicketCacheTTL))
assigner, err := newAssigner(&conf, meterProvider)
backend, err := minimatch.NewBackend(store, assigner)
backend, err := minimatch.NewBackend(store, assigner, minimatch.WithTicketValidationBeforeAssign(conf.TicketValidationEnabled))
if err != nil {
log.Fatalf("failed to create backend: %+v", err)
}
Expand Down
12 changes: 11 additions & 1 deletion loadtest/cmd/frontend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (
"github.com/redis/rueidis"
"github.com/redis/rueidis/rueidislock"
"github.com/redis/rueidis/rueidisotel"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/sdk/metric"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"open-match.dev/open-match/pkg/pb"

"github.com/castaneai/minimatch"
Expand Down Expand Up @@ -56,7 +58,15 @@ func main() {
store := statestore.NewFrontendStoreWithTicketCache(redisStore, ticketCache,
statestore.WithTicketCacheTTL(conf.TicketCacheTTL))

sv := grpc.NewServer()
serverOpts := []grpc.ServerOption{
grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionIdle: 3 * time.Second,
Time: 1 * time.Second,
Timeout: 5 * time.Second,
}),
grpc.StatsHandler(otelgrpc.NewServerHandler()),
}
sv := grpc.NewServer(serverOpts...)
pb.RegisterFrontendServiceServer(sv, minimatch.NewFrontendService(store))

addr := fmt.Sprintf(":%s", conf.Port)
Expand Down
33 changes: 17 additions & 16 deletions loadtest/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,25 @@ require (
github.com/prometheus/client_golang v1.17.0
github.com/redis/rueidis v1.0.31
github.com/redis/rueidis/rueidisotel v1.0.31
go.opentelemetry.io/otel v1.24.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.56.0
go.opentelemetry.io/otel v1.31.0
go.opentelemetry.io/otel/exporters/prometheus v0.44.0
go.opentelemetry.io/otel/metric v1.24.0
go.opentelemetry.io/otel/sdk/metric v1.24.0
golang.org/x/sync v0.6.0
google.golang.org/grpc v1.62.1
go.opentelemetry.io/otel/metric v1.31.0
go.opentelemetry.io/otel/sdk/metric v1.31.0
golang.org/x/sync v0.8.0
google.golang.org/grpc v1.67.1
open-match.dev/open-match v1.8.1
)

require (
github.com/alicebob/gopher-json v0.0.0-20230218143504-906a9b012302 // indirect
github.com/alicebob/miniredis/v2 v2.32.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.2 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
Expand All @@ -39,14 +41,13 @@ require (
github.com/rs/xid v1.5.0 // indirect
github.com/sethvargo/go-retry v0.2.4 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/otel/sdk v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
golang.org/x/exp v0.0.0-20220328175248-053ad81199eb // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240123012728-ef4313101c80 // indirect
google.golang.org/protobuf v1.33.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241007155032-5fefd90f89a9 // indirect
google.golang.org/protobuf v1.35.1 // indirect
)
Loading

0 comments on commit ac28fb7

Please sign in to comment.