Skip to content

Commit 524d565

Browse files
committed
pkg/capabilities: support replacing registered capabilities after shutdown
1 parent 951aaee commit 524d565

File tree

10 files changed

+297
-44
lines changed

10 files changed

+297
-44
lines changed

pkg/capabilities/registry/base.go

Lines changed: 242 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,11 @@ import (
66
"fmt"
77
"strings"
88
"sync"
9+
"sync/atomic"
910

1011
"github.com/Masterminds/semver/v3"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/connectivity"
1114

1215
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
1316
"github.com/smartcontractkit/chainlink-common/pkg/logger"
@@ -18,8 +21,22 @@ var (
1821
ErrCapabilityAlreadyExists = errors.New("capability already exists")
1922
)
2023

24+
// atomicBaseCapability extends [capabilities.BaseCapability] to support atomic updates and forward client state checks.
25+
type atomicBaseCapability interface {
26+
capabilities.BaseCapability
27+
Update(capabilities.BaseCapability) error
28+
StateGetter
29+
}
30+
31+
var _ StateGetter = (*grpc.ClientConn)(nil)
32+
33+
// StateGetter is implemented by GRPC client connections.
34+
type StateGetter interface {
35+
GetState() connectivity.State
36+
}
37+
2138
type baseRegistry struct {
22-
m map[string]capabilities.BaseCapability
39+
m map[string]atomicBaseCapability
2340
lggr logger.Logger
2441
mu sync.RWMutex
2542
}
@@ -28,7 +45,7 @@ var _ core.CapabilitiesRegistryBase = (*baseRegistry)(nil)
2845

2946
func NewBaseRegistry(lggr logger.Logger) core.CapabilitiesRegistryBase {
3047
return &baseRegistry{
31-
m: map[string]capabilities.BaseCapability{},
48+
m: map[string]atomicBaseCapability{},
3249
lggr: logger.Named(lggr, "registries.basic"),
3350
}
3451
}
@@ -142,46 +159,243 @@ func (r *baseRegistry) Add(ctx context.Context, c capabilities.BaseCapability) e
142159
return err
143160
}
144161

