Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otelhttp: Introduce internal wrapper package #5291

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 26 additions & 27 deletions instrumentation/net/http/otelhttp/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"github.com/felixge/httpsnoop"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/wrapper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
Expand Down Expand Up @@ -167,13 +168,13 @@
}
}

var bw bodyWrapper
var bw wrapper.Body
// if request body is nil or NoBody, we don't want to mutate the body as it
// will affect the identity of it in an unforeseeable way because we assert
// ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
if r.Body != nil && r.Body != http.NoBody {
bw.ReadCloser = r.Body
bw.record = readRecordFunc
bw.OnRecordFn = readRecordFunc
r.Body = &bw
}

Expand All @@ -184,12 +185,10 @@
}
}

rww := &respWriterWrapper{
rw := &wrapper.ResponseWriter{
ResponseWriter: w,
record: writeRecordFunc,
ctx: ctx,
props: h.propagators,
statusCode: http.StatusOK, // default status code in case the Handler doesn't write anything
OnRecordFn: writeRecordFunc,
StatusCode: http.StatusOK, // default status code in case the Handler doesn't write anything
}

// Wrap w to use our ResponseWriter methods while also exposing
Expand All @@ -198,13 +197,13 @@

w = httpsnoop.Wrap(w, httpsnoop.Hooks{
Header: func(httpsnoop.HeaderFunc) httpsnoop.HeaderFunc {
return rww.Header
return rw.Header

Check warning on line 200 in instrumentation/net/http/otelhttp/handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/net/http/otelhttp/handler.go#L200

Added line #L200 was not covered by tests
},
Write: func(httpsnoop.WriteFunc) httpsnoop.WriteFunc {
return rww.Write
return rw.Write
},
WriteHeader: func(httpsnoop.WriteHeaderFunc) httpsnoop.WriteHeaderFunc {
return rww.WriteHeader
return rw.WriteHeader
},
})

Expand All @@ -213,44 +212,44 @@

next.ServeHTTP(w, r.WithContext(ctx))

setAfterServeAttributes(span, bw.read.Load(), rww.written, rww.statusCode, bw.err, rww.err)
setAfterServeAttributes(span, rw, &bw)

// Add metrics
attributes := append(labeler.Get(), semconvutil.HTTPServerRequestMetrics(h.server, r)...)
if rww.statusCode > 0 {
attributes = append(attributes, semconv.HTTPStatusCode(rww.statusCode))
if rw.StatusCode > 0 {
attributes = append(attributes, semconv.HTTPStatusCode(rw.StatusCode))
}
o := metric.WithAttributes(attributes...)
h.requestBytesCounter.Add(ctx, bw.read.Load(), o)
h.responseBytesCounter.Add(ctx, rww.written, o)
h.requestBytesCounter.Add(ctx, bw.ReadLength(), o)
h.responseBytesCounter.Add(ctx, rw.Written, o)

// Use floating point division here for higher precision (instead of Millisecond method).
elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)

h.serverLatencyMeasure.Record(ctx, elapsedTime, o)
}

