Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 40 additions & 12 deletions internal/verifier/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
srcCache := map[string]docWithTs{}
dstCache := map[string]docWithTs{}

var idToMismatchCount map[string]int32

// This is the core document-handling logic. It either:
//
// a) caches the new document if its mapKey is unseen, or
Expand Down Expand Up @@ -174,20 +176,28 @@
}

// Finally we compare the documents and save any mismatch report(s).
mismatches, err := verifier.compareOneDocument(srcDoc.doc, dstDoc.doc, namespace)
curResults, err := verifier.compareOneDocument(srcDoc.doc, dstDoc.doc, namespace)

pool.Put(srcDoc.doc)
pool.Put(dstDoc.doc)

if err != nil {
return errors.Wrap(err, "failed to compare documents")
}

pool.Put(srcDoc.doc)
pool.Put(dstDoc.doc)
if len(curResults) > 0 && idToMismatchCount == nil {
idToMismatchCount = createIdToMismatchCount(task)
}

mismatchCount, _ := idToMismatchCount[string(rvToMapKey(nil, srcDoc.doc.Lookup("_id")))]

Check failure on line 192 in internal/verifier/compare.go

View workflow job for this annotation

GitHub Actions / lint

S1005: unnecessary assignment to the blank identifier (staticcheck)

for i := range mismatches {
mismatches[i].SrcTimestamp = option.Some(srcDoc.ts)
mismatches[i].DstTimestamp = option.Some(dstDoc.ts)
for i := range curResults {
curResults[i].mismatches = 1 + mismatchCount
curResults[i].SrcTimestamp = option.Some(srcDoc.ts)
curResults[i].DstTimestamp = option.Some(dstDoc.ts)
}

results = append(results, mismatches...)
results = append(results, curResults...)

return nil
}
Expand Down Expand Up @@ -355,6 +365,20 @@
return results, srcDocCount, srcByteCount, nil
}

func createIdToMismatchCount(task *VerificationTask) map[string]int32 {
idToMismatchCount := map[string]int32{}

for i, id := range task.Ids {
count, _ := task.Mismatches[int32(i)]

Check failure on line 372 in internal/verifier/compare.go

View workflow job for this annotation

GitHub Actions / lint

S1005: unnecessary assignment to the blank identifier (staticcheck)

if count > 0 {
idToMismatchCount[string(rvToMapKey(nil, id))] = count
}
}

return idToMismatchCount
}

func getDocIdFromComparison(
docCompareMethod DocCompareMethod,
doc bson.Raw,
Expand Down Expand Up @@ -566,14 +590,18 @@
}

func getMapKey(docKeyValues []bson.RawValue) string {
var keyBuffer bytes.Buffer
var buf []byte
for _, value := range docKeyValues {
keyBuffer.Grow(1 + len(value.Value))
keyBuffer.WriteByte(byte(value.Type))
keyBuffer.Write(value.Value)
buf = rvToMapKey(buf, value)
}

return keyBuffer.String()
return string(buf)
}

func rvToMapKey(buf []byte, rv bson.RawValue) []byte {
buf = slices.Grow(buf, 1+len(rv.Value))
buf = append(buf, byte(rv.Type))
return append(buf, rv.Value...)
}

