Skip to content

Commit f885cbe

Browse files
committed
Claude sketches parallel-tx re-execution for testing.
1 parent 9819d1b commit f885cbe

File tree

6 files changed

+481
-64
lines changed

6 files changed

+481
-64
lines changed

src/herder/ParallelTxSetBuilder.cpp

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ struct ParallelPartitionConfig
2424
, mInstructionsPerCluster(sorobanCfg.ledgerMaxInstructions() /
2525
mStageCount)
2626
{
27+
CLOG_INFO(Herder,
28+
"ParallelPartitionConfig: stages={}, clusters/stage={}, "
29+
"instructions/cluster={}, total max instructions={}",
30+
mStageCount, mClustersPerStage, mInstructionsPerCluster,
31+
sorobanCfg.ledgerMaxInstructions());
2732
}
2833

2934
uint64_t
@@ -108,10 +113,17 @@ class Stage
108113
// exceed the theoretical limit of instructions per stage.
109114
if (mInstructions + tx.mInstructions > mConfig.instructionsPerStage())
110115
{
116+
CLOG_DEBUG(Herder,
117+
"Tx {} rejected from stage: would exceed instruction "
118+
"limit ({} + {} > {})",
119+
tx.mId, mInstructions, tx.mInstructions,
120+
mConfig.instructionsPerStage());
111121
return false;
112122
}
113123
// First, find all clusters that conflict with the new transaction.
114124
auto conflictingClusters = getConflictingClusters(tx);
125+
CLOG_DEBUG(Herder, "Tx {} has {} conflicting clusters", tx.mId,
126+
conflictingClusters.size());
115127

