Skip to content

Commit d49c07b

Browse files
authored
Merge branch 'main' into fengttt-starlark
2 parents 99ef482 + 55988c6 commit d49c07b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+5230
-1763
lines changed

pkg/cdc/reader.go

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type tableReader struct {
5050
runningReaders *sync.Map
5151
startTs, endTs types.TS
5252
noFull bool
53+
frequency time.Duration
5354

5455
tableDef *plan.TableDef
5556
insTsColIdx, insCompositedPkColIdx int
@@ -76,12 +77,13 @@ var NewTableReader = func(
7677
frequency string,
7778
) Reader {
7879
var tick *time.Ticker
80+
var dur time.Duration
7981
if frequency != "" {
80-
dur := parseFrequencyToDuration(frequency)
81-
tick = time.NewTicker(dur)
82+
dur = parseFrequencyToDuration(frequency)
8283
} else {
83-
tick = time.NewTicker(200 * time.Millisecond)
84+
dur = 200 * time.Millisecond
8485
}
86+
tick = time.NewTicker(dur)
8587
reader := &tableReader{
8688
cnTxnClient: cnTxnClient,
8789
cnEngine: cnEngine,
@@ -99,6 +101,7 @@ var NewTableReader = func(
99101
endTs: endTs,
100102
noFull: noFull,
101103
tableDef: tableDef,
104+
frequency: dur,
102105
}
103106

104107
// batch columns layout:
@@ -184,6 +187,30 @@ func (reader *tableReader) Run(
184187
)
185188
}()
186189

190+
lastSync, err := reader.getLastSyncTime(ctx)
191+
if err != nil {
192+
logutil.Errorf("GetLastSyncTime failed: %v", err)
193+
lastSync = time.Time{}
194+
}
195+
196+
if reader.frequency <= 0 {
197+
reader.frequency = 200 * time.Millisecond
198+
}
199+
nextSyncTime := time.Now()
200+
if !lastSync.IsZero() {
201+
nextSyncTime = lastSync.Add(reader.frequency)
202+
}
203+
204+
var wait time.Duration
205+
if now := time.Now(); now.Before(nextSyncTime) {
206+
wait = nextSyncTime.Sub(now)
207+
} else {
208+
wait = 200 * time.Millisecond
209+
}
210+
211+
firstSync := true
212+
reader.tick.Reset(wait)
213+
187214
for {
188215
select {
189216
case <-ctx.Done():
@@ -199,9 +226,36 @@ func (reader *tableReader) Run(
199226
logutil.Errorf("cdc tableReader(%v) failed, err: %v", reader.info, err)
200227
return
201228
}
229+
230+
if firstSync {
231+
firstSync = false
232+
reader.tick.Reset(reader.frequency)
233+
}
234+
202235
}
203236
}
204237

238+
func (reader *tableReader) getLastSyncTime(ctx context.Context) (time.Time, error) {
239+
wKey := WatermarkKey{
240+
AccountId: reader.accountId,
241+
TaskId: reader.taskId,
242+
DBName: reader.info.SourceDbName,
243+
TableName: reader.info.SourceTblName,
244+
}
245+
246+
ts, err := reader.wMarkUpdater.GetFromCache(ctx, &wKey)
247+
if err == nil && !ts.ToTimestamp().IsEmpty() {
248+
return ts.ToTimestamp().ToStdTime(), nil
249+
}
250+
251+
defaultTS := reader.startTs
252+
ts, err = reader.wMarkUpdater.GetOrAddCommitted(ctx, &wKey, &defaultTS)
253+
if err != nil {
254+
return time.Time{}, err
255+
}
256+
return ts.ToTimestamp().ToStdTime(), nil
257+
}
258+
205259
var readTableWithTxn = func(
206260
reader *tableReader,
207261
ctx context.Context,
@@ -215,6 +269,7 @@ var readTableWithTxn = func(
215269
func (reader *tableReader) readTable(ctx context.Context, ar *ActiveRoutine) (err error) {
216270
//step1 : create an txnOp
217271
txnOp, err := GetTxnOp(ctx, reader.cnEngine, reader.cnTxnClient, "readMultipleTables")
272+
218273
if err != nil {
219274
return err
220275
}
@@ -314,6 +369,7 @@ func (reader *tableReader) readTableWithTxn(
314369

315370
start := time.Now()
316371
changes, err = CollectChanges(ctx, rel, fromTs, toTs, reader.mp)
372+
317373
v2.CdcReadDurationHistogram.Observe(time.Since(start).Seconds())
318374
if err != nil {
319375
return

pkg/cdc/sinker.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,22 @@ var NewSinker = func(
9191
ctx := context.Background()
9292
padding := strings.Repeat(" ", sqlBufReserved)
9393
// create db
94-
_ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbTblInfo.SinkDbName)), false)
94+
err = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("CREATE DATABASE IF NOT EXISTS `%s`", dbTblInfo.SinkDbName)), false)
95+
if err != nil {
96+
return nil, err
97+
}
9598
// use db
96-
_ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("use `%s`", dbTblInfo.SinkDbName)), false)
99+
err = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("use `%s`", dbTblInfo.SinkDbName)), false)
100+
if err != nil {
101+
return nil, err
102+
}
97103
// possibly need to drop table first
98104
if dbTblInfo.IdChanged {
99-
_ = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("DROP TABLE IF EXISTS `%s`", dbTblInfo.SinkTblName)), false)
105+
err = sink.Send(ctx, ar, []byte(padding+fmt.Sprintf("DROP TABLE IF EXISTS `%s`", dbTblInfo.SinkTblName)), false)
106+
if err != nil {
107+
return nil, err
108+
}
109+
dbTblInfo.IdChanged = false
100110
}
101111
// create table
102112
createSql := strings.TrimSpace(dbTblInfo.SourceCreateSql)
@@ -112,7 +122,10 @@ var NewSinker = func(
112122
newTablePart = dbTblInfo.SinkTblName
113123
}
114124
createSql = createSql[:tableStart] + " " + newTablePart + createSql[tableEnd:]
115-
_ = sink.Send(ctx, ar, []byte(padding+createSql), false)
125+
err = sink.Send(ctx, ar, []byte(padding+createSql), false)
126+
if err != nil {
127+
return nil, err
128+
}
116129

