diff --git a/pkg/clusteragent/kubeactions/executors/adapter.go b/pkg/clusteragent/kubeactions/executors/adapter.go index fb3dc03a557b..28782995c09b 100644 --- a/pkg/clusteragent/kubeactions/executors/adapter.go +++ b/pkg/clusteragent/kubeactions/executors/adapter.go @@ -10,10 +10,16 @@ package executors import ( "context" + "time" kubeactions "github.com/DataDog/agent-payload/v5/kubeactions" ) +const ( + // default timeout for all executors, can be overridden by individual executors if needed + defaultExecutorTimeout = 10 * time.Second +) + // Executor is the interface that all executors in this package implement type Executor interface { Execute(ctx context.Context, action *kubeactions.KubeAction) ExecutionResult diff --git a/pkg/clusteragent/kubeactions/executors/delete_pod.go b/pkg/clusteragent/kubeactions/executors/delete_pod.go index 0175f817762f..673a27818148 100644 --- a/pkg/clusteragent/kubeactions/executors/delete_pod.go +++ b/pkg/clusteragent/kubeactions/executors/delete_pod.go @@ -12,9 +12,10 @@ import ( "fmt" kubeactions "github.com/DataDog/agent-payload/v5/kubeactions" - "github.com/DataDog/datadog-agent/pkg/util/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + + "github.com/DataDog/datadog-agent/pkg/util/log" ) // Execution status constants @@ -25,8 +26,9 @@ const ( // ExecutionResult represents the result of executing an action type ExecutionResult struct { - Status string - Message string + Status string + Message string + Payloads map[string][]byte } // DeletePodExecutor executes delete pod actions @@ -34,6 +36,8 @@ type DeletePodExecutor struct { clientset kubernetes.Interface } +var _ Executor = (*DeletePodExecutor)(nil) + // NewDeletePodExecutor creates a new DeletePodExecutor func NewDeletePodExecutor(clientset kubernetes.Interface) *DeletePodExecutor { return &DeletePodExecutor{ diff --git a/pkg/clusteragent/kubeactions/executors/get_resource.go b/pkg/clusteragent/kubeactions/executors/get_resource.go new file mode 100644 index 000000000000..2dd7a84c3a19 --- /dev/null +++ b/pkg/clusteragent/kubeactions/executors/get_resource.go @@ -0,0 +1,150 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +package executors + +import ( + "bytes" + "context" + "errors" + "fmt" + "net/url" + "strings" + + "k8s.io/client-go/kubernetes" + + "sigs.k8s.io/yaml" + + "github.com/DataDog/datadog-agent/pkg/util/log" + + kubeactions "github.com/DataDog/agent-payload/v5/kubeactions" +) + +const ( + maxResourceOutputSize = 4 * 1024 // 4KB +) + +type GetResourceExecutor struct { + clientset kubernetes.Interface +} + +// Ensure interface compliance at compile time +var _ Executor = (*GetResourceExecutor)(nil) + +var ( + // ErrUnsupportedFormat is returned when the requested output format is not supported + ErrUnsupportedFormat = errors.New("unsupported output format") +) + +// NewGetResourceExecutor creates a new GetResourceExecutor +func NewGetResourceExecutor(clientset kubernetes.Interface) *GetResourceExecutor { + return &GetResourceExecutor{ + clientset: clientset, + } +} + +// Execute retrieves the specified Kubernetes resource and returns it as JSON string in the message field of ExecutionResult +func (e *GetResourceExecutor) Execute(ctx context.Context, action *kubeactions.KubeAction) ExecutionResult { + resource := action.Resource + namespace := strings.ToLower(resource.GetNamespace()) + name := strings.ToLower(resource.GetName()) + apiVersion := strings.ToLower(resource.GetApiVersion()) + kind := strings.ToLower(resource.GetKind()) + + if apiVersion == "" { + return ExecutionResult{ + Status: StatusFailed, + Message: "apiVersion is required to get resource", + } + } + + // prevent the executor from being used to get secrets for security reasons, even if the user has permissions to do so, we don't want to allow that + if strings.Contains(kind, "secret") { + return ExecutionResult{ + Status: StatusFailed, + Message: "getting secrets is not allowed for security reasons", + } + } + + log.Infof("Getting resource %s/%s of type %s", namespace, name, resource.Kind) + + // build the raw REST request to get the resource as unstructured JSON + var path string + + // the api version for core resources does not contain a '/'. + // and the path for core resources is /api/.... + // as for all other resources the path is /apis/... + var apiPrefix string + if !strings.Contains(apiVersion, "/") { + apiPrefix = "/api" + } else { + apiPrefix = "/apis" + } + + // resource.GetApiVersion() returns group/version, it will automagically handle adding the group prefix if needed + // or not adding it for core resources + if namespace == "" { + path, _ = url.JoinPath(apiPrefix, apiVersion, kind, name) + } else { + path, _ = url.JoinPath(apiPrefix, apiVersion, "namespaces", namespace, kind, name) + } + + ctx, cancel := context.WithTimeout(ctx, defaultExecutorTimeout) + defer cancel() + + log.Debugf("get_resource using path '%s'", path) + data, err := e.clientset.CoreV1().RESTClient().Get().AbsPath(path).Do(ctx).Raw() + if err != nil { + return ExecutionResult{ + Status: StatusFailed, + Message: fmt.Sprintf("failed to get resource: %v -- raw response body: %s", err, string(data)), + } + } + + outputFormat := "json" + if output := action.GetGetResource_().GetOutputFormat(); output != "" { + outputFormat = strings.ToLower(output) + } + + output, err := formatOutput(data, outputFormat) + if err != nil { + return ExecutionResult{ + Status: StatusFailed, + Message: fmt.Sprintf("failed to format output to %s: %v", outputFormat, err), + } + } + + output = bytes.TrimSpace(output) + if len(output) > maxResourceOutputSize { + log.Warnf("output for resource %s/%s of type %s is too large (%d bytes), truncating to %d bytes", namespace, name, kind, len(output), maxResourceOutputSize) + output = output[:maxResourceOutputSize] + } + + return ExecutionResult{ + Status: StatusSuccess, + Message: fmt.Sprintf("get resource %s/%s success", kind, name), + Payloads: map[string][]byte{ + "resource": []byte(output), + }, + } +} + +func formatOutput(data []byte, format string) ([]byte, error) { + switch format { + case "json": + return data, nil + case "yaml": + jsonData := data + yamlData, err := yaml.JSONToYAML(jsonData) + if err != nil { + return nil, fmt.Errorf("failed to convert resource JSON to YAML: %v", err) + } + return yamlData, nil + default: + return nil, ErrUnsupportedFormat + } +} diff --git a/pkg/clusteragent/kubeactions/executors/get_resource_test.go b/pkg/clusteragent/kubeactions/executors/get_resource_test.go new file mode 100644 index 000000000000..3f4ee93a94c8 --- /dev/null +++ b/pkg/clusteragent/kubeactions/executors/get_resource_test.go @@ -0,0 +1,79 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build kubeapiserver + +package executors + +import ( + "errors" + "testing" +) + +var testPodJSON = []byte(`{ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "test-pod", + "namespace": "default" + }, + "spec": { + "containers": [ + { + "name": "test-container", + "image": "busybox", + "command": ["sleep", "3600"] + } + ] + } +}`) + +func TestOutputFormat(t *testing.T) { + tests := []struct { + name string + input []byte + outputFormat string + expectErr bool + }{ + { + name: "from json to json", + input: testPodJSON, + outputFormat: "json", + expectErr: false, + }, + { + name: "from json to yaml", + input: testPodJSON, + outputFormat: "yaml", + expectErr: false, + }, + { + name: "unsupported format", + input: testPodJSON, + outputFormat: "xml", + expectErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // test the output format conversion + _, err := formatOutput(tt.input, tt.outputFormat) + + // we don't compare the output because the yaml output can have unordered fields. + // we only ensure it does not fail + if err != nil { + if !tt.expectErr { + t.Errorf("exccpected a result, unexpected error: %v", err) + } + + // with unsuported format we expect an error + if !errors.Is(err, ErrUnsupportedFormat) { + t.Errorf("expected ErrUnsupportedFormat, got: %v", err) + } + } + }) + } +} diff --git a/pkg/clusteragent/kubeactions/executors/patch_deployment.go b/pkg/clusteragent/kubeactions/executors/patch_deployment.go index c1608168a4d9..739d63f195c4 100644 --- a/pkg/clusteragent/kubeactions/executors/patch_deployment.go +++ b/pkg/clusteragent/kubeactions/executors/patch_deployment.go @@ -12,10 +12,11 @@ import ( "fmt" kubeactions "github.com/DataDog/agent-payload/v5/kubeactions" - "github.com/DataDog/datadog-agent/pkg/util/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" + + "github.com/DataDog/datadog-agent/pkg/util/log" ) // PatchDeploymentExecutor executes patch deployment actions @@ -23,6 +24,8 @@ type PatchDeploymentExecutor struct { clientset kubernetes.Interface } +var _ Executor = (*PatchDeploymentExecutor)(nil) + // NewPatchDeploymentExecutor creates a new PatchDeploymentExecutor func NewPatchDeploymentExecutor(clientset kubernetes.Interface) *PatchDeploymentExecutor { return &PatchDeploymentExecutor{ diff --git a/pkg/clusteragent/kubeactions/executors/restart_deployment.go b/pkg/clusteragent/kubeactions/executors/restart_deployment.go index eac9505a38e3..525980399a59 100644 --- a/pkg/clusteragent/kubeactions/executors/restart_deployment.go +++ b/pkg/clusteragent/kubeactions/executors/restart_deployment.go @@ -13,9 +13,10 @@ import ( "time" kubeactions "github.com/DataDog/agent-payload/v5/kubeactions" - "github.com/DataDog/datadog-agent/pkg/util/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + + "github.com/DataDog/datadog-agent/pkg/util/log" ) // RestartDeploymentExecutor executes restart deployment actions @@ -23,6 +24,8 @@ type RestartDeploymentExecutor struct { clientset kubernetes.Interface } +var _ Executor = (*RestartDeploymentExecutor)(nil) + // NewRestartDeploymentExecutor creates a new RestartDeploymentExecutor func NewRestartDeploymentExecutor(clientset kubernetes.Interface) *RestartDeploymentExecutor { return &RestartDeploymentExecutor{ diff --git a/pkg/clusteragent/kubeactions/executors/rollback_deployment.go b/pkg/clusteragent/kubeactions/executors/rollback_deployment.go index 910d88d1a64c..8a8b601ef28b 100644 --- a/pkg/clusteragent/kubeactions/executors/rollback_deployment.go +++ b/pkg/clusteragent/kubeactions/executors/rollback_deployment.go @@ -83,6 +83,8 @@ type RollbackDeploymentExecutor struct { clientset kubernetes.Interface } +var _ Executor = (*RollbackDeploymentExecutor)(nil) + // NewRollbackDeploymentExecutor creates a new RollbackDeploymentExecutor func NewRollbackDeploymentExecutor(clientset kubernetes.Interface) *RollbackDeploymentExecutor { return &RollbackDeploymentExecutor{ diff --git a/pkg/clusteragent/kubeactions/reporter.go b/pkg/clusteragent/kubeactions/reporter.go index 68ec1af58028..04041999f262 100644 --- a/pkg/clusteragent/kubeactions/reporter.go +++ b/pkg/clusteragent/kubeactions/reporter.go @@ -12,6 +12,7 @@ import ( "time" kubeactions "github.com/DataDog/agent-payload/v5/kubeactions" + "github.com/DataDog/datadog-agent/comp/forwarder/eventplatform" "github.com/DataDog/datadog-agent/pkg/logs/message" "github.com/DataDog/datadog-agent/pkg/util/log" @@ -27,20 +28,21 @@ const ( // The backend stores the entire serialized payload as the `data` jsonb column // and extracts the top-level fields into their respective DB columns. type ActionResultEvent struct { - ActionID string `json:"action_id"` - OrgID int64 `json:"org_id"` - EventType string `json:"event_type"` - Status string `json:"status"` - ActionType string `json:"action_type"` - ClusterID string `json:"cluster_id"` - ResourceID string `json:"resource_id"` - RequestedBy string `json:"requested_by"` - Timestamp string `json:"timestamp"` - Message string `json:"message"` - ClusterName string `json:"cluster_name"` - ResourceKind string `json:"resource_kind,omitempty"` - ResourceName string `json:"resource_name,omitempty"` - ResourceNamespace string `json:"resource_namespace,omitempty"` + ActionID string `json:"action_id"` + OrgID int64 `json:"org_id"` + EventType string `json:"event_type"` + Status string `json:"status"` + ActionType string `json:"action_type"` + ClusterID string `json:"cluster_id"` + ResourceID string `json:"resource_id"` + RequestedBy string `json:"requested_by"` + Timestamp string `json:"timestamp"` + Message string `json:"message"` + Payloads map[string][]byte `json:"payloads,omitempty"` + ClusterName string `json:"cluster_name"` + ResourceKind string `json:"resource_kind,omitempty"` + ResourceName string `json:"resource_name,omitempty"` + ResourceNamespace string `json:"resource_namespace,omitempty"` } // ResultReporter handles reporting action execution results back to the backend via Event Platform @@ -61,15 +63,15 @@ func NewResultReporter(epForwarder eventplatform.Forwarder, clusterName, cluster // ReportReceived sends an action_received event via EVP when an action is first received from RC func (r *ResultReporter) ReportReceived(actionKey ActionKey, action *kubeactions.KubeAction, orgID int64) { - r.report(actionKey, action, orgID, EventTypeActionReceived, StatusSuccess, "action received", time.Now()) + r.report(actionKey, action, orgID, EventTypeActionReceived, StatusSuccess, "action received", nil, time.Now()) } // ReportResult sends an action_executed event via EVP after execution completes func (r *ResultReporter) ReportResult(actionKey ActionKey, action *kubeactions.KubeAction, result ExecutionResult, orgID int64, executedAt time.Time) { - r.report(actionKey, action, orgID, EventTypeActionExecuted, result.Status, result.Message, executedAt) + r.report(actionKey, action, orgID, EventTypeActionExecuted, result.Status, result.Message, result.Payloads, executedAt) } -func (r *ResultReporter) report(actionKey ActionKey, action *kubeactions.KubeAction, orgID int64, evpEventType, status, msg string, ts time.Time) { +func (r *ResultReporter) report(actionKey ActionKey, action *kubeactions.KubeAction, orgID int64, evpEventType, status, msg string, payloads map[string][]byte, ts time.Time) { if r.epForwarder == nil { log.Warnf("[KubeActions] Event Platform forwarder not available, skipping %s reporting for action %s", evpEventType, actionKey.String()) return @@ -102,6 +104,7 @@ func (r *ResultReporter) report(actionKey ActionKey, action *kubeactions.KubeAct RequestedBy: action.GetRequestedBy(), Timestamp: ts.Format(time.RFC3339), Message: msg, + Payloads: payloads, ClusterName: r.clusterName, ResourceKind: resourceKind, ResourceName: resourceName, diff --git a/pkg/clusteragent/kubeactions/setup.go b/pkg/clusteragent/kubeactions/setup.go index 867535d71fa9..28ff34cb3578 100644 --- a/pkg/clusteragent/kubeactions/setup.go +++ b/pkg/clusteragent/kubeactions/setup.go @@ -58,8 +58,9 @@ type executorAdapter struct { func (a *executorAdapter) Execute(ctx context.Context, action *kubeactions.KubeAction) ExecutionResult { result := a.exec.Execute(ctx, action) return ExecutionResult{ - Status: result.Status, - Message: result.Message, + Status: result.Status, + Message: result.Message, + Payloads: result.Payloads, } } @@ -81,6 +82,10 @@ func registerExecutors(registry *ExecutorRegistry, clientset kubernetes.Interfac registry.Register("rollback_deployment", &executorAdapter{exec: executors.NewRollbackDeploymentExecutor(clientset)}) log.Infof("Registered executor for action type: rollback_deployment") + // Register get_resource executor + registry.Register("get_resource", &executorAdapter{exec: executors.NewGetResourceExecutor(clientset)}) + log.Infof("Registered executor for action type: get_resource") + // TODO: Add more executors here as they are implemented: // registry.Register("drain_node", &executorAdapter{exec: executors.NewDrainNodeExecutor(clientset)}) // registry.Register("cordon_node", &executorAdapter{exec: executors.NewCordonNodeExecutor(clientset)}) diff --git a/pkg/clusteragent/kubeactions/types.go b/pkg/clusteragent/kubeactions/types.go index bca4619b4ada..8740f1d22985 100644 --- a/pkg/clusteragent/kubeactions/types.go +++ b/pkg/clusteragent/kubeactions/types.go @@ -20,6 +20,7 @@ const ( ActionTypeRestartDeployment = "restart_deployment" ActionTypePatchDeployment = "patch_deployment" ActionTypeRollbackDeployment = "rollback_deployment" + ActionTypeGetResource = "get_resource" ) // Execution status constants @@ -33,8 +34,9 @@ const ( // ExecutionResult represents the result of executing an action type ExecutionResult struct { - Status string - Message string + Status string + Message string + Payloads map[string][]byte } // ActionExecutor is the interface that all action executors must implement @@ -59,6 +61,8 @@ func GetActionType(action *kubeactions.KubeAction) string { return ActionTypePatchDeployment case *kubeactions.KubeAction_RollbackDeployment: return ActionTypeRollbackDeployment + case *kubeactions.KubeAction_GetResource_: + return ActionTypeGetResource default: return ActionTypeUnknown } diff --git a/releasenotes/notes/add-new-remote-config-action-k8s-get-resource-b6cbed5ae1f48410.yaml b/releasenotes/notes/add-new-remote-config-action-k8s-get-resource-b6cbed5ae1f48410.yaml new file mode 100644 index 000000000000..cdcc80ad0c6f --- /dev/null +++ b/releasenotes/notes/add-new-remote-config-action-k8s-get-resource-b6cbed5ae1f48410.yaml @@ -0,0 +1,11 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +features: + - | + Adds a new action ``get-resource`` in kubeactions.