Skip to content

Commit c2171d7

Browse files
committed
Merge branch 'master' into mockery-v3
required changing a couple mocks and regenerating a bit, but nothing major
2 parents a8f5caa + 70c5df0 commit c2171d7

File tree

122 files changed

+3268
-1700
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

122 files changed

+3268
-1700
lines changed

.github/dco.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
require:
2+
members: false

.github/workflows/replication-simulation.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,16 @@ jobs:
1414
matrix:
1515
scenario:
1616
- activeactive
17+
- activeactive_same_wfid
18+
- activeactive_same_wfid_signalwithstart
19+
- activeactive_same_wfid_signalwithstart_delayed
1720
- activeactive_cron
1821
- activeactive_regional_failover
1922
- activeactive_regional_failover_start_same_wfid
23+
- activeactive_regional_failover_start_same_wfid_2
2024
- activepassive_to_activeactive
2125
- clusterredirection
26+
- reset
2227
- default
2328

2429
steps:

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dockerize
3838
/cadence-cassandra-tool
3939
/cadence-sql-tool
4040
/cadence-releaser
41+
/sharddistributor-canary
4142

4243
# SQLite databases
4344
cadence.db*

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,11 @@ cadence-canary: $(BINS_DEPEND_ON)
515515
$Q echo "compiling cadence-canary with OS: $(GOOS), ARCH: $(GOARCH)"
516516
$Q ./scripts/build-with-ldflags.sh -o $@ cmd/canary/main.go
517517

518+
BINS += sharddistributor-canary
519+
sharddistributor-canary: $(BINS_DEPEND_ON)
520+
$Q echo "compiling sharddistributor-canary with OS: $(GOOS), ARCH: $(GOARCH)"
521+
$Q ./scripts/build-with-ldflags.sh -o $@ cmd/sharddistributor-canary/main.go
522+
518523
BINS += cadence-bench
519524
cadence-bench: $(BINS_DEPEND_ON)
520525
$Q echo "compiling cadence-bench with OS: $(GOOS), ARCH: $(GOARCH)"
@@ -828,6 +833,9 @@ start-xdc-cluster2: cadence-server
828833
start-canary: cadence-canary
829834
./cadence-canary start
830835

836+
start-sharddistributor-canary: sharddistributor-canary
837+
./sharddistributor-canary start
838+
831839
start-bench: cadence-bench
832840
./cadence-bench start
833841