func (verifier *Verifier) getDocumentsCursor(
Expand Down
10 changes: 6 additions & 4 deletions internal/verifier/migration_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,19 +573,21 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
Msg("Discrepancies found. Will recheck in the next generation.")

dataSizes := make([]int32, 0, len(problems))
mismatches := make([]int32, 0, len(problems))

// This stores all IDs for the next generation to check.
// Its length should equal len(mismatches) + len(missingIds).
idsToRecheck := make([]bson.RawValue, 0, len(problems))

for _, mismatch := range problems {
idsToRecheck = append(idsToRecheck, mismatch.ID)
dataSizes = append(dataSizes, mismatch.dataSize)
for _, problem := range problems {
idsToRecheck = append(idsToRecheck, problem.ID)
dataSizes = append(dataSizes, problem.dataSize)
mismatches = append(mismatches, problem.mismatches)
}

// Create a task for the next generation to recheck the
// mismatched & missing docs.
err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes)
err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes, mismatches)
if err != nil {
return errors.Wrapf(
err,
Expand Down
17 changes: 13 additions & 4 deletions internal/verifier/migration_verifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,10 @@ func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() {

task := &VerificationTask{
PrimaryKey: bson.NewObjectID(),
Ids: []any{id1, id2},
Ids: mslices.Of(
mbson.ToRawValue(id1),
mbson.ToRawValue(id2),
),
QueryFilter: QueryFilter{
Namespace: namespace,
To: namespace,
Expand Down Expand Up @@ -529,9 +532,12 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() {
})
suite.Require().NoError(err)
task := &VerificationTask{
PrimaryKey: bson.NewObjectID(),
Generation: 1,
Ids: []any{id, id + 1},
PrimaryKey: bson.NewObjectID(),
Generation: 1,
Ids: mslices.Of(
mbson.ToRawValue(id),
mbson.ToRawValue(id+1),
),
QueryFilter: basicQueryFilter("keyhole.dealers"),
}

Expand Down Expand Up @@ -952,20 +958,23 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
"foo.bar",
mslices.Of(mbson.ToRawValue(42)),
[]int32{100},
[]int32{0},
)
suite.Require().NoError(err)
err = verifier.InsertFailedCompareRecheckDocs(
ctx,
"foo.bar",
mslices.Of(mbson.ToRawValue(43), mbson.ToRawValue(44)),
[]int32{100, 100},
[]int32{0, 0},
)
suite.Require().NoError(err)
err = verifier.InsertFailedCompareRecheckDocs(
ctx,
"foo.bar2",
mslices.Of(mbson.ToRawValue(42)),
[]int32{100},
[]int32{0},
)
suite.Require().NoError(err)

Expand Down
39 changes: 36 additions & 3 deletions internal/verifier/recheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ const (
// InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check.
func (verifier *Verifier) InsertFailedCompareRecheckDocs(
ctx context.Context,
namespace string, documentIDs []bson.RawValue, dataSizes []int32) error {
namespace string,
documentIDs []bson.RawValue,
dataSizes []int32,
mismatches []int32,
) error {
dbName, collName := SplitNamespace(namespace)

dbNames := make([]string, len(documentIDs))
Expand All @@ -56,7 +60,7 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
Int("count", len(documentIDs)).
Msg("Persisting rechecks for mismatched or missing documents.")

return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes)
return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes, mismatches)
}

func (verifier *Verifier) insertRecheckDocs(
Expand All @@ -65,6 +69,7 @@ func (verifier *Verifier) insertRecheckDocs(
collNames []string,
documentIDs []bson.RawValue,
dataSizes []int32,
mismatches []int32,
) error {
verifier.mux.RLock()
defer verifier.mux.RUnlock()
Expand Down Expand Up @@ -149,6 +154,10 @@ func (verifier *Verifier) insertRecheckDocs(
DataSize: dataSizes[i],
}

if mismatches != nil {
recheckDoc.Mismatches = mismatches[i]
}

recheckRaw := recheckDoc.MarshalToBSON()

curRechecks = append(
Expand Down Expand Up @@ -286,11 +295,13 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
// this to prevent one thread from doing all of the rechecks.

var prevDBName, prevCollName string
var idAccum []any
var idAccum []bson.RawValue
var idsSizer util.BSONArraySizer
var totalDocs types.DocumentCount
var dataSizeAccum, totalRecheckData int64

mismatchesMap := map[int32]int32{}

// The sort here is important because the recheck _id is an embedded
// document that includes the namespace. Thus, all rechecks for a given
// namespace will be consecutive in this query’s result.
Expand Down Expand Up @@ -323,6 +334,7 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {

task, err := verifier.createDocumentRecheckTask(
idAccum,
mismatchesMap,
types.ByteCount(dataSizeAccum),
namespace,
)
Expand Down Expand Up @@ -407,21 +419,42 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
dataSizeAccum = 0
idAccum = idAccum[:0]
lastIDRaw = bson.RawValue{}
clear(mismatchesMap)
}

// A document can be enqueued for recheck for multiple reasons:
// - changed on source
// - changed on destination
// - mismatch seen
//
// We’re iterating the rechecks in order such that, if the same doc
// gets enqueued from multiple sources, we’ll see those records
// consecutively. We can deduplicate here, then, by checking to see if
// the doc ID has changed. (NB: At this point we know the namespace
// has *not* changed because we just checked for that.)
if idRaw.Equal(lastIDRaw) {

if doc.Mismatches == 0 {
// A non-mismatch recheck means the document changed. In that
// case we want to clear the mismatch count. This way a document
// that changes over & over won’t seem persistently mismatched
// merely because the replicator hasn’t kept up with the rate
// of change.
lastIDIndex := len(idAccum) - 1

delete(mismatchesMap, int32(lastIDIndex))
}

continue
}

lastIDRaw = idRaw

idsSizer.Add(idRaw)
dataSizeAccum += int64(doc.DataSize)
if doc.Mismatches > 0 {
mismatchesMap[int32(len(idAccum))] = doc.Mismatches
}
idAccum = append(idAccum, doc.PrimaryKey.DocumentID)

totalRecheckData += int64(doc.DataSize)
Expand Down
4 changes: 4 additions & 0 deletions internal/verifier/recheck/recheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ type Doc struct {
// and any others that may be added will remain unchanged even if a recheck
// is enqueued multiple times for the same document in the same generation.
DataSize int32 `bson:"dataSize"`

// Mismatches is the number of generations that have seen a mismatch on
// this document.
Mismatches int32 `bson:"mismatches"`
}

var _ bson.Marshaler = Doc{}
Expand Down
2 changes: 1 addition & 1 deletion internal/verifier/recheck_persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,5 +173,5 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE
Time("latestTimestampTime", latestTimestampTime).
Msg("Persisting rechecks for change events.")

return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes)
return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes, nil)
}
37 changes: 26 additions & 11 deletions internal/verifier/recheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
"the.namespace",
[]bson.RawValue{mbson.ToRawValue("theDocID")},
[]int32{1234},
[]int32{0},
),
"insert failed-comparison recheck",
)
Expand Down Expand Up @@ -315,7 +316,7 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() {

t1 := VerificationTask{
Generation: 1,
Ids: []any{id1},
Ids: mslices.Of(mbson.ToRawValue(id1)),
Status: verificationTaskAdded,
Type: verificationTaskVerifyDocuments,
QueryFilter: QueryFilter{
Expand All @@ -327,10 +328,10 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() {
}

t2 := t1
t2.Ids = []any{id2}
t2.Ids = mslices.Of(mbson.ToRawValue(id2))

t3 := t1
t3.Ids = []any{id3}
t3.Ids = mslices.Of(mbson.ToRawValue(id3))

suite.ElementsMatch([]VerificationTask{t1, t2, t3}, foundTasks)
}
Expand Down Expand Up @@ -374,9 +375,12 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() {

t1 := VerificationTask{
Generation: 1,
Ids: []any{id1, id2},
Status: verificationTaskAdded,
Type: verificationTaskVerifyDocuments,
Ids: mslices.Of(
mbson.ToRawValue(id1),
mbson.ToRawValue(id2),
),
Status: verificationTaskAdded,
Type: verificationTaskVerifyDocuments,
QueryFilter: QueryFilter{
Namespace: "testDB.testColl",
To: "testDB.testColl",
Expand All @@ -386,7 +390,7 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() {
}

t2 := t1
t2.Ids = []any{id3}
t2.Ids = mslices.Of(mbson.ToRawValue(id3))
t2.SourceDocumentCount = 1
t2.SourceByteCount = 1024

Expand Down Expand Up @@ -423,9 +427,13 @@ func (suite *IntegrationTestSuite) TestMultipleNamespaces() {

t1 := VerificationTask{
Generation: 1,
Ids: []any{id1, id2, id3},
Status: verificationTaskAdded,
Type: verificationTaskVerifyDocuments,
Ids: mslices.Of(
mbson.ToRawValue(id1),
mbson.ToRawValue(id2),
mbson.ToRawValue(id3),
),
Status: verificationTaskAdded,
Type: verificationTaskVerifyDocuments,
QueryFilter: QueryFilter{
Namespace: "testDB1.testColl1",
To: "testDB1.testColl1",
Expand Down Expand Up @@ -507,5 +515,12 @@ func insertRecheckDocs(
},
)

return verifier.insertRecheckDocs(ctx, dbNames, collNames, rawIDs, dataSizes)
return verifier.insertRecheckDocs(
ctx,
dbNames,
collNames,
rawIDs,
dataSizes,
make([]int32, len(documentIDs)),
)
}
4 changes: 4 additions & 0 deletions internal/verifier/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type VerificationResult struct {
// don't get too large.
dataSize int32

// The number of generations where we’ve seen this document ID mismatched
// without a change event.
mismatches int32

SrcTimestamp option.Option[bson.Timestamp]
DstTimestamp option.Option[bson.Timestamp]
}
Expand Down
Loading
Loading