From 82e5793e6de4d74df9b18bb23bf64e94059ae2d9 Mon Sep 17 00:00:00 2001 From: Andy Librian Date: Sun, 10 Mar 2024 10:17:53 +0700 Subject: [PATCH] initial work on removing loopValidateProcesses --- pkg/nodeagent/capture_exec.go | 195 -------------------- pkg/nodeagent/nodeagent.go | 335 ++++++++++++++++++++-------------- test/k8s/test.sh | 5 +- 3 files changed, 206 insertions(+), 329 deletions(-) diff --git a/pkg/nodeagent/capture_exec.go b/pkg/nodeagent/capture_exec.go index 4cdefa80..0b9e87cf 100644 --- a/pkg/nodeagent/capture_exec.go +++ b/pkg/nodeagent/capture_exec.go @@ -1,16 +1,5 @@ package nodeagent -import ( - "context" - "fmt" - - "github.com/intelops/tarian-detector/pkg/detector" - "github.com/intelops/tarian-detector/tarian" - "github.com/sirupsen/logrus" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" -) - // ExecEvent represents the structure of an execution event captured by the CaptureExec. // It stores information about a process execution event, including its process ID (Pid), // command name (Command), executable filename (Filename), associated container ID (ContainerID), @@ -44,187 +33,3 @@ type ExecEvent struct { // K8sPodAnnotations are the annotations associated with the Kubernetes Pod. K8sPodAnnotations map[string]string } - -// CaptureExec captures and processes execution events, associating them with Kubernetes Pods. -// It uses eBPF (Extended Berkeley Packet Filter) to capture execution events in the Linux kernel. -type CaptureExec struct { - ctx context.Context - event Event // captured Event - shouldClose bool // Flag indicating whether the capture should be closed - nodeName string // The name of the node where the capture is running - logger *logrus.Logger // Logger instance for logging - eventsDetectorChan chan map[string]any - eventsDetector *detector.EventsDetector -} - -// Event contains the events channel and the error channel -type Event struct { - eventsChan chan ExecEvent - errChan chan error -} - -// NewCaptureExec creates a new CaptureExec instance for capturing and processing execution events. -// It initializes the eBPF capture execution instance and sets up a channel for sending events. -// -// Parameters: -// - logger: A logger instance for logging. -// -// Returns: -// - *CaptureExec: A new instance of CaptureExec. -// - error: An error if creating the eBPF capture execution instance fails. -func NewCaptureExec(ctx context.Context, logger *logrus.Logger) (*CaptureExec, error) { - return &CaptureExec{ - ctx: ctx, - event: Event{eventsChan: make(chan ExecEvent, 1000), errChan: make(chan error)}, - logger: logger, - eventsDetectorChan: make(chan map[string]any, 1000), - }, nil -} - -// SetNodeName sets the name of the node where the capture is running. -// -// Parameters: -// - name: The name of the node. -func (c *CaptureExec) SetNodeName(name string) { - c.nodeName = name -} - -// Start begins capturing execution events and associating them with Kubernetes Pods. -// It returns an error if any of the setup steps fail. -func (c *CaptureExec) Start() { - // Get in-cluster configuration for Kubernetes. - config, err := rest.InClusterConfig() - if err != nil { - c.event.errChan <- fmt.Errorf("CaptureExec.Start: failed to get in cluster config: %w", err) - return - } - - // Create a Kubernetes client. - k8sClient := kubernetes.NewForConfigOrDie(config) - - // Create a PodWatcher to watch for Pods on the node. - watcher, err := NewPodWatcher(c.logger, k8sClient, c.nodeName) - if err != nil { - c.event.errChan <- fmt.Errorf("CaptureExec.Start: failed to create pod watcher: %w", err) - return - } - watcher.Start() - - // Can't get the returned error with Goroutine. If it's needed, we can use a channel. - go c.getTarianDetectorEbpfEvents() - - for { - // Wait for eBPF execution events. - bpfEvt := <-c.eventsDetectorChan - - // Check if the capture should be closed. - if c.shouldClose { - break - } - - pid := bpfEvt["hostProcessId"].(uint32) - // Retrieve the container ID. - containerID, err := procsContainerID(pid) - fmt.Println("containerID", containerID, "err", err) - if err != nil { - continue - } - - // Find the corresponding Kubernetes Pod. - pod := watcher.FindPod(containerID) - - var podName string - var podUID string - var namespace string - var podLabels map[string]string - var podAnnotations map[string]string - - if pod != nil { - podName = pod.GetName() - podUID = string(pod.GetUID()) - namespace = pod.GetNamespace() - podLabels = pod.GetLabels() - podAnnotations = pod.GetAnnotations() - } - - // Create an ExecEvent and send it to the events channel. - execEvent := ExecEvent{ - Pid: pid, - Filename: bpfEvt["directory"].(string) + "/" + bpfEvt["processName"].(string), - Command: bpfEvt["processName"].(string), - ContainerID: containerID, - K8sPodName: podName, - K8sPodUID: podUID, - K8sNamespace: namespace, - K8sPodLabels: podLabels, - K8sPodAnnotations: podAnnotations, - } - c.event.eventsChan <- execEvent - } -} - -// Close stops the capture process and closes associated resources. -func (c *CaptureExec) Close() { - c.shouldClose = true - c.eventsDetector.Close() -} - -// GetEvent returns the Event which contains channel for receiving events and error channel. -func (c *CaptureExec) GetEvent() Event { - return c.event -} - -// getTarianDetectorEbpfEvents retrieves Tarian detector EBPF events. -// -// No parameters. -// Returns an error. -func (c *CaptureExec) getTarianDetectorEbpfEvents() error { - tarianEbpfModule, err := tarian.GetModule() - if err != nil { - c.logger.Errorf("error while get tarian-detector ebpf module: %v", err) - return fmt.Errorf("error while get tarian-detector ebpf module: %w", err) - } - - tarianDetector, err := tarianEbpfModule.Prepare() - if err != nil { - c.logger.Errorf("error while prepare tarian-detector: %v", err) - return fmt.Errorf("error while prepare tarian-detector: %w", err) - } - - // Instantiate event detectors - eventsDetector := detector.NewEventsDetector() - - // Add ebpf programs to detectors - eventsDetector.Add(tarianDetector) - - // Start and defer Close - err = eventsDetector.Start() - if err != nil { - c.logger.Errorf("error while start tarian detector: %v", err) - return fmt.Errorf("error while start tarian-detector: %w", err) - } - - c.eventsDetector = eventsDetector - - defer c.eventsDetector.Close() - - go func() { - for { - event, err := c.eventsDetector.ReadAsInterface() - if err != nil { - c.logger.WithError(err).Error("error while read event") - continue - } - - if event == nil { - continue - } - - c.eventsDetectorChan <- event - } - }() - - <-c.ctx.Done() - return c.ctx.Err() - -} diff --git a/pkg/nodeagent/nodeagent.go b/pkg/nodeagent/nodeagent.go index bb4f9ab2..2cb2c706 100644 --- a/pkg/nodeagent/nodeagent.go +++ b/pkg/nodeagent/nodeagent.go @@ -18,6 +18,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/types/known/timestamppb" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" ) // ThreatScanAnnotation is the annotation key used to enable threat scans on pods. @@ -104,18 +107,13 @@ func (n *NodeAgent) Run() { defer n.grpcConn.Close() wg := sync.WaitGroup{} - wg.Add(3) + wg.Add(2) go func() { _ = n.loopSyncConstraints(n.cancelCtx) wg.Done() }() - go func() { - _ = n.loopValidateProcesses(n.cancelCtx) - wg.Done() - }() - go func() { _ = n.loopTarianDetectorReadEvents(n.cancelCtx) wg.Done() @@ -180,73 +178,231 @@ func (n *NodeAgent) SyncConstraints() { n.constraintsInitialized = true } -// loopValidateProcesses continuously validates processes against constraints. -// -// Parameters: -// - ctx: The context for the loop. +// loopTarianDetectorReadEvents reads events from the Tarian detector and sends them to the cluster agent. // -// Returns: -// - error: An error, if any, encountered during the loop. -func (n *NodeAgent) loopValidateProcesses(ctx context.Context) error { - captureExec, err := NewCaptureExec(ctx, n.logger) +// ctx context.Context +// error +func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error { + // Create a PodWatcher to watch for Pods on the node. + podWatcher, err := n.setupPodWatcher() if err != nil { - return fmt.Errorf("nodeagent: %w", err) + return err } + podWatcher.Start() - captureExec.SetNodeName(n.nodeName) + eventsDetector, err := n.setupEventsDetector() + if err != nil { + return err + } - exeEvent := captureExec.GetEvent() - go captureExec.Start() + // Start eventsDetector and defer Close + err = eventsDetector.Start() + if err != nil { + n.logger.Errorf("error while starting tarian detector: %v", err) + return fmt.Errorf("error while starting tarian-detector: %w", err) + } + defer eventsDetector.Close() for { select { - case err := <-exeEvent.errChan: - captureExec.Close() - return fmt.Errorf("nodeagent: %w", err) case <-ctx.Done(): - captureExec.Close() - return fmt.Errorf("nodeagent: %w", ctx.Err()) - case evt := <-exeEvent.eventsChan: - if !n.constraintsInitialized { + return ctx.Err() + default: + event, err := eventsDetector.ReadAsInterface() + if err != nil { + n.logger.Errorf("tarian-detector: error while read event: %v", err) + continue + } + + if event == nil { + continue + } + + pid := event["hostProcessId"].(uint32) + + // Retrieve the container ID. + containerID, err := procsContainerID(pid) + if err != nil { continue } - _, threatScanAnnotationPresent := evt.K8sPodAnnotations[ThreatScanAnnotation] - registerAnnotationValue, registerAnnotationPresent := evt.K8sPodAnnotations[RegisterAnnotation] - if !threatScanAnnotationPresent && !registerAnnotationPresent { + if containerID == "" { continue } - // Pod has a register annotation but the cluster disables registration - if registerAnnotationPresent && !n.enableAddConstraint { + // Find the corresponding Kubernetes Pod. + pod := podWatcher.FindPod(containerID) + if pod == nil { continue } - violation := n.ValidateProcess(&evt) - if violation != nil { - registerProcess := false - registerRules := strings.Split(registerAnnotationValue, ",") - for _, rule := range registerRules { - switch strings.TrimSpace(rule) { - case "processes": - registerProcess = true - case "all": - registerProcess = true - } + // TODO: sys_execve_entry could be added here + // But for kubectl exec, the detected entry comm is still the wrapper: runc:init + // With sys_execve_exit, the comm is the target process + detectionDataType := event["eventId"].(string) + if detectionDataType == "sys_execve_exit" { + execEvent, err2 := n.execEventFromTarianDetector(event, containerID, pod) + if err2 != nil { + n.logger.WithField("err", err2).Error("tarian-detector: error while converting tarian-detector to execEvent") } - if registerProcess { - n.logger.WithField("comm", evt).Debug("violated process detected, going to register") - n.RegisterViolationsAsNewConstraint(violation) - } else { - n.logger.WithField("comm", evt).Debug("violated process detected") - n.ReportViolationsToClusterAgent(violation) + if execEvent != nil { + n.handleExecEvent(execEvent) } } + + byteData, err := json.Marshal(event) + if err != nil { + n.logger.Error("tarian-detector: error while marshaling event", "err", err) + continue + } + + n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData)) + n.logger.WithField("binary_file_path", event["directory"]).WithField("hostProcessId", event["hostProcessId"]). + WithField("processId", event["processId"]).WithField("comm", event["processName"]).Info("tarian-detector: ", detectionDataType) } } } +func (n *NodeAgent) setupEventsDetector() (*detector.EventsDetector, error) { + tarianEbpfModule, err := tarian.GetModule() + if err != nil { + n.logger.Errorf("error while get tarian-detector ebpf module: %v", err) + return nil, fmt.Errorf("error while get tarian-detector ebpf module: %w", err) + } + + tarianDetector, err := tarianEbpfModule.Prepare() + if err != nil { + n.logger.Errorf("error while prepare tarian-detector: %v", err) + return nil, fmt.Errorf("error while prepare tarian-detector: %w", err) + } + + // Instantiate event detectors + eventsDetector := detector.NewEventsDetector() + + // Add ebpf programs to detectors + eventsDetector.Add(tarianDetector) + + return eventsDetector, nil +} + +func (n *NodeAgent) setupPodWatcher() (*PodWatcher, error) { + // Get in-cluster configuration for Kubernetes. + config, err := rest.InClusterConfig() + if err != nil { + n.logger.Errorf("error while creating k8s client config: %v", err) + return nil, fmt.Errorf("error while creating k8s client config: %w", err) + } + + // Create a Kubernetes client. + k8sClient := kubernetes.NewForConfigOrDie(config) + + watcher, err := NewPodWatcher(n.logger, k8sClient, n.nodeName) + if err != nil { + n.logger.Errorf("error while starting pod-watcher: %v", err) + return nil, fmt.Errorf("error while starting pod-watcher: %w", err) + } + + return watcher, nil +} + +func (n *NodeAgent) execEventFromTarianDetector(bpfEvt map[string]any, containerID string, pod *corev1.Pod) (*ExecEvent, error) { + pid := bpfEvt["hostProcessId"].(uint32) + + var podName string + var podUID string + var namespace string + var podLabels map[string]string + var podAnnotations map[string]string + + podName = pod.GetName() + podUID = string(pod.GetUID()) + namespace = pod.GetNamespace() + podLabels = pod.GetLabels() + podAnnotations = pod.GetAnnotations() + + // Create an ExecEvent and send it to the events channel. + execEvent := &ExecEvent{ + Pid: pid, + Filename: bpfEvt["directory"].(string) + "/" + bpfEvt["processName"].(string), + Command: bpfEvt["processName"].(string), + ContainerID: containerID, + K8sPodName: podName, + K8sPodUID: podUID, + K8sNamespace: namespace, + K8sPodLabels: podLabels, + K8sPodAnnotations: podAnnotations, + } + + return execEvent, nil +} + +func (n *NodeAgent) handleExecEvent(evt *ExecEvent) error { + if !n.constraintsInitialized { + return nil + } + + _, threatScanAnnotationPresent := evt.K8sPodAnnotations[ThreatScanAnnotation] + registerAnnotationValue, registerAnnotationPresent := evt.K8sPodAnnotations[RegisterAnnotation] + if !threatScanAnnotationPresent && !registerAnnotationPresent { + return nil + } + + // Pod has a register annotation but the cluster disables registration + if registerAnnotationPresent && !n.enableAddConstraint { + return nil + } + + violation := n.ValidateProcess(evt) + if violation != nil { + registerProcess := false + registerRules := strings.Split(registerAnnotationValue, ",") + for _, rule := range registerRules { + switch strings.TrimSpace(rule) { + case "processes": + registerProcess = true + case "all": + registerProcess = true + } + } + + if registerProcess { + n.logger.WithField("comm", evt).Debug("violated process detected, going to register") + n.RegisterViolationsAsNewConstraint(violation) + } else { + n.logger.WithField("comm", evt).Debug("violated process detected") + n.ReportViolationsToClusterAgent(violation) + } + } + + return nil +} + +// SendDetectionEventToClusterAgent sends a detection event to the cluster agent. +// +// It takes two parameters: detectionDataType of type string, and detectionData of type string. +func (n *NodeAgent) SendDetectionEventToClusterAgent(detectionDataType, detectionData string) { + req := tarianpb.IngestEventRequest{ + Event: &tarianpb.Event{ + Type: tarianpb.EventTypeDetection, + ClientTimestamp: timestamppb.New(time.Now()), + Targets: []*tarianpb.Target{ + { + DetectionDataType: detectionDataType, + DetectionData: detectionData, + }, + }, + }, + } + + resp, err := n.eventClient.IngestEvent(context.Background(), &req) + if err != nil { + n.logger.Error("error while sending detection events", "err", err) + } else { + n.logger.Debug("ingest event response", "response", resp) + } +} + // ValidateProcess validates a process event against constraints. // // Parameters: @@ -415,91 +571,6 @@ func (n *NodeAgent) RegisterViolationsAsNewConstraint(violation *ProcessViolatio } } -// loopTarianDetectorReadEvents reads events from the Tarian detector and sends them to the cluster agent. -// -// ctx context.Context -// error -func (n *NodeAgent) loopTarianDetectorReadEvents(ctx context.Context) error { - tarianEbpfModule, err := tarian.GetModule() - if err != nil { - n.logger.Errorf("error while get tarian-detector ebpf module: %v", err) - return fmt.Errorf("error while get tarian-detector ebpf module: %w", err) - } - - tarianDetector, err := tarianEbpfModule.Prepare() - if err != nil { - n.logger.Errorf("error while prepare tarian-detector: %v", err) - return fmt.Errorf("error while prepare tarian-detector: %w", err) - } - - // Instantiate event detectors - eventsDetector := detector.NewEventsDetector() - - // Add ebpf programs to detectors - eventsDetector.Add(tarianDetector) - - // Start and defer Close - err = eventsDetector.Start() - if err != nil { - n.logger.Errorf("error while start tarian detector: %v", err) - return fmt.Errorf("error while start tarian-detector: %w", err) - } - - defer eventsDetector.Close() - - go func() { - for { - event, err := eventsDetector.ReadAsInterface() - if err != nil { - n.logger.Errorf("tarian-detector: error while read event: %v", err) - continue - } - - if event == nil { - continue - } - detectionDataType := event["eventId"].(string) - byteData, err := json.Marshal(event) - if err != nil { - n.logger.Error("tarian-detector: error while marshaling event", "err", err) - continue - } - - n.SendDetectionEventToClusterAgent(detectionDataType, string(byteData)) - n.logger.Info("tarian-detector: ", detectionDataType, "binary_file_path", event["directory"], "hostProcessId", event["hostProcessId"], - "processId", event["processId"], "comm", event["processName"]) - } - }() - - <-ctx.Done() - return ctx.Err() -} - -// SendDetectionEventToClusterAgent sends a detection event to the cluster agent. -// -// It takes two parameters: detectionDataType of type string, and detectionData of type string. -func (n *NodeAgent) SendDetectionEventToClusterAgent(detectionDataType, detectionData string) { - req := tarianpb.IngestEventRequest{ - Event: &tarianpb.Event{ - Type: tarianpb.EventTypeDetection, - ClientTimestamp: timestamppb.New(time.Now()), - Targets: []*tarianpb.Target{ - { - DetectionDataType: detectionDataType, - DetectionData: detectionData, - }, - }, - }, - } - - resp, err := n.eventClient.IngestEvent(context.Background(), &req) - if err != nil { - n.logger.Error("error while sending detection events", "err", err) - } else { - n.logger.Debug("ingest event response", "response", resp) - } -} - // matchLabelsFromPodLabels converts a map of labels to a slice of MatchLabel protobufs. // // Parameters: diff --git a/test/k8s/test.sh b/test/k8s/test.sh index 864d7f45..132f3d80 100755 --- a/test/k8s/test.sh +++ b/test/k8s/test.sh @@ -59,8 +59,9 @@ test $(kubectl run -ti --restart=Never verify-alerts --image=curlimages/curl -- || (echo "expected alerts created" && false) # run command to register constraints -kubectl exec -ti nginx2 -c nginx -- pwd -kubectl exec -ti nginx2 -c nginx -- ls / +# multiple times to compensate occassional eBPF missing events +for i in {1..5}; do kubectl exec -ti nginx2 -c nginx -- pwd; sleep 1; done +for i in {1..5}; do kubectl exec -ti nginx2 -c nginx -- ls /; sleep 1; done # give time for tarian-cluser-agent to process data from node agents sleep 5