Skip to content

Commit

Permalink
processor: fix a bug that will cause processor Tick get stuck when downstream is Kafka (#11339)
Browse files Browse the repository at this point in the history

close #11340
  • Loading branch information
asddongmen authored Jul 4, 2024
1 parent 04a7d6a commit 695f932
Show file tree
Hide file tree
Showing 10 changed files with 20 additions and 214 deletions.
39 changes: 0 additions & 39 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/factory"
tablesinkmetrics "github.com/pingcap/tiflow/cdc/sink/metrics/tablesink"
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/config"
pconfig "github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
Expand Down Expand Up @@ -330,12 +329,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

// needsStuckCheck reports whether a sink factory is currently installed and
// that factory belongs to the MQ category. The factory is inspected while
// holding the sinkFactory lock.
func (m *SinkManager) needsStuckCheck() bool {
	m.sinkFactory.Lock()
	defer m.sinkFactory.Unlock()

	current := m.sinkFactory.f
	if current == nil {
		// No factory installed: nothing to check.
		return false
	}
	return current.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
Expand Down Expand Up @@ -403,19 +396,6 @@ func (m *SinkManager) clearSinkFactory() {
}
}

// putSinkFactoryError offers err to the sink factory's error channel, but only
// when version equals the factory's current version.
//
// The send is non-blocking: if the channel cannot accept the value right now,
// err is silently dropped. The return value is true whenever the versions
// match (regardless of whether err was actually delivered) and false when the
// version is different.
func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) {
	m.sinkFactory.Lock()
	defer m.sinkFactory.Unlock()

	if version != m.sinkFactory.version {
		// The caller observed a different factory version; ignore its report.
		return false
	}

	select {
	case m.sinkFactory.errors <- err:
	default:
		// Channel not ready to receive; drop the error instead of blocking.
	}
	return true
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, m.sourceManager,
Expand Down Expand Up @@ -1023,25 +1003,6 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
m.sinkMemQuota.Release(span, checkpointTs)
m.redoMemQuota.Release(span, checkpointTs)

advanceTimeoutInSec := util.GetOrZero(m.config.Sink.AdvanceTimeoutInSec)
if advanceTimeoutInSec <= 0 {
advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second

if m.needsStuckCheck() {
isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", sinkVersion))
}
}

var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
if m.redoDMLMgr != nil {
Expand Down
15 changes: 0 additions & 15 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,21 +378,6 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
}
}

// TestSinkManagerNeedsStuckCheck verifies that a manager built by
// CreateManagerWithMemEngine reports that no stuck check is needed.
func TestSinkManagerNeedsStuckCheck(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	errs := make(chan error, 16)
	info := getChangefeedInfo()
	mgr, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), info, errs)
	// Deferred calls run in LIFO order: the context is cancelled first,
	// then the manager is closed.
	defer mgr.Close()
	defer cancel()

	require.False(t, mgr.needsStuckCheck())
}

func TestSinkManagerRestartTableSinks(t *testing.T) {
failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")
Expand Down
19 changes: 0 additions & 19 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,25 +453,6 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound sorter.Position, min
return shouldClean
}

// sinkMaybeStuck reports whether the table sink looks stuck.
//
// It returns (true, version) when both of the following hold:
//  1. the wrapper is associated with a valid sink (its version is non-zero);
//  2. more than stuckCheck has elapsed since tableSink.advanced.
//
// Otherwise it returns (false, 0).
func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) {
	// Called only for its side effect; the result is discarded.
	// NOTE(review): the accompanying test states that getCheckpointTs
	// refreshes tableSink.advanced — confirm against its implementation.
	t.getCheckpointTs()

	t.tableSink.RLock()
	defer t.tableSink.RUnlock()
	t.tableSink.innerMu.Lock()
	defer t.tableSink.innerMu.Unlock()

	sinkVersion := t.tableSink.version
	if sinkVersion == 0 {
		// No valid sink attached yet.
		return false, uint64(0)
	}

	lastAdvanced := t.tableSink.advanced
	if time.Since(lastAdvanced) <= stuckCheck {
		// Progress was recorded recently enough.
		return false, uint64(0)
	}
	return true, sinkVersion
}