func setAfterServeAttributes(span trace.Span, read, wrote int64, statusCode int, rerr, werr error) {
func setAfterServeAttributes(span trace.Span, rw *wrapper.ResponseWriter, bw *wrapper.Body) {
attributes := []attribute.KeyValue{}

// TODO: Consider adding an event after each read and write, possibly as an
// option (defaulting to off), so as to not create needlessly verbose spans.
if read > 0 {
attributes = append(attributes, ReadBytesKey.Int64(read))
if bw.ReadLength() > 0 {
attributes = append(attributes, ReadBytesKey.Int64(bw.ReadLength()))
}
if rerr != nil && rerr != io.EOF {
attributes = append(attributes, ReadErrorKey.String(rerr.Error()))
if bw.Err != nil && bw.Err != io.EOF {
attributes = append(attributes, ReadErrorKey.String(bw.Err.Error()))

Check warning on line 241 in instrumentation/net/http/otelhttp/handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/net/http/otelhttp/handler.go#L241

Added line #L241 was not covered by tests
}
if wrote > 0 {
attributes = append(attributes, WroteBytesKey.Int64(wrote))
if rw.Written > 0 {
attributes = append(attributes, WroteBytesKey.Int64(rw.Written))
}
if statusCode > 0 {
attributes = append(attributes, semconv.HTTPStatusCode(statusCode))
if rw.StatusCode > 0 {
attributes = append(attributes, semconv.HTTPStatusCode(rw.StatusCode))
}
span.SetStatus(semconvutil.HTTPServerStatus(statusCode))
span.SetStatus(semconvutil.HTTPServerStatus(rw.StatusCode))

if werr != nil && werr != io.EOF {
attributes = append(attributes, WriteErrorKey.String(werr.Error()))
if rw.Err != nil && rw.Err != io.EOF {
attributes = append(attributes, WriteErrorKey.String(rw.Err.Error()))

Check warning on line 252 in instrumentation/net/http/otelhttp/handler.go

View check run for this annotation

Codecov / codecov/patch

instrumentation/net/http/otelhttp/handler.go#L252

Added line #L252 was not covered by tests
}
span.SetAttributes(attributes...)
}
Expand Down
38 changes: 38 additions & 0 deletions instrumentation/net/http/otelhttp/internal/wrapper/body.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package wrapper // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/wrapper"

import (
"io"
"sync/atomic"
)

var _ io.ReadCloser = &Body{}

// Body wraps an http.Request.Body (an io.ReadCloser) to track the
// number of bytes read and the last error.
type Body struct {
io.ReadCloser
OnRecordFn func(n int64) // must not be nil

read atomic.Int64
Err error
}

func (w *Body) Read(b []byte) (int, error) {
n, err := w.ReadCloser.Read(b)
n1 := int64(n)
w.read.Add(n1)
w.Err = err
w.OnRecordFn(n1)
return n, err
}

func (w *Body) ReadLength() int64 {
return w.read.Load()
}

func (w *Body) Close() error {
return w.ReadCloser.Close()
}
56 changes: 56 additions & 0 deletions instrumentation/net/http/otelhttp/internal/wrapper/body_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package wrapper

import (
"io"
"strings"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBodyRead(t *testing.T) {
b := Body{
ReadCloser: io.NopCloser(strings.NewReader("Hello, world!")),
OnRecordFn: func(int64) {},
}
c, err := io.ReadAll(&b)
assert.NoError(t, err)
assert.Equal(t, "Hello, world!", string(c))
assert.Equal(t, int64(13), b.ReadLength())
}

func TestBodyOnRecordFn(t *testing.T) {
var sizeFromFn int64

b := Body{
ReadCloser: io.NopCloser(strings.NewReader("Hello, world!")),
OnRecordFn: func(n int64) {
sizeFromFn += n
},
}
_, err := io.ReadAll(&b)
assert.NoError(t, err)
assert.Equal(t, int64(13), sizeFromFn)
}

func TestBodyReadParallel(t *testing.T) {
b := Body{
ReadCloser: io.NopCloser(strings.NewReader("Hello, world!")),
OnRecordFn: func(int64) {},
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
b.ReadLength()
}()
_, err := io.ReadAll(&b)
assert.NoError(t, err)

wg.Wait()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package wrapper // import "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/wrapper"

import (
"net/http"
)

var _ http.ResponseWriter = &ResponseWriter{}

// ResponseWriter wraps a http.ResponseWriter in order to track the number of
// bytes written, the last error, and to catch the first written statusCode.
// TODO: The wrapped http.ResponseWriter doesn't implement any of the optional
// types (http.Hijacker, http.Pusher, http.CloseNotifier, http.Flusher, etc)
// that may be useful when using it in real life situations.
type ResponseWriter struct {
http.ResponseWriter
OnRecordFn func(n int64) // must not be nil

Written int64
StatusCode int
Err error
WroteHeader bool
}

func (w *ResponseWriter) Write(p []byte) (int, error) {
if !w.WroteHeader {
w.WriteHeader(http.StatusOK)
}
n, err := w.ResponseWriter.Write(p)
Dismissed Show dismissed Hide dismissed
n1 := int64(n)
w.OnRecordFn(n1)
w.Written += n1
w.Err = err
return n, err
}

// WriteHeader persists initial statusCode for span attribution.
// All calls to WriteHeader will be propagated to the underlying ResponseWriter
// and will persist the statusCode from the first call.
// Blocking consecutive calls to WriteHeader alters expected behavior and will
// remove warning logs from net/http where developers will notice incorrect handler implementations.
func (w *ResponseWriter) WriteHeader(statusCode int) {
if !w.WroteHeader {
w.WroteHeader = true
w.StatusCode = statusCode
}
w.ResponseWriter.WriteHeader(statusCode)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package wrapper

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/stretchr/testify/assert"
)

func TestResponseWriterWrite(t *testing.T) {
rw := &ResponseWriter{
ResponseWriter: &httptest.ResponseRecorder{},
OnRecordFn: func(int64) {},
}

_, err := rw.Write([]byte("hello world"))
assert.NoError(t, err)
assert.Equal(t, int64(11), rw.Written)
assert.Equal(t, http.StatusOK, rw.StatusCode)
assert.Nil(t, rw.Err)
assert.True(t, rw.WroteHeader)
}

func TestResponseWriterOnRecordFn(t *testing.T) {
var sizeFromFn int64

rw := &ResponseWriter{
ResponseWriter: &httptest.ResponseRecorder{},
OnRecordFn: func(n int64) {
sizeFromFn += n
},
}

_, err := rw.Write([]byte("hello world"))
assert.NoError(t, err)
assert.Equal(t, int64(11), sizeFromFn)
}

func TestResponseWriterWriteHeader(t *testing.T) {
rw := &ResponseWriter{
ResponseWriter: &httptest.ResponseRecorder{},
OnRecordFn: func(int64) {},
}

rw.WriteHeader(http.StatusTeapot)
assert.Equal(t, http.StatusTeapot, rw.StatusCode)
assert.True(t, rw.WroteHeader)

rw.WriteHeader(http.StatusGone)
assert.Equal(t, http.StatusTeapot, rw.StatusCode)
}
7 changes: 4 additions & 3 deletions instrumentation/net/http/otelhttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"go.opentelemetry.io/otel/metric"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/semconvutil"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp/internal/wrapper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
Expand Down Expand Up @@ -143,14 +144,14 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
r = r.Clone(ctx) // According to RoundTripper spec, we shouldn't modify the origin request.

// use a body wrapper to determine the request size
var bw bodyWrapper
var bw wrapper.Body
// if request body is nil or NoBody, we don't want to mutate the body as it
// will affect the identity of it in an unforeseeable way because we assert
// ReadCloser fulfills a certain interface and it is indeed nil or NoBody.
if r.Body != nil && r.Body != http.NoBody {
bw.ReadCloser = r.Body
// noop to prevent nil panic. not using this record fun yet.
bw.record = func(int64) {}
bw.OnRecordFn = func(int64) {}
r.Body = &bw
}

Expand All @@ -171,7 +172,7 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
metricAttrs = append(metricAttrs, semconv.HTTPStatusCode(res.StatusCode))
}
o := metric.WithAttributes(metricAttrs...)
t.requestBytesCounter.Add(ctx, bw.read.Load(), o)
t.requestBytesCounter.Add(ctx, bw.ReadLength(), o)
// For handling response bytes we leverage a callback when the client reads the http response
readRecordFunc := func(n int64) {
t.responseBytesCounter.Add(ctx, n, o)
Expand Down
Loading
Loading