Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 27 additions & 20 deletions integration-tests/logpoller/pg_logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,17 @@ import (
"github.com/smartcontractkit/chainlink-ton/pkg/logpoller/store/postgres"
)

// testBlockIDExt creates a valid BlockIDExt for testing with required hash fields
func testBlockIDExt(seqNo uint32) *ton.BlockIDExt {
return &ton.BlockIDExt{
Workchain: 0,
Shard: -1,
SeqNo: seqNo,
RootHash: make([]byte, 32),
FileHash: make([]byte, 32),
}
}

// createTestLogs creates sample logs for testing with actual Counter events
func createTestLogs(t *testing.T, addr *address.Address, filterID int64) []models.Log {
t.Helper()
Expand All @@ -46,21 +57,17 @@ func createTestLogs(t *testing.T, addr *address.Address, filterID int64) []model
EndCell()

logs[i] = models.Log{
FilterID: filterID,
ChainID: "test-chain",
Address: addr,
EventSig: counter.TopicCountIncreased,
Data: eventCell,
TxHash: models.TxHash{byte(i + 1), 2, 3, 4, 5},
TxLT: uint64(1000 + i), //nolint:gosec // test code with small values
MsgLT: uint64(1000 + i), //nolint:gosec // test code with small values - same as TxLT for simplicity
TxTimestamp: time.Now().Add(time.Duration(i) * time.Minute),
Block: &ton.BlockIDExt{
Workchain: 0,
Shard: -1,
SeqNo: uint32(100 + i), //nolint:gosec // test code with small values
},
MCBlockSeqno: uint32(200 + i), //nolint:gosec // test code with small values
FilterID: filterID,
ChainID: "test-chain",
Address: addr,
EventSig: counter.TopicCountIncreased,
Data: eventCell,
TxHash: models.TxHash{byte(i + 1), 2, 3, 4, 5},
TxLT: uint64(1000 + i), //nolint:gosec // test code with small values
MsgLT: uint64(1000 + i), //nolint:gosec // test code with small values - same as TxLT for simplicity
TxTimestamp: time.Now().Add(time.Duration(i) * time.Minute),
Block: testBlockIDExt(uint32(100 + i)), //nolint:gosec // test code with small values
MCBlockSeqno: uint32(200 + i), //nolint:gosec // test code with small values
MsgIndex: int64(i),
}
}
Expand Down Expand Up @@ -197,8 +204,8 @@ func TestPgLogStore(t *testing.T) {
TxLT: uint64(5000 - i), //nolint:gosec // test code with small values
MsgLT: uint64(5000 - i), //nolint:gosec // test code with small values
TxTimestamp: sameTimestamp,
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: uint32(500 + i)}, //nolint:gosec // test code
MCBlockSeqno: uint32(600 + i), //nolint:gosec // test code
Block: testBlockIDExt(uint32(500 + i)), //nolint:gosec // test code
MCBlockSeqno: uint32(600 + i), //nolint:gosec // test code
MsgIndex: int64(i),
}
}
Expand Down Expand Up @@ -354,7 +361,7 @@ func TestGetLatestBlock(t *testing.T) {
TxLT: uint64(1000 + idx), //nolint:gosec // test code
MsgLT: uint64(1000 + idx), //nolint:gosec // test code
TxTimestamp: time.Now(),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: uint32(100 + idx)}, //nolint:gosec // test code
Block: testBlockIDExt(uint32(100 + idx)), //nolint:gosec // test code
MCBlockSeqno: mcSeqno,
MsgIndex: int64(idx),
}
Expand Down Expand Up @@ -449,8 +456,8 @@ func TestMultiFilterDeduplication(t *testing.T) {
TxLT: uint64(1000 + eventIdx), //nolint:gosec // test code
MsgLT: uint64(1000 + eventIdx), //nolint:gosec // test code
TxTimestamp: baseTime.Add(time.Duration(eventIdx) * time.Minute),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: uint32(100 + eventIdx)}, //nolint:gosec // test code
MCBlockSeqno: uint32(200 + eventIdx), //nolint:gosec // test code
Block: testBlockIDExt(uint32(100 + eventIdx)), //nolint:gosec // test code
MCBlockSeqno: uint32(200 + eventIdx), //nolint:gosec // test code
MsgIndex: int64(eventIdx),
}
inserted, ierr := logStore.SaveLogs(ctx, []models.Log{log}, logpoller.DefaultConfigSet.BatchInsertSize, logpoller.DefaultConfigSet.MinBatchSize)
Expand Down
9 changes: 4 additions & 5 deletions integration-tests/logpoller/pg_pruning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/xssnick/tonutils-go/address"
"github.com/xssnick/tonutils-go/tlb"
"github.com/xssnick/tonutils-go/ton"
"github.com/xssnick/tonutils-go/tvm/cell"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
Expand Down Expand Up @@ -58,8 +57,8 @@ func createTestLogsForPruning(t *testing.T, addr *address.Address, filterID int6
TxLT: uint64(1000 + i), //nolint:gosec // test code
MsgLT: uint64(1000 + i), //nolint:gosec // test code
TxTimestamp: baseTime.Add(time.Duration(i) * time.Minute),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: uint32(100 + i)}, //nolint:gosec // test code
MCBlockSeqno: uint32(200 + i), //nolint:gosec // test code
Block: testBlockIDExt(uint32(100 + i)), //nolint:gosec // test code
MCBlockSeqno: uint32(200 + i), //nolint:gosec // test code
MsgIndex: int64(i),
}

