diff --git a/sync/metrics.go b/sync/metrics.go index f588c089..10e67b27 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -11,7 +11,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - otelattr "github.com/celestiaorg/go-header/internal/otelattr" + "github.com/celestiaorg/go-header" ) var meter = otel.Meter("header/sync") @@ -19,10 +19,6 @@ var meter = otel.Meter("header/sync") type metrics struct { syncerReg metric.Registration - subjectiveHeadInst metric.Int64ObservableGauge - syncLoopRunningInst metric.Int64ObservableGauge - - syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter outdatedHeader metric.Int64Counter subjectiveInit metric.Int64Counter @@ -30,28 +26,13 @@ type metrics struct { bifurcations metric.Int64Counter failedBifurcations metric.Int64Counter - subjectiveHead atomic.Uint64 - - syncLoopDurationHist metric.Float64Histogram - syncLoopActive atomic.Int64 - syncStartedTS time.Time + subjectiveHeadInst metric.Int64ObservableGauge + subjectiveHead atomic.Uint64 requestRangeTimeHist metric.Float64Histogram - requestRangeStartTS time.Time - - blockTime metric.Float64Histogram - prevHeader time.Time } func newMetrics() (*metrics, error) { - syncLoopStarted, err := meter.Int64Counter( - "hdr_sync_loop_started_counter", - metric.WithDescription("sync loop started shows that syncing is in progress"), - ) - if err != nil { - return nil, err - } - trustedPeersOutOfSync, err := meter.Int64Counter( "hdr_sync_trust_peers_out_of_sync_counter", metric.WithDescription("trusted peers out of sync and gave outdated header"), @@ -106,52 +87,25 @@ func newMetrics() (*metrics, error) { return nil, err } - syncLoopDurationHist, err := meter.Float64Histogram( - "hdr_sync_loop_time_hist", - metric.WithDescription("tracks the duration of syncing")) - if err != nil { - return nil, err - } - requestRangeTimeHist, err := meter.Float64Histogram("hdr_sync_range_request_time_hist", metric.WithDescription("tracks the duration of GetRangeByHeight requests")) if err != nil { return nil, err } - syncLoopRunningInst, err := meter.Int64ObservableGauge( - "hdr_sync_loop_status_gauge", - metric.WithDescription("reports whether syncing is active or not")) - if err != nil { - return nil, err - } - - blockTime, err := meter.Float64Histogram( - "hdr_sync_actual_blockTime_ts_hist", - metric.WithDescription("duration between creation of 2 blocks"), - ) - if err != nil { - return nil, err - } - m := &metrics{ - syncLoopStarted: syncLoopStarted, trustedPeersOutOfSync: trustedPeersOutOfSync, outdatedHeader: outdatedHeader, subjectiveInit: subjectiveInit, bifurcations: bifurcations, failedBifurcations: failedBifurcations, - syncLoopDurationHist: syncLoopDurationHist, - syncLoopRunningInst: syncLoopRunningInst, requestRangeTimeHist: requestRangeTimeHist, - blockTime: blockTime, subjectiveHeadInst: subjectiveHead, } m.syncerReg, err = meter.RegisterCallback( m.observeMetrics, m.subjectiveHeadInst, - m.syncLoopRunningInst, ) if err != nil { return nil, err @@ -166,25 +120,9 @@ func (m *metrics) observeMetrics(_ context.Context, obs metric.Observer) error { return fmt.Errorf("height overflows int64: %d", headHeight) } obs.ObserveInt64(m.subjectiveHeadInst, int64(headHeight)) - obs.ObserveInt64(m.syncLoopRunningInst, m.syncLoopActive.Load()) return nil } -func (m *metrics) syncStarted(ctx context.Context) { - m.observe(ctx, func(ctx context.Context) { - m.syncStartedTS = time.Now() - m.syncLoopStarted.Add(ctx, 1) - m.syncLoopActive.Store(1) - }) -} - -func (m *metrics) syncFinished(ctx context.Context) { - m.observe(ctx, func(ctx context.Context) { - m.syncLoopActive.Store(0) - m.syncLoopDurationHist.Record(ctx, time.Since(m.syncStartedTS).Seconds()) - }) -} - func (m *metrics) outdatedHead(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { m.outdatedHeader.Add(ctx, 1) @@ -203,23 +141,9 @@ func (m *metrics) subjectiveInitialization(ctx context.Context) { }) } -func (m *metrics) updateGetRangeRequestInfo(ctx context.Context, amount uint64, failed bool) { - m.observe(ctx, func(ctx context.Context) { - m.requestRangeTimeHist.Record(ctx, time.Since(m.requestRangeStartTS).Seconds(), - metric.WithAttributes( - otelattr.Uint64("headers amount", amount), - attribute.Bool("request failed", failed), - )) - }) -} - -func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestamp time.Time) { +func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64) { m.observe(ctx, func(ctx context.Context) { m.subjectiveHead.Store(height) - - if !m.prevHeader.IsZero() { - m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds()) - } }) } @@ -245,18 +169,21 @@ func (m *metrics) failedBifurcation(ctx context.Context, height uint64, hash str }) } -func (m *metrics) rangeRequestStart() { - if m == nil { - return - } - m.requestRangeStartTS = time.Now() -} - -func (m *metrics) rangeRequestStop() { +// recordRangeRequest will record the duration it takes to request a batch of +// headers. It will also record whether the range request was the full +// MaxRangeRequestSize (64) or not, giving us a lower-cardinality way to derive +// sync speed. +func (m *metrics) recordRangeRequest(ctx context.Context, duration time.Duration, size uint64) { if m == nil { return } - m.requestRangeStartTS = time.Time{} + m.observe(ctx, func(ctx context.Context) { + m.requestRangeTimeHist.Record(ctx, duration.Seconds(), + metric.WithAttributes( + attribute.Bool("is_max_range_req_size", size == header.MaxRangeRequestSize), + ), + ) + }) } func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) { diff --git a/sync/syncer.go b/sync/syncer.go index af6df889..bc90b85c 100644 --- a/sync/syncer.go +++ b/sync/syncer.go @@ -223,9 +223,7 @@ func (s *Syncer[H]) syncLoop() { for { select { case <-s.triggerSync: - s.metrics.syncStarted(s.ctx) s.sync(s.ctx) - s.metrics.syncFinished(s.ctx) case <-s.ctx.Done(): return } @@ -360,10 +358,9 @@ func (s *Syncer[H]) requestHeaders( } to := fromHead.Height() + size + 1 - s.metrics.rangeRequestStart() + start := time.Now() headers, err := s.getter.GetRangeByHeight(ctx, fromHead, to) - s.metrics.updateGetRangeRequestInfo(s.ctx, size/100, err != nil) - s.metrics.rangeRequestStop() + s.metrics.recordRangeRequest(ctx, time.Since(start), size) if err != nil { return err } diff --git a/sync/syncer_head.go b/sync/syncer_head.go index aa817877..3fb24c63 100644 --- a/sync/syncer_head.go +++ b/sync/syncer_head.go @@ -201,7 +201,7 @@ func (s *Syncer[H]) setLocalHead(ctx context.Context, netHead H) { "hash", netHead.Hash().String(), "err", err) } - s.metrics.newSubjectiveHead(s.ctx, netHead.Height(), netHead.Time()) + s.metrics.newSubjectiveHead(s.ctx, netHead.Height()) storeHead, err := s.store.Head(ctx) if err == nil && storeHead.Height() >= netHead.Height() {