Skip to content

Commit 034ca64

Browse files
authored
feat: support messaging.destination.name (#298)
1 parent dae54a3 commit 034ca64

File tree

3 files changed

+187
-103
lines changed

3 files changed

+187
-103
lines changed

input/elasticapm/internal/modeldecoder/v2/span_test.go

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -424,38 +424,47 @@ func TestDecodeMapToSpanModel(t *testing.T) {
424424
})
425425

426426
t.Run("messaging", func(t *testing.T) {
427-
attrs := map[string]interface{}{
428-
"messaging.system": "kafka",
429-
"messaging.destination": "myTopic",
430-
"net.peer.ip": "10.20.30.40",
431-
"net.peer.port": json.Number("123"),
432-
}
433-
434-
var input span
435-
var event modelpb.APMEvent
436-
modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues())
437-
input.OTel.Attributes = attrs
438-
input.OTel.SpanKind.Set("PRODUCER")
439-
input.Type.Reset()
440-
mapToSpanModel(&input, &event)
427+
for _, attrs := range []map[string]any{
428+
{
429+
"messaging.system": "kafka",
430+
"messaging.destination": "myTopic",
431+
"net.peer.ip": "10.20.30.40",
432+
"net.peer.port": json.Number("123"),
433+
},
434+
{
435+
"messaging.system": "kafka",
436+
"messaging.destination.name": "myTopic",
437+
"net.peer.ip": "10.20.30.40",
438+
"net.peer.port": json.Number("123"),
439+
},
440+
} {
441441

442-
assert.Equal(t, "messaging", event.Span.Type)
443-
assert.Equal(t, "kafka", event.Span.Subtype)
444-
assert.Equal(t, "send", event.Span.Action)
445-
assert.Equal(t, "PRODUCER", event.Span.Kind)
446-
assert.Equal(t, &modelpb.Destination{
447-
Address: "10.20.30.40",
448-
Port: 123,
449-
}, event.Destination)
450-
assert.Empty(t, cmp.Diff(&modelpb.DestinationService{
451-
Type: "messaging",
452-
Name: "kafka",
453-
Resource: "kafka/myTopic",
454-
}, event.Span.DestinationService, protocmp.Transform()))
455-
assert.Empty(t, cmp.Diff(&modelpb.ServiceTarget{
456-
Type: "kafka",
457-
Name: "myTopic",
458-
}, event.Service.Target, protocmp.Transform()))
442+
var input span
443+
var event modelpb.APMEvent
444+
modeldecodertest.SetStructValues(&input, modeldecodertest.DefaultValues())
445+
input.OTel.Attributes = attrs
446+
input.OTel.SpanKind.Set("PRODUCER")
447+
input.Type.Reset()
448+
mapToSpanModel(&input, &event)
449+
450+
assert.Equal(t, "messaging", event.Span.Type)
451+
assert.Equal(t, "kafka", event.Span.Subtype)
452+
assert.Equal(t, "send", event.Span.Action)
453+
assert.Equal(t, "PRODUCER", event.Span.Kind)
454+
assert.Equal(t, &modelpb.Destination{
455+
Address: "10.20.30.40",
456+
Port: 123,
457+
}, event.Destination)
458+
assert.Empty(t, cmp.Diff(&modelpb.DestinationService{
459+
Type: "messaging",
460+
Name: "kafka",
461+
Resource: "kafka/myTopic",
462+
}, event.Span.DestinationService, protocmp.Transform()))
463+
assert.Empty(t, cmp.Diff(&modelpb.ServiceTarget{
464+
Type: "kafka",
465+
Name: "myTopic",
466+
}, event.Service.Target, protocmp.Transform()))
467+
}
459468
})
460469

