diff --git a/cdc/puller/multiplexing_puller.go b/cdc/puller/multiplexing_puller.go new file mode 100644 index 00000000000..a7821a04b95 --- /dev/null +++ b/cdc/puller/multiplexing_puller.go @@ -0,0 +1,517 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package puller + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + "github.com/pingcap/tiflow/cdc/kv" + "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/cdc/puller/frontier" + "github.com/pingcap/tiflow/pkg/pdutil" + "github.com/pingcap/tiflow/pkg/spanz" + "github.com/prometheus/client_golang/prometheus" + "github.com/tikv/client-go/v2/oracle" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" +) + +const ( + resolveLockFence time.Duration = 4 * time.Second + resolveLockTickInterval time.Duration = 2 * time.Second + + // Suppose there are 50K tables, total size of `resolvedEventsCache`s will be + // unsafe.SizeOf(kv.MultiplexingEvent) * 50K * 256 = 800M. + tableResolvedTsBufferSize int = 256 + + defaultPullerOutputChanSize = 128 + + inputChSize = 1024 + tableProgressAdvanceChSize = 128 +) + +type tableProgress struct { + changefeed model.ChangeFeedID + client *kv.SharedClient + spans []tablepb.Span + subscriptionIDs []kv.SubscriptionID + startTs model.Ts + tableName string + + initialized atomic.Bool + resolvedTsUpdated atomic.Int64 + resolvedTs atomic.Uint64 + maxIngressResolvedTs atomic.Uint64 + + resolvedEventsCache chan kv.MultiplexingEvent + tsTracker frontier.Frontier + + consume struct { + // This lock is used to prevent the table progress from being + // removed while consuming events. + sync.RWMutex + removed bool + f func(context.Context, *model.RawKVEntry, []tablepb.Span) error + } + + scheduled atomic.Bool + start time.Time +} + +func (p *tableProgress) handleResolvedSpans(ctx context.Context, e *model.ResolvedSpans) (err error) { + for _, resolvedSpan := range e.Spans { + if !spanz.IsSubSpan(resolvedSpan.Span, p.spans...) { + log.Panic("the resolved span is not in the table spans", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.String("tableName", p.tableName), + zap.Any("spans", p.spans)) + } + p.tsTracker.Forward(resolvedSpan.Region, resolvedSpan.Span, e.ResolvedTs) + if e.ResolvedTs > p.maxIngressResolvedTs.Load() { + p.maxIngressResolvedTs.Store(e.ResolvedTs) + } + } + resolvedTs := p.tsTracker.Frontier() + + if resolvedTs > 0 && p.initialized.CompareAndSwap(false, true) { + log.Info("puller is initialized", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.String("tableName", p.tableName), + zap.Any("tableID", p.spans), + zap.Uint64("resolvedTs", resolvedTs), + zap.Duration("duration", time.Since(p.start)), + ) + } + if resolvedTs > p.resolvedTs.Load() { + p.resolvedTs.Store(resolvedTs) + p.resolvedTsUpdated.Store(time.Now().Unix()) + raw := &model.RawKVEntry{CRTs: resolvedTs, OpType: model.OpTypeResolved} + err = p.consume.f(ctx, raw, p.spans) + } + + return +} + +func (p *tableProgress) resolveLock(currentTime time.Time) { + resolvedTsUpdated := time.Unix(p.resolvedTsUpdated.Load(), 0) + if !p.initialized.Load() || time.Since(resolvedTsUpdated) < resolveLockFence { + return + } + resolvedTs := p.resolvedTs.Load() + resolvedTime := oracle.GetTimeFromTS(resolvedTs) + if currentTime.Sub(resolvedTime) < resolveLockFence { + return + } + + targetTs := oracle.GoTimeToTS(resolvedTime.Add(resolveLockFence)) + for _, subID := range p.subscriptionIDs { + p.client.ResolveLock(subID, targetTs) + } +} + +type subscription struct { + *tableProgress + subID kv.SubscriptionID +} + +// MultiplexingPuller works with `kv.SharedClient`. All tables share resources. +type MultiplexingPuller struct { + changefeed model.ChangeFeedID + client *kv.SharedClient + pdClock pdutil.Clock + consume func(context.Context, *model.RawKVEntry, []tablepb.Span, model.ShouldSplitKVEntry) error + // inputChannelIndexer is used to determine which input channel to use for a given span. + inputChannelIndexer func(span tablepb.Span, workerCount int) int + + // inputChs is used to collect events from client. + inputChs []chan kv.MultiplexingEvent + // tableProgressAdvanceCh is used to notify the tableAdvancer goroutine + // to advance the progress of a table. + tableProgressAdvanceCh chan *tableProgress + + // NOTE: A tableProgress can have multiple subscription, all of them share + // the same tableProgress. So, we use two maps to store the relationshipxxx + // between subscription and tableProgress. + subscriptions struct { + sync.RWMutex + // m map subscriptionID -> tableProgress, used to cache and + // get tableProgress by subscriptionID quickly. + m map[kv.SubscriptionID]*tableProgress + // n map span -> subscription, used to cache and get subscription by span quickly. + n *spanz.HashMap[subscription] + } + + resolvedTsAdvancerCount int + + CounterKv prometheus.Counter + CounterResolved prometheus.Counter + CounterResolvedDropped prometheus.Counter + queueKvDuration prometheus.Observer + queueResolvedDuration prometheus.Observer +} + +// NewMultiplexingPuller creates a MultiplexingPuller. +// `workerCount` specifies how many workers will be spawned to handle events from kv client. +// `frontierCount` specifies how many workers will be spawned to handle resolvedTs event. +func NewMultiplexingPuller( + changefeed model.ChangeFeedID, + client *kv.SharedClient, + pdClock pdutil.Clock, + consume func(context.Context, *model.RawKVEntry, []tablepb.Span, model.ShouldSplitKVEntry) error, + workerCount int, + inputChannelIndexer func(tablepb.Span, int) int, + resolvedTsAdvancerCount int, +) *MultiplexingPuller { + mpuller := &MultiplexingPuller{ + changefeed: changefeed, + client: client, + pdClock: pdClock, + consume: consume, + inputChannelIndexer: inputChannelIndexer, + resolvedTsAdvancerCount: resolvedTsAdvancerCount, + tableProgressAdvanceCh: make(chan *tableProgress, tableProgressAdvanceChSize), + } + mpuller.subscriptions.m = make(map[kv.SubscriptionID]*tableProgress) + mpuller.subscriptions.n = spanz.NewHashMap[subscription]() + + mpuller.inputChs = make([]chan kv.MultiplexingEvent, 0, workerCount) + for i := 0; i < workerCount; i++ { + mpuller.inputChs = append(mpuller.inputChs, make(chan kv.MultiplexingEvent, inputChSize)) + } + return mpuller +} + +// Subscribe some spans. They will share one same resolved timestamp progress. +func (p *MultiplexingPuller) Subscribe( + spans []tablepb.Span, + startTs model.Ts, + tableName string, + shouldSplitKVEntry model.ShouldSplitKVEntry, +) { + p.subscriptions.Lock() + defer p.subscriptions.Unlock() + p.subscribe(spans, startTs, tableName, shouldSplitKVEntry) +} + +func (p *MultiplexingPuller) subscribe( + spans []tablepb.Span, + startTs model.Ts, + tableName string, + shouldSplitKVEntry model.ShouldSplitKVEntry, +) { + for _, span := range spans { + // Base on the current design, a MultiplexingPuller is only used for one changefeed. + // So, one span can only be subscribed once. + if _, exists := p.subscriptions.n.Get(span); exists { + log.Panic("redundant subscription", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.String("span", span.String())) + } + } + + // Create a new table progress for the spans. + progress := &tableProgress{ + changefeed: p.changefeed, + client: p.client, + spans: spans, + subscriptionIDs: make([]kv.SubscriptionID, len(spans)), + startTs: startTs, + tableName: tableName, + + resolvedEventsCache: make(chan kv.MultiplexingEvent, tableResolvedTsBufferSize), + tsTracker: frontier.NewFrontier(0, spans...), + start: time.Now(), + } + + progress.consume.f = func( + ctx context.Context, + raw *model.RawKVEntry, + spans []tablepb.Span, + ) error { + progress.consume.RLock() + defer progress.consume.RUnlock() + if !progress.consume.removed { + return p.consume(ctx, raw, spans, shouldSplitKVEntry) + } + return nil + } + + for i, span := range spans { + subID := p.client.AllocSubscriptionID() + progress.subscriptionIDs[i] = subID + + p.subscriptions.m[subID] = progress + p.subscriptions.n.ReplaceOrInsert(span, subscription{progress, subID}) + + slot := p.inputChannelIndexer(span, len(p.inputChs)) + p.client.Subscribe(subID, span, startTs, p.inputChs[slot]) + } + + progress.initialized.Store(false) + progress.resolvedTsUpdated.Store(time.Now().Unix()) +} + +// Unsubscribe some spans, which must be subscribed in one call. +func (p *MultiplexingPuller) Unsubscribe(spans []tablepb.Span) { + p.subscriptions.Lock() + defer p.subscriptions.Unlock() + p.unsubscribe(spans) +} + +func (p *MultiplexingPuller) unsubscribe(spans []tablepb.Span) { + var progress *tableProgress + for _, span := range spans { + if prog, exists := p.subscriptions.n.Get(span); exists { + if prog.tableProgress != progress && progress != nil { + log.Panic("unsubscribe spans not in one subscription", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID)) + } + progress = prog.tableProgress + } else { + log.Panic("unexist unsubscription", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.String("span", span.String())) + } + } + if len(progress.spans) != len(spans) { + log.Panic("unsubscribe spans not same with subscription", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID)) + } + + progress.consume.Lock() + progress.consume.removed = true + progress.consume.Unlock() + for i, span := range progress.spans { + p.client.Unsubscribe(progress.subscriptionIDs[i]) + delete(p.subscriptions.m, progress.subscriptionIDs[i]) + p.subscriptions.n.Delete(span) + } +} + +// Run the puller. +func (p *MultiplexingPuller) Run(ctx context.Context) (err error) { + return p.run(ctx, true) +} + +func (p *MultiplexingPuller) run(ctx context.Context, includeClient bool) error { + p.CounterKv = PullerEventCounter.WithLabelValues(p.changefeed.Namespace, p.changefeed.ID, "kv") + p.CounterResolved = PullerEventCounter.WithLabelValues(p.changefeed.Namespace, p.changefeed.ID, "resolved") + p.CounterResolvedDropped = PullerEventCounter.WithLabelValues(p.changefeed.Namespace, p.changefeed.ID, "resolved-dropped") + p.queueKvDuration = pullerQueueDuration.WithLabelValues(p.changefeed.Namespace, p.changefeed.ID, "kv") + p.queueResolvedDuration = pullerQueueDuration.WithLabelValues(p.changefeed.Namespace, p.changefeed.ID, "resolved") + defer func() { + PullerEventCounter.DeleteLabelValues(p.changefeed.Namespace, p.changefeed.ID, "kv") + PullerEventCounter.DeleteLabelValues(p.changefeed.Namespace, p.changefeed.ID, "resolved") + PullerEventCounter.DeleteLabelValues(p.changefeed.Namespace, p.changefeed.ID, "resolved-dropped") + pullerQueueDuration.DeleteLabelValues(p.changefeed.Namespace, p.changefeed.ID, "kv") + pullerQueueDuration.DeleteLabelValues(p.changefeed.Namespace, p.changefeed.ID, "resolved") + log.Info("MultiplexingPuller exits", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID)) + }() + + eg, ctx := errgroup.WithContext(ctx) + + // Only !includeClient in tests. + if includeClient { + eg.Go(func() error { return p.client.Run(ctx) }) + } + + // Start workers to handle events received from kv client. + for i := range p.inputChs { + inputCh := p.inputChs[i] + eg.Go(func() error { return p.runEventHandler(ctx, inputCh) }) + } + + // Start workers to check and resolve stale locks. + eg.Go(func() error { return p.runResolveLockChecker(ctx) }) + + for i := 0; i < p.resolvedTsAdvancerCount; i++ { + eg.Go(func() error { return p.runResolvedTsAdvancer(ctx) }) + } + + log.Info("MultiplexingPuller starts", + zap.String("namespace", p.changefeed.Namespace), + zap.String("changefeed", p.changefeed.ID), + zap.Int("workerConcurrent", len(p.inputChs)), + zap.Int("frontierConcurrent", p.resolvedTsAdvancerCount)) + return eg.Wait() +} + +// runEventHandler consumes events from inputCh: +// 1. If the event is a kv event, consume by calling progress.consume.f. +// 2. If the event is a resolved event, send it to the resolvedEventsCache of the corresponding progress. +func (p *MultiplexingPuller) runEventHandler(ctx context.Context, inputCh <-chan kv.MultiplexingEvent) error { + for { + var e kv.MultiplexingEvent + select { + case <-ctx.Done(): + return ctx.Err() + case e = <-inputCh: + } + + progress := p.getProgress(e.SubscriptionID) + // There is a chance that some stale events are received after + // the subscription is removed. We can just ignore them. + if progress == nil { + continue + } + + if e.Val != nil { + p.queueKvDuration.Observe(float64(time.Since(e.Start).Milliseconds())) + p.CounterKv.Inc() + if err := progress.consume.f(ctx, e.Val, progress.spans); err != nil { + return errors.Trace(err) + } + } else if e.Resolved != nil { + p.CounterResolved.Add(float64(len(e.Resolved.Spans))) + select { + case <-ctx.Done(): + return ctx.Err() + case progress.resolvedEventsCache <- e: + p.schedule(ctx, progress) + default: + p.CounterResolvedDropped.Add(float64(len(e.Resolved.Spans))) + } + } + } +} + +func (p *MultiplexingPuller) getProgress(subID kv.SubscriptionID) *tableProgress { + p.subscriptions.RLock() + defer p.subscriptions.RUnlock() + return p.subscriptions.m[subID] +} + +func (p *MultiplexingPuller) getAllProgresses() map[*tableProgress]struct{} { + p.subscriptions.RLock() + defer p.subscriptions.RUnlock() + hashset := make(map[*tableProgress]struct{}, len(p.subscriptions.m)) + for _, value := range p.subscriptions.m { + hashset[value] = struct{}{} + } + return hashset +} + +func (p *MultiplexingPuller) schedule(ctx context.Context, progress *tableProgress) { + if progress.scheduled.CompareAndSwap(false, true) { + select { + case <-ctx.Done(): + case p.tableProgressAdvanceCh <- progress: + } + } +} + +// runResolvedTsAdvancer receives tableProgress from tableProgressAdvanceCh +// and advances the resolvedTs of the tableProgress. +func (p *MultiplexingPuller) runResolvedTsAdvancer(ctx context.Context) error { + advanceTableProgress := func(ctx context.Context, progress *tableProgress) error { + defer func() { + progress.scheduled.Store(false) + // Schedule the progress again if there are still events in the cache. + if len(progress.resolvedEventsCache) > 0 { + p.schedule(ctx, progress) + } + }() + + var event kv.MultiplexingEvent + var spans *model.ResolvedSpans + for i := 0; i < 128; i++ { + select { + case <-ctx.Done(): + return ctx.Err() + case event = <-progress.resolvedEventsCache: + spans = event.RegionFeedEvent.Resolved + default: + return nil + } + p.queueResolvedDuration.Observe(float64(time.Since(event.Start).Milliseconds())) + if err := progress.handleResolvedSpans(ctx, spans); err != nil { + return errors.Trace(err) + } + } + return nil + } + + var progress *tableProgress + for { + select { + case <-ctx.Done(): + return ctx.Err() + case progress = <-p.tableProgressAdvanceCh: + if err := advanceTableProgress(ctx, progress); err != nil { + return errors.Trace(err) + } + } + } +} + +func (p *MultiplexingPuller) runResolveLockChecker(ctx context.Context) error { + resolveLockTicker := time.NewTicker(resolveLockTickInterval) + defer resolveLockTicker.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-resolveLockTicker.C: + } + currentTime := p.pdClock.CurrentTime() + for progress := range p.getAllProgresses() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + progress.resolveLock(currentTime) + } + } + } +} + +// Stats of a puller. +type Stats struct { + RegionCount uint64 + CheckpointTsIngress model.Ts + ResolvedTsIngress model.Ts + CheckpointTsEgress model.Ts + ResolvedTsEgress model.Ts +} + +// Stats returns Stats. +func (p *MultiplexingPuller) Stats(span tablepb.Span) Stats { + p.subscriptions.RLock() + progress := p.subscriptions.n.GetV(span) + p.subscriptions.RUnlock() + if progress.tableProgress == nil { + return Stats{} + } + return Stats{ + RegionCount: p.client.RegionCount(progress.subID), + ResolvedTsIngress: progress.maxIngressResolvedTs.Load(), + CheckpointTsIngress: progress.maxIngressResolvedTs.Load(), + ResolvedTsEgress: progress.resolvedTs.Load(), + CheckpointTsEgress: progress.resolvedTs.Load(), + } +} diff --git a/cdc/scheduler/internal/v3/agent/table.go b/cdc/scheduler/internal/v3/agent/table.go index efc4e8dc4cd..e4b2f1a23f3 100644 --- a/cdc/scheduler/internal/v3/agent/table.go +++ b/cdc/scheduler/internal/v3/agent/table.go @@ -271,7 +271,7 @@ func (t *table) injectDispatchTableTask(task *dispatchTableTask) { t.task = task return } - log.Debug("schedulerv3: table inject dispatch table task ignored,"+ + log.Warn("schedulerv3: table inject dispatch table task ignored,"+ "since there is one not finished yet", zap.String("namespace", t.changefeedID.Namespace), zap.String("changefeed", t.changefeedID.ID), diff --git a/cdc/scheduler/internal/v3/transport/transport.go b/cdc/scheduler/internal/v3/transport/transport.go index b12ff441677..ade03146df9 100644 --- a/cdc/scheduler/internal/v3/transport/transport.go +++ b/cdc/scheduler/internal/v3/transport/transport.go @@ -65,6 +65,10 @@ type p2pTransport struct { // FIXME it's an unbounded buffer, and may cause OOM! msgBuf []*schedulepb.Message } + lastPrintTime time.Time + ignoreCount int64 + totalMsg int64 + role Role } // Role of the transport user. @@ -89,6 +93,7 @@ func NewTransport( peerTopic: peerTopic, messageServer: server, messageRouter: router, + role: role, } var err error trans.errCh, err = trans.messageServer.SyncAddHandler( @@ -112,6 +117,7 @@ func NewTransport( func (t *p2pTransport) Send( ctx context.Context, msgs []*schedulepb.Message, ) error { + t.totalMsg += int64(len(msgs)) for i := range msgs { value := msgs[i] to := value.To @@ -127,6 +133,18 @@ func (t *p2pTransport) Send( _, err := client.TrySendMessage(ctx, t.peerTopic, value) if err != nil { if cerror.ErrPeerMessageSendTryAgain.Equal(err) { + t.ignoreCount++ + if time.Since(t.lastPrintTime) > 30*time.Second { + log.Warn("schedulerv3: message send failed since ignored, retry later", + zap.String("namespace", t.changefeed.Namespace), + zap.String("changefeed", t.changefeed.ID), + zap.String("to", to), + zap.Int64("ignoreCount", t.ignoreCount), + zap.Int64("totalMsg", t.totalMsg), + zap.Float64("ignoreRate", float64(t.ignoreCount)/float64(t.totalMsg)), + zap.String("role", string(t.role)), + ) + } return nil } if cerror.ErrPeerMessageClientClosed.Equal(err) { diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 4b3feaf2e15..27e8dbab9c6 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -196,12 +196,12 @@ func TestParseCfg(t *testing.T) { // We expect the default configuration here. Messages: &config.MessagesConfig{ ClientMaxBatchInterval: config.TomlDuration(time.Millisecond * 10), - ClientMaxBatchSize: 8 * 1024 * 1024, - ClientMaxBatchCount: 128, + ClientMaxBatchSize: 64 * 1024 * 1024, + ClientMaxBatchCount: 1024, ClientRetryRateLimit: 1.0, ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), - ServerWorkerPoolSize: 4, + ServerWorkerPoolSize: 8, MaxRecvMsgSize: 256 * 1024 * 1024, KeepAliveTimeout: config.TomlDuration(time.Second * 10), KeepAliveTime: config.TomlDuration(time.Second * 30), @@ -510,12 +510,12 @@ cert-allowed-cn = ["dd","ee"] // We expect the default configuration here. Messages: &config.MessagesConfig{ ClientMaxBatchInterval: config.TomlDuration(time.Millisecond * 10), - ClientMaxBatchSize: 8 * 1024 * 1024, - ClientMaxBatchCount: 128, + ClientMaxBatchSize: 64 * 1024 * 1024, + ClientMaxBatchCount: 1024, ClientRetryRateLimit: 1.0, ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), - ServerWorkerPoolSize: 4, + ServerWorkerPoolSize: 8, MaxRecvMsgSize: 256 * 1024 * 1024, KeepAliveTimeout: config.TomlDuration(time.Second * 10), KeepAliveTime: config.TomlDuration(time.Second * 30), @@ -584,12 +584,12 @@ unknown3 = 3 // We expect the default configuration here. Messages: &config.MessagesConfig{ ClientMaxBatchInterval: config.TomlDuration(time.Millisecond * 10), - ClientMaxBatchSize: 8 * 1024 * 1024, - ClientMaxBatchCount: 128, + ClientMaxBatchSize: 64 * 1024 * 1024, + ClientMaxBatchCount: 1024, ClientRetryRateLimit: 1.0, ServerMaxPendingMessageCount: 102400, ServerAckInterval: config.TomlDuration(time.Millisecond * 100), - ServerWorkerPoolSize: 4, + ServerWorkerPoolSize: 8, MaxRecvMsgSize: 256 * 1024 * 1024, KeepAliveTimeout: config.TomlDuration(time.Second * 10), KeepAliveTime: config.TomlDuration(time.Second * 30), diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index c653447ce4c..24b85e40740 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -144,12 +144,12 @@ const ( "enable-new-scheduler": true, "messages": { "client-max-batch-interval": 10000000, - "client-max-batch-size": 8388608, - "client-max-batch-count": 128, + "client-max-batch-size": 67108864, + "client-max-batch-count": 1024, "client-retry-rate-limit": 1, "server-max-pending-message-count": 102400, "server-ack-interval": 100000000, - "server-worker-pool-size": 4, + "server-worker-pool-size": 8, "max-recv-msg-size": 268435456, "keep-alive-time": 30000000000, "keep-alive-timeout": 10000000000 diff --git a/pkg/config/messages.go b/pkg/config/messages.go index 7b906518ca8..fc90d1eff6f 100644 --- a/pkg/config/messages.go +++ b/pkg/config/messages.go @@ -49,12 +49,12 @@ type MessagesConfig struct { var defaultMessageConfig = &MessagesConfig{ // Note that ClientMaxBatchInterval may increase the checkpoint latency. ClientMaxBatchInterval: TomlDuration(time.Millisecond * 10), - ClientMaxBatchSize: 8 * 1024 * 1024, // 8MB - ClientMaxBatchCount: 128, + ClientMaxBatchSize: 64 * 1024 * 1024, // 64MB + ClientMaxBatchCount: 1024, ClientRetryRateLimit: 1.0, // Once per second ServerMaxPendingMessageCount: 102400, ServerAckInterval: TomlDuration(time.Millisecond * 100), - ServerWorkerPoolSize: 4, + ServerWorkerPoolSize: 8, MaxRecvMsgSize: defaultMaxRecvMsgSize, KeepAliveTime: TomlDuration(time.Second * 30), KeepAliveTimeout: TomlDuration(time.Second * 10), @@ -66,7 +66,7 @@ const ( // clientSendChannelSize represents the size of an internal channel used to buffer // unsent messages. - clientSendChannelSize = 128 + clientSendChannelSize = 1024 // clientDialTimeout represents the timeout given to gRPC to dial. 5 seconds seems reasonable // because it is unlikely that the latency between TiCDC nodes is larger than 5 seconds.