Improve rpc_soak and channel_soak test to cover concurrency in Go #7926
base: master
Conversation
Codecov Report: All modified and coverable lines are covered by tests ✅

Additional details and impacted files:

@@            Coverage Diff            @@
##           master    #7926   +/-   ##
=========================================
  Coverage   82.04%   82.04%
=========================================
  Files         377      379     +2
  Lines       38180    38261    +81
=========================================
+ Hits        31326    31393    +67
- Misses       5551     5562    +11
- Partials     1303     1306     +3
interop/client/client.go
Outdated
@@ -358,10 +359,16 @@ func main() {
	interop.DoPickFirstUnary(ctx, tc)
	logger.Infoln("PickFirstUnary done")
case "rpc_soak":
-	interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
+	interop.DoSoakTest(ctxWithDeadline, conn, serverAddr, *numThreads, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize,
Please make a struct for passing all this configuration data (long overdue & not your fault!), instead of having so many parameters, many of which are ints, making it easy to get the order wrong when calling.
interop/test_utils.go
Outdated
	soakResponseSize int,
	copts []grpc.CallOption) (SoakIterationResult, error) {
	start := time.Now()
	var err error
Unless I'm missing something, this doesn't need to be pre-declared.
interop/test_utils.go
Outdated
	mu *sync.Mutex,
	sharedChannel *grpc.ClientConn,
	threadID int,
	MayCreateNewChannel ManagedChannel) {
As above, but 3x worse. :)
// SoakIterationResult represents the latency and status results of a single iteration in the soak test.
type SoakIterationResult struct {
	LatencyMs int64
	Status    string // The status of the iteration
Does this really want to be a string? Or a grpc codes.Code? Or *status.Status? Or just error? We can talk about what makes the most sense if you aren't sure, but I'm doubtful we want a string.
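For reference, a minimal sketch (not part of this PR) of what the codes.Code alternative could look like, assuming imports of google.golang.org/grpc/codes and google.golang.org/grpc/status:

// Illustrative only: carry a gRPC code instead of a free-form string.
type SoakIterationResult struct {
	LatencyMs int64
	Status    codes.Code // e.g. codes.OK, codes.DeadlineExceeded
}

// Converting from an RPC error is a single call; status.Code returns
// codes.OK when err is nil:
//   result.Status = status.Code(err)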
In the merged Java version, the status field is a string, with statuses like "OK". To maintain consistency across the implementations, we have kept the status field as a string in this Go version as well. However, I'm open to further discussion on this.
What are we doing with this anyway? I can't see where it's read.
Most of my comments are actually pretty superficial. Thanks for all the cleanups since the first round!
	SoakResponseSize:                 *soakResponseSize,
	PerIterationMaxAcceptableLatency: time.Duration(*soakPerIterationMaxAcceptableLatencyMs) * time.Millisecond,
	MinTimeBetweenRPCs:               time.Duration(*soakMinTimeMsBetweenRPCs) * time.Millisecond,
	OverallTimeoutSeconds:            *soakOverallTimeoutSeconds,
Can this be a Duration as well?
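For illustration, a sketch of how this could become a time.Duration; the seconds-based flag is part of the cross-language interop surface, so the least invasive change is converting where the config is built (the OverallTimeout field name is an assumption):

// Option 1: keep the existing seconds flag and convert once when building the config.
OverallTimeout: time.Duration(*soakOverallTimeoutSeconds) * time.Second,
// Option 2 (only if the flag itself may change): flag.Duration removes the
// conversion entirely; the flag name below is hypothetical.
//   soakOverallTimeout = flag.Duration("soak_overall_timeout", 10*time.Second, "Total time allotted for the soak test")
//   ...
//   OverallTimeout: *soakOverallTimeout,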
	logger.Infoln("RpcSoak done")
case "channel_soak":
-	interop.DoSoakTest(ctxWithDeadline, tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, *soakRequestSize, *soakResponseSize, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond)
+	channelSoakConfig := interop.SoakTestConfig{
This seems to be almost identical to the above. Can we share more of the code for the common config?
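One possible shape for sharing the common pieces, sketched below; field names follow those visible in this diff, and the helper itself is an assumption, not the PR's code:

// baseSoakConfig collects the settings shared by rpc_soak and channel_soak,
// so each case only fills in the channel-handling fields that differ.
func baseSoakConfig(serverAddr string) interop.SoakTestConfig {
	return interop.SoakTestConfig{
		SoakRequestSize:                  *soakRequestSize,
		SoakResponseSize:                 *soakResponseSize,
		PerIterationMaxAcceptableLatency: time.Duration(*soakPerIterationMaxAcceptableLatencyMs) * time.Millisecond,
		MinTimeBetweenRPCs:               time.Duration(*soakMinTimeMsBetweenRPCs) * time.Millisecond,
		OverallTimeoutSeconds:            *soakOverallTimeoutSeconds,
		ServerAddr:                       serverAddr,
	}
}

The rpc_soak and channel_soak cases would then each start from baseSoakConfig(serverAddr) and set only their channel-specific fields.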
	SharedChannel: conn,
	MayCreateNewChannel: func(currentChannel *grpc.ClientConn) (*grpc.ClientConn, testgrpc.TestServiceClient) {
This feels awkward. How about just a ChannelForTest function that is a closure returning conn in the shared case and creates a new connection in the non-shared case? And the closure can return a cleanup function that is a no-op in the shared case, or closes the connection in the non-shared case.
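A sketch of that closure, assuming a boolean shareChannel and the client's existing serverAddr, opts, and logger; the wiring is illustrative, not the PR's exact code:

ChannelForTest: func() (*grpc.ClientConn, func()) {
	if shareChannel {
		// rpc_soak: reuse the shared connection; cleanup is a no-op.
		return conn, func() {}
	}
	// channel_soak: dial a fresh connection; cleanup closes it.
	newConn, err := grpc.NewClient(serverAddr, opts...)
	if err != nil {
		logger.Fatalf("Failed to create new channel: %v", err)
	}
	return newConn, func() { newConn.Close() }
},

Callers would then do conn, cleanup := config.ChannelForTest() and defer cleanup() once per iteration.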
@@ -79,6 +79,7 @@ var (
	soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
	soakRequestSize = flag.Int("soak_request_size", 271828, "The request size in a soak RPC. The default value is set based on the interop large unary test case.")
	soakResponseSize = flag.Int("soak_response_size", 314159, "The response size in a soak RPC. The default value is set based on the interop large unary test case.")
+	soakNumThreads = flag.Int("soak_num_threads", 1, "The number of threads for concurrent execution of the soak tests (rpc_soak or channel_soak). The default value is set based on the interop large unary test case.")
Note that Go doesn't really concern itself with threads, so this is wrong in the context of Go. You probably can't change this because it has cross-language implications, but this should have been thought out more and reviewed by more folks before implementation started.
// SoakTestConfig holds the configuration for the entire soak test.
type SoakTestConfig struct {
	SoakRequestSize int
This struct also has a lot of redundant Soak prefixes.
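For comparison, a sketch of the same struct with the prefixes dropped; the full field set is inferred from the flags in this PR and is an assumption:

// Inside a type already named SoakTestConfig, a Soak prefix on each field adds
// no information, so the fields can read more simply.
type SoakTestConfig struct {
	RequestSize                      int
	ResponseSize                     int
	Iterations                       int
	MaxFailures                      int
	PerIterationMaxAcceptableLatency time.Duration
	MinTimeBetweenRPCs               time.Duration
	OverallTimeout                   time.Duration
	ServerAddr                       string
	NumWorkers                       int
}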
	}
	h := stats.NewHistogram(hopts)
	for i := 0; i < soakIterations; i++ {
func executeSoakTestInThread(ctx context.Context, config SoakTestConfig, startNs int64, threadID int, threadResults *ThreadResults, mu *sync.Mutex) {
Maybe we can at least remove "thread" from any Go code, even if we have to support a flag with that name?
	}
	earliestNextStart := time.After(minTimeBetweenRPCs)
	iterationsDone++
	if time.Since(time.Unix(0, startNs)) >= timeoutDuration {
Can we pass around startNs as a time.Time instead, please?
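A minimal sketch of that change, reusing the timeoutDuration already present in this code:

// Pass the start as a time.Time and let the time package do the arithmetic,
// avoiding the Unix-nanoseconds round trip.
start := time.Now()
// ... inside the iteration loop ...
if time.Since(start) >= timeoutDuration {
	break
}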
	}
-	fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d peer: %s server_uri: %s failed: %s\n", i, latencyMs, addrStr, serverAddr, err)
+	fmt.Fprintf(os.Stderr, "Thread %d: soak iteration: %d elapsed_ms: %d peer: %v server_uri: %s failed: %s\n", threadID, i, 0, p.Addr, config.ServerAddr, err)
	mu.Lock()
Each goroutine should have exclusive access to its results struct. So this shouldn't need a lock.
Or just make each instance of this function return a results struct instead of mutating one that's stored elsewhere -- that would likely be a cleaner approach.
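A sketch of that second approach, with each worker goroutine returning its own results over a channel so no mutex is needed; runWorker is a placeholder for the per-worker iteration loop, and the other names follow this PR's diff:

// Each goroutine owns its ThreadResults value and sends it back when done;
// the caller aggregates only after every worker has finished.
func runWorkers(ctx context.Context, config SoakTestConfig, numWorkers int) []ThreadResults {
	out := make(chan ThreadResults, numWorkers)
	var wg sync.WaitGroup
	for id := 0; id < numWorkers; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			var res ThreadResults // exclusive to this goroutine: no lock required
			runWorker(ctx, config, id, &res)
			out <- res
		}(id)
	}
	wg.Wait()
	close(out)
	all := make([]ThreadResults, 0, numWorkers)
	for r := range out {
		all = append(all, r)
	}
	return all
}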
RELEASE NOTES:
This PR aims to enhance the test coverage of the C2P E2E load test by improving the rpc_soak and channel_soak tests to support concurrency in Go. The updated logic closely follows the approach used in the Java implementation, which has already been merged.
rpc_soak:
The client performs many large_unary RPCs in sequence over the same channel. The test can run in either a concurrent or non-concurrent mode, depending on the number of threads specified (soak_num_threads).
channel_soak:
Similar to rpc_soak, but this time each RPC is performed on a new channel. The channel is created just before each RPC and is destroyed just after.
Note on Concurrent Execution and Channel Creation:
In a concurrent execution setting (i.e., when soak_num_threads > 1), each thread performs a portion of the total soak_iterations and, in the channel_soak case, creates and destroys its own channel for each RPC iteration (see the sketch below).
Thread-specific logs will include the thread_id, helping to track performance across threads, especially when each thread is managing its own channel lifecycle.
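For readers unfamiliar with the interop client, a sketch of the per-iteration channel lifecycle used by channel_soak. grpc.NewClient, testgrpc.NewTestServiceClient, and insecure credentials are standard grpc-go APIs; the loop, variable names, and the use of insecure credentials here are illustrative, not the PR's exact code:

// channel_soak sketch: dial a fresh connection for every iteration, run one
// large_unary RPC on it, then close it before the next iteration starts.
for i := 0; i < iterations; i++ {
	conn, err := grpc.NewClient(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		logger.Fatalf("Failed to create channel for iteration %d: %v", i, err)
	}
	client := testgrpc.NewTestServiceClient(conn)
	if _, err := client.UnaryCall(ctx, &testgrpc.SimpleRequest{ResponseSize: int32(responseSize)}); err != nil {
		// count this iteration as a failure against soak_max_failures
	}
	// record per-iteration latency and status here
	conn.Close()
}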
@townba
@dfawley