I deployed the Cadence service with the Cadence Helm chart. My Go SDK client uses goroutines to start many workflow workers and activity workers concurrently. However, some instances use a lot of memory while others use very little. Here is the client code that starts the dispatcher:
func NewYARPCDispatcher(hostPort string) (*yarpc.Dispatcher, error) {
    channelTransport, err := tchannel.NewChannelTransport(tchannel.ServiceName("cadence-client"))
    if err != nil {
        return nil, err
    }
    dispatcher := yarpc.NewDispatcher(yarpc.Config{
        Name: "cadence-client",
        Outbounds: yarpc.Outbounds{
            "cadence-frontend": {
                Unary: channelTransport.NewSingleOutbound(hostPort),
            },
        },
    })
    if err := dispatcher.Start(); err != nil {
        dispatcher.Stop()
        return nil, err
    }
    return dispatcher, nil
}

func NewWorkflowServiceClient(dispatcher *yarpc.Dispatcher) workflowserviceclient.Interface {
    return workflowserviceclient.New(dispatcher.ClientConfig("cadence-frontend"))
}
Question: Is the |
Replies: 1 comment
For users on older versions (i.e. still using tchannel): the cadence-samples code shows the basic way to construct a yarpc dispatcher, using tchannel.NewChannelTransport:
func (b *WorkflowClientBuilder) build() error {
    if b.dispatcher != nil {
        return nil
    }
    if len(b.hostPort) == 0 {
        return errors.New("HostPort is empty")
    }
    ch, err := tchannel.NewChannelTransport(
        tchannel.ServiceName(_cadenceClientName))
    if err != nil {
        b.Logger.Fatal("Failed to create transport channel", zap.Error(err))
    }
    b.Logger.Debug("Creating RPC dispatcher outbound",
        zap.String("ServiceName", _cadenceFrontendService),
        zap.String("HostPort", b.hostPort))
    b.dispatcher = yarpc.NewDispatcher(yarpc.Config{
        Name: _cadenceClientName,
        Outbounds: yarpc.Outbounds{
            _cadenceFrontendService: {Unary: ch.NewSingleOutbound(b.hostPort)},
        },
    })
    if b.dispatcher != nil {
        if err := b.dispatcher.Start(); err != nil {
            b.Logger.Fatal("Failed to create outbound transport channel: %v", zap.Error(err))
        }
    }
    return nil
}
But when you start many workers in one service on such a dispatcher, its single outbound routes all of your poll requests to the same IP (when connecting through a k8s Service). So a client-side load balancer is required. Due to
func (p *dnsDispatcherProvider) GetTChannel(serviceName string, address string, options *DispatcherOptions) (*yarpc.Dispatcher, error) {
    tchanTransport, err := tchannel.NewTransport(
        tchannel.ServiceName(serviceName),
        // this aim to get rid of the annoying popup about accepting incoming network connections
        tchannel.ListenAddr("127.0.0.1:0"),
    )
    if err != nil {
        return nil, err
    }
    peerList := roundrobin.New(tchanTransport)
    peerListUpdater, err := newDNSUpdater(peerList, address, p.interval, p.logger)
    if err != nil {
        return nil, err
    }
    peerListUpdater.Start()
    outbound := tchanTransport.NewOutbound(peerList)
    p.logger.Info("Creating TChannel dispatcher outbound", tag.Address(address))
    return p.createOutboundDispatcher(serviceName, outbound, options)
}
The code sample use |
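To make the idea behind the round-robin peer list and DNS updater concrete, here is a minimal stdlib-only sketch. It is not YARPC's API: `peerList`, `refresh`, the host name `cadence-frontend-headless` and the port `7933` are all hypothetical names chosen for illustration. It also assumes the DNS name resolves to several addresses (for example a headless k8s Service), which may not match your setup.

```go
package main

import (
	"fmt"
	"net"
	"sync"
)

// peerList is a hypothetical stand-in for a round-robin peer list.
// It is NOT YARPC's roundrobin.List; it only illustrates the rotation.
type peerList struct {
	mu    sync.Mutex
	peers []string
	next  int
}

// Update replaces the known peers, e.g. after a fresh DNS lookup.
func (l *peerList) Update(peers []string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.peers = append([]string(nil), peers...)
	if l.next >= len(l.peers) {
		l.next = 0
	}
}

// Choose returns the next peer in rotation, or false if none are known.
func (l *peerList) Choose() (string, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if len(l.peers) == 0 {
		return "", false
	}
	p := l.peers[l.next]
	l.next = (l.next + 1) % len(l.peers)
	return p, true
}

// refresh resolves host and pushes "ip:port" entries into the list.
// lookup is injectable so the sketch runs without real DNS;
// net.LookupHost has the matching signature for real use.
func refresh(l *peerList, lookup func(string) ([]string, error), host, port string) error {
	ips, err := lookup(host)
	if err != nil {
		return err
	}
	peers := make([]string, 0, len(ips))
	for _, ip := range ips {
		peers = append(peers, net.JoinHostPort(ip, port))
	}
	l.Update(peers)
	return nil
}

func main() {
	// Fake resolver (assumption): pretends the name resolves to three pod IPs.
	fake := func(string) ([]string, error) {
		return []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"}, nil
	}
	l := &peerList{}
	if err := refresh(l, fake, "cadence-frontend-headless", "7933"); err != nil {
		panic(err)
	}
	// Successive requests rotate across all peers instead of pinning to one.
	for i := 0; i < 6; i++ {
		p, _ := l.Choose()
		fmt.Println(p)
	}
}
```

With a single outbound, every poll goes to whatever address `hostPort` points at. With a peer list that is refreshed periodically, polls are spread across all resolved addresses, and peers that appear or disappear are picked up on the next refresh.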