Skip to content

Commit 22a92e7

Browse files
committed
Add phase 2,3 metrics
1 parent 168f918 commit 22a92e7

File tree

10 files changed

+827
-87
lines changed

10 files changed

+827
-87
lines changed

extra/redisotel-native/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const (
1111
MetricGroupConnectionBasic MetricGroup = "connection-basic"
1212
MetricGroupResiliency MetricGroup = "resiliency"
1313
MetricGroupConnectionAdvanced MetricGroup = "connection-advanced"
14+
MetricGroupPubSub MetricGroup = "pubsub"
1415
MetricGroupStream MetricGroup = "stream"
1516
)
1617

extra/redisotel-native/metrics.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,19 @@ type metricsRecorder struct {
5050
clientErrors metric.Int64Counter
5151
maintenanceNotifications metric.Int64Counter
5252

53+
connectionWaitTime metric.Float64Histogram
54+
connectionUseTime metric.Float64Histogram
55+
connectionTimeouts metric.Int64Counter
56+
connectionClosed metric.Int64Counter
57+
connectionPendingReqs metric.Int64UpDownCounter
58+
59+
pubsubMessages metric.Int64Counter
60+
61+
streamLag metric.Float64Histogram
62+
63+
// Configuration
64+
cfg *config
65+
5366
// Client configuration for attributes (used for operation metrics only)
5467
serverAddr string
5568
serverPort string
@@ -68,6 +81,11 @@ func (r *metricsRecorder) RecordOperationDuration(
6881
return
6982
}
7083

84+
// Check if command should be included
85+
if r.cfg != nil && !r.cfg.isCommandIncluded(cmd.Name()) {
86+
return
87+
}
88+
7189
// Convert duration to seconds (OTel convention for duration metrics)
7290
durationSeconds := duration.Seconds()
7391

@@ -507,3 +525,276 @@ func (r *metricsRecorder) RecordMaintenanceNotification(
507525
// Record the counter
508526
r.maintenanceNotifications.Add(ctx, 1, metric.WithAttributes(attrs...))
509527
}
528+
529+
// RecordConnectionWaitTime records db.client.connection.wait_time metric
530+
func (r *metricsRecorder) RecordConnectionWaitTime(
531+
ctx context.Context,
532+
duration time.Duration,
533+
cn redis.ConnInfo,
534+
) {
535+
if r.connectionWaitTime == nil {
536+
return
537+
}
538+
539+
// Extract server address and peer address from connection
540+
serverAddr, serverPort := extractServerInfo(cn)
541+
peerAddr, peerPort := serverAddr, serverPort
542+
543+
// Build attributes
544+
attrs := []attribute.KeyValue{
545+
attribute.String("db.system.name", "redis"),
546+
attribute.String("server.address", serverAddr),
547+
getLibraryVersionAttr(),
548+
}
549+
550+
// Add server.port if not default
551+
attrs = addServerPortIfNonDefault(attrs, serverPort)
552+
553+
// Add peer info
554+
if peerAddr != "" {
555+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
556+
if peerPort != "" {
557+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
558+
}
559+
}
560+
561+
// Record the histogram (duration in seconds)
562+
r.connectionWaitTime.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...))
563+
}
564+
565+
// RecordConnectionUseTime records db.client.connection.use_time metric
566+
func (r *metricsRecorder) RecordConnectionUseTime(
567+
ctx context.Context,
568+
duration time.Duration,
569+
cn redis.ConnInfo,
570+
) {
571+
if r.connectionUseTime == nil {
572+
return
573+
}
574+
575+
// Extract server address and peer address from connection
576+
serverAddr, serverPort := extractServerInfo(cn)
577+
peerAddr, peerPort := serverAddr, serverPort
578+
579+
// Build attributes
580+
attrs := []attribute.KeyValue{
581+
attribute.String("db.system.name", "redis"),
582+
attribute.String("server.address", serverAddr),
583+
getLibraryVersionAttr(),
584+
}
585+
586+
// Add server.port if not default
587+
attrs = addServerPortIfNonDefault(attrs, serverPort)
588+
589+
// Add peer info
590+
if peerAddr != "" {
591+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
592+
if peerPort != "" {
593+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
594+
}
595+
}
596+
597+
// Record the histogram (duration in seconds)
598+
r.connectionUseTime.Record(ctx, duration.Seconds(), metric.WithAttributes(attrs...))
599+
}
600+
601+
// RecordConnectionTimeout records db.client.connection.timeouts metric
602+
func (r *metricsRecorder) RecordConnectionTimeout(
603+
ctx context.Context,
604+
cn redis.ConnInfo,
605+
timeoutType string,
606+
) {
607+
if r.connectionTimeouts == nil {
608+
return
609+
}
610+
611+
// Extract server address and peer address from connection
612+
serverAddr, serverPort := extractServerInfo(cn)
613+
peerAddr, peerPort := serverAddr, serverPort
614+
615+
// Build attributes
616+
attrs := []attribute.KeyValue{
617+
attribute.String("db.system.name", "redis"),
618+
attribute.String("server.address", serverAddr),
619+
attribute.String("redis.client.connection.timeout_type", timeoutType),
620+
getLibraryVersionAttr(),
621+
}
622+
623+
// Add server.port if not default
624+
attrs = addServerPortIfNonDefault(attrs, serverPort)
625+
626+
// Add peer info
627+
if peerAddr != "" {
628+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
629+
if peerPort != "" {
630+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
631+
}
632+
}
633+
634+
// Record the counter
635+
r.connectionTimeouts.Add(ctx, 1, metric.WithAttributes(attrs...))
636+
}
637+
638+
// RecordConnectionClosed records redis.client.connection.closed metric
639+
func (r *metricsRecorder) RecordConnectionClosed(
640+
ctx context.Context,
641+
cn redis.ConnInfo,
642+
reason string,
643+
) {
644+
if r.connectionClosed == nil {
645+
return
646+
}
647+
648+
// Extract server address and peer address from connection
649+
serverAddr, serverPort := extractServerInfo(cn)
650+
peerAddr, peerPort := serverAddr, serverPort
651+
652+
// Build attributes
653+
attrs := []attribute.KeyValue{
654+
attribute.String("db.system.name", "redis"),
655+
attribute.String("server.address", serverAddr),
656+
attribute.String("redis.client.connection.close_reason", reason),
657+
getLibraryVersionAttr(),
658+
}
659+
660+
// Add server.port if not default
661+
attrs = addServerPortIfNonDefault(attrs, serverPort)
662+
663+
// Add peer info
664+
if peerAddr != "" {
665+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
666+
if peerPort != "" {
667+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
668+
}
669+
}
670+
671+
// Record the counter
672+
r.connectionClosed.Add(ctx, 1, metric.WithAttributes(attrs...))
673+
}
674+
675+
// RecordConnectionPendingRequests records db.client.connection.pending_requests metric
676+
func (r *metricsRecorder) RecordConnectionPendingRequests(
677+
ctx context.Context,
678+
delta int,
679+
cn redis.ConnInfo,
680+
) {
681+
if r.connectionPendingReqs == nil {
682+
return
683+
}
684+
685+
// Extract server address and peer address from connection
686+
serverAddr, serverPort := extractServerInfo(cn)
687+
peerAddr, peerPort := serverAddr, serverPort
688+
689+
// Build attributes
690+
attrs := []attribute.KeyValue{
691+
attribute.String("db.system.name", "redis"),
692+
attribute.String("server.address", serverAddr),
693+
getLibraryVersionAttr(),
694+
}
695+
696+
// Add server.port if not default
697+
attrs = addServerPortIfNonDefault(attrs, serverPort)
698+
699+
// Add peer info
700+
if peerAddr != "" {
701+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
702+
if peerPort != "" {
703+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
704+
}
705+
}
706+
707+
// Record the up/down counter
708+
r.connectionPendingReqs.Add(ctx, int64(delta), metric.WithAttributes(attrs...))
709+
}
710+
711+
// RecordPubSubMessage records redis.client.pubsub.messages metric
712+
func (r *metricsRecorder) RecordPubSubMessage(
713+
ctx context.Context,
714+
cn redis.ConnInfo,
715+
direction string,
716+
channel string,
717+
sharded bool,
718+
) {
719+
if r.pubsubMessages == nil {
720+
return
721+
}
722+
723+
// Extract server address and peer address from connection
724+
serverAddr, serverPort := extractServerInfo(cn)
725+
peerAddr, peerPort := serverAddr, serverPort
726+
727+
// Build attributes
728+
attrs := []attribute.KeyValue{
729+
attribute.String("db.system.name", "redis"),
730+
attribute.String("server.address", serverAddr),
731+
attribute.String("redis.client.pubsub.direction", direction), // "sent" or "received"
732+
attribute.Bool("redis.client.pubsub.sharded", sharded),
733+
getLibraryVersionAttr(),
734+
}
735+
736+
// Add channel name if not hidden for cardinality reduction
737+
if !r.cfg.hidePubSubChannelNames && channel != "" {
738+
attrs = append(attrs, attribute.String("redis.client.pubsub.channel", channel))
739+
}
740+
741+
// Add server.port if not default
742+
attrs = addServerPortIfNonDefault(attrs, serverPort)
743+
744+
// Add peer info
745+
if peerAddr != "" {
746+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
747+
if peerPort != "" {
748+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
749+
}
750+
}
751+
752+
// Record the counter
753+
r.pubsubMessages.Add(ctx, 1, metric.WithAttributes(attrs...))
754+
}
755+
756+
// RecordStreamLag records redis.client.stream.lag metric
757+
func (r *metricsRecorder) RecordStreamLag(
758+
ctx context.Context,
759+
lag time.Duration,
760+
cn redis.ConnInfo,
761+
streamName string,
762+
consumerGroup string,
763+
consumerName string,
764+
) {
765+
if r.streamLag == nil {
766+
return
767+
}
768+
769+
// Extract server address and peer address from connection
770+
serverAddr, serverPort := extractServerInfo(cn)
771+
peerAddr, peerPort := serverAddr, serverPort
772+
773+
// Build attributes
774+
attrs := []attribute.KeyValue{
775+
attribute.String("db.system.name", "redis"),
776+
attribute.String("server.address", serverAddr),
777+
attribute.String("redis.client.stream.consumer_group", consumerGroup),
778+
attribute.String("redis.client.stream.consumer_name", consumerName),
779+
getLibraryVersionAttr(),
780+
}
781+
782+
// Add stream name if not hidden for cardinality reduction
783+
if !r.cfg.hideStreamNames && streamName != "" {
784+
attrs = append(attrs, attribute.String("redis.client.stream.name", streamName))
785+
}
786+
787+
// Add server.port if not default
788+
attrs = addServerPortIfNonDefault(attrs, serverPort)
789+
790+
// Add peer info
791+
if peerAddr != "" {
792+
attrs = append(attrs, attribute.String("network.peer.address", peerAddr))
793+
if peerPort != "" {
794+
attrs = append(attrs, attribute.String("network.peer.port", peerPort))
795+
}
796+
}
797+
798+
// Record the histogram (lag in seconds)
799+
r.streamLag.Record(ctx, lag.Seconds(), metric.WithAttributes(attrs...))
800+
}

0 commit comments

Comments
 (0)