Skip to content

Commit

Permalink
Merge pull request #369 from intelops/sdk
Browse files Browse the repository at this point in the history
sdk
  • Loading branch information
vijeyash1 authored May 14, 2024
2 parents 5cb2363 + 89337da commit 1f965d9
Show file tree
Hide file tree
Showing 13 changed files with 407 additions and 0 deletions.
60 changes: 60 additions & 0 deletions sdk/example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package main

import (
"fmt"
"log"
"time"

"github.com/intelops/kubviz/sdk/pkg/clickhouse"
"github.com/intelops/kubviz/sdk/pkg/nats"
"github.com/intelops/kubviz/sdk/pkg/sdk"
)

func main() {
natsConfig, err := nats.LoadConfig()
if err != nil {
log.Fatalf("Failed to load NATS config: %v", err)
}

chConfig, err := clickhouse.LoadConfig()
if err != nil {
log.Fatalf("Failed to load ClickHouse config: %v", err)
}

mySDK, err := sdk.New(natsConfig, chConfig)
if err != nil {
log.Fatalf("Failed to initialize SDK: %v", err)
}
streamName := "Simple"
streamSubjects := "Simple.*"
err = mySDK.CreateNatsStream(streamName, []string{streamSubjects})
if err != nil {
fmt.Println("Error creating NATS Stream:", err)
return
}

time.Sleep(2 * time.Second)

data := map[string]interface{}{
"key": "value",
"count": 42,
}
subject := "Simple.event"
err = mySDK.PublishToNats(subject, streamName, data)
if err != nil {
fmt.Println("Error publishing message to NATS:", err)
return
}
time.Sleep(2 * time.Second)
consumerName := "myConsumer"
err = mySDK.ConsumeNatsData(subject, consumerName)
if err != nil {
fmt.Println("Error creating NATS consumer:", err)
return
}
err = mySDK.ClickHouseInsertData("mytable", data)
if err != nil {
fmt.Println("Error while inserting data into nats:", err)
return
}
}
28 changes: 28 additions & 0 deletions sdk/pkg/clickhouse/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// /pkg/clickhouse/client.go
package clickhouse

import (
"database/sql"
"fmt"

_ "github.com/ClickHouse/clickhouse-go/v2"
)

type Client struct {
db *sql.DB
}

func NewClient(cfg *Config) (*Client, error) {
dataSourceName := fmt.Sprintf("tcp://%s:%d", cfg.DBAddress, cfg.DBPort)

db, err := sql.Open("clickhouse", dataSourceName)
if err != nil {
return nil, err
}

if err := db.Ping(); err != nil {
return nil, err
}

return &Client{db: db}, nil
}
21 changes: 21 additions & 0 deletions sdk/pkg/clickhouse/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package clickhouse

import (
"github.com/kelseyhightower/envconfig"
)

type Config struct {
DBAddress string `envconfig:"DB_ADDRESS" default:"localhost"`
DBPort int `envconfig:"DB_PORT" default:"9000"`
Username string `envconfig:"CLICKHOUSE_USERNAME"`
Password string `envconfig:"CLICKHOUSE_PASSWORD"`
}

func LoadConfig() (*Config, error) {
var cfg Config
err := envconfig.Process("", &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
72 changes: 72 additions & 0 deletions sdk/pkg/clickhouse/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package clickhouse

import (
"context"
"errors"
"strings"
"time"
)

func (c *Client) InsertData(tableName string, data interface{}) error {
ctx := context.Background()

tx, err := c.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()

dataMap, ok := data.(map[string]interface{})
if !ok {
return errors.New("data is not in the expected format")
}

columns := make([]string, 0, len(dataMap))
values := make([]interface{}, 0, len(dataMap))
placeholders := make([]string, 0, len(dataMap))

for column, value := range dataMap {
columns = append(columns, column)
values = append(values, value)
placeholders = append(placeholders, "?")
}

stmt, err := tx.PrepareContext(ctx, "INSERT INTO "+tableName+" ("+strings.Join(columns, ",")+") VALUES ("+strings.Join(placeholders, ",")+")")
if err != nil {
return err
}
defer stmt.Close()

values = append(values, time.Now().UTC())

_, err = stmt.ExecContext(ctx, values...)
if err != nil {
return err
}

return tx.Commit()
}

func (c *Client) List(input interface{}) ([]map[string]interface{}, error) {
var dataList []map[string]interface{}

inputMap, ok := input.(map[string]interface{})
if !ok {
return nil, errors.New("input is not a map[string]interface{}")
}

var traverse func(m map[string]interface{})
traverse = func(m map[string]interface{}) {
dataList = append(dataList, m)

for _, v := range m {
if subMap, ok := v.(map[string]interface{}); ok {
traverse(subMap)
}
}
}

traverse(inputMap)

return dataList, nil
}
36 changes: 36 additions & 0 deletions sdk/pkg/nats/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// /pkg/nats/client.go
package nats

import (
"fmt"
"log"
"os"

"github.com/nats-io/nats.go"
)

type Client struct {
js nats.JetStreamContext
logger *log.Logger
}

func NewClient(cfg *Config) (*Client, error) {
logger := log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile)

opts := []nats.Option{nats.Token(cfg.Token)}

conn, err := nats.Connect(cfg.Address, opts...)
if err != nil {
return nil, fmt.Errorf("error connecting to NATS: %v", err)
}

js, err := conn.JetStream()
if err != nil {
return nil, fmt.Errorf("error obtaining JetStream context: %v", err)
}

return &Client{
js: js,
logger: logger,
}, nil
}
19 changes: 19 additions & 0 deletions sdk/pkg/nats/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package nats

import (
"github.com/kelseyhightower/envconfig"
)

type Config struct {
Address string `envconfig:"NATS_ADDRESS" default:"nats://localhost:4222"`
Token string `envconfig:"NATS_TOKEN"`
}

func LoadConfig() (*Config, error) {
var cfg Config
err := envconfig.Process("", &cfg)
if err != nil {
return nil, err
}
return &cfg, nil
}
85 changes: 85 additions & 0 deletions sdk/pkg/nats/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package nats

import (
"encoding/json"
"fmt"
"log"

"github.com/nats-io/nats.go"
"github.com/pkg/errors"
)

func (client *Client) CreateStream(streamName string, streamSubjects []string) error {
js := client.js

stream, err := js.StreamInfo(streamName)
if err != nil {
if err == nats.ErrStreamNotFound {
client.logger.Printf("Stream does not exist, creating: %s", streamName)
} else {
client.logger.Printf("Error getting stream: %s", err)
return err
}
}

if stream != nil {
client.logger.Printf("Stream already exists: %s", fmt.Sprintf("%v", stream))
return nil
}
client.logger.Printf("Creating stream %q with subjects %q", streamName, streamSubjects)
streamInfo, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: streamSubjects,
})

