Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions llo/channel_definitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"sort"
"sync"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
)
Expand Down Expand Up @@ -69,3 +70,57 @@ func subtractChannelDefinitions(minuend llotypes.ChannelDefinitions, subtrahend

return difference
}

// channelDefinitionOptsCache stores pre-parsed channel opts to avoid repeated JSON parsing.
// See: BenchmarkChannelOptsCache_* tests in channel_definitions_bench_test.go for performance details.
type channelDefinitionOptsCache struct {
// OCR driver should ensure that each phase is only called by one thread at a time.
// The mu here is added as a safeguard for future proofing.
// Benchmarks showed that without a mutex we were saving about 10ns per call to Get .
mu sync.Mutex
cache map[llotypes.ChannelID]any
}

var _ ChannelDefinitionOptsCache = (*channelDefinitionOptsCache)(nil)

func NewChannelDefinitionOptsCache() ChannelDefinitionOptsCache {
return &channelDefinitionOptsCache{
cache: make(map[llotypes.ChannelID]any),
}
}

func (c *channelDefinitionOptsCache) Set(
channelID llotypes.ChannelID,
channelOpts llotypes.ChannelOpts,
codec ReportCodec,
) error {
// codec may or may not implement OptsParser interface - that is the codec's choice.
// if codec does not then we cannot cache the opts. Codec may do this if they do not have opts.
optsParser, ok := codec.(OptsParser)
if !ok {
return fmt.Errorf("codec does not implement OptsParser interface")
}

parsedOpts, err := optsParser.ParseOpts(channelOpts)
if err != nil {
return fmt.Errorf("failed to parse opts for channelID %d: %w", channelID, err)
}

c.mu.Lock()
defer c.mu.Unlock()
c.cache[channelID] = parsedOpts
return nil
}

func (c *channelDefinitionOptsCache) Get(channelID llotypes.ChannelID) (any, bool) {
c.mu.Lock()
defer c.mu.Unlock()
val, ok := c.cache[channelID]
return val, ok
}

func (c *channelDefinitionOptsCache) Delete(channelID llotypes.ChannelID) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.cache, channelID)
}
84 changes: 84 additions & 0 deletions llo/channel_definitions_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package llo

import (
"fmt"
"testing"

"github.com/goccy/go-json"

llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo"
)

// Benchmark the cost of parsing JSON vs using the cache.
//
// Run with:
// go test . -bench=BenchmarkChannelOptsCache -benchmem -run=NONE
//
// Expected results (approximate):
//
// DirectParse: ~600-800 ns/op | 640 B/op | 6 allocs
// CacheGet: ~15-25 ns/op | 0 B/op | 0 allocs
// Speedup: ~40-50x faster, zero allocations
//
// =============================================================================

// Example opts for existing codecs that use JSON opts format
var benchmarkOptsJSON = []byte(`{"feedID":"0x0001020304050607080910111213141516171819202122232425262728293031","baseUSDFee":"1.5","expirationWindow":3600,"timeResolution":"ns","abi":[{"type":"int192","expression":"Sum(s1,s2)","expressionStreamId":100},{"type":"int192"},{"type":"uint256"}]}`)

type benchParsedOpts struct {
FeedID string `json:"feedID"`
BaseUSDFee string `json:"baseUSDFee"`
ExpirationWindow uint32 `json:"expirationWindow"`
TimeResolution string `json:"timeResolution,omitempty"`
ABI []abiEntry `json:"abi"`
}

type abiEntry struct {
Type string `json:"type"`
Expression string `json:"expression,omitempty"`
ExpressionStreamID uint32 `json:"expressionStreamId,omitempty"`
}

type benchMockCodec struct{}

func (benchMockCodec) Encode(Report, llotypes.ChannelDefinition, any) ([]byte, error) {
return nil, nil
}
func (benchMockCodec) Verify(llotypes.ChannelDefinition) error { return nil }
func (benchMockCodec) ParseOpts(opts []byte) (any, error) {
var parsed benchParsedOpts
if err := json.Unmarshal(opts, &parsed); err != nil {
return nil, fmt.Errorf("failed to parse opts: %w", err)
}
return parsed, nil
}

