From 2115de73048ad4b280cf557f9634b6939e7243f3 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Tue, 13 Aug 2024 16:44:57 +0800 Subject: [PATCH 1/6] add accesslog handle Signed-off-by: LiZhenCheng9527 --- bpf/kmesh/probes/tcp_probe.h | 5 +- pkg/controller/telemetry/accesslog.go | 83 ++++++++++++++++++++++ pkg/controller/telemetry/accesslog_test.go | 81 +++++++++++++++++++++ pkg/controller/telemetry/accesslog_tets.go | 17 +++++ pkg/controller/telemetry/metric.go | 52 ++++++++++---- pkg/controller/telemetry/metric_test.go | 4 +- 6 files changed, 226 insertions(+), 16 deletions(-) create mode 100644 pkg/controller/telemetry/accesslog.go create mode 100644 pkg/controller/telemetry/accesslog_test.go create mode 100644 pkg/controller/telemetry/accesslog_tets.go diff --git a/bpf/kmesh/probes/tcp_probe.h b/bpf/kmesh/probes/tcp_probe.h index 6dd2b0c63..f06845e57 100644 --- a/bpf/kmesh/probes/tcp_probe.h +++ b/bpf/kmesh/probes/tcp_probe.h @@ -5,6 +5,7 @@ #define __KMESH_BPF_ACCESS_LOG_H__ #include "bpf_common.h" +#include "linux/mmtimer.h" // direction enum { @@ -25,10 +26,10 @@ struct tcp_probe_info { __u32 received_bytes; __u32 conn_success; __u32 direction; - __u32 state; /* tcp state */ - __u32 protocol; __u64 duration; // ns __u64 close_ns; + __u32 state; /* tcp state */ + __u32 protocol; __u32 srtt_us; /* smoothed round trip time << 3 in usecs */ __u32 rtt_min; __u32 mss_cache; /* Cached effective mss, not including SACKS */ diff --git a/pkg/controller/telemetry/accesslog.go b/pkg/controller/telemetry/accesslog.go new file mode 100644 index 000000000..75a3d5887 --- /dev/null +++ b/pkg/controller/telemetry/accesslog.go @@ -0,0 +1,83 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package telemetry + +import ( + "bufio" + "fmt" + "os/exec" + "strings" + "time" +) + +type logInfo struct { + direction string + sentBytes uint32 + receivedBytes uint32 + duration uint64 + + sourceAddress string + sourceWorkload string + sourceNamespace string + + destinationAddress string + destinationService string + destinationWorkload string + destinationNamespace string +} + +func RunAccesslog(data requestMetric, accesslog logInfo) { + _ = buildAccesslog(data, accesslog) +} + +func buildAccesslog(data requestMetric, accesslog logInfo) string { + closeTime := data.closeTime + startTime := getOSBootTime() + uptime := calculateUptime(startTime, closeTime) + + timeInfo := fmt.Sprintf("%v", uptime) + sourceInfo := fmt.Sprintf("src.addr=%s, src.workload=%s, src.namespace=%s", accesslog.sourceAddress, accesslog.sourceWorkload, accesslog.sourceNamespace) + destinationInfo := fmt.Sprintf("dst.addr=%s, dst.service=%s, dst.workload=%s, dst.namespace=%s", accesslog.destinationAddress, accesslog.destinationService, accesslog.destinationWorkload, accesslog.destinationNamespace) + connectionInfo := fmt.Sprintf("direction=%s, sent_bytes=%d, received_bytes=%d, duration=%vms", accesslog.direction, data.sentBytes, data.receivedBytes, (float64(data.duration) / 1000000.0)) + + logResult := fmt.Sprintf("%s %s, %s, %s", timeInfo, sourceInfo, destinationInfo, connectionInfo) + log.Infof("%s", logResult) + return logResult +} + +func getOSBootTime() time.Time { + cmd := exec.Command("uptime", "-s") + stdout, err := cmd.StdoutPipe() + if err != nil { + fmt.Println(err) + } + cmd.Start() + reader := bufio.NewReader(stdout) + timeStr, err2 := reader.ReadString('\n') + if err2 != nil { + log.Errorf("get system last start time error: %v", err2) + } + timeStr = strings.Trim(timeStr, "\n") + bootTime, err := time.Parse("2006-01-02 15:04:05", timeStr) + return bootTime +} + +func calculateUptime(startTime time.Time, elapsedTimeNs uint64) time.Time { + elapsedDuration := time.Duration(elapsedTimeNs) * time.Nanosecond + currentTime := startTime.Add(elapsedDuration) + return currentTime +} diff --git a/pkg/controller/telemetry/accesslog_test.go b/pkg/controller/telemetry/accesslog_test.go new file mode 100644 index 000000000..a7f5a15c7 --- /dev/null +++ b/pkg/controller/telemetry/accesslog_test.go @@ -0,0 +1,81 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package telemetry + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func Test_buildAccesslog(t *testing.T) { + type args struct { + data requestMetric + accesslog logInfo + } + tests := []struct { + name string + args args + want string + }{ + { + name: "build accesslog", + args: args{ + data: requestMetric{ + sentBytes: uint32(60), + receivedBytes: uint32(172), + duration: uint64(2236000), + closeTime: uint64(3506247005837715), + }, + accesslog: logInfo{ + direction: "INBOUND", + sourceAddress: "10.244.0.10:47667", + sourceWorkload: "sleep-7656cf8794-9v2gv", + sourceNamespace: "kmesh-system", + destinationAddress: "10.244.0.7:8080", + destinationService: "httpbin.ambient-demo.svc.cluster.local", + destinationWorkload: "httpbin-86b8ffc5ff-bhvxx", + destinationNamespace: "kmesh-system", + }, + }, + want: "2024-08-14 10:40:08.005837715 +0000 UTC src.addr=10.244.0.10:47667, src.workload=sleep-7656cf8794-9v2gv, src.namespace=kmesh-system, dst.addr=10.244.0.7:8080, dst.service=httpbin.ambient-demo.svc.cluster.local, dst.workload=httpbin-86b8ffc5ff-bhvxx, dst.namespace=kmesh-system, direction=INBOUND, sent_bytes=60, received_bytes=172, duration=2.236ms", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := buildAccesslog(tt.args.data, tt.args.accesslog) + assert.Equal(t, tt.want, got) + }) + } +} + +func Test_getOSBootTime(t *testing.T) { + t.Run("function test", func(t *testing.T) { + data := getOSBootTime() + fmt.Printf("%v", data) + }) +} + +func Test_calculateUptime(t *testing.T) { + startTime := time.Date(2024, 7, 4, 20, 42, 0, 0, time.UTC) + elapsedTimeNs := uint64(3506247005837715) + want := time.Date(2024, 8, 14, 10, 39, 27, 5837715, time.UTC) + uptime := calculateUptime(startTime, elapsedTimeNs) + assert.Equal(t, want, uptime) +} diff --git a/pkg/controller/telemetry/accesslog_tets.go b/pkg/controller/telemetry/accesslog_tets.go new file mode 100644 index 000000000..b803298fd --- /dev/null +++ b/pkg/controller/telemetry/accesslog_tets.go @@ -0,0 +1,17 @@ +/* + * Copyright The Kmesh Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package telemetry diff --git a/pkg/controller/telemetry/metric.go b/pkg/controller/telemetry/metric.go index 5ec1eb4a4..ee398b2ab 100644 --- a/pkg/controller/telemetry/metric.go +++ b/pkg/controller/telemetry/metric.go @@ -57,6 +57,8 @@ type connectionDataV4 struct { ReceivedBytes uint32 ConnectSuccess uint32 Direction uint32 + Duration uint64 + CloseTime uint64 State uint32 } @@ -69,18 +71,23 @@ type connectionDataV6 struct { ReceivedBytes uint32 ConnectSuccess uint32 Direction uint32 + Duration uint64 + CloseTime uint64 State uint32 } type requestMetric struct { src [4]uint32 dst [4]uint32 + srcPort uint16 dstPort uint16 direction uint32 receivedBytes uint32 sentBytes uint32 state uint32 success uint32 + duration uint64 + closeTime uint64 } type workloadMetricLabels struct { @@ -166,14 +173,13 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) { // Register metrics to Prometheus and start Prometheus server go RunPrometheusClient(ctx) - rec := ringbuf.Record{} - data := requestMetric{} - for { select { case <-ctx.Done(): return default: + data := requestMetric{} + rec := ringbuf.Record{} if err := reader.ReadInto(&rec); err != nil { log.Errorf("ringbuf reader FAILED to read, err: %v", err) continue @@ -197,19 +203,25 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) { } workloadLabels := m.buildWorkloadMetric(&data) - serviceLabels := m.buildServiceMetric(&data) + serviceLabels, accesslog := m.buildServiceMetric(&data) workloadLabels.reporter = "-" serviceLabels.reporter = "-" + accesslog.direction = "-" if data.direction == constants.INBOUND { workloadLabels.reporter = "destination" serviceLabels.reporter = "destination" + accesslog.direction = "INBOUND" } if data.direction == constants.OUTBOUND { workloadLabels.reporter = "source" serviceLabels.reporter = "source" + accesslog.direction = "OUTBOUND" } + if data.state == TCP_CLOST { + RunAccesslog(data, accesslog) + } buildWorkloadMetricsToPrometheus(data, workloadLabels) buildServiceMetricsToPrometheus(data, serviceLabels) } @@ -227,10 +239,14 @@ func buildV4Metric(buf *bytes.Buffer) (requestMetric, error) { data.dst[0] = connectData.DstAddr data.direction = connectData.Direction data.dstPort = connectData.DstPort + data.srcPort = connectData.SrcPort + data.sentBytes = connectData.SentBytes data.receivedBytes = connectData.ReceivedBytes data.state = connectData.State data.success = connectData.ConnectSuccess + data.duration = connectData.Duration + data.closeTime = connectData.CloseTime return data, nil } @@ -241,16 +257,18 @@ func buildV6Metric(buf *bytes.Buffer) (requestMetric, error) { if err := binary.Read(buf, binary.LittleEndian, &connectData); err != nil { return data, err } - data.src = connectData.SrcAddr data.dst = connectData.DstAddr data.direction = connectData.Direction data.dstPort = connectData.DstPort + data.srcPort = connectData.SrcPort data.sentBytes = connectData.SentBytes data.receivedBytes = connectData.ReceivedBytes data.state = connectData.State data.success = connectData.ConnectSuccess + data.duration = connectData.Duration + data.closeTime = connectData.CloseTime return data, nil } @@ -274,22 +292,24 @@ func (m *MetricController) buildWorkloadMetric(data *requestMetric) workloadMetr return trafficLabels } -func (m *MetricController) buildServiceMetric(data *requestMetric) serviceMetricLabels { +func (m *MetricController) buildServiceMetric(data *requestMetric) (serviceMetricLabels, logInfo) { var dstAddr, srcAddr []byte for i := range data.dst { dstAddr = binary.LittleEndian.AppendUint32(dstAddr, data.dst[i]) srcAddr = binary.LittleEndian.AppendUint32(srcAddr, data.src[i]) } - dstWorkload, _ := m.getWorkloadByAddress(restoreIPv4(dstAddr)) - srcWorkload, _ := m.getWorkloadByAddress(restoreIPv4(srcAddr)) + dstWorkload, dstIp := m.getWorkloadByAddress(restoreIPv4(dstAddr)) + srcWorkload, srcIp := m.getWorkloadByAddress(restoreIPv4(srcAddr)) - trafficLabels := buildServiceMetric(dstWorkload, srcWorkload, data.dstPort) + trafficLabels, accesslog := buildServiceMetric(dstWorkload, srcWorkload, data.dstPort) trafficLabels.requestProtocol = "tcp" trafficLabels.responseFlags = "-" trafficLabels.connectionSecurityPolicy = "mutual_tls" + accesslog.destinationAddress = dstIp + ":" + fmt.Sprintf("%d", data.dstPort) + accesslog.sourceAddress = srcIp + ":" + fmt.Sprintf("%d", data.srcPort) - return trafficLabels + return trafficLabels, accesslog } func (m *MetricController) getWorkloadByAddress(address []byte) (*workloadapi.Workload, string) { @@ -332,8 +352,9 @@ func buildWorkloadMetric(dstWorkload, srcWorkload *workloadapi.Workload) workloa return trafficLabels } -func buildServiceMetric(dstWorkload, srcWorkload *workloadapi.Workload, dstPort uint16) serviceMetricLabels { +func buildServiceMetric(dstWorkload, srcWorkload *workloadapi.Workload, dstPort uint16) (serviceMetricLabels, logInfo) { trafficLabels := serviceMetricLabels{} + accesslog := logInfo{} if dstWorkload != nil { namespacedhost := "" @@ -369,6 +390,10 @@ func buildServiceMetric(dstWorkload, srcWorkload *workloadapi.Workload, dstPort trafficLabels.destinationCluster = dstWorkload.ClusterId trafficLabels.destinationPrincipal = buildPrincipal(dstWorkload) trafficLabels.destinationPrincipal = buildPrincipal(dstWorkload) + + accesslog.destinationWorkload = dstWorkload.Name + accesslog.destinationNamespace = svcNamespace + accesslog.destinationService = svcHost } if srcWorkload != nil { @@ -380,9 +405,12 @@ func buildServiceMetric(dstWorkload, srcWorkload *workloadapi.Workload, dstPort trafficLabels.sourceVersion = srcWorkload.CanonicalRevision trafficLabels.sourceCluster = srcWorkload.ClusterId trafficLabels.sourcePrincipal = buildPrincipal(srcWorkload) + + accesslog.sourceNamespace = srcWorkload.Namespace + accesslog.sourceWorkload = srcWorkload.Name } - return trafficLabels + return trafficLabels, accesslog } func buildPrincipal(workload *workloadapi.Workload) string { diff --git a/pkg/controller/telemetry/metric_test.go b/pkg/controller/telemetry/metric_test.go index 4085602a7..2a97a0853 100644 --- a/pkg/controller/telemetry/metric_test.go +++ b/pkg/controller/telemetry/metric_test.go @@ -578,7 +578,7 @@ func TestBuildServiceMetric(t *testing.T) { } m.workloadCache.AddOrUpdateWorkload(dstWorkload) m.workloadCache.AddOrUpdateWorkload(srcWorkload) - got := m.buildServiceMetric(tt.args.data) + got, _ := m.buildServiceMetric(tt.args.data) assert.Equal(t, tt.want, got) }) } @@ -701,7 +701,7 @@ func Test_buildServiceMetric(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := buildServiceMetric(tt.args.dstWorkload, tt.args.srcWorkload, tt.args.dstPort) + got, _ := buildServiceMetric(tt.args.dstWorkload, tt.args.srcWorkload, tt.args.dstPort) assert.Equal(t, tt.want, got) }) } From 8971515b91a2b8b183d33b72c2458d1ffbd64e79 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Wed, 14 Aug 2024 16:18:20 +0800 Subject: [PATCH 2/6] clean up Signed-off-by: LiZhenCheng9527 --- bpf/kmesh/probes/tcp_probe.h | 1 - pkg/controller/telemetry/accesslog_tets.go | 17 ----------------- 2 files changed, 18 deletions(-) delete mode 100644 pkg/controller/telemetry/accesslog_tets.go diff --git a/bpf/kmesh/probes/tcp_probe.h b/bpf/kmesh/probes/tcp_probe.h index f06845e57..1f0d59c1f 100644 --- a/bpf/kmesh/probes/tcp_probe.h +++ b/bpf/kmesh/probes/tcp_probe.h @@ -5,7 +5,6 @@ #define __KMESH_BPF_ACCESS_LOG_H__ #include "bpf_common.h" -#include "linux/mmtimer.h" // direction enum { diff --git a/pkg/controller/telemetry/accesslog_tets.go b/pkg/controller/telemetry/accesslog_tets.go deleted file mode 100644 index b803298fd..000000000 --- a/pkg/controller/telemetry/accesslog_tets.go +++ /dev/null @@ -1,17 +0,0 @@ -/* - * Copyright The Kmesh Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package telemetry From bb9d84f21af2b5862b4e7a6f45634390f0e99eca Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Wed, 14 Aug 2024 17:07:34 +0800 Subject: [PATCH 3/6] go lint Signed-off-by: LiZhenCheng9527 --- pkg/controller/telemetry/accesslog.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/controller/telemetry/accesslog.go b/pkg/controller/telemetry/accesslog.go index 75a3d5887..e869885a7 100644 --- a/pkg/controller/telemetry/accesslog.go +++ b/pkg/controller/telemetry/accesslog.go @@ -25,11 +25,7 @@ import ( ) type logInfo struct { - direction string - sentBytes uint32 - receivedBytes uint32 - duration uint64 - + direction string sourceAddress string sourceWorkload string sourceNamespace string @@ -65,14 +61,14 @@ func getOSBootTime() time.Time { if err != nil { fmt.Println(err) } - cmd.Start() + _ = cmd.Start() reader := bufio.NewReader(stdout) timeStr, err2 := reader.ReadString('\n') if err2 != nil { log.Errorf("get system last start time error: %v", err2) } timeStr = strings.Trim(timeStr, "\n") - bootTime, err := time.Parse("2006-01-02 15:04:05", timeStr) + bootTime, _ := time.Parse("2006-01-02 15:04:05", timeStr) return bootTime } From c447e7a7e4e62ed0b24bf000bf99c82c7bd4578d Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Wed, 14 Aug 2024 17:34:39 +0800 Subject: [PATCH 4/6] add ut robust Signed-off-by: LiZhenCheng9527 --- pkg/controller/telemetry/accesslog_test.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/controller/telemetry/accesslog_test.go b/pkg/controller/telemetry/accesslog_test.go index a7f5a15c7..893e71d2b 100644 --- a/pkg/controller/telemetry/accesslog_test.go +++ b/pkg/controller/telemetry/accesslog_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/agiledragon/gomonkey/v2" "github.com/stretchr/testify/assert" ) @@ -54,9 +55,14 @@ func Test_buildAccesslog(t *testing.T) { destinationNamespace: "kmesh-system", }, }, - want: "2024-08-14 10:40:08.005837715 +0000 UTC src.addr=10.244.0.10:47667, src.workload=sleep-7656cf8794-9v2gv, src.namespace=kmesh-system, dst.addr=10.244.0.7:8080, dst.service=httpbin.ambient-demo.svc.cluster.local, dst.workload=httpbin-86b8ffc5ff-bhvxx, dst.namespace=kmesh-system, direction=INBOUND, sent_bytes=60, received_bytes=172, duration=2.236ms", + want: "2024-08-14 10:11:27.005837715 +0000 UTC src.addr=10.244.0.10:47667, src.workload=sleep-7656cf8794-9v2gv, src.namespace=kmesh-system, dst.addr=10.244.0.7:8080, dst.service=httpbin.ambient-demo.svc.cluster.local, dst.workload=httpbin-86b8ffc5ff-bhvxx, dst.namespace=kmesh-system, direction=INBOUND, sent_bytes=60, received_bytes=172, duration=2.236ms", }, } + patch := gomonkey.NewPatches() + patch.ApplyFunc(getOSBootTime, func() time.Time { + return time.Date(2024, 7, 4, 20, 14, 0, 0, time.UTC) + }) + defer patch.Reset() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got := buildAccesslog(tt.args.data, tt.args.accesslog) From 5bcb9373996d1961ff66dbad60db6159651548d2 Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Wed, 28 Aug 2024 17:05:04 +0800 Subject: [PATCH 5/6] fix by comment Signed-off-by: LiZhenCheng9527 --- pkg/controller/telemetry/accesslog.go | 33 ++++++++++++++-------- pkg/controller/telemetry/accesslog_test.go | 12 ++------ pkg/controller/telemetry/metric.go | 17 ++++++++--- 3 files changed, 37 insertions(+), 25 deletions(-) diff --git a/pkg/controller/telemetry/accesslog.go b/pkg/controller/telemetry/accesslog.go index e869885a7..ecc848d44 100644 --- a/pkg/controller/telemetry/accesslog.go +++ b/pkg/controller/telemetry/accesslog.go @@ -37,13 +37,13 @@ type logInfo struct { } func RunAccesslog(data requestMetric, accesslog logInfo) { - _ = buildAccesslog(data, accesslog) + logStr := buildAccesslog(data, accesslog) + fmt.Println("accesslog:", logStr) } func buildAccesslog(data requestMetric, accesslog logInfo) string { closeTime := data.closeTime - startTime := getOSBootTime() - uptime := calculateUptime(startTime, closeTime) + uptime := calculateUptime(osStartTime, closeTime) timeInfo := fmt.Sprintf("%v", uptime) sourceInfo := fmt.Sprintf("src.addr=%s, src.workload=%s, src.namespace=%s", accesslog.sourceAddress, accesslog.sourceWorkload, accesslog.sourceNamespace) @@ -51,25 +51,34 @@ func buildAccesslog(data requestMetric, accesslog logInfo) string { connectionInfo := fmt.Sprintf("direction=%s, sent_bytes=%d, received_bytes=%d, duration=%vms", accesslog.direction, data.sentBytes, data.receivedBytes, (float64(data.duration) / 1000000.0)) logResult := fmt.Sprintf("%s %s, %s, %s", timeInfo, sourceInfo, destinationInfo, connectionInfo) - log.Infof("%s", logResult) return logResult } -func getOSBootTime() time.Time { +func getOSBootTime() (time.Time, error) { cmd := exec.Command("uptime", "-s") stdout, err := cmd.StdoutPipe() if err != nil { - fmt.Println(err) + return time.Time{}, err } - _ = cmd.Start() + + err = cmd.Start() + if err != nil { + return time.Time{}, err + } + reader := bufio.NewReader(stdout) - timeStr, err2 := reader.ReadString('\n') - if err2 != nil { - log.Errorf("get system last start time error: %v", err2) + timeStr, err := reader.ReadString('\n') + if err != nil { + return time.Time{}, err } + timeStr = strings.Trim(timeStr, "\n") - bootTime, _ := time.Parse("2006-01-02 15:04:05", timeStr) - return bootTime + bootTime, err := time.Parse("2006-01-02 15:04:05", timeStr) + if err != nil { + return time.Time{}, err + } + + return bootTime, nil } func calculateUptime(startTime time.Time, elapsedTimeNs uint64) time.Time { diff --git a/pkg/controller/telemetry/accesslog_test.go b/pkg/controller/telemetry/accesslog_test.go index 893e71d2b..89c42d719 100644 --- a/pkg/controller/telemetry/accesslog_test.go +++ b/pkg/controller/telemetry/accesslog_test.go @@ -17,11 +17,9 @@ package telemetry import ( - "fmt" "testing" "time" - "github.com/agiledragon/gomonkey/v2" "github.com/stretchr/testify/assert" ) @@ -58,11 +56,7 @@ func Test_buildAccesslog(t *testing.T) { want: "2024-08-14 10:11:27.005837715 +0000 UTC src.addr=10.244.0.10:47667, src.workload=sleep-7656cf8794-9v2gv, src.namespace=kmesh-system, dst.addr=10.244.0.7:8080, dst.service=httpbin.ambient-demo.svc.cluster.local, dst.workload=httpbin-86b8ffc5ff-bhvxx, dst.namespace=kmesh-system, direction=INBOUND, sent_bytes=60, received_bytes=172, duration=2.236ms", }, } - patch := gomonkey.NewPatches() - patch.ApplyFunc(getOSBootTime, func() time.Time { - return time.Date(2024, 7, 4, 20, 14, 0, 0, time.UTC) - }) - defer patch.Reset() + osStartTime = time.Date(2024, 7, 4, 20, 14, 0, 0, time.UTC) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got := buildAccesslog(tt.args.data, tt.args.accesslog) @@ -73,8 +67,8 @@ func Test_buildAccesslog(t *testing.T) { func Test_getOSBootTime(t *testing.T) { t.Run("function test", func(t *testing.T) { - data := getOSBootTime() - fmt.Printf("%v", data) + _, err := getOSBootTime() + assert.NoError(t, err) }) } diff --git a/pkg/controller/telemetry/metric.go b/pkg/controller/telemetry/metric.go index ee398b2ab..9dac6863e 100644 --- a/pkg/controller/telemetry/metric.go +++ b/pkg/controller/telemetry/metric.go @@ -24,6 +24,7 @@ import ( "net/netip" "reflect" "strings" + "time" "unsafe" "github.com/cilium/ebpf" @@ -36,13 +37,15 @@ import ( const ( TCP_ESTABLISHED = uint32(1) - TCP_CLOST = uint32(7) + TCP_CLOSTED = uint32(7) connection_success = uint32(1) MSG_LEN = 112 ) +var osStartTime time.Time + type MetricController struct { workloadCache cache.WorkloadCache } @@ -159,6 +162,12 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) { return } + var err error + osStartTime, err = getOSBootTime() + if err != nil { + log.Errorf("get latest os boot time for accesslog failed: %v", err) + } + reader, err := ringbuf.NewReader(mapOfTcpInfo) if err != nil { log.Errorf("open metric notify ringbuf map FAILED, err: %v", err) @@ -219,7 +228,7 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) { accesslog.direction = "OUTBOUND" } - if data.state == TCP_CLOST { + if data.state == TCP_CLOSTED { RunAccesslog(data, accesslog) } buildWorkloadMetricsToPrometheus(data, workloadLabels) @@ -426,7 +435,7 @@ func buildWorkloadMetricsToPrometheus(data requestMetric, labels workloadMetricL if data.state == TCP_ESTABLISHED { tcpConnectionOpenedInWorkload.With(commonLabels).Add(float64(1)) } - if data.state == TCP_CLOST { + if data.state == TCP_CLOSTED { tcpConnectionClosedInWorkload.With(commonLabels).Add(float64(1)) } if data.success != connection_success { @@ -442,7 +451,7 @@ func buildServiceMetricsToPrometheus(data requestMetric, labels serviceMetricLab if data.state == TCP_ESTABLISHED { tcpConnectionOpenedInService.With(commonLabels).Add(float64(1)) } - if data.state == TCP_CLOST { + if data.state == TCP_CLOSTED { tcpConnectionClosedInService.With(commonLabels).Add(float64(1)) } if data.success != uint32(1) { From 45ba81729b3b113567f2880c86431c699000acdd Mon Sep 17 00:00:00 2001 From: LiZhenCheng9527 Date: Thu, 29 Aug 2024 14:36:11 +0800 Subject: [PATCH 6/6] not dependent on command to get last reboot time Signed-off-by: LiZhenCheng9527 --- pkg/controller/telemetry/accesslog.go | 32 ++++++++------------------- pkg/controller/telemetry/metric.go | 2 +- 2 files changed, 10 insertions(+), 24 deletions(-) diff --git a/pkg/controller/telemetry/accesslog.go b/pkg/controller/telemetry/accesslog.go index ecc848d44..1d2aa00c8 100644 --- a/pkg/controller/telemetry/accesslog.go +++ b/pkg/controller/telemetry/accesslog.go @@ -17,10 +17,8 @@ package telemetry import ( - "bufio" "fmt" - "os/exec" - "strings" + "syscall" "time" ) @@ -36,7 +34,7 @@ type logInfo struct { destinationNamespace string } -func RunAccesslog(data requestMetric, accesslog logInfo) { +func OutputAccesslog(data requestMetric, accesslog logInfo) { logStr := buildAccesslog(data, accesslog) fmt.Println("accesslog:", logStr) } @@ -55,30 +53,18 @@ func buildAccesslog(data requestMetric, accesslog logInfo) string { } func getOSBootTime() (time.Time, error) { - cmd := exec.Command("uptime", "-s") - stdout, err := cmd.StdoutPipe() - if err != nil { - return time.Time{}, err - } + now := time.Now() + now = now.Round(time.Duration(now.Second())) - err = cmd.Start() - if err != nil { + sysinfo := &syscall.Sysinfo_t{} + if err := syscall.Sysinfo(sysinfo); err != nil { return time.Time{}, err } - reader := bufio.NewReader(stdout) - timeStr, err := reader.ReadString('\n') - if err != nil { - return time.Time{}, err - } - - timeStr = strings.Trim(timeStr, "\n") - bootTime, err := time.Parse("2006-01-02 15:04:05", timeStr) - if err != nil { - return time.Time{}, err - } + uptime := time.Duration(sysinfo.Uptime) * time.Second + lastRebootTime := now.Add(-uptime) - return bootTime, nil + return lastRebootTime, nil } func calculateUptime(startTime time.Time, elapsedTimeNs uint64) time.Time { diff --git a/pkg/controller/telemetry/metric.go b/pkg/controller/telemetry/metric.go index 9dac6863e..124519a60 100644 --- a/pkg/controller/telemetry/metric.go +++ b/pkg/controller/telemetry/metric.go @@ -229,7 +229,7 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) { } if data.state == TCP_CLOSTED { - RunAccesslog(data, accesslog) + OutputAccesslog(data, accesslog) } buildWorkloadMetricsToPrometheus(data, workloadLabels) buildServiceMetricsToPrometheus(data, serviceLabels)