Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ Or, run the besu with follower mode:
--Xchain-pruning-blocks-retained=512 \
--plugin-fleet-leader-http-host=127.0.0.1 \
--plugin-fleet-leader-http-port=8545
--plugin-fleet-follower-http-host=127.0.0.1 \
--plugin-fleet-follower-http-port=8888
```

# Download
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
releaseVersion=0.3.8-SNAPSHOT
besuVersion=25.4.1
besuVersion=25.5.0
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,13 @@ public class FleetOptions {
description = "HTTP host port of the leader peer")
Integer leaderPeerHttpPort = DEFAULT_LEADER_PEER_HTTP_PORT;

@Deprecated
@CommandLine.Option(
names = {OPTION_FOLLOWER_PEER_HTTP_HOST},
hidden = true,
paramLabel = "<STRING>",
description = "HTTP host of the follower peer")
String followerPeerHttpHost = DEFAULT_FOLLOWER_PEER_HTTP_HOST;

@Deprecated
@CommandLine.Option(
names = {OPTION_FOLLOWER_PEER_HTTP_PORT},
hidden = true,
Expand Down Expand Up @@ -123,12 +121,10 @@ public Integer getLeaderPeerHttpPort() {
return leaderPeerHttpPort;
}

@Deprecated
public String getFollowerPeerHttpHost() {
return followerPeerHttpHost;
}

@Deprecated
public Integer getFollowerPeerHttpPort() {
return followerPeerHttpPort;
}
Expand All @@ -151,6 +147,8 @@ public String toString() {
.add("nodeRole", nodeRole)
.add("leaderPeerHttpHost", leaderPeerHttpHost)
.add("leaderPeerHttpPort", leaderPeerHttpPort)
.add("followerPeerHttpHost", followerPeerHttpHost)
.add("followerPeerHttpPort", followerPeerHttpPort)
.add("followerHeartBeatDelay", followerHeartBeatDelay)
.add("persistRangeSize", maxBlocksPerPersist)
.add("headDistanceForReceiptFetch", headDistanceForReceiptFetch)
Expand Down
38 changes: 14 additions & 24 deletions src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import com.google.auto.service.AutoService;
import org.hyperledger.besu.plugin.BesuPlugin;
import org.hyperledger.besu.plugin.ServiceManager;
import org.hyperledger.besu.plugin.services.BesuConfiguration;
import org.hyperledger.besu.plugin.services.BesuEvents;
import org.hyperledger.besu.plugin.services.BlockchainService;
import org.hyperledger.besu.plugin.services.PicoCLIOptions;
Expand Down Expand Up @@ -90,11 +89,11 @@ public void register(final ServiceManager serviceManager) {
}
cmdlineOptions.get().addPicoCLIOptions(NAME, CLI_OPTIONS);

LOG.debug("Creating peer manager");
LOG.info("Creating peer manager");
peerManagers = new PeerNodesManager();
this.webClient = new WebClientWrapper(convertMapperProvider, peerManagers);

LOG.debug("Setting up RPC endpoints");
LOG.info("Setting up RPC endpoints");
final List<PluginRpcMethod> pluginRpcMethods = createServerMethods();
serviceManager
.getService(RpcEndpointService.class)
Expand All @@ -110,7 +109,7 @@ public void register(final ServiceManager serviceManager) {
method.getNamespace(), method.getName(), method::execute);
}));

LOG.debug("Registering trieLog service");
LOG.info("Registering trieLog service");
final TrieLogService trieLogService = new FleetTrieLogService();
serviceManager.addService(TrieLogService.class, trieLogService);
pluginServiceProvider.provideService(TrieLogService.class, () -> trieLogService);
Expand All @@ -119,7 +118,7 @@ public void register(final ServiceManager serviceManager) {
@Override
public void start() {

LOG.debug("Loading RLP converter service");
LOG.info("Loading RLP converter service");
final RlpConverterService rlpConverterService =
serviceManager
.getService(RlpConverterService.class)
Expand All @@ -129,15 +128,15 @@ public void start() {
"Expecting a RLP converter service, but none found."));
pluginServiceProvider.provideService(RlpConverterService.class, () -> rlpConverterService);

LOG.debug("Loading blockchain service");
LOG.info("Loading blockchain service");
final BlockchainService blockchainService =
serviceManager
.getService(BlockchainService.class)
.orElseThrow(
() -> new IllegalStateException("Expecting a blockchain service, but none found."));
pluginServiceProvider.provideService(BlockchainService.class, () -> blockchainService);

LOG.debug("Loading synchronization service");
LOG.info("Loading synchronization service");
final SynchronizationService synchronizationService =
serviceManager
.getService(SynchronizationService.class)
Expand All @@ -148,7 +147,7 @@ public void start() {
pluginServiceProvider.provideService(
SynchronizationService.class, () -> synchronizationService);

LOG.debug("Loading P2P network service");
LOG.info("Loading P2P network service");
final P2PService p2PService =
serviceManager
.getService(P2PService.class)
Expand All @@ -157,6 +156,8 @@ public void start() {
new IllegalStateException("Expecting a P2P network service, but none found."));
pluginServiceProvider.provideService(P2PService.class, () -> p2PService);

loadingClientsMethods();

createPeerNetworkMaintainer();
}

Expand All @@ -175,18 +176,8 @@ public void stop() {
}

private void createPeerNetworkMaintainer() {
LOG.debug("Setting up connection parameters");
LOG.info("Setting up connection parameters");
final PeerNetworkMaintainer peerNetworkMaintainer;

LOG.debug("Loading BesuConfiguration service");
final BesuConfiguration besuConfigurationService =
serviceManager
.getService(BesuConfiguration.class)
.orElseThrow(
() ->
new IllegalStateException(
"Expecting a BesuConfiguration service, but none found."));

switch (CLI_OPTIONS.getNodeRole()) {
case LEADER -> {
/* ********** LEADER ************* */
Expand All @@ -198,8 +189,8 @@ private void createPeerNetworkMaintainer() {
new FollowerPeerNetworkMaintainer(
CLI_OPTIONS.getLeaderPeerHttpHost(),
CLI_OPTIONS.getLeaderPeerHttpPort(),
besuConfigurationService.getRpcHttpHost().orElse("default"),
besuConfigurationService.getRpcHttpPort().orElse(0),
CLI_OPTIONS.getFollowerPeerHttpHost(),
CLI_OPTIONS.getFollowerPeerHttpPort(),
CLI_OPTIONS.getFollowerHeartBeatDelay(),
peerManagers,
webClient);
Expand Down Expand Up @@ -255,8 +246,7 @@ private List<PluginRpcMethod> createServerMethods() {
CLI_OPTIONS.getHeadDistanceForReceiptFetch());
methods.add(
new FleetShipNewHeadServer(
(head, safeBlock, finalizedBlock) ->
fleetModeSynchronizer.syncNewHead(head, safeBlock, finalizedBlock),
(newHeadParams) -> fleetModeSynchronizer.syncNewHead(newHeadParams),
convertMapperProvider,
pluginServiceProvider));
return methods;
Expand Down Expand Up @@ -304,7 +294,7 @@ private boolean isFollower() {
}

private void disableTransactionPool() {
LOG.debug("Disable transaction pool");
LOG.info("Disable transaction pool");
serviceManager
.getService(TransactionPoolService.class)
.ifPresent(TransactionPoolService::disableTransactionPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public CompletableFuture<HttpResponse<Buffer>> sendToLeader(

webClient
.post(leader.port(), leader.host(), endpoint)
.timeout(100)
.putHeader("Content-Type", CONTENT_TYPE)
.sendJsonObject(
jsonObject,
Expand Down Expand Up @@ -98,6 +99,7 @@ public void sendToFollowers(final String endpoint, final String methodName, fina
peerNode -> {
webClient
.post(peerNode.port(), peerNode.host(), endpoint)
.timeout(100)
.putHeader("Content-Type", CONTENT_TYPE)
.sendJsonObject(
jsonObject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import net.consensys.fleet.common.plugin.PluginServiceProvider;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.hyperledger.besu.plugin.services.rlp.RlpConverterService;

Expand All @@ -32,6 +33,7 @@ public ObjectMapper getJsonConverter() {
if (objectMapper == null) {
if (pluginServiceProvider.isServiceAvailable(RlpConverterService.class)) {
objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.registerModule(
new JsonModule(pluginServiceProvider.getService(RlpConverterService.class)));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,25 @@
package net.consensys.fleet.common.rpc.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.hyperledger.besu.datatypes.Hash;

public class GetBlockRequest {

@JsonProperty("blockNumber")
private Long blockNumber;
@JsonProperty("blockHash")
private Hash blockHash;

@JsonProperty("fetchReceipts")
private boolean fetchReceipts;

public GetBlockRequest() {}

public GetBlockRequest(final Long blockNumber, final boolean fetchReceipts) {
this.blockNumber = blockNumber;
public GetBlockRequest(final Hash blockHash, final boolean fetchReceipts) {
this.blockHash = blockHash;
this.fetchReceipts = fetchReceipts;
}

public Long getBlockNumber() {
return blockNumber;
public Hash getBlockHash() {
return blockHash;
}

public boolean isFetchReceipts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,103 @@
*/
package net.consensys.fleet.common.rpc.model;

import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.plugin.data.BlockBody;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.TransactionReceipt;

public class NewHeadParams {

@JsonProperty("newHead")
private BlockHeader head;

@JsonProperty("safeBlock")
private Hash safeBlock;
@JsonProperty("headBody")
private BlockBody blockBody;

@JsonProperty("headReceipts")
private List<TransactionReceipt> receipts;

@JsonProperty("headTrieLogRlp")
private String trieLogRlp;

@JsonProperty("safeBlockHash")
private Hash safeBlockHash;

@JsonProperty("safeBlockNumber")
private long safeBlockNumber;

@JsonProperty("finalizedBlock")
private Hash finalizedBlock;
@JsonProperty("finalizedBlockHash")
private Hash finalizedBlockHash;

@JsonProperty("finalizedBlockNumber")
private long finalizedBlockNumber;

public NewHeadParams() {}

public NewHeadParams(final BlockHeader head, final Hash safeBlock, final Hash finalizedBlock) {
public NewHeadParams(
final BlockHeader head,
final BlockBody blockBody,
final List<TransactionReceipt> receipts,
final String trieLogRlp,
final Hash safeBlockHash,
final long safeBlockNumber,
final Hash finalizedBlockHash,
final long finalizedBlockNumber) {
this.head = head;
this.blockBody = blockBody;
this.receipts = receipts;
this.trieLogRlp = trieLogRlp;
this.safeBlockHash = safeBlockHash;
this.safeBlockNumber = safeBlockNumber;
this.finalizedBlockHash = finalizedBlockHash;
this.finalizedBlockNumber = finalizedBlockNumber;
}

public NewHeadParams(
final BlockHeader head,
final Hash safeBlockHash,
final long safeBlockNumber,
final Hash finalizedBlockHash,
final long finalizedBlockNumber) {
this.safeBlockHash = safeBlockHash;
this.safeBlockNumber = safeBlockNumber;
this.finalizedBlockHash = finalizedBlockHash;
this.finalizedBlockNumber = finalizedBlockNumber;
this.head = head;
this.safeBlock = safeBlock;
this.finalizedBlock = finalizedBlock;
}

public BlockHeader getHead() {
return head;
}

public Hash getSafeBlock() {
return safeBlock;
public BlockBody getBlockBody() {
return blockBody;
}

public List<TransactionReceipt> getReceipts() {
return receipts;
}

public String getTrieLogRlp() {
return trieLogRlp;
}

public Hash getSafeBlockHash() {
return safeBlockHash;
}

public long getSafeBlockNumber() {
return safeBlockNumber;
}

public Hash getFinalizedBlockHash() {
return finalizedBlockHash;
}

public Hash getFinalizedBlock() {
return finalizedBlock;
public long getFinalizedBlockNumber() {
return finalizedBlockNumber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,9 @@ public Object execute(PluginRpcRequest rpcRequest) {
"receive new head {} ({}) , safe block {} and finalized block {}",
newHeadParams.getHead().getNumber(),
newHeadParams.getHead().getBlockHash(),
newHeadParams.getSafeBlock(),
newHeadParams.getFinalizedBlock());
newHeadObserver.onNewHead(
newHeadParams.getHead(),
newHeadParams.getSafeBlock(),
newHeadParams.getFinalizedBlock());
newHeadParams.getSafeBlockHash(),
newHeadParams.getFinalizedBlockHash());
newHeadObserver.onNewHead(newHeadParams);
}
} catch (Exception e) {
LOG.trace("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams());
Expand Down
Loading
Loading