Skip to content
This repository has been archived by the owner on Mar 28, 2023. It is now read-only.

Commit

Permalink
feat: add latency & count metrics for content routing client (#59)
Browse files Browse the repository at this point in the history
* feat: add latency & count metrics for content routing client

* use consistent err string logic as hydras
  • Loading branch information
guseggert authored Oct 19, 2022
1 parent 0c84bf8 commit 98c6d42
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 46 deletions.
53 changes: 53 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package client

import (
"context"
"errors"
"time"

"github.com/ipfs/go-cid"
proto "github.com/ipfs/go-delegated-routing/gen/proto"
ipns "github.com/ipfs/go-ipns"
logging "github.com/ipfs/go-log/v2"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)

var logger = logging.Logger("service/client/delegatedrouting")

type DelegatedRoutingClient interface {
FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error)
FindProvidersAsync(ctx context.Context, key cid.Cid) (<-chan FindProvidersAsyncResult, error)
GetIPNS(ctx context.Context, id []byte) ([]byte, error)
GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error)
PutIPNS(ctx context.Context, id []byte, record []byte) error
PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error)
Provide(ctx context.Context, key []cid.Cid, ttl time.Duration) (time.Duration, error)
ProvideAsync(ctx context.Context, key []cid.Cid, ttl time.Duration) (<-chan time.Duration, error)
}

type Client struct {
client proto.DelegatedRouting_Client
validator record.Validator

provider *Provider
identity crypto.PrivKey
}

var _ DelegatedRoutingClient = (*Client)(nil)

// NewClient creates a client.
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) {
if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) {
return nil, errors.New("identity does not match provider")
}

return &Client{
client: c,
validator: ipns.Validator{},
provider: p,
identity: identity,
}, nil
}
17 changes: 15 additions & 2 deletions client/contentrouting.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,31 @@ func NewContentRoutingClient(c DelegatedRoutingClient) *ContentRoutingClient {
}

func (c *ContentRoutingClient) Provide(ctx context.Context, key cid.Cid, announce bool) error {
var err error
recordMetrics := startMetrics(ctx, "ContentRoutingClient.Provide")
defer recordMetrics(err)

// If 'true' is
// passed, it also announces it, otherwise it is just kept in the local
// accounting of which objects are being provided.
if !announce {
return nil
}

_, err := c.client.Provide(ctx, []cid.Cid{key}, 24*time.Hour)
_, err = c.client.Provide(ctx, []cid.Cid{key}, 24*time.Hour)
return err
}

func (c *ContentRoutingClient) ProvideMany(ctx context.Context, keys []multihash.Multihash) error {
var err error
recordMetrics := startMetrics(ctx, "ContentRoutingClient.ProvideMany")
defer recordMetrics(err)

keysAsCids := make([]cid.Cid, 0, len(keys))
for _, m := range keys {
keysAsCids = append(keysAsCids, cid.NewCidV1(cid.Raw, m))
}
_, err := c.client.Provide(ctx, keysAsCids, 24*time.Hour)
_, err = c.client.Provide(ctx, keysAsCids, 24*time.Hour)
return err
}

Expand All @@ -51,13 +59,18 @@ func (c *ContentRoutingClient) Ready() bool {
}

