Skip to content

Conversation

cpegeric
Copy link
Contributor

@cpegeric cpegeric commented Oct 7, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue ##21835

What this PR does / why we need it:

  1. hnsw update improvement - hnsw update will call hnsw APIs instead of running SQL. With HNSW APIs, we can
    load the models once and perform all batches from DataRetriever and finally save model files into database.
    This changes save a lot of time for upload/download the model every time there is a 8192 vector block.
  2. Common interface SqlProcess created to allow vector index and fulltext APIs in both frontend (process.Process) and background mode (when process.Process is absent)
  3. iscp integration
  4. fix alter table add column by using TxnOperator.AppendEventCallback (Closed event)
  5. fully async ivfflat index update. Kmean clustering will be done in separate thread in async mode.

PR Type

Enhancement, Tests


Description

ISCP Integration: Added comprehensive Index Sync Change Processing (ISCP) support for async vector index operations with CDC task management, job lifecycle handling, and background processing capabilities
Fully Async IVF Flat: Implemented fully asynchronous IVF flat index updates with K-means clustering running in separate threads, eliminating blocking operations during index updates
HNSW Performance Improvements: Refactored HNSW updates to use direct APIs instead of SQL execution, enabling batch processing with model persistence and significantly reducing upload/download overhead
SqlProcess Abstraction: Created unified SqlProcess wrapper to support both frontend (process.Process) and background execution modes, enabling vector index operations in headless environments
Transaction Event System: Enhanced transaction event handling with context-aware callbacks and structured TxnEventCallback interface for better error handling and lifecycle management
DDL Integration: Added ISCP job management to DDL operations including table/index creation, drops, and alter operations with proper async index handling
Comprehensive Testing: Added extensive test coverage for async vector operations, ISCP utilities, and new interfaces across HNSW, IVF, and fulltext indexes


Diagram Walkthrough

flowchart LR
  A["SQL Operations"] --> B["SqlProcess Wrapper"]
  B --> C["Frontend Mode"]
  B --> D["Background Mode"]
  
  E["Vector Index Updates"] --> F["ISCP Consumer"]
  F --> G["CDC Processing"]
  G --> H["Async HNSW"]
  G --> I["Async IVF Flat"]
  
  J["Transaction Events"] --> K["TxnEventCallback"]
  K --> L["Context-Aware Handling"]
  
  M["DDL Operations"] --> N["ISCP Job Management"]
  N --> O["Index Lifecycle"]
Loading

File Walkthrough

Relevant files
Enhancement
27 files
sync.go
Refactor HNSW sync to use SqlProcess wrapper and separate lifecycle
methods

pkg/vectorindex/hnsw/sync.go

• Refactored CdcSync function to use new HnswSync struct with separate
NewHnswSync, RunOnce, Update, and Save methods
• Replaced
process.Process parameter with SqlProcess wrapper to support both
frontend and background execution
• Added DownloadAll method to
pre-download all models and improved model lifecycle management

Updated all function signatures to use SqlProcess instead of
process.Process

+120/-76
alter.go
Integrate ISCP job management into alter table operations

pkg/sql/compile/alter.go

• Added ISCP job management during alter table operations - dropping
jobs for temp tables and registering jobs for unaffected indexes

Enhanced index cloning logic to handle async indexes and skip certain
index types during clone operations
• Added validation for index CDC
tasks and improved error handling for IVF index table operations

Restructured the alter table flow to better handle ISCP integration

+150/-47
sqlexec.go
Create SqlProcess wrapper for unified frontend and background SQL
execution

pkg/vectorindex/sqlexec/sqlexec.go

• Created new SqlProcess wrapper to support both frontend
(process.Process) and background (SqlContext) execution modes
• Added
SqlContext struct for background SQL execution with required context,
UUID, and transaction operator
• Implemented RunTxnWithSqlContext
function for running transactions in background mode
• Updated all SQL
execution functions to work with SqlProcess wrapper

+273/-61
operator.go
Add context-aware transaction event handling and callback system

pkg/txn/client/operator.go

• Updated transaction event handling to pass context.Context to all
triggerEvent calls
• Modified callback signatures to include context
parameter for better error handling
• Enhanced transaction lifecycle
methods (Commit, Rollback, closeLocked) with context-aware event
triggering
• Updated error handling in transaction operations to
properly propagate context

+40/-17 
ddl.go
Integrate ISCP job lifecycle management into DDL operations

pkg/sql/compile/ddl.go

• Added ISCP job management for database drops, table drops, and index
operations
• Integrated async index handling to skip sync operations
for async IVF and fulltext indexes
• Added ISCP job creation for new
tables with indexes and proper cleanup during drops
• Enhanced alter
table rename operations to update ISCP job references

