Skip to content

Commit bb7ad59

Browse files
committed
pkg/capabilities: support replacing registered capabilities after shutdown
1 parent d611017 commit bb7ad59

File tree

5 files changed

+166
-7
lines changed

5 files changed

+166
-7
lines changed

pkg/capabilities/capabilities.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"time"
1010

1111
p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
12+
"google.golang.org/grpc/connectivity"
1213
"google.golang.org/protobuf/proto"
1314
"google.golang.org/protobuf/types/known/anypb"
1415

@@ -209,6 +210,7 @@ type Validatable interface {
209210
// or extension in the future.
210211
type BaseCapability interface {
211212
Info(ctx context.Context) (CapabilityInfo, error)
213+
GetState() connectivity.State
212214
}
213215

214216
type TriggerRegistrationRequest struct {
@@ -397,6 +399,11 @@ type CapabilityInfo struct {
397399
SpendTypes []CapabilitySpendType
398400
}
399401

402+
// GetState is included to implement BaseCapability.
403+
func (c CapabilityInfo) GetState() connectivity.State {
404+
return connectivity.Idle
405+
}
406+
400407
// Parse out the version from the ID.
401408
func (c CapabilityInfo) Version() string {
402409
return c.ID[strings.Index(c.ID, "@")+1:]

pkg/capabilities/registry/base.go

Lines changed: 141 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"fmt"
77
"strings"
88
"sync"
9+
"sync/atomic"
910

1011
"github.com/Masterminds/semver/v3"
12+
"google.golang.org/grpc/connectivity"
1113

1214
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1315
"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -18,8 +20,13 @@ var (
1820
ErrCapabilityAlreadyExists = errors.New("capability already exists")
1921
)
2022

23+
type atomicBaseCapability interface {
24+
capabilities.BaseCapability
25+
Update(capabilities.BaseCapability) error
26+
}
27+
2128
type baseRegistry struct {
22-
m map[string]capabilities.BaseCapability
29+
m map[string]atomicBaseCapability
2330
lggr logger.Logger
2431
mu sync.RWMutex
2532
}
@@ -28,7 +35,7 @@ var _ core.CapabilitiesRegistryBase = (*baseRegistry)(nil)
2835

2936
func NewBaseRegistry(lggr logger.Logger) core.CapabilitiesRegistryBase {
3037
return &baseRegistry{
31-
m: map[string]capabilities.BaseCapability{},
38+
m: map[string]atomicBaseCapability{},
3239
lggr: logger.Named(lggr, "registries.basic"),
3340
}
3441
}
@@ -163,12 +170,31 @@ func (r *baseRegistry) Add(ctx context.Context, c capabilities.BaseCapability) e
163170
}
164171

165172
id := info.ID
166-
_, ok := r.m[id]
173+
bc, ok := r.m[id]
167174
if ok {
168-
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
175+
if bc.GetState() != connectivity.Shutdown {
176+
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
177+
}
178+
if err := bc.Update(c); err != nil {
179+
return fmt.Errorf("failed to update capability %s: %w", id, err)
180+
}
181+
} else {
182+
var ac atomicBaseCapability
183+
switch info.CapabilityType {
184+
case capabilities.CapabilityTypeTrigger:
185+
ac = &atomicTriggerCapability{}
186+
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
187+
ac = &atomicExecuteCapability{}
188+
case capabilities.CapabilityTypeCombined:
189+
ac = &atomicExecuteAndTriggerCapability{}
190+
default:
191+
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
192+
}
193+
if err := ac.Update(c); err != nil {
194+
return err
195+
}
196+
r.m[id] = ac
169197
}
170-
171-
r.m[id] = c
172198
r.lggr.Infow("capability added", "id", id, "type", info.CapabilityType, "description", info.Description, "version", info.Version())
173199
return nil
174200
}
@@ -185,3 +211,112 @@ func (r *baseRegistry) Remove(_ context.Context, id string) error {
185211
r.lggr.Infow("capability removed", "id", id)
186212
return nil
187213
}
214+
215+
var _ capabilities.TriggerCapability = &atomicTriggerCapability{}
216+
217+
type atomicTriggerCapability struct {
218+
atomic.Pointer[capabilities.TriggerCapability]
219+
}
220+
221+
func (a *atomicTriggerCapability) Update(c capabilities.BaseCapability) error {
222+
tc, ok := c.(capabilities.TriggerCapability)
223+
if !ok {
224+
return errors.New("trigger capability does not satisfy TriggerCapability interface")
225+
}
226+
a.Store(&tc)
227+
return nil
228+
}
229+
230+
func (a *atomicTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
231+
return (*a.Load()).Info(ctx)
232+
}
233+
234+
func (a *atomicTriggerCapability) GetState() connectivity.State {
235+
return (*a.Load()).GetState()
236+
}
237+
238+
func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
239+
return (*a.Load()).RegisterTrigger(ctx, request)
240+
}
241+
242+
func (a *atomicTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
243+
return (*a.Load()).UnregisterTrigger(ctx, request)
244+
}
245+
246+
var _ capabilities.ExecutableCapability = &atomicExecuteCapability{}
247+
248+
type atomicExecuteCapability struct {
249+
atomic.Pointer[capabilities.ExecutableCapability]
250+
}
251+
252+
func (a *atomicExecuteCapability) Update(c capabilities.BaseCapability) error {
253+
tc, ok := c.(capabilities.ExecutableCapability)
254+
if !ok {
255+
return errors.New("action does not satisfy ExecutableCapability interface")
256+
}
257+
a.Store(&tc)
258+
return nil
259+
}
260+
261+
func (a *atomicExecuteCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
262+
return (*a.Load()).Info(ctx)
263+
}
264+
265+
func (a *atomicExecuteCapability) GetState() connectivity.State {
266+
return (*a.Load()).GetState()
267+
}
268+
269+
func (a *atomicExecuteCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
270+
return (*a.Load()).RegisterToWorkflow(ctx, request)
271+
}
272+
273+
func (a *atomicExecuteCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
274+
return (*a.Load()).UnregisterFromWorkflow(ctx, request)
275+
}
276+
277+
func (a *atomicExecuteCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
278+
return (*a.Load()).Execute(ctx, request)
279+
}
280+
281+
var _ capabilities.ExecutableAndTriggerCapability = &atomicExecuteAndTriggerCapability{}
282+
283+
type atomicExecuteAndTriggerCapability struct {
284+
atomic.Pointer[capabilities.ExecutableAndTriggerCapability]
285+
}
286+
287+
func (a *atomicExecuteAndTriggerCapability) Update(c capabilities.BaseCapability) error {
288+
tc, ok := c.(capabilities.ExecutableAndTriggerCapability)
289+
if !ok {
290+
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
291+
}
292+
a.Store(&tc)
293+
return nil
294+
}
295+
296+
func (a *atomicExecuteAndTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
297+
return (*a.Load()).Info(ctx)
298+
}
299+
300+
func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
301+
return (*a.Load()).GetState()
302+
}
303+
304+
func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
305+
return (*a.Load()).RegisterTrigger(ctx, request)
306+
}
307+
308+
func (a *atomicExecuteAndTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
309+
return (*a.Load()).UnregisterTrigger(ctx, request)
310+
}
311+
312+
func (a *atomicExecuteAndTriggerCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
313+
return (*a.Load()).RegisterToWorkflow(ctx, request)
314+
}
315+
316+
func (a *atomicExecuteAndTriggerCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
317+
return (*a.Load()).UnregisterFromWorkflow(ctx, request)
318+
}
319+
320+
func (a *atomicExecuteAndTriggerCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
321+
return (*a.Load()).Execute(ctx, request)
322+
}

