Conversation

@XuPeng-SH
Contributor

@XuPeng-SH XuPeng-SH commented Nov 12, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #22718

What this PR does / why we need it:

cdc watermark updater main


PR Type

Bug fix, Enhancement


Description

  • Add watermark stall detection to CDC table streams with configurable thresholds

  • Track snapshot progress and emit warnings when watermark fails to advance

  • Implement retryable error handling when stall threshold exceeded

  • Add new metrics for monitoring snapshot stalls and table activity

  • Fix watermark updater to handle missing watermarks gracefully


Diagram Walkthrough

flowchart LR
  A["TableChangeStream"] -->|"detects no progress"| B["handleSnapshotNoProgress"]
  B -->|"increments counter"| C["CdcTableNoProgressCounter"]
  B -->|"sets gauge"| D["CdcTableStuckGauge"]
  B -->|"exceeds threshold"| E["retryable error"]
  F["onWatermarkAdvanced"] -->|"resets state"| G["resetWatermarkStallState"]
  G -->|"clears metrics"| D
  H["WatermarkUpdater"] -->|"handles missing WM"| I["graceful fallback"]

File Walkthrough

Relevant files
Enhancement
table_change_stream.go
Implement watermark stall detection framework                       

pkg/cdc/table_change_stream.go

  • Add watermark stall detection fields to TableChangeStream struct
  • Implement TableChangeStreamOption pattern with configurable thresholds
    for stall detection and warning intervals
  • Add handleSnapshotNoProgress() to detect and report snapshot timestamp
    stalls with throttled warnings
  • Add resetWatermarkStallState() to clear stall tracking when progress
    resumes
  • Add onWatermarkAdvanced() callback to update metrics and reset stall
    state on successful watermark advancement
  • Initialize stall detection metrics in stream constructor
  • Integrate stall detection into processWithTxn() workflow
+159/-27
cdc_metrics.go
Add snapshot stall counter metric                                               

pkg/util/metric/v2/cdc_metrics.go

  • Add CdcTableNoProgressCounter metric to track snapshot stall
    occurrences per table
  • Register new counter metric in initCDCMetrics()
+10/-0   
Tests
table_change_stream_test.go
Add comprehensive stall detection tests                                   

pkg/cdc/table_change_stream_test.go

  • Add test helpers readGaugeValue() and readCounterValue() for metric
    assertions
  • Add TestTableChangeStream_HandleSnapshotNoProgress_WarningAndReset to
    verify warning emission and metric reset
  • Add TestTableChangeStream_HandleSnapshotNoProgress_ThresholdExceeded
    to verify error on stall threshold breach
  • Add TestTableChangeStream_HandleSnapshotNoProgress_WarningThrottle to
    verify warning throttling behavior
  • Add TestTableChangeStream_HandleSnapshotNoProgress_Defaults to verify
    default configuration values
  • Update createTestStream() helper to accept optional configuration
    parameters
+153/-1 
watermark_updater_test.go
Update watermark updater tests                                                     

pkg/cdc/watermark_updater_test.go

  • Update TestCDCWatermarkUpdater_UpdateWatermarkErrMsg to expect success
    instead of error
  • Add TestCDCWatermarkUpdater_RemoveThenUpdateErrMsg to verify graceful
    handling after watermark removal
+24/-2   
Bug fix
watermark_updater.go
Fix watermark updater error handling                                         

pkg/cdc/watermark_updater.go

  • Fix onJobs() to gracefully handle ErrNoWatermarkFound in
    JT_CDC_UpdateWMErrMsg case
  • Initialize missing watermark entries in readKeysBuffer instead of
    failing
  • Add cache population logic in execReadWM() to persist successfully
    read watermarks
  • Import errors package for error type checking
+13/-2   
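
The graceful fallback described above might look roughly like the sketch below. It assumes a simplified `updater` with string keys and an in-memory cache. Only the names `ErrNoWatermarkFound`, `WatermarkResult`, and `readKeysBuffer` come from the PR; `updater`, `getFromCache`, `queueErrMsg`, and `pendingErrMsgs` are hypothetical stand-ins.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrNoWatermarkFound mirrors the sentinel named in the PR; the value here
// is defined only for this sketch.
var ErrNoWatermarkFound = errors.New("no watermark found")

// WatermarkResult is a placeholder for the real result type.
type WatermarkResult struct{}

// updater is a simplified, single-goroutine stand-in for the watermark updater.
type updater struct {
	cache          map[string]WatermarkResult
	readKeysBuffer map[string]WatermarkResult
	pendingErrMsgs []string
}

func newUpdater() *updater {
	return &updater{
		cache:          make(map[string]WatermarkResult),
		readKeysBuffer: make(map[string]WatermarkResult),
	}
}

// getFromCache returns a wrapped ErrNoWatermarkFound when the key is absent.
func (u *updater) getFromCache(key string) (WatermarkResult, error) {
	if r, ok := u.cache[key]; ok {
		return r, nil
	}
	return WatermarkResult{}, fmt.Errorf("lookup %q: %w", key, ErrNoWatermarkFound)
}

// queueErrMsg treats a missing watermark as recoverable: it seeds an empty
// entry in readKeysBuffer and still queues the update. Any other lookup
// error is returned to the caller unchanged.
func (u *updater) queueErrMsg(key string) error {
	if _, err := u.getFromCache(key); err != nil {
		if !errors.Is(err, ErrNoWatermarkFound) {
			return err
		}
		if _, exists := u.readKeysBuffer[key]; !exists {
			u.readKeysBuffer[key] = WatermarkResult{}
		}
	}
	u.pendingErrMsgs = append(u.pendingErrMsgs, key)
	return nil
}

func main() {
	u := newUpdater()
	err := u.queueErrMsg("db1.t1")
	_, seeded := u.readKeysBuffer["db1.t1"]
	fmt.Println("err:", err, "seeded:", seeded, "pending:", len(u.pendingErrMsgs))
}
```

Using `errors.Is` rather than `==` keeps the check valid even when the sentinel is wrapped with extra context, as `getFromCache` does here.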
Documentation
CDC_USER_GUIDE.md
Document snapshot stall detection feature                               

pkg/cdc/CDC_USER_GUIDE.md

  • Add "Detect Snapshot Stalls" section documenting stall detection
    behavior and metrics
  • Document default thresholds (1 minute stall, 10 second warning
    interval)
  • Provide PromQL query examples for monitoring stalls
  • Add mo_cdc_table_snapshot_no_progress_total to metrics reference table
  • Add example queries for stuck tables and activity timestamps
  • Add snapshot stall detection to alerting recommendations
+35/-0   

@qodo-merge-pro

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified: no security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Action logging: New critical behaviors around snapshot stall detection and retryable errors add warnings
and metrics but do not clearly log the final error/decision context when the stall
threshold is exceeded, which may hinder auditability of who/what triggered retries and
why.

Referred Code
if s.lastNoProgressWarning.IsZero() ||
	now.Sub(s.lastNoProgressWarning) >= s.noProgressWarningInterval {
	logutil.Warn(
		"cdc.table_stream.snapshot_not_advanced",
		zap.String("table", s.tableInfo.String()),
		zap.String("from-ts", fromTs.ToString()),
		zap.String("snapshot-ts", snapshotTs.ToString()),
		zap.Duration("stall-duration", stalledFor),
		zap.Duration("threshold", s.watermarkStallThreshold),
	)
	s.lastNoProgressWarning = now
}

if stalledFor >= s.watermarkStallThreshold {
	s.retryable = true
	return moerr.NewInternalErrorf(
		ctx,
		"CDC tableChangeStream %s snapshot timestamp stuck for %v (threshold %v)",
		s.tableInfo.String(),
		stalledFor,
		s.watermarkStallThreshold,


 ... (clipped 2 lines)

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status:
Error detail exposure: Warning and error messages include table identifiers via s.tableInfo.String(), which could
expose internal resource names if surfaced to end users rather than internal logs.

Referred Code
logutil.Warn(
	"cdc.table_stream.snapshot_not_advanced",
	zap.String("table", s.tableInfo.String()),
	zap.String("from-ts", fromTs.ToString()),
	zap.String("snapshot-ts", snapshotTs.ToString()),
	zap.Duration("stall-duration", stalledFor),
	zap.Duration("threshold", s.watermarkStallThreshold),
)
s.lastNoProgressWarning = now

Compliance status legend
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@mergify mergify bot added the kind/bug Something isn't working label Nov 12, 2025
@qodo-merge-pro

PR Code Suggestions ✨

Explore these optional code suggestions:

Category    Suggestion    Impact
Possible issue
Fix incorrect metric update logic

Remove the incorrect update to the mo_cdc_table_last_activity_timestamp metric
within the handleSnapshotNoProgress function to ensure it only reflects the time
of successful progress.

pkg/cdc/table_change_stream.go [794-804]

 func (s *TableChangeStream) handleSnapshotNoProgress(ctx context.Context, fromTs, snapshotTs types.TS) error {
 	now := time.Now()
 	tableLabel := s.progressTracker.tableKey()
 
 	s.progressTracker.RecordRetry()
 	v2.CdcTableNoProgressCounter.WithLabelValues(tableLabel).Inc()
-	v2.CdcTableLastActivityTimestamp.WithLabelValues(tableLabel).Set(float64(now.Unix()))
 
 	if s.noProgressSince.IsZero() {
 		s.noProgressSince = now
 	}
 ...

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a logical flaw where a metric for last successful activity is updated upon failure, which would make monitoring and alerting on this metric unreliable.

Medium
Prevent data race on shared map

Prevent a potential data race by adding a lock around the modification of
u.readKeysBuffer when handling a JT_CDC_UpdateWMErrMsg job.

pkg/cdc/watermark_updater.go [423-442]

 		case JT_CDC_UpdateWMErrMsg:
 			if _, err := u.GetFromCache(context.Background(), job.Key); err != nil {
 				if !errors.Is(err, ErrNoWatermarkFound) {
 					job.DoneWithErr(err)
 					continue
 				}
+				u.Lock()
 				if _, exists := u.readKeysBuffer[*job.Key]; !exists {
 					u.readKeysBuffer[*job.Key] = WatermarkResult{}
 				}
+				u.Unlock()
 			}
 			u.committingErrMsgBuffer = append(u.committingErrMsgBuffer, job)

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 8


Why: The suggestion correctly identifies a potential data race on the readKeysBuffer map due to concurrent access without a lock, which could lead to unpredictable behavior or crashes.

Medium


Labels

kind/bug: Something isn't working
Review effort 3/5
size/M: Denotes a PR that changes [100,499] lines


3 participants