+97/-9   
mock_consumer.go
Update mock consumer to use direct transaction operators instead of
executors

pkg/iscp/mock_consumer.go

• Updated NewInteralSqlConsumer to accept cnEngine and cnTxnClient
parameters
• Replaced executor-based transaction handling with direct
transaction operator usage
• Modified consumeData and related methods
to use client.TxnOperator instead of executor.TxnExecutor
• Added
getTxn helper function for transaction creation

+53/-29 
build_dml_util.go
Add async index support to DML operations and plan building

pkg/sql/plan/build_dml_util.go

• Added async index checks to skip sync operations for async IVF and
fulltext indexes in DML operations
• Updated
buildPreInsertMultiTableIndexes, buildDeleteMultiTableIndexes, and
fulltext index functions
• Enhanced index handling logic to
differentiate between sync and async index operations
• Improved DML
plan building to respect async index configuration

+65/-18 
fulltext.go
Update fulltext functions to use SqlProcess wrapper           

pkg/sql/colexec/table_function/fulltext.go

• Updated fulltext SQL execution calls to use SqlProcess wrapper

Modified runWordStats and runCountStar functions to use
sqlexec.NewSqlProcess(proc)
• Aligned with the new SQL execution
pattern using SqlProcess abstraction

+2/-2     
storage_test.go
Update transaction event callbacks to new interface           

pkg/partitionservice/storage_test.go

• Updated transaction event callback to use new TxnEventCallback
structure with context and error handling
• Modified callback
functions to return error instead of void

+24/-20 
build_test.go
Integrate SqlProcess wrapper for HNSW build operations     

pkg/vectorindex/hnsw/build_test.go

• Added import for sqlexec package
• Updated NewHnswBuild calls to use
sqlexec.NewSqlProcess(proc) instead of direct proc

+5/-2     
hnsw_search.go
Integrate SqlProcess wrapper for HNSW search operations   

pkg/sql/colexec/table_function/hnsw_search.go

• Added import for sqlexec package
• Updated vector cache search call
to use sqlexec.NewSqlProcess(proc)

+2/-1     
data_retriever.go
Refactor watermark update to use transaction operator       

pkg/iscp/data_retriever.go

• Updated UpdateWatermark method signature to use context and
transaction operator instead of executor
• Added system account
context setup with timeout
• Replaced SQL execution with
ExecWithResult function

+18/-4   
service.go
Update increment service transaction callbacks                     

pkg/incrservice/service.go

• Updated transaction event callback registration to use new
TxnEventCallback structure
• Modified txnClosed method to match new
callback signature with error return

+4/-3     
storage_test.go
Update shard service transaction callbacks                             

pkg/shardservice/storage_test.go

• Updated transaction event callbacks to use new TxnEventCallback
structure
• Modified callback functions to include context parameter
and return error

+20/-16 
hnsw_create.go
Integrate SqlProcess wrapper for HNSW creation                     

pkg/sql/colexec/table_function/hnsw_create.go

• Updated SQL execution calls to use sqlexec.NewSqlProcess(proc)

Modified HNSW build creation to use SqlProcess wrapper

+3/-3     
consumer.go
Add engine and transaction client to consumer creation     

pkg/iscp/consumer.go

• Added engine and transaction client parameters to consumer
constructors
• Updated NewConsumer function signature to include
additional dependencies

+9/-3     
types.go
Introduce structured transaction event callbacks                 

pkg/txn/client/types.go

• Added new TxnEventCallback struct with function and value fields

Updated AppendEventCallback method signature to use callback struct

Added constructor functions for creating callback instances

+18/-1   
types.go
Update DataRetriever interface for transaction integration

pkg/iscp/types.go

• Updated DataRetriever interface to use context and transaction
operator for watermark updates
• Removed dependency on executor
package

+1/-2     
metadata_scan.go
Integrate SqlProcess wrapper for metadata scanning             

pkg/sql/colexec/table_function/metadata_scan.go

• Updated SQL execution to use sqlexec.NewSqlProcess(proc)
• Added
import for sqlexec package and reorganized imports

+4/-2     
store_mem.go
Update memory store transaction callbacks                               

pkg/incrservice/store_mem.go

• Updated transaction event callback to use new TxnEventCallback
structure
• Modified callback function to include context and return
error

+12/-10 
sql.go
Integrate SqlProcess wrapper for IVF flat operations         

pkg/vectorindex/ivfflat/sql.go

• Updated GetVersion function to use SqlProcess instead of
process.Process
• Modified SQL execution and error context to use
SqlProcess

+4/-4     
types.go
Enhance vector index CDC with capacity and sonic JSON       

