From 055492be8675725a030ed082a8006bfe759dd560 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 4 Jul 2025 18:14:38 -0300 Subject: [PATCH 01/33] refactor: move parameter to function call --- crates/networking/p2p/discv4/lookup.rs | 23 +++++++---------------- crates/networking/p2p/discv4/server.rs | 10 +++------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/crates/networking/p2p/discv4/lookup.rs b/crates/networking/p2p/discv4/lookup.rs index 9e695c3517..bb32c1e550 100644 --- a/crates/networking/p2p/discv4/lookup.rs +++ b/crates/networking/p2p/discv4/lookup.rs @@ -20,16 +20,11 @@ use tracing::debug; pub struct Discv4LookupHandler { ctx: P2PContext, udp_socket: Arc, - interval_minutes: u64, } impl Discv4LookupHandler { - pub fn new(ctx: P2PContext, udp_socket: Arc, interval_minutes: u64) -> Self { - Self { - ctx, - udp_socket, - interval_minutes, - } + pub fn new(ctx: P2PContext, udp_socket: Arc) -> Self { + Self { ctx, udp_socket } } /// Starts a tokio scheduler that: @@ -52,19 +47,19 @@ impl Discv4LookupHandler { /// doesn't have any node to ask. /// /// See more https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup - pub fn start(&self, initial_interval_wait_seconds: u64) { + pub fn start(&self, interval_minutes: u64, initial_interval_wait_seconds: u64) { self.ctx.tracker.spawn({ let self_clone = self.clone(); async move { self_clone - .start_lookup_loop(initial_interval_wait_seconds) + .start_lookup_loop(interval_minutes, initial_interval_wait_seconds) .await; } }); } - async fn start_lookup_loop(&self, initial_interval_wait_seconds: u64) { - let mut interval = tokio::time::interval(Duration::from_secs(self.interval_minutes)); + async fn start_lookup_loop(&self, interval_minutes: u64, initial_interval_wait_seconds: u64) { + let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60)); tokio::time::sleep(Duration::from_secs(initial_interval_wait_seconds)).await; loop { @@ -277,11 +272,7 @@ mod tests { }; fn lookup_handler_from_server(server: Discv4Server) -> Discv4LookupHandler { - Discv4LookupHandler::new( - server.ctx.clone(), - server.udp_socket.clone(), - server.lookup_interval_minutes, - ) + Discv4LookupHandler::new(server.ctx.clone(), server.udp_socket.clone()) } #[tokio::test] diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 44d224c99a..dab7a9edac 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -74,11 +74,7 @@ impl Discv4Server { /// - Loads bootnodes to establish initial peer connections. /// - Starts the lookup handler via [`Discv4LookupHandler`] to periodically search for new peers. pub async fn start(&self, bootnodes: Vec) -> Result<(), DiscoveryError> { - let lookup_handler = Discv4LookupHandler::new( - self.ctx.clone(), - self.udp_socket.clone(), - self.lookup_interval_minutes, - ); + let lookup_handler = Discv4LookupHandler::new(self.ctx.clone(), self.udp_socket.clone()); self.ctx.tracker.spawn({ let self_clone = self.clone(); @@ -89,7 +85,7 @@ impl Discv4Server { async move { self_clone.start_revalidation().await } }); self.load_bootnodes(bootnodes).await; - lookup_handler.start(10); + lookup_handler.start(self.lookup_interval_minutes, 10); Ok(()) } @@ -102,7 +98,7 @@ impl Discv4Server { } } - pub async fn receive(&self) { + async fn receive(&self) { let mut buf = vec![0; MAX_DISC_PACKET_SIZE]; loop { From 0e9961a2d686a509b3e1349b0cca442228cbe72c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 4 Jul 2025 18:51:31 -0300 Subject: [PATCH 02/33] refactor: split step logic from loop --- crates/networking/p2p/discv4/lookup.rs | 73 ++++++++++++++++++-------- crates/networking/p2p/discv4/server.rs | 6 ++- 2 files changed, 56 insertions(+), 23 deletions(-) diff --git a/crates/networking/p2p/discv4/lookup.rs b/crates/networking/p2p/discv4/lookup.rs index bb32c1e550..d9f9ed3a9f 100644 --- a/crates/networking/p2p/discv4/lookup.rs +++ b/crates/networking/p2p/discv4/lookup.rs @@ -99,41 +99,70 @@ impl Discv4LookupHandler { async fn recursive_lookup(&self, target: H512) { // lookups start with the closest nodes to the target from our table let target_node_id = node_id(&target); - let mut peers_to_ask: Vec = self - .ctx - .table - .lock() - .await - .get_closest_nodes(target_node_id); + + let mut peers_to_ask = vec![]; // stores the peers in peers_to_ask + the peers that were in peers_to_ask but were replaced by closer targets - let mut seen_peers: HashSet = HashSet::default(); + let mut seen_peers = HashSet::default(); let mut asked_peers = HashSet::default(); - seen_peers.insert(self.ctx.local_node.public_key); - for node in &peers_to_ask { - seen_peers.insert(node.public_key); + loop { + let (_, should_continue) = self + .recursive_lookup_step( + target, + target_node_id, + &mut peers_to_ask, + &mut seen_peers, + &mut asked_peers, + ) + .await; + // the lookup finishes when there are no more queries to do + // that happens when we have asked all the peers + if !should_continue { + break; + } } + } - loop { - let (nodes_found, queries) = self.lookup(target, &mut asked_peers, &peers_to_ask).await; + /// Performs a single step of the recursive lookup. + /// Returns `true` if we found new peers, `false` otherwise. + async fn recursive_lookup_step( + &self, + target: H512, + target_node_id: H256, + peers_to_ask: &mut Vec, + seen_peers: &mut HashSet, + asked_peers: &mut HashSet, + ) -> (Vec, bool) { + if peers_to_ask.is_empty() { + let initial_peers = self + .ctx + .table + .lock() + .await + .get_closest_nodes(target_node_id); - for node in nodes_found { - if !seen_peers.contains(&node.public_key) { - seen_peers.insert(node.public_key); - self.peers_to_ask_push(&mut peers_to_ask, target_node_id, node); - } + peers_to_ask.extend(initial_peers.clone()); + + seen_peers.insert(self.ctx.local_node.public_key); + for node in peers_to_ask { + seen_peers.insert(node.public_key); } + return (initial_peers, true); + } + let (nodes_found, queries) = self.lookup(target, asked_peers, &peers_to_ask).await; - // the lookup finishes when there are no more queries to do - // that happens when we have asked all the peers - if queries == 0 { - break; + for node in &nodes_found { + if !seen_peers.contains(&node.public_key) { + seen_peers.insert(node.public_key); + self.peers_to_ask_push(peers_to_ask, target_node_id, node.clone()); } } + + return (nodes_found, queries > 0); } /// We use the public key instead of the node_id as target as we might need to perform a FindNode request - async fn lookup( + pub async fn lookup( &self, target: H512, asked_peers: &mut HashSet, diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index dab7a9edac..01034778cb 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -74,7 +74,7 @@ impl Discv4Server { /// - Loads bootnodes to establish initial peer connections. /// - Starts the lookup handler via [`Discv4LookupHandler`] to periodically search for new peers. pub async fn start(&self, bootnodes: Vec) -> Result<(), DiscoveryError> { - let lookup_handler = Discv4LookupHandler::new(self.ctx.clone(), self.udp_socket.clone()); + let lookup_handler = self.new_lookup_handler(); self.ctx.tracker.spawn({ let self_clone = self.clone(); @@ -90,6 +90,10 @@ impl Discv4Server { Ok(()) } + fn new_lookup_handler(&self) -> Discv4LookupHandler { + Discv4LookupHandler::new(self.ctx.clone(), self.udp_socket.clone()) + } + async fn load_bootnodes(&self, bootnodes: Vec) { for node in bootnodes { if let Err(e) = self.try_add_peer_and_ping(node).await { From 2d95ec189a0108f7ecfd392f35b96177e0adf8d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 4 Jul 2025 19:03:22 -0300 Subject: [PATCH 03/33] feat: add random node lookup iterator --- crates/networking/p2p/discv4/lookup.rs | 60 ++++++++++++++++++++++++++ crates/networking/p2p/discv4/server.rs | 5 +++ 2 files changed, 65 insertions(+) diff --git a/crates/networking/p2p/discv4/lookup.rs b/crates/networking/p2p/discv4/lookup.rs index d9f9ed3a9f..43269a8eaf 100644 --- a/crates/networking/p2p/discv4/lookup.rs +++ b/crates/networking/p2p/discv4/lookup.rs @@ -287,6 +287,66 @@ impl Discv4LookupHandler { } } +pub struct Discv4NodeIterator { + lookup_handler: Discv4LookupHandler, + target: H512, + target_node_id: H256, + peers_to_ask: Vec, + seen_peers: HashSet, + asked_peers: HashSet, + buffer: Vec, +} + +impl Discv4NodeIterator { + pub fn new(ctx: P2PContext, udp_socket: Arc) -> Self { + let random_signing_key = SigningKey::random(&mut OsRng); + let target = public_key_from_signing_key(&random_signing_key); + let target_node_id = node_id(&target); + + let lookup_handler = Discv4LookupHandler::new(ctx, udp_socket); + + Discv4NodeIterator { + lookup_handler, + target, + target_node_id, + peers_to_ask: vec![], + seen_peers: Default::default(), + asked_peers: Default::default(), + buffer: vec![], + } + } + + pub async fn next(&mut self) -> Node { + if let Some(node) = self.buffer.pop() { + return node; + } + loop { + let (found_nodes, should_continue) = self + .lookup_handler + .recursive_lookup_step( + self.target, + self.target_node_id, + &mut self.peers_to_ask, + &mut self.seen_peers, + &mut self.asked_peers, + ) + .await; + + self.buffer.extend(found_nodes); + + if !should_continue { + self.peers_to_ask.clear(); + self.seen_peers.clear(); + self.asked_peers.clear(); + } + + if let Some(node) = self.buffer.pop() { + return node; + } + } + } +} + #[cfg(test)] mod tests { use tokio::time::sleep; diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 01034778cb..56f6dcb943 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -9,6 +9,7 @@ use super::{ }, }; use crate::{ + discv4::lookup::Discv4NodeIterator, kademlia::{KademliaTable, MAX_NODES_PER_BUCKET}, network::P2PContext, rlpx::{connection::server::RLPxConnection, utils::node_id}, @@ -90,6 +91,10 @@ impl Discv4Server { Ok(()) } + pub fn new_random_iterator(&self) -> Discv4NodeIterator { + Discv4NodeIterator::new(self.ctx.clone(), self.udp_socket.clone()) + } + fn new_lookup_handler(&self) -> Discv4LookupHandler { Discv4LookupHandler::new(self.ctx.clone(), self.udp_socket.clone()) } From e49cc72204086f6578f74e1603e956cf03cf0bef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 4 Jul 2025 19:17:47 -0300 Subject: [PATCH 04/33] refactor: start RPC API after networking --- cmd/ethrex/ethrex.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index a7ad4e0bda..be19cadbd9 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -98,18 +98,6 @@ async fn main() -> eyre::Result<()> { let cancel_token = tokio_util::sync::CancellationToken::new(); - init_rpc_api( - &opts, - peer_table.clone(), - local_p2p_node.clone(), - local_node_record.lock().await.clone(), - store.clone(), - blockchain.clone(), - cancel_token.clone(), - tracker.clone(), - ) - .await; - if opts.metrics_enabled { init_metrics(&opts, tracker.clone()); } @@ -127,7 +115,7 @@ async fn main() -> eyre::Result<()> { &opts, &network, &data_dir, - local_p2p_node, + local_p2p_node.clone(), local_node_record.clone(), signer, peer_table.clone(), @@ -142,6 +130,18 @@ async fn main() -> eyre::Result<()> { } } + init_rpc_api( + &opts, + peer_table.clone(), + local_p2p_node, + local_node_record.lock().await.clone(), + store.clone(), + blockchain.clone(), + cancel_token.clone(), + tracker.clone(), + ) + .await; + let mut signal_terminate = signal(SignalKind::terminate())?; tokio::select! { From a6e46991b31ade8071bde57aac3d87a7511f83bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Fri, 4 Jul 2025 19:54:55 -0300 Subject: [PATCH 05/33] wip: start RLPx server --- crates/networking/p2p/discv4/lookup.rs | 1 + crates/networking/p2p/discv4/server.rs | 3 +- crates/networking/p2p/network.rs | 4 ++ crates/networking/p2p/rlpx.rs | 1 + crates/networking/p2p/rlpx/server.rs | 70 ++++++++++++++++++++++++++ 5 files changed, 78 insertions(+), 1 deletion(-) create mode 100644 crates/networking/p2p/rlpx/server.rs diff --git a/crates/networking/p2p/discv4/lookup.rs b/crates/networking/p2p/discv4/lookup.rs index 43269a8eaf..54c68f4e24 100644 --- a/crates/networking/p2p/discv4/lookup.rs +++ b/crates/networking/p2p/discv4/lookup.rs @@ -287,6 +287,7 @@ impl Discv4LookupHandler { } } +#[derive(Debug, Clone)] pub struct Discv4NodeIterator { lookup_handler: Discv4LookupHandler, target: H512, diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 56f6dcb943..2763b0d007 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -9,7 +9,6 @@ use super::{ }, }; use crate::{ - discv4::lookup::Discv4NodeIterator, kademlia::{KademliaTable, MAX_NODES_PER_BUCKET}, network::P2PContext, rlpx::{connection::server::RLPxConnection, utils::node_id}, @@ -26,6 +25,8 @@ use std::{ use tokio::{net::UdpSocket, sync::MutexGuard}; use tracing::{debug, error}; +pub use crate::discv4::lookup::Discv4NodeIterator; + const MAX_DISC_PACKET_SIZE: usize = 1280; const PROOF_EXPIRATION_IN_HS: u64 = 12; pub const MAX_PEERS_TCP_CONNECTIONS: usize = 100; diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 4b90710778..e4dbe8200b 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -3,6 +3,7 @@ use crate::kademlia::{self, KademliaTable}; use crate::rlpx::connection::server::{RLPxConnBroadcastSender, RLPxConnection}; use crate::rlpx::message::Message as RLPxMessage; use crate::rlpx::p2p::SUPPORTED_SNAP_CAPABILITIES; +use crate::rlpx::server::RLPxServer; use crate::types::{Node, NodeRecord}; use ethrex_blockchain::Blockchain; use ethrex_common::{H256, H512}; @@ -102,6 +103,9 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec) -> Result< .await .map_err(NetworkError::DiscoveryStart)?; + let iter = discovery.new_random_iterator(); + let _rlpx_server = RLPxServer::spawn(iter); + info!( "Listening for requests at {}", context.local_node.tcp_addr() diff --git a/crates/networking/p2p/rlpx.rs b/crates/networking/p2p/rlpx.rs index 1880b39487..d0e03c03dd 100644 --- a/crates/networking/p2p/rlpx.rs +++ b/crates/networking/p2p/rlpx.rs @@ -3,5 +3,6 @@ pub mod error; pub mod eth; pub mod message; pub mod p2p; +pub mod server; pub mod snap; pub mod utils; diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs new file mode 100644 index 0000000000..c30c96d2ee --- /dev/null +++ b/crates/networking/p2p/rlpx/server.rs @@ -0,0 +1,70 @@ +use std::time::Duration; + +use spawned_concurrency::{ + messages::Unused, + tasks::{CastResponse, GenServer, GenServerHandle, send_after}, +}; + +use crate::{discv4::server::Discv4NodeIterator, kademlia::PeerChannels}; + +#[derive(Debug, thiserror::Error)] +pub enum RLPxServerError {} + +#[derive(Debug, Clone)] +pub struct RLPxServerState { + node_iterator: Discv4NodeIterator, + connections: Vec, +} + +#[derive(Clone)] +pub enum InMessage { + FetchPeers, +} + +#[derive(Clone, PartialEq)] +pub enum OutMessage {} + +pub struct RLPxServer; + +impl RLPxServer { + pub fn spawn(node_iterator: Discv4NodeIterator) -> GenServerHandle { + let state = RLPxServerState { + node_iterator, + connections: vec![], + }; + let handle = Self::start(state); + send_after( + Duration::from_millis(100), + handle.clone(), + InMessage::FetchPeers, + ); + handle + } +} + +impl GenServer for RLPxServer { + type CallMsg = Unused; + type CastMsg = InMessage; + type OutMsg = OutMessage; + type State = RLPxServerState; + type Error = RLPxServerError; + + fn new() -> Self { + Self + } + + async fn handle_cast( + &mut self, + InMessage::FetchPeers: Self::CastMsg, + handle: &GenServerHandle, + mut state: Self::State, + ) -> CastResponse { + send_after( + Duration::from_millis(10), + handle.clone(), + InMessage::FetchPeers, + ); + let node = state.node_iterator.next().await; + CastResponse::NoReply(state) + } +} From f7e7f346f3859530af66109c6db3c27b6063c0a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 12:58:03 -0300 Subject: [PATCH 06/33] wip: add "lookup worker" genserver --- crates/networking/p2p/discv4/server.rs | 1 + crates/networking/p2p/network.rs | 3 +- crates/networking/p2p/rlpx.rs | 1 + crates/networking/p2p/rlpx/lookup.rs | 70 ++++++++++++++++++++++ crates/networking/p2p/rlpx/server.rs | 83 ++++++++++++++++++++------ 5 files changed, 137 insertions(+), 21 deletions(-) create mode 100644 crates/networking/p2p/rlpx/lookup.rs diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 2763b0d007..dfce20a8e2 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -92,6 +92,7 @@ impl Discv4Server { Ok(()) } + /// Creates an iterator that yields nodes retrieved by doing random lookups. pub fn new_random_iterator(&self) -> Discv4NodeIterator { Discv4NodeIterator::new(self.ctx.clone(), self.udp_socket.clone()) } diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index e4dbe8200b..8689b8dc96 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -103,8 +103,7 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec) -> Result< .await .map_err(NetworkError::DiscoveryStart)?; - let iter = discovery.new_random_iterator(); - let _rlpx_server = RLPxServer::spawn(iter); + let _rlpx_server = RLPxServer::spawn(discovery); info!( "Listening for requests at {}", diff --git a/crates/networking/p2p/rlpx.rs b/crates/networking/p2p/rlpx.rs index d0e03c03dd..4d7afbc25f 100644 --- a/crates/networking/p2p/rlpx.rs +++ b/crates/networking/p2p/rlpx.rs @@ -1,6 +1,7 @@ pub mod connection; pub mod error; pub mod eth; +pub mod lookup; pub mod message; pub mod p2p; pub mod server; diff --git a/crates/networking/p2p/rlpx/lookup.rs b/crates/networking/p2p/rlpx/lookup.rs new file mode 100644 index 0000000000..69fa584d93 --- /dev/null +++ b/crates/networking/p2p/rlpx/lookup.rs @@ -0,0 +1,70 @@ +use spawned_concurrency::{ + error::GenServerError, + messages::Unused, + tasks::{CastResponse, GenServer, GenServerHandle}, +}; +use tracing::error; + +use crate::{ + discv4::server::Discv4NodeIterator, + rlpx::server::{InMessage, RLPxServer}, +}; + +#[derive(Debug, Clone)] +pub struct RLPxLookupServerState { + node_iterator: Discv4NodeIterator, + consumer: GenServerHandle, +} + +#[derive(Debug, Clone)] +pub struct RLPxLookupServer; + +impl RLPxLookupServer { + pub async fn spawn( + node_iterator: Discv4NodeIterator, + consumer: GenServerHandle, + ) -> Result, GenServerError> { + let state = RLPxLookupServerState { + node_iterator, + consumer, + }; + let mut handle = Self::start(state); + handle.cast(FetchPeers).await?; + Ok(handle) + } +} + +#[derive(Debug, Clone)] +struct FetchPeers; + +impl GenServer for RLPxLookupServer { + type CallMsg = Unused; + type CastMsg = FetchPeers; + type OutMsg = Unused; + type State = RLPxLookupServerState; + type Error = std::convert::Infallible; + + fn new() -> Self { + Self + } + + async fn handle_cast( + &mut self, + _msg: Self::CastMsg, + handle: &GenServerHandle, + mut state: Self::State, + ) -> CastResponse { + // Stop on error + if handle.clone().cast(FetchPeers).await.is_err() { + error!("RLPxLookupServer: failed to send message to self, stopping lookup"); + return CastResponse::Stop; + } + let node = state.node_iterator.next().await; + + if state.consumer.cast(InMessage::NewPeer(node)).await.is_err() { + error!("RLPxLookupServer: failed to send message to consumer, stopping lookup"); + return CastResponse::Stop; + } + CastResponse::NoReply(state) + } +} diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index c30c96d2ee..64cabd0164 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -1,44 +1,57 @@ use std::time::Duration; use spawned_concurrency::{ + error::GenServerError, messages::Unused, tasks::{CastResponse, GenServer, GenServerHandle, send_after}, }; +use tracing::{error, info}; -use crate::{discv4::server::Discv4NodeIterator, kademlia::PeerChannels}; +use crate::{ + discv4::server::Discv4Server, + kademlia::PeerChannels, + rlpx::{connection::server::RLPxConnection, lookup::RLPxLookupServer}, + types::Node, +}; + +const MAX_PEER_COUNT: usize = 50; +const MAX_CONCURRENT_LOOKUPS: usize = 4; #[derive(Debug, thiserror::Error)] pub enum RLPxServerError {} #[derive(Debug, Clone)] pub struct RLPxServerState { - node_iterator: Discv4NodeIterator, + discovery_server: Discv4Server, + lookup_servers: Vec>, connections: Vec, } #[derive(Clone)] pub enum InMessage { - FetchPeers, + NewPeer(Node), + BookKeeping, } #[derive(Clone, PartialEq)] pub enum OutMessage {} +#[derive(Debug, Clone)] pub struct RLPxServer; impl RLPxServer { - pub fn spawn(node_iterator: Discv4NodeIterator) -> GenServerHandle { + pub async fn spawn( + discovery_server: Discv4Server, + ) -> Result, GenServerError> { let state = RLPxServerState { - node_iterator, + discovery_server, + lookup_servers: vec![], connections: vec![], }; - let handle = Self::start(state); - send_after( - Duration::from_millis(100), - handle.clone(), - InMessage::FetchPeers, - ); - handle + // TODO: spawn multiple lookup servers + let mut handle = Self::start(state); + handle.cast(InMessage::BookKeeping).await?; + Ok(handle) } } @@ -55,16 +68,48 @@ impl GenServer for RLPxServer { async fn handle_cast( &mut self, - InMessage::FetchPeers: Self::CastMsg, + msg: Self::CastMsg, handle: &GenServerHandle, mut state: Self::State, ) -> CastResponse { - send_after( - Duration::from_millis(10), - handle.clone(), - InMessage::FetchPeers, - ); - let node = state.node_iterator.next().await; + match msg { + InMessage::NewPeer(node) => { + info!("Found new peer: {node}"); + // start_new_connection + RLPxConnection::spawn_as_initiator(state.discovery_server, &node); + state.connections.append(node); + } + InMessage::BookKeeping => { + info!("Performing bookkeeping"); + bookkeeping(handle, &mut state).await; + } + } CastResponse::NoReply(state) } } + +/// Perform periodic tasks +async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServerState) { + if state.connections.len() >= MAX_PEER_COUNT + || state.lookup_servers.len() >= MAX_CONCURRENT_LOOKUPS + { + return; + } + info!("Spawning new lookup servers"); + + for _ in 0..MAX_CONCURRENT_LOOKUPS { + let node_iterator = state.discovery_server.new_random_iterator(); + let Ok(new_lookup_server) = RLPxLookupServer::spawn(node_iterator, handle.clone()) + .await + .inspect_err(|e| error!("Failed to spawn lookup server: {e}")) + else { + continue; + }; + state.lookup_servers.push(new_lookup_server); + } + send_after( + Duration::from_secs(5), + handle.clone(), + InMessage::BookKeeping, + ); +} From 65a0b19d407556b20bd9cb3e582fa4512f3065c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 13:27:52 -0300 Subject: [PATCH 07/33] feat: look for peers in new RLPxServer --- crates/networking/p2p/network.rs | 2 +- crates/networking/p2p/rlpx/lookup.rs | 14 ++++++++++++-- crates/networking/p2p/rlpx/server.rs | 20 ++++++++++---------- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 8689b8dc96..12e4518500 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -103,7 +103,7 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec) -> Result< .await .map_err(NetworkError::DiscoveryStart)?; - let _rlpx_server = RLPxServer::spawn(discovery); + let _rlpx_server = RLPxServer::spawn(context.clone(), discovery); info!( "Listening for requests at {}", diff --git a/crates/networking/p2p/rlpx/lookup.rs b/crates/networking/p2p/rlpx/lookup.rs index 69fa584d93..a15f78d238 100644 --- a/crates/networking/p2p/rlpx/lookup.rs +++ b/crates/networking/p2p/rlpx/lookup.rs @@ -7,11 +7,16 @@ use tracing::error; use crate::{ discv4::server::Discv4NodeIterator, - rlpx::server::{InMessage, RLPxServer}, + network::P2PContext, + rlpx::{ + connection::server::RLPxConnection, + server::{InMessage, RLPxServer}, + }, }; #[derive(Debug, Clone)] pub struct RLPxLookupServerState { + ctx: P2PContext, node_iterator: Discv4NodeIterator, consumer: GenServerHandle, } @@ -21,10 +26,12 @@ pub struct RLPxLookupServer; impl RLPxLookupServer { pub async fn spawn( + ctx: P2PContext, node_iterator: Discv4NodeIterator, consumer: GenServerHandle, ) -> Result, GenServerError> { let state = RLPxLookupServerState { + ctx, node_iterator, consumer, }; @@ -35,7 +42,7 @@ impl RLPxLookupServer { } #[derive(Debug, Clone)] -struct FetchPeers; +pub struct FetchPeers; impl GenServer for RLPxLookupServer { type CallMsg = Unused; @@ -61,6 +68,9 @@ impl GenServer for RLPxLookupServer { } let node = state.node_iterator.next().await; + // Start a connection + RLPxConnection::spawn_as_initiator(state.ctx.clone(), &node).await; + if state.consumer.cast(InMessage::NewPeer(node)).await.is_err() { error!("RLPxLookupServer: failed to send message to consumer, stopping lookup"); return CastResponse::Stop; diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index 64cabd0164..e16c823144 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -8,10 +8,7 @@ use spawned_concurrency::{ use tracing::{error, info}; use crate::{ - discv4::server::Discv4Server, - kademlia::PeerChannels, - rlpx::{connection::server::RLPxConnection, lookup::RLPxLookupServer}, - types::Node, + discv4::server::Discv4Server, network::P2PContext, rlpx::lookup::RLPxLookupServer, types::Node, }; const MAX_PEER_COUNT: usize = 50; @@ -22,9 +19,10 @@ pub enum RLPxServerError {} #[derive(Debug, Clone)] pub struct RLPxServerState { + ctx: P2PContext, discovery_server: Discv4Server, lookup_servers: Vec>, - connections: Vec, + connections: Vec, } #[derive(Clone)] @@ -41,9 +39,11 @@ pub struct RLPxServer; impl RLPxServer { pub async fn spawn( + ctx: P2PContext, discovery_server: Discv4Server, ) -> Result, GenServerError> { let state = RLPxServerState { + ctx, discovery_server, lookup_servers: vec![], connections: vec![], @@ -76,8 +76,7 @@ impl GenServer for RLPxServer { InMessage::NewPeer(node) => { info!("Found new peer: {node}"); // start_new_connection - RLPxConnection::spawn_as_initiator(state.discovery_server, &node); - state.connections.append(node); + state.connections.push(node); } InMessage::BookKeeping => { info!("Performing bookkeeping"); @@ -99,9 +98,10 @@ async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServe for _ in 0..MAX_CONCURRENT_LOOKUPS { let node_iterator = state.discovery_server.new_random_iterator(); - let Ok(new_lookup_server) = RLPxLookupServer::spawn(node_iterator, handle.clone()) - .await - .inspect_err(|e| error!("Failed to spawn lookup server: {e}")) + let Ok(new_lookup_server) = + RLPxLookupServer::spawn(state.ctx.clone(), node_iterator, handle.clone()) + .await + .inspect_err(|e| error!("Failed to spawn lookup server: {e}")) else { continue; }; From 896bda1538d2beb2528711a9eca15fd9535b1104 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 13:44:21 -0300 Subject: [PATCH 08/33] fix: poll future --- crates/networking/p2p/network.rs | 7 ++++++- crates/networking/p2p/rlpx/server.rs | 1 + 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 12e4518500..f043a36acb 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -12,6 +12,7 @@ use k256::{ ecdsa::SigningKey, elliptic_curve::{PublicKey, sec1::ToEncodedPoint}, }; +use spawned_concurrency::error::GenServerError; use std::{io, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpSocket}, @@ -33,6 +34,7 @@ pub fn peer_table(node_id: H256) -> Arc> { #[derive(Debug)] pub enum NetworkError { DiscoveryStart(DiscoveryError), + RLPxStart(GenServerError), } #[derive(Clone, Debug)] @@ -103,7 +105,10 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec) -> Result< .await .map_err(NetworkError::DiscoveryStart)?; - let _rlpx_server = RLPxServer::spawn(context.clone(), discovery); + info!("Starting P2P service"); + RLPxServer::spawn(context.clone(), discovery) + .await + .map_err(NetworkError::RLPxStart)?; info!( "Listening for requests at {}", diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index e16c823144..01b285b5bd 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -72,6 +72,7 @@ impl GenServer for RLPxServer { handle: &GenServerHandle, mut state: Self::State, ) -> CastResponse { + info!("Received cast message"); match msg { InMessage::NewPeer(node) => { info!("Found new peer: {node}"); From de71b8e97d2bc83240e796a981b424984d2c9e28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 15:17:18 -0300 Subject: [PATCH 09/33] feat: stop looking for peers once limit is reached --- crates/networking/p2p/rlpx/lookup.rs | 31 +++++++++++++++++++++------- crates/networking/p2p/rlpx/server.rs | 22 +++++++++++--------- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/crates/networking/p2p/rlpx/lookup.rs b/crates/networking/p2p/rlpx/lookup.rs index a15f78d238..f87e01ada3 100644 --- a/crates/networking/p2p/rlpx/lookup.rs +++ b/crates/networking/p2p/rlpx/lookup.rs @@ -10,15 +10,17 @@ use crate::{ network::P2PContext, rlpx::{ connection::server::RLPxConnection, - server::{InMessage, RLPxServer}, + server::{RLPxServer, RLPxServerHandle}, }, }; +pub type RLPxLookupServerHandle = GenServerHandle; + #[derive(Debug, Clone)] pub struct RLPxLookupServerState { ctx: P2PContext, node_iterator: Discv4NodeIterator, - consumer: GenServerHandle, + consumer: RLPxServerHandle, } #[derive(Debug, Clone)] @@ -36,17 +38,24 @@ impl RLPxLookupServer { consumer, }; let mut handle = Self::start(state); - handle.cast(FetchPeers).await?; + handle.cast(InMessage::FetchPeers).await?; Ok(handle) } + + pub async fn stop(handle: &mut RLPxLookupServerHandle) -> Result<(), GenServerError> { + handle.cast(InMessage::Stop).await + } } #[derive(Debug, Clone)] -pub struct FetchPeers; +pub enum InMessage { + FetchPeers, + Stop, +} impl GenServer for RLPxLookupServer { type CallMsg = Unused; - type CastMsg = FetchPeers; + type CastMsg = InMessage; type OutMsg = Unused; type State = RLPxLookupServerState; type Error = std::convert::Infallible; @@ -57,12 +66,15 @@ impl GenServer for RLPxLookupServer { async fn handle_cast( &mut self, - _msg: Self::CastMsg, + msg: Self::CastMsg, handle: &GenServerHandle, mut state: Self::State, ) -> CastResponse { + if matches!(msg, InMessage::Stop) { + return CastResponse::Stop; + } // Stop on error - if handle.clone().cast(FetchPeers).await.is_err() { + if handle.clone().cast(InMessage::FetchPeers).await.is_err() { error!("RLPxLookupServer: failed to send message to self, stopping lookup"); return CastResponse::Stop; } @@ -71,7 +83,10 @@ impl GenServer for RLPxLookupServer { // Start a connection RLPxConnection::spawn_as_initiator(state.ctx.clone(), &node).await; - if state.consumer.cast(InMessage::NewPeer(node)).await.is_err() { + if RLPxServer::add_peer(&mut state.consumer, node) + .await + .is_err() + { error!("RLPxLookupServer: failed to send message to consumer, stopping lookup"); return CastResponse::Stop; } diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index 01b285b5bd..b4e7bbc3c0 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -14,8 +14,7 @@ use crate::{ const MAX_PEER_COUNT: usize = 50; const MAX_CONCURRENT_LOOKUPS: usize = 4; -#[derive(Debug, thiserror::Error)] -pub enum RLPxServerError {} +pub type RLPxServerHandle = GenServerHandle; #[derive(Debug, Clone)] pub struct RLPxServerState { @@ -53,6 +52,10 @@ impl RLPxServer { handle.cast(InMessage::BookKeeping).await?; Ok(handle) } + + pub async fn add_peer(handle: &mut RLPxServerHandle, peer: Node) -> Result<(), GenServerError> { + handle.cast(InMessage::NewPeer(peer)).await + } } impl GenServer for RLPxServer { @@ -60,7 +63,7 @@ impl GenServer for RLPxServer { type CastMsg = InMessage; type OutMsg = OutMessage; type State = RLPxServerState; - type Error = RLPxServerError; + type Error = std::convert::Infallible; fn new() -> Self { Self @@ -75,12 +78,9 @@ impl GenServer for RLPxServer { info!("Received cast message"); match msg { InMessage::NewPeer(node) => { - info!("Found new peer: {node}"); - // start_new_connection state.connections.push(node); } InMessage::BookKeeping => { - info!("Performing bookkeeping"); bookkeeping(handle, &mut state).await; } } @@ -90,10 +90,12 @@ impl GenServer for RLPxServer { /// Perform periodic tasks async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServerState) { - if state.connections.len() >= MAX_PEER_COUNT - || state.lookup_servers.len() >= MAX_CONCURRENT_LOOKUPS - { - return; + if state.connections.len() >= MAX_PEER_COUNT { + for mut server in state.lookup_servers.drain(..) { + let _ = RLPxLookupServer::stop(&mut server) + .await + .inspect_err(|e| error!("Failed to stop lookup server: {e}")); + } } info!("Spawning new lookup servers"); From 6827c551f4fa7f088cedb845675775de3ecaaf12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 15:22:23 -0300 Subject: [PATCH 10/33] chore: remove spammy log --- crates/networking/p2p/rlpx/server.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index b4e7bbc3c0..eef92ce833 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -75,7 +75,6 @@ impl GenServer for RLPxServer { handle: &GenServerHandle, mut state: Self::State, ) -> CastResponse { - info!("Received cast message"); match msg { InMessage::NewPeer(node) => { state.connections.push(node); From 154d6b9ac82917957a27995e0313adbd5c604aeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 15:23:41 -0300 Subject: [PATCH 11/33] fix: add missing early return --- crates/networking/p2p/rlpx/server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index eef92ce833..df7c0ee4a4 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -95,6 +95,7 @@ async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServe .await .inspect_err(|e| error!("Failed to stop lookup server: {e}")); } + return; } info!("Spawning new lookup servers"); From 0885f019b77a3df18a6c48c5ac132007bd7a1a4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 15:44:34 -0300 Subject: [PATCH 12/33] fix: avoid early return in bookkeeping function --- crates/networking/p2p/rlpx/server.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index df7c0ee4a4..9e21d86edd 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -95,20 +95,20 @@ async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServe .await .inspect_err(|e| error!("Failed to stop lookup server: {e}")); } - return; - } - info!("Spawning new lookup servers"); + } else { + info!("Spawning new lookup servers"); - for _ in 0..MAX_CONCURRENT_LOOKUPS { - let node_iterator = state.discovery_server.new_random_iterator(); - let Ok(new_lookup_server) = - RLPxLookupServer::spawn(state.ctx.clone(), node_iterator, handle.clone()) - .await - .inspect_err(|e| error!("Failed to spawn lookup server: {e}")) - else { - continue; - }; - state.lookup_servers.push(new_lookup_server); + for _ in 0..MAX_CONCURRENT_LOOKUPS { + let node_iterator = state.discovery_server.new_random_iterator(); + let Ok(new_lookup_server) = + RLPxLookupServer::spawn(state.ctx.clone(), node_iterator, handle.clone()) + .await + .inspect_err(|e| error!("Failed to spawn lookup server: {e}")) + else { + continue; + }; + state.lookup_servers.push(new_lookup_server); + } } send_after( Duration::from_secs(5), From d3797fb9c93652980cfb795a06e50c08417e862b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 16:23:39 -0300 Subject: [PATCH 13/33] feat: prune peers according to supported capabilities --- crates/networking/p2p/rlpx/lookup.rs | 13 +++++++++++++ crates/networking/p2p/rlpx/server.rs | 27 ++++++++++++++++++++++++--- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/crates/networking/p2p/rlpx/lookup.rs b/crates/networking/p2p/rlpx/lookup.rs index f87e01ada3..3a7d652dc8 100644 --- a/crates/networking/p2p/rlpx/lookup.rs +++ b/crates/networking/p2p/rlpx/lookup.rs @@ -79,7 +79,20 @@ impl GenServer for RLPxLookupServer { return CastResponse::Stop; } let node = state.node_iterator.next().await; + let node_id = node.node_id(); + let is_connected = state + .ctx + .table + .lock() + .await + .get_by_node_id(node_id) + .map(|p| p.is_connected) + .unwrap_or(false); + // If we already have a connection to this node, we don't need to start a new one + if is_connected { + return CastResponse::NoReply(state); + } // Start a connection RLPxConnection::spawn_as_initiator(state.ctx.clone(), &node).await; diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index 9e21d86edd..272363ec04 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use ethrex_common::H256; use spawned_concurrency::{ error::GenServerError, messages::Unused, @@ -8,7 +9,10 @@ use spawned_concurrency::{ use tracing::{error, info}; use crate::{ - discv4::server::Discv4Server, network::P2PContext, rlpx::lookup::RLPxLookupServer, types::Node, + discv4::server::Discv4Server, + network::P2PContext, + rlpx::{lookup::RLPxLookupServer, p2p::SUPPORTED_SNAP_CAPABILITIES}, + types::Node, }; const MAX_PEER_COUNT: usize = 50; @@ -21,7 +25,7 @@ pub struct RLPxServerState { ctx: P2PContext, discovery_server: Discv4Server, lookup_servers: Vec>, - connections: Vec, + connections: Vec, } #[derive(Clone)] @@ -77,7 +81,7 @@ impl GenServer for RLPxServer { ) -> CastResponse { match msg { InMessage::NewPeer(node) => { - state.connections.push(node); + state.connections.push(node.node_id()); } InMessage::BookKeeping => { bookkeeping(handle, &mut state).await; @@ -89,6 +93,8 @@ impl GenServer for RLPxServer { /// Perform periodic tasks async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServerState) { + prune_peers(state).await; + if state.connections.len() >= MAX_PEER_COUNT { for mut server in state.lookup_servers.drain(..) { let _ = RLPxLookupServer::stop(&mut server) @@ -116,3 +122,18 @@ async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServe InMessage::BookKeeping, ); } + +async fn prune_peers(state: &mut RLPxServerState) { + let table = state.ctx.table.lock().await; + state.connections = state + .connections + .iter() + .flat_map(|node_id| table.get_by_node_id(*node_id)) + .filter(|peer| { + peer.supported_capabilities + .iter() + .any(|c| SUPPORTED_SNAP_CAPABILITIES.contains(c)) + }) + .map(|peer| peer.node.node_id()) + .collect(); +} From 1a5b1a8e1e20245dccd61146e43a314844ac51e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 16:26:21 -0300 Subject: [PATCH 14/33] chore: comment out rlpx connection in discv4 server --- crates/networking/p2p/discv4/server.rs | 74 +++++++++++++------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index dfce20a8e2..3928252d39 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -200,7 +200,7 @@ impl Discv4Server { )); } - // all validations went well, mark as answered and start a rlpx connection + // all validations went well, mark as answered self.ctx .table .lock() @@ -217,22 +217,22 @@ impl Discv4Server { } } - // We won't initiate a connection if we are already connected. - // This will typically be the case when revalidating a node. - if peer.is_connected { - return Ok(()); - } + // // We won't initiate a connection if we are already connected. + // // This will typically be the case when revalidating a node. + // if peer.is_connected { + // return Ok(()); + // } - // We won't initiate a connection if we have reached the maximum number of peers. - let active_connections = { - let table = self.ctx.table.lock().await; - table.count_connected_peers() - }; - if active_connections >= MAX_PEERS_TCP_CONNECTIONS { - return Ok(()); - } + // // We won't initiate a connection if we have reached the maximum number of peers. + // let active_connections = { + // let table = self.ctx.table.lock().await; + // table.count_connected_peers() + // }; + // if active_connections >= MAX_PEERS_TCP_CONNECTIONS { + // return Ok(()); + // } - RLPxConnection::spawn_as_initiator(self.ctx.clone(), &peer.node).await; + // RLPxConnection::spawn_as_initiator(self.ctx.clone(), &peer.node).await; Ok(()) } @@ -504,28 +504,28 @@ impl Discv4Server { peer.node.public_key ); } - let peer = { - let table = self.ctx.table.lock().await; - table.get_by_node_id(packet.get_node_id()).cloned() - }; - let Some(peer) = peer else { - return Err(DiscoveryError::InvalidMessage("not known node".into())); - }; - // This will typically be the case when revalidating a node. - if peer.is_connected { - return Ok(()); - } - - // We won't initiate a connection if we have reached the maximum number of peers. - let active_connections = { - let table = self.ctx.table.lock().await; - table.count_connected_peers() - }; - if active_connections >= MAX_PEERS_TCP_CONNECTIONS { - return Ok(()); - } - - RLPxConnection::spawn_as_initiator(self.ctx.clone(), &peer.node).await; + // let peer = { + // let table = self.ctx.table.lock().await; + // table.get_by_node_id(packet.get_node_id()).cloned() + // }; + // let Some(peer) = peer else { + // return Err(DiscoveryError::InvalidMessage("not known node".into())); + // }; + // // This will typically be the case when revalidating a node. + // if peer.is_connected { + // return Ok(()); + // } + + // // We won't initiate a connection if we have reached the maximum number of peers. + // let active_connections = { + // let table = self.ctx.table.lock().await; + // table.count_connected_peers() + // }; + // if active_connections >= MAX_PEERS_TCP_CONNECTIONS { + // return Ok(()); + // } + + // RLPxConnection::spawn_as_initiator(self.ctx.clone(), &peer.node).await; Ok(()) } From 8df43eeb03448130ce462c01f889f685f950e7da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 16:39:06 -0300 Subject: [PATCH 15/33] fix: don't spawn lookup servers if we already spawned them --- crates/networking/p2p/rlpx/server.rs | 53 ++++++++++++++++------------ 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index 272363ec04..9cf043dbfa 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -93,34 +93,43 @@ impl GenServer for RLPxServer { /// Perform periodic tasks async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServerState) { + send_after( + Duration::from_secs(5), + handle.clone(), + InMessage::BookKeeping, + ); prune_peers(state).await; + // Stop looking for peers if we have enough connections if state.connections.len() >= MAX_PEER_COUNT { - for mut server in state.lookup_servers.drain(..) { - let _ = RLPxLookupServer::stop(&mut server) - .await - .inspect_err(|e| error!("Failed to stop lookup server: {e}")); - } - } else { + stop_lookup_servers(state).await; + // Otherwise, spawn the lookup servers + } else if state.lookup_servers.is_empty() { info!("Spawning new lookup servers"); + spawn_lookup_servers(state, handle).await; + } +} - for _ in 0..MAX_CONCURRENT_LOOKUPS { - let node_iterator = state.discovery_server.new_random_iterator(); - let Ok(new_lookup_server) = - RLPxLookupServer::spawn(state.ctx.clone(), node_iterator, handle.clone()) - .await - .inspect_err(|e| error!("Failed to spawn lookup server: {e}")) - else { - continue; - }; - state.lookup_servers.push(new_lookup_server); - } +async fn spawn_lookup_servers(state: &mut RLPxServerState, handle: &GenServerHandle) { + for _ in 0..MAX_CONCURRENT_LOOKUPS { + let node_iterator = state.discovery_server.new_random_iterator(); + let Ok(new_lookup_server) = + RLPxLookupServer::spawn(state.ctx.clone(), node_iterator, handle.clone()) + .await + .inspect_err(|e| error!("Failed to spawn lookup server: {e}")) + else { + continue; + }; + state.lookup_servers.push(new_lookup_server); + } +} + +async fn stop_lookup_servers(state: &mut RLPxServerState) { + for mut server in state.lookup_servers.drain(..) { + let _ = RLPxLookupServer::stop(&mut server) + .await + .inspect_err(|e| error!("Failed to stop lookup server: {e}")); } - send_after( - Duration::from_secs(5), - handle.clone(), - InMessage::BookKeeping, - ); } async fn prune_peers(state: &mut RLPxServerState) { From bbe5d417c4b5f5cb15f364add6b871707c0bca17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 17:02:55 -0300 Subject: [PATCH 16/33] fix: don't fail if peer has unknown capabilities --- crates/networking/p2p/rlpx/p2p.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index f7b754bb80..07aaf8d9a3 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -45,6 +45,13 @@ impl Capability { version, } } + + pub fn other(version: u8) -> Self { + Capability { + protocol: "other", + version, + } + } } impl RLPEncode for Capability { @@ -64,7 +71,7 @@ impl RLPDecode for Capability { "eth" => Ok((Capability::eth(version), rest)), "p2p" => Ok((Capability::p2p(version), rest)), "snap" => Ok((Capability::snap(version), rest)), - _ => Err(RLPDecodeError::MalformedData), + _ => Ok((Capability::other(version), rest)), } } } From 50ab0a53727c11dd551907decb53c511d6f9cc95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 17:02:55 -0300 Subject: [PATCH 17/33] fix: don't fail if peer has unknown capabilities --- crates/networking/p2p/rlpx/p2p.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index f7b754bb80..07aaf8d9a3 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -45,6 +45,13 @@ impl Capability { version, } } + + pub fn other(version: u8) -> Self { + Capability { + protocol: "other", + version, + } + } } impl RLPEncode for Capability { @@ -64,7 +71,7 @@ impl RLPDecode for Capability { "eth" => Ok((Capability::eth(version), rest)), "p2p" => Ok((Capability::p2p(version), rest)), "snap" => Ok((Capability::snap(version), rest)), - _ => Err(RLPDecodeError::MalformedData), + _ => Ok((Capability::other(version), rest)), } } } From 8098f3a39a6d2644bb666008fff83670936d3bc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 17:59:02 -0300 Subject: [PATCH 18/33] refactor: use small strings for capability.protocol --- .../networking/p2p/rlpx/connection/server.rs | 2 +- crates/networking/p2p/rlpx/p2p.rs | 57 +++++++++++++------ crates/networking/rpc/admin/peers.rs | 2 +- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 51bfd6f066..9f46d23dc2 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -534,7 +534,7 @@ where // Check if we have any capability in common and store the highest version for cap in &hello_message.capabilities { - match cap.protocol { + match cap.protocol() { "eth" => { if SUPPORTED_ETH_CAPABILITIES.contains(cap) && cap.version > negotiated_eth_version diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index 07aaf8d9a3..0c3c3a40e6 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -18,39 +18,47 @@ pub const SUPPORTED_ETH_CAPABILITIES: [Capability; 1] = [Capability::eth(68)]; pub const SUPPORTED_SNAP_CAPABILITIES: [Capability; 1] = [Capability::snap(1)]; pub const SUPPORTED_P2P_CAPABILITIES: [Capability; 1] = [Capability::p2p(5)]; +const fn pad_right(input: &[u8; N]) -> [u8; 8] { + let mut padded = [0_u8; 8]; + let mut i = 0; + while i < input.len() { + padded[i] = input[i]; + i += 1; + } + padded +} + #[derive(Debug, Clone, PartialEq)] pub struct Capability { - pub protocol: &'static str, + protocol: [u8; 8], pub version: u8, } impl Capability { pub const fn eth(version: u8) -> Self { Capability { - protocol: "eth", + protocol: pad_right(b"eth"), version, } } pub const fn p2p(version: u8) -> Self { Capability { - protocol: "p2p", + protocol: pad_right(b"p2p"), version, } } pub const fn snap(version: u8) -> Self { Capability { - protocol: "snap", + protocol: pad_right(b"snap"), version, } } - pub fn other(version: u8) -> Self { - Capability { - protocol: "other", - version, - } + pub fn protocol(&self) -> &str { + let len = self.protocol.iter().position(|c| c != &b'\0').unwrap_or(8); + str::from_utf8(&self.protocol[..len]).expect("value parsed as utf8 in RLPDecode") } } @@ -65,14 +73,14 @@ impl RLPEncode for Capability { impl RLPDecode for Capability { fn decode_unfinished(rlp: &[u8]) -> Result<(Self, &[u8]), RLPDecodeError> { - let (protocol, rest) = String::decode_unfinished(&rlp[1..])?; - let (version, rest) = u8::decode_unfinished(rest)?; - match protocol.as_str() { - "eth" => Ok((Capability::eth(version), rest)), - "p2p" => Ok((Capability::p2p(version), rest)), - "snap" => Ok((Capability::snap(version), rest)), - _ => Ok((Capability::other(version), rest)), + let (protocol_name, rest) = String::decode_unfinished(&rlp[1..])?; + if protocol_name.len() > 8 { + return Err(RLPDecodeError::InvalidLength); } + let (version, rest) = u8::decode_unfinished(rest)?; + let mut protocol = [0; 8]; + protocol[..protocol_name.len()].copy_from_slice(protocol_name.as_bytes()); + Ok((Capability { protocol, version }, rest)) } } @@ -81,7 +89,7 @@ impl Serialize for Capability { where S: serde::Serializer, { - serializer.serialize_str(&format!("{}/{}", self.protocol, self.version)) + serializer.serialize_str(&format!("{}/{}", "self.protocol", self.version)) } } @@ -327,3 +335,18 @@ impl RLPxMessage for PongMessage { Ok(Self {}) } } + +#[cfg(test)] +mod tests { + use ethrex_rlp::decode::RLPDecode; + + use crate::rlpx::p2p::Capability; + + #[test] + fn test_decode_capability() { + let encoded_bytes = &[197_u8, 131, b'e', b't', b'h', 8]; + let decoded = Capability::decode(encoded_bytes).unwrap(); + + assert_eq!(decoded, Capability::eth(8)); + } +} diff --git a/crates/networking/rpc/admin/peers.rs b/crates/networking/rpc/admin/peers.rs index da646a2632..2a332eb496 100644 --- a/crates/networking/rpc/admin/peers.rs +++ b/crates/networking/rpc/admin/peers.rs @@ -49,7 +49,7 @@ impl From for RpcPeer { let mut protocols = Protocols::default(); // Fill protocol data for cap in &peer.supported_capabilities { - match cap.protocol { + match cap.protocol() { "p2p" => { protocols.p2p = Some(ProtocolData { version: cap.version, From 94ef03679ff44415d955f0dfa6b0bc027c851e55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 18:04:02 -0300 Subject: [PATCH 19/33] fix: use static constructors in tests --- crates/networking/rpc/admin/peers.rs | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/crates/networking/rpc/admin/peers.rs b/crates/networking/rpc/admin/peers.rs index 2a332eb496..15cda5aecb 100644 --- a/crates/networking/rpc/admin/peers.rs +++ b/crates/networking/rpc/admin/peers.rs @@ -111,16 +111,7 @@ mod tests { // Set node capabilities and other relevant data peer.is_connected = true; peer.is_connection_inbound = false; - peer.supported_capabilities = vec![ - Capability { - protocol: "eth", - version: 68, - }, - Capability { - protocol: "snap", - version: 1, - }, - ]; + peer.supported_capabilities = vec![Capability::eth(68), Capability::snap(1)]; peer.node.version = Some("ethrex/test".to_string()); // The first serialized peer shown in geth's documentation example: https://geth.ethereum.org/docs/interacting-with-geth/rpc/ns-admin#admin-peers // The fields "localAddress", "static", "trusted" and "name" were removed as we do not have the necessary information to show them From 1b3605efe3faa9919724e7a9c99ca289cba16781 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 18:05:10 -0300 Subject: [PATCH 20/33] fix: use new function in serialize impl --- crates/networking/p2p/rlpx/p2p.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index 0c3c3a40e6..fc4b6b4c9d 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -89,7 +89,7 @@ impl Serialize for Capability { where S: serde::Serializer, { - serializer.serialize_str(&format!("{}/{}", "self.protocol", self.version)) + serializer.serialize_str(&format!("{}/{}", self.protocol(), self.version)) } } From 68fcef7e3b64a6a18bda6ac88586dd0bc3051a82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 18:20:10 -0300 Subject: [PATCH 21/33] fix: invert condition --- crates/networking/p2p/rlpx/p2p.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index fc4b6b4c9d..65319a03b7 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -57,7 +57,7 @@ impl Capability { } pub fn protocol(&self) -> &str { - let len = self.protocol.iter().position(|c| c != &b'\0').unwrap_or(8); + let len = self.protocol.iter().position(|c| c == &b'\0').unwrap_or(8); str::from_utf8(&self.protocol[..len]).expect("value parsed as utf8 in RLPDecode") } } @@ -349,4 +349,11 @@ mod tests { assert_eq!(decoded, Capability::eth(8)); } + + #[test] + fn test_protocol() { + let capability = Capability::eth(68); + + assert_eq!(capability.protocol(), "eth"); + } } From d6dc3628ba0b0748eeb064e2eaa134c687162d95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 18:40:18 -0300 Subject: [PATCH 22/33] fix: assert input is less than 8 bytes long --- crates/networking/p2p/rlpx/p2p.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index 65319a03b7..6ec007fc26 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -18,7 +18,11 @@ pub const SUPPORTED_ETH_CAPABILITIES: [Capability; 1] = [Capability::eth(68)]; pub const SUPPORTED_SNAP_CAPABILITIES: [Capability; 1] = [Capability::snap(1)]; pub const SUPPORTED_P2P_CAPABILITIES: [Capability; 1] = [Capability::p2p(5)]; +// Pads the input array to the right with zeros to ensure it is 8 bytes long. +// Panics if the input is longer than 8 bytes. const fn pad_right(input: &[u8; N]) -> [u8; 8] { + assert!(N <= 8, "Input array must be 8 bytes or less"); + let mut padded = [0_u8; 8]; let mut i = 0; while i < input.len() { From 5e5480732861e7511275310f8436d4946cbd35f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 18:59:06 -0300 Subject: [PATCH 23/33] fix: encode protocol correctly --- crates/networking/p2p/rlpx/p2p.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index 6ec007fc26..2964c5474b 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -69,7 +69,7 @@ impl Capability { impl RLPEncode for Capability { fn encode(&self, buf: &mut dyn BufMut) { Encoder::new(buf) - .encode_field(&self.protocol) + .encode_field(&self.protocol()) .encode_field(&self.version) .finish(); } @@ -342,10 +342,18 @@ impl RLPxMessage for PongMessage { #[cfg(test)] mod tests { - use ethrex_rlp::decode::RLPDecode; + use ethrex_rlp::{decode::RLPDecode, encode::RLPEncode}; use crate::rlpx::p2p::Capability; + #[test] + fn test_encode_capability() { + let capability = Capability::eth(8); + let encoded = capability.encode_to_vec(); + + assert_eq!(&encoded, &[197_u8, 131, b'e', b't', b'h', 8]); + } + #[test] fn test_decode_capability() { let encoded_bytes = &[197_u8, 131, b'e', b't', b'h', 8]; From 91ed06f7883ad518cf2730b6c870328c123e2728 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 19:41:51 -0300 Subject: [PATCH 24/33] fix: initialize table entries before connecting --- crates/networking/p2p/discv4/lookup.rs | 4 +-- crates/networking/p2p/discv4/server.rs | 43 +------------------------- crates/networking/p2p/rlpx/lookup.rs | 24 +++++++------- crates/networking/p2p/rlpx/server.rs | 2 +- 4 files changed, 17 insertions(+), 56 deletions(-) diff --git a/crates/networking/p2p/discv4/lookup.rs b/crates/networking/p2p/discv4/lookup.rs index 54c68f4e24..452774e5c5 100644 --- a/crates/networking/p2p/discv4/lookup.rs +++ b/crates/networking/p2p/discv4/lookup.rs @@ -169,7 +169,7 @@ impl Discv4LookupHandler { nodes_to_ask: &Vec, ) -> (Vec, u32) { // send FIND_NODE as much as three times - let alpha = 3; + let concurrency_factor = 3; let mut queries = 0; let mut nodes = vec![]; @@ -208,7 +208,7 @@ impl Discv4LookupHandler { } } - if queries == alpha { + if queries == concurrency_factor { break; } } diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 3928252d39..ec8b0e6a18 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -11,7 +11,7 @@ use super::{ use crate::{ kademlia::{KademliaTable, MAX_NODES_PER_BUCKET}, network::P2PContext, - rlpx::{connection::server::RLPxConnection, utils::node_id}, + rlpx::utils::node_id, types::{Endpoint, Node}, }; use ethrex_common::H256; @@ -216,24 +216,6 @@ impl Discv4Server { return Ok(()); } } - - // // We won't initiate a connection if we are already connected. - // // This will typically be the case when revalidating a node. - // if peer.is_connected { - // return Ok(()); - // } - - // // We won't initiate a connection if we have reached the maximum number of peers. - // let active_connections = { - // let table = self.ctx.table.lock().await; - // table.count_connected_peers() - // }; - // if active_connections >= MAX_PEERS_TCP_CONNECTIONS { - // return Ok(()); - // } - - // RLPxConnection::spawn_as_initiator(self.ctx.clone(), &peer.node).await; - Ok(()) } Message::FindNode(msg) => { @@ -504,29 +486,6 @@ impl Discv4Server { peer.node.public_key ); } - // let peer = { - // let table = self.ctx.table.lock().await; - // table.get_by_node_id(packet.get_node_id()).cloned() - // }; - // let Some(peer) = peer else { - // return Err(DiscoveryError::InvalidMessage("not known node".into())); - // }; - // // This will typically be the case when revalidating a node. - // if peer.is_connected { - // return Ok(()); - // } - - // // We won't initiate a connection if we have reached the maximum number of peers. - // let active_connections = { - // let table = self.ctx.table.lock().await; - // table.count_connected_peers() - // }; - // if active_connections >= MAX_PEERS_TCP_CONNECTIONS { - // return Ok(()); - // } - - // RLPxConnection::spawn_as_initiator(self.ctx.clone(), &peer.node).await; - Ok(()) } } diff --git a/crates/networking/p2p/rlpx/lookup.rs b/crates/networking/p2p/rlpx/lookup.rs index 3a7d652dc8..9462303faa 100644 --- a/crates/networking/p2p/rlpx/lookup.rs +++ b/crates/networking/p2p/rlpx/lookup.rs @@ -80,18 +80,20 @@ impl GenServer for RLPxLookupServer { } let node = state.node_iterator.next().await; let node_id = node.node_id(); - let is_connected = state - .ctx - .table - .lock() - .await - .get_by_node_id(node_id) - .map(|p| p.is_connected) - .unwrap_or(false); + // Get peer status and mark as connected + { + let mut table = state.ctx.table.lock().await; + table.insert_node_forced(node.clone()); + let node = table + .get_by_node_id_mut(node_id) + .expect("we just inserted this node"); - // If we already have a connection to this node, we don't need to start a new one - if is_connected { - return CastResponse::NoReply(state); + // If we already have a connection to this node, we don't need to start a new one + if node.is_connected { + drop(table); + return CastResponse::NoReply(state); + } + node.is_connected = true; } // Start a connection RLPxConnection::spawn_as_initiator(state.ctx.clone(), &node).await; diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index 9cf043dbfa..0037c4779e 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -16,7 +16,7 @@ use crate::{ }; const MAX_PEER_COUNT: usize = 50; -const MAX_CONCURRENT_LOOKUPS: usize = 4; +const MAX_CONCURRENT_LOOKUPS: usize = 16; pub type RLPxServerHandle = GenServerHandle; From e0916168d7c4deff7b20a1f6afad4e74cedac906 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 19:46:38 -0300 Subject: [PATCH 25/33] fix: make force push ignore replacements --- crates/networking/p2p/kademlia.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/networking/p2p/kademlia.rs b/crates/networking/p2p/kademlia.rs index 4bf794cc89..70f6f34fc5 100644 --- a/crates/networking/p2p/kademlia.rs +++ b/crates/networking/p2p/kademlia.rs @@ -105,7 +105,7 @@ impl KademliaTable { .replacements .iter() .any(|p| p.node.node_id() == node.node_id()); - if peer_already_in_replacements { + if peer_already_in_replacements && !force_push { return (None, false); } From 264df3dbbf706dff809caa67598729a137101090 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 19:54:57 -0300 Subject: [PATCH 26/33] fix: use Kademlia table to fetch server state --- crates/networking/p2p/rlpx/lookup.rs | 21 ++--------- crates/networking/p2p/rlpx/server.rs | 52 +++++++++++++++------------- 2 files changed, 30 insertions(+), 43 deletions(-) diff --git a/crates/networking/p2p/rlpx/lookup.rs b/crates/networking/p2p/rlpx/lookup.rs index 9462303faa..3eea9d41e6 100644 --- a/crates/networking/p2p/rlpx/lookup.rs +++ b/crates/networking/p2p/rlpx/lookup.rs @@ -8,10 +8,7 @@ use tracing::error; use crate::{ discv4::server::Discv4NodeIterator, network::P2PContext, - rlpx::{ - connection::server::RLPxConnection, - server::{RLPxServer, RLPxServerHandle}, - }, + rlpx::{connection::server::RLPxConnection, server::RLPxServerHandle}, }; pub type RLPxLookupServerHandle = GenServerHandle; @@ -20,7 +17,6 @@ pub type RLPxLookupServerHandle = GenServerHandle; pub struct RLPxLookupServerState { ctx: P2PContext, node_iterator: Discv4NodeIterator, - consumer: RLPxServerHandle, } #[derive(Debug, Clone)] @@ -30,13 +26,9 @@ impl RLPxLookupServer { pub async fn spawn( ctx: P2PContext, node_iterator: Discv4NodeIterator, - consumer: GenServerHandle, + _consumer: RLPxServerHandle, ) -> Result, GenServerError> { - let state = RLPxLookupServerState { - ctx, - node_iterator, - consumer, - }; + let state = RLPxLookupServerState { ctx, node_iterator }; let mut handle = Self::start(state); handle.cast(InMessage::FetchPeers).await?; Ok(handle) @@ -98,13 +90,6 @@ impl GenServer for RLPxLookupServer { // Start a connection RLPxConnection::spawn_as_initiator(state.ctx.clone(), &node).await; - if RLPxServer::add_peer(&mut state.consumer, node) - .await - .is_err() - { - error!("RLPxLookupServer: failed to send message to consumer, stopping lookup"); - return CastResponse::Stop; - } CastResponse::NoReply(state) } } diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index 0037c4779e..aec0b1a6a6 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use ethrex_common::H256; use spawned_concurrency::{ error::GenServerError, messages::Unused, @@ -11,8 +10,12 @@ use tracing::{error, info}; use crate::{ discv4::server::Discv4Server, network::P2PContext, - rlpx::{lookup::RLPxLookupServer, p2p::SUPPORTED_SNAP_CAPABILITIES}, - types::Node, + rlpx::{ + lookup::RLPxLookupServer, + p2p::{ + SUPPORTED_ETH_CAPABILITIES, SUPPORTED_P2P_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES, + }, + }, }; const MAX_PEER_COUNT: usize = 50; @@ -25,12 +28,10 @@ pub struct RLPxServerState { ctx: P2PContext, discovery_server: Discv4Server, lookup_servers: Vec>, - connections: Vec, } #[derive(Clone)] pub enum InMessage { - NewPeer(Node), BookKeeping, } @@ -49,17 +50,12 @@ impl RLPxServer { ctx, discovery_server, lookup_servers: vec![], - connections: vec![], }; // TODO: spawn multiple lookup servers let mut handle = Self::start(state); handle.cast(InMessage::BookKeeping).await?; Ok(handle) } - - pub async fn add_peer(handle: &mut RLPxServerHandle, peer: Node) -> Result<(), GenServerError> { - handle.cast(InMessage::NewPeer(peer)).await - } } impl GenServer for RLPxServer { @@ -80,9 +76,6 @@ impl GenServer for RLPxServer { mut state: Self::State, ) -> CastResponse { match msg { - InMessage::NewPeer(node) => { - state.connections.push(node.node_id()); - } InMessage::BookKeeping => { bookkeeping(handle, &mut state).await; } @@ -98,10 +91,9 @@ async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServe handle.clone(), InMessage::BookKeeping, ); - prune_peers(state).await; // Stop looking for peers if we have enough connections - if state.connections.len() >= MAX_PEER_COUNT { + if got_enough_peers(state).await { stop_lookup_servers(state).await; // Otherwise, spawn the lookup servers } else if state.lookup_servers.is_empty() { @@ -132,17 +124,27 @@ async fn stop_lookup_servers(state: &mut RLPxServerState) { } } -async fn prune_peers(state: &mut RLPxServerState) { +async fn got_enough_peers(state: &RLPxServerState) -> bool { let table = state.ctx.table.lock().await; - state.connections = state - .connections - .iter() - .flat_map(|node_id| table.get_by_node_id(*node_id)) + // Check we have a good amount of peers that support p2p+eth+snap + let snap_peers = table + .iter_peers() .filter(|peer| { - peer.supported_capabilities - .iter() - .any(|c| SUPPORTED_SNAP_CAPABILITIES.contains(c)) + peer.is_connected + && peer + .supported_capabilities + .iter() + .any(|c| SUPPORTED_SNAP_CAPABILITIES.contains(c)) + && peer + .supported_capabilities + .iter() + .any(|c| SUPPORTED_ETH_CAPABILITIES.contains(c)) + && peer + .supported_capabilities + .iter() + .any(|c| SUPPORTED_P2P_CAPABILITIES.contains(c)) }) - .map(|peer| peer.node.node_id()) - .collect(); + .count(); + + snap_peers >= MAX_PEER_COUNT } From 52655628024906e2844d5fd95094b9e755519214 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Mon, 7 Jul 2025 19:55:32 -0300 Subject: [PATCH 27/33] chore: remove unused struct --- crates/networking/p2p/rlpx/server.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index aec0b1a6a6..4968a9963e 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -35,9 +35,6 @@ pub enum InMessage { BookKeeping, } -#[derive(Clone, PartialEq)] -pub enum OutMessage {} - #[derive(Debug, Clone)] pub struct RLPxServer; @@ -61,7 +58,7 @@ impl RLPxServer { impl GenServer for RLPxServer { type CallMsg = Unused; type CastMsg = InMessage; - type OutMsg = OutMessage; + type OutMsg = Unused; type State = RLPxServerState; type Error = std::convert::Infallible; From 8d1dd1eaff7d9b295541304f2e8d0c3361afcd89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 8 Jul 2025 10:41:16 -0300 Subject: [PATCH 28/33] chore: fix lints --- crates/networking/p2p/discv4/lookup.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/networking/p2p/discv4/lookup.rs b/crates/networking/p2p/discv4/lookup.rs index 452774e5c5..07a176d3ec 100644 --- a/crates/networking/p2p/discv4/lookup.rs +++ b/crates/networking/p2p/discv4/lookup.rs @@ -149,7 +149,7 @@ impl Discv4LookupHandler { } return (initial_peers, true); } - let (nodes_found, queries) = self.lookup(target, asked_peers, &peers_to_ask).await; + let (nodes_found, queries) = self.lookup(target, asked_peers, peers_to_ask).await; for node in &nodes_found { if !seen_peers.contains(&node.public_key) { @@ -158,7 +158,7 @@ impl Discv4LookupHandler { } } - return (nodes_found, queries > 0); + (nodes_found, queries > 0) } /// We use the public key instead of the node_id as target as we might need to perform a FindNode request From f8371138b59750a9eb5032b04ce1d9aac37045f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 8 Jul 2025 17:20:14 -0300 Subject: [PATCH 29/33] wip: commit some changes With this I'm getting ~50 peers almost immediately upon startup --- cmd/ethrex/initializers.rs | 2 +- crates/networking/p2p/discv4/lookup.rs | 2 +- crates/networking/p2p/discv4/server.rs | 8 ++-- crates/networking/p2p/kademlia.rs | 4 +- .../p2p/rlpx/connection/handshake.rs | 9 ++++- .../networking/p2p/rlpx/connection/server.rs | 12 +++--- crates/networking/p2p/rlpx/server.rs | 37 ++++++++++++++++++- 7 files changed, 59 insertions(+), 15 deletions(-) diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index d39506457a..ee1fcc6118 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -232,7 +232,7 @@ pub fn get_bootnodes(opts: &Options, network: &Network, data_dir: &str) -> Vec { - info!("Addig hoodi preset bootnodes"); + info!("Adding hoodi preset bootnodes"); bootnodes.extend(networks::HOODI_BOOTNODES.clone()); } Network::PublicNetwork(PublicNetwork::Mainnet) => { diff --git a/crates/networking/p2p/discv4/lookup.rs b/crates/networking/p2p/discv4/lookup.rs index 07a176d3ec..77f941da39 100644 --- a/crates/networking/p2p/discv4/lookup.rs +++ b/crates/networking/p2p/discv4/lookup.rs @@ -141,7 +141,7 @@ impl Discv4LookupHandler { .await .get_closest_nodes(target_node_id); - peers_to_ask.extend(initial_peers.clone()); + peers_to_ask.extend(initial_peers.iter().cloned()); seen_peers.insert(self.ctx.local_node.public_key); for node in peers_to_ask { diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index ec8b0e6a18..4733d0334e 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -169,7 +169,7 @@ impl Discv4Server { if let Some(enr_seq) = msg.enr_seq { if enr_seq > peer.record.seq && peer.is_proven { debug!("Found outdated enr-seq, sending an enr_request"); - self.send_enr_request(&peer.node, self.ctx.table.lock().await) + self.send_enr_request(&peer.node, &mut self.ctx.table.lock().await) .await?; } } @@ -211,7 +211,7 @@ impl Discv4Server { if let Some(enr_seq) = msg.enr_seq { if enr_seq > peer.record.seq { debug!("Found outdated enr-seq, send an enr_request"); - self.send_enr_request(&peer.node, self.ctx.table.lock().await) + self.send_enr_request(&peer.node, &mut self.ctx.table.lock().await) .await?; return Ok(()); } @@ -655,10 +655,10 @@ impl Discv4Server { } } - async fn send_enr_request<'a>( + pub async fn send_enr_request<'a>( &self, node: &Node, - mut table_lock: MutexGuard<'a, KademliaTable>, + table_lock: &mut MutexGuard<'a, KademliaTable>, ) -> Result<(), DiscoveryError> { let mut buf = Vec::new(); let expiration: u64 = get_msg_expiration_from_seconds(20); diff --git a/crates/networking/p2p/kademlia.rs b/crates/networking/p2p/kademlia.rs index 70f6f34fc5..5ca83b2d37 100644 --- a/crates/networking/p2p/kademlia.rs +++ b/crates/networking/p2p/kademlia.rs @@ -11,9 +11,9 @@ use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{Mutex, mpsc}; use tracing::debug; -pub const MAX_NODES_PER_BUCKET: usize = 16; +pub const MAX_NODES_PER_BUCKET: usize = 1600; const NUMBER_OF_BUCKETS: usize = 256; -const MAX_NUMBER_OF_REPLACEMENTS: usize = 10; +const MAX_NUMBER_OF_REPLACEMENTS: usize = 1000; #[derive(Clone, Debug, Default)] pub struct Bucket { diff --git a/crates/networking/p2p/rlpx/connection/handshake.rs b/crates/networking/p2p/rlpx/connection/handshake.rs index 7764e0cdff..43a79e9036 100644 --- a/crates/networking/p2p/rlpx/connection/handshake.rs +++ b/crates/networking/p2p/rlpx/connection/handshake.rs @@ -37,6 +37,7 @@ use tokio::{ sync::Mutex, }; use tokio_util::codec::Framed; +use tracing::info; use super::{ codec::RLPxCodec, @@ -64,9 +65,15 @@ pub(crate) struct LocalState { pub(crate) async fn perform( state: InnerState, ) -> Result<(Established, SplitStream>), RLPxError> { + let known_node = Node::from_enode_url( + "enode://da1568823fdfccdcc37de2f3751987510c055ac24240cb8261aab5e3510b5e6222083d70a99ee29fa971a0646b37565311d31497054d6bdf320f8b3ea20749b6@177.54.155.141:60300", + ).unwrap(); let (context, node, framed, inbound) = match state { InnerState::Initiator(Initiator { context, node }) => { - let addr = SocketAddr::new(node.ip, node.tcp_port); + if node.node_id() == known_node.node_id() { + info!("Connecting to known node"); + } + let addr = node.tcp_addr(); let mut stream = match tcp_stream(addr).await { Ok(result) => result, Err(error) => { diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 9f46d23dc2..bae8cefd72 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -25,7 +25,7 @@ use tokio::{ }; use tokio_stream::StreamExt; use tokio_util::codec::Framed; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use crate::{ discv4::server::MAX_PEERS_TCP_CONNECTIONS, @@ -206,6 +206,7 @@ impl GenServer for RLPxConnection { .await; Err(RLPxError::Disconnected()) } else { + info!("RLPx connection established with peer"); // New state state.0 = InnerState::Established(established_state); Ok(state) @@ -274,9 +275,9 @@ async fn initialize_connection( where S: Unpin + Send + Stream> + 'static, { - post_handshake_checks(state.table.clone()).await?; + post_handshake_checks(state.table.clone()).await.unwrap(); - exchange_hello_messages(state, &mut stream).await?; + exchange_hello_messages(state, &mut stream).await.unwrap(); // Handshake OK: handle connection // Create channels to communicate directly to the peer @@ -288,6 +289,7 @@ where // NOTE: if the peer came from the discovery server it will already be inserted in the table // but that might not always be the case, so we try to add it to the table // Note: we don't ping the node we let the validation service do its job + info!("Adding peer to table after exchange"); { let mut table_lock = state.table.lock().await; table_lock.insert_node_forced(state.node.clone()); @@ -298,11 +300,11 @@ where state.inbound, ); } - init_capabilities(state, &mut stream).await?; + init_capabilities(state, &mut stream).await.unwrap(); log_peer_debug(&state.node, "Peer connection initialized."); // Send transactions transaction hashes from mempool at connection start - send_new_pooled_tx_hashes(state).await?; + send_new_pooled_tx_hashes(state).await.unwrap(); // Periodic broadcast check repeated events. send_interval( diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index 4968a9963e..d8ea413eec 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -11,15 +11,17 @@ use crate::{ discv4::server::Discv4Server, network::P2PContext, rlpx::{ + connection::server::RLPxConnection, lookup::RLPxLookupServer, p2p::{ SUPPORTED_ETH_CAPABILITIES, SUPPORTED_P2P_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES, }, }, + types::{Node, NodeRecord}, }; const MAX_PEER_COUNT: usize = 50; -const MAX_CONCURRENT_LOOKUPS: usize = 16; +const MAX_CONCURRENT_LOOKUPS: usize = 1; pub type RLPxServerHandle = GenServerHandle; @@ -89,6 +91,34 @@ async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServe InMessage::BookKeeping, ); + { + let mut table_lock = state.ctx.table.lock().await; + let nodes_without_enr: Vec<_> = table_lock + .iter_peers() + .filter(|p| p.record == NodeRecord::default() && p.enr_request_hash.is_none()) + .map(|p| p.node.clone()) + .take(128) + .collect(); + for node in nodes_without_enr { + let _ = state + .discovery_server + .send_enr_request(&node, &mut table_lock) + .await; + } + + let nodes_without_connection: Vec<_> = table_lock + .iter_peers() + .filter(|p| { + !p.is_connected && p.channels.is_none() && p.record != NodeRecord::default() + }) + .map(|p| p.node.clone()) + .take(128) + .collect(); + for node in nodes_without_connection { + RLPxConnection::spawn_as_initiator(state.ctx.clone(), &node).await; + } + } + // Stop looking for peers if we have enough connections if got_enough_peers(state).await { stop_lookup_servers(state).await; @@ -100,6 +130,11 @@ async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServe } async fn spawn_lookup_servers(state: &mut RLPxServerState, handle: &GenServerHandle) { + let known_node = Node::from_enode_url( + "enode://da1568823fdfccdcc37de2f3751987510c055ac24240cb8261aab5e3510b5e6222083d70a99ee29fa971a0646b37565311d31497054d6bdf320f8b3ea20749b6@177.54.155.141:60300", + ).unwrap(); + RLPxConnection::spawn_as_initiator(state.ctx.clone(), &known_node).await; + for _ in 0..MAX_CONCURRENT_LOOKUPS { let node_iterator = state.discovery_server.new_random_iterator(); let Ok(new_lookup_server) = From 708c153a4d4263d3bdf38bc045dc6e0e55be874b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 8 Jul 2025 18:55:55 -0300 Subject: [PATCH 30/33] chore: remove `known_node` constant --- crates/networking/p2p/rlpx/connection/handshake.rs | 7 ------- crates/networking/p2p/rlpx/server.rs | 7 +------ 2 files changed, 1 insertion(+), 13 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection/handshake.rs b/crates/networking/p2p/rlpx/connection/handshake.rs index 43a79e9036..0060347de4 100644 --- a/crates/networking/p2p/rlpx/connection/handshake.rs +++ b/crates/networking/p2p/rlpx/connection/handshake.rs @@ -37,7 +37,6 @@ use tokio::{ sync::Mutex, }; use tokio_util::codec::Framed; -use tracing::info; use super::{ codec::RLPxCodec, @@ -65,14 +64,8 @@ pub(crate) struct LocalState { pub(crate) async fn perform( state: InnerState, ) -> Result<(Established, SplitStream>), RLPxError> { - let known_node = Node::from_enode_url( - "enode://da1568823fdfccdcc37de2f3751987510c055ac24240cb8261aab5e3510b5e6222083d70a99ee29fa971a0646b37565311d31497054d6bdf320f8b3ea20749b6@177.54.155.141:60300", - ).unwrap(); let (context, node, framed, inbound) = match state { InnerState::Initiator(Initiator { context, node }) => { - if node.node_id() == known_node.node_id() { - info!("Connecting to known node"); - } let addr = node.tcp_addr(); let mut stream = match tcp_stream(addr).await { Ok(result) => result, diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index d8ea413eec..3d55cfd271 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -17,7 +17,7 @@ use crate::{ SUPPORTED_ETH_CAPABILITIES, SUPPORTED_P2P_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES, }, }, - types::{Node, NodeRecord}, + types::NodeRecord, }; const MAX_PEER_COUNT: usize = 50; @@ -130,11 +130,6 @@ async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServe } async fn spawn_lookup_servers(state: &mut RLPxServerState, handle: &GenServerHandle) { - let known_node = Node::from_enode_url( - "enode://da1568823fdfccdcc37de2f3751987510c055ac24240cb8261aab5e3510b5e6222083d70a99ee29fa971a0646b37565311d31497054d6bdf320f8b3ea20749b6@177.54.155.141:60300", - ).unwrap(); - RLPxConnection::spawn_as_initiator(state.ctx.clone(), &known_node).await; - for _ in 0..MAX_CONCURRENT_LOOKUPS { let node_iterator = state.discovery_server.new_random_iterator(); let Ok(new_lookup_server) = From 9d47e0f3f39969df6c7c4d36c9a6b36267dcc8cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 8 Jul 2025 18:59:31 -0300 Subject: [PATCH 31/33] fix: verify peer before adding to table --- crates/networking/p2p/rlpx/connection/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index bae8cefd72..e85e849e76 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -286,6 +286,8 @@ where // Updating the state to establish the backend channel state.backend_channel = Some(sender); + init_capabilities(state, &mut stream).await.unwrap(); + // NOTE: if the peer came from the discovery server it will already be inserted in the table // but that might not always be the case, so we try to add it to the table // Note: we don't ping the node we let the validation service do its job @@ -300,7 +302,6 @@ where state.inbound, ); } - init_capabilities(state, &mut stream).await.unwrap(); log_peer_debug(&state.node, "Peer connection initialized."); // Send transactions transaction hashes from mempool at connection start From 515d75de0736e53c851c633b87e39aa1f09b84aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 8 Jul 2025 19:04:17 -0300 Subject: [PATCH 32/33] chore: remove unwraps --- crates/networking/p2p/rlpx/connection/server.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index e85e849e76..21c3e3e8f6 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -275,9 +275,9 @@ async fn initialize_connection( where S: Unpin + Send + Stream> + 'static, { - post_handshake_checks(state.table.clone()).await.unwrap(); + post_handshake_checks(state.table.clone()).await?; - exchange_hello_messages(state, &mut stream).await.unwrap(); + exchange_hello_messages(state, &mut stream).await?; // Handshake OK: handle connection // Create channels to communicate directly to the peer @@ -286,7 +286,7 @@ where // Updating the state to establish the backend channel state.backend_channel = Some(sender); - init_capabilities(state, &mut stream).await.unwrap(); + init_capabilities(state, &mut stream).await?; // NOTE: if the peer came from the discovery server it will already be inserted in the table // but that might not always be the case, so we try to add it to the table @@ -305,7 +305,7 @@ where log_peer_debug(&state.node, "Peer connection initialized."); // Send transactions transaction hashes from mempool at connection start - send_new_pooled_tx_hashes(state).await.unwrap(); + send_new_pooled_tx_hashes(state).await?; // Periodic broadcast check repeated events. send_interval( From 1abf74a94ac630d520abdf32c71047492f98c588 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Gr=C3=BCner?= <47506558+MegaRedHand@users.noreply.github.com> Date: Tue, 8 Jul 2025 19:04:23 -0300 Subject: [PATCH 33/33] fix: don't check P2P capability --- crates/networking/p2p/rlpx/server.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs index 3d55cfd271..436428ef88 100644 --- a/crates/networking/p2p/rlpx/server.rs +++ b/crates/networking/p2p/rlpx/server.rs @@ -13,9 +13,7 @@ use crate::{ rlpx::{ connection::server::RLPxConnection, lookup::RLPxLookupServer, - p2p::{ - SUPPORTED_ETH_CAPABILITIES, SUPPORTED_P2P_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES, - }, + p2p::{SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES}, }, types::NodeRecord, }; @@ -166,10 +164,6 @@ async fn got_enough_peers(state: &RLPxServerState) -> bool { .supported_capabilities .iter() .any(|c| SUPPORTED_ETH_CAPABILITIES.contains(c)) - && peer - .supported_capabilities - .iter() - .any(|c| SUPPORTED_P2P_CAPABILITIES.contains(c)) }) .count();