117130
return NewMysqlSinker(
118131
sink,

pkg/cdc/sinker_test.go

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"database/sql"
2020
"errors"
2121
"fmt"
22+
"github.com/stretchr/testify/require"
2223
"testing"
2324
"time"
2425

@@ -109,34 +110,42 @@ func TestNewSinker(t *testing.T) {
109110
},
110111
}
111112

112-
db, mock, err := sqlmock.New()
113-
assert.NoError(t, err)
114-
mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1))
115-
mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1))
116-
mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1))
117-
118-
sink := &mysqlSink{
119-
user: "root",
120-
password: "123456",
121-
ip: "127.0.0.1",
122-
port: 3306,
123-
retryTimes: CDCDefaultRetryTimes,
124-
retryDuration: CDCDefaultRetryDuration,
125-
conn: db,
126-
}
127-
128-
sinkStub := gostub.Stub(&NewMysqlSink, func(_, _, _ string, _, _ int, _ time.Duration, _ string, _ bool) (Sink, error) {
129-
return sink, nil
130-
})
131-
defer sinkStub.Reset()
132-
133-
sinkerStub := gostub.Stub(&NewMysqlSinker, func(Sink, uint64, string, *DbTableInfo, *CDCWatermarkUpdater, *plan.TableDef, *ActiveRoutine, uint64, bool) Sinker {
134-
return nil
135-
})
136-
defer sinkerStub.Reset()
137-
138113
for _, tt := range tests {
139114
t.Run(tt.name, func(t *testing.T) {
115+
var sink Sink
116+
var stub *gostub.Stubs
117+
118+
if tt.args.sinkUri.SinkTyp == CDCSinkType_MySQL {
119+
db, mock, err := sqlmock.New()
120+
require.NoError(t, err)
121+
defer db.Close()
122+
123+
// create db + use + create table
124+
expectCount := 3
125+
if tt.args.dbTblInfo != nil && tt.args.dbTblInfo.IdChanged {
126+
// create db + use + drop + create table
127+
expectCount = 4
128+
}
129+
130+
for i := 0; i < expectCount; i++ {
131+
mock.ExpectExec(fakeSql).WillReturnResult(sqlmock.NewResult(1, 1))
132+
}
133+
134+
sink = &mysqlSink{
135+
conn: db,
136+
}
137+
138+
stub = gostub.Stub(&NewMysqlSink, func(_, _, _ string, _, _ int, _ time.Duration, _ string, _ bool) (Sink, error) {
139+
return sink, nil
140+
})
141+
defer stub.Reset()
142+
}
143+
144+
sinkerStub := gostub.Stub(&NewMysqlSinker, func(Sink, uint64, string, *DbTableInfo, *CDCWatermarkUpdater, *plan.TableDef, *ActiveRoutine, uint64, bool) Sinker {
145+
return nil
146+
})
147+
defer sinkerStub.Reset()
148+
140149
got, err := NewSinker(
141150
tt.args.sinkUri,
142151
tt.args.accountId,

0 commit comments

Comments
 (0)