pkg/vectorindex/types.go

• Updated NewVectorIndexCdc to accept capacity parameter
• Replaced
json.Marshal with sonic.Marshal for better performance

+4/-4     
ivf_create.go
Integrate SqlProcess wrapper for IVF creation                       

pkg/sql/colexec/table_function/ivf_create.go

• Updated version retrieval and SQL execution to use
sqlexec.NewSqlProcess(proc)

+2/-2     
service.go
Update partition service transaction callbacks                     

pkg/partitionservice/service.go

• Updated transaction event callback to use new TxnEventCallback
structure
• Modified callback function to include context and return
error

+7/-5     
iteration.go
Update consumer creation with additional dependencies       

pkg/iscp/iteration.go

• Updated NewConsumer call to include engine and transaction client
parameters

+1/-1     
storage_txn_client.go
Update memory storage transaction callback interface         

pkg/txn/storage/memorystorage/storage_txn_client.go

• Updated AppendEventCallback method signature to use TxnEventCallback
parameter

+1/-1     
watermark_updater.go
Make ExecWithResult function configurable                               

pkg/iscp/watermark_updater.go

• Made ExecWithResult function a variable for potential mocking or
replacement

+1/-1     
Tests
24 files
index_consumer_test.go
Update ISCP consumer tests for new engine integration and IVF support

pkg/iscp/index_consumer_test.go

• Updated test functions to use new NewConsumer signature with
cnEngine and cnClient parameters
• Replaced mock SQL executor with
ExecWithResult stub for cleaner testing
• Added new test functions for
IVF index testing (newTestIvfTableDef, newTestIvfConsumerInfo)

Updated HNSW test to verify JSON CDC format instead of SQL function
calls

+158/-106
sync_test.go
Update HNSW sync tests to use SqlProcess wrapper and new API

pkg/vectorindex/hnsw/sync_test.go

• Updated all test functions to use SqlProcess wrapper instead of
direct process.Process
• Modified test calls from CdcSync to
NewHnswSync followed by RunOnce pattern
• Added new continuous update
test TestSyncContinuousUpdateInsertShuffle2FilesF32WithSmallCap

Updated mock functions to accept SqlProcess parameter

+136/-27
iscp_util_test.go
Add comprehensive tests for ISCP utility functions             

pkg/sql/compile/iscp_util_test.go

• New test file for ISCP utility functions
• Added comprehensive tests
for CDC task validation and creation
• Included mock functions for
testing error scenarios
• Added tests for index algorithm validation
and sinker type determination
• Implemented panic testing for invalid
algorithm types

+301/-0 
model_test.go
Update HNSW model tests for SqlProcess interface                 

pkg/vectorindex/hnsw/model_test.go

• Updated test functions to use *sqlexec.SqlProcess instead of
*process.Process
• Modified mock streaming function signatures to
match new interface
• Added sqlproc initialization in test setup

Updated all model method calls to use sqlproc parameter

+22/-18 
engine_mock.go
Update engine mock to match interface changes                       

pkg/frontend/test/engine_mock.go

• Updated mock method signatures to match interface changes
• Added
skipDeletes parameter to CollectChanges method
• Added partitionIndex
parameter to PrimaryKeysMayBeModified method
• Added new HasTempEngine
mock method
• Updated method call recordings to include new parameters

+22/-20 
search_test.go
Update HNSW search tests for SqlProcess interface               

pkg/vectorindex/hnsw/search_test.go

• Updated mock function signatures to use *sqlexec.SqlProcess instead
of *process.Process
• Added sqlproc initialization in test functions

Modified streaming mock functions to handle new interface
• Updated
cache search calls to use sqlproc parameter

+15/-10 
search_test.go
Update IVF search tests for SqlProcess interface                 

pkg/vectorindex/ivfflat/search_test.go

• Updated mock function signatures to use *sqlexec.SqlProcess instead
of *process.Process
• Added sqlproc initialization in test functions

Modified streaming mock functions to handle new interface
• Updated
search method calls to use sqlproc parameter

+12/-7   
ivf_search_test.go
Update IVF search table function tests for SqlProcess interface

pkg/sql/colexec/table_function/ivf_search_test.go

• Updated mock function signatures to use *sqlexec.SqlProcess instead
of *process.Process
• Modified mock search interface methods to accept
SqlProcess parameter
• Added import for sqlexec package

+6/-5     
build_dml_util_test.go
Add tests for async FullText index DML operations               

pkg/sql/plan/build_dml_util_test.go

• Added new test functions for async FullText index processing

Included tests for invalid JSON parameter handling
• Added validation
tests for async parameter parsing
• Added import for plan package

+49/-0   
fulltext_test.go
Update fulltext tests for SqlProcess integration                 

