3030#include < iterator>
3131#include < map>
3232#include < streambuf>
33+ #include < numeric>
3334#include < toml.hpp>
3435
3536#include " flow/ActorCollection.h"
@@ -969,20 +970,23 @@ ACTOR Future<Void> testerServerCore(TesterInterface interf,
969970 .detail (" ConsistencyCheckerId" , work.sharedRandomNumber )
970971 .detail (" ClientId" , work.clientId )
971972 .detail (" ClientCount" , work.clientCount );
973+ work.reply .sendError (consistency_check_urgent_duplicate_request ());
972974 } else if (consistencyCheckerUrgentTester.second .isValid () &&
973975 !consistencyCheckerUrgentTester.second .isReady ()) {
974976 TraceEvent (SevWarnAlways, " ConsistencyCheckUrgent_TesterWorkloadConflict" , interf.id ())
975977 .detail (" ExistingConsistencyCheckerId" , consistencyCheckerUrgentTester.first )
976978 .detail (" ArrivingConsistencyCheckerId" , work.sharedRandomNumber )
977979 .detail (" ClientId" , work.clientId )
978980 .detail (" ClientCount" , work.clientCount );
981+ work.reply .sendError (consistency_check_urgent_conflicting_request ());
982+ } else {
983+ consistencyCheckerUrgentTester = std::make_pair (
984+ work.sharedRandomNumber , testerServerConsistencyCheckerUrgentWorkload (work, ccr, dbInfo));
985+ TraceEvent (SevInfo, " ConsistencyCheckUrgent_TesterWorkloadInitialized" , interf.id ())
986+ .detail (" ConsistencyCheckerId" , consistencyCheckerUrgentTester.first )
987+ .detail (" ClientId" , work.clientId )
988+ .detail (" ClientCount" , work.clientCount );
979989 }
980- consistencyCheckerUrgentTester = std::make_pair (
981- work.sharedRandomNumber , testerServerConsistencyCheckerUrgentWorkload (work, ccr, dbInfo));
982- TraceEvent (SevInfo, " ConsistencyCheckUrgent_TesterWorkloadInitialized" , interf.id ())
983- .detail (" ConsistencyCheckerId" , consistencyCheckerUrgentTester.first )
984- .detail (" ClientId" , work.clientId )
985- .detail (" ClientCount" , work.clientCount );
986990 } else {
987991 addWorkload.send (testerServerWorkload (work, ccr, dbInfo, locality));
988992 }
@@ -1737,7 +1741,13 @@ std::unordered_map<int, std::vector<KeyRange>> makeTaskAssignment(Database cx,
17371741 std::vector<KeyRange> shardsToCheck,
17381742 int testersCount,
17391743 int round) {
1744+ ASSERT (testersCount >= 1 );
17401745 std::unordered_map<int , std::vector<KeyRange>> assignment;
1746+
1747+ std::vector<size_t > shuffledIndices (testersCount);
1748+ std::iota (shuffledIndices.begin (), shuffledIndices.end (), 0 ); // creates [0, 1, ..., testersCount - 1]
1749+ deterministicRandom ()->randomShuffle (shuffledIndices);
1750+
17411751 int batchSize = CLIENT_KNOBS->CONSISTENCY_CHECK_URGENT_BATCH_SHARD_COUNT ;
17421752 int startingPoint = 0 ;
17431753 if (shardsToCheck.size () > batchSize * testersCount) {
@@ -1752,7 +1762,17 @@ std::unordered_map<int, std::vector<KeyRange>> makeTaskAssignment(Database cx,
17521762 if (testerIdx > testersCount - 1 ) {
17531763 break ; // Have filled up all testers
17541764 }
1755- assignment[testerIdx].push_back (shardsToCheck[i]);
1765+ // When assigning a shard/batch to a tester idx, there are certain edge cases which can result in the urgent
1766+ // consistency checker being infinitely stuck in a loop. Examples:
1767+ // 1. if there is 1 remaining shard, and tester 0 consistently fails, we will still always pick tester 0
1768+ // 2. if there are 10 remaining shards, and batch size is 10, and tester 0 consistently fails, we will
1769+ // still always pick tester 0
1770+ // 3. if there are 20 remaining shards, and batch size is 10, and testers {0, 1} consistently fail, we will
1771+ // keep picking testers {0, 1}
1772+ // To avoid repeatedly picking the same testers even though they could be failing, shuffledIndices provides an
1773+ // indirection to a random tester idx. That way, each invocation of makeTaskAssignment won't
1774+ // result in the same task assignment for the class of edge cases mentioned above.
1775+ assignment[shuffledIndices[testerIdx]].push_back (shardsToCheck[i]);
17561776 }
17571777 std::unordered_map<int , std::vector<KeyRange>>::iterator assignIt;
17581778 for (assignIt = assignment.begin (); assignIt != assignment.end (); assignIt++) {
0 commit comments