diff --git a/cmd/status-cli/util.go b/cmd/status-cli/util.go index e9bfe75a2a3..0e7fb234aaf 100644 --- a/cmd/status-cli/util.go +++ b/cmd/status-cli/util.go @@ -1,7 +1,6 @@ package main import ( - "context" "errors" "fmt" "os" @@ -12,11 +11,11 @@ import ( "github.com/status-im/status-go/api" "github.com/status-im/status-go/logutils" + "github.com/status-im/status-go/metrics/wakumetrics" "github.com/status-im/status-go/multiaccounts" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/services/wakuv2ext" - "github.com/status-im/status-go/telemetry" "github.com/urfave/cli/v2" ) @@ -74,14 +73,12 @@ func start(p StartParams, logger *zap.SugaredLogger) (*StatusCLI, error) { } if p.TelemetryURL != "" { - telemetryLogger, err := getLogger(true) + waku := backend.StatusNode().WakuV2Service() + telemetryClient, err := wakumetrics.NewClient(wakumetrics.WithPeerID(waku.PeerID().String())) if err != nil { return nil, err } - waku := backend.StatusNode().WakuV2Service() - telemetryClient := telemetry.NewClient(telemetryLogger, p.TelemetryURL, backend.SelectedAccountKeyID(), p.Name, "cli", telemetry.WithPeerID(waku.PeerID().String())) - telemetryClient.Start(context.Background()) - backend.StatusNode().WakuV2Service().SetStatusTelemetryClient(telemetryClient) + backend.StatusNode().WakuV2Service().SetMetricsHandler(telemetryClient) } wakuAPI := wakuv2ext.NewPublicAPI(wakuService) @@ -152,7 +149,8 @@ func createAccountAndLogin(b *api.GethStatusBackend, rootDataDir, password strin HTTPHost: "127.0.0.1", HTTPPort: p.Port, }, - TelemetryServerURL: p.TelemetryURL, + TelemetryServerURL: p.TelemetryURL, + WakuV2EnableMissingMessageVerification: true, } return b.CreateAccountAndLogin(req, params.WithFleet(p.Fleet), diff --git a/metrics/metrics.go b/metrics/metrics.go index 7a4e4b9c194..052b0ac8e37 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -48,9 +48,13 @@ func healthHandler() http.Handler { func Handler(reg metrics.Registry) http.Handler { // we disable compression because geth doesn't support it opts := promhttp.HandlerOpts{DisableCompression: true} - // we are combining handlers to avoid having 2 endpoints - statusMetrics := promhttp.HandlerFor(prom.DefaultGatherer, opts) // our metrics - gethMetrics := gethprom.Handler(reg) // geth metrics + // we are using only our own metrics + statusMetrics := promhttp.HandlerFor(prom.DefaultGatherer, opts) + if reg == nil { + return statusMetrics + } + // if registry is provided, combine handlers + gethMetrics := gethprom.Handler(reg) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { statusMetrics.ServeHTTP(w, r) gethMetrics.ServeHTTP(w, r) @@ -62,3 +66,11 @@ func (p *Server) Listen() { defer common.LogOnPanic() logutils.ZapLogger().Info("metrics server stopped", zap.Error(p.server.ListenAndServe())) } + +// Stop gracefully shuts down the metrics server +func (p *Server) Stop() error { + if p.server != nil { + return p.server.Close() + } + return nil +} diff --git a/metrics/wakumetrics/client.go b/metrics/wakumetrics/client.go new file mode 100644 index 00000000000..09f1b4b4688 --- /dev/null +++ b/metrics/wakumetrics/client.go @@ -0,0 +1,182 @@ +package wakumetrics + +import ( + "fmt" + "strconv" + + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/transport" + "github.com/status-im/status-go/wakuv2" + + v1protocol "github.com/status-im/status-go/protocol/v1" + v2common "github.com/status-im/status-go/wakuv2/common" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" +) + +type ReceivedMessages struct { + Filter transport.Filter + SSHMessage *types.Message + Messages []*v1protocol.StatusMessage +} + +type Client struct { + peerId string + deviceType string + version string + lastPeerConnFailures map[string]int +} + +type TelemetryClientOption func(*Client) + +func WithPeerID(peerId string) TelemetryClientOption { + return func(c *Client) { + c.peerId = peerId + } +} + +func WithDeviceType(deviceType string) TelemetryClientOption { + return func(c *Client) { + c.deviceType = deviceType + } +} + +func WithVersion(version string) TelemetryClientOption { + return func(c *Client) { + c.version = version + } +} + +func NewClient(opts ...TelemetryClientOption) (*Client, error) { + client := &Client{ + lastPeerConnFailures: make(map[string]int), + } + + for _, opt := range opts { + opt(client) + } + + return client, nil +} + +// RegisterWithRegistry registers all metrics with the provided registry +func (c *Client) RegisterWithRegistry() error { + if err := RegisterMetrics(); err != nil { + return fmt.Errorf("failed to register metrics: %v", err) + } + return nil +} + +func (c *Client) SetDeviceType(deviceType string) { + c.deviceType = deviceType +} + +func (c *Client) PushReceivedMessages(receivedMessages ReceivedMessages) { + MessagesReceivedTotal.WithLabelValues( + receivedMessages.Filter.PubsubTopic, + receivedMessages.Filter.ContentTopic.String(), + receivedMessages.Filter.ChatID, + ).Observe(float64(len(receivedMessages.Messages))) +} + +func (c *Client) PushSentEnvelope(sentEnvelope wakuv2.SentEnvelope) { + EnvelopeSentTotal.WithLabelValues( + sentEnvelope.Envelope.PubsubTopic(), + sentEnvelope.Envelope.Message().ContentTopic, + sentEnvelope.PublishMethod.String(), + ).Observe(1) +} + +func (c *Client) PushErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { + EnvelopeSentErrors.WithLabelValues( + errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic(), + errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic, + ).Inc() +} + +func (c *Client) PushPeerCount(peerCount int) { + ConnectedPeers.Set(float64(peerCount)) +} + +func (c *Client) PushPeerConnFailures(peerConnFailures map[string]int) { + for peerID, failures := range peerConnFailures { + if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists { + if failures == lastFailures { + continue + } + } + c.lastPeerConnFailures[peerID] = failures + PeerConnectionFailures.Add(float64(failures)) + } +} + +func (c *Client) PushMessageCheckSuccess() { + StoreQuerySuccesses.Observe(1) +} + +func (c *Client) PushMessageCheckFailure() { + StoreQueryFailures.Observe(1) +} + +func (c *Client) PushPeerCountByShard(peerCountByShard map[uint16]uint) { + for shard, count := range peerCountByShard { + PeersByShard.WithLabelValues(strconv.FormatUint(uint64(shard), 10)).Set(float64(count)) + } +} + +func (c *Client) PushPeerCountByOrigin(peerCountByOrigin map[wps.Origin]uint) { + for origin, count := range peerCountByOrigin { + PeersByOrigin.WithLabelValues(getOriginString(origin)).Set(float64(count)) + } +} + +func (c *Client) PushDialFailure(dialFailure v2common.DialError) { + PeerDialFailures.WithLabelValues( + dialFailure.ErrType.String(), + dialFailure.Protocols, + ).Inc() +} + +func (c *Client) PushMissedMessage(envelope *v2protocol.Envelope) { + MissedMessages.WithLabelValues( + envelope.PubsubTopic(), + envelope.Message().ContentTopic, + ).Inc() +} + +func (c *Client) PushMissedRelevantMessage(receivedMessage *v2common.ReceivedMessage) { + MissedMessages.WithLabelValues( + receivedMessage.PubsubTopic, + receivedMessage.ContentTopic.String(), + ).Inc() +} + +func (c *Client) PushMessageDeliveryConfirmed() { + MessageDeliveryConfirmations.Inc() +} + +func (c *Client) PushSentMessageTotal(messageSize uint32, publishMethod string) { + WakuMessagesSizeBytes.WithLabelValues(publishMethod).Observe(float64(messageSize)) + MessagesSentTotal.WithLabelValues(publishMethod).Observe(1) +} + +func getOriginString(origin wps.Origin) string { + switch origin { + case wps.Unknown: + return "unknown" + case wps.Discv5: + return "discv5" + case wps.Static: + return "static" + case wps.PeerExchange: + return "peer_exchange" + case wps.DNSDiscovery: + return "dns_discovery" + case wps.Rendezvous: + return "rendezvous" + case wps.PeerManager: + return "peer_manager" + default: + return "unknown" + } +} diff --git a/metrics/wakumetrics/client_test.go b/metrics/wakumetrics/client_test.go new file mode 100644 index 00000000000..735637ba43a --- /dev/null +++ b/metrics/wakumetrics/client_test.go @@ -0,0 +1,181 @@ +package wakumetrics + +import ( + "errors" + "strconv" + "testing" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" + + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/transport" + "github.com/status-im/status-go/wakuv2" + "github.com/waku-org/go-waku/waku/v2/api/publish" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" + v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" +) + +var ( + errTest = errors.New("test error") +) + +func init() { + // Register metrics with test registry + err := RegisterMetrics() + if err != nil { + panic("Failed to register metrics for testing: " + err.Error()) + } +} + +func createTestClient(t *testing.T) *Client { + client, err := NewClient(WithPeerID("test-key"), WithDeviceType("test-node"), WithVersion("test-version")) + require.NoError(t, err) + return client +} + +func createTestMessage(pubsubTopic string, contentTopic types.TopicType, payload []byte) *types.Message { + return &types.Message{ + PubsubTopic: pubsubTopic, + Topic: contentTopic, + Payload: payload, + } +} + +func getCounterValue(metric *prometheus.CounterVec, labels ...string) float64 { + m := metric.WithLabelValues(labels...) + pb := &dto.Metric{} + err := m.(prometheus.Metric).Write(pb) + if err != nil { + return 0 + } + return pb.Counter.GetValue() +} + +func getGaugeValue(metric prometheus.Gauge) float64 { + pb := &dto.Metric{} + err := metric.(prometheus.Metric).Write(pb) + if err != nil { + return 0 + } + return pb.Gauge.GetValue() +} + +func getGaugeVecValue(metric *prometheus.GaugeVec, labels ...string) float64 { + m := metric.WithLabelValues(labels...) + pb := &dto.Metric{} + err := m.(prometheus.Metric).Write(pb) + if err != nil { + return 0 + } + return pb.Gauge.GetValue() +} + +func getHistogramValue(metric *prometheus.HistogramVec, labels ...string) float64 { + m := metric.WithLabelValues(labels...) + pb := &dto.Metric{} + err := m.(prometheus.Metric).Write(pb) + if err != nil { + return 0 + } + return pb.Histogram.GetSampleSum() +} + +func TestClient_PushReceivedMessages(t *testing.T) { + client := createTestClient(t) + + filter := transport.Filter{ + PubsubTopic: "test-pubsub", + ContentTopic: types.StringToTopic("test-content"), + ChatID: "test-chat", + } + + sshMessage := createTestMessage("test-pubsub", types.StringToTopic("test-content"), []byte("test-payload")) + + receivedMessages := ReceivedMessages{ + Filter: filter, + SSHMessage: sshMessage, + } + + client.PushReceivedMessages(receivedMessages) + + // Verify MessagesReceivedTotal metric + value := getHistogramValue(MessagesReceivedTotal, + filter.PubsubTopic, + filter.ContentTopic.String(), + filter.ChatID, + ) + require.Equal(t, float64(1), value) +} + +func TestClient_PushPeerCount(t *testing.T) { + client := createTestClient(t) + + client.PushPeerCount(5) + value := getGaugeValue(ConnectedPeers) + require.Equal(t, float64(5), value) +} + +func TestClient_PushPeerCountByOrigin(t *testing.T) { + client := createTestClient(t) + + peerCountByOrigin := map[wps.Origin]uint{ + wps.Discv5: 5, + wps.Static: 3, + wps.PeerExchange: 2, + } + + client.PushPeerCountByOrigin(peerCountByOrigin) + + // Verify metrics for each origin + for origin, expectedCount := range peerCountByOrigin { + value := getGaugeVecValue(PeersByOrigin, getOriginString(origin)) + require.Equal(t, float64(expectedCount), value) + } +} + +func TestClient_PushPeerCountByShard(t *testing.T) { + client := createTestClient(t) + + peerCountByShard := map[uint16]uint{ + 1: 5, + 2: 3, + 3: 2, + } + + client.PushPeerCountByShard(peerCountByShard) + + // Verify metrics for each shard + for shard, expectedCount := range peerCountByShard { + value := getGaugeVecValue(PeersByShard, strconv.FormatUint(uint64(shard), 10)) + require.Equal(t, float64(expectedCount), value) + } +} + +func TestClient_PushErrorSendingEnvelope(t *testing.T) { + client := createTestClient(t) + + msg := &pb.WakuMessage{ + Payload: []byte("test-payload"), + ContentTopic: "test-content", + } + envelope := v2protocol.NewEnvelope(msg, 0, "") + + errorSendingEnvelope := wakuv2.ErrorSendingEnvelope{ + SentEnvelope: wakuv2.SentEnvelope{ + Envelope: envelope, + PublishMethod: publish.LightPush, + }, + Error: errTest, + } + + client.PushErrorSendingEnvelope(errorSendingEnvelope) + + value := getCounterValue(EnvelopeSentErrors, + envelope.PubsubTopic(), + envelope.Message().ContentTopic, + ) + require.Equal(t, float64(1), value) +} diff --git a/metrics/wakumetrics/metrics.go b/metrics/wakumetrics/metrics.go new file mode 100644 index 00000000000..e0aac4a2444 --- /dev/null +++ b/metrics/wakumetrics/metrics.go @@ -0,0 +1,141 @@ +package wakumetrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + MessagesSentTotal = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "waku_telem_messages_sent_total", + Help: "Frequency of Waku messages sent by this node", + }, + []string{"publish_method"}, + ) + + EnvelopeSentTotal = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "waku_telem_envelope_sent_total", + Help: "Total number of envelopes sent by this node", + }, + []string{"pubsub_topic", "content_topic", "publish_method"}, + ) + + MessagesReceivedTotal = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "waku_telem_messages_received_total", + Help: "Frequency of Status messages received", + }, + []string{"pubsub_topic", "content_topic", "chat_id"}, + ) + + WakuMessagesSizeBytes = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "waku_telem_message_size_bytes", + Help: "Size of each Waku message in bytes sent by this node", + }, + []string{"publish_method"}, + ) + + EnvelopeSentErrors = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_telem_envelope_sent_errors_total", + Help: "Frequency of errors occurred when sending an envelope", + }, + []string{"pubsub_topic", "content_topic"}, + ) + + MessageDeliveryConfirmations = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "waku_telem_message_delivery_confirmations_total", + Help: "Frequency of message delivery confirmations", + }, + ) + + ConnectedPeers = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "waku_telem_connected_peers", + Help: "Current number of peers connected", + }, + ) + + PeersByOrigin = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "waku_telem_peers_by_origin", + Help: "Number of peers by discovery origin", + }, + []string{"origin"}, + ) + + PeersByShard = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "waku_telem_peers_by_shard", + Help: "Number of peers by shard", + }, + []string{"shard"}, + ) + + PeerConnectionFailures = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "waku_telem_peer_connection_failures_total", + Help: "Total number of peer connection failures", + }, + ) + + PeerDialFailures = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_telem_peer_dial_failures_total", + Help: "Total number of peer dial failures by error type", + }, + []string{"error_type", "protocols"}, + ) + + StoreQuerySuccesses = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "waku_telem_store_query_successes_total", + Help: "Frequency of successful store confirmation queries", + }, + ) + + StoreQueryFailures = prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: "waku_telem_store_query_failures_total", + Help: "Frequency of failed store confirmation queries", + }, + ) + + MissedMessages = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "waku_telem_missed_messages_total", + Help: "Frequency of missed messages detected by store query", + }, + []string{"pubsub_topic", "content_topic"}, + ) +) + +// RegisterMetrics registers all metrics with the provided registry +func RegisterMetrics() error { + collectors := []prometheus.Collector{ + MessagesSentTotal, + MessagesReceivedTotal, + WakuMessagesSizeBytes, + EnvelopeSentErrors, + MessageDeliveryConfirmations, + ConnectedPeers, + PeersByOrigin, + PeersByShard, + PeerConnectionFailures, + PeerDialFailures, + StoreQuerySuccesses, + StoreQueryFailures, + MissedMessages, + } + + for _, collector := range collectors { + if err := prometheus.Register(collector); err != nil { + return err + } + } + + return nil +} diff --git a/mobile/status.go b/mobile/status.go index 1587f435339..e1fa68dbcaf 100644 --- a/mobile/status.go +++ b/mobile/status.go @@ -32,6 +32,7 @@ import ( "github.com/status-im/status-go/images" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/logutils/requestlog" + "github.com/status-im/status-go/metrics" "github.com/status-im/status-go/mobile/callog" m_requests "github.com/status-im/status-go/mobile/requests" "github.com/status-im/status-go/multiaccounts" @@ -55,6 +56,10 @@ import ( "github.com/status-im/status-go/signal" ) +var ( + metricsServer *metrics.Server +) + func call(fn any, params ...any) any { return callog.Call(logutils.ZapLogger(), requestlog.GetRequestLogger(), fn, params...) } @@ -180,6 +185,22 @@ func initializeLogging(request *requests.InitializeApplication) error { } } + if request.WakuMetricsEnabled { + // Start metrics server + if metricsServer != nil { + err := metricsServer.Stop() + if err != nil { + logutils.ZapLogger().Error("failed to stop running metrics server", zap.Error(err)) + } + } + if request.WakuMetricsPort == 0 { + request.WakuMetricsPort = 9305 + } + metricsServer = metrics.NewMetricsServer(request.WakuMetricsPort, nil) + go metricsServer.Listen() + logutils.ZapLogger().Info("waku metrics prometheus server started", zap.Int("port", request.WakuMetricsPort)) + } + return nil } diff --git a/protocol/messenger.go b/protocol/messenger.go index 77f5f2c025d..8fd0381362a 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -35,6 +35,7 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/images" + "github.com/status-im/status-go/metrics/wakumetrics" multiaccountscommon "github.com/status-im/status-go/multiaccounts/common" "github.com/status-im/status-go/multiaccounts" @@ -69,7 +70,6 @@ import ( "github.com/status-im/status-go/services/wallet/community" "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/signal" - "github.com/status-im/status-go/telemetry" ) const ( @@ -160,7 +160,7 @@ type Messenger struct { } connectionState connection.State - telemetryClient *telemetry.Client + wakuMetricsHandler *wakumetrics.Client contractMaker *contracts.ContractMaker verificationDatabase *verification.Persistence savedAddressesManager *wallet.SavedAddressesManager @@ -521,20 +521,25 @@ func NewMessenger( return nil, fmt.Errorf("failed to build contact of ourself: %w", err) } - ctx, cancel := context.WithCancel(context.Background()) - - var telemetryClient *telemetry.Client + var wakuMetricsHandler *wakumetrics.Client if c.telemetryServerURL != "" { - options := []telemetry.TelemetryClientOption{ - telemetry.WithPeerID(peerId.String()), + options := []wakumetrics.TelemetryClientOption{ + wakumetrics.WithPeerID(peerId.String()), + } + wakuMetricsHandler, err = wakumetrics.NewClient(options...) + if err != nil { + return nil, err } - telemetryClient = telemetry.NewClient(logger, c.telemetryServerURL, c.account.KeyUID, nodeName, version, options...) if c.wakuService != nil { - c.wakuService.SetStatusTelemetryClient(telemetryClient) + c.wakuService.SetMetricsHandler(wakuMetricsHandler) + } + err = wakuMetricsHandler.RegisterWithRegistry() + if err != nil { + return nil, err } - telemetryClient.Start(ctx) } + ctx, cancel := context.WithCancel(context.Background()) messenger = &Messenger{ config: &c, node: node, @@ -545,7 +550,7 @@ func NewMessenger( sender: sender, anonMetricsClient: anonMetricsClient, anonMetricsServer: anonMetricsServer, - telemetryClient: telemetryClient, + wakuMetricsHandler: wakuMetricsHandler, communityTokensService: c.communityTokensService, pushNotificationClient: pushNotificationClient, pushNotificationServer: pushNotificationServer, @@ -3358,9 +3363,6 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte handleMessagesResponse, err := m.sender.HandleMessages(shhMessage) if err != nil { - if m.telemetryClient != nil { - go m.telemetryClient.UpdateEnvelopeProcessingError(shhMessage, err) - } logger.Info("failed to decode messages", zap.Error(err)) continue } @@ -3371,8 +3373,8 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte statusMessages := handleMessagesResponse.StatusMessages - if m.telemetryClient != nil { - m.telemetryClient.PushReceivedMessages(m.ctx, telemetry.ReceivedMessages{ + if m.wakuMetricsHandler != nil { + m.wakuMetricsHandler.PushReceivedMessages(wakumetrics.ReceivedMessages{ Filter: filter, SSHMessage: shhMessage, Messages: statusMessages, diff --git a/protocol/messenger_pairing_and_syncing.go b/protocol/messenger_pairing_and_syncing.go index 3995168c67d..cf84650d73a 100644 --- a/protocol/messenger_pairing_and_syncing.go +++ b/protocol/messenger_pairing_and_syncing.go @@ -465,10 +465,10 @@ func (m *Messenger) InitInstallations() error { return err } - if m.telemetryClient != nil { + if m.wakuMetricsHandler != nil { installation, ok := m.allInstallations.Load(m.installationID) if ok { - m.telemetryClient.SetDeviceType(installation.InstallationMetadata.DeviceType) + m.wakuMetricsHandler.SetDeviceType(installation.InstallationMetadata.DeviceType) } } diff --git a/protocol/requests/initialize_application.go b/protocol/requests/initialize_application.go index e2165db28f7..232398b8f2b 100644 --- a/protocol/requests/initialize_application.go +++ b/protocol/requests/initialize_application.go @@ -23,6 +23,9 @@ type InitializeApplication struct { LogEnabled bool `json:"logEnabled"` LogLevel string `json:"logLevel"` APILoggingEnabled bool `json:"apiLoggingEnabled"` + + WakuMetricsEnabled bool `json:"wakuMetricsEnabled"` + WakuMetricsPort int `json:"wakuMetricsServerPort"` } func (i *InitializeApplication) Validate() error { diff --git a/telemetry/client.go b/telemetry/client.go deleted file mode 100644 index 5ded3e7b748..00000000000 --- a/telemetry/client.go +++ /dev/null @@ -1,627 +0,0 @@ -package telemetry - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "net/http" - "strings" - "sync" - "time" - - "go.uber.org/zap" - - "github.com/status-im/status-go/common" - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/protocol/transport" - "github.com/status-im/status-go/wakuv2" - - wps "github.com/waku-org/go-waku/waku/v2/peerstore" - - v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" - - v1protocol "github.com/status-im/status-go/protocol/v1" - v2common "github.com/status-im/status-go/wakuv2/common" -) - -type TelemetryType string - -const ( - // Bandwidth as reported by libp2p - ProtocolStatsMetric TelemetryType = "ProtocolStats" - // Envelopes sent by this node - SentEnvelopeMetric TelemetryType = "SentEnvelope" - // Change in status of a sent envelope (usually processing errors) - UpdateEnvelopeMetric TelemetryType = "UpdateEnvelope" - // Messages received by this node - ReceivedMessagesMetric TelemetryType = "ReceivedMessages" - // Errors encountered when sending envelopes - ErrorSendingEnvelopeMetric TelemetryType = "ErrorSendingEnvelope" - // Total connections for this node at a given time - PeerCountMetric TelemetryType = "PeerCount" - // Number of failed peer connections for this node at a given time - PeerConnFailuresMetric TelemetryType = "PeerConnFailure" - // Store confirmation for a sent message successful - MessageCheckSuccessMetric TelemetryType = "MessageCheckSuccess" - // Store confirmation for a sent message failed - MessageCheckFailureMetric TelemetryType = "MessageCheckFailure" - // Total connections for this node per shard at a given time - PeerCountByShardMetric TelemetryType = "PeerCountByShard" - // Total connections for this node per discovery origin at a given time - PeerCountByOriginMetric TelemetryType = "PeerCountByOrigin" - // Error encountered when attempting to dial a peer - DialFailureMetric TelemetryType = "DialFailure" - // Missed message as detected by periodic store query - MissedMessageMetric TelemetryType = "MissedMessages" - // Missed message with a relevant filter - MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessages" - // MVDS ack received for a sent message - MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed" - // Total number and size of Waku messages sent by this node - SentMessageTotalMetric TelemetryType = "SentMessageTotal" -) - -const MaxRetryCache = 5000 - -type TelemetryRequest struct { - Id int `json:"id"` - TelemetryType TelemetryType `json:"telemetry_type"` - TelemetryData *json.RawMessage `json:"telemetry_data"` -} - -func (c *Client) PushReceivedMessages(ctx context.Context, receivedMessages ReceivedMessages) { - c.processAndPushTelemetry(ctx, receivedMessages) -} - -func (c *Client) PushSentEnvelope(ctx context.Context, sentEnvelope wakuv2.SentEnvelope) { - c.processAndPushTelemetry(ctx, sentEnvelope) -} - -func (c *Client) PushReceivedEnvelope(ctx context.Context, receivedEnvelope *v2protocol.Envelope) { - c.processAndPushTelemetry(ctx, receivedEnvelope) -} - -func (c *Client) PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope wakuv2.ErrorSendingEnvelope) { - c.processAndPushTelemetry(ctx, errorSendingEnvelope) -} - -func (c *Client) PushPeerCount(ctx context.Context, peerCount int) { - now := time.Now() - if peerCount != c.lastPeerCount && now.Sub(c.lastPeerCountTime) > 1*time.Second { - c.lastPeerCount = peerCount - c.lastPeerCountTime = now - c.processAndPushTelemetry(ctx, PeerCount{PeerCount: peerCount}) - } -} - -func (c *Client) PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) { - for peerID, failures := range peerConnFailures { - if lastFailures, exists := c.lastPeerConnFailures[peerID]; exists { - if failures == lastFailures { - continue - } - } - c.lastPeerConnFailures[peerID] = failures - c.processAndPushTelemetry(ctx, PeerConnFailure{FailedPeerId: peerID, FailureCount: failures}) - } -} - -func (c *Client) PushMessageCheckSuccess(ctx context.Context, messageHash string) { - c.processAndPushTelemetry(ctx, MessageCheckSuccess{MessageHash: messageHash}) -} - -func (c *Client) PushMessageCheckFailure(ctx context.Context, messageHash string) { - c.processAndPushTelemetry(ctx, MessageCheckFailure{MessageHash: messageHash}) -} - -func (c *Client) PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) { - for shard, count := range peerCountByShard { - c.processAndPushTelemetry(ctx, PeerCountByShard{Shard: shard, Count: count}) - } -} - -func (c *Client) PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) { - for origin, count := range peerCountByOrigin { - c.processAndPushTelemetry(ctx, PeerCountByOrigin{Origin: origin, Count: count}) - } -} - -func (c *Client) PushDialFailure(ctx context.Context, dialFailure v2common.DialError) { - var errorMessage string = "" - if dialFailure.ErrType == v2common.ErrorUnknown { - errorMessage = dialFailure.ErrMsg - } - c.processAndPushTelemetry(ctx, DialFailure{ErrorType: dialFailure.ErrType, ErrorMsg: errorMessage, Protocols: dialFailure.Protocols}) -} - -func (c *Client) PushMissedMessage(ctx context.Context, envelope *v2protocol.Envelope) { - c.processAndPushTelemetry(ctx, MissedMessage{Envelope: envelope}) -} - -func (c *Client) PushMissedRelevantMessage(ctx context.Context, receivedMessage *v2common.ReceivedMessage) { - c.processAndPushTelemetry(ctx, MissedRelevantMessage{ReceivedMessage: receivedMessage}) -} - -func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash string) { - c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{MessageHash: messageHash}) -} - -func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) { - c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize}) -} - -type ReceivedMessages struct { - Filter transport.Filter - SSHMessage *types.Message - Messages []*v1protocol.StatusMessage -} - -type PeerCount struct { - PeerCount int -} - -type PeerConnFailure struct { - FailedPeerId string - FailureCount int -} - -type MessageCheckSuccess struct { - MessageHash string -} - -type MessageCheckFailure struct { - MessageHash string -} - -type PeerCountByShard struct { - Shard uint16 - Count uint -} - -type PeerCountByOrigin struct { - Origin wps.Origin - Count uint -} - -type DialFailure struct { - ErrorType v2common.DialErrorType - ErrorMsg string - Protocols string -} - -type MissedMessage struct { - Envelope *v2protocol.Envelope -} - -type MissedRelevantMessage struct { - ReceivedMessage *v2common.ReceivedMessage -} - -type MessageDeliveryConfirmed struct { - MessageHash string -} - -type SentMessageTotal struct { - Size uint32 -} - -type Client struct { - serverURL string - httpClient *http.Client - logger *zap.Logger - keyUID string - nodeName string - peerId string - version string - telemetryCh chan TelemetryRequest - telemetryCacheLock sync.Mutex - telemetryCache []TelemetryRequest - telemetryRetryCache []TelemetryRequest - nextIdLock sync.Mutex - nextId int - sendPeriod time.Duration - lastPeerCount int - lastPeerCountTime time.Time - lastPeerConnFailures map[string]int - deviceType string -} - -type TelemetryClientOption func(*Client) - -func WithSendPeriod(sendPeriod time.Duration) TelemetryClientOption { - return func(c *Client) { - c.sendPeriod = sendPeriod - } -} - -func WithPeerID(peerId string) TelemetryClientOption { - return func(c *Client) { - c.peerId = peerId - } -} - -func NewClient(logger *zap.Logger, serverURL string, keyUID string, nodeName string, version string, opts ...TelemetryClientOption) *Client { - serverURL = strings.TrimRight(serverURL, "/") - client := &Client{ - serverURL: serverURL, - httpClient: &http.Client{Timeout: time.Minute}, - logger: logger, - keyUID: keyUID, - nodeName: nodeName, - version: version, - telemetryCh: make(chan TelemetryRequest), - telemetryCacheLock: sync.Mutex{}, - telemetryCache: make([]TelemetryRequest, 0), - telemetryRetryCache: make([]TelemetryRequest, 0), - nextId: 0, - nextIdLock: sync.Mutex{}, - sendPeriod: 10 * time.Second, // default value - lastPeerCount: 0, - lastPeerCountTime: time.Time{}, - lastPeerConnFailures: make(map[string]int), - } - - for _, opt := range opts { - opt(client) - } - - return client -} - -func (c *Client) SetDeviceType(deviceType string) { - c.deviceType = deviceType -} - -func (c *Client) Start(ctx context.Context) { - go func() { - defer common.LogOnPanic() - for { - select { - case telemetryRequest := <-c.telemetryCh: - c.telemetryCacheLock.Lock() - c.telemetryCache = append(c.telemetryCache, telemetryRequest) - c.telemetryCacheLock.Unlock() - case <-ctx.Done(): - return - } - } - }() - go func() { - defer common.LogOnPanic() - sendPeriod := c.sendPeriod - timer := time.NewTimer(sendPeriod) - defer timer.Stop() - - for { - select { - case <-timer.C: - c.telemetryCacheLock.Lock() - telemetryRequests := make([]TelemetryRequest, len(c.telemetryCache)) - copy(telemetryRequests, c.telemetryCache) - c.telemetryCache = nil - c.telemetryCacheLock.Unlock() - - if len(telemetryRequests) > 0 { - err := c.pushTelemetryRequest(telemetryRequests) - if err != nil { - if sendPeriod < 60*time.Second { //Stop the growing if the timer is > 60s to at least retry every minute - sendPeriod = sendPeriod * 2 - } - } else { - sendPeriod = c.sendPeriod - } - } - timer.Reset(sendPeriod) - case <-ctx.Done(): - return - } - } - - }() -} - -func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) { - var telemetryRequest TelemetryRequest - switch v := data.(type) { - case ReceivedMessages: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: ReceivedMessagesMetric, - TelemetryData: c.ProcessReceivedMessages(v), - } - case wakuv2.SentEnvelope: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: SentEnvelopeMetric, - TelemetryData: c.ProcessSentEnvelope(v), - } - case wakuv2.ErrorSendingEnvelope: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: ErrorSendingEnvelopeMetric, - TelemetryData: c.ProcessErrorSendingEnvelope(v), - } - case PeerCount: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: PeerCountMetric, - TelemetryData: c.ProcessPeerCount(v), - } - case PeerConnFailure: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: PeerConnFailuresMetric, - TelemetryData: c.ProcessPeerConnFailure(v), - } - case MessageCheckSuccess: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MessageCheckSuccessMetric, - TelemetryData: c.ProcessMessageCheckSuccess(v), - } - case MessageCheckFailure: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MessageCheckFailureMetric, - TelemetryData: c.ProcessMessageCheckFailure(v), - } - case PeerCountByShard: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: PeerCountByShardMetric, - TelemetryData: c.ProcessPeerCountByShard(v), - } - case PeerCountByOrigin: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: PeerCountByOriginMetric, - TelemetryData: c.ProcessPeerCountByOrigin(v), - } - case DialFailure: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: DialFailureMetric, - TelemetryData: c.ProcessDialFailure(v), - } - case MissedMessage: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MissedMessageMetric, - TelemetryData: c.ProcessMissedMessage(v), - } - case MissedRelevantMessage: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MissedRelevantMessageMetric, - TelemetryData: c.ProcessMissedRelevantMessage(v), - } - case MessageDeliveryConfirmed: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: MessageDeliveryConfirmedMetric, - TelemetryData: c.ProcessMessageDeliveryConfirmed(v), - } - case SentMessageTotal: - telemetryRequest = TelemetryRequest{ - Id: c.nextId, - TelemetryType: SentMessageTotalMetric, - TelemetryData: c.ProcessSentMessageTotal(v), - } - default: - c.logger.Error("Unknown telemetry data type") - return - } - - select { - case <-ctx.Done(): - return - case c.telemetryCh <- telemetryRequest: - } - - c.nextIdLock.Lock() - c.nextId++ - c.nextIdLock.Unlock() -} - -// This is assuming to not run concurrently as we are not locking the `telemetryRetryCache` -func (c *Client) pushTelemetryRequest(request []TelemetryRequest) error { - if len(c.telemetryRetryCache) > MaxRetryCache { //Limit the size of the cache to not grow the slice indefinitely in case the Telemetry server is gone for longer time - removeNum := len(c.telemetryRetryCache) - MaxRetryCache - c.telemetryRetryCache = c.telemetryRetryCache[removeNum:] - } - c.telemetryRetryCache = append(c.telemetryRetryCache, request...) - - url := fmt.Sprintf("%s/record-metrics", c.serverURL) - body, err := json.Marshal(c.telemetryRetryCache) - if err != nil { - c.logger.Error("Error marshaling telemetry data", zap.Error(err)) - return err - } - res, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending telemetry data", zap.Error(err)) - return err - } - defer res.Body.Close() - var responseBody []map[string]interface{} - if err := json.NewDecoder(res.Body).Decode(&responseBody); err != nil { - c.logger.Error("Error decoding response body", zap.Error(err)) - return err - } - if res.StatusCode != http.StatusCreated { - c.logger.Error("Error sending telemetry data", zap.Int("statusCode", res.StatusCode), zap.Any("responseBody", responseBody)) - return fmt.Errorf("status code %d, response body: %v", res.StatusCode, responseBody) - } - - c.telemetryRetryCache = nil - return nil -} - -func (c *Client) commonPostBody() map[string]interface{} { - return map[string]interface{}{ - "nodeName": c.nodeName, - "peerId": c.peerId, - "statusVersion": c.version, - "deviceType": c.deviceType, - "timestamp": time.Now().Unix(), - } -} - -func (c *Client) ProcessReceivedMessages(receivedMessages ReceivedMessages) *json.RawMessage { - var postBody []map[string]interface{} - for _, message := range receivedMessages.Messages { - messageBody := c.commonPostBody() - messageBody["chatId"] = receivedMessages.Filter.ChatID - messageBody["messageHash"] = types.EncodeHex(receivedMessages.SSHMessage.Hash) - messageBody["messageId"] = message.ApplicationLayer.ID - messageBody["sentAt"] = receivedMessages.SSHMessage.Timestamp - messageBody["pubsubTopic"] = receivedMessages.Filter.PubsubTopic - messageBody["topic"] = receivedMessages.Filter.ContentTopic.String() - messageBody["messageType"] = message.ApplicationLayer.Type.String() - messageBody["receiverKeyUID"] = c.keyUID - messageBody["messageSize"] = len(receivedMessages.SSHMessage.Payload) - postBody = append(postBody, messageBody) - } - body, _ := json.Marshal(postBody) - jsonRawMessage := json.RawMessage(body) - return &jsonRawMessage -} - -func (c *Client) ProcessSentEnvelope(sentEnvelope wakuv2.SentEnvelope) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = sentEnvelope.Envelope.Hash().String() - postBody["sentAt"] = uint32(sentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)) - postBody["pubsubTopic"] = sentEnvelope.Envelope.PubsubTopic() - postBody["topic"] = sentEnvelope.Envelope.Message().ContentTopic - postBody["senderKeyUID"] = c.keyUID - postBody["publishMethod"] = sentEnvelope.PublishMethod.String() - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessErrorSendingEnvelope(errorSendingEnvelope wakuv2.ErrorSendingEnvelope) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = errorSendingEnvelope.SentEnvelope.Envelope.Hash().String() - postBody["sentAt"] = uint32(errorSendingEnvelope.SentEnvelope.Envelope.Message().GetTimestamp() / int64(time.Second)) - postBody["pubsubTopic"] = errorSendingEnvelope.SentEnvelope.Envelope.PubsubTopic() - postBody["topic"] = errorSendingEnvelope.SentEnvelope.Envelope.Message().ContentTopic - postBody["senderKeyUID"] = c.keyUID - postBody["publishMethod"] = errorSendingEnvelope.SentEnvelope.PublishMethod.String() - postBody["error"] = errorSendingEnvelope.Error.Error() - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessPeerCount(peerCount PeerCount) *json.RawMessage { - postBody := c.commonPostBody() - postBody["peerCount"] = peerCount.PeerCount - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessPeerConnFailure(peerConnFailure PeerConnFailure) *json.RawMessage { - postBody := c.commonPostBody() - postBody["failedPeerId"] = peerConnFailure.FailedPeerId - postBody["failureCount"] = peerConnFailure.FailureCount - postBody["nodeKeyUID"] = c.keyUID - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessMessageCheckSuccess(messageCheckSuccess MessageCheckSuccess) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = messageCheckSuccess.MessageHash - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessPeerCountByShard(peerCountByShard PeerCountByShard) *json.RawMessage { - postBody := c.commonPostBody() - postBody["shard"] = peerCountByShard.Shard - postBody["count"] = peerCountByShard.Count - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessMessageCheckFailure(messageCheckFailure MessageCheckFailure) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = messageCheckFailure.MessageHash - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessPeerCountByOrigin(peerCountByOrigin PeerCountByOrigin) *json.RawMessage { - postBody := c.commonPostBody() - postBody["origin"] = peerCountByOrigin.Origin - postBody["count"] = peerCountByOrigin.Count - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessDialFailure(dialFailure DialFailure) *json.RawMessage { - postBody := c.commonPostBody() - postBody["errorType"] = dialFailure.ErrorType - postBody["errorMsg"] = dialFailure.ErrorMsg - postBody["protocols"] = dialFailure.Protocols - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessMissedMessage(missedMessage MissedMessage) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = missedMessage.Envelope.Hash().String() - postBody["sentAt"] = uint32(missedMessage.Envelope.Message().GetTimestamp() / int64(time.Second)) - postBody["pubsubTopic"] = missedMessage.Envelope.PubsubTopic() - postBody["contentTopic"] = missedMessage.Envelope.Message().ContentTopic - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessMissedRelevantMessage(missedMessage MissedRelevantMessage) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = missedMessage.ReceivedMessage.Envelope.Hash().String() - postBody["sentAt"] = missedMessage.ReceivedMessage.Sent - postBody["pubsubTopic"] = missedMessage.ReceivedMessage.PubsubTopic - postBody["contentTopic"] = missedMessage.ReceivedMessage.ContentTopic - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed MessageDeliveryConfirmed) *json.RawMessage { - postBody := c.commonPostBody() - postBody["messageHash"] = messageDeliveryConfirmed.MessageHash - return c.marshalPostBody(postBody) -} - -func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *json.RawMessage { - postBody := c.commonPostBody() - postBody["size"] = sentMessageTotal.Size - return c.marshalPostBody(postBody) -} - -// Helper function to marshal post body and handle errors -func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage { - body, err := json.Marshal(postBody) - if err != nil { - c.logger.Error("Error marshaling post body", zap.Error(err)) - return nil - } - jsonRawMessage := json.RawMessage(body) - return &jsonRawMessage -} - -func (c *Client) UpdateEnvelopeProcessingError(shhMessage *types.Message, processingError error) { - defer common.LogOnPanic() - c.logger.Debug("Pushing envelope update to telemetry server", zap.String("hash", types.EncodeHex(shhMessage.Hash))) - url := fmt.Sprintf("%s/update-envelope", c.serverURL) - var errorString = "" - if processingError != nil { - errorString = processingError.Error() - } - postBody := map[string]interface{}{ - "messageHash": types.EncodeHex(shhMessage.Hash), - "sentAt": shhMessage.Timestamp, - "pubsubTopic": shhMessage.PubsubTopic, - "topic": shhMessage.Topic, - "receiverKeyUID": c.keyUID, - "peerId": c.peerId, - "nodeName": c.nodeName, - "processingError": errorString, - "deviceType": c.deviceType, - } - body, _ := json.Marshal(postBody) - _, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(body)) - if err != nil { - c.logger.Error("Error sending envelope update to telemetry server", zap.Error(err)) - } -} diff --git a/telemetry/client_test.go b/telemetry/client_test.go deleted file mode 100644 index 7c1e6e7848d..00000000000 --- a/telemetry/client_test.go +++ /dev/null @@ -1,613 +0,0 @@ -package telemetry - -import ( - "context" - "encoding/json" - "errors" - "net/http" - "net/http/httptest" - "os" - "slices" - "sync" - "testing" - "time" - - "go.uber.org/zap" - "google.golang.org/protobuf/proto" - - "github.com/waku-org/go-waku/waku/v2/api/publish" - v2protocol "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - - "github.com/stretchr/testify/require" - - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/protocol/transport" - "github.com/status-im/status-go/protocol/tt" - v1protocol "github.com/status-im/status-go/protocol/v1" - "github.com/status-im/status-go/wakuv2" - "github.com/status-im/status-go/wakuv2/common" -) - -var ( - testContentTopic = "/waku/1/0x12345679/rfc26" -) - -func createMockServer(t *testing.T, wg *sync.WaitGroup, expectedType TelemetryType, expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool)) *httptest.Server { - return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("Expected 'POST' request, got '%s'", r.Method) - } - if r.URL.EscapedPath() != "/record-metrics" { - t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath()) - } - - // Check the request body is as expected - var received []TelemetryRequest - err := json.NewDecoder(r.Body).Decode(&received) - if err != nil { - t.Fatal(err) - } - - if expectedCondition != nil { - shouldSucceed, shouldFail := expectedCondition(received) - if shouldFail { - w.WriteHeader(http.StatusInternalServerError) - t.Fail() - return - } - if !shouldSucceed { - w.WriteHeader(http.StatusOK) - return - } - } else { - if len(received) != 1 { - t.Errorf("Unexpected data received: %+v", received) - } else { - if received[0].TelemetryType != expectedType { - t.Errorf("Unexpected telemetry type: got %v, want %v", received[0].TelemetryType, expectedType) - } - } - } - // If the data is as expected, respond with success - t.Log("Responding with success") - responseBody := []map[string]interface{}{ - {"status": "created"}, - } - body, err := json.Marshal(responseBody) - if err != nil { - t.Fatalf("Failed to marshal response body: %v", err) - } - w.WriteHeader(http.StatusCreated) - _, err = w.Write(body) - if err != nil { - t.Fatalf("Failed to write response body: %v", err) - } - wg.Done() - })) -} - -func createClient(t *testing.T, mockServerURL string) *Client { - config := zap.NewDevelopmentConfig() - config.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - logger, err := config.Build() - if err != nil { - t.Fatalf("Failed to create logger: %v", err) - } - return NewClient(logger, mockServerURL, "testUID", "testNode", "1.0", WithSendPeriod(100*time.Millisecond), WithPeerID("16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm")) -} - -type expectedCondition func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) - -func withMockServer(t *testing.T, expectedType TelemetryType, expectedCondition expectedCondition, testFunc func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup)) { - var wg sync.WaitGroup - wg.Add(1) // Expecting one request - - mockServer := createMockServer(t, &wg, expectedType, expectedCondition) - defer mockServer.Close() - - client := createClient(t, mockServer.URL) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - testFunc(ctx, t, client, &wg) - - // Wait for the request to be received - wg.Wait() -} - -func sendEnvelope(ctx context.Context, client *Client) { - client.PushSentEnvelope(ctx, wakuv2.SentEnvelope{ - Envelope: v2protocol.NewEnvelope(&pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: testContentTopic, - Version: proto.Uint32(0), - Timestamp: proto.Int64(time.Now().Unix()), - }, 0, ""), - PublishMethod: publish.LightPush, - }) -} - -func TestClient_ProcessReceivedMessages(t *testing.T) { - withMockServer(t, ReceivedMessagesMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - // Create a telemetry request to send - data := ReceivedMessages{ - Filter: transport.Filter{ - ChatID: "testChat", - PubsubTopic: "testTopic", - ContentTopic: types.StringToTopic(testContentTopic), - }, - SSHMessage: &types.Message{ - Hash: []byte("hash"), - Timestamp: uint32(time.Now().Unix()), - }, - Messages: []*v1protocol.StatusMessage{ - { - ApplicationLayer: v1protocol.ApplicationLayer{ - ID: types.HexBytes("123"), - Type: 1, - }, - }, - }, - } - - // Send the telemetry request - client.Start(ctx) - client.PushReceivedMessages(ctx, data) - }) -} - -func TestClient_ProcessSentEnvelope(t *testing.T) { - withMockServer(t, SentEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - // Send the telemetry request - client.Start(ctx) - sendEnvelope(ctx, client) - }) -} - -var ( - testENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.status.nodes.status.im" -) - -func TestTelemetryUponPublishError(t *testing.T) { - withMockServer(t, ErrorSendingEnvelopeMetric, nil, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - enrTreeAddress := testENRBootstrap - envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS") - if envEnrTreeAddress != "" { - enrTreeAddress = envEnrTreeAddress - } - - wakuConfig := &wakuv2.Config{} - wakuConfig.Port = 0 - wakuConfig.EnablePeerExchangeClient = true - wakuConfig.LightClient = true - wakuConfig.EnableDiscV5 = false - wakuConfig.DiscV5BootstrapNodes = []string{enrTreeAddress} - wakuConfig.DiscoveryLimit = 20 - wakuConfig.ClusterID = 16 - wakuConfig.WakuNodes = []string{enrTreeAddress} - wakuConfig.TelemetryServerURL = client.serverURL - wakuConfig.TelemetrySendPeriodMs = 500 - w, err := wakuv2.New(nil, "", wakuConfig, nil, nil, nil, nil, nil) - require.NoError(t, err) - - client.Start(ctx) - w.SetStatusTelemetryClient(client) - - // Setting this forces the publish function to fail when sending a message - w.SkipPublishToTopic(true) - - err = w.Start() - require.NoError(t, err) - - msg := &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: testContentTopic, - Version: proto.Uint32(0), - Timestamp: proto.Int64(time.Now().Unix()), - } - - // This should result in a single request sent by the telemetry client - _, err = w.Send(wakuConfig.DefaultShardPubsubTopic, msg, nil) - require.NoError(t, err) - }) -} - -func TestRetryCache(t *testing.T) { - counter := 0 - var wg sync.WaitGroup - wg.Add(2) - - mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("Expected 'POST' request, got '%s'", r.Method) - } - if r.URL.EscapedPath() != "/record-metrics" { - t.Errorf("Expected request to '/record-metrics', got '%s'", r.URL.EscapedPath()) - } - - // Check the request body is as expected - var received []TelemetryRequest - err := json.NewDecoder(r.Body).Decode(&received) - if err != nil { - t.Fatal(err) - } - - // Fail for the first request to make telemetry cache grow - if counter < 1 { - counter++ - w.WriteHeader(http.StatusInternalServerError) - wg.Done() - } else { - t.Log("Counter reached, responding with success") - if len(received) == 4 { - w.WriteHeader(http.StatusCreated) - responseBody := []map[string]interface{}{ - {"status": "created"}, - } - body, err := json.Marshal(responseBody) - if err != nil { - t.Fatalf("Failed to marshal response body: %v", err) - } - w.WriteHeader(http.StatusCreated) - _, err = w.Write(body) - if err != nil { - t.Fatalf("Failed to write response body: %v", err) - } - wg.Done() - } else { - t.Fatalf("Expected 4 metrics, got %d", len(received)-1) - } - } - })) - defer mockServer.Close() - - ctx := context.Background() - - client := createClient(t, mockServer.URL) - client.Start(ctx) - - for i := 0; i < 3; i++ { - sendEnvelope(ctx, client) - } - - time.Sleep(110 * time.Millisecond) - - require.Equal(t, 3, len(client.telemetryRetryCache)) - - sendEnvelope(ctx, client) - - wg.Wait() - - time.Sleep(100 * time.Millisecond) - - require.Equal(t, 0, len(client.telemetryRetryCache)) -} - -func TestRetryCacheCleanup(t *testing.T) { - ctx := context.Background() - - client := createClient(t, "") - - for i := 0; i < 6000; i++ { - go sendEnvelope(ctx, client) - telemetryRequest := <-client.telemetryCh - client.telemetryCache = append(client.telemetryCache, telemetryRequest) - } - - err := client.pushTelemetryRequest(client.telemetryCache) - // For this test case an error when pushing to the server is fine - require.Error(t, err) - - client.telemetryCache = nil - require.Equal(t, 6000, len(client.telemetryRetryCache)) - - go sendEnvelope(ctx, client) - telemetryRequest := <-client.telemetryCh - client.telemetryCache = append(client.telemetryCache, telemetryRequest) - - err = client.pushTelemetryRequest(client.telemetryCache) - require.Error(t, err) - - telemetryRequests := make([]TelemetryRequest, len(client.telemetryCache)) - copy(telemetryRequests, client.telemetryCache) - client.telemetryCache = nil - - err = client.pushTelemetryRequest(telemetryRequests) - require.Error(t, err) - - require.Equal(t, 5001, len(client.telemetryRetryCache)) -} - -func setDefaultConfig(config *wakuv2.Config, lightMode bool) { - config.ClusterID = 16 - - if lightMode { - config.EnablePeerExchangeClient = true - config.LightClient = true - config.EnableDiscV5 = false - } else { - config.EnableDiscV5 = true - config.EnablePeerExchangeServer = true - config.LightClient = false - config.EnablePeerExchangeClient = false - } -} - -var testStoreENRBootstrap = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@store.staging.shards.nodes.status.im" - -func TestPeerCount(t *testing.T) { - // t.Skip("flaky test") - - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - found := slices.ContainsFunc(received, func(req TelemetryRequest) bool { - t.Log(req) - return req.TelemetryType == PeerCountMetric - }) - return found, false - } - withMockServer(t, PeerCountMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - config := &wakuv2.Config{} - setDefaultConfig(config, false) - config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} - config.DiscoveryLimit = 20 - config.TelemetryServerURL = client.serverURL - config.TelemetrySendPeriodMs = 1500 - config.TelemetryPeerCountSendPeriod = 1500 - w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil) - require.NoError(t, err) - - w.SetStatusTelemetryClient(client) - client.Start(ctx) - - require.NoError(t, w.Start()) - - err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) == 0 { - return errors.New("no peers discovered") - } - return nil - }) - - require.NoError(t, err) - - require.NotEqual(t, 0, len(w.Peers())) - }) -} - -func TestPeerId(t *testing.T) { - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - var data map[string]interface{} - - err := json.Unmarshal(*received[0].TelemetryData, &data) - if err != nil { - return false, true - } - - _, ok := data["peerId"] - require.True(t, ok) - return ok, false - } - withMockServer(t, SentEnvelopeMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - // Send the telemetry request - client.Start(ctx) - sendEnvelope(ctx, client) - - }) - -} - -func TestPeerCountByShard(t *testing.T) { - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - found := slices.ContainsFunc(received, func(req TelemetryRequest) bool { - return req.TelemetryType == PeerCountByShardMetric - }) - return found, false - } - withMockServer(t, PeerCountByShardMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - config := &wakuv2.Config{} - setDefaultConfig(config, false) - config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} - config.DiscoveryLimit = 20 - config.TelemetryServerURL = client.serverURL - config.TelemetryPeerCountSendPeriod = 1500 - config.TelemetrySendPeriodMs = 1500 - w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil) - require.NoError(t, err) - - w.SetStatusTelemetryClient(client) - client.Start(ctx) - - require.NoError(t, w.Start()) - - err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) == 0 { - return errors.New("no peers discovered") - } - return nil - }) - - require.NoError(t, err) - - require.NotEqual(t, 0, len(w.Peers())) - }) -} - -func TestPeerCountByOrigin(t *testing.T) { - expectedCondition := func(received []TelemetryRequest) (shouldSucceed bool, shouldFail bool) { - found := slices.ContainsFunc(received, func(req TelemetryRequest) bool { - return req.TelemetryType == PeerCountByOriginMetric - }) - return found, false - } - withMockServer(t, PeerCountByOriginMetric, expectedCondition, func(ctx context.Context, t *testing.T, client *Client, wg *sync.WaitGroup) { - config := &wakuv2.Config{} - setDefaultConfig(config, false) - config.DiscV5BootstrapNodes = []string{testStoreENRBootstrap} - config.DiscoveryLimit = 20 - config.TelemetryServerURL = client.serverURL - config.TelemetryPeerCountSendPeriod = 1500 - config.TelemetrySendPeriodMs = 1500 - w, err := wakuv2.New(nil, "shards.staging", config, nil, nil, nil, nil, nil) - require.NoError(t, err) - - w.SetStatusTelemetryClient(client) - client.Start(ctx) - - require.NoError(t, w.Start()) - - err = tt.RetryWithBackOff(func() error { - if len(w.Peers()) == 0 { - return errors.New("no peers discovered") - } - return nil - }) - - require.NoError(t, err) - - require.NotEqual(t, 0, len(w.Peers())) - }) -} - -type testCase struct { - name string - input interface{} - expectedType TelemetryType - expectedFields map[string]interface{} -} - -func runTestCase(t *testing.T, tc testCase) { - ctx := context.Background() - client := createClient(t, "") - - go client.processAndPushTelemetry(ctx, tc.input) - - telemetryRequest := <-client.telemetryCh - - require.Equal(t, tc.expectedType, telemetryRequest.TelemetryType, "Unexpected telemetry type") - - var telemetryData map[string]interface{} - err := json.Unmarshal(*telemetryRequest.TelemetryData, &telemetryData) - require.NoError(t, err, "Failed to unmarshal telemetry data") - - for key, value := range tc.expectedFields { - require.Equal(t, value, telemetryData[key], "Unexpected value for %s", key) - } - - require.Contains(t, telemetryData, "nodeName", "Missing nodeName in telemetry data") - require.Contains(t, telemetryData, "peerId", "Missing peerId in telemetry data") - require.Contains(t, telemetryData, "statusVersion", "Missing statusVersion in telemetry data") - require.Contains(t, telemetryData, "deviceType", "Missing deviceType in telemetry data") - require.Contains(t, telemetryData, "timestamp", "Missing timestamp in telemetry data") - - // Simulate pushing the telemetry request - client.telemetryCache = append(client.telemetryCache, telemetryRequest) - - err = client.pushTelemetryRequest(client.telemetryCache) - // For this test case, we expect an error when pushing to the server - require.Error(t, err) - - // Verify that the request is now in the retry cache - require.Equal(t, 1, len(client.telemetryRetryCache), "Expected one item in telemetry retry cache") -} - -func TestProcessMessageDeliveryConfirmed(t *testing.T) { - tc := testCase{ - name: "MessageDeliveryConfirmed", - input: MessageDeliveryConfirmed{ - MessageHash: "0x1234567890abcdef", - }, - expectedType: MessageDeliveryConfirmedMetric, - expectedFields: map[string]interface{}{ - "messageHash": "0x1234567890abcdef", - }, - } - runTestCase(t, tc) -} - -func TestProcessMissedRelevantMessage(t *testing.T) { - now := time.Now() - message := common.NewReceivedMessage( - v2protocol.NewEnvelope( - &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: testContentTopic, - Version: proto.Uint32(0), - Timestamp: proto.Int64(now.Unix()), - }, 0, ""), - common.MissingMessageType, - ) - tc := testCase{ - name: "MissedRelevantMessage", - input: MissedRelevantMessage{ - ReceivedMessage: message, - }, - expectedType: MissedRelevantMessageMetric, - expectedFields: map[string]interface{}{ - "messageHash": message.Envelope.Hash().String(), - "pubsubTopic": "", - "contentTopic": "0x12345679", - }, - } - runTestCase(t, tc) -} - -func TestProcessMissedMessage(t *testing.T) { - now := time.Now() - message := common.NewReceivedMessage( - v2protocol.NewEnvelope( - &pb.WakuMessage{ - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: testContentTopic, - Version: proto.Uint32(0), - Timestamp: proto.Int64(now.Unix()), - }, 0, ""), - common.MissingMessageType, - ) - tc := testCase{ - name: "MissedMessage", - input: MissedMessage{ - Envelope: message.Envelope, - }, - expectedType: MissedMessageMetric, - expectedFields: map[string]interface{}{ - "messageHash": message.Envelope.Hash().String(), - "pubsubTopic": "", - "contentTopic": message.Envelope.Message().ContentTopic, - }, - } - runTestCase(t, tc) -} - -func TestProcessDialFailure(t *testing.T) { - tc := testCase{ - name: "DialFailure", - input: DialFailure{ - ErrorType: common.ErrorUnknown, - ErrorMsg: "test error message", - Protocols: "test-protocols", - }, - expectedType: DialFailureMetric, - expectedFields: map[string]interface{}{ - "errorType": float64(common.ErrorUnknown), - "errorMsg": "test error message", - "protocols": "test-protocols", - }, - } - runTestCase(t, tc) -} - -func TestProcessSentMessageTotal(t *testing.T) { - tc := testCase{ - name: "SentMessageTotal", - input: SentMessageTotal{ - Size: uint32(1234), - }, - expectedType: SentMessageTotalMetric, - expectedFields: map[string]interface{}{ - "size": float64(1234), - }, - } - runTestCase(t, tc) -} diff --git a/wakuv2/common/metrics.go b/wakuv2/common/metrics.go index 9434b7c52c1..a7a47d045c9 100644 --- a/wakuv2/common/metrics.go +++ b/wakuv2/common/metrics.go @@ -27,10 +27,10 @@ var ( Name: "waku2_envelopes_received_total", Help: "Number of envelopes received.", }) - EnvelopesValidatedCounter = prom.NewCounter(prom.CounterOpts{ + EnvelopesValidatedCounter = prom.NewCounterVec(prom.CounterOpts{ Name: "waku2_envelopes_validated_total", Help: "Number of envelopes processed successfully.", - }) + }, []string{"pubsubTopic", "type"}) EnvelopesRejectedCounter = prom.NewCounterVec(prom.CounterOpts{ Name: "waku2_envelopes_rejected_total", Help: "Number of envelopes rejected.", @@ -48,12 +48,17 @@ var ( Help: "Size of processed Waku envelopes in bytes.", Buckets: prom.ExponentialBuckets(256, 4, 10), }) + PeerCountByOrigin = prom.NewGaugeVec(prom.GaugeOpts{ + Name: "waku_peer_count_by_origin", + Help: "Number of peers by origin", + }, []string{"origin"}) ) func init() { prom.MustRegister(EnvelopesReceivedCounter) - prom.MustRegister(EnvelopesRejectedCounter) + prom.MustRegister(EnvelopesValidatedCounter) prom.MustRegister(EnvelopesCacheFailedCounter) prom.MustRegister(EnvelopesCachedCounter) prom.MustRegister(EnvelopesSizeMeter) + prom.MustRegister(PeerCountByOrigin) } diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 93543bc6e39..c242adb4dff 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -92,11 +92,11 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) { err = w.messageSender.Send(publish.NewRequest(w.ctx, envelope)) } - if w.statusTelemetryClient != nil { + if w.metricsHandler != nil { if err == nil { - w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}) + w.metricsHandler.PushSentEnvelope(SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}) } else { - w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}}) + w.metricsHandler.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}}) } } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index d381d679ce2..e8df5a7e906 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -37,6 +37,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/multiformats/go-multiaddr" + "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -104,21 +105,21 @@ type ErrorSendingEnvelope struct { SentEnvelope SentEnvelope } -type ITelemetryClient interface { +type IMetricsHandler interface { SetDeviceType(deviceType string) - PushSentEnvelope(ctx context.Context, sentEnvelope SentEnvelope) - PushErrorSendingEnvelope(ctx context.Context, errorSendingEnvelope ErrorSendingEnvelope) - PushPeerCount(ctx context.Context, peerCount int) - PushPeerConnFailures(ctx context.Context, peerConnFailures map[string]int) - PushMessageCheckSuccess(ctx context.Context, messageHash string) - PushMessageCheckFailure(ctx context.Context, messageHash string) - PushPeerCountByShard(ctx context.Context, peerCountByShard map[uint16]uint) - PushPeerCountByOrigin(ctx context.Context, peerCountByOrigin map[wps.Origin]uint) - PushDialFailure(ctx context.Context, dialFailure common.DialError) - PushMissedMessage(ctx context.Context, envelope *protocol.Envelope) - PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage) - PushMessageDeliveryConfirmed(ctx context.Context, messageHash string) - PushSentMessageTotal(ctx context.Context, messageSize uint32) + PushSentEnvelope(sentEnvelope SentEnvelope) + PushErrorSendingEnvelope(errorSendingEnvelope ErrorSendingEnvelope) + PushPeerCount(peerCount int) + PushPeerConnFailures(peerConnFailures map[string]int) + PushMessageCheckSuccess() + PushMessageCheckFailure() + PushPeerCountByShard(peerCountByShard map[uint16]uint) + PushPeerCountByOrigin(peerCountByOrigin map[wps.Origin]uint) + PushDialFailure(dialFailure common.DialError) + PushMissedMessage(envelope *protocol.Envelope) + PushMissedRelevantMessage(message *common.ReceivedMessage) + PushMessageDeliveryConfirmed() + PushSentMessageTotal(messageSize uint32, publishMethod string) } // Waku represents a dark communication interface through the Ethereum @@ -194,13 +195,13 @@ type Waku struct { onHistoricMessagesRequestFailed func([]byte, peer.ID, error) onPeerStats func(types.ConnStatus) - statusTelemetryClient ITelemetryClient + metricsHandler IMetricsHandler defaultShardInfo protocol.RelayShards } -func (w *Waku) SetStatusTelemetryClient(client ITelemetryClient) { - w.statusTelemetryClient = client +func (w *Waku) SetMetricsHandler(client IMetricsHandler) { + w.metricsHandler = client } func newTTLCache() *ttlcache.Cache[gethcommon.Hash, *common.ReceivedMessage] { @@ -293,6 +294,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge node.WithLogLevel(logger.Level()), node.WithClusterID(cfg.ClusterID), node.WithMaxMsgSize(1024 * 1024), + node.WithPrometheusRegisterer(prometheus.DefaultRegisterer), } if cfg.EnableDiscV5 { @@ -1036,10 +1038,8 @@ func (w *Waku) SkipPublishToTopic(value bool) { func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { w.messageSender.MessagesDelivered(hashes) - if w.statusTelemetryClient != nil { - for _, hash := range hashes { - w.statusTelemetryClient.PushMessageDeliveryConfirmed(w.ctx, hash.String()) - } + if w.metricsHandler != nil { + w.metricsHandler.PushMessageDeliveryConfirmed() } } @@ -1127,6 +1127,11 @@ func (w *Waku) Start() error { return } + publishMethod := "relay" + if w.cfg.LightClient { + publishMethod = "lightpush" + } + for { select { case <-w.ctx.Done(): @@ -1136,10 +1141,10 @@ func (w *Waku) Start() error { case dialErr := <-dialErrSub.Out(): errors := common.ParseDialErrors(dialErr.(utils.DialError).Err.Error()) for _, dialError := range errors { - w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols}) + w.metricsHandler.PushDialFailure(common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols}) } case messageSent := <-messageSentSub.Out(): - w.statusTelemetryClient.PushSentMessageTotal(w.ctx, messageSent.(publish.MessageSent).Size) + w.metricsHandler.PushSentMessageTotal(messageSent.(publish.MessageSent).Size, publishMethod) } } }() @@ -1161,6 +1166,7 @@ func (w *Waku) Start() error { w.logger) w.missingMsgVerifier.Start(w.ctx) + w.logger.Info("Started missing message verifier") w.wg.Add(1) go func() { @@ -1175,6 +1181,7 @@ func (w *Waku) Start() error { if err != nil { w.logger.Error("OnNewEnvelopes error", zap.Error(err)) } + w.logger.Info("Got a missing message!") } } }() @@ -1256,10 +1263,10 @@ func (w *Waku) checkForConnectionChanges() { } func (w *Waku) reportPeerMetrics() { - if w.statusTelemetryClient != nil { + if w.metricsHandler != nil { connFailures := FormatPeerConnFailures(w.node) - w.statusTelemetryClient.PushPeerCount(w.ctx, w.PeerCount()) - w.statusTelemetryClient.PushPeerConnFailures(w.ctx, connFailures) + w.metricsHandler.PushPeerCount(w.PeerCount()) + w.metricsHandler.PushPeerConnFailures(connFailures) peerCountByOrigin := make(map[wps.Origin]uint) peerCountByShard := make(map[uint16]uint) @@ -1292,8 +1299,8 @@ func (w *Waku) reportPeerMetrics() { } } } - w.statusTelemetryClient.PushPeerCountByShard(w.ctx, peerCountByShard) - w.statusTelemetryClient.PushPeerCountByOrigin(w.ctx, peerCountByOrigin) + w.metricsHandler.PushPeerCountByShard(peerCountByShard) + w.metricsHandler.PushPeerCountByOrigin(peerCountByOrigin) } } @@ -1332,16 +1339,16 @@ func (w *Waku) startMessageSender() error { Hash: hash, Event: common.EventEnvelopeSent, }) - if w.statusTelemetryClient != nil { - w.statusTelemetryClient.PushMessageCheckSuccess(w.ctx, hash.Hex()) + if w.metricsHandler != nil { + w.metricsHandler.PushMessageCheckSuccess() } case hash := <-msgExpiredChan: w.SendEnvelopeEvent(common.EnvelopeEvent{ Hash: hash, Event: common.EventEnvelopeExpired, }) - if w.statusTelemetryClient != nil { - w.statusTelemetryClient.PushMessageCheckFailure(w.ctx, hash.Hex()) + if w.metricsHandler != nil { + w.metricsHandler.PushMessageCheckFailure() } } } @@ -1437,9 +1444,9 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag return nil } - if w.statusTelemetryClient != nil { + if w.metricsHandler != nil { if msgType == common.MissingMessageType { - w.statusTelemetryClient.PushMissedMessage(w.ctx, envelope) + w.metricsHandler.PushMissedMessage(envelope) } } @@ -1460,7 +1467,7 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag trouble = true } - common.EnvelopesValidatedCounter.Inc() + common.EnvelopesValidatedCounter.With(prometheus.Labels{"pubsubTopic": envelope.PubsubTopic(), "type": msgType}).Inc() if trouble { return errors.New("received invalid envelope") @@ -1561,8 +1568,8 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { w.storeMsgIDsMu.Unlock() } else { logger.Debug("filters did match") - if w.statusTelemetryClient != nil && e.MsgType == common.MissingMessageType { - w.statusTelemetryClient.PushMissedRelevantMessage(w.ctx, e) + if w.metricsHandler != nil && e.MsgType == common.MissingMessageType { + w.metricsHandler.PushMissedRelevantMessage(e) } e.Processed.Store(true) }