pkg/sql/colexec/table_function/fulltext_test.go

• Updated test functions to use SqlProcess parameter instead of direct
process.Process
• Modified fake SQL execution functions to accept
SqlProcess wrapper

+5/-3     
hnsw_search_test.go
Update HNSW search tests for SqlProcess integration           

pkg/sql/colexec/table_function/hnsw_search_test.go

• Updated mock search interface to use SqlProcess instead of
process.Process
• Modified Search and Load methods to accept
SqlProcess parameter

+3/-2     
hnsw_create_test.go
Update HNSW create tests for SqlProcess integration           

pkg/sql/colexec/table_function/hnsw_create_test.go

• Updated mock SQL execution function to use SqlProcess parameter

Modified test helper to accept SqlProcess wrapper

+3/-2     
sqlexec_test.go
Update SQL execution tests for SqlProcess integration       

pkg/vectorindex/sqlexec/sqlexec_test.go

• Updated test functions to use SqlProcess wrapper instead of direct
process
• Modified RunTxn calls to accept SqlProcess parameter

+5/-2     
operator_events_test.go
Update transaction operator event tests                                   

pkg/txn/client/operator_events_test.go

• Updated transaction event callback test to use new TxnEventCallback
structure
• Modified callback function to match new signature with
context and error return

+6/-3     
txn_mock.go
Update transaction operator mock for new callback interface

pkg/frontend/test/txn_mock.go

• Updated mock AppendEventCallback method signature to use
TxnEventCallback parameter

+1/-1     
store_sql_test.go
Update increment service SQL store test mocks                       

pkg/incrservice/store_sql_test.go

• Updated test transaction operator mock to use new TxnEventCallback
parameter

+1/-1     
service_test.go
Update bootstrap service test mocks                                           

pkg/bootstrap/service_test.go

• Updated test transaction operator mock to use new TxnEventCallback
parameter

+1/-1     
txn_test.go
Update frontend transaction test mocks                                     

pkg/frontend/txn_test.go

• Updated test transaction operator mock to use new TxnEventCallback
parameter

+1/-1     
entire_engine_test.go
Update engine test operator mocks                                               

pkg/vm/engine/entire_engine_test.go

• Updated test operator mock to use new TxnEventCallback parameter

+1/-1     
types_test.go
Update vector index CDC test for capacity parameter           

pkg/vectorindex/types_test.go

• Updated NewVectorIndexCdc call to include capacity parameter (8192)

+1/-1     
vector_ivf_async.result
Add async IVF flat vector index test results                         

test/distributed/cases/vector/vector_ivf_async.result

• Added comprehensive test results for async IVF flat vector index
operations
• Includes create, insert, search, and load operations with
expected outputs

+61/-0   
vector_hnsw.result
Update HNSW vector index test results for async behavior 

test/distributed/cases/vector/vector_hnsw.result

• Added sleep operation and updated expected search results
• Modified
test results to show proper async index behavior

+10/-2   
vector_hnsw_f64_async.sql
Add async HNSW F64 vector index test suite                             

test/distributed/cases/vector/vector_hnsw_f64_async.sql

• Added comprehensive test suite for async HNSW F64 vector operations

• Includes CDC operations, index creation, and search functionality
tests

+96/-0   
vector_hnsw.sql
Update HNSW vector test for async index operations             

test/distributed/cases/vector/vector_hnsw.sql

• Added sleep operation to wait for async index updates
• Updated
comments to reflect async index behavior

+6/-1     
Refactoring
13 files
cache_test.go
Update cache tests to use SqlProcess interface                     

pkg/vectorindex/cache/cache_test.go

• Updated import from process to sqlexec package
• Modified mock
search interfaces to use *sqlexec.SqlProcess instead of
*process.Process
• Added sqlproc variable initialization in test
functions
• Updated all cache search calls to use sqlproc parameter

+24/-17 
model.go
Refactor HNSW model to use SqlProcess interface                   

pkg/vectorindex/hnsw/model.go

• Updated function signatures to use *sqlexec.SqlProcess instead of
*process.Process
• Added NThread field to HnswModel struct and
initialization logic
• Modified index loading methods to handle empty
checksums and file sizes
• Updated context handling to use
sqlproc.GetContext() instead of proc.Ctx
• Added initIndex method for
index initialization

+70/-26 
search.go
Refactor IVF search to use SqlProcess interface                   

pkg/vectorindex/ivfflat/search.go

• Updated function signatures to use *sqlexec.SqlProcess instead of
*process.Process
• Modified context handling to use
sqlproc.GetContext() and sqlproc.GetTopContext()
• Updated error
handling to use new context references
• Changed all search and load
methods to accept SqlProcess parameter

