From cf9b5a7375218a54fd2ecd36812be0af707f3516 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 7 May 2024 13:20:18 +0900 Subject: [PATCH 1/2] electrum: refactor out message handling code We refactor out the message handling code to a separate method on ElectrumServer. This helsp readablility as well as help with reducing code diff when changing the code to handle batched rpc requests. --- electrum/server.go | 128 ++++++++++++++++++++++----------------------- 1 file changed, 64 insertions(+), 64 deletions(-) diff --git a/electrum/server.go b/electrum/server.go index 99e343d8..cf1f531c 100644 --- a/electrum/server.go +++ b/electrum/server.go @@ -912,6 +912,69 @@ func (s *ElectrumServer) writeErrorResponse(conn net.Conn, rpcError btcjson.RPCE s.writeChan <- []connAndBytes{{conn, bytes}} } +func (s *ElectrumServer) handleSingleMsg(msg btcjson.Request, conn net.Conn) { + log.Debugf("unmarshalled %v from conn %s\n", msg, conn.RemoteAddr().String()) + handler, found := rpcHandlers[msg.Method] + if !found || handler == nil { + log.Warnf("handler not found for method %v. Sending error msg to %v", + msg.Method, conn.RemoteAddr().String()) + pid := &msg.ID + s.writeErrorResponse(conn, *btcjson.ErrRPCMethodNotFound, pid) + return + } + + result, err := handler(s, &msg, conn, nil) + if err != nil { + log.Warnf("Errored while handling method %s. Sending error message to %v err: %v\n", + msg.Method, conn.RemoteAddr().String(), err) + + pid := &msg.ID + rpcError := btcjson.RPCError{ + Code: 1, + Message: err.Error(), + } + s.writeErrorResponse(conn, rpcError, pid) + return + } + + marshalledResult, err := json.Marshal(result) + if err != nil { + log.Warnf("Errored while marshaling result for method %s. Sending error message to %v err: %v\n", + msg.Method, conn.RemoteAddr().String(), err) + rpcError := btcjson.RPCError{ + Code: btcjson.ErrRPCInternal.Code, + Message: fmt.Sprintf("%s: error: %s", + btcjson.ErrRPCInternal.Message, err.Error()), + } + pid := &msg.ID + s.writeErrorResponse(conn, rpcError, pid) + return + } + pid := &msg.ID + resp := btcjson.Response{ + Jsonrpc: btcjson.RpcVersion2, + Result: json.RawMessage(marshalledResult), + ID: pid, + } + bytes, err := json.Marshal(resp) + if err != nil { + log.Warnf("Errored while marshaling response for method %s. Sending error message to %v err: %v\n", + msg.Method, conn.RemoteAddr().String(), err) + rpcError := btcjson.RPCError{ + Code: btcjson.ErrRPCInternal.Code, + Message: fmt.Sprintf("%s: error: %s", + btcjson.ErrRPCInternal.Message, err.Error()), + } + pid := &msg.ID + s.writeErrorResponse(conn, rpcError, pid) + return + } + bytes = append(bytes, delim) + + log.Debugf("put %v to be written to %v\n", result, conn.RemoteAddr().String()) + s.writeChan <- []connAndBytes{{conn, bytes}} +} + func (s *ElectrumServer) handleConnection(conn net.Conn) { // The timer is stopped when a new message is received and reset after it // is processed. @@ -949,70 +1012,7 @@ func (s *ElectrumServer) handleConnection(conn net.Conn) { continue } - log.Debugf("unmarshalled %v from conn %s\n", msg, conn.RemoteAddr().String()) - handler, found := rpcHandlers[msg.Method] - if !found || handler == nil { - log.Warnf("handler not found for method %v. Sending error msg to %v", - msg.Method, conn.RemoteAddr().String()) - pid := &msg.ID - s.writeErrorResponse(conn, *btcjson.ErrRPCMethodNotFound, pid) - idleTimer.Reset(idleTimeout) - continue - } - - result, err := handler(s, &msg, conn, nil) - if err != nil { - log.Warnf("Errored while handling method %s. Sending error message to %v err: %v\n", - msg.Method, conn.RemoteAddr().String(), err) - - pid := &msg.ID - rpcError := btcjson.RPCError{ - Code: 1, - Message: err.Error(), - } - s.writeErrorResponse(conn, rpcError, pid) - idleTimer.Reset(idleTimeout) - continue - } - - marshalledResult, err := json.Marshal(result) - if err != nil { - log.Warnf("Errored while marshaling result for method %s. Sending error message to %v err: %v\n", - msg.Method, conn.RemoteAddr().String(), err) - rpcError := btcjson.RPCError{ - Code: btcjson.ErrRPCInternal.Code, - Message: fmt.Sprintf("%s: error: %s", - btcjson.ErrRPCInternal.Message, err.Error()), - } - pid := &msg.ID - s.writeErrorResponse(conn, rpcError, pid) - continue - } - pid := &msg.ID - resp := btcjson.Response{ - Jsonrpc: btcjson.RpcVersion2, - Result: json.RawMessage(marshalledResult), - ID: pid, - } - bytes, err := json.Marshal(resp) - if err != nil { - log.Warnf("Errored while marshaling response for method %s. Sending error message to %v err: %v\n", - msg.Method, conn.RemoteAddr().String(), err) - rpcError := btcjson.RPCError{ - Code: btcjson.ErrRPCInternal.Code, - Message: fmt.Sprintf("%s: error: %s", - btcjson.ErrRPCInternal.Message, err.Error()), - } - pid := &msg.ID - s.writeErrorResponse(conn, rpcError, pid) - idleTimer.Reset(idleTimeout) - continue - } - bytes = append(bytes, delim) - - log.Debugf("put %v to be written to %v\n", result, conn.RemoteAddr().String()) - s.writeChan <- []connAndBytes{{conn, bytes}} - + s.handleSingleMsg(msg, conn) idleTimer.Reset(idleTimeout) } From 7a7f6d98af5e58eb8dc828b4c7e8fe6928fd3843 Mon Sep 17 00:00:00 2001 From: Calvin Kim Date: Tue, 7 May 2024 13:35:12 +0900 Subject: [PATCH 2/2] electrum: handle batched rpc requests Some clients will send batched rpc requests. To handle this, we assume every request is a batched rpc request and fall back to unmarshalling as a single request when it errors. --- electrum/server.go | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/electrum/server.go b/electrum/server.go index cf1f531c..1e298df3 100644 --- a/electrum/server.go +++ b/electrum/server.go @@ -998,22 +998,34 @@ func (s *ElectrumServer) handleConnection(conn net.Conn) { } idleTimer.Stop() - msg := btcjson.Request{} - err = msg.UnmarshalJSON(line) + // Attempt to unmarshal as a batched json request. + msgs := []btcjson.Request{} + err = json.Unmarshal(line, &msgs) if err != nil { - log.Warnf("error while unmarshalling %v. Sending error message to %v. Error: %v", - hex.EncodeToString(line), conn.RemoteAddr().String(), err) - if e, ok := err.(*json.SyntaxError); ok { - log.Warnf("syntax error at byte offset %d", e.Offset) + // If that fails, attempt to unmarshal as a single request. + msg := btcjson.Request{} + err = msg.UnmarshalJSON(line) + if err != nil { + log.Warnf("error while unmarshalling %v. Sending error message to %v. Error: %v", + hex.EncodeToString(line), conn.RemoteAddr().String(), err) + if e, ok := err.(*json.SyntaxError); ok { + log.Warnf("syntax error at byte offset %d", e.Offset) + } + + pid := &msgs[0].ID + s.writeErrorResponse(conn, *btcjson.ErrRPCParse, pid) + continue } - pid := &msg.ID - s.writeErrorResponse(conn, *btcjson.ErrRPCParse, pid) - continue + // If the single request unmarshal was successful, append to the + // msgs for processing below. + msgs = append(msgs, msg) } - s.handleSingleMsg(msg, conn) - idleTimer.Reset(idleTimeout) + for _, msg := range msgs { + s.handleSingleMsg(msg, conn) + idleTimer.Reset(idleTimeout) + } } idleTimer.Stop()