Skip to content

Commit 5a1fee4

Browse files
committed
faet: increased use of custom routine pools with req.
1 parent b4444e9 commit 5a1fee4

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed

โ€Žserver/option.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ func WithCustomPool(pool WorkerPool) OptionFn {
5454
}
5555
}
5656

57+
// WithReqPool uses a custom goroutine pool with req.
58+
func WithReqPool(reqPool ReqWorkerPool) OptionFn {
59+
return func(s *Server) {
60+
s.reqPool = reqPool
61+
}
62+
}
63+
5764
// WithAsyncWrite sets AsyncWrite to true.
5865
func WithAsyncWrite() OptionFn {
5966
return func(s *Server) {

โ€Žserver/server.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,13 @@ type WorkerPool interface {
7676
StopAndWait()
7777
}
7878

79+
type ReqWorkerPool interface {
80+
Submit(req *protocol.Message, task func())
81+
StopAndWaitFor(deadline time.Duration)
82+
Stop()
83+
StopAndWait()
84+
}
85+
7986
// Server is rpcx server that use TCP or UDP.
8087
type Server struct {
8188
ln net.Listener
@@ -90,6 +97,7 @@ type Server struct {
9097
EnableProfile bool // enable profile and statsview or not
9198
AsyncWrite bool // set true if your server only serves few clients
9299
pool WorkerPool
100+
reqPool ReqWorkerPool
93101

94102
serviceMapMu sync.RWMutex
95103
serviceMap map[string]*service
@@ -365,7 +373,15 @@ func (s *Server) sendResponse(ctx *share.Context, conn net.Conn, err error, req,
365373

366374
data := res.EncodeSlicePointer()
367375
if s.AsyncWrite {
368-
if s.pool != nil {
376+
if s.reqPool != nil {
377+
s.reqPool.Submit(req, func() {
378+
if s.writeTimeout != 0 {
379+
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
380+
}
381+
conn.Write(*data)
382+
protocol.PutData(data)
383+
})
384+
} else if s.pool != nil {
369385
s.pool.Submit(func() {
370386
if s.writeTimeout != 0 {
371387
conn.SetWriteDeadline(time.Now().Add(s.writeTimeout))
@@ -514,7 +530,11 @@ func (s *Server) serveConn(conn net.Conn) {
514530
continue
515531
}
516532

517-
if s.pool != nil {
533+
if s.reqPool != nil {
534+
s.reqPool.Submit(req, func() {
535+
s.processOneRequest(ctx, req, conn)
536+
})
537+
} else if s.pool != nil {
518538
s.pool.Submit(func() {
519539
s.processOneRequest(ctx, req, conn)
520540
})
@@ -921,6 +941,10 @@ func (s *Server) Close() error {
921941
s.pool.StopAndWaitFor(10 * time.Second)
922942
}
923943

944+
if s.reqPool != nil {
945+
s.reqPool.StopAndWaitFor(10 * time.Second)
946+
}
947+
924948
return err
925949
}
926950

0 commit comments

Comments
ย (0)