Skip to content

Commit

Permalink
Merge pull request #803 from LiZhenCheng9527/batch-update-metric
Browse files Browse the repository at this point in the history
Batch update metrics through prometheus API
  • Loading branch information
hzxuzhonghu authored Sep 5, 2024
2 parents e73ace7 + 704c06a commit 0caf9d9
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 58 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ require (
github.com/golang/protobuf v1.5.4
github.com/miekg/dns v1.1.62
github.com/prometheus/client_golang v1.20.2
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.56.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
Expand Down Expand Up @@ -160,6 +159,7 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/prometheus/prometheus v0.52.1 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
Expand Down
162 changes: 133 additions & 29 deletions pkg/controller/telemetry/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/netip"
"reflect"
"strings"
"sync"
"time"
"unsafe"

Expand All @@ -42,12 +43,33 @@ const (
connection_success = uint32(1)

MSG_LEN = 112

metricFlushInterval = 5 * time.Second
)

var osStartTime time.Time

type MetricController struct {
workloadCache cache.WorkloadCache
workloadCache cache.WorkloadCache
workloadMetricCache map[workloadMetricLabels]*workloadMetricInfo
serviceMetricCache map[serviceMetricLabels]*serviceMetricInfo
mutex sync.RWMutex
}

type workloadMetricInfo struct {
WorkloadConnOpened float64
WorkloadConnClosed float64
WorkloadConnSentBytes float64
WorkloadConnReceivedBytes float64
WorkloadConnFailed float64
}

type serviceMetricInfo struct {
ServiceConnOpened float64
ServiceConnClosed float64
ServiceConnSentBytes float64
ServiceConnReceivedBytes float64
ServiceConnFailed float64
}

type connectionDataV4 struct {
Expand Down Expand Up @@ -153,7 +175,9 @@ type serviceMetricLabels struct {

func NewMetric(workloadCache cache.WorkloadCache) *MetricController {
return &MetricController{
workloadCache: workloadCache,
workloadCache: workloadCache,
workloadMetricCache: map[workloadMetricLabels]*workloadMetricInfo{},
serviceMetricCache: map[serviceMetricLabels]*serviceMetricInfo{},
}
}

Expand Down Expand Up @@ -181,6 +205,18 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {

// Register metrics to Prometheus and start Prometheus server
go RunPrometheusClient(ctx)
go func() {
for {
select {
case <-ctx.Done():
return
default:
// Metrics updated every 5 seconds
time.Sleep(metricFlushInterval)
m.updatePrometheusMetric()
}
}
}()

for {
select {
Expand All @@ -197,7 +233,6 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
log.Errorf("wrong length %v of a msg, should be %v", len(rec.RawSample), MSG_LEN)
continue
}

connectType := binary.LittleEndian.Uint32(rec.RawSample)
originInfo := rec.RawSample[unsafe.Sizeof(connectType):]
buf := bytes.NewBuffer(originInfo)
Expand All @@ -210,7 +245,6 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
log.Errorf("get connection info failed: %v", err)
continue
}

workloadLabels := m.buildWorkloadMetric(&data)
serviceLabels, accesslog := m.buildServiceMetric(&data)

Expand All @@ -227,12 +261,13 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
serviceLabels.reporter = "source"
accesslog.direction = "OUTBOUND"
}

if data.state == TCP_CLOSTED {
OutputAccesslog(data, accesslog)
}
buildWorkloadMetricsToPrometheus(data, workloadLabels)
buildServiceMetricsToPrometheus(data, serviceLabels)
m.mutex.Lock()
m.updateWorkloadMetricCache(data, workloadLabels)
m.updateServiceMetricCache(data, serviceLabels)
m.mutex.Unlock()
}
}
}
Expand Down Expand Up @@ -429,36 +464,105 @@ func buildPrincipal(workload *workloadapi.Workload) string {
return "-"
}

