diff --git a/internal/verifier/compare.go b/internal/verifier/compare.go
index 678cbf9b..906d7749 100644
--- a/internal/verifier/compare.go
+++ b/internal/verifier/compare.go
@@ -118,6 +118,8 @@ func (verifier *Verifier) compareDocsFromChannels(
 	srcCache := map[string]docWithTs{}
 	dstCache := map[string]docWithTs{}
 
+	var idToMismatchCount map[string]int32
+
 	// This is the core document-handling logic. It either:
 	//
 	// a) caches the new document if its mapKey is unseen, or
@@ -174,20 +176,28 @@ func (verifier *Verifier) compareDocsFromChannels(
 		}
 
 		// Finally we compare the documents and save any mismatch report(s).
-		mismatches, err := verifier.compareOneDocument(srcDoc.doc, dstDoc.doc, namespace)
+		curResults, err := verifier.compareOneDocument(srcDoc.doc, dstDoc.doc, namespace)
+
+		pool.Put(srcDoc.doc)
+		pool.Put(dstDoc.doc)
+
 		if err != nil {
 			return errors.Wrap(err, "failed to compare documents")
 		}
 
-		pool.Put(srcDoc.doc)
-		pool.Put(dstDoc.doc)
+		if len(curResults) > 0 && idToMismatchCount == nil {
+			idToMismatchCount = createIdToMismatchCount(task)
+		}
+
+		mismatchCount, _ := idToMismatchCount[string(rvToMapKey(nil, srcDoc.doc.Lookup("_id")))]
 
-		for i := range mismatches {
-			mismatches[i].SrcTimestamp = option.Some(srcDoc.ts)
-			mismatches[i].DstTimestamp = option.Some(dstDoc.ts)
+		for i := range curResults {
+			curResults[i].mismatches = 1 + mismatchCount
+			curResults[i].SrcTimestamp = option.Some(srcDoc.ts)
+			curResults[i].DstTimestamp = option.Some(dstDoc.ts)
 		}
 
-		results = append(results, mismatches...)
+		results = append(results, curResults...)
 
 		return nil
 	}
@@ -355,6 +365,20 @@ func (verifier *Verifier) compareDocsFromChannels(
 	return results, srcDocCount, srcByteCount, nil
 }
 
+func createIdToMismatchCount(task *VerificationTask) map[string]int32 {
+	idToMismatchCount := map[string]int32{}
+
+	for i, id := range task.Ids {
+		count, _ := task.Mismatches[int32(i)]
+
+		if count > 0 {
+			idToMismatchCount[string(rvToMapKey(nil, id))] = count
+		}
+	}
+
+	return idToMismatchCount
+}
+
 func getDocIdFromComparison(
 	docCompareMethod DocCompareMethod,
 	doc bson.Raw,
@@ -566,14 +590,18 @@ func iterateCursorToChannel(
 }
 
 func getMapKey(docKeyValues []bson.RawValue) string {
-	var keyBuffer bytes.Buffer
+	var buf []byte
 	for _, value := range docKeyValues {
-		keyBuffer.Grow(1 + len(value.Value))
-		keyBuffer.WriteByte(byte(value.Type))
-		keyBuffer.Write(value.Value)
+		buf = rvToMapKey(buf, value)
 	}
 
-	return keyBuffer.String()
+	return string(buf)
+}
+
+func rvToMapKey(buf []byte, rv bson.RawValue) []byte {
+	buf = slices.Grow(buf, 1+len(rv.Value))
+	buf = append(buf, byte(rv.Type))
+	return append(buf, rv.Value...)
 }
 
 func (verifier *Verifier) getDocumentsCursor(
diff --git a/internal/verifier/migration_verifier.go b/internal/verifier/migration_verifier.go
index e23445cb..74495192 100644
--- a/internal/verifier/migration_verifier.go
+++ b/internal/verifier/migration_verifier.go
@@ -573,19 +573,21 @@ func (verifier *Verifier) ProcessVerifyTask(ctx context.Context, workerNum int,
 			Msg("Discrepancies found. Will recheck in the next generation.")
 
 		dataSizes := make([]int32, 0, len(problems))
+		mismatches := make([]int32, 0, len(problems))
 
 		// This stores all IDs for the next generation to check.
 		// Its length should equal len(mismatches) + len(missingIds).
 		idsToRecheck := make([]bson.RawValue, 0, len(problems))
 
-		for _, mismatch := range problems {
-			idsToRecheck = append(idsToRecheck, mismatch.ID)
-			dataSizes = append(dataSizes, mismatch.dataSize)
+		for _, problem := range problems {
+			idsToRecheck = append(idsToRecheck, problem.ID)
+			dataSizes = append(dataSizes, problem.dataSize)
+			mismatches = append(mismatches, problem.mismatches)
 		}
 
 		// Create a task for the next generation to recheck the
 		// mismatched & missing docs.
-		err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes)
+		err := verifier.InsertFailedCompareRecheckDocs(ctx, task.QueryFilter.Namespace, idsToRecheck, dataSizes, mismatches)
 		if err != nil {
 			return errors.Wrapf(
 				err,
diff --git a/internal/verifier/migration_verifier_test.go b/internal/verifier/migration_verifier_test.go
index e714491e..13dfa808 100644
--- a/internal/verifier/migration_verifier_test.go
+++ b/internal/verifier/migration_verifier_test.go
@@ -358,7 +358,10 @@ func (suite *IntegrationTestSuite) TestVerifier_DocFilter_ObjectID() {
 
 	task := &VerificationTask{
 		PrimaryKey: bson.NewObjectID(),
-		Ids:        []any{id1, id2},
+		Ids: mslices.Of(
+			mbson.ToRawValue(id1),
+			mbson.ToRawValue(id2),
+		),
 		QueryFilter: QueryFilter{
 			Namespace: namespace,
 			To:        namespace,
@@ -529,9 +532,12 @@ func (suite *IntegrationTestSuite) TestVerifierFetchDocuments() {
 	})
 	suite.Require().NoError(err)
 	task := &VerificationTask{
-		PrimaryKey: bson.NewObjectID(),
-		Generation: 1,
-		Ids:        []any{id, id + 1},
+		PrimaryKey: bson.NewObjectID(),
+		Generation: 1,
+		Ids: mslices.Of(
+			mbson.ToRawValue(id),
+			mbson.ToRawValue(id+1),
+		),
 		QueryFilter: basicQueryFilter("keyhole.dealers"),
 	}
 
@@ -952,6 +958,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
 		"foo.bar",
 		mslices.Of(mbson.ToRawValue(42)),
 		[]int32{100},
+		[]int32{0},
 	)
 	suite.Require().NoError(err)
 	err = verifier.InsertFailedCompareRecheckDocs(
@@ -959,6 +966,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
 		"foo.bar",
 		mslices.Of(mbson.ToRawValue(43), mbson.ToRawValue(44)),
 		[]int32{100, 100},
+		[]int32{0, 0},
 	)
 	suite.Require().NoError(err)
 	err = verifier.InsertFailedCompareRecheckDocs(
@@ -966,6 +974,7 @@ func (suite *IntegrationTestSuite) TestFailedVerificationTaskInsertions() {
 		"foo.bar2",
 		mslices.Of(mbson.ToRawValue(42)),
 		[]int32{100},
+		[]int32{0},
 	)
 	suite.Require().NoError(err)
 
diff --git a/internal/verifier/recheck.go b/internal/verifier/recheck.go
index 70cbe09b..7c830ea7 100644
--- a/internal/verifier/recheck.go
+++ b/internal/verifier/recheck.go
@@ -42,7 +42,11 @@ const (
 // InsertFailedCompareRecheckDocs is for inserting RecheckDocs based on failures during Check.
 func (verifier *Verifier) InsertFailedCompareRecheckDocs(
 	ctx context.Context,
-	namespace string, documentIDs []bson.RawValue, dataSizes []int32) error {
+	namespace string,
+	documentIDs []bson.RawValue,
+	dataSizes []int32,
+	mismatches []int32,
+) error {
 	dbName, collName := SplitNamespace(namespace)
 
 	dbNames := make([]string, len(documentIDs))
@@ -56,7 +60,7 @@ func (verifier *Verifier) InsertFailedCompareRecheckDocs(
 		Int("count", len(documentIDs)).
 		Msg("Persisting rechecks for mismatched or missing documents.")
 
-	return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes)
+	return verifier.insertRecheckDocs(ctx, dbNames, collNames, documentIDs, dataSizes, mismatches)
 }
 
 func (verifier *Verifier) insertRecheckDocs(
@@ -65,6 +69,7 @@ func (verifier *Verifier) insertRecheckDocs(
 	collNames []string,
 	documentIDs []bson.RawValue,
 	dataSizes []int32,
+	mismatches []int32,
 ) error {
 	verifier.mux.RLock()
 	defer verifier.mux.RUnlock()
@@ -149,6 +154,10 @@ func (verifier *Verifier) insertRecheckDocs(
 			DataSize: dataSizes[i],
 		}
 
+		if mismatches != nil {
+			recheckDoc.Mismatches = mismatches[i]
+		}
+
 		recheckRaw := recheckDoc.MarshalToBSON()
 
 		curRechecks = append(
@@ -286,11 +295,13 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
 	// this to prevent one thread from doing all of the rechecks.
 	var prevDBName, prevCollName string
 
-	var idAccum []any
+	var idAccum []bson.RawValue
 	var idsSizer util.BSONArraySizer
 	var totalDocs types.DocumentCount
 	var dataSizeAccum, totalRecheckData int64
 
+	mismatchesMap := map[int32]int32{}
+
 	// The sort here is important because the recheck _id is an embedded
 	// document that includes the namespace. Thus, all rechecks for a given
 	// namespace will be consecutive in this query’s result.
@@ -323,6 +334,7 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
 
 			task, err := verifier.createDocumentRecheckTask(
 				idAccum,
+				mismatchesMap,
 				types.ByteCount(dataSizeAccum),
 				namespace,
 			)
@@ -407,14 +419,32 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
 			dataSizeAccum = 0
 			idAccum = idAccum[:0]
 			lastIDRaw = bson.RawValue{}
+			clear(mismatchesMap)
 		}
 
+		// A document can be enqueued for recheck for multiple reasons:
+		// - changed on source
+		// - changed on destination
+		// - mismatch seen
+		//
 		// We’re iterating the rechecks in order such that, if the same doc
 		// gets enqueued from multiple sources, we’ll see those records
 		// consecutively. We can deduplicate here, then, by checking to see if
 		// the doc ID has changed. (NB: At this point we know the namespace
 		// has *not* changed because we just checked for that.)
 		if idRaw.Equal(lastIDRaw) {
+
+			if doc.Mismatches == 0 {
+				// A non-mismatch recheck means the document changed. In that
+				// case we want to clear the mismatch count. This way a document
+				// that changes over & over won’t seem persistently mismatched
+				// merely because the replicator hasn’t kept up with the rate
+				// of change.
+				lastIDIndex := len(idAccum) - 1
+
+				delete(mismatchesMap, int32(lastIDIndex))
+			}
+
 			continue
 		}
 
@@ -422,6 +452,9 @@ func (verifier *Verifier) GenerateRecheckTasks(ctx context.Context) error {
 
 		idsSizer.Add(idRaw)
 		dataSizeAccum += int64(doc.DataSize)
+		if doc.Mismatches > 0 {
+			mismatchesMap[int32(len(idAccum))] = doc.Mismatches
+		}
 		idAccum = append(idAccum, doc.PrimaryKey.DocumentID)
 		totalRecheckData += int64(doc.DataSize)
 
diff --git a/internal/verifier/recheck/recheck.go b/internal/verifier/recheck/recheck.go
index 6186cf57..308f3a0b 100644
--- a/internal/verifier/recheck/recheck.go
+++ b/internal/verifier/recheck/recheck.go
@@ -120,6 +120,10 @@ type Doc struct {
 	// and any others that may be added will remain unchanged even if a recheck
 	// is enqueued multiple times for the same document in the same generation.
 	DataSize int32 `bson:"dataSize"`
+
+	// Mismatches is the number of generations that have seen a mismatch on
+	// this document.
+	Mismatches int32 `bson:"mismatches"`
 }
 
 var _ bson.Marshaler = Doc{}
diff --git a/internal/verifier/recheck_persist.go b/internal/verifier/recheck_persist.go
index 82db17fc..d6a370c6 100644
--- a/internal/verifier/recheck_persist.go
+++ b/internal/verifier/recheck_persist.go
@@ -173,5 +173,5 @@ func (verifier *Verifier) PersistChangeEvents(ctx context.Context, batch changeE
 		Time("latestTimestampTime", latestTimestampTime).
 		Msg("Persisting rechecks for change events.")
 
-	return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes)
+	return verifier.insertRecheckDocs(ctx, dbNames, collNames, docIDs, dataSizes, nil)
 }
diff --git a/internal/verifier/recheck_test.go b/internal/verifier/recheck_test.go
index 0da76289..b277ccac 100644
--- a/internal/verifier/recheck_test.go
+++ b/internal/verifier/recheck_test.go
@@ -29,6 +29,7 @@ func (suite *IntegrationTestSuite) TestFailedCompareThenReplace() {
 			"the.namespace",
 			[]bson.RawValue{mbson.ToRawValue("theDocID")},
 			[]int32{1234},
+			[]int32{0},
 		),
 		"insert failed-comparison recheck",
 	)
@@ -315,7 +316,7 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() {
 
 	t1 := VerificationTask{
 		Generation: 1,
-		Ids:        []any{id1},
+		Ids:        mslices.Of(mbson.ToRawValue(id1)),
 		Status:     verificationTaskAdded,
 		Type:       verificationTaskVerifyDocuments,
 		QueryFilter: QueryFilter{
@@ -327,10 +328,10 @@ func (suite *IntegrationTestSuite) TestLargeIDInsertions() {
 	}
 
 	t2 := t1
-	t2.Ids = []any{id2}
+	t2.Ids = mslices.Of(mbson.ToRawValue(id2))
 
 	t3 := t1
-	t3.Ids = []any{id3}
+	t3.Ids = mslices.Of(mbson.ToRawValue(id3))
 
 	suite.ElementsMatch([]VerificationTask{t1, t2, t3}, foundTasks)
 }
@@ -374,9 +375,12 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() {
 
 	t1 := VerificationTask{
 		Generation: 1,
-		Ids:        []any{id1, id2},
-		Status:     verificationTaskAdded,
-		Type:       verificationTaskVerifyDocuments,
+		Ids: mslices.Of(
+			mbson.ToRawValue(id1),
+			mbson.ToRawValue(id2),
+		),
+		Status: verificationTaskAdded,
+		Type:   verificationTaskVerifyDocuments,
 		QueryFilter: QueryFilter{
 			Namespace: "testDB.testColl",
 			To:        "testDB.testColl",
@@ -386,7 +390,7 @@ func (suite *IntegrationTestSuite) TestLargeDataInsertions() {
 	}
 
 	t2 := t1
-	t2.Ids = []any{id3}
+	t2.Ids = mslices.Of(mbson.ToRawValue(id3))
 	t2.SourceDocumentCount = 1
 	t2.SourceByteCount = 1024
 
@@ -423,9 +427,13 @@ func (suite *IntegrationTestSuite) TestMultipleNamespaces() {
 
 	t1 := VerificationTask{
 		Generation: 1,
-		Ids:        []any{id1, id2, id3},
-		Status:     verificationTaskAdded,
-		Type:       verificationTaskVerifyDocuments,
+		Ids: mslices.Of(
+			mbson.ToRawValue(id1),
+			mbson.ToRawValue(id2),
+			mbson.ToRawValue(id3),
+		),
+		Status: verificationTaskAdded,
+		Type:   verificationTaskVerifyDocuments,
 		QueryFilter: QueryFilter{
 			Namespace: "testDB1.testColl1",
 			To:        "testDB1.testColl1",
@@ -507,5 +515,12 @@ func insertRecheckDocs(
 		},
 	)
 
-	return verifier.insertRecheckDocs(ctx, dbNames, collNames, rawIDs, dataSizes)
+	return verifier.insertRecheckDocs(
+		ctx,
+		dbNames,
+		collNames,
+		rawIDs,
+		dataSizes,
+		make([]int32, len(documentIDs)),
+	)
 }
diff --git a/internal/verifier/result.go b/internal/verifier/result.go
index 7be6aa32..5b6ed793 100644
--- a/internal/verifier/result.go
+++ b/internal/verifier/result.go
@@ -37,6 +37,10 @@ type VerificationResult struct {
 	// don't get too large.
 	dataSize int32
 
+	// The number of generations where we’ve seen this document ID mismatched
+	// without a change event.
+	mismatches int32
+
 	SrcTimestamp option.Option[bson.Timestamp]
 	DstTimestamp option.Option[bson.Timestamp]
 }
diff --git a/internal/verifier/verification_task.go b/internal/verifier/verification_task.go
index a842817e..425c35bf 100644
--- a/internal/verifier/verification_task.go
+++ b/internal/verifier/verification_task.go
@@ -77,7 +77,7 @@ type VerificationTask struct {
 	Generation int `bson:"generation"`
 
 	// For recheck tasks, this stores the document IDs to check.
-	Ids []any `bson:"_ids"`
+	Ids []bson.RawValue `bson:"_ids"`
 
 	QueryFilter QueryFilter `bson:"query_filter" json:"query_filter"`
 
@@ -88,6 +88,10 @@ type VerificationTask struct {
 	// ByteCount is like DocumentCount: set when the verifier is done
 	// with the task.
 	SourceByteCount types.ByteCount `bson:"source_bytes_count"`
+
+	// Mismatches correlates an index of Ids with the # of generations where
+	// this document was seen to mismatch.
+	Mismatches map[int32]int32
 }
 
 func (t *VerificationTask) augmentLogWithDetails(evt *zerolog.Event) {
@@ -200,7 +204,8 @@ func (verifier *Verifier) InsertPartitionVerificationTask(
 }
 
 func (verifier *Verifier) createDocumentRecheckTask(
-	ids []any,
+	ids []bson.RawValue,
+	mismatches map[int32]int32,
 	dataSize types.ByteCount,
 	srcNamespace string,
 ) (*VerificationTask, error) {
@@ -225,6 +230,7 @@ func (verifier *Verifier) createDocumentRecheckTask(
 		},
 		SourceDocumentCount: types.DocumentCount(len(ids)),
 		SourceByteCount:     dataSize,
+		Mismatches:          mismatches,
 	}, nil
 }
diff --git a/mbson/raw_value.go b/mbson/raw_value.go
index b0b1c96c..6765c05f 100644
--- a/mbson/raw_value.go
+++ b/mbson/raw_value.go
@@ -13,7 +13,7 @@ type bsonCastRecipient interface {
 }
 
 type bsonSourceTypes interface {
-	string | int | int32 | int64
+	string | int | int32 | int64 | bson.ObjectID
 }
 
 type cannotCastErr struct {
@@ -121,6 +121,11 @@ func ToRawValue[T bsonSourceTypes](in T) bson.RawValue {
 		return i32ToRawValue(typedIn)
 	case int64:
 		return i64ToRawValue(typedIn)
+	case bson.ObjectID:
+		return bson.RawValue{
+			Type:  bson.TypeObjectID,
+			Value: bsoncore.AppendObjectID(nil, typedIn),
+		}
 	case string:
 		return bson.RawValue{
 			Type: bson.TypeString,
diff --git a/mbson/raw_value_test.go b/mbson/raw_value_test.go
index 3c35c32c..141b99ae 100644
--- a/mbson/raw_value_test.go
+++ b/mbson/raw_value_test.go
@@ -132,6 +132,8 @@ func TestObjectID(t *testing.T) {
 	for _, cur := range vals {
 		viaMarshal := MustConvertToRawValue(cur)
 
+		assert.Equal(t, viaMarshal, ToRawValue(cur))
+
 		assert.Equal(t, cur, lo.Must(CastRawValue[bson.ObjectID](viaMarshal)))
 	}
 }
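
The compare.go change replaces the bytes.Buffer-based key building with rvToMapKey, which encodes each BSON value as its one-byte type tag followed by the value's raw bytes, so values of different BSON types cannot collide in the document cache key, and compound keys are just concatenations of these encodings. A minimal standalone sketch of that encoding, assuming mongo-driver v2 import paths and using bsoncore append helpers as stand-ins for real document values (none of this is code from the patch):

```go
package main

import (
	"fmt"
	"slices"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
)

// rvToMapKey mirrors the helper added in compare.go: grow the buffer, append
// the value's one-byte BSON type tag, then append the value's raw bytes.
func rvToMapKey(buf []byte, rv bson.RawValue) []byte {
	buf = slices.Grow(buf, 1+len(rv.Value))
	buf = append(buf, byte(rv.Type))
	return append(buf, rv.Value...)
}

func main() {
	// Two sample values of different BSON types.
	asInt32 := bson.RawValue{Type: bson.TypeInt32, Value: bsoncore.AppendInt32(nil, 42)}
	asString := bson.RawValue{Type: bson.TypeString, Value: bsoncore.AppendString(nil, "42")}

	// The leading type byte keeps keys for different types distinct.
	fmt.Printf("%x\n", rvToMapKey(nil, asInt32))
	fmt.Printf("%x\n", rvToMapKey(nil, asString))

	// A compound document key is just the concatenation, as in getMapKey.
	combined := rvToMapKey(rvToMapKey(nil, asInt32), asString)
	fmt.Println(len(combined)) // 13: 1+4 bytes for the int32, 1+7 for the string
}
```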
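Taken together, the recheck changes thread a per-document mismatch counter through each generation: compareDocsFromChannels stamps every new result with 1 plus the count carried on the task's Mismatches map, ProcessVerifyTask persists that count with each recheck doc, and GenerateRecheckTasks folds the counts into the next task's map while a plain change-event recheck (Mismatches == 0) clears the counter. The sketch below is a loose, self-contained model of that bookkeeping; the type and function names are invented for illustration, and it keys by ID rather than by index into Ids as the real code does:

```go
package main

import "fmt"

// recheckRecord is a simplified stand-in for a persisted recheck document:
// mismatches == 0 means the recheck came from a change event rather than a
// comparison failure.
type recheckRecord struct {
	id         string
	mismatches int32
}

// carryMismatchCounts loosely models the dedup loop in GenerateRecheckTasks:
// records for the same ID arrive consecutively, and a change-event recheck
// clears the carried count so a frequently-updated document doesn't look
// persistently mismatched.
func carryMismatchCounts(records []recheckRecord) map[string]int32 {
	counts := map[string]int32{}

	for _, r := range records {
		if r.mismatches == 0 {
			delete(counts, r.id)
			continue
		}

		counts[r.id] = r.mismatches
	}

	return counts
}

func main() {
	carried := carryMismatchCounts([]recheckRecord{
		{id: "a", mismatches: 2}, // mismatched two generations in a row
		{id: "b", mismatches: 1},
		{id: "b", mismatches: 0}, // also changed; its counter resets
	})

	// On the next mismatch, the new result's count is 1 + the carried value,
	// mirroring compareDocsFromChannels.
	fmt.Println(1 + carried["a"]) // 3
	fmt.Println(1 + carried["b"]) // 1
}
```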
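mbson.ToRawValue now also accepts bson.ObjectID, building the RawValue directly via bsoncore rather than going through a full marshal; the new assertion in TestObjectID checks that this matches MustConvertToRawValue. A rough usage sketch, again assuming mongo-driver v2 import paths and the driver's RawValue.ObjectIDOK accessor (the local toRawValue below is an illustrative copy of just the new case, not the mbson function itself):

```go
package main

import (
	"fmt"

	"go.mongodb.org/mongo-driver/v2/bson"
	"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
)

// toRawValue copies the shape of the new bson.ObjectID case: the 12-byte
// ObjectID becomes the RawValue's payload, tagged as TypeObjectID.
func toRawValue(oid bson.ObjectID) bson.RawValue {
	return bson.RawValue{
		Type:  bson.TypeObjectID,
		Value: bsoncore.AppendObjectID(nil, oid),
	}
}

func main() {
	oid := bson.NewObjectID()
	rv := toRawValue(oid)

	// Round-trip: the RawValue decodes back to the same ObjectID.
	got, ok := rv.ObjectIDOK()
	fmt.Println(ok, got == oid) // true true
}
```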