- 
                Notifications
    You must be signed in to change notification settings 
- Fork 485
feat: v1 trace protocol implementation #3947
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f5d87c3
              25440fd
              d0635e4
              478ef22
              230633a
              c0f2493
              1e941ac
              dc00fd3
              98d30ce
              0ad7dd1
              326a244
              ea80c7a
              af26f15
              ded707d
              5124410
              bb1baaf
              f071e05
              03abab3
              22e70ec
              68ab0d8
              23c72cf
              a12502c
              08b1b97
              60165d3
              11a4ad9
              6d10c7a
              46a6cb9
              170a42c
              7d6c3cc
              5881a50
              dc26c98
              9030925
              269988e
              8c7bef3
              8f06c7b
              80dbd54
              b64c68f
              8908891
              e739a34
              3a6e7fa
              f3f86ab
              91c5929
              58d2f62
              82c805b
              1392569
              bbb62d2
              d226900
              72fae1d
              7d4c9cf
              a989bde
              e2ef0d5
              0067018
              5e1633d
              888c82c
              c4b126d
              d931d7e
              a8b15f2
              2ab2051
              092f607
              639f9b2
              581c994
              0487a5a
              8813fa5
              ff24c1f
              50724d8
              883e3f2
              bc024ce
              e759d0e
              e4ae913
              da2fd94
              813031f
              6ad1afe
              7fc1f29
              f2ecb71
              ef59f66
              8293cd1
              bff64ad
              c398de9
              5263a6f
              6e71dbd
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -69,8 +69,12 @@ type startupInfo struct { | |
| // checkEndpoint tries to connect to the URL specified by endpoint. | ||
| // If the endpoint is not reachable, checkEndpoint returns an error | ||
| // explaining why. | ||
| func checkEndpoint(c *http.Client, endpoint string) error { | ||
| req, err := http.NewRequest("POST", endpoint, bytes.NewReader([]byte{0x90})) | ||
| func checkEndpoint(c *http.Client, endpoint string, protocol float64) error { | ||
| b := []byte{0x90} // empty array | ||
| if protocol == traceProtocolV1 { | ||
| b = []byte{0x80} // empty map | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Payload v1 is represented by a message pack map, whereas the empty payload in v0.4 was represented by an array. To prevent failures when we send empty data, we need to check for the payload version and send the correct data type. | ||
| } | ||
| req, err := http.NewRequest("POST", endpoint, bytes.NewReader(b)) | ||
| if err != nil { | ||
| return fmt.Errorf("cannot create http request: %s", err) | ||
| } | ||
|  | @@ -162,8 +166,8 @@ func logStartup(t *tracer) { | |
| info.SampleRateLimit = fmt.Sprintf("%v", limit) | ||
| } | ||
| if !t.config.logToStdout { | ||
| if err := checkEndpoint(t.config.httpClient, t.config.transport.endpoint()); err != nil { | ||
| info.AgentError = err.Error() | ||
| if err := checkEndpoint(t.config.httpClient, t.config.transport.endpoint(), t.config.traceProtocol); err != nil { | ||
| info.AgentError = fmt.Sprintf("%s", err.Error()) | ||
| log.Warn("DIAGNOSTICS Unable to reach agent intake: %s", err.Error()) | ||
| } | ||
| } | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -414,9 +414,6 @@ func newConfig(opts ...StartOption) (*config, error) { | |
|  | ||
| reportTelemetryOnAppStarted(telemetry.Configuration{Name: "trace_rate_limit", Value: c.traceRateLimitPerSecond, Origin: origin}) | ||
|  | ||
| // Set the trace protocol to use. | ||
| c.traceProtocol = internal.FloatEnv("DD_TRACE_AGENT_PROTOCOL_VERSION", traceProtocolV04) | ||
|  | ||
| if v := env.Get("OTEL_LOGS_EXPORTER"); v != "" { | ||
| log.Warn("OTEL_LOGS_EXPORTER is not supported") | ||
| } | ||
|  | @@ -595,6 +592,15 @@ func newConfig(opts ...StartOption) (*config, error) { | |
| if c.transport == nil { | ||
| c.transport = newHTTPTransport(c.agentURL.String(), c.httpClient) | ||
| } | ||
| // Set the trace protocol to use. | ||
| if internal.BoolEnv("DD_TRACE_V1_PAYLOAD_FORMAT_ENABLED", false) { | ||
| c.traceProtocol = traceProtocolV1 | ||
| if t, ok := c.transport.(*httpTransport); ok { | ||
| t.traceURL = fmt.Sprintf("%s%s", c.agentURL.String(), tracesAPIPathV1) | ||
| } | ||
| } else { | ||
| c.traceProtocol = traceProtocolV04 | ||
| } | ||
| 
      Comment on lines
    
      +596
     to 
      +603
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems weird that we're modifying httpTransport.traceURL directly for the  Maybe we can modify the  Like this (pseudocode):  | ||
| if c.propagator == nil { | ||
| envKey := "DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH" | ||
| maxLen := internal.IntEnv(envKey, defaultMaxTagsHeaderLen) | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -6,14 +6,8 @@ | |
| package tracer | ||
|  | ||
| import ( | ||
| "bytes" | ||
| "encoding/binary" | ||
| "io" | ||
| "sync" | ||
| "sync/atomic" | ||
|  | ||
| "github.com/DataDog/dd-trace-go/v2/internal/processtags" | ||
| "github.com/tinylib/msgp/msgp" | ||
| ) | ||
|  | ||
| // payloadStats contains the statistics of a payload. | ||
|  | @@ -52,206 +46,35 @@ type payload interface { | |
| payloadReader | ||
| } | ||
|  | ||
| // unsafePayload is a wrapper on top of the msgpack encoder which allows constructing an | ||
| // encoded array by pushing its entries sequentially, one at a time. It basically | ||
| // allows us to encode as we would with a stream, except that the contents of the stream | ||
| // can be read as a slice by the msgpack decoder at any time. It follows the guidelines | ||
| // from the msgpack array spec: | ||
| // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family | ||
| // | ||
| // unsafePayload implements io.Reader and can be used with the decoder directly. | ||
| // | ||
| // unsafePayload is not safe for concurrent use. | ||
| // | ||
| // unsafePayload is meant to be used only once and eventually dismissed with the | ||
| // single exception of retrying failed flush attempts. | ||
| // | ||
| // ⚠️ Warning! | ||
| // | ||
| // The payload should not be reused for multiple sets of traces. Resetting the | ||
| // payload for re-use requires the transport to wait for the HTTP package to | ||
| // Close the request body before attempting to re-use it again! This requires | ||
| // additional logic to be in place. See: | ||
| // | ||
| // • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138 | ||
| // • https://github.com/DataDog/dd-trace-go/pull/475 | ||
| // • https://github.com/DataDog/dd-trace-go/pull/549 | ||
| // • https://github.com/DataDog/dd-trace-go/pull/976 | ||
| type unsafePayload struct { | ||
| // header specifies the first few bytes in the msgpack stream | ||
| // indicating the type of array (fixarray, array16 or array32) | ||
| // and the number of items contained in the stream. | ||
| header []byte | ||
|  | ||
| // off specifies the current read position on the header. | ||
| off int | ||
|  | ||
| // count specifies the number of items in the stream. | ||
| count uint32 | ||
|  | ||
| // buf holds the sequence of msgpack-encoded items. | ||
| buf bytes.Buffer | ||
|  | ||
| // reader is used for reading the contents of buf. | ||
| reader *bytes.Reader | ||
|  | ||
| // protocolVersion specifies the trace protocolVersion to use. | ||
| protocolVersion float64 | ||
| } | ||
|  | ||
| var _ io.Reader = (*unsafePayload)(nil) | ||
|  | ||
| // newUnsafePayload returns a ready to use unsafe payload. | ||
| func newUnsafePayload(protocol float64) *unsafePayload { | ||
| p := &unsafePayload{ | ||
| header: make([]byte, 8), | ||
| off: 8, | ||
| protocolVersion: protocol, | ||
| } | ||
| return p | ||
| } | ||
|  | ||
| // push pushes a new item into the stream. | ||
| func (p *unsafePayload) push(t []*Span) (stats payloadStats, err error) { | ||
| p.setTracerTags(t) | ||
| sl := spanList(t) | ||
| p.buf.Grow(sl.Msgsize()) | ||
| if err := msgp.Encode(&p.buf, sl); err != nil { | ||
| return payloadStats{}, err | ||
| } | ||
| p.recordItem() | ||
| return p.stats(), nil | ||
| } | ||
|  | ||
| func (p *unsafePayload) setTracerTags(t []*Span) { | ||
| // set on first chunk | ||
| if atomic.LoadUint32(&p.count) != 0 { | ||
| return | ||
| } | ||
| if len(t) == 0 { | ||
| return | ||
| } | ||
| pTags := processtags.GlobalTags().String() | ||
| if pTags == "" { | ||
| return | ||
| // newPayload returns a ready to use payload. | ||
| func newPayload(protocol float64) payload { | ||
| if protocol == traceProtocolV1 { | ||
| return &safePayload{ | ||
| p: newPayloadV1(), | ||
| } | ||
| } | ||
| t[0].setProcessTags(pTags) | ||
| } | ||
|  | ||
| // itemCount returns the number of items available in the stream. | ||
| func (p *unsafePayload) itemCount() int { | ||
| return int(atomic.LoadUint32(&p.count)) | ||
| } | ||
|  | ||
| // size returns the payload size in bytes. After the first read the value becomes | ||
| // inaccurate by up to 8 bytes. | ||
| func (p *unsafePayload) size() int { | ||
| return p.buf.Len() + len(p.header) - p.off | ||
| } | ||
|  | ||
| // reset sets up the payload to be read a second time. It maintains the | ||
| // underlying byte contents of the buffer. reset should not be used in order to | ||
| // reuse the payload for another set of traces. | ||
| func (p *unsafePayload) reset() { | ||
| p.updateHeader() | ||
| if p.reader != nil { | ||
| p.reader.Seek(0, 0) | ||
| return &safePayload{ | ||
| p: newPayloadV04(), | ||
| } | ||
| } | ||
|  | ||
| // clear empties the payload buffers. | ||
| func (p *unsafePayload) clear() { | ||
| p.buf = bytes.Buffer{} | ||
| p.reader = nil | ||
| } | ||
|  | ||
| // https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family | ||
| const ( | ||
| // arrays | ||
| msgpackArrayFix byte = 144 // up to 15 items | ||
| msgpackArray16 byte = 0xdc // up to 2^16-1 items, followed by size in 2 bytes | ||
| msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes | ||
| ) | ||
|  | ||
| // updateHeader updates the payload header based on the number of items currently | ||
| // present in the stream. | ||
| func (p *unsafePayload) updateHeader() { | ||
| n := uint64(atomic.LoadUint32(&p.count)) | ||
| switch { | ||
| case n <= 15: | ||
| p.header[7] = msgpackArrayFix + byte(n) | ||
| p.off = 7 | ||
| case n <= 1<<16-1: | ||
| binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes | ||
| p.header[5] = msgpackArray16 | ||
| p.off = 5 | ||
| default: // n <= 1<<32-1 | ||
| binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes | ||
| p.header[3] = msgpackArray32 | ||
| p.off = 3 | ||
| } | ||
| } | ||
|  | ||
| // Close implements io.Closer | ||
| func (p *unsafePayload) Close() error { | ||
| return nil | ||
| } | ||
|  | ||
| // Read implements io.Reader. It reads from the msgpack-encoded stream. | ||
| func (p *unsafePayload) Read(b []byte) (n int, err error) { | ||
| if p.off < len(p.header) { | ||
| // reading header | ||
| n = copy(b, p.header[p.off:]) | ||
| p.off += n | ||
| return n, nil | ||
| } | ||
| if p.reader == nil { | ||
| p.reader = bytes.NewReader(p.buf.Bytes()) | ||
| } | ||
| return p.reader.Read(b) | ||
| } | ||
|  | ||
| // Write implements io.Writer. It writes data directly to the buffer. | ||
| func (p *unsafePayload) Write(data []byte) (n int, err error) { | ||
| return p.buf.Write(data) | ||
| } | ||
|  | ||
| // grow grows the buffer to ensure it can accommodate n more bytes. | ||
| func (p *unsafePayload) grow(n int) { | ||
| p.buf.Grow(n) | ||
| } | ||
|  | ||
| // recordItem records that an item was added and updates the header. | ||
| func (p *unsafePayload) recordItem() { | ||
| atomic.AddUint32(&p.count, 1) | ||
| p.updateHeader() | ||
| } | ||
|  | ||
| // stats returns the current stats of the payload. | ||
| func (p *unsafePayload) stats() payloadStats { | ||
| return payloadStats{ | ||
| size: p.size(), | ||
| itemCount: int(atomic.LoadUint32(&p.count)), | ||
| } | ||
| } | ||
|  | ||
| // protocol returns the protocol version of the payload. | ||
| func (p *unsafePayload) protocol() float64 { | ||
| return p.protocolVersion | ||
| } | ||
|  | ||
| var _ io.Reader = (*safePayload)(nil) | ||
|  | ||
| // newPayload returns a ready to use thread-safe payload. | ||
| func newPayload(protocol float64) payload { | ||
| return &safePayload{ | ||
| p: newUnsafePayload(protocol), | ||
| } | ||
| } | ||
| // maps | ||
| msgpackMapFix byte = 0x80 // up to 15 items | ||
| msgpackMap16 byte = 0xde // up to 2^16-1 items, followed by size in 2 bytes | ||
| msgpackMap32 byte = 0xdf // up to 2^32-1 items, followed by size in 4 bytes | ||
| ) | ||
|  | ||
| // safePayload provides a thread-safe wrapper around unsafePayload. | ||
| // safePayload provides a thread-safe wrapper around payload. | ||
| type safePayload struct { | ||
| mu sync.RWMutex | ||
| p *unsafePayload | ||
| p payload | ||
| 
      Comment on lines
    
      -251
     to 
      +77
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm slightly confused by  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @mtoffl01  | ||
| } | ||
|  | ||
| // push pushes a new item into the stream in a thread-safe manner. | ||
|  | @@ -262,9 +85,9 @@ func (sp *safePayload) push(t spanList) (stats payloadStats, err error) { | |
| } | ||
|  | ||
| // itemCount returns the number of items available in the stream in a thread-safe manner. | ||
| // This method is not thread-safe, but the underlying payload.itemCount() must be. | ||
| func (sp *safePayload) itemCount() int { | ||
| // Use direct atomic access for better performance - no mutex needed | ||
| return int(atomic.LoadUint32(&sp.p.count)) | ||
| return sp.p.itemCount() | ||
| } | ||
|  | ||
| // size returns the payload size in bytes in a thread-safe manner. | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we adding these public APIs?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good catch. @hannahkm Do we need them? I think everything happens inside the same package, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. We don't need them to be public, as we aren't (and probably shouldn't) be calling this functions from anywhere else.