func buildWorkloadMetricsToPrometheus(data requestMetric, labels workloadMetricLabels) {
commonLabels := struct2map(labels)

if data.state == TCP_ESTABLISHED {
tcpConnectionOpenedInWorkload.With(commonLabels).Add(float64(1))
}
if data.state == TCP_CLOSTED {
tcpConnectionClosedInWorkload.With(commonLabels).Add(float64(1))
func (m *MetricController) updateWorkloadMetricCache(data requestMetric, labels workloadMetricLabels) {
v, ok := m.workloadMetricCache[labels]
if ok {
if data.state == TCP_ESTABLISHED {
v.WorkloadConnOpened = v.WorkloadConnOpened + 1
}
if data.state == TCP_CLOSTED {
v.WorkloadConnClosed = v.WorkloadConnClosed + 1
}
if data.success != connection_success {
v.WorkloadConnFailed = v.WorkloadConnFailed + 1
}
v.WorkloadConnReceivedBytes = v.WorkloadConnReceivedBytes + float64(data.receivedBytes)
v.WorkloadConnSentBytes = v.WorkloadConnSentBytes + float64(data.sentBytes)
} else {
newWorkloadMetricInfo := workloadMetricInfo{}
if data.state == TCP_ESTABLISHED {
newWorkloadMetricInfo.WorkloadConnOpened = 1
}
if data.state == TCP_CLOSTED {
newWorkloadMetricInfo.WorkloadConnClosed = 1
}
if data.success != connection_success {
newWorkloadMetricInfo.WorkloadConnFailed = 1
}
newWorkloadMetricInfo.WorkloadConnReceivedBytes = float64(data.receivedBytes)
newWorkloadMetricInfo.WorkloadConnSentBytes = float64(data.sentBytes)
m.workloadMetricCache[labels] = &newWorkloadMetricInfo
}
if data.success != connection_success {
tcpConnectionFailedInWorkload.With(commonLabels).Add(float64(1))
}

func (m *MetricController) updateServiceMetricCache(data requestMetric, labels serviceMetricLabels) {
v, ok := m.serviceMetricCache[labels]
if ok {
if data.state == TCP_ESTABLISHED {
v.ServiceConnOpened = v.ServiceConnOpened + 1
}
if data.state == TCP_CLOSTED {
v.ServiceConnClosed = v.ServiceConnClosed + 1
}
if data.success != connection_success {
v.ServiceConnFailed = v.ServiceConnFailed + 1
}
v.ServiceConnReceivedBytes = v.ServiceConnReceivedBytes + float64(data.receivedBytes)
v.ServiceConnSentBytes = v.ServiceConnSentBytes + float64(data.sentBytes)
} else {
newServiceMetricInfo := serviceMetricInfo{}
if data.state == TCP_ESTABLISHED {
newServiceMetricInfo.ServiceConnOpened = 1
}
if data.state == TCP_CLOSTED {
newServiceMetricInfo.ServiceConnClosed = 1
}
if data.success != connection_success {
newServiceMetricInfo.ServiceConnFailed = 1
}
newServiceMetricInfo.ServiceConnReceivedBytes = float64(data.receivedBytes)
newServiceMetricInfo.ServiceConnSentBytes = float64(data.sentBytes)
m.serviceMetricCache[labels] = &newServiceMetricInfo
}
tcpReceivedBytesInWorkload.With(commonLabels).Add(float64(data.receivedBytes))
tcpSentBytesInWorkload.With(commonLabels).Add(float64(data.sentBytes))
}

func buildServiceMetricsToPrometheus(data requestMetric, labels serviceMetricLabels) {
commonLabels := struct2map(labels)
func (m *MetricController) updatePrometheusMetric() {
m.mutex.Lock()
workloadInfoCache := m.workloadMetricCache
serviceInfoCache := m.serviceMetricCache
m.workloadMetricCache = map[workloadMetricLabels]*workloadMetricInfo{}
m.serviceMetricCache = map[serviceMetricLabels]*serviceMetricInfo{}
m.mutex.Unlock()

for k, v := range workloadInfoCache {
workloadLabels := struct2map(k)
tcpConnectionOpenedInWorkload.With(workloadLabels).Add(v.WorkloadConnOpened)
tcpConnectionClosedInWorkload.With(workloadLabels).Add(v.WorkloadConnClosed)
tcpSentBytesInWorkload.With(workloadLabels).Add(v.WorkloadConnSentBytes)
tcpReceivedBytesInWorkload.With(workloadLabels).Add(v.WorkloadConnReceivedBytes)
tcpConnectionFailedInWorkload.With(workloadLabels).Add(v.WorkloadConnFailed)
}

if data.state == TCP_ESTABLISHED {
tcpConnectionOpenedInService.With(commonLabels).Add(float64(1))
for k, v := range serviceInfoCache {
serviceLabels := struct2map(k)
tcpConnectionOpenedInService.With(serviceLabels).Add(v.ServiceConnOpened)
tcpConnectionClosedInService.With(serviceLabels).Add(v.ServiceConnClosed)
tcpConnectionFailedInService.With(serviceLabels).Add(v.ServiceConnFailed)
tcpReceivedBytesInService.With(serviceLabels).Add(v.ServiceConnReceivedBytes)
tcpSentBytesInService.With(serviceLabels).Add(v.ServiceConnSentBytes)
}
if data.state == TCP_CLOSTED {
tcpConnectionClosedInService.With(commonLabels).Add(float64(1))

// delete metrics
workloadDeleteLength := len(deleteWorkload)
serviceDeleteLength := len(deleteService)
for i := 0; i < workloadDeleteLength; i++ {
deleteWorkloadMetricInPrometheus(deleteWorkload[i])
}
if data.success != uint32(1) {
tcpConnectionFailedInService.With(commonLabels).Add(float64(1))
for i := 0; i < serviceDeleteLength; i++ {
deleteServiceMetricInPrometheus(deleteService[i])
}
tcpReceivedBytesInService.With(commonLabels).Add(float64(data.receivedBytes))
tcpSentBytesInService.With(commonLabels).Add(float64(data.sentBytes))
deleteWorkload = deleteWorkload[workloadDeleteLength:]
deleteService = deleteService[serviceDeleteLength:]
}

func struct2map(labels interface{}) map[string]string {
Expand Down
Loading

0 comments on commit 0caf9d9

Please sign in to comment.