Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NONEVM-916] logpoller log processing & decoding #1002

Open
wants to merge 45 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
e97ee34
Add client & collector to LogPoller struct
reductionista Dec 10, 2024
b75a0e1
Re-generate RPClient & ReaderWriter mocks
reductionista Dec 11, 2024
4eefa8a
Update loader tests for interface
reductionista Dec 11, 2024
87bbf0a
Add ILogpoller interface and call NewLogPoller() from NewChain()
reductionista Dec 12, 2024
1674be0
fix orm_test.go
reductionista Dec 12, 2024
f16c435
Fix lints in log_poller.go & types.go
reductionista Dec 12, 2024
a236d9a
Fix lints in log_poller.go
reductionista Dec 12, 2024
cab34e0
Add BlockTime & Program to Event data passed to Process()
reductionista Dec 17, 2024
b0e96b3
WIP
reductionista Dec 16, 2024
f9f6e3f
Use EventTypeProvider interface to represent a codec function for ret…
reductionista Dec 20, 2024
58abe17
remove unimplemented test
reductionista Dec 30, 2024
d08829e
Regenerate mock_orm.go
reductionista Dec 30, 2024
3879dbd
Fix lints
reductionista Dec 30, 2024
624ef9e
Add IndexedValue type for converting indexable types to be stored in db
reductionista Dec 31, 2024
2bf6656
Fill in log.ExpiresAt
reductionista Dec 31, 2024
bc8022e
Add sequence # tracking, fix floating point encoding, add tests for N…
reductionista Jan 2, 2025
8982cb8
Fix tests
reductionista Jan 7, 2025
c5b9fe4
Remove mock_orm.go
reductionista Jan 8, 2025
4f7f21a
Use UTC time, and fix lints
reductionista Jan 8, 2025
3443f93
Add tests - WIP
reductionista Jan 8, 2025
45623f5
Refactor to use new codec api, based on review
reductionista Jan 9, 2025
0cc5cfb
Add SeqNum test and fix other tests
reductionista Jan 10, 2025
8498b80
Don't set ExpireAt unless Retention is set
reductionista Jan 10, 2025
255d493
Move decoder creation before InsertFilter
reductionista Jan 10, 2025
4951646
Remove unused functions in anchor.go, add validation to TestProess test
reductionista Jan 10, 2025
a6d7085
Fix lints
reductionista Jan 10, 2025
ab24226
Remove lp.typeProvider, generate EventIdl for test event
reductionista Jan 10, 2025
40d8004
Remove unused Prefix field from ProgramEvent
reductionista Jan 13, 2025
b552a22
Fix decoding issues
reductionista Jan 13, 2025
19f47d6
Add ExtractField function, and fix DecodeSubkeyValues
reductionista Jan 14, 2025
981693f
MatchingFilters -> matchingFilters, and fix rest of expectedLog field…
reductionista Jan 14, 2025
98bd093
Fix lint errors, rename err2 & err3
reductionista Jan 14, 2025
b70cd69
Fix overflows, lints, use default map vals, & add early return to Pro…
reductionista Jan 15, 2025
bc853cd
Remove utils package, add 12-character assertion to Discriminator()
reductionista Jan 15, 2025
4f82cec
Remove TODO, uncomment internal loaders
reductionista Jan 15, 2025
f18898f
Remove ILogPoller interface, add LogPoller interface to chain.go
reductionista Jan 15, 2025
c8d5e62
Start EncodedLogCollector as soon as filters are loaded
reductionista Jan 15, 2025
e9983a3
Add comment explaining < 12 validation
reductionista Jan 15, 2025
e84fbe5
Address 2 more PR comments, and remove accidentally added file
reductionista Jan 15, 2025
82fa8cc
Use ch.multiClient instead of lazy-loader
reductionista Jan 16, 2025
d961eee
revert changes to chain.go
reductionista Jan 16, 2025
ec90e37
logpoller.LogPoller -> logpoller.Service
reductionista Jan 16, 2025
55938a0
Make MatchFiltersForEncodedEvent thread safe
reductionista Jan 16, 2025
3094b3c
Merge branch 'develop' into NONEVM-916-logpoller-process-decode
reductionista Jan 17, 2025
ec368e7
Merge branch 'develop' into NONEVM-916-logpoller-process-decode
reductionista Jan 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ nodejs 18.20.2
yarn 1.22.19
rust 1.59.0
golang 1.23.3
golangci-lint 1.60.1
golangci-lint 1.61.0
actionlint 1.6.22
shellcheck 0.8.0
helm 3.9.4
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/hashicorp/go-plugin v1.6.2
github.com/jackc/pgx/v4 v4.18.3
github.com/jpillora/backoff v1.0.0
github.com/lib/pq v1.10.9
github.com/pelletier/go-toml/v2 v2.2.0
github.com/prometheus/client_golang v1.17.0
github.com/smartcontractkit/chainlink-common v0.4.2-0.20250116214855-f49c5c27db51
Expand Down Expand Up @@ -76,6 +75,7 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.2.0 // indirect
github.com/lib/pq v1.10.9 // indirect
github.com/linkedin/goavro/v2 v2.12.0 // indirect
github.com/logrusorgru/aurora v2.0.3+incompatible // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
6 changes: 4 additions & 2 deletions integration-tests/smoke/event_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

