forked from koblas/impalathing
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconnection.go
119 lines (93 loc) · 2.68 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package impalathing
import (
"context"
"fmt"
"log"
"time"
"git.apache.org/thrift.git/lib/go/thrift"
"github.com/MediaMath/impalathing/services/beeswax"
impala "github.com/MediaMath/impalathing/services/impalaservice"
)
// Options carries the tuning knobs that a Connection forwards to every RowSet
// it creates.
type Options struct {
	// PollIntervalSeconds is a fractional number of seconds.
	// NOTE(review): presumably the delay between status polls while a query
	// is running; the consumer is not in this file — confirm.
	PollIntervalSeconds float64

	// BatchSize is an integer count.
	// NOTE(review): presumably the number of rows requested per fetch; the
	// consumer is not in this file — confirm.
	BatchSize int64
}
// DefaultOptions is a ready-made Options value with a poll interval of 0.1
// seconds and a batch size of 10000.
var DefaultOptions = Options{
	PollIntervalSeconds: 0.1,
	BatchSize:           10000,
}
// Connection is a handle on one Thrift session with an Impala daemon.
//
// The field order is significant: Connect builds the value with a positional
// composite literal.
type Connection struct {
	// ctx is the context given to Connect; Close uses it when cancelling handle.
	ctx context.Context
	// client is the Thrift service client. Close sets it to nil, which is what
	// isOpen tests for.
	client *impala.ImpalaServiceClient
	// handle is nil when the connection is created; if it is non-nil at Close
	// time the query it refers to is cancelled first.
	handle *beeswax.QueryHandle
	// transport is the opened Thrift transport underneath client.
	transport thrift.TTransport
	// options is forwarded unchanged to each RowSet created here.
	options Options

	// Host, Port and Timeout record the arguments Connect was called with and
	// are forwarded to each RowSet as well.
	Host    string
	Port    int
	Timeout time.Duration
}
// Connect opens a buffered (16 KiB), binary-protocol Thrift connection to
// host:port and returns a Connection wrapping it.
//
// timeout is handed to the Thrift socket. options, host, port and timeout are
// stored on the Connection and forwarded to every RowSet it creates. ctx is
// stored too and is the context Close uses when cancelling an outstanding
// query handle.
//
// On failure the error from the Thrift library is returned unmodified and no
// Connection is produced; a transport that was created but could not be
// opened is closed before returning.
func Connect(ctx context.Context, host string, port int, options Options, timeout time.Duration) (*Connection, error) {
	socket, err := thrift.NewTSocketTimeout(fmt.Sprintf("%s:%d", host, port), timeout)
	if err != nil {
		return nil, err
	}

	transportFactory := thrift.NewTBufferedTransportFactory(16 * 1024)
	protocolFactory := thrift.NewTBinaryProtocolFactoryDefault()

	// The factory's error must be checked: if it were discarded, a failed
	// GetTransport would leave transport nil and the Open call below would
	// panic instead of reporting the failure.
	transport, err := transportFactory.GetTransport(socket)
	if err != nil {
		return nil, err
	}

	if err := transport.Open(); err != nil {
		// Best-effort cleanup; the Open error is the one worth reporting.
		_ = transport.Close()
		return nil, err
	}

	client := impala.NewImpalaServiceClientFactory(transport, protocolFactory)

	// Keyed fields so the literal cannot silently go out of sync with the
	// struct definition. handle is left nil: no query is in flight yet.
	return &Connection{
		ctx:       ctx,
		client:    client,
		transport: transport,
		options:   options,
		Host:      host,
		Port:      port,
		Timeout:   timeout,
	}, nil
}
// isOpen reports whether the connection still holds a client. Close clears
// the client, so this is false once Close has completed.
func (c *Connection) isOpen() bool {
	hasClient := c.client != nil
	return hasClient
}
// Close shuts the connection down. Calling it on a connection that is already
// closed is a no-op that returns nil.
//
// If a query handle is attached it is cancelled first, using the context that
// was passed to Connect, and the returned status is logged on success. The
// transport is then closed and the client cleared regardless of whether the
// cancel succeeded, so a failed cancel can no longer leak the transport or
// leave the connection half-open.
//
// The cancel error takes precedence in the return value; otherwise the error
// from closing the transport (if any) is returned.
func (c *Connection) Close() error {
	if !c.isOpen() {
		return nil
	}

	var cancelErr error
	if c.handle != nil {
		status, err := c.client.Cancel(c.ctx, c.handle)
		if err != nil {
			// Remember the failure but keep going: the transport still has to
			// be released.
			cancelErr = err
		} else {
			log.Println(status)
		}
		c.handle = nil
	}

	closeErr := c.transport.Close()
	c.client = nil

	if cancelErr != nil {
		return cancelErr
	}
	return closeErr
}
// CloseQuery forwards handle to the client's Close call under ctx and returns
// whatever error that call reports.
func (c *Connection) CloseQuery(ctx context.Context, handle *beeswax.QueryHandle) error {
	err := c.client.Close(ctx, handle)
	return err
}
// CloseInsert forwards handle to the client's CloseInsert call under ctx.
//
// On success it returns the RowsAppended map from the call's result; on
// failure it returns a nil map together with the client's error.
func (c *Connection) CloseInsert(ctx context.Context, handle *beeswax.QueryHandle) (map[string]int64, error) {
	insertResult, err := c.client.CloseInsert(ctx, handle)
	if err != nil {
		return nil, err
	}

	rowsAppended := insertResult.RowsAppended
	return rowsAppended, nil
}
// ExecuteAndWait sends query, with an empty configuration list, through the
// client's ExecuteAndWait call and wraps the returned handle in a RowSet.
//
// The literal "impala" is passed as the call's final argument.
// NOTE(review): presumably the client/log context identifier expected by the
// service — confirm against the service definition.
//
// The RowSet receives ctx, the client, the handle, and the options, host,
// port and timeout stored on the connection. On error the RowSet is nil.
func (c *Connection) ExecuteAndWait(ctx context.Context, query string) (RowSet, error) {
	request := &beeswax.Query{
		Query:         query,
		Configuration: []string{},
	}

	handle, err := c.client.ExecuteAndWait(ctx, request, "impala")
	if err != nil {
		return nil, err
	}

	rows := newRowSet(ctx, c.client, handle, c.options, c.Host, c.Port, c.Timeout)
	return rows, nil
}
// Query sends query, with an empty configuration list, through the client's
// Query call and wraps the returned handle in a RowSet.
//
// The RowSet receives ctx, the client, the handle, and the options, host,
// port and timeout stored on the connection. On error the RowSet is nil.
func (c *Connection) Query(ctx context.Context, query string) (RowSet, error) {
	request := &beeswax.Query{
		Query:         query,
		Configuration: []string{},
	}

	handle, err := c.client.Query(ctx, request)
	if err != nil {
		return nil, err
	}

	rows := newRowSet(ctx, c.client, handle, c.options, c.Host, c.Port, c.Timeout)
	return rows, nil
}