Skip to content

HNSW index update with CDC #21917

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 241 commits into
base: main
Choose a base branch
from

Conversation

cpegeric
Copy link
Contributor

@cpegeric cpegeric commented May 27, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #21835

What this PR does / why we need it:

To update the HNSW index via CDC changes.

The design doc:
https://github.com/cpegeric/mo-docs/blob/hnsw_cdc/design/mo/sql/20250501-cpegeric-hnswsync.md


PR Type

Enhancement, Tests


Description

• Implement comprehensive HNSW index CDC (Change Data Capture) synchronization functionality
• Add new HnswSync struct and hnswCdcUpdate SQL function for processing CDC updates via multi-threaded operations
• Introduce hnswSyncSinker for updating HNSW indexes with CDC changes, supporting both float32 and float64 vector types
• Refactor HNSW architecture by replacing HnswBuildIndex and HnswSearchIndex with unified HnswModel structure
• Add CDC data structures (VectorIndexCdc, VectorIndexCdcEntry) and operations for insert, update, delete operations
• Implement transaction-aware SQL execution with RunTxn function and enhanced error handling
• Add comprehensive test suites covering CDC sinker functionality, synchronization operations, and model operations
• Integrate CDC task creation into HNSW index creation workflow with automatic cleanup placeholders
• Enhance array casting with dimension validation and standardize error message formats across vector operations
• Add distributed test cases for HNSW CDC synchronization scenarios including bulk loads and incremental updates


Changes walkthrough 📝

Relevant files
Tests
12 files
hnsw_sinker_test.go
Add comprehensive test suite for HNSW CDC sinker                 

pkg/cdc/hnsw_sinker_test.go

• Comprehensive test suite for HNSW CDC sinker functionality with 692
lines of test code
• Mock implementations for SQL executors and error
handling scenarios
• Test cases covering sinker creation, execution,
error handling, and data processing
• Tests for snapshot and atomic
batch processing with vector data

+692/-0 
sync_test.go
Add test suite for HNSW CDC synchronization                           

pkg/vectorindex/hnsw/sync_test.go

• Test suite for HNSW CDC synchronization with various operation
scenarios
• Tests covering upsert, delete, insert operations with
single and multiple models
• Mock implementations for SQL execution
and streaming operations
• Shuffle testing for concurrent operation
handling

+370/-0 
search_test.go
Enhance HNSW search tests with multi-file support               

pkg/vectorindex/hnsw/search_test.go

• Added mock functions for catalog SQL operations and multi-file
scenarios
• New test helper functions for creating metadata and index
batches
• Enhanced test coverage for search operations with multiple
index files

+106/-0 
model_test.go
Add comprehensive test suite for HnswModel functionality 

pkg/vectorindex/hnsw/model_test.go

• Added comprehensive test suite for HnswModel functionality
• Tests
cover search operations, loading/unloading, add/remove operations, and
SQL generation
• Includes edge case testing for nil model scenarios

Uses mock SQL functions for testing database interactions

+206/-0 
func_hnsw_test.go
Add test cases for HNSW CDC update function                           

pkg/sql/plan/function/func_hnsw_test.go

• Added test cases for hnswCdcUpdate function
• Tests various error
conditions including null arguments and invalid JSON
• Validates
function parameter validation and error handling

+129/-0 
build_test.go
Update HNSW build tests to use new HnswModel structure     

pkg/vectorindex/hnsw/build_test.go

• Updated test code to use HnswModel instead of HnswSearchIndex

Changed function call from NewHnswBuildIndex to NewHnswModelForBuild

Updated struct initialization to use new model type

+5/-5     
types_test.go
Add test cases for vector index CDC functionality               

pkg/vectorindex/types_test.go

• Added test cases for CDC functionality
• Tests Insert, Delete,
Upsert operations and JSON serialization
• Validates CDC data
structure behavior and state management

+63/-0   
sinker_test.go
Update sinker tests for new function signature                     

pkg/cdc/sinker_test.go

• Updated test calls to NewSinker to include the new cnUUID parameter

• Maintains test compatibility with updated function signature

