diff --git a/sdk/example/main.go b/sdk/example/main.go new file mode 100644 index 00000000..d8c9e6dc --- /dev/null +++ b/sdk/example/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "fmt" + "log" + "time" + + "github.com/intelops/kubviz/sdk/pkg/clickhouse" + "github.com/intelops/kubviz/sdk/pkg/nats" + "github.com/intelops/kubviz/sdk/pkg/sdk" +) + +func main() { + natsConfig, err := nats.LoadConfig() + if err != nil { + log.Fatalf("Failed to load NATS config: %v", err) + } + + chConfig, err := clickhouse.LoadConfig() + if err != nil { + log.Fatalf("Failed to load ClickHouse config: %v", err) + } + + mySDK, err := sdk.New(natsConfig, chConfig) + if err != nil { + log.Fatalf("Failed to initialize SDK: %v", err) + } + streamName := "Simple" + streamSubjects := "Simple.*" + err = mySDK.CreateNatsStream(streamName, []string{streamSubjects}) + if err != nil { + fmt.Println("Error creating NATS Stream:", err) + return + } + + time.Sleep(2 * time.Second) + + data := map[string]interface{}{ + "key": "value", + "count": 42, + } + subject := "Simple.event" + err = mySDK.PublishToNats(subject, streamName, data) + if err != nil { + fmt.Println("Error publishing message to NATS:", err) + return + } + time.Sleep(2 * time.Second) + consumerName := "myConsumer" + err = mySDK.ConsumeNatsData(subject, consumerName) + if err != nil { + fmt.Println("Error creating NATS consumer:", err) + return + } + err = mySDK.ClickHouseInsertData("mytable", data) + if err != nil { + fmt.Println("Error while inserting data into nats:", err) + return + } +} diff --git a/sdk/pkg/clickhouse/client.go b/sdk/pkg/clickhouse/client.go new file mode 100644 index 00000000..c4edb29c --- /dev/null +++ b/sdk/pkg/clickhouse/client.go @@ -0,0 +1,28 @@ +// /pkg/clickhouse/client.go +package clickhouse + +import ( + "database/sql" + "fmt" + + _ "github.com/ClickHouse/clickhouse-go/v2" +) + +type Client struct { + db *sql.DB +} + +func NewClient(cfg *Config) (*Client, error) { + dataSourceName := fmt.Sprintf("tcp://%s:%d", cfg.DBAddress, cfg.DBPort) + + db, err := sql.Open("clickhouse", dataSourceName) + if err != nil { + return nil, err + } + + if err := db.Ping(); err != nil { + return nil, err + } + + return &Client{db: db}, nil +} diff --git a/sdk/pkg/clickhouse/config.go b/sdk/pkg/clickhouse/config.go new file mode 100644 index 00000000..a56baa91 --- /dev/null +++ b/sdk/pkg/clickhouse/config.go @@ -0,0 +1,21 @@ +package clickhouse + +import ( + "github.com/kelseyhightower/envconfig" +) + +type Config struct { + DBAddress string `envconfig:"DB_ADDRESS" default:"localhost"` + DBPort int `envconfig:"DB_PORT" default:"9000"` + Username string `envconfig:"CLICKHOUSE_USERNAME"` + Password string `envconfig:"CLICKHOUSE_PASSWORD"` +} + +func LoadConfig() (*Config, error) { + var cfg Config + err := envconfig.Process("", &cfg) + if err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/sdk/pkg/clickhouse/utils.go b/sdk/pkg/clickhouse/utils.go new file mode 100644 index 00000000..2525d5e4 --- /dev/null +++ b/sdk/pkg/clickhouse/utils.go @@ -0,0 +1,72 @@ +package clickhouse + +import ( + "context" + "errors" + "strings" + "time" +) + +func (c *Client) InsertData(tableName string, data interface{}) error { + ctx := context.Background() + + tx, err := c.db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + + dataMap, ok := data.(map[string]interface{}) + if !ok { + return errors.New("data is not in the expected format") + } + + columns := make([]string, 0, len(dataMap)) + values := make([]interface{}, 0, len(dataMap)) + placeholders := make([]string, 0, len(dataMap)) + + for column, value := range dataMap { + columns = append(columns, column) + values = append(values, value) + placeholders = append(placeholders, "?") + } + + stmt, err := tx.PrepareContext(ctx, "INSERT INTO "+tableName+" ("+strings.Join(columns, ",")+") VALUES ("+strings.Join(placeholders, ",")+")") + if err != nil { + return err + } + defer stmt.Close() + + values = append(values, time.Now().UTC()) + + _, err = stmt.ExecContext(ctx, values...) + if err != nil { + return err + } + + return tx.Commit() +} + +func (c *Client) List(input interface{}) ([]map[string]interface{}, error) { + var dataList []map[string]interface{} + + inputMap, ok := input.(map[string]interface{}) + if !ok { + return nil, errors.New("input is not a map[string]interface{}") + } + + var traverse func(m map[string]interface{}) + traverse = func(m map[string]interface{}) { + dataList = append(dataList, m) + + for _, v := range m { + if subMap, ok := v.(map[string]interface{}); ok { + traverse(subMap) + } + } + } + + traverse(inputMap) + + return dataList, nil +} diff --git a/sdk/pkg/nats/client.go b/sdk/pkg/nats/client.go new file mode 100644 index 00000000..b5c26a37 --- /dev/null +++ b/sdk/pkg/nats/client.go @@ -0,0 +1,36 @@ +// /pkg/nats/client.go +package nats + +import ( + "fmt" + "log" + "os" + + "github.com/nats-io/nats.go" +) + +type Client struct { + js nats.JetStreamContext + logger *log.Logger +} + +func NewClient(cfg *Config) (*Client, error) { + logger := log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) + + opts := []nats.Option{nats.Token(cfg.Token)} + + conn, err := nats.Connect(cfg.Address, opts...) + if err != nil { + return nil, fmt.Errorf("error connecting to NATS: %v", err) + } + + js, err := conn.JetStream() + if err != nil { + return nil, fmt.Errorf("error obtaining JetStream context: %v", err) + } + + return &Client{ + js: js, + logger: logger, + }, nil +} diff --git a/sdk/pkg/nats/config.go b/sdk/pkg/nats/config.go new file mode 100644 index 00000000..0fddfa96 --- /dev/null +++ b/sdk/pkg/nats/config.go @@ -0,0 +1,19 @@ +package nats + +import ( + "github.com/kelseyhightower/envconfig" +) + +type Config struct { + Address string `envconfig:"NATS_ADDRESS" default:"nats://localhost:4222"` + Token string `envconfig:"NATS_TOKEN"` +} + +func LoadConfig() (*Config, error) { + var cfg Config + err := envconfig.Process("", &cfg) + if err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/sdk/pkg/nats/utils.go b/sdk/pkg/nats/utils.go new file mode 100644 index 00000000..36e144fb --- /dev/null +++ b/sdk/pkg/nats/utils.go @@ -0,0 +1,85 @@ +package nats + +import ( + "encoding/json" + "fmt" + "log" + + "github.com/nats-io/nats.go" + "github.com/pkg/errors" +) + +func (client *Client) CreateStream(streamName string, streamSubjects []string) error { + js := client.js + + stream, err := js.StreamInfo(streamName) + if err != nil { + if err == nats.ErrStreamNotFound { + client.logger.Printf("Stream does not exist, creating: %s", streamName) + } else { + client.logger.Printf("Error getting stream: %s", err) + return err + } + } + + if stream != nil { + client.logger.Printf("Stream already exists: %s", fmt.Sprintf("%v", stream)) + return nil + } + client.logger.Printf("Creating stream %q with subjects %q", streamName, streamSubjects) + streamInfo, err := js.AddStream(&nats.StreamConfig{ + Name: streamName, + Subjects: streamSubjects, + }) + + if err != nil { + return errors.WithMessage(err, "Error creating stream") + } + fmt.Println(streamInfo) + return nil +} + +func (client *Client) Consumer(subject, consumerName string) (interface{}, error) { + js := client.js + var data interface{} + handler := func(msg *nats.Msg) { + msg.Ack() + err := json.Unmarshal(msg.Data, &data) + if err != nil { + log.Println("Error unmarshalling message data:", err) + return + } + log.Printf("Data Received: %#v,", data) + } + _, err := js.Subscribe(subject, handler, nats.Durable(consumerName), nats.ManualAck()) + if err != nil { + return nil, fmt.Errorf("error subscribing to stream %s: %w", subject, err) + } + return data, nil +} + +func (client *Client) Publish(subject string, streamName string, data interface{}) error { + js := client.js + + resultdata, err := json.Marshal(data) + if err != nil { + return errors.WithMessage(err, "Error marshaling data to JSON") + } + stream, err := js.StreamInfo(streamName) + if err != nil { + if err == nats.ErrStreamNotFound { + client.logger.Printf("Stream does not exist %s", subject) + } else { + client.logger.Printf("Error getting stream: %s", err) + return err + } + } + if stream == nil { + return errors.New("Stream does not exist") + } + _, err = js.Publish(subject, resultdata) + if err != nil { + return errors.WithMessage(err, "Error publishing message") + } + return nil +} diff --git a/sdk/pkg/sdk/clickhouse_insert.go b/sdk/pkg/sdk/clickhouse_insert.go new file mode 100644 index 00000000..7fb4a65d --- /dev/null +++ b/sdk/pkg/sdk/clickhouse_insert.go @@ -0,0 +1,10 @@ +package sdk + +func (sdk *SDK) ClickHouseInsertData(tableName string, data interface{}) error { + err := sdk.clickhouseClient.InsertData(tableName, data) + if err != nil { + return err + } + sdk.logger.Printf("insert into table successfully %v", data) + return nil +} diff --git a/sdk/pkg/sdk/listdata.go b/sdk/pkg/sdk/listdata.go new file mode 100644 index 00000000..59580a55 --- /dev/null +++ b/sdk/pkg/sdk/listdata.go @@ -0,0 +1,10 @@ +package sdk + +func (sdk *SDK) ListtData(data interface{}) error { + data, err := sdk.clickhouseClient.List(data) + if err != nil { + return err + } + sdk.logger.Printf("insert into table successfully %v", data) + return nil +} diff --git a/sdk/pkg/sdk/nats_consumer.go b/sdk/pkg/sdk/nats_consumer.go new file mode 100644 index 00000000..88cffa4e --- /dev/null +++ b/sdk/pkg/sdk/nats_consumer.go @@ -0,0 +1,10 @@ +package sdk + +func (sdk *SDK) ConsumeNatsData(subject, consumerName string) error { + data, err := sdk.natsClient.Consumer(subject, consumerName) + if err != nil { + return err + } + sdk.logger.Printf("Consumed successfully from stream %v", data) + return nil +} diff --git a/sdk/pkg/sdk/nats_publisher.go b/sdk/pkg/sdk/nats_publisher.go new file mode 100644 index 00000000..43f9476a --- /dev/null +++ b/sdk/pkg/sdk/nats_publisher.go @@ -0,0 +1,9 @@ +package sdk + +func (sdk *SDK) PublishToNats(subject string, streamName string, data interface{}) error { + if err := sdk.natsClient.Publish(subject, streamName, data); err != nil { + return err + } + sdk.logger.Printf("Message published successfully to stream %v", streamName) + return nil +} diff --git a/sdk/pkg/sdk/nats_stream.go b/sdk/pkg/sdk/nats_stream.go new file mode 100644 index 00000000..006b3932 --- /dev/null +++ b/sdk/pkg/sdk/nats_stream.go @@ -0,0 +1,9 @@ +package sdk + +func (sdk *SDK) CreateNatsStream(streamName string, streamSubjects []string) error { + if err := sdk.natsClient.CreateStream(streamName, streamSubjects); err != nil { + return err + } + sdk.logger.Printf("Stream created successfully for streamName %v, streamSubjects %v", streamName, streamSubjects) + return nil +} diff --git a/sdk/pkg/sdk/sdk.go b/sdk/pkg/sdk/sdk.go new file mode 100644 index 00000000..ade6a6c8 --- /dev/null +++ b/sdk/pkg/sdk/sdk.go @@ -0,0 +1,38 @@ +package sdk + +import ( + "log" + "os" + + "github.com/intelops/kubviz/sdk/pkg/clickhouse" + "github.com/intelops/kubviz/sdk/pkg/nats" +) + +type SDK struct { + natsClient *nats.Client + clickhouseClient *clickhouse.Client + logger *log.Logger +} + +func New(natsCfg *nats.Config, chCfg *clickhouse.Config) (*SDK, error) { + logger := log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) + natsClient, err := nats.NewClient(natsCfg) + if err != nil { + return nil, err + } + + chClient, err := clickhouse.NewClient(chCfg) + if err != nil { + return nil, err + } + + return &SDK{ + natsClient: natsClient, + clickhouseClient: chClient, + logger: logger, + }, nil +} + +func (sdk *SDK) Start() error { + return nil +}