+22/-19 
service_txn_event.go
Update transaction event handling to new callback interface

pkg/txn/trace/service_txn_event.go

• Updated transaction event callback registrations to use
NewTxnEventCallback wrapper
• Modified event handler function
signatures to include context and additional parameters
• Added return
statements to event handler functions
• Updated all event callback
functions to match new interface requirements

+26/-20 
search.go
Refactor HNSW search to use SqlProcess interface                 

pkg/vectorindex/hnsw/search.go

• Updated function signatures to use *sqlexec.SqlProcess instead of
*process.Process
• Modified context handling and error reporting to
use sqlproc.GetContext()
• Updated metadata loading and index loading
methods
• Changed search method to accept SqlProcess parameter

+9/-10   
cache.go
Update vector index cache to use SqlProcess interface       

pkg/vectorindex/cache/cache.go

• Updated VectorIndexSearchIf interface to use *sqlexec.SqlProcess
instead of *process.Process
• Modified cache search and load methods
to accept SqlProcess parameter
• Updated all internal method calls to
use new interface
• Changed import from process to sqlexec package

+10/-10 
client.go
Update transaction client for new event callback interface

pkg/txn/client/client.go

• Updated transaction event callback registration to use
TxnEventCallback struct
• Modified callback function signatures to
include context and additional parameters
• Added return statements to
callback functions
• Updated SyncLatestCommitTS call to match new
interface

+11/-5   
index_sqlwriter.go
Update HNSW SQL writer for CDC integration                             

pkg/iscp/index_sqlwriter.go

• Modified HNSW SQL writer to return JSON data instead of SQL
statements
• Added NewSync method to create HnswSync instances

Updated writer capacity initialization for CDC processing
• Added
imports for hnsw and sqlexec packages

+9/-4     
ivf_search.go
Update IVF search table function for SqlProcess interface

pkg/sql/colexec/table_function/ivf_search.go

• Updated function calls to use sqlexec.NewSqlProcess(proc) wrapper

Modified getVersion and cache search calls to use SqlProcess interface

• Added import for sqlexec package

+3/-2     
service.go
Update shard service for new transaction event interface 

pkg/shardservice/service.go

• Updated transaction event callback registration to use
NewTxnEventCallback wrapper
• Modified callback function signatures to
include context and additional parameters
• Added return statements to
callback functions
• Updated both create and delete callback
registrations

+26/-22 
operator_events.go
Refactor transaction event callbacks with structured interface

pkg/txn/client/operator_events.go

• Added TxnEventCallback struct to replace function callbacks

Updated AppendEventCallback to accept TxnEventCallback instead of
functions
• Modified event triggering to handle new callback interface
with context
• Added error handling to callback execution

+11/-6   
build.go
Refactor HNSW build to use SqlProcess interface                   

pkg/vectorindex/hnsw/build.go

• Updated function signatures to use *sqlexec.SqlProcess instead of
*process.Process
• Modified context handling to use
sqlproc.GetContext()
• Updated NewHnswBuild and addFromChannel methods

• Changed import from process to sqlexec package

+7/-6     
function_id.go
Remove HNSW CDC update function constant                                 

pkg/sql/plan/function/function_id.go

• Removed HNSW_CDC_UPDATE function constant
• Decreased
FUNCTION_END_NUMBER from 351 to 350

+1/-7     
Feature
3 files
index_consumer.go
Integrate HNSW and async processing with ISCP consumer     

pkg/iscp/index_consumer.go

• Added new imports for sonic, types, hnsw, sqlexec, and engine

Modified IndexConsumer struct to include cnEngine, cnTxnClient, and
algo fields
• Replaced SQL executor with transaction-based SQL
execution using sqlexec.RunTxnWithSqlContext
• Added specialized HNSW
processing with runHnsw function for different numeric types

Implemented CDC data processing with JSON unmarshaling for HNSW
updates

+171/-55
iscp_util.go
Add ISCP utility functions for CDC task management             

pkg/sql/compile/iscp_util.go

• New utility file for ISCP (Index Sync Change Processing) integration

• Added functions for CDC task management: create, delete, and
validation
• Implemented transaction event callbacks for async index
operations
• Added support for different index algorithms (HNSW, IVF,
FullText)
• Included helper functions for sinker type determination
and job ID generation

+321/-0 
ddl_index_algo.go
Add async index processing support to DDL operations         

pkg/sql/compile/ddl_index_algo.go

• Added async processing support for FullText and IVF index creation

Modified index creation to check for async parameter and create ISCP
jobs accordingly
• Updated HNSW index handling to support both sync
and async modes
• Added transaction event callbacks for long-running
index operations
• Integrated CDC task creation for async index
updates