func BenchmarkChannelOptsCache(b *testing.B) {
b.Run("DirectParse", func(b *testing.B) {
// Measures the cost of parsing JSON directly.
// This is the usage pattern without caching.
for i := 0; i < b.N; i++ {
var opts benchParsedOpts
_ = json.Unmarshal(benchmarkOptsJSON, &opts)
}
})

b.Run("CacheGet", func(b *testing.B) {
// Measures the cost of cache lookup + type assertion.
// This is the required usage pattern for the cache.
cache := NewChannelDefinitionOptsCache()
codec := benchMockCodec{}

if err := cache.Set(1, benchmarkOptsJSON, codec); err != nil {
b.Fatal(err)
}

b.ResetTimer()
for i := 0; i < b.N; i++ {
val, ok := cache.Get(1)
if ok {
_ = val.(benchParsedOpts)
}
}
})
}
107 changes: 106 additions & 1 deletion llo/channel_definitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type mockReportCodec struct {
err error
}

func (m mockReportCodec) Encode(Report, llotypes.ChannelDefinition) ([]byte, error) {
func (m mockReportCodec) Encode(Report, llotypes.ChannelDefinition, any) ([]byte, error) {
return nil, nil
}

Expand Down Expand Up @@ -122,3 +122,108 @@ func Test_VerifyChannelDefinitions(t *testing.T) {
require.NoError(t, err)
})
}

func Test_ChannelDefinitionsOptsCache(t *testing.T) {
t.Run("Set and Get with OptsParser codec", func(t *testing.T) {
cache := NewChannelDefinitionOptsCache()
codec := mockCodec{timeResolution: 4}
channelID := llotypes.ChannelID(1)

setErr := cache.Set(channelID, llotypes.ChannelOpts{}, codec)
require.NoError(t, setErr)

val, exists := cache.Get(channelID)
require.True(t, exists)
require.NotNil(t, val)
})

t.Run("Set returns error when codec does not implement OptsParser", func(t *testing.T) {
cache := NewChannelDefinitionOptsCache()
codec := mockReportCodec{}

err := cache.Set(llotypes.ChannelID(1), llotypes.ChannelOpts{}, codec)
require.Error(t, err)
require.Contains(t, err.Error(), "does not implement OptsParser")

val, exists := cache.Get(llotypes.ChannelID(1))
require.False(t, exists)
require.Nil(t, val)
})

t.Run("Set replaces existing value", func(t *testing.T) {
cache := NewChannelDefinitionOptsCache()
codec1 := mockCodec{timeResolution: 4}
codec2 := mockCodec{timeResolution: 3}
channelID := llotypes.ChannelID(1)

cache.Set(channelID, llotypes.ChannelOpts{}, codec1)
cache.Set(channelID, llotypes.ChannelOpts{}, codec2)

val, exists := cache.Get(channelID)
require.True(t, exists)
mc := val.(mockCodec)
require.Equal(t, codec2.timeResolution, mc.timeResolution)
})

t.Run("Multiple items in cache", func(t *testing.T) {
cache := NewChannelDefinitionOptsCache()
codec := mockCodec{timeResolution: 4}

cache.Set(llotypes.ChannelID(1), llotypes.ChannelOpts{}, codec)
cache.Set(llotypes.ChannelID(2), llotypes.ChannelOpts{}, codec)
cache.Set(llotypes.ChannelID(3), llotypes.ChannelOpts{}, codec)

val1, exists1 := cache.Get(llotypes.ChannelID(1))
require.True(t, exists1)
require.Equal(t, codec.timeResolution, val1.(mockCodec).timeResolution)

val2, exists2 := cache.Get(llotypes.ChannelID(2))
require.True(t, exists2)
require.Equal(t, codec.timeResolution, val2.(mockCodec).timeResolution)

val3, exists3 := cache.Get(llotypes.ChannelID(3))
require.True(t, exists3)
require.Equal(t, codec.timeResolution, val3.(mockCodec).timeResolution)
})

t.Run("Delete removes item", func(t *testing.T) {
cache := NewChannelDefinitionOptsCache()
codec := mockCodec{}
channelID := llotypes.ChannelID(1)

cache.Set(channelID, llotypes.ChannelOpts{}, codec)
cache.Delete(channelID)

val, exists := cache.Get(channelID)
require.False(t, exists)
require.Nil(t, val)
})

t.Run("Delete does not affect other items", func(t *testing.T) {
cache := NewChannelDefinitionOptsCache()
codec := mockCodec{timeResolution: 4}

cache.Set(llotypes.ChannelID(1), llotypes.ChannelOpts{}, codec)
cache.Set(llotypes.ChannelID(2), llotypes.ChannelOpts{}, codec)

val1, exists1 := cache.Get(llotypes.ChannelID(1))
require.True(t, exists1)
require.NotNil(t, val1)

val2, exists2 := cache.Get(llotypes.ChannelID(2))
require.True(t, exists2)
require.NotNil(t, val2)

cache.Delete(llotypes.ChannelID(1))

// ChannelID 1 should be deleted
val, exists := cache.Get(llotypes.ChannelID(1))
require.False(t, exists)
require.Nil(t, val)

// ChannelID 2 should still exist
val, exists = cache.Get(llotypes.ChannelID(2))
require.True(t, exists)
require.NotNil(t, val)
})
}
2 changes: 1 addition & 1 deletion llo/json_report_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var _ ReportCodec = JSONReportCodec{}

