Skip to content

Commit ed0775f

Browse files
committed
Prototype to stream Envoy metrics to Ambassador's backend
Signed-off-by: Douglas Camata <[email protected]>
1 parent a052571 commit ed0775f

File tree

3 files changed

+96
-6
lines changed

3 files changed

+96
-6
lines changed

pkg/agent/agent.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v2"
78
"io/ioutil"
89
"net/http"
910
"net/url"
@@ -599,6 +600,24 @@ func (a *Agent) ProcessSnapshot(ctx context.Context, snapshot *snapshotTypes.Sna
599600
return nil
600601
}
601602

603+
func (a *Agent) MetricsRelayHandler(in *envoyMetrics.StreamMetricsMessage) {
604+
ctx := context.Background()
605+
dlog.Debugf(ctx, "received %d metrics", len(in.GetEnvoyMetrics()))
606+
if a.comm != nil && !a.reportingStopped {
607+
a.ambassadorAPIKeyMutex.Lock()
608+
apikey := a.ambassadorAPIKey
609+
a.ambassadorAPIKeyMutex.Unlock()
610+
outMessage := &agent.StreamMetricsMessage{
611+
Identity: a.agentID,
612+
EnvoyMetrics: in.EnvoyMetrics,
613+
}
614+
dlog.Debugf(ctx, "relaying %d metrics", len(outMessage.GetEnvoyMetrics()))
615+
if err := a.comm.StreamMetrics(ctx, outMessage, apikey); err != nil {
616+
dlog.Errorf(ctx, "Error streaming metrics: %+v", err)
617+
}
618+
}
619+
}
620+
602621
// ClearComm ends the current connection to the Director, if it exists, thereby
603622
// forcing a new connection to be created when needed.
604623
func (a *Agent) ClearComm() {

pkg/agent/comm.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"net/url"
10+
"sync"
1011

1112
"google.golang.org/grpc"
1213
"google.golang.org/grpc/credentials"
@@ -19,12 +20,13 @@ import (
1920
const APIKeyMetadataKey = "x-ambassador-api-key"
2021

2122
type RPCComm struct {
22-
conn io.Closer
23-
client agent.DirectorClient
24-
rptWake chan struct{}
25-
retCancel context.CancelFunc
26-
agentID *agent.Identity
27-
directives chan *agent.Directive
23+
conn io.Closer
24+
client agent.DirectorClient
25+
rptWake chan struct{}
26+
retCancel context.CancelFunc
27+
agentID *agent.Identity
28+
directives chan *agent.Directive
29+
metricsStreamWriterMutex sync.Mutex
2830
}
2931

3032
const (
@@ -189,6 +191,19 @@ func (c *RPCComm) Report(ctx context.Context, report *agent.Snapshot, apiKey str
189191
return nil
190192
}
191193

194+
func (c *RPCComm) StreamMetrics(ctx context.Context, metrics *agent.StreamMetricsMessage, apiKey string) error {
195+
ctx = dlog.WithField(ctx, "agent", "streammetrics")
196+
197+
c.metricsStreamWriterMutex.Lock()
198+
defer c.metricsStreamWriterMutex.Unlock()
199+
ctx = metadata.AppendToOutgoingContext(ctx, APIKeyMetadataKey, apiKey)
200+
streamClient, err := c.client.StreamMetrics(ctx)
201+
if err != nil {
202+
return err
203+
}
204+
return streamClient.Send(metrics)
205+
}
206+
192207
func (c *RPCComm) Directives() <-chan *agent.Directive {
193208
return c.directives
194209
}

pkg/agent/envoy_metrics_server.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package agent
2+
3+
import (
4+
"context"
5+
envoyMetrics "github.com/datawire/ambassador/v2/pkg/api/envoy/service/metrics/v2"
6+
"github.com/datawire/dlib/dlog"
7+
"google.golang.org/grpc"
8+
"io"
9+
"net"
10+
)
11+
12+
type streamHandler func(in *envoyMetrics.StreamMetricsMessage)
13+
14+
type metricsServer struct {
15+
envoyMetrics.MetricsServiceServer
16+
handler streamHandler
17+
}
18+
19+
// NewMetricsServer is the main metricsServer constructor.
20+
func NewMetricsServer(handler streamHandler) *metricsServer {
21+
return &metricsServer{
22+
handler: handler,
23+
}
24+
}
25+
26+
// StartServer will start the metrics gRPC server, listening on :8123
27+
// It is a blocking call until grpcServer.Serve returns.
28+
func (s *metricsServer) StartServer(ctx context.Context) error {
29+
grpcServer := grpc.NewServer()
30+
envoyMetrics.RegisterMetricsServiceServer(grpcServer, s)
31+
32+
listener, err := net.Listen("tcp", ":8123")
33+
if err != nil {
34+
dlog.Errorf(ctx, "metrics service failed to listen: %v", err)
35+
}
36+
37+
dlog.Infof(ctx, "metrics service listening on %s", listener.Addr().String())
38+
return grpcServer.Serve(listener)
39+
}
40+
41+
// StreamMetrics implements the StreamMetrics rpc call by calling the stream handler on each
42+
// message received.
43+
func (s *metricsServer) StreamMetrics(stream envoyMetrics.MetricsService_StreamMetricsServer) error {
44+
ctx := stream.Context()
45+
dlog.Debug(ctx, "started stream")
46+
for {
47+
in, err := stream.Recv()
48+
if err == io.EOF {
49+
return nil
50+
}
51+
if err != nil {
52+
return err
53+
}
54+
s.handler(in)
55+
}
56+
}

0 commit comments

Comments
 (0)