+119/-47
Error handling
1 files
util.go
Add error handling to fulltext index SQL generation           

pkg/sql/compile/util.go

• Updated genInsertIndexTableSqlForFullTextIndex function to return
error alongside SQL strings

+2/-2     
Miscellaneous
1 files
function_id_test.go
Remove HNSW CDC update function ID                                             

pkg/sql/plan/function/function_id_test.go

• Removed HNSW_CDC_UPDATE function ID entry
• Updated
FUNCTION_END_NUMBER from 351 to 350

+1/-3     
Additional files
9 files
func_hnsw.go +0/-106 
func_hnsw_test.go +0/-191 
list_builtIn.go +0/-21   
fulltext_async.result +23/-0   
fulltext_async.sql +25/-0   
vector_hnsw_async.result +66/-0   
vector_hnsw_async.sql +96/-0   
vector_hnsw_f64_async.result +66/-0   
vector_ivf_async.sql +60/-0   

@cpegeric cpegeric closed this Oct 9, 2025
@cpegeric cpegeric reopened this Oct 9, 2025
@cpegeric cpegeric closed this Oct 9, 2025
Copy link

qodo-merge-pro bot commented Oct 9, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Callback interruption

Description: Transaction event callbacks now accept context and are invoked on Commit/Rollback/Closed
paths; if untrusted callbacks are registered, they could block or panic and disrupt
transaction flow, so ensure callbacks are trusted and bounded in time.
operator.go [617-714]

Referred Code
}

