-
Notifications
You must be signed in to change notification settings - Fork 0
/
unixlistener.go
185 lines (159 loc) · 4.5 KB
/
unixlistener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package acomm
import (
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"sync"
"time"
log "github.com/Sirupsen/logrus"
logx "github.com/mistifyio/mistify-logrus-ext"
)
// UnixListener wraps a unix domain socket listener. It owns creation of the
// socket, accepting of new connections, and graceful shutdown.
type UnixListener struct {
	// Socket identity and the underlying listener (set by createListener)
	addr     *net.UnixAddr
	listener *net.UnixListener

	// Number of connections to accept before the listen loop ends; a
	// negative value means unlimited
	acceptLimit int

	// Accepted connections waiting to be handed out by NextConn
	connChan chan net.Conn
	// Closed by Stop to signal shutdown
	stopChan chan struct{}
	// Tracks the listen goroutine plus every accepted connection that has not
	// yet been passed to DoneConn
	waitgroup sync.WaitGroup
}
// NewUnixListener builds a UnixListener for the socket at socketPath. The
// socket itself is not opened here. acceptLimit is the number of connections
// to accept before the listener stops on its own; zero or any negative value
// means no limit.
func NewUnixListener(socketPath string, acceptLimit int) *UnixListener {
	// Unlimited is stored as -1 so the accept loop can count a positive
	// limit down to zero without ever colliding with "unlimited"
	if acceptLimit <= 0 {
		acceptLimit = -1
	}

	ul := &UnixListener{
		acceptLimit: acceptLimit,
		// Note: this buffer only parks accepted conns until a handler picks
		// them up. Its size does not bound how many conns are handled
		// concurrently.
		connChan: make(chan net.Conn, 1000),
	}

	// The error is deliberately dropped: resolving can only fail for an
	// unknown network name, and "unix" is a valid one
	ul.addr, _ = net.ResolveUnixAddr("unix", socketPath)

	return ul
}
// Addr reports the listener's unix socket address in string form.
func (ul *UnixListener) Addr() string {
	socketAddr := ul.addr
	return socketAddr.String()
}
// URL reports the listener's address as a "unix://" URL. The parse error is
// discarded, so the result is nil when the address does not form a valid
// request URI.
func (ul *UnixListener) URL() *url.URL {
	raw := fmt.Sprintf("unix://%s", ul.Addr())
	parsed, _ := url.ParseRequestURI(raw)
	return parsed
}
// Start opens the socket and launches the accept loop in its own goroutine.
// It returns the error from creating the listener, if any; in that case no
// goroutine is started.
func (ul *UnixListener) Start() error {
	err := ul.createListener()
	if err != nil {
		return err
	}

	// The stop channel must exist before the accept loop begins polling it
	ul.stopChan = make(chan struct{})

	// Account for the accept loop itself so Stop waits for it to exit
	ul.waitgroup.Add(1)
	go ul.listen()

	return nil
}
// createListener makes sure the socket's parent directory exists and then
// opens the unix socket, storing the resulting listener on success. Failures
// are logged and returned to the caller.
func (ul *UnixListener) createListener() error {
	// The socket file lives inside this directory; create any missing parents
	socketDir := filepath.Dir(ul.Addr())
	// TODO: Decide on permissions
	if mkErr := os.MkdirAll(socketDir, os.ModePerm); mkErr != nil {
		fields := log.Fields{
			"directory": socketDir,
			"perm":      os.ModePerm,
			"error":     mkErr,
		}
		log.WithFields(fields).Error("failed to create directory for socket")
		return mkErr
	}

	unixListener, err := net.ListenUnix("unix", ul.addr)
	if err == nil {
		ul.listener = unixListener
		return nil
	}

	fields := log.Fields{
		"addr":  ul.Addr(),
		"error": err,
	}
	log.WithFields(fields).Error("failed to create listener")
	return err
}
// listen is the accept loop. It runs until the stop channel is closed or the
// accept limit has been used up, queueing every accepted connection on
// connChan. On exit it marks itself done on the waitgroup and the listener's
// Close is passed to logx.LogReturnedErr.
func (ul *UnixListener) listen() {
	defer ul.waitgroup.Done()
	defer logx.LogReturnedErr(ul.listener.Close, log.Fields{
		"addr": ul.Addr(),
	}, "failed to close listener")

	// A negative count never reaches zero, which is how "unlimited" works
	remaining := ul.acceptLimit
	for remaining != 0 {
		// Non-blocking poll of the stop signal before each accept attempt
		select {
		case <-ul.stopChan:
			log.WithFields(log.Fields{
				"addr": ul.Addr(),
			}).Info("stop listening")
			return
		default:
		}

		// Bound each Accept to one second so the stop signal gets re-checked
		deadline := time.Now().Add(time.Second)
		if err := ul.listener.SetDeadline(deadline); err != nil {
			log.WithFields(log.Fields{
				"addr":  ul.Addr(),
				"error": err,
			}).Error("failed to set listener deadline")
		}

		conn, err := ul.listener.Accept()
		if err != nil {
			// Timeouts are the expected result of the deadline above and are
			// not logged; anything else is logged and the loop retries
			// immediately
			opErr, isOpErr := err.(*net.OpError)
			if !isOpErr || !opErr.Timeout() {
				log.WithFields(log.Fields{
					"addr":  ul.Addr(),
					"error": err,
				}).Error("failed to accept new connection")
			}
			continue
		}

		// Count the connection before handing it off; DoneConn releases it.
		// The send blocks while the queue is full.
		ul.waitgroup.Add(1)
		ul.connChan <- conn

		// Only a positive limit is counted down
		if remaining > 0 {
			remaining--
		}
	}
}
// Stop signals the accept loop to stop listening for new connections. It
// blocks until the loop has exited and every accepted connection has been
// finished with DoneConn.
//
// The timeout argument is currently unused; Stop waits without a time limit.
//
// Calling Stop before a successful Start, or calling it again after an
// earlier Stop, is a no-op apart from the wait. Stop is not safe to call
// concurrently with itself.
func (ul *UnixListener) Stop(timeout time.Duration) {
	// A nil stop channel means Start never got as far as creating it, so
	// there is no accept loop to signal. Closing a nil channel would panic.
	if ul.stopChan == nil {
		return
	}

	select {
	case <-ul.stopChan:
		// Already closed by an earlier Stop; closing again would panic
	default:
		close(ul.stopChan)
	}

	ul.waitgroup.Wait()
}
// NextConn blocks and returns the next connection. It returns nil once the
// listener has been stopped and no queued connections remain. Connections
// should be handled in a go routine to take advantage of concurrency. When
// done, the connection MUST be finished with a call to DoneConn.
//
// NOTE(review): the accept loop only checks the stop signal between accepts,
// so a connection accepted while Stop is in progress may still be queued
// after nil has been returned here.
func (ul *UnixListener) NextConn() net.Conn {
	select {
	case conn := <-ul.connChan:
		return conn
	case <-ul.stopChan:
		// select chooses at random when both cases are ready, so the stop
		// signal can win while connections are still queued. Those are already
		// counted in the waitgroup and must be handed out, otherwise Stop
		// would wait on them forever.
		select {
		case conn := <-ul.connChan:
			return conn
		default:
			return nil
		}
	}
}
// DoneConn completes the handling of a connection obtained from NextConn. The
// connection's Close is passed to logx.LogReturnedErr and the connection is
// then released from the waitgroup that Stop waits on. A nil conn is ignored.
// It must be called exactly once per connection: each call decrements the
// waitgroup.
func (ul *UnixListener) DoneConn(conn net.Conn) {
	if conn == nil {
		return
	}

	// Deferred calls run last-in first-out: the close is attempted first and
	// the waitgroup is released afterwards
	defer ul.waitgroup.Done()
	defer logx.LogReturnedErr(conn.Close,
		log.Fields{
			// Log the address as a string, matching every other log entry
			// emitted by the listener
			"addr": ul.Addr(),
		}, "failed to close unix connection",
	)
}