Skip to content

Commit

Permalink
fix(verifier): acquire HTTP port atomically
Browse files Browse the repository at this point in the history
Acquire the port in VerifyMessageProvider and VerifyProvider atomically
by using the listener used for a free port acquisition for the HTTP server.

This prevents a race condition when multiple pact verifiers run in
parallel and compete for free ports, which could result in errors like

```
Expected server to start < 10s. Timed out waiting for http verification proxy on port 34425 - check for errors
```
  • Loading branch information
Alex Tsibulya committed Sep 25, 2023
1 parent c4aa28e commit ea6792d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 29 deletions.
31 changes: 17 additions & 14 deletions dsl/pact.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,14 @@ func (p *Pact) VerifyProviderRaw(request types.VerifyRequest) ([]types.ProviderV
// This maps the 'description' field of a message pact, to a function handler
// that will implement the message producer. This function must return an object and optionally
// and error. The object will be marshalled to JSON for comparison.
port, err := proxy.HTTPReverseProxy(opts)
listener, err := proxy.HTTPReverseProxy(opts)
if err != nil {
log.Printf("[ERROR] unable to start http verification proxy: %v", err)
return nil, err
}
defer listener.Close()

port := listener.Addr().(*net.TCPAddr).Port

// Backwards compatibility, setup old provider states URL if given
// Otherwise point to proxy
Expand Down Expand Up @@ -669,14 +676,17 @@ func (p *Pact) VerifyMessageProviderRaw(request VerifyMessageRequest) ([]types.P
// and error. The object will be marshalled to JSON for comparison.
mux := http.NewServeMux()

port, err := utils.GetFreePort()
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
return response, fmt.Errorf("unable to allocate a port for verification: %v", err)
log.Fatal(err)
}
defer ln.Close()

log.Printf("[DEBUG] API handler starting at %s", ln.Addr())

// Construct verifier request
verificationRequest := types.VerifyRequest{
ProviderBaseURL: fmt.Sprintf("http://localhost:%d", port),
ProviderBaseURL: fmt.Sprintf("http://%s", ln.Addr()),
PactURLs: request.PactURLs,
BrokerURL: request.BrokerURL,
Tags: request.Tags,
Expand All @@ -695,25 +705,18 @@ func (p *Pact) VerifyMessageProviderRaw(request VerifyMessageRequest) ([]types.P

mux.HandleFunc("/", messageVerificationHandler(request.MessageHandlers, request.StateHandlers))

ln, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Fatal(err)
}
defer ln.Close()

log.Printf("[DEBUG] API handler starting: port %d (%s)", port, ln.Addr())
go func() {
if err := http.Serve(ln, mux); err != nil {
// NOTE: calling Fatalf causing test failures due to "accept tcp [::]:<port>: use of closed network connection"
if err := http.Serve(ln, mux); err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
log.Printf("[DEBUG] API handler start failed: %v", err)
}
}()

port := ln.Addr().(*net.TCPAddr).Port

portErr := waitForPort(port, "tcp", "localhost", p.ClientTimeout,
fmt.Sprintf(`Timed out waiting for pact proxy on port %d - check for errors`, port))

if portErr != nil {
log.Fatal("Error:", err)
return response, portErr
}

Expand Down
24 changes: 12 additions & 12 deletions proxy/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"net/url"
"strings"
"time"

"github.com/pact-foundation/pact-go/utils"
)

// Middleware is a way to use composition to add functionality
Expand Down Expand Up @@ -72,7 +70,7 @@ func chainHandlers(mw ...Middleware) Middleware {

// HTTPReverseProxy provides a default setup for proxying
// internal components within the framework
func HTTPReverseProxy(options Options) (int, error) {
func HTTPReverseProxy(options Options) (net.Listener, error) {
log.Println("[DEBUG] starting new proxy with opts", options)
port := options.ProxyPort
var err error
Expand All @@ -86,20 +84,22 @@ func HTTPReverseProxy(options Options) (int, error) {
proxy := createProxy(url, options.InternalRequestPathPrefix)
proxy.Transport = customTransport{tlsConfig: options.CustomTLSConfig}

if port == 0 {
port, err = utils.GetFreePort()
if err != nil {
log.Println("[ERROR] unable to start reverse proxy server:", err)
return 0, err
}
ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
return nil, err
}

wrapper := chainHandlers(append(options.Middleware, loggingMiddleware)...)

log.Println("[DEBUG] starting reverse proxy on port", port)
go http.ListenAndServe(fmt.Sprintf(":%d", port), wrapper(proxy)) // nolint:errcheck
go func() {
if err := http.Serve(ln, wrapper(proxy)); err != nil && !strings.HasSuffix(err.Error(), "use of closed network connection") {
log.Printf("[ERROR] unable to start reverse proxy server: %v", err)
}
}()

log.Printf("[DEBUG] starting reverse proxy at %s", ln.Addr())

return port, nil
return ln, nil
}

// https://stackoverflow.com/questions/52986853/how-to-debug-httputil-newsinglehostreverseproxy
Expand Down
8 changes: 5 additions & 3 deletions proxy/http_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package proxy

import (
"net"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -72,19 +73,20 @@ func TestChainHandlers(t *testing.T) {
func TestHTTPReverseProxy(t *testing.T) {

// Setup target to proxy
port, err := HTTPReverseProxy(Options{
ln, err := HTTPReverseProxy(Options{
Middleware: []Middleware{
DummyMiddleware("1"),
},
TargetScheme: "http",
TargetAddress: "127.0.0.1:1234",
})
defer ln.Close()

Check failure on line 83 in proxy/http_test.go

View workflow job for this annotation

GitHub Actions / lint (1.17.x, ubuntu-latest)

SA5001: should check returned error before deferring ln.Close() (staticcheck)

Check failure on line 83 in proxy/http_test.go

View workflow job for this annotation

GitHub Actions / lint (1.18.x, ubuntu-latest)

SA5001: should check returned error before deferring ln.Close() (staticcheck)

Check failure on line 83 in proxy/http_test.go

View workflow job for this annotation

GitHub Actions / lint (1.19.x, ubuntu-latest)

SA5001: should check returned error before deferring ln.Close() (staticcheck)

if err != nil {
t.Errorf("unexpected error %v", err)
}

if port == 0 {
t.Errorf("want non-zero port, got %v", port)
if tcpAddr, ok := ln.Addr().(*net.TCPAddr); !ok || tcpAddr.Port == 0 {
t.Errorf("want non-zero port, got %v", ln.Addr())
}
}

0 comments on commit ea6792d

Please sign in to comment.