func (tc *txnOperator) Commit(ctx context.Context) (err error) {

	if tc.reset.runningSQL.Load() && !tc.markAborted() {
		tc.logger.Fatal("commit on running txn",
			zap.String("txnID", hex.EncodeToString(tc.reset.txnID)))
	}

	tc.reset.commitCounter.addEnter()
	defer tc.reset.commitCounter.addExit()
	txnMeta := tc.getTxnMeta(false)
	util.LogTxnCommit(tc.logger, txnMeta)

	readonly := tc.reset.workspace != nil && tc.reset.workspace.Readonly()
	if !readonly {
		tc.reset.commitSeq = tc.NextSequence()
		tc.reset.commitAt = time.Now()

		err = tc.triggerEvent(ctx, newEvent(CommitEvent, txnMeta, tc.reset.commitSeq, nil))
		if err != nil {


 ... (clipped 77 lines)
DoS via CDC load

Description: Parallel CDC insert/update logic spawns goroutines processing user-supplied CDC data
without per-item validation or bounds checks, which could lead to resource exhaustion if
CDC batches are very large; rate limiting or concurrency caps should be enforced beyond
ThreadsBuild.
sync.go [241-361]

Referred Code
	for _, m := range s.indexes {
		err = m.LoadIndex(sqlproc, s.idxcfg, s.tblcfg, s.tblcfg.ThreadsBuild, false)
		if err != nil {
			return
		}
		err = m.Unload()
		if err != nil {
			return
		}
	}

	return
}

func (s *HnswSync[T]) checkContains(sqlproc *sqlexec.SqlProcess, cdc *vectorindex.VectorIndexCdc[T]) (maxcap uint, midx []int, err error) {
	err_chan := make(chan error, s.tblcfg.ThreadsBuild)

	maxcap = uint(s.tblcfg.IndexCapacity)

	// try to find index cap



 ... (clipped 100 lines)
SQL construction risk

Description: DDL path issues DELETE statements on index tables constructed with formatted strings;
while identifiers appear controlled, ensure all dynamic table names/DB names are properly
sanitized/quoted to avoid SQL injection through metadata tampering.
alter.go [770-833]

Referred Code
	continue
}

async, err := catalog.IsIndexAsync(oriIdxTblNames.IndexAlgoParams)
if err != nil {
	return err
}

if !oriIdxTblNames.Unique &&
	((catalog.IsFullTextIndexAlgo(oriIdxTblNames.IndexAlgo) && async) ||
		catalog.IsHnswIndexAlgo(oriIdxTblNames.IndexAlgo)) {
	// skip fultext async index and hsnw index clone because index table may not be fully sync'd
	logutil.Infof("cloneUnaffectedIndex: skip async index %v\n", oriIdxTblNames)
	continue
}

for _, oriIdxTblName := range oriIdxTblNames.Indexes {

	var newIdxTblName IndexTypeInfo
	found := false
	for _, idxinfo := range newIdxTblNames.Indexes {


 ... (clipped 43 lines)
Ticket Compliance
🟡
🎫 #21835
🟢 Provide a mechanism to update HNSW vector indexes via CDC for INSERT, DELETE, and UPDATE
operations.
Implement a general framework/interface so other vector index types (e.g., IVF, fulltext)
can also be updated via CDC.
Integrate the CDC-driven index updates with the system’s background/async processing
model.
Ensure transactional consistency by hooking into transaction lifecycle/events where
needed.
Add tests validating HNSW (and other vector index) CDC updates including async paths.
Optimize performance so index model load/save overhead is minimized during CDC processing.
Validate end-to-end CDC throughput and latency under production-like workloads for HNSW
and IVF async paths.
Confirm compatibility and stability across cluster upgrades and rollback scenarios with
live CDC jobs.
Verify operational runbooks and documentation for configuring SqlProcess, async variables,
and ISCP jobs.
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
No custom compliance provided

Follow the guide to enable custom compliance check.

Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

Copy link

qodo-merge-pro bot commented Oct 9, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Execute all callbacks despite errors

Modify triggerEventLocked to continue executing all registered callbacks even if
one returns an error, logging any errors to prevent inconsistent states.

pkg/txn/client/operator_events.go [70-81]

 func (tc *txnOperator) triggerEventLocked(ctx context.Context, event TxnEvent) (err error) {
 	if tc.mu.callbacks == nil {
-		return
+		return nil
 	}
 	for _, cb := range tc.mu.callbacks[event.Event] {
-		err = cb.Func(ctx, tc, event, cb.Value)
-		if err != nil {
-			return
+		if cbErr := cb.Func(ctx, tc, event, cb.Value); cbErr != nil {
+			// TODO: maybe use a multi-error wrapper
+			logutil.Error("txn event callback failed",
+				zap.String("txn-id", event.Txn.ID.String()),
+				zap.Error(cbErr))
+			err = cbErr
 		}
 	}
-	return
+	return err
 }
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies that failing to execute all callbacks upon an error can lead to an inconsistent state, and the proposed fix significantly improves the system's robustness.

Medium
Remove deferred object destruction

Remove the deferred s.Destroy() call from the RunOnce method. The caller who
creates the HnswSync object should be responsible for its lifecycle to prevent
potential use-after-free errors.

pkg/vectorindex/hnsw/sync.go [61-75]

 func (s *HnswSync[T]) RunOnce(sqlproc *sqlexec.SqlProcess, cdc *vectorindex.VectorIndexCdc[T]) (err error) {
 
-	defer s.Destroy()
 	err = s.Update(sqlproc, cdc)
 	if err != nil {
 		return err
 	}
 
 	err = s.Save(sqlproc)
 	if err != nil {
 		return err
 	}
 
 	return nil
 }

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that RunOnce destroying the object it runs on is poor API design, as it can lead to use-after-free bugs if the object is reused. The caller who creates the object should be responsible for its destruction.

Medium
Handle errors in background goroutine

Log the error returned from sqlexec.RunTxnWithSqlContext in the background
goroutine to prevent silent failures during asynchronous index creation.

pkg/sql/compile/iscp_util.go [291-305]

 				if len(cbdata.sql) > 0 {
 					// long running SQL so run in separate thread
 					go func() {
-						sqlexec.RunTxnWithSqlContext(ctx,
+						err := sqlexec.RunTxnWithSqlContext(context.Background(),
 							cbdata.cnEngine,
 							cbdata.txnClient,
 							cbdata.cnUUID,
 							cbdata.accountId,
 							24*time.Hour,
 							cbdata.resolveVariableFunc,
 							cbdata,
 							iscpRegisterEventCallbackFn)
-
+						if err != nil {
+							logutil.Errorf("failed to run async index build task: %v", err)
+						}
 					}()
 					return
 
 				} else {

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies an unhandled error in a goroutine, which could lead to silent failures, and proposes logging it for better observability.

Medium
General
Complete the unfinished test case

Complete the TestHnswConsumer function by adding a sub-test for the HNSW
consumer itself, not just the HnswSqlWriter. This will close a gap in test
coverage for the HNSW consumer logic.

pkg/iscp/index_consumer_test.go [392-433]

 func TestHnswConsumer(t *testing.T) {
-
-	//proc := testutil.NewProcess(t)
-
 	ctx := context.WithValue(context.Background(), defines.TenantIDKey{}, catalog.System_Account)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
 	sqls := make([]string, 0, 1)
 	stub1 := gostub.Stub(&ExecWithResult, func(_ context.Context, sql string, _ string, _ client.TxnOperator) (executor.Result, error) {
 		sqls = append(sqls, sql)
 		return executor.Result{}, nil
 	})
 	defer stub1.Reset()
 
 	tblDef := newTestTableDef("pk", types.T_int64, "vec", types.T_array_float32, 2)
 	info := newTestConsumerInfo()
 	job := newTestJobID()
-	/*
-		cnUUID := "a-b-c-d"
-		catalog.SetupDefines("")
-		cnEngine, cnClient, _ := testengine.New(ctx)
-	*/
+	cnUUID := "a-b-c-d"
+	catalog.SetupDefines("")
+	cnEngine, cnClient, _ := testengine.New(ctx)
 
 	t.Run("HnswSqlWriter", func(t *testing.T) {
-
 		row1 := []any{int64(1), []float32{0.1, 0.2}}
 		row2 := []any{int64(2), []float32{0.3, 0.4}}
 
 		writer, err := NewHnswSqlWriter("hnsw", job, info, tblDef, tblDef.Indexes)
 		require.NoError(t, err)
 		writer.Insert(ctx, row1)
 		writer.Insert(ctx, row2)
 		writer.Delete(ctx, row1)
 		writer.Delete(ctx, row2)
 		json, _ := writer.ToSql()
 
 		expectedSqlBytes := `{"cdc":[{"t":"I","pk":1,"v":[0.1,0.2]},{"t":"I","pk":2,"v":[0.3,0.4]},{"t":"D","pk":1},{"t":"D","pk":2}]}`
 		require.Equal(t, expectedSqlBytes, string(json))
+	})
 
+	t.Run("HnswConsumer", func(t *testing.T) {
+		sqls = sqls[:0]
+		consumer, err := NewConsumer(cnUUID, cnEngine, cnClient, tblDef, job, info)
+		require.NoError(t, err)
+
+		bat := testutil.NewBatchWithVectors(
+			[]string{"pk", "vec"},
+			[]types.Type{types.T_int64.ToType(), types.T_array_float32.ToType()},
+			[][]any{
+				{int64(1), []float32{0.1, 0.2}},
+				{int64(2), []float32{0.3, 0.4}},
+			},
+		)
+		defer bat.Clean(testutil.NewProcess(t).Mp())
+
+		output := &MockRetriever{
+			insertBatch: bat,
+			deleteBatch: nil,
+			dtype:       ISCPDataType_Tail,
+		}
+		err = consumer.Consume(ctx, output)
+		require.NoError(t, err)
+		require.Equal(t, 1, len(sqls))
+		expectedSQL := `SELECT hnsw_cdc_update('test_db', 'test_tbl', 224, 2, '{"cdc":[{"t":"I","pk":1,"v":[0.1,0.2]},{"t":"I","pk":2,"v":[0.3,0.4]}]}');`
+		require.Equal(t, expectedSQL, sqls[0])
 	})
 }
  • Apply / Chat
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly points out that TestHnswConsumer is incomplete and only tests the HnswSqlWriter, leaving the HNSW consumer logic untested. Completing the test by adding a sub-test for the consumer would improve test coverage.

Low
Avoid using nil in tests

Improve test robustness by replacing nil parameters with mock objects or minimal
valid structs in Test_buildPostDmlFullTextIndexAsync.

pkg/sql/plan/build_dml_util_test.go [55-77]

 func Test_buildPostDmlFullTextIndexAsync(t *testing.T) {
+	builder := &QueryBuilder{}
+	bindContext := &BindContext{}
+	tableDef := &plan.TableDef{}
 	{
 		//invalid json
 		idxdef := &plan.IndexDef{
 			IndexAlgoParams: `{"async":1}`,
 		}
 
-		err := buildPostDmlFullTextIndex(nil, nil, nil, nil, nil, nil, 0, idxdef, 0, false, false, false)
+		err := buildPostDmlFullTextIndex(builder, bindContext, nil, tableDef, nil, nil, 0, idxdef, 0, false, false, false)
 		require.NotNil(t, err)
 	}
 
 	{
 
 		// async true
 		idxdef := &plan.IndexDef{
 			IndexAlgoParams: `{"async":"true"}`,
 		}
 
-		err := buildPostDmlFullTextIndex(nil, nil, nil, nil, nil, nil, 0, idxdef, 0, false, false, false)
+		err := buildPostDmlFullTextIndex(builder, bindContext, nil, tableDef, nil, nil, 0, idxdef, 0, false, false, false)
 		require.Nil(t, err)
 	}
 
 }
  • Apply / Chat
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly points out that using nil for parameters makes tests brittle; providing minimal valid structs would make the tests more robust and less prone to future breakage.

Low
  • More

Copy link
Contributor

mergify bot commented Oct 9, 2025

⚠️ The sha of the head commit of this PR conflicts with #22609. Mergify cannot evaluate rules on this PR. ⚠️

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/feature Review effort 5/5 size/L Denotes a PR that changes [500,999] lines
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants