diff --git a/integration-tests/smoke/event_loader_test.go b/integration-tests/smoke/event_loader_test.go index cd4bc678c..cd8518e81 100644 --- a/integration-tests/smoke/event_loader_test.go +++ b/integration-tests/smoke/event_loader_test.go @@ -25,6 +25,7 @@ import ( contract "github.com/smartcontractkit/chainlink-solana/contracts/generated/log_read_test" "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" "github.com/smartcontractkit/chainlink-solana/integration-tests/solclient" @@ -49,7 +50,8 @@ func TestEventLoader(t *testing.T) { require.NoError(t, err) rpcURL, wsURL := setupTestValidator(t, privateKey.PublicKey().String()) - rpcClient := rpc.New(rpcURL) + cl, rpcClient, err := client.NewTestClient(rpcURL, config.NewDefault(), 1*time.Second, logger.Nop()) + require.NoError(t, err) wsClient, err := ws.Connect(ctx, wsURL) require.NoError(t, err) @@ -62,7 +64,7 @@ func TestEventLoader(t *testing.T) { parser := &printParser{t: t} sender := newLogSender(t, rpcClient, wsClient) collector := logpoller.NewEncodedLogCollector( - rpcClient, + cl, parser, logger.Nop(), ) diff --git a/pkg/solana/client/client.go b/pkg/solana/client/client.go index 6558f96a1..c39e64218 100644 --- a/pkg/solana/client/client.go +++ b/pkg/solana/client/client.go @@ -71,10 +71,10 @@ type Client struct { requestGroup *singleflight.Group } -func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) { - return &Client{ +// Return both the client and the underlying rpc client for testing +func NewTestClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, *rpc.Client, error) { + rpcClient := Client{ url: endpoint, - rpc: rpc.New(endpoint), skipPreflight: cfg.SkipPreflight(), commitment: cfg.Commitment(), maxRetries: cfg.MaxRetries(), @@ -82,7 +82,14 @@ func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, contextDuration: requestTimeout, log: log, requestGroup: &singleflight.Group{}, - }, nil + } + rpcClient.rpc = rpc.New(endpoint) + return &rpcClient, rpcClient.rpc, nil +} + +func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) { + rpcClient, _, err := NewTestClient(endpoint, cfg, requestTimeout, log) + return rpcClient, err } func (c *Client) latency(name string) func() { diff --git a/pkg/solana/logpoller/filters.go b/pkg/solana/logpoller/filters.go index 93af60a85..20f971ec2 100644 --- a/pkg/solana/logpoller/filters.go +++ b/pkg/solana/logpoller/filters.go @@ -12,8 +12,6 @@ import ( "github.com/gagliardetto/solana-go" "github.com/smartcontractkit/chainlink-common/pkg/logger" - - "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/utils" ) type filters struct { @@ -88,8 +86,6 @@ func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error { return fmt.Errorf("failed to load filters: %w", err) } - filter.EventSig = utils.Discriminator("event", filter.EventName) - fl.filtersMutex.Lock() defer fl.filtersMutex.Unlock() @@ -134,17 +130,17 @@ func (fl *filters) RegisterFilter(ctx context.Context, filter Filter) error { } programID := filter.Address.ToSolana().String() - if _, ok := fl.knownPrograms[programID]; !ok { + if _, ok = fl.knownPrograms[programID]; !ok { fl.knownPrograms[programID] = 1 } else { fl.knownPrograms[programID]++ } - discriminator := base64.StdEncoding.EncodeToString(filter.EventSig[:])[:10] + discriminatorHead := filter.Discriminator()[:10] if _, ok := fl.knownPrograms[programID]; !ok { - fl.knownDiscriminators[discriminator] = 1 + fl.knownDiscriminators[discriminatorHead] = 1 } else { - fl.knownDiscriminators[discriminator]++ + fl.knownDiscriminators[discriminatorHead]++ } return nil @@ -220,13 +216,13 @@ func (fl *filters) removeFilterFromIndexes(filter Filter) { } } - discriminator := base64.StdEncoding.EncodeToString(filter.EventSig[:])[:10] - if refcount, ok := fl.knownDiscriminators[discriminator]; ok { + discriminatorHead := filter.Discriminator()[:10] + if refcount, ok := fl.knownDiscriminators[discriminatorHead]; ok { refcount-- if refcount > 0 { - fl.knownDiscriminators[discriminator] = refcount + fl.knownDiscriminators[discriminatorHead] = refcount } else { - delete(fl.knownDiscriminators, discriminator) + delete(fl.knownDiscriminators, discriminatorHead) } } } @@ -345,6 +341,8 @@ func (fl *filters) LoadFilters(ctx context.Context) error { fl.filtersByAddress = make(map[PublicKey]map[EventSignature]map[int64]struct{}) fl.filtersToBackfill = make(map[int64]struct{}) fl.filtersToDelete = make(map[int64]Filter) + fl.knownPrograms = make(map[string]uint) + fl.knownDiscriminators = make(map[string]uint) filters, err := fl.orm.SelectFilters(ctx) if err != nil { diff --git a/pkg/solana/logpoller/filters_test.go b/pkg/solana/logpoller/filters_test.go index 710f08a9f..15b14c22b 100644 --- a/pkg/solana/logpoller/filters_test.go +++ b/pkg/solana/logpoller/filters_test.go @@ -39,6 +39,12 @@ func TestFilters_LoadFilters(t *testing.T) { happyPath2, }, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + 3: 0, + }, nil) + err := fs.LoadFilters(ctx) require.EqualError(t, err, "failed to select filters from db: db failed") err = fs.LoadFilters(ctx) @@ -110,6 +116,7 @@ func TestFilters_RegisterFilter(t *testing.T) { const filterName = "Filter" dbFilter := Filter{Name: filterName} orm.On("SelectFilters", mock.Anything).Return([]Filter{dbFilter}, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil) newFilter := dbFilter tc.ModifyField(&newFilter) err := fs.RegisterFilter(tests.Context(t), newFilter) @@ -122,6 +129,7 @@ func TestFilters_RegisterFilter(t *testing.T) { fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() orm.On("InsertFilter", mock.Anything, mock.Anything).Return(int64(0), errors.New("failed to insert")).Once() filter := Filter{Name: filterName} err := fs.RegisterFilter(tests.Context(t), filter) @@ -149,6 +157,7 @@ func TestFilters_RegisterFilter(t *testing.T) { fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() const filterID = int64(10) orm.On("InsertFilter", mock.Anything, mock.Anything).Return(filterID, nil).Once() err := fs.RegisterFilter(tests.Context(t), Filter{Name: filterName}) @@ -180,6 +189,7 @@ func TestFilters_UnregisterFilter(t *testing.T) { fs := newFilters(lggr, orm) const filterName = "Filter" orm.On("SelectFilters", mock.Anything).Return(nil, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() err := fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) }) @@ -189,6 +199,7 @@ func TestFilters_UnregisterFilter(t *testing.T) { const filterName = "Filter" const id int64 = 10 orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() orm.On("MarkFilterDeleted", mock.Anything, id).Return(errors.New("db query failed")).Once() err := fs.UnregisterFilter(tests.Context(t), filterName) require.EqualError(t, err, "failed to mark filter deleted: db query failed") @@ -199,6 +210,7 @@ func TestFilters_UnregisterFilter(t *testing.T) { const filterName = "Filter" const id int64 = 10 orm.On("SelectFilters", mock.Anything).Return([]Filter{{ID: id, Name: filterName}}, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{}, nil).Once() orm.On("MarkFilterDeleted", mock.Anything, id).Return(nil).Once() err := fs.UnregisterFilter(tests.Context(t), filterName) require.NoError(t, err) @@ -226,6 +238,9 @@ func TestFilters_PruneFilters(t *testing.T) { Name: "To keep", }, }, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{ + 2: 25, + }, nil).Once() orm.On("DeleteFilters", mock.Anything, map[int64]Filter{toDelete.ID: toDelete}).Return(nil).Once() err := fs.PruneFilters(tests.Context(t)) require.NoError(t, err) @@ -246,6 +261,10 @@ func TestFilters_PruneFilters(t *testing.T) { Name: "To keep", }, }, nil).Once() + orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + }, nil).Once() newToDelete := Filter{ ID: 3, Name: "To delete 2", @@ -291,6 +310,12 @@ func TestFilters_MatchingFilters(t *testing.T) { EventSig: expectedFilter1.EventSig, } orm.On("SelectFilters", mock.Anything).Return([]Filter{expectedFilter1, expectedFilter2, sameAddress, sameEventSig}, nil).Once() + orm.On("SelectSeqNums", mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + 3: 14, + 4: 0, + }, nil) filters := newFilters(lggr, orm) err := filters.LoadFilters(tests.Context(t)) require.NoError(t, err) @@ -319,6 +344,10 @@ func TestFilters_GetFiltersToBackfill(t *testing.T) { Name: "notBackfilled", } orm.EXPECT().SelectFilters(mock.Anything).Return([]Filter{backfilledFilter, notBackfilled}, nil).Once() + orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{ + 1: 18, + 2: 25, + }, nil) filters := newFilters(lggr, orm) err := filters.LoadFilters(tests.Context(t)) require.NoError(t, err) diff --git a/pkg/solana/logpoller/job.go b/pkg/solana/logpoller/job.go index 8e24d4165..bb336893c 100644 --- a/pkg/solana/logpoller/job.go +++ b/pkg/solana/logpoller/job.go @@ -122,8 +122,8 @@ func (j *getTransactionsFromBlockJob) Run(ctx context.Context) error { if block.BlockTime == nil { return fmt.Errorf("received block %d from rpc with missing block time", block.BlockHeight) - detail.blockTime = *block.BlockTime } + detail.blockTime = *block.BlockTime if len(block.Transactions) != len(blockSigsOnly.Signatures) { return fmt.Errorf("block %d has %d transactions but %d signatures", block.BlockHeight, len(block.Transactions), len(blockSigsOnly.Signatures)) diff --git a/pkg/solana/logpoller/loader_test.go b/pkg/solana/logpoller/loader_test.go index 4d3dcd8cc..2b057f7d1 100644 --- a/pkg/solana/logpoller/loader_test.go +++ b/pkg/solana/logpoller/loader_test.go @@ -3,7 +3,6 @@ package logpoller_test import ( "context" "crypto/rand" - "reflect" "sync" "sync/atomic" "testing" @@ -78,11 +77,13 @@ func TestEncodedLogCollector_ParseSingleEvent(t *testing.T) { GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything). RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) { height := slot - 1 + timeStamp := solana.UnixTimeSeconds(time.Now().Unix()) result := rpc.GetBlockResult{ Transactions: []rpc.TransactionWithMeta{}, Signatures: []solana.Signature{}, BlockHeight: &height, + BlockTime: &timeStamp, } _, _ = rand.Read(result.Blockhash[:]) @@ -132,6 +133,8 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { hashes := make([]solana.Hash, len(slots)) scrambler := &slotUnsync{ch: make(chan struct{})} + timeStamp := solana.UnixTimeSeconds(time.Now().Unix()) + for idx := range len(sigs) { _, _ = rand.Read(sigs[idx][:]) _, _ = rand.Read(hashes[idx][:]) @@ -176,6 +179,7 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { Transactions: []rpc.TransactionWithMeta{}, Signatures: []solana.Signature{}, BlockHeight: &height, + BlockTime: &timeStamp, }, nil } @@ -190,61 +194,72 @@ func TestEncodedLogCollector_MultipleEventOrdered(t *testing.T) { }, Signatures: []solana.Signature{sigs[slotIdx]}, BlockHeight: &height, + BlockTime: &timeStamp, }, nil }) tests.AssertEventually(t, func() bool { - return reflect.DeepEqual(parser.Events(), []logpoller.ProgramEvent{ - { - BlockData: logpoller.BlockData{ - SlotNumber: 41, - BlockHeight: 40, - BlockHash: hashes[3], - TransactionHash: sigs[3], - TransactionIndex: 0, - TransactionLogIndex: 0, - }, - Prefix: ">", - Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + return len(parser.Events()) >= 4 + }) + + assert.Equal(t, []logpoller.ProgramEvent{ + { + BlockData: logpoller.BlockData{ + SlotNumber: 41, + BlockHeight: 40, + BlockTime: timeStamp, + BlockHash: hashes[3], + TransactionHash: sigs[3], + TransactionIndex: 0, + TransactionLogIndex: 0, }, - { - BlockData: logpoller.BlockData{ - SlotNumber: 42, - BlockHeight: 41, - BlockHash: hashes[2], - TransactionHash: sigs[2], - TransactionIndex: 0, - TransactionLogIndex: 0, - }, - Prefix: ">", - Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + Prefix: ">", + Program: "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 42, + BlockHeight: 41, + BlockTime: timeStamp, + BlockHash: hashes[2], + TransactionHash: sigs[2], + TransactionIndex: 0, + TransactionLogIndex: 0, }, - { - BlockData: logpoller.BlockData{ - SlotNumber: 43, - BlockHeight: 42, - BlockHash: hashes[1], - TransactionHash: sigs[1], - TransactionIndex: 0, - TransactionLogIndex: 0, - }, - Prefix: ">", - Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + Prefix: ">", + Program: "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 43, + BlockHeight: 42, + BlockTime: timeStamp, + BlockHash: hashes[1], + TransactionHash: sigs[1], + TransactionIndex: 0, + TransactionLogIndex: 0, }, - { - BlockData: logpoller.BlockData{ - SlotNumber: 44, - BlockHeight: 43, - BlockHash: hashes[0], - TransactionHash: sigs[0], - TransactionIndex: 0, - TransactionLogIndex: 0, - }, - Prefix: ">", - Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + Prefix: ">", + Program: "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + { + BlockData: logpoller.BlockData{ + SlotNumber: 44, + BlockHeight: 43, + BlockTime: timeStamp, + BlockHash: hashes[0], + TransactionHash: sigs[0], + TransactionIndex: 0, + TransactionLogIndex: 0, }, - }) - }) + Prefix: ">", + Program: "J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4", + Data: "HDQnaQjSWwkNAAAASGVsbG8sIFdvcmxkISoAAAAAAAAA", + }, + }, parser.Events()) client.AssertExpectations(t) } @@ -337,12 +352,14 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { } height := slot - 1 + timeStamp := solana.UnixTimeSeconds(time.Now().Unix()) if idx == -1 { return &rpc.GetBlockResult{ Transactions: []rpc.TransactionWithMeta{}, Signatures: []solana.Signature{}, BlockHeight: &height, + BlockTime: &timeStamp, }, nil } @@ -361,6 +378,7 @@ func TestEncodedLogCollector_BackfillForAddress(t *testing.T) { }, Signatures: []solana.Signature{sigs[idx*2], sigs[(idx*2)+1]}, BlockHeight: &height, + BlockTime: &timeStamp, }, nil }) diff --git a/pkg/solana/logpoller/models.go b/pkg/solana/logpoller/models.go index b1a1db3ac..8d1ec356a 100644 --- a/pkg/solana/logpoller/models.go +++ b/pkg/solana/logpoller/models.go @@ -1,9 +1,12 @@ package logpoller import ( + "encoding/base64" "time" "github.com/lib/pq" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/utils" ) type Filter struct { @@ -26,6 +29,11 @@ func (f Filter) MatchSameLogs(other Filter) bool { f.EventIdl.Equal(other.EventIdl) && f.SubkeyPaths.Equal(other.SubkeyPaths) } +func (f Filter) Discriminator() string { + d := utils.Discriminator("event", f.Name) + return base64.StdEncoding.EncodeToString(d[:]) +} + type Log struct { ID int64 FilterID int64