461470
t.Run("network", func(t *testing.T) {

input/otlp/traces.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,11 @@ func TranslateTransaction(
439439
event.Network.Carrier.Icc = stringval
440440

441441
// messaging.*
442-
case "message_bus.destination", semconv.AttributeMessagingDestination:
442+
//
443+
// messaging.destination is now called messaging.destination.name in the latest semconv
444+
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging
445+
// keep both of them for the backward compatibility
446+
case "message_bus.destination", semconv.AttributeMessagingDestination, "messaging.destination.name":
443447
isMessaging = true
444448
messagingQueueName = stringval
445449
case semconv.AttributeMessagingSystem:
@@ -784,7 +788,11 @@ func TranslateSpan(spanKind ptrace.SpanKind, attributes pcommon.Map, event *mode
784788
event.Session.Id = stringval
785789

786790
// messaging.*
787-
case "message_bus.destination", semconv.AttributeMessagingDestination:
791+
//
792+
// messaging.destination is now called messaging.destination.name in the latest semconv
793+
// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging
794+
// keep both of them for the backward compatibility
795+
case "message_bus.destination", semconv.AttributeMessagingDestination, "messaging.destination.name":
788796
message.QueueName = stringval
789797
isMessaging = true
790798
case semconv.AttributeMessagingOperation:

input/otlp/traces_test.go

Lines changed: 137 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -677,27 +677,37 @@ func TestMessagingTransaction(t *testing.T) {
677677
}
678678

679679
func TestMessagingSpan(t *testing.T) {
680-
event := transformSpanWithAttributes(t, map[string]interface{}{
681-
"messaging.system": "kafka",
682-
"messaging.destination": "myTopic",
683-
"net.peer.ip": "10.20.30.40",
684-
"net.peer.port": 123,
685-
}, func(s ptrace.Span) {
686-
s.SetKind(ptrace.SpanKindProducer)
687-
})
688-
assert.Equal(t, "messaging", event.Span.Type)
689-
assert.Equal(t, "kafka", event.Span.Subtype)
690-
assert.Equal(t, "send", event.Span.Action)
691-
assert.Empty(t, event.Labels)
692-
assert.Equal(t, &modelpb.Destination{
693-
Address: "10.20.30.40",
694-
Port: 123,
695-
}, event.Destination)
696-
assert.Empty(t, cmp.Diff(&modelpb.DestinationService{
697-
Type: "messaging",
698-
Name: "kafka",
699-
Resource: "kafka/myTopic",
700-
}, event.Span.DestinationService, protocmp.Transform()))
680+
for _, attr := range []map[string]any{
681+
{
682+
"messaging.system": "kafka",
683+
"messaging.destination": "myTopic",
684+
"net.peer.ip": "10.20.30.40",
685+
"net.peer.port": 123,
686+
},
687+
{
688+
"messaging.system": "kafka",
689+
"messaging.destination.name": "myTopic",
690+
"net.peer.ip": "10.20.30.40",
691+
"net.peer.port": 123,
692+
},
693+
} {
694+
event := transformSpanWithAttributes(t, attr, func(s ptrace.Span) {
695+
s.SetKind(ptrace.SpanKindProducer)
696+
})
697+
assert.Equal(t, "messaging", event.Span.Type)
698+
assert.Equal(t, "kafka", event.Span.Subtype)
699+
assert.Equal(t, "send", event.Span.Action)
700+
assert.Empty(t, event.Labels)
701+
assert.Equal(t, &modelpb.Destination{
702+
Address: "10.20.30.40",
703+
Port: 123,
704+
}, event.Destination)
705+
assert.Empty(t, cmp.Diff(&modelpb.DestinationService{
706+
Type: "messaging",
707+
Name: "kafka",
708+
Resource: "kafka/myTopic",
709+
}, event.Span.DestinationService, protocmp.Transform()))
710+
}
701711
}
702712

703713
func TestMessagingSpan_DestinationResource(t *testing.T) {
@@ -708,40 +718,66 @@ func TestMessagingSpan_DestinationResource(t *testing.T) {
708718
assert.Empty(t, cmp.Diff(expectedDestinationService, event.Span.DestinationService, protocmp.Transform()))
709719
}
710720

721+
setAttr := func(t *testing.T, baseAttr map[string]any, key string, val any) map[string]any {
722+
t.Helper()
723+
newAttr := make(map[string]any)
724+
// Copy from the original map to the target map
725+
for key, value := range baseAttr {
726+
newAttr[key] = value
727+
}
728+
newAttr[key] = val
729+
return newAttr
730+
}
731+
711732
t.Run("system_destination_peerservice_peeraddress", func(t *testing.T) {
712-
test(t, &modelpb.Destination{
713-
Address: "127.0.0.1",
714-
}, &modelpb.DestinationService{
715-
Type: "messaging",
716-
Name: "testsvc",
717-
Resource: "127.0.0.1/testtopic",
718-
}, map[string]interface{}{
719-
"messaging.system": "kafka",
720-
"messaging.destination": "testtopic",
721-
"peer.service": "testsvc",
722-
"peer.address": "127.0.0.1",
723-
})
733+
baseAttr := map[string]any{
734+
"messaging.system": "kafka",
735+
"peer.service": "testsvc",
736+
"peer.address": "127.0.0.1",
737+
}
738+
for _, attr := range []map[string]any{
739+
setAttr(t, baseAttr, "messaging.destination", "testtopic"),
740+
setAttr(t, baseAttr, "messaging.destination.name", "testtopic"),
741+
} {
742+
test(t, &modelpb.Destination{
743+
Address: "127.0.0.1",
744+
}, &modelpb.DestinationService{
745+
Type: "messaging",
746+
Name: "testsvc",
747+
Resource: "127.0.0.1/testtopic",
748+
}, attr)
749+
}
724750
})
725751
t.Run("system_destination_peerservice", func(t *testing.T) {
726-
test(t, nil, &modelpb.DestinationService{
727-
Type: "messaging",
728-
Name: "testsvc",
729-
Resource: "testsvc/testtopic",
730-
}, map[string]interface{}{
731-
"messaging.system": "kafka",
732-
"messaging.destination": "testtopic",
733-
"peer.service": "testsvc",
734-
})
752+
baseAttr := map[string]any{
753+
"messaging.system": "kafka",
754+
"peer.service": "testsvc",
755+
}
756+
for _, attr := range []map[string]any{
757+
setAttr(t, baseAttr, "messaging.destination", "testtopic"),
758+
setAttr(t, baseAttr, "messaging.destination.name", "testtopic"),
759+
} {
760+
test(t, nil, &modelpb.DestinationService{
761+
Type: "messaging",
762+
Name: "testsvc",
763+
Resource: "testsvc/testtopic",
764+
}, attr)
765+
}
735766
})
736767
t.Run("system_destination", func(t *testing.T) {
737-
test(t, nil, &modelpb.DestinationService{
738-
Type: "messaging",
739-
Name: "kafka",
740-
Resource: "kafka/testtopic",
741-
}, map[string]interface{}{
742-
"messaging.system": "kafka",
743-
"messaging.destination": "testtopic",
744-
})
768+
baseAttr := map[string]any{
769+
"messaging.system": "kafka",
770+
}
771+
for _, attr := range []map[string]any{
772+
setAttr(t, baseAttr, "messaging.destination", "testtopic"),
773+
setAttr(t, baseAttr, "messaging.destination.name", "testtopic"),
774+
} {
775+
test(t, nil, &modelpb.DestinationService{
776+
Type: "messaging",
777+
Name: "kafka",
778+
Resource: "kafka/testtopic",
779+
}, attr)
780+
}
745781
})
746782
}
747783

@@ -776,6 +812,9 @@ func TestTransactionTypePriorities(t *testing.T) {
776812

777813
attribs["messaging.destination"] = "foobar"
778814
assert.Equal(t, "messaging", transactionWithAttribs(attribs).Transaction.Type)
815+
delete(attribs, "messaging.destination")
816+
attribs["messaging.destination.name"] = "foobar"
817+
assert.Equal(t, "messaging", transactionWithAttribs(attribs).Transaction.Type)
779818
}
780819

781820
func TestSpanTypePriorities(t *testing.T) {
@@ -797,6 +836,10 @@ func TestSpanTypePriorities(t *testing.T) {
797836
attribs["messaging.destination"] = "foobar"
798837
assert.Equal(t, "messaging", spanWithAttribs(attribs).Span.Type)
799838

839+
delete(attribs, "messaging.destination")
840+
attribs["messaging.destination.name"] = "foobar"
841+
assert.Equal(t, "messaging", spanWithAttribs(attribs).Span.Type)
842+
800843
attribs["db.statement"] = "SELECT * FROM FOO"
801844
assert.Equal(t, "db", spanWithAttribs(attribs).Span.Type)
802845
}
@@ -1655,6 +1698,17 @@ func TestServiceTarget(t *testing.T) {
16551698
event := transformSpanWithAttributes(t, input)
16561699
assert.Empty(t, cmp.Diff(expected, event.Service.Target, protocmp.Transform()))
16571700
}
1701+
1702+
setAttr := func(t *testing.T, baseAttr map[string]any, key string, val any) map[string]any {
1703+
t.Helper()
1704+
newAttr := make(map[string]any)
1705+
// Copy from the original map to the target map
1706+
for key, value := range baseAttr {
1707+
newAttr[key] = value
1708+
}
1709+
newAttr[key] = val
1710+
return newAttr
1711+
}
16581712
t.Run("db_spans_with_peerservice_system", func(t *testing.T) {
16591713
test(t, &modelpb.ServiceTarget{
16601714
Type: "postgresql",
@@ -1762,35 +1816,48 @@ func TestServiceTarget(t *testing.T) {
17621816
})
17631817

17641818
t.Run("messaging_spans_with_peerservice_system_destination", func(t *testing.T) {
1765-
test(t, &modelpb.ServiceTarget{
1766-
Name: "myTopic",
1767-
Type: "kafka",
1768-
}, map[string]interface{}{
1769-
"peer.service": "testsvc",
1770-
"messaging.system": "kafka",
1771-
"messaging.destination": "myTopic",
1772-
})
1819+
baseAttr := map[string]any{
1820+
"peer.service": "testsvc",
1821+
"messaging.system": "kafka",
1822+
}
1823+
for _, attr := range []map[string]any{
1824+
setAttr(t, baseAttr, "messaging.destination", "myTopic"),
1825+
setAttr(t, baseAttr, "messaging.destination.name", "myTopic"),
1826+
} {
1827+
test(t, &modelpb.ServiceTarget{
1828+
Name: "myTopic",
1829+
Type: "kafka",
1830+
}, attr)
1831+
}
17731832
})
17741833

17751834
t.Run("messaging_spans_with_peerservice_system_destination_tempdestination", func(t *testing.T) {
1776-
test(t, &modelpb.ServiceTarget{
1777-
Name: "testsvc",
1778-
Type: "kafka",
1779-
}, map[string]interface{}{
1835+
baseAttr := map[string]any{
17801836
"peer.service": "testsvc",
17811837
"messaging.temp_destination": true,
17821838
"messaging.system": "kafka",
1783-
"messaging.destination": "myTopic",
1784-
})
1839+
}
1840+
for _, attr := range []map[string]any{
1841+
setAttr(t, baseAttr, "messaging.destination", "myTopic"),
1842+
setAttr(t, baseAttr, "messaging.destination.name", "myTopic"),
1843+
} {
1844+
test(t, &modelpb.ServiceTarget{
1845+
Name: "testsvc",
1846+
Type: "kafka",
1847+
}, attr)
1848+
}
17851849
})
17861850

17871851
t.Run("messaging_spans_with_destination", func(t *testing.T) {
1788-
test(t, &modelpb.ServiceTarget{
1789-
Name: "myTopic",
1790-
Type: "messaging",
1791-
}, map[string]interface{}{
1792-
"messaging.destination": "myTopic",
1793-
})
1852+
for _, attr := range []map[string]any{
1853+
{"messaging.destination": "myTopic"},
1854+
{"messaging.destination.name": "myTopic"},
1855+
} {
1856+
test(t, &modelpb.ServiceTarget{
1857+
Name: "myTopic",
1858+
Type: "messaging",
1859+
}, attr)
1860+
}
17941861
})
17951862
}
17961863

0 commit comments

Comments
 (0)