From c4649990e086ff4d936193aa979d92f724425316 Mon Sep 17 00:00:00 2001 From: Bob Melander Date: Tue, 2 Jul 2024 15:18:18 +0200 Subject: [PATCH] feat: adds periodic health reporting to the k8s inventory agent WIP: DO NOT MERGE!! Addresses: - Enterprise-4118 - Enterprise-4119 Signed-off-by: Bob Melander --- anchore-k8s-inventory.yaml | 11 ++ cmd/root.go | 74 +++++++- internal/anchore/anchoreclient.go | 154 ++++++++++++++++ internal/config/config.go | 17 +- .../snapshot/TestDefaultConfigString.golden | 5 + .../snapshot/TestEmptyConfigString.golden | 5 + .../snapshot/TestSensitiveConfigJSON.golden | 4 + .../snapshot/TestSensitiveConfigString.golden | 5 + internal/time/time.go | 35 ++++ pkg/healthreporter/healthreporter.go | 169 +++++++++++++++++ pkg/integration/integration.go | 107 +++++++++++ pkg/lib.go | 170 +++++++++++++++++- skaffold.yaml | 1 + 13 files changed, 745 insertions(+), 12 deletions(-) create mode 100644 internal/anchore/anchoreclient.go create mode 100644 internal/time/time.go create mode 100644 pkg/healthreporter/healthreporter.go create mode 100644 pkg/integration/integration.go diff --git a/anchore-k8s-inventory.yaml b/anchore-k8s-inventory.yaml index 6445dd6..39cf826 100644 --- a/anchore-k8s-inventory.yaml +++ b/anchore-k8s-inventory.yaml @@ -14,6 +14,14 @@ log: # enable/disable checking for application updates on startup check-for-app-update: true +registration: + # The id to register the agent as with Enterprise (should be a freshly generated UUIDv4) + # fallback-register-id: db2963a1-b029-4d7c-b42c-4e8f81c263dd + # Force registration to be done with fallback id, not just as a fallback. Defaults to false. + always-use-fallback-register-id: false + # The name to register the agent as with Enterprise. Defaults to anchore_k8s_inventory_agent. + fallback-register-name: anchore_k8s_inventory_agent + kubeconfig: path: cluster: docker-desktop @@ -117,6 +125,9 @@ ignore-not-running: true # Only respected if mode is periodic polling-interval-seconds: 300 +# Only respected if mode is periodic +health-report-interval-seconds: 60 + # Batch Request configuration inventory-report-limits: namespaces: 0 # default of 0 means no limit per report diff --git a/cmd/root.go b/cmd/root.go index aaa151f..771af73 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -3,6 +3,8 @@ package cmd import ( "errors" "fmt" + "github.com/anchore/k8s-inventory/pkg/healthreporter" + "github.com/anchore/k8s-inventory/pkg/integration" "os" "runtime/pprof" @@ -45,9 +47,57 @@ var rootCmd = &cobra.Command{ os.Exit(1) } + // TODO(bob): Add global variable to disable all health reporting + // * If registration REST call fails with HTTP 404 Not found **API endpoint** + // then that is an indication that agent is interacting with an older version of + // Enterprise that lacks the Integration feature. We should then + // disable all health reporting functionality in the agent but otherwise + // let it perform its normal tasks. + // * If registration REST call fails with HTTP 404 Not found **Account or User** + // then that means the agent has an incorrect configuration and should not be + // allowed to proceed. Log the error and then exit. + // * if + // * If registration REST call fails with HTTP 500 Internal error then what? + // Log the error and then exit? + // * If registration REST call fails with HTTP 409 re-register with different + // integration_id then the agent should change its integration_id to that value + // and re-register. 
+ + // TODO(bob): How to handle Enterprise being offline during registration and + // periodic health-reporting. Reasons for being offline could be that: + // - Enterprise is not started (this could be infinite) + // - Enterprise is starting (this could take minutes) + // - Enterprise undergoes upgrade (this could take hours) + // - other reasons? + // The current behavior of an k8s inventory agent is that its starts and then periodically + // starts sending inventory reports. If a sending fails a new attempt will be made in the + // next cycle. In that sense it will wait indefinitely for Enterprise to come online. + // To be backward compatible with the current behavior of k8s inventory agent, failed + // *registrations* should therefore be retried indefinitely. + // -- Do we really want infinite to literally mean infinite? + // Or just a large enough number of retries with exponential backoff + // -- K8s Deployment, StatefulSets etc will normally restart failed (or just exited) pods + // -- Docker also has ability to restart failed (or just exited) + // Failure to send a health report should just be logged and then ignored. A new attempt + // to send a health report will be performed in the next iteration. + + instance, err := callHome() + if err != nil { + os.Exit(1) + } + switch appConfig.RunMode { case mode.PeriodicPolling: - pkg.PeriodicallyGetInventoryReport(appConfig) + neverDone := make(chan bool, 1) + + gatedReportInfo := healthreporter.GatedReportInfo{ + AccountInventoryReports: make(healthreporter.AccountK8SInventoryReports, 0), + } + + go pkg.PeriodicallyGetInventoryReport(appConfig, &gatedReportInfo) + go healthreporter.PeriodicallySendHealthReport(appConfig, &instance, &gatedReportInfo) + + <-neverDone default: reports, err := pkg.GetInventoryReports(appConfig) if appConfig.Dev.ProfileCPU { @@ -58,10 +108,11 @@ var rootCmd = &cobra.Command{ os.Exit(1) } anErrorOccurred := false + reportInfo := healthreporter.InventoryReportInfo{} for account, reportsForAccount := range reports { for count, report := range reportsForAccount { log.Infof("Sending Inventory Report to Anchore Account %s, %d of %d", account, count+1, len(reportsForAccount)) - err = pkg.HandleReport(report, appConfig, account) + err = pkg.HandleReport(report, &reportInfo, appConfig, account) if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { // Retry with default account retryAccount := appConfig.AnchoreDetails.Account @@ -69,7 +120,7 @@ var rootCmd = &cobra.Command{ retryAccount = appConfig.AccountRouteByNamespaceLabel.DefaultAccount } log.Warnf("Error sending to Anchore Account %s, sending to default account", account) - err = pkg.HandleReport(report, appConfig, retryAccount) + err = pkg.HandleReport(report, &reportInfo, appConfig, retryAccount) } if err != nil { log.Errorf("Failed to handle Image Results: %+v", err) @@ -84,6 +135,23 @@ var rootCmd = &cobra.Command{ }, } +func callHome() (integration.Integration, error) { + namespace := os.Getenv("POD_NAMESPACE") + name := os.Getenv("HOSTNAME") + instance, err := pkg.GetIntegrationInfo(appConfig, namespace, name) + if err != nil { + log.Errorf("Failed to get Integration Info: %+v", err) + return integration.Integration{}, err + } + // Register this agent with enterprise + err = integration.Register(&instance, appConfig.AnchoreDetails) + if err != nil { + log.Errorf("Unable to register agent: %v", err) + return integration.Integration{}, err + } + return instance, nil +} + func init() { opt := "kubeconfig" rootCmd.Flags().StringP(opt, "k", "", "(optional) 
absolute path to the kubeconfig file") diff --git a/internal/anchore/anchoreclient.go b/internal/anchore/anchoreclient.go new file mode 100644 index 0000000..af7ac71 --- /dev/null +++ b/internal/anchore/anchoreclient.go @@ -0,0 +1,154 @@ +package anchore + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "fmt" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + "github.com/anchore/k8s-inventory/internal/tracker" + "github.com/h2non/gock" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +type APIError struct { + Message string `json:"message"` + Detail map[string]interface{} `json:"detail"` + HTTPCode int `json:"httpcode"` +} + +func ErrAnchoreEndpointDoesNotExist(path string) error { + return fmt.Errorf("api endpoint does not exist: %s", path) +} + +type ErrAchoreAPIClient struct { + HTTPStatusCode int + Message string + Path string + Body *[]byte +} + +func (e *ErrAchoreAPIClient) Error() string { + return fmt.Sprintf("API error(%d): %s Path: %q", e.HTTPStatusCode, e.Message, e.Path) +} + +func Put(requestBody []byte, id string, path string, anchoreDetails config.AnchoreInfo, operation string) (*[]byte, error) { + defer tracker.TrackFunctionTime(time.Now(), fmt.Sprintf("Sent %s request to Anchore", operation)) + + log.Debugf("Performing %s to Anchore using endpoint: %s", operation, strings.Replace(path, "{{id}}", id, 1)) + + client := getClient(anchoreDetails) + + anchoreURL, err := getURL(anchoreDetails, path, id) + if err != nil { + return nil, err + } + + request, err := getPutRequest(anchoreDetails, anchoreURL, requestBody, operation) + if err != nil { + return nil, err + } + + return doPut(client, request, strings.Replace(path, "{{id}}", id, 1), operation) +} + +func getClient(anchoreDetails config.AnchoreInfo) *http.Client { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: anchoreDetails.HTTP.Insecure}, + } // #nosec G402 + + client := &http.Client{ + Transport: tr, + Timeout: time.Duration(anchoreDetails.HTTP.TimeoutSeconds) * time.Second, + } + gock.InterceptClient(client) // Required to use gock for testing custom client + + return client +} + +func getURL(anchoreDetails config.AnchoreInfo, path string, id string) (string, error) { + anchoreURL, err := url.Parse(anchoreDetails.URL) + if err != nil { + return "", fmt.Errorf("failed to build path (%s) url: %w", path, err) + } + + anchoreURL.Path += strings.Replace(path, "{{id}}", id, 1) + return anchoreURL.String(), nil +} + +func getPutRequest(anchoreDetails config.AnchoreInfo, endpointURL string, reqBody []byte, operation string) (*http.Request, error) { + request, err := http.NewRequest("PUT", endpointURL, bytes.NewBuffer(reqBody)) + if err != nil { + return nil, fmt.Errorf("failed to prepare %s request to Anchore: %w", operation, err) + } + + request.SetBasicAuth(anchoreDetails.User, anchoreDetails.Password) + request.Header.Set("Content-Type", "application/json") + request.Header.Set("x-anchore-account", anchoreDetails.Account) + return request, nil +} + +func doPut(client *http.Client, request *http.Request, path string, operation string) (*[]byte, error) { + response, err := client.Do(request) + if err != nil { + return nil, fmt.Errorf("failed to send %s to Anchore: %w", operation, err) + } + defer response.Body.Close() + + err = checkHTTPErrors(response, request, path, operation) + if err != nil { + return nil, err + } + + responseBody, _ := getBody(response, operation) + return responseBody, nil +} + +func 
checkHTTPErrors(response *http.Response, _ *http.Request, path string, operation string) error { + switch { + case response.StatusCode == 403: + msg := fmt.Sprintf("forbidden response (403) from Anchore (during %s)", operation) + log.Debug(msg) + return &ErrAchoreAPIClient{Message: msg, Path: path, Body: nil, HTTPStatusCode: response.StatusCode} + // return fmt.Errorf("user account not found") + case response.StatusCode == 404: + msg := fmt.Sprintf("not found response (404) from Anchore (during %s)", operation) + log.Debugf("%s: path: %s. Please verify that the correct version of Anchore is deployed.", msg, path) + return &ErrAchoreAPIClient{Message: msg, Path: path, Body: nil, HTTPStatusCode: response.StatusCode} + // return ErrAnchoreEndpointDoesNotExist(path) + case response.StatusCode == 409: + msg := fmt.Sprintf("conflict response (409) from Anchore (during %s)", operation) + log.Debug(msg) + respBody, _ := getBody(response, operation) + return &ErrAchoreAPIClient{Message: msg, Path: path, Body: respBody, HTTPStatusCode: response.StatusCode} + case response.StatusCode < 200 || response.StatusCode > 299: + msg := fmt.Sprintf("failed to perform %s to Anchore: %+v", operation, response) + log.Debug(msg) + return &ErrAchoreAPIClient{Message: msg, Path: path, Body: nil, HTTPStatusCode: response.StatusCode} + // return fmt.Errorf("failed to perform %s to Anchore: %+v", operation, response) + } + return nil +} + +func getBody(response *http.Response, operation string) (*[]byte, error) { + responseBody, err := io.ReadAll(response.Body) + if err != nil { + errMsg := fmt.Sprintf("failed to read %s response body from Anchore:", operation) + log.Debugf("%s %v", errMsg, err) + return nil, fmt.Errorf("%s %w", errMsg, err) + } + + // Check we received a valid JSON response from Anchore; this will help catch + // any redirect responses where it returns HTML login pages e.g. 
Enterprise + // running behind cloudflare where a login page is returned with the status 200 + if len(responseBody) > 0 && !json.Valid(responseBody) { + log.Debugf("Anchore %s response body: %s", operation, string(responseBody)) + return nil, fmt.Errorf("%s response from Anchore is not valid json: %+v", operation, response) + } + return &responseBody, nil +} diff --git a/internal/config/config.go b/internal/config/config.go index 9332532..6ce15f9 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -15,7 +15,6 @@ import ( "path" "strings" - "github.com/anchore/k8s-inventory/pkg/mode" "gopkg.in/yaml.v2" "github.com/adrg/xdg" @@ -24,6 +23,7 @@ import ( "github.com/spf13/viper" "github.com/anchore/k8s-inventory/internal" + "github.com/anchore/k8s-inventory/pkg/mode" ) const redacted = "******" @@ -37,8 +37,9 @@ type CliOnlyOptions struct { // All Application configurations type Application struct { ConfigPath string - Quiet bool `mapstructure:"quiet" json:"quiet,omitempty" yaml:"quiet"` - Log Logging `mapstructure:"log" json:"log,omitempty" yaml:"log"` + Quiet bool `mapstructure:"quiet" json:"quiet,omitempty" yaml:"quiet"` + Log Logging `mapstructure:"log" json:"log,omitempty" yaml:"log"` + Registration RegistrationOptions `mapstructure:"registration" json:"registration,omitempty" yaml:"registration"` CliOptions CliOnlyOptions Dev Development `mapstructure:"dev" json:"dev,omitempty" yaml:"dev"` KubeConfig KubeConf `mapstructure:"kubeconfig" json:"kubeconfig,omitempty" yaml:"kubeconfig"` @@ -54,12 +55,19 @@ type Application struct { Mode string `mapstructure:"mode" json:"mode,omitempty" yaml:"mode"` IgnoreNotRunning bool `mapstructure:"ignore-not-running" json:"ignore-not-running,omitempty" yaml:"ignore-not-running"` PollingIntervalSeconds int `mapstructure:"polling-interval-seconds" json:"polling-interval-seconds,omitempty" yaml:"polling-interval-seconds"` + HealthReportIntervalSeconds int `mapstructure:"health-report-interval-seconds" json:"health-report-interval-seconds,omitempty" yaml:"health-report-interval-seconds"` InventoryReportLimits InventoryReportLimits `mapstructure:"inventory-report-limits" json:"inventory-report-limits,omitempty" yaml:"inventory-report-limits"` MetadataCollection MetadataCollection `mapstructure:"metadata-collection" json:"metadata-collection,omitempty" yaml:"metadata-collection"` AnchoreDetails AnchoreInfo `mapstructure:"anchore" json:"anchore,omitempty" yaml:"anchore"` VerboseInventoryReports bool `mapstructure:"verbose-inventory-reports" json:"verbose-inventory-reports,omitempty" yaml:"verbose-inventory-reports"` } +type RegistrationOptions struct { + FallbackRegisterID string `mapstructure:"fallback-register-id" json:"fallback-register-id,omitempty" yaml:"fallback-register-id"` + AlwaysUseFallbackRegisterID bool `mapstructure:"always-use-fallback-register-id" json:"always-use-fallback-register-id,omitempty" yaml:"always-use-fallback-register-id"` + FallbackRegisterName string `mapstructure:"fallback-register-name" json:"fallback-register-name,omitempty" yaml:"fallback-register-name"` +} + // MissingTagConf details the policy for handling missing tags when reporting images type MissingTagConf struct { Policy string `mapstructure:"policy" json:"policy,omitempty" yaml:"policy"` @@ -150,6 +158,8 @@ func setNonCliDefaultValues(v *viper.Viper) { v.SetDefault("log.level", "") v.SetDefault("log.file", "") v.SetDefault("log.structured", false) + v.SetDefault("registration.always-use-fallback-register-id", false) + 
v.SetDefault("registration.fallback-register-name", "anchore_k8s_inventory_agent") v.SetDefault("dev.profile-cpu", false) v.SetDefault("anchore.account", "admin") v.SetDefault("kubeconfig.anchore.account", "admin") @@ -160,6 +170,7 @@ func setNonCliDefaultValues(v *viper.Viper) { v.SetDefault("kubernetes.request-batch-size", 100) v.SetDefault("kubernetes.worker-pool-size", 100) v.SetDefault("ignore-not-running", true) + v.SetDefault("health-report-interval-seconds", 60) v.SetDefault("missing-registry-override", "") v.SetDefault("missing-tag-policy.policy", "digest") v.SetDefault("missing-tag-policy.tag", "UNKNOWN") diff --git a/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden b/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden index c3b656b..db46aeb 100644 --- a/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden +++ b/internal/config/test-fixtures/snapshot/TestDefaultConfigString.golden @@ -5,6 +5,10 @@ log: levelopt: debug level: debug file: ./anchore-k8s-inventory.log +registration: + fallback-register-id: "" + always-use-fallback-register-id: false + fallback-register-name: anchore_k8s_inventory_agent clioptions: configpath: ../../anchore-k8s-inventory.yaml verbosity: 0 @@ -44,6 +48,7 @@ runmode: 0 mode: adhoc ignore-not-running: true polling-interval-seconds: 300 +health-report-interval-seconds: 60 inventory-report-limits: namespaces: 0 metadata-collection: diff --git a/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden b/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden index 8f85b8b..ddf916a 100644 --- a/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden +++ b/internal/config/test-fixtures/snapshot/TestEmptyConfigString.golden @@ -5,6 +5,10 @@ log: levelopt: panic level: "" file: "" +registration: + fallback-register-id: "" + always-use-fallback-register-id: false + fallback-register-name: "" clioptions: configpath: "" verbosity: 0 @@ -44,6 +48,7 @@ runmode: 0 mode: "" ignore-not-running: false polling-interval-seconds: 0 +health-report-interval-seconds: 0 inventory-report-limits: namespaces: 0 metadata-collection: diff --git a/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden b/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden index b9da0c5..3913c5d 100644 --- a/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden +++ b/internal/config/test-fixtures/snapshot/TestSensitiveConfigJSON.golden @@ -5,6 +5,9 @@ "level": "debug", "file": "./anchore-k8s-inventory.log" }, + "registration": { + "fallback-register-name": "anchore_k8s_inventory_agent" + }, "CliOptions": { "ConfigPath": "../../anchore-k8s-inventory.yaml", "Verbosity": 0 @@ -50,6 +53,7 @@ "mode": "adhoc", "ignore-not-running": true, "polling-interval-seconds": 300, + "health-report-interval-seconds": 60, "inventory-report-limits": {}, "metadata-collection": { "nodes": {}, diff --git a/internal/config/test-fixtures/snapshot/TestSensitiveConfigString.golden b/internal/config/test-fixtures/snapshot/TestSensitiveConfigString.golden index e72b4f9..22111a6 100644 --- a/internal/config/test-fixtures/snapshot/TestSensitiveConfigString.golden +++ b/internal/config/test-fixtures/snapshot/TestSensitiveConfigString.golden @@ -5,6 +5,10 @@ log: levelopt: debug level: debug file: ./anchore-k8s-inventory.log +registration: + fallback-register-id: "" + always-use-fallback-register-id: false + fallback-register-name: anchore_k8s_inventory_agent clioptions: configpath: 
../../anchore-k8s-inventory.yaml verbosity: 0 @@ -54,6 +58,7 @@ runmode: 0 mode: adhoc ignore-not-running: true polling-interval-seconds: 300 +health-report-interval-seconds: 60 inventory-report-limits: namespaces: 0 metadata-collection: diff --git a/internal/time/time.go b/internal/time/time.go new file mode 100644 index 0000000..ef64bf8 --- /dev/null +++ b/internal/time/time.go @@ -0,0 +1,35 @@ +package time + +import ( + "fmt" + "time" +) + +// time with json marshalling/unmarshalling support + +type Datetime struct { + time.Time +} +type Duration struct { + time.Duration +} + +func Now() Datetime { + return Datetime{time.Now()} +} + +func (d Datetime) UTC() Datetime { + return Datetime{d.Time.UTC()} +} + +func (d Datetime) Sub(d2 Datetime) Duration { + return Duration{d.Time.Sub(d2.Time)} +} + +func (d Datetime) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("\"%s\"", d.Format(time.RFC3339))), nil +} + +func (d Duration) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("\"%f\"", d.Seconds())), nil +} diff --git a/pkg/healthreporter/healthreporter.go b/pkg/healthreporter/healthreporter.go new file mode 100644 index 0000000..fe187cd --- /dev/null +++ b/pkg/healthreporter/healthreporter.go @@ -0,0 +1,169 @@ +package healthreporter + +import ( + "encoding/json" + "sync" + "time" + + "github.com/google/uuid" + + "github.com/anchore/k8s-inventory/internal/anchore" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + jstime "github.com/anchore/k8s-inventory/internal/time" + intg "github.com/anchore/k8s-inventory/pkg/integration" +) + +const healthProtocolVersion = 1 +const healthDataVersion = 1 +const healthDataType = "anchore_k8s_inventory_agent" +const healthReportAPIPathV2 = "v2/system/integrations/{{id}}/health-report" + +type HealthReport struct { + ID string `json:"id,omitempty"` // uuid for this health report + ProtocolVersion int `json:"protocol_version,omitempty"` // protocol version for "common" part of health reporting + Timestamp jstime.Datetime `json:"timestamp,omitempty"` // timestamp for this health report in UTC().Format(time.RFC3339) + IntegrationID string `json:"integration_id,omitempty"` // uuid of the integration + InstanceID string `json:"instance_id,omitempty"` // identifier that make integration instance unique among its replicas + Uptime jstime.Duration `json:"uptime,omitempty"` // running time of integration instance + HealthReportInterval int `json:"health_report_interval,omitempty"` // time in seconds between health reports + HealthData HealthData `json:"health_data,omitempty"` // K8s-inventory agent specific health data +} + +type HealthData struct { + Type string `json:"type,omitempty"` // type of health data + Version int `json:"version,omitempty"` // format version + Errors HealthReportErrors `json:"errors,omitempty"` // list of errors + // Anything below this line is specific to k8s-inventory-agent + AccountK8sInventoryReports AccountK8SInventoryReports `json:"account_k8s_inventory_reports,omitempty"` // latest inventory reports per account +} + +type HealthReportErrors []string + +// AccountK8SInventoryReports holds per account information about latest inventory reports from the same batch set +type AccountK8SInventoryReports map[string]InventoryReportInfo + +type InventoryReportInfo struct { + ReportTimestamp string `json:"report_timestamp"` // Timestamp for the inventory report that was batched + Account string `json:"account"` // Account to which the inventory report belongs + 
SentAsUser string `json:"sent_as_user"` // User that the inventory report was sent as + BatchSize int `json:"batch_size"` // Number of batches that the inventory report was sent in + LastSuccessfulIndex int `json:"last_successful_index"` // Index of last successfully sent batch, -1 if none + HasErrors bool `json:"has_errors"` // HasErrors is true if any of the batches had an error, false otherwise + Batches []BatchInfo `json:"batches"` // Information about each inventory report batch +} + +type BatchInfo struct { + BatchIndex int `json:"batch_index,omitempty"` // Index of this inventory report batch item + SendTimestamp jstime.Datetime `json:"send_timestamp,omitempty"` // Timestamp when the batch was sent, in UTC().Format(time.RFC3339) + Error string `json:"error,omitempty"` // Any error this batch encountered when sent +} + +// GatedReportInfo The go routine that generates the inventory report must inform the go routine +// that sends health reports about the *latest* sent inventory reports. +// A buffered channel is FIFO so the earliest inserted items are returned first. No new items can +// be added when the buffer is full. This means that the information about the latest sent inventory +// reports would have to be dropped in such situations. We would rather drop the information about +// the *oldest* sent inventory reports. +// We therefore use a map (keyed by account) to store information about the latest sent inventory +// reports. This map is shared by the go routine that generates inventory reports and the go +// routine that sends health reports. Access to the map is coordinated by a mutex. +type GatedReportInfo struct { + AccessGate sync.RWMutex + AccountInventoryReports AccountK8SInventoryReports +} + +func PeriodicallySendHealthReport(cfg *config.Application, integration *intg.Integration, gatedReportInfo *GatedReportInfo) { + ticker := time.NewTicker(time.Duration(cfg.HealthReportIntervalSeconds) * time.Second) + + for { + log.Infof("Waiting %d seconds to send health report...", cfg.HealthReportIntervalSeconds) + + healthReportID := uuid.New().String() + lastReports := GetAccountReportInfoNoBlocking(gatedReportInfo, cfg) + + integration.Uptime = jstime.Now().UTC().Sub(integration.StartedAt) + healthReport := HealthReport{ + ID: healthReportID, + ProtocolVersion: healthProtocolVersion, + Timestamp: jstime.Now().UTC(), + IntegrationID: integration.ID, + InstanceID: integration.InstanceID, + Uptime: integration.Uptime, + HealthData: HealthData{ + Type: healthDataType, + Version: healthDataVersion, + Errors: make(HealthReportErrors, 0), // any errors are only reported in lastReports + AccountK8sInventoryReports: lastReports, + }, + HealthReportInterval: cfg.HealthReportIntervalSeconds, + } + + log.Infof("Sending health report (id:%s) covering %d accounts", healthReport.ID, len(healthReport.HealthData.AccountK8sInventoryReports)) + requestBody, err := json.Marshal(healthReport) + if err != nil { + log.Errorf("failed to serialize health report as JSON: %v", err) + } else { + log.Debugf("Size of health report as marshalled JSON: %d bytes", len(requestBody))
+ _, err = anchore.Put(requestBody, integration.ID, healthReportAPIPathV2, cfg.AnchoreDetails, "health report") + if err != nil { + log.Errorf("Failed to send health report to Anchore: %v", err) + } + } + + // log.Debugf("Start new health report: %s", <-ticker.C) + <-ticker.C + } +} + +func GetAccountReportInfoNoBlocking(gatedReportInfo *GatedReportInfo, cfg *config.Application) AccountK8SInventoryReports { + locked := gatedReportInfo.AccessGate.TryLock() + + if locked { + defer gatedReportInfo.AccessGate.Unlock() + + log.Info("Removing inventory report info for accounts that are no longer active") + accountsToRemove := make(map[string]bool) + now := time.Now().UTC() + inactiveAge := 200000 * float64(cfg.PollingIntervalSeconds) + + for account, reportInfo := range gatedReportInfo.AccountInventoryReports { + for _, batchInfo := range reportInfo.Batches { + log.Debugf("Last inv.report (time:%s, account:%s, batch:%d/%d, sent:%s error:'%s')", + reportInfo.ReportTimestamp, account, batchInfo.BatchIndex, reportInfo.BatchSize, + batchInfo.SendTimestamp, batchInfo.Error) + reportTime, err := time.Parse(time.RFC3339, reportInfo.ReportTimestamp) + if err != nil { + log.Errorf("failed to parse report_timestamp: %v", err) + } else if now.Sub(reportTime).Seconds() > inactiveAge { + accountsToRemove[account] = true + } + } + } + + for accountToRemove := range accountsToRemove { + log.Debugf("Accounts no longer considered active: %s", accountToRemove) + delete(gatedReportInfo.AccountInventoryReports, accountToRemove) + } + + return gatedReportInfo.AccountInventoryReports + } + log.Debugf("Unable to obtain mutex lock to get account inventory report information. Continuing.") + return AccountK8SInventoryReports{} +} + +func SetReportInfoNoBlocking(account string, count int, reportInfo InventoryReportInfo, gatedReportInfo *GatedReportInfo) { + log.Debugf("Setting report (%s) for account '%s': %d/%d %s %s", reportInfo.ReportTimestamp, account, + reportInfo.Batches[count].BatchIndex, reportInfo.BatchSize, reportInfo.Batches[count].SendTimestamp, + reportInfo.Batches[count].Error) + locked := gatedReportInfo.AccessGate.TryLock() + if locked { + defer gatedReportInfo.AccessGate.Unlock() + gatedReportInfo.AccountInventoryReports[account] = reportInfo + } else { + // we prioritize not blocking over bookkeeping info for every sent inventory report + log.Debugf("Unable to obtain mutex lock to include inventory report timestamped %s for %s: %d/%d %s in health report. 
Continuing.", + reportInfo.ReportTimestamp, account, reportInfo.Batches[count].BatchIndex, reportInfo.BatchSize, + reportInfo.Batches[count].SendTimestamp) + } +} diff --git a/pkg/integration/integration.go b/pkg/integration/integration.go new file mode 100644 index 0000000..0260bac --- /dev/null +++ b/pkg/integration/integration.go @@ -0,0 +1,107 @@ +package integration + +import ( + "encoding/json" + "errors" + "fmt" + + "github.com/anchore/k8s-inventory/internal/anchore" + "github.com/anchore/k8s-inventory/internal/config" + "github.com/anchore/k8s-inventory/internal/log" + jstime "github.com/anchore/k8s-inventory/internal/time" +) + +const MaxAttempts = 2 +const IntegrationType = "anchore_k8s_inventory_agent" +const registerAPIPathV2 = "v2/system/integrations/{{id}}/register" + +// HealthStatus reflects the state of the Integration wrt any errors +// encountered when performing its tasks +type HealthStatus struct { + State string `json:"state,omitempty"` // state of the integration HEALTHY or UNHEALTHY +} + +// LifeCycle status reflects the state of the Integration from the perspective of Enterprise +type LifeCycleStatus struct { + State string `json:"state,omitempty"` // lifecycle state ONBOARDED, ACTIVE, DEGRADED, DEACTIVATED + Reason string `json:"reason,omitempty"` + Details string `json:"details,omitempty"` + UpdatedAt jstime.Datetime `json:"updated_at,omitempty"` +} + +type Integration struct { + ID string `json:"id,omitempty"` // uuid that uniquely over, space and time, identifies integration + InstanceID string `json:"instance_id,omitempty"` // identifier that make integration instance unique among its replicas + Type string `json:"type,omitempty"` // type of integration (e.g., 'anchore-k8s-agent') + Name string `json:"name,omitempty"` // name of the integration instance (e.g., k8s-agent-admin') + Description string `json:"description,omitempty"` // short description of integration instance + Version string `json:"version,omitempty"` // version of the integration instance + Status *HealthStatus `json:"status,omitempty"` // health status of the integration (Read-only) + IntegrationStatus *LifeCycleStatus `json:"integration_status,omitempty"` // lifecycle status of the integration (Read-only) + StartedAt jstime.Datetime `json:"started_at,omitempty"` // timestamp when integration instance was started in UTC().Format(time.RFC3339) + LastSeen *jstime.Datetime `json:"last_seen,omitempty"` // timestamp of last received health report from integration instance (Read-only) + Uptime jstime.Duration `json:"uptime,omitempty"` // running time of integration instance + Username string `json:"username,omitempty"` // user that the integration instance authenticates as during registration + AccountName string `json:"account_name,omitempty"` // default account that the integration instance authenticates as during registration + ExplicitlyAccountBound bool `json:"explicitly_account_bound,omitempty"` // true if integration is explicitly configured to handle certain accounts + Accounts []string `json:"accounts,omitempty"` // accounts that the integration instance handles + Namespaces []string `json:"namespaces,omitempty"` // namespaces that the integration instance handles + Configuration *config.Application `json:"configuration,omitempty"` // configuration for the integration instance + ClusterName string `json:"cluster_name,omitempty"` // name of cluster where the integration instance runs + Namespace string `json:"namespace,omitempty"` // uuid for namespace that the integration instance belongs 
to + HealthReportInterval int `json:"health_report_interval,omitempty"` // time in seconds between health reports +} + +func Register(integration *Integration, registrationDetails config.AnchoreInfo) error { + var err error + + // there should only ever be one re-registration with a new id + for i := MaxAttempts; i > 0; i-- { + var newIntegrationID string + + newIntegrationID, err = register(integration, registrationDetails) + if err == nil { + log.Infof("Successfully Registered %s agent: %s(%s) with %s", integration.Type, integration.Name, + integration.ID, registrationDetails.URL) + return nil + } + if newIntegrationID == "" { + break + } + if i > 1 { + log.Infof("Attempting to re-register agent (id: %s) with new id: %s", integration.ID, newIntegrationID) + integration.ID = newIntegrationID + } + } + log.Errorf("Failed to register integration agent: %s", err) + return err +} + +func register(integration *Integration, registrationDetails config.AnchoreInfo) (string, error) { + log.Infof("Registering %s agent: %s(%s) with %s", integration.Type, integration.Name, integration.ID, + registrationDetails.URL) + requestBody, err := json.Marshal(integration) + if err != nil { + return "", fmt.Errorf("failed to serialize integration registration as JSON: %w", err) + } + _, err = anchore.Put(requestBody, integration.ID, registerAPIPathV2, registrationDetails, "integration registration") + if err != nil { + return newID(err), err + } + return "", nil +} + +func newID(putErr error) string { + var apiClientErr *anchore.ErrAchoreAPIClient + + if errors.As(putErr, &apiClientErr) { + if apiClientErr.Body != nil && *apiClientErr.Body != nil { + apiError := anchore.APIError{} + err := json.Unmarshal(*apiClientErr.Body, &apiError) + if err == nil && apiError.Message == "Re-register with different id" { + return apiError.Detail["new_integration_id"].(string) + } + } + } + return "" +} diff --git a/pkg/lib.go b/pkg/lib.go index dfb8b53..2545b4f 100644 --- a/pkg/lib.go +++ b/pkg/lib.go @@ -4,6 +4,7 @@ k8s go SDK */package pkg import ( + "context" "encoding/json" "errors" "fmt" @@ -11,16 +12,21 @@ import ( "regexp" "time" - "github.com/anchore/k8s-inventory/pkg/reporter" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "github.com/google/uuid" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/anchore/k8s-inventory/internal/config" "github.com/anchore/k8s-inventory/internal/log" + jstime "github.com/anchore/k8s-inventory/internal/time" "github.com/anchore/k8s-inventory/pkg/client" + "github.com/anchore/k8s-inventory/pkg/healthreporter" + intg "github.com/anchore/k8s-inventory/pkg/integration" "github.com/anchore/k8s-inventory/pkg/inventory" "github.com/anchore/k8s-inventory/pkg/logger" + "github.com/anchore/k8s-inventory/pkg/reporter" ) type ReportItem struct { @@ -38,6 +44,8 @@ type channels struct { type AccountRoutedReports map[string]inventory.Report type BatchedReports map[string][]inventory.Report +const AppVersionLabel = "app.kubernetes.io/version" + func reportToStdout(report inventory.Report) error { enc := json.NewEncoder(os.Stdout) // prevent > and < from being escaped in the payload @@ -49,7 +57,7 @@ func reportToStdout(report inventory.Report) error { return nil } -func HandleReport(report inventory.Report, cfg *config.Application, account string) error { +func HandleReport(report inventory.Report, reportInfo *healthreporter.InventoryReportInfo, cfg *config.Application, account string) error { if cfg.VerboseInventoryReports { err := reportToStdout(report) if err != nil { @@ -76,6 +84,7 @@ func 
HandleReport(report inventory.Report, cfg *config.Application, account stri } if anchoreDetails.IsValid() { + reportInfo.SentAsUser = anchoreDetails.User if err := reporter.Post(report, anchoreDetails); err != nil { if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { return err @@ -91,7 +100,7 @@ func HandleReport(report inventory.Report, cfg *config.Application, account stri // PeriodicallyGetInventoryReport periodically retrieve image results and report/output them according to the configuration. // Note: Errors do not cause the function to exit, since this is periodically running -func PeriodicallyGetInventoryReport(cfg *config.Application) { +func PeriodicallyGetInventoryReport(cfg *config.Application, gatedReportInfo *healthreporter.GatedReportInfo) { // Fire off a ticker that reports according to a configurable polling interval ticker := time.NewTicker(time.Duration(cfg.PollingIntervalSeconds) * time.Second) @@ -101,21 +110,46 @@ func PeriodicallyGetInventoryReport(cfg *config.Application) { log.Errorf("Failed to get Inventory Report: %w", err) } else { for account, reportsForAccount := range reports { + reportInfo := healthreporter.InventoryReportInfo{ + Account: account, + BatchSize: len(reportsForAccount), + LastSuccessfulIndex: -1, + Batches: make([]healthreporter.BatchInfo, 0), + HasErrors: false, + } for count, report := range reportsForAccount { log.Infof("Sending Inventory Report to Anchore Account %s, %d of %d", account, count+1, len(reportsForAccount)) - err := HandleReport(report, cfg, account) + + reportInfo.ReportTimestamp = report.Timestamp + batchInfo := healthreporter.BatchInfo{ + SendTimestamp: jstime.Now().UTC(), + BatchIndex: count + 1, + } + + err := HandleReport(report, &reportInfo, cfg, account) if errors.Is(err, reporter.ErrAnchoreAccountDoesNotExist) { + // record this error for the health report even if the retry works + batchInfo.Error = fmt.Sprintf("%s (%s) | ", err.Error(), account) + reportInfo.HasErrors = true + // Retry with default account retryAccount := cfg.AnchoreDetails.Account if cfg.AccountRouteByNamespaceLabel.DefaultAccount != "" { retryAccount = cfg.AccountRouteByNamespaceLabel.DefaultAccount } log.Warnf("Error sending to Anchore Account %s, sending to default account", account) - err = HandleReport(report, cfg, retryAccount) + err = HandleReport(report, &reportInfo, cfg, retryAccount) } if err != nil { log.Errorf("Failed to handle Inventory Report: %w", err) + // append the error to any error that happened during a retry, so we record both failures + batchInfo.Error += err.Error() + reportInfo.HasErrors = true + } else { + reportInfo.LastSuccessfulIndex = count + 1 } + reportInfo.Batches = append(reportInfo.Batches, batchInfo) + healthreporter.SetReportInfoNoBlocking(account, count, reportInfo, gatedReportInfo) } } } @@ -510,3 +544,127 @@ func processNamespace( func SetLogger(logger logger.Logger) { log.Log = logger } + +// GetAccountsAndNamespacesForAgent Determines accounts this agent managers using +// +// config.yaml (under account-routes:) +func GetAccountsAndNamespacesForAgent(cfg *config.Application) ([]string, []string) { + accountSet := make(map[string]bool) + namespaceSet := make(map[string]bool) + + // pick up accounts that are explicitly listed in the config + for account, accountRouteDetails := range cfg.AccountRoutes { + accountSet[account] = true + for _, namespace := range accountRouteDetails.Namespaces { + namespaceSet[namespace] = true + } + } + accounts := make([]string, 0, len(accountSet)) + for account := range 
accountSet { + accounts = append(accounts, account) + } + + namespaces := make([]string, 0, len(namespaceSet)) + for namespace := range namespaceSet { + namespaces = append(namespaces, namespace) + } + + return accounts, namespaces +} + +func GetIntegrationInfo(cfg *config.Application, namespace string, podName string) (intg.Integration, error) { + var instanceID, instanceName, version string + + if cfg.Registration.AlwaysUseFallbackRegisterID { + if cfg.Registration.FallbackRegisterID == "" { + errMsg := "fallback-register-id not set in config but always-use-fallback-register-id is true" + log.Errorf(errMsg) + return intg.Integration{}, fmt.Errorf(errMsg) + } + instanceID = cfg.Registration.FallbackRegisterID + instanceName = cfg.Registration.FallbackRegisterName + } else { + log.Infof("Attempting to determine instance ID using Pod: %s in Namespace: %s", podName, namespace) + instanceID, instanceName, version = getInstanceDataFromK8s(cfg, namespace, podName) + if instanceID == "" { + log.Infof("Could not determine instance ID from K8s deployment. Using fallback-register-id") + instanceID = cfg.Registration.FallbackRegisterID + instanceName = cfg.Registration.FallbackRegisterName + } + if instanceID == "" { + log.Infof("The fallback-register-id is not set. Generating a UUIDv4 to use as integration id") + instanceID = uuid.New().String() + } + } + accounts, namespaces := GetAccountsAndNamespacesForAgent(cfg) + explicitlyAccountBound := false + + if len(accounts) > 0 { + explicitlyAccountBound = true + } + + instance := intg.Integration{ + ID: instanceID, + InstanceID: podName, + Type: intg.IntegrationType, + Name: instanceName, + // Description: "", + Version: version, + // Status: READ ONLY SO NOT SET + StartedAt: jstime.Now().UTC(), + // LastSeen: READ ONLY SO NOT SET + Uptime: jstime.Duration{}, + Username: cfg.AnchoreDetails.User, + AccountName: cfg.AnchoreDetails.Account, + ExplicitlyAccountBound: explicitlyAccountBound, + Accounts: accounts, + Namespaces: namespaces, + Configuration: cfg, + ClusterName: cfg.KubeConfig.Cluster, + Namespace: namespace, + HealthReportInterval: cfg.HealthReportIntervalSeconds, + } + return instance, nil +} + +func getInstanceDataFromK8s(cfg *config.Application, namespace string, podName string) (string, string, string) { + kubeconfig, err := client.GetKubeConfig(cfg) + if err != nil { + log.Errorf("Failed to get Kubernetes config: %w", err) + return "", "", "" + } + + clientset, err := client.GetClientSet(kubeconfig) + if err != nil { + log.Errorf("failed to get k8s client set: %w", err) + return "", "", "" + } + + k8sClient := client.Client{ + Clientset: clientset, + } + + opts := metav1.GetOptions{} + pod, err := k8sClient.Clientset.CoreV1().Pods(namespace).Get(context.Background(), podName, opts) + if err != nil { + log.Errorf("failed to get pod: %w", err) + return "", "", "" + } + replicaSetName := pod.ObjectMeta.OwnerReferences[0].Name + replicaSet, err := k8sClient.Clientset.AppsV1().ReplicaSets(namespace).Get(context.Background(), replicaSetName, opts) + if err != nil { + log.Errorf("failed to get replica set: %w", err) + return "", "", "" + } + deploymentName := replicaSet.ObjectMeta.OwnerReferences[0].Name + deployment, err := k8sClient.Clientset.AppsV1().Deployments(namespace).Get(context.Background(), deploymentName, opts) + if err != nil { + log.Errorf("failed to get deployment: %w", err) + return "", "", "" + } + + version := deployment.Labels[AppVersionLabel] + instanceID := fmt.Sprint("", deployment.ObjectMeta.UID) + instanceName := 
deploymentName + return instanceID, instanceName, version +} diff --git a/skaffold.yaml b/skaffold.yaml index 4072691..f669bf2 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -23,6 +23,7 @@ deploy: k8sInventory.quiet: false k8sInventory.verboseInventoryReports: true k8sInventory.pollingIntervalSeconds: 60 + k8sInventory.healthReportIntervalSeconds: 60 k8sInventory.anchore.url: "http://host.docker.internal:8228" k8sInventory.anchore.user: "admin" k8sInventory.anchore.password: "foobar"
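For reference, a minimal sketch of the health-report payload these types produce, assembled only from the structs and constants introduced above in pkg/healthreporter and internal/time. It is an illustration of the wire format, not code proposed for merge: the report and instance identifiers, account name, pod name, and timestamps are example values (the integration id reuses the sample fallback-register-id from anchore-k8s-inventory.yaml), and the import paths assume the module as patched.

package main

import (
	"encoding/json"
	"fmt"

	jstime "github.com/anchore/k8s-inventory/internal/time"
	"github.com/anchore/k8s-inventory/pkg/healthreporter"
)

func main() {
	// Build one HealthReport the way PeriodicallySendHealthReport does each interval.
	// All identifiers below are illustrative values only.
	report := healthreporter.HealthReport{
		ID:                   "9a1f4c6e-0d2b-4b52-9c3e-7f1a2b3c4d5e", // per-report uuid (example)
		ProtocolVersion:      1,
		Timestamp:            jstime.Now().UTC(),
		IntegrationID:        "db2963a1-b029-4d7c-b42c-4e8f81c263dd", // sample fallback-register-id
		InstanceID:           "anchore-k8s-inventory-5d9c7b6f4-x2x7p", // e.g. the pod name
		Uptime:               jstime.Duration{},
		HealthReportInterval: 60,
		HealthData: healthreporter.HealthData{
			Type:    "anchore_k8s_inventory_agent",
			Version: 1,
			Errors:  healthreporter.HealthReportErrors{},
			AccountK8sInventoryReports: healthreporter.AccountK8SInventoryReports{
				"admin": {
					ReportTimestamp:     "2024-07-02T13:18:18Z",
					Account:             "admin",
					SentAsUser:          "admin",
					BatchSize:           1,
					LastSuccessfulIndex: 1,
					Batches: []healthreporter.BatchInfo{
						{BatchIndex: 1, SendTimestamp: jstime.Now().UTC()},
					},
				},
			},
		},
	}

	// The agent marshals this structure and PUTs it to
	// v2/system/integrations/{{id}}/health-report via anchore.Put.
	body, err := json.MarshalIndent(report, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(body))
}

Printing the marshalled body shows exactly what Enterprise receives each health-report-interval-seconds cycle, which also makes it a convenient fixture for gock-based tests of anchore.Put.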