Skip to content

Commit

Permalink
enable/disable accesslog with kmeshctl
Browse files Browse the repository at this point in the history
Signed-off-by: LiZhenCheng9527 <[email protected]>
  • Loading branch information
LiZhenCheng9527 committed Oct 9, 2024
1 parent 973cbc6 commit 0a4919a
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 21 deletions.
124 changes: 124 additions & 0 deletions ctl/accesslog/accesslog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package accesslog

import (
"context"
"fmt"
"net/http"
"os"

"github.com/spf13/cobra"
"istio.io/istio/pkg/kube"

"kmesh.net/kmesh/ctl/utils"
"kmesh.net/kmesh/pkg/logger"
)

const (
patternAccesslog = "/accesslog"
)

var log = logger.NewLoggerScope("kmeshctl/accesslog")

func NewCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "accesslog",
Short: "Enable or disable Kmesh's accesslog",
Example: `# Enable Kmesh's accesslog:
kmeshctl accesslog <kmesh-daemon-pod> enable
# Disable Kmesh's accesslog:
kmeshctl accesslog <kmesh-daemon-pod> disable`,
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
SetAccesslog(cmd, args)
},
}
return cmd
}

func SetAccesslog(cmd *cobra.Command, args []string) {
var info string
accesslogFlag := args[len(args)-1]
if accesslogFlag == "enable" {
info = "true"
} else if accesslogFlag == "disable" {
info = "false"
} else {
log.Errorf("Error: Argument must be 'enable' or 'disable'")
os.Exit(1)
}

cli, err := utils.CreateKubeClient()
if err != nil {
log.Errorf("failed to create cli client: %v", err)
os.Exit(1)
}

if len(args) == 1 {
// Perform operations on all kmesh daemons.
podList, err := cli.PodsForSelector(context.TODO(), utils.KmeshNamespace, utils.KmeshLabel)
if err != nil {
log.Errorf("failed to get kmesh podList: %v", err)
os.Exit(1)
}
for _, pod := range podList.Items {
SetAccesslogPerKmeshDaemon(cli, pod.GetName(), info)
}
} else {
// Processes accesslog triggers for specified kmesh daemon.
for _, podname := range args[:len(args)-1] {
SetAccesslogPerKmeshDaemon(cli, podname, info)
}
}
}

func SetAccesslogPerKmeshDaemon(cli kube.CLIClient, podName, info string) {
fw, err := utils.CreateKmeshPortForwarder(cli, podName)
if err != nil {
log.Errorf("failed to create port forwarder for Kmesh daemon pod %s: %v", podName, err)
os.Exit(1)
}
if err := fw.Start(); err != nil {
log.Errorf("failed to start port forwarder for Kmesh daemon pod %s: %v", podName, err)
os.Exit(1)
}
defer fw.Close()

url := fmt.Sprintf("http://%s%s?enable=%s", fw.Address(), patternAccesslog, info)

req, err := http.NewRequest(http.MethodPost, url, nil)
if err != nil {
log.Errorf("Error creating request: %v", err)
return
}

req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Errorf("failed to make HTTP request: %v", err)
return
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.Errorf("Error: received status code %d", resp.StatusCode)
return
}
}
8 changes: 7 additions & 1 deletion ctl/dump/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ func RunDump(cmd *cobra.Command, args []string) error {
os.Exit(1)
}

fw, err := utils.CreateKmeshPortForwarder(podName)
cli, err := utils.CreateKubeClient()
if err != nil {
log.Errorf("failed to create cli client: %v", err)
os.Exit(1)
}