cmd/server/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ require (
2222
github.com/cch123/elasticsql v0.0.0-20190321073543-a1a440758eb9 // indirect
2323
github.com/cristalhq/jwt/v3 v3.1.0 // indirect
2424
github.com/davecgh/go-spew v1.1.1 // indirect
25-
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
25+
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da // indirect
2626
github.com/go-sql-driver/mysql v1.7.1 // indirect
2727
github.com/gocql/gocql v0.0.0-20211015133455-b225f9b53fa1 // indirect
2828
github.com/gogo/protobuf v1.3.2 // indirect
@@ -79,6 +79,7 @@ require (
7979
github.com/coreos/go-semver v0.3.0 // indirect
8080
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
8181
github.com/fatih/color v1.13.0 // indirect
82+
github.com/google/gofuzz v1.0.0 // indirect
8283
github.com/mattn/go-colorable v0.1.9 // indirect
8384
github.com/mattn/go-isatty v0.0.14 // indirect
8485
github.com/ncruces/go-sqlite3 v0.22.0 // indirect

cmd/server/go.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,9 @@ github.com/cristalhq/jwt/v3 v3.1.0/go.mod h1:XOnIXst8ozq/esy5N1XOlSyQqBd+84fxJ99
7878
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
7979
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
8080
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
81-
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
8281
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
82+
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da h1:aIftn67I1fkbMa512G+w+Pxci9hJPB8oMnkcP3iZF38=
83+
github.com/dgryski/go-farm v0.0.0-20240924180020-3414d57e47da/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
8384
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
8485
github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA=
8586
github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package main
2+
3+
import (
4+
"os"
5+
"time"
6+
7+
"github.com/uber-go/tally"
8+
"github.com/urfave/cli/v2"
9+
"go.uber.org/fx"
10+
"go.uber.org/yarpc"
11+
"go.uber.org/yarpc/transport/grpc"
12+
"go.uber.org/zap"
13+
14+
"github.com/uber/cadence/common/clock"
15+
"github.com/uber/cadence/common/log"
16+
"github.com/uber/cadence/service/sharddistributor/canary"
17+
"github.com/uber/cadence/service/sharddistributor/executorclient"
18+
"github.com/uber/cadence/tools/common/commoncli"
19+
)
20+
21+
const (
22+
// Default configuration
23+
defaultShardDistributorEndpoint = "127.0.0.1:7943"
24+
defaultFixedNamespace = "shard-distributor-canary"
25+
defaultEphemeralNamespace = "shard-distributor-canary-ephemeral"
26+
27+
shardDistributorServiceName = "cadence-shard-distributor"
28+
)
29+
30+
func runApp(c *cli.Context) {
31+
endpoint := c.String("endpoint")
32+
fixedNamespace := c.String("fixed-namespace")
33+
ephemeralNamespace := c.String("ephemeral-namespace")
34+
35+
fx.New(opts(fixedNamespace, ephemeralNamespace, endpoint)).Run()
36+
}
37+
38+
func opts(fixedNamespace, ephemeralNamespace, endpoint string) fx.Option {
39+
config := executorclient.Config{
40+
Namespaces: []executorclient.NamespaceConfig{
41+
{Namespace: fixedNamespace, HeartBeatInterval: 1 * time.Second},
42+
{Namespace: ephemeralNamespace, HeartBeatInterval: 1 * time.Second},
43+
},
44+
}
45+
46+
transport := grpc.NewTransport()
47+
yarpcConfig := yarpc.Config{
48+
Name: "shard-distributor-canary",
49+
Outbounds: yarpc.Outbounds{
50+
shardDistributorServiceName: {
51+
Unary: transport.NewSingleOutbound(endpoint),
52+
},
53+
},
54+
}
55+
56+
return fx.Options(
57+
fx.Supply(
58+
fx.Annotate(tally.NoopScope, fx.As(new(tally.Scope))),
59+
fx.Annotate(clock.NewRealTimeSource(), fx.As(new(clock.TimeSource))),
60+
yarpcConfig,
61+
config,
62+
),
63+
fx.Provide(
64+
yarpc.NewDispatcher,
65+
func(d *yarpc.Dispatcher) yarpc.ClientConfig { return d }, // Reprovide the dispatcher as a client config
66+
),
67+
fx.Provide(zap.NewDevelopment),
68+
fx.Provide(log.NewLogger),
69+
70+
// Start the YARPC dispatcher
71+
fx.Invoke(func(lc fx.Lifecycle, dispatcher *yarpc.Dispatcher) {
72+
lc.Append(fx.StartStopHook(dispatcher.Start, dispatcher.Stop))
73+
}),
74+
75+
// Include the canary module
76+
canary.Module(fixedNamespace, ephemeralNamespace, shardDistributorServiceName),
77+
)
78+
}
79+
80+
func buildCLI() *cli.App {
81+
app := cli.NewApp()
82+
app.Name = "sharddistributor-canary"
83+
app.Usage = "Cadence shard distributor canary"
84+
app.Version = "0.0.1"
85+
86+
app.Commands = []*cli.Command{
87+
{
88+
Name: "start",
89+
Usage: "start shard distributor canary",
90+
Flags: []cli.Flag{
91+
&cli.StringFlag{
92+
Name: "endpoint",
93+
Aliases: []string{"e"},
94+
Value: defaultShardDistributorEndpoint,
95+
Usage: "shard distributor endpoint address",
96+
},
97+
&cli.StringFlag{
98+
Name: "fixed-namespace",
99+
Value: defaultFixedNamespace,
100+
Usage: "namespace for fixed shard processing",
101+
},
102+
&cli.StringFlag{
103+
Name: "ephemeral-namespace",
104+
Value: defaultEphemeralNamespace,
105+
Usage: "namespace for ephemeral shard creation testing",
106+
},
107+
},
108+
Action: func(c *cli.Context) error {
109+
runApp(c)
110+
return nil
111+
},
112+
},
113+
}
114+
115+
return app
116+
}
117+
118+
func main() {
119+
app := buildCLI()
120+
commoncli.ExitHandler(app.Run(os.Args))
121+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package main
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"go.uber.org/fx"
8+
)
9+
10+
func TestDependenciesAreSatisfied(t *testing.T) {
11+
assert.NoError(t, fx.ValidateApp(opts(defaultFixedNamespace, defaultEphemeralNamespace, defaultShardDistributorEndpoint)))
12+
}

common/activecluster/manager.go

Lines changed: 2 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -245,25 +245,7 @@ func (m *managerImpl) LookupNewWorkflow(ctx context.Context, domainID string, po
245245
}, nil
246246
}
247247

248-
if policy.GetStrategy() != types.ActiveClusterSelectionStrategyExternalEntity {
249-
return nil, fmt.Errorf("unsupported active cluster selection strategy: %s", policy.GetStrategy())
250-
}
251-
252-
// find cluster name & failover version of the external entity
253-
externalEntity, err := m.getExternalEntity(ctx, policy.ExternalEntityType, policy.ExternalEntityKey)
254-
if err != nil {
255-
return nil, err
256-
}
257-
cluster, err := m.ClusterNameForFailoverVersion(externalEntity.FailoverVersion, domainID)
258-
if err != nil {
259-
return nil, err
260-
}
261-
262-
return &LookupResult{
263-
Region: externalEntity.Region,
264-
ClusterName: cluster,
265-
FailoverVersion: externalEntity.FailoverVersion,
266-
}, nil
248+
return nil, fmt.Errorf("unsupported active cluster selection strategy: %s", policy.GetStrategy())
267249
}
268250

