Skip to content

Commit

Permalink
saving work
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Sep 8, 2023
1 parent d3854e6 commit b99967c
Show file tree
Hide file tree
Showing 6 changed files with 414 additions and 295 deletions.
115 changes: 79 additions & 36 deletions internal/core/healing/anchors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,70 @@ import (

"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/network"
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
"gitlab.com/accumulatenetwork/accumulate/protocol"
"golang.org/x/exp/slog"
)

func HealAnchor(ctx context.Context,
C1 api.Submitter, C2 *message.Client, net *NetworkInfo,
srcUrl, dstUrl *url.URL, seqNum uint64,
theAnchorTxn *protocol.Transaction, sigSets []*api.SignatureSetRecord,
pretend bool,
) error {
srcId, ok := protocol.ParsePartitionUrl(srcUrl)
if !ok {
panic("not a partition: " + srcUrl.String())
type HealAnchorArgs struct {
Client message.AddressedClient
Querier api.Querier
Submitter api.Submitter
NetInfo *NetworkInfo
Known map[[32]byte]*protocol.Transaction
Pretend bool
}

func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error {
srcUrl := protocol.PartitionUrl(si.Source)
dstUrl := protocol.PartitionUrl(si.Destination)

if args.Querier == nil {
args.Querier = args.Client
}
if args.Submitter == nil {
args.Submitter = args.Client
}

dstId, ok := protocol.ParsePartitionUrl(dstUrl)
if !ok {
panic("not a partition: " + dstUrl.String())
// If the message ID is not known, resolve it
var theAnchorTxn *protocol.Transaction
if si.ID == nil {
r, err := ResolveSequenced[*messaging.TransactionMessage](ctx, args.Client, args.NetInfo, si.Source, si.Destination, si.Number, true)
if err != nil {
return err
}
si.ID = r.ID
theAnchorTxn = r.Message.Transaction
}

// Fetch the transaction and signatures
var sigSets []*api.SignatureSetRecord
res, err := api.Querier2{Querier: args.Querier}.QueryMessage(ctx, si.ID, nil)
switch {
case err == nil:
seq, ok := res.Message.(*messaging.SequencedMessage)
if !ok {
return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeSequenced, res.Message.Type())
}
txn, ok := seq.Message.(*messaging.TransactionMessage)
if !ok {
return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeTransaction, seq.Message.Type())
}

theAnchorTxn = txn.Transaction
sigSets = res.Signatures.Records

case !errors.Is(err, errors.NotFound):
return err

case theAnchorTxn == nil:
var ok bool
theAnchorTxn, ok = args.Known[si.ID.Hash()]
if !ok {
return err
}
}

// Mark which validators have signed
Expand All @@ -52,18 +95,18 @@ func HealAnchor(ctx context.Context,
}

g := &network.GlobalValues{
Oracle: net.Status.Oracle,
Globals: net.Status.Globals,
Network: net.Status.Network,
Routing: net.Status.Routing,
ExecutorVersion: net.Status.ExecutorVersion,
Oracle: args.NetInfo.Status.Oracle,
Globals: args.NetInfo.Status.Globals,
Network: args.NetInfo.Status.Network,
Routing: args.NetInfo.Status.Routing,
ExecutorVersion: args.NetInfo.Status.ExecutorVersion,
}
threshold := g.ValidatorThreshold(srcId)
threshold := g.ValidatorThreshold(si.Source)

lkv := []any{
"source", srcId,
"destination", dstId,
"sequence-number", seqNum,
"source", si.Source,
"destination", si.Destination,
"sequence-number", si.Number,
"want", threshold,
"have", len(signed),
}
Expand All @@ -82,7 +125,7 @@ func HealAnchor(ctx context.Context,
seq := &messaging.SequencedMessage{
Source: srcUrl,
Destination: dstUrl,
Number: seqNum,
Number: si.Number,
}
if theAnchorTxn != nil {
seq.Message = &messaging.TransactionMessage{
Expand All @@ -93,24 +136,24 @@ func HealAnchor(ctx context.Context,
// Get a signature from each node that hasn't signed
var gotPartSig bool
var signatures []protocol.Signature
for peer, info := range net.Peers[strings.ToLower(srcId)] {
for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Source)] {
if signed[info.Key] {
continue
}

ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

slog.InfoCtx(ctx, "Querying node for its signature", "id", peer)
res, err := C2.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, seqNum)
slog.InfoCtx(ctx, "Querying node for its signature", "seq.ID", peer)
res, err := args.Client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, si.Number)
if err != nil {
slog.ErrorCtx(ctx, "Query failed", "error", err)
continue
}

myTxn, ok := res.Message.(*messaging.TransactionMessage)
if !ok {
slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "id", info, "type", res.Message.Type())
slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "seq.ID", info, "type", res.Message.Type())
continue
}
if theAnchorTxn == nil {
Expand All @@ -119,7 +162,7 @@ func HealAnchor(ctx context.Context,
Transaction: theAnchorTxn,
}
} else if !myTxn.Transaction.Equal(theAnchorTxn) {
slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "id", info,
slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "seq.ID", info,
"expected", hex.EncodeToString(theAnchorTxn.GetHash()),
"got", hex.EncodeToString(myTxn.Transaction.GetHash()))
continue
Expand All @@ -133,7 +176,7 @@ func HealAnchor(ctx context.Context,
continue
}

if net.Status.ExecutorVersion.V2() {
if args.NetInfo.Status.ExecutorVersion.V2() {
sig, ok := msg.Signature.(protocol.KeySignature)
if !ok {
slog.ErrorCtx(ctx, "Node gave us a signature that is not a key signature", "id", info, "type", sig.Type())
Expand All @@ -159,12 +202,12 @@ func HealAnchor(ctx context.Context,
case protocol.UserSignature:
// Filter out bad signatures
if !sig.Verify(nil, theAnchorTxn.GetHash()) {
slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info)
slog.ErrorCtx(ctx, "Node gave us an invalid signature", "seq.ID", info)
continue
}

default:
slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "id", info, "type", sig.Type())
slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "seq.ID", info, "type", sig.Type())
continue
}
}
Expand All @@ -174,12 +217,12 @@ func HealAnchor(ctx context.Context,
}
}

