diff --git a/client/plugin.go b/client/plugin.go new file mode 100644 index 000000000..14c5bae35 --- /dev/null +++ b/client/plugin.go @@ -0,0 +1,32 @@ +package client + +import "go.temporal.io/sdk/internal" + +// Plugin is a plugin that can configure client options and surround client +// creation/connection. Many plugin implementers may prefer the simpler +// [go.temporal.io/sdk/temporal.SimplePlugin] instead. +// +// All client plugins must embed [go.temporal.io/sdk/client.PluginBase]. All +// plugins must implement Name(). +// +// All client plugins that also implement [go.temporal.io/sdk/worker.Plugin] are +// automatically configured on workers made from the client. +// +// NOTE: Experimental +type Plugin = internal.ClientPlugin + +// PluginBase must be embedded into client plugin implementations. +// +// NOTE: Experimental +type PluginBase = internal.ClientPluginBase + +// PluginConfigureClientOptions are options for ConfigureClient on a +// client plugin. +// +// NOTE: Experimental +type PluginConfigureClientOptions = internal.ClientPluginConfigureClientOptions + +// PluginNewClientOptions are options for NewClient on a client plugin. +// +// NOTE: Experimental +type PluginNewClientOptions = internal.ClientPluginNewClientOptions diff --git a/internal/client.go b/internal/client.go index 33c1e2d54..734b780d5 100644 --- a/internal/client.go +++ b/internal/client.go @@ -522,6 +522,17 @@ type ( // If set true, error code labels will not be included on request failure metrics. DisableErrorCodeMetricTags bool + + // Plugins that can configure options and intercept client creation. + // + // Any plugins here that also implement worker.Plugin will be used as + // worker plugins as well. + // + // Plugins themselves should never mutate this field, the behavior is + // undefined. + // + // NOTE: Experimental + Plugins []ClientPlugin } // HeadersProvider returns a map of gRPC headers that should be used on every request. @@ -963,14 +974,17 @@ func NewClient(ctx context.Context, options ClientOptions) (Client, error) { // // Exposed as: [go.temporal.io/sdk/client.NewClientFromExistingWithContext] func NewClientFromExisting(ctx context.Context, existingClient Client, options ClientOptions) (Client, error) { - existing, _ := existingClient.(*WorkflowClient) - if existing == nil { - return nil, fmt.Errorf("existing client must have been created directly from a client package call") - } - return newClient(ctx, options, existing) + return newClient(ctx, options, existingClient) } -func newClient(ctx context.Context, options ClientOptions, existing *WorkflowClient) (Client, error) { +func newClient(ctx context.Context, options ClientOptions, existing Client) (Client, error) { + // Go over all plugins allowing them to configure the options + for _, plugin := range options.Plugins { + if err := plugin.ConfigureClient(ctx, ClientPluginConfigureClientOptions{ClientOptions: &options}); err != nil { + return nil, err + } + } + if options.Namespace == "" { options.Namespace = DefaultNamespace } @@ -996,20 +1010,51 @@ func newClient(ctx context.Context, options ClientOptions, existing *WorkflowCli } } + // Go over each plugin in reverse, allowing it to wrap connect + var client Client + connect := func(ctx context.Context, options ClientPluginNewClientOptions) (err error) { + client, err = newClientPluginRoot(ctx, options) + return + } + for i := len(options.Plugins) - 1; i >= 0; i-- { + plugin := options.Plugins[i] + next := connect + connect = func(ctx context.Context, options ClientPluginNewClientOptions) error { + return plugin.NewClient(ctx, options, next) + } + } + // Invoke and confirm client was created + if err := connect(ctx, ClientPluginNewClientOptions{ + ClientOptions: options, + Lazy: options.ConnectionOptions.disableEagerConnection, + FromExisting: existing, + }); err != nil { + return nil, err + } else if client == nil { + return nil, fmt.Errorf("client plugin did not call next to build the client") + } + return client, nil +} + +func newClientPluginRoot(ctx context.Context, options ClientPluginNewClientOptions) (Client, error) { // Dial or use existing connection var connection *grpc.ClientConn var err error - if existing == nil { - options.ConnectionOptions.excludeInternalFromRetry = &atomic.Bool{} - connection, err = dial(newDialParameters(&options, options.ConnectionOptions.excludeInternalFromRetry)) + var existing *WorkflowClient + if options.FromExisting == nil { + options.ClientOptions.ConnectionOptions.excludeInternalFromRetry = &atomic.Bool{} + connection, err = dial(newDialParameters( + &options.ClientOptions, options.ClientOptions.ConnectionOptions.excludeInternalFromRetry)) if err != nil { return nil, err } - } else { + } else if existing, _ = options.FromExisting.(*WorkflowClient); existing != nil { connection = existing.conn + } else if options.FromExisting != nil { + return nil, fmt.Errorf("existing client must have been created directly from a client package call") } - client := NewServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options) + client := NewServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options.ClientOptions) // If using existing connection, always load its capabilities and use them for // the new connection. Otherwise, only load server capabilities eagerly if not @@ -1020,7 +1065,7 @@ func newClient(ctx context.Context, options ClientOptions, existing *WorkflowCli } client.unclosedClients = existing.unclosedClients } else { - if !options.ConnectionOptions.disableEagerConnection { + if !options.ClientOptions.ConnectionOptions.disableEagerConnection { if _, err := client.loadCapabilities(ctx); err != nil { client.Close() return nil, err @@ -1074,7 +1119,13 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien options.ConnectionOptions.GetSystemInfoTimeout = defaultGetSystemInfoTimeout } - // Collect set of applicable worker interceptors + // Collect set of applicable worker plugins and interceptors + var workerPlugins []WorkerPlugin + for _, plugin := range options.Plugins { + if workerPlugin, _ := plugin.(WorkerPlugin); workerPlugin != nil { + workerPlugins = append(workerPlugins, workerPlugin) + } + } var workerInterceptors []WorkerInterceptor for _, interceptor := range options.Interceptors { if workerInterceptor, _ := interceptor.(WorkerInterceptor); workerInterceptor != nil { @@ -1093,6 +1144,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien dataConverter: options.DataConverter, failureConverter: options.FailureConverter, contextPropagators: options.ContextPropagators, + workerPlugins: workerPlugins, workerInterceptors: workerInterceptors, excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry, eagerDispatcher: &eagerWorkflowDispatcher{ diff --git a/internal/cmd/tools/doclink/doclink.go b/internal/cmd/tools/doclink/doclink.go index 8e48f9332..ea57440b3 100644 --- a/internal/cmd/tools/doclink/doclink.go +++ b/internal/cmd/tools/doclink/doclink.go @@ -545,6 +545,12 @@ func isValidDefinition(line string, inGroup *string, insideStruct *bool) bool { // Checks if `line` is a valid definition, and that definition is for `private` func isValidDefinitionWithMatch(line, private string, inGroup string, insideStruct bool) bool { + // Vars with underscores are often used to assert interface validation and + // do not require docs + if strings.HasPrefix(line, "var _") { + return false + } + tokens := strings.Fields(line) if strings.HasPrefix(line, "func "+private+"(") { return true diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 905110ec9..7b45787c1 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -1171,6 +1171,10 @@ type AggregatedWorker struct { fatalErr error fatalErrLock sync.Mutex capabilities *workflowservice.GetSystemInfoResponse_Capabilities + + workerInstanceKey string + plugins []WorkerPlugin + pluginRegistryOptions *WorkerPluginConfigureWorkerRegistryOptions // Never nil } // RegisterWorkflow registers workflow implementation with the AggregatedWorker @@ -1183,6 +1187,9 @@ func (aw *AggregatedWorker) RegisterWorkflow(w interface{}) { aw.executionParams.DeploymentOptions.DefaultVersioningBehavior == VersioningBehaviorUnspecified { panic("workflow type does not have a versioning behavior") } + if aw.pluginRegistryOptions.OnRegisterWorkflow != nil { + aw.pluginRegistryOptions.OnRegisterWorkflow(w, RegisterWorkflowOptions{}) + } aw.registry.RegisterWorkflow(w) } @@ -1197,6 +1204,9 @@ func (aw *AggregatedWorker) RegisterWorkflowWithOptions(w interface{}, options R aw.executionParams.DeploymentOptions.DefaultVersioningBehavior == VersioningBehaviorUnspecified { panic("workflow type does not have a versioning behavior") } + if aw.pluginRegistryOptions.OnRegisterWorkflow != nil { + aw.pluginRegistryOptions.OnRegisterWorkflow(w, options) + } aw.registry.RegisterWorkflowWithOptions(w, options) } @@ -1210,22 +1220,34 @@ func (aw *AggregatedWorker) RegisterDynamicWorkflow(w interface{}, options Dynam aw.executionParams.DeploymentOptions.DefaultVersioningBehavior == VersioningBehaviorUnspecified { panic("dynamic workflow does not have a versioning behavior") } + if aw.pluginRegistryOptions.OnRegisterDynamicWorkflow != nil { + aw.pluginRegistryOptions.OnRegisterDynamicWorkflow(w, DynamicRegisterWorkflowOptions{}) + } aw.registry.RegisterDynamicWorkflow(w, options) } // RegisterActivity registers activity implementation with the AggregatedWorker func (aw *AggregatedWorker) RegisterActivity(a interface{}) { + if aw.pluginRegistryOptions.OnRegisterActivity != nil { + aw.pluginRegistryOptions.OnRegisterActivity(a, RegisterActivityOptions{}) + } aw.registry.RegisterActivity(a) } // RegisterActivityWithOptions registers activity implementation with the AggregatedWorker func (aw *AggregatedWorker) RegisterActivityWithOptions(a interface{}, options RegisterActivityOptions) { + if aw.pluginRegistryOptions.OnRegisterActivity != nil { + aw.pluginRegistryOptions.OnRegisterActivity(a, options) + } aw.registry.RegisterActivityWithOptions(a, options) } // RegisterDynamicActivity registers the dynamic activity function with options. // Registering activities via a structure is not supported for dynamic activities. func (aw *AggregatedWorker) RegisterDynamicActivity(a interface{}, options DynamicRegisterActivityOptions) { + if aw.pluginRegistryOptions.OnRegisterActivity != nil { + aw.pluginRegistryOptions.OnRegisterDynamicActivity(a, options) + } aw.registry.RegisterDynamicActivity(a, options) } @@ -1233,6 +1255,9 @@ func (aw *AggregatedWorker) RegisterNexusService(service *nexus.Service) { if aw.started.Load() { panic(errors.New("cannot register Nexus services after worker start")) } + if aw.pluginRegistryOptions.OnRegisterNexusService != nil { + aw.pluginRegistryOptions.OnRegisterNexusService(service) + } aw.registry.RegisterNexusService(service) } @@ -1414,35 +1439,51 @@ func (aw *AggregatedWorker) Stop() { close(aw.stopC) } - if !util.IsInterfaceNil(aw.workflowWorker) { - if aw.client.eagerDispatcher != nil { - aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker) + // Issue stop through plugins + stop := func(context.Context, WorkerPluginStopWorkerOptions) { + if !util.IsInterfaceNil(aw.workflowWorker) { + if aw.client.eagerDispatcher != nil { + aw.client.eagerDispatcher.deregisterWorker(aw.workflowWorker) + } + aw.workflowWorker.Stop() + } + if !util.IsInterfaceNil(aw.activityWorker) { + aw.activityWorker.Stop() + } + if !util.IsInterfaceNil(aw.sessionWorker) { + aw.sessionWorker.Stop() + } + if !util.IsInterfaceNil(aw.nexusWorker) { + aw.nexusWorker.Stop() } - aw.workflowWorker.Stop() - } - if !util.IsInterfaceNil(aw.activityWorker) { - aw.activityWorker.Stop() - } - if !util.IsInterfaceNil(aw.sessionWorker) { - aw.sessionWorker.Stop() } - if !util.IsInterfaceNil(aw.nexusWorker) { - aw.nexusWorker.Stop() + for i := len(aw.plugins) - 1; i >= 0; i-- { + plugin := aw.plugins[i] + next := stop + stop = func(ctx context.Context, options WorkerPluginStopWorkerOptions) { + plugin.StopWorker(ctx, options, next) + } } + stop(context.Background(), WorkerPluginStopWorkerOptions{ + WorkerInstanceKey: aw.workerInstanceKey, + }) aw.logger.Info("Stopped Worker") } // WorkflowReplayer is used to replay workflow code from an event history type WorkflowReplayer struct { - registry *registry - dataConverter converter.DataConverter - failureConverter converter.FailureConverter - contextPropagators []ContextPropagator - enableLoggingInReplay bool - disableDeadlockDetection bool - mu sync.Mutex - workflowExecutionResults map[string]*commonpb.Payloads + registry *registry + dataConverter converter.DataConverter + failureConverter converter.FailureConverter + contextPropagators []ContextPropagator + enableLoggingInReplay bool + disableDeadlockDetection bool + mu sync.Mutex + workflowExecutionResults map[string]*commonpb.Payloads + workflowReplayerInstanceKey string + plugins []WorkerPlugin + pluginRegistryOptions *WorkerPluginConfigureWorkflowReplayerRegistryOptions } // WorkflowReplayerOptions are options for creating a workflow replayer. @@ -1478,6 +1519,14 @@ type WorkflowReplayerOptions struct { // Optional: Disable the default 1 second deadlock detection timeout. This option can be used to step through // workflow code with multiple breakpoints in a debugger. DisableDeadlockDetection bool + + // Plugins that can configure options and intercept replays. + // + // Plugins themselves should never mutate this field, the behavior is + // undefined. + // + // NOTE: Experimental + Plugins []WorkerPlugin } // ReplayWorkflowHistoryOptions are options for replaying a workflow. @@ -1489,31 +1538,56 @@ type ReplayWorkflowHistoryOptions struct { // NewWorkflowReplayer creates an instance of the WorkflowReplayer. func NewWorkflowReplayer(options WorkflowReplayerOptions) (*WorkflowReplayer, error) { + // Configure replayer + workflowReplayerInstanceKey := uuid.NewString() + var pluginRegistryOptions WorkerPluginConfigureWorkflowReplayerRegistryOptions + for _, plugin := range options.Plugins { + if err := plugin.ConfigureWorkflowReplayer(context.Background(), WorkerPluginConfigureWorkflowReplayerOptions{ + WorkflowReplayerInstanceKey: workflowReplayerInstanceKey, + WorkflowReplayerOptions: &options, + WorkflowReplayerRegistryOptions: &pluginRegistryOptions, + }); err != nil { + return nil, err + } + } + registry := newRegistryWithOptions(registryOptions{disableAliasing: options.DisableRegistrationAliasing}) registry.interceptors = options.Interceptors return &WorkflowReplayer{ - registry: registry, - dataConverter: options.DataConverter, - failureConverter: options.FailureConverter, - contextPropagators: options.ContextPropagators, - enableLoggingInReplay: options.EnableLoggingInReplay, - disableDeadlockDetection: options.DisableDeadlockDetection, - workflowExecutionResults: make(map[string]*commonpb.Payloads), + registry: registry, + dataConverter: options.DataConverter, + failureConverter: options.FailureConverter, + contextPropagators: options.ContextPropagators, + enableLoggingInReplay: options.EnableLoggingInReplay, + disableDeadlockDetection: options.DisableDeadlockDetection, + workflowExecutionResults: make(map[string]*commonpb.Payloads), + workflowReplayerInstanceKey: workflowReplayerInstanceKey, + plugins: options.Plugins, + pluginRegistryOptions: &pluginRegistryOptions, }, nil } // RegisterWorkflow registers workflow function to replay func (aw *WorkflowReplayer) RegisterWorkflow(w interface{}) { + if aw.pluginRegistryOptions.OnRegisterWorkflow != nil { + aw.pluginRegistryOptions.OnRegisterWorkflow(w, RegisterWorkflowOptions{}) + } aw.registry.RegisterWorkflow(w) } // RegisterWorkflowWithOptions registers workflow function with custom workflow name to replay func (aw *WorkflowReplayer) RegisterWorkflowWithOptions(w interface{}, options RegisterWorkflowOptions) { + if aw.pluginRegistryOptions.OnRegisterWorkflow != nil { + aw.pluginRegistryOptions.OnRegisterWorkflow(w, options) + } aw.registry.RegisterWorkflowWithOptions(w, options) } // RegisterDynamicWorkflow registers a dynamic workflow function to replay func (aw *WorkflowReplayer) RegisterDynamicWorkflow(w interface{}, options DynamicRegisterWorkflowOptions) { + if aw.pluginRegistryOptions.OnRegisterDynamicWorkflow != nil { + aw.pluginRegistryOptions.OnRegisterDynamicWorkflow(w, options) + } aw.registry.RegisterDynamicWorkflow(w, options) } @@ -1622,7 +1696,47 @@ func (aw *WorkflowReplayer) GetWorkflowResult(workflowID string, valuePtr interf return dc.FromPayloads(payloads, valuePtr) } -func (aw *WorkflowReplayer) replayWorkflowHistory(logger log.Logger, service workflowservice.WorkflowServiceClient, namespace string, originalExecution WorkflowExecution, history *historypb.History) error { +func (aw *WorkflowReplayer) replayWorkflowHistory( + logger log.Logger, + service workflowservice.WorkflowServiceClient, + namespace string, + originalExecution WorkflowExecution, + history *historypb.History, +) error { + replay := func(ctx context.Context, options WorkerPluginReplayWorkflowOptions) error { + return aw.replayWorkflowHistoryRoot( + options.Logger, + options.WorkflowServiceClient, + options.Namespace, + options.OriginalExecution, + options.History, + ) + } + for i := len(aw.plugins) - 1; i >= 0; i-- { + plugin := aw.plugins[i] + next := replay + replay = func(ctx context.Context, options WorkerPluginReplayWorkflowOptions) error { + return plugin.ReplayWorkflow(ctx, options, next) + } + } + return replay(context.Background(), WorkerPluginReplayWorkflowOptions{ + WorkflowReplayerInstanceKey: aw.workflowReplayerInstanceKey, + History: history, + Logger: logger, + WorkflowServiceClient: service, + Namespace: namespace, + OriginalExecution: originalExecution, + WorkflowReplayRegistry: aw, + }) +} + +func (aw *WorkflowReplayer) replayWorkflowHistoryRoot( + logger log.Logger, + service workflowservice.WorkflowServiceClient, + namespace string, + originalExecution WorkflowExecution, + history *historypb.History, +) error { taskQueue := "ReplayTaskQueue" events := history.Events if events == nil { @@ -1827,6 +1941,23 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke if strings.HasPrefix(taskQueue, temporalPrefix) { panic(temporalPrefixError) } + + // Combine client-provided worker plugins with current options set and apply to options + workerInstanceKey := uuid.NewString() + var pluginRegistryOptions WorkerPluginConfigureWorkerRegistryOptions + plugins := append(append([]WorkerPlugin(nil), client.workerPlugins...), options.Plugins...) + for _, plugin := range plugins { + // No meaningful context to pass at this time, and all errors are panics when configuring worker + if err := plugin.ConfigureWorker(context.Background(), WorkerPluginConfigureWorkerOptions{ + WorkerInstanceKey: workerInstanceKey, + TaskQueue: taskQueue, + WorkerOptions: &options, + WorkerRegistryOptions: &pluginRegistryOptions, + }); err != nil { + panic(err) + } + } + setClientDefaults(client) setWorkerOptionsDefaults(&options) ctx := options.BackgroundActivityContext @@ -2015,17 +2146,35 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke } aw = &AggregatedWorker{ - client: client, - workflowWorker: workflowWorker, - activityWorker: activityWorker, - sessionWorker: sessionWorker, - logger: workerParams.Logger, - registry: registry, - stopC: make(chan struct{}), - capabilities: &capabilities, - executionParams: workerParams, - } - aw.memoizedStart = sync.OnceValue(aw.start) + client: client, + workflowWorker: workflowWorker, + activityWorker: activityWorker, + sessionWorker: sessionWorker, + logger: workerParams.Logger, + registry: registry, + stopC: make(chan struct{}), + capabilities: &capabilities, + executionParams: workerParams, + workerInstanceKey: workerInstanceKey, + plugins: plugins, + pluginRegistryOptions: &pluginRegistryOptions, + } + + // Set memoized start as a once-value that invokes plugins first + aw.memoizedStart = sync.OnceValue(func() error { + start := func(context.Context, WorkerPluginStartWorkerOptions) error { return aw.start() } + for i := len(plugins) - 1; i >= 0; i-- { + plugin := plugins[i] + next := start + start = func(ctx context.Context, options WorkerPluginStartWorkerOptions) error { + return plugin.StartWorker(ctx, options, next) + } + } + return start(context.Background(), WorkerPluginStartWorkerOptions{ + WorkerInstanceKey: workerInstanceKey, + WorkerRegistry: aw, + }) + }) return aw } diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 777c214f1..3c0db0c9a 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -69,6 +69,7 @@ type ( dataConverter converter.DataConverter failureConverter converter.FailureConverter contextPropagators []ContextPropagator + workerPlugins []WorkerPlugin workerInterceptors []WorkerInterceptor interceptor ClientOutboundInterceptor excludeInternalFromRetry *atomic.Bool diff --git a/internal/plugin.go b/internal/plugin.go new file mode 100644 index 000000000..59950ea33 --- /dev/null +++ b/internal/plugin.go @@ -0,0 +1,660 @@ +package internal + +import ( + "context" + "fmt" + + "github.com/nexus-rpc/sdk-go/nexus" + historypb "go.temporal.io/api/history/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/log" +) + +// ClientPlugin is a plugin that can configure client options and surround client +// creation/connection. Many plugin implementers may prefer the simpler +// [go.temporal.io/sdk/temporal.SimplePlugin] instead. +// +// All client plugins must embed [go.temporal.io/sdk/client.PluginBase]. All +// plugins must implement Name(). +// +// All client plugins that also implement [go.temporal.io/sdk/worker.Plugin] are +// automatically configured on workers made from the client. +// +// Exposed as: [go.temporal.io/sdk/client.Plugin] +// +// NOTE: Experimental +type ClientPlugin interface { + // Name returns the name for this plugin. + Name() string + + // ConfigureClient is called when a client is created but before the options + // are validated. This call gives plugins a chance to adjust options as + // needed. This often includes adding interceptors. + ConfigureClient(context.Context, ClientPluginConfigureClientOptions) error + + // NewClient is called when a client is being created/connected after + // options have been set. This is meant to surround dial calls. Implementers + // must either return an error or call next. + // + // This method intentionally does not allow control over the actual client + // instance because only the explicit client instance can be used in the SDK. + NewClient( + ctx context.Context, + options ClientPluginNewClientOptions, + next func(context.Context, ClientPluginNewClientOptions) error, + ) error + + // Plugins must embed [go.temporal.io/sdk/client.PluginBase]. + mustEmbedClientPluginBase() +} + +// ClientPluginConfigureClientOptions are options for ConfigureClient on a +// client plugin. +// +// Exposed as: [go.temporal.io/sdk/client.PluginConfigureClientOptions] +// +// NOTE: Experimental +type ClientPluginConfigureClientOptions struct { + // ClientOptions are the set of mutable options that can be adjusted by + // plugins. + ClientOptions *ClientOptions +} + +// ClientPluginNewClientOptions are options for NewClient on a client plugin. +// +// Exposed as: [go.temporal.io/sdk/client.PluginNewClientOptions] +// +// NOTE: Experimental +type ClientPluginNewClientOptions struct { + // ClientOptions are the set of options used for the client. These should + // not be mutated, that should be done via the ConfigureClient method. + ClientOptions ClientOptions + + // Lazy is whether the new client call is being invoked lazily or not. + Lazy bool + + // FromExisting is set to a non-nil Client if this client is being created + // from an existing client. + FromExisting Client +} + +// ClientPluginBase must be embedded into client plugin implementations. +// +// Exposed as: [go.temporal.io/sdk/client.PluginBase] +// +// NOTE: Experimental +type ClientPluginBase struct{} + +var _ ClientPlugin = struct { + pluginNamePanicForTypeChecking + ClientPluginBase +}{} + +// WorkerPlugin is a plugin that can configure worker/replayer options and +// surround worker/replayer runs. Many plugin implementers may prefer the +// simpler [go.temporal.io/sdk/temporal.SimplePlugin] instead. +// +// All worker plugins must embed [go.temporal.io/sdk/worker.PluginBase]. All +// plugins must implement Name(). +// +// Exposed as: [go.temporal.io/sdk/worker.Plugin] +// +// NOTE: Experimental +type WorkerPlugin interface { + // Name returns the name for this plugin. + Name() string + + // ConfigureWorker is called when a worker is created but before the options + // are validated. This call gives plugins a chance to adjust options as + // needed. This often includes adding interceptors. + // + // Note, at this time, due to [go.temporal.io/worker.New] not returning an + // error, any errors returned from here become panics. Also, at this time, + // the context cannot be supplied by a user, so it's effectively + // meaningless. + ConfigureWorker(context.Context, WorkerPluginConfigureWorkerOptions) error + + // StartWorker is called to start a worker. This is called on Worker.Start + // or Worker.Run. Implementers should return an error or invoke next. + StartWorker( + ctx context.Context, + options WorkerPluginStartWorkerOptions, + next func(context.Context, WorkerPluginStartWorkerOptions) error, + ) error + + // StopWorker is called to stop a worker. This is called on Worker.Stop or + // if Worker.Run is interrupted via its interrupt channel. However, if a + // fatal worker error occurs during Worker.Run, this may not be called. + // Implementers can account for this situation by setting OnFatalError in + // the worker options. Implementers should invoke next. + StopWorker( + ctx context.Context, + options WorkerPluginStopWorkerOptions, + next func(context.Context, WorkerPluginStopWorkerOptions), + ) + + // ConfigureWorkflowReplayer is called when the workflow replayer is created + // but before the options are validated. This call gives plugins a chance to + // adjust options as needed. This often includes adding interceptors. + ConfigureWorkflowReplayer(context.Context, WorkerPluginConfigureWorkflowReplayerOptions) error + + // ReplayWorkflow is called for each individual workflow replay on the + // replayer. Implementers should return an error or invoke next. + ReplayWorkflow( + ctx context.Context, + options WorkerPluginReplayWorkflowOptions, + next func(context.Context, WorkerPluginReplayWorkflowOptions) error, + ) error + + // Plugins must embed [go.temporal.io/sdk/worker.PluginBase]. + mustEmbedWorkerPluginBase() +} + +// WorkerPluginBase must be embedded into worker plugin implementations. +// +// Exposed as: [go.temporal.io/sdk/worker.PluginBase] +// +// NOTE: Experimental +type WorkerPluginBase struct{} + +var _ WorkerPlugin = struct { + pluginNamePanicForTypeChecking + WorkerPluginBase +}{} + +// WorkerPluginConfigureWorkerOptions are options for ConfigureWorker on a +// worker plugin. +// +// Exposed as: [go.temporal.io/sdk/worker.PluginConfigureWorkerOptions] +// +// NOTE: Experimental +type WorkerPluginConfigureWorkerOptions struct { + // WorkerInstanceKey is the unique, immutable instance key for this worker. + WorkerInstanceKey string + + // TaskQueue is the immutable task queue for this worker. + TaskQueue string + + // WorkerOptions are the set of mutable options that can be adjusted by + // plugins. + WorkerOptions *WorkerOptions + + // WorkerRegistryOptions are the set of callbacks that can be adjusted by + // plugins. If adjusting a callback that is already set, implementers may + // want to take care to invoke the existing callback inside their own. + WorkerRegistryOptions *WorkerPluginConfigureWorkerRegistryOptions +} + +// WorkerPluginConfigureWorkerRegistryOptions are the set of callbacks that can +// be adjusted by plugins when configuring workers. If adjusting a callback that +// is already set, implementers may want to take care to invoke the existing +// callback inside their own. +// +// Exposed as: [go.temporal.io/sdk/worker.PluginConfigureWorkerRegistryOptions] +// +// NOTE: Experimental +type WorkerPluginConfigureWorkerRegistryOptions struct { + OnRegisterWorkflow func(any, RegisterWorkflowOptions) + OnRegisterDynamicWorkflow func(any, DynamicRegisterWorkflowOptions) + OnRegisterActivity func(any, RegisterActivityOptions) + OnRegisterDynamicActivity func(any, DynamicRegisterActivityOptions) + OnRegisterNexusService func(*nexus.Service) +} + +// WorkerPluginStartWorkerOptions are options for StartWorker on a worker +// plugin. +// +// Exposed as: [go.temporal.io/sdk/worker.PluginStartWorkerOptions] +// +// NOTE: Experimental +type WorkerPluginStartWorkerOptions struct { + // WorkerInstanceKey is the unique, immutable instance key for this worker. + WorkerInstanceKey string + + // WorkerRegistry is the worker registry plugins can use to register items + // with the worker. Implementers should usually not mutate this before + // passing to "next", but instead add register callbacks in ConfigureWorker + // if needed. Calls to this registry to invoke the OnX callbacks set in + // ConfigureWorker. + WorkerRegistry interface { + RegisterWorkflowWithOptions(any, RegisterWorkflowOptions) + RegisterDynamicWorkflow(any, DynamicRegisterWorkflowOptions) + RegisterActivityWithOptions(any, RegisterActivityOptions) + RegisterDynamicActivity(any, DynamicRegisterActivityOptions) + RegisterNexusService(*nexus.Service) + } +} + +// WorkerPluginStopWorkerOptions are options for StopWorker on a worker plugin. +// +// Exposed as: [go.temporal.io/sdk/worker.PluginStopWorkerOptions] +// +// NOTE: Experimental +type WorkerPluginStopWorkerOptions struct { + // WorkerInstanceKey is the unique, immutable instance key for this worker. + WorkerInstanceKey string +} + +// WorkerPluginConfigureWorkflowReplayerOptions are options for +// ConfigureWorkflowReplayer on a worker plugin. +// +// Exposed as: [go.temporal.io/sdk/worker.PluginConfigureWorkflowReplayerOptions] +// +// NOTE: Experimental +type WorkerPluginConfigureWorkflowReplayerOptions struct { + // WorkflowReplayerInstanceKey is the unique, immutable instance key for + // this workflow replayer. + WorkflowReplayerInstanceKey string + + // WorkflowReplayerOptions are the set of mutable options that can be + // adjusted by plugins. + WorkflowReplayerOptions *WorkflowReplayerOptions + + // WorkflowReplayerRegistryOptions are the set of callbacks that can be + // adjusted by plugins. If adjusting a callback that is already set, + // implementers may want to take care to invoke the existing callback inside + // their own. + WorkflowReplayerRegistryOptions *WorkerPluginConfigureWorkflowReplayerRegistryOptions +} + +// WorkerPluginConfigureWorkflowReplayerRegistryOptions are the set of callbacks +// that can be adjusted by plugins when configuring workflow replayers. If +// adjusting a callback that is already set, implementers may want to take care +// to invoke the existing callback inside their own. +// +// Exposed as: [go.temporal.io/sdk/worker.PluginConfigureWorkflowReplayerRegistryOptions] +// +// NOTE: Experimental +type WorkerPluginConfigureWorkflowReplayerRegistryOptions struct { + OnRegisterWorkflow func(any, RegisterWorkflowOptions) + OnRegisterDynamicWorkflow func(any, DynamicRegisterWorkflowOptions) +} + +// WorkerPluginReplayWorkflowOptions are options for ReplayWorkflow on a worker +// plugin. +// +// Exposed as: [go.temporal.io/sdk/worker.PluginReplayWorkflowOptions] +// +// NOTE: Experimental +type WorkerPluginReplayWorkflowOptions struct { + // WorkflowReplayerInstanceKey is the unique, immutable instance key for + // this workflow replayer. + WorkflowReplayerInstanceKey string + + // WorkflowReplayRegistry is the workflow replayer registry plugins can use + // to register items with the replayer. Implementers should usually not + // mutate this before passing to "next", but instead add register callbacks + // in ConfigureWorkflowReplayer if needed. Calls to this registry to invoke + // the OnX callbacks set in ConfigureWorkflowReplayer. + WorkflowReplayRegistry interface { + RegisterWorkflowWithOptions(any, RegisterWorkflowOptions) + RegisterDynamicWorkflow(any, DynamicRegisterWorkflowOptions) + } + + // History to replay. + History *historypb.History + + // All fields below are coalesced from overloads. No guarantees are made + // about their values. + Logger log.Logger + WorkflowServiceClient workflowservice.WorkflowServiceClient + Namespace string + OriginalExecution WorkflowExecution +} + +type pluginNamePanicForTypeChecking struct{} + +func (pluginNamePanicForTypeChecking) Name() string { panic("unreachable") } + +// SimplePlugin implements both [go.temporal.io/sdk/client.Plugin] and +// [go.temporal.io/sdk/worker.Plugin] from a given set of options. Use +// [go.temporal.io/sdk/temporal.NewSimplePlugin] to instantiate this. +// +// Exposed as: [go.temporal.io/sdk/temporal.SimplePlugin] +// +// NOTE: Experimental +type SimplePlugin struct { + options SimplePluginOptions +} + +var _ ClientPlugin = (*SimplePlugin)(nil) +var _ WorkerPlugin = (*SimplePlugin)(nil) + +// SimplePluginOptions are options for NewSimplePlugin. +// +// Exposed as: [go.temporal.io/sdk/temporal.SimplePluginOptions] +// +// NOTE: Experimental +type SimplePluginOptions struct { + // Name is the required name of the plugin. + Name string + + // DataConverter, if set, overrides any user-set or previous-plugin-set data + // converter on client or replayer options. Use ConfigureClient or + // ConfigureWorkflowReplayer if needing to react to existing options instead + // of overwriting. + DataConverter converter.DataConverter + + // FailureConverter, if set, overrides any user-set or previous-plugin-set + // failure converter on client or replayer options. Use ConfigureClient or + // ConfigureWorkflowReplayer if needing to react to existing options instead + // of overwriting. + FailureConverter converter.FailureConverter + + // ContextPropagators are appended to any user-set or previous-plugin-set + // context propagators on client or replayer options. Use ConfigureClient or + // ConfigureWorkflowReplayer if needing to react to existing options instead + // of appending. + ContextPropagators []ContextPropagator + + // ClientInterceptors are appended to any user-set or previous-plugin-set + // client interceptors on client options. Use ConfigureClient if needing to + // react to existing options instead of appending. + ClientInterceptors []ClientInterceptor + + // WorkerInterceptors are appended to any user-set or previous-plugin-set + // worker interceptors on worker or replayer options. Use ConfigureWorker or + // ConfigureWorkflowReplayer if needing to react to existing options instead + // of appending. + WorkerInterceptors []WorkerInterceptor + + // ConfigureClient if set is invoked to adjust client options. This is + // invoked after any above options are set. + ConfigureClient func(context.Context, ClientPluginConfigureClientOptions) error + + // ConfigureWorker if set is invoked to adjust worker options. This is + // invoked after any above options are set. + ConfigureWorker func(context.Context, WorkerPluginConfigureWorkerOptions) error + + // ConfigureWorkflowReplayer if set is invoked to adjust workflow replayer + // options. This is invoked after any above options are set. + ConfigureWorkflowReplayer func(context.Context, WorkerPluginConfigureWorkflowReplayerOptions) error + + // RunContextBefore is invoked on worker start or before each workflow + // replay of a replayer. Implementers can use this to register items or + // simply start something needed. + RunContextBefore func(context.Context, SimplePluginRunContextBeforeOptions) error + + // RunContextAfter is invoked on worker stop or after each workflow replay + // of a replayer. Implementers can use this to close something started + // before. + // + // See the note on [WorkerPlugin.StopWorker] about rare situations in which + // this may not run on worker completion. + RunContextAfter func(context.Context, SimplePluginRunContextAfterOptions) +} + +// SimplePluginRunContextBeforeOptions are options for RunContextBefore on a +// simple plugin. +// +// Exposed as: [go.temporal.io/sdk/temporal.SimplePluginRunContextBeforeOptions] +// +// NOTE: Experimental +type SimplePluginRunContextBeforeOptions struct { + // InstanceKey is the unique, immutable instance key for the worker or + // workflow replayer. + InstanceKey string + + // WorkflowReplayer is true if this is a workflow replayer, or false if it + // is a worker. + WorkflowReplayer bool + + // Registry is the worker/replayer registry plugins can use to register + // items. Note, activity and Nexus service registration do nothing if + // WorkflowReplayer is true. + Registry interface { + RegisterWorkflowWithOptions(any, RegisterWorkflowOptions) + RegisterDynamicWorkflow(any, DynamicRegisterWorkflowOptions) + RegisterActivityWithOptions(any, RegisterActivityOptions) + RegisterDynamicActivity(any, DynamicRegisterActivityOptions) + RegisterNexusService(*nexus.Service) + } +} + +// SimplePluginRunContextAfterOptions are options for RunContextAfter on a +// simple plugin. +// +// Exposed as: [go.temporal.io/sdk/temporal.SimplePluginRunContextAfterOptions] +// +// NOTE: Experimental +type SimplePluginRunContextAfterOptions struct { + // InstanceKey is the unique, immutable instance key for the worker or + // workflow replayer. + InstanceKey string +} + +func (ClientPluginBase) ConfigureClient(context.Context, ClientPluginConfigureClientOptions) error { + return nil +} + +func (ClientPluginBase) NewClient( + ctx context.Context, + options ClientPluginNewClientOptions, + next func(context.Context, ClientPluginNewClientOptions) error, +) error { + return next(ctx, options) +} + +//lint:ignore U1000 Intentionally unused +func (ClientPluginBase) mustEmbedClientPluginBase() {} + +func (WorkerPluginBase) ConfigureWorker(context.Context, WorkerPluginConfigureWorkerOptions) error { + return nil +} + +func (WorkerPluginBase) StartWorker( + ctx context.Context, + options WorkerPluginStartWorkerOptions, + next func(context.Context, WorkerPluginStartWorkerOptions) error, +) error { + return next(ctx, options) +} + +func (WorkerPluginBase) StopWorker( + ctx context.Context, + options WorkerPluginStopWorkerOptions, + next func(context.Context, WorkerPluginStopWorkerOptions), +) { + next(ctx, options) +} + +func (WorkerPluginBase) ConfigureWorkflowReplayer(context.Context, WorkerPluginConfigureWorkflowReplayerOptions) error { + return nil +} + +func (WorkerPluginBase) ReplayWorkflow( + ctx context.Context, + options WorkerPluginReplayWorkflowOptions, + next func(context.Context, WorkerPluginReplayWorkflowOptions) error, +) error { + return next(ctx, options) +} + +//lint:ignore U1000 Intentionally unused +func (WorkerPluginBase) mustEmbedWorkerPluginBase() {} + +// NewSimplePlugin creates a new SimplePlugin with the given options. +func NewSimplePlugin(options SimplePluginOptions) (*SimplePlugin, error) { + if options.Name == "" { + return nil, fmt.Errorf("name required") + } + return &SimplePlugin{options}, nil +} + +// We impl these instead of embedding plugin base to force SimplePlugin to +// explicitly account for new plugin things added +func (*SimplePlugin) mustEmbedClientPluginBase() {} +func (*SimplePlugin) mustEmbedWorkerPluginBase() {} + +func (s *SimplePlugin) Name() string { return s.options.Name } + +func (s *SimplePlugin) ConfigureClient(ctx context.Context, options ClientPluginConfigureClientOptions) error { + if s.options.DataConverter != nil { + options.ClientOptions.DataConverter = s.options.DataConverter + } + if s.options.FailureConverter != nil { + options.ClientOptions.FailureConverter = s.options.FailureConverter + } + if len(s.options.ContextPropagators) > 0 { + options.ClientOptions.ContextPropagators = + append(options.ClientOptions.ContextPropagators, s.options.ContextPropagators...) + } + if len(s.options.ClientInterceptors) > 0 { + options.ClientOptions.Interceptors = append(options.ClientOptions.Interceptors, s.options.ClientInterceptors...) + } + if s.options.ConfigureClient != nil { + if err := s.options.ConfigureClient(ctx, options); err != nil { + return err + } + } + return nil +} + +func (*SimplePlugin) NewClient( + ctx context.Context, + options ClientPluginNewClientOptions, + next func(context.Context, ClientPluginNewClientOptions) error, +) error { + return next(ctx, options) +} + +func (s *SimplePlugin) ConfigureWorker(ctx context.Context, options WorkerPluginConfigureWorkerOptions) error { + if len(s.options.WorkerInterceptors) > 0 { + options.WorkerOptions.Interceptors = append(options.WorkerOptions.Interceptors, s.options.WorkerInterceptors...) + } + if s.options.ConfigureWorker != nil { + if err := s.options.ConfigureWorker(ctx, options); err != nil { + return err + } + } + return nil +} + +func (s *SimplePlugin) StartWorker( + ctx context.Context, + options WorkerPluginStartWorkerOptions, + next func(context.Context, WorkerPluginStartWorkerOptions) error, +) error { + if s.options.RunContextBefore != nil { + if err := s.options.RunContextBefore( + ctx, + SimplePluginRunContextBeforeOptions{ + InstanceKey: options.WorkerInstanceKey, + Registry: options.WorkerRegistry, + }, + ); err != nil { + return err + } + } + return next(ctx, options) +} + +func (s *SimplePlugin) StopWorker( + ctx context.Context, + options WorkerPluginStopWorkerOptions, + next func(context.Context, WorkerPluginStopWorkerOptions), +) { + if s.options.RunContextAfter != nil { + s.options.RunContextAfter( + ctx, + SimplePluginRunContextAfterOptions{InstanceKey: options.WorkerInstanceKey}, + ) + } + next(ctx, options) +} + +func (s *SimplePlugin) ConfigureWorkflowReplayer( + ctx context.Context, + options WorkerPluginConfigureWorkflowReplayerOptions, +) error { + if s.options.DataConverter != nil { + options.WorkflowReplayerOptions.DataConverter = s.options.DataConverter + } + if s.options.FailureConverter != nil { + options.WorkflowReplayerOptions.FailureConverter = s.options.FailureConverter + } + if len(s.options.ContextPropagators) > 0 { + options.WorkflowReplayerOptions.ContextPropagators = append( + options.WorkflowReplayerOptions.ContextPropagators, + s.options.ContextPropagators..., + ) + } + // Go over every client interceptor and append if it's also a worker interceptor + for _, interceptor := range s.options.ClientInterceptors { + if workerInterceptor, _ := interceptor.(WorkerInterceptor); workerInterceptor != nil { + options.WorkflowReplayerOptions.Interceptors = append( + options.WorkflowReplayerOptions.Interceptors, + workerInterceptor, + ) + } + } + if len(s.options.WorkerInterceptors) > 0 { + options.WorkflowReplayerOptions.Interceptors = append( + options.WorkflowReplayerOptions.Interceptors, + s.options.WorkerInterceptors..., + ) + } + if s.options.ConfigureWorkflowReplayer != nil { + if err := s.options.ConfigureWorkflowReplayer(ctx, options); err != nil { + return err + } + } + return nil +} + +type simplePluginWorkflowReplayerRegistry struct { + registerWorkflowWithOptions func(any, RegisterWorkflowOptions) + registerDynamicWorkflow func(any, DynamicRegisterWorkflowOptions) +} + +func (s simplePluginWorkflowReplayerRegistry) RegisterWorkflowWithOptions(w any, options RegisterWorkflowOptions) { + s.registerWorkflowWithOptions(w, options) +} + +func (s simplePluginWorkflowReplayerRegistry) RegisterDynamicWorkflow(w any, options DynamicRegisterWorkflowOptions) { + s.registerDynamicWorkflow(w, options) +} + +func (simplePluginWorkflowReplayerRegistry) RegisterActivityWithOptions(any, RegisterActivityOptions) { + // No-op +} + +func (simplePluginWorkflowReplayerRegistry) RegisterDynamicActivity(any, DynamicRegisterActivityOptions) { + // No-op +} + +func (simplePluginWorkflowReplayerRegistry) RegisterNexusService(*nexus.Service) { + // No-op +} + +func (s *SimplePlugin) ReplayWorkflow( + ctx context.Context, + options WorkerPluginReplayWorkflowOptions, + next func(context.Context, WorkerPluginReplayWorkflowOptions) error, +) error { + if s.options.RunContextBefore != nil { + if err := s.options.RunContextBefore( + ctx, + SimplePluginRunContextBeforeOptions{ + InstanceKey: options.WorkflowReplayerInstanceKey, + WorkflowReplayer: true, + Registry: simplePluginWorkflowReplayerRegistry{ + registerWorkflowWithOptions: options.WorkflowReplayRegistry.RegisterWorkflowWithOptions, + registerDynamicWorkflow: options.WorkflowReplayRegistry.RegisterDynamicWorkflow, + }, + }, + ); err != nil { + return err + } + } + if s.options.RunContextAfter != nil { + defer s.options.RunContextAfter( + ctx, + SimplePluginRunContextAfterOptions{ + InstanceKey: options.WorkflowReplayerInstanceKey, + }, + ) + } + return next(ctx, options) +} diff --git a/internal/worker.go b/internal/worker.go index d3266b0c4..83efaf21c 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -390,6 +390,18 @@ type ( // // NOTE: Experimental NexusTaskPollerBehavior PollerBehavior + + // Plugins that can configure options and intercept start/stop worker. + // + // Any plugins that were part of client options and implement + // worker.Plugin are automatically applied as worker plugins and should + // not also be set here. + // + // Plugins themselves should never mutate this field, the behavior is + // undefined. + // + // NOTE: Experimental + Plugins []WorkerPlugin } ) diff --git a/temporal/plugin.go b/temporal/plugin.go new file mode 100644 index 000000000..61784dc8c --- /dev/null +++ b/temporal/plugin.go @@ -0,0 +1,34 @@ +package temporal + +import "go.temporal.io/sdk/internal" + +// SimplePlugin implements both [go.temporal.io/sdk/client.Plugin] and +// [go.temporal.io/sdk/worker.Plugin] from a given set of options. Use +// [go.temporal.io/sdk/temporal.NewSimplePlugin] to instantiate this. +// +// NOTE: Experimental +type SimplePlugin = internal.SimplePlugin + +// SimplePluginOptions are options for NewSimplePlugin. +// +// NOTE: Experimental +type SimplePluginOptions = internal.SimplePluginOptions + +// SimplePluginRunContextBeforeOptions are options for RunContextBefore on a +// simple plugin. +// +// NOTE: Experimental +type SimplePluginRunContextBeforeOptions = internal.SimplePluginRunContextBeforeOptions + +// SimplePluginRunContextAfterOptions are options for RunContextAfter on a +// simple plugin. +// +// NOTE: Experimental +type SimplePluginRunContextAfterOptions = internal.SimplePluginRunContextAfterOptions + +// NewSimplePlugin creates a new SimplePlugin with the given options. +// +// NOTE: Experimental +func NewSimplePlugin(options SimplePluginOptions) (*SimplePlugin, error) { + return internal.NewSimplePlugin(options) +} diff --git a/test/integration_test.go b/test/integration_test.go index 97232b92b..8ba513681 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -7979,3 +7979,396 @@ func (ts *IntegrationTestSuite) TestUnhandledCommandAndMetrics() { } ts.Equal(1, workflowCompletedCount) } + +// Plugin sets client options, can fail dial +type clientPluginForTest struct { + client.PluginBase + ts *IntegrationTestSuite + callRecorder interceptortest.CallRecordingInvoker + failNew bool +} + +func (*clientPluginForTest) Name() string { return "client-plugin-for-test" } + +func (c *clientPluginForTest) ConfigureClient( + ctx context.Context, + options client.PluginConfigureClientOptions, +) error { + // Set the host:port, TLS, and interceptor (caller will set namespace) + options.ClientOptions.HostPort = c.ts.config.ServiceAddr + options.ClientOptions.ConnectionOptions.TLS = c.ts.config.TLS + options.ClientOptions.Interceptors = append( + options.ClientOptions.Interceptors, + interceptortest.NewProxy(&c.callRecorder), + ) + return nil +} + +func (c *clientPluginForTest) NewClient( + ctx context.Context, + options client.PluginNewClientOptions, + next func(context.Context, client.PluginNewClientOptions) error, +) error { + if c.failNew { + return fmt.Errorf("intentional client error") + } + // Confirm our interceptor was set + c.ts.Len(options.ClientOptions.Interceptors, 1) + return next(ctx, options) +} + +func (ts *IntegrationTestSuite) TestClientPlugin() { + // Stop any existing worker + ts.worker.Stop() + + // Failing dial first + plugin := &clientPluginForTest{ts: ts, failNew: true} + _, err := client.Dial(client.Options{Namespace: ts.config.Namespace, Plugins: []client.Plugin{plugin}}) + ts.ErrorContains(err, "intentional client error") + + // Now succeed dial and run simple workflow + plugin = &clientPluginForTest{ts: ts} + cl, err := client.Dial(client.Options{Namespace: ts.config.Namespace, Plugins: []client.Plugin{plugin}}) + ts.NoError(err) + defer cl.Close() + wrk := worker.New(cl, ts.taskQueueName, worker.Options{}) + wrk.RegisterWorkflowWithOptions( + func(workflow.Context) (string, error) { return "workflow-success", nil }, + workflow.RegisterOptions{Name: "simple-workflow"}, + ) + ts.NoError(wrk.Start()) + defer wrk.Stop() + run, err := cl.ExecuteWorkflow( + context.Background(), + ts.startWorkflowOptions(ts.T().Name()+"-client-plugin"), + "simple-workflow", + ) + var res string + ts.NoError(err) + ts.NoError(run.Get(context.Background(), &res)) + ts.Equal("workflow-success", res) + + // Confirm interceptors called + var calls []string + for _, call := range plugin.callRecorder.Calls() { + calls = append(calls, call.Interface.Name()+"."+call.Method.Name) + } + ts.Contains(calls, "ClientOutboundInterceptor.ExecuteWorkflow") + ts.Contains(calls, "WorkflowInboundInterceptor.ExecuteWorkflow") +} + +type workerPluginForTest struct { + worker.PluginBase + ts *IntegrationTestSuite + instanceKey string + callRecorder interceptortest.CallRecordingInvoker + activityRegistrationsCalled []activity.RegisterOptions + workflowRegistrationsCalled []workflow.RegisterOptions + failStart bool + failReplay bool + stopCalled bool + replayedExecutions []workflow.Execution +} + +func (w *workerPluginForTest) SomeActivity(ctx context.Context) (string, error) { + return "activity-success", nil +} + +func (*workerPluginForTest) Name() string { return "worker-plugin-for-test" } + +func (w *workerPluginForTest) ConfigureWorker(ctx context.Context, options worker.PluginConfigureWorkerOptions) error { + w.ts.Equal(w.ts.taskQueueName, options.TaskQueue) + w.ts.Empty(w.instanceKey) + w.instanceKey = options.WorkerInstanceKey + // Add an interceptor and a callback for registrations + options.WorkerOptions.Interceptors = append( + options.WorkerOptions.Interceptors, + interceptortest.NewProxy(&w.callRecorder), + ) + options.WorkerRegistryOptions.OnRegisterActivity = func(act any, options activity.RegisterOptions) { + w.activityRegistrationsCalled = append(w.activityRegistrationsCalled, options) + } + options.WorkerRegistryOptions.OnRegisterWorkflow = func(act any, options workflow.RegisterOptions) { + w.workflowRegistrationsCalled = append(w.workflowRegistrationsCalled, options) + } + return nil +} + +func (w *workerPluginForTest) StartWorker( + ctx context.Context, + options worker.PluginStartWorkerOptions, + next func(context.Context, worker.PluginStartWorkerOptions) error, +) error { + w.ts.Equal(w.instanceKey, options.WorkerInstanceKey) + if w.failStart { + return fmt.Errorf("intentional worker error") + } + // Add our activity w/ options + options.WorkerRegistry.RegisterActivityWithOptions(w.SomeActivity, activity.RegisterOptions{Name: "some-activity"}) + return next(ctx, options) +} + +func (w *workerPluginForTest) StopWorker( + ctx context.Context, + options worker.PluginStopWorkerOptions, + next func(context.Context, worker.PluginStopWorkerOptions), +) { + w.ts.Equal(w.instanceKey, options.WorkerInstanceKey) + w.ts.False(w.stopCalled) + w.stopCalled = true + next(ctx, options) +} + +func (w *workerPluginForTest) ConfigureWorkflowReplayer( + ctx context.Context, + options worker.PluginConfigureWorkflowReplayerOptions, +) error { + w.ts.Empty(w.instanceKey) + w.instanceKey = options.WorkflowReplayerInstanceKey + return nil +} + +func (w *workerPluginForTest) ReplayWorkflow( + ctx context.Context, + options worker.PluginReplayWorkflowOptions, + next func(context.Context, worker.PluginReplayWorkflowOptions) error, +) error { + w.ts.Equal(w.instanceKey, options.WorkflowReplayerInstanceKey) + if w.failReplay { + return fmt.Errorf("intentional replayer error") + } + w.replayedExecutions = append(w.replayedExecutions, options.OriginalExecution) + return next(ctx, options) +} + +func (ts *IntegrationTestSuite) TestWorkerPlugin() { + // Stop any existing worker + ts.worker.Stop() + + // Failing start first + plugin := &workerPluginForTest{ts: ts, failStart: true} + wrk := worker.New(ts.client, ts.taskQueueName, worker.Options{Plugins: []worker.Plugin{plugin}}) + err := wrk.Start() + ts.ErrorContains(err, "intentional worker error") + + // Create and setup workflow that calls our activity + plugin = &workerPluginForTest{ts: ts} + wrk = worker.New(ts.client, ts.taskQueueName, worker.Options{Plugins: []worker.Plugin{plugin}}) + wf := func(ctx workflow.Context) (s string, err error) { + err = workflow.ExecuteActivity( + workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ScheduleToCloseTimeout: 5 * time.Second}), + "some-activity", + ).Get(ctx, &s) + return + } + wrk.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: "some-workflow"}) + + // Run worker and workflow + ts.NoError(wrk.Start()) + defer wrk.Stop() // Just in case, we also call stop as part of this test + run, err := ts.client.ExecuteWorkflow( + context.Background(), + ts.startWorkflowOptions(ts.T().Name()+"-worker-plugin"), + "some-workflow", + ) + var res string + ts.NoError(err) + ts.NoError(run.Get(context.Background(), &res)) + ts.Equal("activity-success", res) + + // Stop multiple times, confirm called (inside stop we check that we're only called once) + wrk.Stop() + wrk.Stop() + ts.True(plugin.stopCalled) + + // Confirm interceptor called + var calls []string + for _, call := range plugin.callRecorder.Calls() { + calls = append(calls, call.Interface.Name()+"."+call.Method.Name) + } + ts.Contains(calls, "WorkflowInboundInterceptor.ExecuteWorkflow") + + // Confirm registrations called + ts.Len(plugin.activityRegistrationsCalled, 1) + ts.Equal(plugin.activityRegistrationsCalled[0].Name, "some-activity") + ts.Len(plugin.workflowRegistrationsCalled, 1) + ts.Equal(plugin.workflowRegistrationsCalled[0].Name, "some-workflow") + + // Now replayer, first confirm we can fail it + plugin = &workerPluginForTest{ts: ts, failReplay: true} + replayer, err := worker.NewWorkflowReplayerWithOptions( + worker.WorkflowReplayerOptions{Plugins: []worker.Plugin{plugin}}, + ) + ts.NoError(err) + err = replayer.ReplayWorkflowExecution( + context.Background(), ts.client.WorkflowService(), ilog.NewDefaultLogger(), ts.config.Namespace, + workflow.Execution{ID: run.GetID()}) + ts.ErrorContains(err, "intentional replayer error") + + // Now replay and confirm what was replayed + plugin = &workerPluginForTest{ts: ts} + replayer, err = worker.NewWorkflowReplayerWithOptions( + worker.WorkflowReplayerOptions{Plugins: []worker.Plugin{plugin}}, + ) + ts.NoError(err) + replayer.RegisterWorkflowWithOptions(wf, workflow.RegisterOptions{Name: "some-workflow"}) + err = replayer.ReplayWorkflowExecution( + context.Background(), ts.client.WorkflowService(), ilog.NewDefaultLogger(), ts.config.Namespace, + workflow.Execution{ID: run.GetID()}) + ts.NoError(err) + ts.Len(plugin.replayedExecutions, 1) + ts.Equal(run.GetID(), plugin.replayedExecutions[0].ID) +} + +type toPayloadTrackingDataConverter struct { + converter.DataConverter + valuesToPayload []any +} + +func (t *toPayloadTrackingDataConverter) ToPayload(value interface{}) (*commonpb.Payload, error) { + t.valuesToPayload = append(t.valuesToPayload, value) + return t.DataConverter.ToPayload(value) +} + +func (t *toPayloadTrackingDataConverter) ToPayloads(value ...interface{}) (*commonpb.Payloads, error) { + t.valuesToPayload = append(t.valuesToPayload, value...) + return t.DataConverter.ToPayloads(value...) +} + +func (ts *IntegrationTestSuite) TestSimplePlugin() { + // Stop any existing worker + ts.worker.Stop() + + // Create a simple plugin that just confirms some things are properly set + conv := &toPayloadTrackingDataConverter{DataConverter: converter.GetDefaultDataConverter()} + var confClient, confWorker, confReplayer, beforeWorker, beforeReplayer, after int + var lastInstanceKey string + plugin, err := temporal.NewSimplePlugin(temporal.SimplePluginOptions{ + Name: "simple-plugin", + DataConverter: conv, + ConfigureClient: func(context.Context, client.PluginConfigureClientOptions) error { + confClient++ + return nil + }, + ConfigureWorker: func(ctx context.Context, options worker.PluginConfigureWorkerOptions) error { + ts.NotEmpty(options.WorkerInstanceKey) + lastInstanceKey = options.WorkerInstanceKey + confWorker++ + return nil + }, + ConfigureWorkflowReplayer: func( + ctx context.Context, + options worker.PluginConfigureWorkflowReplayerOptions, + ) error { + ts.NotEmpty(options.WorkflowReplayerInstanceKey) + lastInstanceKey = options.WorkflowReplayerInstanceKey + confReplayer++ + return nil + }, + RunContextBefore: func(ctx context.Context, options temporal.SimplePluginRunContextBeforeOptions) error { + ts.NotEmpty(options.InstanceKey) + ts.Equal(lastInstanceKey, options.InstanceKey) + if options.WorkflowReplayer { + beforeReplayer++ + } else { + beforeWorker++ + } + // Add our workflow and activity here + options.Registry.RegisterWorkflowWithOptions( + func(ctx workflow.Context) (s string, err error) { + err = workflow.ExecuteActivity( + workflow.WithActivityOptions( + ctx, + workflow.ActivityOptions{ScheduleToCloseTimeout: 5 * time.Second}, + ), + "some-activity", + ).Get(ctx, &s) + return + }, + workflow.RegisterOptions{Name: "some-workflow"}, + ) + options.Registry.RegisterActivityWithOptions( + func(context.Context) (string, error) { return "activity-success", nil }, + activity.RegisterOptions{Name: "some-activity"}, + ) + return nil + }, + RunContextAfter: func(ctx context.Context, options internal.SimplePluginRunContextAfterOptions) { + ts.NotEmpty(options.InstanceKey) + ts.Equal(lastInstanceKey, options.InstanceKey) + after++ + }, + }) + ts.NoError(err) + + // Just run a simple workflow and stop worker + cl, err := client.Dial(client.Options{Namespace: ts.config.Namespace, Plugins: []client.Plugin{plugin}}) + ts.NoError(err) + defer cl.Close() + wrk := worker.New(cl, ts.taskQueueName, worker.Options{}) + ts.NoError(wrk.Start()) + defer wrk.Stop() // We'll also manually stop this later in the test + run, err := cl.ExecuteWorkflow( + context.Background(), + ts.startWorkflowOptions(ts.T().Name()+"-simple-plugin"), + "some-workflow", + ) + var res string + ts.NoError(err) + ts.NoError(run.Get(context.Background(), &res)) + ts.Equal("activity-success", res) + wrk.Stop() + + // Check data converter was applied (it's actually called twice) + ts.Contains(conv.valuesToPayload, "activity-success") + + // Check numbers + ts.Equal(1, confClient) + ts.Equal(1, confWorker) + ts.Equal(0, confReplayer) + ts.Equal(1, beforeWorker) + ts.Equal(0, beforeReplayer) + ts.Equal(1, after) + + // Do a replay and check numbers again + replayer, err := worker.NewWorkflowReplayerWithOptions( + worker.WorkflowReplayerOptions{Plugins: []worker.Plugin{plugin}}, + ) + ts.NoError(err) + err = replayer.ReplayWorkflowExecution( + context.Background(), ts.client.WorkflowService(), ilog.NewDefaultLogger(), ts.config.Namespace, + workflow.Execution{ID: run.GetID()}) + ts.NoError(err) + ts.Equal(1, confReplayer) + ts.Equal(1, beforeWorker) + ts.Equal(1, beforeReplayer) + ts.Equal(2, after) +} + +func (ts *IntegrationTestSuite) TestSimplePluginDoNothing() { + // Stop any existing worker + ts.worker.Stop() + + // Just run a simple workflow with a do-nothing plugin and make sure it works as normal + plugin, err := temporal.NewSimplePlugin(temporal.SimplePluginOptions{Name: "simple-plugin"}) + ts.NoError(err) + cl, err := client.Dial(client.Options{Namespace: ts.config.Namespace, Plugins: []client.Plugin{plugin}}) + ts.NoError(err) + defer cl.Close() + wrk := worker.New(cl, ts.taskQueueName, worker.Options{}) + wrk.RegisterWorkflowWithOptions( + func(ctx workflow.Context) (string, error) { return "workflow-success", nil }, + workflow.RegisterOptions{Name: "some-workflow"}, + ) + ts.NoError(wrk.Start()) + defer wrk.Stop() + run, err := cl.ExecuteWorkflow( + context.Background(), + ts.startWorkflowOptions(ts.T().Name()+"-simple-plugin-do-nothing"), + "some-workflow", + ) + var res string + ts.NoError(err) + ts.NoError(run.Get(context.Background(), &res)) + ts.Equal("workflow-success", res) +} diff --git a/worker/plugin.go b/worker/plugin.go new file mode 100644 index 000000000..feb8faec1 --- /dev/null +++ b/worker/plugin.go @@ -0,0 +1,63 @@ +package worker + +import "go.temporal.io/sdk/internal" + +// Plugin is a plugin that can configure worker/replayer options and +// surround worker/replayer runs. Many plugin implementers may prefer the +// simpler [go.temporal.io/sdk/temporal.SimplePlugin] instead. +// +// All worker plugins must embed [go.temporal.io/sdk/worker.PluginBase]. All +// plugins must implement Name(). +// +// NOTE: Experimental +type Plugin = internal.WorkerPlugin + +// PluginBase must be embedded into worker plugin implementations. +// +// NOTE: Experimental +type PluginBase = internal.WorkerPluginBase + +// PluginConfigureWorkerOptions are options for ConfigureWorker on a +// worker plugin. +// +// NOTE: Experimental +type PluginConfigureWorkerOptions = internal.WorkerPluginConfigureWorkerOptions + +// PluginConfigureWorkerRegistryOptions are the set of callbacks that can +// be adjusted by plugins when configuring workers. If adjusting a callback that +// is already set, implementers may want to take care to invoke the existing +// callback inside their own. +// +// NOTE: Experimental +type PluginConfigureWorkerRegistryOptions = internal.WorkerPluginConfigureWorkerRegistryOptions + +// PluginStartWorkerOptions are options for StartWorker on a worker +// plugin. +// +// NOTE: Experimental +type PluginStartWorkerOptions = internal.WorkerPluginStartWorkerOptions + +// PluginStopWorkerOptions are options for StopWorker on a worker plugin. +// +// NOTE: Experimental +type PluginStopWorkerOptions = internal.WorkerPluginStopWorkerOptions + +// PluginConfigureWorkflowReplayerOptions are options for +// ConfigureWorkflowReplayer on a worker plugin. +// +// NOTE: Experimental +type PluginConfigureWorkflowReplayerOptions = internal.WorkerPluginConfigureWorkflowReplayerOptions + +// PluginConfigureWorkflowReplayerRegistryOptions are the set of callbacks +// that can be adjusted by plugins when configuring workflow replayers. If +// adjusting a callback that is already set, implementers may want to take care +// to invoke the existing callback inside their own. +// +// NOTE: Experimental +type PluginConfigureWorkflowReplayerRegistryOptions = internal.WorkerPluginConfigureWorkflowReplayerRegistryOptions + +// PluginReplayWorkflowOptions are options for ReplayWorkflow on a worker +// plugin. +// +// NOTE: Experimental +type PluginReplayWorkflowOptions = internal.WorkerPluginReplayWorkflowOptions