Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 134 additions & 0 deletions testservice/flag_change_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package main

import (
"context"
"sync"

"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
"github.com/launchdarkly/go-sdk-common/v3/ldvalue"
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/testservice/servicedef"
)

// listenerEntry holds the cancellation handle for one registered listener goroutine.
type listenerEntry struct {
cancel context.CancelFunc
}

// listenerRegistry manages all active flag change listener registrations for a single
// SDK client entity. It is safe to use from multiple goroutines.
type listenerRegistry struct {
mu sync.Mutex
listeners map[string]*listenerEntry // keyed by listenerId
tracker interfaces.FlagTracker
}

func newListenerRegistry(tracker interfaces.FlagTracker) *listenerRegistry {
return &listenerRegistry{
listeners: make(map[string]*listenerEntry),
tracker: tracker,
}
}

// storeListener registers a new listener entry under listenerID, cancelling any
// previously registered listener with the same ID. Returns the new entry's context.
func (r *listenerRegistry) storeListener(listenerID string) context.Context {
ctx, cancel := context.WithCancel(context.Background())
r.mu.Lock()
if old, exists := r.listeners[listenerID]; exists {
old.cancel()
}
r.listeners[listenerID] = &listenerEntry{cancel: cancel}
r.mu.Unlock()
return ctx
}

// registerFlagChangeListener subscribes to general flag configuration changes.
// All flag change events are forwarded to the callback URI.
func (r *listenerRegistry) registerFlagChangeListener(listenerID, callbackURI string) {
ch := r.tracker.AddFlagChangeListener()
ctx := r.storeListener(listenerID)

svc := callbackService{baseURL: callbackURI}
go func() {
defer r.tracker.RemoveFlagChangeListener(ch)
for {
select {
case <-ctx.Done():
return
case event, ok := <-ch:
if !ok {
return
}
_ = svc.post("", servicedef.ListenerNotification{
ListenerID: listenerID,
FlagKey: event.Key,
}, nil)
}
}
}()
}

// registerFlagValueChangeListener subscribes to value changes for a specific flag and
// evaluation context. The callback is invoked only when the evaluated value actually
// changes; configuration changes that leave the value unchanged are suppressed by the SDK.
func (r *listenerRegistry) registerFlagValueChangeListener(
listenerID, flagKey string,
evalCtx ldcontext.Context,
defaultValue ldvalue.Value,
callbackURI string,
) {
ch := r.tracker.AddFlagValueChangeListener(flagKey, evalCtx, defaultValue)
ctx := r.storeListener(listenerID)

svc := callbackService{baseURL: callbackURI}
go func() {
defer r.tracker.RemoveFlagValueChangeListener(ch)
for {
select {
case <-ctx.Done():
return
case event, ok := <-ch:
if !ok {
return
}
oldVal := event.OldValue
newVal := event.NewValue
_ = svc.post("", servicedef.ListenerNotification{
ListenerID: listenerID,
FlagKey: event.Key,
OldValue: &oldVal,
NewValue: &newVal,
}, nil)
}
}
}()
}

// unregister stops the listener goroutine for the given ID and removes it from the
// registry. Returns false if no listener with that ID was found.
func (r *listenerRegistry) unregister(listenerID string) bool {
r.mu.Lock()
entry, ok := r.listeners[listenerID]
if ok {
delete(r.listeners, listenerID)
}
r.mu.Unlock()

if ok {
entry.cancel()
}
return ok
}

