From a057c755e88e79b96d3eecaeb6975fb41c84c305 Mon Sep 17 00:00:00 2001 From: Tyler Ruppert <524638+ToasterTheBrave@users.noreply.github.com> Date: Mon, 12 Jun 2023 10:30:50 -0600 Subject: [PATCH] Support for parallel processing (#182) Co-authored-by: Tyler Ruppert <{ID}+{username}@users.noreply.github.com> --- relayer/build_processors.go | 53 ++- relayer/build_processors_test.go | 311 ++++++++++++++++++ relayer/errors.go | 33 ++ relayer/keep_alive.go | 80 ++--- relayer/keep_alive_test.go | 21 +- relayer/message_attester.go | 81 +++++ relayer/message_attester_test.go | 178 ++++++++++ relayer/message_relayer.go | 102 ++++++ ...rocess_test.go => message_relayer_test.go} | 45 +-- relayer/message_signer.go | 117 +++++++ relayer/message_signer_test.go | 171 ++++++++++ relayer/process.go | 197 ----------- relayer/relayer.go | 3 + relayer/start.go | 134 ++------ relayer/update_valset.go | 20 +- testutil/testutil.go | 6 + 16 files changed, 1148 insertions(+), 404 deletions(-) create mode 100644 relayer/build_processors_test.go create mode 100644 relayer/message_attester.go create mode 100644 relayer/message_attester_test.go create mode 100644 relayer/message_relayer.go rename relayer/{process_test.go => message_relayer_test.go} (86%) create mode 100644 relayer/message_signer.go create mode 100644 relayer/message_signer_test.go delete mode 100644 relayer/process.go create mode 100644 testutil/testutil.go diff --git a/relayer/build_processors.go b/relayer/build_processors.go index 17bd74a6..522836d9 100644 --- a/relayer/build_processors.go +++ b/relayer/build_processors.go @@ -3,6 +3,7 @@ package relayer import ( "context" "math/big" + "sync" "github.com/VolumeFi/whoops" "github.com/ethereum/go-ethereum/common" @@ -12,28 +13,60 @@ import ( log "github.com/sirupsen/logrus" ) -func (r *Relayer) buildProcessors(ctx context.Context) ([]chain.Processor, error) { - chainsInfos, err := r.palomaClient.QueryGetEVMChainInfos(ctx) +func (r *Relayer) buildProcessors(ctx context.Context, locker sync.Locker) error { + locker.Lock() + defer locker.Unlock() + queriedChainsInfos, err := r.palomaClient.QueryGetEVMChainInfos(ctx) if err != nil { - return nil, err + return err } - log.WithField("chains-infos", chainsInfos).Trace("got chain infos") + log.WithField("chains-infos", queriedChainsInfos).Trace("got chain infos") - processors := []chain.Processor{} - for _, chainInfo := range chainsInfos { + // See if we need to update + if (r.processors != nil) && (r.chainsInfos != nil) && (len(r.chainsInfos) == len(queriedChainsInfos)) { + chainsChanged := false + for k, c := range r.chainsInfos { + if c.Id != queriedChainsInfos[k].Id || + c.ChainReferenceID != queriedChainsInfos[k].ChainReferenceID || + c.ChainID != queriedChainsInfos[k].ChainID || + string(c.SmartContractUniqueID) != string(queriedChainsInfos[k].SmartContractUniqueID) || + c.SmartContractAddr != queriedChainsInfos[k].SmartContractAddr || + c.ReferenceBlockHeight != queriedChainsInfos[k].ReferenceBlockHeight || + c.ReferenceBlockHash != queriedChainsInfos[k].ReferenceBlockHash || + c.Abi != queriedChainsInfos[k].Abi || + string(c.Bytecode) != string(queriedChainsInfos[k].Bytecode) || + string(c.ConstructorInput) != string(queriedChainsInfos[k].ConstructorInput) || + c.Status != queriedChainsInfos[k].Status || + c.ActiveSmartContractID != queriedChainsInfos[k].ActiveSmartContractID || + c.MinOnChainBalance != queriedChainsInfos[k].MinOnChainBalance { + chainsChanged = true + } + } + if !chainsChanged { + log.Debug("chain infos unchanged since last tick") + return nil + } + } + + log.Debug("chain infos changed. building processors") + + r.processors = []chain.Processor{} + r.chainsInfos = []evmtypes.ChainInfo{} + for _, chainInfo := range queriedChainsInfos { processor, err := r.processorFactory(chainInfo) if errors.IsUnrecoverable(err) { - return nil, err + return err } if err := processor.IsRightChain(ctx); err != nil { - return nil, err + return err } - processors = append(processors, processor) + r.processors = append(r.processors, processor) + r.chainsInfos = append(r.chainsInfos, *chainInfo) } - return processors, nil + return nil } func (r *Relayer) processorFactory(chainInfo *evmtypes.ChainInfo) (chain.Processor, error) { diff --git a/relayer/build_processors_test.go b/relayer/build_processors_test.go new file mode 100644 index 00000000..2f23fc88 --- /dev/null +++ b/relayer/build_processors_test.go @@ -0,0 +1,311 @@ +package relayer + +import ( + "context" + "testing" + + "github.com/palomachain/paloma/x/evm/types" + "github.com/palomachain/pigeon/chain" + chainmocks "github.com/palomachain/pigeon/chain/mocks" + "github.com/palomachain/pigeon/config" + "github.com/palomachain/pigeon/relayer/mocks" + "github.com/palomachain/pigeon/testutil" + timemocks "github.com/palomachain/pigeon/util/time/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestBuildProcessors(t *testing.T) { + testcases := []struct { + name string + setup func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) + expectedErr error + }{ + { + name: "when there are no processors on relayer yet it builds processors", + setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) { + chain1Info := types.ChainInfo{ + Id: 1, + ChainReferenceID: "chain-1", + MinOnChainBalance: "5", + } + pc := mocks.NewPalomaClienter(t) + pc.On( + "QueryGetEVMChainInfos", + mock.Anything, + mock.Anything, + ).Return( + []*types.ChainInfo{ + &chain1Info, + }, + nil, + ) + + processorMock := chainmocks.NewProcessor(t) + processorMock.On("IsRightChain", mock.Anything).Return(nil) + + evmFactoryMock := mocks.NewEvmFactorier(t) + evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil) + + r := New( + config.Root{ + EVM: map[string]config.EVM{ + "chain-1": {}, + }, + }, + pc, + evmFactoryMock, + timemocks.NewTime(t), + Config{}, + ) + r.chainsInfos = []types.ChainInfo{ + chain1Info, + } + + return r, + []chain.Processor{ + processorMock, + }, + []types.ChainInfo{ + chain1Info, + } + }, + }, + { + name: "when there are no chainsInfos on relayer yet it builds processors", + setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) { + chain1Info := types.ChainInfo{ + Id: 1, + ChainReferenceID: "chain-1", + MinOnChainBalance: "5", + } + pc := mocks.NewPalomaClienter(t) + pc.On( + "QueryGetEVMChainInfos", + mock.Anything, + mock.Anything, + ).Return( + []*types.ChainInfo{ + &chain1Info, + }, + nil, + ) + + processorMock := chainmocks.NewProcessor(t) + processorMock.On("IsRightChain", mock.Anything).Return(nil) + + evmFactoryMock := mocks.NewEvmFactorier(t) + evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil) + + r := New( + config.Root{ + EVM: map[string]config.EVM{ + "chain-1": {}, + }, + }, + pc, + evmFactoryMock, + timemocks.NewTime(t), + Config{}, + ) + r.processors = []chain.Processor{ + chainmocks.NewProcessor(t), + } + + return r, + []chain.Processor{ + processorMock, + }, + []types.ChainInfo{ + chain1Info, + } + }, + }, + { + name: "when the chains lengths are different it builds processors", + setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) { + chain1Info := types.ChainInfo{ + Id: 1, + ChainReferenceID: "chain-1", + MinOnChainBalance: "5", + } + chain2Info := types.ChainInfo{ + Id: 2, + ChainReferenceID: "chain-2", + MinOnChainBalance: "5", + } + + pc := mocks.NewPalomaClienter(t) + pc.On( + "QueryGetEVMChainInfos", + mock.Anything, + mock.Anything, + ).Return( + []*types.ChainInfo{ + &chain1Info, + }, + nil, + ) + + processorMock := chainmocks.NewProcessor(t) + processorMock.On("IsRightChain", mock.Anything).Return(nil) + + evmFactoryMock := mocks.NewEvmFactorier(t) + evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil) + + r := New( + config.Root{ + EVM: map[string]config.EVM{ + "chain-1": {}, + }, + }, + pc, + evmFactoryMock, + timemocks.NewTime(t), + Config{}, + ) + r.processors = []chain.Processor{ + chainmocks.NewProcessor(t), + } + + r.chainsInfos = []types.ChainInfo{ + chain1Info, + chain2Info, + } + + return r, + []chain.Processor{ + processorMock, + }, + []types.ChainInfo{ + chain1Info, + } + }, + }, + { + name: "when there is a difference in the chain data it builds processors", + setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) { + chain1Info := types.ChainInfo{ + Id: 1, + ChainReferenceID: "chain-1", + MinOnChainBalance: "5", + } + + chain1NewInfo := types.ChainInfo{ + Id: 1, + ChainReferenceID: "chain-1", + MinOnChainBalance: "50", + } + pc := mocks.NewPalomaClienter(t) + pc.On( + "QueryGetEVMChainInfos", + mock.Anything, + mock.Anything, + ).Return( + []*types.ChainInfo{ + &chain1NewInfo, + }, + nil, + ) + + processorMock := chainmocks.NewProcessor(t) + processorMock.On("IsRightChain", mock.Anything).Return(nil) + + evmFactoryMock := mocks.NewEvmFactorier(t) + evmFactoryMock.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(processorMock, nil) + + r := New( + config.Root{ + EVM: map[string]config.EVM{ + "chain-1": {}, + }, + }, + pc, + evmFactoryMock, + timemocks.NewTime(t), + Config{}, + ) + r.processors = []chain.Processor{ + chainmocks.NewProcessor(t), + } + + r.chainsInfos = []types.ChainInfo{ + chain1Info, + } + + return r, + []chain.Processor{ + processorMock, + }, + []types.ChainInfo{ + chain1NewInfo, + } + }, + }, + { + name: "when the chains are the same it doesn't build processors", + setup: func(t *testing.T) (*Relayer, []chain.Processor, []types.ChainInfo) { + chain1Info := types.ChainInfo{ + Id: 1, + ChainReferenceID: "chain-1", + MinOnChainBalance: "5", + } + + pc := mocks.NewPalomaClienter(t) + pc.On( + "QueryGetEVMChainInfos", + mock.Anything, + mock.Anything, + ).Return( + []*types.ChainInfo{ + &chain1Info, + }, + nil, + ) + + r := New( + config.Root{ + EVM: map[string]config.EVM{ + "chain-1": {}, + }, + }, + pc, + mocks.NewEvmFactorier(t), + timemocks.NewTime(t), + Config{}, + ) + + origProcessor := chainmocks.NewProcessor(t) + r.processors = []chain.Processor{ + origProcessor, + } + + r.chainsInfos = []types.ChainInfo{ + chain1Info, + } + + return r, + []chain.Processor{ + origProcessor, + }, + []types.ChainInfo{ + chain1Info, + } + }, + }, + } + + asserter := assert.New(t) + ctx := context.Background() + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + relayer, expectedProcessors, expectedChainsInfos := tt.setup(t) + var locker testutil.FakeMutex + + actualErr := relayer.buildProcessors(ctx, locker) + asserter.Equal(tt.expectedErr, actualErr) + asserter.Equal(expectedProcessors, relayer.processors) + asserter.Equal(expectedChainsInfos, relayer.chainsInfos) + }) + } +} diff --git a/relayer/errors.go b/relayer/errors.go index 2b9b47c9..0c5b13c8 100644 --- a/relayer/errors.go +++ b/relayer/errors.go @@ -1,8 +1,12 @@ package relayer import ( + "context" + goerrors "errors" + "github.com/VolumeFi/whoops" "github.com/palomachain/pigeon/errors" + log "github.com/sirupsen/logrus" ) var ( @@ -15,3 +19,32 @@ var ( ErrValidatorIsNotStaking = whoops.String("validator is not staking") ) + +func handleProcessError(err error) error { + switch { + case err == nil: + // success + return nil + case goerrors.Is(err, context.Canceled): + log.WithFields(log.Fields{ + "err": err, + }).Debug("exited from the process loop due the context being canceled") + return nil + case goerrors.Is(err, context.DeadlineExceeded): + log.WithFields(log.Fields{ + "err": err, + }).Debug("exited from the process loop due the context deadline being exceeded") + return nil + case errors.IsUnrecoverable(err): + // there is no way that we can recover from this + log.WithFields(log.Fields{ + "err": err, + }).Error("unrecoverable error returned") + return err + default: + log.WithFields(log.Fields{ + "err": err, + }).Error("error returned in process loop") + return nil + } +} diff --git a/relayer/keep_alive.go b/relayer/keep_alive.go index d7a13903..71753815 100644 --- a/relayer/keep_alive.go +++ b/relayer/keep_alive.go @@ -4,64 +4,36 @@ import ( "context" "strings" "sync" - "time" - "github.com/palomachain/pigeon/util/channels" log "github.com/sirupsen/logrus" ) -func (r *Relayer) startKeepAlive(ctx context.Context, locker sync.Locker) { - log.Debug("starting keep alive loop") - defer func() { - log.Debug("existing keep alive loop") - }() - ticker := time.NewTicker(r.relayerConfig.KeepAliveLoopTimeout) - defer ticker.Stop() - - checkNow := make(chan time.Time, 1) - checkNow <- time.Time{} - tickerCh := channels.FanIn(ticker.C, checkNow) - defer func() { - log.Info("exiting keep alive loop") - }() - - for { - select { - case <-ctx.Done(): - return - case _, chOpen := <-tickerCh: - if !chOpen { - return - } - if ctx.Err() != nil { - return - } - log.Debug("querying get alive time") - aliveUntil, err := r.palomaClient.QueryGetValidatorAliveUntil(ctx) - if err != nil { - if !strings.Contains(err.Error(), "validator is not in keep alive store") { - log.WithError(err).Error("error while getting the alive time for a validator") - continue - } - } - now := r.time.Now().UTC() - ttl := aliveUntil.Sub(now) - sendKeepAlive := ttl < r.relayerConfig.KeepAliveThreshold - log.WithFields(log.Fields{ - "alive-until": aliveUntil, - "time-now": now, - "ttl": ttl, - "should-send-keep-alive": sendKeepAlive, - }).Debug("checking keep alive") - if sendKeepAlive { - locker.Lock() - err := r.palomaClient.KeepValidatorAlive(ctx) - locker.Unlock() - if err != nil { - log.WithError(err).Error("error while trying to keep pigeon alive") - continue - } - } +func (r *Relayer) keepAlive(ctx context.Context, locker sync.Locker) error { + log.Debug("querying get alive time") + aliveUntil, err := r.palomaClient.QueryGetValidatorAliveUntil(ctx) + if err != nil { + if !strings.Contains(err.Error(), "validator is not in keep alive store") { + log.WithError(err).Error("error while getting the alive time for a validator") + return err + } + } + now := r.time.Now().UTC() + ttl := aliveUntil.Sub(now) + sendKeepAlive := ttl < r.relayerConfig.KeepAliveThreshold + log.WithFields(log.Fields{ + "alive-until": aliveUntil, + "time-now": now, + "ttl": ttl, + "should-send-keep-alive": sendKeepAlive, + }).Debug("checking keep alive") + if sendKeepAlive { + locker.Lock() + err := r.palomaClient.KeepValidatorAlive(ctx) + locker.Unlock() + if err != nil { + log.WithError(err).Error("error while trying to keep pigeon alive") + return err } } + return nil } diff --git a/relayer/keep_alive_test.go b/relayer/keep_alive_test.go index ae28ad18..7effd798 100644 --- a/relayer/keep_alive_test.go +++ b/relayer/keep_alive_test.go @@ -7,21 +7,19 @@ import ( "github.com/VolumeFi/whoops" "github.com/palomachain/pigeon/relayer/mocks" + "github.com/palomachain/pigeon/testutil" timemock "github.com/palomachain/pigeon/util/time/mocks" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) -type FakeMutex struct{} - -func (m FakeMutex) Lock() {} -func (m FakeMutex) Unlock() {} - func TestKeepAlive(t *testing.T) { randErr := whoops.String("oh no") testdata := []struct { - name string - setup func(t *testing.T) (*mocks.PalomaClienter, *timemock.Time, context.Context) + name string + setup func(t *testing.T) (*mocks.PalomaClienter, *timemock.Time, context.Context) + expectedErr error }{ { name: "if querying alive returns an error, it does nothing", @@ -35,6 +33,7 @@ func TestKeepAlive(t *testing.T) { }) return paloma, tm, ctx }, + expectedErr: randErr, }, { name: "when validator is almost dead, it should call the keep alive method", @@ -65,6 +64,7 @@ func TestKeepAlive(t *testing.T) { }) return paloma, tm, ctx }, + expectedErr: randErr, }, { name: "when validator still has time to live, it does not call keep alive method", @@ -82,6 +82,7 @@ func TestKeepAlive(t *testing.T) { }, } + asserter := assert.New(t) for _, tt := range testdata { t.Run(tt.name, func(t *testing.T) { tm := timemock.NewTime(t) @@ -101,8 +102,10 @@ func TestKeepAlive(t *testing.T) { palomaClient: paloma, } - var locker FakeMutex - r.startKeepAlive(ctx, &locker) + var locker testutil.FakeMutex + actualErr := r.keepAlive(ctx, &locker) + + asserter.Equal(tt.expectedErr, actualErr) }) } } diff --git a/relayer/message_attester.go b/relayer/message_attester.go new file mode 100644 index 00000000..d0b7f038 --- /dev/null +++ b/relayer/message_attester.go @@ -0,0 +1,81 @@ +package relayer + +import ( + "context" + "sync" + + "github.com/palomachain/pigeon/chain" + "github.com/palomachain/pigeon/util/slice" + log "github.com/sirupsen/logrus" +) + +func (r *Relayer) AttestMessages(ctx context.Context, locker sync.Locker) error { + log.Info("attester loop") + if ctx.Err() != nil { + log.Info("exiting attester loop as context has ended") + return ctx.Err() + } + + err := r.buildProcessors(ctx, locker) + if err != nil { + return err + } + + locker.Lock() + err = r.attestMessages(ctx, r.processors) + locker.Unlock() + + return handleProcessError(err) +} + +func (r *Relayer) attestMessages(ctx context.Context, processors []chain.Processor) error { + if len(processors) == 0 { + return nil + } + + // todo randomise + for _, p := range processors { + // todo randomise + for _, queueName := range p.SupportedQueues() { + logger := log.WithFields(log.Fields{ + "queue-name": queueName, + "action": "attest", + }) + + messagesInQueue, err := r.palomaClient.QueryMessagesInQueue(ctx, queueName) + + logger = log.WithFields(log.Fields{ + "queue-name": queueName, + "message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 { + return msg.ID + }), + }) + + logger.Debug("got ", len(messagesInQueue), " messages from ", queueName) + if err != nil { + logger.WithError(err).Error("couldn't get messages to attest") + return err + } + + msgsToAttest := slice.Filter(messagesInQueue, func(msg chain.MessageWithSignatures) bool { + return len(msg.PublicAccessData) > 0 + }) + + if len(msgsToAttest) > 0 { + logger := logger.WithFields(log.Fields{ + "messages-to-attest": slice.Map(msgsToAttest, func(msg chain.MessageWithSignatures) uint64 { + return msg.ID + }), + }) + logger.Info("attesting ", len(msgsToAttest), " messages") + err := p.ProvideEvidence(ctx, queueName, msgsToAttest) + if err != nil { + logger.WithError(err).Error("error attesting messages") + return err + } + } + } + } + + return nil +} diff --git a/relayer/message_attester_test.go b/relayer/message_attester_test.go new file mode 100644 index 00000000..35f4e362 --- /dev/null +++ b/relayer/message_attester_test.go @@ -0,0 +1,178 @@ +package relayer + +import ( + "context" + "os" + "testing" + + "github.com/ethereum/go-ethereum/common" + evmtypes "github.com/palomachain/paloma/x/evm/types" + "github.com/palomachain/pigeon/chain" + "github.com/palomachain/pigeon/chain/evm" + chainmocks "github.com/palomachain/pigeon/chain/mocks" + "github.com/palomachain/pigeon/config" + "github.com/palomachain/pigeon/relayer/mocks" + "github.com/palomachain/pigeon/testutil" + timemocks "github.com/palomachain/pigeon/util/time/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestAttestMessages(t *testing.T) { + ctx := context.Background() + testcases := []struct { + name string + setup func(t *testing.T) *Relayer + expErr error + }{ + { + name: "without any processor it does nothing", + setup: func(t *testing.T) *Relayer { + pc := mocks.NewPalomaClienter(t) + pc.On("QueryGetEVMChainInfos", mock.Anything, mock.Anything).Return(nil, nil) + r := New( + config.Root{}, + pc, + evm.NewFactory(pc), + timemocks.NewTime(t), + Config{}, + ) + return r + }, + }, + { + name: "it attests messages", + setup: func(t *testing.T) *Relayer { + keyringPass := "abcd" + + dir := t.TempDir() + keyring := evm.OpenKeystore(dir) + acc, err := keyring.NewAccount(keyringPass) + require.NoError(t, err) + + p := chainmocks.NewProcessor(t) + p.On("IsRightChain", mock.Anything).Return(nil) + p.On("SupportedQueues").Return([]string{"a"}) + + p.On( + "ProvideEvidence", + mock.Anything, + "a", + []chain.MessageWithSignatures{ + {QueuedMessage: chain.QueuedMessage{ID: 789, PublicAccessData: []byte("tx hash")}}, + }, + ).Return(nil) + + pal := mocks.NewPalomaClienter(t) + pal.On("QueryGetEVMChainInfos", mock.Anything, mock.Anything).Return([]*evmtypes.ChainInfo{ + { + ChainReferenceID: "main", + ChainID: 5, + SmartContractUniqueID: []byte("5"), + SmartContractAddr: common.BytesToAddress([]byte("abcd")).Hex(), + ReferenceBlockHeight: 5, + ReferenceBlockHash: "0x12", + MinOnChainBalance: "10000", + }, + }, nil) + pal.On("QueryMessagesInQueue", mock.Anything, mock.Anything).Return( + []chain.MessageWithSignatures{ + {QueuedMessage: chain.QueuedMessage{ID: 123}}, + {QueuedMessage: chain.QueuedMessage{ID: 456}}, + {QueuedMessage: chain.QueuedMessage{ID: 789, PublicAccessData: []byte("tx hash")}}, + }, + nil, + ) + + os.Setenv("TEST_PASS", keyringPass) + t.Cleanup(func() { + os.Unsetenv("TEST_PASS") + }) + + factory := mocks.NewEvmFactorier(t) + + factory.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(p, nil) + + return New( + config.Root{ + EVM: map[string]config.EVM{ + "main": { + ChainClientConfig: config.ChainClientConfig{ + KeyringPassEnvName: "TEST_PASS", + SigningKey: acc.Address.Hex(), + KeyringDirectory: config.Filepath(dir), + }, + }, + }, + }, + pal, + factory, + timemocks.NewTime(t), + Config{}, + ) + }, + }, + { + name: "if the processor is connected to the wrong chain it returns the error", + setup: func(t *testing.T) *Relayer { + keyringPass := "abcd" + + dir := t.TempDir() + keyring := evm.OpenKeystore(dir) + acc, err := keyring.NewAccount(keyringPass) + require.NoError(t, err) + + p := chainmocks.NewProcessor(t) + p.On("IsRightChain", mock.Anything).Return(chain.ErrNotConnectedToRightChain) + + pal := mocks.NewPalomaClienter(t) + pal.On("QueryGetEVMChainInfos", mock.Anything, mock.Anything).Return([]*evmtypes.ChainInfo{ + { + ChainReferenceID: "main", + ChainID: 5, + SmartContractUniqueID: []byte("5"), + SmartContractAddr: common.BytesToAddress([]byte("abcd")).Hex(), + ReferenceBlockHeight: 5, + ReferenceBlockHash: "0x12", + MinOnChainBalance: "10000", + }, + }, nil) + + factory := mocks.NewEvmFactorier(t) + + factory.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(p, nil) + + return New( + config.Root{ + EVM: map[string]config.EVM{ + "main": { + ChainClientConfig: config.ChainClientConfig{ + KeyringPassEnvName: "TEST_PASS", + SigningKey: acc.Address.Hex(), + KeyringDirectory: config.Filepath(dir), + }, + }, + }, + }, + pal, + factory, + timemocks.NewTime(t), + Config{}, + ) + }, + expErr: chain.ErrNotConnectedToRightChain, + }, + } + + for _, tt := range testcases { + asserter := assert.New(t) + t.Run(tt.name, func(t *testing.T) { + relayer := tt.setup(t) + + var locker testutil.FakeMutex + actualErr := relayer.AttestMessages(ctx, &locker) + asserter.Equal(tt.expErr, actualErr) + }) + } +} diff --git a/relayer/message_relayer.go b/relayer/message_relayer.go new file mode 100644 index 00000000..2bb1307d --- /dev/null +++ b/relayer/message_relayer.go @@ -0,0 +1,102 @@ +package relayer + +import ( + "context" + "fmt" + "sync" + + "github.com/palomachain/pigeon/chain" + "github.com/palomachain/pigeon/chain/paloma/collision" + "github.com/palomachain/pigeon/util/slice" + log "github.com/sirupsen/logrus" +) + +func (r *Relayer) RelayMessages(ctx context.Context, locker sync.Locker) error { + log.Info("relayer loop") + if ctx.Err() != nil { + log.Info("exiting relayer loop as context has ended") + return ctx.Err() + } + + err := r.buildProcessors(ctx, locker) + if err != nil { + return err + } + + locker.Lock() + err = r.relayMessages(ctx, r.processors) + locker.Unlock() + + return handleProcessError(err) +} + +func (r *Relayer) relayMessages(ctx context.Context, processors []chain.Processor) error { + if len(processors) == 0 { + return nil + } + + ctx, cleanup, err := collision.GoStartLane(ctx, r.palomaClient, r.palomaClient.GetValidatorAddress()) + if err != nil { + return err + } + defer cleanup() + + // todo randomise + for _, p := range processors { + // todo randomise + for _, queueName := range p.SupportedQueues() { + logger := log.WithFields(log.Fields{ + "queue-name": queueName, + "action": "relay", + }) + + messagesInQueue, err := r.palomaClient.QueryMessagesInQueue(ctx, queueName) + + logger = logger.WithFields(log.Fields{ + "message-ids": slice.Map(messagesInQueue, func(msg chain.MessageWithSignatures) uint64 { + return msg.ID + }), + }) + + logger.Debug("got ", len(messagesInQueue), " messages from ", queueName) + if err != nil { + logger.WithError(err).Error("couldn't get messages to relay") + return err + } + + relayCandidateMsgs := slice.Filter( + messagesInQueue, + func(msg chain.MessageWithSignatures) bool { + return len(msg.PublicAccessData) == 0 + }, + func(msg chain.MessageWithSignatures) bool { + return collision.AllowedToExecute( + ctx, + []byte(fmt.Sprintf("%s-%d", queueName, msg.ID)), + ) + }, + ) + + if len(relayCandidateMsgs) > 0 { + logger := logger.WithFields(log.Fields{ + "messages-to-relay": slice.Map(relayCandidateMsgs, func(msg chain.MessageWithSignatures) uint64 { + return msg.ID + }), + }) + logger.Info("relaying ", len(relayCandidateMsgs), " messages") + err := p.ProcessMessages(ctx, queueName, relayCandidateMsgs) + if err != nil { + logger.WithFields(log.Fields{ + "err": err, + "queue-name": queueName, + "messages-to-relay": slice.Map(relayCandidateMsgs, func(msg chain.MessageWithSignatures) uint64 { + return msg.ID + }), + }).Error("error relaying messages") + return err + } + } + } + } + return nil +} diff --git a/relayer/process_test.go b/relayer/message_relayer_test.go similarity index 86% rename from relayer/process_test.go rename to relayer/message_relayer_test.go index 45ee6c8b..a01a4e1d 100644 --- a/relayer/process_test.go +++ b/relayer/message_relayer_test.go @@ -14,31 +14,33 @@ import ( chainmocks "github.com/palomachain/pigeon/chain/mocks" "github.com/palomachain/pigeon/config" "github.com/palomachain/pigeon/relayer/mocks" + "github.com/palomachain/pigeon/testutil" timemocks "github.com/palomachain/pigeon/util/time/mocks" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) -func TestProcessing(t *testing.T) { +func TestRelayMessages(t *testing.T) { ctx := context.Background() - for _, tt := range []struct { - name string - setup func(t *testing.T) *Relayer - expErr error - buildProcessorErr error + testcases := []struct { + name string + setup func(t *testing.T) *Relayer + expErr error }{ { name: "without any processor it does nothing", setup: func(t *testing.T) *Relayer { pc := mocks.NewPalomaClienter(t) pc.On("QueryGetEVMChainInfos", mock.Anything, mock.Anything).Return(nil, nil) - return New( + r := New( config.Root{}, pc, evm.NewFactory(pc), timemocks.NewTime(t), Config{}, ) + return r }, }, { @@ -64,15 +66,6 @@ func TestProcessing(t *testing.T) { }, ).Return(nil) - p.On( - "ProvideEvidence", - mock.Anything, - "a", - []chain.MessageWithSignatures{ - {QueuedMessage: chain.QueuedMessage{ID: 789, PublicAccessData: []byte("tx hash")}}, - }, - ).Return(nil) - pal := mocks.NewPalomaClienter(t) pal.On("GetValidatorAddress").Return(sdk.ValAddress("abc")) pal.On("BlockHeight", mock.Anything).Return(int64(555), nil) @@ -103,10 +96,7 @@ func TestProcessing(t *testing.T) { }, nil, ) - pal.On("QueryMessagesForSigning", mock.Anything, "a").Return( - []chain.QueuedMessage{}, - nil, - ) + os.Setenv("TEST_PASS", keyringPass) t.Cleanup(func() { os.Unsetenv("TEST_PASS") @@ -183,17 +173,18 @@ func TestProcessing(t *testing.T) { Config{}, ) }, - buildProcessorErr: chain.ErrNotConnectedToRightChain, + expErr: chain.ErrNotConnectedToRightChain, }, - } { + } + + for _, tt := range testcases { + asserter := assert.New(t) t.Run(tt.name, func(t *testing.T) { relayer := tt.setup(t) - processors, err := relayer.buildProcessors(ctx) - require.ErrorIs(t, err, tt.buildProcessorErr) - - err = relayer.Process(ctx, processors) - require.ErrorIs(t, err, tt.expErr) + var locker testutil.FakeMutex + actualErr := relayer.RelayMessages(ctx, &locker) + asserter.Equal(tt.expErr, actualErr) }) } } diff --git a/relayer/message_signer.go b/relayer/message_signer.go new file mode 100644 index 00000000..90f42429 --- /dev/null +++ b/relayer/message_signer.go @@ -0,0 +1,117 @@ +package relayer + +import ( + "context" + "sync" + + "github.com/palomachain/pigeon/chain" + "github.com/palomachain/pigeon/chain/paloma" + "github.com/palomachain/pigeon/util/slice" + log "github.com/sirupsen/logrus" +) + +func (r *Relayer) SignMessages(ctx context.Context, locker sync.Locker) error { + log.Info("signer loop") + if ctx.Err() != nil { + log.Info("exiting signer loop as context has ended") + return ctx.Err() + } + + err := r.buildProcessors(ctx, locker) + if err != nil { + log.Error(err) + return err + } + + locker.Lock() + err = r.signMessages(ctx, r.processors) + locker.Unlock() + + return handleProcessError(err) +} + +func (r *Relayer) signMessages(ctx context.Context, processors []chain.Processor) error { + if len(processors) == 0 { + return nil + } + + // todo randomise + for _, p := range processors { + // todo randomise + for _, queueName := range p.SupportedQueues() { + logger := log.WithFields(log.Fields{ + "queue-name": queueName, + "action": "sign", + }) + + messagesForSigning, err := r.palomaClient.QueryMessagesForSigning(ctx, queueName) + if err != nil { + logger.Error("failed getting messages to sign") + return err + } + + logger = log.WithFields(log.Fields{ + "message-ids": slice.Map(messagesForSigning, func(msg chain.QueuedMessage) uint64 { + return msg.ID + }), + }) + + if len(messagesForSigning) > 0 { + logger := logger.WithFields(log.Fields{ + "messages-to-sign": slice.Map(messagesForSigning, func(msg chain.QueuedMessage) uint64 { + return msg.ID + }), + }) + + logger.Info("signing ", len(messagesForSigning), " messages") + signedMessages, err := p.SignMessages(ctx, queueName, messagesForSigning...) + if err != nil { + logger.WithError(err).Error("unable to sign messages") + return err + } + logger = logger.WithFields(log.Fields{ + "signed-messages": slice.Map(signedMessages, func(msg chain.SignedQueuedMessage) log.Fields { + return log.Fields{ + "id": msg.ID, + } + }), + }) + logger.Info("signed messages") + + if err = r.broadcastSignatures(ctx, queueName, signedMessages); err != nil { + logger.WithError(err).Error("couldn't broadcast signatures and process attestation") + return err + } + } + + } + } + + return nil +} + +func (r *Relayer) broadcastSignatures(ctx context.Context, queueTypeName string, sigs []chain.SignedQueuedMessage) error { + broadcastMessageSignatures, err := slice.MapErr( + sigs, + func(sig chain.SignedQueuedMessage) (paloma.BroadcastMessageSignatureIn, error) { + log.WithFields( + log.Fields{ + "id": sig.ID, + "queue-type-name": queueTypeName, + }, + ).Debug("broadcasting signed message") + + return paloma.BroadcastMessageSignatureIn{ + ID: sig.ID, + QueueTypeName: queueTypeName, + Signature: sig.Signature, + SignedByAddress: sig.SignedByAddress, + }, nil + }, + ) + if err != nil { + return err + } + + return r.palomaClient.BroadcastMessageSignatures(ctx, broadcastMessageSignatures...) +} diff --git a/relayer/message_signer_test.go b/relayer/message_signer_test.go new file mode 100644 index 00000000..a4ed6da7 --- /dev/null +++ b/relayer/message_signer_test.go @@ -0,0 +1,171 @@ +package relayer + +import ( + "context" + "os" + "testing" + + "github.com/ethereum/go-ethereum/common" + evmtypes "github.com/palomachain/paloma/x/evm/types" + "github.com/palomachain/pigeon/chain" + "github.com/palomachain/pigeon/chain/evm" + chainmocks "github.com/palomachain/pigeon/chain/mocks" + "github.com/palomachain/pigeon/config" + "github.com/palomachain/pigeon/relayer/mocks" + "github.com/palomachain/pigeon/testutil" + timemocks "github.com/palomachain/pigeon/util/time/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestSignMessages(t *testing.T) { + ctx := context.Background() + testcases := []struct { + name string + setup func(t *testing.T) *Relayer + expErr error + }{ + { + name: "without any processor it does nothing", + setup: func(t *testing.T) *Relayer { + pc := mocks.NewPalomaClienter(t) + pc.On("QueryGetEVMChainInfos", mock.Anything, mock.Anything).Return(nil, nil) + return New( + config.Root{}, + pc, + evm.NewFactory(pc), + timemocks.NewTime(t), + Config{}, + ) + }, + }, + { + name: "it signs messages", + setup: func(t *testing.T) *Relayer { + keyringPass := "abcd" + + dir := t.TempDir() + keyring := evm.OpenKeystore(dir) + acc, err := keyring.NewAccount(keyringPass) + require.NoError(t, err) + + p := chainmocks.NewProcessor(t) + p.On("IsRightChain", mock.Anything).Return(nil) + p.On("SupportedQueues").Return([]string{"a"}) + p.On("SignMessages", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + + pal := mocks.NewPalomaClienter(t) + + pal.On("QueryGetEVMChainInfos", mock.Anything, mock.Anything).Return([]*evmtypes.ChainInfo{ + { + ChainReferenceID: "main", + ChainID: 5, + SmartContractUniqueID: []byte("5"), + SmartContractAddr: common.BytesToAddress([]byte("abcd")).Hex(), + ReferenceBlockHeight: 5, + ReferenceBlockHash: "0x12", + MinOnChainBalance: "10000", + }, + }, nil) + + pal.On("QueryMessagesForSigning", mock.Anything, "a").Return( + []chain.QueuedMessage{ + {ID: 123}, + }, + nil, + ) + + pal.On("BroadcastMessageSignatures", mock.Anything, mock.Anything).Return(nil) + + os.Setenv("TEST_PASS", keyringPass) + t.Cleanup(func() { + os.Unsetenv("TEST_PASS") + }) + + factory := mocks.NewEvmFactorier(t) + + factory.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(p, nil) + + return New( + config.Root{ + EVM: map[string]config.EVM{ + "main": { + ChainClientConfig: config.ChainClientConfig{ + KeyringPassEnvName: "TEST_PASS", + SigningKey: acc.Address.Hex(), + KeyringDirectory: config.Filepath(dir), + }, + }, + }, + }, + pal, + factory, + timemocks.NewTime(t), + Config{}, + ) + }, + }, + { + name: "if the processor is connected to the wrong chain it returns the error", + setup: func(t *testing.T) *Relayer { + keyringPass := "abcd" + + dir := t.TempDir() + keyring := evm.OpenKeystore(dir) + acc, err := keyring.NewAccount(keyringPass) + require.NoError(t, err) + + p := chainmocks.NewProcessor(t) + p.On("IsRightChain", mock.Anything).Return(chain.ErrNotConnectedToRightChain) + + pal := mocks.NewPalomaClienter(t) + pal.On("QueryGetEVMChainInfos", mock.Anything, mock.Anything).Return([]*evmtypes.ChainInfo{ + { + ChainReferenceID: "main", + ChainID: 5, + SmartContractUniqueID: []byte("5"), + SmartContractAddr: common.BytesToAddress([]byte("abcd")).Hex(), + ReferenceBlockHeight: 5, + ReferenceBlockHash: "0x12", + MinOnChainBalance: "10000", + }, + }, nil) + + factory := mocks.NewEvmFactorier(t) + + factory.On("Build", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(p, nil) + + return New( + config.Root{ + EVM: map[string]config.EVM{ + "main": { + ChainClientConfig: config.ChainClientConfig{ + KeyringPassEnvName: "TEST_PASS", + SigningKey: acc.Address.Hex(), + KeyringDirectory: config.Filepath(dir), + }, + }, + }, + }, + pal, + factory, + timemocks.NewTime(t), + Config{}, + ) + }, + expErr: chain.ErrNotConnectedToRightChain, + }, + } + + for _, tt := range testcases { + asserter := assert.New(t) + t.Run(tt.name, func(t *testing.T) { + relayer := tt.setup(t) + + var locker testutil.FakeMutex + actualErr := relayer.SignMessages(ctx, &locker) + asserter.Equal(tt.expErr, actualErr) + }) + } +} diff --git a/relayer/process.go b/relayer/process.go deleted file mode 100644 index 32324c5f..00000000 --- a/relayer/process.go +++ /dev/null @@ -1,197 +0,0 @@ -package relayer - -import ( - "context" - "fmt" - - "github.com/VolumeFi/whoops" - "github.com/palomachain/pigeon/chain" - "github.com/palomachain/pigeon/chain/paloma" - "github.com/palomachain/pigeon/chain/paloma/collision" - "github.com/palomachain/pigeon/util/slice" - log "github.com/sirupsen/logrus" -) - -func (r *Relayer) SignMessages(ctx context.Context, queueName string, messagesForSigning []chain.QueuedMessage, processor chain.Processor) error { - loggerQueuedMessages := log.WithFields(log.Fields{ - "queue-name": queueName, - "message-ids": slice.Map(messagesForSigning, func(msg chain.QueuedMessage) uint64 { - return msg.ID - }), - }) - - if len(messagesForSigning) > 0 { - loggerQueuedMessages.Info("messages to sign") - signedMessages, err := processor.SignMessages(ctx, queueName, messagesForSigning...) - if err != nil { - loggerQueuedMessages.WithError(err).Error("unable to sign messages") - return err - } - loggerQueuedMessages = loggerQueuedMessages.WithFields(log.Fields{ - "signed-messages": slice.Map(signedMessages, func(msg chain.SignedQueuedMessage) log.Fields { - return log.Fields{ - "id": msg.ID, - } - }), - }) - loggerQueuedMessages.Info("signed messages") - - if err = r.broadcastSignatures(ctx, queueName, signedMessages); err != nil { - loggerQueuedMessages.WithError(err).Error("couldn't broadcast signatures and process attestation") - return err - } - } - return nil -} - -func (r *Relayer) RelayMessages(ctx context.Context, queueName string, messagesInQueue []chain.MessageWithSignatures, p chain.Processor) error { - logger := log.WithFields(log.Fields{ - "queue-name": queueName, - }) - - ctx, cleanup, err := collision.GoStartLane(ctx, r.palomaClient, r.palomaClient.GetValidatorAddress()) - if err != nil { - return err - } - defer cleanup() - - relayCandidateMsgs := slice.Filter( - messagesInQueue, - func(msg chain.MessageWithSignatures) bool { - return len(msg.PublicAccessData) == 0 - }, - func(msg chain.MessageWithSignatures) bool { - return collision.AllowedToExecute( - ctx, - []byte(fmt.Sprintf("%s-%d", queueName, msg.ID)), - ) - }, - ) - - if len(relayCandidateMsgs) > 0 { - logger := logger.WithFields(log.Fields{ - "messages-to-relay": slice.Map(relayCandidateMsgs, func(msg chain.MessageWithSignatures) uint64 { - return msg.ID - }), - }) - logger.Info("relaying messages") - err := p.ProcessMessages(ctx, queueName, relayCandidateMsgs) - if err != nil { - logger.WithFields(log.Fields{ - "err": err, - "queue-name": queueName, - "messages-to-relay": slice.Map(relayCandidateMsgs, func(msg chain.MessageWithSignatures) uint64 { - return msg.ID - }), - }).Error("error relaying messages") - return err - } - } - return nil -} - -func (r *Relayer) ProvideEvidenceForMessages(ctx context.Context, queueName string, messagesInQueue []chain.MessageWithSignatures, p chain.Processor) error { - logger := log.WithFields(log.Fields{ - "queue-name": queueName, - }) - - msgsToProvideEvidenceFor := slice.Filter(messagesInQueue, func(msg chain.MessageWithSignatures) bool { - return len(msg.PublicAccessData) > 0 - }) - - if len(msgsToProvideEvidenceFor) > 0 { - logger := logger.WithFields(log.Fields{ - "messages-to-provide-evidence-for": slice.Map(msgsToProvideEvidenceFor, func(msg chain.MessageWithSignatures) uint64 { - return msg.ID - }), - }) - logger.Info("providing evidence for messages") - err := p.ProvideEvidence(ctx, queueName, msgsToProvideEvidenceFor) - if err != nil { - logger.WithError(err).Error("error providing evidence for messages") - return err - } - } - return nil -} - -func (r *Relayer) Process(ctx context.Context, processors []chain.Processor) error { - var processErrors whoops.Group - - if len(processors) == 0 { - return nil - } - - // todo randomise - for _, p := range processors { - // todo randomise - for _, queueName := range p.SupportedQueues() { - logger := log.WithFields(log.Fields{ - "queue-name": queueName, - }) - - messagesInQueue, err := r.palomaClient.QueryMessagesInQueue(ctx, queueName) - logger.Debug("got ", len(messagesInQueue), " messages from ", queueName) - if err != nil { - logger.WithError(err).Error("couldn't get messages to relay") - return err - } - - messagesForSigning, err := r.palomaClient.QueryMessagesForSigning(ctx, queueName) - if err != nil { - logger.Error("failed getting messages to sign") - return err - } - - err = r.SignMessages(ctx, queueName, messagesForSigning, p) - if err != nil { - logger.Error("failed signing messages") - processErrors.Add(err) - } - - err = r.ProvideEvidenceForMessages(ctx, queueName, messagesInQueue, p) - if err != nil { - logger.Error("failed providing evidence for messages") - processErrors.Add(err) - } - - err = r.RelayMessages(ctx, queueName, messagesInQueue, p) - if err != nil { - logger.Error("failed relaying messages") - processErrors.Add(err) - } - } - } - - if processErrors.Err() { - return processErrors - } - - return nil -} - -func (r *Relayer) broadcastSignatures(ctx context.Context, queueTypeName string, sigs []chain.SignedQueuedMessage) error { - broadcastMessageSignatures, err := slice.MapErr( - sigs, - func(sig chain.SignedQueuedMessage) (paloma.BroadcastMessageSignatureIn, error) { - log.WithFields( - log.Fields{ - "id": sig.ID, - "queue-type-name": queueTypeName, - }, - ).Debug("broadcasting signed message") - - return paloma.BroadcastMessageSignatureIn{ - ID: sig.ID, - QueueTypeName: queueTypeName, - Signature: sig.Signature, - SignedByAddress: sig.SignedByAddress, - }, nil - }, - ) - if err != nil { - return err - } - - return r.palomaClient.BroadcastMessageSignatures(ctx, broadcastMessageSignatures...) -} diff --git a/relayer/relayer.go b/relayer/relayer.go index 75993d63..3a9b58ae 100644 --- a/relayer/relayer.go +++ b/relayer/relayer.go @@ -63,6 +63,9 @@ type Relayer struct { relayerConfig Config time utiltime.Time + + chainsInfos []evmtypes.ChainInfo + processors []chain.Processor } type Config struct { diff --git a/relayer/start.go b/relayer/start.go index 5f3e17ee..c4e4837b 100644 --- a/relayer/start.go +++ b/relayer/start.go @@ -6,13 +6,15 @@ import ( "sync" "time" - "github.com/VolumeFi/whoops" - "github.com/palomachain/pigeon/errors" - "github.com/palomachain/pigeon/util/channels" log "github.com/sirupsen/logrus" ) -const defaultLoopTimeout = 1 * time.Minute +const ( + updateExternalChainsLoopInterval = 1 * time.Minute + signMessagesLoopInterval = 1 * time.Minute + relayMessagesLoopInterval = 1 * time.Minute + attestMessagesLoopInterval = 1 * time.Minute +) func (r *Relayer) waitUntilStaking(ctx context.Context) error { for { @@ -36,118 +38,42 @@ func (r *Relayer) waitUntilStaking(ctx context.Context) error { } } -// Start starts the relayer. It's responsible for handling the communication -// with Paloma and other chains. -func (r *Relayer) Start(ctx context.Context) error { - if err := r.waitUntilStaking(ctx); err != nil { - return err - } - - log.Info("starting relayer") - var locker sync.Mutex - - go r.startUpdateExternalChainInfos(ctx, &locker) - - ticker := time.NewTicker(defaultLoopTimeout) +func (r *Relayer) startProcess(ctx context.Context, locker sync.Locker, tickerInterval time.Duration, process func(context.Context, sync.Locker) error) { + ticker := time.NewTicker(tickerInterval) defer ticker.Stop() - // only used to enter into the loop below immediately after the first "tick" - firstLoopEnter := make(chan time.Time, 1) - firstLoopEnter <- time.Time{} - - go func() { - r.startKeepAlive(ctx, &locker) - }() - - tickerCh := channels.FanIn(ticker.C, firstLoopEnter) + logger := log.WithFields(log.Fields{}) for { - log.Debug("waiting on the loop for a new tick") select { case <-ctx.Done(): - log.Warn("exiting due to context being done") - return ctx.Err() - case _, chOpen := <-tickerCh: - if !chOpen { - if ctx.Err() != nil { - return nil - } - return whoops.WrapS(ErrUnknown, "ticker channel for message processing was closed unexpectedly") + logger.Warn("exiting due to context being done") + return + case <-ticker.C: + err := process(ctx, locker) + if err != nil { + logger.Error(err) } - if err := r.process(ctx, &locker); err != nil { - log.WithError(err).Error("error while trying to process messages") - } - } - } -} - -func (r *Relayer) startUpdateExternalChainInfos(ctx context.Context, locker sync.Locker) { - ticker := time.NewTicker(defaultLoopTimeout) - defer ticker.Stop() - - for range ticker.C { - processors, err := r.buildProcessors(ctx) - if err != nil { - log.WithFields(log.Fields{ - "err": err, - }).Error("couldn't build processors to update external chain info") - - continue - } - - log.Info("trying to update external chain info") - - locker.Lock() - err = r.updateExternalChainInfos(ctx, processors) - locker.Unlock() - - if err != nil { - log.WithFields(log.Fields{ - "err": err, - }).Error("couldn't update external chain info. Will try again.") } } } -func (r *Relayer) process(ctx context.Context, locker sync.Locker) error { - log.Info("relayer loop") - if ctx.Err() != nil { - log.Info("exiting relayer loop as context has ended") - return ctx.Err() - } - - processors, err := r.buildProcessors(ctx) - if err != nil { +// Start starts the relayer. It's responsible for handling the communication +// with Paloma and other chains. +func (r *Relayer) Start(ctx context.Context) error { + if err := r.waitUntilStaking(ctx); err != nil { return err } - locker.Lock() - err = r.Process(ctx, processors) - locker.Unlock() + log.Info("starting relayer") + var locker sync.Mutex - switch { - case err == nil: - // success - return nil - case goerrors.Is(err, context.Canceled): - log.WithFields(log.Fields{ - "err": err, - }).Debug("exited from the process loop due the context being canceled") - return nil - case goerrors.Is(err, context.DeadlineExceeded): - log.WithFields(log.Fields{ - "err": err, - }).Debug("exited from the process loop due the context deadline being exceeded") - return nil - case errors.IsUnrecoverable(err): - // there is no way that we can recover from this - log.WithFields(log.Fields{ - "err": err, - }).Error("unrecoverable error returned") - return err - default: - log.WithFields(log.Fields{ - "err": err, - }).Error("error returned in process loop") - return nil - } + // Start background goroutines to run separately from each other + go r.startProcess(ctx, &locker, updateExternalChainsLoopInterval, r.UpdateExternalChainInfos) + go r.startProcess(ctx, &locker, signMessagesLoopInterval, r.SignMessages) + go r.startProcess(ctx, &locker, relayMessagesLoopInterval, r.RelayMessages) + go r.startProcess(ctx, &locker, attestMessagesLoopInterval, r.AttestMessages) + + // Start the foreground process + r.startProcess(ctx, &locker, r.relayerConfig.KeepAliveLoopTimeout, r.keepAlive) + return nil } diff --git a/relayer/update_valset.go b/relayer/update_valset.go index e197e74c..9c93efd4 100644 --- a/relayer/update_valset.go +++ b/relayer/update_valset.go @@ -2,6 +2,7 @@ package relayer import ( "context" + "sync" "github.com/palomachain/pigeon/chain" "github.com/palomachain/pigeon/chain/paloma" @@ -9,10 +10,19 @@ import ( log "github.com/sirupsen/logrus" ) -func (r *Relayer) updateExternalChainInfos(ctx context.Context, processors []chain.Processor) error { +func (r *Relayer) UpdateExternalChainInfos(ctx context.Context, locker sync.Locker) error { + err := r.buildProcessors(ctx, locker) + if err != nil { + log.WithFields(log.Fields{ + "err": err, + }).Error("couldn't build processors to update external chain info") + + return err + } + log.Info("updating external chain infos") externalAccounts := slice.Map( - processors, + r.processors, func(p chain.Processor) chain.ExternalAccount { return p.ExternalAccount() }, @@ -36,5 +46,9 @@ func (r *Relayer) updateExternalChainInfos(ctx context.Context, processors []cha return nil } - return r.palomaClient.AddExternalChainInfo(ctx, chainInfos...) + locker.Lock() + err = r.palomaClient.AddExternalChainInfo(ctx, chainInfos...) + locker.Unlock() + + return err } diff --git a/testutil/testutil.go b/testutil/testutil.go new file mode 100644 index 00000000..03c8dfa7 --- /dev/null +++ b/testutil/testutil.go @@ -0,0 +1,6 @@ +package testutil + +type FakeMutex struct{} + +func (m FakeMutex) Lock() {} +func (m FakeMutex) Unlock() {}