diff --git a/cmd/kafka-consumer/consumer.go b/cmd/kafka-consumer/consumer.go
index 2c0d474859a..825f0da9ecf 100644
--- a/cmd/kafka-consumer/consumer.go
+++ b/cmd/kafka-consumer/consumer.go
@@ -130,8 +130,14 @@ func (c *consumer) Consume(ctx context.Context) {
 		if !needCommit {
 			continue
 		}
-		if _, err = c.client.CommitMessage(msg); err != nil {
-			log.Error("read message failed, just continue to retry", zap.Error(err))
+
+		topicPartition, err := c.client.CommitMessage(msg)
+		if err != nil {
+			log.Error("commit message failed, just continue", zap.Error(err))
+			continue
 		}
+		log.Info("commit message success",
+			zap.String("topic", topicPartition[0].String()), zap.Int32("partition", topicPartition[0].Partition),
+			zap.Any("offset", topicPartition[0].Offset))
 	}
 }
diff --git a/cmd/kafka-consumer/event_group.go b/cmd/kafka-consumer/event_group.go
index 01aa155f290..23a6c26133b 100644
--- a/cmd/kafka-consumer/event_group.go
+++ b/cmd/kafka-consumer/event_group.go
@@ -19,15 +19,9 @@ import (
 	"github.com/pingcap/tiflow/cdc/model"
 )
 
-const (
-	defaultMaxBufferedBytes = 100 * 1024 * 1024
-	defaultMaxBufferedCount = 500
-)
-
 // EventsGroup could store change event message.
 type eventsGroup struct {
 	events []*model.RowChangedEvent
-	bytes  int
 }
 
 // NewEventsGroup will create new event group.
