Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions pkg/cdc/CDC_USER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,28 @@ where t.state = 'running'
order by lag_minutes desc;
```

#### Detect Snapshot Stalls

Each table stream tracks whether recent polling rounds failed to advance the snapshot watermark. When this happens, CDC:

- Increments `mo_cdc_table_snapshot_no_progress_total` for the affected table on every stalled round.
- Sets `mo_cdc_table_stuck` to `1` and records the time in `mo_cdc_table_last_activity_timestamp`.
- Emits throttled warning logs; if the stall exceeds the internal threshold (default 1 minute) the table stream raises a retryable error so the scheduler can retry later.
- Resets counters and gauges automatically once progress resumes.

```promql
# Tables currently flagged as stalled
mo_cdc_table_stuck == 1

# Minutes since last successful progress
(time() - mo_cdc_table_last_activity_timestamp) / 60

# Rounds without progress in the last 10 minutes
increase(mo_cdc_table_snapshot_no_progress_total[10m]) > 0
```

> **Note**: The stall threshold and warning interval are configurable (defaults: 1 minute stall threshold, 10 second warning throttle). Adjust your alerting thresholds if you override these values.

### Common Error Messages

| Error Message | Cause | Resolution |
Expand Down Expand Up @@ -785,6 +807,7 @@ rate(mo_cdc_task_state_change_total[5m])
| `mo_cdc_watermark_cache_size` | Gauge | `tier` | Number of watermarks in each cache tier |
| `mo_cdc_watermark_update_total` | Counter | `table`, `update_type` | Count of watermark updates |
| `mo_cdc_watermark_commit_duration_seconds` | Histogram | - | Duration of watermark commits to database |
| `mo_cdc_table_snapshot_no_progress_total` | Counter | `table` | Count of processing rounds where snapshot timestamps failed to advance (stall detection) |

**Example Queries**:
```promql
Expand All @@ -808,6 +831,9 @@ mo_cdc_watermark_cache_size{tier="committed"}
# Watermark commit latency (P99)
histogram_quantile(0.99, mo_cdc_watermark_commit_duration_seconds_bucket)

# Snapshot stalls detected in the last 10 minutes
increase(mo_cdc_table_snapshot_no_progress_total[10m]) > 0

# ⚠️ Planned feature: Watermark Lag Ratio (frequency-agnostic)
# mo_cdc_watermark_lag_ratio # <2: normal, 2-5: warning, >5: critical
```
Expand Down Expand Up @@ -860,6 +886,8 @@ avg(mo_cdc_batch_size_rows)
| `mo_cdc_table_stream_total` | Gauge | `state` | Number of active table streams |
| `mo_cdc_table_stream_round_total` | Counter | `table`, `status` | Count of processing rounds |
| `mo_cdc_table_stream_round_duration_seconds` | Histogram | `table` | Duration of processing rounds |
| `mo_cdc_table_stuck` | Gauge | `table` | Whether a table stream is currently flagged as stalled (1 = stuck, 0 = healthy) |
| `mo_cdc_table_last_activity_timestamp` | Gauge | `table` | Unix timestamp when the table stream last made forward progress |

**Example Queries**:
```promql
Expand All @@ -873,6 +901,12 @@ rate(mo_cdc_table_stream_round_total[5m])

# Processing duration P99
histogram_quantile(0.99, mo_cdc_table_stream_round_duration_seconds_bucket)

# Tables currently marked as stuck
mo_cdc_table_stuck == 1

