Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions client/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package client

import "go.temporal.io/sdk/internal"

// Plugin is a plugin that can configure client options and surround client
// creation/connection. Many plugin implementers may prefer the simpler
// [go.temporal.io/sdk/temporal.SimplePlugin] instead.
//
// All client plugins must embed [go.temporal.io/sdk/client.PluginBase]. All
// plugins must implement Name().
//
// All client plugins that also implement [go.temporal.io/sdk/worker.Plugin] are
// automatically configured on workers made from the client.
//
// NOTE: Experimental
type Plugin = internal.ClientPlugin

// PluginBase must be embedded into client plugin implementations.
//
// NOTE: Experimental
type PluginBase = internal.ClientPluginBase

// PluginConfigureClientOptions are options for ConfigureClient on a
// client plugin.
//
// NOTE: Experimental
type PluginConfigureClientOptions = internal.ClientPluginConfigureClientOptions

// PluginNewClientOptions are options for NewClient on a client plugin.
//
// NOTE: Experimental
type PluginNewClientOptions = internal.ClientPluginNewClientOptions
78 changes: 65 additions & 13 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,17 @@ type (

// If set true, error code labels will not be included on request failure metrics.
DisableErrorCodeMetricTags bool

// Plugins that can configure options and intercept client creation.
//
// Any plugins here that also implement worker.Plugin will be used as
// worker plugins as well.
//
// Plugins themselves should never mutate this field, the behavior is
// undefined.
//
// NOTE: Experimental
Plugins []ClientPlugin
}

// HeadersProvider returns a map of gRPC headers that should be used on every request.
Expand Down Expand Up @@ -963,14 +974,17 @@ func NewClient(ctx context.Context, options ClientOptions) (Client, error) {
//
// Exposed as: [go.temporal.io/sdk/client.NewClientFromExistingWithContext]
func NewClientFromExisting(ctx context.Context, existingClient Client, options ClientOptions) (Client, error) {
existing, _ := existingClient.(*WorkflowClient)
if existing == nil {
return nil, fmt.Errorf("existing client must have been created directly from a client package call")
}
return newClient(ctx, options, existing)
return newClient(ctx, options, existingClient)
}

func newClient(ctx context.Context, options ClientOptions, existing *WorkflowClient) (Client, error) {
func newClient(ctx context.Context, options ClientOptions, existing Client) (Client, error) {
// Go over all plugins allowing them to configure the options
for _, plugin := range options.Plugins {
if err := plugin.ConfigureClient(ctx, ClientPluginConfigureClientOptions{ClientOptions: &options}); err != nil {
return nil, err
}
}

if options.Namespace == "" {
options.Namespace = DefaultNamespace
}
Expand All @@ -996,20 +1010,51 @@ func newClient(ctx context.Context, options ClientOptions, existing *WorkflowCli
}
}

// Go over each plugin in reverse, allowing it to wrap connect
var client Client
connect := func(ctx context.Context, options ClientPluginNewClientOptions) (err error) {
client, err = newClientPluginRoot(ctx, options)
return
}
for i := len(options.Plugins) - 1; i >= 0; i-- {
plugin := options.Plugins[i]
next := connect
connect = func(ctx context.Context, options ClientPluginNewClientOptions) error {
return plugin.NewClient(ctx, options, next)
}
}
// Invoke and confirm client was created
if err := connect(ctx, ClientPluginNewClientOptions{
ClientOptions: options,
Lazy: options.ConnectionOptions.disableEagerConnection,
FromExisting: existing,
}); err != nil {
return nil, err
} else if client == nil {
return nil, fmt.Errorf("client plugin did not call next to build the client")
}
return client, nil
}

func newClientPluginRoot(ctx context.Context, options ClientPluginNewClientOptions) (Client, error) {
// Dial or use existing connection
var connection *grpc.ClientConn
var err error
if existing == nil {
options.ConnectionOptions.excludeInternalFromRetry = &atomic.Bool{}
connection, err = dial(newDialParameters(&options, options.ConnectionOptions.excludeInternalFromRetry))
var existing *WorkflowClient
if options.FromExisting == nil {
options.ClientOptions.ConnectionOptions.excludeInternalFromRetry = &atomic.Bool{}
connection, err = dial(newDialParameters(
&options.ClientOptions, options.ClientOptions.ConnectionOptions.excludeInternalFromRetry))
if err != nil {
return nil, err
}
} else {
} else if existing, _ = options.FromExisting.(*WorkflowClient); existing != nil {
connection = existing.conn
} else if options.FromExisting != nil {
return nil, fmt.Errorf("existing client must have been created directly from a client package call")
}

client := NewServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options)
client := NewServiceClient(workflowservice.NewWorkflowServiceClient(connection), connection, options.ClientOptions)

// If using existing connection, always load its capabilities and use them for
// the new connection. Otherwise, only load server capabilities eagerly if not
Expand All @@ -1020,7 +1065,7 @@ func newClient(ctx context.Context, options ClientOptions, existing *WorkflowCli
}
client.unclosedClients = existing.unclosedClients
} else {
if !options.ConnectionOptions.disableEagerConnection {
if !options.ClientOptions.ConnectionOptions.disableEagerConnection {
if _, err := client.loadCapabilities(ctx); err != nil {
client.Close()
return nil, err
Expand Down Expand Up @@ -1074,7 +1119,13 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
options.ConnectionOptions.GetSystemInfoTimeout = defaultGetSystemInfoTimeout
}

// Collect set of applicable worker interceptors
// Collect set of applicable worker plugins and interceptors
var workerPlugins []WorkerPlugin
for _, plugin := range options.Plugins {
if workerPlugin, _ := plugin.(WorkerPlugin); workerPlugin != nil {
workerPlugins = append(workerPlugins, workerPlugin)
}
}
var workerInterceptors []WorkerInterceptor
for _, interceptor := range options.Interceptors {
if workerInterceptor, _ := interceptor.(WorkerInterceptor); workerInterceptor != nil {
Expand All @@ -1093,6 +1144,7 @@ func NewServiceClient(workflowServiceClient workflowservice.WorkflowServiceClien
dataConverter: options.DataConverter,
failureConverter: options.FailureConverter,
contextPropagators: options.ContextPropagators,
workerPlugins: workerPlugins,
workerInterceptors: workerInterceptors,
excludeInternalFromRetry: options.ConnectionOptions.excludeInternalFromRetry,
eagerDispatcher: &eagerWorkflowDispatcher{
Expand Down
6 changes: 6 additions & 0 deletions internal/cmd/tools/doclink/doclink.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,12 @@ func isValidDefinition(line string, inGroup *string, insideStruct *bool) bool {

// Checks if `line` is a valid definition, and that definition is for `private`
func isValidDefinitionWithMatch(line, private string, inGroup string, insideStruct bool) bool {
// Vars with underscores are often used to assert interface validation and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice

// do not require docs
if strings.HasPrefix(line, "var _") {
return false
}

tokens := strings.Fields(line)
if strings.HasPrefix(line, "func "+private+"(") {
return true
Expand Down
Loading
Loading