Skip to content

Commit 36512a2

Browse files
David Wertenteilamirmalka
andauthored
Replace bolt db (#104)
* enable profiler Signed-off-by: David Wertenteil <[email protected]> * Update main.go * modify gh actions Signed-off-by: David Wertenteil <[email protected]> * fixed wf * ermove units * fixed main * add build wf Signed-off-by: David Wertenteil <[email protected]> * add pprof Signed-off-by: David Wertenteil <[email protected]> * remove env * start profiler on 6060 * add simple db * fixed lock * adding mutex per bucket * remove files after SBOM creation * clear after filtered sbom creation * release lock * adding logs * fixed lock * change map to pointer * add test flow * commented out otel tracing Signed-off-by: Amir Malka <[email protected]> * adding e2e test * add push * build only on main * update tests * remove pointers * fixed test file * do not remove after 2 min * fixed filepath * add demo file * update conf * fixed container stopped logs * cleanup code * cleanup * rename file handlers * update logs --------- Signed-off-by: David Wertenteil <[email protected]> Signed-off-by: Amir Malka <[email protected]> Co-authored-by: Amir Malka <[email protected]>
1 parent 7fcf782 commit 36512a2

File tree

13 files changed

+161
-91
lines changed

13 files changed

+161
-91
lines changed

go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ require (
1818
go.opentelemetry.io/otel v1.16.0
1919
go.opentelemetry.io/otel/trace v1.16.0
2020
golang.org/x/sys v0.10.0
21-
k8s.io/apimachinery v0.27.3
22-
k8s.io/client-go v0.27.3
21+
k8s.io/apimachinery v0.27.4
22+
k8s.io/client-go v0.27.4
2323
)
2424

2525
require (
@@ -133,8 +133,8 @@ require (
133133
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
134134
gopkg.in/yaml.v2 v2.4.0 // indirect
135135
gopkg.in/yaml.v3 v3.0.1 // indirect
136-
k8s.io/api v0.27.3 // indirect
137-
k8s.io/cri-api v0.27.3 // indirect
136+
k8s.io/api v0.27.4 // indirect
137+
k8s.io/cri-api v0.27.4 // indirect
138138
k8s.io/klog/v2 v2.90.1 // indirect
139139
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
140140
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5 // indirect

go.sum

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
364364
github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+PymziUAg=
365365
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
366366
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
367-
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
367+
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
368368
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
369369
github.com/s3rj1k/go-fanotify/fanotify v0.0.0-20210917134616-9c00a300bb7a h1:np2nR32/A/VcOG9Hn+IOPA8kMk1gbBzK5LpSsgq5pJI=
370370
github.com/s3rj1k/go-fanotify/fanotify v0.0.0-20210917134616-9c00a300bb7a/go.mod h1:wiP6GQ2T378F+YIyuNw7yXtBxJZR+fqrrn1Z6UHZi0Q=
@@ -888,14 +888,14 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
888888
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
889889
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
890890
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
891-
k8s.io/api v0.27.3 h1:yR6oQXXnUEBWEWcvPWS0jQL575KoAboQPfJAuKNrw5Y=
892-
k8s.io/api v0.27.3/go.mod h1:C4BNvZnQOF7JA/0Xed2S+aUyJSfTGkGFxLXz9MnpIpg=
893-
k8s.io/apimachinery v0.27.3 h1:Ubye8oBufD04l9QnNtW05idcOe9Z3GQN8+7PqmuVcUM=
894-
k8s.io/apimachinery v0.27.3/go.mod h1:XNfZ6xklnMCOGGFNqXG7bUrQCoR04dh/E7FprV6pb+E=
895-
k8s.io/client-go v0.27.3 h1:7dnEGHZEJld3lYwxvLl7WoehK6lAq7GvgjxpA3nv1E8=
896-
k8s.io/client-go v0.27.3/go.mod h1:2MBEKuTo6V1lbKy3z1euEGnhPfGZLKTS9tiJ2xodM48=
897-
k8s.io/cri-api v0.27.3 h1:MkUcz7FMDA/BVSoC0iWI9uFjYG0Pd//gOdPKb4pKasY=
898-
k8s.io/cri-api v0.27.3/go.mod h1:+Ts/AVYbIo04S86XbTD73UPp/DkTiYxtsFeOFEu32L0=
891+
k8s.io/api v0.27.4 h1:0pCo/AN9hONazBKlNUdhQymmnfLRbSZjd5H5H3f0bSs=
892+
k8s.io/api v0.27.4/go.mod h1:O3smaaX15NfxjzILfiln1D8Z3+gEYpjEpiNA/1EVK1Y=
893+
k8s.io/apimachinery v0.27.4 h1:CdxflD4AF61yewuid0fLl6bM4a3q04jWel0IlP+aYjs=
894+
k8s.io/apimachinery v0.27.4/go.mod h1:XNfZ6xklnMCOGGFNqXG7bUrQCoR04dh/E7FprV6pb+E=
895+
k8s.io/client-go v0.27.4 h1:vj2YTtSJ6J4KxaC88P4pMPEQECWMY8gqPqsTgUKzvjk=
896+
k8s.io/client-go v0.27.4/go.mod h1:ragcly7lUlN0SRPk5/ZkGnDjPknzb37TICq07WhI6Xc=
897+
k8s.io/cri-api v0.27.4 h1:OqLsrkRpiEieMcNNqf1WxoMQyzDjOd/zUISrwjS5zAw=
898+
k8s.io/cri-api v0.27.4/go.mod h1:+Ts/AVYbIo04S86XbTD73UPp/DkTiYxtsFeOFEu32L0=
899899
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
900900
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
901901
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f h1:2kWPakN3i/k81b0gvD5C5FJ2kxm1WrQFanWchyKuqGg=

main.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package main
22

33
import (
44
"context"
5-
"log"
5+
"net/http"
66
"net/url"
77
"node-agent/internal/validator"
88
"node-agent/pkg/config"
@@ -18,6 +18,8 @@ import (
1818
"github.com/kubescape/go-logger/helpers"
1919
"github.com/kubescape/k8s-interface/k8sinterface"
2020
"github.com/spf13/afero"
21+
22+
_ "net/http/pprof"
2123
)
2224

2325
func main() {
@@ -48,8 +50,13 @@ func main() {
4850
logger.L().Ctx(ctx).Fatal("error during validation", helpers.Error(err))
4951
}
5052

53+
if _, present := os.LookupEnv("ENABLE_PROFILER"); present {
54+
logger.L().Info("Starting profiler on port 6060")
55+
go http.ListenAndServe("localhost:6060", nil)
56+
}
57+
5158
// Create the relevancy manager
52-
fileHandler, err := filehandler.CreateBoltFileHandler()
59+
fileHandler, err := filehandler.CreateInMemoryFileHandler()
5360
if err != nil {
5461
logger.L().Ctx(ctx).Fatal("failed to create fileDB", helpers.Error(err))
5562
}
@@ -81,7 +88,6 @@ func main() {
8188
shutdown := make(chan os.Signal, 1)
8289
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)
8390
<-shutdown
84-
log.Println("Shutting down...")
8591

8692
// Exit with success
8793
os.Exit(0)

pkg/containerwatcher/v1/container_watcher.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"github.com/kubescape/go-logger"
2121
"github.com/kubescape/go-logger/helpers"
2222
"github.com/kubescape/k8s-interface/k8sinterface"
23-
"go.opentelemetry.io/otel"
2423
)
2524

2625
const (
@@ -60,8 +59,6 @@ func CreateIGContainerWatcher(k8sClient *k8sinterface.KubernetesApi, relevancyMa
6059
}
6160

6261
func (ch *IGContainerWatcher) Start(ctx context.Context) error {
63-
ctx, span := otel.Tracer("").Start(ctx, "IGContainerWatcher.Start")
64-
defer span.End()
6562

6663
ch.relevancyManager.SetContainerHandler(ch)
6764
ch.relevancyManager.StartRelevancyManager(ctx)
@@ -139,7 +136,7 @@ func (ch *IGContainerWatcher) Start(ctx context.Context) error {
139136
}
140137
if event.Ret > -1 {
141138
ch.workerPool.Submit(func() {
142-
ch.relevancyManager.ReportFileAccess(ctx, event.Namespace, event.Pod, event.Container, event.Path)
139+
ch.relevancyManager.ReportFileAccess(ctx, event.Namespace, event.Pod, event.Container, event.FullPath)
143140
})
144141
}
145142
}
@@ -166,7 +163,7 @@ func (ch *IGContainerWatcher) Start(ctx context.Context) error {
166163
}
167164

168165
// Create the exec tracer
169-
ch.tracerOpen, err = traceropen.NewTracer(&traceropen.Config{MountnsMap: openMountnsmap}, ch.containerCollection, openEventCallback)
166+
ch.tracerOpen, err = traceropen.NewTracer(&traceropen.Config{MountnsMap: openMountnsmap, FullPath: true}, ch.containerCollection, openEventCallback)
170167
if err != nil {
171168
return fmt.Errorf("error creating tracerOpen: %s\n", err)
172169
}
@@ -199,8 +196,6 @@ func (ch *IGContainerWatcher) Stop() {
199196
}
200197

201198
func (ch *IGContainerWatcher) UnregisterContainer(ctx context.Context, container *containercollection.Container) {
202-
_, span := otel.Tracer("").Start(ctx, "IGContainerWatcher.UnregisterContainer")
203-
defer span.End()
204199

205200
event := containercollection.PubSubEvent{
206201
Timestamp: time.Now().Format(time.RFC3339),

pkg/filehandler/file_handler_interface.go renamed to pkg/filehandler/interface.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package filehandler
22

3-
import "context"
3+
import (
4+
"context"
5+
)
46

57
type FileHandler interface {
68
AddFile(ctx context.Context, bucket, file string) error

pkg/filehandler/v1/file_handler.go renamed to pkg/filehandler/v1/boltdb.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/kubescape/go-logger"
99
"github.com/kubescape/go-logger/helpers"
1010
bolt "go.etcd.io/bbolt"
11-
"go.opentelemetry.io/otel"
1211
)
1312

1413
type BoltFileHandler struct {
@@ -26,8 +25,6 @@ func CreateBoltFileHandler() (*BoltFileHandler, error) {
2625
}
2726

2827
func (b BoltFileHandler) AddFile(ctx context.Context, bucket, file string) error {
29-
_, span := otel.Tracer("").Start(ctx, "BoltFileHandler.AddFile")
30-
defer span.End()
3128
return b.fileDB.Batch(func(tx *bolt.Tx) error {
3229
b, err := tx.CreateBucketIfNotExists([]byte(bucket))
3330
if err != nil {
@@ -42,8 +39,6 @@ func (b BoltFileHandler) Close() {
4239
}
4340

4441
func (b BoltFileHandler) GetFiles(ctx context.Context, container string) (map[string]bool, error) {
45-
_, span := otel.Tracer("").Start(ctx, "BoltFileHandler.GetFiles")
46-
defer span.End()
4742
fileList := make(map[string]bool)
4843
err := b.fileDB.View(func(tx *bolt.Tx) error {
4944
b := tx.Bucket([]byte(container))
@@ -60,8 +55,6 @@ func (b BoltFileHandler) GetFiles(ctx context.Context, container string) (map[st
6055
}
6156

6257
func (b BoltFileHandler) RemoveBucket(ctx context.Context, bucket string) error {
63-
_, span := otel.Tracer("").Start(ctx, "BoltFileHandler.RemoveBucket")
64-
defer span.End()
6558
return b.fileDB.Update(func(tx *bolt.Tx) error {
6659
err := tx.DeleteBucket([]byte(bucket))
6760
if err != nil {

pkg/filehandler/v1/inmemory.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package filehandler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"node-agent/pkg/filehandler"
7+
"sync"
8+
)
9+
10+
const initFileListLength = 5000
11+
12+
type InMemoryFileHandler struct {
13+
mutex sync.RWMutex
14+
m map[string]*sync.RWMutex
15+
files map[string]map[string]bool
16+
}
17+
18+
var _ filehandler.FileHandler = (*InMemoryFileHandler)(nil)
19+
20+
func CreateInMemoryFileHandler() (*InMemoryFileHandler, error) {
21+
return &InMemoryFileHandler{
22+
m: make(map[string]*sync.RWMutex),
23+
files: make(map[string]map[string]bool, 20),
24+
}, nil
25+
}
26+
27+
func (s *InMemoryFileHandler) AddFile(ctx context.Context, bucket, file string) error {
28+
29+
// Acquire a read lock first
30+
s.mutex.RLock()
31+
bucketLock, ok := s.m[bucket]
32+
bucketFiles, okF := s.files[bucket]
33+
s.mutex.RUnlock()
34+
35+
// If the bucket doesn't exist, acquire a write lock to create the new bucket
36+
if !ok || !okF {
37+
s.mutex.Lock()
38+
// Double-check the bucket's existence to ensure another goroutine didn't already create it
39+
bucketLock, ok = s.m[bucket]
40+
if !ok {
41+
bucketLock = &sync.RWMutex{}
42+
s.m[bucket] = bucketLock
43+
}
44+
45+
bucketFiles, okF = s.files[bucket]
46+
if !okF {
47+
bucketFiles = make(map[string]bool, initFileListLength)
48+
s.files[bucket] = bucketFiles
49+
}
50+
s.mutex.Unlock()
51+
}
52+
53+
// Acquire a write lock if the bucket already exists
54+
bucketLock.Lock()
55+
defer bucketLock.Unlock()
56+
57+
bucketFiles[file] = true
58+
59+
return nil
60+
}
61+
62+
func (s *InMemoryFileHandler) Close() {
63+
// Nothing to do
64+
}
65+
66+
func shallowCopyMapStringBool(m map[string]bool) map[string]bool {
67+
if m == nil {
68+
return nil
69+
}
70+
mCopy := make(map[string]bool, len(m))
71+
for k, v := range m {
72+
mCopy[k] = v
73+
}
74+
return mCopy
75+
}
76+
77+
func (s *InMemoryFileHandler) GetFiles(ctx context.Context, bucket string) (map[string]bool, error) {
78+
s.mutex.RLock()
79+
bucketLock, ok := s.m[bucket]
80+
bucketFiles, okFiles := s.files[bucket]
81+
s.mutex.RUnlock()
82+
83+
if !ok || !okFiles {
84+
return map[string]bool{}, fmt.Errorf("bucket does not exist for container %s", bucket)
85+
}
86+
87+
bucketLock.RLock()
88+
defer bucketLock.RUnlock()
89+
90+
return shallowCopyMapStringBool(bucketFiles), nil
91+
}
92+
func (s *InMemoryFileHandler) RemoveBucket(ctx context.Context, bucket string) error {
93+
94+
s.mutex.Lock()
95+
bucketLock, ok := s.m[bucket]
96+
if ok {
97+
bucketLock.Lock()
98+
defer bucketLock.Unlock()
99+
}
100+
101+
delete(s.m, bucket)
102+
delete(s.files, bucket)
103+
s.mutex.Unlock()
104+
105+
return nil
106+
}

0 commit comments

Comments
 (0)