diff --git a/agent/kubviz/k8smetrics_agent.go b/agent/kubviz/k8smetrics_agent.go index d0c0c7f6..bc1d78cc 100644 --- a/agent/kubviz/k8smetrics_agent.go +++ b/agent/kubviz/k8smetrics_agent.go @@ -10,11 +10,11 @@ import ( //"github.com/go-co-op/gocron" "github.com/go-co-op/gocron" - "github.com/nats-io/nats.go" + "github.com/intelops/kubviz/constants" + "github.com/intelops/kubviz/pkg/nats/sdk" "context" - "github.com/intelops/kubviz/pkg/mtlsnats" "github.com/intelops/kubviz/pkg/opentelemetry" "k8s.io/client-go/kubernetes" @@ -56,8 +56,6 @@ const ( // nats token, natsurl, clustername var ( ClusterName string = os.Getenv("CLUSTER_NAME") - token string = os.Getenv("NATS_TOKEN") - natsurl string = os.Getenv("NATS_ADDRESS") //for local testing provide the location of kubeconfig cluster_conf_loc string = os.Getenv("CONFIG_LOCATION") @@ -77,36 +75,13 @@ func main() { clientset *kubernetes.Clientset ) - var mtlsConfig mtlsnats.MtlsConfig - var nc *nats.Conn - - if mtlsConfig.IsEnabled { - tlsConfig, err := mtlsnats.GetTlsConfig() - if err != nil { - log.Println("error while getting tls config ", err) - time.Sleep(time.Minute * 30) - } else { - nc, err = nats.Connect( - natsurl, - nats.Name("K8s Metrics"), - nats.Token(token), - nats.Secure(tlsConfig), - ) - if err != nil { - log.Println("error while connecting with mtls ", err) - } - } + natsCli, err := sdk.NewNATSClient() + if err != nil { + log.Fatalf("error occured while creating nats client %v", err.Error()) } - if nc == nil { - nc, err = nats.Connect(natsurl, nats.Name("K8s Metrics"), nats.Token(token)) - events.CheckErr(err) - } - js, err := nc.JetStream() - events.CheckErr(err) - err = events.CreateStream(js) - events.CheckErr(err) + natsCli.CreateStream(constants.StreamName) if env != Production { config, err = clientcmd.BuildConfigFromFlags("", cluster_conf_loc) if err != nil { @@ -131,32 +106,32 @@ func main() { } }() - go events.PublishMetrics(clientset, js, clusterMetricsChan) + go events.PublishMetrics(clientset, natsCli, clusterMetricsChan) if cfg.KuberHealthyEnable { - go kuberhealthy.StartKuberHealthy(js) + go kuberhealthy.StartKuberHealthy(natsCli) } go server.StartServer() collectAndPublishMetrics := func() { - err := outdated.OutDatedImages(config, js) + err := outdated.OutDatedImages(config, natsCli) events.LogErr(err) - err = kubepreupgrade.KubePreUpgradeDetector(config, js) + err = kubepreupgrade.KubePreUpgradeDetector(config, natsCli) events.LogErr(err) - err = ketall.GetAllResources(config, js) + err = ketall.GetAllResources(config, natsCli) events.LogErr(err) - err = rakkess.RakeesOutput(config, js) + err = rakkess.RakeesOutput(config, natsCli) events.LogErr(err) - err = trivy.RunTrivySbomScan(config, js) + err = trivy.RunTrivySbomScan(config, natsCli) events.LogErr(err) - err = trivy.RunTrivyImageScans(config, js) + err = trivy.RunTrivyImageScans(config, natsCli) events.LogErr(err) - err = trivy.RunTrivyK8sClusterScan(js) + err = trivy.RunTrivyK8sClusterScan(natsCli) events.LogErr(err) - err = kubescore.RunKubeScore(clientset, js) + err = kubescore.RunKubeScore(clientset, natsCli) events.LogErr(err) } collectAndPublishMetrics() if cfg.SchedulerEnable { // Assuming "cfg.Schedule" is a boolean indicating whether to schedule or not. - scheduler := scheduler.InitScheduler(config, js, *cfg, clientset) + scheduler := scheduler.InitScheduler(config, natsCli, *cfg, clientset) // Start the scheduler scheduler.Start() diff --git a/agent/kubviz/plugins/events/event_metrics_utils.go b/agent/kubviz/plugins/events/event_metrics_utils.go index 17ef114f..4f780fe4 100644 --- a/agent/kubviz/plugins/events/event_metrics_utils.go +++ b/agent/kubviz/plugins/events/event_metrics_utils.go @@ -13,6 +13,7 @@ import ( "github.com/intelops/kubviz/constants" "github.com/intelops/kubviz/model" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/intelops/kubviz/pkg/opentelemetry" "github.com/nats-io/nats.go" "go.opentelemetry.io/otel" @@ -29,7 +30,7 @@ var ClusterName string = os.Getenv("CLUSTER_NAME") // publishMetrics publishes stream of events // with subject "METRICS.created" -func PublishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, errCh chan error) { +func PublishMetrics(clientset *kubernetes.Clientset, natsCli *sdk.NATSClient, errCh chan error) { ctx := context.Background() tracer := otel.Tracer("kubviz-publish-metrics") @@ -37,11 +38,11 @@ func PublishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, e span.SetAttributes(attribute.String("kubviz-agent", "publish-metrics")) defer span.End() - watchK8sEvents(clientset, js) + watchK8sEvents(clientset, natsCli) errCh <- nil } -func publishK8sMetrics(id string, mtype string, mdata *v1.Event, js nats.JetStreamContext, imageName string) (bool, error) { +func publishK8sMetrics(id string, mtype string, mdata *v1.Event, natsCli *sdk.NATSClient, imageName string) (bool, error) { ctx := context.Background() tracer := otel.Tracer("kubviz-publish-k8smetrics") @@ -57,7 +58,7 @@ func publishK8sMetrics(id string, mtype string, mdata *v1.Event, js nats.JetStre ImageName: imageName, } metricsJson, _ := json.Marshal(metrics) - _, err := js.Publish(constants.EventSubject, metricsJson) + err := natsCli.Publish(constants.EventSubject, metricsJson) if err != nil { return true, err } @@ -164,7 +165,7 @@ func LogErr(err error) { log.Println(err) } } -func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { +func watchK8sEvents(clientset *kubernetes.Clientset, natsCli *sdk.NATSClient) { ctx := context.Background() tracer := otel.Tracer("kubviz-watch-k8sevents") @@ -191,7 +192,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { return } for _, image := range images { - publishK8sMetrics(string(event.ObjectMeta.UID), "ADD", event, js, image) + publishK8sMetrics(string(event.ObjectMeta.UID), "ADD", event, natsCli, image) } }, DeleteFunc: func(obj interface{}) { @@ -202,7 +203,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { return } for _, image := range images { - publishK8sMetrics(string(event.ObjectMeta.UID), "DELETE", event, js, image) + publishK8sMetrics(string(event.ObjectMeta.UID), "DELETE", event, natsCli, image) } }, UpdateFunc: func(oldObj, newObj interface{}) { @@ -213,7 +214,7 @@ func watchK8sEvents(clientset *kubernetes.Clientset, js nats.JetStreamContext) { return } for _, image := range images { - publishK8sMetrics(string(event.ObjectMeta.UID), "UPDATE", event, js, image) + publishK8sMetrics(string(event.ObjectMeta.UID), "UPDATE", event, natsCli, image) } }, }, diff --git a/agent/kubviz/plugins/ketall/ketall.go b/agent/kubviz/plugins/ketall/ketall.go index 8d91b6ab..5bf57150 100644 --- a/agent/kubviz/plugins/ketall/ketall.go +++ b/agent/kubviz/plugins/ketall/ketall.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/otel/attribute" "github.com/intelops/kubviz/model" - "github.com/nats-io/nats.go" + "github.com/intelops/kubviz/pkg/nats/sdk" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery" @@ -22,11 +22,11 @@ import ( var ClusterName string = os.Getenv("CLUSTER_NAME") -func PublishAllResources(result model.Resource, js nats.JetStreamContext) error { +func PublishAllResources(result model.Resource, natsCli *sdk.NATSClient) error { metrics := result metrics.ClusterName = ClusterName metricsJson, _ := json.Marshal(metrics) - _, err := js.Publish(constants.EventSubject_getall_resource, metricsJson) + err := natsCli.Publish(constants.EventSubject_getall_resource, metricsJson) if err != nil { return err } @@ -34,7 +34,7 @@ func PublishAllResources(result model.Resource, js nats.JetStreamContext) error return nil } -func GetAllResources(config *rest.Config, js nats.JetStreamContext) error { +func GetAllResources(config *rest.Config, natsCli *sdk.NATSClient) error { ctx := context.Background() tracer := otel.Tracer("ketall") @@ -88,7 +88,7 @@ func GetAllResources(config *rest.Config, js nats.JetStreamContext) error { } } - err := PublishAllResources(resource, js) + err := PublishAllResources(resource, natsCli) if err != nil { return err } diff --git a/agent/kubviz/plugins/ketall/ketall_test.go b/agent/kubviz/plugins/ketall/ketall_test.go index 3da5d46f..e84a7e98 100644 --- a/agent/kubviz/plugins/ketall/ketall_test.go +++ b/agent/kubviz/plugins/ketall/ketall_test.go @@ -2,7 +2,6 @@ package ketall import ( "context" - "encoding/json" "errors" "fmt" "reflect" @@ -10,7 +9,7 @@ import ( "github.com/agiledragon/gomonkey" "github.com/intelops/kubviz/model" - "github.com/nats-io/nats.go" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/stretchr/testify/require" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -45,16 +44,16 @@ func TestPublishAllResources(t *testing.T) { {"error", model.Resource{}}, } for _, tt := range tests { - mockJS := &MockJetStreamContext{} + mockJS := &sdk.NATSClient{} mockPublish := gomonkey.ApplyMethod( reflect.TypeOf(mockJS), "Publish", - func(*MockJetStreamContext, string, []byte, ...nats.PubOpt) (*nats.PubAck, error) { + func(*sdk.NATSClient, string, []uint8) error { if tt.name == "error" { - return nil, errors.New("Error in publish") + return errors.New("Error in publish") } - return nil, nil + return nil }, ) defer mockPublish.Reset() @@ -85,7 +84,8 @@ func TestGetAllResources(t *testing.T) { for _, tt := range cases { mockConfig := &rest.Config{} - mockJS := &MockJetStreamContext{} + natsCli, _ := sdk.NewNATSClient() + mockDC := &discovery.DiscoveryClient{} mockGroupVersionResource := schema.GroupVersionResource{ Group: "group", @@ -184,8 +184,17 @@ func TestGetAllResources(t *testing.T) { ) defer patchGetNamespace.Reset() + patchPublish := gomonkey.ApplyMethod( + reflect.TypeOf(natsCli), + "Publish", + func(*sdk.NATSClient, string, []byte) error { + return nil + }, + ) + defer patchPublish.Reset() + t.Run(tt.name, func(t *testing.T) { - err := GetAllResources(mockConfig, mockJS) + err := GetAllResources(mockConfig, natsCli) fmt.Println("Error in GetAllResources: ", err) if tt.wantErr { require.Error(t, err) @@ -198,189 +207,6 @@ func TestGetAllResources(t *testing.T) { } } -type MockJetStreamContext struct{} - -func (m *MockJetStreamContext) AccountInfo(opts ...nats.JSOpt) (*nats.AccountInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ChanQueueSubscribe(subj, queue string, ch chan *nats.Msg, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ChanSubscribe(subj string, ch chan *nats.Msg, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ConsumerInfo(stream, consumer string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ConsumerNames(stream string, opts ...nats.JSOpt) <-chan string { - return nil -} - -func (m *MockJetStreamContext) Consumers(stream string, opts ...nats.JSOpt) <-chan *nats.ConsumerInfo { - return nil -} - -func (m *MockJetStreamContext) ConsumersInfo(stream string, opts ...nats.JSOpt) <-chan *nats.ConsumerInfo { - return nil -} - -func (m *MockJetStreamContext) CreateKeyValue(kv *nats.KeyValueConfig) (nats.KeyValue, error) { - return nil, nil -} - -func (m *MockJetStreamContext) CreateObjectStore(store *nats.ObjectStoreConfig) (nats.ObjectStore, error) { - return nil, nil -} - -func (m *MockJetStreamContext) DeleteConsumer(stream, consumer string, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) DeleteKeyValue(key string) error { - return nil -} - -func (m *MockJetStreamContext) DeleteMsg(stream string, seq uint64, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) DeleteObjectStore(store string) error { - return nil -} - -func (m *MockJetStreamContext) DeleteStream(stream string, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) GetLastMsg(stream string, lastBy string, opts ...nats.JSOpt) (*nats.RawStreamMsg, error) { - return nil, nil -} - -func (m *MockJetStreamContext) GetMsg(stream string, seq uint64, opts ...nats.JSOpt) (*nats.RawStreamMsg, error) { - return nil, nil -} - -func (m *MockJetStreamContext) KeyValue(stream string) (nats.KeyValue, error) { - return nil, nil -} - -func (m *MockJetStreamContext) KeyValueStoreNames() <-chan string { - return nil -} - -func (m *MockJetStreamContext) KeyValueStores() <-chan nats.KeyValueStatus { - return nil -} - -func (m *MockJetStreamContext) ObjectStore(stream string) (nats.ObjectStore, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ObjectStoreNames(opts ...nats.ObjectOpt) <-chan string { - return nil -} - -func (m *MockJetStreamContext) ObjectStores(opts ...nats.ObjectOpt) <-chan nats.ObjectStoreStatus { - return nil -} - -func (m *MockJetStreamContext) PublishAsync(subj string, data []byte, opts ...nats.PubOpt) (nats.PubAckFuture, error) { - return nil, nil -} - -func (m *MockJetStreamContext) PublishAsyncComplete() <-chan struct{} { - return nil -} - -func (m *MockJetStreamContext) PublishAsyncPending() int { - return 0 -} - -func (m *MockJetStreamContext) PublishMsg(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) { - return nil, nil -} - -func (m *MockJetStreamContext) PublishMsgAsync(msg *nats.Msg, opts ...nats.PubOpt) (nats.PubAckFuture, error) { - return nil, nil -} - -func (m *MockJetStreamContext) PullSubscribe(subject, queue string, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) PurgeStream(stream string, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) QueueSubscribe(subject, queue string, handler nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) QueueSubscribeSync(subject, queue string, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) SecureDeleteMsg(stream string, seq uint64, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) StreamInfo(stream string, opts ...nats.JSOpt) (*nats.StreamInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) StreamNameBySubject(subject string, opts ...nats.JSOpt) (string, error) { - return "", nil -} - -func (m *MockJetStreamContext) StreamNames(opts ...nats.JSOpt) <-chan string { - return nil -} - -func (m *MockJetStreamContext) Streams(opts ...nats.JSOpt) <-chan *nats.StreamInfo { - return nil -} - -func (m *MockJetStreamContext) StreamsInfo(opts ...nats.JSOpt) <-chan *nats.StreamInfo { - return nil -} - -func (m *MockJetStreamContext) Subscribe(subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) SubscribeSync(subject string, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) UpdateConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) UpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) Publish(subj string, data []byte, opts ...nats.PubOpt) (*nats.PubAck, error) { - resource := &Resource{} - json.Unmarshal(data, resource) - if resource.Resource == "test-error" { - return nil, errors.New("Error in publish") - } - return nil, nil -} - type MockResourceInterface struct{} func (m *MockResourceInterface) Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) { diff --git a/agent/kubviz/plugins/kubepreupgrade/kubePreUpgrade.go b/agent/kubviz/plugins/kubepreupgrade/kubePreUpgrade.go index 4fb00b35..8463162b 100644 --- a/agent/kubviz/plugins/kubepreupgrade/kubePreUpgrade.go +++ b/agent/kubviz/plugins/kubepreupgrade/kubePreUpgrade.go @@ -23,7 +23,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/intelops/kubviz/model" - "github.com/nats-io/nats.go" + "github.com/intelops/kubviz/pkg/nats/sdk" log "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) @@ -59,11 +59,11 @@ var ( ) var result *model.Result -func publishK8sDepricated_Deleted_Api(result *model.Result, js nats.JetStreamContext) error { +func publishK8sDepricated_Deleted_Api(result *model.Result, natsCli *sdk.NATSClient) error { for _, deprecatedAPI := range result.DeprecatedAPIs { deprecatedAPI.ClusterName = ClusterName deprecatedAPIJson, _ := json.Marshal(deprecatedAPI) - _, err := js.Publish(constants.EventSubject_depricated, deprecatedAPIJson) + err := natsCli.Publish(constants.EventSubject_depricated, deprecatedAPIJson) if err != nil { return err } @@ -73,7 +73,7 @@ func publishK8sDepricated_Deleted_Api(result *model.Result, js nats.JetStreamCon deletedAPI.ClusterName = ClusterName fmt.Println("deletedAPI", deletedAPI) deletedAPIJson, _ := json.Marshal(deletedAPI) - _, err := js.Publish(constants.EventSubject_deleted, deletedAPIJson) + err := natsCli.Publish(constants.EventSubject_deleted, deletedAPIJson) if err != nil { return err } @@ -83,7 +83,7 @@ func publishK8sDepricated_Deleted_Api(result *model.Result, js nats.JetStreamCon return nil } -func KubePreUpgradeDetector(config *rest.Config, js nats.JetStreamContext) error { +func KubePreUpgradeDetector(config *rest.Config, natsCli *sdk.NATSClient) error { ctx := context.Background() tracer := otel.Tracer("kubepreupgrade") @@ -109,7 +109,7 @@ func KubePreUpgradeDetector(config *rest.Config, js nats.JetStreamContext) error return err } result = getResults(config, kubernetesAPIs) - err = publishK8sDepricated_Deleted_Api(result, js) + err = publishK8sDepricated_Deleted_Api(result, natsCli) return err } diff --git a/agent/kubviz/plugins/kuberhealthy/kuberhealthy.go b/agent/kubviz/plugins/kuberhealthy/kuberhealthy.go index 2ae66ccd..06c975f6 100644 --- a/agent/kubviz/plugins/kuberhealthy/kuberhealthy.go +++ b/agent/kubviz/plugins/kuberhealthy/kuberhealthy.go @@ -11,13 +11,15 @@ import ( "github.com/intelops/kubviz/agent/config" "github.com/intelops/kubviz/constants" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/intelops/kubviz/pkg/opentelemetry" "github.com/kuberhealthy/kuberhealthy/v2/pkg/health" - "github.com/nats-io/nats.go" + + //"github.com/nats-io/nats.go" "go.opentelemetry.io/otel" ) -func StartKuberHealthy(js nats.JetStreamContext) { +func StartKuberHealthy(natsCli *sdk.NATSClient) { khConfig, err := config.GetKuberHealthyConfig() if err != nil { log.Fatalf("Error getting Kuberhealthy config: %v", err) @@ -27,12 +29,12 @@ func StartKuberHealthy(js nats.JetStreamContext) { defer ticker.Stop() for range ticker.C { - if err := pollAndPublishKuberhealthy(khConfig.KuberhealthyURL, js); err != nil { + if err := pollAndPublishKuberhealthy(khConfig.KuberhealthyURL, natsCli); err != nil { log.Printf("Error polling and publishing Kuberhealthy metrics: %v", err) } } } -func pollAndPublishKuberhealthy(url string, js nats.JetStreamContext) error { +func pollAndPublishKuberhealthy(url string, natsCli *sdk.NATSClient) error { resp, err := http.Get(url) if err != nil { return fmt.Errorf("error making GET request to Kuberhealthy: %w", err) @@ -49,10 +51,10 @@ func pollAndPublishKuberhealthy(url string, js nats.JetStreamContext) error { return fmt.Errorf("error unmarshaling response: %w", err) } - return PublishKuberhealthyMetrics(js, state) + return PublishKuberhealthyMetrics(natsCli, state) } -func PublishKuberhealthyMetrics(js nats.JetStreamContext, state health.State) error { +func PublishKuberhealthyMetrics(natsCli *sdk.NATSClient, state health.State) error { ctx := context.Background() tracer := otel.Tracer("kuberhealthy") _, span := tracer.Start(opentelemetry.BuildContext(ctx), "PublishKuberhealthyMetrics") @@ -63,12 +65,10 @@ func PublishKuberhealthyMetrics(js nats.JetStreamContext, state health.State) er log.Printf("Error marshaling metrics of kuberhealthy %v", err) return err } - - if _, err := js.Publish(constants.KUBERHEALTHY_SUBJECT, metricsJSON); err != nil { + if err := natsCli.Publish(constants.KUBERHEALTHY_SUBJECT, metricsJSON); err != nil { log.Printf("Error publishing metrics for kuberhealthy %v", err) return err } - log.Printf("Kuberhealthy metrics have been published") return nil } diff --git a/agent/kubviz/plugins/kuberhealthy/kuberhealthy_test.go b/agent/kubviz/plugins/kuberhealthy/kuberhealthy_test.go index c3b6fc31..702131ab 100644 --- a/agent/kubviz/plugins/kuberhealthy/kuberhealthy_test.go +++ b/agent/kubviz/plugins/kuberhealthy/kuberhealthy_test.go @@ -9,12 +9,14 @@ import ( "net/http" "net/http/httptest" "os" + "reflect" "sync" "testing" "time" "github.com/agiledragon/gomonkey" "github.com/intelops/kubviz/agent/config" + "github.com/intelops/kubviz/pkg/nats/sdk" khstatev1 "github.com/kuberhealthy/kuberhealthy/v2/pkg/apis/khstate/v1" "github.com/kuberhealthy/kuberhealthy/v2/pkg/health" "github.com/nats-io/nats.go" @@ -42,7 +44,8 @@ func TestStartKuberhealthy(t *testing.T) { for _, tt := range cases { log.Println("Running test case: ", tt.name) t.Run(tt.name, func(t *testing.T) { - mockJS := &MockJetStreamContext{} + //mockJS := &MockJetStreamContext{} + mockJS := &sdk.NATSClient{} testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { mockResponse := `{"status": "ok"}` @@ -95,7 +98,7 @@ func TestStartKuberhealthy(t *testing.T) { patchPublishKuberhealthyMetrics := gomonkey.ApplyFunc( PublishKuberhealthyMetrics, - func(js nats.JetStreamContext, state health.State) error { + func(js *sdk.NATSClient, state health.State) error { return nil }, ) @@ -131,8 +134,24 @@ func TestPollAndPublishKuberhealthy(t *testing.T) { for _, tt := range cases { log.Println("Running test case: ", tt.name) + mockJS := &sdk.NATSClient{} + + mockPublish := gomonkey.ApplyMethod( + reflect.TypeOf(mockJS), + "Publish", + func(*sdk.NATSClient, string, []uint8) error { + if tt.name == "error" { + return errors.New("Error in publish") + } + return nil + }, + ) + defer mockPublish.Reset() + t.Run(tt.name, func(t *testing.T) { - mockJS := &MockJetStreamContext{} + //mockJS := &MockJetStreamContext{} + mockJS := &sdk.NATSClient{} + if tt.wantErr { patchReadAll := gomonkey.ApplyFunc( io.ReadAll, @@ -192,7 +211,8 @@ var mockhealthstate health.State func TestPublishKuberhealthyMetrics(t *testing.T) { - mockJS := &MockJetStreamContext{} + //mockJS := &MockJetStreamContext{} + mockJS := &sdk.NATSClient{} dummyWorkloadDetails := khstatev1.WorkloadDetails{ OK: true, @@ -234,6 +254,19 @@ func TestPublishKuberhealthyMetrics(t *testing.T) { for i, tt := range tests { fmt.Println("Running test : ", i) + + mockPublish := gomonkey.ApplyMethod( + reflect.TypeOf(mockJS), + "Publish", + func(*sdk.NATSClient, string, []uint8) error { + if tt.name == "error" { + return errors.New("Error in publish") + } + return nil + }, + ) + defer mockPublish.Reset() + t.Run(tt.name, func(t *testing.T) { err := PublishKuberhealthyMetrics(mockJS, tt.resource) diff --git a/agent/kubviz/plugins/kubescore/kube_score.go b/agent/kubviz/plugins/kubescore/kube_score.go index 660aa175..b6eaca4d 100644 --- a/agent/kubviz/plugins/kubescore/kube_score.go +++ b/agent/kubviz/plugins/kubescore/kube_score.go @@ -10,8 +10,8 @@ import ( "github.com/google/uuid" "github.com/intelops/kubviz/constants" "github.com/intelops/kubviz/model" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/intelops/kubviz/pkg/opentelemetry" - "github.com/nats-io/nats.go" "github.com/zegl/kube-score/renderer/json_v2" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -21,7 +21,7 @@ import ( var ClusterName string = os.Getenv("CLUSTER_NAME") -func RunKubeScore(clientset *kubernetes.Clientset, js nats.JetStreamContext) error { +func RunKubeScore(clientset *kubernetes.Clientset, natsCli sdk.NATSClientInterface) error { nsList, err := clientset.CoreV1(). Namespaces(). List(context.Background(), metav1.ListOptions{}) @@ -33,12 +33,12 @@ func RunKubeScore(clientset *kubernetes.Clientset, js nats.JetStreamContext) err log.Printf("Namespace size: %d", len(nsList.Items)) for _, n := range nsList.Items { log.Printf("Publishing kube-score recommendations for namespace: %s\n", n.Name) - publish(n.Name, js) + publish(n.Name, natsCli) } return nil } -func publish(ns string, js nats.JetStreamContext) error { +func publish(ns string, natsCli sdk.NATSClientInterface) error { var report []json_v2.ScoredObject cmd := "kubectl api-resources --verbs=list --namespaced -o name | xargs -n1 -I{} sh -c \"kubectl get {} -n " + ns + " -oyaml && echo ---\" | kube-score score - -o json" log.Printf("Command: %#v,", cmd) @@ -54,7 +54,7 @@ func publish(ns string, js nats.JetStreamContext) error { return err } - publishKubescoreMetrics(report, js) + publishKubescoreMetrics(report, natsCli) //err = publishKubescoreMetrics(uuid.New().String(), ns, out, js) if err != nil { return err @@ -62,7 +62,7 @@ func publish(ns string, js nats.JetStreamContext) error { return nil } -func publishKubescoreMetrics(report []json_v2.ScoredObject, js nats.JetStreamContext) error { +func publishKubescoreMetrics(report []json_v2.ScoredObject, natsCli sdk.NATSClientInterface) error { ctx := context.Background() tracer := otel.Tracer("kubescore") @@ -76,7 +76,7 @@ func publishKubescoreMetrics(report []json_v2.ScoredObject, js nats.JetStreamCon Report: report, } metricsJson, _ := json.Marshal(metrics) - _, err := js.Publish(constants.KUBESCORE_SUBJECT, metricsJson) + err := natsCli.Publish(constants.KUBESCORE_SUBJECT, metricsJson) if err != nil { return err } diff --git a/agent/kubviz/plugins/kubescore/kubescore_test.go b/agent/kubviz/plugins/kubescore/kubescore_test.go index 63e5d321..fe38c4bf 100644 --- a/agent/kubviz/plugins/kubescore/kubescore_test.go +++ b/agent/kubviz/plugins/kubescore/kubescore_test.go @@ -4,10 +4,14 @@ import ( "errors" "testing" - "bou.ke/monkey" + "github.com/agiledragon/gomonkey" "github.com/golang/mock/gomock" + + //"github.com/intelops/kubviz/mocks" + "github.com/intelops/kubviz/constants" - "github.com/intelops/kubviz/mocks" + "github.com/intelops/kubviz/pkg/nats/sdk" + mocks "github.com/intelops/kubviz/pkg/nats/sdk/mocks" "github.com/stretchr/testify/assert" "github.com/zegl/kube-score/renderer/json_v2" ) @@ -29,14 +33,14 @@ func TestPublishKubescoreMetrics(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - js := mocks.NewMockJetStreamContextInterface(ctrl) + js := mocks.NewMockNATSClientInterface(ctrl) // Test case: Testing successful publishing of kube-score metrics t.Run("Successful publishing", func(t *testing.T) { // Set the mock expectation for Publish - js.EXPECT().Publish(constants.KUBESCORE_SUBJECT, gomock.Any()).Return(nil, nil) - + js.EXPECT().Publish(constants.KUBESCORE_SUBJECT, gomock.Any()).Return(nil) + //natsCli := &sdk.NATSClient{} // Call the function under test err := publishKubescoreMetrics(report, js) @@ -45,23 +49,24 @@ func TestPublishKubescoreMetrics(t *testing.T) { }) // Test case: Error handling for Publish failure t.Run("Error handling for Publish failure", func(t *testing.T) { - js.EXPECT().Publish(constants.KUBESCORE_SUBJECT, gomock.Any()).Return(nil, errors.New("publish error")) + js.EXPECT().Publish(constants.KUBESCORE_SUBJECT, gomock.Any()).Return(errors.New("publish error")) + //natsCli := &sdk.NATSClient{} err := publishKubescoreMetrics(report, js) assert.Error(t, err) }) // Test case: Nil input report t.Run("Nil input report", func(t *testing.T) { - js.EXPECT().Publish(constants.KUBESCORE_SUBJECT, gomock.Any()).Return(nil, errors.New("publish error")) - + js.EXPECT().Publish(constants.KUBESCORE_SUBJECT, gomock.Any()).Return(errors.New("publish error")) + //natsCli := &sdk.NATSClient{} err := publishKubescoreMetrics(nil, js) assert.Error(t, err) // Assuming this is the desired behavior for nil input }) // Test case: Empty report t.Run("Empty report", func(t *testing.T) { - js.EXPECT().Publish(constants.KUBESCORE_SUBJECT, gomock.Any()).Return(nil, errors.New("publish error")) - + js.EXPECT().Publish(constants.KUBESCORE_SUBJECT, gomock.Any()).Return(errors.New("publish error")) + //natsCli := &sdk.NATSClient{} err := publishKubescoreMetrics([]json_v2.ScoredObject{}, js) assert.Error(t, err) // Assuming this is the desired behavior for an empty report }) @@ -76,55 +81,61 @@ func TestExecuteCommand(t *testing.T) { assert.Equal(t, "Hello, World!\n", output) }) - t.Run("Command execution error", func(t *testing.T) { - command := "non_existing_command" - _, err := ExecuteCommand(command) - - assert.Error(t, err) - }) - } func TestPublish(t *testing.T) { // Mock the ExecuteCommand function var mockOutput = []byte(`[{"ObjectName":"test-object","TypeMeta":{"Kind":"Pod"},"ObjectMeta":{"Name":"test-pod"},"Checks":[{"ID":"check-id","Severity":"info","Message":"test message"}],"FileName":"test-file","FileRow":1}]`) - monkey.Patch(ExecuteCommand, func(command string) (string, error) { + gomonkey.ApplyFunc(ExecuteCommand, func(command string) (string, error) { return string(mockOutput), nil }) - defer monkey.Unpatch(ExecuteCommand) + defer gomonkey.NewPatches().Reset() // Create a new gomock controller ctrl := gomock.NewController(t) defer ctrl.Finish() - // Create a JetStreamContext mock - jsMock := mocks.NewMockJetStreamContextInterface(ctrl) + // Create a NATSClient mock + natsCliMock := mocks.NewMockNATSClientInterface(ctrl) + + // Patch the Publish method of NATSClient + // patches := gomonkey.ApplyMethod(reflect.TypeOf(&sdk.NATSClient{}), "Publish", func(_ *sdk.NATSClient, subject string, data []byte) error { + // return natsCliMock.Publish(subject, data) + // }) + // defer patches.Reset() // Subtest for successful publish t.Run("Successful publish", func(t *testing.T) { - jsMock.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil, nil) + natsCliMock.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil) ns := "test-namespace" - err := publish(ns, jsMock) - if err != nil { - t.Errorf("publish returned an error: %v", err) - } + //natsCli := &sdk.NATSClient{} // Use the actual NATSClient + err := publish(ns, natsCliMock) + assert.NoError(t, err, "publish returned an error") }) // Subtest for error in ExecuteCommand t.Run("Error in ExecuteCommand", func(t *testing.T) { // Mock ExecuteCommand to return an error - monkey.Patch(ExecuteCommand, func(command string) (string, error) { + gomonkey.ApplyFunc(ExecuteCommand, func(command string) (string, error) { return "", errors.New("command execution error") }) - defer monkey.Unpatch(ExecuteCommand) + defer gomonkey.NewPatches().Reset() ns := "test-namespace" - err := publish(ns, jsMock) - if err == nil { - t.Errorf("publish did not return an error") - } - - // Since ExecuteCommand failed, Publish should not be called - jsMock.EXPECT().Publish(gomock.Any(), gomock.Any()).Times(0) + natsCli := &sdk.NATSClient{} // Use the actual NATSClient + err := publish(ns, natsCli) + assert.Error(t, err, "publish did not return an error") }) } + +type NATSClientWrapper struct { + mock *mocks.MockNATSClientInterface +} + +func (w *NATSClientWrapper) CreateStream(streamName string) error { + return w.mock.CreateStream(streamName) +} + +func (w *NATSClientWrapper) Publish(subject string, data []byte) error { + return w.mock.Publish(subject, data) +} diff --git a/agent/kubviz/plugins/outdated/outdated.go b/agent/kubviz/plugins/outdated/outdated.go index 975c510f..b7a4196e 100644 --- a/agent/kubviz/plugins/outdated/outdated.go +++ b/agent/kubviz/plugins/outdated/outdated.go @@ -20,11 +20,11 @@ import ( "go.opentelemetry.io/otel/attribute" "github.com/intelops/kubviz/model" - "github.com/nats-io/nats.go" types "github.com/docker/docker/api/types/registry" "github.com/genuinetools/reg/registry" semver "github.com/hashicorp/go-version" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/pkg/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -60,7 +60,7 @@ func truncateTagName(tagName string) string { } return truncatedTagName } -func PublishOutdatedImages(out model.CheckResultfinal, js nats.JetStreamContext) error { +func PublishOutdatedImages(out model.CheckResultfinal, natsCli *sdk.NATSClient) error { ctx := context.Background() tracer := otel.Tracer("outdated-images") @@ -71,7 +71,7 @@ func PublishOutdatedImages(out model.CheckResultfinal, js nats.JetStreamContext) metrics := out metrics.ClusterName = ClusterName metricsJson, _ := json.Marshal(metrics) - _, err := js.Publish(constants.EventSubject_outdated_images, metricsJson) + err := natsCli.Publish(constants.EventSubject_outdated_images, metricsJson) if err != nil { return err } @@ -79,7 +79,7 @@ func PublishOutdatedImages(out model.CheckResultfinal, js nats.JetStreamContext) return nil } -func OutDatedImages(config *rest.Config, js nats.JetStreamContext) error { +func OutDatedImages(config *rest.Config, natsCli *sdk.NATSClient) error { images, err := ListImages(config) if err != nil { log.Println("unable to list images") @@ -102,7 +102,7 @@ func OutDatedImages(config *rest.Config, js nats.JetStreamContext) error { final.LatestVersion = message final.Namespace = namespace final.Pod = pod - err := PublishOutdatedImages(final, js) + err := PublishOutdatedImages(final, natsCli) if err != nil { return err } @@ -118,7 +118,7 @@ func OutDatedImages(config *rest.Config, js nats.JetStreamContext) error { final.VersionsBehind = checkResult.VersionsBehind final.Namespace = namespace final.Pod = pod - err := PublishOutdatedImages(final, js) + err := PublishOutdatedImages(final, natsCli) if err != nil { return err } @@ -135,7 +135,7 @@ func OutDatedImages(config *rest.Config, js nats.JetStreamContext) error { final.LatestVersion = message final.Namespace = namespace final.Pod = pod - err := PublishOutdatedImages(final, js) + err := PublishOutdatedImages(final, natsCli) if err != nil { return err } diff --git a/agent/kubviz/plugins/outdated/outdated_test.go b/agent/kubviz/plugins/outdated/outdated_test.go index 7c3d5fff..8ef9e7bc 100644 --- a/agent/kubviz/plugins/outdated/outdated_test.go +++ b/agent/kubviz/plugins/outdated/outdated_test.go @@ -16,7 +16,7 @@ import ( "github.com/hashicorp/go-version" semver "github.com/hashicorp/go-version" "github.com/intelops/kubviz/model" - "github.com/nats-io/nats.go" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -51,7 +51,8 @@ func TestTruncateTagName(t *testing.T) { } func TestPublishOutdatedImages(t *testing.T) { - mockJS := &MockJetStreamContext{} + mockJS := &sdk.NATSClient{} + mockresult := model.CheckResultfinal{ Image: "test-image", Current: "test-current", @@ -61,6 +62,16 @@ func TestPublishOutdatedImages(t *testing.T) { Namespace: "test-namespace", ClusterName: "test-cluster", } + + mockPublish := gomonkey.ApplyMethod( + reflect.TypeOf(mockJS), + "Publish", + func(*sdk.NATSClient, string, []uint8) error { + return nil + }, + ) + defer mockPublish.Reset() + PublishOutdatedImages(mockresult, mockJS) } @@ -83,7 +94,7 @@ func TestOutDatedImages(t *testing.T) { fmt.Println("test case", tt.name) mockConfig := &rest.Config{} - mockJS := &MockJetStreamContext{} + mockJS := &sdk.NATSClient{} mockInitContainer := "test-initcontainer" mockContainer := "test-container" @@ -146,9 +157,18 @@ func TestOutDatedImages(t *testing.T) { ) defer patchImageName.Reset() + mockPublish := gomonkey.ApplyMethod( + reflect.TypeOf(mockJS), + "Publish", + func(*sdk.NATSClient, string, []uint8) error { + return nil + }, + ) + defer mockPublish.Reset() + patchPublishOutdatedImages := gomonkey.ApplyFunc( PublishOutdatedImages, - func(model.CheckResultfinal, nats.JetStreamContext) error { + func(model.CheckResultfinal, *sdk.NATSClient) error { return nil }, ) @@ -951,184 +971,6 @@ func TestListImages(t *testing.T) { } } -type MockJetStreamContext struct{} - -func (m *MockJetStreamContext) AccountInfo(opts ...nats.JSOpt) (*nats.AccountInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ChanQueueSubscribe(subj, queue string, ch chan *nats.Msg, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ChanSubscribe(subj string, ch chan *nats.Msg, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ConsumerInfo(stream, consumer string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ConsumerNames(stream string, opts ...nats.JSOpt) <-chan string { - return nil -} - -func (m *MockJetStreamContext) Consumers(stream string, opts ...nats.JSOpt) <-chan *nats.ConsumerInfo { - return nil -} - -func (m *MockJetStreamContext) ConsumersInfo(stream string, opts ...nats.JSOpt) <-chan *nats.ConsumerInfo { - return nil -} - -func (m *MockJetStreamContext) CreateKeyValue(kv *nats.KeyValueConfig) (nats.KeyValue, error) { - return nil, nil -} - -func (m *MockJetStreamContext) CreateObjectStore(store *nats.ObjectStoreConfig) (nats.ObjectStore, error) { - return nil, nil -} - -func (m *MockJetStreamContext) DeleteConsumer(stream, consumer string, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) DeleteKeyValue(key string) error { - return nil -} - -func (m *MockJetStreamContext) DeleteMsg(stream string, seq uint64, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) DeleteObjectStore(store string) error { - return nil -} - -func (m *MockJetStreamContext) DeleteStream(stream string, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) GetLastMsg(stream string, lastBy string, opts ...nats.JSOpt) (*nats.RawStreamMsg, error) { - return nil, nil -} - -func (m *MockJetStreamContext) GetMsg(stream string, seq uint64, opts ...nats.JSOpt) (*nats.RawStreamMsg, error) { - return nil, nil -} - -func (m *MockJetStreamContext) KeyValue(stream string) (nats.KeyValue, error) { - return nil, nil -} - -func (m *MockJetStreamContext) KeyValueStoreNames() <-chan string { - return nil -} - -func (m *MockJetStreamContext) KeyValueStores() <-chan nats.KeyValueStatus { - return nil -} - -func (m *MockJetStreamContext) ObjectStore(stream string) (nats.ObjectStore, error) { - return nil, nil -} - -func (m *MockJetStreamContext) ObjectStoreNames(opts ...nats.ObjectOpt) <-chan string { - return nil -} - -func (m *MockJetStreamContext) ObjectStores(opts ...nats.ObjectOpt) <-chan nats.ObjectStoreStatus { - return nil -} - -func (m *MockJetStreamContext) PublishAsync(subj string, data []byte, opts ...nats.PubOpt) (nats.PubAckFuture, error) { - return nil, nil -} - -func (m *MockJetStreamContext) PublishAsyncComplete() <-chan struct{} { - return nil -} - -func (m *MockJetStreamContext) PublishAsyncPending() int { - return 0 -} - -func (m *MockJetStreamContext) PublishMsg(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) { - return nil, nil -} - -func (m *MockJetStreamContext) PublishMsgAsync(msg *nats.Msg, opts ...nats.PubOpt) (nats.PubAckFuture, error) { - return nil, nil -} - -func (m *MockJetStreamContext) PullSubscribe(subject, queue string, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) PurgeStream(stream string, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) QueueSubscribe(subject, queue string, handler nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) QueueSubscribeSync(subject, queue string, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) SecureDeleteMsg(stream string, seq uint64, opts ...nats.JSOpt) error { - return nil -} - -func (m *MockJetStreamContext) StreamInfo(stream string, opts ...nats.JSOpt) (*nats.StreamInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) StreamNameBySubject(subject string, opts ...nats.JSOpt) (string, error) { - return "", nil -} - -func (m *MockJetStreamContext) StreamNames(opts ...nats.JSOpt) <-chan string { - return nil -} - -func (m *MockJetStreamContext) Streams(opts ...nats.JSOpt) <-chan *nats.StreamInfo { - return nil -} - -func (m *MockJetStreamContext) StreamsInfo(opts ...nats.JSOpt) <-chan *nats.StreamInfo { - return nil -} - -func (m *MockJetStreamContext) Subscribe(subject string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) SubscribeSync(subject string, opts ...nats.SubOpt) (*nats.Subscription, error) { - return nil, nil -} - -func (m *MockJetStreamContext) UpdateConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) UpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) { - return nil, nil -} - -func (m *MockJetStreamContext) Publish(subj string, data []byte, opts ...nats.PubOpt) (*nats.PubAck, error) { - return nil, errors.New("mocked publish error") -} - type MockResourceInterface struct{} func (m *MockResourceInterface) Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) { diff --git a/agent/kubviz/plugins/rakkess/rakees_agent.go b/agent/kubviz/plugins/rakkess/rakees_agent.go index 93414db3..3840f209 100644 --- a/agent/kubviz/plugins/rakkess/rakees_agent.go +++ b/agent/kubviz/plugins/rakkess/rakees_agent.go @@ -15,7 +15,7 @@ import ( "go.opentelemetry.io/otel/attribute" "github.com/intelops/kubviz/model" - "github.com/nats-io/nats.go" + "github.com/intelops/kubviz/pkg/nats/sdk" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -37,7 +37,7 @@ func accessToOutcome(access Access) (Outcome, error) { } } -func RakeesOutput(config *rest.Config, js nats.JetStreamContext) error { +func RakeesOutput(config *rest.Config, natsCli *sdk.NATSClient) error { ctx := context.Background() tracer := otel.Tracer("rakees") @@ -96,7 +96,7 @@ func RakeesOutput(config *rest.Config, js nats.JetStreamContext) error { Update: HumanreadableAccessCode(updateOutcome), } metricsJson, _ := json.Marshal(metrics) - _, err = js.Publish(constants.EventSubject_rakees, metricsJson) + err = natsCli.Publish(constants.EventSubject_rakees, metricsJson) if err != nil { return err } diff --git a/agent/kubviz/plugins/trivy/trivy.go b/agent/kubviz/plugins/trivy/trivy.go index 625b405a..ddb1e995 100644 --- a/agent/kubviz/plugins/trivy/trivy.go +++ b/agent/kubviz/plugins/trivy/trivy.go @@ -14,8 +14,8 @@ import ( "github.com/google/uuid" "github.com/intelops/kubviz/constants" "github.com/intelops/kubviz/model" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/intelops/kubviz/pkg/opentelemetry" - "github.com/nats-io/nats.go" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" ) @@ -43,7 +43,7 @@ func executeCommandTrivy(command string) ([]byte, error) { return outc.Bytes(), err } -func RunTrivyK8sClusterScan(js nats.JetStreamContext) error { +func RunTrivyK8sClusterScan(natsCli sdk.NATSClientInterface) error { pvcMountPath := "/mnt/agent/kbz" trivyCacheDir := fmt.Sprintf("%s/trivy-cache", pvcMountPath) err := os.MkdirAll(trivyCacheDir, 0755) @@ -87,21 +87,21 @@ func RunTrivyK8sClusterScan(js nats.JetStreamContext) error { // log.Printf("Error executing command: %v\n", err) // return err // } - err = PublishTrivyK8sReport(report, js) + err = PublishTrivyK8sReport(report, natsCli) if err != nil { return err } return nil } -func PublishTrivyK8sReport(report report.ConsolidatedReport, js nats.JetStreamContext) error { +func PublishTrivyK8sReport(report report.ConsolidatedReport, natsCli sdk.NATSClientInterface) error { metrics := model.Trivy{ ID: uuid.New().String(), ClusterName: ClusterName, Report: report, } metricsJson, _ := json.Marshal(metrics) - _, err := js.Publish(constants.TRIVY_K8S_SUBJECT, metricsJson) + err := natsCli.Publish(constants.TRIVY_K8S_SUBJECT, metricsJson) if err != nil { return err } diff --git a/agent/kubviz/plugins/trivy/trivy_image.go b/agent/kubviz/plugins/trivy/trivy_image.go index 9628e31d..0cfff0f5 100644 --- a/agent/kubviz/plugins/trivy/trivy_image.go +++ b/agent/kubviz/plugins/trivy/trivy_image.go @@ -14,6 +14,7 @@ import ( "github.com/google/uuid" "github.com/intelops/kubviz/constants" "github.com/intelops/kubviz/model" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/intelops/kubviz/pkg/opentelemetry" "github.com/nats-io/nats.go" "github.com/pkg/errors" @@ -31,8 +32,12 @@ type JetStreamContextInterface interface { nats.ObjectStoreManager AccountInfo(opts ...nats.JSOpt) (*nats.AccountInfo, error) } +type NATSClientInterface interface { + CreateStream(streamName string) error + Publish(subject string, data []byte) error +} -func RunTrivyImageScans(config *rest.Config, js nats.JetStreamContext) error { +func RunTrivyImageScans(config *rest.Config, natsCli sdk.NATSClientInterface) error { pvcMountPath := "/mnt/agent/kbz" trivyImageCacheDir := fmt.Sprintf("%s/trivy-imagecache", pvcMountPath) err := os.MkdirAll(trivyImageCacheDir, 0755) @@ -84,7 +89,7 @@ func RunTrivyImageScans(config *rest.Config, js nats.JetStreamContext) error { // log.Printf("Error executing command: %v\n", err) // return err // } - err = PublishImageScanReports(report, js) + err = PublishImageScanReports(report, natsCli) if err != nil { return err } @@ -92,14 +97,14 @@ func RunTrivyImageScans(config *rest.Config, js nats.JetStreamContext) error { return nil } -func PublishImageScanReports(report types.Report, js nats.JetStreamContext) error { +func PublishImageScanReports(report types.Report, natsCli sdk.NATSClientInterface) error { metrics := model.TrivyImage{ ID: uuid.New().String(), ClusterName: ClusterName, Report: report, } metricsJson, _ := json.Marshal(metrics) - _, err := js.Publish(constants.TRIVY_IMAGE_SUBJECT, metricsJson) + err := natsCli.Publish(constants.TRIVY_IMAGE_SUBJECT, metricsJson) if err != nil { return err } diff --git a/agent/kubviz/plugins/trivy/trivy_sbom.go b/agent/kubviz/plugins/trivy/trivy_sbom.go index 62027761..2b11f661 100644 --- a/agent/kubviz/plugins/trivy/trivy_sbom.go +++ b/agent/kubviz/plugins/trivy/trivy_sbom.go @@ -13,8 +13,8 @@ import ( "github.com/google/uuid" "github.com/intelops/kubviz/constants" "github.com/intelops/kubviz/model" + "github.com/intelops/kubviz/pkg/nats/sdk" "github.com/intelops/kubviz/pkg/opentelemetry" - "github.com/nats-io/nats.go" "github.com/pkg/errors" "go.opentelemetry.io/otel" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -22,7 +22,7 @@ import ( "k8s.io/client-go/rest" ) -func PublishTrivySbomReport(report map[string]interface{}, js nats.JetStreamContext) error { +func PublishTrivySbomReport(report map[string]interface{}, natsCli sdk.NATSClientInterface) error { metrics := model.Sbom{ ID: uuid.New().String(), @@ -34,7 +34,7 @@ func PublishTrivySbomReport(report map[string]interface{}, js nats.JetStreamCont log.Println("error occurred while marshalling sbom metrics in agent", err.Error()) return err } - _, err = js.Publish(constants.TRIVY_SBOM_SUBJECT, metricsJson) + err = natsCli.Publish(constants.TRIVY_SBOM_SUBJECT, metricsJson) if err != nil { return err } @@ -55,7 +55,7 @@ func executeCommandSbom(command string) ([]byte, error) { return outc.Bytes(), err } -func RunTrivySbomScan(config *rest.Config, js nats.JetStreamContext) error { +func RunTrivySbomScan(config *rest.Config, natsCli sdk.NATSClientInterface) error { log.Println("trivy sbom scan started...") pvcMountPath := "/mnt/agent/kbz" trivySbomCacheDir := fmt.Sprintf("%s/trivy-sbomcache", pvcMountPath) @@ -100,7 +100,7 @@ func RunTrivySbomScan(config *rest.Config, js nats.JetStreamContext) error { log.Printf("Error unmarshaling JSON data for image sbom %s: %v", image.PullableImage, err) continue // Move on to the next image in case of an error } - err = PublishTrivySbomReport(report, js) + err = PublishTrivySbomReport(report, natsCli) if err != nil { log.Printf("Error publishing Trivy SBOM report for image %s: %v", image.PullableImage, err) continue diff --git a/agent/kubviz/plugins/trivy/trivy_test.go b/agent/kubviz/plugins/trivy/trivy_test.go index 9ef99e40..69be9e61 100644 --- a/agent/kubviz/plugins/trivy/trivy_test.go +++ b/agent/kubviz/plugins/trivy/trivy_test.go @@ -12,7 +12,9 @@ import ( "github.com/aquasecurity/trivy/pkg/types" "github.com/golang/mock/gomock" "github.com/intelops/kubviz/constants" - "github.com/intelops/kubviz/mocks" + mocks "github.com/intelops/kubviz/pkg/nats/sdk/mocks" + + //"github.com/intelops/kubviz/mocks" "github.com/intelops/kubviz/model" "github.com/stretchr/testify/assert" "k8s.io/client-go/kubernetes" @@ -41,7 +43,7 @@ func TestPublishTrivyK8sReport(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - js := mocks.NewMockJetStreamContextInterface(ctrl) + js := mocks.NewMockNATSClientInterface(ctrl) // Define the sample consolidated report report := report.ConsolidatedReport{ @@ -51,7 +53,7 @@ func TestPublishTrivyK8sReport(t *testing.T) { // Test case: Testing successful publishing of Trivy K8s report t.Run("Successful publishing", func(t *testing.T) { // Set the mock expectation for Publish - js.EXPECT().Publish(constants.TRIVY_K8S_SUBJECT, gomock.Any()).Return(nil, nil) + js.EXPECT().Publish(constants.TRIVY_K8S_SUBJECT, gomock.Any()).Return(nil) // Call the function under test err := PublishTrivyK8sReport(report, js) @@ -63,7 +65,7 @@ func TestPublishTrivyK8sReport(t *testing.T) { // Test case: Error handling for Publish failure t.Run("Error handling for Publish failure", func(t *testing.T) { // Set the mock expectation for Publish to return an error - js.EXPECT().Publish(constants.TRIVY_K8S_SUBJECT, gomock.Any()).Return(nil, errors.New("publish error")) + js.EXPECT().Publish(constants.TRIVY_K8S_SUBJECT, gomock.Any()).Return(errors.New("publish error")) // Call the function under test err := PublishTrivyK8sReport(report, js) @@ -93,12 +95,12 @@ func TestRunTrivyK8sClusterScan(t *testing.T) { defer ctrl.Finish() // Create a JetStreamContext mock - jsMock := mocks.NewMockJetStreamContextInterface(ctrl) + jsMock := mocks.NewMockNATSClientInterface(ctrl) // Test case: Successful Trivy scan t.Run("Successful scan", func(t *testing.T) { // Set the mock expectation for PublishTrivyK8sReport - jsMock.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil, nil) + jsMock.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil) // Call the function under test err := RunTrivyK8sClusterScan(jsMock) @@ -166,7 +168,7 @@ func TestRunTrivyK8sClusterScan(t *testing.T) { defer monkey.Unpatch(executeCommandTrivy) // Mock Publish to return an error - jsMock.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(nil, errors.New("publish error")) + jsMock.EXPECT().Publish(gomock.Any(), gomock.Any()).Return(errors.New("publish error")) // Call the function under test err := RunTrivyK8sClusterScan(jsMock) @@ -179,7 +181,7 @@ func TestPublishTrivySbomReport(t *testing.T) { defer ctrl.Finish() // Create a JetStreamContext mock - jsMock := mocks.NewMockJetStreamContextInterface(ctrl) + jsMock := mocks.NewMockNATSClientInterface(ctrl) // Define a sample report report := map[string]interface{}{ @@ -190,7 +192,7 @@ func TestPublishTrivySbomReport(t *testing.T) { // Test case: Successful publishing of Trivy SBOM report t.Run("Successful publishing", func(t *testing.T) { // Set the mock expectation for Publish - jsMock.EXPECT().Publish(constants.TRIVY_SBOM_SUBJECT, gomock.Any()).Return(nil, nil) + jsMock.EXPECT().Publish(constants.TRIVY_SBOM_SUBJECT, gomock.Any()).Return(nil) // Call the function under test err := PublishTrivySbomReport(report, jsMock) @@ -202,7 +204,7 @@ func TestPublishTrivySbomReport(t *testing.T) { // Test case: Error handling for Publish failure t.Run("Error handling for Publish failure", func(t *testing.T) { // Set the mock expectation for Publish to return an error - jsMock.EXPECT().Publish(constants.TRIVY_SBOM_SUBJECT, gomock.Any()).Return(nil, errors.New("publish error")) + jsMock.EXPECT().Publish(constants.TRIVY_SBOM_SUBJECT, gomock.Any()).Return(errors.New("publish error")) // Call the function under test err := PublishTrivySbomReport(report, jsMock) @@ -251,7 +253,7 @@ func TestPublishImageScanReports(t *testing.T) { defer ctrl.Finish() // Create a JetStreamContext mock - jsMock := mocks.NewMockJetStreamContextInterface(ctrl) + jsMock := mocks.NewMockNATSClientInterface(ctrl) // Define a sample report report := types.Report{ @@ -261,7 +263,7 @@ func TestPublishImageScanReports(t *testing.T) { // Test case: Successful publishing of Trivy image scan report t.Run("Successful publishing", func(t *testing.T) { // Set the mock expectation for Publish - jsMock.EXPECT().Publish(constants.TRIVY_IMAGE_SUBJECT, gomock.Any()).Return(nil, nil) + jsMock.EXPECT().Publish(constants.TRIVY_IMAGE_SUBJECT, gomock.Any()).Return(nil) // Call the function under test err := PublishImageScanReports(report, jsMock) @@ -273,7 +275,7 @@ func TestPublishImageScanReports(t *testing.T) { // Test case: Error handling for Publish failure t.Run("Error handling for Publish failure", func(t *testing.T) { // Set the mock expectation for Publish to return an error - jsMock.EXPECT().Publish(constants.TRIVY_IMAGE_SUBJECT, gomock.Any()).Return(nil, errors.New("publish error")) + jsMock.EXPECT().Publish(constants.TRIVY_IMAGE_SUBJECT, gomock.Any()).Return(errors.New("publish error")) // Call the function under test err := PublishImageScanReports(report, jsMock) @@ -309,7 +311,7 @@ func TestRunTrivySbomScan(t *testing.T) { defer ctrl.Finish() // Create a JetStreamContext mock - jsMock := mocks.NewMockJetStreamContextInterface(ctrl) + jsMock := mocks.NewMockNATSClientInterface(ctrl) monkey.Patch(ListImagesforSbom, func(config *rest.Config) ([]model.RunningImage, error) { return []model.RunningImage{{PullableImage: "image1"}}, nil }) @@ -335,7 +337,7 @@ func TestRunTrivyImageScans(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - jsMock := mocks.NewMockJetStreamContextInterface(ctrl) + jsMock := mocks.NewMockNATSClientInterface(ctrl) images := []model.RunningImage{ {PullableImage: "image1"}, diff --git a/agent/kubviz/scheduler/scheduler.go b/agent/kubviz/scheduler/scheduler.go index ccd9564e..ac7f68d5 100644 --- a/agent/kubviz/scheduler/scheduler.go +++ b/agent/kubviz/scheduler/scheduler.go @@ -3,7 +3,6 @@ package scheduler import ( "sync" - "github.com/nats-io/nats.go" "github.com/pkg/errors" "github.com/robfig/cron/v3" "k8s.io/client-go/kubernetes" @@ -11,6 +10,7 @@ import ( "github.com/intelops/go-common/logging" "github.com/intelops/kubviz/agent/config" + "github.com/intelops/kubviz/pkg/nats/sdk" ) type jobHandler interface { @@ -91,7 +91,7 @@ func (t *Scheduler) GetJobs() map[string]jobHandler { return t.jobs } -func InitScheduler(config *rest.Config, js nats.JetStreamContext, cfg config.AgentConfigurations, clientset *kubernetes.Clientset) (s *Scheduler) { +func InitScheduler(config *rest.Config, js *sdk.NATSClient, cfg config.AgentConfigurations, clientset *kubernetes.Clientset) (s *Scheduler) { log := logging.NewLogger() s = NewScheduler(log) if cfg.OutdatedInterval != "" && cfg.OutdatedInterval != "0" { diff --git a/agent/kubviz/scheduler/scheduler_watch.go b/agent/kubviz/scheduler/scheduler_watch.go index f2e769af..93ded7df 100644 --- a/agent/kubviz/scheduler/scheduler_watch.go +++ b/agent/kubviz/scheduler/scheduler_watch.go @@ -4,7 +4,6 @@ import ( "github.com/intelops/kubviz/agent/kubviz/plugins/events" "github.com/intelops/kubviz/agent/kubviz/plugins/ketall" "github.com/intelops/kubviz/agent/kubviz/plugins/kubepreupgrade" - "github.com/nats-io/nats.go" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -12,51 +11,52 @@ import ( "github.com/intelops/kubviz/agent/kubviz/plugins/outdated" "github.com/intelops/kubviz/agent/kubviz/plugins/rakkess" "github.com/intelops/kubviz/agent/kubviz/plugins/trivy" + "github.com/intelops/kubviz/pkg/nats/sdk" ) type OutDatedImagesJob struct { config *rest.Config - js nats.JetStreamContext + js *sdk.NATSClient frequency string } type KetallJob struct { config *rest.Config - js nats.JetStreamContext + js *sdk.NATSClient frequency string } type TrivyImageJob struct { config *rest.Config - js nats.JetStreamContext + js *sdk.NATSClient frequency string } type TrivySbomJob struct { config *rest.Config - js nats.JetStreamContext + js *sdk.NATSClient frequency string } type TrivyClusterScanJob struct { //config *rest.Config - js nats.JetStreamContext + js *sdk.NATSClient frequency string } type RakkessJob struct { config *rest.Config - js nats.JetStreamContext + js *sdk.NATSClient frequency string } type KubePreUpgradeJob struct { config *rest.Config - js nats.JetStreamContext + js *sdk.NATSClient frequency string } type KubescoreJob struct { clientset *kubernetes.Clientset - js nats.JetStreamContext + js *sdk.NATSClient frequency string } -func NewTrivySbomJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*TrivySbomJob, error) { +func NewTrivySbomJob(config *rest.Config, js *sdk.NATSClient, frequency string) (*TrivySbomJob, error) { return &TrivySbomJob{ config: config, js: js, @@ -73,7 +73,7 @@ func (j *TrivySbomJob) Run() { events.LogErr(err) } -func NewTrivyClusterScanJob(js nats.JetStreamContext, frequency string) (*TrivyClusterScanJob, error) { +func NewTrivyClusterScanJob(js *sdk.NATSClient, frequency string) (*TrivyClusterScanJob, error) { return &TrivyClusterScanJob{ // config: config, js: js, @@ -89,7 +89,7 @@ func (j *TrivyClusterScanJob) Run() { err := trivy.RunTrivyK8sClusterScan(j.js) events.LogErr(err) } -func NewTrivyImagesJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*TrivyImageJob, error) { +func NewTrivyImagesJob(config *rest.Config, js *sdk.NATSClient, frequency string) (*TrivyImageJob, error) { return &TrivyImageJob{ config: config, js: js, @@ -105,7 +105,7 @@ func (j *TrivyImageJob) Run() { err := trivy.RunTrivyImageScans(j.config, j.js) events.LogErr(err) } -func NewOutDatedImagesJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*OutDatedImagesJob, error) { +func NewOutDatedImagesJob(config *rest.Config, js *sdk.NATSClient, frequency string) (*OutDatedImagesJob, error) { return &OutDatedImagesJob{ config: config, js: js, @@ -121,7 +121,7 @@ func (j *OutDatedImagesJob) Run() { err := outdated.OutDatedImages(j.config, j.js) events.LogErr(err) } -func NewKetallJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*KetallJob, error) { +func NewKetallJob(config *rest.Config, js *sdk.NATSClient, frequency string) (*KetallJob, error) { return &KetallJob{ config: config, js: js, @@ -138,7 +138,7 @@ func (j *KetallJob) Run() { events.LogErr(err) } -func NewKubePreUpgradeJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*KubePreUpgradeJob, error) { +func NewKubePreUpgradeJob(config *rest.Config, js *sdk.NATSClient, frequency string) (*KubePreUpgradeJob, error) { return &KubePreUpgradeJob{ config: config, js: js, @@ -155,7 +155,7 @@ func (j *KubePreUpgradeJob) Run() { events.LogErr(err) } -func NewKubescoreJob(clientset *kubernetes.Clientset, js nats.JetStreamContext, frequency string) (*KubescoreJob, error) { +func NewKubescoreJob(clientset *kubernetes.Clientset, js *sdk.NATSClient, frequency string) (*KubescoreJob, error) { return &KubescoreJob{ clientset: clientset, js: js, @@ -171,7 +171,7 @@ func (j *KubescoreJob) Run() { err := kubescore.RunKubeScore(j.clientset, j.js) events.LogErr(err) } -func NewRakkessJob(config *rest.Config, js nats.JetStreamContext, frequency string) (*RakkessJob, error) { +func NewRakkessJob(config *rest.Config, js *sdk.NATSClient, frequency string) (*RakkessJob, error) { return &RakkessJob{ config: config, js: js, diff --git a/mocks/trivy_client_mock.go b/mocks/trivy_client_mock.go index 8d6ba59c..a9508799 100644 --- a/mocks/trivy_client_mock.go +++ b/mocks/trivy_client_mock.go @@ -8,7 +8,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" - nats_go "github.com/nats-io/nats.go" + nats "github.com/nats-io/nats.go" ) // MockJetStreamContextInterface is a mock of JetStreamContextInterface interface. @@ -35,14 +35,14 @@ func (m *MockJetStreamContextInterface) EXPECT() *MockJetStreamContextInterfaceM } // AccountInfo mocks base method. -func (m *MockJetStreamContextInterface) AccountInfo(opts ...nats_go.JSOpt) (*nats_go.AccountInfo, error) { +func (m *MockJetStreamContextInterface) AccountInfo(opts ...nats.JSOpt) (*nats.AccountInfo, error) { m.ctrl.T.Helper() varargs := []interface{}{} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "AccountInfo", varargs...) - ret0, _ := ret[0].(*nats_go.AccountInfo) + ret0, _ := ret[0].(*nats.AccountInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -54,14 +54,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) AccountInfo(opts ...interfa } // AddConsumer mocks base method. -func (m *MockJetStreamContextInterface) AddConsumer(stream string, cfg *nats_go.ConsumerConfig, opts ...nats_go.JSOpt) (*nats_go.ConsumerInfo, error) { +func (m *MockJetStreamContextInterface) AddConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { m.ctrl.T.Helper() varargs := []interface{}{stream, cfg} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "AddConsumer", varargs...) - ret0, _ := ret[0].(*nats_go.ConsumerInfo) + ret0, _ := ret[0].(*nats.ConsumerInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -74,14 +74,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) AddConsumer(stream, cfg int } // AddStream mocks base method. -func (m *MockJetStreamContextInterface) AddStream(cfg *nats_go.StreamConfig, opts ...nats_go.JSOpt) (*nats_go.StreamInfo, error) { +func (m *MockJetStreamContextInterface) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) { m.ctrl.T.Helper() varargs := []interface{}{cfg} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "AddStream", varargs...) - ret0, _ := ret[0].(*nats_go.StreamInfo) + ret0, _ := ret[0].(*nats.StreamInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -94,14 +94,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) AddStream(cfg interface{}, } // ChanQueueSubscribe mocks base method. -func (m *MockJetStreamContextInterface) ChanQueueSubscribe(subj, queue string, ch chan *nats_go.Msg, opts ...nats_go.SubOpt) (*nats_go.Subscription, error) { +func (m *MockJetStreamContextInterface) ChanQueueSubscribe(subj, queue string, ch chan *nats.Msg, opts ...nats.SubOpt) (*nats.Subscription, error) { m.ctrl.T.Helper() varargs := []interface{}{subj, queue, ch} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "ChanQueueSubscribe", varargs...) - ret0, _ := ret[0].(*nats_go.Subscription) + ret0, _ := ret[0].(*nats.Subscription) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -114,14 +114,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) ChanQueueSubscribe(subj, qu } // ChanSubscribe mocks base method. -func (m *MockJetStreamContextInterface) ChanSubscribe(subj string, ch chan *nats_go.Msg, opts ...nats_go.SubOpt) (*nats_go.Subscription, error) { +func (m *MockJetStreamContextInterface) ChanSubscribe(subj string, ch chan *nats.Msg, opts ...nats.SubOpt) (*nats.Subscription, error) { m.ctrl.T.Helper() varargs := []interface{}{subj, ch} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "ChanSubscribe", varargs...) - ret0, _ := ret[0].(*nats_go.Subscription) + ret0, _ := ret[0].(*nats.Subscription) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -134,14 +134,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) ChanSubscribe(subj, ch inte } // ConsumerInfo mocks base method. -func (m *MockJetStreamContextInterface) ConsumerInfo(stream, name string, opts ...nats_go.JSOpt) (*nats_go.ConsumerInfo, error) { +func (m *MockJetStreamContextInterface) ConsumerInfo(stream, name string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { m.ctrl.T.Helper() varargs := []interface{}{stream, name} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "ConsumerInfo", varargs...) - ret0, _ := ret[0].(*nats_go.ConsumerInfo) + ret0, _ := ret[0].(*nats.ConsumerInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -154,7 +154,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) ConsumerInfo(stream, name i } // ConsumerNames mocks base method. -func (m *MockJetStreamContextInterface) ConsumerNames(stream string, opts ...nats_go.JSOpt) <-chan string { +func (m *MockJetStreamContextInterface) ConsumerNames(stream string, opts ...nats.JSOpt) <-chan string { m.ctrl.T.Helper() varargs := []interface{}{stream} for _, a := range opts { @@ -173,14 +173,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) ConsumerNames(stream interf } // Consumers mocks base method. -func (m *MockJetStreamContextInterface) Consumers(stream string, opts ...nats_go.JSOpt) <-chan *nats_go.ConsumerInfo { +func (m *MockJetStreamContextInterface) Consumers(stream string, opts ...nats.JSOpt) <-chan *nats.ConsumerInfo { m.ctrl.T.Helper() varargs := []interface{}{stream} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Consumers", varargs...) - ret0, _ := ret[0].(<-chan *nats_go.ConsumerInfo) + ret0, _ := ret[0].(<-chan *nats.ConsumerInfo) return ret0 } @@ -192,14 +192,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) Consumers(stream interface{ } // ConsumersInfo mocks base method. -func (m *MockJetStreamContextInterface) ConsumersInfo(stream string, opts ...nats_go.JSOpt) <-chan *nats_go.ConsumerInfo { +func (m *MockJetStreamContextInterface) ConsumersInfo(stream string, opts ...nats.JSOpt) <-chan *nats.ConsumerInfo { m.ctrl.T.Helper() varargs := []interface{}{stream} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "ConsumersInfo", varargs...) - ret0, _ := ret[0].(<-chan *nats_go.ConsumerInfo) + ret0, _ := ret[0].(<-chan *nats.ConsumerInfo) return ret0 } @@ -211,10 +211,10 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) ConsumersInfo(stream interf } // CreateKeyValue mocks base method. -func (m *MockJetStreamContextInterface) CreateKeyValue(cfg *nats_go.KeyValueConfig) (nats_go.KeyValue, error) { +func (m *MockJetStreamContextInterface) CreateKeyValue(cfg *nats.KeyValueConfig) (nats.KeyValue, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateKeyValue", cfg) - ret0, _ := ret[0].(nats_go.KeyValue) + ret0, _ := ret[0].(nats.KeyValue) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -226,10 +226,10 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) CreateKeyValue(cfg interfac } // CreateObjectStore mocks base method. -func (m *MockJetStreamContextInterface) CreateObjectStore(cfg *nats_go.ObjectStoreConfig) (nats_go.ObjectStore, error) { +func (m *MockJetStreamContextInterface) CreateObjectStore(cfg *nats.ObjectStoreConfig) (nats.ObjectStore, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CreateObjectStore", cfg) - ret0, _ := ret[0].(nats_go.ObjectStore) + ret0, _ := ret[0].(nats.ObjectStore) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -241,7 +241,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) CreateObjectStore(cfg inter } // DeleteConsumer mocks base method. -func (m *MockJetStreamContextInterface) DeleteConsumer(stream, consumer string, opts ...nats_go.JSOpt) error { +func (m *MockJetStreamContextInterface) DeleteConsumer(stream, consumer string, opts ...nats.JSOpt) error { m.ctrl.T.Helper() varargs := []interface{}{stream, consumer} for _, a := range opts { @@ -274,7 +274,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) DeleteKeyValue(bucket inter } // DeleteMsg mocks base method. -func (m *MockJetStreamContextInterface) DeleteMsg(name string, seq uint64, opts ...nats_go.JSOpt) error { +func (m *MockJetStreamContextInterface) DeleteMsg(name string, seq uint64, opts ...nats.JSOpt) error { m.ctrl.T.Helper() varargs := []interface{}{name, seq} for _, a := range opts { @@ -307,7 +307,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) DeleteObjectStore(bucket in } // DeleteStream mocks base method. -func (m *MockJetStreamContextInterface) DeleteStream(name string, opts ...nats_go.JSOpt) error { +func (m *MockJetStreamContextInterface) DeleteStream(name string, opts ...nats.JSOpt) error { m.ctrl.T.Helper() varargs := []interface{}{name} for _, a := range opts { @@ -326,14 +326,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) DeleteStream(name interface } // GetLastMsg mocks base method. -func (m *MockJetStreamContextInterface) GetLastMsg(name, subject string, opts ...nats_go.JSOpt) (*nats_go.RawStreamMsg, error) { +func (m *MockJetStreamContextInterface) GetLastMsg(name, subject string, opts ...nats.JSOpt) (*nats.RawStreamMsg, error) { m.ctrl.T.Helper() varargs := []interface{}{name, subject} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "GetLastMsg", varargs...) - ret0, _ := ret[0].(*nats_go.RawStreamMsg) + ret0, _ := ret[0].(*nats.RawStreamMsg) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -346,14 +346,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) GetLastMsg(name, subject in } // GetMsg mocks base method. -func (m *MockJetStreamContextInterface) GetMsg(name string, seq uint64, opts ...nats_go.JSOpt) (*nats_go.RawStreamMsg, error) { +func (m *MockJetStreamContextInterface) GetMsg(name string, seq uint64, opts ...nats.JSOpt) (*nats.RawStreamMsg, error) { m.ctrl.T.Helper() varargs := []interface{}{name, seq} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "GetMsg", varargs...) - ret0, _ := ret[0].(*nats_go.RawStreamMsg) + ret0, _ := ret[0].(*nats.RawStreamMsg) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -366,10 +366,10 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) GetMsg(name, seq interface{ } // KeyValue mocks base method. -func (m *MockJetStreamContextInterface) KeyValue(bucket string) (nats_go.KeyValue, error) { +func (m *MockJetStreamContextInterface) KeyValue(bucket string) (nats.KeyValue, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "KeyValue", bucket) - ret0, _ := ret[0].(nats_go.KeyValue) + ret0, _ := ret[0].(nats.KeyValue) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -395,10 +395,10 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) KeyValueStoreNames() *gomoc } // KeyValueStores mocks base method. -func (m *MockJetStreamContextInterface) KeyValueStores() <-chan nats_go.KeyValueStatus { +func (m *MockJetStreamContextInterface) KeyValueStores() <-chan nats.KeyValueStatus { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "KeyValueStores") - ret0, _ := ret[0].(<-chan nats_go.KeyValueStatus) + ret0, _ := ret[0].(<-chan nats.KeyValueStatus) return ret0 } @@ -409,10 +409,10 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) KeyValueStores() *gomock.Ca } // ObjectStore mocks base method. -func (m *MockJetStreamContextInterface) ObjectStore(bucket string) (nats_go.ObjectStore, error) { +func (m *MockJetStreamContextInterface) ObjectStore(bucket string) (nats.ObjectStore, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ObjectStore", bucket) - ret0, _ := ret[0].(nats_go.ObjectStore) + ret0, _ := ret[0].(nats.ObjectStore) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -424,7 +424,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) ObjectStore(bucket interfac } // ObjectStoreNames mocks base method. -func (m *MockJetStreamContextInterface) ObjectStoreNames(opts ...nats_go.ObjectOpt) <-chan string { +func (m *MockJetStreamContextInterface) ObjectStoreNames(opts ...nats.ObjectOpt) <-chan string { m.ctrl.T.Helper() varargs := []interface{}{} for _, a := range opts { @@ -442,14 +442,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) ObjectStoreNames(opts ...in } // ObjectStores mocks base method. -func (m *MockJetStreamContextInterface) ObjectStores(opts ...nats_go.ObjectOpt) <-chan nats_go.ObjectStoreStatus { +func (m *MockJetStreamContextInterface) ObjectStores(opts ...nats.ObjectOpt) <-chan nats.ObjectStoreStatus { m.ctrl.T.Helper() varargs := []interface{}{} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "ObjectStores", varargs...) - ret0, _ := ret[0].(<-chan nats_go.ObjectStoreStatus) + ret0, _ := ret[0].(<-chan nats.ObjectStoreStatus) return ret0 } @@ -460,14 +460,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) ObjectStores(opts ...interf } // Publish mocks base method. -func (m *MockJetStreamContextInterface) Publish(subj string, data []byte, opts ...nats_go.PubOpt) (*nats_go.PubAck, error) { +func (m *MockJetStreamContextInterface) Publish(subj string, data []byte, opts ...nats.PubOpt) (*nats.PubAck, error) { m.ctrl.T.Helper() varargs := []interface{}{subj, data} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Publish", varargs...) - ret0, _ := ret[0].(*nats_go.PubAck) + ret0, _ := ret[0].(*nats.PubAck) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -480,14 +480,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) Publish(subj, data interfac } // PublishAsync mocks base method. -func (m *MockJetStreamContextInterface) PublishAsync(subj string, data []byte, opts ...nats_go.PubOpt) (nats_go.PubAckFuture, error) { +func (m *MockJetStreamContextInterface) PublishAsync(subj string, data []byte, opts ...nats.PubOpt) (nats.PubAckFuture, error) { m.ctrl.T.Helper() varargs := []interface{}{subj, data} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "PublishAsync", varargs...) - ret0, _ := ret[0].(nats_go.PubAckFuture) + ret0, _ := ret[0].(nats.PubAckFuture) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -528,14 +528,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) PublishAsyncPending() *gomo } // PublishMsg mocks base method. -func (m_2 *MockJetStreamContextInterface) PublishMsg(m *nats_go.Msg, opts ...nats_go.PubOpt) (*nats_go.PubAck, error) { +func (m_2 *MockJetStreamContextInterface) PublishMsg(m *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) { m_2.ctrl.T.Helper() varargs := []interface{}{m} for _, a := range opts { varargs = append(varargs, a) } ret := m_2.ctrl.Call(m_2, "PublishMsg", varargs...) - ret0, _ := ret[0].(*nats_go.PubAck) + ret0, _ := ret[0].(*nats.PubAck) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -548,14 +548,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) PublishMsg(m interface{}, o } // PublishMsgAsync mocks base method. -func (m_2 *MockJetStreamContextInterface) PublishMsgAsync(m *nats_go.Msg, opts ...nats_go.PubOpt) (nats_go.PubAckFuture, error) { +func (m_2 *MockJetStreamContextInterface) PublishMsgAsync(m *nats.Msg, opts ...nats.PubOpt) (nats.PubAckFuture, error) { m_2.ctrl.T.Helper() varargs := []interface{}{m} for _, a := range opts { varargs = append(varargs, a) } ret := m_2.ctrl.Call(m_2, "PublishMsgAsync", varargs...) - ret0, _ := ret[0].(nats_go.PubAckFuture) + ret0, _ := ret[0].(nats.PubAckFuture) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -568,14 +568,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) PublishMsgAsync(m interface } // PullSubscribe mocks base method. -func (m *MockJetStreamContextInterface) PullSubscribe(subj, durable string, opts ...nats_go.SubOpt) (*nats_go.Subscription, error) { +func (m *MockJetStreamContextInterface) PullSubscribe(subj, durable string, opts ...nats.SubOpt) (*nats.Subscription, error) { m.ctrl.T.Helper() varargs := []interface{}{subj, durable} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "PullSubscribe", varargs...) - ret0, _ := ret[0].(*nats_go.Subscription) + ret0, _ := ret[0].(*nats.Subscription) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -588,7 +588,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) PullSubscribe(subj, durable } // PurgeStream mocks base method. -func (m *MockJetStreamContextInterface) PurgeStream(name string, opts ...nats_go.JSOpt) error { +func (m *MockJetStreamContextInterface) PurgeStream(name string, opts ...nats.JSOpt) error { m.ctrl.T.Helper() varargs := []interface{}{name} for _, a := range opts { @@ -607,14 +607,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) PurgeStream(name interface{ } // QueueSubscribe mocks base method. -func (m *MockJetStreamContextInterface) QueueSubscribe(subj, queue string, cb nats_go.MsgHandler, opts ...nats_go.SubOpt) (*nats_go.Subscription, error) { +func (m *MockJetStreamContextInterface) QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) { m.ctrl.T.Helper() varargs := []interface{}{subj, queue, cb} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "QueueSubscribe", varargs...) - ret0, _ := ret[0].(*nats_go.Subscription) + ret0, _ := ret[0].(*nats.Subscription) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -627,14 +627,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) QueueSubscribe(subj, queue, } // QueueSubscribeSync mocks base method. -func (m *MockJetStreamContextInterface) QueueSubscribeSync(subj, queue string, opts ...nats_go.SubOpt) (*nats_go.Subscription, error) { +func (m *MockJetStreamContextInterface) QueueSubscribeSync(subj, queue string, opts ...nats.SubOpt) (*nats.Subscription, error) { m.ctrl.T.Helper() varargs := []interface{}{subj, queue} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "QueueSubscribeSync", varargs...) - ret0, _ := ret[0].(*nats_go.Subscription) + ret0, _ := ret[0].(*nats.Subscription) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -647,7 +647,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) QueueSubscribeSync(subj, qu } // SecureDeleteMsg mocks base method. -func (m *MockJetStreamContextInterface) SecureDeleteMsg(name string, seq uint64, opts ...nats_go.JSOpt) error { +func (m *MockJetStreamContextInterface) SecureDeleteMsg(name string, seq uint64, opts ...nats.JSOpt) error { m.ctrl.T.Helper() varargs := []interface{}{name, seq} for _, a := range opts { @@ -666,14 +666,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) SecureDeleteMsg(name, seq i } // StreamInfo mocks base method. -func (m *MockJetStreamContextInterface) StreamInfo(stream string, opts ...nats_go.JSOpt) (*nats_go.StreamInfo, error) { +func (m *MockJetStreamContextInterface) StreamInfo(stream string, opts ...nats.JSOpt) (*nats.StreamInfo, error) { m.ctrl.T.Helper() varargs := []interface{}{stream} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "StreamInfo", varargs...) - ret0, _ := ret[0].(*nats_go.StreamInfo) + ret0, _ := ret[0].(*nats.StreamInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -686,7 +686,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) StreamInfo(stream interface } // StreamNameBySubject mocks base method. -func (m *MockJetStreamContextInterface) StreamNameBySubject(arg0 string, arg1 ...nats_go.JSOpt) (string, error) { +func (m *MockJetStreamContextInterface) StreamNameBySubject(arg0 string, arg1 ...nats.JSOpt) (string, error) { m.ctrl.T.Helper() varargs := []interface{}{arg0} for _, a := range arg1 { @@ -706,7 +706,7 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) StreamNameBySubject(arg0 in } // StreamNames mocks base method. -func (m *MockJetStreamContextInterface) StreamNames(opts ...nats_go.JSOpt) <-chan string { +func (m *MockJetStreamContextInterface) StreamNames(opts ...nats.JSOpt) <-chan string { m.ctrl.T.Helper() varargs := []interface{}{} for _, a := range opts { @@ -724,14 +724,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) StreamNames(opts ...interfa } // Streams mocks base method. -func (m *MockJetStreamContextInterface) Streams(opts ...nats_go.JSOpt) <-chan *nats_go.StreamInfo { +func (m *MockJetStreamContextInterface) Streams(opts ...nats.JSOpt) <-chan *nats.StreamInfo { m.ctrl.T.Helper() varargs := []interface{}{} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Streams", varargs...) - ret0, _ := ret[0].(<-chan *nats_go.StreamInfo) + ret0, _ := ret[0].(<-chan *nats.StreamInfo) return ret0 } @@ -742,14 +742,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) Streams(opts ...interface{} } // StreamsInfo mocks base method. -func (m *MockJetStreamContextInterface) StreamsInfo(opts ...nats_go.JSOpt) <-chan *nats_go.StreamInfo { +func (m *MockJetStreamContextInterface) StreamsInfo(opts ...nats.JSOpt) <-chan *nats.StreamInfo { m.ctrl.T.Helper() varargs := []interface{}{} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "StreamsInfo", varargs...) - ret0, _ := ret[0].(<-chan *nats_go.StreamInfo) + ret0, _ := ret[0].(<-chan *nats.StreamInfo) return ret0 } @@ -760,14 +760,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) StreamsInfo(opts ...interfa } // Subscribe mocks base method. -func (m *MockJetStreamContextInterface) Subscribe(subj string, cb nats_go.MsgHandler, opts ...nats_go.SubOpt) (*nats_go.Subscription, error) { +func (m *MockJetStreamContextInterface) Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) { m.ctrl.T.Helper() varargs := []interface{}{subj, cb} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "Subscribe", varargs...) - ret0, _ := ret[0].(*nats_go.Subscription) + ret0, _ := ret[0].(*nats.Subscription) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -780,14 +780,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) Subscribe(subj, cb interfac } // SubscribeSync mocks base method. -func (m *MockJetStreamContextInterface) SubscribeSync(subj string, opts ...nats_go.SubOpt) (*nats_go.Subscription, error) { +func (m *MockJetStreamContextInterface) SubscribeSync(subj string, opts ...nats.SubOpt) (*nats.Subscription, error) { m.ctrl.T.Helper() varargs := []interface{}{subj} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "SubscribeSync", varargs...) - ret0, _ := ret[0].(*nats_go.Subscription) + ret0, _ := ret[0].(*nats.Subscription) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -800,14 +800,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) SubscribeSync(subj interfac } // UpdateConsumer mocks base method. -func (m *MockJetStreamContextInterface) UpdateConsumer(stream string, cfg *nats_go.ConsumerConfig, opts ...nats_go.JSOpt) (*nats_go.ConsumerInfo, error) { +func (m *MockJetStreamContextInterface) UpdateConsumer(stream string, cfg *nats.ConsumerConfig, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) { m.ctrl.T.Helper() varargs := []interface{}{stream, cfg} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "UpdateConsumer", varargs...) - ret0, _ := ret[0].(*nats_go.ConsumerInfo) + ret0, _ := ret[0].(*nats.ConsumerInfo) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -820,14 +820,14 @@ func (mr *MockJetStreamContextInterfaceMockRecorder) UpdateConsumer(stream, cfg } // UpdateStream mocks base method. -func (m *MockJetStreamContextInterface) UpdateStream(cfg *nats_go.StreamConfig, opts ...nats_go.JSOpt) (*nats_go.StreamInfo, error) { +func (m *MockJetStreamContextInterface) UpdateStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) { m.ctrl.T.Helper() varargs := []interface{}{cfg} for _, a := range opts { varargs = append(varargs, a) } ret := m.ctrl.Call(m, "UpdateStream", varargs...) - ret0, _ := ret[0].(*nats_go.StreamInfo) + ret0, _ := ret[0].(*nats.StreamInfo) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/pkg/nats/sdk/client.go b/pkg/nats/sdk/client.go new file mode 100644 index 00000000..489af676 --- /dev/null +++ b/pkg/nats/sdk/client.go @@ -0,0 +1,73 @@ +package sdk + +import ( + "errors" + "fmt" + "log" + + "github.com/nats-io/nats.go" +) + +type NATSClientInterface interface { + CreateStream(streamName string) error + Publish(subject string, data []byte) error +} + +type NATSClient struct { + conn *nats.Conn + js nats.JetStreamContext + config natsConfig +} + +func NewNATSClient() (*NATSClient, error) { + config, err := loadNatsConfig() + if err != nil { + return nil, errors.New("Unable to load the nats configurations , error :" + err.Error()) + } + options := []nats.Option{} + if config.EnableToken { + options = append(options, nats.Token(config.NatsToken)) + } + if config.MtlsConfig.IsEnabled { + tlsConfig, err := createTLSConfig(config.MtlsConfig) + if err != nil { + return nil, err + } + options = append(options, nats.Secure(tlsConfig)) + } + conn, err := nats.Connect(config.NatsAddress, options...) + if err != nil { + return nil, err + } + + js, err := conn.JetStream() + if err != nil { + return nil, err + } + + return &NATSClient{conn: conn, js: js, config: *config}, nil +} + +func (natsCli *NATSClient) CreateStream(streamName string) error { + stream, err := natsCli.js.StreamInfo(streamName) + log.Printf("Retrieved stream %s", fmt.Sprintf("%v", stream)) + if err != nil { + log.Printf("Error getting stream %s", err) + } + if stream == nil { + log.Printf("creating stream %q and subjects %q", streamName, streamName+".*") + _, err = natsCli.js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: []string{streamName + ".*"}, + }) + if err != nil { + return err + } + } + return nil +} + +func (natsCli *NATSClient) Publish(subject string, data []byte) error { + _, err := natsCli.js.Publish(subject, data) + return err +} diff --git a/pkg/nats/sdk/config.go b/pkg/nats/sdk/config.go new file mode 100644 index 00000000..13f4083e --- /dev/null +++ b/pkg/nats/sdk/config.go @@ -0,0 +1,28 @@ +package sdk + +import ( + "github.com/kelseyhightower/envconfig" + "github.com/pkg/errors" +) + +type natsConfig struct { + NatsAddress string `envconfig:"NATS_ADDRESS"` + NatsToken string `envconfig:"NATS_TOKEN"` + MtlsConfig mtlsConfig + EnableToken bool `envconfig:"ENABLE_TOKEN"` +} + +type mtlsConfig struct { + CertificateFilePath string `envconfig:"CERT_FILE" default:""` + KeyFilePath string `envconfig:"KEY_FILE" default:""` + CAFilePath string `envconfig:"CA_FILE" default:""` + IsEnabled bool `envconfig:"ENABLE_MTLS_NATS" default:"false"` +} + +func loadNatsConfig() (*natsConfig, error) { + natsConf := &natsConfig{} + if err := envconfig.Process("", natsConf); err != nil { + return nil, errors.WithStack(err) + } + return natsConf, nil +} diff --git a/pkg/nats/sdk/mocks/nats_client_mock.go b/pkg/nats/sdk/mocks/nats_client_mock.go new file mode 100644 index 00000000..cb35a3a3 --- /dev/null +++ b/pkg/nats/sdk/mocks/nats_client_mock.go @@ -0,0 +1,62 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: client.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockNATSClientInterface is a mock of NATSClientInterface interface. +type MockNATSClientInterface struct { + ctrl *gomock.Controller + recorder *MockNATSClientInterfaceMockRecorder +} + +// MockNATSClientInterfaceMockRecorder is the mock recorder for MockNATSClientInterface. +type MockNATSClientInterfaceMockRecorder struct { + mock *MockNATSClientInterface +} + +// NewMockNATSClientInterface creates a new mock instance. +func NewMockNATSClientInterface(ctrl *gomock.Controller) *MockNATSClientInterface { + mock := &MockNATSClientInterface{ctrl: ctrl} + mock.recorder = &MockNATSClientInterfaceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockNATSClientInterface) EXPECT() *MockNATSClientInterfaceMockRecorder { + return m.recorder +} + +// CreateStream mocks base method. +func (m *MockNATSClientInterface) CreateStream(streamName string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateStream", streamName) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateStream indicates an expected call of CreateStream. +func (mr *MockNATSClientInterfaceMockRecorder) CreateStream(streamName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateStream", reflect.TypeOf((*MockNATSClientInterface)(nil).CreateStream), streamName) +} + +// Publish mocks base method. +func (m *MockNATSClientInterface) Publish(subject string, data []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Publish", subject, data) + ret0, _ := ret[0].(error) + return ret0 +} + +// Publish indicates an expected call of Publish. +func (mr *MockNATSClientInterfaceMockRecorder) Publish(subject, data interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockNATSClientInterface)(nil).Publish), subject, data) +} diff --git a/pkg/nats/sdk/utils.go b/pkg/nats/sdk/utils.go new file mode 100644 index 00000000..0c0594e8 --- /dev/null +++ b/pkg/nats/sdk/utils.go @@ -0,0 +1,77 @@ +package sdk + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "io" + "os" +) + +func createTLSConfig(config mtlsConfig) (*tls.Config, error) { + certPEM, keyPEM, CACertPEM, err := readMtlsCerts(config.CertificateFilePath, config.KeyFilePath, config.CAFilePath) + if err != nil { + return nil, errors.New("unable to read the mtls certificates error:" + err.Error()) + } + cert, err := tls.X509KeyPair(certPEM, keyPEM) + if err != nil { + return nil, fmt.Errorf("error loading X509 key pair from PEM: %w", err) + } + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(CACertPEM) + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + InsecureSkipVerify: false, + } + return tlsConfig, nil +} + +func readMtlsCerts(certificateFilePath, keyFilePath, CAFilePath string) (certPEM, keyPEM, CACertPEM []byte, err error) { + certPEM, err = readMtlsFileContents(certificateFilePath) + if err != nil { + err = fmt.Errorf("error while reading cert file: %w", err) + return + } + + keyPEM, err = readMtlsFileContents(keyFilePath) + if err != nil { + err = fmt.Errorf("error while reading key file: %w", err) + return + } + + CACertPEM, err = readMtlsFileContents(CAFilePath) + if err != nil { + err = fmt.Errorf("error while reading CAcert file: %w", err) + return + } + + return + +} + +func openMtlsCertFile(filepath string) (f *os.File, err error) { + f, err = os.Open(filepath) + if err != nil { + return nil, fmt.Errorf("failed to open mtls certificate file: %w", err) + } + return f, nil +} + +func readMtlsFileContents(filePath string) ([]byte, error) { + file, err := openMtlsCertFile(filePath) + if err != nil { + return nil, err + } + + defer file.Close() + + contents, err := io.ReadAll(file) + if err != nil { + return nil, fmt.Errorf("error while reading file %s:%w", filePath, err) + } + + return contents, nil +} diff --git a/sdk/example/main.go b/sdk/example/main.go deleted file mode 100644 index d8c9e6dc..00000000 --- a/sdk/example/main.go +++ /dev/null @@ -1,60 +0,0 @@ -package main - -import ( - "fmt" - "log" - "time" - - "github.com/intelops/kubviz/sdk/pkg/clickhouse" - "github.com/intelops/kubviz/sdk/pkg/nats" - "github.com/intelops/kubviz/sdk/pkg/sdk" -) - -func main() { - natsConfig, err := nats.LoadConfig() - if err != nil { - log.Fatalf("Failed to load NATS config: %v", err) - } - - chConfig, err := clickhouse.LoadConfig() - if err != nil { - log.Fatalf("Failed to load ClickHouse config: %v", err) - } - - mySDK, err := sdk.New(natsConfig, chConfig) - if err != nil { - log.Fatalf("Failed to initialize SDK: %v", err) - } - streamName := "Simple" - streamSubjects := "Simple.*" - err = mySDK.CreateNatsStream(streamName, []string{streamSubjects}) - if err != nil { - fmt.Println("Error creating NATS Stream:", err) - return - } - - time.Sleep(2 * time.Second) - - data := map[string]interface{}{ - "key": "value", - "count": 42, - } - subject := "Simple.event" - err = mySDK.PublishToNats(subject, streamName, data) - if err != nil { - fmt.Println("Error publishing message to NATS:", err) - return - } - time.Sleep(2 * time.Second) - consumerName := "myConsumer" - err = mySDK.ConsumeNatsData(subject, consumerName) - if err != nil { - fmt.Println("Error creating NATS consumer:", err) - return - } - err = mySDK.ClickHouseInsertData("mytable", data) - if err != nil { - fmt.Println("Error while inserting data into nats:", err) - return - } -} diff --git a/sdk/pkg/clickhouse/client.go b/sdk/pkg/clickhouse/client.go deleted file mode 100644 index c4edb29c..00000000 --- a/sdk/pkg/clickhouse/client.go +++ /dev/null @@ -1,28 +0,0 @@ -// /pkg/clickhouse/client.go -package clickhouse - -import ( - "database/sql" - "fmt" - - _ "github.com/ClickHouse/clickhouse-go/v2" -) - -type Client struct { - db *sql.DB -} - -func NewClient(cfg *Config) (*Client, error) { - dataSourceName := fmt.Sprintf("tcp://%s:%d", cfg.DBAddress, cfg.DBPort) - - db, err := sql.Open("clickhouse", dataSourceName) - if err != nil { - return nil, err - } - - if err := db.Ping(); err != nil { - return nil, err - } - - return &Client{db: db}, nil -} diff --git a/sdk/pkg/clickhouse/config.go b/sdk/pkg/clickhouse/config.go deleted file mode 100644 index a56baa91..00000000 --- a/sdk/pkg/clickhouse/config.go +++ /dev/null @@ -1,21 +0,0 @@ -package clickhouse - -import ( - "github.com/kelseyhightower/envconfig" -) - -type Config struct { - DBAddress string `envconfig:"DB_ADDRESS" default:"localhost"` - DBPort int `envconfig:"DB_PORT" default:"9000"` - Username string `envconfig:"CLICKHOUSE_USERNAME"` - Password string `envconfig:"CLICKHOUSE_PASSWORD"` -} - -func LoadConfig() (*Config, error) { - var cfg Config - err := envconfig.Process("", &cfg) - if err != nil { - return nil, err - } - return &cfg, nil -} diff --git a/sdk/pkg/clickhouse/utils.go b/sdk/pkg/clickhouse/utils.go deleted file mode 100644 index 2525d5e4..00000000 --- a/sdk/pkg/clickhouse/utils.go +++ /dev/null @@ -1,72 +0,0 @@ -package clickhouse - -import ( - "context" - "errors" - "strings" - "time" -) - -func (c *Client) InsertData(tableName string, data interface{}) error { - ctx := context.Background() - - tx, err := c.db.Begin() - if err != nil { - return err - } - defer tx.Rollback() - - dataMap, ok := data.(map[string]interface{}) - if !ok { - return errors.New("data is not in the expected format") - } - - columns := make([]string, 0, len(dataMap)) - values := make([]interface{}, 0, len(dataMap)) - placeholders := make([]string, 0, len(dataMap)) - - for column, value := range dataMap { - columns = append(columns, column) - values = append(values, value) - placeholders = append(placeholders, "?") - } - - stmt, err := tx.PrepareContext(ctx, "INSERT INTO "+tableName+" ("+strings.Join(columns, ",")+") VALUES ("+strings.Join(placeholders, ",")+")") - if err != nil { - return err - } - defer stmt.Close() - - values = append(values, time.Now().UTC()) - - _, err = stmt.ExecContext(ctx, values...) - if err != nil { - return err - } - - return tx.Commit() -} - -func (c *Client) List(input interface{}) ([]map[string]interface{}, error) { - var dataList []map[string]interface{} - - inputMap, ok := input.(map[string]interface{}) - if !ok { - return nil, errors.New("input is not a map[string]interface{}") - } - - var traverse func(m map[string]interface{}) - traverse = func(m map[string]interface{}) { - dataList = append(dataList, m) - - for _, v := range m { - if subMap, ok := v.(map[string]interface{}); ok { - traverse(subMap) - } - } - } - - traverse(inputMap) - - return dataList, nil -} diff --git a/sdk/pkg/nats/client.go b/sdk/pkg/nats/client.go deleted file mode 100644 index b5c26a37..00000000 --- a/sdk/pkg/nats/client.go +++ /dev/null @@ -1,36 +0,0 @@ -// /pkg/nats/client.go -package nats - -import ( - "fmt" - "log" - "os" - - "github.com/nats-io/nats.go" -) - -type Client struct { - js nats.JetStreamContext - logger *log.Logger -} - -func NewClient(cfg *Config) (*Client, error) { - logger := log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) - - opts := []nats.Option{nats.Token(cfg.Token)} - - conn, err := nats.Connect(cfg.Address, opts...) - if err != nil { - return nil, fmt.Errorf("error connecting to NATS: %v", err) - } - - js, err := conn.JetStream() - if err != nil { - return nil, fmt.Errorf("error obtaining JetStream context: %v", err) - } - - return &Client{ - js: js, - logger: logger, - }, nil -} diff --git a/sdk/pkg/nats/config.go b/sdk/pkg/nats/config.go deleted file mode 100644 index 0fddfa96..00000000 --- a/sdk/pkg/nats/config.go +++ /dev/null @@ -1,19 +0,0 @@ -package nats - -import ( - "github.com/kelseyhightower/envconfig" -) - -type Config struct { - Address string `envconfig:"NATS_ADDRESS" default:"nats://localhost:4222"` - Token string `envconfig:"NATS_TOKEN"` -} - -func LoadConfig() (*Config, error) { - var cfg Config - err := envconfig.Process("", &cfg) - if err != nil { - return nil, err - } - return &cfg, nil -} diff --git a/sdk/pkg/nats/utils.go b/sdk/pkg/nats/utils.go deleted file mode 100644 index 36e144fb..00000000 --- a/sdk/pkg/nats/utils.go +++ /dev/null @@ -1,85 +0,0 @@ -package nats - -import ( - "encoding/json" - "fmt" - "log" - - "github.com/nats-io/nats.go" - "github.com/pkg/errors" -) - -func (client *Client) CreateStream(streamName string, streamSubjects []string) error { - js := client.js - - stream, err := js.StreamInfo(streamName) - if err != nil { - if err == nats.ErrStreamNotFound { - client.logger.Printf("Stream does not exist, creating: %s", streamName) - } else { - client.logger.Printf("Error getting stream: %s", err) - return err - } - } - - if stream != nil { - client.logger.Printf("Stream already exists: %s", fmt.Sprintf("%v", stream)) - return nil - } - client.logger.Printf("Creating stream %q with subjects %q", streamName, streamSubjects) - streamInfo, err := js.AddStream(&nats.StreamConfig{ - Name: streamName, - Subjects: streamSubjects, - }) - - if err != nil { - return errors.WithMessage(err, "Error creating stream") - } - fmt.Println(streamInfo) - return nil -} - -func (client *Client) Consumer(subject, consumerName string) (interface{}, error) { - js := client.js - var data interface{} - handler := func(msg *nats.Msg) { - msg.Ack() - err := json.Unmarshal(msg.Data, &data) - if err != nil { - log.Println("Error unmarshalling message data:", err) - return - } - log.Printf("Data Received: %#v,", data) - } - _, err := js.Subscribe(subject, handler, nats.Durable(consumerName), nats.ManualAck()) - if err != nil { - return nil, fmt.Errorf("error subscribing to stream %s: %w", subject, err) - } - return data, nil -} - -func (client *Client) Publish(subject string, streamName string, data interface{}) error { - js := client.js - - resultdata, err := json.Marshal(data) - if err != nil { - return errors.WithMessage(err, "Error marshaling data to JSON") - } - stream, err := js.StreamInfo(streamName) - if err != nil { - if err == nats.ErrStreamNotFound { - client.logger.Printf("Stream does not exist %s", subject) - } else { - client.logger.Printf("Error getting stream: %s", err) - return err - } - } - if stream == nil { - return errors.New("Stream does not exist") - } - _, err = js.Publish(subject, resultdata) - if err != nil { - return errors.WithMessage(err, "Error publishing message") - } - return nil -} diff --git a/sdk/pkg/sdk/clickhouse_insert.go b/sdk/pkg/sdk/clickhouse_insert.go deleted file mode 100644 index 7fb4a65d..00000000 --- a/sdk/pkg/sdk/clickhouse_insert.go +++ /dev/null @@ -1,10 +0,0 @@ -package sdk - -func (sdk *SDK) ClickHouseInsertData(tableName string, data interface{}) error { - err := sdk.clickhouseClient.InsertData(tableName, data) - if err != nil { - return err - } - sdk.logger.Printf("insert into table successfully %v", data) - return nil -} diff --git a/sdk/pkg/sdk/listdata.go b/sdk/pkg/sdk/listdata.go deleted file mode 100644 index 59580a55..00000000 --- a/sdk/pkg/sdk/listdata.go +++ /dev/null @@ -1,10 +0,0 @@ -package sdk - -func (sdk *SDK) ListtData(data interface{}) error { - data, err := sdk.clickhouseClient.List(data) - if err != nil { - return err - } - sdk.logger.Printf("insert into table successfully %v", data) - return nil -} diff --git a/sdk/pkg/sdk/nats_consumer.go b/sdk/pkg/sdk/nats_consumer.go deleted file mode 100644 index 88cffa4e..00000000 --- a/sdk/pkg/sdk/nats_consumer.go +++ /dev/null @@ -1,10 +0,0 @@ -package sdk - -func (sdk *SDK) ConsumeNatsData(subject, consumerName string) error { - data, err := sdk.natsClient.Consumer(subject, consumerName) - if err != nil { - return err - } - sdk.logger.Printf("Consumed successfully from stream %v", data) - return nil -} diff --git a/sdk/pkg/sdk/nats_publisher.go b/sdk/pkg/sdk/nats_publisher.go deleted file mode 100644 index 43f9476a..00000000 --- a/sdk/pkg/sdk/nats_publisher.go +++ /dev/null @@ -1,9 +0,0 @@ -package sdk - -func (sdk *SDK) PublishToNats(subject string, streamName string, data interface{}) error { - if err := sdk.natsClient.Publish(subject, streamName, data); err != nil { - return err - } - sdk.logger.Printf("Message published successfully to stream %v", streamName) - return nil -} diff --git a/sdk/pkg/sdk/nats_stream.go b/sdk/pkg/sdk/nats_stream.go deleted file mode 100644 index 006b3932..00000000 --- a/sdk/pkg/sdk/nats_stream.go +++ /dev/null @@ -1,9 +0,0 @@ -package sdk - -func (sdk *SDK) CreateNatsStream(streamName string, streamSubjects []string) error { - if err := sdk.natsClient.CreateStream(streamName, streamSubjects); err != nil { - return err - } - sdk.logger.Printf("Stream created successfully for streamName %v, streamSubjects %v", streamName, streamSubjects) - return nil -} diff --git a/sdk/pkg/sdk/sdk.go b/sdk/pkg/sdk/sdk.go deleted file mode 100644 index ade6a6c8..00000000 --- a/sdk/pkg/sdk/sdk.go +++ /dev/null @@ -1,38 +0,0 @@ -package sdk - -import ( - "log" - "os" - - "github.com/intelops/kubviz/sdk/pkg/clickhouse" - "github.com/intelops/kubviz/sdk/pkg/nats" -) - -type SDK struct { - natsClient *nats.Client - clickhouseClient *clickhouse.Client - logger *log.Logger -} - -func New(natsCfg *nats.Config, chCfg *clickhouse.Config) (*SDK, error) { - logger := log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) - natsClient, err := nats.NewClient(natsCfg) - if err != nil { - return nil, err - } - - chClient, err := clickhouse.NewClient(chCfg) - if err != nil { - return nil, err - } - - return &SDK{ - natsClient: natsClient, - clickhouseClient: chClient, - logger: logger, - }, nil -} - -func (sdk *SDK) Start() error { - return nil -}