forked from funny-falcon/go-iproto
-
Notifications
You must be signed in to change notification settings - Fork 0
/
service.go
162 lines (130 loc) · 2.5 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package iproto
import (
"log"
"time"
)
// Blank reference to log.Print: it keeps the "log" import in use regardless of
// whether any other code in this file calls into the package.
var _ = log.Print
// Service is anything a Request can be handed to for processing.
type Service interface {
// Send accepts a request for processing. It should set up a deadline if one
// is defined for the end point.
Send(*Request)
// DefaultTimeout returns the timeout the service applies by default.
DefaultTimeout() time.Duration
// Runned reports whether the service is already running.
Runned() bool
}
// Route adapts a plain function to the Service interface. Its Send method
// invokes the function only for requests that are still in the RsNew state.
type Route func(*Request)
// Send passes r to the wrapped function, but only while the request is still
// in the RsNew state. Requests in any other state are ignored.
func (f Route) Send(r *Request) {
	if r.state != RsNew {
		return
	}
	f(r)
}
// Runned always reports true for a Route.
func (Route) Runned() bool { return true }
// DefaultTimeout always returns 0 for a Route.
func (Route) DefaultTimeout() time.Duration { return 0 }
// SF adapts a plain function to the Service interface. Its Send method calls
// SetPending and SetInFly(nil) on the request and invokes the function only
// when both report true.
type SF func(*Request)
// Send invokes the wrapped function for r only if r.SetPending() and then
// r.SetInFly(nil) both report true. SetInFly is not attempted when SetPending
// fails.
func (f SF) Send(r *Request) {
	if !r.SetPending() {
		return
	}
	if !r.SetInFly(nil) {
		return
	}
	f(r)
}
// Runned always reports true for an SF.
func (SF) Runned() bool { return true }
// DefaultTimeout always returns 0 for an SF.
func (SF) DefaultTimeout() time.Duration { return 0 }
// EndPoint is a Service that can additionally be started and stopped.
type EndPoint interface {
Service
// Run starts the end point on the given request channel. The package-level
// Run function passes nil here.
Run(requests chan *Request)
// Stop shuts the end point down.
Stop()
}
// Run starts s by calling s.Run(nil). It panics (through log.Panicf) when s
// reports that it is already running.
func Run(s EndPoint) {
	if alreadyRunning := s.Runned(); alreadyRunning {
		log.Panicf("EndPoint already runned ( %v )", s)
	}
	s.Run(nil)
}
// PointLoop is the processing loop of an end point. SimplePoint embeds it and
// starts Loop in its own goroutine from Run.
type PointLoop interface {
// Loop is the body executed in the goroutine started by SimplePoint.Run.
Loop()
}
// SimplePoint is a reusable EndPoint implementation. A concrete end point
// supplies the PointLoop (see Init); SimplePoint handles starting, stopping
// and queuing of requests.
type SimplePoint struct {
// b holds the request channel (b.ch) and the buffer requests are pushed to.
b Buffer
// exit is created by Init; Stop sends true on it.
exit chan bool
// stopped is set to true by Stop.
stopped bool
// standalone is set when Run is called with a nil channel.
standalone bool
// PointLoop supplies Loop, which Run starts in a goroutine.
PointLoop
// Timeout is returned by DefaultTimeout and applied to requests in Send.
Timeout time.Duration
// TimeoutCode is not referenced in this part of the file; presumably the
// return code for timed-out requests — TODO confirm.
TimeoutCode RetCode
}
// Compile-time assertion that *SimplePoint implements EndPoint.
var _ EndPoint = (*SimplePoint)(nil)
// DefaultTimeout returns the value of the Timeout field.
func (s *SimplePoint) DefaultTimeout() time.Duration { return s.Timeout }
// Runned reports whether the point has a request channel, i.e. whether Run
// has assigned one.
func (s *SimplePoint) Runned() bool {
	hasChannel := s.b.ch != nil
	return hasChannel
}
// Standalone reports the standalone flag, which Run sets when it is called
// with a nil channel.
func (s *SimplePoint) Standalone() bool { return s.standalone }
// Run starts the end point on ch.
//
// When ch is nil the point becomes standalone: it initialises its buffer and
// allocates its own request channel with room for 16*1024 requests. The
// channel is then stored in the buffer, the buffer loop is started if the
// point is standalone, and the embedded PointLoop's Loop is started in its
// own goroutine.
func (s *SimplePoint) Run(ch chan *Request) {
	const ownQueueSize = 16 * 1024

	if ch == nil {
		s.standalone = true
		s.b.init()
		ch = make(chan *Request, ownQueueSize)
	}
	s.b.ch = ch

	if s.standalone {
		go s.b.loop()
	}
	go s.Loop()
}
// ReceiveChan exposes the point's request channel as receive-only. It is nil
// until Run has assigned a channel.
func (s *SimplePoint) ReceiveChan() <-chan *Request { return s.b.ch }
// RunChild starts p as a child of s: p is run on s's request channel rather
// than a channel of its own.
//
// It panics (through log.Panicf) if p is already running. The message names
// the offending child p; it previously printed the parent s, which pointed at
// the wrong end point.
func (s *SimplePoint) RunChild(p EndPoint) {
	if p.Runned() {
		log.Panicf("EndPoint already runned ( %v )", p)
	}
	p.Run(s.b.ch)
}
// Init prepares the point for use: it creates the unbuffered exit channel and
// installs p as the embedded PointLoop.
func (s *SimplePoint) Init(p PointLoop) {
	s.exit = make(chan bool)
	s.PointLoop = p
}
// ExitChan exposes the exit channel as receive-only. Stop sends true on it.
func (s *SimplePoint) ExitChan() <-chan bool { return s.exit }
// Send queues r on a running, standalone end point.
//
// It panics if the point has no request channel yet, or if it is a child
// point. With r locked for the rest of the call, it then marks r pending,
// applies the point's Timeout and pushes r into the buffer. A request that is
// already performed (for example because a deadline answered it with a
// timeout) is dropped silently; a request that cannot be marked pending and
// is not performed causes a panic.
//
// NOTE(review): the "already sent" panic prints the end point, not the
// request — confirm that is intended.
func (s *SimplePoint) Send(r *Request) {
	switch {
	case s.b.ch == nil:
		panic("EndPoint is not running")
	case !s.standalone:
		log.Panicf("you should not call Send on child endpoint %+v", s)
	}

	r.Lock()
	defer r.Unlock()

	if accepted := r.SetPending(); !accepted {
		// Legitimate only when a deadline has already answered the request
		// with a timeout.
		if !r.Performed() {
			log.Panicf("Request already sent somewhere %+v", s)
		}
		return
	}

	r.SetTimeout(s.Timeout)
	// Setting the timeout may itself have answered the request.
	if !r.Performed() {
		s.b.push(r)
	}
}
// Stop shuts the end point down: a standalone point closes its buffer, the
// stopped flag is set, and true is sent on the exit channel.
//
// NOTE(review): Init creates exit unbuffered, so the send blocks until
// something receives from ExitChan() — confirm every Loop does so. exit is
// nil if Init was never called, in which case the send would block forever.
func (s *SimplePoint) Stop() {
// Only a standalone point closes the buffer.
if s.standalone {
s.b.close()
}
// The flag is set before the exit signal is sent.
s.stopped = true
s.exit <- true
}
// Stopped reports the stopped flag, which Stop sets to true.
func (s *SimplePoint) Stopped() bool { return s.stopped }