Skip to content

Commit 05fe19c

Browse files
authored
Add multi-namespace support to shard distributor executor client (#7236)
What changed? Added multi-namespace support to the shard distributor executor client: Refactored Config to support multiple NamespaceConfig entries instead of a single namespace Added NewExecutorWithNamespace() function for explicit namespace selection Updated NewExecutor() to auto-select when only one namespace is configured Added ModuleWithNamespace() for creating namespace-specific executor modules Added config validation with proper error handling Updated development.yaml with ephemeral shard distributor configuration Why? Enables running multiple shard distributor executors for different namespaces within the same service. How did you test it? Added unit tests for both single and multi-namespace scenarios Potential risks Breaking change to Config struct (requires config migration) - still prototyping Release notes Documentation Changes
1 parent e90dcc2 commit 05fe19c

File tree

4 files changed

+167
-7
lines changed

4 files changed

+167
-7
lines changed

config/development.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ shardDistribution:
158158
- name: shard-distributor-canary
159159
type: fixed
160160
shardNum: 32
161+
- name: shard-distributor-canary-ephemeral
162+
type: ephemeral
161163
leaderStore:
162164
storageParams:
163165
endpoints: [localhost:2379]

service/sharddistributor/executorclient/client.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,39 @@ type Params[SP ShardProcessor] struct {
5454
TimeSource clock.TimeSource
5555
}
5656

57+
// NewExecutorWithNamespace creates an executor for a specific namespace
58+
func NewExecutorWithNamespace[SP ShardProcessor](params Params[SP], namespace string) (Executor[SP], error) {
59+
// Validate the config first
60+
if err := params.Config.Validate(); err != nil {
61+
return nil, fmt.Errorf("invalid config: %w", err)
62+
}
63+
64+
// Get config for the specified namespace
65+
namespaceConfig, err := params.Config.GetConfigForNamespace(namespace)
66+
if err != nil {
67+
return nil, fmt.Errorf("get config for namespace %s: %w", namespace, err)
68+
}
69+
70+
return newExecutorWithConfig(params, namespaceConfig)
71+
}
72+
73+
// NewExecutor creates an executor using auto-selection (single namespace only)
5774
func NewExecutor[SP ShardProcessor](params Params[SP]) (Executor[SP], error) {
75+
// Validate the config first
76+
if err := params.Config.Validate(); err != nil {
77+
return nil, fmt.Errorf("invalid config: %w", err)
78+
}
79+
80+
// Auto-select if there's only one namespace
81+
namespaceConfig, err := params.Config.GetSingleConfig()
82+
if err != nil {
83+
return nil, fmt.Errorf("auto-select namespace: %w", err)
84+
}
85+
86+
return newExecutorWithConfig(params, namespaceConfig)
87+
}
88+
89+
func newExecutorWithConfig[SP ShardProcessor](params Params[SP], namespaceConfig *NamespaceConfig) (Executor[SP], error) {
5890
shardDistributorClient, err := createShardDistributorExecutorClient(params.YarpcClient, params.MetricsScope, params.Logger)
5991
if err != nil {
6092
return nil, fmt.Errorf("create shard distributor executor client: %w", err)
@@ -65,15 +97,15 @@ func NewExecutor[SP ShardProcessor](params Params[SP]) (Executor[SP], error) {
6597

6698
metricsScope := params.MetricsScope.Tagged(map[string]string{
6799
metrics.OperationTagName: metricsconstants.ShardDistributorExecutorOperationTagName,
68-
"namespace": params.Config.Namespace,
100+
"namespace": namespaceConfig.Namespace,
69101
})
70102

71103
return &executorImpl[SP]{
72104
logger: params.Logger,
73105
shardDistributorClient: shardDistributorClient,
74106
shardProcessorFactory: params.ShardProcessorFactory,
75-
heartBeatInterval: params.Config.HeartBeatInterval,
76-
namespace: params.Config.Namespace,
107+
heartBeatInterval: namespaceConfig.HeartBeatInterval,
108+
namespace: namespaceConfig.Namespace,
77109
executorID: executorID,
78110
timeSource: params.TimeSource,
79111
stopC: make(chan struct{}),
@@ -101,3 +133,15 @@ func Module[SP ShardProcessor]() fx.Option {
101133
}),
102134
)
103135
}
136+
137+
// ModuleWithNamespace creates an executor module for a specific namespace
138+
func ModuleWithNamespace[SP ShardProcessor](namespace string) fx.Option {
139+
return fx.Module(fmt.Sprintf("shard-distributor-executor-client-%s", namespace),
140+
fx.Provide(func(params Params[SP]) (Executor[SP], error) {
141+
return NewExecutorWithNamespace(params, namespace)
142+
}),
143+
fx.Invoke(func(executor Executor[SP], lc fx.Lifecycle) {
144+
lc.Append(fx.StartStopHook(executor.Start, executor.Stop))
145+
}),
146+
)
147+
}

service/sharddistributor/executorclient/client_test.go

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,12 @@ func TestModule(t *testing.T) {
3737

3838
// Example config
3939
config := Config{
40-
Namespace: "test-namespace",
41-
HeartBeatInterval: 5 * time.Second,
40+
Namespaces: []NamespaceConfig{
41+
{
42+
Namespace: "test-namespace",
43+
HeartBeatInterval: 5 * time.Second,
44+
},
45+
},
4246
}
4347

4448
// Create a test app with the library, check that it starts and stops
@@ -54,3 +58,61 @@ func TestModule(t *testing.T) {
5458
Module[*MockShardProcessor](),
5559
).RequireStart().RequireStop()
5660
}
61+
62+
// Create distinct mock processor types for testing multiple namespaces
63+
type MockShardProcessor1 struct {
64+
*MockShardProcessor
65+
}
66+
67+
type MockShardProcessor2 struct {
68+
*MockShardProcessor
69+
}
70+
71+
func TestModuleWithNamespace(t *testing.T) {
72+
// Create mocks
73+
ctrl := gomock.NewController(t)
74+
uberCtrl := uber_gomock.NewController(t)
75+
mockLogger := log.NewNoop()
76+
77+
mockFactory1 := NewMockShardProcessorFactory[*MockShardProcessor1](uberCtrl)
78+
mockFactory2 := NewMockShardProcessorFactory[*MockShardProcessor2](uberCtrl)
79+
80+
// Create shard distributor yarpc client
81+
outbound := grpc.NewTransport().NewOutbound(yarpctest.NewFakePeerList())
82+
83+
mockClientConfig := transporttest.NewMockClientConfig(ctrl)
84+
mockClientConfig.EXPECT().Caller().Return("test-executor").AnyTimes()
85+
mockClientConfig.EXPECT().Service().Return("shard-distributor").AnyTimes()
86+
mockClientConfig.EXPECT().GetUnaryOutbound().Return(outbound).AnyTimes()
87+
yarpcClient := sharddistributorv1.NewShardDistributorExecutorAPIYARPCClient(mockClientConfig)
88+
89+
// Multi-namespace config
90+
config := Config{
91+
Namespaces: []NamespaceConfig{
92+
{
93+
Namespace: "namespace1",
94+
HeartBeatInterval: 5 * time.Second,
95+
},
96+
{
97+
Namespace: "namespace2",
98+
HeartBeatInterval: 10 * time.Second,
99+
},
100+
},
101+
}
102+
103+
// Create a test app with two namespace-specific modules using different processor types
104+
fxtest.New(t,
105+
fx.Supply(
106+
fx.Annotate(yarpcClient, fx.As(new(sharddistributorv1.ShardDistributorExecutorAPIYARPCClient))),
107+
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
108+
fx.Annotate(mockLogger, fx.As(new(log.Logger))),
109+
fx.Annotate(clock.NewMockedTimeSource(), fx.As(new(clock.TimeSource))),
110+
fx.Annotate(mockFactory1, fx.As(new(ShardProcessorFactory[*MockShardProcessor1]))),
111+
fx.Annotate(mockFactory2, fx.As(new(ShardProcessorFactory[*MockShardProcessor2]))),
112+
config,
113+
),
114+
// Two namespace-specific modules with different processor types
115+
ModuleWithNamespace[*MockShardProcessor1]("namespace1"),
116+
ModuleWithNamespace[*MockShardProcessor2]("namespace2"),
117+
).RequireStart().RequireStop()
118+
}
Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,60 @@
11
package executorclient
22

3-
import "time"
3+
import (
4+
"fmt"
5+
"time"
6+
)
47

5-
type Config struct {
8+
// NamespaceConfig represents configuration for a single namespace
9+
type NamespaceConfig struct {
610
Namespace string `yaml:"namespace"`
711
HeartBeatInterval time.Duration `yaml:"heartbeat_interval"`
812
}
13+
14+
// Config represents configuration for multiple namespaces
15+
type Config struct {
16+
Namespaces []NamespaceConfig `yaml:"namespaces"`
17+
}
18+
19+
// GetConfigForNamespace returns the config for a specific namespace
20+
func (c *Config) GetConfigForNamespace(namespace string) (*NamespaceConfig, error) {
21+
for _, ns := range c.Namespaces {
22+
if ns.Namespace == namespace {
23+
return &ns, nil
24+
}
25+
}
26+
return nil, fmt.Errorf("namespace %s not found in config", namespace)
27+
}
28+
29+
// GetSingleConfig returns the config if there's exactly one namespace, otherwise returns an error
30+
func (c *Config) GetSingleConfig() (*NamespaceConfig, error) {
31+
if len(c.Namespaces) == 0 {
32+
return nil, fmt.Errorf("no namespaces configured")
33+
}
34+
if len(c.Namespaces) > 1 {
35+
return nil, fmt.Errorf("multiple namespaces configured (%d), must specify which namespace to use", len(c.Namespaces))
36+
}
37+
return &c.Namespaces[0], nil
38+
}
39+
40+
// Validate validates the configuration
41+
func (c *Config) Validate() error {
42+
if len(c.Namespaces) == 0 {
43+
return fmt.Errorf("at least one namespace must be configured")
44+
}
45+
46+
seenNamespaces := make(map[string]bool)
47+
for i, ns := range c.Namespaces {
48+
if ns.Namespace == "" {
49+
return fmt.Errorf("namespace %d: namespace name cannot be empty", i)
50+
}
51+
if ns.HeartBeatInterval <= 0 {
52+
return fmt.Errorf("namespace %d (%s): heartbeat_interval must be greater than 0", i, ns.Namespace)
53+
}
54+
if seenNamespaces[ns.Namespace] {
55+
return fmt.Errorf("duplicate namespace: %s", ns.Namespace)
56+
}
57+
seenNamespaces[ns.Namespace] = true
58+
}
59+
return nil
60+
}

0 commit comments

Comments
 (0)