Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions src/node/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,23 @@ where
// Validate file paths early with clear error messages.
// On Linux, File::open() succeeds on directories, then read_to_end()
// fails with a cryptic "Is a directory" error.
if command.without_evm {
if let Some(ref path) = command.header {
if path.is_dir() {
return Err(eyre::eyre!(
"--header path '{}' is a directory, not a file. \
If using Docker, ensure the source file exists on the host \
(missing source files cause Docker to create directories \
at the mount point).",
path.display()
));
}
if !path.exists() {
return Err(eyre::eyre!(
"--header path '{}' does not exist",
path.display()
));
}
if command.without_evm
&& let Some(ref path) = command.header
{
if path.is_dir() {
return Err(eyre::eyre!(
"--header path '{}' is a directory, not a file. \
If using Docker, ensure the source file exists on the host \
(missing source files cause Docker to create directories \
at the mount point).",
path.display()
));
}
if !path.exists() {
return Err(eyre::eyre!(
"--header path '{}' does not exist",
path.display()
));
}
}
if command.state.is_dir() {
Expand Down
19 changes: 13 additions & 6 deletions src/node/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
rpc::engine_api::payload::HlPayloadTypes,
types::ReadPrecompileCalls,
},
pseudo_peer::{BlockSourceConfig, start_pseudo_peer},
pseudo_peer::{BlockSourceConfig, DbBlockNumberFn, start_pseudo_peer},
};
use alloy_primitives::B256;
use alloy_rlp::{Decodable, Encodable};
Expand All @@ -27,8 +27,8 @@ use reth_network::{NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::PeersInfo;
use reth_payload_primitives::EngineApiMessageVersion;
use reth_provider::StageCheckpointReader;
use reth_storage_api::{BlockHashReader, BlockNumReader};
use reth_stages_types::StageId;
use reth_storage_api::{BlockHashReader, BlockNumReader};
use std::{
net::{Ipv4Addr, SocketAddr},
sync::Arc,
Expand Down Expand Up @@ -267,8 +267,15 @@ where
.provider()
.get_stage_checkpoint(StageId::Finish)?
.unwrap_or_default()
.block_number
+ 1;
.block_number +
1;

// Give the pseudo-peer a handle to the node's database so it can
// resolve hash→number directly instead of scanning the block source.
let provider = ctx.provider().clone();
let db_block_number: DbBlockNumberFn = Arc::new(move |hash| {
provider.block_number(hash).ok().flatten()
});

let chain_spec = ctx.chain_spec();
let chain_id = chain_spec.inner.chain().id();
Expand All @@ -284,8 +291,7 @@ where
match block_source.collect_block(latest).await {
Ok(block) => {
let reth_block = block.to_reth_block(chain_id);
let hash =
alloy_primitives::Sealable::hash_slow(&reth_block.header);
let hash = alloy_primitives::Sealable::hash_slow(&reth_block.header);
info!(
target: "reth::cli",
number = %latest,
Expand All @@ -309,6 +315,7 @@ where
local_node_record.to_string(),
block_source,
debug_cutoff_height,
Some(db_block_number),
)
.await
.unwrap();
Expand Down
55 changes: 31 additions & 24 deletions src/node/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,34 @@ impl BlockAndReceipts {
let all_txs = block.body.inner.transactions;

// Split system txs from regular txs
let (system_tx_list, regular_tx_list) = if system_tx_count > 0 && system_tx_count <= all_txs.len() {
let (sys, reg) = all_txs.into_iter().enumerate().partition::<Vec<_>, _>(|(i, _)| *i < system_tx_count);
(sys.into_iter().map(|(_, tx)| tx).collect::<Vec<_>>(), reg.into_iter().map(|(_, tx)| tx).collect::<Vec<_>>())
} else {
(vec![], all_txs)
};
let (system_tx_list, regular_tx_list) =
if system_tx_count > 0 && system_tx_count <= all_txs.len() {
let (sys, reg) = all_txs
.into_iter()
.enumerate()
.partition::<Vec<_>, _>(|(i, _)| *i < system_tx_count);
(
sys.into_iter().map(|(_, tx)| tx).collect::<Vec<_>>(),
reg.into_iter().map(|(_, tx)| tx).collect::<Vec<_>>(),
)
} else {
(vec![], all_txs)
};

// Split receipts
let (system_receipts, regular_receipts) = if system_tx_count > 0 && system_tx_count <= receipts.len() {
let (sys, reg) = receipts.into_iter().enumerate().partition::<Vec<_>, _>(|(i, _)| *i < system_tx_count);
(sys.into_iter().map(|(_, r)| r).collect::<Vec<_>>(), reg.into_iter().map(|(_, r)| r).collect::<Vec<_>>())
} else {
(vec![], receipts)
};
let (system_receipts, regular_receipts) =
if system_tx_count > 0 && system_tx_count <= receipts.len() {
let (sys, reg) = receipts
.into_iter()
.enumerate()
.partition::<Vec<_>, _>(|(i, _)| *i < system_tx_count);
(
sys.into_iter().map(|(_, r)| r).collect::<Vec<_>>(),
reg.into_iter().map(|(_, r)| r).collect::<Vec<_>>(),
)
} else {
(vec![], receipts)
};

// Convert system transactions
let system_txs: Vec<SystemTx> = system_tx_list
Expand All @@ -110,22 +124,15 @@ impl BlockAndReceipts {
.collect();

// Convert regular transactions to reth_compat format
let compat_txs: Vec<reth_compat::TransactionSigned> = regular_tx_list
.into_iter()
.map(reth_compat::TransactionSigned::from_node_tx)
.collect();
let compat_txs: Vec<reth_compat::TransactionSigned> =
regular_tx_list.into_iter().map(reth_compat::TransactionSigned::from_node_tx).collect();

// Convert regular receipts
let legacy_receipts: Vec<LegacyReceipt> = regular_receipts
.into_iter()
.map(Into::into)
.collect();
let legacy_receipts: Vec<LegacyReceipt> =
regular_receipts.into_iter().map(Into::into).collect();

let sealed_block = reth_compat::SealedBlock {
header: reth_compat::SealedHeader {
hash,
header: block.header.inner,
},
header: reth_compat::SealedHeader { hash, header: block.header.inner },
body: alloy_consensus::BlockBody {
transactions: compat_txs,
ommers: vec![],
Expand Down
5 changes: 1 addition & 4 deletions src/pseudo_peer/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,7 @@ impl BlockSourceArgs {
} else {
format!("http://{url}")
};
Ok(Some(BlockSourceConfig::rpc(
url,
Duration::from_millis(self.rpc_polling_interval),
)))
Ok(Some(BlockSourceConfig::rpc(url, Duration::from_millis(self.rpc_polling_interval))))
} else {
Ok(Some(BlockSourceConfig::local(value.into())))
}
Expand Down
27 changes: 23 additions & 4 deletions src/pseudo_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,35 @@ pub mod prelude {
pub use super::{
config::BlockSourceConfig,
service::{BlockPoller, PseudoPeer},
sources::{BlockSource, CachedBlockSource, LocalBlockSource, RpcBlockSource, S3BlockSource},
sources::{
BlockSource, CachedBlockSource, LocalBlockSource, RpcBlockSource, S3BlockSource,
},
};
}

use crate::chainspec::HlChainSpec;
use reth_discv4::NodeRecord;
use reth_network::{NetworkEvent, NetworkEventListenerProvider};
use reth_network_api::Peers;
use std::str::FromStr;

/// Main function that starts the network manager and processes eth requests
pub async fn start_pseudo_peer(
chain_spec: Arc<HlChainSpec>,
destination_peer: String,
block_source: BlockSourceBoxed,
debug_cutoff_height: Option<u64>,
db_block_number: Option<DbBlockNumberFn>,
) -> eyre::Result<()> {
let blockhash_cache = new_blockhash_cache();

// Create network manager
// Parse the destination peer enode string
let node_record = NodeRecord::from_str(&destination_peer)
.map_err(|e| eyre::eyre!("Failed to parse destination peer: {e}"))?;

// Create network manager (no boot_nodes — we add the peer directly)
let (mut network, start_tx) = create_network_manager::<BlockSourceBoxed>(
(*chain_spec).clone(),
destination_peer,
block_source.clone(),
blockhash_cache.clone(),
debug_cutoff_height,
Expand All @@ -62,8 +71,18 @@ pub async fn start_pseudo_peer(
let mut network_events = network_handle.event_listener();
info!("Starting network manager...");

let mut service = PseudoPeer::new(chain_spec, block_source, blockhash_cache.clone());
let mut service =
PseudoPeer::new(chain_spec, block_source, blockhash_cache.clone(), db_block_number);
tokio::spawn(network);

// Directly add the main node as a peer (bypasses discovery)
info!(
peer_id = %node_record.id,
addr = %node_record.tcp_addr(),
"Adding main node as direct peer"
);
network_handle.add_trusted_peer(node_record.id, node_record.tcp_addr());

let mut first = true;

// Main event loop
Expand Down
18 changes: 4 additions & 14 deletions src/pseudo_peer/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ use reth_network::{
NetworkConfig, NetworkManager, PeersConfig,
config::{SecretKey, rng_secret_key},
};
use reth_network_peers::TrustedPeer;
use reth_provider::test_utils::NoopProvider;
use std::{
net::{Ipv4Addr, SocketAddr},
str::FromStr,
sync::Arc,
};
use tokio::sync::mpsc;

pub struct NetworkBuilder {
secret: SecretKey,
peer_config: PeersConfig,
boot_nodes: Vec<TrustedPeer>,
discovery_port: u16,
listener_port: u16,
chain_spec: HlChainSpec,
Expand All @@ -27,8 +24,7 @@ impl Default for NetworkBuilder {
fn default() -> Self {
Self {
secret: rng_secret_key(),
peer_config: PeersConfig::default().with_max_outbound(1).with_max_inbound(1),
boot_nodes: vec![],
peer_config: PeersConfig::default().with_max_outbound(1).with_max_inbound(0),
discovery_port: 0,
listener_port: 0,
chain_spec: HlChainSpec::default(),
Expand All @@ -38,11 +34,6 @@ impl Default for NetworkBuilder {
}

impl NetworkBuilder {
pub fn with_boot_nodes(mut self, boot_nodes: Vec<TrustedPeer>) -> Self {
self.boot_nodes = boot_nodes;
self
}

pub fn with_chain_spec(mut self, chain_spec: HlChainSpec) -> Self {
self.chain_spec = chain_spec;
self
Expand All @@ -59,10 +50,11 @@ impl NetworkBuilder {
blockhash_cache: BlockHashCache,
) -> eyre::Result<(NetworkManager<HlNetworkPrimitives>, mpsc::Sender<()>)> {
let builder = NetworkConfig::<(), HlNetworkPrimitives>::builder(self.secret)
.boot_nodes(self.boot_nodes)
.peer_config(self.peer_config)
.discovery_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), self.discovery_port))
.listener_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), self.listener_port));
.listener_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), self.listener_port))
.disable_discv4_discovery()
.disable_dns_discovery();
let chain_id = self.chain_spec.inner.chain().id();

let (block_poller, start_tx) = BlockPoller::new_suspended(
Expand All @@ -85,13 +77,11 @@ impl NetworkBuilder {

pub async fn create_network_manager<BS>(
chain_spec: HlChainSpec,
destination_peer: String,
block_source: Arc<Box<dyn super::sources::BlockSource>>,
blockhash_cache: BlockHashCache,
debug_cutoff_height: Option<u64>,
) -> eyre::Result<(NetworkManager<HlNetworkPrimitives>, mpsc::Sender<()>)> {
NetworkBuilder::default()
.with_boot_nodes(vec![TrustedPeer::from_str(&destination_peer).unwrap()])
.with_chain_spec(chain_spec)
.with_debug_cutoff_height(debug_cutoff_height)
.build::<BS>(block_source, blockhash_cache)
Expand Down
Loading
Loading