Skip to content

Commit

Permalink
Added lock protection for data race
Browse files Browse the repository at this point in the history
Signed-off-by: LiZhenCheng9527 <[email protected]>
  • Loading branch information
LiZhenCheng9527 committed Oct 8, 2024
1 parent 3ed351e commit 0d85d7b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 16 deletions.
9 changes: 7 additions & 2 deletions ctl/accesslog/accesslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,14 @@ kmeshctl accesslog <kmesh-daemon-pod> enable`,
}

func SetAccesslog(cmd *cobra.Command, args []string) {
var info string
podName := args[0]
accesslogFlag := args[1]
if accesslogFlag != "enable" && accesslogFlag != "disable" {
if accesslogFlag == "enable" {
info = "true"
} else if accesslogFlag == "disable" {
info = "false"
} else {
log.Errorf("Error: Argument must be 'enable' or 'disable'")
os.Exit(1)
}
Expand All @@ -66,7 +71,7 @@ func SetAccesslog(cmd *cobra.Command, args []string) {
}
defer fw.Close()

url := fmt.Sprintf("http://%s%s?enable=%s", fw.Address(), patternAccesslog, accesslogFlag)
url := fmt.Sprintf("http://%s%s?enable=%s", fw.Address(), patternAccesslog, info)

req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
Expand Down
27 changes: 24 additions & 3 deletions pkg/controller/telemetry/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,18 @@ const (
var osStartTime time.Time

type MetricController struct {
EnableAccesslog bool
EnableAccesslog enableTrigger
workloadCache cache.WorkloadCache
workloadMetricCache map[workloadMetricLabels]*workloadMetricInfo
serviceMetricCache map[serviceMetricLabels]*serviceMetricInfo
mutex sync.RWMutex
}

type enableTrigger struct {
mutex sync.RWMutex
accesslogTrigger bool
}

type workloadMetricInfo struct {
WorkloadConnOpened float64
WorkloadConnClosed float64
Expand Down Expand Up @@ -176,13 +181,23 @@ type serviceMetricLabels struct {

func NewMetric(workloadCache cache.WorkloadCache, enableAccesslog bool) *MetricController {
return &MetricController{
EnableAccesslog: enableAccesslog,
EnableAccesslog: NewEnabelTrigger(enableAccesslog),

Check warning on line 184 in pkg/controller/telemetry/metric.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/telemetry/metric.go#L184

Added line #L184 was not covered by tests
workloadCache: workloadCache,
workloadMetricCache: map[workloadMetricLabels]*workloadMetricInfo{},
serviceMetricCache: map[serviceMetricLabels]*serviceMetricInfo{},
}
}

func NewEnabelTrigger(info bool) enableTrigger {
return enableTrigger{
accesslogTrigger: info,

Check warning on line 193 in pkg/controller/telemetry/metric.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/telemetry/metric.go#L191-L193

Added lines #L191 - L193 were not covered by tests
}
}

func (t *enableTrigger) GetAccesslogTrigger() bool {
return t.accesslogTrigger

Check warning on line 198 in pkg/controller/telemetry/metric.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/telemetry/metric.go#L197-L198

Added lines #L197 - L198 were not covered by tests
}

func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
if m == nil {
return
Expand Down Expand Up @@ -262,7 +277,7 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
serviceLabels.reporter = "source"
accesslog.direction = "OUTBOUND"
}
if data.state == TCP_CLOSTED && accesslog.sourceWorkload != "-" && m.EnableAccesslog {
if data.state == TCP_CLOSTED && accesslog.sourceWorkload != "-" && m.EnableAccesslog.accesslogTrigger {

Check warning on line 280 in pkg/controller/telemetry/metric.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/telemetry/metric.go#L280

Added line #L280 was not covered by tests
OutputAccesslog(data, accesslog)
}
m.mutex.Lock()
Expand Down Expand Up @@ -611,3 +626,9 @@ func restoreIPv4(bytes []byte) []byte {

return bytes[:4]
}

func (t *enableTrigger) SetAccesslogTrigger(info bool) {
t.mutex.Lock()
defer t.mutex.Unlock()
t.accesslogTrigger = info

Check warning on line 633 in pkg/controller/telemetry/metric.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/telemetry/metric.go#L630-L633

Added lines #L630 - L633 were not covered by tests
}
15 changes: 10 additions & 5 deletions pkg/status/status_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,14 +239,19 @@ func (s *Server) loggersHandler(w http.ResponseWriter, r *http.Request) {
}

func (s *Server) accesslogHandler(w http.ResponseWriter, r *http.Request) {
accesslogInfo := r.URL.Query().Get("enable")
if r.Method != http.MethodPost {
log.Errorf("accesslogHandler export POST method but get %v", r.Method)
return

Check warning on line 244 in pkg/status/status_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/status/status_server.go#L243-L244

Added lines #L243 - L244 were not covered by tests
}

if accesslogInfo == "enable" {
s.xdsClient.WorkloadController.MetricController.EnableAccesslog = true
var info bool
accesslogInfo := r.URL.Query().Get("enable")
if accesslogInfo == "true" {
info = true

Check warning on line 250 in pkg/status/status_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/status/status_server.go#L250

Added line #L250 was not covered by tests
} else {
s.xdsClient.WorkloadController.MetricController.EnableAccesslog = false
info = false
}

s.xdsClient.WorkloadController.MetricController.EnableAccesslog.SetAccesslogTrigger(info)
w.WriteHeader(http.StatusOK)
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/status/status_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,12 @@ func TestServer_dumpAdsBpfMap(t *testing.T) {
func TestServer_accesslogHandler(t *testing.T) {
t.Run("change accesslog config info", func(t *testing.T) {
config := options.BpfConfig{
Mode: "workload",
BpfFsPath: "/sys/fs/bpf",
Cgroup2Path: "/mnt/kmesh_cgroup2",
Mode: "workload",
BpfFsPath: "/sys/fs/bpf",
Cgroup2Path: "/mnt/kmesh_cgroup2",
EnableMda: false,
EnableBpfLog: false,
EnableAccesslog: true,
}
cleanup, _ := test.InitBpfMap(t, config)
defer cleanup()
Expand All @@ -499,17 +502,17 @@ func TestServer_accesslogHandler(t *testing.T) {
xdsClient: &controller.XdsClient{
WorkloadController: &workload.Controller{
MetricController: &telemetry.MetricController{
EnableAccesslog: false,
EnableAccesslog: telemetry.NewEnabelTrigger(true),
},
},
},
}

url := fmt.Sprintf("%s?enable=%s", patternAccesslog, "disable")
url := fmt.Sprintf("%s?enable=%s", patternAccesslog, "false")
req := httptest.NewRequest(http.MethodPost, url, nil)
w := httptest.NewRecorder()
server.accesslogHandler(w, req)

assert.Equal(t, server.xdsClient.WorkloadController.MetricController.EnableAccesslog, false)
assert.Equal(t, server.xdsClient.WorkloadController.MetricController.EnableAccesslog.GetAccesslogTrigger(), false)
})
}

0 comments on commit 0d85d7b

Please sign in to comment.