diff --git a/internal/core/healing/anchors.go b/internal/core/healing/anchors.go index 97664f0a4..4724d32e4 100644 --- a/internal/core/healing/anchors.go +++ b/internal/core/healing/anchors.go @@ -14,27 +14,70 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" "gitlab.com/accumulatenetwork/accumulate/pkg/types/network" - "gitlab.com/accumulatenetwork/accumulate/pkg/url" "gitlab.com/accumulatenetwork/accumulate/protocol" "golang.org/x/exp/slog" ) -func HealAnchor(ctx context.Context, - C1 api.Submitter, C2 *message.Client, net *NetworkInfo, - srcUrl, dstUrl *url.URL, seqNum uint64, - theAnchorTxn *protocol.Transaction, sigSets []*api.SignatureSetRecord, - pretend bool, -) error { - srcId, ok := protocol.ParsePartitionUrl(srcUrl) - if !ok { - panic("not a partition: " + srcUrl.String()) +type HealAnchorArgs struct { + Client message.AddressedClient + Querier api.Querier + Submitter api.Submitter + NetInfo *NetworkInfo + Known map[[32]byte]*protocol.Transaction + Pretend bool +} + +func HealAnchor(ctx context.Context, args HealAnchorArgs, si SequencedInfo) error { + srcUrl := protocol.PartitionUrl(si.Source) + dstUrl := protocol.PartitionUrl(si.Destination) + + if args.Querier == nil { + args.Querier = args.Client + } + if args.Submitter == nil { + args.Submitter = args.Client } - dstId, ok := protocol.ParsePartitionUrl(dstUrl) - if !ok { - panic("not a partition: " + dstUrl.String()) + // If the message ID is not known, resolve it + var theAnchorTxn *protocol.Transaction + if si.ID == nil { + r, err := ResolveSequenced[*messaging.TransactionMessage](ctx, args.Client, args.NetInfo, si.Source, si.Destination, si.Number, true) + if err != nil { + return err + } + si.ID = r.ID + theAnchorTxn = r.Message.Transaction + } + + // Fetch the transaction and signatures + var sigSets []*api.SignatureSetRecord + res, err := api.Querier2{Querier: args.Querier}.QueryMessage(ctx, si.ID, nil) + switch { + case err == nil: + seq, ok := res.Message.(*messaging.SequencedMessage) + if !ok { + return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeSequenced, res.Message.Type()) + } + txn, ok := seq.Message.(*messaging.TransactionMessage) + if !ok { + return errors.InternalError.WithFormat("expected %v, got %v", messaging.MessageTypeTransaction, seq.Message.Type()) + } + + theAnchorTxn = txn.Transaction + sigSets = res.Signatures.Records + + case !errors.Is(err, errors.NotFound): + return err + + case theAnchorTxn == nil: + var ok bool + theAnchorTxn, ok = args.Known[si.ID.Hash()] + if !ok { + return err + } } // Mark which validators have signed @@ -52,18 +95,18 @@ func HealAnchor(ctx context.Context, } g := &network.GlobalValues{ - Oracle: net.Status.Oracle, - Globals: net.Status.Globals, - Network: net.Status.Network, - Routing: net.Status.Routing, - ExecutorVersion: net.Status.ExecutorVersion, + Oracle: args.NetInfo.Status.Oracle, + Globals: args.NetInfo.Status.Globals, + Network: args.NetInfo.Status.Network, + Routing: args.NetInfo.Status.Routing, + ExecutorVersion: args.NetInfo.Status.ExecutorVersion, } - threshold := g.ValidatorThreshold(srcId) + threshold := g.ValidatorThreshold(si.Source) lkv := []any{ - "source", srcId, - "destination", dstId, - "sequence-number", seqNum, + "source", si.Source, + "destination", si.Destination, + "sequence-number", si.Number, "want", threshold, "have", len(signed), } @@ -82,7 +125,7 @@ func HealAnchor(ctx context.Context, seq := &messaging.SequencedMessage{ Source: srcUrl, Destination: dstUrl, - Number: seqNum, + Number: si.Number, } if theAnchorTxn != nil { seq.Message = &messaging.TransactionMessage{ @@ -93,7 +136,7 @@ func HealAnchor(ctx context.Context, // Get a signature from each node that hasn't signed var gotPartSig bool var signatures []protocol.Signature - for peer, info := range net.Peers[strings.ToLower(srcId)] { + for peer, info := range args.NetInfo.Peers[strings.ToLower(si.Source)] { if signed[info.Key] { continue } @@ -101,8 +144,8 @@ func HealAnchor(ctx context.Context, ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) defer cancel() - slog.InfoCtx(ctx, "Querying node for its signature", "id", peer) - res, err := C2.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, seqNum) + slog.InfoCtx(ctx, "Querying node for its signature", "seq.ID", peer) + res, err := args.Client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(protocol.AnchorPool), dstUrl, si.Number) if err != nil { slog.ErrorCtx(ctx, "Query failed", "error", err) continue @@ -110,7 +153,7 @@ func HealAnchor(ctx context.Context, myTxn, ok := res.Message.(*messaging.TransactionMessage) if !ok { - slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "id", info, "type", res.Message.Type()) + slog.ErrorCtx(ctx, "Node gave us an anchor that is not a transaction", "seq.ID", info, "type", res.Message.Type()) continue } if theAnchorTxn == nil { @@ -119,7 +162,7 @@ func HealAnchor(ctx context.Context, Transaction: theAnchorTxn, } } else if !myTxn.Transaction.Equal(theAnchorTxn) { - slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "id", info, + slog.ErrorCtx(ctx, "Node gave us an anchor with a different hash", "seq.ID", info, "expected", hex.EncodeToString(theAnchorTxn.GetHash()), "got", hex.EncodeToString(myTxn.Transaction.GetHash())) continue @@ -133,7 +176,7 @@ func HealAnchor(ctx context.Context, continue } - if net.Status.ExecutorVersion.V2() { + if args.NetInfo.Status.ExecutorVersion.V2() { sig, ok := msg.Signature.(protocol.KeySignature) if !ok { slog.ErrorCtx(ctx, "Node gave us a signature that is not a key signature", "id", info, "type", sig.Type()) @@ -159,12 +202,12 @@ func HealAnchor(ctx context.Context, case protocol.UserSignature: // Filter out bad signatures if !sig.Verify(nil, theAnchorTxn.GetHash()) { - slog.ErrorCtx(ctx, "Node gave us an invalid signature", "id", info) + slog.ErrorCtx(ctx, "Node gave us an invalid signature", "seq.ID", info) continue } default: - slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "id", info, "type", sig.Type()) + slog.ErrorCtx(ctx, "Node gave us a signature that is not a user signature", "seq.ID", info, "type", sig.Type()) continue } } @@ -174,12 +217,12 @@ func HealAnchor(ctx context.Context, } } - if pretend { + if args.Pretend { b, err := theAnchorTxn.MarshalBinary() if err != nil { panic(err) } - slog.InfoCtx(ctx, "Would have submitted anchor", "signatures", len(signatures), "source", srcId, "destination", dstId, "number", seqNum, "txn-size", len(b)) + slog.InfoCtx(ctx, "Would have submitted anchor", "signatures", len(signatures), "source", si.Source, "destination", si.Destination, "number", si.Number, "txn-size", len(b)) return nil } @@ -192,7 +235,7 @@ func HealAnchor(ctx context.Context, slog.InfoCtx(ctx, "Submitting signatures", "count", len(signatures)) env := new(messaging.Envelope) - if net.Status.ExecutorVersion.V2() { + if args.NetInfo.Status.ExecutorVersion.V2() { for i, sig := range signatures { seq := seq.Copy() if i > 0 { @@ -214,8 +257,8 @@ func HealAnchor(ctx context.Context, env.Signatures = signatures } - // addr := api.ServiceTypeSubmit.AddressFor(dstId).Multiaddr() - sub, err := C1.Submit(ctx, env, api.SubmitOptions{}) + // addr := api.ServiceTypeSubmit.AddressFor(seq.Destination).Multiaddr() + sub, err := args.Submitter.Submit(ctx, env, api.SubmitOptions{}) if err != nil { slog.ErrorCtx(ctx, "Submission failed", "error", err) } diff --git a/internal/core/healing/sequenced.go b/internal/core/healing/sequenced.go new file mode 100644 index 000000000..0a10d551d --- /dev/null +++ b/internal/core/healing/sequenced.go @@ -0,0 +1,83 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package healing + +import ( + "context" + "strings" + "time" + + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/pkg/url" + "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" +) + +type SequencedInfo struct { + Source string + Destination string + Number uint64 + ID *url.TxID +} + +// ResolveSequenced resolves an anchor or synthetic message (a sequenced +// message). If the client's address is non-nil, the query will be sent to that +// address. Otherwise, all of the source partition's nodes will be queried in +// order until one responds. +func ResolveSequenced[T messaging.Message](ctx context.Context, client message.AddressedClient, net *NetworkInfo, srcId, dstId string, seqNum uint64, anchor bool) (*api.MessageRecord[T], error) { + srcUrl := protocol.PartitionUrl(srcId) + dstUrl := protocol.PartitionUrl(dstId) + + var account string + if anchor { + account = protocol.AnchorPool + } else { + account = protocol.Synthetic + } + + // If the client has an address, use that + if client.Address != nil { + slog.InfoCtx(ctx, "Querying node", "address", client.Address) + res, err := client.Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum) + if err != nil { + return nil, err + } + + r2, err := api.MessageRecordAs[T](res) + if err != nil { + return nil, err + } + return r2, nil + } + + // Otherwise try each node until one succeeds + slog.InfoCtx(ctx, "Resolving the message ID", "source", srcId, "destination", dstId, "number", seqNum) + for peer := range net.Peers[strings.ToLower(srcId)] { + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + slog.InfoCtx(ctx, "Querying node", "id", peer) + res, err := client.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum) + if err != nil { + slog.ErrorCtx(ctx, "Query failed", "error", err) + continue + } + + r2, err := api.MessageRecordAs[T](res) + if err != nil { + slog.ErrorCtx(ctx, "Query failed", "error", err) + continue + } + + return r2, nil + } + + return nil, errors.UnknownError.WithFormat("cannot resolve %s→%s #%d", srcId, dstId, seqNum) +} diff --git a/internal/core/healing/synthetic.go b/internal/core/healing/synthetic.go new file mode 100644 index 000000000..1d9bf3dd0 --- /dev/null +++ b/internal/core/healing/synthetic.go @@ -0,0 +1,130 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package healing + +import ( + "context" + "fmt" + "strings" + "time" + + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/errors" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/protocol" + "golang.org/x/exp/slog" +) + +type HealSyntheticArgs struct { + Client message.AddressedClient + Querier api.Querier + Submitter api.Submitter + NetInfo *NetworkInfo + Pretend bool + Wait bool +} + +func HealSynthetic(ctx context.Context, args HealSyntheticArgs, si SequencedInfo) error { + if args.Querier == nil { + args.Querier = args.Client + } + if args.Submitter == nil { + args.Submitter = args.Client + } + + // Query the synthetic transaction + r, err := ResolveSequenced[messaging.Message](ctx, args.Client, args.NetInfo, si.Source, si.Destination, si.Number, false) + if err != nil { + return err + } + + slog.InfoCtx(ctx, "Resubmitting", "source", si.Source, "destination", si.Destination, "number", si.Number, "id", r.Message.ID()) + + // Submit the synthetic transaction directly to the destination partition + h := r.Sequence.Hash() + msg := &messaging.SyntheticMessage{ + Message: r.Sequence, + Proof: &protocol.AnnotatedReceipt{ + Receipt: r.SourceReceipt, + Anchor: &protocol.AnchorMetadata{ + Account: protocol.DnUrl(), + }, + }, + } + for _, sigs := range r.Signatures.Records { + for _, sig := range sigs.Signatures.Records { + sig, ok := sig.Message.(*messaging.SignatureMessage) + if !ok { + continue + } + ks, ok := sig.Signature.(protocol.KeySignature) + if !ok { + continue + } + msg.Signature = ks + } + } + if msg.Signature == nil { + return fmt.Errorf("synthetic message is not signed") + } + + h = msg.Message.Hash() + if !msg.Signature.Verify(nil, h[:]) { + return fmt.Errorf("signature is not valid") + } + + if args.Pretend { + return nil + } + + // Submit directly to an appropriate node + if args.Client.Address == nil { + for peer := range args.NetInfo.Peers[strings.ToLower(si.Destination)] { + args.Client = args.Client.ForPeer(peer) + break + } + } + + sub, err := args.Client.Submit(ctx, &messaging.Envelope{Messages: []messaging.Message{msg}}, api.SubmitOptions{}) + if err != nil { + slog.ErrorCtx(ctx, "Submission failed", "error", err) + } + for _, sub := range sub { + if !sub.Success { + slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status) + } + } + if !args.Wait { + return nil + } + + Q := api.Querier2{Querier: args.Client} + txid := r.Sequence.Destination.WithTxID(msg.Hash()) + slog.InfoCtx(ctx, "Waiting", "for", txid) + for { + time.Sleep(time.Second) + r, err := Q.QueryMessage(ctx, txid, nil) + switch { + case errors.Is(err, errors.NotFound): + // Not found, wait + continue + + case err != nil: + // Unknown error + return err + + case !r.Status.Delivered(): + // Pending, wait + continue + + default: + // Failed? + return r.AsError() + } + } +} diff --git a/tools/cmd/debug/heal_anchor.go b/tools/cmd/debug/heal_anchor.go index 732777dce..db0c70199 100644 --- a/tools/cmd/debug/heal_anchor.go +++ b/tools/cmd/debug/heal_anchor.go @@ -15,7 +15,7 @@ import ( "strings" "time" - "github.com/multiformats/go-multiaddr" + "github.com/libp2p/go-libp2p/core/peer" "github.com/spf13/cobra" "gitlab.com/accumulatenetwork/accumulate/internal/api/routing" "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" @@ -23,11 +23,8 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" - "gitlab.com/accumulatenetwork/accumulate/pkg/errors" - "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" "gitlab.com/accumulatenetwork/accumulate/pkg/url" "gitlab.com/accumulatenetwork/accumulate/protocol" - "golang.org/x/exp/slog" ) var cmdHealAnchor = &cobra.Command{ @@ -43,64 +40,15 @@ func init() { _ = cmdHealAnchor.MarkFlagFilename("cached-scan", ".json") } -var mainnetAddrs = func() []multiaddr.Multiaddr { - s := []string{ - "/dns/apollo-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj", - "/dns/yutu-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWDqFDwjHEog1bNbxai2dKSaR1aFvq2LAZ2jivSohgoSc7", - "/dns/chandrayaan-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWHzjkoeAqe7L55tAaepCbMbhvNu9v52ayZNVQobdEE1RL", - "/ip4/116.202.214.38/tcp/16593/p2p/12D3KooWBkJQiuvotpMemWBYfAe4ctsVHi7fLvT8RT83oXJ5dsgV", - "/ip4/83.97.19.82/tcp/16593/p2p/12D3KooWHSbqS6K52d4ReauHAg4n8MFbAKkdEAae2fZXnzRYi9ce", - "/ip4/206.189.97.165/tcp/16593/p2p/12D3KooWHyA7zgAVqGvCBBJejgvKzv7DQZ3LabJMWqmCQ9wFbT3o", - "/ip4/144.76.105.23/tcp/16593/p2p/12D3KooWS2Adojqun5RV1Xy4k6vKXWpRQ3VdzXnW8SbW7ERzqKie", - "/ip4/18.190.77.236/tcp/16593/p2p/12D3KooWP1d9vUJCzqX5bTv13tCHmVssJrgK3EnJCC2C5Ep2SXbS", - "/ip4/3.28.207.55/tcp/16593/p2p/12D3KooWEzhg3CRvC3xdrUBFsWETF1nG3gyYfEjx4oEJer95y1Rk", - "/ip4/38.135.195.81/tcp/16593/p2p/12D3KooWDWCHGAyeUWdP8yuuSYvMoUfaPoGu4p3gJb51diqNQz6j", - // "/ip4/50.17.246.3/tcp/16593/p2p/12D3KooWKkNsxkHJqvSje2viyqKVxtqvbTpFrbASD3q1uv6td1pW", - "/dns/validator-eu01.acme.sphereon.com/tcp/16593/p2p/12D3KooWKYTWKJ5jeuZmbbwiN7PoinJ2yJLoQtZyfWi2ihjBnSUR", - "/ip4/35.86.120.53/tcp/16593/p2p/12D3KooWKJuspMDC5GXzLYJs9nHwYfqst9QAW4m5FakXNHVMNiq7", - "/ip4/65.109.48.173/tcp/16593/p2p/12D3KooWHkUtGcHY96bNavZMCP2k5ps5mC7GrF1hBC1CsyGJZSPY", - "/dns/accumulate.detroitledger.tech/tcp/16593/p2p/12D3KooWNe1QNh5mKAa8iAEP8vFwvmWFxaCLNcAdE1sH38Bz8sc9", - "/ip4/3.135.9.97/tcp/16593/p2p/12D3KooWEQG3X528Ct2Kd3kxhv6WZDBqaAoEw7AKiPoK1NmWJgx1", - // "/ip4/3.86.85.133/tcp/16593/p2p/12D3KooWJvReA1SuLkppyXKXq6fifVPLqvNtzsvPUqagVjvYe7qe", - "/ip4/193.35.56.176/tcp/16593/p2p/12D3KooWJevZUFLqN7zAamDh2EEYNQZPvxGFwiFVyPXfuXZNjg1J", - "/ip4/35.177.70.195/tcp/16593/p2p/12D3KooWPzpRp1UCu4nvXT9h8jKvmBmCADrMnoF72DrEbUrWrB2G", - "/ip4/3.99.81.122/tcp/16593/p2p/12D3KooWLL5kAbD7nhv6CM9x9L1zjxSnc6hdMVKcsK9wzMGBo99X", - "/ip4/34.219.75.234/tcp/16593/p2p/12D3KooWKHjS5nzG9dipBXn31pYEnfa8g5UzvkSYEsuiukGHzPvt", - "/ip4/3.122.254.53/tcp/16593/p2p/12D3KooWRU8obVzgfw6TsUHjoy2FDD3Vd7swrPNTM7DMFs8JG4dx", - "/ip4/35.92.228.236/tcp/16593/p2p/12D3KooWQqMqbyJ2Zay9KHeEDgDMAxQpKD1ypiBX5ByQAA2XpsZL", - "/ip4/3.135.184.194/tcp/16593/p2p/12D3KooWHcxyiE3AGdPnhtj87tByfLnJZVR6mLefadWccbMByrBa", - "/ip4/18.133.170.113/tcp/16593/p2p/12D3KooWFbWY2NhBEWTLHUCwwPmNHm4BoJXbojnrJJfuDCVoqrFY", - // "/ip4/44.204.224.126/tcp/16593/p2p/12D3KooWAiJJxdgsB39up5h6fz6TSfBz4HsLKTFiBXUrbwA8o54m", - "/ip4/35.92.21.90/tcp/16593/p2p/12D3KooWLTV3pTN2NbKeFeseCGHyMXuAkQv68KfCeK4uqJzJMfhZ", - "/ip4/3.99.166.147/tcp/16593/p2p/12D3KooWGYUf93iYWsUibSvKdxsYUY1p7fC1nQotCpUcDXD1ABvR", - "/ip4/16.171.4.135/tcp/16593/p2p/12D3KooWEMpAxKnXJPkcEXpDmrnjrZ5iFMZvvQtimmTTxuoRGkXV", - "/ip4/54.237.244.42/tcp/16593/p2p/12D3KooWLoMkrgW862Gs152jLt6FiZZs4GkY24Su4QojnvMoSNaQ", - // "/ip4/3.238.124.43/tcp/16593/p2p/12D3KooWJ8CA8pacTnKWVgBSEav4QG1zJpyeSSME47RugpDUrZp8", - "/ip4/13.53.125.115/tcp/16593/p2p/12D3KooWBJk52fQExXHWhFNk692hP7JvTxNTvUMdVne8tbJ3DBf3", - "/ip4/13.59.241.224/tcp/16593/p2p/12D3KooWKjYKqg2TgUSLq8CZAP8G6LhjXUWTcQBd9qYL2JHug9HW", - "/ip4/18.168.202.86/tcp/16593/p2p/12D3KooWDiKGbUZg1rB5EufRCkRPiDCEPMjyvTfTVR9qsKVVkcuC", - "/ip4/35.183.112.161/tcp/16593/p2p/12D3KooWFPKeXzKMd3jtoeG6ts6ADKmVV8rVkXR9k9YkQPgpLzd6", - } - addrs := make([]multiaddr.Multiaddr, len(s)) - for i, s := range s { - addr, err := multiaddr.NewMultiaddr(s) - if err != nil { - panic(err) - } - addrs[i] = addr - } - return addrs -}() - func healAnchor(_ *cobra.Command, args []string) { ctx, cancel, _ := api.ContextWithBatchData(context.Background()) defer cancel() networkID := args[0] node, err := p2p.New(p2p.Options{ - Network: networkID, - // BootstrapPeers: api.BootstrapServers, - BootstrapPeers: mainnetAddrs, + Network: networkID, + BootstrapPeers: api.BootstrapServers, + // BootstrapPeers: mainnetAddrs, }) checkf(err, "start p2p node") defer func() { _ = node.Close() }() @@ -112,9 +60,7 @@ func healAnchor(_ *cobra.Command, args []string) { // We should be able to use only the p2p client but it doesn't work well for // some reason - ///C1 := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(networkID)) - // C1 := jsonrpc.NewClient(api.ResolveWellKnownEndpoint("http://65.109.48.173:16695/v3")) - C1 := jsonrpc.NewClient(api.ResolveWellKnownEndpoint("http://apollo-mainnet.accumulate.defidevs.io:16695/v3")) + C1 := jsonrpc.NewClient(api.ResolveWellKnownEndpoint(networkID)) C1.Client.Timeout = time.Hour // Use a hack dialer that uses the API for peer discovery @@ -122,8 +68,8 @@ func healAnchor(_ *cobra.Command, args []string) { C2 := &message.Client{ Transport: &message.RoutedTransport{ Network: networkID, - Dialer: node.DialNetwork(), - // Dialer: &hackDialer{C1, node.DialNetwork(), map[string]peer.ID{}}, + // Dialer: node.DialNetwork(), + Dialer: &hackDialer{C1, node.DialNetwork(), map[string]peer.ID{}}, Router: router, }, } @@ -146,9 +92,9 @@ func healAnchor(_ *cobra.Command, args []string) { if r.Sequence == nil { fatalf("%v is not sequenced", txid) } - - err = healing.HealAnchor(ctx, C1, C2, net, r.Sequence.Source, r.Sequence.Destination, r.Sequence.Number, r.Message.Transaction, r.Signatures.Records, pretend) - check(err) + srcId, _ := protocol.ParsePartitionUrl(r.Sequence.Source) + dstId, _ := protocol.ParsePartitionUrl(r.Sequence.Destination) + healSingleAnchor(ctx, C1, C2, net, srcId, dstId, r.Sequence.Number, txid, nil) return } @@ -213,105 +159,18 @@ func healAnchorSequence(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Cli } func healSingleAnchor(ctx context.Context, C1 *jsonrpc.Client, C2 *message.Client, net *healing.NetworkInfo, srcId, dstId string, seqNum uint64, txid *url.TxID, txns map[[32]byte]*protocol.Transaction) { - srcUrl := protocol.PartitionUrl(srcId) - dstUrl := protocol.PartitionUrl(dstId) - - if txid == nil { - // Get a signature from each node that hasn't signed - r := resolveSeq[*messaging.TransactionMessage](ctx, C2.ForAddress(nil), net, srcId, dstId, seqNum, true) - txid = r.ID - txns[txid.Hash()] = r.Message.Transaction - } - - var txn *protocol.Transaction - var sigSets []*api.SignatureSetRecord - res, err := api.Querier2{Querier: C1}.QueryTransaction(ctx, txid, nil) - switch { - case err == nil: - txn = res.Message.Transaction - sigSets = res.Signatures.Records - case !errors.Is(err, errors.NotFound): - //check to see if the message is sequence message - res, err := api.Querier2{Querier: C1}.QueryMessage(ctx, txid, nil) - if err != nil { - slog.ErrorCtx(ctx, "Query message failed", "error", err) - return - } - - seq, ok := res.Message.(*messaging.SequencedMessage) - if !ok { - slog.ErrorCtx(ctx, "Message receieved was not a sequenced message") - return - } - txm, ok := seq.Message.(*messaging.TransactionMessage) - if !ok { - slog.ErrorCtx(ctx, "Sequenced message does not contain a transaction message") - return - } - - txn = txm.Transaction - sigSets = res.Signatures.Records - default: - var ok bool - txn, ok = txns[txid.Hash()] - if !ok { - check(err) - } - } - err = healing.HealAnchor(ctx, C1, C2, net, srcUrl, dstUrl, seqNum, txn, sigSets, pretend) + err := healing.HealAnchor(ctx, healing.HealAnchorArgs{ + Client: C2.ForAddress(nil), + Querier: C1, + Submitter: C1, + NetInfo: net, + Known: txns, + Pretend: pretend, + }, healing.SequencedInfo{ + Source: srcId, + Destination: dstId, + Number: seqNum, + ID: txid, + }) check(err) } - -func resolveSeq[T messaging.Message](ctx context.Context, C2 message.AddressedClient, net *healing.NetworkInfo, srcId, dstId string, seqNum uint64, anchor bool) *api.MessageRecord[T] { - srcUrl := protocol.PartitionUrl(srcId) - dstUrl := protocol.PartitionUrl(dstId) - - var account string - if anchor { - account = protocol.AnchorPool - } else { - account = protocol.Synthetic - } - - if C2.Address != nil { - slog.InfoCtx(ctx, "Querying node", "address", C2.Address) - res, err := C2.Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum) - check(err) - - r2, err := api.MessageRecordAs[T](res) - check(err) - return r2 - } - - // Get a signature from each node that hasn't signed - slog.InfoCtx(ctx, "Resolving the message ID", "source", srcId, "destination", dstId, "number", seqNum) - for peer := range net.Peers[strings.ToLower(srcId)] { - ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - - slog.InfoCtx(ctx, "Querying node", "id", peer) - res, err := C2.ForPeer(peer).Private().Sequence(ctx, srcUrl.JoinPath(account), dstUrl, seqNum) - if err != nil { - slog.ErrorCtx(ctx, "Query failed", "error", err) - continue - } - - r2, err := api.MessageRecordAs[T](res) - if err != nil { - slog.ErrorCtx(ctx, "Query failed", "error", err) - continue - } - - return r2 - } - - fatalf("cannot resolve %s→%s #%d", srcId, dstId, seqNum) - panic("not reached") -} - -func getAccount[T protocol.Account](C api.Querier, ctx context.Context, u *url.URL) T { - var v T - _, err := api.Querier2{Querier: C}.QueryAccountAs(ctx, u, nil, &v) - checkf(err, "get %v", u) - return v -} diff --git a/tools/cmd/debug/heal_common.go b/tools/cmd/debug/heal_common.go new file mode 100644 index 000000000..4e18ac118 --- /dev/null +++ b/tools/cmd/debug/heal_common.go @@ -0,0 +1,86 @@ +// Copyright 2023 The Accumulate Authors +// +// Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file or at +// https://opensource.org/licenses/MIT. + +package main + +import ( + "context" + + "github.com/multiformats/go-multiaddr" + "gitlab.com/accumulatenetwork/accumulate/internal/core/healing" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3" + "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" + "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" + "gitlab.com/accumulatenetwork/accumulate/pkg/url" + "gitlab.com/accumulatenetwork/accumulate/protocol" +) + +// resolveSeq resolves an anchor or synthetic message (a sequenced message). If +// the client's address is non-nil, the query will be sent to that address. +// Otherwise, all of the source partition's nodes will be queried in order until +// one responds. +func resolveSeq[T messaging.Message](ctx context.Context, client message.AddressedClient, net *healing.NetworkInfo, srcId, dstId string, seqNum uint64, anchor bool) *api.MessageRecord[T] { + r, err := healing.ResolveSequenced[T](ctx, client, net, srcId, dstId, seqNum, anchor) + check(err) + return r +} + +// getAccount fetches the given account. +func getAccount[T protocol.Account](C api.Querier, ctx context.Context, u *url.URL) T { + var v T + _, err := api.Querier2{Querier: C}.QueryAccountAs(ctx, u, nil, &v) + checkf(err, "get %v", u) + return v +} + +var mainnetAddrs = func() []multiaddr.Multiaddr { + s := []string{ + "/dns/apollo-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWAgrBYpWEXRViTnToNmpCoC3dvHdmR6m1FmyKjDn1NYpj", + "/dns/yutu-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWDqFDwjHEog1bNbxai2dKSaR1aFvq2LAZ2jivSohgoSc7", + "/dns/chandrayaan-mainnet.accumulate.defidevs.io/tcp/16593/p2p/12D3KooWHzjkoeAqe7L55tAaepCbMbhvNu9v52ayZNVQobdEE1RL", + "/ip4/116.202.214.38/tcp/16593/p2p/12D3KooWBkJQiuvotpMemWBYfAe4ctsVHi7fLvT8RT83oXJ5dsgV", + "/ip4/83.97.19.82/tcp/16593/p2p/12D3KooWHSbqS6K52d4ReauHAg4n8MFbAKkdEAae2fZXnzRYi9ce", + "/ip4/206.189.97.165/tcp/16593/p2p/12D3KooWHyA7zgAVqGvCBBJejgvKzv7DQZ3LabJMWqmCQ9wFbT3o", + "/ip4/144.76.105.23/tcp/16593/p2p/12D3KooWS2Adojqun5RV1Xy4k6vKXWpRQ3VdzXnW8SbW7ERzqKie", + "/ip4/18.190.77.236/tcp/16593/p2p/12D3KooWP1d9vUJCzqX5bTv13tCHmVssJrgK3EnJCC2C5Ep2SXbS", + "/ip4/3.28.207.55/tcp/16593/p2p/12D3KooWEzhg3CRvC3xdrUBFsWETF1nG3gyYfEjx4oEJer95y1Rk", + "/ip4/38.135.195.81/tcp/16593/p2p/12D3KooWDWCHGAyeUWdP8yuuSYvMoUfaPoGu4p3gJb51diqNQz6j", + // "/ip4/50.17.246.3/tcp/16593/p2p/12D3KooWKkNsxkHJqvSje2viyqKVxtqvbTpFrbASD3q1uv6td1pW", + "/dns/validator-eu01.acme.sphereon.com/tcp/16593/p2p/12D3KooWKYTWKJ5jeuZmbbwiN7PoinJ2yJLoQtZyfWi2ihjBnSUR", + "/ip4/35.86.120.53/tcp/16593/p2p/12D3KooWKJuspMDC5GXzLYJs9nHwYfqst9QAW4m5FakXNHVMNiq7", + "/ip4/65.109.48.173/tcp/16593/p2p/12D3KooWHkUtGcHY96bNavZMCP2k5ps5mC7GrF1hBC1CsyGJZSPY", + "/dns/accumulate.detroitledger.tech/tcp/16593/p2p/12D3KooWNe1QNh5mKAa8iAEP8vFwvmWFxaCLNcAdE1sH38Bz8sc9", + "/ip4/3.135.9.97/tcp/16593/p2p/12D3KooWEQG3X528Ct2Kd3kxhv6WZDBqaAoEw7AKiPoK1NmWJgx1", + // "/ip4/3.86.85.133/tcp/16593/p2p/12D3KooWJvReA1SuLkppyXKXq6fifVPLqvNtzsvPUqagVjvYe7qe", + "/ip4/193.35.56.176/tcp/16593/p2p/12D3KooWJevZUFLqN7zAamDh2EEYNQZPvxGFwiFVyPXfuXZNjg1J", + "/ip4/35.177.70.195/tcp/16593/p2p/12D3KooWPzpRp1UCu4nvXT9h8jKvmBmCADrMnoF72DrEbUrWrB2G", + "/ip4/3.99.81.122/tcp/16593/p2p/12D3KooWLL5kAbD7nhv6CM9x9L1zjxSnc6hdMVKcsK9wzMGBo99X", + "/ip4/34.219.75.234/tcp/16593/p2p/12D3KooWKHjS5nzG9dipBXn31pYEnfa8g5UzvkSYEsuiukGHzPvt", + "/ip4/3.122.254.53/tcp/16593/p2p/12D3KooWRU8obVzgfw6TsUHjoy2FDD3Vd7swrPNTM7DMFs8JG4dx", + "/ip4/35.92.228.236/tcp/16593/p2p/12D3KooWQqMqbyJ2Zay9KHeEDgDMAxQpKD1ypiBX5ByQAA2XpsZL", + "/ip4/3.135.184.194/tcp/16593/p2p/12D3KooWHcxyiE3AGdPnhtj87tByfLnJZVR6mLefadWccbMByrBa", + "/ip4/18.133.170.113/tcp/16593/p2p/12D3KooWFbWY2NhBEWTLHUCwwPmNHm4BoJXbojnrJJfuDCVoqrFY", + // "/ip4/44.204.224.126/tcp/16593/p2p/12D3KooWAiJJxdgsB39up5h6fz6TSfBz4HsLKTFiBXUrbwA8o54m", + "/ip4/35.92.21.90/tcp/16593/p2p/12D3KooWLTV3pTN2NbKeFeseCGHyMXuAkQv68KfCeK4uqJzJMfhZ", + "/ip4/3.99.166.147/tcp/16593/p2p/12D3KooWGYUf93iYWsUibSvKdxsYUY1p7fC1nQotCpUcDXD1ABvR", + "/ip4/16.171.4.135/tcp/16593/p2p/12D3KooWEMpAxKnXJPkcEXpDmrnjrZ5iFMZvvQtimmTTxuoRGkXV", + "/ip4/54.237.244.42/tcp/16593/p2p/12D3KooWLoMkrgW862Gs152jLt6FiZZs4GkY24Su4QojnvMoSNaQ", + // "/ip4/3.238.124.43/tcp/16593/p2p/12D3KooWJ8CA8pacTnKWVgBSEav4QG1zJpyeSSME47RugpDUrZp8", + "/ip4/13.53.125.115/tcp/16593/p2p/12D3KooWBJk52fQExXHWhFNk692hP7JvTxNTvUMdVne8tbJ3DBf3", + "/ip4/13.59.241.224/tcp/16593/p2p/12D3KooWKjYKqg2TgUSLq8CZAP8G6LhjXUWTcQBd9qYL2JHug9HW", + "/ip4/18.168.202.86/tcp/16593/p2p/12D3KooWDiKGbUZg1rB5EufRCkRPiDCEPMjyvTfTVR9qsKVVkcuC", + "/ip4/35.183.112.161/tcp/16593/p2p/12D3KooWFPKeXzKMd3jtoeG6ts6ADKmVV8rVkXR9k9YkQPgpLzd6", + } + addrs := make([]multiaddr.Multiaddr, len(s)) + for i, s := range s { + addr, err := multiaddr.NewMultiaddr(s) + if err != nil { + panic(err) + } + addrs[i] = addr + } + return addrs +}() diff --git a/tools/cmd/debug/heal_synth.go b/tools/cmd/debug/heal_synth.go index 7d8dcd216..54830a700 100644 --- a/tools/cmd/debug/heal_synth.go +++ b/tools/cmd/debug/heal_synth.go @@ -11,7 +11,6 @@ import ( "encoding/json" "fmt" "os" - "strings" "time" "github.com/libp2p/go-libp2p/core/peer" @@ -22,10 +21,7 @@ import ( "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/jsonrpc" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/message" "gitlab.com/accumulatenetwork/accumulate/pkg/api/v3/p2p" - "gitlab.com/accumulatenetwork/accumulate/pkg/errors" - "gitlab.com/accumulatenetwork/accumulate/pkg/types/messaging" "gitlab.com/accumulatenetwork/accumulate/protocol" - "golang.org/x/exp/slog" ) var cmdHeal = &cobra.Command{ @@ -144,95 +140,17 @@ func healSynth(_ *cobra.Command, args []string) { } func resubmitByNumber(ctx context.Context, C2 message.AddressedClient, net *healing.NetworkInfo, source, destination string, number uint64) { - // Query the synthetic transaction - r := resolveSeq[messaging.Message](ctx, C2, net, source, destination, number, false) - - slog.InfoCtx(ctx, "Resubmitting", "source", source, "destination", destination, "number", number, "id", r.Message.ID()) - - // Submit the synthetic transaction directly to the destination partition - h := r.Sequence.Hash() - msg := &messaging.SyntheticMessage{ - Message: r.Sequence, - Proof: &protocol.AnnotatedReceipt{ - Receipt: r.SourceReceipt, - Anchor: &protocol.AnchorMetadata{ - Account: protocol.DnUrl(), - }, - }, - } - for _, sigs := range r.Signatures.Records { - for _, sig := range sigs.Signatures.Records { - sig, ok := sig.Message.(*messaging.SignatureMessage) - if !ok { - continue - } - ks, ok := sig.Signature.(protocol.KeySignature) - if !ok { - continue - } - msg.Signature = ks - } - } - if msg.Signature == nil { - slog.ErrorCtx(ctx, "Synthetic message is not signed") - return - } - - h = msg.Message.Hash() - if !msg.Signature.Verify(nil, h[:]) { - fatalf("signature is not valid") - } - - if pretend { - return - } - - // Submit directly to an appropriate node - if C2.Address == nil { - for peer := range net.Peers[strings.ToLower(destination)] { - C2 = C2.ForPeer(peer) - break - } - } - - sub, err := C2.Submit(ctx, &messaging.Envelope{Messages: []messaging.Message{msg}}, api.SubmitOptions{}) - if err != nil { - slog.ErrorCtx(ctx, "Submission failed", "error", err) - } - for _, sub := range sub { - if !sub.Success { - slog.ErrorCtx(ctx, "Submission failed", "message", sub, "status", sub.Status) - } - } - if !waitForTxn { - return - } - - Q := api.Querier2{Querier: C2} - txid := r.Sequence.Destination.WithTxID(msg.Hash()) - slog.InfoCtx(ctx, "Waiting", "for", txid) - for { - time.Sleep(time.Second) - r, err := Q.QueryMessage(ctx, txid, nil) - switch { - case errors.Is(err, errors.NotFound): - // Not found, wait - continue - - case err != nil: - // Unknown error - check(err) - - case !r.Status.Delivered(): - // Pending, wait - continue - - default: - // Failed? - check(r.AsError()) - - // Succeeded - return - } - } + err := healing.HealSynthetic(ctx, healing.HealSyntheticArgs{ + Client: C2, + Querier: C2, + Submitter: C2, + NetInfo: net, + Pretend: pretend, + Wait: waitForTxn, + }, healing.SequencedInfo{ + Source: source, + Destination: destination, + Number: number, + }) + check(err) }