+1/-1     
cdc_test.go
Update CDC test mocks for new sinker signature                     

pkg/frontend/cdc_test.go

• Updated mock NewSinker stub to include cnUUID parameter
• Maintains
test compatibility with updated function signature

+1/-1     
function_id_test.go
Update function ID tests for HNSW CDC function                     

pkg/sql/plan/function/function_id_test.go

• Updated predefined function IDs to include HNSW_CDC_UPDATE

Incremented FUNCTION_END_NUMBER to maintain test consistency

+3/-1     
vector_hnsw_sync.result
Add test results for HNSW CDC synchronization functionality

test/distributed/cases/vector/vector_hnsw_sync.result

• Added test results for HNSW CDC synchronization scenarios
• Covers
empty data, bulk load, and incremental update test cases
• Validates
vector search functionality after CDC operations

+81/-0   
vector_hnsw_sync.sql
Add comprehensive HNSW CDC synchronization test cases       

test/distributed/cases/vector/vector_hnsw_sync.sql

• Added comprehensive test cases for HNSW CDC synchronization
• Tests
PITR and CDC task creation, data operations, and vector searches

Includes scenarios for empty tables, bulk loads, and incremental
updates

+113/-0 
Feature
6 files
sync.go
Implement HNSW index CDC synchronization functionality     

pkg/vectorindex/hnsw/sync.go

• New CDC synchronization functionality for HNSW index updates via SQL
function hnsw_cdc_update()
HnswSync struct for managing CDC
operations with insert, update, delete operations
• Multi-threaded
processing support with concurrent model loading and vector operations

• SQL generation for metadata and storage table updates

+631/-0 
hnsw_sinker.go
Add HNSW CDC sinker for vector index updates                         

pkg/cdc/hnsw_sinker.go

• New hnswSyncSinker implementation for updating HNSW indexes via CDC
changes
• Support for both float32 and float64 vector types with
JSON-based CDC updates
• Transaction-based SQL execution with error
handling and rollback support
• Processing of snapshot and tail data
with atomic batch operations

+573/-0 
func_hnsw.go
Implement HNSW CDC update function for vector index synchronization

pkg/sql/plan/function/func_hnsw.go

• Implemented hnswCdcUpdate function for processing CDC updates

Validates input parameters (database name, table name, dimension, CDC
JSON)
• Calls hnsw.CdcSync to perform the actual synchronization

Includes comprehensive error handling and logging

+77/-0   
util.go
Add CDC task generation for HNSW index synchronization     

pkg/sql/compile/util.go

• Added genCdcHnswIndex function to generate CDC task creation SQL

Creates PITR and CDC task SQL statements for HNSW index
synchronization
• Includes placeholder logic for future CDC task
registration

+38/-0   
list_builtIn.go
Register HNSW CDC update function in built-in functions   

pkg/sql/plan/function/list_builtIn.go

• Added HNSW_CDC_UPDATE function definition to built-in functions list

• Configured function signature with varchar and int32 parameters
returning uint64

+21/-0   
ddl_index_algo.go
Integrate CDC task creation into HNSW index creation         

pkg/sql/compile/ddl_index_algo.go

• Added call to genCdcHnswIndex in vector HNSW index handling

Executes generated CDC SQL statements during index creation

+13/-0   
Enhancement
9 files
model.go
Refactor HNSW model with CDC support and enhanced operations

pkg/vectorindex/hnsw/model.go

• New HnswModel struct replacing HnswBuildIndex with enhanced CDC
support
• Added dirty tracking, atomic length counters, and view mode
support
• Enhanced file operations with checksum validation and
streaming SQL loading
• Methods for concurrent vector operations and
model lifecycle management

+524/-0 
types.go
Add CDC data structures and operations for vector index   

pkg/vectorindex/types.go

• Added CDC-related constants (CDC_INSERT, CDC_UPSERT, CDC_DELETE)

Introduced VectorIndexCdc and VectorIndexCdcEntry structs for CDC
operations
• Added HnswCdcParam struct for CDC parameters

