From 97aa0a4d04206ccd91f7ce6d00d001bbf3ea6803 Mon Sep 17 00:00:00 2001
From: Maxim
Date: Fri, 2 Jul 2021 12:25:03 +0300
Subject: [PATCH] Refactored tx broadcast.

* Don't use the commit/block broadcast method; broadcast sync and poll for inclusion instead
* Added AsyncBroadcastMsg for cases where waiting is implemented externally
* QueueBroadcastMsg must await its transactions by design
* Fixed a QueueBroadcastMsg enqueueing bug
---
 chain/client/cosmos.go | 119 +++++++++++++++++++++++++++++++++--------
 1 file changed, 96 insertions(+), 23 deletions(-)

diff --git a/chain/client/cosmos.go b/chain/client/cosmos.go
index 38adf5e5..1f9da46d 100644
--- a/chain/client/cosmos.go
+++ b/chain/client/cosmos.go
@@ -1,6 +1,8 @@
 package client
 
 import (
+	"context"
+	"encoding/hex"
 	"encoding/json"
 	"strings"
 	"sync"
@@ -31,6 +33,7 @@ type CosmosClient interface {
 	FromAddress() sdk.AccAddress
 	QueryClient() *grpc.ClientConn
 	SyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error)
+	AsyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error)
 	QueueBroadcastMsg(msgs ...sdk.Msg) error
 	ClientContext() client.Context
 	Close()
@@ -177,6 +180,7 @@ var (
 	ErrReadOnly = errors.New("client is in read-only mode")
 )
 
+// SyncBroadcastMsg sends the Tx to the chain and waits until the Tx is included in a block.
 func (c *cosmosClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error) {
 	c.syncMux.Lock()
 	defer c.syncMux.Unlock()
@@ -204,6 +208,41 @@ func (c *cosmosClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error
 	return res, nil
 }
 
+// AsyncBroadcastMsg sends the Tx to the chain without waiting for it to be included in a block.
+// It is not meant for rapid sequential Tx sending; the caller is expected to track the transaction
+// status with external tools. If you want the SDK to wait for inclusion, use SyncBroadcastMsg.
+func (c *cosmosClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error) {
+	c.syncMux.Lock()
+	defer c.syncMux.Unlock()
+
+	c.txFactory = c.txFactory.WithSequence(c.accSeq)
+	c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
+	res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
+	if err != nil {
+		if strings.Contains(err.Error(), "account sequence mismatch") {
+			c.syncNonce()
+			c.txFactory = c.txFactory.WithSequence(c.accSeq)
+			c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
+			log.Debugln("retrying broadcastTx with nonce", c.accSeq)
+			res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
+		}
+		if err != nil {
+			resJSON, _ := json.MarshalIndent(res, "", "\t")
+			c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to commit msg batch:", string(resJSON))
+			return nil, err
+		}
+	}
+
+	c.accSeq++
+
+	return res, nil
+}
+
+const (
+	defaultBroadcastStatusPoll = 100 * time.Millisecond
+	defaultBroadcastTimeout    = 40 * time.Second
+)
+
 func (c *cosmosClient) broadcastTx(
 	clientCtx client.Context,
 	txf tx.Factory,
@@ -246,17 +285,48 @@ func (c *cosmosClient) broadcastTx(
 		return nil, err
 	}
 
-	// broadcast to a Tendermint node
-	if await {
-		// BroadcastTxCommit - full synced commit with await
-		res, err := clientCtx.BroadcastTxCommit(txBytes)
+	res, err := clientCtx.BroadcastTxSync(txBytes)
+	if !await || err != nil {
 		return res, err
 	}
 
-	// BroadcastTxSync - only CheckTx, don't wait confirmation
-	return clientCtx.BroadcastTxSync(txBytes)
+	awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
+	defer cancelFn()
+
+	txHash, _ := hex.DecodeString(res.TxHash)
+	t := time.NewTimer(defaultBroadcastStatusPoll)
+
+	for {
+		select {
+		case <-awaitCtx.Done():
+			err := errors.Wrapf(ErrTimedOut, "%s", res.TxHash)
+			t.Stop()
+			return nil, err
+		case <-t.C:
+			resultTx, err := clientCtx.Client.Tx(awaitCtx, txHash, false)
+			if err != nil {
+				if errRes := client.CheckTendermintError(err, txBytes); errRes != nil {
+					return errRes, err
+				}
+
+				// log.WithError(err).Warningln("Tx Error for Hash:", res.TxHash)
+
+				t.Reset(defaultBroadcastStatusPoll)
+				continue
+
+			} else if resultTx.Height > 0 {
+				res = sdk.NewResponseResultTx(resultTx, res.Tx, res.Timestamp)
+				t.Stop()
+				return res, err
+			}
+
+			t.Reset(defaultBroadcastStatusPoll)
+		}
+	}
 }
 
+var ErrTimedOut = errors.New("tx timed out")
+
 // prepareFactory ensures the account defined by ctx.GetFromAddress() exists and
 // if the account number and/or the account sequence number are zero (not set),
 // they will be queried for and set on the provided Factory. A new Factory with
@@ -287,6 +357,8 @@ func (c *cosmosClient) prepareFactory(clientCtx client.Context, txf tx.Factory)
 	return txf, nil
 }
 
+// QueueBroadcastMsg enqueues a list of messages. Messages will be added to the queue
+// and grouped into Txns in chunks. Use this method to mass broadcast Txns efficiently.
 func (c *cosmosClient) QueueBroadcastMsg(msgs ...sdk.Msg) error {
 	if !c.canSign {
 		return ErrReadOnly
@@ -324,7 +396,7 @@ func (c *cosmosClient) Close() {
 }
 
 const (
-	msgCommitBatchSizeLimit = 512
+	msgCommitBatchSizeLimit = 1024
 	msgCommitBatchTimeLimit = 500 * time.Millisecond
 )
 
@@ -332,31 +404,25 @@ func (c *cosmosClient) runBatchBroadcast() {
 	expirationTimer := time.NewTimer(msgCommitBatchTimeLimit)
 	msgBatch := make([]sdk.Msg, 0, msgCommitBatchSizeLimit)
 
-	resetBatch := func() {
-		msgBatch = msgBatch[:0]
-
-		expirationTimer.Reset(msgCommitBatchTimeLimit)
-	}
-
-	submitBatch := func() {
+	submitBatch := func(toSubmit []sdk.Msg) {
 		c.syncMux.Lock()
 		defer c.syncMux.Unlock()
 
 		c.txFactory = c.txFactory.WithSequence(c.accSeq)
 		c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
 		log.Debugln("broadcastTx with nonce", c.accSeq)
-		res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgBatch...)
+		res, err := c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
 		if err != nil {
 			if strings.Contains(err.Error(), "account sequence mismatch") {
 				c.syncNonce()
 				c.txFactory = c.txFactory.WithSequence(c.accSeq)
 				c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
 				log.Debugln("retrying broadcastTx with nonce", c.accSeq)
-				res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgBatch...)
+				res, err = c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
 			}
 			if err != nil {
 				resJSON, _ := json.MarshalIndent(res, "", "\t")
-				c.logger.WithField("size", len(msgBatch)).WithError(err).Errorln("failed to commit msg batch:", string(resJSON))
+				c.logger.WithField("size", len(toSubmit)).WithError(err).Errorln("failed to commit msg batch:", string(resJSON))
 				return
 			}
 		}
@@ -378,7 +444,7 @@ func (c *cosmosClient) runBatchBroadcast() {
 		if !ok {
 			// exit required
 			if len(msgBatch) > 0 {
-				submitBatch()
+				submitBatch(msgBatch)
 			}
 
 			close(c.doneC)
@@ -388,15 +454,22 @@ func (c *cosmosClient) runBatchBroadcast() {
 			msgBatch = append(msgBatch, msg)
 
 			if len(msgBatch) >= msgCommitBatchSizeLimit {
-				submitBatch()
-				resetBatch()
+				toSubmit := msgBatch
+				msgBatch = msgBatch[:0]
+				expirationTimer.Reset(msgCommitBatchTimeLimit)
+
+				submitBatch(toSubmit)
 			}
 		case <-expirationTimer.C:
 			if len(msgBatch) > 0 {
-				submitBatch()
-			}
+				toSubmit := msgBatch
+				msgBatch = msgBatch[:0]
+				expirationTimer.Reset(msgCommitBatchTimeLimit)
 
-			resetBatch()
+				submitBatch(toSubmit)
+			} else {
+				expirationTimer.Reset(msgCommitBatchTimeLimit)
+			}
 		}
 	}
 }
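
Usage sketch: a minimal example of choosing between the three broadcast paths after this change, assuming an already-constructed client that satisfies the CosmosClient interface from the diff. The local broadcaster interface, the bank MsgSend message, and the "stake" denom are illustrative assumptions only, not part of this patch.

package main

import (
	"log"

	sdk "github.com/cosmos/cosmos-sdk/types"
	banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
)

// broadcaster narrows the CosmosClient interface from the patch to the three
// broadcast methods used here, so the sketch does not assume the repository's
// import path or constructor.
type broadcaster interface {
	SyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error)
	AsyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error)
	QueueBroadcastMsg(msgs ...sdk.Msg) error
}

// broadcastExamples demonstrates the three paths with a hypothetical bank transfer.
func broadcastExamples(c broadcaster, from, to sdk.AccAddress) {
	msg := banktypes.NewMsgSend(from, to, sdk.NewCoins(sdk.NewInt64Coin("stake", 1)))

	// Fire-and-forget: CheckTx only; inclusion must be tracked externally.
	if res, err := c.AsyncBroadcastMsg(msg); err != nil {
		log.Println("async broadcast failed:", err)
	} else {
		log.Println("tx accepted to mempool:", res.TxHash)
	}

	// Blocking: broadcasts sync, then broadcastTx polls every defaultBroadcastStatusPoll
	// until the Tx is found in a block or defaultBroadcastTimeout (40s) elapses.
	if res, err := c.SyncBroadcastMsg(msg); err != nil {
		log.Println("sync broadcast failed:", err)
	} else {
		log.Println("tx included at height:", res.Height)
	}

	// Batched: messages are queued and flushed either when the batch reaches
	// msgCommitBatchSizeLimit (now 1024) or every msgCommitBatchTimeLimit (500ms);
	// each flushed batch is broadcast with await=true, per this patch.
	if err := c.QueueBroadcastMsg(msg); err != nil {
		log.Println("queueing failed:", err)
	}
}

func main() {} // placeholder so the sketch compiles as a standalone file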