fw, err := utils.CreateKmeshPortForwarder(cli, podName)
if err != nil {
log.Errorf("failed to create port forwarder for Kmesh daemon pod %s: %v", podName, err)
os.Exit(1)
Expand Down
8 changes: 7 additions & 1 deletion ctl/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,13 @@ func SetLoggerLevel(url string, setFlag string) {
func RunGetOrSetLoggerLevel(cmd *cobra.Command, args []string) {
podName := args[0]

fw, err := utils.CreateKmeshPortForwarder(podName)
cli, err := utils.CreateKubeClient()
if err != nil {
log.Errorf("failed to create cli client: %v", err)
os.Exit(1)
}

fw, err := utils.CreateKmeshPortForwarder(cli, podName)
if err != nil {
log.Errorf("failed to create port forwarder for Kmesh daemon pod %s: %v", podName, err)
os.Exit(1)
Expand Down
2 changes: 2 additions & 0 deletions ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/spf13/cobra"

"kmesh.net/kmesh/ctl/accesslog"
"kmesh.net/kmesh/ctl/dump"
logcmd "kmesh.net/kmesh/ctl/log"
"kmesh.net/kmesh/ctl/waypoint"
Expand All @@ -39,6 +40,7 @@ func main() {
rootCmd.AddCommand(logcmd.NewCmd())
rootCmd.AddCommand(dump.NewCmd())
rootCmd.AddCommand(waypoint.NewCmd())
rootCmd.AddCommand(accesslog.NewCmd())

if err := rootCmd.Execute(); err != nil {
os.Exit(1)
Expand Down
31 changes: 16 additions & 15 deletions ctl/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,10 @@ import (

const (
KmeshNamespace = "kmesh-system"
KmeshLabel = "app=kmesh"
KmeshAdminPort = 15200
)

// Create a new PortForwarder configured for the given Kmesh daemon pod.
func CreateKmeshPortForwarder(podName string) (kube.PortForwarder, error) {
cli, err := CreateKubeClient()
if err != nil {
return nil, err
}

fw, err := cli.NewPortForwarder(podName, KmeshNamespace, "", 0, KmeshAdminPort)
if err != nil {
return nil, fmt.Errorf("failed to create port forwarder: %v", err)
}

return fw, nil
}

func CreateKubeClient() (kube.CLIClient, error) {
rc, err := kube.DefaultRestConfig("", "")
if err != nil {
Expand All @@ -55,3 +41,18 @@ func CreateKubeClient() (kube.CLIClient, error) {

return cli, nil
}

// Create a new PortForwarder configured for the given Kmesh daemon pod.
func CreateKmeshPortForwarder(cliClient kube.CLIClient, podName string) (kube.PortForwarder, error) {
// cli, err := CreateKubeClient()
// if err != nil {
// return nil, err
// }

fw, err := cliClient.NewPortForwarder(podName, KmeshNamespace, "", 0, KmeshAdminPort)
if err != nil {
return nil, fmt.Errorf("failed to create port forwarder: %v", err)
}

return fw, nil
}
10 changes: 6 additions & 4 deletions pkg/controller/telemetry/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"

Expand All @@ -50,7 +51,7 @@ const (
var osStartTime time.Time

type MetricController struct {
enableAccesslog bool
EnableAccesslog atomic.Bool
workloadCache cache.WorkloadCache
workloadMetricCache map[workloadMetricLabels]*workloadMetricInfo
serviceMetricCache map[serviceMetricLabels]*serviceMetricInfo
Expand Down Expand Up @@ -175,12 +176,13 @@ type serviceMetricLabels struct {
}

func NewMetric(workloadCache cache.WorkloadCache, enableAccesslog bool) *MetricController {
return &MetricController{
enableAccesslog: enableAccesslog,
m := &MetricController{

Check warning on line 179 in pkg/controller/telemetry/metric.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/telemetry/metric.go#L179

Added line #L179 was not covered by tests
workloadCache: workloadCache,
workloadMetricCache: map[workloadMetricLabels]*workloadMetricInfo{},
serviceMetricCache: map[serviceMetricLabels]*serviceMetricInfo{},
}
m.EnableAccesslog.Store(enableAccesslog)
return m

Check warning on line 185 in pkg/controller/telemetry/metric.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/telemetry/metric.go#L184-L185

Added lines #L184 - L185 were not covered by tests
}

func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
Expand Down Expand Up @@ -262,7 +264,7 @@ func (m *MetricController) Run(ctx context.Context, mapOfTcpInfo *ebpf.Map) {
serviceLabels.reporter = "source"
accesslog.direction = "OUTBOUND"
}
if data.state == TCP_CLOSTED && accesslog.sourceWorkload != "-" && m.enableAccesslog {
if data.state == TCP_CLOSTED && accesslog.sourceWorkload != "-" && m.EnableAccesslog.Load() {

Check warning on line 267 in pkg/controller/telemetry/metric.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/telemetry/metric.go#L267

Added line #L267 was not covered by tests
OutputAccesslog(data, accesslog)
}
m.mutex.Lock()
Expand Down
8 changes: 8 additions & 0 deletions pkg/controller/workload/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,11 @@ func (c *Controller) HandleWorkloadStream() error {

return nil
}

func (c *Controller) SetAccesslogTrigger(info bool) {
c.MetricController.EnableAccesslog.Store(info)

Check warning on line 133 in pkg/controller/workload/workload_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_controller.go#L132-L133

Added lines #L132 - L133 were not covered by tests
}

func (c *Controller) GetAccesslogTrigger() bool {
return c.MetricController.EnableAccesslog.Load()

Check warning on line 137 in pkg/controller/workload/workload_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_controller.go#L136-L137

Added lines #L136 - L137 were not covered by tests
}
19 changes: 19 additions & 0 deletions pkg/status/status_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
patternConfigDumpWorkload = configDumpPrefix + "/workload"
patternReadyProbe = "/debug/ready"
patternLoggers = "/debug/loggers"
patternAccesslog = "/accesslog"

bpfLoggerName = "bpf"

Expand Down Expand Up @@ -100,6 +101,7 @@ func NewServer(c *controller.XdsClient, configs *options.BootstrapConfigs, bpfLo
s.mux.HandleFunc(patternConfigDumpAds, s.configDumpAds)
s.mux.HandleFunc(patternConfigDumpWorkload, s.configDumpWorkload)
s.mux.HandleFunc(patternLoggers, s.loggersHandler)
s.mux.HandleFunc(patternAccesslog, s.accesslogHandler)

Check warning on line 104 in pkg/status/status_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/status/status_server.go#L104

Added line #L104 was not covered by tests

// TODO: add dump certificate, authorizationPolicies and services
s.mux.HandleFunc(patternReadyProbe, s.readyProbe)
Expand Down Expand Up @@ -236,6 +238,23 @@ func (s *Server) loggersHandler(w http.ResponseWriter, r *http.Request) {
}
}

func (s *Server) accesslogHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
log.Errorf("accesslogHandler export POST method but get %v", r.Method)
return

Check warning on line 244 in pkg/status/status_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/status/status_server.go#L243-L244

Added lines #L243 - L244 were not covered by tests
}

var info bool
accesslogInfo := r.URL.Query().Get("enable")
if accesslogInfo == "true" {
info = true

Check warning on line 250 in pkg/status/status_server.go

View check run for this annotation

Codecov / codecov/patch

pkg/status/status_server.go#L250

Added line #L250 was not covered by tests
} else {
info = false
}
s.xdsClient.WorkloadController.SetAccesslogTrigger(info)
w.WriteHeader(http.StatusOK)
}

func (s *Server) getLoggerNames(w http.ResponseWriter) {
loggerNames := append(logger.GetLoggerNames(), bpfLoggerName)
data, err := json.MarshalIndent(&loggerNames, "", " ")
Expand Down
32 changes: 32 additions & 0 deletions pkg/status/status_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"kmesh.net/kmesh/pkg/constants"
"kmesh.net/kmesh/pkg/controller"
"kmesh.net/kmesh/pkg/controller/ads"
"kmesh.net/kmesh/pkg/controller/telemetry"
"kmesh.net/kmesh/pkg/controller/workload"
"kmesh.net/kmesh/pkg/controller/workload/bpfcache"
"kmesh.net/kmesh/pkg/controller/workload/cache"
Expand Down Expand Up @@ -483,3 +484,34 @@ func TestServer_dumpAdsBpfMap(t *testing.T) {
assert.Equal(t, len(testListeners), len(dump.DynamicResources.ListenerConfigs))
})
}

func TestServerAccesslogHandler(t *testing.T) {
t.Run("change accesslog config info", func(t *testing.T) {
config := options.BpfConfig{
Mode: "workload",
BpfFsPath: "/sys/fs/bpf",
Cgroup2Path: "/mnt/kmesh_cgroup2",
EnableMda: false,
EnableBpfLog: false,
EnableAccesslog: true,
}
cleanup, _ := test.InitBpfMap(t, config)
defer cleanup()

server := &Server{
xdsClient: &controller.XdsClient{
WorkloadController: &workload.Controller{
MetricController: &telemetry.MetricController{},
},
},
}
server.xdsClient.WorkloadController.MetricController.EnableAccesslog.Store(true)

url := fmt.Sprintf("%s?enable=%s", patternAccesslog, "false")
req := httptest.NewRequest(http.MethodPost, url, nil)
w := httptest.NewRecorder()
server.accesslogHandler(w, req)

assert.Equal(t, server.xdsClient.WorkloadController.GetAccesslogTrigger(), false)
})
}

0 comments on commit 0a4919a

Please sign in to comment.