processor: fix a bug that causes processor Tick to get stuck when the downstream is Kafka #11339

Merged · 14 commits · Jul 4, 2024
39 changes: 0 additions & 39 deletions cdc/processor/sinkmanager/manager.go
@@ -33,7 +33,6 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/factory"
tablesinkmetrics "github.com/pingcap/tiflow/cdc/sink/metrics/tablesink"
"github.com/pingcap/tiflow/cdc/sink/tablesink"
"github.com/pingcap/tiflow/pkg/config"
pconfig "github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
@@ -330,12 +329,6 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
}
}

func (m *SinkManager) needsStuckCheck() bool {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
return m.sinkFactory.f != nil && m.sinkFactory.f.Category() == factory.CategoryMQ
}

func (m *SinkManager) initSinkFactory() (chan error, uint64) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
Expand Down Expand Up @@ -403,19 +396,6 @@ func (m *SinkManager) clearSinkFactory() {
}
}

func (m *SinkManager) putSinkFactoryError(err error, version uint64) (success bool) {
m.sinkFactory.Lock()
defer m.sinkFactory.Unlock()
if version == m.sinkFactory.version {
select {
case m.sinkFactory.errors <- err:
default:
}
return true
}
return false
}

func (m *SinkManager) startSinkWorkers(ctx context.Context, eg *errgroup.Group, splitTxn bool) {
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, m.sourceManager,
@@ -1023,25 +1003,6 @@ func (m *SinkManager) GetTableStats(span tablepb.Span) TableStats {
m.sinkMemQuota.Release(span, checkpointTs)
m.redoMemQuota.Release(span, checkpointTs)

advanceTimeoutInSec := util.GetOrZero(m.config.Sink.AdvanceTimeoutInSec)
if advanceTimeoutInSec <= 0 {
advanceTimeoutInSec = config.DefaultAdvanceTimeoutInSec
}
stuckCheck := time.Duration(advanceTimeoutInSec) * time.Second

if m.needsStuckCheck() {
isStuck, sinkVersion := tableSink.sinkMaybeStuck(stuckCheck)
if isStuck && m.putSinkFactoryError(errors.New("table sink stuck"), sinkVersion) {
log.Warn("Table checkpoint is stuck too long, will restart the sink backend",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("checkpointTs", checkpointTs),
zap.Float64("stuckCheck", stuckCheck.Seconds()),
zap.Uint64("factoryVersion", sinkVersion))
}
}

var resolvedTs model.Ts
// If redo log is enabled, we have to use redo log's resolved ts to calculate processor's min resolved ts.
if m.redoDMLMgr != nil {
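For orientation, the block below is a condensed, self-contained sketch of the manager-side pieces removed from manager.go above: `needsStuckCheck` gated the check to MQ (Kafka) sinks, and `putSinkFactoryError` queued a restart only when the reporter still held the current factory version. The types and the `sinkFactoryState` name are simplified stand-ins, not the real `SinkManager`.

```go
// Condensed sketch of the deleted manager-side stuck-check helpers.
// sinkFactoryState is a stand-in for SinkManager's sinkFactory field.
package main

import (
	"errors"
	"fmt"
)

type sinkFactoryState struct {
	version uint64
	errors  chan error
	isMQ    bool // true when the factory category is CategoryMQ (Kafka)
}

// needsStuckCheck mirrors the deleted helper: only MQ sinks were checked.
func needsStuckCheck(f *sinkFactoryState) bool {
	return f != nil && f.isMQ
}

// putSinkFactoryError mirrors the deleted helper: the error is delivered only
// if the reporter still refers to the current factory version, and it is
// dropped rather than blocked on when the error channel is full.
func putSinkFactoryError(f *sinkFactoryState, err error, version uint64) bool {
	if version == f.version {
		select {
		case f.errors <- err:
		default:
		}
		return true
	}
	return false
}

func main() {
	f := &sinkFactoryState{version: 1, errors: make(chan error, 1), isMQ: true}
	if needsStuckCheck(f) && putSinkFactoryError(f, errors.New("table sink stuck"), 1) {
		fmt.Println("queued a restart for the sink backend")
	}
	// A stale version is ignored, so a restart already in progress is not re-triggered.
	fmt.Println(putSinkFactoryError(f, errors.New("table sink stuck"), 0)) // false
}
```

The PR removes this restart path entirely rather than tuning it, which is why the stuck-check helpers and their tests disappear in the hunks that follow.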
15 changes: 0 additions & 15 deletions cdc/processor/sinkmanager/manager_test.go
@@ -378,21 +378,6 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
}
}

func TestSinkManagerNeedsStuckCheck(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
errCh := make(chan error, 16)
changefeedInfo := getChangefeedInfo()
manager, _, _ := CreateManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, errCh)
defer func() {
cancel()
manager.Close()
}()

require.False(t, manager.needsStuckCheck())
}

func TestSinkManagerRestartTableSinks(t *testing.T) {
failpoint.Enable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause", "return")
defer failpoint.Disable("github.com/pingcap/tiflow/cdc/processor/sinkmanager/SinkWorkerTaskHandlePause")
19 changes: 0 additions & 19 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -453,25 +453,6 @@ func (t *tableSinkWrapper) cleanRangeEventCounts(upperBound sorter.Position, min
return shouldClean
}

func (t *tableSinkWrapper) sinkMaybeStuck(stuckCheck time.Duration) (bool, uint64) {
t.getCheckpointTs()

t.tableSink.RLock()
defer t.tableSink.RUnlock()
t.tableSink.innerMu.Lock()
defer t.tableSink.innerMu.Unlock()

// What these conditions mean:
// 1. the table sink has been associated with a valid sink;
// 2. its checkpoint hasn't been advanced for a while;
version := t.tableSink.version
advanced := t.tableSink.advanced
if version > 0 && time.Since(advanced) > stuckCheck {
return true, version
}
return false, uint64(0)
}

func handleRowChangedEvents(
changefeed model.ChangeFeedID, span tablepb.Span,
events ...*model.PolymorphicEvent,
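The deleted sinkMaybeStuck above boils down to the two conditions spelled out in its comment: a valid sink is attached (version > 0) and the checkpoint has not advanced within the stuck-check window. Below is a toy sketch of just that predicate, with simplified fields in place of the real `tableSinkWrapper` and its locks.

```go
// Toy sketch of the deleted stuck predicate; not the real tableSinkWrapper.
package main

import (
	"fmt"
	"time"
)

// toySink carries only the two fields the deleted check relied on.
type toySink struct {
	version  uint64    // 0 means no sink is attached yet
	advanced time.Time // last time the checkpoint advanced
}

// maybeStuck mirrors the deleted predicate: stuck only if a real sink exists
// and its checkpoint has not advanced within stuckCheck.
func maybeStuck(s toySink, stuckCheck time.Duration) (bool, uint64) {
	if s.version > 0 && time.Since(s.advanced) > stuckCheck {
		return true, s.version
	}
	return false, 0
}

func main() {
	fresh := toySink{version: 1, advanced: time.Now()}
	stale := toySink{version: 1, advanced: time.Now().Add(-5 * time.Minute)}
	detached := toySink{version: 0, advanced: time.Now().Add(-time.Hour)}

	fmt.Println(maybeStuck(fresh, 150*time.Second))    // false: recently advanced
	fmt.Println(maybeStuck(stale, 150*time.Second))    // true: past the window
	fmt.Println(maybeStuck(detached, 150*time.Second)) // false: no sink attached
}
```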
62 changes: 0 additions & 62 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
@@ -18,7 +18,6 @@ import (
"math"
"sync"
"testing"
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -29,7 +28,6 @@ import (
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

type mockSink struct {
@@ -315,63 +313,3 @@ func TestTableSinkWrapperSinkVersion(t *testing.T) {
require.Nil(t, wrapper.tableSink.s)
require.Equal(t, wrapper.tableSink.version, uint64(0))
}

func TestTableSinkWrapperSinkInner(t *testing.T) {
t.Parallel()

innerTableSink := tablesink.New[*model.RowChangedEvent](
model.ChangeFeedID{}, tablepb.Span{}, model.Ts(0),
newMockSink(), &dmlsink.RowChangeEventAppender{},
pdutil.NewClock4Test(),
prometheus.NewCounter(prometheus.CounterOpts{}),
prometheus.NewHistogram(prometheus.HistogramOpts{}),
)
version := new(uint64)

wrapper := newTableSinkWrapper(
model.DefaultChangeFeedID("1"),
spanz.TableIDToComparableSpan(1),
func() (tablesink.TableSink, uint64) {
*version += 1
return innerTableSink, *version
},
tablepb.TableStatePrepared,
oracle.GoTimeToTS(time.Now()),
oracle.GoTimeToTS(time.Now().Add(10000*time.Second)),
func(_ context.Context) (model.Ts, error) { return math.MaxUint64, nil },
)

require.True(t, wrapper.initTableSink())

wrapper.closeAndClearTableSink()

// Shouldn't be stuck because version is 0.
require.Equal(t, wrapper.tableSink.version, uint64(0))
isStuck, _ := wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because tableSink.advanced is just updated.
require.True(t, wrapper.initTableSink())
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because upperbound hasn't been advanced.
time.Sleep(200 * time.Millisecond)
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

// Shouldn't be stuck because `getCheckpointTs` will update tableSink.advanced.
nowTs := oracle.GoTimeToTS(time.Now())
wrapper.updateReceivedSorterResolvedTs(nowTs)
wrapper.barrierTs.Store(nowTs)
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.False(t, isStuck)

time.Sleep(200 * time.Millisecond)
nowTs = oracle.GoTimeToTS(time.Now())
wrapper.updateReceivedSorterResolvedTs(nowTs)
wrapper.barrierTs.Store(nowTs)
wrapper.updateResolvedTs(model.NewResolvedTs(nowTs))
isStuck, _ = wrapper.sinkMaybeStuck(100 * time.Millisecond)
require.True(t, isStuck)
}
1 change: 1 addition & 0 deletions pkg/config/sink.go
@@ -178,6 +178,7 @@ type SinkConfig struct {

// AdvanceTimeoutInSec is a duration in second. If a table sink progress hasn't been
// advanced for this given duration, the sink will be canceled and re-established.
// Deprecated from v8.1.1.
AdvanceTimeoutInSec *uint `toml:"advance-timeout-in-sec" json:"advance-timeout-in-sec,omitempty"`

// Simple Protocol only config, use to control the behavior of sending bootstrap message.
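For context on the now-deprecated advance-timeout-in-sec option, here is a minimal sketch of the fallback pattern the deleted GetTableStats code applied to it: a nil or zero value falls back to a default before being turned into a duration. The helper and the 150-second default below are illustrative assumptions, not the real pkg/config constant.

```go
// Minimal sketch of the deleted util.GetOrZero-plus-default fallback.
package main

import (
	"fmt"
	"time"
)

// Illustrative placeholder; the real default lives in pkg/config.
const defaultAdvanceTimeoutInSec uint = 150

// advanceTimeout mirrors the removed fallback: a nil or zero config value
// falls back to the default before being converted to a time.Duration.
func advanceTimeout(cfg *uint) time.Duration {
	secs := uint(0)
	if cfg != nil {
		secs = *cfg
	}
	if secs == 0 {
		secs = defaultAdvanceTimeoutInSec
	}
	return time.Duration(secs) * time.Second
}

func main() {
	fmt.Println(advanceTimeout(nil)) // falls back to the default, 2m30s here
	custom := uint(30)
	fmt.Println(advanceTimeout(&custom)) // 30s
}
```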
18 changes: 18 additions & 0 deletions pkg/sink/kafka/sarama_factory.go
@@ -15,12 +15,15 @@

import (
"context"
"time"

"github.com/IBM/sarama"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/rcrowley/go-metrics"
"go.uber.org/zap"
)

type saramaFactory struct {
@@ -43,17 +46,32 @@
}

func (f *saramaFactory) AdminClient(ctx context.Context) (ClusterAdminClient, error) {
start := time.Now()
config, err := NewSaramaConfig(ctx, f.option)
duration := time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama config cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, err
}

start = time.Now()
client, err := sarama.NewClient(f.option.BrokerEndpoints, config)
duration = time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama client cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, errors.Trace(err)
}

start = time.Now()
admin, err := sarama.NewClusterAdminFromClient(client)
duration = time.Since(start).Seconds()
if duration > 2 {
log.Warn("new sarama cluster admin cost too much time", zap.Any("duration", duration), zap.Stringer("changefeedID", f.changefeedID))
}
if err != nil {
return nil, errors.Trace(err)
}
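The additions above repeat one pattern three times: time a sarama setup call and warn when it exceeds two seconds. Below is a minimal sketch of that pattern factored into a helper; `timeSlowStep` is hypothetical and only illustrates the idea, with the two-second threshold taken from the hard-coded value in the diff.

```go
// Sketch of the time-and-warn pattern added in AdminClient, factored into a
// hypothetical helper for illustration.
package main

import (
	"fmt"
	"time"
)

// timeSlowStep runs fn and reports when it takes longer than threshold,
// echoing the "cost too much time" warnings added in the diff.
func timeSlowStep(name string, threshold time.Duration, fn func() error) error {
	start := time.Now()
	err := fn()
	if d := time.Since(start); d > threshold {
		fmt.Printf("%s cost too much time: %.2fs\n", name, d.Seconds())
	}
	return err
}

func main() {
	// Fast step: no warning is printed.
	_ = timeSlowStep("new sarama config", 2*time.Second, func() error {
		time.Sleep(10 * time.Millisecond) // stand-in for NewSaramaConfig
		return nil
	})
	// Slow step: triggers the warning, mimicking a sluggish Kafka connection.
	_ = timeSlowStep("new sarama client", 2*time.Second, func() error {
		time.Sleep(2100 * time.Millisecond) // stand-in for sarama.NewClient
		return nil
	})
}
```

In the PR itself the timing is inlined around NewSaramaConfig, sarama.NewClient, and sarama.NewClusterAdminFromClient rather than wrapped in a helper.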

This file was deleted.

29 changes: 0 additions & 29 deletions tests/integration_tests/hang_sink_suicide/conf/diff_config.toml

This file was deleted.

47 changes: 0 additions & 47 deletions tests/integration_tests/hang_sink_suicide/run.sh

This file was deleted.

2 changes: 1 addition & 1 deletion tests/integration_tests/run_group.sh
@@ -11,7 +11,7 @@ group_num=${group#G}
# Other tests that only support mysql: batch_update_to_no_batch ddl_reentrant
# changefeed_fast_fail changefeed_resume_with_checkpoint_ts sequence
# multi_cdc_cluster capture_suicide_while_balance_table
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint syncpoint_check_ts hang_sink_suicide server_config_compatibility changefeed_dup_error_restart"
mysql_only="bdr_mode capture_suicide_while_balance_table syncpoint syncpoint_check_ts server_config_compatibility changefeed_dup_error_restart"
mysql_only_http="http_api http_api_tls api_v2 http_api_tls_with_user_auth cli_tls_with_auth"
mysql_only_consistent_replicate="consistent_replicate_ddl consistent_replicate_gbk consistent_replicate_nfs consistent_replicate_storage_file consistent_replicate_storage_file_large_value consistent_replicate_storage_s3 consistent_partition_table"