contract "github.com/smartcontractkit/chainlink-solana/contracts/generated/log_read_test"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/client"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"

"github.com/smartcontractkit/chainlink-solana/integration-tests/solclient"
Expand All @@ -49,7 +50,8 @@ func TestEventLoader(t *testing.T) {
require.NoError(t, err)

rpcURL, wsURL := setupTestValidator(t, privateKey.PublicKey().String())
rpcClient := rpc.New(rpcURL)
cl, rpcClient, err := client.NewTestClient(rpcURL, config.NewDefault(), 1*time.Second, logger.Nop())
require.NoError(t, err)
wsClient, err := ws.Connect(ctx, wsURL)
require.NoError(t, err)

Expand All @@ -62,7 +64,7 @@ func TestEventLoader(t *testing.T) {
parser := &printParser{t: t}
sender := newLogSender(t, rpcClient, wsClient)
collector := logpoller.NewEncodedLogCollector(
rpcClient,
cl,
parser,
logger.Nop(),
)
Expand Down
16 changes: 16 additions & 0 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,25 @@ import (
mn "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/multinode"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/internal"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/monitor"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/txm"
txmutils "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm/utils"
)

type LogPoller interface {
Start(context.Context) error
Close() error
RegisterFilter(ctx context.Context, filter logpoller.Filter) error
UnregisterFilter(ctx context.Context, name string) error
}

type Chain interface {
types.ChainService

ID() string
Config() config.Config
LogPoller() LogPoller
TxManager() TxManager
// Reader returns a new Reader from the available list of nodes (if there are multiple, it will randomly select one)
Reader() (client.Reader, error)
Expand Down Expand Up @@ -90,6 +99,7 @@ type chain struct {
services.StateMachine
EasterTheBunny marked this conversation as resolved.
Show resolved Hide resolved
id string
cfg *config.TOMLConfig
lp LogPoller
txm *txm.Txm
balanceMonitor services.Service
lggr logger.Logger
Expand Down Expand Up @@ -312,6 +322,8 @@ func newChain(id string, cfg *config.TOMLConfig, ks core.Keystore, lggr logger.L
bc = internal.NewLoader[monitor.BalanceClient](func() (monitor.BalanceClient, error) { return ch.multiNode.SelectRPC() })
}

// TODO: import typeProvider function from codec package and pass to constructor
ch.lp = logpoller.New(logger.Sugared(logger.Named(lggr, "LogPoller")), logpoller.NewORM(ch.ID(), ds, lggr), ch.multiClient)
ch.txm = txm.NewTxm(ch.id, tc, sendTx, cfg, ks, lggr)
ch.balanceMonitor = monitor.NewBalanceMonitor(ch.id, cfg, lggr, ks, bc)
return &ch, nil
Expand Down Expand Up @@ -401,6 +413,10 @@ func (c *chain) Config() config.Config {
return c.cfg
}

func (c *chain) LogPoller() LogPoller {
return c.lp
}

func (c *chain) TxManager() TxManager {
return c.txm
}
Expand Down
25 changes: 21 additions & 4 deletions pkg/solana/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Reader interface {
GetTransaction(ctx context.Context, txHash solana.Signature, opts *rpc.GetTransactionOpts) (*rpc.GetTransactionResult, error)
GetBlocks(ctx context.Context, startSlot uint64, endSlot *uint64) (rpc.BlocksResult, error)
GetBlocksWithLimit(ctx context.Context, startSlot uint64, limit uint64) (*rpc.BlocksResult, error)
GetBlockWithOpts(context.Context, uint64, *rpc.GetBlockOpts) (*rpc.GetBlockResult, error)
GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error)
GetSignaturesForAddressWithOpts(ctx context.Context, addr solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error)
}
Expand Down Expand Up @@ -72,18 +73,25 @@ type Client struct {
requestGroup *singleflight.Group
}

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
return &Client{
// Return both the client and the underlying rpc client for testing
func NewTestClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, *rpc.Client, error) {
rpcClient := Client{
url: endpoint,
rpc: rpc.New(endpoint),
skipPreflight: cfg.SkipPreflight(),
commitment: cfg.Commitment(),
maxRetries: cfg.MaxRetries(),
txTimeout: cfg.TxTimeout(),
contextDuration: requestTimeout,
log: log,
requestGroup: &singleflight.Group{},
}, nil
}
rpcClient.rpc = rpc.New(endpoint)
return &rpcClient, rpcClient.rpc, nil
}

func NewClient(endpoint string, cfg config.Config, requestTimeout time.Duration, log logger.Logger) (*Client, error) {
rpcClient, _, err := NewTestClient(endpoint, cfg, requestTimeout, log)
return rpcClient, err
}

func (c *Client) latency(name string) func() {
Expand Down Expand Up @@ -346,6 +354,15 @@ func (c *Client) GetLatestBlockHeight(ctx context.Context) (uint64, error) {
return v.(uint64), err
}

func (c *Client) GetBlockWithOpts(ctx context.Context, slot uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) {
// get block based on slot with custom options set
done := c.latency("get_block_with_opts")
defer done()
ctx, cancel := context.WithTimeout(ctx, c.txTimeout)
defer cancel()
return c.rpc.GetBlockWithOpts(ctx, slot, opts)
}

func (c *Client) GetBlock(ctx context.Context, slot uint64) (*rpc.GetBlockResult, error) {
// get block based on slot
done := c.latency("get_block")
Expand Down
60 changes: 60 additions & 0 deletions pkg/solana/client/mocks/reader_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions pkg/solana/client/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,12 @@ func (m *MultiClient) GetSignaturesForAddressWithOpts(ctx context.Context, addr

return r.GetSignaturesForAddressWithOpts(ctx, addr, opts)
}

func (m *MultiClient) GetBlockWithOpts(ctx context.Context, slot uint64, opts *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) {
r, err := m.getClient()
if err != nil {
return nil, err
}

return r.GetBlockWithOpts(ctx, slot, opts)
}
33 changes: 33 additions & 0 deletions pkg/solana/codec/solana.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,3 +491,36 @@ func saveDependency(refs *codecRefs, parent, child string) {

refs.dependencies[parent] = append(deps, child)
}
func NewIDLEventCodec(idl IDL, builder commonencodings.Builder) (commontypes.RemoteCodec, error) {
typeCodecs := make(commonencodings.LenientCodecFromTypeCodec)
refs := &codecRefs{
builder: builder,
codecs: make(map[string]commonencodings.TypeCodec),
typeDefs: idl.Types,
dependencies: make(map[string][]string),
}

for _, event := range idl.Events {
name, instCodec, err := asStruct(eventFieldsAsStandardFields(event.Fields), refs, event.Name, false, false)
if err != nil {
return nil, err
}

typeCodecs[name] = instCodec
}

return typeCodecs, nil
}

func eventFieldsAsStandardFields(event []IdlEventField) []IdlField {
output := make([]IdlField, len(event))

for idx := range output {
output[idx] = IdlField{
Name: event[idx].Name,
Type: event[idx].Type,
}
}

return output
}
14 changes: 14 additions & 0 deletions pkg/solana/logpoller/discriminator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package logpoller

import (
"crypto/sha256"
"fmt"
)

const DiscriminatorLength = 8

func Discriminator(namespace, name string) [DiscriminatorLength]byte {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ilija42 has a PR that also adds a discriminator: here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that PR will go in after this one... this will use his discriminator once that gets merged.

h := sha256.New()
h.Write([]byte(fmt.Sprintf("%s:%s", namespace, name)))
return [DiscriminatorLength]byte(h.Sum(nil)[:DiscriminatorLength])
}
Loading
Loading