From 4d45c05859fae19db1969b21aee4b855868acf65 Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Fri, 28 Jun 2024 16:37:30 +0800
Subject: [PATCH 1/5] enlarge s3 retry

---
 pkg/sink/codec/simple/decoder.go |  3 ++-
 pkg/util/external_storage.go     | 11 +++++++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go
index 25cff0a3b15..abfb2e355a9 100644
--- a/pkg/sink/codec/simple/decoder.go
+++ b/pkg/sink/codec/simple/decoder.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"database/sql"
 	"path/filepath"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -57,7 +58,7 @@ func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*Decode
 	)
 	if config.LargeMessageHandle.EnableClaimCheck() {
 		storageURI := config.LargeMessageHandle.ClaimCheckStorageURI
-		externalStorage, err = util.GetExternalStorageFromURI(ctx, storageURI)
+		externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10 * time.Second, 10 * time.Second)
 		if err != nil {
 			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 		}
diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go
index 0d96c47045f..1d7a23c661b 100644
--- a/pkg/util/external_storage.go
+++ b/pkg/util/external_storage.go
@@ -140,6 +140,17 @@ func DefaultS3Retryer() request.Retryer {
 	}
 }
 
+// NewS3Retryer creates a new s3 retryer.
+func NewS3Retryer(maxRetries int, minRetryDelay, minThrottleDelay time.Duration) request.Retryer {
+	return retryerWithLog{
+		DefaultRetryer: client.DefaultRetryer{
+			NumMaxRetries:    maxRetries,
+			MinRetryDelay:    minRetryDelay,
+			MinThrottleDelay: minThrottleDelay,
+		},
+	}
+}
+
 type extStorageWithTimeout struct {
 	storage.ExternalStorage
 	timeout time.Duration

From b4f34591f840502cd1517a95a4da6386e873a209 Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Fri, 28 Jun 2024 16:43:27 +0800
Subject: [PATCH 2/5] enlarge s3 retry

---
 pkg/sink/codec/canal/canal_json_decoder.go   | 3 ++-
 pkg/sink/codec/open/open_protocol_decoder.go | 3 ++-
 pkg/sink/codec/simple/decoder.go             | 2 +-
 3 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go
index 4b945674d34..18a135b9c12 100644
--- a/pkg/sink/codec/canal/canal_json_decoder.go
+++ b/pkg/sink/codec/canal/canal_json_decoder.go
@@ -20,6 +20,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/goccy/go-json"
 	"github.com/pingcap/errors"
@@ -59,7 +60,7 @@ func NewBatchDecoder(
 	)
 	if codecConfig.LargeMessageHandle.EnableClaimCheck() {
 		storageURI := codecConfig.LargeMessageHandle.ClaimCheckStorageURI
-		externalStorage, err = util.GetExternalStorageFromURI(ctx, storageURI)
+		externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10*time.Second, 10*time.Second))
 		if err != nil {
 			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 		}
diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go
index 235ef26e30f..fb31e7e660c 100644
--- a/pkg/sink/codec/open/open_protocol_decoder.go
+++ b/pkg/sink/codec/open/open_protocol_decoder.go
@@ -19,6 +19,7 @@ import (
 	"encoding/binary"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
@@ -57,7 +58,7 @@ func NewBatchDecoder(ctx context.Context, config *common.Config, db *sql.DB) (co
 	)
 	if config.LargeMessageHandle.EnableClaimCheck() {
 		storageURI := config.LargeMessageHandle.ClaimCheckStorageURI
-		externalStorage, err = util.GetExternalStorageFromURI(ctx, storageURI)
+		externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10*time.Second, 10*time.Second))
 		if err != nil {
 			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 		}
diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go
index abfb2e355a9..3c611a37f54 100644
--- a/pkg/sink/codec/simple/decoder.go
+++ b/pkg/sink/codec/simple/decoder.go
@@ -58,7 +58,7 @@ func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*Decode
 	)
 	if config.LargeMessageHandle.EnableClaimCheck() {
 		storageURI := config.LargeMessageHandle.ClaimCheckStorageURI
-		externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10 * time.Second, 10 * time.Second)
+		externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10*time.Second, 10*time.Second))
 		if err != nil {
 			return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
 		}

From 2b1de2a6d5088eebf1cb3b2c8b0c4d4c5705e254 Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Mon, 1 Jul 2024 11:07:13 +0800
Subject: [PATCH 3/5] remove immediate flush of row changed events to reduce
 memory allocation

---
 cmd/kafka-consumer/event_group.go | 26 ++-------------------
 cmd/kafka-consumer/writer.go      | 38 ++++---------------------------
 2 files changed, 7 insertions(+), 57 deletions(-)

diff --git a/cmd/kafka-consumer/event_group.go b/cmd/kafka-consumer/event_group.go
index 01aa155f290..23a6c26133b 100644
--- a/cmd/kafka-consumer/event_group.go
+++ b/cmd/kafka-consumer/event_group.go
@@ -19,15 +19,9 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 )
 
-const (
-	defaultMaxBufferedBytes = 100 * 1024 * 1024
-	defaultMaxBufferedCount = 500
-)
-
 // EventsGroup could store change event message.
 type eventsGroup struct {
 	events []*model.RowChangedEvent
-	bytes  int
 }
 
 // NewEventsGroup will create new event group.
@@ -40,16 +34,10 @@ func NewEventsGroup() *eventsGroup {
 // Append will append an event to event groups.
 func (g *eventsGroup) Append(e *model.RowChangedEvent) {
 	g.events = append(g.events, e)
-	g.bytes += e.ApproximateBytes()
-}
-
-// ShouldFlushEvents return true if buffered events too much, to reduce memory usage.
-func (g *eventsGroup) ShouldFlushEvents() bool {
-	return g.bytes >= defaultMaxBufferedBytes || len(g.events) >= defaultMaxBufferedCount
 }
 
 // Resolve will get events where CommitTs is less than resolveTs.
-func (g *eventsGroup) Resolve(resolveTs uint64) *eventsGroup {
+func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
 	sort.Slice(g.events, func(i, j int) bool {
 		return g.events[i].CommitTs < g.events[j].CommitTs
 	})
@@ -58,17 +46,7 @@ func NewEventsGroup() *eventsGroup {
 		return g.events[i].CommitTs > resolveTs
 	})
 
-	result := &eventsGroup{
-		events: g.events[:i],
-	}
-	var bytes int
-	for _, e := range result.events {
-		bytes += e.ApproximateBytes()
-	}
-	result.bytes = bytes
-
+	result := g.events[:i]
 	g.events = g.events[i:]
-	g.bytes = g.bytes - bytes
-
 	return result
 }
diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index 858528eb159..82c3534aa38 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -222,13 +222,8 @@ func (w *writer) forEachPartition(fn func(p *partitionProgress)) {
 }
 
 // Write will synchronously write data downstream
-func (w *writer) Write(ctx context.Context, messageType model.MessageType, commitTs uint64) bool {
-	var watermark uint64
-	if messageType == model.MessageTypeRow {
-		watermark = commitTs
-	} else {
-		watermark = w.getMinWatermark()
-	}
+func (w *writer) Write(ctx context.Context, messageType model.MessageType) bool {
+	watermark := w.getMinWatermark()
 	var todoDDL *model.DDLEvent
 	for {
 		todoDDL = w.getFrontDDL()
@@ -288,7 +283,6 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 		counter     int
 		needFlush   bool
 		messageType model.MessageType
-		commitTs    uint64
 	)
 	for {
 		ty, hasNext, err := decoder.HasNext()
@@ -412,26 +406,6 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 				zap.Int64("tableID", tableID),
 				zap.String("schema", row.TableInfo.GetSchemaName()),
 				zap.String("table", row.TableInfo.GetTableName()))
-
-			if group.ShouldFlushEvents() {
-				g := group.Resolve(row.CommitTs)
-				tableSink, ok := progress.tableSinkMap.Load(tableID)
-				if !ok {
-					tableSink = w.sinkFactory.CreateTableSinkForConsumer(
-						model.DefaultChangeFeedID("kafka-consumer"),
-						spanz.TableIDToComparableSpan(tableID),
-						g.events[0].CommitTs,
-					)
-					progress.tableSinkMap.Store(tableID, tableSink)
-				}
-				tableSink.(tablesink.TableSink).AppendRowChangedEvents(g.events...)
-				log.Warn("too much events buffered, should flush them to reduce memory usage",
-					zap.Uint64("resolvedTs", row.CommitTs), zap.Int64("tableID", tableID),
-					zap.Int("count", len(g.events)), zap.Int("bytes", g.bytes),
-					zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
-				needFlush = true
-				commitTs = row.CommitTs
-			}
 		case model.MessageTypeResolved:
 			ts, err := decoder.NextResolvedEvent()
 			if err != nil {
@@ -455,8 +429,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 			}
 
 			for tableID, group := range eventGroup {
-				g := group.Resolve(ts)
-				events := g.events
+				events := group.Resolve(ts)
 				if len(events) == 0 {
 					continue
 				}
@@ -471,8 +444,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 				}
 				tableSink.(tablesink.TableSink).AppendRowChangedEvents(events...)
 				log.Debug("append row changed events to table sink",
-					zap.Uint64("resolvedTs", ts), zap.Int64("tableID", tableID),
-					zap.Int("count", len(events)), zap.Int("bytes", g.bytes),
+					zap.Uint64("resolvedTs", ts), zap.Int64("tableID", tableID), zap.Int("count", len(events)),
 					zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
 			}
 			atomic.StoreUint64(&progress.watermark, ts)
@@ -493,7 +465,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 		return false
 	}
 	// flush when received DDL event or resolvedTs
-	return w.Write(ctx, messageType, commitTs)
+	return w.Write(ctx, messageType)
 }
 
 type fakeTableIDGenerator struct {

From c3d64a1c533c2b19bda89dea4b02d070f879733e Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Tue, 2 Jul 2024 15:10:17 +0800
Subject: [PATCH 4/5] add comment about row changed event commit ts fallback.

---
 cmd/kafka-consumer/writer.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index 82c3534aa38..eee1d000c63 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -383,6 +383,8 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 		}
 
 		watermark := atomic.LoadUint64(&progress.watermark)
+		// if the kafka cluster is normal, this should not happen.
+		// if the cluster is abnormal, the consumer may consume old messages, which causes the watermark to fall back.
 		if row.CommitTs < watermark {
 			log.Panic("RowChangedEvent fallback row, ignore it",
 				zap.Uint64("commitTs", row.CommitTs),

From f45ba0f003f90a43db5df97e35e343b7d33e1985 Mon Sep 17 00:00:00 2001
From: 3AceShowHand
Date: Tue, 2 Jul 2024 15:26:45 +0800
Subject: [PATCH 5/5] adjust logs.

---
 cmd/kafka-consumer/consumer.go   | 10 ++++++++--
 cmd/kafka-consumer/writer.go     | 23 +++++++++++------------
 pkg/sink/codec/simple/decoder.go |  2 +-
 3 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/cmd/kafka-consumer/consumer.go b/cmd/kafka-consumer/consumer.go
index 2c0d474859a..825f0da9ecf 100644
--- a/cmd/kafka-consumer/consumer.go
+++ b/cmd/kafka-consumer/consumer.go
@@ -130,8 +130,14 @@ func (c *consumer) Consume(ctx context.Context) {
 		if !needCommit {
 			continue
 		}
-		if _, err = c.client.CommitMessage(msg); err != nil {
-			log.Error("read message failed, just continue to retry", zap.Error(err))
+
+		topicPartition, err := c.client.CommitMessage(msg)
+		if err != nil {
+			log.Error("commit message failed, just continue", zap.Error(err))
+			continue
 		}
+		log.Info("commit message success",
+			zap.String("topic", topicPartition[0].String()), zap.Int32("partition", topicPartition[0].Partition),
+			zap.Any("offset", topicPartition[0].Offset))
 	}
 }
diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index eee1d000c63..919a3853f38 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -72,7 +72,8 @@ func NewDecoder(ctx context.Context, option *option, upstreamTiDB *sql.DB) (code
 }
 
 type partitionProgress struct {
-	watermark uint64
+	watermark       uint64
+	watermarkOffset kafka.Offset
 
 	// tableSinkMap -> [tableID]tableSink
 	tableSinkMap sync.Map
@@ -373,12 +374,10 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 		}
 		if partition != target {
 			log.Panic("RowChangedEvent dispatched to wrong partition",
-				zap.Int32("partition", partition),
-				zap.Int32("expected", target),
+				zap.Int32("partition", partition), zap.Int32("expected", target),
 				zap.Int32("partitionNum", w.option.partitionNum),
 				zap.Any("offset", message.TopicPartition.Offset),
-				zap.Int64("tableID", tableID),
-				zap.Any("row", row),
+				zap.Int64("tableID", tableID), zap.Any("row", row),
 			)
 		}
 
@@ -387,10 +386,9 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 		// if the cluster is abnormal, the consumer may consume old messages, which causes the watermark to fall back.
 		if row.CommitTs < watermark {
 			log.Panic("RowChangedEvent fallback row, ignore it",
-				zap.Uint64("commitTs", row.CommitTs),
-				zap.Uint64("watermark", watermark),
-				zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset),
-				zap.Int64("tableID", tableID),
+				zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", message.TopicPartition.Offset),
+				zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+				zap.Int32("partition", partition), zap.Int64("tableID", tableID),
 				zap.String("schema", row.TableInfo.GetSchemaName()),
 				zap.String("table", row.TableInfo.GetTableName()))
 		}
@@ -425,9 +423,9 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 		watermark := atomic.LoadUint64(&progress.watermark)
 		if ts < watermark {
 			log.Panic("partition resolved ts fallback, skip it",
-				zap.Uint64("ts", ts),
-				zap.Uint64("watermark", watermark),
-				zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
+				zap.Uint64("ts", ts), zap.Any("offset", message.TopicPartition.Offset),
+				zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+				zap.Int32("partition", partition))
 		}
 
 		for tableID, group := range eventGroup {
@@ -450,6 +448,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 				zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
 			}
 			atomic.StoreUint64(&progress.watermark, ts)
+			progress.watermarkOffset = message.TopicPartition.Offset
 			needFlush = true
 		default:
 			log.Panic("unknown message type", zap.Any("messageType", messageType),
diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go
index 3c611a37f54..a7c5cabce05 100644
--- a/pkg/sink/codec/simple/decoder.go
+++ b/pkg/sink/codec/simple/decoder.go
@@ -319,7 +319,7 @@ func (m *memoryTableInfoProvider) Write(info *model.TableInfo) {
 
 	_, ok := m.memo[key]
 	if ok {
-		log.Warn("table info not stored, since it already exists",
+		log.Debug("table info not stored, since it already exists",
 			zap.String("schema", info.TableName.Schema),
 			zap.String("table", info.TableName.Table),
 			zap.Uint64("version", info.UpdateTS))
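
---
Editor's note on the retry change: NewS3Retryer (PATCH 1) is a thin wrapper
around aws-sdk-go's client.DefaultRetryer, so NewS3Retryer(10, 10*time.Second,
10*time.Second) allows up to 10 retries per S3 request, with retry and
throttle delays that start at 10 seconds and, under the SDK's default policy,
back off with jitter from there. As a rough standalone illustration of the
loop shape those arguments configure (a sketch, not code from this series:
retryWithBackoff and its doubling-with-jitter strategy are hypothetical
stand-ins for the SDK's internal policy):

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// retryWithBackoff retries op up to maxRetries times after the first
// attempt, sleeping a jittered delay between failures; the base delay
// starts at minDelay and doubles after each failed attempt.
func retryWithBackoff(maxRetries int, minDelay time.Duration, op func() error) error {
	var err error
	delay := minDelay
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if err = op(); err == nil {
			return nil
		}
		if attempt == maxRetries {
			break
		}
		// Sleep roughly [delay/2, delay] so concurrent consumers do not
		// retry in lockstep, then double the base delay.
		time.Sleep(delay/2 + time.Duration(rand.Int63n(int64(delay/2)+1)))
		delay *= 2
	}
	return fmt.Errorf("giving up after %d retries: %w", maxRetries, err)
}

func main() {
	calls := 0
	err := retryWithBackoff(10, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errors.New("transient S3 fetch error")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}

The larger settings matter because claim-check decoding fetches oversized
messages from external storage on the consume path, so giving the S3 client
more headroom to retry makes transient storage errors less likely to surface
as decoder failures.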