116128
// Then, try creating new clusters by merging the conflicting clusters
117129
// together and adding the new transaction to the resulting cluster.
@@ -122,6 +134,9 @@ class Stage
122134
// stage and thus no new clusters could be created.
123135
if (!newClusters)
124136
{
137+
CLOG_DEBUG(Herder,
138+
"Tx {} rejected: cluster would be too large after merge",
139+
tx.mId);
125140
return false;
126141
}
127142
// If it's possible to pack the newly-created cluster into one of the
@@ -175,6 +190,9 @@ class Stage
175190
// the required number of bins.
176191
if (!newPacking)
177192
{
193+
CLOG_DEBUG(Herder,
194+
"Tx {} rejected: bin packing failed after rebuild",
195+
tx.mId);
178196
return false;
179197
}
180198
mClusters = std::move(newClusters.value());
@@ -399,35 +417,53 @@ buildSurgePricedParallelSorobanPhaseWithStageCount(
399417

400418
// Visit the transactions in the surge pricing queue and try to add them to
401419
// at least one of the stages.
402-
auto visitor = [&stages,
403-
&builderTxForTx](TransactionFrameBaseConstPtr const& tx) {
420+
size_t processedCount = 0;
421+
size_t rejectedCount = 0;
422+
auto visitor = [&stages, &builderTxForTx, &processedCount,
423+
&rejectedCount](TransactionFrameBaseConstPtr const& tx) {
404424
bool added = false;
405425
auto builderTxIt = builderTxForTx.find(tx);
406426
releaseAssert(builderTxIt != builderTxForTx.end());
427+
428+
size_t stageNum = 0;
407429
for (auto& stage : stages)
408430
{
409431
if (stage.tryAdd(*builderTxIt->second))
410432
{
411433
added = true;
434+
CLOG_DEBUG(Herder, "Transaction {} added to stage {}",
435+
builderTxIt->second->mId, stageNum);
412436
break;
413437
}
438+
stageNum++;
414439
}
415440
if (added)
416441
{
442+
processedCount++;
417443
return SurgePricingPriorityQueue::VisitTxResult::PROCESSED;
418444
}
419445
// If a transaction didn't fit into any of the stages, we consider it
420446
// to have been excluded due to resource limits and thus notify the
421447
// surge pricing queue that surge pricing should be triggered (
422448
// REJECTED imitates the behavior for exceeding the resource limit
423449
// within the queue itself).
450+
rejectedCount++;
451+
CLOG_DEBUG(Herder,
452+
"Transaction {} rejected - couldn't fit in any stage",
453+
builderTxIt->second->mId);
424454
return SurgePricingPriorityQueue::VisitTxResult::REJECTED;
425455
};
426456

427457
ParallelPhaseBuildResult result;
428458
std::vector<Resource> laneLeftUntilLimitUnused;
429459
queue.popTopTxs(/* allowGaps */ true, visitor, laneLeftUntilLimitUnused,
430460
result.mHadTxNotFittingLane, ledgerVersion);
461+
462+
CLOG_INFO(
463+
Herder,
464+
"Stage building complete: {} processed, {} rejected, stage count {}",
465+
processedCount, rejectedCount, stageCount);
466+
431467
// There is only a single fee lane for Soroban, so there is only a single
432468
// flag that indicates whether there was a transaction that didn't fit into
433469
// lane (and thus all transactions are surge priced at once).
@@ -464,14 +500,24 @@ buildSurgePricedParallelSorobanPhaseWithStageCount(
464500
{
465501
releaseAssert(!cluster.empty());
466502
}
503+
CLOG_DEBUG(Herder, "Stage {} has {} clusters", stages.size() - 1,
504+
resStage.size());
467505
}
468506
// Ensure we don't return any empty stages, which is prohibited by the
469507
// protocol. The algorithm builds the stages such that the stages are
470508
// populated from first to last.
509+
size_t emptyStagesRemoved = 0;
471510
while (!result.mStages.empty() && result.mStages.back().empty())
472511
{
473512
result.mStages.pop_back();
513+
emptyStagesRemoved++;
474514
}
515+
516+
CLOG_INFO(Herder,
517+
"Final result: {} stages, {} empty stages removed, total fee {}",
518+
result.mStages.size(), emptyStagesRemoved,
519+
result.mTotalInclusionFee);
520+
475521
for (auto const& stage : result.mStages)
476522
{
477523
releaseAssert(!stage.empty());
@@ -490,6 +536,9 @@ buildSurgePricedParallelSorobanPhase(
490536
std::vector<bool>& hadTxNotFittingLane, uint32_t ledgerVersion)
491537
{
492538
ZoneScoped;
539+
CLOG_DEBUG(Herder, "Building parallel Soroban phase with {} transactions",
540+
txFrames.size());
541+
493542
// We prefer the transaction sets that are well utilized, but we also want
494543
// to lower the stage count when possible. Thus we will nominate a tx set
495544
// that has the lowest amount of stages while still being within
@@ -507,6 +556,8 @@ buildSurgePricedParallelSorobanPhase(
507556
auto const& txFrame = txFrames[i];
508557
builderTxs.emplace_back(std::make_unique<BuilderTx>(i, *txFrame));
509558
builderTxForTx.emplace(txFrame, builderTxs.back().get());
559+
CLOG_DEBUG(Herder, "Transaction {} has {} instructions", i,
560+
txFrame->sorobanResources().instructions);
510561
}
511562

512563
// Before trying to include any transactions, find all the pairs of the
@@ -542,6 +593,11 @@ buildSurgePricedParallelSorobanPhase(
542593
}
543594
}
544595

596+
CLOG_DEBUG(Herder,
597+
"Found {} RO keys and {} RW keys across all transactions",
598+
txsWithRoKey.size(), txsWithRwKey.size());
599+
600+
size_t totalConflicts = 0;
545601
for (auto const& [key, rwTxIds] : txsWithRwKey)
546602
{
547603
// RW-RW conflicts
@@ -551,6 +607,7 @@ buildSurgePricedParallelSorobanPhase(
551607
{
552608
builderTxs[rwTxIds[i]]->mConflictTxs.set(rwTxIds[j]);
553609
builderTxs[rwTxIds[j]]->mConflictTxs.set(rwTxIds[i]);
610+
totalConflicts += 2;
554611
}
555612
}
556613
// RO-RW conflicts
@@ -564,17 +621,25 @@ buildSurgePricedParallelSorobanPhase(
564621
{
565622
builderTxs[roTxIds[i]]->mConflictTxs.set(rwTxIds[j]);
566623
builderTxs[rwTxIds[j]]->mConflictTxs.set(roTxIds[i]);
624+
totalConflicts += 2;
567625
}
568626
}
569627
}
570628
}
571629

630+
CLOG_DEBUG(Herder, "Found {} total conflicts between transactions",
631+
totalConflicts);
632+
572633
// Process the transactions in the surge pricing (decreasing fee) order.
573634
// This also automatically ensures that the resource limits are respected
574635
// for all the dimensions besides instructions.
575636
SurgePricingPriorityQueue queue(
576637
/* isHighestPriority */ true, laneConfig,
577638
stellar::rand_uniform<size_t>(0, std::numeric_limits<size_t>::max()));
639+
640+
CLOG_DEBUG(Herder, "Adding {} transactions to surge pricing queue",
641+
txFrames.size());
642+
578643
for (auto const& tx : txFrames)
579644
{
580645
queue.add(tx, ledgerVersion);
@@ -586,6 +651,10 @@ buildSurgePricedParallelSorobanPhase(
586651
cfg.SOROBAN_PHASE_MIN_STAGE_COUNT + 1;
587652
std::vector<ParallelPhaseBuildResult> results(stageCountOptions);
588653

654+
CLOG_DEBUG(Herder, "Creating {} threads for stage counts {} to {}",
655+
stageCountOptions, cfg.SOROBAN_PHASE_MIN_STAGE_COUNT,
656+
cfg.SOROBAN_PHASE_MAX_STAGE_COUNT);
657+
589658
for (uint32_t stageCount = cfg.SOROBAN_PHASE_MIN_STAGE_COUNT;
590659
stageCount <= cfg.SOROBAN_PHASE_MAX_STAGE_COUNT; ++stageCount)
591660
{
@@ -611,26 +680,46 @@ buildSurgePricedParallelSorobanPhase(
611680
std::max(maxTotalInclusionFee, result.mTotalInclusionFee);
612681
}
613682
maxTotalInclusionFee *= MAX_INCLUSION_FEE_TOLERANCE_FOR_STAGE_COUNT;
683+
684+
CLOG_DEBUG(Herder, "Max total inclusion fee: {}, tolerance threshold: {}",
685+
maxTotalInclusionFee /
686+
MAX_INCLUSION_FEE_TOLERANCE_FOR_STAGE_COUNT,
687+
maxTotalInclusionFee);
688+
614689
std::optional<size_t> bestResultIndex = std::nullopt;
615690
for (size_t i = 0; i < results.size(); ++i)
616691
{
617692
CLOG_DEBUG(Herder,
618693
"Parallel Soroban tx set nomination: {} stages => {} total "
619694
"inclusion fee",
620-
results[i].mTotalInclusionFee, results[i].mStages.size());
695+
results[i].mStages.size(), results[i].mTotalInclusionFee);
621696
if (results[i].mTotalInclusionFee < maxTotalInclusionFee)
622697
{
698+
CLOG_DEBUG(Herder, "Skipping result {} - fee {} below threshold {}",
699+
i, results[i].mTotalInclusionFee, maxTotalInclusionFee);
623700
continue;
624701
}
625702
if (!bestResultIndex ||
626703
results[i].mStages.size() <
627704
results[bestResultIndex.value()].mStages.size())
628705
{
706+
CLOG_DEBUG(Herder, "New best result: index {} with {} stages", i,
707+
results[i].mStages.size());
629708
bestResultIndex = std::make_optional(i);
630709
}
631710
}
632-
releaseAssert(bestResultIndex.has_value());
711+
712+
if (!bestResultIndex.has_value())
713+
{
714+
CLOG_ERROR(Herder, "No valid parallel phase result found!");
715+
releaseAssert(false);
716+
}
717+
633718
auto& bestResult = results[bestResultIndex.value()];
719+
CLOG_INFO(Herder, "Selected result {} with {} stages and {} total fee",
720+
bestResultIndex.value(), bestResult.mStages.size(),
721+
bestResult.mTotalInclusionFee);
722+
634723
hadTxNotFittingLane = std::move(bestResult.mHadTxNotFittingLane);
635724
return std::move(bestResult.mStages);
636725
}

0 commit comments

Comments
 (0)