diff --git a/das/daser.go b/das/daser.go index c04a8dcdb8..69d677c970 100644 --- a/das/daser.go +++ b/das/daser.go @@ -132,6 +132,11 @@ func (d *DASer) Stop(ctx context.Context) error { } d.cancel() + + if err := d.sampler.metrics.close(); err != nil { + log.Warnw("closing metrics", "err", err) + } + if err = d.sampler.wait(ctx); err != nil { return fmt.Errorf("DASer force quit: %w", err) } diff --git a/das/metrics.go b/das/metrics.go index 6454e9d138..82cf0afec8 100644 --- a/das/metrics.go +++ b/das/metrics.go @@ -29,6 +29,8 @@ type metrics struct { newHead metric.Int64Counter lastSampledTS uint64 + + clientReg metric.Registration } func (d *DASer) InitMetrics() error { @@ -119,7 +121,7 @@ func (d *DASer) InitMetrics() error { return nil } - _, err = meter.RegisterCallback(callback, + d.sampler.metrics.clientReg, err = meter.RegisterCallback(callback, lastSampledTS, busyWorkers, networkHead, @@ -133,6 +135,13 @@ func (d *DASer) InitMetrics() error { return nil } +func (m *metrics) close() error { + if m == nil { + return nil + } + return m.clientReg.Unregister() +} + // observeSample records the time it took to sample a header + // the amount of sampled contiguous headers func (m *metrics) observeSample( diff --git a/nodebuilder/node/metrics.go b/nodebuilder/node/metrics.go index 560df808e6..5df847b916 100644 --- a/nodebuilder/node/metrics.go +++ b/nodebuilder/node/metrics.go @@ -4,11 +4,15 @@ import ( "context" "time" + logging "github.com/ipfs/go-log/v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" + "go.uber.org/fx" ) +var log = logging.Logger("module/node") + var meter = otel.Meter("node") var ( @@ -17,7 +21,7 @@ var ( ) // WithMetrics registers node metrics. -func WithMetrics() error { +func WithMetrics(lc fx.Lifecycle) error { nodeStartTS, err := meter.Int64ObservableGauge( "node_start_ts", metric.WithDescription("timestamp when the node was started"), @@ -66,7 +70,18 @@ func WithMetrics() error { return nil } - _, err = meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge) + clientReg, err := meter.RegisterCallback(callback, nodeStartTS, totalNodeRunTime, buildInfoGauge) + if err != nil { + return nil + } - return err + lc.Append( + fx.Hook{OnStop: func(context.Context) error { + if err := clientReg.Unregister(); err != nil { + log.Warn("failed to close metrics", "err", err) + } + return nil + }}, + ) + return nil } diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index 7830f0e8f6..28bb0ec09a 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -84,11 +84,11 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti baseComponents := fx.Options( fx.Supply(metricOpts), fx.Invoke(initializeMetrics), - fx.Invoke(func(ca *state.CoreAccessor) { + fx.Invoke(func(lc fx.Lifecycle, ca *state.CoreAccessor) { if ca == nil { return } - state.WithMetrics(ca) + state.WithMetrics(lc, ca) }), fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]), fx.Invoke(node.WithMetrics), diff --git a/share/eds/cache/accessor_cache.go b/share/eds/cache/accessor_cache.go index 6f937818f8..a45c2542f8 100644 --- a/share/eds/cache/accessor_cache.go +++ b/share/eds/cache/accessor_cache.go @@ -222,10 +222,13 @@ func (bc *AccessorCache) Remove(key shard.Key) error { } // EnableMetrics enables metrics for the cache. -func (bc *AccessorCache) EnableMetrics() error { +func (bc *AccessorCache) EnableMetrics() (CloseMetricsFn, error) { var err error bc.metrics, err = newMetrics(bc) - return err + if err != nil { + return nil, err + } + return bc.metrics.close, err } // refCloser manages references to accessor from provided reader and removes the ref, when the diff --git a/share/eds/cache/cache.go b/share/eds/cache/cache.go index 13e207d7c0..3ec3d2f279 100644 --- a/share/eds/cache/cache.go +++ b/share/eds/cache/cache.go @@ -20,6 +20,8 @@ var ( errCacheMiss = errors.New("accessor not found in blockstore cache") ) +type CloseMetricsFn func() error + // Cache is an interface that defines the basic Cache operations. type Cache interface { // Get retrieves an item from the Cache. @@ -37,7 +39,7 @@ type Cache interface { Remove(shard.Key) error // EnableMetrics enables metrics in Cache - EnableMetrics() error + EnableMetrics() (CloseMetricsFn, error) } // Accessor is a interface type returned by cache, that allows to read raw data by reader or create diff --git a/share/eds/cache/doublecache.go b/share/eds/cache/doublecache.go index a63eadee9e..a7f2a4871e 100644 --- a/share/eds/cache/doublecache.go +++ b/share/eds/cache/doublecache.go @@ -43,9 +43,20 @@ func (mc *DoubleCache) Second() Cache { return mc.second } -func (mc *DoubleCache) EnableMetrics() error { - if err := mc.first.EnableMetrics(); err != nil { - return err +func (mc *DoubleCache) EnableMetrics() (CloseMetricsFn, error) { + firstCloser, err := mc.first.EnableMetrics() + if err != nil { + return nil, err } - return mc.second.EnableMetrics() + secondCloser, err := mc.second.EnableMetrics() + if err != nil { + return nil, err + } + + return func() error { + if err := errors.Join(firstCloser(), secondCloser()); err != nil { + log.Warnw("failed to close metrics", "err", err) + } + return nil + }, nil } diff --git a/share/eds/cache/metrics.go b/share/eds/cache/metrics.go index 565a61a5e0..701b7e3a71 100644 --- a/share/eds/cache/metrics.go +++ b/share/eds/cache/metrics.go @@ -15,6 +15,8 @@ const ( type metrics struct { getCounter metric.Int64Counter evictedCounter metric.Int64Counter + + clientReg metric.Registration } func newMetrics(bc *AccessorCache) (*metrics, error) { @@ -43,12 +45,23 @@ func newMetrics(bc *AccessorCache) (*metrics, error) { observer.ObserveInt64(cacheSize, int64(bc.cache.Len())) return nil } - _, err = meter.RegisterCallback(callback, cacheSize) + clientReg, err := meter.RegisterCallback(callback, cacheSize) + if err != nil { + return nil, err + } return &metrics{ getCounter: getCounter, evictedCounter: evictedCounter, - }, err + clientReg: clientReg, + }, nil +} + +func (m *metrics) close() error { + if m == nil { + return nil + } + return m.clientReg.Unregister() } func (m *metrics) observeEvicted(failed bool) { diff --git a/share/eds/cache/noop.go b/share/eds/cache/noop.go index 0a1a39ec7e..8e1c17924a 100644 --- a/share/eds/cache/noop.go +++ b/share/eds/cache/noop.go @@ -28,8 +28,8 @@ func (n NoopCache) Remove(shard.Key) error { return nil } -func (n NoopCache) EnableMetrics() error { - return nil +func (n NoopCache) EnableMetrics() (CloseMetricsFn, error) { + return func() error { return nil }, nil } var _ Accessor = (*NoopAccessor)(nil) diff --git a/share/eds/metrics.go b/share/eds/metrics.go index 0fd6740154..1ce9fe459d 100644 --- a/share/eds/metrics.go +++ b/share/eds/metrics.go @@ -2,6 +2,7 @@ package eds import ( "context" + "errors" "time" "go.opentelemetry.io/otel" @@ -49,6 +50,9 @@ type metrics struct { longOpTime metric.Float64Histogram gcTime metric.Float64Histogram + + clientReg metric.Registration + closerFn func() error } func (s *Store) WithMetrics() error { @@ -124,7 +128,8 @@ func (s *Store) WithMetrics() error { return err } - if err = s.cache.Load().EnableMetrics(); err != nil { + closerFn, err := s.cache.Load().EnableMetrics() + if err != nil { return err } @@ -139,7 +144,8 @@ func (s *Store) WithMetrics() error { return nil } - if _, err := meter.RegisterCallback(callback, dagStoreShards); err != nil { + clientReg, err := meter.RegisterCallback(callback, dagStoreShards) + if err != nil { return err } @@ -155,10 +161,20 @@ func (s *Store) WithMetrics() error { shardFailureCount: shardFailureCount, longOpTime: longOpTime, gcTime: gcTime, + clientReg: clientReg, + closerFn: closerFn, } return nil } +func (m *metrics) close() error { + if m == nil { + return nil + } + + return errors.Join(m.closerFn(), m.clientReg.Unregister()) +} + func (m *metrics) observeGCtime(ctx context.Context, dur time.Duration, failed bool) { if m == nil { return diff --git a/share/eds/store.go b/share/eds/store.go index 816065909e..da26e16ef4 100644 --- a/share/eds/store.go +++ b/share/eds/store.go @@ -159,6 +159,11 @@ func (s *Store) Start(ctx context.Context) error { // Stop stops the underlying DAGStore. func (s *Store) Stop(context.Context) error { defer s.cancel() + + if err := s.metrics.close(); err != nil { + log.Warnw("failed to close metrics", "err", err) + } + if err := s.invertedIdx.close(); err != nil { return err } diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index 65925819d7..c5cbd88b68 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -114,7 +114,12 @@ func (d *Discovery) Start(context.Context) error { func (d *Discovery) Stop(context.Context) error { d.cancel() - return d.metrics.close() + + if err := d.metrics.close(); err != nil { + log.Warnw("failed to close metrics", "err", err) + } + + return nil } // Peers provides a list of discovered peers in the given topic. diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 0ae21ff015..ca85a85ea6 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -166,6 +166,10 @@ func (m *Manager) Start(startCtx context.Context) error { func (m *Manager) Stop(ctx context.Context) error { m.cancel() + if err := m.metrics.close(); err != nil { + log.Warnw("closing metrics", "err", err) + } + // we do not need to wait for headersub and disconnected peers to finish // here, since they were never started if m.headerSub == nil && m.shrexSub == nil { diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index 094d81a5e3..da52856425 100644 --- a/share/p2p/peers/metrics.go +++ b/share/p2p/peers/metrics.go @@ -68,6 +68,8 @@ type metrics struct { fullNodesPool metric.Int64ObservableGauge // attributes: pool_status blacklistedPeersByReason sync.Map blacklistedPeers metric.Int64ObservableGauge // attributes: blacklist_reason + + clientReg metric.Registration } func initMetrics(manager *Manager) (*metrics, error) { @@ -154,13 +156,20 @@ func initMetrics(manager *Manager) (*metrics, error) { }) return nil } - _, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted) + metrics.clientReg, err = meter.RegisterCallback(callback, shrexPools, fullNodesPool, blacklisted) if err != nil { return nil, fmt.Errorf("registering metrics callback: %w", err) } return metrics, nil } +func (m *metrics) close() error { + if m == nil { + return nil + } + return m.clientReg.Unregister() +} + func (m *metrics) observeGetPeer( ctx context.Context, source peerSource, poolSize int, waitTime time.Duration, diff --git a/state/metrics.go b/state/metrics.go index 3672ef9b36..a9f0074176 100644 --- a/state/metrics.go +++ b/state/metrics.go @@ -5,11 +5,12 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" + "go.uber.org/fx" ) var meter = otel.Meter("state") -func WithMetrics(ca *CoreAccessor) { +func WithMetrics(lc fx.Lifecycle, ca *CoreAccessor) { pfbCounter, _ := meter.Int64ObservableCounter( "pfb_count", metric.WithDescription("Total count of submitted PayForBlob transactions"), @@ -24,8 +25,18 @@ func WithMetrics(ca *CoreAccessor) { observer.ObserveInt64(lastPfbTimestamp, ca.LastPayForBlob()) return nil } - _, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp) + + clientReg, err := meter.RegisterCallback(callback, pfbCounter, lastPfbTimestamp) if err != nil { panic(err) } + + lc.Append(fx.Hook{ + OnStop: func(context.Context) error { + if err := clientReg.Unregister(); err != nil { + log.Warnw("failed to close metrics", "err", err) + } + return nil + }, + }) }