if pretend {
if args.Pretend {
b, err := theAnchorTxn.MarshalBinary()
if err != nil {
panic(err)
}
slog.InfoCtx(ctx, "Would have submitted anchor", "signatures", len(signatures), "source", srcId, "destination", dstId, "number", seqNum, "txn-size", len(b))
slog.InfoCtx(ctx, "Would have submitted anchor", "signatures", len(signatures), "source", si.Source, "destination", si.Destination, "number", si.Number, "txn-size", len(b))
return nil
}

Expand All @@ -192,7 +235,7 @@ func HealAnchor(ctx context.Context,

slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures))
env := new(messaging.Envelope)
if net.Status.ExecutorVersion.V2() {
if args.NetInfo.Status.ExecutorVersion.V2() {
for i, sig := range signatures {
seq := seq.Copy()
if i > 0 {
Expand All @@ -214,8 +257,8 @@ func HealAnchor(ctx context.Context,
env.Signatures = signatures
}

// addr := api.ServiceTypeSubmit.AddressFor(dstId).Multiaddr()
sub, err := C1.Submit(ctx, env, api.SubmitOptions{})
// addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr()
sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{})
if err != nil {
slog.ErrorCtx(ctx, "Submission failed", "error", err)
}
Expand Down
83 changes: 83 additions & 0 deletions internal/core/healing/sequenced.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2023 The Accumulate Authors
//
// Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file or at
// https://opensource.org/licenses/MIT.

package healing

import (
"context"
"strings"
"time"

"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3"
"gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message"
"gitlab.com/accumulatenetwork/accumulate/pkg/errors"
"gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging"
"gitlab.com/accumulatenetwork/accumulate/pkg/url"
"gitlab.com/accumulatenetwork/accumulate/protocol"
"golang.org/x/exp/slog"
)

type SequencedInfo struct {
Source string
Destination string
Number uint64
ID *url.TxID
}

// ResolveSequenced resolves an anchor or synthetic message (a sequenced
// message). If the client's address is non-nil, the query will be sent to that
// address. Otherwise, all of the source partition's nodes will be queried in
// order until one responds.
func ResolveSequenced[T messaging.Message](ctx context.Context, client message.AddressedClient, net *NetworkInfo, srcId, dstId string, seqNum uint64, anchor bool) (*api.MessageRecord[T], error) {
srcUrl := protocol.PartitionUrl(srcId)
dstUrl := protocol.PartitionUrl(dstId)

var account string
if anchor {
account = protocol.AnchorPool
} else {
account = protocol.Synthetic
}

// If the client has an address, use that
if client.Address != nil {
slog.InfoCtx(ctx, "Querying node", "address", client.Address)
res, err := client.Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum)
if err != nil {
return nil, err
}

r2, err := api.MessageRecordAs[T](res)
if err != nil {
return nil, err
}
return r2, nil
}

// Otherwise try each node until one succeeds
slog.InfoCtx(ctx, "Resolving the message ID", "source", srcId, "destination", dstId, "number", seqNum)
for peer := range net.Peers[strings.ToLower(srcId)] {
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()

slog.InfoCtx(ctx, "Querying node", "id", peer)
res, err := client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum)
if err != nil {
slog.ErrorCtx(ctx, "Query failed", "error", err)
continue
}

r2, err := api.MessageRecordAs[T](res)
if err != nil {
slog.ErrorCtx(ctx, "Query failed", "error", err)
continue
}

return r2, nil
}

return nil, errors.UnknownError.WithFormat("cannot resolve %s→%s #%d", srcId, dstId, seqNum)
}
Loading

0 comments on commit b99967c

Please sign in to comment.