145-
switch info.CapabilityType {
146-
case capabilities.CapabilityTypeTrigger:
147-
_, ok := c.(capabilities.TriggerCapability)
148-
if !ok {
149-
return errors.New("trigger capability does not satisfy TriggerCapability interface")
162+
id := info.ID
163+
bc, ok := r.m[id]
164+
if ok {
165+
switch state := bc.GetState(); state {
166+
case connectivity.Shutdown, connectivity.TransientFailure, connectivity.Idle:
167+
// allow replace
168+
default:
169+
return fmt.Errorf("%w: id %s found in registry: state %s", ErrCapabilityAlreadyExists, id, state)
150170
}
151-
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
152-
_, ok := c.(capabilities.ExecutableCapability)
153-
if !ok {
154-
return errors.New("action does not satisfy ExecutableCapability interface")
171+
if err := bc.Update(c); err != nil {
172+
return fmt.Errorf("failed to update capability %s: %w", id, err)
155173
}
156-
case capabilities.CapabilityTypeCombined:
157-
_, ok := c.(capabilities.ExecutableAndTriggerCapability)
158-
if !ok {
159-
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
174+
} else {
175+
var ac atomicBaseCapability
176+
switch info.CapabilityType {
177+
case capabilities.CapabilityTypeTrigger:
178+
ac = &atomicTriggerCapability{}
179+
case capabilities.CapabilityTypeAction, capabilities.CapabilityTypeConsensus, capabilities.CapabilityTypeTarget:
180+
ac = &atomicExecuteCapability{}
181+
case capabilities.CapabilityTypeCombined:
182+
ac = &atomicExecuteAndTriggerCapability{}
183+
default:
184+
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
160185
}
161-
default:
162-
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
163-
}
164-
165-
id := info.ID
166-
_, ok := r.m[id]
167-
if ok {
168-
return fmt.Errorf("%w: id %s found in registry", ErrCapabilityAlreadyExists, id)
186+
if err := ac.Update(c); err != nil {
187+
return err
188+
}
189+
r.m[id] = ac
169190
}
170-
171-
r.m[id] = c
172191
r.lggr.Infow("capability added", "id", id, "type", info.CapabilityType, "description", info.Description, "version", info.Version())
173192
return nil
174193
}
175194

176195
func (r *baseRegistry) Remove(_ context.Context, id string) error {
177196
r.mu.Lock()
178197
defer r.mu.Unlock()
179-
_, ok := r.m[id]
198+
ac, ok := r.m[id]
180199
if !ok {
181200
return fmt.Errorf("unable to remove, capability not found: %s", id)
182201
}
183-
184-
delete(r.m, id)
202+
if err := ac.Update(nil); err != nil {
203+
return fmt.Errorf("failed to remove capability %s: %w", id, err)
204+
}
185205
r.lggr.Infow("capability removed", "id", id)
186206
return nil
187207
}
208+
209+
var _ capabilities.TriggerCapability = &atomicTriggerCapability{}
210+
211+
type atomicTriggerCapability struct {
212+
atomic.Pointer[capabilities.TriggerCapability]
213+
}
214+
215+
func (a *atomicTriggerCapability) Update(c capabilities.BaseCapability) error {
216+
if c == nil {
217+
a.Store(nil)
218+
return nil
219+
}
220+
tc, ok := c.(capabilities.TriggerCapability)
221+
if !ok {
222+
return errors.New("trigger capability does not satisfy TriggerCapability interface")
223+
}
224+
a.Store(&tc)
225+
return nil
226+
}
227+
228+
func (a *atomicTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
229+
c := a.Load()
230+
if c == nil {
231+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
232+
}
233+
return (*c).Info(ctx)
234+
}
235+
236+
func (a *atomicTriggerCapability) GetState() connectivity.State {
237+
c := a.Load()
238+
if c == nil {
239+
return connectivity.Shutdown
240+
}
241+
if sg, ok := (*c).(StateGetter); ok {
242+
return sg.GetState()
243+
}
244+
return connectivity.State(-1) // unknown
245+
}
246+
247+
func (a *atomicTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
248+
c := a.Load()
249+
if c == nil {
250+
return nil, errors.New("capability unavailable")
251+
}
252+
return (*c).RegisterTrigger(ctx, request)
253+
}
254+
255+
func (a *atomicTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
256+
c := a.Load()
257+
if c == nil {
258+
return errors.New("capability unavailable")
259+
}
260+
return (*c).UnregisterTrigger(ctx, request)
261+
}
262+
263+
var _ capabilities.ExecutableCapability = &atomicExecuteCapability{}
264+
265+
type atomicExecuteCapability struct {
266+
atomic.Pointer[capabilities.ExecutableCapability]
267+
}
268+
269+
func (a *atomicExecuteCapability) Update(c capabilities.BaseCapability) error {
270+
if c == nil {
271+
a.Store(nil)
272+
return nil
273+
}
274+
tc, ok := c.(capabilities.ExecutableCapability)
275+
if !ok {
276+
return errors.New("action does not satisfy ExecutableCapability interface")
277+
}
278+
a.Store(&tc)
279+
return nil
280+
}
281+
282+
func (a *atomicExecuteCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
283+
c := a.Load()
284+
if c == nil {
285+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
286+
}
287+
return (*c).Info(ctx)
288+
}
289+
290+
func (a *atomicExecuteCapability) GetState() connectivity.State {
291+
c := a.Load()
292+
if c == nil {
293+
return connectivity.Shutdown
294+
}
295+
if sg, ok := (*c).(StateGetter); ok {
296+
return sg.GetState()
297+
}
298+
return connectivity.State(-1) // unknown
299+
}
300+
301+
func (a *atomicExecuteCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
302+
c := a.Load()
303+
if c == nil {
304+
return errors.New("capability unavailable")
305+
}
306+
return (*c).RegisterToWorkflow(ctx, request)
307+
}
308+
309+
func (a *atomicExecuteCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
310+
c := a.Load()
311+
if c == nil {
312+
return errors.New("capability unavailable")
313+
}
314+
return (*c).UnregisterFromWorkflow(ctx, request)
315+
}
316+
317+
func (a *atomicExecuteCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
318+
c := a.Load()
319+
if c == nil {
320+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
321+
}
322+
return (*c).Execute(ctx, request)
323+
}
324+
325+
var _ capabilities.ExecutableAndTriggerCapability = &atomicExecuteAndTriggerCapability{}
326+
327+
type atomicExecuteAndTriggerCapability struct {
328+
atomic.Pointer[capabilities.ExecutableAndTriggerCapability]
329+
}
330+
331+
func (a *atomicExecuteAndTriggerCapability) Update(c capabilities.BaseCapability) error {
332+
if c == nil {
333+
a.Store(nil)
334+
return nil
335+
}
336+
tc, ok := c.(capabilities.ExecutableAndTriggerCapability)
337+
if !ok {
338+
return errors.New("target capability does not satisfy ExecutableAndTriggerCapability interface")
339+
}
340+
a.Store(&tc)
341+
return nil
342+
}
343+
344+
func (a *atomicExecuteAndTriggerCapability) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {
345+
c := a.Load()
346+
if c == nil {
347+
return capabilities.CapabilityInfo{}, errors.New("capability unavailable")
348+
}
349+
return (*c).Info(ctx)
350+
}
351+
352+
func (a *atomicExecuteAndTriggerCapability) GetState() connectivity.State {
353+
c := a.Load()
354+
if c == nil {
355+
return connectivity.Shutdown
356+
}
357+
if sg, ok := (*c).(StateGetter); ok {
358+
return sg.GetState()
359+
}
360+
return connectivity.State(-1) // unknown
361+
}
362+
363+
func (a *atomicExecuteAndTriggerCapability) RegisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) (<-chan capabilities.TriggerResponse, error) {
364+
c := a.Load()
365+
if c == nil {
366+
return nil, errors.New("capability unavailable")
367+
}
368+
return (*c).RegisterTrigger(ctx, request)
369+
}
370+
371+
func (a *atomicExecuteAndTriggerCapability) UnregisterTrigger(ctx context.Context, request capabilities.TriggerRegistrationRequest) error {
372+
c := a.Load()
373+
if c == nil {
374+
return errors.New("capability unavailable")
375+
}
376+
return (*c).UnregisterTrigger(ctx, request)
377+
}
378+
379+
func (a *atomicExecuteAndTriggerCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error {
380+
c := a.Load()
381+
if c == nil {
382+
return errors.New("capability unavailable")
383+
}
384+
return (*c).RegisterToWorkflow(ctx, request)
385+
}
386+
387+
func (a *atomicExecuteAndTriggerCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error {
388+
c := a.Load()
389+
if c == nil {
390+
return errors.New("capability unavailable")
391+
}
392+
return (*c).UnregisterFromWorkflow(ctx, request)
393+
}
394+
395+
func (a *atomicExecuteAndTriggerCapability) Execute(ctx context.Context, request capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) {
396+
c := a.Load()
397+
if c == nil {
398+
return capabilities.CapabilityResponse{}, errors.New("capability unavailable")
399+
}
400+
return (*c).Execute(ctx, request)
401+
}

pkg/capabilities/registry/base_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,17 @@ func TestRegistry(t *testing.T) {
5050

5151
gc, err := r.Get(ctx, id)
5252
require.NoError(t, err)
53+
info, err := gc.Info(t.Context())
54+
require.NoError(t, err)
5355

54-
assert.Equal(t, c, gc)
56+
assert.Equal(t, c.CapabilityInfo, info)
5557

5658
cs, err := r.List(ctx)
5759
require.NoError(t, err)
5860
assert.Len(t, cs, 1)
59-
assert.Equal(t, c, cs[0])
61+
info, err = cs[0].Info(t.Context())
62+
require.NoError(t, err)
63+
assert.Equal(t, c.CapabilityInfo, info)
6064
}
6165

6266
func TestRegistryCompatibleVersions(t *testing.T) {

pkg/loop/internal/core/services/capability/capabilities.go

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"sync"
99

1010
"google.golang.org/grpc"
11+
"google.golang.org/grpc/connectivity"
1112
"google.golang.org/protobuf/types/known/emptypb"
1213

1314
"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
@@ -22,7 +23,7 @@ type TriggerCapabilityClient struct {
2223
*baseCapabilityClient
2324
}
2425

25-
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) capabilities.TriggerCapability {
26+
func NewTriggerCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) capabilities.TriggerCapability {
2627
return &TriggerCapabilityClient{
2728
triggerExecutableClient: newTriggerExecutableClient(brokerExt, conn),
2829
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -39,7 +40,7 @@ type ExecutableCapability interface {
3940
capabilities.BaseCapability
4041
}
4142

42-
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
43+
func NewExecutableCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) ExecutableCapability {
4344
return &ExecutableCapabilityClient{
4445
executableClient: newExecutableClient(brokerExt, conn),
4546
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -52,7 +53,7 @@ type CombinedCapabilityClient struct {
5253
*triggerExecutableClient
5354
}
5455

55-
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) ExecutableCapability {
56+
func NewCombinedCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) ExecutableCapability {
5657
return &CombinedCapabilityClient{
5758
executableClient: newExecutableClient(brokerExt, conn),
5859
baseCapabilityClient: newBaseCapabilityClient(brokerExt, conn),
@@ -135,14 +136,18 @@ func InfoToReply(info capabilities.CapabilityInfo) *capabilitiespb.CapabilityInf
135136
}
136137

137138
type baseCapabilityClient struct {
139+
c net.ClientConnInterface
138140
grpc capabilitiespb.BaseCapabilityClient
139141
*net.BrokerExt
140142
}
141143

142144
var _ capabilities.BaseCapability = (*baseCapabilityClient)(nil)
143145

144-
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn grpc.ClientConnInterface) *baseCapabilityClient {
145-
return &baseCapabilityClient{grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
146+
func newBaseCapabilityClient(brokerExt *net.BrokerExt, conn net.ClientConnInterface) *baseCapabilityClient {
147+
return &baseCapabilityClient{c: conn, grpc: capabilitiespb.NewBaseCapabilityClient(conn), BrokerExt: brokerExt}
148+
}
149+
func (c *baseCapabilityClient) GetState() connectivity.State {
150+
return c.c.GetState()
146151
}
147152

148153
func (c *baseCapabilityClient) Info(ctx context.Context) (capabilities.CapabilityInfo, error) {

0 commit comments

Comments
 (0)