diff --git a/interop/client/client.go b/interop/client/client.go index f50139fe6dbb..b088451cd203 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -28,6 +28,7 @@ import ( "crypto/tls" "crypto/x509" "flag" + "log" "net" "os" "strconv" @@ -79,6 +80,7 @@ var ( soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS") soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.") soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.") + soakNumThreads = flag.Int("soak_num_threads", 1, "The number of threads for concurrent execution of the soak tests (rpc_soak or channel_soak). The default value is set based on the interop large unary test case.") tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.") additionalMetadata = flag.String("additional_metadata", "", "Additional metadata to send in each request, as a semicolon-separated list of key:value pairs.") testCase = flag.String("test_case", "large_unary", @@ -149,6 +151,21 @@ func parseAdditionalMetadataFlag() []string { return addMd } +// createSoakTestConfig creates a shared configuration structure for soak tests. +func createBaseSoakConfig(serverAddr string) interop.SoakTestConfig { + return interop.SoakTestConfig{ + RequestSize: *soakRequestSize, + ResponseSize: *soakResponseSize, + PerIterationMaxAcceptableLatency: time.Duration(*soakPerIterationMaxAcceptableLatencyMs) * time.Millisecond, + MinTimeBetweenRPCs: time.Duration(*soakMinTimeMsBetweenRPCs) * time.Millisecond, + OverallTimeout: time.Duration(*soakOverallTimeoutSeconds) * time.Second, + ServerAddr: serverAddr, + NumWorkers: *soakNumThreads, + Iterations: *soakIterations, + MaxFailures: *soakMaxFailures, + } +} + func main() { flag.Parse() logger.Infof("Client running with test case %q", *testCase) @@ -261,7 +278,7 @@ func main() { } opts = append(opts, grpc.WithUnaryInterceptor(unaryAddMd), grpc.WithStreamInterceptor(streamingAddMd)) } - conn, err := grpc.Dial(serverAddr, opts...) + conn, err := grpc.NewClient(serverAddr, opts...) if err != nil { logger.Fatalf("Fail to dial: %v", err) } @@ -358,10 +375,20 @@ func main() { interop.DoPickFirstUnary(ctx, tc) logger.Infoln("PickFirstUnary done") case "rpc_soak": - interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond) + rpcSoakConfig := createBaseSoakConfig(serverAddr) + rpcSoakConfig.ChannelForTest = func() (*grpc.ClientConn, func()) { return conn, func() {} } + interop.DoSoakTest(ctxWithDeadline, rpcSoakConfig) logger.Infoln("RpcSoak done") case "channel_soak": - interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond) + channelSoakConfig := createBaseSoakConfig(serverAddr) + channelSoakConfig.ChannelForTest = func() (*grpc.ClientConn, func()) { + cc, err := grpc.NewClient(serverAddr, opts...) + if err != nil { + log.Fatalf("Failed to create shared channel: %v", err) + } + return cc, func() { cc.Close() } + } + interop.DoSoakTest(ctxWithDeadline, channelSoakConfig) logger.Infoln("ChannelSoak done") case "orca_per_rpc": interop.DoORCAPerRPCTest(ctx, tc) diff --git a/interop/soak_tests.go b/interop/soak_tests.go new file mode 100644 index 000000000000..b667d3678a80 --- /dev/null +++ b/interop/soak_tests.go @@ -0,0 +1,193 @@ +/* + * + * Copyright 2014 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package interop + +import ( + "bytes" + "context" + "fmt" + "os" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/benchmark/stats" + "google.golang.org/grpc/peer" + + testgrpc "google.golang.org/grpc/interop/grpc_testing" + testpb "google.golang.org/grpc/interop/grpc_testing" +) + +// SoakWorkerResults stores the aggregated results for a specific worker during the soak test. +type SoakWorkerResults struct { + IterationsDone int + Failures int + Latencies *stats.Histogram +} + +// SoakIterationConfig holds the parameters required for a single soak iteration. +type SoakIterationConfig struct { + RequestSize int // The size of the request payload in bytes. + ResponseSize int // The expected size of the response payload in bytes. + Client testgrpc.TestServiceClient // The gRPC client to make the call. + CallOptions []grpc.CallOption // Call options for the RPC. +} + +// SoakTestConfig holds the configuration for the entire soak test. +type SoakTestConfig struct { + RequestSize int + ResponseSize int + PerIterationMaxAcceptableLatency time.Duration + MinTimeBetweenRPCs time.Duration + OverallTimeout time.Duration + ServerAddr string + NumWorkers int + Iterations int + MaxFailures int + ChannelForTest func() (*grpc.ClientConn, func()) +} + +func doOneSoakIteration(ctx context.Context, config SoakIterationConfig) (latency time.Duration, err error) { + start := time.Now() + // Do a large-unary RPC. + // Create the request payload. + pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, config.RequestSize) + req := &testpb.SimpleRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE, + ResponseSize: int32(config.ResponseSize), + Payload: pl, + } + // Perform the GRPC call. + var reply *testpb.SimpleResponse + reply, err = config.Client.UnaryCall(ctx, req, config.CallOptions...) + if err != nil { + err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err) + return 0, err + } + // Validate response. + t := reply.GetPayload().GetType() + s := len(reply.GetPayload().GetBody()) + if t != testpb.PayloadType_COMPRESSABLE || s != config.ResponseSize { + err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, config.ResponseSize) + return 0, err + } + // Calculate latency and return result. + latency = time.Since(start) + return latency, nil +} + +func executeSoakTestInWorker(ctx context.Context, config SoakTestConfig, startTime time.Time, workerID int, soakWorkerResults *SoakWorkerResults) { + timeoutDuration := config.OverallTimeout + soakIterationsPerWorker := config.Iterations / config.NumWorkers + for i := 0; i < soakIterationsPerWorker; i++ { + if ctx.Err() != nil { + return + } + if time.Since(startTime) >= timeoutDuration { + fmt.Printf("Test exceeded overall timeout of %v, stopping...\n", config.OverallTimeout) + return + } + earliestNextStart := time.After(config.MinTimeBetweenRPCs) + currentChannel, cleanup := config.ChannelForTest() + defer cleanup() + client := testgrpc.NewTestServiceClient(currentChannel) + var p peer.Peer + iterationConfig := SoakIterationConfig{ + RequestSize: config.RequestSize, + ResponseSize: config.ResponseSize, + Client: client, + CallOptions: []grpc.CallOption{grpc.Peer(&p)}, + } + latency, err := doOneSoakIteration(ctx, iterationConfig) + if err != nil { + fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", workerID, i, 0, p.Addr, config.ServerAddr, err) + soakWorkerResults.Failures++ + <-earliestNextStart + continue + } + if latency > config.PerIterationMaxAcceptableLatency { + fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s exceeds max acceptable latency: %d\n", workerID, i, latency, p.Addr, config.ServerAddr, config.PerIterationMaxAcceptableLatency.Milliseconds()) + soakWorkerResults.Failures++ + <-earliestNextStart + continue + } + // Success: log the details of the iteration. + soakWorkerResults.Latencies.Add(latency.Milliseconds()) + soakWorkerResults.IterationsDone++ + fmt.Fprintf(os.Stderr, "Worker %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s succeeded\n", workerID, i, latency, p.Addr, config.ServerAddr) + <-earliestNextStart + } +} + +// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds. +// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new +// stub that is created with the provided server address and dial options. +// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method. +func DoSoakTest(ctx context.Context, soakConfig SoakTestConfig) { + if soakConfig.Iterations%soakConfig.NumWorkers != 0 { + fmt.Fprintf(os.Stderr, "soakIterations must be evenly divisible by soakNumWThreads\n") + } + startTime := time.Now() + var wg sync.WaitGroup + soakWorkerResults := make([]SoakWorkerResults, soakConfig.NumWorkers) + for i := 0; i < soakConfig.NumWorkers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + executeSoakTestInWorker(ctx, soakConfig, startTime, workerID, &soakWorkerResults[workerID]) + }(i) + } + // Wait for all goroutines to complete. + wg.Wait() + + //Handle results. + totalIterations := 0 + totalFailures := 0 + latencies := stats.NewHistogram(stats.HistogramOptions{ + NumBuckets: 20, + GrowthFactor: 1, + BaseBucketSize: 1, + MinValue: 0, + }) + for _, worker := range soakWorkerResults { + totalIterations += worker.IterationsDone + totalFailures += worker.Failures + if worker.Latencies != nil { + // Add latencies from the worker's Histogram to the main latencies. + latencies.Merge(worker.Latencies) + } + } + var b bytes.Buffer + latencies.Print(&b) + fmt.Fprintf(os.Stderr, + "(server_uri: %s) soak test ran: %d / %d iterations. Total failures: %d. Latencies in milliseconds: %s\n", + soakConfig.ServerAddr, totalIterations, soakConfig.Iterations, totalFailures, b.String()) + + if totalIterations != soakConfig.Iterations { + fmt.Fprintf(os.Stderr, "Soak test consumed all %v of time and quit early, ran %d out of %d iterations.\n", soakConfig.OverallTimeout, totalIterations, soakConfig.Iterations) + } + + if totalFailures > soakConfig.MaxFailures { + fmt.Fprintf(os.Stderr, "Soak test total failures: %d exceeded max failures threshold: %d\n", totalFailures, soakConfig.MaxFailures) + } + if soakConfig.ChannelForTest != nil { + _, cleanup := soakConfig.ChannelForTest() + defer cleanup() + } +} diff --git a/interop/test_utils.go b/interop/test_utils.go index 71d0b0f060be..3aa7bd307a9b 100644 --- a/interop/test_utils.go +++ b/interop/test_utils.go @@ -24,7 +24,6 @@ package interop import ( - "bytes" "context" "fmt" "io" @@ -36,12 +35,10 @@ import ( "golang.org/x/oauth2" "golang.org/x/oauth2/google" "google.golang.org/grpc" - "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" "google.golang.org/grpc/orca" - "google.golang.org/grpc/peer" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -684,100 +681,6 @@ func DoPickFirstUnary(ctx context.Context, tc testgrpc.TestServiceClient) { } } -func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, soakRequestSize int, soakResponseSize int, dopts []grpc.DialOption, copts []grpc.CallOption) (latency time.Duration, err error) { - start := time.Now() - client := tc - if resetChannel { - var conn *grpc.ClientConn - conn, err = grpc.Dial(serverAddr, dopts...) - if err != nil { - return - } - defer conn.Close() - client = testgrpc.NewTestServiceClient(conn) - } - // per test spec, don't include channel shutdown in latency measurement - defer func() { latency = time.Since(start) }() - // do a large-unary RPC - pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, soakRequestSize) - req := &testpb.SimpleRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE, - ResponseSize: int32(soakResponseSize), - Payload: pl, - } - var reply *testpb.SimpleResponse - reply, err = client.UnaryCall(ctx, req, copts...) - if err != nil { - err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err) - return - } - t := reply.GetPayload().GetType() - s := len(reply.GetPayload().GetBody()) - if t != testpb.PayloadType_COMPRESSABLE || s != soakResponseSize { - err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, soakResponseSize) - return - } - return -} - -// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds. -// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new -// stub that is created with the provided server address and dial options. -// TODO(mohanli-ml): Create SoakTestOptions as a parameter for this method. -func DoSoakTest(ctx context.Context, tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, soakRequestSize int, soakResponseSize int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration) { - start := time.Now() - var elapsedTime float64 - iterationsDone := 0 - totalFailures := 0 - hopts := stats.HistogramOptions{ - NumBuckets: 20, - GrowthFactor: 1, - BaseBucketSize: 1, - MinValue: 0, - } - h := stats.NewHistogram(hopts) - for i := 0; i < soakIterations; i++ { - if ctx.Err() != nil { - elapsedTime = time.Since(start).Seconds() - break - } - earliestNextStart := time.After(minTimeBetweenRPCs) - iterationsDone++ - var p peer.Peer - latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, soakRequestSize, soakResponseSize, dopts, []grpc.CallOption{grpc.Peer(&p)}) - latencyMs := int64(latency / time.Millisecond) - h.Add(latencyMs) - if err != nil { - totalFailures++ - addrStr := "nil" - if p.Addr != nil { - addrStr = p.Addr.String() - } - fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", i, latencyMs, addrStr, serverAddr, err) - <-earliestNextStart - continue - } - if latency > perIterationMaxAcceptableLatency { - totalFailures++ - fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s exceeds max acceptable latency: %d\n", i, latencyMs, p.Addr.String(), serverAddr, perIterationMaxAcceptableLatency.Milliseconds()) - <-earliestNextStart - continue - } - fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s succeeded\n", i, latencyMs, p.Addr.String(), serverAddr) - <-earliestNextStart - } - var b bytes.Buffer - h.Print(&b) - fmt.Fprintf(os.Stderr, "(server_uri: %s) histogram of per-iteration latencies in milliseconds: %s\n", serverAddr, b.String()) - fmt.Fprintf(os.Stderr, "(server_uri: %s) soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", serverAddr, iterationsDone, soakIterations, totalFailures, maxFailures) - if iterationsDone < soakIterations { - logger.Fatalf("(server_uri: %s) soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", serverAddr, elapsedTime, iterationsDone, soakIterations) - } - if totalFailures > maxFailures { - logger.Fatalf("(server_uri: %s) soak test total failures: %d exceeds max failures threshold: %d.", serverAddr, totalFailures, maxFailures) - } -} - type testServer struct { testgrpc.UnimplementedTestServiceServer diff --git a/interop/xds_federation/client.go b/interop/xds_federation/client.go index 56572e4a35c3..1d210369f089 100644 --- a/interop/xds_federation/client.go +++ b/interop/xds_federation/client.go @@ -22,6 +22,7 @@ package main import ( "context" "flag" + "log" "strings" "sync" "time" @@ -54,6 +55,7 @@ var ( soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS") soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.") soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.") + soakNumThreads = flag.Int("soak_num_threads", 1, "The number of threads for concurrent execution of the soak tests (rpc_soak or channel_soak). The default value is set based on the interop large unary test case.") testCase = flag.String("test_case", "rpc_soak", `Configure different test cases. Valid options are: rpc_soak: sends --soak_iterations large_unary RPCs; @@ -63,6 +65,7 @@ var ( ) type clientConfig struct { + conn *grpc.ClientConn tc testgrpc.TestServiceClient opts []grpc.DialOption uri string @@ -81,17 +84,6 @@ func main() { logger.Fatalf("Unsupported credentials type: %v", c) } } - var resetChannel bool - switch *testCase { - case "rpc_soak": - resetChannel = false - case "channel_soak": - resetChannel = true - default: - logger.Fatal("Unsupported test case: ", *testCase) - } - - // create clients as specified in flags var clients []clientConfig for i := range uris { var opts []grpc.DialOption @@ -101,12 +93,13 @@ func main() { case insecureCredsName: opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) } - cc, err := grpc.Dial(uris[i], opts...) + cc, err := grpc.NewClient(uris[i], opts...) if err != nil { logger.Fatalf("Fail to dial %v: %v", uris[i], err) } defer cc.Close() clients = append(clients, clientConfig{ + conn: cc, tc: testgrpc.NewTestServiceClient(cc), opts: opts, uri: uris[i], @@ -116,13 +109,40 @@ func main() { // run soak tests with the different clients logger.Infof("Clients running with test case %q", *testCase) var wg sync.WaitGroup + var channelForTest func() (*grpc.ClientConn, func()) ctx := context.Background() for i := range clients { wg.Add(1) go func(c clientConfig) { ctxWithDeadline, cancel := context.WithTimeout(ctx, time.Duration(*soakOverallTimeoutSeconds)*time.Second) defer cancel() - interop.DoSoakTest(ctxWithDeadline, c.tc, c.uri, c.opts, resetChannel, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond) + switch *testCase { + case "rpc_soak": + channelForTest = func() (*grpc.ClientConn, func()) { return c.conn, func() {} } + case "channel_soak": + channelForTest = func() (*grpc.ClientConn, func()) { + cc, err := grpc.NewClient(c.uri, c.opts...) + if err != nil { + log.Fatalf("Failed to create shared channel: %v", err) + } + return cc, func() { cc.Close() } + } + default: + logger.Fatal("Unsupported test case: ", *testCase) + } + soakConfig := interop.SoakTestConfig{ + RequestSize: *soakRequestSize, + ResponseSize: *soakResponseSize, + PerIterationMaxAcceptableLatency: time.Duration(*soakPerIterationMaxAcceptableLatencyMs) * time.Millisecond, + MinTimeBetweenRPCs: time.Duration(*soakMinTimeMsBetweenRPCs) * time.Millisecond, + OverallTimeout: time.Duration(*soakOverallTimeoutSeconds) * time.Second, + ServerAddr: c.uri, + NumWorkers: *soakNumThreads, + Iterations: *soakIterations, + MaxFailures: *soakMaxFailures, + ChannelForTest: channelForTest, + } + interop.DoSoakTest(ctxWithDeadline, soakConfig) logger.Infof("%s test done for server: %s", *testCase, c.uri) wg.Done() }(clients[i]) diff --git a/scripts/vet.sh b/scripts/vet.sh index 4867e8c677b6..98e15998ad00 100755 --- a/scripts/vet.sh +++ b/scripts/vet.sh @@ -49,7 +49,7 @@ git grep 'func [A-Z]' -- "*_test.go" | not grep -v 'func Test\|Benchmark\|Exampl # - Do not use time.After except in tests. It has the potential to leak the # timer since there is no way to stop it early. -git grep -l 'time.After(' -- "*.go" | not grep -v '_test.go\|test_utils\|testutils' +git grep -l 'time.After(' -- "*.go" | not grep -v '_test.go\|soak_tests\|testutils' # - Do not use "interface{}"; use "any" instead. git grep -l 'interface{}' -- "*.go" 2>&1 | not grep -v '\.pb\.go\|protoc-gen-go-grpc\|grpc_testing_not_regenerated'