type JSONReportCodec struct{}

func (cdc JSONReportCodec) Encode(r Report, _ llotypes.ChannelDefinition) ([]byte, error) {
func (cdc JSONReportCodec) Encode(r Report, _ llotypes.ChannelDefinition, _ any) ([]byte, error) {
type encode struct {
ConfigDigest types.ConfigDigest
SeqNr uint64
Expand Down
4 changes: 2 additions & 2 deletions llo/json_report_codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func Test_JSONCodec_Properties(t *testing.T) {

properties.Property("Encode/Decode", prop.ForAll(
func(r Report) bool {
b, err := codec.Encode(r, cd)
b, err := codec.Encode(r, cd, nil)
require.NoError(t, err)
r2, err := codec.Decode(b)
require.NoError(t, err)
Expand Down Expand Up @@ -305,7 +305,7 @@ func Test_JSONCodec(t *testing.T) {

cdc := JSONReportCodec{}

encoded, err := cdc.Encode(r, llo.ChannelDefinition{})
encoded, err := cdc.Encode(r, llo.ChannelDefinition{}, nil)
require.NoError(t, err)

assert.Equal(t, `{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterNanoseconds":44,"ObservationTimestampNanoseconds":45,"Values":[{"t":0,"v":"1"},{"t":0,"v":"2"},{"t":1,"v":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true}`, string(encoded)) //nolint:testifylint // need to verify exact match including order for determinism
Expand Down
23 changes: 23 additions & 0 deletions llo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,25 @@ type ChannelDefinitionCache interface {
Definitions() llotypes.ChannelDefinitions
}

// ChannelDefinitionOptsCache stores parsed channel definition opts to avoid
// repeated JSON unmarshalling in hot paths like Observation and Reports.
// Despite the name, this acts as a store rather than a cache: there is no TTL
// and entries are managed explicitly (set on channel add/update, deleted on remove).
// Stored as `any` requiring type assertion, but orders of magnitude faster than JSON parsing.
type ChannelDefinitionOptsCache interface {
// Set parses and caches the channel definition opts for the given channelID
// The channelOpts should match the ReportCodec's opts type.
// The codec is responsible for having a method to parse `channelOpts` into the codec's expected opts type.
// Failure to cache opts should return an error.
Set(channelID llotypes.ChannelID, channelOpts llotypes.ChannelOpts, codec ReportCodec) error
// Get retrieves cached opts for the given channelID
// Returning `any` requires type assertion to the specific ReportCodec's opts type.
// This is still considered better than parsing the opts from JSON every time we need to access them.
Get(channelID llotypes.ChannelID) (any, bool)
// Delete removes cached opts for the given channelID
Delete(channelID llotypes.ChannelID)
}

// A ReportingPlugin allows plugging custom logic into the OCR3 protocol. The OCR
// protocol handles cryptography, networking, ensuring that a sufficient number
// of nodes is in agreement about any report, transmitting the report to the
Expand Down Expand Up @@ -278,13 +297,16 @@ func (f *PluginFactory) NewReportingPlugin(ctx context.Context, cfg ocr3types.Re
ballastAlloc = make([]byte, ballastSz)
})

channelOptsCache := NewChannelDefinitionOptsCache()

return &Plugin{
f.Config,
onchainConfig.PredecessorConfigDigest,
cfg.ConfigDigest,
f.PredecessorRetirementReportCache,
f.ShouldRetireCache,
f.ChannelDefinitionCache,
channelOptsCache,
f.DataSource,
l,
cfg.N,
Expand Down Expand Up @@ -320,6 +342,7 @@ type Plugin struct {
PredecessorRetirementReportCache PredecessorRetirementReportCache
ShouldRetireCache ShouldRetireCache
ChannelDefinitionCache ChannelDefinitionCache
ChannelDefinitionOptsCache ChannelDefinitionOptsCache
DataSource DataSource
Logger logger.Logger
N int
Expand Down
Loading
Loading