Skip to content

Commit

Permalink
Merge pull request #179 from kcalvinalvin/2024-05-07-handle-batch-rpc…
Browse files Browse the repository at this point in the history
…-requests

electrum: handle batch rpc requests
  • Loading branch information
kcalvinalvin authored May 7, 2024
2 parents 69d6be7 + 7a7f6d9 commit 37eb522
Showing 1 changed file with 84 additions and 72 deletions.
156 changes: 84 additions & 72 deletions electrum/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,69 @@ func (s *ElectrumServer) writeErrorResponse(conn net.Conn, rpcError btcjson.RPCE
s.writeChan <- []connAndBytes{{conn, bytes}}
}

func (s *ElectrumServer) handleSingleMsg(msg btcjson.Request, conn net.Conn) {
log.Debugf("unmarshalled %v from conn %s\n", msg, conn.RemoteAddr().String())
handler, found := rpcHandlers[msg.Method]
if !found || handler == nil {
log.Warnf("handler not found for method %v. Sending error msg to %v",
msg.Method, conn.RemoteAddr().String())
pid := &msg.ID
s.writeErrorResponse(conn, *btcjson.ErrRPCMethodNotFound, pid)
return
}

result, err := handler(s, &msg, conn, nil)
if err != nil {
log.Warnf("Errored while handling method %s. Sending error message to %v err: %v\n",
msg.Method, conn.RemoteAddr().String(), err)

pid := &msg.ID
rpcError := btcjson.RPCError{
Code: 1,
Message: err.Error(),
}
s.writeErrorResponse(conn, rpcError, pid)
return
}

marshalledResult, err := json.Marshal(result)
if err != nil {
log.Warnf("Errored while marshaling result for method %s. Sending error message to %v err: %v\n",
msg.Method, conn.RemoteAddr().String(), err)
rpcError := btcjson.RPCError{
Code: btcjson.ErrRPCInternal.Code,
Message: fmt.Sprintf("%s: error: %s",
btcjson.ErrRPCInternal.Message, err.Error()),
}
pid := &msg.ID
s.writeErrorResponse(conn, rpcError, pid)
return
}
pid := &msg.ID
resp := btcjson.Response{
Jsonrpc: btcjson.RpcVersion2,
Result: json.RawMessage(marshalledResult),
ID: pid,
}
bytes, err := json.Marshal(resp)
if err != nil {
log.Warnf("Errored while marshaling response for method %s. Sending error message to %v err: %v\n",
msg.Method, conn.RemoteAddr().String(), err)
rpcError := btcjson.RPCError{
Code: btcjson.ErrRPCInternal.Code,
Message: fmt.Sprintf("%s: error: %s",
btcjson.ErrRPCInternal.Message, err.Error()),
}
pid := &msg.ID
s.writeErrorResponse(conn, rpcError, pid)
return
}
bytes = append(bytes, delim)

log.Debugf("put %v to be written to %v\n", result, conn.RemoteAddr().String())
s.writeChan <- []connAndBytes{{conn, bytes}}
}

func (s *ElectrumServer) handleConnection(conn net.Conn) {
// The timer is stopped when a new message is received and reset after it
// is processed.
Expand All @@ -935,85 +998,34 @@ func (s *ElectrumServer) handleConnection(conn net.Conn) {
}
idleTimer.Stop()

msg := btcjson.Request{}
err = msg.UnmarshalJSON(line)
// Attempt to unmarshal as a batched json request.
msgs := []btcjson.Request{}
err = json.Unmarshal(line, &msgs)
if err != nil {
log.Warnf("error while unmarshalling %v. Sending error message to %v. Error: %v",
hex.EncodeToString(line), conn.RemoteAddr().String(), err)
if e, ok := err.(*json.SyntaxError); ok {
log.Warnf("syntax error at byte offset %d", e.Offset)
}

pid := &msg.ID
s.writeErrorResponse(conn, *btcjson.ErrRPCParse, pid)
continue
}

log.Debugf("unmarshalled %v from conn %s\n", msg, conn.RemoteAddr().String())
handler, found := rpcHandlers[msg.Method]
if !found || handler == nil {
log.Warnf("handler not found for method %v. Sending error msg to %v",
msg.Method, conn.RemoteAddr().String())
pid := &msg.ID
s.writeErrorResponse(conn, *btcjson.ErrRPCMethodNotFound, pid)
idleTimer.Reset(idleTimeout)
continue
}

result, err := handler(s, &msg, conn, nil)
if err != nil {
log.Warnf("Errored while handling method %s. Sending error message to %v err: %v\n",
msg.Method, conn.RemoteAddr().String(), err)
// If that fails, attempt to unmarshal as a single request.
msg := btcjson.Request{}
err = msg.UnmarshalJSON(line)
if err != nil {
log.Warnf("error while unmarshalling %v. Sending error message to %v. Error: %v",
hex.EncodeToString(line), conn.RemoteAddr().String(), err)
if e, ok := err.(*json.SyntaxError); ok {
log.Warnf("syntax error at byte offset %d", e.Offset)
}

pid := &msg.ID
rpcError := btcjson.RPCError{
Code: 1,
Message: err.Error(),
pid := &msgs[0].ID
s.writeErrorResponse(conn, *btcjson.ErrRPCParse, pid)
continue
}
s.writeErrorResponse(conn, rpcError, pid)
idleTimer.Reset(idleTimeout)
continue
}

marshalledResult, err := json.Marshal(result)
if err != nil {
log.Warnf("Errored while marshaling result for method %s. Sending error message to %v err: %v\n",
msg.Method, conn.RemoteAddr().String(), err)
rpcError := btcjson.RPCError{
Code: btcjson.ErrRPCInternal.Code,
Message: fmt.Sprintf("%s: error: %s",
btcjson.ErrRPCInternal.Message, err.Error()),
}
pid := &msg.ID
s.writeErrorResponse(conn, rpcError, pid)
continue
// If the single request unmarshal was successful, append to the
// msgs for processing below.
msgs = append(msgs, msg)
}
pid := &msg.ID
resp := btcjson.Response{
Jsonrpc: btcjson.RpcVersion2,
Result: json.RawMessage(marshalledResult),
ID: pid,
}
bytes, err := json.Marshal(resp)
if err != nil {
log.Warnf("Errored while marshaling response for method %s. Sending error message to %v err: %v\n",
msg.Method, conn.RemoteAddr().String(), err)
rpcError := btcjson.RPCError{
Code: btcjson.ErrRPCInternal.Code,
Message: fmt.Sprintf("%s: error: %s",
btcjson.ErrRPCInternal.Message, err.Error()),
}
pid := &msg.ID
s.writeErrorResponse(conn, rpcError, pid)

for _, msg := range msgs {
s.handleSingleMsg(msg, conn)
idleTimer.Reset(idleTimeout)
continue
}
bytes = append(bytes, delim)

log.Debugf("put %v to be written to %v\n", result, conn.RemoteAddr().String())
s.writeChan <- []connAndBytes{{conn, bytes}}

idleTimer.Reset(idleTimeout)
}

idleTimer.Stop()
Expand Down

0 comments on commit 37eb522

Please sign in to comment.