Skip to content

Commit 125c3c4

Browse files
Fix: interrupt bundle selection on timeout (#1756)
* Fix: Interrupt bundle selection on timeout Signed-off-by: Fabio Di Fabio <[email protected]> * Apply suggestions from code review Signed-off-by: Fabio Di Fabio <[email protected]> * Added SELECTION_CANCELLED as interrupt reason Signed-off-by: Fabio Di Fabio <[email protected]> * Apply suggestions from code review Signed-off-by: Fabio Di Fabio <[email protected]> * Fix/interrupt bundle selection on timeout test fix (#1758) * fix bundle test * Rename variable Signed-off-by: Fabio Di Fabio <[email protected]> --------- Signed-off-by: Fabio Di Fabio <[email protected]> Co-authored-by: Fluent Crafter <[email protected]>
1 parent 298e981 commit 125c3c4

File tree

2 files changed

+66
-42
lines changed

2 files changed

+66
-42
lines changed

besu-plugins/linea-sequencer/acceptance-tests/src/test/java/linea/plugin/acc/test/rpc/linea/BundleSelectionTimeoutTest.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public void multipleBundleSelectionTimeout() throws Exception {
6363
final var mulmodExecutor = deployMulmodExecutor();
6464

6565
final var calls =
66-
IntStream.rangeClosed(1, 10)
66+
IntStream.rangeClosed(1, 11)
6767
.mapToObj(
6868
nonce ->
6969
mulmodOperation(
@@ -74,16 +74,23 @@ public void multipleBundleSelectionTimeout() throws Exception {
7474
BigInteger.valueOf(MAX_TX_GAS_LIMIT / 10)))
7575
.toArray(MulmodCall[]::new);
7676

77+
final var rawTxs = Arrays.stream(calls).map(MulmodCall::rawTx).toArray(String[]::new);
7778
final var sendBundleRequestSmall =
7879
new SendBundleRequest(
79-
new BundleParams(new String[] {calls[0].rawTx()}, Integer.toHexString(2)));
80-
final var sendBundleResponseSmall = sendBundleRequestSmall.execute(minerNode.nodeRequests());
80+
new BundleParams(Arrays.copyOfRange(rawTxs, 0, 1), Integer.toHexString(2)));
8181

82-
final var rawTxs = Arrays.stream(calls).skip(1).map(MulmodCall::rawTx).toArray(String[]::new);
82+
final var sendBundleRequestBig1 =
83+
new SendBundleRequest(
84+
new BundleParams(Arrays.copyOfRange(rawTxs, 1, 10), Integer.toHexString(2)));
85+
// second bundle contains one tx only to be fast to execute,
86+
// and ensure timeout occurs on the 2nd bundle and 3rd is not event considered
87+
final var sendBundleRequestBig2 =
88+
new SendBundleRequest(
89+
new BundleParams(Arrays.copyOfRange(rawTxs, 1, 2), Integer.toHexString(2)));
8390

84-
final var sendBundleRequestBig =
85-
new SendBundleRequest(new BundleParams(rawTxs, Integer.toHexString(2)));
86-
final var sendBundleResponseBig = sendBundleRequestBig.execute(minerNode.nodeRequests());
91+
final var sendBundleResponseSmall = sendBundleRequestSmall.execute(minerNode.nodeRequests());
92+
final var sendBundleResponseBig1 = sendBundleRequestBig1.execute(minerNode.nodeRequests());
93+
final var sendBundleResponseBig2 = sendBundleRequestBig2.execute(minerNode.nodeRequests());
8794

8895
final var transferTxHash =
8996
accountTransactions
@@ -93,15 +100,18 @@ public void multipleBundleSelectionTimeout() throws Exception {
93100
assertThat(sendBundleResponseSmall.hasError()).isFalse();
94101
assertThat(sendBundleResponseSmall.getResult().bundleHash()).isNotBlank();
95102

96-
assertThat(sendBundleResponseBig.hasError()).isFalse();
97-
assertThat(sendBundleResponseBig.getResult().bundleHash()).isNotBlank();
103+
assertThat(sendBundleResponseBig1.hasError()).isFalse();
104+
assertThat(sendBundleResponseBig1.getResult().bundleHash()).isNotBlank();
105+
106+
assertThat(sendBundleResponseBig2.hasError()).isFalse();
107+
assertThat(sendBundleResponseBig2.getResult().bundleHash()).isNotBlank();
98108

99109
minerNode.verify(eth.expectSuccessfulTransactionReceipt(transferTxHash.toHexString()));
100110

101111
// first bundle is successful
102112
minerNode.verify(eth.expectSuccessfulTransactionReceipt(calls[0].txHash()));
103113

104-
// second bundle is not
114+
// following bundles are not selected
105115
Arrays.stream(calls)
106116
.skip(1)
107117
.map(MulmodCall::txHash)

besu-plugins/linea-sequencer/sequencer/src/main/java/net/consensys/linea/sequencer/txselection/LineaTransactionSelectorFactory.java

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,16 @@
99

1010
package net.consensys.linea.sequencer.txselection;
1111

12+
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.PLUGIN_SELECTION_TIMEOUT;
13+
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.PLUGIN_SELECTION_TIMEOUT_INVALID_TX;
14+
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTION_CANCELLED;
15+
1216
import java.time.Instant;
1317
import java.util.Map;
1418
import java.util.Optional;
1519
import java.util.Set;
1620
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.atomic.AtomicBoolean;
1722
import java.util.concurrent.atomic.AtomicReference;
1823
import lombok.extern.slf4j.Slf4j;
1924
import net.consensys.linea.bundles.BundlePoolService;
@@ -29,6 +34,7 @@
2934
import net.consensys.linea.sequencer.txselection.selectors.TransactionEventFilter;
3035
import org.hyperledger.besu.datatypes.Address;
3136
import org.hyperledger.besu.plugin.data.ProcessableBlockHeader;
37+
import org.hyperledger.besu.plugin.data.TransactionSelectionResult;
3238
import org.hyperledger.besu.plugin.services.BlockchainService;
3339
import org.hyperledger.besu.plugin.services.txselection.BlockTransactionSelectionService;
3440
import org.hyperledger.besu.plugin.services.txselection.PluginTransactionSelector;
@@ -56,6 +62,7 @@ public class LineaTransactionSelectorFactory implements PluginTransactionSelecto
5662
private final AtomicReference<LineaTransactionSelector> currSelector = new AtomicReference<>();
5763
private final AtomicReference<Map<Address, Set<TransactionEventFilter>>> deniedEvents;
5864
private final AtomicReference<Map<Address, Set<TransactionEventFilter>>> deniedBundleEvents;
65+
private final AtomicBoolean isSelectionInterrupted = new AtomicBoolean(false);
5966

6067
public LineaTransactionSelectorFactory(
6168
final BlockchainService blockchainService,
@@ -109,8 +116,6 @@ public void selectPendingTransactions(
109116
// check and send liveness bundle if any
110117
checkAndSendLivenessBundle(bts, pendingBlockHeader.getNumber());
111118

112-
if (isSelectionInterrupted()) return;
113-
114119
final var bundlesByBlockNumber =
115120
bundlePoolService.getBundlesByBlockNumber(pendingBlockHeader.getNumber());
116121

@@ -121,20 +126,17 @@ public void selectPendingTransactions(
121126
.addArgument(bundlesByBlockNumber::size)
122127
.log();
123128

124-
if (isSelectionInterrupted()) return;
125-
126129
final var selectionStartedAt = System.nanoTime();
127130

128131
bundlesByBlockNumber.stream()
129-
.takeWhile(unused -> !isSelectionInterrupted())
132+
.takeWhile(unused -> !isSelectionInterrupted.get())
130133
.forEach(
131134
bundle -> {
132135
final var bundleStartedAt = System.nanoTime();
133136
log.trace("Starting evaluation of bundle {}", bundle.bundleIdentifier());
134137

135138
var maybeBadBundleRes =
136139
bundle.pendingTransactions().stream()
137-
.takeWhile(unused -> !isSelectionInterrupted())
138140
.map(bts::evaluatePendingTransaction)
139141
.filter(evalRes -> !evalRes.selected())
140142
.findFirst();
@@ -143,45 +145,46 @@ public void selectPendingTransactions(
143145
final var cumulativeBundleSelectionTime = now - selectionStartedAt;
144146
final var currentBundleSelectionTime = now - bundleStartedAt;
145147

146-
if (isSelectionInterrupted()) {
147-
log.atDebug()
148-
.setMessage(
149-
"Bundle selection interrupted while processing bundle {},"
150-
+ " elapsed time: current bundle {}ms, cumulative {}ms")
151-
.addArgument(bundle::bundleIdentifier)
152-
.addArgument(() -> nanosToMillis(currentBundleSelectionTime))
153-
.addArgument(() -> nanosToMillis(cumulativeBundleSelectionTime))
154-
.log();
155-
rollback(bts);
156-
} else {
157-
if (maybeBadBundleRes.isPresent()) {
148+
if (maybeBadBundleRes.isPresent()) {
149+
final var notSelectedReason = maybeBadBundleRes.get();
150+
151+
if (isSelectionInterrupted(notSelectedReason)) {
152+
isSelectionInterrupted.set(true);
158153
log.atDebug()
159154
.setMessage(
160-
"Failed bundle {}, reason {}, elapsed time: current bundle {}ms, cumulative {}ms")
155+
"Bundle selection interrupted while processing bundle {},"
156+
+ " elapsed time: current bundle {}ms, cumulative {}ms")
161157
.addArgument(bundle::bundleIdentifier)
162-
.addArgument(maybeBadBundleRes::get)
163158
.addArgument(() -> nanosToMillis(currentBundleSelectionTime))
164159
.addArgument(() -> nanosToMillis(cumulativeBundleSelectionTime))
165160
.log();
166-
rollback(bts);
167161
} else {
168162
log.atDebug()
169163
.setMessage(
170-
"Selected bundle {}, elapsed time: current bundle {}ms, cumulative {}ms")
164+
"Failed bundle {}, reason {}, elapsed time: current bundle {}ms, cumulative {}ms")
171165
.addArgument(bundle::bundleIdentifier)
166+
.addArgument(notSelectedReason)
172167
.addArgument(() -> nanosToMillis(currentBundleSelectionTime))
173168
.addArgument(() -> nanosToMillis(cumulativeBundleSelectionTime))
174169
.log();
175-
commit(bts);
176170
}
171+
172+
rollback(bts);
173+
} else {
174+
log.atDebug()
175+
.setMessage(
176+
"Selected bundle {}, elapsed time: current bundle {}ms, cumulative {}ms")
177+
.addArgument(bundle::bundleIdentifier)
178+
.addArgument(() -> nanosToMillis(currentBundleSelectionTime))
179+
.addArgument(() -> nanosToMillis(cumulativeBundleSelectionTime))
180+
.log();
181+
182+
commit(bts);
177183
}
178184
});
179185
} finally {
180186
currSelector.set(null);
181-
if (isSelectionInterrupted()) {
182-
// finally consume the interrupt
183-
Thread.currentThread().interrupt();
184-
}
187+
isSelectionInterrupted.set(false);
185188
}
186189
}
187190

@@ -206,7 +209,18 @@ private void checkAndSendLivenessBundle(
206209
.findFirst();
207210

208211
if (badBundleRes.isPresent()) {
209-
log.debug("Failed liveness bundle {}, reason {}", livenessBundle.get(), badBundleRes);
212+
final var notSelectedReason = badBundleRes.get();
213+
214+
if (isSelectionInterrupted(notSelectedReason)) {
215+
isSelectionInterrupted.set(true);
216+
log.debug(
217+
"Bundle selection interrupted while processing liveness bundle {}, reason {}",
218+
livenessBundle.get(),
219+
notSelectedReason);
220+
} else {
221+
log.debug(
222+
"Failed liveness bundle {}, reason {}", livenessBundle.get(), notSelectedReason);
223+
}
210224
livenessService.get().updateUptimeMetrics(false, headBlockTimestamp);
211225
rollback(bts);
212226
} else {
@@ -231,9 +245,9 @@ private long nanosToMillis(final long nanos) {
231245
return TimeUnit.NANOSECONDS.toMillis(nanos);
232246
}
233247

234-
private boolean isSelectionInterrupted() {
235-
// returns if the thread is interrupted without resetting the state
236-
// so it can be called many times without changing the interrupt state
237-
return Thread.currentThread().isInterrupted();
248+
private boolean isSelectionInterrupted(final TransactionSelectionResult selectionResult) {
249+
return selectionResult.equals(PLUGIN_SELECTION_TIMEOUT)
250+
|| selectionResult.equals(PLUGIN_SELECTION_TIMEOUT_INVALID_TX)
251+
|| selectionResult.equals(SELECTION_CANCELLED);
238252
}
239253
}

0 commit comments

Comments
 (0)