observability - instrument 'ssv/validator' package with Traces
oleg-ssvlabs committed Jan 16, 2025
1 parent 2500759 commit f63cc5e
Showing 7 changed files with 406 additions and 94 deletions.
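The hunks below call tracer.Start and reference observabilityNamespace, yet neither identifier is defined in the two files shown. A minimal sketch of the package-level declarations they presumably rely on (the namespace value and file placement are assumptions, not shown in this commit):

package validator

import "go.opentelemetry.io/otel"

// Assumed declarations: this commit references `tracer` and
// `observabilityNamespace` but defines them elsewhere in the package.
const observabilityNamespace = "ssv.validator"

// otel.Tracer fetches a named Tracer from the globally registered
// TracerProvider; spans started from it are exported by whatever
// provider the node wires up at startup.
var tracer = otel.Tracer(observabilityNamespace)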
159 changes: 132 additions & 27 deletions protocol/v2/ssv/validator/committee.go
@@ -10,11 +10,14 @@ import (
 
 	"github.com/attestantio/go-eth2-client/spec/phase0"
 	"github.com/pkg/errors"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 
 	"github.com/ssvlabs/ssv-spec/qbft"
 	spectypes "github.com/ssvlabs/ssv-spec/types"
 	"github.com/ssvlabs/ssv/logging/fields"
+	"github.com/ssvlabs/ssv/observability"
 	"github.com/ssvlabs/ssv/protocol/v2/message"
 	"github.com/ssvlabs/ssv/protocol/v2/ssv/queue"
 	"github.com/ssvlabs/ssv/protocol/v2/ssv/runner"
@@ -89,43 +92,71 @@ func (c *Committee) RemoveShare(validatorIndex phase0.ValidatorIndex) {
 	}
 }
 
-func (c *Committee) StartConsumeQueue(logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
+func (c *Committee) StartConsumeQueue(ctx context.Context, logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
 
+	ctx, span := tracer.Start(ctx,
+		fmt.Sprintf("%s.start_consume_queue", observabilityNamespace),
+		trace.WithAttributes(
+			observability.RunnerRoleAttribute(duty.RunnerRole()),
+			observability.DutyCountAttribute(len(duty.ValidatorDuties)),
+			observability.BeaconSlotAttribute(duty.Slot)),
+	)
+	defer span.End()
+
 	// Setting the cancel function separately due the queue could be created in HandleMessage
 	q, found := c.Queues[duty.Slot]
 	if !found {
-		return errors.New(fmt.Sprintf("no queue found for slot %d", duty.Slot))
+		err := errors.New(fmt.Sprintf("no queue found for slot %d", duty.Slot))
+		span.SetStatus(codes.Error, err.Error())
+		return err
 	}
 
 	r := c.Runners[duty.Slot]
 	if r == nil {
-		return errors.New(fmt.Sprintf("no runner found for slot %d", duty.Slot))
+		err := errors.New(fmt.Sprintf("no runner found for slot %d", duty.Slot))
+		span.SetStatus(codes.Error, err.Error())
+		return err
 	}
 
 	// required to stop the queue consumer when timeout message is received by handler
-	queueCtx, cancelF := context.WithDeadline(c.ctx, time.Unix(c.BeaconNetwork.EstimatedTimeAtSlot(duty.Slot+runnerExpirySlots), 0))
+	queueCtx, cancelF := context.WithDeadline(ctx, time.Unix(c.BeaconNetwork.EstimatedTimeAtSlot(duty.Slot+runnerExpirySlots), 0))
 
 	go func() {
 		defer cancelF()
 		if err := c.ConsumeQueue(queueCtx, q, logger, duty.Slot, c.ProcessMessage, r); err != nil {
 			logger.Error("❗failed consuming committee queue", zap.Error(err))
+			span.RecordError(err)
 		}
 	}()
 
+	span.SetStatus(codes.Ok, "")
 	return nil
 }
 
 // StartDuty starts a new duty for the given slot
 func (c *Committee) StartDuty(ctx context.Context, logger *zap.Logger, duty *spectypes.CommitteeDuty) error {
 	c.mtx.Lock()
 	defer c.mtx.Unlock()
+	ctx, span := tracer.Start(ctx,
+		fmt.Sprintf("%s.start_committee_duty", observabilityNamespace),
+		trace.WithAttributes(
+			observability.RunnerRoleAttribute(duty.RunnerRole()),
+			observability.DutyCountAttribute(len(duty.ValidatorDuties)),
+			observability.BeaconSlotAttribute(duty.Slot)),
+	)
+	defer span.End()
 
 	if len(duty.ValidatorDuties) == 0 {
-		return errors.New("no beacon duties")
+		err := errors.New("no beacon duties")
+		span.SetStatus(codes.Error, err.Error())
+		return err
 	}
 	if _, exists := c.Runners[duty.Slot]; exists {
-		return errors.New(fmt.Sprintf("CommitteeRunner for slot %d already exists", duty.Slot))
+		err := errors.New(fmt.Sprintf("CommitteeRunner for slot %d already exists", duty.Slot))
+		span.SetStatus(codes.Error, err.Error())
+		return err
 	}
 
 	// Filter out Beacon duties for which we don't have a share.
@@ -138,9 +169,15 @@ func (c *Committee) StartDuty(ctx context.Context, logger *zap.Logger, duty *spe
 	for _, beaconDuty := range duty.ValidatorDuties {
 		share, exists := c.Shares[beaconDuty.ValidatorIndex]
 		if !exists {
-			logger.Debug("no share for validator duty",
+			const eventMsg = "no share for validator duty"
+			logger.Debug(eventMsg,
 				fields.BeaconRole(beaconDuty.Type),
 				zap.Uint64("validator_index", uint64(beaconDuty.ValidatorIndex)))
+			span.AddEvent(eventMsg, trace.WithAttributes(
+				observability.ValidatorIndexAttribute(beaconDuty.ValidatorIndex),
+				observability.BeaconRoleAttribute(beaconDuty.Type),
+				observability.ValidatorPublicKeyAttribute(beaconDuty.PubKey),
+			))
 			continue
 		}
 		shares[beaconDuty.ValidatorIndex] = share
@@ -151,7 +188,9 @@ func (c *Committee) StartDuty(ctx context.Context, logger *zap.Logger, duty *spe
 		}
 	}
 	if len(shares) == 0 {
-		return errors.New("no shares for duty's validators")
+		err := errors.New("no shares for duty's validators")
+		span.SetStatus(codes.Error, err.Error())
+		return err
 	}
 	duty = filteredDuty
 
@@ -179,14 +218,20 @@ func (c *Committee) StartDuty(ctx context.Context, logger *zap.Logger, duty *spe
 	// Prunes all expired committee runners, when new runner is created
 	pruneLogger := c.logger.With(zap.Uint64("current_slot", uint64(duty.Slot)))
 	if err := c.unsafePruneExpiredRunners(pruneLogger, duty.Slot); err != nil {
+		span.RecordError(err)
 		pruneLogger.Error("couldn't prune expired committee runners", zap.Error(err))
 	}
 
-	logger.Info("ℹ️ starting duty processing")
+	const eventMsg = "ℹ️ starting duty processing"
+	logger.Info(eventMsg)
+	span.AddEvent(eventMsg)
 	err = runner.StartNewDuty(ctx, logger, duty, c.CommitteeMember.GetQuorum())
 	if err != nil {
-		return errors.Wrap(err, "runner failed to start duty")
+		err := errors.Wrap(err, "runner failed to start duty")
+		span.SetStatus(codes.Error, err.Error())
+		return err
 	}
+	span.SetStatus(codes.Ok, "")
 	return nil
 }

@@ -206,70 +251,130 @@ func (c *Committee) PushToQueue(slot phase0.Slot, dec *queue.SSVMessage) {
 
 // ProcessMessage processes Network Message of all types
 func (c *Committee) ProcessMessage(ctx context.Context, logger *zap.Logger, msg *queue.SSVMessage) error {
+	msgType := msg.GetType()
+	ctx, span := tracer.Start(ctx, fmt.Sprintf("%s.process_committee_message", observabilityNamespace),
+		trace.WithAttributes(
+			observability.ValidatorMsgIDAttribute(msg.GetID()),
+			observability.ValidatorMsgTypeAttribute(msgType),
+			observability.RunnerRoleAttribute(msg.GetID().GetRoleType()),
+		),
+		trace.WithLinks(trace.LinkFromContext(msg.Context)))
+	defer span.End()
+
+	if msg.SignedSSVMessage != nil {
+		slot, err := msg.Slot()
+		if err == nil {
+			span.SetAttributes(observability.DutyIDAttribute(
+				fields.FormatCommitteeDutyID(msg.SignedSSVMessage.OperatorIDs, c.BeaconNetwork.EstimatedEpochAtSlot(slot), slot),
+			))
+		}
+	}
 
 	// Validate message
-	if msg.GetType() != message.SSVEventMsgType {
+	if msgType != message.SSVEventMsgType {
+		span.AddEvent("validating message and signature")
 		if err := msg.SignedSSVMessage.Validate(); err != nil {
-			return errors.Wrap(err, "invalid SignedSSVMessage")
+			err := errors.Wrap(err, "invalid SignedSSVMessage")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
 
 		// Verify SignedSSVMessage's signature
 		if err := spectypes.Verify(msg.SignedSSVMessage, c.CommitteeMember.Committee); err != nil {
-			return errors.Wrap(err, "SignedSSVMessage has an invalid signature")
+			err := errors.Wrap(err, "SignedSSVMessage has an invalid signature")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
 
 		if err := c.validateMessage(msg.SignedSSVMessage.SSVMessage); err != nil {
-			return errors.Wrap(err, "Message invalid")
+			err := errors.Wrap(err, "Message invalid")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
 	}
 
-	switch msg.GetType() {
+	switch msgType {
 	case spectypes.SSVConsensusMsgType:
 		qbftMsg := &qbft.Message{}
 		if err := qbftMsg.Decode(msg.GetData()); err != nil {
-			return errors.Wrap(err, "could not get consensus Message from network Message")
+			err := errors.Wrap(err, "could not get consensus Message from network Message")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
 		if err := qbftMsg.Validate(); err != nil {
-			return errors.Wrap(err, "invalid qbft Message")
+			err := errors.Wrap(err, "invalid qbft Message")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
 		c.mtx.Lock()
 		runner, exists := c.Runners[phase0.Slot(qbftMsg.Height)]
 		c.mtx.Unlock()
 		if !exists {
-			return errors.New("no runner found for message's slot")
+			err := errors.New("no runner found for message's slot")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
-		return runner.ProcessConsensus(ctx, logger, msg.SignedSSVMessage)
+		if err := runner.ProcessConsensus(ctx, logger, msg.SignedSSVMessage); err != nil {
+			span.SetStatus(codes.Error, err.Error())
+			return err
+		}
+		span.SetStatus(codes.Ok, "")
+		return nil
 	case spectypes.SSVPartialSignatureMsgType:
 		pSigMessages := &spectypes.PartialSignatureMessages{}
 		if err := pSigMessages.Decode(msg.SignedSSVMessage.SSVMessage.GetData()); err != nil {
-			return errors.Wrap(err, "could not get post consensus Message from network Message")
+			err := errors.Wrap(err, "could not get post consensus Message from network Message")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
 
 		// Validate
 		if len(msg.SignedSSVMessage.OperatorIDs) != 1 {
-			return errors.New("PartialSignatureMessage has more than 1 signer")
+			err := errors.New("PartialSignatureMessage has more than 1 signer")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
 
 		if err := pSigMessages.ValidateForSigner(msg.SignedSSVMessage.OperatorIDs[0]); err != nil {
-			return errors.Wrap(err, "invalid PartialSignatureMessages")
+			err := errors.Wrap(err, "invalid PartialSignatureMessages")
+			span.SetStatus(codes.Error, err.Error())
+			return err
 		}
 
 		if pSigMessages.Type == spectypes.PostConsensusPartialSig {
 			c.mtx.Lock()
 			runner, exists := c.Runners[pSigMessages.Slot]
 			c.mtx.Unlock()
 			if !exists {
-				return errors.New("no runner found for message's slot")
+				err := errors.New("no runner found for message's slot")
+				span.SetStatus(codes.Error, err.Error())
+				return err
 			}
-			return runner.ProcessPostConsensus(ctx, logger, pSigMessages)
+
+			if err := runner.ProcessPostConsensus(ctx, logger, pSigMessages); err != nil {
+				span.SetStatus(codes.Error, err.Error())
+				return err
+			}
+			span.SetStatus(codes.Ok, "")
+			return nil
 		}
 	case message.SSVEventMsgType:
-		return c.handleEventMessage(ctx, logger, msg)
+		if err := c.handleEventMessage(ctx, logger, msg); err != nil {
+			span.SetStatus(codes.Error, err.Error())
+			return err
+		}
+		span.SetStatus(codes.Ok, "")
+		return nil
 	default:
-		return errors.New("unknown msg")
+		err := errors.New("unknown msg")
+		span.SetStatus(codes.Error, err.Error())
+		return err
 	}
-	return nil
+
+	span.SetStatus(codes.Ok, "")
+	return nil
 }
 
 func (c *Committee) unsafePruneExpiredRunners(logger *zap.Logger, currentSlot phase0.Slot) error {
 	if runnerExpirySlots > currentSlot {
 		return nil
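ProcessMessage repeats the same three-line sequence on every failure path: wrap or construct the error, call span.SetStatus(codes.Error, ...), and return. A hypothetical helper, not part of this commit, that captures what those expanded blocks do:

import (
	"go.opentelemetry.io/otel/codes"
	"go.opentelemetry.io/otel/trace"
)

// traceError marks the span as failed with the error text and passes the
// error through, mirroring the expanded blocks in ProcessMessage above.
func traceError(span trace.Span, err error) error {
	span.SetStatus(codes.Error, err.Error())
	return err
}

// Equivalent usage for one of the branches above:
//	if err := qbftMsg.Validate(); err != nil {
//		return traceError(span, errors.Wrap(err, "invalid qbft Message"))
//	}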
37 changes: 27 additions & 10 deletions protocol/v2/ssv/validator/committee_queue.go
@@ -3,13 +3,17 @@ package validator
 import (
 	"context"
 	"errors"
+	"fmt"
 
 	"github.com/attestantio/go-eth2-client/spec/phase0"
 	specqbft "github.com/ssvlabs/ssv-spec/qbft"
 	spectypes "github.com/ssvlabs/ssv-spec/types"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/trace"
 	"go.uber.org/zap"
 
 	"github.com/ssvlabs/ssv/logging/fields"
+	"github.com/ssvlabs/ssv/observability"
 	"github.com/ssvlabs/ssv/protocol/v2/message"
 	"github.com/ssvlabs/ssv/protocol/v2/qbft/instance"
 	"github.com/ssvlabs/ssv/protocol/v2/ssv/queue"
@@ -19,17 +23,25 @@ import (
 // HandleMessage handles a spectypes.SSVMessage.
 // TODO: accept DecodedSSVMessage once p2p is upgraded to decode messages during validation.
 // TODO: get rid of logger, add context
-func (c *Committee) HandleMessage(_ context.Context, logger *zap.Logger, msg *queue.SSVMessage) {
-	// logger.Debug("📬 handling SSV message",
-	// 	zap.Uint64("type", uint64(msg.MsgType)),
-	// 	fields.Role(msg.MsgID.GetRoleType()))
-
+func (c *Committee) HandleMessage(ctx context.Context, logger *zap.Logger, msg *queue.SSVMessage) {
+	ctx, span := tracer.Start(ctx,
+		fmt.Sprintf("%s.handle_committee_message", observabilityNamespace),
+		trace.WithAttributes(
+			observability.ValidatorMsgIDAttribute(msg.GetID()),
+			observability.ValidatorMsgTypeAttribute(msg.GetType()),
+			observability.RunnerRoleAttribute(msg.GetID().GetRoleType()),
+		))
+	defer span.End()
+
+	msg.Context = ctx
 	slot, err := msg.Slot()
 	if err != nil {
 		logger.Error("❌ could not get slot from message", fields.MessageID(msg.MsgID), zap.Error(err))
+		span.SetStatus(codes.Error, err.Error())
 		return
 	}
 
+	span.SetAttributes(observability.BeaconSlotAttribute(slot))
 	c.mtx.RLock() // read v.Queues
 	q, ok := c.Queues[slot]
 	c.mtx.RUnlock()
@@ -46,16 +58,21 @@ func (c *Committee) HandleMessage(_ context.Context, logger *zap.Logger, msg *qu
 		c.mtx.Lock()
 		c.Queues[slot] = q
 		c.mtx.Unlock()
-		logger.Debug("missing queue for slot created", fields.Slot(slot))
+		const eventMsg = "missing queue for slot created"
+		logger.Debug(eventMsg, fields.Slot(slot))
+		span.AddEvent(eventMsg)
 	}
 
+	span.AddEvent("pushing message to queue")
+
 	if pushed := q.Q.TryPush(msg); !pushed {
-		msgID := msg.MsgID.String()
-		logger.Warn("❗ dropping message because the queue is full",
+		const errMsg = "❗ dropping message because the queue is full"
+		logger.Warn(errMsg,
 			zap.String("msg_type", message.MsgTypeToString(msg.MsgType)),
-			zap.String("msg_id", msgID))
+			zap.String("msg_id", msg.MsgID.String()))
+		span.SetStatus(codes.Error, errMsg)
 	} else {
-		// logger.Debug("📬 queue: pushed message", fields.MessageID(msg.MsgID), fields.MessageType(msg.MsgType))
+		span.SetStatus(codes.Ok, "")
 	}
 }
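HandleMessage and ProcessMessage run on opposite sides of the queue, so the commit connects their spans with a link rather than a parent-child edge: the enqueue side stashes its context on the message (msg.Context = ctx), and ProcessMessage passes trace.WithLinks(trace.LinkFromContext(msg.Context)) when starting its own span, since the producer span may already be finished by the time the message is consumed. A stripped-down sketch of that pattern (the channel-based queue, type, and function names are illustrative only):

package main

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

var tracer = otel.Tracer("example")

// envelope stands in for queue.SSVMessage: it carries the producer's
// span context across the asynchronous queue boundary.
type envelope struct {
	ctx context.Context
}

func enqueue(ctx context.Context, q chan<- *envelope) {
	ctx, span := tracer.Start(ctx, "handle_message")
	defer span.End()
	q <- &envelope{ctx: ctx} // producer span context travels with the message
}

func consume(ctx context.Context, q <-chan *envelope) {
	msg := <-q
	// Link, not parent: processing happens after the producer has moved on.
	_, span := tracer.Start(ctx, "process_message",
		trace.WithLinks(trace.LinkFromContext(msg.ctx)))
	defer span.End()
	// ... process the message ...
}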
