-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconn_pool.go
101 lines (81 loc) · 1.8 KB
/
conn_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package client
import (
"context"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
// DefaultDialTimeout is the timeout placed on the context handed to
// grpc.DialContext when Get has to create a new connection. A value of
// zero or less means no timeout is applied.
var DefaultDialTimeout = 3 * time.Second
// Pool keeps idle gRPC client connections, grouped by target address, so
// that Get can hand one back instead of dialing again.
type Pool struct {
	// size is the maximum number of idle conns kept per address; Put closes
	// a returned conn instead of storing it once this many are held.
	size int
	// ttl is the maximum age of a pooled conn in whole seconds; Get closes
	// and skips conns older than this.
	ttl int64
	// Mutex is locked by Get and Put around every access to conns.
	sync.Mutex
	// conns maps an address to its stack of idle conns (Get pops from the
	// end, Put appends to the end).
	conns map[string][]*poolConn
}
// poolConn pairs a gRPC client connection with its creation time so the
// pool can expire it by age.
type poolConn struct {
	// cc is the underlying gRPC client connection.
	cc *grpc.ClientConn
	// created is the Unix time, in seconds, at which the conn was dialed.
	created int64
}
// NewPool builds an empty Pool.
//
// size is the maximum number of idle connections retained per address.
// ttl is the maximum age of a pooled connection; it is stored in whole
// seconds, so any sub-second remainder is truncated.
func NewPool(size int, ttl time.Duration) *Pool {
	ttlSeconds := int64(ttl.Seconds())

	pool := new(Pool)
	pool.size = size
	pool.ttl = ttlSeconds
	pool.conns = map[string][]*poolConn{}
	return pool
}
// Get returns a connection for addr.
//
// Idle connections pooled for addr are popped newest-first. A popped
// connection is returned if it is no older than the pool's ttl and its
// state is connectivity.Ready; otherwise it is closed and the next one is
// tried. If no pooled connection qualifies, a new one is dialed with opts,
// using DefaultDialTimeout (when positive) as the dial context timeout.
//
// opts are only used when a new connection is dialed. On dial failure Get
// returns a nil conn and the dial error.
func (p *Pool) Get(addr string, opts ...grpc.DialOption) (*poolConn, error) {
	now := time.Now().Unix()

	p.Lock()
	conns := p.conns[addr]

	// while we have conns check age and then return one
	// otherwise we'll create a new conn
	for len(conns) > 0 {
		last := len(conns) - 1
		conn := conns[last]
		// Clear the slot so the backing array does not keep the popped
		// (possibly closed) conn reachable after it leaves the pool.
		conns[last] = nil
		conns = conns[:last]
		p.conns[addr] = conns

		// if conn is old or not ready kill it and move on
		if now-conn.created > p.ttl || conn.cc.GetState() != connectivity.Ready {
			conn.cc.Close()
			continue
		}

		// we got a good conn, lets unlock and return it
		p.Unlock()
		return conn, nil
	}
	p.Unlock()

	// create new conn
	// TODO timeout option
	ctx := context.Background()
	if DefaultDialTimeout > 0 {
		// Keep the CancelFunc and always call it so the timeout's timer is
		// released as soon as the dial returns.
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, DefaultDialTimeout)
		defer cancel()
	}

	cc, err := grpc.DialContext(ctx, addr, opts...)
	if err != nil {
		return nil, err
	}
	return &poolConn{cc: cc, created: time.Now().Unix()}, nil
}
// Put gives conn back to the pool under addr.
//
// err is the outcome of whatever the caller did with conn: if it is
// non-nil the conn is closed instead of being stored. The conn is also
// closed, rather than stored, when the pool already holds size idle conns
// for addr. A nil conn is ignored, so callers may defer Put right after Get
// without checking whether Get failed.
func (p *Pool) Put(addr string, conn *poolConn, err error) {
	// Get returns a nil conn on dial failure; there is nothing to close or
	// store in that case, and dereferencing it would panic.
	if conn == nil {
		return
	}

	// don't store the conn if it has errored
	if err != nil {
		conn.cc.Close()
		return
	}

	// otherwise put it back for reuse
	p.Lock()
	conns := p.conns[addr]
	if len(conns) >= p.size {
		// Pool for this address is full: release the lock first, then close.
		p.Unlock()
		conn.cc.Close()
		return
	}
	p.conns[addr] = append(conns, conn)
	p.Unlock()
}
// GetCC returns the gRPC client connection wrapped by this pooled entry.
func (c *poolConn) GetCC() *grpc.ClientConn {
	return c.cc
}