func handleRowChangedEvents(
changefeed model.ChangeFeedID, span tablepb.Span,
events ...*model.PolymorphicEvent,
Expand Down
62 changes: 0 additions & 62 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"math"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
Expand All @@ -29,7 +28,6 @@ import (
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

type mockSink struct {
Expand Down Expand Up @@ -315,63 +313,3 @@ func TestTableSinkWrapperSinkVersion(t *testing.T) {
require.Nil(t, wrapper.tableSink.s)
require.Equal(t, wrapper.tableSink.version, uint64(0))
}

// TestTableSinkWrapperSinkInner drives a tableSinkWrapper through a sequence
// of states (sink cleared, sink re-initialised, idle, resolved ts advanced)
// and checks what sinkMaybeStuck reports after each step, using a 100ms
// stuck threshold.
func TestTableSinkWrapperSinkInner(t *testing.T) {
	t.Parallel()

	innerTableSink := tablesink.New[*model.RowChangedEvent](
		model.ChangeFeedID{}, tablepb.Span{}, model.Ts(0),
		newMockSink(), &dmlsink.RowChangeEventAppender{},
		pdutil.NewClock4Test(),
		prometheus.NewCounter(prometheus.CounterOpts{}),
		prometheus.NewHistogram(prometheus.HistogramOpts{}),
	)
	// Counter shared with the factory closure below; starts at zero.
	version := new(uint64)

	wrapper := newTableSinkWrapper(
		model.DefaultChangeFeedID("1"),
		spanz.TableIDToComparableSpan(1),
		// Every call hands out the same inner sink together with the next
		// version number, so the first version produced is 1.
		func() (tablesink.TableSink, uint64) {
			*version += 1
			return innerTableSink, *version
		},
		tablepb.TableStatePrepared,
		oracle.GoTimeToTS(time.Now()),
		oracle.GoTimeToTS(time.Now().Add(10000*time.Second)),
		func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil },
	)

	require.True(t, wrapper.initTableSink())

	wrapper.closeAndClearTableSink()

	// Shouldn't be stuck because version is 0.
	require.Equal(t, wrapper.tableSink.version, uint64(0))
	isStuck, _ := wrapper.sinkMaybeStuck(100 * time.Millisecond)
	require.False(t, isStuck)

	// Shouldn't be stuck because tableSink.advanced is just updated.
	require.True(t, wrapper.initTableSink())
	isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
	require.False(t, isStuck)

	// Shouldn't be stuck because upperbound hasn't been advanced.
	// The sleep deliberately exceeds the 100ms threshold.
	time.Sleep(200 * time.Millisecond)
	isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
	require.False(t, isStuck)

	// Shouldn't be stuck because `getCheckpointTs` will update tableSink.advanced.
	nowTs := oracle.GoTimeToTS(time.Now())
	wrapper.updateReceivedSorterResolvedTs(nowTs)
	wrapper.barrierTs.Store(nowTs)
	isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
	require.False(t, isStuck)

	// Should be stuck: after waiting longer than the threshold, the resolved ts
	// is also pushed through updateResolvedTs, and the sink is then expected to
	// be reported as stuck.
	// NOTE(review): presumably because the checkpoint does not advance in this
	// setup — confirm against getCheckpointTs.
	time.Sleep(200 * time.Millisecond)
	nowTs = oracle.GoTimeToTS(time.Now())
	wrapper.updateReceivedSorterResolvedTs(nowTs)
	wrapper.barrierTs.Store(nowTs)
	wrapper.updateResolvedTs(model.NewResolvedTs(nowTs))
	isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
	require.True(t, isStuck)
}
1 change: 1 addition & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type SinkConfig struct {

// AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been
// advanced for this given duration, the sink will be canceled and re-established.
//
// Deprecated: since v8.1.1.
AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`

// Simple Protocol only config, use to control the behavior of sending bootstrap message.
Expand Down
18 changes: 18 additions & 0 deletions pkg/sink/kafka/sarama_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ package kafka

import (
"context"
"time"

"github.com/IBM/sarama"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/rcrowley/go-metrics"
"go.uber.org/zap"
)

type saramaFactory struct {
Expand All @@ -43,17 +46,32 @@ func NewSaramaFactory(
}

func (f *saramaFactory) AdminClient(ctx context.Context) (ClusterAdminClient, error) {
start := time.Now()
config, err := NewSaramaConfig(ctx, f.option)
duration := time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama config cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, err
}

start = time.Now()
client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
duration = time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama client cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, errors.Trace(err)
}

start = time.Now()
admin, err := sarama.NewClusterAdminFromClient(client)
duration = time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama cluster admin cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, errors.Trace(err)
}
Expand Down

This file was deleted.

29 changes: 0 additions & 29 deletions tests/integration_tests/hang_sink_suicide/conf/diff_config.toml

This file was deleted.

47 changes: 0 additions & 47 deletions tests/integration_tests/hang_sink_suicide/run.sh

This file was deleted.

2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ group_num=${group#G}
# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint syncpoint_check_ts hang_sink_suicide server_config_compatibility changefeed_dup_error_restart"
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint syncpoint_check_ts server_config_compatibility changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2 http_api_tls_with_user_auth cli_tls_with_auth"
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"

Expand Down

0 comments on commit 695f932

Please sign in to comment.