@XuPeng-SH XuPeng-SH commented Nov 12, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #22718

What this PR does / why we need it:

Fixes error-message handling in the CDC watermark updater and adds watermark stall detection to table change streams.


PR Type

Bug fix, Enhancement


Description

  • Add watermark stall detection with configurable thresholds

  • Track snapshot progress and emit warnings when watermark stops advancing

  • Implement automatic error recovery when stall threshold exceeded

  • Add metrics for monitoring table stream health and stall detection


Diagram Walkthrough

flowchart LR
  A["TableChangeStream"] -->|"detects no progress"| B["handleSnapshotNoProgress"]
  B -->|"stall < threshold"| C["emit warning logs"]
  B -->|"stall >= threshold"| D["return retryable error"]
  B -->|"update metrics"| E["CdcTableStuckGauge<br/>CdcTableNoProgressCounter"]
  F["onWatermarkAdvanced"] -->|"reset state"| G["resetWatermarkStallState"]
  G -->|"clear metrics"| E

File Walkthrough

Relevant files
Enhancement
table_change_stream.go
Add watermark stall detection and monitoring                         

pkg/cdc/table_change_stream.go

  • Add watermark stall detection fields to TableChangeStream struct
  • Implement TableChangeStreamOption pattern with configurable thresholds
  • Add handleSnapshotNoProgress() to detect and handle stalled snapshots
  • Add resetWatermarkStallState() and onWatermarkAdvanced() lifecycle
    methods
  • Initialize metrics on stream creation and update on watermark events
  • Check for snapshot progress in processWithTxn() before processing
+159/-27
cdc_metrics.go
Add snapshot stall counter metric                                               

pkg/util/metric/v2/cdc_metrics.go

  • Add CdcTableNoProgressCounter metric to track snapshot stall
    occurrences
  • Register new counter in initCDCMetrics() function
+10/-0   
Tests
table_change_stream_test.go
Add comprehensive tests for stall detection                           

pkg/cdc/table_change_stream_test.go

  • Add test helpers for reading Prometheus gauge and counter values
  • Add TestTableChangeStream_HandleSnapshotNoProgress_WarningAndReset for
    basic stall detection
  • Add TestTableChangeStream_HandleSnapshotNoProgress_ThresholdExceeded
    for error threshold
  • Add TestTableChangeStream_HandleSnapshotNoProgress_WarningThrottle for
    warning throttling
  • Add TestTableChangeStream_HandleSnapshotNoProgress_Defaults for
    default configuration
  • Update createTestStream() helper to accept options
+153/-1 
watermark_updater_test.go
Update watermark updater tests                                                     

pkg/cdc/watermark_updater_test.go

  • Update TestCDCWatermarkUpdater_UpdateWatermarkErrMsg to expect success
    instead of error
  • Add TestCDCWatermarkUpdater_RemoveThenUpdateErrMsg to verify error
    updates work after removal
+24/-2   
Bug fix
watermark_updater.go
Fix watermark updater error handling                                         

pkg/cdc/watermark_updater.go

  • Fix onJobs() to handle ErrNoWatermarkFound gracefully in
    JT_CDC_UpdateWMErrMsg case
  • Initialize missing watermark entries in readKeysBuffer when not found
  • Update execReadWM() to populate cache from readKeysBuffer before
    processing jobs
  • Allow watermark error updates even when watermark not yet cached
+13/-2   
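The updater fix described above can be pictured with the simplified sketch below. It is not the updater's real code: `updater`, `watermarkEntry`, `errMsgJob`, and `queueErrMsg` are hypothetical names, and string keys replace the real key type. The point is only that a "not found" lookup no longer fails the error-message job; a placeholder entry is registered for a later read instead.

```go
package main

import "errors"

// errNoWatermarkFound plays the role of ErrNoWatermarkFound in this sketch.
var errNoWatermarkFound = errors.New("no watermark found")

// watermarkEntry is a hypothetical placeholder for a cached watermark value.
type watermarkEntry struct {
	watermark string
}

// errMsgJob is a hypothetical error-message update request.
type errMsgJob struct {
	key string
	msg string
}

// updater is a simplified, single-goroutine stand-in for the watermark updater.
type updater struct {
	cache          map[string]watermarkEntry
	readKeysBuffer map[string]watermarkEntry // keys whose watermark still needs to be read
	pendingErrMsgs []errMsgJob
}

func newUpdater() *updater {
	return &updater{
		cache:          make(map[string]watermarkEntry),
		readKeysBuffer: make(map[string]watermarkEntry),
	}
}

// getFromCache returns errNoWatermarkFound when the key has not been cached yet.
func (u *updater) getFromCache(key string) (watermarkEntry, error) {
	e, ok := u.cache[key]
	if !ok {
		return watermarkEntry{}, errNoWatermarkFound
	}
	return e, nil
}

// queueErrMsg accepts an error-message update even when the watermark is not
// cached: a missing key gets a placeholder in readKeysBuffer instead of an error.
func (u *updater) queueErrMsg(job errMsgJob) error {
	if _, err := u.getFromCache(job.key); err != nil {
		if !errors.Is(err, errNoWatermarkFound) {
			return err // any other lookup failure is still reported
		}
		if _, exists := u.readKeysBuffer[job.key]; !exists {
			u.readKeysBuffer[job.key] = watermarkEntry{}
		}
	}
	u.pendingErrMsgs = append(u.pendingErrMsgs, job)
	return nil
}
```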
Documentation
CDC_USER_GUIDE.md
Document snapshot stall detection feature                               

pkg/cdc/CDC_USER_GUIDE.md

  • Add "Detect Snapshot Stalls" section with explanation of stall
    detection behavior
  • Document metrics used for stall monitoring with PromQL examples
  • Add configuration note about adjustable thresholds
  • Update metrics reference table with new
    mo_cdc_table_snapshot_no_progress_total counter
  • Update table stream metrics section with stuck and activity timestamp
    gauges
  • Add alerting recommendations for snapshot stall detection
+35/-0   

qodo-merge-pro bot commented Nov 12, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢
No security concerns identified: no security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🎫 No ticket provided
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: Passed

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: Passed

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
Action Logging: New logic adds warnings and metrics for snapshot stalls but does not clearly log or audit
successful watermark advances or critical state changes with user/context, making audit
completeness unclear from the diff.

Referred Code
func (s *TableChangeStream) onWatermarkAdvanced() {
	now := time.Now()
	s.lastWatermarkAdvance = now
	s.resetWatermarkStallState()
	v2.CdcTableLastActivityTimestamp.WithLabelValues(
		s.progressTracker.tableKey(),
	).Set(float64(now.Unix()))
}

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status:
Error Detail: The retryable error includes table identifiers in the message which might be user-facing
depending on propagation; verify this is internal-only to avoid leaking internals.

Referred Code
if stalledFor >= s.watermarkStallThreshold {
	s.retryable = true
	return moerr.NewInternalErrorf(
		ctx,
		"CDC tableChangeStream %s snapshot timestamp stuck for %v (threshold %v)",
		s.tableInfo.String(),
		stalledFor,
		s.watermarkStallThreshold,
	)
}

Compliance status legend
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

qodo-merge-pro bot commented Nov 12, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Category / Suggestion / Impact
Possible issue
Fix race condition on map

To prevent a race condition, wrap the access to the readKeysBuffer map within a
lock in the JT_CDC_UpdateWMErrMsg job handling logic.

pkg/cdc/watermark_updater.go [432-442]

 		case JT_CDC_UpdateWMErrMsg:
 			if _, err := u.GetFromCache(context.Background(), job.Key); err != nil {
 				if !errors.Is(err, ErrNoWatermarkFound) {
 					job.DoneWithErr(err)
 					continue
 				}
+				u.Lock()
 				if _, exists := u.readKeysBuffer[*job.Key]; !exists {
 					u.readKeysBuffer[*job.Key] = WatermarkResult{}
 				}
+				u.Unlock()
 			}
 			u.committingErrMsgBuffer = append(u.committingErrMsgBuffer, job)
Suggestion importance[1-10]: 9

Why: The suggestion correctly identifies a race condition on the readKeysBuffer map, which is accessed without a lock while other parts of the code modify it under a lock, potentially causing a panic.

High
General
Log when a stall is resolved

Add a log message in onWatermarkAdvanced to indicate when a snapshot stall has
been resolved, improving observability of the system's recovery.

pkg/cdc/table_change_stream.go [846-853]

 func (s *TableChangeStream) onWatermarkAdvanced() {
 	now := time.Now()
 	s.lastWatermarkAdvance = now
+	if !s.noProgressSince.IsZero() {
+		logutil.Info(
+			"cdc.table_stream.snapshot_progress_resumed",
+			zap.String("table", s.tableInfo.String()),
+			zap.Duration("stalled_for", now.Sub(s.noProgressSince)),
+		)
+	}
 	s.resetWatermarkStallState()
 	v2.CdcTableLastActivityTimestamp.WithLabelValues(
 		s.progressTracker.tableKey(),
 	).Set(float64(now.Unix()))
 }
Suggestion importance[1-10]: 6

Why: The suggestion proposes adding a log message to indicate when a stall condition is resolved, which improves observability and makes debugging easier.

Low

Labels

  • kind/bug: Something isn't working
  • Review effort 3/5
  • size/XXL: Denotes a PR that changes 2000+ lines