Expand Down Expand Up @@ -94,8 +93,8 @@ func createLogsWithTxLT(t *testing.T, addr *address.Address, filterID int64, txL
TxLT: txLT,
MsgLT: txLT,
TxTimestamp: baseTime.Add(time.Duration(i) * time.Minute),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: uint32(100 + i)}, //nolint:gosec // test code
MCBlockSeqno: uint32(200 + i), //nolint:gosec // test code
Block: testBlockIDExt(uint32(100 + i)), //nolint:gosec // test code
MCBlockSeqno: uint32(200 + i), //nolint:gosec // test code
MsgIndex: 0,
}
}
Expand Down
29 changes: 20 additions & 9 deletions integration-tests/smoke/chainaccessor/accessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ const (
MockOffRampAddr = "EQDKbjIcfM6ezt8KjKJJLshZJJSqX7XOA4ff-W72r5gqPrHF"
)

// testBlockIDExt creates a valid BlockIDExt for testing with required hash fields
func testBlockIDExt(seqNo uint32) *ton.BlockIDExt {
return &ton.BlockIDExt{
Workchain: 0,
Shard: -1,
SeqNo: seqNo,
RootHash: make([]byte, 32),
FileHash: make([]byte, 32),
}
}

// BOC (Bag of Cells) data captured from TypeScript tests.
//
// IMPORTANT: These BOCs are captured from contracts/tests/ccip/CCIPRouter.spec.ts
Expand Down Expand Up @@ -484,7 +495,7 @@ func testCommitReportsMixedHelper(t *testing.T, lp logpoller.Service, logStore l
TxHash: lptypes.TxHash{1, 2, 3, 4, 5},
TxLT: 1000,
TxTimestamp: baseTimestamp.Add(1 * time.Second),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 100},
Block: testBlockIDExt(100),
MCBlockSeqno: 200,
MsgIndex: 0,
},
Expand All @@ -498,7 +509,7 @@ func testCommitReportsMixedHelper(t *testing.T, lp logpoller.Service, logStore l
TxHash: lptypes.TxHash{2, 3, 4, 5, 6},
TxLT: 1001,
TxTimestamp: baseTimestamp.Add(2 * time.Second),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 101},
Block: testBlockIDExt(101),
MCBlockSeqno: 201,
MsgIndex: 1,
},
Expand All @@ -512,7 +523,7 @@ func testCommitReportsMixedHelper(t *testing.T, lp logpoller.Service, logStore l
TxHash: lptypes.TxHash{3, 4, 5, 6, 7},
TxLT: 1002,
TxTimestamp: baseTimestamp.Add(3 * time.Second),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 102},
Block: testBlockIDExt(102),
MCBlockSeqno: 202,
MsgIndex: 2,
},
Expand All @@ -526,7 +537,7 @@ func testCommitReportsMixedHelper(t *testing.T, lp logpoller.Service, logStore l
TxHash: lptypes.TxHash{4, 5, 6, 7, 8},
TxLT: 1003,
TxTimestamp: baseTimestamp.Add(4 * time.Second),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 103},
Block: testBlockIDExt(103),
MCBlockSeqno: 203,
MsgIndex: 3,
},
Expand All @@ -540,7 +551,7 @@ func testCommitReportsMixedHelper(t *testing.T, lp logpoller.Service, logStore l
TxHash: lptypes.TxHash{5, 6, 7, 8, 9},
TxLT: 1004,
TxTimestamp: baseTimestamp.Add(5 * time.Second),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 104},
Block: testBlockIDExt(104),
MCBlockSeqno: 204,
MsgIndex: 4,
},
Expand Down Expand Up @@ -609,7 +620,7 @@ func testCommitReportsBasicHelper(t *testing.T, lp logpoller.Service, logStore l
TxHash: lptypes.TxHash{1, 2, 3, 4, 5},
TxLT: 1000,
TxTimestamp: logTimestamp,
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 100},
Block: testBlockIDExt(100),
MCBlockSeqno: 200,
MsgIndex: 0,
}}, logpoller.DefaultConfigSet.BatchInsertSize, logpoller.DefaultConfigSet.MinBatchSize)
Expand Down Expand Up @@ -870,7 +881,7 @@ func testExecutedMessagesHelper(t *testing.T, lp logpoller.Service, logStore log
TxHash: lptypes.TxHash{1, 2, 3, 4, 5},
TxLT: 1000,
TxTimestamp: baseTimestamp.Add(1 * time.Second),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 100},
Block: testBlockIDExt(100),
MCBlockSeqno: 200,
MsgLT: 1000,
MsgIndex: 0,
Expand All @@ -884,7 +895,7 @@ func testExecutedMessagesHelper(t *testing.T, lp logpoller.Service, logStore log
TxHash: lptypes.TxHash{2, 3, 4, 5, 6},
TxLT: 1001,
TxTimestamp: baseTimestamp.Add(2 * time.Second),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 101},
Block: testBlockIDExt(101),
MCBlockSeqno: 201,
MsgLT: 1001,
MsgIndex: 1,
Expand All @@ -898,7 +909,7 @@ func testExecutedMessagesHelper(t *testing.T, lp logpoller.Service, logStore log
TxHash: lptypes.TxHash{3, 4, 5, 6, 7},
TxLT: 1002,
TxTimestamp: baseTimestamp.Add(3 * time.Second),
Block: &ton.BlockIDExt{Workchain: 0, Shard: -1, SeqNo: 102},
Block: testBlockIDExt(102),
MCBlockSeqno: 202,
MsgLT: 1002,
MsgIndex: 2,
Expand Down
127 changes: 100 additions & 27 deletions pkg/logpoller/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,132 @@ package logpoller

import (
"context"
"fmt"

"github.com/xssnick/tonutils-go/address"

"github.com/smartcontractkit/chainlink-ton/pkg/logpoller/models"
)

// buildFilterIndex creates a filter index for efficient lookup during processing.
// Returns FilterIndex mapping filter keys to Filter objects, enabling direct property access.
func (lp *service) buildFilterIndex(ctx context.Context, addresses []*address.Address) (models.FilterIndex, error) {
filterIndex := make(models.FilterIndex)
// buildFilterIndex builds FilterIndex from the in-memory filtersByName cache.
// Returns FilterIndex mapping filter keys to Filter objects for direct property access.
// Note: returned Filter pointers reference cached data - callers must not mutate them.
func (lp *service) buildFilterIndex(ctx context.Context) (models.FilterIndex, error) {
if err := lp.loadFilters(ctx); err != nil {
return nil, err
}

for _, addr := range addresses {
filters, err := lp.filterStore.GetFiltersByAddress(ctx, addr)
if err != nil {
return nil, fmt.Errorf("failed to get filters for %s: %w", addr.String(), err)
}
lp.filtersMu.RLock()
defer lp.filtersMu.RUnlock()

for _, filter := range filters {
key := models.FilterKey{
Address: addr,
MsgType: filter.MsgType,
EventSig: filter.EventSig,
}
keyStr := key.String()
filterIndex[keyStr] = append(filterIndex[keyStr], &filter)
filterIndex := make(models.FilterIndex)
for _, filter := range lp.filtersByName {
key := models.FilterKey{
Address: filter.Address,
MsgType: filter.MsgType,
EventSig: filter.EventSig,
}
keyStr := key.String()
filterIndex[keyStr] = append(filterIndex[keyStr], filter)
}

return filterIndex, nil
}

// RegisterFilter adds a new filter to monitor specific address/event signature combinations.
// Note: Filter changes take effect on the next LogPoller loop tick (up to pollPeriod delay)
// getDistinctAddresses returns unique addresses from the in-memory filtersByName cache.
func (lp *service) getDistinctAddresses(ctx context.Context) ([]*address.Address, error) {
if err := lp.loadFilters(ctx); err != nil {
return nil, err
}

lp.filtersMu.RLock()
defer lp.filtersMu.RUnlock()

addressSet := make(map[string]*address.Address)
for _, filter := range lp.filtersByName {
addrStr := filter.Address.String()
if _, ok := addressSet[addrStr]; !ok {
addressSet[addrStr] = filter.Address
}
}

addresses := make([]*address.Address, 0, len(addressSet))
for _, addr := range addressSet {
addresses = append(addresses, addr)
}

return addresses, nil
}

// RegisterFilter registers a filter for log polling.
// Checks cache first - skips DB if filter already exists with same config.
//
// Note: Filter changes take effect on the next LogPoller loop tick (up to pollPeriod delay).
// If registration occurs before run() reads addresses, the change applies immediately.
// Otherwise, it waits until the next tick.
func (lp *service) RegisterFilter(ctx context.Context, flt models.Filter) (int64, error) {
id, err := lp.filterStore.RegisterFilter(ctx, flt)
func (lp *service) RegisterFilter(ctx context.Context, filter models.Filter) (int64, error) {
// Ensure cache is loaded
if err := lp.loadFilters(ctx); err != nil {
return 0, err
}

// Check cache first (read lock)
lp.filtersMu.RLock()
if cached, ok := lp.filtersByName[filter.Name]; ok {
// Filter exists - check if config matches
if cached.Address.String() == filter.Address.String() &&
cached.MsgType == filter.MsgType &&
cached.EventSig == filter.EventSig {
// Same config - return cached ID, skip DB
id := cached.ID
lp.filtersMu.RUnlock()
return id, nil
}
}
lp.filtersMu.RUnlock()

// Cache miss or config changed - hit DB
id, err := lp.filterStore.RegisterFilter(ctx, filter)
if err != nil {
return 0, err
}

// Update cache (write lock)
lp.filtersMu.Lock()
filter.ID = id
lp.filtersByName[filter.Name] = &filter
lp.filtersMu.Unlock()

return id, nil
}

// UnregisterFilter removes a filter by name.
// Note: Filter removal takes effect on the next LogPoller loop tick (up to pollPeriod delay)
// UnregisterFilter marks a filter as deleted.
//
// Note: Filter removal takes effect on the next LogPoller loop tick (up to pollPeriod delay).
// If unregistration occurs during an active tick, the old filter continues processing for that tick.
func (lp *service) UnregisterFilter(ctx context.Context, name string) error {
return lp.filterStore.UnregisterFilter(ctx, name)
// Hit DB first (marks is_deleted = true)
if err := lp.filterStore.UnregisterFilter(ctx, name); err != nil {
return err
}

// Update cache
lp.filtersMu.Lock()
delete(lp.filtersByName, name)
lp.filtersMu.Unlock()

return nil
}

// HasFilter checks if a filter with the given name exists
// HasFilter checks if a filter with the given name exists.
// Uses in-memory cache - no database query.
func (lp *service) HasFilter(ctx context.Context, name string) (bool, error) {
return lp.filterStore.HasFilter(ctx, name)
if err := lp.loadFilters(ctx); err != nil {
return false, err
}

lp.filtersMu.RLock()
_, exists := lp.filtersByName[name]
lp.filtersMu.RUnlock()

return exists, nil
}
Loading
Loading