105 changes: 16 additions & 89 deletions sync/metrics.go
@@ -11,47 +11,28 @@
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

otelattr "github.com/celestiaorg/go-header/internal/otelattr"
"github.com/celestiaorg/go-header"
)

var meter = otel.Meter("header/sync")

type metrics struct {
syncerReg metric.Registration

subjectiveHeadInst metric.Int64ObservableGauge
syncLoopRunningInst metric.Int64ObservableGauge

syncLoopStarted metric.Int64Counter
trustedPeersOutOfSync metric.Int64Counter
outdatedHeader metric.Int64Counter
subjectiveInit metric.Int64Counter

bifurcations metric.Int64Counter
failedBifurcations metric.Int64Counter

subjectiveHead atomic.Uint64

syncLoopDurationHist metric.Float64Histogram
syncLoopActive atomic.Int64
syncStartedTS time.Time
subjectiveHeadInst metric.Int64ObservableGauge
subjectiveHead atomic.Uint64

requestRangeTimeHist metric.Float64Histogram
requestRangeStartTS time.Time

blockTime metric.Float64Histogram
prevHeader time.Time
}
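
The trimmed struct keeps the pattern of pairing each observable instrument with an atomic backing value: the hot path stores into the atomic, and the SDK reads it only at collection time via a registered callback. A minimal, self-contained sketch of that OpenTelemetry pattern (instrument and function names here are illustrative, not from this repo):

package main

import (
	"context"
	"sync/atomic"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

// registerHeadGauge mirrors the pattern above: writers update the atomic on
// the hot path; the callback exports the current value only when the SDK
// collects metrics.
func registerHeadGauge(head *atomic.Uint64) (metric.Registration, error) {
	meter := otel.Meter("example")
	gauge, err := meter.Int64ObservableGauge(
		"subjective_head",
		metric.WithDescription("latest subjective head height"),
	)
	if err != nil {
		return nil, err
	}
	return meter.RegisterCallback(
		func(_ context.Context, obs metric.Observer) error {
			obs.ObserveInt64(gauge, int64(head.Load()))
			return nil
		},
		gauge,
	)
}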

func newMetrics() (*metrics, error) {
syncLoopStarted, err := meter.Int64Counter(
"hdr_sync_loop_started_counter",
metric.WithDescription("sync loop started shows that syncing is in progress"),
)
if err != nil {
return nil, err
}

trustedPeersOutOfSync, err := meter.Int64Counter(
"hdr_sync_trust_peers_out_of_sync_counter",
metric.WithDescription("trusted peers out of sync and gave outdated header"),
@@ -106,52 +87,25 @@
return nil, err
}

syncLoopDurationHist, err := meter.Float64Histogram(
"hdr_sync_loop_time_hist",
metric.WithDescription("tracks the duration of syncing"))
if err != nil {
return nil, err
}

requestRangeTimeHist, err := meter.Float64Histogram("hdr_sync_range_request_time_hist",
metric.WithDescription("tracks the duration of GetRangeByHeight requests"))
if err != nil {
return nil, err
}

syncLoopRunningInst, err := meter.Int64ObservableGauge(
"hdr_sync_loop_status_gauge",
metric.WithDescription("reports whether syncing is active or not"))
if err != nil {
return nil, err
}

blockTime, err := meter.Float64Histogram(
"hdr_sync_actual_blockTime_ts_hist",
metric.WithDescription("duration between creation of 2 blocks"),
)
if err != nil {
return nil, err
}

m := &metrics{
syncLoopStarted: syncLoopStarted,
trustedPeersOutOfSync: trustedPeersOutOfSync,
outdatedHeader: outdatedHeader,
subjectiveInit: subjectiveInit,
bifurcations: bifurcations,
failedBifurcations: failedBifurcations,
syncLoopDurationHist: syncLoopDurationHist,
syncLoopRunningInst: syncLoopRunningInst,
requestRangeTimeHist: requestRangeTimeHist,
blockTime: blockTime,
subjectiveHeadInst: subjectiveHead,
}

m.syncerReg, err = meter.RegisterCallback(
m.observeMetrics,
m.subjectiveHeadInst,
m.syncLoopRunningInst,
)
if err != nil {
return nil, err
@@ -166,25 +120,9 @@
return fmt.Errorf("height overflows int64: %d", headHeight)
}
obs.ObserveInt64(m.subjectiveHeadInst, int64(headHeight))
obs.ObserveInt64(m.syncLoopRunningInst, m.syncLoopActive.Load())
return nil
}

func (m *metrics) syncStarted(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.syncStartedTS = time.Now()
m.syncLoopStarted.Add(ctx, 1)
m.syncLoopActive.Store(1)
})
}

func (m *metrics) syncFinished(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.syncLoopActive.Store(0)
m.syncLoopDurationHist.Record(ctx, time.Since(m.syncStartedTS).Seconds())
})
}

func (m *metrics) outdatedHead(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.outdatedHeader.Add(ctx, 1)
@@ -203,23 +141,9 @@
})
}

func (m *metrics) updateGetRangeRequestInfo(ctx context.Context, amount uint64, failed bool) {
m.observe(ctx, func(ctx context.Context) {
m.requestRangeTimeHist.Record(ctx, time.Since(m.requestRangeStartTS).Seconds(),
metric.WithAttributes(
otelattr.Uint64("headers amount", amount),
attribute.Bool("request failed", failed),
))
})
}

func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestamp time.Time) {
func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64) {
m.observe(ctx, func(ctx context.Context) {

[Check failure on line 145 in sync/metrics.go, GitHub Actions / build: unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)]
m.subjectiveHead.Store(height)

if !m.prevHeader.IsZero() {
m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds())
}
})
}
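
The revive failure flagged above exists because, with the blockTime recording removed, the observe callback body no longer touches its context. A hypothetical fix, following exactly what the linter suggests (rename the unused parameter to _):

// Hypothetical fix for the revive unused-parameter failure above: the
// callback no longer uses its context now that blockTime recording is gone,
// so the parameter is renamed to _.
func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64) {
	m.observe(ctx, func(_ context.Context) {
		m.subjectiveHead.Store(height)
	})
}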

@@ -245,18 +169,21 @@
})
}

func (m *metrics) rangeRequestStart() {
if m == nil {
return
}
m.requestRangeStartTS = time.Now()
}

func (m *metrics) rangeRequestStop() {
// recordRangeRequest records the duration of requesting a batch of headers.
// It also records whether the range request was for the full
// MaxRangeRequestSize (64), giving us a lower-cardinality way to derive
// sync speed.
func (m *metrics) recordRangeRequest(ctx context.Context, duration time.Duration, size uint64) {
if m == nil {
return
}
m.requestRangeStartTS = time.Time{}
m.observe(ctx, func(ctx context.Context) {
m.requestRangeTimeHist.Record(ctx, duration.Seconds(),
metric.WithAttributes(
attribute.Bool("is_max_range_req_size", size == header.MaxRangeRequestSize),
),
)
})
}
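
The boolean attribute trades precision for cardinality: rather than tagging each sample with its exact header count, it only marks whether the request was full-size, and full-size samples each stand for exactly MaxRangeRequestSize headers. A hypothetical back-of-the-envelope helper showing how sync speed falls out of that:

// headersPerSecond is a hypothetical illustration of the comment above:
// every full-size range request fetched exactly MaxRangeRequestSize headers,
// so their count and total duration yield an approximate sync speed.
func headersPerSecond(fullSizeRequests int, totalSeconds float64) float64 {
	const maxRangeRequestSize = 64 // header.MaxRangeRequestSize, per the doc comment
	if totalSeconds <= 0 {
		return 0
	}
	return float64(fullSizeRequests) * maxRangeRequestSize / totalSeconds
}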

func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) {
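
The diff cuts off before observe's body. Based on the explicit nil checks in the methods above, a plausible sketch of the wrapper (an assumption about its shape, not the PR's verbatim code):

// Assumed shape of the nil-safe wrapper: a nil *metrics (telemetry disabled)
// turns every metrics method into a no-op, and a canceled context is swapped
// for a fresh one so final measurements are still recorded.
func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) {
	if m == nil {
		return
	}
	if ctx.Err() != nil {
		ctx = context.Background()
	}
	observeFn(ctx)
}
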
7 changes: 2 additions & 5 deletions sync/syncer.go
@@ -223,9 +223,7 @@ func (s *Syncer[H]) syncLoop() {
for {
select {
case <-s.triggerSync:
s.metrics.syncStarted(s.ctx)
s.sync(s.ctx)
s.metrics.syncFinished(s.ctx)
case <-s.ctx.Done():
return
}
@@ -360,10 +358,9 @@ func (s *Syncer[H]) requestHeaders(
}

to := fromHead.Height() + size + 1
s.metrics.rangeRequestStart()
start := time.Now()
headers, err := s.getter.GetRangeByHeight(ctx, fromHead, to)
s.metrics.updateGetRangeRequestInfo(s.ctx, size/100, err != nil)
s.metrics.rangeRequestStop()
s.metrics.recordRangeRequest(ctx, time.Since(start), size)
if err != nil {
return err
}
2 changes: 1 addition & 1 deletion sync/syncer_head.go
@@ -201,7 +201,7 @@ func (s *Syncer[H]) setLocalHead(ctx context.Context, netHead H) {
"hash", netHead.Hash().String(),
"err", err)
}
s.metrics.newSubjectiveHead(s.ctx, netHead.Height(), netHead.Time())
s.metrics.newSubjectiveHead(s.ctx, netHead.Height())

storeHead, err := s.store.Head(ctx)
if err == nil && storeHead.Height() >= netHead.Height() {