Implemented CDC data manipulation methods (Insert, Upsert, Delete,
ToJson)

+85/-0   
sinker.go
Add HNSW sync sinker support and improve error handling   

pkg/cdc/sinker.go

• Added support for CDCSinkType_HnswSync sink type
• Updated NewSinker
function signature to include cnUUID parameter
• Fixed potential nil
pointer dereference in error handling

+9/-2     
ddl.go
Add placeholder logic for CDC task cleanup in DDL operations

pkg/sql/compile/ddl.go

• Added TODO comments for CDC task cleanup in DropIndex and DropTable
methods
• Placeholder logic for cleaning up CDC tasks when dropping
vector/fulltext indexes

+9/-0     
sqlexec.go
Add transaction-aware SQL execution function                         

pkg/vectorindex/sqlexec/sqlexec.go

• Added RunTxn function for executing SQL operations within
transactions
• Provides transaction-aware SQL execution with proper
context and options setup

+27/-0   
func_cast.go
Enhance array casting with dimension validation                   

pkg/sql/plan/function/func_cast.go

• Enhanced array casting with dimension validation
• Added bypass for
max dimension check when width equals MaxArrayDimension
• Improved
error handling for dimension mismatches

+11/-2   
hnsw.go
Relax table scan validation in HNSW query building             

pkg/sql/plan/hnsw.go

• Commented out table scan validation in buildHnswCreate
• Relaxed
constraints on child node type checking

+6/-4     
cdc_options.go
Add HNSW sync sink type support in CDC options                     

pkg/frontend/cdc_options.go

• Added support for CDCSinkType_HnswSync in CDC options validation

Extended sink type validation to include HNSW sync type

+1/-1     
cdc_exector.go
Update CDC executor to pass CN UUID to sinker                       

pkg/frontend/cdc_exector.go

• Updated NewSinker call to include cnUUID parameter
• Passes
executor's CN UUID to sinker creation

+1/-0     
Code refactoring
2 files
build.go
Refactor HNSW build to use unified model structure             

pkg/vectorindex/hnsw/build.go

• Refactored to use HnswModel instead of HnswBuildIndex for
consistency
• Removed duplicate HnswBuildIndex struct and related
methods
• Updated build operations to work with the new unified model
structure

+11/-182
search.go
Refactor HNSW search to use HnswModel instead of HnswSearchIndex

pkg/vectorindex/hnsw/search.go

• Removed HnswSearchIndex struct and related methods (loadChunk,
LoadIndex, Search)
• Replaced HnswSearchIndex with HnswModel in the
HnswSearch struct
• Refactored LoadMetadata to be a standalone
function and updated field mappings
• Updated LoadIndex method to use
new HnswModel.LoadIndex signature

+8/-141 
Configuration changes
2 files
function_id.go
Add function ID for HNSW CDC update function                         

pkg/sql/plan/function/function_id.go

• Added HNSW_CDC_UPDATE function ID constant
• Updated
FUNCTION_END_NUMBER and function registry mapping

+7/-1     
types.go
Add HNSW sync sink type constant                                                 

pkg/cdc/types.go

• Added CDCSinkType_HnswSync constant for HNSW synchronization sink
type

+4/-3     
Miscellaneous
3 files
vector_hnsw.result
Update vector dimension error message format                         

test/distributed/cases/vector/vector_hnsw.result

• Updated error message format for dimension mismatch from "vector ops
between different dimensions" to "expected vector dimension X !=
actual dimension Y"

+1/-1     
vector_index.result
Update vector index error message format                                 

test/distributed/cases/vector/vector_index.result

• Updated error message format for dimension mismatch to use new
standardized format

+1/-1     
array.result
Update array dimension error message format                           

test/distributed/cases/array/array.result

• Updated error messages for array dimension mismatches to use new
standardized format

+2/-2     

Need help?
  • Type /help how to ... in the comments thread for any questions about Qodo Merge usage.
  • Check out the documentation for more information.
  • Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Labels
    kind/feature Review effort 4/5 size/XXL Denotes a PR that changes 2000+ lines
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    9 participants