if err != nil {
return errors.WithMessage(err, "Error creating stream")
}
fmt.Println(streamInfo)
return nil
}

func (client *Client) Consumer(subject, consumerName string) (interface{}, error) {
js := client.js
var data interface{}
handler := func(msg *nats.Msg) {
msg.Ack()
err := json.Unmarshal(msg.Data, &data)
if err != nil {
log.Println("Error unmarshalling message data:", err)
return
}
log.Printf("Data Received: %#v,", data)
}
_, err := js.Subscribe(subject, handler, nats.Durable(consumerName), nats.ManualAck())
if err != nil {
return nil, fmt.Errorf("error subscribing to stream %s: %w", subject, err)
}
return data, nil
}

func (client *Client) Publish(subject string, streamName string, data interface{}) error {
js := client.js

resultdata, err := json.Marshal(data)
if err != nil {
return errors.WithMessage(err, "Error marshaling data to JSON")
}
stream, err := js.StreamInfo(streamName)
if err != nil {
if err == nats.ErrStreamNotFound {
client.logger.Printf("Stream does not exist %s", subject)
} else {
client.logger.Printf("Error getting stream: %s", err)
return err
}
}
if stream == nil {
return errors.New("Stream does not exist")
}
_, err = js.Publish(subject, resultdata)
if err != nil {
return errors.WithMessage(err, "Error publishing message")
}
return nil
}
10 changes: 10 additions & 0 deletions sdk/pkg/sdk/clickhouse_insert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package sdk

func (sdk *SDK) ClickHouseInsertData(tableName string, data interface{}) error {
err := sdk.clickhouseClient.InsertData(tableName, data)
if err != nil {
return err
}
sdk.logger.Printf("insert into table successfully %v", data)
return nil
}
10 changes: 10 additions & 0 deletions sdk/pkg/sdk/listdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package sdk

func (sdk *SDK) ListtData(data interface{}) error {
data, err := sdk.clickhouseClient.List(data)
if err != nil {
return err
}
sdk.logger.Printf("insert into table successfully %v", data)
return nil
}
10 changes: 10 additions & 0 deletions sdk/pkg/sdk/nats_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package sdk

func (sdk *SDK) ConsumeNatsData(subject, consumerName string) error {
data, err := sdk.natsClient.Consumer(subject, consumerName)
if err != nil {
return err
}
sdk.logger.Printf("Consumed successfully from stream %v", data)
return nil
}
9 changes: 9 additions & 0 deletions sdk/pkg/sdk/nats_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package sdk

func (sdk *SDK) PublishToNats(subject string, streamName string, data interface{}) error {
if err := sdk.natsClient.Publish(subject, streamName, data); err != nil {
return err
}
sdk.logger.Printf("Message published successfully to stream %v", streamName)
return nil
}
9 changes: 9 additions & 0 deletions sdk/pkg/sdk/nats_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package sdk

func (sdk *SDK) CreateNatsStream(streamName string, streamSubjects []string) error {
if err := sdk.natsClient.CreateStream(streamName, streamSubjects); err != nil {
return err
}
sdk.logger.Printf("Stream created successfully for streamName %v, streamSubjects %v", streamName, streamSubjects)
return nil
}
Loading

0 comments on commit 1f965d9

Please sign in to comment.