@@ -40,16 +34,10 @@ func NewEventsGroup() *eventsGroup {
 // Append will append an event to event groups.
 func (g *eventsGroup) Append(e *model.RowChangedEvent) {
 	g.events = append(g.events, e)
-	g.bytes += e.ApproximateBytes()
-}
-
-// ShouldFlushEvents return true if buffered events too much, to reduce memory usage.
-func (g *eventsGroup) ShouldFlushEvents() bool {
-	return g.bytes >= defaultMaxBufferedBytes || len(g.events) >= defaultMaxBufferedCount
 }
 
 // Resolve will get events where CommitTs is less than resolveTs.
-func (g *eventsGroup) Resolve(resolveTs uint64) *eventsGroup {
+func (g *eventsGroup) Resolve(resolveTs uint64) []*model.RowChangedEvent {
 	sort.Slice(g.events, func(i, j int) bool {
 		return g.events[i].CommitTs < g.events[j].CommitTs
 	})
@@ -58,17 +46,7 @@ func (g *eventsGroup) Resolve(resolveTs uint64) *eventsGroup {
 		return g.events[i].CommitTs > resolveTs
 	})
 
-	result := &eventsGroup{
-		events: g.events[:i],
-	}
-	var bytes int
-	for _, e := range result.events {
-		bytes += e.ApproximateBytes()
-	}
-	result.bytes = bytes
-
+	result := g.events[:i]
 	g.events = g.events[i:]
-	g.bytes = g.bytes - bytes
-
 	return result
 }
diff --git a/cmd/kafka-consumer/writer.go b/cmd/kafka-consumer/writer.go
index 858528eb159..919a3853f38 100644
--- a/cmd/kafka-consumer/writer.go
+++ b/cmd/kafka-consumer/writer.go
@@ -72,7 +72,8 @@ func NewDecoder(ctx context.Context, option *option, upstreamTiDB *sql.DB) (code
 }
 
 type partitionProgress struct {
-	watermark uint64
+	watermark       uint64
+	watermarkOffset kafka.Offset
 
 	// tableSinkMap -> [tableID]tableSink
 	tableSinkMap sync.Map
@@ -222,13 +223,8 @@ func (w *writer) forEachPartition(fn func(p *partitionProgress)) {
 }
 
 // Write will synchronously write data downstream
-func (w *writer) Write(ctx context.Context, messageType model.MessageType, commitTs uint64) bool {
-	var watermark uint64
-	if messageType == model.MessageTypeRow {
-		watermark = commitTs
-	} else {
-		watermark = w.getMinWatermark()
-	}
+func (w *writer) Write(ctx context.Context, messageType model.MessageType) bool {
+	watermark := w.getMinWatermark()
 	var todoDDL *model.DDLEvent
 	for {
 		todoDDL = w.getFrontDDL()
@@ -288,7 +284,6 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 		counter     int
 		needFlush   bool
 		messageType model.MessageType
-		commitTs    uint64
 	)
 	for {
 		ty, hasNext, err := decoder.HasNext()
@@ -379,22 +374,21 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 			}
 			if partition != target {
 				log.Panic("RowChangedEvent dispatched to wrong partition",
-					zap.Int32("partition", partition),
-					zap.Int32("expected", target),
+					zap.Int32("partition", partition), zap.Int32("expected", target),
 					zap.Int32("partitionNum", w.option.partitionNum),
 					zap.Any("offset", message.TopicPartition.Offset),
-					zap.Int64("tableID", tableID),
-					zap.Any("row", row),
+					zap.Int64("tableID", tableID), zap.Any("row", row),
 				)
 			}
 
 			watermark := atomic.LoadUint64(&progress.watermark)
+			// if the kafka cluster is healthy, this should never be hit.
+			// if the cluster is abnormal, the consumer may consume stale messages, causing the watermark to fall back.
 			if row.CommitTs < watermark {
 				log.Panic("RowChangedEvent fallback row, ignore it",
-					zap.Uint64("commitTs", row.CommitTs),
-					zap.Uint64("watermark", watermark),
-					zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset),
-					zap.Int64("tableID", tableID),
+					zap.Uint64("commitTs", row.CommitTs), zap.Any("offset", message.TopicPartition.Offset),
+					zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+					zap.Int32("partition", partition), zap.Int64("tableID", tableID),
 					zap.String("schema", row.TableInfo.GetSchemaName()),
 					zap.String("table", row.TableInfo.GetTableName()))
 			}
@@ -412,26 +406,6 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 				zap.Int64("tableID", tableID),
 				zap.String("schema", row.TableInfo.GetSchemaName()),
 				zap.String("table", row.TableInfo.GetTableName()))
-
-			if group.ShouldFlushEvents() {
-				g := group.Resolve(row.CommitTs)
-				tableSink, ok := progress.tableSinkMap.Load(tableID)
-				if !ok {
-					tableSink = w.sinkFactory.CreateTableSinkForConsumer(
-						model.DefaultChangeFeedID("kafka-consumer"),
-						spanz.TableIDToComparableSpan(tableID),
-						g.events[0].CommitTs,
-					)
-					progress.tableSinkMap.Store(tableID, tableSink)
-				}
-				tableSink.(tablesink.TableSink).AppendRowChangedEvents(g.events...)
-				log.Warn("too much events buffered, should flush them to reduce memory usage",
-					zap.Uint64("resolvedTs", row.CommitTs), zap.Int64("tableID", tableID),
-					zap.Int("count", len(g.events)), zap.Int("bytes", g.bytes),
-					zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
-				needFlush = true
-				commitTs = row.CommitTs
-			}
 		case model.MessageTypeResolved:
 			ts, err := decoder.NextResolvedEvent()
 			if err != nil {
@@ -449,14 +423,13 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 			watermark := atomic.LoadUint64(&progress.watermark)
 			if ts < watermark {
 				log.Panic("partition resolved ts fallback, skip it",
-					zap.Uint64("ts", ts),
-					zap.Uint64("watermark", watermark),
-					zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset))
+					zap.Uint64("ts", ts), zap.Any("offset", message.TopicPartition.Offset),
+					zap.Uint64("watermark", watermark), zap.Any("watermarkOffset", progress.watermarkOffset),
+					zap.Int32("partition", partition))
 			}
 
 			for tableID, group := range eventGroup {
-				g := group.Resolve(ts)
-				events := g.events
+				events := group.Resolve(ts)
 				if len(events) == 0 {
 					continue
 				}
@@ -471,11 +444,11 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool
 				}
 				tableSink.(tablesink.TableSink).AppendRowChangedEvents(events...)
log.Debug("append row changed events to table sink", - zap.Uint64("resolvedTs", ts), zap.Int64("tableID", tableID), - zap.Int("count", len(events)), zap.Int("bytes", g.bytes), + zap.Uint64("resolvedTs", ts), zap.Int64("tableID", tableID), zap.Int("count", len(events)), zap.Int32("partition", partition), zap.Any("offset", message.TopicPartition.Offset)) } atomic.StoreUint64(&progress.watermark, ts) + progress.watermarkOffset = message.TopicPartition.Offset needFlush = true default: log.Panic("unknown message type", zap.Any("messageType", messageType), @@ -493,7 +466,7 @@ func (w *writer) WriteMessage(ctx context.Context, message *kafka.Message) bool return false } // flush when received DDL event or resolvedTs - return w.Write(ctx, messageType, commitTs) + return w.Write(ctx, messageType) } type fakeTableIDGenerator struct { diff --git a/pkg/sink/codec/canal/canal_json_decoder.go b/pkg/sink/codec/canal/canal_json_decoder.go index 4b945674d34..18a135b9c12 100644 --- a/pkg/sink/codec/canal/canal_json_decoder.go +++ b/pkg/sink/codec/canal/canal_json_decoder.go @@ -20,6 +20,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "github.com/goccy/go-json" "github.com/pingcap/errors" @@ -59,7 +60,7 @@ func NewBatchDecoder( ) if codecConfig.LargeMessageHandle.EnableClaimCheck() { storageURI := codecConfig.LargeMessageHandle.ClaimCheckStorageURI - externalStorage, err = util.GetExternalStorageFromURI(ctx, storageURI) + externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10*time.Second, 10*time.Second)) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } diff --git a/pkg/sink/codec/open/open_protocol_decoder.go b/pkg/sink/codec/open/open_protocol_decoder.go index 235ef26e30f..fb31e7e660c 100644 --- a/pkg/sink/codec/open/open_protocol_decoder.go +++ b/pkg/sink/codec/open/open_protocol_decoder.go @@ -19,6 +19,7 @@ import ( "encoding/binary" "path/filepath" "strings" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -57,7 +58,7 @@ func NewBatchDecoder(ctx context.Context, config *common.Config, db *sql.DB) (co ) if config.LargeMessageHandle.EnableClaimCheck() { storageURI := config.LargeMessageHandle.ClaimCheckStorageURI - externalStorage, err = util.GetExternalStorageFromURI(ctx, storageURI) + externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10*time.Second, 10*time.Second)) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } diff --git a/pkg/sink/codec/simple/decoder.go b/pkg/sink/codec/simple/decoder.go index 25cff0a3b15..a7c5cabce05 100644 --- a/pkg/sink/codec/simple/decoder.go +++ b/pkg/sink/codec/simple/decoder.go @@ -18,6 +18,7 @@ import ( "context" "database/sql" "path/filepath" + "time" "github.com/pingcap/errors" "github.com/pingcap/log" @@ -57,7 +58,7 @@ func NewDecoder(ctx context.Context, config *common.Config, db *sql.DB) (*Decode ) if config.LargeMessageHandle.EnableClaimCheck() { storageURI := config.LargeMessageHandle.ClaimCheckStorageURI - externalStorage, err = util.GetExternalStorageFromURI(ctx, storageURI) + externalStorage, err = util.GetExternalStorage(ctx, storageURI, nil, util.NewS3Retryer(10, 10*time.Second, 10*time.Second)) if err != nil { return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err) } @@ -318,7 +319,7 @@ func (m *memoryTableInfoProvider) Write(info *model.TableInfo) { _, ok := m.memo[key] if ok { - log.Warn("table info not stored, since it already exists", + log.Debug("table info not stored, since it 
already exists", zap.String("schema", info.TableName.Schema), zap.String("table", info.TableName.Table), zap.Uint64("version", info.UpdateTS)) diff --git a/pkg/util/external_storage.go b/pkg/util/external_storage.go index 0d96c47045f..1d7a23c661b 100644 --- a/pkg/util/external_storage.go +++ b/pkg/util/external_storage.go @@ -140,6 +140,17 @@ func DefaultS3Retryer() request.Retryer { } } +// NewS3Retryer creates a new s3 retryer. +func NewS3Retryer(maxRetries int, minRetryDelay, minThrottleDelay time.Duration) request.Retryer { + return retryerWithLog{ + DefaultRetryer: client.DefaultRetryer{ + NumMaxRetries: maxRetries, + MinRetryDelay: minRetryDelay, + MinThrottleDelay: minThrottleDelay, + }, + } +} + type extStorageWithTimeout struct { storage.ExternalStorage timeout time.Duration