// closeAll stops all active listener goroutines. Called when the SDK client entity closes.
func (r *listenerRegistry) closeAll() {
r.mu.Lock()
listeners := r.listeners
r.listeners = make(map[string]*listenerEntry)
r.mu.Unlock()

for _, entry := range listeners {
entry.cancel()
}
}
21 changes: 19 additions & 2 deletions testservice/sdk_client_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ import (
const defaultStartWaitTime = 5 * time.Second

type SDKClientEntity struct {
sdk *ld.LDClient
logger *log.Logger
sdk *ld.LDClient
logger *log.Logger
listeners *listenerRegistry
}

func NewSDKClientEntity(params servicedef.CreateInstanceParams) (*SDKClientEntity, error) {
Expand Down Expand Up @@ -71,11 +72,13 @@ func NewSDKClientEntity(params servicedef.CreateInstanceParams) (*SDKClientEntit
return nil, err
}
c.sdk = sdk
c.listeners = newListenerRegistry(sdk.GetFlagTracker())

return c, nil
}

func (c *SDKClientEntity) Close() {
c.listeners.closeAll()
_ = c.sdk.Close()
c.logger.Println("Test ended")
c.logger.SetOutput(io.Discard)
Expand Down Expand Up @@ -130,6 +133,20 @@ func (c *SDKClientEntity) DoCommand(params servicedef.CommandParams) (interface{
return servicedef.MigrationVariationResponse{Result: string(stage)}, nil
case servicedef.CommandMigrationOperation:
return c.migrationOperation(*params.MigrationOperation)
case servicedef.CommandRegisterFlagChangeListener:
p := params.RegisterFlagChangeListener
c.listeners.registerFlagChangeListener(p.ListenerID, p.CallbackURI)
return nil, nil
case servicedef.CommandRegisterFlagValueChangeListener:
p := params.RegisterFlagValueChangeListener
c.listeners.registerFlagValueChangeListener(p.ListenerID, p.FlagKey, p.Context, p.DefaultValue, p.CallbackURI)
return nil, nil
case servicedef.CommandUnregisterListener:
p := params.UnregisterListener
if !c.listeners.unregister(p.ListenerID) {
return nil, BadRequestError{Message: fmt.Sprintf("no listener with id %q", p.ListenerID)}
}
return nil, nil
default:
return nil, BadRequestError{Message: fmt.Sprintf("unknown command %q", params.Command)}
}
Expand Down
2 changes: 2 additions & 0 deletions testservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ var capabilities = []string{
servicedef.CapabilityPersistentDataStoreRedis,
servicedef.CapabilityPersistentDataStoreConsul,
servicedef.CapabilityPersistentDataStoreDynamoDB,
servicedef.CapabilityFlagChangeListeners,
servicedef.CapabilityFlagValueChangeListeners,
}

// gets the specified environment variable, or the default if not set
Expand Down
86 changes: 63 additions & 23 deletions testservice/servicedef/command_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@ import (
)

const (
CommandEvaluateFlag = "evaluate"
CommandEvaluateAllFlags = "evaluateAll"
CommandIdentifyEvent = "identifyEvent"
CommandCustomEvent = "customEvent"
CommandAliasEvent = "aliasEvent"
CommandFlushEvents = "flushEvents"
CommandGetBigSegmentStoreStatus = "getBigSegmentStoreStatus"
CommandContextBuild = "contextBuild"
CommandContextConvert = "contextConvert"
CommandSecureModeHash = "secureModeHash"
CommandMigrationVariation = "migrationVariation"
CommandMigrationOperation = "migrationOperation"
CommandEvaluateFlag = "evaluate"
CommandEvaluateAllFlags = "evaluateAll"
CommandIdentifyEvent = "identifyEvent"
CommandCustomEvent = "customEvent"
CommandAliasEvent = "aliasEvent"
CommandFlushEvents = "flushEvents"
CommandGetBigSegmentStoreStatus = "getBigSegmentStoreStatus"
CommandContextBuild = "contextBuild"
CommandContextConvert = "contextConvert"
CommandSecureModeHash = "secureModeHash"
CommandMigrationVariation = "migrationVariation"
CommandMigrationOperation = "migrationOperation"
CommandRegisterFlagChangeListener = "registerFlagChangeListener"
CommandRegisterFlagValueChangeListener = "registerFlagValueChangeListener"
CommandUnregisterListener = "unregisterListener"
)

type ValueType string
Expand All @@ -33,16 +36,19 @@ const (
)

type CommandParams struct {
Command string `json:"command"`
Evaluate *EvaluateFlagParams `json:"evaluate,omitempty"`
EvaluateAll *EvaluateAllFlagsParams `json:"evaluateAll,omitempty"`
CustomEvent *CustomEventParams `json:"customEvent,omitempty"`
IdentifyEvent *IdentifyEventParams `json:"identifyEvent,omitempty"`
ContextBuild *ContextBuildParams `json:"contextBuild,omitempty"`
ContextConvert *ContextConvertParams `json:"contextConvert,omitempty"`
SecureModeHash *SecureModeHashParams `json:"secureModeHash,omitempty"`
MigrationVariation *MigrationVariationParams `json:"migrationVariation,omitempty"`
MigrationOperation *MigrationOperationParams `json:"migrationOperation,omitempty"`
Command string `json:"command"`
Evaluate *EvaluateFlagParams `json:"evaluate,omitempty"`
EvaluateAll *EvaluateAllFlagsParams `json:"evaluateAll,omitempty"`
CustomEvent *CustomEventParams `json:"customEvent,omitempty"`
IdentifyEvent *IdentifyEventParams `json:"identifyEvent,omitempty"`
ContextBuild *ContextBuildParams `json:"contextBuild,omitempty"`
ContextConvert *ContextConvertParams `json:"contextConvert,omitempty"`
SecureModeHash *SecureModeHashParams `json:"secureModeHash,omitempty"`
MigrationVariation *MigrationVariationParams `json:"migrationVariation,omitempty"`
MigrationOperation *MigrationOperationParams `json:"migrationOperation,omitempty"`
RegisterFlagChangeListener *RegisterFlagChangeListenerParams `json:"registerFlagChangeListener,omitempty"` //nolint:lll
RegisterFlagValueChangeListener *RegisterFlagValueChangeListenerParams `json:"registerFlagValueChangeListener,omitempty"` //nolint:lll
UnregisterListener *UnregisterListenerParams `json:"unregisterListener,omitempty"`
}

type EvaluateFlagParams struct {
Expand Down Expand Up @@ -180,5 +186,39 @@ type HookExecutionEvaluationPayload struct {

type HookExecutionTrackPayload struct {
TrackSeriesContext TrackSeriesContext `json:"trackSeriesContext,omitempty"`
Stage HookStage `json:"stage,omitempty"`
Stage HookStage `json:"stage,omitempty"`
}

// RegisterFlagChangeListenerParams defines parameters for registering a general flag change listener.
// The listener will be notified whenever any flag's configuration changes.
type RegisterFlagChangeListenerParams struct {
ListenerID string `json:"listenerId"`
CallbackURI string `json:"callbackUri"`
}

// RegisterFlagValueChangeListenerParams defines parameters for registering a flag value change listener.
// The listener fires when the evaluated value of FlagKey changes for the given Context.
type RegisterFlagValueChangeListenerParams struct {
ListenerID string `json:"listenerId"`
FlagKey string `json:"flagKey"`
Context ldcontext.Context `json:"context"`
DefaultValue ldvalue.Value `json:"defaultValue"`
CallbackURI string `json:"callbackUri"`
}

// UnregisterListenerParams defines parameters for unregistering a previously registered listener.
// Works for both flag change and flag value change listeners.
type UnregisterListenerParams struct {
ListenerID string `json:"listenerId"`
}

// ListenerNotification is the JSON payload POSTed by the test service to a callback URI when a
// listener fires. OldValue and NewValue are only present for value-change notifications
// (registerFlagValueChangeListener); they are nil for general flag-change notifications
// (registerFlagChangeListener).
type ListenerNotification struct {
ListenerID string `json:"listenerId"`
FlagKey string `json:"flagKey"`
OldValue *ldvalue.Value `json:"oldValue,omitempty"`
NewValue *ldvalue.Value `json:"newValue,omitempty"`
}
2 changes: 2 additions & 0 deletions testservice/servicedef/service_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ const (
CapabilityPersistentDataStoreRedis = "persistent-data-store-redis"
CapabilityPersistentDataStoreConsul = "persistent-data-store-consul"
CapabilityPersistentDataStoreDynamoDB = "persistent-data-store-dynamodb"
CapabilityFlagChangeListeners = "flag-change-listeners"
CapabilityFlagValueChangeListeners = "flag-value-change-listeners"
)

type StatusRep struct {
Expand Down