pkg/loop/internal/core/services/capability/capabilities.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99

1010
"google.golang.org/grpc"
11+
"google.golang.org/grpc/connectivity"
1112
"google.golang.org/protobuf/types/known/emptypb"
1213

1314
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -135,14 +136,19 @@ func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInf
135136
}
136137

137138
type baseCapabilityClient struct {
139+
c *grpc.ClientConn
138140
grpc capabilitiespb.BaseCapabilityClient
139141
*net.BrokerExt
140142
}
141143

142144
var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)
143145

144146
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn *grpc.ClientConn) *baseCapabilityClient {
145-
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
147+
return &baseCapabilityClient{c: conn, grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
148+
}
149+
150+
func (c *baseCapabilityClient) GetState() connectivity.State {
151+
return c.c.GetState()
146152
}
147153

148154
func (c *baseCapabilityClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {

pkg/loop/internal/core/services/capability/capabilities_registry_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/mock"
1212
"github.com/stretchr/testify/require"
1313
"google.golang.org/grpc"
14+
"google.golang.org/grpc/connectivity"
1415

1516
p2ptypes "github.com/smartcontractkit/libocr/ragep2p/types"
1617

@@ -29,6 +30,10 @@ type mockBaseCapability struct {
2930
info capabilities.CapabilityInfo
3031
}
3132

33+
func (f *mockBaseCapability) GetState() connectivity.State {
34+
return connectivity.Idle
35+
}
36+
3237
func (f *mockBaseCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
3338
return f.info, nil
3439
}

pkg/loop/internal/test/test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package test
33
import (
44
"context"
55

6+
"google.golang.org/grpc/connectivity"
7+
68
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
79
"github.com/smartcontractkit/chainlink-common/pkg/services"
810

@@ -33,6 +35,10 @@ var _ capabilities.BaseCapability = (*baseCapability)(nil)
3335
type baseCapability struct {
3436
}
3537

38+
func (e baseCapability) GetState() connectivity.State {
39+
return connectivity.Idle
40+
}
41+
3642
func (e baseCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
3743
return CapabilityInfo, nil
3844
}

0 commit comments

Comments
 (0)