Skip to content

Commit 36a16a5

Browse files
authored
Merge pull request #46 from Consensys/fix-sync-for-old-state
fix sync for old state
2 parents 26e9784 + 637387d commit 36a16a5

14 files changed

+347
-102
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
# Changelog
22

33
## Unreleased
4+
- Fixed a bug that prevented the node from syncing with an old snapshot [#46](https://github.com/Consensys/besu-fleet-plugin/pull/46)
5+
## 0.3.8
46
- `--plugin-fleet-follower-http-host` and `--plugin-fleet-follower-http-port` options are deprecated and will be removed in a future release.
57
- use BesuConfiguration service to get RPC host and port values as configured in Besu
8+
- Changes the logic to have the new head information directly in the besu event. [#45](https://github.com/Consensys/besu-fleet-plugin/pull/45)
69

710
## 0.2.0
811
- fix cli parameter parsing regression in 0.1.0

src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@
3333
import net.consensys.fleet.leader.peer.LeaderPeerNetworkMaintainer;
3434
import net.consensys.fleet.leader.rpc.client.FleetShipNewHeadClient;
3535
import net.consensys.fleet.leader.rpc.server.FleetAddFollowerServer;
36-
import net.consensys.fleet.leader.rpc.server.FleetGetBlockServer;
36+
import net.consensys.fleet.leader.rpc.server.FleetGetBlockByHashServer;
37+
import net.consensys.fleet.leader.rpc.server.FleetGetBlockByNumberServer;
3738

3839
import java.util.ArrayList;
3940
import java.util.List;
@@ -237,7 +238,8 @@ private List<PluginRpcMethod> createServerMethods() {
237238
new BlockContextProvider(pluginServiceProvider, new FleetGetBlockClient(webClient));
238239
methods.add(new FleetGetConfigServer(convertMapperProvider));
239240
methods.add(new FleetAddFollowerServer(peerManagers));
240-
methods.add(new FleetGetBlockServer(convertMapperProvider, pluginServiceProvider));
241+
methods.add(new FleetGetBlockByHashServer(convertMapperProvider, pluginServiceProvider));
242+
methods.add(new FleetGetBlockByNumberServer(convertMapperProvider, pluginServiceProvider));
241243
fleetModeSynchronizer =
242244
new FleetModeSynchronizer(
243245
pluginServiceProvider,

src/main/java/net/consensys/fleet/common/rpc/client/AbstractStateRpcSender.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ public AbstractStateRpcSender(final WebClientWrapper webClient) {
2525
this.webClient = webClient;
2626
}
2727

28-
protected abstract String getMethodeName();
29-
3028
public static String getENDPOINT() {
3129
return ENDPOINT;
3230
}

src/main/java/net/consensys/fleet/common/rpc/model/GetBlockRequest.java renamed to src/main/java/net/consensys/fleet/common/rpc/model/AbstractGetBlockRequest.java

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,21 @@
1515
package net.consensys.fleet.common.rpc.model;
1616

1717
import com.fasterxml.jackson.annotation.JsonProperty;
18-
import org.hyperledger.besu.datatypes.Hash;
1918

20-
public class GetBlockRequest {
21-
22-
@JsonProperty("blockHash")
23-
private Hash blockHash;
19+
public abstract class AbstractGetBlockRequest {
2420

2521
@JsonProperty("fetchReceipts")
2622
private boolean fetchReceipts;
2723

28-
public GetBlockRequest() {}
24+
public AbstractGetBlockRequest() {}
2925

30-
public GetBlockRequest(final Hash blockHash, final boolean fetchReceipts) {
31-
this.blockHash = blockHash;
26+
public AbstractGetBlockRequest(final boolean fetchReceipts) {
3227
this.fetchReceipts = fetchReceipts;
3328
}
3429

35-
public Hash getBlockHash() {
36-
return blockHash;
37-
}
38-
3930
public boolean isFetchReceipts() {
4031
return fetchReceipts;
4132
}
33+
34+
public abstract String getMethodName();
4235
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright ConsenSys 2023
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*
13+
* SPDX-License-Identifier: Apache-2.0
14+
*/
15+
package net.consensys.fleet.common.rpc.model;
16+
17+
import com.fasterxml.jackson.annotation.JsonIgnore;
18+
import com.fasterxml.jackson.annotation.JsonProperty;
19+
import org.hyperledger.besu.datatypes.Hash;
20+
21+
public class GetBlockByHashRequest extends AbstractGetBlockRequest {
22+
23+
@JsonProperty("blockHash")
24+
private Hash blockHash;
25+
26+
public GetBlockByHashRequest() {}
27+
28+
public GetBlockByHashRequest(final Hash blockHash, final boolean fetchReceipts) {
29+
super(fetchReceipts);
30+
this.blockHash = blockHash;
31+
}
32+
33+
@JsonIgnore
34+
@Override
35+
public String getMethodName() {
36+
return "fleet_getBlockByHash";
37+
}
38+
39+
public Hash getBlockHash() {
40+
return blockHash;
41+
}
42+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Copyright ConsenSys 2023
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*
13+
* SPDX-License-Identifier: Apache-2.0
14+
*/
15+
package net.consensys.fleet.common.rpc.model;
16+
17+
import com.fasterxml.jackson.annotation.JsonIgnore;
18+
import com.fasterxml.jackson.annotation.JsonProperty;
19+
20+
public class GetBlockByNumberRequest extends AbstractGetBlockRequest {
21+
22+
@JsonProperty("blockNumber")
23+
private long blockNumber;
24+
25+
public GetBlockByNumberRequest() {}
26+
27+
public GetBlockByNumberRequest(final long blockNumber, final boolean fetchReceipts) {
28+
super(fetchReceipts);
29+
this.blockNumber = blockNumber;
30+
}
31+
32+
@JsonIgnore
33+
@Override
34+
public String getMethodName() {
35+
return "fleet_getBlockByNumber";
36+
}
37+
38+
public long getBlockNumber() {
39+
return blockNumber;
40+
}
41+
}

src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,12 @@ public FleetAddFollowerClient(final WebClientWrapper webClient) {
3232
super(webClient);
3333
}
3434

35-
@Override
36-
protected String getMethodeName() {
37-
return METHOD_NAME;
38-
}
39-
4035
@Override
4136
public CompletableFuture<Boolean> sendData(PeerNode data) {
4237
final CompletableFuture<Boolean> completableFuture = new CompletableFuture<>();
4338
try {
4439
webClient
45-
.sendToLeader(ENDPOINT, getMethodeName(), data)
40+
.sendToLeader(ENDPOINT, METHOD_NAME, data)
4641
.whenComplete(
4742
(bufferHttpResponse, throwable) -> {
4843
completableFuture.complete(isConnected(bufferHttpResponse, throwable));

src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,32 +16,27 @@
1616

1717
import net.consensys.fleet.common.rpc.client.AbstractStateRpcSender;
1818
import net.consensys.fleet.common.rpc.client.WebClientWrapper;
19-
import net.consensys.fleet.common.rpc.model.GetBlockRequest;
19+
import net.consensys.fleet.common.rpc.model.AbstractGetBlockRequest;
2020
import net.consensys.fleet.common.rpc.model.GetBlockResponse;
2121

2222
import java.util.concurrent.CompletableFuture;
2323

2424
import com.fasterxml.jackson.core.JsonProcessingException;
2525

26-
public class FleetGetBlockClient extends AbstractStateRpcSender<GetBlockRequest, GetBlockResponse> {
27-
28-
private static final String METHOD_NAME = "fleet_getBlock";
26+
public class FleetGetBlockClient
27+
extends AbstractStateRpcSender<AbstractGetBlockRequest, GetBlockResponse> {
2928

3029
public FleetGetBlockClient(final WebClientWrapper webClient) {
3130
super(webClient);
3231
}
3332

3433
@Override
35-
protected String getMethodeName() {
36-
return METHOD_NAME;
37-
}
38-
39-
@Override
40-
public CompletableFuture<GetBlockResponse> sendData(final GetBlockRequest blockNumber) {
34+
public CompletableFuture<GetBlockResponse> sendData(
35+
final AbstractGetBlockRequest getBlockRequest) {
4136
final CompletableFuture<GetBlockResponse> completableFuture = new CompletableFuture<>();
4237
try {
4338
webClient
44-
.sendToLeader(ENDPOINT, getMethodeName(), blockNumber)
39+
.sendToLeader(ENDPOINT, getBlockRequest.getMethodName(), getBlockRequest)
4540
.whenCompleteAsync(
4641
(result, throwable) -> {
4742
if (throwable == null) {

src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
package net.consensys.fleet.follower.sync;
1616

1717
import net.consensys.fleet.common.plugin.PluginServiceProvider;
18-
import net.consensys.fleet.common.rpc.model.GetBlockRequest;
18+
import net.consensys.fleet.common.rpc.model.AbstractGetBlockRequest;
19+
import net.consensys.fleet.common.rpc.model.GetBlockByHashRequest;
20+
import net.consensys.fleet.common.rpc.model.GetBlockByNumberRequest;
1921
import net.consensys.fleet.common.rpc.model.GetBlockResponse;
2022
import net.consensys.fleet.common.rpc.model.NewHeadParams;
2123
import net.consensys.fleet.follower.rpc.client.FleetGetBlockClient;
@@ -53,8 +55,8 @@ public BlockContextProvider(
5355
this.getBlockClient = getBlockClient;
5456
}
5557

56-
public Optional<FleetBlockContext> getLeaderBlockContextByNumber(
57-
final CompositeBlockKey compositeBlockKey, final boolean fetchReceipts) {
58+
private Optional<FleetBlockContext> getLeaderBlockContext(
59+
final CompositeBlockKey compositeBlockKey, AbstractGetBlockRequest request) {
5860
try {
5961
Optional<FleetBlockContext> cachedContext =
6062
Optional.ofNullable(leaderBlock.getIfPresent(compositeBlockKey));
@@ -63,10 +65,7 @@ public Optional<FleetBlockContext> getLeaderBlockContextByNumber(
6365
return cachedContext;
6466
}
6567

66-
GetBlockResponse response =
67-
getBlockClient
68-
.sendData(new GetBlockRequest(compositeBlockKey.getBlockHash(), fetchReceipts))
69-
.get();
68+
GetBlockResponse response = getBlockClient.sendData(request).get();
7069

7170
FleetBlockContext context =
7271
new FleetBlockContext(
@@ -82,6 +81,20 @@ public Optional<FleetBlockContext> getLeaderBlockContextByNumber(
8281
}
8382
}
8483

84+
public Optional<FleetBlockContext> getLeaderBlockContextByHash(
85+
final CompositeBlockKey compositeBlockKey, final boolean fetchReceipts) {
86+
return getLeaderBlockContext(
87+
compositeBlockKey,
88+
new GetBlockByHashRequest(compositeBlockKey.getBlockHash(), fetchReceipts));
89+
}
90+
91+
public Optional<FleetBlockContext> getLeaderBlockContextByNumber(
92+
final CompositeBlockKey compositeBlockKey, final boolean fetchReceipts) {
93+
return getLeaderBlockContext(
94+
compositeBlockKey,
95+
new GetBlockByNumberRequest(compositeBlockKey.getBlockNumber(), fetchReceipts));
96+
}
97+
8598
public void provideLeaderBlockContext(final NewHeadParams newHeadParams) {
8699
CompositeBlockKey key = new CompositeBlockKey(newHeadParams.getHead());
87100
leaderBlock.put(

src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,29 @@ private void disableWaitingSync() {
115115
syncDelay = 1;
116116
}
117117

118+
/**
119+
* Starts the synchronization process between the follower node and the captain chain head.
120+
*
121+
* <p>The synchronization is divided into two main mechanisms depending on the distance from the
122+
* chain head:
123+
*
124+
* <p>1. Far from head (when captainHead-followerHead > maxBlocksPerPersist): Blocks are fetched
125+
* by block number. This approach allows retrieving a range of blocks efficiently without needing
126+
* to know their hashes. If we were to fetch by block hash, we would have to walk back from the
127+
* head one block at a time to discover the right hashes.
128+
*
129+
* <p>2. Close to head: Once synchronization is close enough to the head, it switches to fetching
130+
* blocks by hash. This is more reliable and ensures we are on the correct fork.
131+
*
132+
* <p>Fetching by block number is safe because block hash validation is performed once we approach
133+
* the head. If the remote peer (captain) is on a different fork, it will be detected during the
134+
* hash-based phase, and the older blocks will be updated
135+
*
136+
* <p>After each range of blocks is retrieved, trielogs are applied in the same manner as in the
137+
* Bonsai trie model.
138+
*/
118139
private void startSync() {
119-
if (syncScheduler == null || syncScheduler.isDone()) {
140+
if (syncScheduler == null || syncScheduler.isDone() || syncScheduler.isCancelled()) {
120141
syncScheduler =
121142
EXECUTOR_SERVICE.schedule(
122143
() -> {
@@ -142,18 +163,19 @@ private void startSync() {
142163
long persistedBlockNumber = persistedBlock.getBlockHeader().getNumber();
143164

144165
FleetBlockContext targetBlock;
145-
if ((leaderHeader.getFinalizedBlockNumber()
166+
if ((leaderHeader.getHead().getNumber()
146167
- persistedBlock.getBlockHeader().getNumber())
147168
> maxBlocksPerPersist) {
148169
targetBlock =
149-
getLeaderBlockContext(
170+
getLeaderBlockContextByNumber(
150171
new CompositeBlockKey(
151-
leaderHeader.getFinalizedBlockNumber(),
152-
leaderHeader.getFinalizedBlockHash()))
172+
persistedBlock.getBlockHeader().getNumber()
173+
+ maxBlocksPerPersist))
153174
.orElseThrow(MissingBlockException::new);
154175
} else {
155176
targetBlock =
156-
getLeaderBlockContext(new CompositeBlockKey(leaderHeader.getHead()))
177+
getLeaderBlockContextByHash(
178+
new CompositeBlockKey(leaderHeader.getHead()))
157179
.orElseThrow(MissingBlockException::new);
158180
}
159181

@@ -186,7 +208,7 @@ private void startSync() {
186208
< targetBlock.getBlockHeader().getNumber()) {
187209
LOG.debug("Rollforward {}", targetBlockHash);
188210
final FleetBlockContext toRollForwardBlock =
189-
getLeaderBlockContext(
211+
getLeaderBlockContextByHash(
190212
new CompositeBlockKey(targetBlock.getBlockHeader()))
191213
.orElseThrow(MissingBlockException::new);
192214
rollForward.add(toRollForwardBlock);
@@ -199,7 +221,7 @@ private void startSync() {
199221
targetBlock = persistedBlock;
200222
} else {
201223
targetBlock =
202-
getLeaderBlockContext(
224+
getLeaderBlockContextByHash(
203225
new CompositeBlockKey(
204226
targetBlock.getBlockHeader().getNumber() - 1,
205227
targetBlock.getBlockHeader().getParentHash()))
@@ -219,11 +241,11 @@ private void startSync() {
219241
LOG.debug("Paired rollforward {}", targetBlockHash);
220242

221243
rollForward.add(
222-
getLeaderBlockContext(
244+
getLeaderBlockContextByHash(
223245
new CompositeBlockKey(targetBlock.getBlockHeader()))
224246
.orElseThrow(MissingBlockException::new));
225247
targetBlock =
226-
getLeaderBlockContext(
248+
getLeaderBlockContextByHash(
227249
new CompositeBlockKey(
228250
targetBlock.getBlockHeader().getNumber() - 1,
229251
targetBlock.getBlockHeader().getParentHash()))
@@ -390,7 +412,15 @@ private boolean isBlockchainServiceReady() {
390412
return pluginServiceProvider.isServiceAvailable(BlockchainService.class);
391413
}
392414

393-
private Optional<FleetBlockContext> getLeaderBlockContext(
415+
private Optional<FleetBlockContext> getLeaderBlockContextByHash(
416+
final CompositeBlockKey compositeBlockKey) {
417+
return blockContextProvider.getLeaderBlockContextByHash(
418+
compositeBlockKey,
419+
(leaderHeader.getHead().getNumber() - compositeBlockKey.getBlockNumber())
420+
<= headDistanceForReceiptFetch);
421+
}
422+
423+
private Optional<FleetBlockContext> getLeaderBlockContextByNumber(
394424
final CompositeBlockKey compositeBlockKey) {
395425
return blockContextProvider.getLeaderBlockContextByNumber(
396426
compositeBlockKey,

0 commit comments

Comments
 (0)