Skip to content
This repository was archived by the owner on Nov 5, 2021. It is now read-only.

Commit 5d8563a

Browse files
committed
[HTTP Probe] Add support for running multiple HTTP requests in parallel.
Github issue: #319 PiperOrigin-RevId: 281802925
1 parent c7d43b3 commit 5d8563a

File tree

5 files changed

+147
-82
lines changed

5 files changed

+147
-82
lines changed

probes/http/http.go

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type Probe struct {
5252
// book-keeping params
5353
targets []endpoint.Endpoint
5454
httpRequests map[string]*http.Request
55-
results map[string]*result
55+
results map[string]*probeResult
5656
protocol string
5757
method string
5858
url string
@@ -72,7 +72,7 @@ type Probe struct {
7272
statsExportFrequency int64
7373
}
7474

75-
type result struct {
75+
type probeResult struct {
7676
total, success, timeouts int64
7777
latency metrics.Value
7878
respCodes *metrics.Map
@@ -101,18 +101,13 @@ func (p *Probe) Init(name string, opts *options.Options) error {
101101
return fmt.Errorf("Invalid Relative URL: %s, must begin with '/'", p.url)
102102
}
103103

104-
if p.c.GetRequestsPerProbe() != 1 {
105-
p.l.Warningf("requests_per_probe field is now deprecated and will be removed in future releases.")
106-
}
107-
108104
// Create a transport for our use. This is mostly based on
109105
// http.DefaultTransport with some timeouts changed.
110106
// TODO(manugarg): Considering cloning DefaultTransport once
111107
// https://github.com/golang/go/issues/26013 is fixed.
112108
dialer := &net.Dialer{
113109
Timeout: p.opts.Timeout,
114110
KeepAlive: 30 * time.Second, // TCP keep-alive
115-
DualStack: true,
116111
}
117112

118113
if p.opts.SourceIP != nil {
@@ -182,13 +177,18 @@ func isClientTimeout(err error) bool {
182177
}
183178

184179
// httpRequest executes an HTTP request and updates the provided result struct.
185-
func (p *Probe) doHTTPRequest(req *http.Request, result *result) {
180+
func (p *Probe) doHTTPRequest(req *http.Request, result *probeResult, resultMu *sync.Mutex) {
186181
start := time.Now()
187-
result.total++
188182

189183
resp, err := p.client.Do(req)
190184
latency := time.Since(start)
191185

186+
// Note that we take lock on result object outside of the actual request.
187+
resultMu.Lock()
188+
defer resultMu.Unlock()
189+
190+
result.total++
191+
192192
if err != nil {
193193
if isClientTimeout(err) {
194194
p.l.Warning("Target:", req.Host, ", URL:", req.URL.String(), ", http.doHTTPRequest: timeout error: ", err.Error())
@@ -237,7 +237,7 @@ func (p *Probe) updateTargets() {
237237
}
238238

239239
if p.results == nil {
240-
p.results = make(map[string]*result, len(p.targets))
240+
p.results = make(map[string]*probeResult, len(p.targets))
241241
}
242242

243243
for _, target := range p.targets {
@@ -259,7 +259,7 @@ func (p *Probe) updateTargets() {
259259
} else {
260260
latencyValue = metrics.NewFloat(0)
261261
}
262-
p.results[target.Name] = &result{
262+
p.results[target.Name] = &probeResult{
263263
latency: latencyValue,
264264
respCodes: metrics.NewMap("code", metrics.NewInt(0)),
265265
respBodies: metrics.NewMap("resp", metrics.NewInt(0)),
@@ -275,28 +275,26 @@ func (p *Probe) runProbe(ctx context.Context) {
275275

276276
wg := sync.WaitGroup{}
277277
for _, target := range p.targets {
278-
req := p.httpRequests[target.Name]
278+
req, result := p.httpRequests[target.Name], p.results[target.Name]
279279
if req == nil {
280280
continue
281281
}
282282

283-
wg.Add(1)
283+
// We launch a separate goroutine for each HTTP request. Since there can be
284+
// multiple requests per probe per target, we use a mutex to protect access
285+
// to per-target result object in doHTTPRequest. Note that result object is
286+
// not accessed concurrently anywhere else -- export of the metrics happens
287+
// when probe is not running.
288+
var resultMu sync.Mutex
284289

285-
// Launch a separate goroutine for each target.
286-
go func(target string, req *http.Request) {
287-
defer wg.Done()
288-
numRequests := int32(0)
289-
for {
290-
p.doHTTPRequest(req.WithContext(reqCtx), p.results[target])
290+
for numReq := int32(0); numReq < p.c.GetRequestsPerProbe(); numReq++ {
291+
wg.Add(1)
291292

292-
numRequests++
293-
if numRequests >= p.c.GetRequestsPerProbe() {
294-
break
295-
}
296-
// Sleep for requests_interval_msec before continuing.
297-
time.Sleep(time.Duration(p.c.GetRequestsIntervalMsec()) * time.Millisecond)
298-
}
299-
}(target.Name, req)
293+
go func(req *http.Request, result *probeResult) {
294+
defer wg.Done()
295+
p.doHTTPRequest(req.WithContext(reqCtx), result, &resultMu)
296+
}(req, result)
297+
}
300298
}
301299

302300
// Wait until all probes are done.

probes/http/http_test.go

Lines changed: 102 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@ package http
1717
import (
1818
"bytes"
1919
"context"
20+
"errors"
2021
"fmt"
22+
"io"
2123
"io/ioutil"
2224
"net/http"
25+
"strings"
2326
"testing"
2427
"time"
2528

@@ -32,7 +35,9 @@ import (
3235

3336
// The Transport is mocked instead of the Client because Client is not an
3437
// interface, but RoundTripper (which Transport implements) is.
35-
type testTransport struct{}
38+
type testTransport struct {
39+
noBody io.ReadCloser
40+
}
3641

3742
func newTestTransport() *testTransport {
3843
return &testTransport{}
@@ -46,11 +51,20 @@ type testReadCloser struct {
4651
func (trc *testReadCloser) Read(p []byte) (n int, err error) {
4752
return trc.b.Read(p)
4853
}
54+
4955
func (trc *testReadCloser) Close() error {
5056
return nil
5157
}
5258

5359
func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) {
60+
if req.URL.Host == "fail-test.com" {
61+
return nil, errors.New("failing for fail-target.com")
62+
}
63+
64+
if req.Body == nil {
65+
return &http.Response{Body: http.NoBody}, nil
66+
}
67+
5468
b, err := ioutil.ReadAll(req.Body)
5569
if err != nil {
5670
return nil, err
@@ -66,7 +80,7 @@ func (tt *testTransport) RoundTrip(req *http.Request) (*http.Response, error) {
6680

6781
func (tt *testTransport) CancelRequest(req *http.Request) {}
6882

69-
func testProbe(opts *options.Options) ([]*result, error) {
83+
func testProbe(opts *options.Options) ([]*probeResult, error) {
7084
p := &Probe{}
7185
err := p.Init("http_test", opts)
7286
if err != nil {
@@ -76,23 +90,16 @@ func testProbe(opts *options.Options) ([]*result, error) {
7690

7791
p.runProbe(context.Background())
7892

79-
var results []*result
93+
var results []*probeResult
8094
for _, target := range p.targets {
8195
results = append(results, p.results[target.Name])
8296
}
8397
return results, nil
8498
}
8599

86-
func TestRun(t *testing.T) {
87-
methods := []configpb.ProbeConf_Method{
88-
configpb.ProbeConf_GET,
89-
configpb.ProbeConf_POST,
90-
configpb.ProbeConf_PUT,
91-
configpb.ProbeConf_HEAD,
92-
configpb.ProbeConf_DELETE,
93-
configpb.ProbeConf_PATCH,
94-
configpb.ProbeConf_OPTIONS,
95-
100, // Should default to configpb.ProbeConf_GET
100+
func TestProbeVariousMethods(t *testing.T) {
101+
mpb := func(s string) *configpb.ProbeConf_Method {
102+
return configpb.ProbeConf_Method(configpb.ProbeConf_Method_value[s]).Enum()
96103
}
97104

98105
testBody := "Test HTTP Body"
@@ -105,39 +112,43 @@ func TestRun(t *testing.T) {
105112
{&configpb.ProbeConf{}, "total: 1, success: 1"},
106113
{&configpb.ProbeConf{Protocol: configpb.ProbeConf_HTTPS.Enum()}, "total: 1, success: 1"},
107114
{&configpb.ProbeConf{RequestsPerProbe: proto.Int32(1)}, "total: 1, success: 1"},
108-
{&configpb.ProbeConf{Method: &methods[0]}, "total: 1, success: 1"},
109-
{&configpb.ProbeConf{Method: &methods[1]}, "total: 1, success: 1"},
110-
{&configpb.ProbeConf{Method: &methods[1], Body: &testBody}, "total: 1, success: 1"},
111-
{&configpb.ProbeConf{Method: &methods[2]}, "total: 1, success: 1"},
112-
{&configpb.ProbeConf{Method: &methods[2], Body: &testBody}, "total: 1, success: 1"},
113-
{&configpb.ProbeConf{Method: &methods[3]}, "total: 1, success: 1"},
114-
{&configpb.ProbeConf{Method: &methods[4]}, "total: 1, success: 1"},
115-
{&configpb.ProbeConf{Method: &methods[5]}, "total: 1, success: 1"},
116-
{&configpb.ProbeConf{Method: &methods[6]}, "total: 1, success: 1"},
117-
{&configpb.ProbeConf{Method: &methods[7]}, "total: 1, success: 1"},
115+
{&configpb.ProbeConf{RequestsPerProbe: proto.Int32(4)}, "total: 4, success: 4"},
116+
{&configpb.ProbeConf{Method: mpb("GET")}, "total: 1, success: 1"},
117+
{&configpb.ProbeConf{Method: mpb("POST")}, "total: 1, success: 1"},
118+
{&configpb.ProbeConf{Method: mpb("POST"), Body: &testBody}, "total: 1, success: 1"},
119+
{&configpb.ProbeConf{Method: mpb("PUT")}, "total: 1, success: 1"},
120+
{&configpb.ProbeConf{Method: mpb("PUT"), Body: &testBody}, "total: 1, success: 1"},
121+
{&configpb.ProbeConf{Method: mpb("HEAD")}, "total: 1, success: 1"},
122+
{&configpb.ProbeConf{Method: mpb("DELETE")}, "total: 1, success: 1"},
123+
{&configpb.ProbeConf{Method: mpb("PATCH")}, "total: 1, success: 1"},
124+
{&configpb.ProbeConf{Method: mpb("OPTIONS")}, "total: 1, success: 1"},
118125
{&configpb.ProbeConf{Headers: []*configpb.ProbeConf_Header{{Name: &testHeaderName, Value: &testHeaderValue}}}, "total: 1, success: 1"},
119126
}
120127

121-
for _, test := range tests {
122-
opts := &options.Options{
123-
Targets: targets.StaticTargets("test.com"),
124-
Interval: 2 * time.Second,
125-
Timeout: time.Second,
126-
ProbeConf: test.input,
127-
}
128-
results, err := testProbe(opts)
129-
if err != nil {
130-
if fmt.Sprintf("error: '%s'", err.Error()) != test.want {
131-
t.Errorf("Unexpected initialization error: %v", err)
128+
for i, test := range tests {
129+
t.Run(fmt.Sprintf("Test_case(%d)_config(%v)", i, test.input), func(t *testing.T) {
130+
opts := &options.Options{
131+
Targets: targets.StaticTargets("test.com"),
132+
Interval: 2 * time.Second,
133+
Timeout: time.Second,
134+
ProbeConf: test.input,
135+
}
136+
137+
results, err := testProbe(opts)
138+
if err != nil {
139+
if fmt.Sprintf("error: '%s'", err.Error()) != test.want {
140+
t.Errorf("Unexpected initialization error: %v", err)
141+
}
142+
return
132143
}
133-
} else {
144+
134145
for _, result := range results {
135146
got := fmt.Sprintf("total: %d, success: %d", result.total, result.success)
136147
if got != test.want {
137148
t.Errorf("Mismatch got '%s', want '%s'", got, test.want)
138149
}
139150
}
140-
}
151+
})
141152
}
142153
}
143154

@@ -181,3 +192,58 @@ func TestProbeWithBody(t *testing.T) {
181192
t.Errorf("response map: got=%s, expected=%s", got, expected)
182193
}
183194
}
195+
196+
func TestMultipleTargetsMultipleRequests(t *testing.T) {
197+
testTargets := []string{"test.com", "fail-test.com"}
198+
reqPerProbe := int64(6)
199+
opts := &options.Options{
200+
Targets: targets.StaticTargets(strings.Join(testTargets, ",")),
201+
Interval: 10 * time.Millisecond,
202+
ProbeConf: &configpb.ProbeConf{RequestsPerProbe: proto.Int32(int32(reqPerProbe))},
203+
}
204+
205+
p := &Probe{}
206+
err := p.Init("http_test", opts)
207+
if err != nil {
208+
t.Errorf("Unexpected error: %v", err)
209+
return
210+
}
211+
p.client.Transport = newTestTransport()
212+
213+
// Verify that Init() created result struct for each target.
214+
for _, tgt := range testTargets {
215+
if _, ok := p.results[tgt]; !ok {
216+
t.Errorf("didn't find results for the target: %s", tgt)
217+
}
218+
}
219+
220+
p.runProbe(context.Background())
221+
222+
wantSuccess := map[string]int64{
223+
"test.com": reqPerProbe,
224+
"fail-test.com": 0, // Test transport is configured to fail this.
225+
}
226+
227+
for _, tgt := range testTargets {
228+
if p.results[tgt].total != reqPerProbe {
229+
t.Errorf("For target %s, total=%d, want=%d", tgt, p.results[tgt].total, reqPerProbe)
230+
}
231+
if p.results[tgt].success != wantSuccess[tgt] {
232+
t.Errorf("For target %s, success=%d, want=%d", tgt, p.results[tgt].success, wantSuccess[tgt])
233+
}
234+
}
235+
236+
// Run again
237+
p.runProbe(context.Background())
238+
239+
wantSuccess["test.com"] += reqPerProbe
240+
241+
for _, tgt := range testTargets {
242+
if p.results[tgt].total != 2*reqPerProbe {
243+
t.Errorf("For target %s, total=%d, want=%d", tgt, p.results[tgt].total, reqPerProbe)
244+
}
245+
if p.results[tgt].success != wantSuccess[tgt] {
246+
t.Errorf("For target %s, success=%d, want=%d", tgt, p.results[tgt].success, wantSuccess[tgt])
247+
}
248+
}
249+
}

probes/http/proto/config.pb.go

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

probes/http/proto/config.proto

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,12 @@ message ProbeConf {
6666
// presented by the server for any host name will be accepted.
6767
optional bool disable_cert_validation = 14;
6868

69-
// Requests per probe (Deprecated).
70-
// NOTE: This field is now deprecated and will be removed after the v0.10.3
71-
// releases.
69+
// Requests per probe.
70+
// Number of HTTP requests per probe. Requests are executed concurrently and
71+
// each HTTP re contributes to probe results. For example, if you run two
72+
// requests per probe, "total" counter will be incremented by 2.
7273
optional int32 requests_per_probe = 98 [default = 1];
74+
7375
// How long to wait between two requests to the same target
7476
// NOTE: This field is now deprecated and will be removed after the v0.10.3
7577
// releases.

0 commit comments

Comments
 (0)