Merge branch 'main' into feature/cloud-services
Signed-off-by: Amit Schendel <[email protected]>
amitschendel authored Dec 2, 2024
2 parents a5e2465 + 6259246 commit d2c90eb
Showing 22 changed files with 1,668 additions and 176 deletions.
101 changes: 92 additions & 9 deletions go.mod

Large diffs are not rendered by default.

271 changes: 267 additions & 4 deletions go.sum

Large diffs are not rendered by default.

62 changes: 40 additions & 22 deletions main.go
@@ -11,9 +11,16 @@ import (
"syscall"

apitypes "github.com/armosec/armoapi-go/armotypes"
utilsmetadata "github.com/armosec/utils-k8s-go/armometadata"
mapset "github.com/deckarep/golang-set/v2"
containercollection "github.com/inspektor-gadget/inspektor-gadget/pkg/container-collection"
beUtils "github.com/kubescape/backend/pkg/utils"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/k8sinterface"
"github.com/kubescape/node-agent/pkg/applicationprofilemanager"
applicationprofilemanagerv1 "github.com/kubescape/node-agent/pkg/applicationprofilemanager/v1"
cloudmetadata "github.com/kubescape/node-agent/pkg/cloudmetadata"
"github.com/kubescape/node-agent/pkg/cloudmetadata"
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/containerwatcher/v1"
"github.com/kubescape/node-agent/pkg/dnsmanager"
@@ -43,21 +50,15 @@ import (
"github.com/kubescape/node-agent/pkg/rulemanager"
rulemanagerv1 "github.com/kubescape/node-agent/pkg/rulemanager/v1"
"github.com/kubescape/node-agent/pkg/sbomhandler/syfthandler"
"github.com/kubescape/node-agent/pkg/sbommanager"
sbommanagerv1 "github.com/kubescape/node-agent/pkg/sbommanager/v1"
"github.com/kubescape/node-agent/pkg/seccompmanager"
seccompmanagerv1 "github.com/kubescape/node-agent/pkg/seccompmanager/v1"
"github.com/kubescape/node-agent/pkg/storage/v1"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/node-agent/pkg/validator"
"github.com/kubescape/node-agent/pkg/watcher/dynamicwatcher"
"github.com/kubescape/node-agent/pkg/watcher/seccompprofilewatcher"

utilsmetadata "github.com/armosec/utils-k8s-go/armometadata"
mapset "github.com/deckarep/golang-set/v2"

beUtils "github.com/kubescape/backend/pkg/utils"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/k8sinterface"
)

func main() {
@@ -130,10 +131,8 @@ func main() {
prometheusExporter = metricsmanager.NewMetricsMock()
}

nodeName := os.Getenv(config.NodeNameEnvVar)

// Detect the container containerRuntime of the node
containerRuntime, err := utils.DetectContainerRuntimeViaK8sAPI(ctx, k8sClient, nodeName)
containerRuntime, err := utils.DetectContainerRuntimeViaK8sAPI(ctx, k8sClient, cfg.NodeName)
if err != nil {
logger.L().Ctx(ctx).Fatal("error detecting the container runtime", helpers.Error(err))
}
@@ -143,7 +142,7 @@ func main() {
// Create watchers
dWatcher := dynamicwatcher.NewWatchHandler(k8sClient, storageClient.StorageClient, cfg.SkipNamespace)
// create k8sObject cache
k8sObjectCache, err := k8scache.NewK8sObjectCache(nodeName, k8sClient)
k8sObjectCache, err := k8scache.NewK8sObjectCache(cfg.NodeName, k8sClient)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating K8sObjectCache", helpers.Error(err))
}
@@ -216,7 +215,7 @@ func main() {
var cloudMetadata *apitypes.CloudMetadata

if cfg.EnableRuntimeDetection || cfg.EnableMalwareDetection {
cloudMetadata, err = cloudmetadata.GetCloudMetadata(ctx, k8sClient, nodeName)
cloudMetadata, err = cloudmetadata.GetCloudMetadata(ctx, k8sClient, cfg.NodeName)
if err != nil {
logger.L().Ctx(ctx).Error("error getting cloud metadata", helpers.Error(err))
}
@@ -227,16 +226,16 @@
processManager = processmanagerv1.CreateProcessManager(ctx)

// create ruleBinding cache
ruleBindingCache := rulebindingcachev1.NewCache(nodeName, k8sClient)
ruleBindingCache := rulebindingcachev1.NewCache(cfg.NodeName, k8sClient)
dWatcher.AddAdaptor(ruleBindingCache)

ruleBindingNotify = make(chan rulebinding.RuleBindingNotify, 100)
ruleBindingCache.AddNotifier(&ruleBindingNotify)

apc := applicationprofilecache.NewApplicationProfileCache(nodeName, storageClient.StorageClient, cfg.MaxDelaySeconds)
apc := applicationprofilecache.NewApplicationProfileCache(cfg.NodeName, storageClient.StorageClient, cfg.MaxDelaySeconds)
dWatcher.AddAdaptor(apc)

nnc := networkneighborhoodcache.NewNetworkNeighborhoodCache(nodeName, storageClient.StorageClient, cfg.MaxDelaySeconds)
nnc := networkneighborhoodcache.NewNetworkNeighborhoodCache(cfg.NodeName, storageClient.StorageClient, cfg.MaxDelaySeconds)
dWatcher.AddAdaptor(nnc)

dc := dnscache.NewDnsCache(dnsResolver)
@@ -245,7 +244,7 @@ func main() {
objCache = objectcachev1.NewObjectCache(k8sObjectCache, apc, nnc, dc)

// create exporter
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, nodeName, cloudMetadata)
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, cfg.NodeName, cloudMetadata)

// create runtimeDetection managers
ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, nodeName, clusterData.ClusterName, processManager, dnsResolver)

Check failure on line 250 in main.go (GitHub Actions / pr-created / test / Create cross-platform build): undefined: nodeName
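Note: this failure points at the CreateRuleManager call above, which still passes the removed nodeName variable. A minimal sketch of the likely fix, assuming the same cfg.NodeName substitution used at the other call sites in this commit (not part of the rendered diff):

ruleManager, err = rulemanagerv1.CreateRuleManager(ctx, cfg, k8sClient, ruleBindingCache, objCache, exporter, prometheusExporter, cfg.NodeName, clusterData.ClusterName, processManager, dnsResolver)
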
@@ -264,7 +263,7 @@ func main() {
var profileManager nodeprofilemanager.NodeProfileManagerClient
if cfg.EnableNodeProfile {
// FIXME validate the HTTPExporterConfig before we use it ?
profileManager = nodeprofilemanagerv1.NewNodeProfileManager(cfg, *clusterData, nodeName, k8sObjectCache, relevancyManager, ruleManager)
profileManager = nodeprofilemanagerv1.NewNodeProfileManager(cfg, *clusterData, cfg.NodeName, k8sObjectCache, relevancyManager, ruleManager)
} else {
profileManager = nodeprofilemanager.NewNodeProfileManagerMock()
}
@@ -273,17 +272,36 @@
var malwareManager malwaremanager.MalwareManagerClient
if cfg.EnableMalwareDetection {
// create exporter
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, nodeName, cloudMetadata)
malwareManager, err = malwaremanagerv1.CreateMalwareManager(cfg, k8sClient, nodeName, clusterData.ClusterName, exporter, prometheusExporter)
exporter := exporters.InitExporters(cfg.Exporters, clusterData.ClusterName, cfg.NodeName, cloudMetadata)
malwareManager, err = malwaremanagerv1.CreateMalwareManager(cfg, k8sClient, cfg.NodeName, clusterData.ClusterName, exporter, prometheusExporter)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating MalwareManager", helpers.Error(err))
}
} else {
malwareManager = malwaremanager.CreateMalwareManagerMock()
}

// Create the IG k8sClient
igK8sClient, err := containercollection.NewK8sClient(cfg.NodeName)
if err != nil {
logger.L().Fatal("error creating IG Kubernetes client", helpers.Error(err))
}
defer igK8sClient.Close()
logger.L().Info("IG Kubernetes client created", helpers.Interface("client", igK8sClient))

// Create the SBOM manager
var sbomManager sbommanager.SbomManagerClient
if cfg.EnableSbomGeneration {
sbomManager, err = sbommanagerv1.CreateSbomManager(ctx, cfg, igK8sClient.SocketPath, storageClient)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating SbomManager", helpers.Error(err))
}
} else {
sbomManager = sbommanager.CreateSbomManagerMock()
}

// Create the container handler
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil, processManager)
mainHandler, err := containerwatcher.CreateIGContainerWatcher(cfg, applicationProfileManager, k8sClient, igK8sClient, relevancyManager, networkManagerClient, dnsManagerClient, prometheusExporter, ruleManager, malwareManager, sbomManager, preRunningContainersIDs, &ruleBindingNotify, containerRuntime, nil, processManager)
if err != nil {
logger.L().Ctx(ctx).Fatal("error creating the container watcher", helpers.Error(err))
}
12 changes: 12 additions & 0 deletions pkg/config/config.go
@@ -1,6 +1,7 @@
package config

import (
"os"
"slices"
"time"

@@ -19,6 +20,8 @@ type Config struct {
UpdateDataPeriod time.Duration `mapstructure:"updateDataPeriod"`
MaxDelaySeconds int `mapstructure:"maxDelaySeconds"`
MaxJitterPercentage int `mapstructure:"maxJitterPercentage"`
MaxImageSize int64 `mapstructure:"maxImageSize"`
MaxSBOMSize int `mapstructure:"maxSBOMSize"`
EnableFullPathTracing bool `mapstructure:"fullPathTracingEnabled"`
EnableApplicationProfile bool `mapstructure:"applicationProfileServiceEnabled"`
EnableMalwareDetection bool `mapstructure:"malwareDetectionEnabled"`
@@ -32,6 +35,10 @@
EnableSeccomp bool `mapstructure:"seccompServiceEnabled"`
ExcludeNamespaces []string `mapstructure:"excludeNamespaces"`
IncludeNamespaces []string `mapstructure:"includeNamespaces"`
EnableSbomGeneration bool `mapstructure:"sbomGenerationEnabled"`
NamespaceName string `mapstructure:"namespaceName"`
NodeName string `mapstructure:"nodeName"`
PodName string `mapstructure:"podName"`
}

// LoadConfig reads configuration from file or environment variables.
@@ -45,6 +52,11 @@ func LoadConfig(path string) (Config, error) {
viper.SetDefault("nodeProfileInterval", 10*time.Minute)
viper.SetDefault("maxDelaySeconds", 30)
viper.SetDefault("maxJitterPercentage", 5)
viper.SetDefault("maxImageSize", 5*1024*1024*1024)
viper.SetDefault("maxSBOMSize", 20*1024*1024)
viper.SetDefault("namespaceName", os.Getenv(NamespaceEnvVar))
viper.SetDefault("nodeName", os.Getenv(NodeNameEnvVar))
viper.SetDefault("podName", os.Getenv(PodNameEnvVar))

viper.AutomaticEnv()

2 changes: 2 additions & 0 deletions pkg/config/config_test.go
@@ -33,6 +33,8 @@ func TestLoadConfig(t *testing.T) {
NodeProfileInterval: 1 * time.Minute,
MaxDelaySeconds: 30,
MaxJitterPercentage: 5,
MaxImageSize: 5368709120,
MaxSBOMSize: 20971520,
EnablePrometheusExporter: true,
EnableRuntimeDetection: true,
EnableSeccomp: true,
12 changes: 6 additions & 6 deletions pkg/containerwatcher/v1/container_watcher.go
@@ -3,7 +3,6 @@ package containerwatcher
import (
"context"
"fmt"
"os"

mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
@@ -50,6 +49,7 @@ import (
"github.com/kubescape/node-agent/pkg/relevancymanager"
rulebinding "github.com/kubescape/node-agent/pkg/rulebindingmanager"
"github.com/kubescape/node-agent/pkg/rulemanager"
"github.com/kubescape/node-agent/pkg/sbommanager"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/panjf2000/ants/v2"
)
@@ -85,18 +85,19 @@ type IGContainerWatcher struct {
cfg config.Config
containerSelector containercollection.ContainerSelector
ctx context.Context
nodeName string
podName string
namespace string

// Clients
applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient
igK8sClient *containercollection.K8sClient
k8sClient *k8sinterface.KubernetesApi
relevancyManager relevancymanager.RelevancyManagerClient
networkManager networkmanager.NetworkManagerClient
dnsManager dnsmanager.DNSManagerClient
ruleManager rulemanager.RuleManagerClient
malwareManager malwaremanager.MalwareManagerClient
sbomManager sbommanager.SbomManagerClient
// IG Collections
containerCollection *containercollection.ContainerCollection
tracerCollection *tracercollection.TracerCollection
@@ -160,7 +161,7 @@ type IGContainerWatcher struct {

var _ containerwatcher.ContainerWatcher = (*IGContainerWatcher)(nil)

func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], processManager processmanager.ProcessManagerClient) (*IGContainerWatcher, error) {
func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager applicationprofilemanager.ApplicationProfileManagerClient, k8sClient *k8sinterface.KubernetesApi, igK8sClient *containercollection.K8sClient, relevancyManager relevancymanager.RelevancyManagerClient, networkManagerClient networkmanager.NetworkManagerClient, dnsManagerClient dnsmanager.DNSManagerClient, metrics metricsmanager.MetricsManager, ruleManager rulemanager.RuleManagerClient, malwareManager malwaremanager.MalwareManagerClient, sbomManager sbommanager.SbomManagerClient, preRunningContainers mapset.Set[string], ruleBindingPodNotify *chan rulebinding.RuleBindingNotify, runtime *containerutilsTypes.RuntimeConfig, thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.EventReceiver]], processManager processmanager.ProcessManagerClient) (*IGContainerWatcher, error) {
// Use container collection to get notified for new containers
containerCollection := &containercollection.ContainerCollection{}
// Create a tracer collection instance
@@ -421,18 +422,17 @@ func CreateIGContainerWatcher(cfg config.Config, applicationProfileManager appli
// Configuration
cfg: cfg,
containerSelector: containercollection.ContainerSelector{}, // Empty selector to get all containers
nodeName: os.Getenv(config.NodeNameEnvVar),
podName: os.Getenv(config.PodNameEnvVar),
namespace: os.Getenv(config.NamespaceEnvVar),

// Clients
applicationProfileManager: applicationProfileManager,
igK8sClient: igK8sClient,
k8sClient: k8sClient,
relevancyManager: relevancyManager,
networkManager: networkManagerClient,
dnsManager: dnsManagerClient,
ruleManager: ruleManager,
malwareManager: malwareManager,
sbomManager: sbomManager,
// IG Collections
containerCollection: containerCollection,
tracerCollection: tracerCollection,
10 changes: 3 additions & 7 deletions pkg/containerwatcher/v1/container_watcher_private.go
@@ -90,6 +90,7 @@ func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) erro
ch.networkManager.ContainerCallback,
ch.malwareManager.ContainerCallback,
ch.ruleManager.ContainerCallback,
ch.sbomManager.ContainerCallback,
ch.processManager.ContainerCallback,
ch.dnsManager.ContainerCallback,
}
@@ -121,7 +122,7 @@ func (ch *IGContainerWatcher) startContainerCollection(ctx context.Context) erro
containercollection.WithTracerCollection(ch.tracerCollection),

// Enrich those containers with data from the Kubernetes API
containercollection.WithKubernetesEnrichment(ch.nodeName, ch.k8sClient.K8SConfig),
containercollection.WithKubernetesEnrichment(ch.cfg.NodeName, ch.k8sClient.K8SConfig),
}

// Initialize the container collection
@@ -136,13 +137,8 @@
}

func (ch *IGContainerWatcher) startRunningContainers() {
k8sClient, err := containercollection.NewK8sClient(ch.nodeName)
if err != nil {
logger.L().Fatal("creating IG Kubernetes client", helpers.Error(err))
}
defer k8sClient.Close()
for n := range *ch.ruleBindingPodNotify {
ch.addRunningContainers(k8sClient, &n)
ch.addRunningContainers(ch.igK8sClient, &n)
}
}

7 changes: 3 additions & 4 deletions pkg/containerwatcher/v1/open_test.go
@@ -4,13 +4,12 @@ import (
"context"
"testing"

traceropentype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/types"
"github.com/inspektor-gadget/inspektor-gadget/pkg/types"
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/filehandler/v1"
"github.com/kubescape/node-agent/pkg/metricsmanager"
"github.com/kubescape/node-agent/pkg/relevancymanager/v1"

traceropentype "github.com/inspektor-gadget/inspektor-gadget/pkg/gadgets/trace/open/types"
"github.com/inspektor-gadget/inspektor-gadget/pkg/types"
"github.com/stretchr/testify/assert"
)

@@ -23,7 +22,7 @@ func BenchmarkIGContainerWatcher_openEventCallback(b *testing.B) {
assert.NoError(b, err)
mockExporter := metricsmanager.NewMetricsMock()

mainHandler, err := CreateIGContainerWatcher(cfg, nil, nil, relevancyManager, nil, nil, mockExporter, nil, nil, nil, nil, nil, nil, nil)
mainHandler, err := CreateIGContainerWatcher(cfg, nil, nil, nil, relevancyManager, nil, nil, mockExporter, nil, nil, nil, nil, nil, nil, nil, nil)
assert.NoError(b, err)
event := &traceropentype.Event{
Event: types.Event{
4 changes: 4 additions & 0 deletions pkg/filehandler/v1/inmemory.go
@@ -4,6 +4,8 @@ import (
"fmt"
"sync"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/node-agent/pkg/filehandler"
)

@@ -45,6 +47,7 @@ func (s *InMemoryFileHandler) AddFile(bucket, file string) {
files: make(map[string]bool, initFileListLength),
}
s.buckets[bucket] = bucketFiles
logger.L().Debug("Created new bucket", helpers.String("bucket", bucket))
}
s.mutex.Unlock()
}
@@ -109,6 +112,7 @@ func (s *InMemoryFileHandler) AddFiles(bucket string, files map[string]bool) err
files: make(map[string]bool, initFileListLength),
}
s.buckets[bucket] = bucketFiles
logger.L().Debug("Created new bucket", helpers.String("bucket", bucket))
}
s.mutex.Unlock()
}