# Minutes since last successful progress
(time() - mo_cdc_table_last_activity_timestamp) / 60
```

#### Sinker Metrics
Expand Down Expand Up @@ -1227,6 +1261,7 @@ Configure alerts for critical conditions:
- ✅ Watermark stuck (`mo_cdc_watermark_lag_seconds > 300`)
- ✅ No heartbeat (`rate(mo_cdc_heartbeat_total[5m]) == 0`)
- ✅ High error rate (`rate(mo_cdc_task_error_total[5m]) > 0.01`)
- ✅ Snapshot stall detection (`increase(mo_cdc_table_snapshot_no_progress_total[10m]) > 0` or `mo_cdc_table_stuck == 1`)

#### 2. Monitor Watermark Lag

Expand Down
186 changes: 159 additions & 27 deletions pkg/cdc/table_change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,54 @@ type TableChangeStream struct {

// Observability
progressTracker *ProgressTracker

// Watermark stall detection
watermarkStallThreshold time.Duration
noProgressWarningInterval time.Duration
lastWatermarkAdvance time.Time
noProgressSince time.Time
lastNoProgressWarning time.Time
}

type TableChangeStreamOption func(*tableChangeStreamOptions)

type tableChangeStreamOptions struct {
watermarkStallThreshold time.Duration
noProgressWarningInterval time.Duration
}

const (
defaultWatermarkStallThreshold = time.Minute
defaultNoProgressWarningInterval = 10 * time.Second
)

func defaultTableChangeStreamOptions() *tableChangeStreamOptions {
opts := &tableChangeStreamOptions{}
opts.fillDefaults()
return opts
}

func (opts *tableChangeStreamOptions) fillDefaults() {
if opts.watermarkStallThreshold <= 0 {
opts.watermarkStallThreshold = defaultWatermarkStallThreshold
}
if opts.noProgressWarningInterval <= 0 {
opts.noProgressWarningInterval = defaultNoProgressWarningInterval
}
}

// WithWatermarkStallThreshold configures how long snapshot stagnation is tolerated before surfacing an error.
func WithWatermarkStallThreshold(threshold time.Duration) TableChangeStreamOption {
return func(opts *tableChangeStreamOptions) {
opts.watermarkStallThreshold = threshold
}
}

// WithNoProgressWarningInterval configures how frequently warnings are emitted when snapshot timestamps stall.
func WithNoProgressWarningInterval(interval time.Duration) TableChangeStreamOption {
return func(opts *tableChangeStreamOptions) {
opts.noProgressWarningInterval = interval
}
}

// NewTableChangeStream creates a new table change stream
Expand All @@ -111,12 +159,21 @@ var NewTableChangeStream = func(
startTs, endTs types.TS,
noFull bool,
frequency time.Duration,
options ...TableChangeStreamOption,
) *TableChangeStream {
// Parse frequency
if frequency <= 0 {
frequency = DefaultFrequency
}

opts := defaultTableChangeStreamOptions()
for _, opt := range options {
if opt != nil {
opt(opts)
}
}
opts.fillDefaults()

// Create watermark key
watermarkKey := &WatermarkKey{
AccountId: accountId,
Expand Down Expand Up @@ -179,33 +236,40 @@ var NewTableChangeStream = func(
}

stream := &TableChangeStream{
txnManager: txnManager,
dataProcessor: dataProcessor,
cnTxnClient: cnTxnClient,
cnEngine: cnEngine,
mp: mp,
packerPool: packerPool,
accountId: accountId,
taskId: taskId,
tableInfo: tableInfo,
tableDef: tableDef,
sinker: sinker,
watermarkUpdater: watermarkUpdater,
watermarkKey: watermarkKey,
tick: time.NewTicker(frequency),
frequency: frequency,
runningReaders: runningReaders,
runningReaderKey: GenDbTblKey(tableInfo.SourceDbName, tableInfo.SourceTblName),
initSnapshotSplitTxn: initSnapshotSplitTxn,
startTs: startTs,
endTs: endTs,
noFull: noFull,
insTsColIdx: insTsColIdx,
insCompositedPkColIdx: insCompositedPkColIdx,
delTsColIdx: delTsColIdx,
delCompositedPkColIdx: delCompositedPkColIdx,
progressTracker: progressTracker,
}
txnManager: txnManager,
dataProcessor: dataProcessor,
cnTxnClient: cnTxnClient,
cnEngine: cnEngine,
mp: mp,
packerPool: packerPool,
accountId: accountId,
taskId: taskId,
tableInfo: tableInfo,
tableDef: tableDef,
sinker: sinker,
watermarkUpdater: watermarkUpdater,
watermarkKey: watermarkKey,
tick: time.NewTicker(frequency),
frequency: frequency,
runningReaders: runningReaders,
runningReaderKey: GenDbTblKey(tableInfo.SourceDbName, tableInfo.SourceTblName),
initSnapshotSplitTxn: initSnapshotSplitTxn,
startTs: startTs,
endTs: endTs,
noFull: noFull,
insTsColIdx: insTsColIdx,
insCompositedPkColIdx: insCompositedPkColIdx,
delTsColIdx: delTsColIdx,
delCompositedPkColIdx: delCompositedPkColIdx,
progressTracker: progressTracker,
watermarkStallThreshold: opts.watermarkStallThreshold,
noProgressWarningInterval: opts.noProgressWarningInterval,
lastWatermarkAdvance: time.Now(),
}

tableLabel := progressTracker.tableKey()
v2.CdcTableStuckGauge.WithLabelValues(tableLabel).Set(0)
v2.CdcTableLastActivityTimestamp.WithLabelValues(tableLabel).Set(float64(time.Now().Unix()))

stream.start.Add(1)
return stream
Expand Down Expand Up @@ -567,6 +631,12 @@ func (s *TableChangeStream) processWithTxn(
toTs = s.endTs
}

if !toTs.GT(&fromTs) {
return s.handleSnapshotNoProgress(ctx, fromTs, toTs)
}

s.resetWatermarkStallState()

// Start tracking this round
s.progressTracker.StartRound(fromTs, toTs)
s.progressTracker.SetTargetWatermark(toTs)
Expand Down Expand Up @@ -697,6 +767,7 @@ func (s *TableChangeStream) processWithTxn(
s.progressTracker.EndRound(true, nil)
s.progressTracker.UpdateWatermark(toTs)
s.progressTracker.RecordTransaction()
s.onWatermarkAdvanced()

logutil.Debug(
"cdc.table_stream.round_complete",
Expand All @@ -720,6 +791,67 @@ func (s *TableChangeStream) processWithTxn(
}
}

func (s *TableChangeStream) handleSnapshotNoProgress(ctx context.Context, fromTs, snapshotTs types.TS) error {
now := time.Now()
tableLabel := s.progressTracker.tableKey()

s.progressTracker.RecordRetry()
v2.CdcTableNoProgressCounter.WithLabelValues(tableLabel).Inc()
v2.CdcTableLastActivityTimestamp.WithLabelValues(tableLabel).Set(float64(now.Unix()))

if s.noProgressSince.IsZero() {
s.noProgressSince = now
}

stalledFor := now.Sub(s.noProgressSince)
v2.CdcTableStuckGauge.WithLabelValues(tableLabel).Set(1)

if s.lastNoProgressWarning.IsZero() ||
now.Sub(s.lastNoProgressWarning) >= s.noProgressWarningInterval {
logutil.Warn(
"cdc.table_stream.snapshot_not_advanced",
zap.String("table", s.tableInfo.String()),
zap.String("from-ts", fromTs.ToString()),
zap.String("snapshot-ts", snapshotTs.ToString()),
zap.Duration("stall-duration", stalledFor),
zap.Duration("threshold", s.watermarkStallThreshold),
)
s.lastNoProgressWarning = now
}

if stalledFor >= s.watermarkStallThreshold {
s.retryable = true
return moerr.NewInternalErrorf(
ctx,
"CDC tableChangeStream %s snapshot timestamp stuck for %v (threshold %v)",
s.tableInfo.String(),
stalledFor,
s.watermarkStallThreshold,
)
}

return nil
}

func (s *TableChangeStream) resetWatermarkStallState() {
if s.noProgressSince.IsZero() && s.lastNoProgressWarning.IsZero() {
return
}

s.noProgressSince = time.Time{}
s.lastNoProgressWarning = time.Time{}
v2.CdcTableStuckGauge.WithLabelValues(s.progressTracker.tableKey()).Set(0)
}

func (s *TableChangeStream) onWatermarkAdvanced() {
now := time.Now()
s.lastWatermarkAdvance = now
s.resetWatermarkStallState()
v2.CdcTableLastActivityTimestamp.WithLabelValues(
s.progressTracker.tableKey(),
).Set(float64(now.Unix()))
}

// handleStaleRead handles StaleRead error by resetting watermark
func (s *TableChangeStream) handleStaleRead(ctx context.Context, txnOp client.TxnOperator) error {
// If startTs is set and noFull is false, StaleRead is fatal
Expand Down
Loading
Loading