func (c *ContentRoutingClient) FindProvidersAsync(ctx context.Context, key cid.Cid, numResults int) <-chan peer.AddrInfo {
var err error
recordMetrics := startMetrics(ctx, "ContentRoutingClient.FindProvidersAsync")

addrInfoCh := make(chan peer.AddrInfo)
resultCh, err := c.client.FindProvidersAsync(ctx, key)
if err != nil {
close(addrInfoCh)
recordMetrics(err)
return addrInfoCh
}
go func() {
defer recordMetrics(nil)
numProcessed := 0
closed := false
for asyncResult := range resultCh {
Expand Down
44 changes: 0 additions & 44 deletions client/findproviders.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,58 +2,14 @@ package client

import (
"context"
"errors"
"time"

"github.com/ipfs/go-cid"
proto "github.com/ipfs/go-delegated-routing/gen/proto"
ipns "github.com/ipfs/go-ipns"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/edelweiss/values"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

var logger = logging.Logger("service/client/delegatedrouting")

type DelegatedRoutingClient interface {
FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error)
FindProvidersAsync(ctx context.Context, key cid.Cid) (<-chan FindProvidersAsyncResult, error)
GetIPNS(ctx context.Context, id []byte) ([]byte, error)
GetIPNSAsync(ctx context.Context, id []byte) (<-chan GetIPNSAsyncResult, error)
PutIPNS(ctx context.Context, id []byte, record []byte) error
PutIPNSAsync(ctx context.Context, id []byte, record []byte) (<-chan PutIPNSAsyncResult, error)
Provide(ctx context.Context, key []cid.Cid, ttl time.Duration) (time.Duration, error)
ProvideAsync(ctx context.Context, key []cid.Cid, ttl time.Duration) (<-chan time.Duration, error)
}

type Client struct {
client proto.DelegatedRouting_Client
validator record.Validator

provider *Provider
identity crypto.PrivKey
}

var _ DelegatedRoutingClient = (*Client)(nil)

// NewClient creates a client.
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
func NewClient(c proto.DelegatedRouting_Client, p *Provider, identity crypto.PrivKey) (*Client, error) {
if p != nil && !p.Peer.ID.MatchesPublicKey(identity.GetPublic()) {
return nil, errors.New("identity does not match provider")
}

return &Client{
client: c,
validator: ipns.Validator{},
provider: p,
identity: identity,
}, nil
}

func (fp *Client) FindProviders(ctx context.Context, key cid.Cid) ([]peer.AddrInfo, error) {
resps, err := fp.client.FindProviders(ctx, cidsToFindProvidersRequest(key))
if err != nil {
Expand Down
110 changes: 110 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package client

import (
"context"
"errors"
"net"
"time"

"github.com/ipld/edelweiss/services"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)

var (
defaultDurationDistribution = view.Distribution(0, 1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000)

measureDuration = stats.Float64("delegated_routing/duration", "The time to complete an entire request", stats.UnitMilliseconds)
measureRequests = stats.Float64("delegated_routing/requests", "The number of requests made", stats.UnitDimensionless)

keyName = tag.MustNewKey("name")
keyError = tag.MustNewKey("error")

durationView = &view.View{
Measure: measureDuration,
TagKeys: []tag.Key{keyName, keyError},
Aggregation: defaultDurationDistribution,
}
requestsView = &view.View{
Measure: measureRequests,
TagKeys: []tag.Key{keyName, keyError},
Aggregation: view.Sum(),
}

DefaultViews = []*view.View{
durationView,
requestsView,
}
)

// startMetrics begins recording metrics.
// The returned function flushes the metrics when called, recording metrics about the passed error.
func startMetrics(ctx context.Context, name string) (done func(err error)) {
start := time.Now()

return func(err error) {
latency := time.Since(start)

errStr := "None"
if err != nil {
logger.Warnw("received delegated routing error", "Error", err)
errStr = metricsErrStr(err)
}

stats.RecordWithTags(ctx,
[]tag.Mutator{
tag.Upsert(keyName, name),
tag.Upsert(keyError, errStr),
},
measureDuration.M(float64(latency.Milliseconds())),
measureRequests.M(1),
)
}
}

// metricsErrStr returns a string to use for recording metrics from an error.
// We shouldn't use the error string itself as that can result in high-cardinality metrics.
// For more specific root causing, check the logs.
func metricsErrStr(err error) string {
if errors.Is(err, context.DeadlineExceeded) {
return "DeadlineExceeded"
}
if errors.Is(err, context.Canceled) {
return "Canceled"
}
if errors.Is(err, services.ErrSchema) {
return "Schema"
}

var serviceErr *services.ErrService
if errors.As(err, &serviceErr) {
return "Service"
}

var protoErr *services.ErrProto
if errors.As(err, &protoErr) {
return "Proto"
}

var dnsErr *net.DNSError
if errors.As(err, &dnsErr) {
if dnsErr.IsNotFound {
return "DNSNotFound"
}
if dnsErr.IsTimeout {
return "DNSTimeout"
}
return "DNS"
}

var netErr net.Error
if errors.As(err, &netErr) {
if netErr.Timeout() {
return "NetTimeout"
}
return "Net"
}

return "Other"
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/multiformats/go-multicodec v0.6.0
github.com/multiformats/go-multihash v0.2.1
github.com/polydawn/refmt v0.0.0-20201211092308-30ac6d18308e
go.opencensus.io v0.23.0
)

require (
Expand Down
Loading

0 comments on commit 98c6d42

Please sign in to comment.