Skip to content

Commit

Permalink
Refactored tx broadcast.
Browse files Browse the repository at this point in the history
* Don't use commit/block method, wait instead
* Added AsyncBroadcastMsg for cases when waiting is implemented externally
* QueueBroadcastMsg must await its transactions by design
* Fixed QueueBroadcastMsg enqueueing bug.
  • Loading branch information
Maxim committed Jul 2, 2021
1 parent 1562454 commit 97aa0a4
Showing 1 changed file with 96 additions and 23 deletions.
119 changes: 96 additions & 23 deletions chain/client/cosmos.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package client

import (
"context"
"encoding/hex"
"encoding/json"
"strings"
"sync"
Expand Down Expand Up @@ -31,6 +33,7 @@ type CosmosClient interface {
FromAddress() sdk.AccAddress
QueryClient() *grpc.ClientConn
SyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error)
AsyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error)
QueueBroadcastMsg(msgs ...sdk.Msg) error
ClientContext() client.Context
Close()
Expand Down Expand Up @@ -177,6 +180,7 @@ var (
ErrReadOnly = errors.New("client is in read-only mode")
)

// SyncBroadcastMsg sends Tx to chain and waits until Tx is included in block.
func (c *cosmosClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()
Expand Down Expand Up @@ -204,6 +208,41 @@ func (c *cosmosClient) SyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error
return res, nil
}

// AsyncBroadcastMsg sends Tx to chain and doesn't wait until Tx is included in block. This method
// cannot be used for rapid Tx sending, it is expected that you wait for transaction status with
// external tools. If you want sdk to wait for it, use SyncBroadcastMsg.
func (c *cosmosClient) AsyncBroadcastMsg(msgs ...sdk.Msg) (*sdk.TxResponse, error) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

c.txFactory = c.txFactory.WithSequence(c.accSeq)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
if err != nil {
if strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
c.txFactory = c.txFactory.WithSequence(c.accSeq)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", c.accSeq)
res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgs...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgs)).WithError(err).Errorln("failed to commit msg batch:", string(resJSON))
return nil, err
}
}

c.accSeq++

return res, nil
}

const (
defaultBroadcastStatusPoll = 100 * time.Millisecond
defaultBroadcastTimeout = 40 * time.Second
)

func (c *cosmosClient) broadcastTx(
clientCtx client.Context,
txf tx.Factory,
Expand Down Expand Up @@ -246,17 +285,48 @@ func (c *cosmosClient) broadcastTx(
return nil, err
}

// broadcast to a Tendermint node
if await {
// BroadcastTxCommit - full synced commit with await
res, err := clientCtx.BroadcastTxCommit(txBytes)
res, err := clientCtx.BroadcastTxSync(txBytes)
if !await || err != nil {
return res, err
}

// BroadcastTxSync - only CheckTx, don't wait confirmation
return clientCtx.BroadcastTxSync(txBytes)
awaitCtx, cancelFn := context.WithTimeout(context.Background(), defaultBroadcastTimeout)
defer cancelFn()

txHash, _ := hex.DecodeString(res.TxHash)
t := time.NewTimer(defaultBroadcastStatusPoll)

for {
select {
case <-awaitCtx.Done():
err := errors.Wrapf(ErrTimedOut, "%s", res.TxHash)
t.Stop()
return nil, err
case <-t.C:
resultTx, err := clientCtx.Client.Tx(awaitCtx, txHash, false)
if err != nil {
if errRes := client.CheckTendermintError(err, txBytes); errRes != nil {
return errRes, err
}

// log.WithError(err).Warningln("Tx Error for Hash:", res.TxHash)

t.Reset(defaultBroadcastStatusPoll)
continue

} else if resultTx.Height > 0 {
res = sdk.NewResponseResultTx(resultTx, res.Tx, res.Timestamp)
t.Stop()
return res, err
}

t.Reset(defaultBroadcastStatusPoll)
}
}
}

var ErrTimedOut = errors.New("tx timed out")

// prepareFactory ensures the account defined by ctx.GetFromAddress() exists and
// if the account number and/or the account sequence number are zero (not set),
// they will be queried for and set on the provided Factory. A new Factory with
Expand Down Expand Up @@ -287,6 +357,8 @@ func (c *cosmosClient) prepareFactory(clientCtx client.Context, txf tx.Factory)
return txf, nil
}

// QueueBroadcastMsg enqueues a list of messages. Messages will added to the queue
// and grouped into Txns in chunks. Use this method to mass broadcast Txns with efficiency.
func (c *cosmosClient) QueueBroadcastMsg(msgs ...sdk.Msg) error {
if !c.canSign {
return ErrReadOnly
Expand Down Expand Up @@ -324,39 +396,33 @@ func (c *cosmosClient) Close() {
}

const (
msgCommitBatchSizeLimit = 512
msgCommitBatchSizeLimit = 1024
msgCommitBatchTimeLimit = 500 * time.Millisecond
)

func (c *cosmosClient) runBatchBroadcast() {
expirationTimer := time.NewTimer(msgCommitBatchTimeLimit)
msgBatch := make([]sdk.Msg, 0, msgCommitBatchSizeLimit)

resetBatch := func() {
msgBatch = msgBatch[:0]

expirationTimer.Reset(msgCommitBatchTimeLimit)
}

submitBatch := func() {
submitBatch := func(toSubmit []sdk.Msg) {
c.syncMux.Lock()
defer c.syncMux.Unlock()

c.txFactory = c.txFactory.WithSequence(c.accSeq)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("broadcastTx with nonce", c.accSeq)
res, err := c.broadcastTx(c.ctx, c.txFactory, false, msgBatch...)
res, err := c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
if err != nil {
if strings.Contains(err.Error(), "account sequence mismatch") {
c.syncNonce()
c.txFactory = c.txFactory.WithSequence(c.accSeq)
c.txFactory = c.txFactory.WithAccountNumber(c.accNum)
log.Debugln("retrying broadcastTx with nonce", c.accSeq)
res, err = c.broadcastTx(c.ctx, c.txFactory, false, msgBatch...)
res, err = c.broadcastTx(c.ctx, c.txFactory, true, toSubmit...)
}
if err != nil {
resJSON, _ := json.MarshalIndent(res, "", "\t")
c.logger.WithField("size", len(msgBatch)).WithError(err).Errorln("failed to commit msg batch:", string(resJSON))
c.logger.WithField("size", len(toSubmit)).WithError(err).Errorln("failed to commit msg batch:", string(resJSON))
return
}
}
Expand All @@ -378,7 +444,7 @@ func (c *cosmosClient) runBatchBroadcast() {
if !ok {
// exit required
if len(msgBatch) > 0 {
submitBatch()
submitBatch(msgBatch)
}

close(c.doneC)
Expand All @@ -388,15 +454,22 @@ func (c *cosmosClient) runBatchBroadcast() {
msgBatch = append(msgBatch, msg)

if len(msgBatch) >= msgCommitBatchSizeLimit {
submitBatch()
resetBatch()
toSubmit := msgBatch
msgBatch = msgBatch[:0]
expirationTimer.Reset(msgCommitBatchTimeLimit)

submitBatch(toSubmit)
}
case <-expirationTimer.C:
if len(msgBatch) > 0 {
submitBatch()
}
toSubmit := msgBatch
msgBatch = msgBatch[:0]
expirationTimer.Reset(msgCommitBatchTimeLimit)

resetBatch()
submitBatch(toSubmit)
} else {
expirationTimer.Reset(msgCommitBatchTimeLimit)
}
}
}
}

0 comments on commit 97aa0a4

Please sign in to comment.