Skip to content

Commit

Permalink
Speed up anchor healing
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Jun 13, 2024
1 parent 400dd01 commit 9a639ef
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 44 deletions.
55 changes: 18 additions & 37 deletions internal/core/healing/anchors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ package healing

import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strings"
"time"

"github.com/multiformats/go-multiaddr"
"gitlab.com/accumulatenetwork/accumulate/internal/api/private"
"gitlab.com/accumulatenetwork/accumulate/internal/logging"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
Expand All @@ -27,13 +25,13 @@ import (
)

type HealAnchorArgs struct {
Client message.AddressedClient
Querier api.Querier
Submitter api.Submitter
NetInfo *NetworkInfo
Known map[[32]byte]*protocol.Transaction
Pretend bool
Wait bool
Client message.AddressedClient
Querier api.Querier
Submit func(...messaging.Message) error
NetInfo *NetworkInfo
Known map[[32]byte]*protocol.Transaction
Pretend bool
Wait bool
}

func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error {
Expand All @@ -43,9 +41,6 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro
if args.Querier == nil {
args.Querier = args.Client
}
if args.Submitter == nil {
args.Submitter = args.Client
}

// If the message ID is not known, resolve it
var theAnchorTxn *protocol.Transaction
Expand Down Expand Up @@ -247,39 +242,25 @@ func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) erro
} else {
slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures))

submit := func(env *messaging.Envelope) {
// addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr()
sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{})
if err != nil {
b, e := env.MarshalBinary()
if e == nil {
h := sha256.Sum256(b)
b = h[:]
}
slog.ErrorCtx(ctx, "Submission failed", "error", err, "id", env.Messages[0].ID(), "hash", logging.AsHex(b))
}
for _, sub := range sub {
if sub.Success {
slog.InfoCtx(ctx, "Submission succeeded", "id", sub.Status.TxID)
} else {
slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status)
}
}
}

if args.NetInfo.Status.ExecutorVersion.V2Enabled() {
for _, sig := range signatures {
blk := &messaging.BlockAnchor{
Signature: sig.(protocol.KeySignature),
Anchor: seq,
}
submit(&messaging.Envelope{Messages: []messaging.Message{blk}})
err = args.Submit(blk)
}
} else {
env := new(messaging.Envelope)
env.Transaction = []*protocol.Transaction{theAnchorTxn}
env.Signatures = signatures
submit(env)
msg := []messaging.Message{
&messaging.TransactionMessage{Transaction: theAnchorTxn},
}
for _, sig := range signatures {
msg = append(msg, &messaging.SignatureMessage{Signature: sig})
}
err = args.Submit(msg...)
}
if err != nil {
return err
}
}

Expand Down
22 changes: 15 additions & 7 deletions tools/cmd/debug/heal_anchor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/spf13/cobra"
"gitlab.com/accumulatenetwork/accumulate/internal/core/healing"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
"gitlab.com/accumulatenetwork/accumulate/protocol"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -72,13 +73,20 @@ func (h *healer) healSingleAnchor(srcId, dstId string, seqNum uint64, txid *url.
var count int
retry:
err := healing.HealAnchor(h.ctx, healing.HealAnchorArgs{
Client: h.C2.ForAddress(nil),
Querier: h.tryEach(),
Submitter: h.C2,
NetInfo: h.net,
Known: txns,
Pretend: pretend,
Wait: waitForTxn,
Client: h.C2.ForAddress(nil),
Querier: h.tryEach(),
NetInfo: h.net,
Known: txns,
Pretend: pretend,
Wait: waitForTxn,
Submit: func(m ...messaging.Message) error {
select {
case h.submit <- m:
return nil
case <-h.ctx.Done():
return errors.NotReady.With("canceled")
}
},
}, healing.SequencedInfo{
Source: srcId,
Destination: dstId,
Expand Down
49 changes: 49 additions & 0 deletions tools/cmd/debug/heal_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"time"

"github.com/spf13/cobra"
Expand All @@ -34,6 +35,7 @@ import (
client "gitlab.com/accumulatenetwork/accumulate/pkg/client/api/v2"
"gitlab.com/accumulatenetwork/accumulate/pkg/database/keyvalue/bolt"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
"gitlab.com/accumulatenetwork/accumulate/protocol"
"golang.org/x/exp/slog"
Expand Down Expand Up @@ -86,6 +88,8 @@ type healer struct {
light *light.Client
router routing.Router

submit chan []messaging.Message

accounts map[[32]byte]protocol.Account
}

Expand Down Expand Up @@ -171,6 +175,13 @@ func (h *healer) heal(args []string) {
},
}

wg := new(sync.WaitGroup)
defer wg.Wait()

h.submit = make(chan []messaging.Message)
wg.Add(1)
go h.submitLoop(wg)

if lightDb != "" {
cv2, err := client.New(accumulate.ResolveWellKnownEndpoint(args[0], "v2"))
check(err)
Expand Down Expand Up @@ -258,6 +269,44 @@ func (h *healer) heal(args []string) {
h.healSingle(h, parts[strings.ToLower(srcId)], parts[strings.ToLower(dstId)], seqNo, nil)
}

func (h *healer) submitLoop(wg *sync.WaitGroup) {
defer wg.Done()
t := time.NewTicker(3 * time.Second)
defer t.Stop()

var messages []messaging.Message
var stop bool
for !stop {
select {
case <-h.ctx.Done():
stop = true
case msg := <-h.submit:
messages = append(messages, msg...)
if len(messages) < 50 {
continue
}
case <-t.C:
}
if len(messages) == 0 {
continue
}

env := &messaging.Envelope{Messages: messages}
subs, err := h.C2.Submit(h.ctx, env, api.SubmitOptions{})
messages = messages[:0]
if err != nil {
slog.ErrorCtx(h.ctx, "Submission failed", "error", err, "id", env.Messages[0].ID())
}
for _, sub := range subs {
if sub.Success {
slog.InfoCtx(h.ctx, "Submission succeeded", "id", sub.Status.TxID)
} else {
slog.ErrorCtx(h.ctx, "Submission failed", "message", sub, "status", sub.Status)
}
}
}
}

// getAccount fetches the given account.
func getAccount[T protocol.Account](ctx context.Context, q api.Querier, u *url.URL) T {
r, err := api.Querier2{Querier: q}.QueryAccount(ctx, u, nil)
Expand Down

0 comments on commit 9a639ef

Please sign in to comment.