269251
func (m *managerImpl) LookupWorkflow(ctx context.Context, domainID, wfID, rID string) (res *LookupResult, e error) {
@@ -345,7 +327,7 @@ func (m *managerImpl) LookupWorkflow(ctx context.Context, domainID, wfID, rID st
345327
return nil, err
346328
}
347329

348-
cluster, err := m.ClusterNameForFailoverVersion(externalEntity.FailoverVersion, domainID)
330+
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(externalEntity.FailoverVersion)
349331
if err != nil {
350332
return nil, err
351333
}
@@ -398,88 +380,6 @@ func (m *managerImpl) LookupWorkflow(ctx context.Context, domainID, wfID, rID st
398380
}, nil
399381
}
400382

401-
func (m *managerImpl) LookupCluster(ctx context.Context, domainID, clusterName string) (res *LookupResult, e error) {
402-
d, scope, err := m.getDomainAndScope(domainID, LookupClusterOpName)
403-
if err != nil {
404-
return nil, err
405-
}
406-
defer m.handleError(scope, &e, time.Now())
407-
408-
clusterInfo, ok := m.clusterMetadata.GetAllClusterInfo()[clusterName]
409-
if !ok {
410-
return nil, newClusterNotFoundError(clusterName)
411-
}
412-
413-
if !d.GetReplicationConfig().IsActiveActive() {
414-
// Not an active-active domain. return ActiveClusterName from domain entry
415-
m.logger.Debug("LookupCluster: not an active-active domain. returning given clusterName",
416-
tag.WorkflowDomainID(domainID),
417-
tag.ClusterName(clusterName),
418-
)
419-
420-
return &LookupResult{
421-
ClusterName: clusterName,
422-
FailoverVersion: clusterInfo.InitialFailoverVersion,
423-
Region: clusterInfo.Region,
424-
}, nil
425-
}
426-
427-
region := clusterInfo.Region
428-
activeCluster, ok := d.GetReplicationConfig().ActiveClusters.ActiveClustersByRegion[region]
429-
if !ok {
430-
return nil, newRegionNotFoundForDomainError(region, domainID)
431-
}
432-
433-
return &LookupResult{
434-
Region: region,
435-
ClusterName: activeCluster.ActiveClusterName,
436-
FailoverVersion: activeCluster.FailoverVersion,
437-
}, nil
438-
}
439-
440-
func (m *managerImpl) ClusterNameForFailoverVersion(failoverVersion int64, domainID string) (string, error) {
441-
d, err := m.domainIDToDomainFn(domainID)
442-
if err != nil {
443-
return "", err
444-
}
445-
446-
if !d.GetReplicationConfig().IsActiveActive() {
447-
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
448-
if err != nil {
449-
return "", err
450-
}
451-
return cluster, nil
452-
}
453-
454-
// For active-active domains, the failover version might be mapped to a cluster or a region
455-
// First check if it maps to a cluster
456-
cluster, err := m.clusterMetadata.ClusterNameForFailoverVersion(failoverVersion)
457-
if err == nil {
458-
// failover version belongs to a cluster.
459-
return cluster, nil
460-
}
461-
462-
// Check if it maps to a region.
463-
region, err := m.clusterMetadata.RegionForFailoverVersion(failoverVersion)
464-
if err != nil {
465-
return "", err
466-
}
467-
468-
// Now we know the region, find the cluster in the domain's active cluster list which belongs to the region
469-
cfg, ok := d.GetReplicationConfig().ActiveClusters.ActiveClustersByRegion[region]
470-
if !ok {
471-
return "", newRegionNotFoundForDomainError(region, domainID)
472-
}
473-
474-
allClusters := m.clusterMetadata.GetAllClusterInfo()
475-
_, ok = allClusters[cfg.ActiveClusterName]
476-
if !ok {
477-
return "", newClusterNotFoundForRegionError(cfg.ActiveClusterName, region)
478-
}
479-
480-
return cfg.ActiveClusterName, nil
481-
}
482-
483383
func (m *managerImpl) RegisterChangeCallback(shardID int, callback func(ChangeType)) {
484384
m.changeCallbacksLock.Lock()
485385
defer m.changeCallbacksLock.Unlock()

common/activecluster/manager_mock.go

Lines changed: 0 additions & 30 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)