From b76c4733f6abb21558e89193a43942b57e7231a6 Mon Sep 17 00:00:00 2001 From: Karim Taam Date: Thu, 3 Apr 2025 09:01:10 +0100 Subject: [PATCH 1/8] new head event can provide block information --- .../fleet/common/plugin/FleetPlugin.java | 3 +- .../fleet/common/rpc/model/NewHeadParams.java | 36 ++++++++++++++++++- .../rpc/server/FleetShipNewHeadServer.java | 5 +-- .../follower/sync/BlockContextProvider.java | 14 +++++++- .../follower/sync/FleetModeSynchronizer.java | 20 +++++++---- .../leader/event/BlockAddedObserver.java | 36 +++++++++++++++---- .../fleet/leader/event/NewHeadObserver.java | 5 ++- 7 files changed, 95 insertions(+), 24 deletions(-) diff --git a/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java b/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java index 8329164..b521b16 100644 --- a/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java +++ b/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java @@ -255,8 +255,7 @@ private List createServerMethods() { CLI_OPTIONS.getHeadDistanceForReceiptFetch()); methods.add( new FleetShipNewHeadServer( - (head, safeBlock, finalizedBlock) -> - fleetModeSynchronizer.syncNewHead(head, safeBlock, finalizedBlock), + (newHeadParams) -> fleetModeSynchronizer.syncNewHead(newHeadParams), convertMapperProvider, pluginServiceProvider)); return methods; diff --git a/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java b/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java index ef2ed1a..c2e1fa4 100644 --- a/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java +++ b/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java @@ -14,15 +14,28 @@ */ package net.consensys.fleet.common.rpc.model; +import java.util.List; + import com.fasterxml.jackson.annotation.JsonProperty; import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.plugin.data.BlockBody; import org.hyperledger.besu.plugin.data.BlockHeader; +import org.hyperledger.besu.plugin.data.TransactionReceipt; public class NewHeadParams { @JsonProperty("newHead") private BlockHeader head; + @JsonProperty("headBody") + private BlockBody blockBody; + + @JsonProperty("headReceipts") + private List receipts; + + @JsonProperty("headTrieLogRlp") + private String trieLogRlp; + @JsonProperty("safeBlock") private Hash safeBlock; @@ -31,8 +44,17 @@ public class NewHeadParams { public NewHeadParams() {} - public NewHeadParams(final BlockHeader head, final Hash safeBlock, final Hash finalizedBlock) { + public NewHeadParams( + final BlockHeader head, + final BlockBody blockBody, + final List receipts, + final String trieLogRlp, + final Hash safeBlock, + final Hash finalizedBlock) { this.head = head; + this.blockBody = blockBody; + this.receipts = receipts; + this.trieLogRlp = trieLogRlp; this.safeBlock = safeBlock; this.finalizedBlock = finalizedBlock; } @@ -41,6 +63,18 @@ public BlockHeader getHead() { return head; } + public BlockBody getBlockBody() { + return blockBody; + } + + public List getReceipts() { + return receipts; + } + + public String getTrieLogRlp() { + return trieLogRlp; + } + public Hash getSafeBlock() { return safeBlock; } diff --git a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java index 48dc404..ab8d588 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java @@ -63,10 +63,7 @@ public Object execute(PluginRpcRequest rpcRequest) { newHeadParams.getHead().getBlockHash(), newHeadParams.getSafeBlock(), newHeadParams.getFinalizedBlock()); - newHeadObserver.onNewHead( - newHeadParams.getHead(), - newHeadParams.getSafeBlock(), - newHeadParams.getFinalizedBlock()); + newHeadObserver.onNewHead(newHeadParams); } } catch (Exception e) { LOG.trace("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); diff --git a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java index b508618..c3263cc 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java +++ b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java @@ -61,7 +61,7 @@ public Optional getLeaderBlockContextByNumber( GetBlockResponse getBlockParams = getBlockClient .sendData(new GetBlockRequest(blockNumber, fetchReceipts)) - .get(1, TimeUnit.SECONDS); + .get(10, TimeUnit.MILLISECONDS); return Optional.of( new FleetBlockContext( getBlockParams.getBlockHeader(), @@ -74,6 +74,18 @@ public Optional getLeaderBlockContextByNumber( } } + public void provideLocalBlockContext( + final BlockHeader blockHeader, + final BlockBody blockBody, + final List receipts, + final String trielogRlp) { + localBlock.put( + new CompositeBlockKey(blockHeader.getNumber(), blockHeader.getBlockHash()), + Optional.of( + new FleetBlockContext( + blockHeader, blockBody, receipts, Optional.of(Bytes.fromHexString(trielogRlp))))); + } + public Optional getLocalBlockContextByNumber( final long number, final boolean fetchReceipts) { try { diff --git a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java index f0e23ff..43bcdb0 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java +++ b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java @@ -15,6 +15,7 @@ package net.consensys.fleet.follower.sync; import net.consensys.fleet.common.plugin.PluginServiceProvider; +import net.consensys.fleet.common.rpc.model.NewHeadParams; import java.util.ArrayList; import java.util.Comparator; @@ -76,17 +77,24 @@ public FleetModeSynchronizer( this.headDistanceForReceiptFetch = headDistanceForReceiptFetch; } - public synchronized void syncNewHead( - final BlockHeader head, final Hash safeBlock, final Hash finalizedBlock) { - this.leaderHeader = head; + public synchronized void syncNewHead(final NewHeadParams newHeadParams) { + this.leaderHeader = newHeadParams.getHead(); if (isBlockchainServiceReady()) { final SynchronizationService synchronizationService = pluginServiceProvider.getService(SynchronizationService.class); + blockContextProvider.provideLocalBlockContext( + newHeadParams.getHead(), + newHeadParams.getBlockBody(), + newHeadParams.getReceipts(), + newHeadParams.getTrieLogRlp()); synchronizationService.fireNewUnverifiedForkchoiceEvent( - head.getBlockHash(), safeBlock, finalizedBlock); + newHeadParams.getHead().getBlockHash(), + newHeadParams.getSafeBlock(), + newHeadParams.getFinalizedBlock()); LOG.debug( - "Fire fork choice for safe block {} and finalized block {} ", safeBlock, finalizedBlock); - + "Fire fork choice for safe block {} and finalized block {} ", + newHeadParams.getSafeBlock(), + newHeadParams.getFinalizedBlock()); if (isWaitingForSync.get()) { LOG.debug("Waiting for the end of the initial synchronization phase"); } else { diff --git a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java index 51895e8..e93d921 100644 --- a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java +++ b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java @@ -18,10 +18,14 @@ import net.consensys.fleet.common.rpc.model.NewHeadParams; import net.consensys.fleet.leader.rpc.client.FleetShipNewHeadClient; +import org.apache.tuweni.bytes.Bytes; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.plugin.data.AddedBlockContext; +import org.hyperledger.besu.plugin.data.TransactionReceipt; import org.hyperledger.besu.plugin.services.BesuEvents; import org.hyperledger.besu.plugin.services.BlockchainService; +import org.hyperledger.besu.plugin.services.TrieLogService; +import org.hyperledger.besu.plugin.services.trielogs.TrieLogProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,15 +49,33 @@ public void onBlockAdded(final AddedBlockContext addedBlockContext) { .addArgument(() -> addedBlockContext.getBlockHeader().getBlockHash()) .log(); if (pluginServiceProvider.isServiceAvailable(BlockchainService.class)) { - final BlockchainService service = pluginServiceProvider.getService(BlockchainService.class); - final Hash safeBlock = - service.getSafeBlock().orElse(addedBlockContext.getBlockHeader().getBlockHash()); - final Hash finalizedBlock = - service.getFinalizedBlock().orElse(addedBlockContext.getBlockHeader().getBlockHash()); - stateShipNewHeadSender.sendData( - new NewHeadParams(addedBlockContext.getBlockHeader(), safeBlock, finalizedBlock)); + stateShipNewHeadSender.sendData(buildNewHeadEvent(addedBlockContext)); } else { LOG.error("BlockchainService is not available"); } } + + private NewHeadParams buildNewHeadEvent(final AddedBlockContext headBlockContext) { + final BlockchainService service = pluginServiceProvider.getService(BlockchainService.class); + final Hash safeBlock = + service.getSafeBlock().orElse(headBlockContext.getBlockHeader().getBlockHash()); + final Hash finalizedBlock = + service.getFinalizedBlock().orElse(headBlockContext.getBlockHeader().getBlockHash()); + final TrieLogProvider trieLogProvider = + pluginServiceProvider.getService(TrieLogService.class).getTrieLogProvider(); + final String trieLogRlp = + trieLogProvider + .getRawTrieLogLayer(headBlockContext.getBlockHeader().getBlockHash()) + .map(Bytes::toHexString) + .orElseThrow(); + return new NewHeadParams( + headBlockContext.getBlockHeader(), + headBlockContext.getBlockBody(), + headBlockContext.getTransactionReceipts().stream() + .map(TransactionReceipt.class::cast) + .toList(), + trieLogRlp, + safeBlock, + finalizedBlock); + } } diff --git a/src/main/java/net/consensys/fleet/leader/event/NewHeadObserver.java b/src/main/java/net/consensys/fleet/leader/event/NewHeadObserver.java index 19556a1..332617f 100644 --- a/src/main/java/net/consensys/fleet/leader/event/NewHeadObserver.java +++ b/src/main/java/net/consensys/fleet/leader/event/NewHeadObserver.java @@ -14,10 +14,9 @@ */ package net.consensys.fleet.leader.event; -import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.plugin.data.BlockHeader; +import net.consensys.fleet.common.rpc.model.NewHeadParams; public interface NewHeadObserver { - void onNewHead(final BlockHeader head, final Hash safeBlock, final Hash finalizedBlock); + void onNewHead(final NewHeadParams newHeadParams); } From a2a3ff5936e8c3567876ee7e11ed216cdb33e1d6 Mon Sep 17 00:00:00 2001 From: Sally MacFarlane Date: Fri, 4 Apr 2025 15:11:27 +1000 Subject: [PATCH 2/8] add logging Signed-off-by: Sally MacFarlane --- .../fleet/follower/rpc/server/FleetShipNewHeadServer.java | 2 +- .../fleet/follower/sync/FleetModeSynchronizer.java | 6 ++++-- .../consensys/fleet/leader/event/BlockAddedObserver.java | 1 + 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java index ab8d588..9b90f3a 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java @@ -57,7 +57,7 @@ public Object execute(PluginRpcRequest rpcRequest) { convertMapperProvider .getJsonConverter() .readValue(rpcRequest.getParams()[0].toString(), NewHeadParams.class); - LOG.debug( + LOG.info( "receive new head {} ({}) , safe block {} and finalized block {}", newHeadParams.getHead().getNumber(), newHeadParams.getHead().getBlockHash(), diff --git a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java index 43bcdb0..36c1ec6 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java +++ b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java @@ -91,16 +91,18 @@ public synchronized void syncNewHead(final NewHeadParams newHeadParams) { newHeadParams.getHead().getBlockHash(), newHeadParams.getSafeBlock(), newHeadParams.getFinalizedBlock()); - LOG.debug( + LOG.info( "Fire fork choice for safe block {} and finalized block {} ", newHeadParams.getSafeBlock(), newHeadParams.getFinalizedBlock()); if (isWaitingForSync.get()) { - LOG.debug("Waiting for the end of the initial synchronization phase"); + LOG.info("Waiting for the end of the initial synchronization phase"); } else { disableWaitingSync(); startSync(); } + } else { + LOG.debug("Blockchain service is not ready"); } } diff --git a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java index e93d921..d4cd285 100644 --- a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java +++ b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java @@ -49,6 +49,7 @@ public void onBlockAdded(final AddedBlockContext addedBlockContext) { .addArgument(() -> addedBlockContext.getBlockHeader().getBlockHash()) .log(); if (pluginServiceProvider.isServiceAvailable(BlockchainService.class)) { + LOG.debug("stateShipNewHeadSender new block"); stateShipNewHeadSender.sendData(buildNewHeadEvent(addedBlockContext)); } else { LOG.error("BlockchainService is not available"); From bda09234fa9f6760776ccbc097983b28d94e430d Mon Sep 17 00:00:00 2001 From: Karim Taam Date: Fri, 4 Apr 2025 09:44:05 +0100 Subject: [PATCH 3/8] fix add block listener issue --- .../java/net/consensys/fleet/common/plugin/FleetPlugin.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java b/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java index b521b16..7e828a0 100644 --- a/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java +++ b/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java @@ -157,6 +157,8 @@ public void start() { new IllegalStateException("Expecting a P2P network service, but none found.")); pluginServiceProvider.provideService(P2PService.class, () -> p2PService); + loadingClientsMethods(); + createPeerNetworkMaintainer(); } From 7244c9b694902bb487b432591d5ce052d899b00e Mon Sep 17 00:00:00 2001 From: Karim Taam Date: Mon, 7 Apr 2025 08:45:55 +0100 Subject: [PATCH 4/8] add follower rpc host and port flags again --- README.md | 2 ++ .../fleet/common/config/FleetOptions.java | 6 ++---- .../fleet/common/plugin/FleetPlugin.java | 15 ++------------- 3 files changed, 6 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 364af93..68d73fb 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,8 @@ Or, run the besu with follower mode: --Xchain-pruning-blocks-retained=512 \ --plugin-fleet-leader-http-host=127.0.0.1 \ --plugin-fleet-leader-http-port=8545 +--plugin-fleet-follower-http-host=127.0.0.1 \ +--plugin-fleet-follower-http-port=8888 ``` # Download diff --git a/src/main/java/net/consensys/fleet/common/config/FleetOptions.java b/src/main/java/net/consensys/fleet/common/config/FleetOptions.java index cfd8a7c..c970d92 100644 --- a/src/main/java/net/consensys/fleet/common/config/FleetOptions.java +++ b/src/main/java/net/consensys/fleet/common/config/FleetOptions.java @@ -67,7 +67,6 @@ public class FleetOptions { description = "HTTP host port of the leader peer") Integer leaderPeerHttpPort = DEFAULT_LEADER_PEER_HTTP_PORT; - @Deprecated @CommandLine.Option( names = {OPTION_FOLLOWER_PEER_HTTP_HOST}, hidden = true, @@ -75,7 +74,6 @@ public class FleetOptions { description = "HTTP host of the follower peer") String followerPeerHttpHost = DEFAULT_FOLLOWER_PEER_HTTP_HOST; - @Deprecated @CommandLine.Option( names = {OPTION_FOLLOWER_PEER_HTTP_PORT}, hidden = true, @@ -123,12 +121,10 @@ public Integer getLeaderPeerHttpPort() { return leaderPeerHttpPort; } - @Deprecated public String getFollowerPeerHttpHost() { return followerPeerHttpHost; } - @Deprecated public Integer getFollowerPeerHttpPort() { return followerPeerHttpPort; } @@ -151,6 +147,8 @@ public String toString() { .add("nodeRole", nodeRole) .add("leaderPeerHttpHost", leaderPeerHttpHost) .add("leaderPeerHttpPort", leaderPeerHttpPort) + .add("followerPeerHttpHost", followerPeerHttpHost) + .add("followerPeerHttpPort", followerPeerHttpPort) .add("followerHeartBeatDelay", followerHeartBeatDelay) .add("persistRangeSize", maxBlocksPerPersist) .add("headDistanceForReceiptFetch", headDistanceForReceiptFetch) diff --git a/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java b/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java index 7e828a0..a5a51da 100644 --- a/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java +++ b/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java @@ -44,7 +44,6 @@ import com.google.auto.service.AutoService; import org.hyperledger.besu.plugin.BesuPlugin; import org.hyperledger.besu.plugin.ServiceManager; -import org.hyperledger.besu.plugin.services.BesuConfiguration; import org.hyperledger.besu.plugin.services.BesuEvents; import org.hyperledger.besu.plugin.services.BlockchainService; import org.hyperledger.besu.plugin.services.PicoCLIOptions; @@ -179,16 +178,6 @@ public void stop() { private void createPeerNetworkMaintainer() { LOG.debug("Setting up connection parameters"); final PeerNetworkMaintainer peerNetworkMaintainer; - - LOG.debug("Loading BesuConfiguration service"); - final BesuConfiguration besuConfigurationService = - serviceManager - .getService(BesuConfiguration.class) - .orElseThrow( - () -> - new IllegalStateException( - "Expecting a BesuConfiguration service, but none found.")); - switch (CLI_OPTIONS.getNodeRole()) { case LEADER -> { /* ********** LEADER ************* */ @@ -200,8 +189,8 @@ private void createPeerNetworkMaintainer() { new FollowerPeerNetworkMaintainer( CLI_OPTIONS.getLeaderPeerHttpHost(), CLI_OPTIONS.getLeaderPeerHttpPort(), - besuConfigurationService.getRpcHttpHost().orElse("default"), - besuConfigurationService.getRpcHttpPort().orElse(0), + CLI_OPTIONS.getFollowerPeerHttpHost(), + CLI_OPTIONS.getFollowerPeerHttpPort(), CLI_OPTIONS.getFollowerHeartBeatDelay(), peerManagers, webClient); From 5afe9cdc476f3939406899fff1e66902fbd842f2 Mon Sep 17 00:00:00 2001 From: Karim Taam Date: Tue, 8 Apr 2025 10:23:58 +0100 Subject: [PATCH 5/8] fix and clean --- .../rpc/json/ConvertMapperProvider.java | 2 + .../fleet/common/rpc/model/NewHeadParams.java | 6 + .../follower/sync/BlockContextProvider.java | 114 ++++++++++-------- .../follower/sync/FleetModeSynchronizer.java | 31 +++-- .../leader/event/BlockAddedObserver.java | 30 +++-- 5 files changed, 111 insertions(+), 72 deletions(-) diff --git a/src/main/java/net/consensys/fleet/common/rpc/json/ConvertMapperProvider.java b/src/main/java/net/consensys/fleet/common/rpc/json/ConvertMapperProvider.java index 9719a9d..9c469ec 100644 --- a/src/main/java/net/consensys/fleet/common/rpc/json/ConvertMapperProvider.java +++ b/src/main/java/net/consensys/fleet/common/rpc/json/ConvertMapperProvider.java @@ -16,6 +16,7 @@ import net.consensys.fleet.common.plugin.PluginServiceProvider; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; import org.hyperledger.besu.plugin.services.rlp.RlpConverterService; @@ -32,6 +33,7 @@ public ObjectMapper getJsonConverter() { if (objectMapper == null) { if (pluginServiceProvider.isServiceAvailable(RlpConverterService.class)) { objectMapper = new ObjectMapper(); + objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); objectMapper.registerModule( new JsonModule(pluginServiceProvider.getService(RlpConverterService.class))); } else { diff --git a/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java b/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java index c2e1fa4..9971910 100644 --- a/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java +++ b/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java @@ -59,6 +59,12 @@ public NewHeadParams( this.finalizedBlock = finalizedBlock; } + public NewHeadParams(final BlockHeader head, final Hash safeBlock, final Hash finalizedBlock) { + this.safeBlock = safeBlock; + this.finalizedBlock = finalizedBlock; + this.head = head; + } + public BlockHeader getHead() { return head; } diff --git a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java index c3263cc..b9ea958 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java +++ b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java @@ -37,10 +37,10 @@ public class BlockContextProvider { - private final Cache> leaderBlock = + private final Cache leaderBlock = CacheBuilder.newBuilder().maximumSize(20).expireAfterAccess(1, TimeUnit.MINUTES).build(); - private final Cache> localBlock = + private final Cache localBlock = CacheBuilder.newBuilder().maximumSize(20).expireAfterAccess(1, TimeUnit.MINUTES).build(); private final PluginServiceProvider pluginServiceProvider; @@ -55,66 +55,80 @@ public BlockContextProvider( public Optional getLeaderBlockContextByNumber( final long blockNumber, final boolean fetchReceipts) { try { - return leaderBlock.get( - new CompositeBlockKey(blockNumber), - () -> { - GetBlockResponse getBlockParams = - getBlockClient - .sendData(new GetBlockRequest(blockNumber, fetchReceipts)) - .get(10, TimeUnit.MILLISECONDS); - return Optional.of( - new FleetBlockContext( - getBlockParams.getBlockHeader(), - getBlockParams.getBlockBody(), - getBlockParams.getReceipts(), - Optional.of(Bytes.fromHexString(getBlockParams.getTrieLogRlp())))); - }); + CompositeBlockKey key = new CompositeBlockKey(blockNumber); + Optional cachedContext = + Optional.ofNullable(leaderBlock.getIfPresent(key)); + + if (cachedContext.isPresent()) { + return cachedContext; + } + + GetBlockResponse response = + getBlockClient + .sendData(new GetBlockRequest(blockNumber, fetchReceipts)) + .get(50, TimeUnit.MILLISECONDS); + + FleetBlockContext context = + new FleetBlockContext( + response.getBlockHeader(), + response.getBlockBody(), + response.getReceipts(), + Optional.of(Bytes.fromHexString(response.getTrieLogRlp()))); + + leaderBlock.put(key, context); + return Optional.of(context); } catch (Exception e) { return Optional.empty(); } } - public void provideLocalBlockContext( + public void provideLeaderBlockContext( final BlockHeader blockHeader, final BlockBody blockBody, final List receipts, - final String trielogRlp) { - localBlock.put( - new CompositeBlockKey(blockHeader.getNumber(), blockHeader.getBlockHash()), - Optional.of( - new FleetBlockContext( - blockHeader, blockBody, receipts, Optional.of(Bytes.fromHexString(trielogRlp))))); + final String trieLogRlp) { + + CompositeBlockKey key = + new CompositeBlockKey(blockHeader.getNumber(), blockHeader.getBlockHash()); + FleetBlockContext context = + new FleetBlockContext( + blockHeader, blockBody, receipts, Optional.of(Bytes.fromHexString(trieLogRlp))); + + leaderBlock.put(key, context); } public Optional getLocalBlockContextByNumber( final long number, final boolean fetchReceipts) { try { - return localBlock.get( - new CompositeBlockKey(number), - () -> { - final BlockchainService blockchainService = - pluginServiceProvider.getService(BlockchainService.class); - return blockchainService - .getBlockByNumber(number) - .map( - blockContext -> { - final List receiptsByBlockHash; - if (fetchReceipts) { - receiptsByBlockHash = - blockchainService - .getReceiptsByBlockHash( - blockContext.getBlockHeader().getBlockHash()) - .orElse(Collections.emptyList()); - } else { - receiptsByBlockHash = Collections.emptyList(); - } - return new FleetBlockContext( - blockContext.getBlockHeader(), - blockContext.getBlockBody(), - receiptsByBlockHash, - Optional.empty()); - }); - }); + CompositeBlockKey key = new CompositeBlockKey(number); + Optional cachedContext = Optional.ofNullable(localBlock.getIfPresent(key)); + + if (cachedContext.isPresent()) { + return cachedContext; + } + + BlockchainService blockchainService = + pluginServiceProvider.getService(BlockchainService.class); + + return blockchainService + .getBlockByNumber(number) + .map( + block -> { + List receipts = + fetchReceipts + ? blockchainService + .getReceiptsByBlockHash(block.getBlockHeader().getBlockHash()) + .orElse(Collections.emptyList()) + : Collections.emptyList(); + + FleetBlockContext context = + new FleetBlockContext( + block.getBlockHeader(), block.getBlockBody(), receipts, Optional.empty()); + + localBlock.put(key, context); + return context; + }); + } catch (Exception e) { return Optional.empty(); } @@ -161,7 +175,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(blockNumber, blockHash); + return Objects.hash(blockNumber); } } diff --git a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java index 36c1ec6..921d0fb 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java +++ b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java @@ -82,11 +82,14 @@ public synchronized void syncNewHead(final NewHeadParams newHeadParams) { if (isBlockchainServiceReady()) { final SynchronizationService synchronizationService = pluginServiceProvider.getService(SynchronizationService.class); - blockContextProvider.provideLocalBlockContext( - newHeadParams.getHead(), - newHeadParams.getBlockBody(), - newHeadParams.getReceipts(), - newHeadParams.getTrieLogRlp()); + if (newHeadParams.getTrieLogRlp() != null) { + LOG.debug("add block to cache from leader {}", newHeadParams.getHead()); + blockContextProvider.provideLeaderBlockContext( + newHeadParams.getHead(), + newHeadParams.getBlockBody(), + newHeadParams.getReceipts(), + newHeadParams.getTrieLogRlp()); + } synchronizationService.fireNewUnverifiedForkchoiceEvent( newHeadParams.getHead().getBlockHash(), newHeadParams.getSafeBlock(), @@ -172,12 +175,22 @@ private void startSync() { while (persistedBlock.getBlockHeader().getNumber() < targetBlock.getBlockHeader().getNumber()) { LOG.debug("Rollforward {}", targetBlockHash); - rollForward.add( + final BlockContextProvider.FleetBlockContext toRollForwardBlock = getLeaderBlockContext(targetBlock.getBlockHeader().getNumber()) - .orElseThrow(MissingBlockException::new)); - targetBlock = - getLeaderBlockContext(targetBlock.getBlockHeader().getNumber() - 1) .orElseThrow(MissingBlockException::new); + rollForward.add(toRollForwardBlock); + if (persistedBlock.getBlockHeader().getNumber() + == (toRollForwardBlock.getBlockHeader().getNumber() - 1) + && toRollForwardBlock + .getBlockHeader() + .getParentHash() + .equals(persistedBlockHash)) { + targetBlock = persistedBlock; + } else { + targetBlock = + getLeaderBlockContext(targetBlock.getBlockHeader().getNumber() - 1) + .orElseThrow(MissingBlockException::new); + } targetBlockHash = targetBlock.getBlockHeader().getBlockHash(); } diff --git a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java index d4cd285..dc503ab 100644 --- a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java +++ b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java @@ -18,6 +18,8 @@ import net.consensys.fleet.common.rpc.model.NewHeadParams; import net.consensys.fleet.leader.rpc.client.FleetShipNewHeadClient; +import java.util.Optional; + import org.apache.tuweni.bytes.Bytes; import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.plugin.data.AddedBlockContext; @@ -49,7 +51,6 @@ public void onBlockAdded(final AddedBlockContext addedBlockContext) { .addArgument(() -> addedBlockContext.getBlockHeader().getBlockHash()) .log(); if (pluginServiceProvider.isServiceAvailable(BlockchainService.class)) { - LOG.debug("stateShipNewHeadSender new block"); stateShipNewHeadSender.sendData(buildNewHeadEvent(addedBlockContext)); } else { LOG.error("BlockchainService is not available"); @@ -64,19 +65,22 @@ private NewHeadParams buildNewHeadEvent(final AddedBlockContext headBlockContext service.getFinalizedBlock().orElse(headBlockContext.getBlockHeader().getBlockHash()); final TrieLogProvider trieLogProvider = pluginServiceProvider.getService(TrieLogService.class).getTrieLogProvider(); - final String trieLogRlp = + final Optional maybeTrielog = trieLogProvider .getRawTrieLogLayer(headBlockContext.getBlockHeader().getBlockHash()) - .map(Bytes::toHexString) - .orElseThrow(); - return new NewHeadParams( - headBlockContext.getBlockHeader(), - headBlockContext.getBlockBody(), - headBlockContext.getTransactionReceipts().stream() - .map(TransactionReceipt.class::cast) - .toList(), - trieLogRlp, - safeBlock, - finalizedBlock); + .map(Bytes::toHexString); + if (maybeTrielog.isEmpty()) { + return new NewHeadParams(headBlockContext.getBlockHeader(), safeBlock, finalizedBlock); + } else { + return new NewHeadParams( + headBlockContext.getBlockHeader(), + headBlockContext.getBlockBody(), + headBlockContext.getTransactionReceipts().stream() + .map(TransactionReceipt.class::cast) + .toList(), + maybeTrielog.get(), + safeBlock, + finalizedBlock); + } } } From 8fbf9cf0829ef03227d551169c68d8b9ce29dcf9 Mon Sep 17 00:00:00 2001 From: Karim Taam Date: Wed, 9 Apr 2025 11:02:48 +0100 Subject: [PATCH 6/8] add logs --- .../fleet/common/plugin/FleetPlugin.java | 18 +++++++------- .../common/rpc/client/WebClientWrapper.java | 7 ++++-- .../event/InitialSyncCompletionObserver.java | 2 +- .../rpc/client/FleetAddFollowerClient.java | 12 ++++++++++ .../rpc/client/FleetGetBlockClient.java | 14 +++++++++++ .../rpc/server/FleetShipNewHeadServer.java | 4 ++-- .../follower/sync/BlockContextProvider.java | 13 +++++++--- .../follower/sync/FleetModeSynchronizer.java | 24 +++++++++---------- .../leader/event/BlockAddedObserver.java | 3 ++- .../rpc/client/FleetShipNewHeadClient.java | 2 +- .../rpc/server/FleetAddFollowerServer.java | 4 ++-- .../rpc/server/FleetGetBlockServer.java | 5 ++-- 12 files changed, 72 insertions(+), 36 deletions(-) diff --git a/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java b/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java index a5a51da..3ff7fe2 100644 --- a/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java +++ b/src/main/java/net/consensys/fleet/common/plugin/FleetPlugin.java @@ -89,11 +89,11 @@ public void register(final ServiceManager serviceManager) { } cmdlineOptions.get().addPicoCLIOptions(NAME, CLI_OPTIONS); - LOG.debug("Creating peer manager"); + LOG.info("Creating peer manager"); peerManagers = new PeerNodesManager(); this.webClient = new WebClientWrapper(convertMapperProvider, peerManagers); - LOG.debug("Setting up RPC endpoints"); + LOG.info("Setting up RPC endpoints"); final List pluginRpcMethods = createServerMethods(); serviceManager .getService(RpcEndpointService.class) @@ -109,7 +109,7 @@ public void register(final ServiceManager serviceManager) { method.getNamespace(), method.getName(), method::execute); })); - LOG.debug("Registering trieLog service"); + LOG.info("Registering trieLog service"); final TrieLogService trieLogService = new FleetTrieLogService(); serviceManager.addService(TrieLogService.class, trieLogService); pluginServiceProvider.provideService(TrieLogService.class, () -> trieLogService); @@ -118,7 +118,7 @@ public void register(final ServiceManager serviceManager) { @Override public void start() { - LOG.debug("Loading RLP converter service"); + LOG.info("Loading RLP converter service"); final RlpConverterService rlpConverterService = serviceManager .getService(RlpConverterService.class) @@ -128,7 +128,7 @@ public void start() { "Expecting a RLP converter service, but none found.")); pluginServiceProvider.provideService(RlpConverterService.class, () -> rlpConverterService); - LOG.debug("Loading blockchain service"); + LOG.info("Loading blockchain service"); final BlockchainService blockchainService = serviceManager .getService(BlockchainService.class) @@ -136,7 +136,7 @@ public void start() { () -> new IllegalStateException("Expecting a blockchain service, but none found.")); pluginServiceProvider.provideService(BlockchainService.class, () -> blockchainService); - LOG.debug("Loading synchronization service"); + LOG.info("Loading synchronization service"); final SynchronizationService synchronizationService = serviceManager .getService(SynchronizationService.class) @@ -147,7 +147,7 @@ public void start() { pluginServiceProvider.provideService( SynchronizationService.class, () -> synchronizationService); - LOG.debug("Loading P2P network service"); + LOG.info("Loading P2P network service"); final P2PService p2PService = serviceManager .getService(P2PService.class) @@ -176,7 +176,7 @@ public void stop() { } private void createPeerNetworkMaintainer() { - LOG.debug("Setting up connection parameters"); + LOG.info("Setting up connection parameters"); final PeerNetworkMaintainer peerNetworkMaintainer; switch (CLI_OPTIONS.getNodeRole()) { case LEADER -> { @@ -294,7 +294,7 @@ private boolean isFollower() { } private void disableTransactionPool() { - LOG.debug("Disable transaction pool"); + LOG.info("Disable transaction pool"); serviceManager .getService(TransactionPoolService.class) .ifPresent(TransactionPoolService::disableTransactionPool); diff --git a/src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java b/src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java index 5a3b9c6..5d41e8f 100644 --- a/src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java +++ b/src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java @@ -64,16 +64,18 @@ public CompletableFuture> sendToLeader( webClient .post(leader.port(), leader.host(), endpoint) + .timeout(100) .putHeader("Content-Type", CONTENT_TYPE) .sendJsonObject( jsonObject, event -> { if (event.failed()) { + LOG.info("event failed {}", String.valueOf(event.cause())); completableFuture.completeExceptionally(event.cause()); } else { completableFuture.complete(event.result()); } - LOG.trace( + LOG.info( "Send RPC request {} result {} for body {} {}", methodName, event.succeeded(), @@ -98,11 +100,12 @@ public void sendToFollowers(final String endpoint, final String methodName, fina peerNode -> { webClient .post(peerNode.port(), peerNode.host(), endpoint) + .timeout(100) .putHeader("Content-Type", CONTENT_TYPE) .sendJsonObject( jsonObject, event -> { - LOG.trace( + LOG.info( "Send RPC request {} to {} result {} for body {}", methodName, peerNode, diff --git a/src/main/java/net/consensys/fleet/follower/event/InitialSyncCompletionObserver.java b/src/main/java/net/consensys/fleet/follower/event/InitialSyncCompletionObserver.java index 27521e9..db18455 100644 --- a/src/main/java/net/consensys/fleet/follower/event/InitialSyncCompletionObserver.java +++ b/src/main/java/net/consensys/fleet/follower/event/InitialSyncCompletionObserver.java @@ -38,7 +38,7 @@ public void onInitialSyncCompleted() { fleetModeSynchronizerSupplier.get().disableP2P(); fleetModeSynchronizerSupplier.get().disableTrie(); } else { - LOG.debug("Cannot disable initial sync because fleet mode synchronizer is not ready"); + LOG.info("Cannot disable initial sync because fleet mode synchronizer is not ready"); } } diff --git a/src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java b/src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java index f2216c3..b72daa1 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java @@ -18,14 +18,20 @@ import net.consensys.fleet.common.rpc.client.WebClientWrapper; import net.consensys.fleet.common.rpc.model.PeerNode; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.concurrent.CompletableFuture; import com.fasterxml.jackson.core.JsonProcessingException; import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.client.HttpResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FleetAddFollowerClient extends AbstractStateRpcSender { + private static final Logger LOG = LoggerFactory.getLogger(FleetAddFollowerClient.class); + private static final String METHOD_NAME = "fleet_addFollowerNode"; public FleetAddFollowerClient(final WebClientWrapper webClient) { @@ -57,6 +63,12 @@ private boolean isConnected(final HttpResponse response, final Throwable if (throwable == null && response.statusCode() == 200) { return !response.bodyAsJsonObject().containsKey("error"); } + if (throwable != null) { + StringWriter sw = new StringWriter(); + throwable.printStackTrace(new PrintWriter(sw)); + LOG.info(sw.toString()); + } + LOG.info("is connect {}", response.statusCode()); return false; } } diff --git a/src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java b/src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java index f878499..a875d5b 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java @@ -19,12 +19,18 @@ import net.consensys.fleet.common.rpc.model.GetBlockRequest; import net.consensys.fleet.common.rpc.model.GetBlockResponse; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.concurrent.CompletableFuture; import com.fasterxml.jackson.core.JsonProcessingException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class FleetGetBlockClient extends AbstractStateRpcSender { + private static final Logger LOG = LoggerFactory.getLogger(FleetGetBlockClient.class); + private static final String METHOD_NAME = "fleet_getBlock"; public FleetGetBlockClient(final WebClientWrapper webClient) { @@ -49,9 +55,17 @@ public CompletableFuture sendData(final GetBlockRequest blockN completableFuture.complete( webClient.decode(result, "result", GetBlockResponse.class)); } catch (JsonProcessingException e) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + LOG.info(sw.toString()); completableFuture.completeExceptionally(e); } } else { + + LOG.info("get leader error"); + StringWriter sw = new StringWriter(); + throwable.printStackTrace(new PrintWriter(sw)); + LOG.info(sw.toString()); completableFuture.completeExceptionally(throwable); } }); diff --git a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java index 9b90f3a..8307f65 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java @@ -49,7 +49,7 @@ public String getName() { @Override public Object execute(PluginRpcRequest rpcRequest) { - LOG.debug("execute {} request with body {}", getName(), rpcRequest.getParams()); + LOG.info("execute {} request with body {}", getName(), rpcRequest.getParams()); try { if (isRlpConverterReady()) { @@ -66,7 +66,7 @@ public Object execute(PluginRpcRequest rpcRequest) { newHeadObserver.onNewHead(newHeadParams); } } catch (Exception e) { - LOG.trace("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); + LOG.info("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); } return null; diff --git a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java index b9ea958..5c64021 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java +++ b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java @@ -19,6 +19,8 @@ import net.consensys.fleet.common.rpc.model.GetBlockResponse; import net.consensys.fleet.follower.rpc.client.FleetGetBlockClient; +import java.io.PrintWriter; +import java.io.StringWriter; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -34,9 +36,13 @@ import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.TransactionReceipt; import org.hyperledger.besu.plugin.services.BlockchainService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class BlockContextProvider { + private static final Logger LOG = LoggerFactory.getLogger(BlockContextProvider.class); + private final Cache leaderBlock = CacheBuilder.newBuilder().maximumSize(20).expireAfterAccess(1, TimeUnit.MINUTES).build(); @@ -64,9 +70,7 @@ public Optional getLeaderBlockContextByNumber( } GetBlockResponse response = - getBlockClient - .sendData(new GetBlockRequest(blockNumber, fetchReceipts)) - .get(50, TimeUnit.MILLISECONDS); + getBlockClient.sendData(new GetBlockRequest(blockNumber, fetchReceipts)).get(); FleetBlockContext context = new FleetBlockContext( @@ -78,6 +82,9 @@ public Optional getLeaderBlockContextByNumber( leaderBlock.put(key, context); return Optional.of(context); } catch (Exception e) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + LOG.info(sw.toString()); return Optional.empty(); } } diff --git a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java index 921d0fb..1fdf273 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java +++ b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java @@ -83,7 +83,7 @@ public synchronized void syncNewHead(final NewHeadParams newHeadParams) { final SynchronizationService synchronizationService = pluginServiceProvider.getService(SynchronizationService.class); if (newHeadParams.getTrieLogRlp() != null) { - LOG.debug("add block to cache from leader {}", newHeadParams.getHead()); + LOG.info("add block to cache from leader {}", newHeadParams.getHead()); blockContextProvider.provideLeaderBlockContext( newHeadParams.getHead(), newHeadParams.getBlockBody(), @@ -105,7 +105,7 @@ public synchronized void syncNewHead(final NewHeadParams newHeadParams) { startSync(); } } else { - LOG.debug("Blockchain service is not ready"); + LOG.info("Blockchain service is not ready"); } } @@ -151,18 +151,18 @@ private void startSync() { .orElseThrow(MissingBlockException::new); Hash targetBlockHash = targetBlock.getBlockHeader().getBlockHash(); - LOG.debug( + LOG.info( "New head (or leader block) being detected. {} ({})", targetBlock.getBlockHeader().getNumber(), targetBlock.getBlockHeader().getBlockHash()); - LOG.debug( + LOG.info( "Detected local chain head {} ({}", chainHead.getNumber(), chainHead.getBlockHash()); while (persistedBlock.getBlockHeader().getNumber() > targetBlock.getBlockHeader().getNumber()) { - LOG.debug("Rollback {}", persistedBlockHash); + LOG.info("Rollback {}", persistedBlockHash); rollBackward.add( getLocalBlockContext(persistedBlock.getBlockHeader().getNumber()) .orElseThrow()); @@ -174,7 +174,7 @@ private void startSync() { while (persistedBlock.getBlockHeader().getNumber() < targetBlock.getBlockHeader().getNumber()) { - LOG.debug("Rollforward {}", targetBlockHash); + LOG.info("Rollforward {}", targetBlockHash); final BlockContextProvider.FleetBlockContext toRollForwardBlock = getLeaderBlockContext(targetBlock.getBlockHeader().getNumber()) .orElseThrow(MissingBlockException::new); @@ -197,8 +197,8 @@ private void startSync() { while (!persistedBlockHash.equals(targetBlockHash)) { LOG.info("Reorg detected so we clean the cache"); blockContextProvider.clear(); - LOG.debug("Paired rollback {}", persistedBlockHash); - LOG.debug("Paired rollforward {}", targetBlockHash); + LOG.info("Paired rollback {}", persistedBlockHash); + LOG.info("Paired rollforward {}", targetBlockHash); rollForward.add( getLeaderBlockContext(targetBlock.getBlockHeader().getNumber()) @@ -220,7 +220,7 @@ private void startSync() { } for (BlockContextProvider.FleetBlockContext blockContext : rollBackward) { - LOG.debug( + LOG.info( "Attempting rollback of {}", blockContext.getBlockHeader().getBlockHash()); Optional maybeTrieLog = blockContext.trieLogRlp(); @@ -244,7 +244,7 @@ private void startSync() { Comparator.comparingLong(o -> o.getBlockHeader().getNumber())); for (BlockContextProvider.FleetBlockContext blockContext : rollForward) { - LOG.debug( + LOG.info( "Attempting rollforward of {}", blockContext.getBlockHeader().getBlockHash()); // save trielog and set head @@ -283,7 +283,7 @@ private void startSync() { .getBlockHeader() .getBlockHash() .equals(chainHead.getBlockHash())) { - LOG.debug("head not changed {}", chainHead.getBlockHash()); + LOG.info("head not changed {}", chainHead.getBlockHash()); } else if (Math.abs( newHead.getBlockHeader().getNumber() - oldHead.getBlockHeader().getNumber()) @@ -323,7 +323,7 @@ public void disableP2P() { if (synchronizationService.isInitialSyncPhaseDone()) { if (isWaitingForSync.getAndSet(false)) { final P2PService p2PService = pluginServiceProvider.getService(P2PService.class); - LOG.debug("Disable P2P discovery"); + LOG.info("Disable P2P discovery"); p2PService.disableDiscovery(); } } diff --git a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java index dc503ab..a38f618 100644 --- a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java +++ b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java @@ -46,7 +46,7 @@ public BlockAddedObserver( @Override public void onBlockAdded(final AddedBlockContext addedBlockContext) { - LOG.atDebug() + LOG.atInfo() .setMessage("New block added: {}") .addArgument(() -> addedBlockContext.getBlockHeader().getBlockHash()) .log(); @@ -70,6 +70,7 @@ private NewHeadParams buildNewHeadEvent(final AddedBlockContext headBlockContext .getRawTrieLogLayer(headBlockContext.getBlockHeader().getBlockHash()) .map(Bytes::toHexString); if (maybeTrielog.isEmpty()) { + LOG.info("send new block {}", maybeTrielog.isEmpty()); return new NewHeadParams(headBlockContext.getBlockHeader(), safeBlock, finalizedBlock); } else { return new NewHeadParams( diff --git a/src/main/java/net/consensys/fleet/leader/rpc/client/FleetShipNewHeadClient.java b/src/main/java/net/consensys/fleet/leader/rpc/client/FleetShipNewHeadClient.java index 6d65e6b..a856dfd 100644 --- a/src/main/java/net/consensys/fleet/leader/rpc/client/FleetShipNewHeadClient.java +++ b/src/main/java/net/consensys/fleet/leader/rpc/client/FleetShipNewHeadClient.java @@ -40,7 +40,7 @@ protected String getMethodeName() { @Override public CompletableFuture sendData(final NewHeadParams data) { - LOG.debug("Sending new head to followers"); + LOG.info("Sending new head to followers"); final CompletableFuture completableFuture = new CompletableFuture<>(); try { webClient.sendToFollowers(ENDPOINT, getMethodeName(), data); diff --git a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetAddFollowerServer.java b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetAddFollowerServer.java index 4e249b8..7c352ef 100644 --- a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetAddFollowerServer.java +++ b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetAddFollowerServer.java @@ -41,13 +41,13 @@ public String getName() { @Override public Object execute(PluginRpcRequest rpcRequest) { - LOG.debug("execute {} request with body {}", getName(), rpcRequest.getParams()); + LOG.info("execute {} request with body {}", getName(), rpcRequest.getParams()); try { final PeerNode peerNode = OBJECT_MAPPER.readValue(rpcRequest.getParams()[0].toString(), PeerNode.class); followerNodesManager.register(peerNode); } catch (JsonProcessingException e) { - LOG.trace("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); + LOG.info("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); } return null; } diff --git a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java index 12dd326..d300bc0 100644 --- a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java +++ b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java @@ -59,7 +59,7 @@ public String getName() { @Override public Object execute(PluginRpcRequest rpcRequest) { - LOG.debug("execute {} request with body {}", getName(), rpcRequest.getParams()); + LOG.info("execute {} request with body {}", getName(), rpcRequest.getParams()); if (isBlockchainServiceReady() && rpcRequest.getParams().length > 0) { final BlockchainService blockchainService = pluginServiceProvider.getService(BlockchainService.class); @@ -96,8 +96,7 @@ public Object execute(PluginRpcRequest rpcRequest) { } } catch (JsonProcessingException e) { - LOG.trace( - "Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); + LOG.info("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); } return null; } From e1f0284ada7ff6884f9c88aa3b8c04ad024b7170 Mon Sep 17 00:00:00 2001 From: Karim Taam Date: Mon, 14 Apr 2025 13:26:54 +0200 Subject: [PATCH 7/8] get block by hash to fix reorg issue --- gradle.properties | 2 +- .../common/rpc/model/GetBlockRequest.java | 13 ++- .../fleet/common/rpc/model/NewHeadParams.java | 55 ++++++--- .../rpc/server/FleetShipNewHeadServer.java | 4 +- .../follower/sync/BlockContextProvider.java | 47 ++++---- .../follower/sync/FleetModeSynchronizer.java | 105 +++++++++++------- .../leader/event/BlockAddedObserver.java | 33 ++++-- .../rpc/server/FleetGetBlockServer.java | 6 +- 8 files changed, 166 insertions(+), 99 deletions(-) diff --git a/gradle.properties b/gradle.properties index 0c72072..088d0c0 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ releaseVersion=0.2.8-SNAPSHOT -besuVersion=25.3.0 +besuVersion=25.4.0 diff --git a/src/main/java/net/consensys/fleet/common/rpc/model/GetBlockRequest.java b/src/main/java/net/consensys/fleet/common/rpc/model/GetBlockRequest.java index 63e8b93..50fa324 100644 --- a/src/main/java/net/consensys/fleet/common/rpc/model/GetBlockRequest.java +++ b/src/main/java/net/consensys/fleet/common/rpc/model/GetBlockRequest.java @@ -15,24 +15,25 @@ package net.consensys.fleet.common.rpc.model; import com.fasterxml.jackson.annotation.JsonProperty; +import org.hyperledger.besu.datatypes.Hash; public class GetBlockRequest { - @JsonProperty("blockNumber") - private Long blockNumber; + @JsonProperty("blockHash") + private Hash blockHash; @JsonProperty("fetchReceipts") private boolean fetchReceipts; public GetBlockRequest() {} - public GetBlockRequest(final Long blockNumber, final boolean fetchReceipts) { - this.blockNumber = blockNumber; + public GetBlockRequest(final Hash blockHash, final boolean fetchReceipts) { + this.blockHash = blockHash; this.fetchReceipts = fetchReceipts; } - public Long getBlockNumber() { - return blockNumber; + public Hash getBlockHash() { + return blockHash; } public boolean isFetchReceipts() { diff --git a/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java b/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java index 9971910..c803493 100644 --- a/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java +++ b/src/main/java/net/consensys/fleet/common/rpc/model/NewHeadParams.java @@ -36,11 +36,17 @@ public class NewHeadParams { @JsonProperty("headTrieLogRlp") private String trieLogRlp; - @JsonProperty("safeBlock") - private Hash safeBlock; + @JsonProperty("safeBlockHash") + private Hash safeBlockHash; - @JsonProperty("finalizedBlock") - private Hash finalizedBlock; + @JsonProperty("safeBlockNumber") + private long safeBlockNumber; + + @JsonProperty("finalizedBlockHash") + private Hash finalizedBlockHash; + + @JsonProperty("finalizedBlockNumber") + private long finalizedBlockNumber; public NewHeadParams() {} @@ -49,19 +55,30 @@ public NewHeadParams( final BlockBody blockBody, final List receipts, final String trieLogRlp, - final Hash safeBlock, - final Hash finalizedBlock) { + final Hash safeBlockHash, + final long safeBlockNumber, + final Hash finalizedBlockHash, + final long finalizedBlockNumber) { this.head = head; this.blockBody = blockBody; this.receipts = receipts; this.trieLogRlp = trieLogRlp; - this.safeBlock = safeBlock; - this.finalizedBlock = finalizedBlock; + this.safeBlockHash = safeBlockHash; + this.safeBlockNumber = safeBlockNumber; + this.finalizedBlockHash = finalizedBlockHash; + this.finalizedBlockNumber = finalizedBlockNumber; } - public NewHeadParams(final BlockHeader head, final Hash safeBlock, final Hash finalizedBlock) { - this.safeBlock = safeBlock; - this.finalizedBlock = finalizedBlock; + public NewHeadParams( + final BlockHeader head, + final Hash safeBlockHash, + final long safeBlockNumber, + final Hash finalizedBlockHash, + final long finalizedBlockNumber) { + this.safeBlockHash = safeBlockHash; + this.safeBlockNumber = safeBlockNumber; + this.finalizedBlockHash = finalizedBlockHash; + this.finalizedBlockNumber = finalizedBlockNumber; this.head = head; } @@ -81,11 +98,19 @@ public String getTrieLogRlp() { return trieLogRlp; } - public Hash getSafeBlock() { - return safeBlock; + public Hash getSafeBlockHash() { + return safeBlockHash; + } + + public long getSafeBlockNumber() { + return safeBlockNumber; + } + + public Hash getFinalizedBlockHash() { + return finalizedBlockHash; } - public Hash getFinalizedBlock() { - return finalizedBlock; + public long getFinalizedBlockNumber() { + return finalizedBlockNumber; } } diff --git a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java index 8307f65..21f49fa 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java @@ -61,8 +61,8 @@ public Object execute(PluginRpcRequest rpcRequest) { "receive new head {} ({}) , safe block {} and finalized block {}", newHeadParams.getHead().getNumber(), newHeadParams.getHead().getBlockHash(), - newHeadParams.getSafeBlock(), - newHeadParams.getFinalizedBlock()); + newHeadParams.getSafeBlockHash(), + newHeadParams.getFinalizedBlockHash()); newHeadObserver.onNewHead(newHeadParams); } } catch (Exception e) { diff --git a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java index 5c64021..0e1a2dd 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java +++ b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java @@ -17,6 +17,7 @@ import net.consensys.fleet.common.plugin.PluginServiceProvider; import net.consensys.fleet.common.rpc.model.GetBlockRequest; import net.consensys.fleet.common.rpc.model.GetBlockResponse; +import net.consensys.fleet.common.rpc.model.NewHeadParams; import net.consensys.fleet.follower.rpc.client.FleetGetBlockClient; import java.io.PrintWriter; @@ -59,18 +60,19 @@ public BlockContextProvider( } public Optional getLeaderBlockContextByNumber( - final long blockNumber, final boolean fetchReceipts) { + final CompositeBlockKey compositeBlockKey, final boolean fetchReceipts) { try { - CompositeBlockKey key = new CompositeBlockKey(blockNumber); Optional cachedContext = - Optional.ofNullable(leaderBlock.getIfPresent(key)); + Optional.ofNullable(leaderBlock.getIfPresent(compositeBlockKey)); if (cachedContext.isPresent()) { return cachedContext; } GetBlockResponse response = - getBlockClient.sendData(new GetBlockRequest(blockNumber, fetchReceipts)).get(); + getBlockClient + .sendData(new GetBlockRequest(compositeBlockKey.getBlockHash(), fetchReceipts)) + .get(); FleetBlockContext context = new FleetBlockContext( @@ -79,7 +81,7 @@ public Optional getLeaderBlockContextByNumber( response.getReceipts(), Optional.of(Bytes.fromHexString(response.getTrieLogRlp()))); - leaderBlock.put(key, context); + leaderBlock.put(new CompositeBlockKey(response.getBlockHeader()), context); return Optional.of(context); } catch (Exception e) { StringWriter sw = new StringWriter(); @@ -89,19 +91,15 @@ public Optional getLeaderBlockContextByNumber( } } - public void provideLeaderBlockContext( - final BlockHeader blockHeader, - final BlockBody blockBody, - final List receipts, - final String trieLogRlp) { - - CompositeBlockKey key = - new CompositeBlockKey(blockHeader.getNumber(), blockHeader.getBlockHash()); - FleetBlockContext context = + public void provideLeaderBlockContext(final NewHeadParams newHeadParams) { + CompositeBlockKey key = new CompositeBlockKey(newHeadParams.getHead()); + leaderBlock.put( + key, new FleetBlockContext( - blockHeader, blockBody, receipts, Optional.of(Bytes.fromHexString(trieLogRlp))); - - leaderBlock.put(key, context); + newHeadParams.getHead(), + newHeadParams.getBlockBody(), + newHeadParams.getReceipts(), + Optional.of(Bytes.fromHexString(newHeadParams.getTrieLogRlp())))); } public Optional getLocalBlockContextByNumber( @@ -148,7 +146,7 @@ public void clear() { public static class CompositeBlockKey { - private long blockNumber; + private final long blockNumber; private Hash blockHash; public CompositeBlockKey(final long blockNumber, final Hash blockHash) { @@ -160,8 +158,17 @@ public CompositeBlockKey(final long blockNumber) { this.blockNumber = blockNumber; } - public CompositeBlockKey(final Hash blockHash) { - this.blockHash = blockHash; + public CompositeBlockKey(final BlockHeader blockHeader) { + this.blockNumber = blockHeader.getNumber(); + this.blockHash = blockHeader.getBlockHash(); + } + + public long getBlockNumber() { + return blockNumber; + } + + public Hash getBlockHash() { + return blockHash; } @Override diff --git a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java index 1fdf273..9a93acc 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java +++ b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java @@ -16,6 +16,8 @@ import net.consensys.fleet.common.plugin.PluginServiceProvider; import net.consensys.fleet.common.rpc.model.NewHeadParams; +import net.consensys.fleet.follower.sync.BlockContextProvider.CompositeBlockKey; +import net.consensys.fleet.follower.sync.BlockContextProvider.FleetBlockContext; import java.util.ArrayList; import java.util.Comparator; @@ -64,7 +66,7 @@ public class FleetModeSynchronizer { private ScheduledFuture syncScheduler; - private BlockHeader leaderHeader; + private NewHeadParams leaderHeader; public FleetModeSynchronizer( final PluginServiceProvider pluginServiceProvider, @@ -78,26 +80,22 @@ public FleetModeSynchronizer( } public synchronized void syncNewHead(final NewHeadParams newHeadParams) { - this.leaderHeader = newHeadParams.getHead(); + this.leaderHeader = newHeadParams; if (isBlockchainServiceReady()) { final SynchronizationService synchronizationService = pluginServiceProvider.getService(SynchronizationService.class); if (newHeadParams.getTrieLogRlp() != null) { LOG.info("add block to cache from leader {}", newHeadParams.getHead()); - blockContextProvider.provideLeaderBlockContext( - newHeadParams.getHead(), - newHeadParams.getBlockBody(), - newHeadParams.getReceipts(), - newHeadParams.getTrieLogRlp()); + blockContextProvider.provideLeaderBlockContext(newHeadParams); } synchronizationService.fireNewUnverifiedForkchoiceEvent( newHeadParams.getHead().getBlockHash(), - newHeadParams.getSafeBlock(), - newHeadParams.getFinalizedBlock()); + newHeadParams.getSafeBlockHash(), + newHeadParams.getFinalizedBlockHash()); LOG.info( "Fire fork choice for safe block {} and finalized block {} ", - newHeadParams.getSafeBlock(), - newHeadParams.getFinalizedBlock()); + newHeadParams.getSafeBlockHash(), + newHeadParams.getFinalizedBlockHash()); if (isWaitingForSync.get()) { LOG.info("Waiting for the end of the initial synchronization phase"); } else { @@ -135,21 +133,32 @@ private void startSync() { try { do { - final List rollBackward = - new ArrayList<>(); - final List rollForward = - new ArrayList<>(); + final List rollBackward = new ArrayList<>(); + final List rollForward = new ArrayList<>(); - BlockContext persistedBlock = + FleetBlockContext persistedBlock = getLocalBlockContext(chainHead.getNumber()).orElseThrow(); Hash persistedBlockHash = persistedBlock.getBlockHeader().getBlockHash(); + long persistedBlockNumber = persistedBlock.getBlockHeader().getNumber(); + + FleetBlockContext targetBlock; + if ((leaderHeader.getFinalizedBlockNumber() + - persistedBlock.getBlockHeader().getNumber()) + > maxBlocksPerPersist) { + targetBlock = + getLeaderBlockContext( + new CompositeBlockKey( + leaderHeader.getFinalizedBlockNumber(), + leaderHeader.getFinalizedBlockHash())) + .orElseThrow(MissingBlockException::new); + } else { + targetBlock = + getLeaderBlockContext(new CompositeBlockKey(leaderHeader.getHead())) + .orElseThrow(MissingBlockException::new); + } - final long targetBlockNumber = - calculateRangeLimit(chainHead.getNumber(), this.leaderHeader.getNumber()); - BlockContext targetBlock = - getLeaderBlockContext(targetBlockNumber) - .orElseThrow(MissingBlockException::new); Hash targetBlockHash = targetBlock.getBlockHeader().getBlockHash(); + long targetBlockNumber = targetBlock.getBlockHeader().getNumber(); LOG.info( "New head (or leader block) being detected. {} ({})", @@ -170,13 +179,15 @@ private void startSync() { getLocalBlockContext(persistedBlock.getBlockHeader().getNumber() - 1) .orElseThrow(); persistedBlockHash = persistedBlock.getBlockHeader().getBlockHash(); + persistedBlockNumber = persistedBlock.getBlockHeader().getNumber(); } while (persistedBlock.getBlockHeader().getNumber() < targetBlock.getBlockHeader().getNumber()) { LOG.info("Rollforward {}", targetBlockHash); - final BlockContextProvider.FleetBlockContext toRollForwardBlock = - getLeaderBlockContext(targetBlock.getBlockHeader().getNumber()) + final FleetBlockContext toRollForwardBlock = + getLeaderBlockContext( + new CompositeBlockKey(targetBlock.getBlockHeader())) .orElseThrow(MissingBlockException::new); rollForward.add(toRollForwardBlock); if (persistedBlock.getBlockHeader().getNumber() @@ -188,24 +199,34 @@ private void startSync() { targetBlock = persistedBlock; } else { targetBlock = - getLeaderBlockContext(targetBlock.getBlockHeader().getNumber() - 1) + getLeaderBlockContext( + new CompositeBlockKey( + targetBlock.getBlockHeader().getNumber() - 1, + targetBlock.getBlockHeader().getParentHash())) .orElseThrow(MissingBlockException::new); } targetBlockHash = targetBlock.getBlockHeader().getBlockHash(); + targetBlockNumber = targetBlock.getBlockHeader().getNumber(); } - while (!persistedBlockHash.equals(targetBlockHash)) { + while (!persistedBlockHash.equals(targetBlockHash) + && persistedBlockNumber == targetBlockNumber) { LOG.info("Reorg detected so we clean the cache"); blockContextProvider.clear(); + // add again the the new head in the cache (avoid useless rpc request) + blockContextProvider.provideLeaderBlockContext(leaderHeader); LOG.info("Paired rollback {}", persistedBlockHash); LOG.info("Paired rollforward {}", targetBlockHash); rollForward.add( - getLeaderBlockContext(targetBlock.getBlockHeader().getNumber()) + getLeaderBlockContext( + new CompositeBlockKey(targetBlock.getBlockHeader())) .orElseThrow(MissingBlockException::new)); - final long targetBlockParent = targetBlock.getBlockHeader().getNumber() - 1; targetBlock = - getLeaderBlockContext(targetBlockParent) + getLeaderBlockContext( + new CompositeBlockKey( + targetBlock.getBlockHeader().getNumber() - 1, + targetBlock.getBlockHeader().getParentHash())) .orElseThrow(MissingBlockException::new); rollBackward.add( @@ -216,10 +237,12 @@ private void startSync() { .orElseThrow(); targetBlockHash = targetBlock.getBlockHeader().getBlockHash(); + targetBlockNumber = targetBlock.getBlockHeader().getNumber(); persistedBlockHash = persistedBlock.getBlockHeader().getBlockHash(); + persistedBlockNumber = persistedBlock.getBlockHeader().getNumber(); } - for (BlockContextProvider.FleetBlockContext blockContext : rollBackward) { + for (FleetBlockContext blockContext : rollBackward) { LOG.info( "Attempting rollback of {}", blockContext.getBlockHeader().getBlockHash()); @@ -243,7 +266,7 @@ private void startSync() { rollForward.sort( Comparator.comparingLong(o -> o.getBlockHeader().getNumber())); - for (BlockContextProvider.FleetBlockContext blockContext : rollForward) { + for (FleetBlockContext blockContext : rollForward) { LOG.info( "Attempting rollforward of {}", blockContext.getBlockHeader().getBlockHash()); @@ -299,7 +322,9 @@ private void startSync() { // reset local cache blockContextProvider.clear(); - } while (!chainHead.getBlockHash().equals(this.leaderHeader.getBlockHash())); + } while (!chainHead + .getBlockHash() + .equals(this.leaderHeader.getHead().getBlockHash())); } catch (MissingBlockException e) { // increase the time we wait before retrying syncDelay += retryIncrease; @@ -361,25 +386,19 @@ private void logImportedBlockInfo(final BlockHeader header, final BlockBody body LOG.info(String.format(message.toString(), messageArgs.toArray())); } - private long calculateRangeLimit(final long min, final long max) { - if ((max - min) > (long) maxBlocksPerPersist) { - return min + (long) maxBlocksPerPersist; - } - return max; - } - private boolean isBlockchainServiceReady() { return pluginServiceProvider.isServiceAvailable(BlockchainService.class); } - private Optional getLeaderBlockContext( - final long blockNumber) { + private Optional getLeaderBlockContext( + final CompositeBlockKey compositeBlockKey) { return blockContextProvider.getLeaderBlockContextByNumber( - blockNumber, (leaderHeader.getNumber() - blockNumber) <= headDistanceForReceiptFetch); + compositeBlockKey, + (leaderHeader.getHead().getNumber() - compositeBlockKey.getBlockNumber()) + <= headDistanceForReceiptFetch); } - private Optional getLocalBlockContext( - final long blockNumber) { + private Optional getLocalBlockContext(final long blockNumber) { return blockContextProvider.getLocalBlockContextByNumber(blockNumber, false); } } diff --git a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java index a38f618..1165c4e 100644 --- a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java +++ b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java @@ -21,8 +21,9 @@ import java.util.Optional; import org.apache.tuweni.bytes.Bytes; -import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.plugin.data.AddedBlockContext; +import org.hyperledger.besu.plugin.data.BlockContext; +import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.TransactionReceipt; import org.hyperledger.besu.plugin.services.BesuEvents; import org.hyperledger.besu.plugin.services.BlockchainService; @@ -59,10 +60,18 @@ public void onBlockAdded(final AddedBlockContext addedBlockContext) { private NewHeadParams buildNewHeadEvent(final AddedBlockContext headBlockContext) { final BlockchainService service = pluginServiceProvider.getService(BlockchainService.class); - final Hash safeBlock = - service.getSafeBlock().orElse(headBlockContext.getBlockHeader().getBlockHash()); - final Hash finalizedBlock = - service.getFinalizedBlock().orElse(headBlockContext.getBlockHeader().getBlockHash()); + final BlockHeader safeBlock = + service + .getSafeBlock() + .flatMap(service::getBlockByHash) + .map(BlockContext::getBlockHeader) + .orElse(headBlockContext.getBlockHeader()); + final BlockHeader finalizedBlock = + service + .getFinalizedBlock() + .flatMap(service::getBlockByHash) + .map(BlockContext::getBlockHeader) + .orElse(headBlockContext.getBlockHeader()); final TrieLogProvider trieLogProvider = pluginServiceProvider.getService(TrieLogService.class).getTrieLogProvider(); final Optional maybeTrielog = @@ -70,8 +79,12 @@ private NewHeadParams buildNewHeadEvent(final AddedBlockContext headBlockContext .getRawTrieLogLayer(headBlockContext.getBlockHeader().getBlockHash()) .map(Bytes::toHexString); if (maybeTrielog.isEmpty()) { - LOG.info("send new block {}", maybeTrielog.isEmpty()); - return new NewHeadParams(headBlockContext.getBlockHeader(), safeBlock, finalizedBlock); + return new NewHeadParams( + headBlockContext.getBlockHeader(), + safeBlock.getBlockHash(), + safeBlock.getNumber(), + finalizedBlock.getBlockHash(), + finalizedBlock.getNumber()); } else { return new NewHeadParams( headBlockContext.getBlockHeader(), @@ -80,8 +93,10 @@ private NewHeadParams buildNewHeadEvent(final AddedBlockContext headBlockContext .map(TransactionReceipt.class::cast) .toList(), maybeTrielog.get(), - safeBlock, - finalizedBlock); + safeBlock.getBlockHash(), + safeBlock.getNumber(), + finalizedBlock.getBlockHash(), + finalizedBlock.getNumber()); } } } diff --git a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java index d300bc0..8a86f7e 100644 --- a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java +++ b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.tuweni.bytes.Bytes; +import org.hyperledger.besu.datatypes.Hash; import org.hyperledger.besu.plugin.data.BlockContext; import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.TransactionReceipt; @@ -68,9 +69,8 @@ public Object execute(PluginRpcRequest rpcRequest) { try { final GetBlockRequest getBlockRequest = OBJECT_MAPPER.readValue(rpcRequest.getParams()[0].toString(), GetBlockRequest.class); - final long blockNumber = getBlockRequest.getBlockNumber(); - final Optional blockByNumber = - blockchainService.getBlockByNumber(blockNumber); + final Hash blockHash = getBlockRequest.getBlockHash(); + final Optional blockByNumber = blockchainService.getBlockByHash(blockHash); if (blockByNumber.isPresent()) { final BlockHeader blockHeader = blockByNumber.get().getBlockHeader(); final List receipts; From dc46521680ce5f5a2451921afaa20888d564b6b5 Mon Sep 17 00:00:00 2001 From: Karim Taam Date: Wed, 30 Apr 2025 09:52:29 +0200 Subject: [PATCH 8/8] only notify when head is updated --- gradle.properties | 2 +- .../common/rpc/client/WebClientWrapper.java | 5 ++-- .../event/InitialSyncCompletionObserver.java | 2 +- .../rpc/client/FleetAddFollowerClient.java | 12 --------- .../rpc/client/FleetGetBlockClient.java | 14 ----------- .../rpc/server/FleetShipNewHeadServer.java | 6 ++--- .../follower/sync/BlockContextProvider.java | 9 ------- .../follower/sync/FleetModeSynchronizer.java | 24 +++++++++--------- .../leader/event/BlockAddedObserver.java | 25 ++++++++++--------- .../rpc/client/FleetShipNewHeadClient.java | 2 +- .../rpc/server/FleetAddFollowerServer.java | 4 +-- .../rpc/server/FleetGetBlockServer.java | 5 ++-- 12 files changed, 38 insertions(+), 72 deletions(-) diff --git a/gradle.properties b/gradle.properties index 088d0c0..7b6e4a7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ releaseVersion=0.2.8-SNAPSHOT -besuVersion=25.4.0 +besuVersion=24.5.1-local diff --git a/src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java b/src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java index 5d41e8f..d0f2c77 100644 --- a/src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java +++ b/src/main/java/net/consensys/fleet/common/rpc/client/WebClientWrapper.java @@ -70,12 +70,11 @@ public CompletableFuture> sendToLeader( jsonObject, event -> { if (event.failed()) { - LOG.info("event failed {}", String.valueOf(event.cause())); completableFuture.completeExceptionally(event.cause()); } else { completableFuture.complete(event.result()); } - LOG.info( + LOG.trace( "Send RPC request {} result {} for body {} {}", methodName, event.succeeded(), @@ -105,7 +104,7 @@ public void sendToFollowers(final String endpoint, final String methodName, fina .sendJsonObject( jsonObject, event -> { - LOG.info( + LOG.trace( "Send RPC request {} to {} result {} for body {}", methodName, peerNode, diff --git a/src/main/java/net/consensys/fleet/follower/event/InitialSyncCompletionObserver.java b/src/main/java/net/consensys/fleet/follower/event/InitialSyncCompletionObserver.java index db18455..27521e9 100644 --- a/src/main/java/net/consensys/fleet/follower/event/InitialSyncCompletionObserver.java +++ b/src/main/java/net/consensys/fleet/follower/event/InitialSyncCompletionObserver.java @@ -38,7 +38,7 @@ public void onInitialSyncCompleted() { fleetModeSynchronizerSupplier.get().disableP2P(); fleetModeSynchronizerSupplier.get().disableTrie(); } else { - LOG.info("Cannot disable initial sync because fleet mode synchronizer is not ready"); + LOG.debug("Cannot disable initial sync because fleet mode synchronizer is not ready"); } } diff --git a/src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java b/src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java index b72daa1..f2216c3 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/client/FleetAddFollowerClient.java @@ -18,20 +18,14 @@ import net.consensys.fleet.common.rpc.client.WebClientWrapper; import net.consensys.fleet.common.rpc.model.PeerNode; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.concurrent.CompletableFuture; import com.fasterxml.jackson.core.JsonProcessingException; import io.vertx.core.buffer.Buffer; import io.vertx.ext.web.client.HttpResponse; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class FleetAddFollowerClient extends AbstractStateRpcSender { - private static final Logger LOG = LoggerFactory.getLogger(FleetAddFollowerClient.class); - private static final String METHOD_NAME = "fleet_addFollowerNode"; public FleetAddFollowerClient(final WebClientWrapper webClient) { @@ -63,12 +57,6 @@ private boolean isConnected(final HttpResponse response, final Throwable if (throwable == null && response.statusCode() == 200) { return !response.bodyAsJsonObject().containsKey("error"); } - if (throwable != null) { - StringWriter sw = new StringWriter(); - throwable.printStackTrace(new PrintWriter(sw)); - LOG.info(sw.toString()); - } - LOG.info("is connect {}", response.statusCode()); return false; } } diff --git a/src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java b/src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java index a875d5b..f878499 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/client/FleetGetBlockClient.java @@ -19,18 +19,12 @@ import net.consensys.fleet.common.rpc.model.GetBlockRequest; import net.consensys.fleet.common.rpc.model.GetBlockResponse; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.concurrent.CompletableFuture; import com.fasterxml.jackson.core.JsonProcessingException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class FleetGetBlockClient extends AbstractStateRpcSender { - private static final Logger LOG = LoggerFactory.getLogger(FleetGetBlockClient.class); - private static final String METHOD_NAME = "fleet_getBlock"; public FleetGetBlockClient(final WebClientWrapper webClient) { @@ -55,17 +49,9 @@ public CompletableFuture sendData(final GetBlockRequest blockN completableFuture.complete( webClient.decode(result, "result", GetBlockResponse.class)); } catch (JsonProcessingException e) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - LOG.info(sw.toString()); completableFuture.completeExceptionally(e); } } else { - - LOG.info("get leader error"); - StringWriter sw = new StringWriter(); - throwable.printStackTrace(new PrintWriter(sw)); - LOG.info(sw.toString()); completableFuture.completeExceptionally(throwable); } }); diff --git a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java index 21f49fa..db0e56a 100644 --- a/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java +++ b/src/main/java/net/consensys/fleet/follower/rpc/server/FleetShipNewHeadServer.java @@ -49,7 +49,7 @@ public String getName() { @Override public Object execute(PluginRpcRequest rpcRequest) { - LOG.info("execute {} request with body {}", getName(), rpcRequest.getParams()); + LOG.debug("execute {} request with body {}", getName(), rpcRequest.getParams()); try { if (isRlpConverterReady()) { @@ -57,7 +57,7 @@ public Object execute(PluginRpcRequest rpcRequest) { convertMapperProvider .getJsonConverter() .readValue(rpcRequest.getParams()[0].toString(), NewHeadParams.class); - LOG.info( + LOG.debug( "receive new head {} ({}) , safe block {} and finalized block {}", newHeadParams.getHead().getNumber(), newHeadParams.getHead().getBlockHash(), @@ -66,7 +66,7 @@ public Object execute(PluginRpcRequest rpcRequest) { newHeadObserver.onNewHead(newHeadParams); } } catch (Exception e) { - LOG.info("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); + LOG.trace("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); } return null; diff --git a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java index 0e1a2dd..0a6c4a3 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java +++ b/src/main/java/net/consensys/fleet/follower/sync/BlockContextProvider.java @@ -20,8 +20,6 @@ import net.consensys.fleet.common.rpc.model.NewHeadParams; import net.consensys.fleet.follower.rpc.client.FleetGetBlockClient; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -37,13 +35,9 @@ import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.TransactionReceipt; import org.hyperledger.besu.plugin.services.BlockchainService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class BlockContextProvider { - private static final Logger LOG = LoggerFactory.getLogger(BlockContextProvider.class); - private final Cache leaderBlock = CacheBuilder.newBuilder().maximumSize(20).expireAfterAccess(1, TimeUnit.MINUTES).build(); @@ -84,9 +78,6 @@ public Optional getLeaderBlockContextByNumber( leaderBlock.put(new CompositeBlockKey(response.getBlockHeader()), context); return Optional.of(context); } catch (Exception e) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - LOG.info(sw.toString()); return Optional.empty(); } } diff --git a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java index 9a93acc..c8054f7 100644 --- a/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java +++ b/src/main/java/net/consensys/fleet/follower/sync/FleetModeSynchronizer.java @@ -85,14 +85,14 @@ public synchronized void syncNewHead(final NewHeadParams newHeadParams) { final SynchronizationService synchronizationService = pluginServiceProvider.getService(SynchronizationService.class); if (newHeadParams.getTrieLogRlp() != null) { - LOG.info("add block to cache from leader {}", newHeadParams.getHead()); + LOG.trace("add block to cache from leader {}", newHeadParams.getHead()); blockContextProvider.provideLeaderBlockContext(newHeadParams); } synchronizationService.fireNewUnverifiedForkchoiceEvent( newHeadParams.getHead().getBlockHash(), newHeadParams.getSafeBlockHash(), newHeadParams.getFinalizedBlockHash()); - LOG.info( + LOG.debug( "Fire fork choice for safe block {} and finalized block {} ", newHeadParams.getSafeBlockHash(), newHeadParams.getFinalizedBlockHash()); @@ -160,18 +160,18 @@ private void startSync() { Hash targetBlockHash = targetBlock.getBlockHeader().getBlockHash(); long targetBlockNumber = targetBlock.getBlockHeader().getNumber(); - LOG.info( + LOG.debug( "New head (or leader block) being detected. {} ({})", targetBlock.getBlockHeader().getNumber(), targetBlock.getBlockHeader().getBlockHash()); - LOG.info( + LOG.debug( "Detected local chain head {} ({}", chainHead.getNumber(), chainHead.getBlockHash()); while (persistedBlock.getBlockHeader().getNumber() > targetBlock.getBlockHeader().getNumber()) { - LOG.info("Rollback {}", persistedBlockHash); + LOG.debug("Rollback {}", persistedBlockHash); rollBackward.add( getLocalBlockContext(persistedBlock.getBlockHeader().getNumber()) .orElseThrow()); @@ -184,7 +184,7 @@ private void startSync() { while (persistedBlock.getBlockHeader().getNumber() < targetBlock.getBlockHeader().getNumber()) { - LOG.info("Rollforward {}", targetBlockHash); + LOG.debug("Rollforward {}", targetBlockHash); final FleetBlockContext toRollForwardBlock = getLeaderBlockContext( new CompositeBlockKey(targetBlock.getBlockHeader())) @@ -211,12 +211,12 @@ private void startSync() { while (!persistedBlockHash.equals(targetBlockHash) && persistedBlockNumber == targetBlockNumber) { - LOG.info("Reorg detected so we clean the cache"); + LOG.debug("Reorg detected so we clean the cache"); blockContextProvider.clear(); // add again the the new head in the cache (avoid useless rpc request) blockContextProvider.provideLeaderBlockContext(leaderHeader); - LOG.info("Paired rollback {}", persistedBlockHash); - LOG.info("Paired rollforward {}", targetBlockHash); + LOG.debug("Paired rollback {}", persistedBlockHash); + LOG.debug("Paired rollforward {}", targetBlockHash); rollForward.add( getLeaderBlockContext( @@ -243,7 +243,7 @@ private void startSync() { } for (FleetBlockContext blockContext : rollBackward) { - LOG.info( + LOG.debug( "Attempting rollback of {}", blockContext.getBlockHeader().getBlockHash()); Optional maybeTrieLog = blockContext.trieLogRlp(); @@ -267,7 +267,7 @@ private void startSync() { Comparator.comparingLong(o -> o.getBlockHeader().getNumber())); for (FleetBlockContext blockContext : rollForward) { - LOG.info( + LOG.debug( "Attempting rollforward of {}", blockContext.getBlockHeader().getBlockHash()); // save trielog and set head @@ -306,7 +306,7 @@ private void startSync() { .getBlockHeader() .getBlockHash() .equals(chainHead.getBlockHash())) { - LOG.info("head not changed {}", chainHead.getBlockHash()); + LOG.debug("head not changed {}", chainHead.getBlockHash()); } else if (Math.abs( newHead.getBlockHeader().getNumber() - oldHead.getBlockHeader().getNumber()) diff --git a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java index 1165c4e..60a3dd5 100644 --- a/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java +++ b/src/main/java/net/consensys/fleet/leader/event/BlockAddedObserver.java @@ -22,7 +22,6 @@ import org.apache.tuweni.bytes.Bytes; import org.hyperledger.besu.plugin.data.AddedBlockContext; -import org.hyperledger.besu.plugin.data.BlockContext; import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.TransactionReceipt; import org.hyperledger.besu.plugin.services.BesuEvents; @@ -47,30 +46,32 @@ public BlockAddedObserver( @Override public void onBlockAdded(final AddedBlockContext addedBlockContext) { - LOG.atInfo() - .setMessage("New block added: {}") - .addArgument(() -> addedBlockContext.getBlockHeader().getBlockHash()) - .log(); if (pluginServiceProvider.isServiceAvailable(BlockchainService.class)) { - stateShipNewHeadSender.sendData(buildNewHeadEvent(addedBlockContext)); + final BlockchainService service = pluginServiceProvider.getService(BlockchainService.class); + if (service.getChainHeadHash().equals(addedBlockContext.getBlockHeader().getBlockHash())) { + // only notify when the head is updated + LOG.atDebug() + .setMessage("New head received: {}") + .addArgument(() -> addedBlockContext.getBlockHeader().getBlockHash()) + .log(); + stateShipNewHeadSender.sendData(buildNewHeadEvent(service, addedBlockContext)); + } } else { LOG.error("BlockchainService is not available"); } } - private NewHeadParams buildNewHeadEvent(final AddedBlockContext headBlockContext) { - final BlockchainService service = pluginServiceProvider.getService(BlockchainService.class); + private NewHeadParams buildNewHeadEvent( + final BlockchainService service, final AddedBlockContext headBlockContext) { final BlockHeader safeBlock = service .getSafeBlock() - .flatMap(service::getBlockByHash) - .map(BlockContext::getBlockHeader) + .flatMap(service::getBlockHeaderByHash) .orElse(headBlockContext.getBlockHeader()); final BlockHeader finalizedBlock = service .getFinalizedBlock() - .flatMap(service::getBlockByHash) - .map(BlockContext::getBlockHeader) + .flatMap(service::getBlockHeaderByHash) .orElse(headBlockContext.getBlockHeader()); final TrieLogProvider trieLogProvider = pluginServiceProvider.getService(TrieLogService.class).getTrieLogProvider(); diff --git a/src/main/java/net/consensys/fleet/leader/rpc/client/FleetShipNewHeadClient.java b/src/main/java/net/consensys/fleet/leader/rpc/client/FleetShipNewHeadClient.java index a856dfd..6d65e6b 100644 --- a/src/main/java/net/consensys/fleet/leader/rpc/client/FleetShipNewHeadClient.java +++ b/src/main/java/net/consensys/fleet/leader/rpc/client/FleetShipNewHeadClient.java @@ -40,7 +40,7 @@ protected String getMethodeName() { @Override public CompletableFuture sendData(final NewHeadParams data) { - LOG.info("Sending new head to followers"); + LOG.debug("Sending new head to followers"); final CompletableFuture completableFuture = new CompletableFuture<>(); try { webClient.sendToFollowers(ENDPOINT, getMethodeName(), data); diff --git a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetAddFollowerServer.java b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetAddFollowerServer.java index 7c352ef..4e249b8 100644 --- a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetAddFollowerServer.java +++ b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetAddFollowerServer.java @@ -41,13 +41,13 @@ public String getName() { @Override public Object execute(PluginRpcRequest rpcRequest) { - LOG.info("execute {} request with body {}", getName(), rpcRequest.getParams()); + LOG.debug("execute {} request with body {}", getName(), rpcRequest.getParams()); try { final PeerNode peerNode = OBJECT_MAPPER.readValue(rpcRequest.getParams()[0].toString(), PeerNode.class); followerNodesManager.register(peerNode); } catch (JsonProcessingException e) { - LOG.info("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); + LOG.trace("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); } return null; } diff --git a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java index 8a86f7e..c32c707 100644 --- a/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java +++ b/src/main/java/net/consensys/fleet/leader/rpc/server/FleetGetBlockServer.java @@ -60,7 +60,7 @@ public String getName() { @Override public Object execute(PluginRpcRequest rpcRequest) { - LOG.info("execute {} request with body {}", getName(), rpcRequest.getParams()); + LOG.debug("execute {} request with body {}", getName(), rpcRequest.getParams()); if (isBlockchainServiceReady() && rpcRequest.getParams().length > 0) { final BlockchainService blockchainService = pluginServiceProvider.getService(BlockchainService.class); @@ -96,7 +96,8 @@ public Object execute(PluginRpcRequest rpcRequest) { } } catch (JsonProcessingException e) { - LOG.info("Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); + LOG.trace( + "Ignore invalid request for method {} with {}", getName(), rpcRequest.getParams()); } return null; }