Skip to content

Commit

Permalink
Merge pull request #38 from castaneai/loadtest-logger
Browse files Browse the repository at this point in the history
loadtest: use slog instead of log
  • Loading branch information
castaneai authored Nov 13, 2024
2 parents 04b9c5d + 131904a commit 86de02a
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 57 deletions.
57 changes: 40 additions & 17 deletions loadtest/cmd/attacker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ import (
"flag"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/kitagry/slogdriver"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -75,12 +76,15 @@ func main() {
flag.DurationVar(&connectTimeout, "connect_timeout", 5*time.Second, "Connecting timeout")
flag.Parse()

log.Printf("minimatch load-testing (rps: %.2f, frontend: %s, match_timeout: %s, redis: %s, connect_timeout: %s)",
rps, frontendAddr, matchTimeout, redisAddr, connectTimeout)
logger := initLogger()

logger.Info(fmt.Sprintf("minimatch load-testing (rps: %.2f, frontend: %s, match_timeout: %s, redis: %s, connect_timeout: %s)",
rps, frontendAddr, matchTimeout, redisAddr, connectTimeout))

redis, err := newRedisClient(redisAddr)
if err != nil {
log.Fatalf("failed to create redis client: %+v", err)
logger.Error(fmt.Sprintf("failed to create redis client: %+v", err), "error", err)
os.Exit(1)
}

ctx, shutdown := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
Expand All @@ -90,13 +94,13 @@ func main() {
http.Handle("/metrics", promhttp.Handler())
go func() {
addr := ":2112"
log.Printf("prometheus endpoint (/metrics) is listening on %s...", addr)
logger.Info(fmt.Sprintf("prometheus endpoint (/metrics) is listening on %s...", addr))
server := &http.Server{
Addr: addr,
ReadHeaderTimeout: 10 * time.Second, // https://app.deepsource.com/directory/analyzers/go/issues/GO-S2114
}
if err := server.ListenAndServe(); err != nil {
log.Printf("failed to serve prometheus endpoint: %+v", err)
logger.Error(fmt.Sprintf("failed to serve prometheus endpoint: %+v", err), "error", err)
}
}()

Expand All @@ -106,18 +110,18 @@ func main() {
for {
select {
case <-ctx.Done():
log.Printf("shutting down attacker...")
logger.Info("shutting down attacker...")
return
case <-ticker.C:
go createAndWatchTicket(context.Background(), frontendAddr, redis, matchTimeout, connectTimeout)
go createAndWatchTicket(context.Background(), frontendAddr, redis, matchTimeout, connectTimeout, logger)
}
}
}

func createAndWatchTicket(ctx context.Context, omFrontendAddr string, redis rueidis.Client, matchTimeout, connectTimeout time.Duration) {
func createAndWatchTicket(ctx context.Context, omFrontendAddr string, redis rueidis.Client, matchTimeout, connectTimeout time.Duration, logger *slog.Logger) {
frontendClient, closer, err := newOMFrontendClient(omFrontendAddr)
if err != nil {
log.Printf("failed to create frontend client: %+v", err)
logger.Error(fmt.Sprintf("failed to create frontend client: %+v", err), "error", err)
return
}
defer closer.Close()
Expand All @@ -126,15 +130,21 @@ func createAndWatchTicket(ctx context.Context, omFrontendAddr string, redis ruei
SearchFields: &pb.SearchFields{},
}})
if err != nil {
log.Printf("failed to create ticket: %+v", err)
logger.Error(fmt.Sprintf("failed to create ticket: %+v", err), "error", err)
return
}
as, err := watchTickets(ctx, frontendClient, ticket, matchTimeout)
if err != nil && !errors.Is(err, errWatchTicketTimeout) {
log.Printf("failed to watch tickets: %+v", err)
if err != nil {
if !errors.Is(err, errWatchTicketTimeout) {
logger.Error(fmt.Sprintf("failed to watch tickets: %+v", err), "error", err)
}
return
}
if err := waitConnection(ctx, redis, as, ticket, connectTimeout); err != nil && !errors.Is(err, errWatchAssignmentTimeout) {
log.Printf("failed to wait connection: %+v", err)
if err := waitConnection(ctx, redis, as, ticket, connectTimeout); err != nil {
if !errors.Is(err, errWatchAssignmentTimeout) {
logger.Error(fmt.Sprintf("failed to wait connection: %+v", err))
}
return
}
}

Expand All @@ -145,8 +155,7 @@ func watchTickets(ctx context.Context, omFrontend pb.FrontendServiceClient, tick

stream, err := omFrontend.WatchAssignments(ctx, &pb.WatchAssignmentsRequest{TicketId: ticket.Id})
if err != nil {
log.Printf("failed to open watch assignments stream: %+v", err)
return nil, err
return nil, fmt.Errorf("failed to watch assignments: %w", err)
}

respCh := make(chan *pb.Assignment)
Expand All @@ -159,6 +168,9 @@ func watchTickets(ctx context.Context, omFrontend pb.FrontendServiceClient, tick
default:
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
return
}
if errors.Is(err, context.Canceled) && ctx.Err() != nil {
return
}
Expand Down Expand Up @@ -264,3 +276,14 @@ func newRedisClient(addr string) (rueidis.Client, error) {
func redisKeyAssignment(as *pb.Assignment) string {
return fmt.Sprintf("attacker:as:%s", as.Connection)
}

func initLogger() *slog.Logger {
_, onK8s := os.LookupEnv("KUBERNETES_SERVICE_HOST")
_, onCloudRun := os.LookupEnv("K_CONFIGURATION")
if onK8s || onCloudRun {
return slogdriver.New(os.Stdout, slogdriver.HandlerOptions{
Level: slogdriver.LevelDefault,
})
}
return slog.Default()
}
63 changes: 43 additions & 20 deletions loadtest/cmd/backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"os/signal"
Expand All @@ -14,6 +14,7 @@ import (
cache "github.com/Code-Hex/go-generics-cache"
"github.com/bojand/hri"
"github.com/kelseyhightower/envconfig"
"github.com/kitagry/slogdriver"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/redis/rueidis"
"github.com/redis/rueidis/rueidislock"
Expand Down Expand Up @@ -54,46 +55,52 @@ var matchProfile = &pb.MatchProfile{
func main() {
var conf config
envconfig.MustProcess("", &conf)
logger := initLogger()

meterProvider, err := newMeterProvider()
if err != nil {
log.Fatalf("failed to create meter provider: %+v", err)
logger.Error(fmt.Sprintf("failed to create meter provider: %+v", err), "error", err)
os.Exit(1)
}
otel.SetMeterProvider(meterProvider)
startPrometheus()
startPrometheus(logger)

redisStore, err := newRedisStateStore(&conf)
if err != nil {
log.Fatalf("failed to create redis store: %+v", err)
logger.Error(fmt.Sprintf("failed to create redis store: %+v", err), "error", err)
os.Exit(1)
}
ticketCache := cache.New[string, *pb.Ticket]()
store := statestore.NewBackendStoreWithTicketCache(redisStore, ticketCache,
statestore.WithTicketCacheTTL(conf.TicketCacheTTL))
assigner, err := newAssigner(&conf, meterProvider)
backend, err := minimatch.NewBackend(store, assigner, minimatch.WithTicketValidationBeforeAssign(conf.TicketValidationEnabled))
assigner, err := newAssigner(&conf, meterProvider, logger)
backend, err := minimatch.NewBackend(store, assigner,
minimatch.WithTicketValidationBeforeAssign(conf.TicketValidationEnabled),
minimatch.WithBackendLogger(logger))
if err != nil {
log.Fatalf("failed to create backend: %+v", err)
logger.Error(fmt.Sprintf("failed to create backend: %+v", err), "error", err)
os.Exit(1)
}
backend.AddMatchFunction(matchProfile, minimatch.MatchFunctionSimple1vs1)

ctx, shutdown := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer shutdown()
if err := backend.Start(ctx, conf.TickRate); err != nil {
if !errors.Is(err, context.Canceled) {
log.Fatalf("failed to start backend: %+v", err)
logger.Error(fmt.Sprintf("failed to start backend: %+v", err), "error", err)
}
}
}

func newAssigner(conf *config, provider metric.MeterProvider) (minimatch.Assigner, error) {
var assigner minimatch.Assigner = minimatch.AssignerFunc(dummyAssign)
func newAssigner(conf *config, provider metric.MeterProvider, logger *slog.Logger) (minimatch.Assigner, error) {
var assigner minimatch.Assigner = &dummyAssigner{logger: logger}
if conf.OverlappingCheckRedisAddr != "" {
log.Printf("with overlapping match checker (redis: %s)", conf.OverlappingCheckRedisAddr)
logger.Info(fmt.Sprintf("with overlapping match checker (redis: %s)", conf.OverlappingCheckRedisAddr))
rc, err := rueidis.NewClient(rueidis.ClientOption{InitAddress: []string{conf.OverlappingCheckRedisAddr}, DisableCache: true})
if err != nil {
return nil, fmt.Errorf("failed to create redis client: %w", err)
}
as, err := newAssignerWithOverlappingChecker("loadtest:", rc, assigner, provider)
as, err := newAssignerWithOverlappingChecker("loadtest:", rc, assigner, provider, logger)
if err != nil {
return nil, fmt.Errorf("failed to create assigner with overlapping checker: %w", err)
}
Expand Down Expand Up @@ -145,12 +152,16 @@ func newRedisStateStore(conf *config) (statestore.BackendStore, error) {
// Assigner assigns a GameServer to a match.
// For example, it could call Agones' Allocate service.
// For the sake of simplicity, a dummy GameServer name is assigned here.
func dummyAssign(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) {
type dummyAssigner struct {
logger *slog.Logger
}

func (a *dummyAssigner) Assign(ctx context.Context, matches []*pb.Match) ([]*pb.AssignmentGroup, error) {
var asgs []*pb.AssignmentGroup
for _, match := range matches {
tids := ticketIDs(match)
conn := hri.Random()
log.Printf("assign '%s' to tickets: %v", conn, tids)
a.logger.Debug(fmt.Sprintf("assign '%s' to tickets: %v", conn, tids))
asgs = append(asgs, &pb.AssignmentGroup{
TicketIds: tids,
Assignment: &pb.Assignment{Connection: conn},
Expand Down Expand Up @@ -178,17 +189,17 @@ func newMeterProvider() (metric.MeterProvider, error) {
return provider, nil
}

func startPrometheus() {
func startPrometheus(logger *slog.Logger) {
http.Handle("/metrics", promhttp.Handler())
go func() {
addr := ":2112"
log.Printf("prometheus endpoint (/metrics) is listening on %s...", addr)
logger.Info(fmt.Sprintf("prometheus endpoint (/metrics) is listening on %s...", addr))
server := &http.Server{
Addr: addr,
ReadHeaderTimeout: 10 * time.Second, // https://app.deepsource.com/directory/analyzers/go/issues/GO-S2114
}
if err := server.ListenAndServe(); err != nil {
log.Printf("failed to serve prometheus endpoint: %+v", err)
logger.Error(fmt.Sprintf("failed to serve prometheus endpoint: %+v", err), "error", err)
}
}()
}
Expand All @@ -198,13 +209,13 @@ type assignerWithOverlappingChecker struct {
redisKeyPrefix string
redisClient rueidis.Client
assigner minimatch.Assigner
logger *slog.Logger

validAssigns metric.Int64Counter
overlappingWithin metric.Int64Counter
overlappingInter metric.Int64Counter
}

func newAssignerWithOverlappingChecker(redisKeyPrefix string, redisClient rueidis.Client, assigner minimatch.Assigner, provider metric.MeterProvider) (*assignerWithOverlappingChecker, error) {
func newAssignerWithOverlappingChecker(redisKeyPrefix string, redisClient rueidis.Client, assigner minimatch.Assigner, provider metric.MeterProvider, logger *slog.Logger) (*assignerWithOverlappingChecker, error) {
meter := provider.Meter("github.com/castaneai/minimatch/loadtest")
overlappingWithIn, err := meter.Int64Counter("minimatch.loadtest.overlapping_tickets_within_backend",
metric.WithDescription("Number of times the same Ticket is assigned to multiple Matches within a single backend"))
Expand All @@ -220,6 +231,7 @@ func newAssignerWithOverlappingChecker(redisKeyPrefix string, redisClient rueidi
redisKeyPrefix: redisKeyPrefix,
redisClient: redisClient,
assigner: assigner,
logger: logger,
overlappingWithin: overlappingWithIn,
overlappingInter: overlappingInter,
}, nil
Expand All @@ -245,8 +257,19 @@ func (a *assignerWithOverlappingChecker) Assign(ctx context.Context, matches []*
a.overlappingInter.Add(ctx, 1)
continue
}
log.Printf("failed to check overlapping with redis: %+v", err)
a.logger.Error(fmt.Sprintf("failed to check overlapping with redis: %+v", err), "error", err)
}
}
return a.assigner.Assign(ctx, matches)
}

func initLogger() *slog.Logger {
_, onK8s := os.LookupEnv("KUBERNETES_SERVICE_HOST")
_, onCloudRun := os.LookupEnv("K_CONFIGURATION")
if onK8s || onCloudRun {
return slogdriver.New(os.Stdout, slogdriver.HandlerOptions{
Level: slogdriver.LevelDefault,
})
}
return slog.Default()
}
Loading

0 comments on commit 86de02a

Please sign in to comment.