Skip to content

Commit 94ac213

Browse files
Merge pull request #45 from Consensys/feature/push-for-new-head
new head information directly in the besu event
2 parents 80d0c26 + 4890919 commit 94ac213

File tree

14 files changed

+333
-160
lines changed

14 files changed

+333
-160
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ Or, run the besu with follower mode:
3535
--Xchain-pruning-blocks-retained=512 \
3636
--plugin-fleet-leader-http-host=127.0.0.1 \
3737
--plugin-fleet-leader-http-port=8545
38+
--plugin-fleet-follower-http-host=127.0.0.1 \
39+
--plugin-fleet-follower-http-port=8888
3840
```
3941

4042
# Download

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
releaseVersion=0.3.8-SNAPSHOT
2-
besuVersion=25.4.1
2+
besuVersion=25.5.0

src/main/java/net/consensys/fleet/common/config/FleetOptions.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,13 @@ public class FleetOptions {
6767
description = "HTTP host port of the leader peer")
6868
Integer leaderPeerHttpPort = DEFAULT_LEADER_PEER_HTTP_PORT;
6969

70-
@Deprecated
7170
@CommandLine.Option(
7271
names = {OPTION_FOLLOWER_PEER_HTTP_HOST},
7372
hidden = true,
7473
paramLabel = "<STRING>",
7574
description = "HTTP host of the follower peer")
7675
String followerPeerHttpHost = DEFAULT_FOLLOWER_PEER_HTTP_HOST;
7776

78-
@Deprecated
7977
@CommandLine.Option(
8078
names = {OPTION_FOLLOWER_PEER_HTTP_PORT},
8179
hidden = true,
@@ -123,12 +121,10 @@ public Integer getLeaderPeerHttpPort() {
123121
return leaderPeerHttpPort;
124122
}
125123

126-
@Deprecated
127124
public String getFollowerPeerHttpHost() {
128125
return followerPeerHttpHost;
129126
}
130127

131-
@Deprecated
132128
public Integer getFollowerPeerHttpPort() {
133129
return followerPeerHttpPort;
134130
}
@@ -151,6 +147,8 @@ public String toString() {
151147
.add("nodeRole", nodeRole)
152148
.add("leaderPeerHttpHost", leaderPeerHttpHost)
153149
.add("leaderPeerHttpPort", leaderPeerHttpPort)
150+
.add("followerPeerHttpHost", followerPeerHttpHost)
151+
.add("followerPeerHttpPort", followerPeerHttpPort)
154152
.add("followerHeartBeatDelay", followerHeartBeatDelay)
155153
.add("persistRangeSize", maxBlocksPerPersist)
156154
.add("headDistanceForReceiptFetch", headDistanceForReceiptFetch)

src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import com.google.auto.service.AutoService;
4545
import org.hyperledger.besu.plugin.BesuPlugin;
4646
import org.hyperledger.besu.plugin.ServiceManager;
47-
import org.hyperledger.besu.plugin.services.BesuConfiguration;
4847
import org.hyperledger.besu.plugin.services.BesuEvents;
4948
import org.hyperledger.besu.plugin.services.BlockchainService;
5049
import org.hyperledger.besu.plugin.services.PicoCLIOptions;
@@ -90,11 +89,11 @@ public void register(final ServiceManager serviceManager) {
9089
}
9190
cmdlineOptions.get().addPicoCLIOptions(NAME, CLI_OPTIONS);
9291

93-
LOG.debug("Creating peer manager");
92+
LOG.info("Creating peer manager");
9493
peerManagers = new PeerNodesManager();
9594
this.webClient = new WebClientWrapper(convertMapperProvider, peerManagers);
9695

97-
LOG.debug("Setting up RPC endpoints");
96+
LOG.info("Setting up RPC endpoints");
9897
final List<PluginRpcMethod> pluginRpcMethods = createServerMethods();
9998
serviceManager
10099
.getService(RpcEndpointService.class)
@@ -110,7 +109,7 @@ public void register(final ServiceManager serviceManager) {
110109
method.getNamespace(), method.getName(), method::execute);
111110
}));
112111

113-
LOG.debug("Registering trieLog service");
112+
LOG.info("Registering trieLog service");
114113
final TrieLogService trieLogService = new FleetTrieLogService();
115114
serviceManager.addService(TrieLogService.class, trieLogService);
116115
pluginServiceProvider.provideService(TrieLogService.class, () -> trieLogService);
@@ -119,7 +118,7 @@ public void register(final ServiceManager serviceManager) {
119118
@Override
120119
public void start() {
121120

122-
LOG.debug("Loading RLP converter service");
121+
LOG.info("Loading RLP converter service");
123122
final RlpConverterService rlpConverterService =
124123
serviceManager
125124
.getService(RlpConverterService.class)
@@ -129,15 +128,15 @@ public void start() {
129128
"Expecting a RLP converter service, but none found."));
130129
pluginServiceProvider.provideService(RlpConverterService.class, () -> rlpConverterService);
131130

132-
LOG.debug("Loading blockchain service");
131+
LOG.info("Loading blockchain service");
133132
final BlockchainService blockchainService =
134133
serviceManager
135134
.getService(BlockchainService.class)
136135
.orElseThrow(
137136
() -> new IllegalStateException("Expecting a blockchain service, but none found."));
138137
pluginServiceProvider.provideService(BlockchainService.class, () -> blockchainService);
139138

140-
LOG.debug("Loading synchronization service");
139+
LOG.info("Loading synchronization service");
141140
final SynchronizationService synchronizationService =
142141
serviceManager
143142
.getService(SynchronizationService.class)
@@ -148,7 +147,7 @@ public void start() {
148147
pluginServiceProvider.provideService(
149148
SynchronizationService.class, () -> synchronizationService);
150149

151-
LOG.debug("Loading P2P network service");
150+
LOG.info("Loading P2P network service");
152151
final P2PService p2PService =
153152
serviceManager
154153
.getService(P2PService.class)
@@ -157,6 +156,8 @@ public void start() {
157156
new IllegalStateException("Expecting a P2P network service, but none found."));
158157
pluginServiceProvider.provideService(P2PService.class, () -> p2PService);
159158

159+
loadingClientsMethods();
160+
160161
createPeerNetworkMaintainer();
161162
}
162163

@@ -175,18 +176,8 @@ public void stop() {
175176
}
176177

177178
private void createPeerNetworkMaintainer() {
178-
LOG.debug("Setting up connection parameters");
179+
LOG.info("Setting up connection parameters");
179180
final PeerNetworkMaintainer peerNetworkMaintainer;
180-
181-
LOG.debug("Loading BesuConfiguration service");
182-
final BesuConfiguration besuConfigurationService =
183-
serviceManager
184-
.getService(BesuConfiguration.class)
185-
.orElseThrow(
186-
() ->
187-
new IllegalStateException(
188-
"Expecting a BesuConfiguration service, but none found."));
189-
190181
switch (CLI_OPTIONS.getNodeRole()) {
191182
case LEADER -> {
192183
/* ********** LEADER ************* */
@@ -198,8 +189,8 @@ private void createPeerNetworkMaintainer() {
198189
new FollowerPeerNetworkMaintainer(
199190
CLI_OPTIONS.getLeaderPeerHttpHost(),
200191
CLI_OPTIONS.getLeaderPeerHttpPort(),
201-
besuConfigurationService.getRpcHttpHost().orElse("default"),
202-
besuConfigurationService.getRpcHttpPort().orElse(0),
192+
CLI_OPTIONS.getFollowerPeerHttpHost(),
193+
CLI_OPTIONS.getFollowerPeerHttpPort(),
203194
CLI_OPTIONS.getFollowerHeartBeatDelay(),
204195
peerManagers,
205196
webClient);
@@ -255,8 +246,7 @@ private List<PluginRpcMethod> createServerMethods() {
255246
CLI_OPTIONS.getHeadDistanceForReceiptFetch());
256247
methods.add(
257248
new FleetShipNewHeadServer(
258-
(head, safeBlock, finalizedBlock) ->
259-
fleetModeSynchronizer.syncNewHead(head, safeBlock, finalizedBlock),
249+
(newHeadParams) -> fleetModeSynchronizer.syncNewHead(newHeadParams),
260250
convertMapperProvider,
261251
pluginServiceProvider));
262252
return methods;
@@ -304,7 +294,7 @@ private boolean isFollower() {
304294
}
305295

306296
private void disableTransactionPool() {
307-
LOG.debug("Disable transaction pool");
297+
LOG.info("Disable transaction pool");
308298
serviceManager
309299
.getService(TransactionPoolService.class)
310300
.ifPresent(TransactionPoolService::disableTransactionPool);

src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public CompletableFuture<HttpResponse<Buffer>> sendToLeader(
6464

6565
webClient
6666
.post(leader.port(), leader.host(), endpoint)
67+
.timeout(100)
6768
.putHeader("Content-Type", CONTENT_TYPE)
6869
.sendJsonObject(
6970
jsonObject,
@@ -98,6 +99,7 @@ public void sendToFollowers(final String endpoint, final String methodName, fina
9899
peerNode -> {
99100
webClient
100101
.post(peerNode.port(), peerNode.host(), endpoint)
102+
.timeout(100)
101103
.putHeader("Content-Type", CONTENT_TYPE)
102104
.sendJsonObject(
103105
jsonObject,

src/main/java/net/consensys/fleet/common/rpc/json/ConvertMapperProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import net.consensys.fleet.common.plugin.PluginServiceProvider;
1818

19+
import com.fasterxml.jackson.annotation.JsonInclude;
1920
import com.fasterxml.jackson.databind.ObjectMapper;
2021
import org.hyperledger.besu.plugin.services.rlp.RlpConverterService;
2122

@@ -32,6 +33,7 @@ public ObjectMapper getJsonConverter() {
3233
if (objectMapper == null) {
3334
if (pluginServiceProvider.isServiceAvailable(RlpConverterService.class)) {
3435
objectMapper = new ObjectMapper();
36+
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
3537
objectMapper.registerModule(
3638
new JsonModule(pluginServiceProvider.getService(RlpConverterService.class)));
3739
} else {

src/main/java/net/consensys/fleet/common/rpc/model/GetBlockRequest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,24 +15,25 @@
1515
package net.consensys.fleet.common.rpc.model;
1616

1717
import com.fasterxml.jackson.annotation.JsonProperty;
18+
import org.hyperledger.besu.datatypes.Hash;
1819

1920
public class GetBlockRequest {
2021

21-
@JsonProperty("blockNumber")
22-
private Long blockNumber;
22+
@JsonProperty("blockHash")
23+
private Hash blockHash;
2324

2425
@JsonProperty("fetchReceipts")
2526
private boolean fetchReceipts;
2627

2728
public GetBlockRequest() {}
2829

29-
public GetBlockRequest(final Long blockNumber, final boolean fetchReceipts) {
30-
this.blockNumber = blockNumber;
30+
public GetBlockRequest(final Hash blockHash, final boolean fetchReceipts) {
31+
this.blockHash = blockHash;
3132
this.fetchReceipts = fetchReceipts;
3233
}
3334

34-
public Long getBlockNumber() {
35-
return blockNumber;
35+
public Hash getBlockHash() {
36+
return blockHash;
3637
}
3738

3839
public boolean isFetchReceipts() {

src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,38 +14,103 @@
1414
*/
1515
package net.consensys.fleet.common.rpc.model;
1616

17+
import java.util.List;
18+
1719
import com.fasterxml.jackson.annotation.JsonProperty;
1820
import org.hyperledger.besu.datatypes.Hash;
21+
import org.hyperledger.besu.plugin.data.BlockBody;
1922
import org.hyperledger.besu.plugin.data.BlockHeader;
23+
import org.hyperledger.besu.plugin.data.TransactionReceipt;
2024

2125
public class NewHeadParams {
2226

2327
@JsonProperty("newHead")
2428
private BlockHeader head;
2529

26-
@JsonProperty("safeBlock")
27-
private Hash safeBlock;
30+
@JsonProperty("headBody")
31+
private BlockBody blockBody;
32+
33+
@JsonProperty("headReceipts")
34+
private List<TransactionReceipt> receipts;
35+
36+
@JsonProperty("headTrieLogRlp")
37+
private String trieLogRlp;
38+
39+
@JsonProperty("safeBlockHash")
40+
private Hash safeBlockHash;
41+
42+
@JsonProperty("safeBlockNumber")
43+
private long safeBlockNumber;
2844

29-
@JsonProperty("finalizedBlock")
30-
private Hash finalizedBlock;
45+
@JsonProperty("finalizedBlockHash")
46+
private Hash finalizedBlockHash;
47+
48+
@JsonProperty("finalizedBlockNumber")
49+
private long finalizedBlockNumber;
3150

3251
public NewHeadParams() {}
3352

34-
public NewHeadParams(final BlockHeader head, final Hash safeBlock, final Hash finalizedBlock) {
53+
public NewHeadParams(
54+
final BlockHeader head,
55+
final BlockBody blockBody,
56+
final List<TransactionReceipt> receipts,
57+
final String trieLogRlp,
58+
final Hash safeBlockHash,
59+
final long safeBlockNumber,
60+
final Hash finalizedBlockHash,
61+
final long finalizedBlockNumber) {
62+
this.head = head;
63+
this.blockBody = blockBody;
64+
this.receipts = receipts;
65+
this.trieLogRlp = trieLogRlp;
66+
this.safeBlockHash = safeBlockHash;
67+
this.safeBlockNumber = safeBlockNumber;
68+
this.finalizedBlockHash = finalizedBlockHash;
69+
this.finalizedBlockNumber = finalizedBlockNumber;
70+
}
71+
72+
public NewHeadParams(
73+
final BlockHeader head,
74+
final Hash safeBlockHash,
75+
final long safeBlockNumber,
76+
final Hash finalizedBlockHash,
77+
final long finalizedBlockNumber) {
78+
this.safeBlockHash = safeBlockHash;
79+
this.safeBlockNumber = safeBlockNumber;
80+
this.finalizedBlockHash = finalizedBlockHash;
81+
this.finalizedBlockNumber = finalizedBlockNumber;
3582
this.head = head;
36-
this.safeBlock = safeBlock;
37-
this.finalizedBlock = finalizedBlock;
3883
}
3984

4085
public BlockHeader getHead() {
4186
return head;
4287
}
4388

44-
public Hash getSafeBlock() {
45-
return safeBlock;
89+
public BlockBody getBlockBody() {
90+
return blockBody;
91+
}
92+
93+
public List<TransactionReceipt> getReceipts() {
94+
return receipts;
95+
}
96+
97+
public String getTrieLogRlp() {
98+
return trieLogRlp;
99+
}
100+
101+
public Hash getSafeBlockHash() {
102+
return safeBlockHash;
103+
}
104+
105+
public long getSafeBlockNumber() {
106+
return safeBlockNumber;
107+
}
108+
109+
public Hash getFinalizedBlockHash() {
110+
return finalizedBlockHash;
46111
}
47112

48-
public Hash getFinalizedBlock() {
49-
return finalizedBlock;
113+
public long getFinalizedBlockNumber() {
114+
return finalizedBlockNumber;
50115
}
51116
}

src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,9 @@ public Object execute(PluginRpcRequest rpcRequest) {
6161
"receive new head {} ({}) , safe block {} and finalized block {}",
6262
newHeadParams.getHead().getNumber(),
6363
newHeadParams.getHead().getBlockHash(),
64-
newHeadParams.getSafeBlock(),
65-
newHeadParams.getFinalizedBlock());
66-
newHeadObserver.onNewHead(
67-
newHeadParams.getHead(),
68-
newHeadParams.getSafeBlock(),
69-
newHeadParams.getFinalizedBlock());
64+
newHeadParams.getSafeBlockHash(),
65+
newHeadParams.getFinalizedBlockHash());
66+
newHeadObserver.onNewHead(newHeadParams);
7067
}
7168
} catch (Exception e) {
7269
LOG.trace("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams());

0 commit comments

Comments
 (0)