use ttl cache for saved elements, remove compression
Signed-off-by: Matthias Bertschy <[email protected]>
matthyx committed Nov 8, 2024
1 parent bb78741 commit 504548a
Showing 3 changed files with 82 additions and 103 deletions.
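The diffs below replace the per-container maps.SafeMap bookkeeping of already-saved capabilities, endpoints, execs, opens and network events with TTL caches from istio.io/pkg/cache, and drop the dynamic-path compression pass that previously ran over saved opens on every save. As a quick orientation, here is a minimal, self-contained sketch of the ExpiringCache calls the new code relies on (NewTTL, Set, Get); the keys and values are invented for illustration, and only the API shape mirrors the diff:

package main

import (
	"fmt"
	"time"

	"istio.io/pkg/cache"
)

func main() {
	// NewTTL(defaultExpiration, evictionInterval): entries written with Set
	// expire after 25s here, and the evicter wakes up every 5s.
	saved := cache.NewTTL(25*time.Second, 5*time.Second)

	// Values are untyped (any); nil is enough when only membership matters...
	saved.Set("CAP_NET_ADMIN", nil)
	// ...and richer payloads can be stored where they are needed later.
	saved.Set("exec-hash", []string{"/bin/sh", "-c", "id"})

	if _, ok := saved.Get("CAP_NET_ADMIN"); ok {
		fmt.Println("already saved, skip re-reporting")
	}
	if v, ok := saved.Get("exec-hash"); ok {
		fmt.Println("saved exec:", v.([]string)) // Get returns any, so assert the stored type back
	}
}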
128 changes: 55 additions & 73 deletions pkg/applicationprofilemanager/v1/applicationprofile_manager.go
@@ -33,6 +33,7 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"istio.io/pkg/cache"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@@ -48,11 +49,11 @@ type ApplicationProfileManager struct {
containerMutexes storageUtils.MapMutex[string] // key is k8sContainerID
trackedContainers mapset.Set[string] // key is k8sContainerID
removedContainers mapset.Set[string] // key is k8sContainerID
droppedEvents maps.SafeMap[string, bool] // key is k8sContainerID
savedCapabilities maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
savedEndpoints maps.SafeMap[string, *maps.SafeMap[string, *v1beta1.HTTPEndpoint]] // key is k8sContainerID
savedExecs maps.SafeMap[string, *maps.SafeMap[string, []string]] // key is k8sContainerID
savedOpens maps.SafeMap[string, *maps.SafeMap[string, mapset.Set[string]]] // key is k8sContainerID
droppedEventsContainers mapset.Set[string] // key is k8sContainerID
savedCapabilities maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
savedEndpoints maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
savedExecs maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
savedOpens maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
savedSyscalls maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
toSaveCapabilities maps.SafeMap[string, mapset.Set[string]] // key is k8sContainerID
toSaveEndpoints maps.SafeMap[string, *maps.SafeMap[string, *v1beta1.HTTPEndpoint]] // key is k8sContainerID
@@ -71,17 +72,18 @@ var _ applicationprofilemanager.ApplicationProfileManagerClient = (*ApplicationP

func CreateApplicationProfileManager(ctx context.Context, cfg config.Config, clusterName string, k8sClient k8sclient.K8sClientInterface, storageClient storage.StorageClient, preRunningContainerIDs mapset.Set[string], k8sObjectCache objectcache.K8sObjectCache, seccompManager seccompmanager.SeccompManagerClient) (*ApplicationProfileManager, error) {
return &ApplicationProfileManager{
cfg: cfg,
clusterName: clusterName,
ctx: ctx,
k8sClient: k8sClient,
k8sObjectCache: k8sObjectCache,
storageClient: storageClient,
containerMutexes: storageUtils.NewMapMutex[string](),
trackedContainers: mapset.NewSet[string](),
removedContainers: mapset.NewSet[string](),
preRunningContainerIDs: preRunningContainerIDs,
seccompManager: seccompManager,
cfg: cfg,
clusterName: clusterName,
ctx: ctx,
k8sClient: k8sClient,
k8sObjectCache: k8sObjectCache,
storageClient: storageClient,
containerMutexes: storageUtils.NewMapMutex[string](),
trackedContainers: mapset.NewSet[string](),
removedContainers: mapset.NewSet[string](),
droppedEventsContainers: mapset.NewSet[string](),
preRunningContainerIDs: preRunningContainerIDs,
seccompManager: seccompManager,
}, nil
}

@@ -140,7 +142,7 @@ func (am *ApplicationProfileManager) deleteResources(watchedContainer *utils.Wat
// delete resources
watchedContainer.UpdateDataTicker.Stop()
am.trackedContainers.Remove(watchedContainer.K8sContainerID)
am.droppedEvents.Delete(watchedContainer.K8sContainerID)
am.droppedEventsContainers.Remove(watchedContainer.K8sContainerID)
am.savedCapabilities.Delete(watchedContainer.K8sContainerID)
am.savedEndpoints.Delete(watchedContainer.K8sContainerID)
am.savedExecs.Delete(watchedContainer.K8sContainerID)
Expand Down Expand Up @@ -246,7 +248,7 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
// sleep for container-index seconds to desynchronize the profile saving
time.Sleep(time.Duration(watchedContainer.ContainerIndex) * time.Second)

if droppedEvents := am.droppedEvents.Get(watchedContainer.K8sContainerID); droppedEvents {
if am.droppedEventsContainers.ContainsOne(watchedContainer.K8sContainerID) {
watchedContainer.SetStatus(utils.WatchedContainerStatusMissingRuntime)
}

@@ -532,50 +534,32 @@ func (am *ApplicationProfileManager) saveProfile(ctx context.Context, watchedCon
// record saved syscalls
am.savedSyscalls.Get(watchedContainer.K8sContainerID).Append(toSaveSyscalls...)
// record saved capabilities
am.savedCapabilities.Get(watchedContainer.K8sContainerID).Append(capabilities...)
savedCapabilities := am.savedCapabilities.Get(watchedContainer.K8sContainerID)
for _, capability := range capabilities {
savedCapabilities.Set(capability, nil)
}
// record saved endpoints
savedEndpoints := am.savedEndpoints.Get(watchedContainer.K8sContainerID)
toSaveEndpoints.Range(func(path string, endpoint *v1beta1.HTTPEndpoint) bool {
if !am.savedEndpoints.Get(watchedContainer.K8sContainerID).Has(path) {
am.savedEndpoints.Get(watchedContainer.K8sContainerID).Set(path, endpoint)
}
savedEndpoints.Set(path, endpoint)
return true
})
// record saved execs
savedExecs := am.savedExecs.Get(watchedContainer.K8sContainerID)
toSaveExecs.Range(func(uniqueExecIdentifier string, v []string) bool {
if !am.savedExecs.Get(watchedContainer.K8sContainerID).Has(uniqueExecIdentifier) {
am.savedExecs.Get(watchedContainer.K8sContainerID).Set(uniqueExecIdentifier, v)
}
savedExecs.Set(uniqueExecIdentifier, v)
return true
})
// record saved opens
savedOpens := am.savedOpens.Get(watchedContainer.K8sContainerID)
toSaveOpens.Range(utils.SetInMap(savedOpens))
// use a dynamic path detector to compress opens
analyzer := dynamicpathdetector.NewPathAnalyzer(OpenDynamicThreshold)
keys := savedOpens.Keys()
// first pass to learn the opens
for _, path := range keys {
_, _ = dynamicpathdetector.AnalyzeOpen(path, analyzer)
}
// second pass to compress the opens
for _, path := range keys {
result, err := dynamicpathdetector.AnalyzeOpen(path, analyzer)
if err != nil {
continue
}
if result != path {
// path becomes compressed
// we avoid a lock by using Pop to remove path and retrieve its flags
pathFlags := savedOpens.Pop(path)
if savedOpens.Has(result) {
// merge flags
savedOpens.Get(result).Append(pathFlags.ToSlice()...)
} else {
// create new entry
savedOpens.Set(result, pathFlags)
}
toSaveOpens.Range(func(path string, newOpens mapset.Set[string]) bool {
if oldOpens, ok := savedOpens.Get(path); ok {
oldOpens.(mapset.Set[string]).Append(newOpens.ToSlice()...)
} else {
savedOpens.Set(path, newOpens)
}
}
return true
})
logger.L().Debug("ApplicationProfileManager - saved application profile",
helpers.Int("capabilities", len(capabilities)),
helpers.Int("endpoints", toSaveEndpoints.Len()),
@@ -651,11 +635,10 @@ func (am *ApplicationProfileManager) ContainerCallback(notif containercollection
if am.watchedContainerChannels.Has(notif.Container.Runtime.ContainerID) {
return
}
am.savedCapabilities.Set(k8sContainerID, mapset.NewSet[string]())
am.droppedEvents.Set(k8sContainerID, false)
am.savedEndpoints.Set(k8sContainerID, new(maps.SafeMap[string, *v1beta1.HTTPEndpoint]))
am.savedExecs.Set(k8sContainerID, new(maps.SafeMap[string, []string]))
am.savedOpens.Set(k8sContainerID, new(maps.SafeMap[string, mapset.Set[string]]))
am.savedCapabilities.Set(k8sContainerID, cache.NewTTL(5*am.cfg.UpdateDataPeriod, am.cfg.UpdateDataPeriod))
am.savedEndpoints.Set(k8sContainerID, cache.NewTTL(5*am.cfg.UpdateDataPeriod, am.cfg.UpdateDataPeriod))
am.savedExecs.Set(k8sContainerID, cache.NewTTL(5*am.cfg.UpdateDataPeriod, am.cfg.UpdateDataPeriod))
am.savedOpens.Set(k8sContainerID, cache.NewTTL(5*am.cfg.UpdateDataPeriod, am.cfg.UpdateDataPeriod))
am.savedSyscalls.Set(k8sContainerID, mapset.NewSet[string]())
am.toSaveCapabilities.Set(k8sContainerID, mapset.NewSet[string]())
am.toSaveEndpoints.Set(k8sContainerID, new(maps.SafeMap[string, *v1beta1.HTTPEndpoint]))
@@ -681,9 +664,11 @@ func (am *ApplicationProfileManager) ReportCapability(k8sContainerID, capability
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}
if am.savedCapabilities.Get(k8sContainerID).ContainsOne(capability) {
// check if we already have this capability
if _, ok := am.savedCapabilities.Get(k8sContainerID).Get(capability); ok {
return
}
// add to capability map
am.toSaveCapabilities.Get(k8sContainerID).Add(capability)
}

@@ -697,15 +682,12 @@ func (am *ApplicationProfileManager) ReportFileExec(k8sContainerID, path string,
}
// check if we already have this exec
// we use a SHA256 hash of the exec to identify it uniquely (path + args, in the order they were provided)
savedExecs := am.savedExecs.Get(k8sContainerID)
execIdentifier := utils.CalculateSHA256FileExecHash(path, args)
if savedExecs.Has(execIdentifier) {
if _, ok := am.savedExecs.Get(k8sContainerID).Get(execIdentifier); ok {
return
}

// add to exec map, first element is the path, the rest are the args
execMap := am.toSaveExecs.Get(k8sContainerID)
execMap.Set(execIdentifier, append([]string{path}, args...))
am.toSaveExecs.Get(k8sContainerID).Set(execIdentifier, append([]string{path}, args...))
}

func (am *ApplicationProfileManager) ReportFileOpen(k8sContainerID, path string, flags []string) {
@@ -718,8 +700,7 @@ func (am *ApplicationProfileManager) ReportFileOpen(k8sContainerID, path string,
path = procRegex.ReplaceAllString(path, "/proc/"+dynamicpathdetector.DynamicIdentifier)
}
// check if we already have this open
savedOpens := am.savedOpens.Get(k8sContainerID)
if savedOpens.Has(path) && savedOpens.Get(path).Contains(flags...) {
if opens, ok := am.savedOpens.Get(k8sContainerID).Get(path); ok && opens.(mapset.Set[string]).Contains(flags...) {
return
}
// add to open map
@@ -732,28 +713,29 @@ func (am *ApplicationProfileManager) ReportFileOpen(k8sContainerID, path string,
}

func (am *ApplicationProfileManager) ReportDroppedEvent(k8sContainerID string) {
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}
am.droppedEvents.Set(k8sContainerID, true)
am.droppedEventsContainers.Add(k8sContainerID)
}

func (am *ApplicationProfileManager) ReportHTTPEvent(k8sContainerID string, event *tracerhttptype.Event) {
if err := am.waitForContainer(k8sContainerID); err != nil {
return
}

// get endpoint from event
endpointIdentifier, err := am.GetEndpointIdentifier(event)
if err != nil {
logger.L().Ctx(am.ctx).Warning("ApplicationProfileManager - failed to get endpoint identifier", helpers.Error(err))
return
}

endpoint, err := GetNewEndpoint(event, endpointIdentifier)
if err != nil {
logger.L().Ctx(am.ctx).Warning("ApplicationProfileManager - failed to get new endpoint", helpers.Error(err))
return
}

endpointMap := am.toSaveEndpoints.Get(k8sContainerID)
// check if we already have this endpoint
endpointHash := CalculateHTTPEndpointHash(endpoint)
endpointMap.Set(endpointHash, endpoint)
if _, ok := am.savedEndpoints.Get(k8sContainerID).Get(endpointHash); ok {
return
}
// add to endpoint map
am.toSaveEndpoints.Get(k8sContainerID).Set(endpointHash, endpoint)
}
@@ -29,7 +29,7 @@ func TestApplicationProfileManager(t *testing.T) {
cfg := config.Config{
InitialDelay: 1 * time.Second,
MaxSniffingTime: 5 * time.Minute,
UpdateDataPeriod: 1 * time.Second,
UpdateDataPeriod: 5 * time.Second,
}
ctx := context.TODO()
k8sClient := &k8sclient.K8sClientMock{}
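With the caches constructed as cache.NewTTL(5*cfg.UpdateDataPeriod, cfg.UpdateDataPeriod), an element is only remembered as "already saved" for five update periods, with eviction checks once per period; if it is not seen again within that window it expires, and the next occurrence is reported, queued and saved once more. Below is a rough, self-contained demo of that dedup-then-expire cycle; the report helper and the millisecond timings are made up, and only the cache construction mirrors the managers:

package main

import (
	"fmt"
	"time"

	"istio.io/pkg/cache"
)

func main() {
	updateDataPeriod := 100 * time.Millisecond // stand-in for cfg.UpdateDataPeriod
	saved := cache.NewTTL(5*updateDataPeriod, updateDataPeriod)

	// Hypothetical stand-in for ReportCapability: skip while the TTL entry is still alive.
	report := func(capability string) {
		if _, ok := saved.Get(capability); ok {
			fmt.Println(capability, "already saved, skipped")
			return
		}
		fmt.Println(capability, "queued for saving")
		saved.Set(capability, nil) // in the managers, Set happens only after the profile is persisted
	}

	report("CAP_SYS_ADMIN") // queued
	report("CAP_SYS_ADMIN") // skipped while the entry is alive
	time.Sleep(7 * updateDataPeriod)
	report("CAP_SYS_ADMIN") // the entry has expired, so it is queued (and saved) again
}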
55 changes: 26 additions & 29 deletions pkg/networkmanager/v2/network_manager.go
@@ -6,23 +6,8 @@ import (
"fmt"
"time"

"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/dnsmanager"
"github.com/kubescape/node-agent/pkg/k8sclient"
"github.com/kubescape/node-agent/pkg/networkmanager"
"github.com/kubescape/node-agent/pkg/objectcache"
"github.com/kubescape/node-agent/pkg/storage"
"github.com/kubescape/node-agent/pkg/utils"

"k8s.io/utils/ptr"

"github.com/cenkalti/backoff/v4"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"

"github.com/armosec/utils-k8s-go/wlid"
"github.com/cenkalti/backoff/v4"
mapset "github.com/deckarep/golang-set/v2"
"github.com/google/uuid"
"github.com/goradd/maps"
@@ -31,12 +16,24 @@ import (
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/instanceidhandler/v1"
helpersv1 "github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/k8s-interface/workloadinterface"
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/dnsmanager"
"github.com/kubescape/node-agent/pkg/k8sclient"
"github.com/kubescape/node-agent/pkg/networkmanager"
"github.com/kubescape/node-agent/pkg/objectcache"
"github.com/kubescape/node-agent/pkg/storage"
"github.com/kubescape/node-agent/pkg/utils"
"github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
storageUtils "github.com/kubescape/storage/pkg/utils"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"istio.io/pkg/cache"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
)

type NetworkManager struct {
@@ -46,8 +43,8 @@ type NetworkManager struct {
containerMutexes storageUtils.MapMutex[string] // key is k8sContainerID
trackedContainers mapset.Set[string] // key is k8sContainerID
removedContainers mapset.Set[string] // key is k8sContainerID
droppedEvents maps.SafeMap[string, bool] // key is k8sContainerID
savedEvents maps.SafeMap[string, mapset.Set[networkmanager.NetworkEvent]] // key is k8sContainerID
droppedEventsContainers mapset.Set[string] // key is k8sContainerID
savedEvents maps.SafeMap[string, cache.ExpiringCache] // key is k8sContainerID
toSaveEvents maps.SafeMap[string, mapset.Set[networkmanager.NetworkEvent]] // key is k8sContainerID
watchedContainerChannels maps.SafeMap[string, chan error] // key is ContainerID
k8sClient k8sclient.K8sClientInterface
@@ -135,11 +132,12 @@ func (nm *NetworkManager) deleteResources(watchedContainer *utils.WatchedContain
// delete resources
watchedContainer.UpdateDataTicker.Stop()
nm.trackedContainers.Remove(watchedContainer.K8sContainerID)
nm.droppedEvents.Delete(watchedContainer.K8sContainerID)
nm.droppedEventsContainers.Remove(watchedContainer.K8sContainerID)
nm.savedEvents.Delete(watchedContainer.K8sContainerID)
nm.toSaveEvents.Delete(watchedContainer.K8sContainerID)
nm.watchedContainerChannels.Delete(watchedContainer.ContainerID)
}

func (nm *NetworkManager) ContainerReachedMaxTime(containerID string) {
if channel := nm.watchedContainerChannels.Get(containerID); channel != nil {
channel <- utils.ContainerReachedMaxTime
@@ -250,7 +248,7 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine
// sleep for container-index seconds to desynchronize the profile saving
time.Sleep(time.Duration(watchedContainer.ContainerIndex) * time.Second)

if droppedEvents := nm.droppedEvents.Get(watchedContainer.K8sContainerID); droppedEvents {
if nm.droppedEventsContainers.ContainsOne(watchedContainer.K8sContainerID) {
watchedContainer.SetStatus(utils.WatchedContainerStatusMissingRuntime)
}

@@ -413,7 +411,7 @@ func (nm *NetworkManager) saveNetworkEvents(ctx context.Context, watchedContaine
watchedContainer.ResetStatusUpdatedFlag()

// record saved events
nm.savedEvents.Get(watchedContainer.K8sContainerID).Append(toSaveEvents.ToSlice()...)
savedEvents := nm.savedEvents.Get(watchedContainer.K8sContainerID)
toSaveEvents.Each(func(event networkmanager.NetworkEvent) bool {
savedEvents.Set(event, nil)
return false
})
logger.L().Debug("NetworkManager - saved neighborhood",
helpers.Int("events", toSaveEvents.Cardinality()),
helpers.String("slug", slug),
@@ -489,8 +491,7 @@ func (nm *NetworkManager) ContainerCallback(notif containercollection.PubSubEven
helpers.String("k8s workload", k8sContainerID))
return
}
nm.droppedEvents.Set(k8sContainerID, false)
nm.savedEvents.Set(k8sContainerID, mapset.NewSet[networkmanager.NetworkEvent]())
nm.savedEvents.Set(k8sContainerID, cache.NewTTL(5*nm.cfg.UpdateDataPeriod, nm.cfg.UpdateDataPeriod))
nm.toSaveEvents.Set(k8sContainerID, mapset.NewSet[networkmanager.NetworkEvent]())
nm.removedContainers.Remove(k8sContainerID) // make sure container is not in the removed list
nm.trackedContainers.Add(k8sContainerID)
@@ -526,18 +527,14 @@ func (nm *NetworkManager) ReportNetworkEvent(k8sContainerID string, event tracer
networkEvent.SetDestinationPodLabels(event.DstEndpoint.PodLabels)

// skip if we already saved this event
savedEvents := nm.savedEvents.Get(k8sContainerID)
if savedEvents.Contains(networkEvent) {
if _, ok := nm.savedEvents.Get(k8sContainerID).Get(networkEvent); ok {
return
}
nm.toSaveEvents.Get(k8sContainerID).Add(networkEvent)
}

func (nm *NetworkManager) ReportDroppedEvent(k8sContainerID string) {
if err := nm.waitForContainer(k8sContainerID); err != nil {
return
}
nm.droppedEvents.Set(k8sContainerID, true)
nm.droppedEventsContainers.Add(k8sContainerID)
}

func (nm *NetworkManager) createNetworkNeighbor(networkEvent networkmanager.NetworkEvent, namespace string) *v1beta1.NetworkNeighbor {
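One detail worth calling out in the saveNetworkEvents hunk above: with github.com/deckarep/golang-set/v2, the callback passed to Set.Each returns a stop flag, so the return false inside the savedEvents copy loop means "continue with the next event" rather than signalling a failure. A tiny sketch of that contract (the event strings are invented):

package main

import (
	"fmt"

	mapset "github.com/deckarep/golang-set/v2"
)

func main() {
	toSaveEvents := mapset.NewSet("10.0.0.1->10.0.0.2:80", "10.0.0.1->10.0.0.3:443")
	toSaveEvents.Each(func(event string) bool {
		fmt.Println("copy into the TTL cache:", event)
		return false // false = keep iterating; returning true would stop the loop early
	})
}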
