diff --git a/pkg/cdc/reader.go b/pkg/cdc/reader.go
index 37cf1bbc4456f..8e32f96eb996d 100644
--- a/pkg/cdc/reader.go
+++ b/pkg/cdc/reader.go
@@ -50,6 +50,7 @@ type tableReader struct {
 	runningReaders *sync.Map
 	startTs, endTs types.TS
 	noFull         bool
+	frequency      time.Duration
 	tableDef                           *plan.TableDef
 	insTsColIdx, insCompositedPkColIdx int
@@ -76,12 +77,13 @@ var NewTableReader = func(
 	frequency string,
 ) Reader {
 	var tick *time.Ticker
+	var dur time.Duration
 	if frequency != "" {
-		dur := parseFrequencyToDuration(frequency)
-		tick = time.NewTicker(dur)
+		dur = parseFrequencyToDuration(frequency)
 	} else {
-		tick = time.NewTicker(200 * time.Millisecond)
+		dur = 200 * time.Millisecond
 	}
+	tick = time.NewTicker(dur)
 	reader := &tableReader{
 		cnTxnClient: cnTxnClient,
 		cnEngine:    cnEngine,
@@ -99,6 +101,7 @@ var NewTableReader = func(
 		endTs:       endTs,
 		noFull:      noFull,
 		tableDef:    tableDef,
+		frequency:   dur,
 	}
 	// batch columns layout:
@@ -184,6 +187,43 @@ func (reader *tableReader) Run(
 		)
 	}()
+	lastSync, err := reader.getLastSyncTime(ctx)
+	if err != nil {
+		logutil.Errorf("GetLastSyncTime failed: %v", err)
+		lastSync = time.Time{}
+	}
+
+	nextSyncTime := time.Now()
+	if !lastSync.IsZero() {
+		nextSyncTime = lastSync.Add(reader.frequency)
+	}
+
+	if now := time.Now(); now.Before(nextSyncTime) {
+		wait := nextSyncTime.Sub(now)
+		select {
+		case <-ctx.Done():
+			return
+		case <-ar.Pause:
+			return
+		case <-ar.Cancel:
+			return
+		case <-time.After(wait):
+		}
+	}
+
+	if err = reader.readTable(ctx, ar); err != nil {
+		logutil.Errorf("cdc tableReader(%v) failed, err: %v", reader.info, err)
+		return
+	}
+
+	if reader.frequency <= 0 {
+		reader.frequency = 200 * time.Millisecond
+	}
+	if reader.tick != nil {
+		reader.tick.Stop()
+	}
+	reader.tick = time.NewTicker(reader.frequency)
+
 	for {
 		select {
 		case <-ctx.Done():
@@ -202,6 +242,27 @@ func (reader *tableReader) Run(
 	}
 }
+func (reader *tableReader) getLastSyncTime(ctx context.Context) (time.Time, error) {
+	wKey := WatermarkKey{
+		AccountId: reader.accountId,
+		TaskId:    reader.taskId,
+		DBName:    reader.info.SourceDbName,
+		TableName: reader.info.SourceTblName,
+	}
+
+	ts, err := reader.wMarkUpdater.GetFromCache(ctx, &wKey)
+	if err == nil && !ts.ToTimestamp().IsEmpty() {
+		return ts.ToTimestamp().ToStdTime(), nil
+	}
+
+	defaultTS := reader.startTs
+	ts, err = reader.wMarkUpdater.GetOrAddCommitted(ctx, &wKey, &defaultTS)
+	if err != nil {
+		return time.Time{}, err
+	}
+	return ts.ToTimestamp().ToStdTime(), nil
+}
+
 var readTableWithTxn = func(
 	reader *tableReader,
 	ctx context.Context,
@@ -215,6 +276,7 @@ var readTableWithTxn = func(
 func (reader *tableReader) readTable(ctx context.Context, ar *ActiveRoutine) (err error) {
 	//step1 : create an txnOp
 	txnOp, err := GetTxnOp(ctx, reader.cnEngine, reader.cnTxnClient, "readMultipleTables")
+
 	if err != nil {
 		return err
 	}
@@ -314,6 +376,7 @@ func (reader *tableReader) readTableWithTxn(
 	start := time.Now()
 	changes, err = CollectChanges(ctx, rel, fromTs, toTs, reader.mp)
+	v2.CdcReadDurationHistogram.Observe(time.Since(start).Seconds())
 	if err != nil {
 		return
diff --git a/pkg/cdc/sinker.go b/pkg/cdc/sinker.go
index 4ac4cc6e9ce2d..fd3634991ce48 100644
--- a/pkg/cdc/sinker.go
+++ b/pkg/cdc/sinker.go
@@ -91,12 +91,22 @@ var NewSinker = func(
 	ctx := context.Background()
 	padding := strings.Repeat(" ", sqlBufReserved)
 	// create db
-	_ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbTblInfo.SinkDbName)), false)
+	err = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbTblInfo.SinkDbName)), false)
+	if err != nil {
+		return nil, err
+	}
 	// use db
-	_ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("use `%s`", dbTblInfo.SinkDbName)), false)
+	err = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("use `%s`", dbTblInfo.SinkDbName)), false)
+	if err != nil {
+		return nil, err
+	}
 	// possibly need to drop table first
 	if dbTblInfo.IdChanged {
-		_ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("DROP TABLE IF EXISTS `%s`", dbTblInfo.SinkTblName)), false)
+		err = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("DROP TABLE IF EXISTS `%s`", dbTblInfo.SinkTblName)), false)
+		if err != nil {
+			return nil, err
+		}
+		dbTblInfo.IdChanged = false
 	}
 	// create table
 	createSql := strings.TrimSpace(dbTblInfo.SourceCreateSql)
@@ -112,7 +122,10 @@ var NewSinker = func(
 		newTablePart = dbTblInfo.SinkTblName
 	}
 	createSql = createSql[:tableStart] + " " + newTablePart + createSql[tableEnd:]
-	_ = sink.Send(ctx, ar, []byte(padding+createSql), false)
+	err = sink.Send(ctx, ar, []byte(padding+createSql), false)
+	if err != nil {
+		return nil, err
+	}
 	return NewMysqlSinker(
 		sink,
diff --git a/pkg/cdc/sinker_test.go b/pkg/cdc/sinker_test.go
index 67b98e63fd59d..c3a59ae655023 100644
--- a/pkg/cdc/sinker_test.go
+++ b/pkg/cdc/sinker_test.go
@@ -19,6 +19,7 @@ import (
 	"database/sql"
 	"errors"
 	"fmt"
+	"github.com/stretchr/testify/require"
 	"testing"
 	"time"
@@ -109,34 +110,42 @@ func TestNewSinker(t *testing.T) {
 		},
 	}
-	db, mock, err := sqlmock.New()
-	assert.NoError(t, err)
-	mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1))
-	mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1))
-	mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1))
-
-	sink := &mysqlSink{
-		user:          "root",
-		password:      "123456",
-		ip:            "127.0.0.1",
-		port:          3306,
-		retryTimes:    CDCDefaultRetryTimes,
-		retryDuration: CDCDefaultRetryDuration,
-		conn:          db,
-	}
-
-	sinkStub := gostub.Stub(&NewMysqlSink, func(_, _, _ string, _, _ int, _ time.Duration, _ string, _ bool) (Sink, error) {
-		return sink, nil
-	})
-	defer sinkStub.Reset()
-
-	sinkerStub := gostub.Stub(&NewMysqlSinker, func(Sink, uint64, string, *DbTableInfo, *CDCWatermarkUpdater, *plan.TableDef, *ActiveRoutine, uint64, bool) Sinker {
-		return nil
-	})
-	defer sinkerStub.Reset()
-
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
+			var sink Sink
+			var stub *gostub.Stubs
+
+			if tt.args.sinkUri.SinkTyp == CDCSinkType_MySQL {
+				db, mock, err := sqlmock.New()
+				require.NoError(t, err)
+				defer db.Close()
+
+				// create db + use + create table
+				expectCount := 3
+				if tt.args.dbTblInfo != nil && tt.args.dbTblInfo.IdChanged {
+					// create db + use + drop + create table
+					expectCount = 4
+				}
+
+				for i := 0; i < expectCount; i++ {
+					mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1))
+				}
+
+				sink = &mysqlSink{
+					conn: db,
+				}
+
+				stub = gostub.Stub(&NewMysqlSink, func(_, _, _ string, _, _ int, _ time.Duration, _ string, _ bool) (Sink, error) {
+					return sink, nil
+				})
+				defer stub.Reset()
+			}
+
+			sinkerStub := gostub.Stub(&NewMysqlSinker, func(Sink, uint64, string, *DbTableInfo, *CDCWatermarkUpdater, *plan.TableDef, *ActiveRoutine, uint64, bool) Sinker {
+				return nil
+			})
+			defer sinkerStub.Reset()
+
 			got, err := NewSinker(
 				tt.args.sinkUri,
 				tt.args.accountId,
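Note on the reader change above: `Run` now delays the first `readTable` until `lastSync + frequency` instead of firing immediately, and only re-arms the ticker after that first read completes. A minimal, self-contained sketch of the scheduling decision (`nextSyncAfter` is a hypothetical helper, not part of this patch):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// nextSyncAfter computes how long to wait before the next sync, given the
// last persisted sync time and the configured frequency. A zero lastSync
// means "never synced", so the first read should run immediately.
func nextSyncAfter(lastSync time.Time, frequency time.Duration, now time.Time) time.Duration {
	if lastSync.IsZero() {
		return 0
	}
	if wait := lastSync.Add(frequency).Sub(now); wait > 0 {
		return wait
	}
	return 0
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// pretend the watermark says we last synced 30s ago, with a 1m frequency
	lastSync := time.Now().Add(-30 * time.Second)
	wait := nextSyncAfter(lastSync, time.Minute, time.Now())
	fmt.Println("waiting", wait, "before first read")

	select {
	case <-ctx.Done(): // honor cancellation while waiting, as Run does with ctx/Pause/Cancel
	case <-time.After(wait):
		fmt.Println("run first read, then fall into the ticker loop")
	}
}
```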
"github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/objectio" "slices" "strings" "sync" @@ -46,7 +48,7 @@ var GetTableDetector = func(cnUUID string) *TableDetector { once.Do(func() { detector = &TableDetector{ Mp: make(map[uint32]TblMap), - Callbacks: make(map[string]func(map[uint32]TblMap)), + Callbacks: make(map[string]TableCallback), exec: getSqlExecutor(cnUUID), CallBackAccountId: make(map[string]uint32), SubscribedAccountIds: make(map[uint32][]string), @@ -62,11 +64,13 @@ var GetTableDetector = func(cnUUID string) *TableDetector { // TblMap key is dbName.tableName, e.g. db1.t1 type TblMap map[string]*DbTableInfo +type TableCallback func(map[uint32]TblMap) error + type TableDetector struct { sync.Mutex Mp map[uint32]TblMap - Callbacks map[string]func(map[uint32]TblMap) + Callbacks map[string]TableCallback exec executor.SQLExecutor cancel context.CancelFunc @@ -82,9 +86,14 @@ type TableDetector struct { CallBackTableName map[string][]string // tablename -> [taska, taskb ...] SubscribedTableNames map[string][]string + + // to make sure there is at most only one handleNewTables running, so the truncate info will not be lost + handling bool + lastMp map[uint32]TblMap + mu sync.Mutex } -func (s *TableDetector) Register(id string, accountId uint32, dbs []string, tables []string, cb func(map[uint32]TblMap)) { +func (s *TableDetector) Register(id string, accountId uint32, dbs []string, tables []string, cb TableCallback) { s.Lock() defer s.Unlock() @@ -173,33 +182,83 @@ func (s *TableDetector) UnRegister(id string) { func (s *TableDetector) scanTableLoop(ctx context.Context) { logutil.Info("CDC-TableDetector-Scan-Start") - defer func() { - logutil.Info("CDC-TableDetector-Scan-End") - }() + defer logutil.Info("CDC-TableDetector-Scan-End") + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + retryTicker := time.NewTicker(5 * time.Second) + defer retryTicker.Stop() - timeTick := time.Tick(15 * time.Second) for { select { case <-ctx.Done(): return - case <-timeTick: - if err := s.scanTable(); err != nil { - logutil.Error( - "CDC-TableDetector-Scan-Error", - zap.Error(err), - ) + case <-ticker.C: + s.mu.Lock() + + if s.handling { + s.mu.Unlock() + continue } - // do callbacks - s.Lock() - for _, cb := range s.Callbacks { - go cb(s.Mp) + + s.mu.Unlock() + + go s.scanAndProcess(ctx) + case <-retryTicker.C: + s.mu.Lock() + if s.handling || s.lastMp == nil { + s.mu.Unlock() + continue } - s.Unlock() + s.mu.Unlock() + + go s.processCallback(ctx, s.lastMp) } } } +func (s *TableDetector) scanAndProcess(ctx context.Context) { + if err := s.scanTable(); err != nil { + logutil.Error("CDC-TableDetector-Scan-Error", zap.Error(err)) + return + } + + s.mu.Lock() + s.lastMp = s.Mp + s.mu.Unlock() + + s.processCallback(ctx, s.lastMp) +} + +func (s *TableDetector) processCallback(ctx context.Context, tables map[uint32]TblMap) { + s.mu.Lock() + s.handling = true + s.mu.Unlock() + + var err error + for _, cb := range s.Callbacks { + err = cb(tables) + } + + s.mu.Lock() + defer s.mu.Unlock() + + if err != nil { + logutil.Warn("CDC-TableDetector-Callback-Failed", zap.Error(err)) + } else { + logutil.Info("CDC-TableDetector-Callback-Success") + s.lastMp = nil + } + + s.handling = false +} + func (s *TableDetector) scanTable() error { + if objectio.CDCScanTableErrInjected() { + return moerr.NewInternalError(context.Background(), "CDC_SCANTABLE_ERR") + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var ( @@ -289,14 
+348,14 @@ func (s *TableDetector) scanTable() error { if !exists { mp[accountId][key] = newInfo } else { - mp[accountId][key] = &DbTableInfo{ - SourceDbId: dbId, - SourceDbName: dbName, - SourceTblId: tblId, - SourceTblName: tblName, - SourceCreateSql: createSql, - IdChanged: oldInfo.OnlyDiffinTblId(newInfo), - } + idChanged := oldInfo.OnlyDiffinTblId(newInfo) + oldInfo.SourceDbId = dbId + oldInfo.SourceDbName = dbName + oldInfo.SourceTblId = tblId + oldInfo.SourceTblName = tblName + oldInfo.SourceCreateSql = createSql + oldInfo.IdChanged = oldInfo.IdChanged || idChanged + mp[accountId][key] = oldInfo } } return true diff --git a/pkg/cdc/table_scanner_test.go b/pkg/cdc/table_scanner_test.go index d0720f55f0139..59620c03b5aa5 100644 --- a/pkg/cdc/table_scanner_test.go +++ b/pkg/cdc/table_scanner_test.go @@ -15,6 +15,9 @@ package cdc import ( + "context" + "github.com/matrixorigin/matrixone/pkg/objectio" + "github.com/matrixorigin/matrixone/pkg/util/fault" "strings" "sync" "testing" @@ -59,7 +62,7 @@ func TestTableScanner(t *testing.T) { td := &TableDetector{ Mutex: sync.Mutex{}, Mp: make(map[uint32]TblMap), - Callbacks: make(map[string]func(map[uint32]TblMap)), + Callbacks: make(map[string]TableCallback), CallBackAccountId: make(map[string]uint32), SubscribedAccountIds: make(map[uint32][]string), CallBackDbName: make(map[string][]string), @@ -69,13 +72,13 @@ func TestTableScanner(t *testing.T) { exec: mockSqlExecutor, } - td.Register("id1", 1, []string{"db1"}, []string{"tbl1"}, func(mp map[uint32]TblMap) {}) + td.Register("id1", 1, []string{"db1"}, []string{"tbl1"}, func(mp map[uint32]TblMap) error { return nil }) assert.Equal(t, 1, len(td.Callbacks)) - td.Register("id2", 2, []string{"db2"}, []string{"tbl2"}, func(mp map[uint32]TblMap) {}) + td.Register("id2", 2, []string{"db2"}, []string{"tbl2"}, func(mp map[uint32]TblMap) error { return nil }) assert.Equal(t, 2, len(td.Callbacks)) assert.Equal(t, 2, len(td.SubscribedAccountIds)) - td.Register("id3", 1, []string{"db1"}, []string{"tbl1"}, func(mp map[uint32]TblMap) {}) + td.Register("id3", 1, []string{"db1"}, []string{"tbl1"}, func(mp map[uint32]TblMap) error { return nil }) assert.Equal(t, 3, len(td.Callbacks)) assert.Equal(t, 2, len(td.SubscribedAccountIds)) assert.Equal(t, 2, len(td.SubscribedDbNames["db1"])) @@ -96,7 +99,7 @@ func TestTableScanner(t *testing.T) { assert.Equal(t, 0, len(td.SubscribedAccountIds)) assert.Equal(t, 0, len(td.SubscribedDbNames)) - td.Register("id4", 1, []string{"db4"}, []string{"tbl4"}, func(mp map[uint32]TblMap) {}) + td.Register("id4", 1, []string{"db4"}, []string{"tbl4"}, func(mp map[uint32]TblMap) error { return nil }) assert.Equal(t, 1, len(td.Callbacks)) assert.Equal(t, 1, len(td.SubscribedAccountIds)) @@ -139,3 +142,158 @@ func Test_CollectTableInfoSQL(t *testing.T) { expected = "SELECT REL_ID, RELNAME, RELDATABASE_ID, RELDATABASE, REL_CREATESQL, ACCOUNT_ID FROM `MO_CATALOG`.`MO_TABLES` WHERE ACCOUNT_ID IN (0) AND RELDATABASE IN ('SOURCE_DB') AND RELNAME IN ('ORDERS') AND RELKIND = 'R' AND RELDATABASE NOT IN ('INFORMATION_SCHEMA','MO_CATALOG','MO_DEBUG','MO_TASK','MYSQL','SYSTEM','SYSTEM_METRICS')" assert.Equal(t, strings.ToUpper(expected), strings.ToUpper(sql)) } + +func TestScanAndProcess(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + td := &TableDetector{ + Mutex: sync.Mutex{}, + Mp: make(map[uint32]TblMap), + Callbacks: make(map[string]TableCallback), + CallBackAccountId: make(map[string]uint32), + SubscribedAccountIds: make(map[uint32][]string), + CallBackDbName: 
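The detector's retry design above hinges on two fields guarded by `mu`: `handling` (single-flight) and `lastMp` (pending work kept until a round succeeds). A minimal standalone sketch of the same pattern, with hypothetical names (`singleFlightRetrier`, `tryRun` are illustrative, not the patch's API):

```go
package main

import (
	"fmt"
	"sync"
)

// singleFlightRetrier mirrors the detector's handling/lastMp coordination:
// a scan tick starts work only if none is running; a retry tick re-runs the
// last failed payload until a round succeeds and clears it.
type singleFlightRetrier struct {
	mu       sync.Mutex
	handling bool
	pending  map[string]int // stands in for lastMp
}

func (r *singleFlightRetrier) tryRun(payload map[string]int, process func(map[string]int) error) {
	r.mu.Lock()
	if r.handling {
		r.mu.Unlock() // a previous round is still running; skip this tick
		return
	}
	r.handling = true
	r.mu.Unlock()

	err := process(payload)

	r.mu.Lock()
	defer r.mu.Unlock()
	if err != nil {
		r.pending = payload // keep it so the retry tick can pick it up
	} else {
		r.pending = nil
	}
	r.handling = false
}

func main() {
	r := &singleFlightRetrier{}
	fail := true
	process := func(m map[string]int) error {
		if fail {
			fail = false
			return fmt.Errorf("transient error")
		}
		return nil
	}

	r.tryRun(map[string]int{"db1.tbl1": 1}, process) // first round fails, payload kept
	if r.pending != nil {
		r.tryRun(r.pending, process) // retry tick succeeds and clears pending
	}
	fmt.Println("pending after retry:", r.pending)
}
```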
+func TestScanAndProcess(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	td := &TableDetector{
+		Mutex:                sync.Mutex{},
+		Mp:                   make(map[uint32]TblMap),
+		Callbacks:            make(map[string]TableCallback),
+		CallBackAccountId:    make(map[string]uint32),
+		SubscribedAccountIds: make(map[uint32][]string),
+		CallBackDbName:       make(map[string][]string),
+		SubscribedDbNames:    make(map[string][]string),
+		CallBackTableName:    make(map[string][]string),
+		SubscribedTableNames: make(map[string][]string),
+		exec:                 nil,
+	}
+
+	fault.Enable()
+	objectio.SimpleInject(objectio.FJ_CDCScanTableErr)
+	td.scanAndProcess(context.Background())
+	fault.Disable()
+
+	td.scanAndProcess(context.Background())
+}
+
+func TestProcessCallBack(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	td := &TableDetector{
+		Mutex:                sync.Mutex{},
+		Mp:                   make(map[uint32]TblMap),
+		Callbacks:            make(map[string]TableCallback),
+		CallBackAccountId:    make(map[string]uint32),
+		SubscribedAccountIds: make(map[uint32][]string),
+		CallBackDbName:       make(map[string][]string),
+		SubscribedDbNames:    make(map[string][]string),
+		CallBackTableName:    make(map[string][]string),
+		SubscribedTableNames: make(map[string][]string),
+		exec:                 nil,
+		lastMp:               make(map[uint32]TblMap),
+	}
+
+	tables := map[uint32]TblMap{
+		1: {
+			"db1.tbl1": &DbTableInfo{
+				SourceDbId:      1,
+				SourceDbName:    "db1",
+				SourceTblId:     1001,
+				SourceTblName:   "tbl1",
+				SourceCreateSql: "create table tbl1 (a int)",
+				IdChanged:       false,
+			},
+		},
+	}
+	td.Register("id1", 1, []string{"db1"}, []string{"tbl1"}, func(mp map[uint32]TblMap) error { return moerr.NewInternalErrorNoCtx("ERR") })
+	assert.Equal(t, 1, len(td.Callbacks))
+	td.mu.Lock()
+	td.lastMp = tables
+	td.mu.Unlock()
+
+	td.processCallback(context.Background(), tables)
+
+	td.mu.Lock()
+	defer td.mu.Unlock()
+
+	assert.False(t, td.handling, "handling should be reset to false")
+	assert.NotNil(t, td.lastMp, "lastMp should not be cleared on error")
+	assert.Equal(t, tables, td.lastMp, "lastMp should remain unchanged")
+}
+
+func TestTableScanner_UpdateTableInfo(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	bat1 := batch.New([]string{"tblId", "tblName", "dbId", "dbName", "createSql", "accountId"})
+	bat1.Vecs[0] = testutil.MakeUint64Vector([]uint64{1001}, nil)
+	bat1.Vecs[1] = testutil.MakeVarcharVector([]string{"tbl1"}, nil)
+	bat1.Vecs[2] = testutil.MakeUint64Vector([]uint64{1}, nil)
+	bat1.Vecs[3] = testutil.MakeVarcharVector([]string{"db1"}, nil)
+	bat1.Vecs[4] = testutil.MakeVarcharVector([]string{"create table tbl1 (a int)"}, nil)
+	bat1.Vecs[5] = testutil.MakeUint32Vector([]uint32{1}, nil)
+	bat1.SetRowCount(1)
+	res1 := executor.Result{
+		Mp:      testutil.TestUtilMp,
+		Batches: []*batch.Batch{bat1},
+	}
+
+	bat2 := batch.New([]string{"tblId", "tblName", "dbId", "dbName", "createSql", "accountId"})
+	bat2.Vecs[0] = testutil.MakeUint64Vector([]uint64{1002}, nil) // new table ID
+	bat2.Vecs[1] = testutil.MakeVarcharVector([]string{"tbl1"}, nil)
+	bat2.Vecs[2] = testutil.MakeUint64Vector([]uint64{1}, nil)
+	bat2.Vecs[3] = testutil.MakeVarcharVector([]string{"db1"}, nil)
+	bat2.Vecs[4] = testutil.MakeVarcharVector([]string{"create table tbl1 (a int)"}, nil)
+	bat2.Vecs[5] = testutil.MakeUint32Vector([]uint32{1}, nil)
+	bat2.SetRowCount(1)
+	res2 := executor.Result{
+		Mp:      testutil.TestUtilMp,
+		Batches: []*batch.Batch{bat2},
+	}
+
+	mockSqlExecutor := mock_executor.NewMockSQLExecutor(ctrl)
+
+	mockSqlExecutor.EXPECT().Exec(
+		gomock.Any(),
+		CDCSQLBuilder.CollectTableInfoSQL("1", "'db1'", "'tbl1'"),
+		gomock.Any(),
+	).Return(res1, nil)
+
+	mockSqlExecutor.EXPECT().Exec(
+		gomock.Any(),
+		CDCSQLBuilder.CollectTableInfoSQL("1", "'db1'", "'tbl1'"),
+		gomock.Any(),
+	).Return(res2, nil)
+
+	td := &TableDetector{
+		Mutex:                sync.Mutex{},
+		Mp:                   make(map[uint32]TblMap),
+		Callbacks:            make(map[string]TableCallback),
+		CallBackAccountId:    make(map[string]uint32),
+		SubscribedAccountIds: make(map[uint32][]string),
+		CallBackDbName:       make(map[string][]string),
+		SubscribedDbNames:    make(map[string][]string),
+		CallBackTableName:    make(map[string][]string),
+		SubscribedTableNames: make(map[string][]string),
+		exec:                 mockSqlExecutor,
+	}
+
+	td.Register("test-task", 1, []string{"db1"}, []string{"tbl1"}, func(mp map[uint32]TblMap) error {
+		return nil
+	})
+
+	err := td.scanTable()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(td.Mp))
+
+	accountMap, ok := td.Mp[1]
+	assert.True(t, ok)
+
+	tblInfo, ok := accountMap["db1.tbl1"]
+	assert.True(t, ok)
+	assert.Equal(t, uint64(1001), tblInfo.SourceTblId)
+	assert.False(t, tblInfo.IdChanged)
+
+	err = td.scanTable()
+	assert.NoError(t, err)
+	assert.Equal(t, 1, len(td.Mp))
+
+	accountMap = td.Mp[1]
+	tblInfo = accountMap["db1.tbl1"]
+	assert.Equal(t, uint64(1002), tblInfo.SourceTblId)
+	assert.True(t, tblInfo.IdChanged)
+}
diff --git a/pkg/frontend/cdc_exector.go b/pkg/frontend/cdc_exector.go
index 4e9e302b7e5fa..4c38ae3ac8470 100644
--- a/pkg/frontend/cdc_exector.go
+++ b/pkg/frontend/cdc_exector.go
@@ -17,6 +17,7 @@ package frontend
 import (
 	"context"
 	"encoding/json"
+	"github.com/matrixorigin/matrixone/pkg/objectio"
 	"regexp"
 	"strconv"
 	"sync"
@@ -397,11 +398,17 @@ func (exec *CDCTaskExecutor) updateErrMsg(ctx context.Context, errMsg string) (e
 	)
 }
-func (exec *CDCTaskExecutor) handleNewTables(allAccountTbls map[uint32]cdc.TblMap) {
+func (exec *CDCTaskExecutor) handleNewTables(allAccountTbls map[uint32]cdc.TblMap) error {
 	// lock to avoid create pipelines for the same table
 	// 2025.7, this lock might be needless now
 	exec.Lock()
 	defer exec.Unlock()
+	// if injected, sleep to simulate a slow handleNewTables round
+	if sleepSeconds, injected := objectio.CDCHandleSlowInjected(); injected {
+		time.Sleep(time.Duration(sleepSeconds) * time.Second)
+	}
+
 	accountId := uint32(exec.spec.Accounts[0].GetId())
 	ctx := defines.AttachAccountId(context.Background(), accountId)
@@ -413,19 +420,26 @@ func (exec *CDCTaskExecutor) handleNewTables(allAccountTbls map[uint32]cdc.TblMa
 			zap.String("task-name", exec.spec.TaskName),
 			zap.Error(err),
 		)
-		return
+		return err
 	}
 	defer func() {
 		cdc.FinishTxnOp(ctx, err, txnOp, exec.cnEngine)
 	}()
-	if err = exec.cnEngine.New(ctx, txnOp); err != nil {
+	err = exec.cnEngine.New(ctx, txnOp)
+
+	// if injected, we expect handleNewTables to keep retrying
+	if objectio.CDCHandleErrInjected() {
+		err = moerr.NewInternalError(context.Background(), "CDC_HANDLENEWTABLES_ERR")
+	}
+
+	if err != nil {
 		logutil.Error(
 			"CDC-Task-HandleNewTables-NewEngineFailed",
 			zap.String("task-id", exec.spec.TaskId),
 			zap.String("task-name", exec.spec.TaskName),
 			zap.Error(err),
 		)
-		return
+		return err
 	}
 	for key, info := range allAccountTbls[accountId] {
@@ -459,24 +473,13 @@ func (exec *CDCTaskExecutor) handleNewTables(allAccountTbls map[uint32]cdc.TblMa
 		logutil.Infof("cdc task find new table: %s", newTableInfo)
 		if err = exec.addExecPipelineForTable(ctx, newTableInfo, txnOp); err != nil {
 			logutil.Errorf("cdc task %s add exec pipeline for table %s failed, err: %v", exec.spec.TaskName, key, err)
+			return err
 		} else {
+			info.IdChanged = newTableInfo.IdChanged
 			logutil.Infof("cdc task %s add exec pipeline for table %s successfully", exec.spec.TaskName, key)
 		}
-		logger := logutil.Info
-		msg := "CDC-Task-HandleNewTables-Success"
-		if err != nil {
-			logger = logutil.Error
-			msg = "CDC-Task-HandleNewTables-Failed"
-		}
-		logger(
-			msg,
-			zap.String("task-id", exec.spec.TaskId),
-			zap.String("task-name", exec.spec.TaskName),
-			zap.String("table-key", key),
-			zap.String("table-info", newTableInfo.String()),
-			zap.Error(err),
-		)
 	}
+	return nil
 }
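With the signature change above, `handleNewTables` now reports failure back to the `TableDetector`, whose retry ticker re-delivers the last table map until a round succeeds. A minimal sketch of that contract (`tableMap`, `deliverWithRetry` are illustrative stand-ins, not the patch's wiring):

```go
package main

import (
	"errors"
	"fmt"
)

// tableMap and tableCallback mirror the patch's TblMap/TableCallback contract:
// a callback that can fail, so the caller knows to re-deliver the snapshot.
type tableMap map[string]string
type tableCallback func(map[uint32]tableMap) error

// deliverWithRetry is a hypothetical driver: it keeps re-delivering the same
// snapshot until the callback succeeds, like the detector's retryTicker path.
func deliverWithRetry(snapshot map[uint32]tableMap, cb tableCallback, maxRounds int) error {
	var err error
	for i := 0; i < maxRounds; i++ {
		if err = cb(snapshot); err == nil {
			return nil // success clears the pending snapshot (lastMp = nil)
		}
	}
	return err
}

func main() {
	rounds := 0
	cb := func(mp map[uint32]tableMap) error {
		rounds++
		if rounds < 3 {
			return errors.New("CDC_HANDLENEWTABLES_ERR") // e.g. engine.New failed
		}
		return nil
	}
	err := deliverWithRetry(map[uint32]tableMap{0: {"db1.tb1": "info"}}, cb, 5)
	fmt.Println("rounds:", rounds, "err:", err)
}
```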
 func (exec *CDCTaskExecutor) matchAnyPattern(key string, info *cdc.DbTableInfo) bool {
@@ -511,6 +514,16 @@ func (exec *CDCTaskExecutor) addExecPipelineForTable(
 	info *cdc.DbTableInfo,
 	txnOp client.TxnOperator,
 ) (err error) {
+	// for unit tests: fault-injection short circuits
+	if objectio.CDCAddExecConsumeTruncateInjected() {
+		info.IdChanged = false
+		return nil
+	}
+
+	if objectio.CDCAddExecErrInjected() {
+		return moerr.NewInternalErrorNoCtx("CDC_AddExecPipelineForTable_ERR")
+	}
+
 	// step 1. init watermarkUpdater
 	// get watermark from db
 	watermark := exec.startTs
diff --git a/pkg/frontend/cdc_test.go b/pkg/frontend/cdc_test.go
index ebbdf84083a55..aba0ee7de649e 100644
--- a/pkg/frontend/cdc_test.go
+++ b/pkg/frontend/cdc_test.go
@@ -18,6 +18,9 @@ import (
 	"context"
 	"database/sql"
 	"fmt"
+	"github.com/matrixorigin/matrixone/pkg/objectio"
+	"github.com/matrixorigin/matrixone/pkg/util/fault"
+	"github.com/stretchr/testify/require"
 	"regexp"
 	"sync"
 	"testing"
@@ -1002,7 +1005,7 @@ func TestRegisterCdcExecutor(t *testing.T) {
 	gostub.Stub(&cdc.GetTableDetector, func(cnUUID string) *cdc.TableDetector {
 		return &cdc.TableDetector{
 			Mp:                   make(map[uint32]cdc.TblMap),
-			Callbacks:            map[string]func(map[uint32]cdc.TblMap){"id": func(mp map[uint32]cdc.TblMap) {}},
+			Callbacks:            map[string]cdc.TableCallback{"id": func(mp map[uint32]cdc.TblMap) error { return nil }},
 			CallBackAccountId:    map[string]uint32{"id": 0},
 			SubscribedAccountIds: map[uint32][]string{0: {"id"}},
 			CallBackDbName:       make(map[string][]string),
@@ -2286,6 +2289,7 @@ func TestCdcTask_Cancel(t *testing.T) {
 }
 func TestCdcTask_retrieveCdcTask(t *testing.T) {
+	fault.EnableDomain(fault.DomainFrontend)
 	type fields struct {
 		logger *zap.Logger
 		ie     ie.InternalExecutor
@@ -2767,6 +2771,152 @@ func TestCdcTask_handleNewTables(t *testing.T) {
 	cdcTask.handleNewTables(mp)
 }
+func TestCdcTask_handleNewTables_addpipeline(t *testing.T) {
+	stub1 := gostub.Stub(&cdc.GetTxnOp, func(context.Context, engine.Engine, client.TxnClient, string) (client.TxnOperator, error) {
+		return nil, nil
+	})
+	defer stub1.Reset()
+
+	stub2 := gostub.Stub(&cdc.FinishTxnOp, func(context.Context, error, client.TxnOperator, engine.Engine) {})
+	defer stub2.Reset()
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	eng := mock_frontend.NewMockEngine(ctrl)
+	eng.EXPECT().New(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+
+	cdcTask := &CDCTaskExecutor{
+		spec: &task.CreateCdcDetails{
+			Accounts: []*task.Account{{Id: 0}},
+		},
+		tables: cdc.PatternTuples{
+			Pts: []*cdc.PatternTuple{
+				{
+					Source: cdc.PatternTable{
+						Database: "db1",
+						Table:    cdc.CDCPitrGranularity_All,
+					},
+				},
+			},
+		},
+		exclude:        regexp.MustCompile("db1.tb1"),
+		cnEngine:       eng,
+		runningReaders: &sync.Map{},
+	}
+
+	mp := map[uint32]cdc.TblMap{
+		0: {
+			"db1.tb1": &cdc.DbTableInfo{},
+			"db1.tb2": &cdc.DbTableInfo{IdChanged: true},
+		},
+	}
+
+	fault.Enable()
+	objectio.SimpleInject(objectio.FJ_CDCAddExecErr)
+	err := cdcTask.handleNewTables(mp)
+	require.Error(t, err)
+	fault.Disable()
+
+	fault.Enable()
+	objectio.SimpleInject(objectio.FJ_CDCAddExecConsumeTruncate)
+	err = cdcTask.handleNewTables(mp)
+	require.NoError(t, err)
+	require.Equal(t, false, mp[0]["db1.tb2"].IdChanged)
+	fault.Disable()
+}
+
+func TestCdcTask_handleNewTables_GetTxnOpErr(t *testing.T) {
+	stub1 := gostub.Stub(&cdc.GetTxnOp, func(context.Context, engine.Engine, client.TxnClient, string) (client.TxnOperator, error) {
+		return nil, moerr.NewInternalErrorNoCtx("ERR")
+	})
+	defer stub1.Reset()
+
+	stub2 := gostub.Stub(&cdc.FinishTxnOp, func(context.Context, error, client.TxnOperator, engine.Engine) {})
+	defer stub2.Reset()
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	eng := mock_frontend.NewMockEngine(ctrl)
+	eng.EXPECT().New(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+
+	cdcTask := &CDCTaskExecutor{
+		spec: &task.CreateCdcDetails{
+			Accounts: []*task.Account{{Id: 0}},
+		},
+		tables: cdc.PatternTuples{
+			Pts: []*cdc.PatternTuple{
+				{
+					Source: cdc.PatternTable{
+						Database: "db1",
+						Table:    cdc.CDCPitrGranularity_All,
+					},
+				},
+			},
+		},
+		exclude:        regexp.MustCompile("db1.tb1"),
+		cnEngine:       eng,
+		runningReaders: &sync.Map{},
+	}
+
+	mp := map[uint32]cdc.TblMap{
+		0: {
+			"db1.tb1": &cdc.DbTableInfo{},
+			"db2.tb1": &cdc.DbTableInfo{},
+		},
+	}
+	err := cdcTask.handleNewTables(mp)
+	require.Error(t, err)
+}
+
+func TestCdcTask_handleNewTables_NewEngineFailed(t *testing.T) {
+	stub1 := gostub.Stub(&cdc.GetTxnOp, func(context.Context, engine.Engine, client.TxnClient, string) (client.TxnOperator, error) {
+		return nil, nil
+	})
+	defer stub1.Reset()
+
+	stub2 := gostub.Stub(&cdc.FinishTxnOp, func(context.Context, error, client.TxnOperator, engine.Engine) {})
+	defer stub2.Reset()
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	eng := mock_frontend.NewMockEngine(ctrl)
+	eng.EXPECT().New(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+
+	cdcTask := &CDCTaskExecutor{
+		spec: &task.CreateCdcDetails{
+			Accounts: []*task.Account{{Id: 0}},
+		},
+		tables: cdc.PatternTuples{
+			Pts: []*cdc.PatternTuple{
+				{
+					Source: cdc.PatternTable{
+						Database: "db1",
+						Table:    cdc.CDCPitrGranularity_All,
+					},
+				},
+			},
+		},
+		exclude:        regexp.MustCompile("db1.tb1"),
+		cnEngine:       eng,
+		runningReaders: &sync.Map{},
+	}
+
+	mp := map[uint32]cdc.TblMap{
+		0: {
+			"db1.tb1": &cdc.DbTableInfo{},
+			"db2.tb1": &cdc.DbTableInfo{},
+		},
+	}
+	fault.Enable()
+	objectio.SimpleInject(objectio.FJ_CDCHandleErr)
+	err := cdcTask.handleNewTables(mp)
+	require.Error(t, err)
+	fault.Disable()
+}
+
 func TestCdcTask_handleNewTables_existingReaderWithDifferentTableID(t *testing.T) {
 	stub1 := gostub.Stub(&cdc.GetTxnOp, func(context.Context, engine.Engine, client.TxnClient, string) (client.TxnOperator, error) {
 		return nil, nil
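The injects.go additions below follow the codebase's wrapper style: a named fault point plus a small function that decodes `fault.TriggerFault`'s result into domain-specific values. A minimal self-contained sketch of that style (the registry and its methods are illustrative, not the real `fault` API):

```go
package main

import (
	"fmt"
	"sync"
)

// faultRegistry is a toy stand-in for a fault-injection framework:
// a fault point name maps to an integer argument.
type faultRegistry struct {
	mu     sync.Mutex
	points map[string]int64
}

func (r *faultRegistry) inject(name string, iarg int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.points[name] = iarg
}

// trigger mimics the (iarg, injected) shape the wrappers rely on.
func (r *faultRegistry) trigger(name string) (int64, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	iarg, ok := r.points[name]
	return iarg, ok
}

const fjCDCHandleSlow = "fj/cdc/handleslow"

// handleSlowInjected shows the wrapper style: decode the fault point's
// argument into a domain-specific value (a sleep duration in seconds).
func handleSlowInjected(r *faultRegistry) (sleepSeconds int64, injected bool) {
	return r.trigger(fjCDCHandleSlow)
}

func main() {
	r := &faultRegistry{points: map[string]int64{}}
	r.inject(fjCDCHandleSlow, 3)
	if secs, ok := handleSlowInjected(r); ok {
		fmt.Printf("injected: sleep %ds before handling\n", secs)
	}
}
```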
diff --git a/pkg/objectio/injects.go b/pkg/objectio/injects.go
index 306eac14f672c..beaeab6400270 100644
--- a/pkg/objectio/injects.go
+++ b/pkg/objectio/injects.go
@@ -53,6 +53,12 @@ const (
 	FJ_CronJobsOpen = "fj/cronjobs/open"
 	FJ_CDCRecordTxn = "fj/cdc/recordtxn"
+
+	FJ_CDCHandleSlow             = "fj/cdc/handleslow"
+	FJ_CDCHandleErr              = "fj/cdc/handleerr"
+	FJ_CDCScanTableErr           = "fj/cdc/scantableerr"
+	FJ_CDCAddExecErr             = "fj/cdc/addexecerr"
+	FJ_CDCAddExecConsumeTruncate = "fj/cdc/addexecconsumetruncate"
 )
 const (
@@ -546,3 +552,28 @@ func CDCRecordTxnInjected(dbName, tableName string) (bool, int) {
 	}
 	return checkLoggingArgs(int(iarg), sarg, dbName, tableName)
 }
+
+func CDCHandleSlowInjected() (sleepSeconds int64, injected bool) {
+	iarg, _, injected := fault.TriggerFault(FJ_CDCHandleSlow)
+	return iarg, injected
+}
+
+func CDCHandleErrInjected() bool {
+	_, _, injected := fault.TriggerFault(FJ_CDCHandleErr)
+	return injected
+}
+
+func CDCScanTableErrInjected() bool {
+	_, _, injected := fault.TriggerFault(FJ_CDCScanTableErr)
+	return injected
+}
+
+func CDCAddExecErrInjected() bool {
+	_, _, injected := fault.TriggerFault(FJ_CDCAddExecErr)
+	return injected
+}
+
+func CDCAddExecConsumeTruncateInjected() bool {
+	_, _, injected := fault.TriggerFault(FJ_CDCAddExecConsumeTruncate)
+	return injected
+}
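Finally, a sketch of the watermark fallback that `getLastSyncTime` (reader.go above) builds on: prefer the in-memory cache, otherwise fall back to the committed store seeded with `startTs`. The `watermarkSource` type and its maps are hypothetical stand-ins for `CDCWatermarkUpdater`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// watermarkSource models the two lookup tiers used by getLastSyncTime.
type watermarkSource struct {
	cache     map[string]time.Time // fast path: GetFromCache
	committed map[string]time.Time // slow path: GetOrAddCommitted
}

func (w *watermarkSource) lastSyncTime(key string, startTs time.Time) (time.Time, error) {
	// fast path: a non-zero cached watermark wins
	if ts, ok := w.cache[key]; ok && !ts.IsZero() {
		return ts, nil
	}
	// GetOrAddCommitted semantics: return the committed watermark if present,
	// otherwise record and return the supplied default (startTs)
	if ts, ok := w.committed[key]; ok {
		return ts, nil
	}
	if startTs.IsZero() {
		return time.Time{}, errors.New("no watermark available")
	}
	w.committed[key] = startTs
	return startTs, nil
}

func main() {
	w := &watermarkSource{
		cache:     map[string]time.Time{},
		committed: map[string]time.Time{},
	}
	start := time.Now().Add(-time.Hour)
	ts, err := w.lastSyncTime("acc1/task1/db1/tbl1", start)
	fmt.Println(ts, err) // falls back to startTs on first call
}
```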