From 04390ab447af4eaad705cb5123b06e97d362aa56 Mon Sep 17 00:00:00 2001 From: dickens7 Date: Sun, 15 Sep 2024 11:14:50 +0800 Subject: [PATCH] feat: write separation --- server/context.go | 5 +-- server/option.go | 7 ++++ server/server.go | 93 ++++++++++++++++++++++++++++++++++------------- 3 files changed, 76 insertions(+), 29 deletions(-) diff --git a/server/context.go b/server/context.go index 9a6dc9fd..3d6f5006 100644 --- a/server/context.go +++ b/server/context.go @@ -125,10 +125,7 @@ func (ctx *Context) Write(v interface{}) error { var err error if ctx.async { - go func() { - _, err = ctx.conn.Write(*respData) - protocol.PutData(respData) - }() + return wirteResp(ctx.ctx, res) } else { _, err = ctx.conn.Write(*respData) protocol.PutData(respData) diff --git a/server/option.go b/server/option.go index 016f31e1..da3d09e4 100644 --- a/server/option.go +++ b/server/option.go @@ -60,3 +60,10 @@ func WithAsyncWrite() OptionFn { s.AsyncWrite = true } } + +// AsyncOutgoing sets AsyncWrite outgoing queue +func AsyncOutgoing(limit int) OptionFn { + return func(s *Server) { + s.AsyncOutgoing = limit + } +} diff --git a/server/server.go b/server/server.go index 644f9ce8..a509369d 100644 --- a/server/server.go +++ b/server/server.go @@ -64,6 +64,8 @@ var ( TagContextKey = &contextKey{"service-tag"} // HttpConnContextKey is used to store http connection. HttpConnContextKey = &contextKey{"http-conn"} + // AsyncWriteCh + AsyncWriteCh = &contextKey{"async-write-ch"} ) type Handler func(ctx *Context) error @@ -87,6 +89,7 @@ type Server struct { DisableHTTPGateway bool // disable http invoke or not. DisableJSONRPC bool // disable json rpc or not. AsyncWrite bool // set true if your server only serves few clients + AsyncOutgoing int // write message conn outgoing queue max pool WorkerPool serviceMapMu sync.RWMutex @@ -133,14 +136,15 @@ type Server struct { // NewServer returns a server. func NewServer(options ...OptionFn) *Server { s := &Server{ - Plugins: &pluginContainer{}, - options: make(map[string]interface{}), - activeConn: make(map[net.Conn]struct{}), - doneChan: make(chan struct{}), - serviceMap: make(map[string]*service), - router: make(map[string]Handler), - AsyncWrite: false, // 除非你想做进一步的优化测试,否则建议你设置为false - Started: make(chan struct{}), + Plugins: &pluginContainer{}, + options: make(map[string]interface{}), + activeConn: make(map[net.Conn]struct{}), + doneChan: make(chan struct{}), + serviceMap: make(map[string]*service), + router: make(map[string]Handler), + AsyncWrite: false, // 除非你想做进一步的优化测试,否则建议你设置为false + AsyncOutgoing: 100000, // + Started: make(chan struct{}), } for _, op := range options { @@ -364,23 +368,11 @@ func (s *Server) sendResponse(ctx *share.Context, conn net.Conn, err error, req, data := res.EncodeSlicePointer() if s.AsyncWrite { - if s.pool != nil { - s.pool.Submit(func() { - if s.writeTimeout != 0 { - conn.SetWriteDeadline(time.Now().Add(s.writeTimeout)) - } - conn.Write(*data) - protocol.PutData(data) - }) - } else { - go func() { - if s.writeTimeout != 0 { - conn.SetWriteDeadline(time.Now().Add(s.writeTimeout)) - } - conn.Write(*data) - protocol.PutData(data) - }() + err := wirteResp(ctx, res) + if err != nil { + log.Errorf(err.Error()) } + return } else { if s.writeTimeout != 0 { conn.SetWriteDeadline(time.Now().Add(s.writeTimeout)) @@ -433,9 +425,14 @@ func (s *Server) serveConn(conn net.Conn) { return } } + var asyncWriteCh chan *protocol.Message + // async write + if s.AsyncWrite { + asyncWriteCh = make(chan *protocol.Message, s.AsyncOutgoing) + go s.asyncWrite(conn, asyncWriteCh) + } r := bufio.NewReaderSize(conn, ReaderBuffsize) - // read requests and handle it for { if s.isShutdown() { @@ -449,6 +446,9 @@ func (s *Server) serveConn(conn net.Conn) { // create a rpcx Context ctx := share.WithValue(context.Background(), RemoteConnContextKey, conn) + if s.AsyncWrite { + ctx = share.WithValue(ctx, AsyncWriteCh, asyncWriteCh) + } // read a request from the underlying connection req, err := s.readRequest(ctx, r) @@ -522,6 +522,36 @@ func (s *Server) serveConn(conn net.Conn) { } } +// syncWrite +func (s *Server) asyncWrite(conn net.Conn, asyncWriteCh chan *protocol.Message) { + if err := recover(); err != nil { + const size = 64 << 10 + buf := make([]byte, size) + ss := runtime.Stack(buf, false) + if ss > size { + ss = size + } + buf = buf[:ss] + log.Errorf("serving %s panic error: %s, stack:\n %s", conn.RemoteAddr(), err, string(buf)) + } + for { + if s.isShutdown() { + return + } + select { + case <-s.doneChan: + return + case res := <-asyncWriteCh: + data := res.EncodeSlicePointer() + if s.writeTimeout != 0 { + conn.SetWriteDeadline(time.Now().Add(s.writeTimeout)) + } + conn.Write(*data) + protocol.PutData(data) + } + } +} + func (s *Server) processOneRequest(ctx *share.Context, req *protocol.Message, conn net.Conn) { defer func() { if r := recover(); r != nil { @@ -1073,6 +1103,19 @@ func (s *Server) closeDoneChanLocked() { } } +func wirteResp(ctx context.Context, res *protocol.Message) error { + ch, ok := ctx.Value(AsyncWriteCh).(chan *protocol.Message) + if !ok { + return fmt.Errorf("async write chan closed") + } + select { + case ch <- res: + default: + return fmt.Errorf("could not write message, conn outgoing queue full") + } + return nil +} + var ip4Reg = regexp.MustCompile(`^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$`) func validIP4(ipAddress string) bool {