-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathconfig.go
87 lines (78 loc) · 2.27 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package tickstore
import (
"database/sql"
"fmt"
"log"
"time"
"github.com/ClickHouse/clickhouse-go"
kiteticker "github.com/zerodha/gokiteconnect/v4/ticker"
)
// Client is the handle returned by New. It bundles the ClickHouse connection,
// the Kite credentials, the Kite ticker and the channel that carries ticks.
type Client struct {
	// dbClient is the ClickHouse connection opened through database/sql.
	dbClient *sql.DB
	// apiKey and accessToken are the Kite credentials the ticker is created with.
	apiKey, accessToken string
	// tokenList is copied from ClientParam.TokenList
	// (presumably the instrument tokens to subscribe to; confirm against the caller).
	tokenList []uint32
	// dumpSize is the buffer capacity pipeline is created with.
	dumpSize int
	// ticker is the Kite ticker instance created in New.
	ticker *kiteticker.Ticker
	// pipeline carries tickData values; New creates it buffered to dumpSize.
	pipeline chan tickData
}
// ClientParam carries the user-supplied settings New needs to connect to
// ClickHouse and to create the Kite ticker stream.
type ClientParam struct {
	// DBSource is the ClickHouse data source name. When it is empty, New
	// uses "tcp://127.0.0.1:9000?debug=true".
	DBSource string
	// ApiKey and AccessToken are the credentials handed to kiteticker.New.
	ApiKey, AccessToken string
	// TokenList is the list of tokens stored on the resulting Client.
	TokenList []uint32
	// DumpSize is the capacity of the tick channel. When it is 0, New uses
	// five times len(TokenList).
	DumpSize int
}
// tickData is one streamed tick in the shape that is stored in ClickHouse.
// Values of this type travel over Client.pipeline.
type tickData struct {
	// Token identifies the instrument
	// (presumably the value for the instrument_token column; confirm against the insert code).
	Token uint32

	// TimeStamp is the time attached to the tick.
	TimeStamp time.Time

	// LastPrice is the price carried by the tick.
	LastPrice float64
}
// New builds a Client from userParam.
//
// Defaults applied to userParam:
//   - an empty DBSource becomes "tcp://127.0.0.1:9000?debug=true";
//   - a non-positive DumpSize becomes five times len(TokenList).
//
// New then opens the ClickHouse connection, creates the tickdata table if it
// does not exist, creates the Kite ticker, and allocates the tick channel with
// a buffer of DumpSize.
//
// New ends the process through log.Fatalf when the connection cannot be
// opened or the table cannot be created. A failed Ping is only printed.
func New(userParam ClientParam) *Client {
	if userParam.DBSource == "" {
		userParam.DBSource = "tcp://127.0.0.1:9000?debug=true"
	}

	connect, err := sql.Open("clickhouse", userParam.DBSource)
	if err != nil {
		// Without this check connect may be nil and the Ping below would
		// panic with a nil pointer dereference.
		log.Fatalf("Error opening clickhouse connection: %v", err)
	}

	if pingErr := connect.Ping(); pingErr != nil {
		if exception, ok := pingErr.(*clickhouse.Exception); ok {
			fmt.Printf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
		} else {
			fmt.Println(pingErr)
		}
	}

	// Set default dump size to 5 times of the token list length.
	// Negative values are treated as unset too, because make panics on a
	// negative channel buffer size.
	if userParam.DumpSize <= 0 {
		userParam.DumpSize = len(userParam.TokenList) * 5
	}

	// Create tickdata table for fresh instance.
	// The ReplacingMergeTree engine removes duplicate entries with the same
	// timestamp and price, as those won't be useful for candle creation.
	_, err = connect.Exec(`
CREATE TABLE IF NOT EXISTS tickdata (
instrument_token UInt32,
timestamp DateTime('Asia/Calcutta'),
price FLOAT()
) engine=ReplacingMergeTree()
ORDER BY (timestamp, instrument_token, price)
`)
	if err != nil {
		log.Fatalf("Error creating tickdata table: %v", err)
	}

	// Create new Kite ticker instance.
	ticker := kiteticker.New(userParam.ApiKey, userParam.AccessToken)

	// Channel to store all upcoming streams of ticks.
	pipeline := make(chan tickData, userParam.DumpSize)

	return &Client{
		dbClient:    connect,
		apiKey:      userParam.ApiKey,
		accessToken: userParam.AccessToken,
		tokenList:   userParam.TokenList,
		dumpSize:    userParam.DumpSize,
		ticker:      ticker,
		pipeline:    pipeline,
	}
}