diff --git a/cmd/ethrex/ethrex.rs b/cmd/ethrex/ethrex.rs index a7ad4e0bda..be19cadbd9 100644 --- a/cmd/ethrex/ethrex.rs +++ b/cmd/ethrex/ethrex.rs @@ -98,18 +98,6 @@ async fn main() -> eyre::Result<()> { let cancel_token = tokio_util::sync::CancellationToken::new(); - init_rpc_api( - &opts, - peer_table.clone(), - local_p2p_node.clone(), - local_node_record.lock().await.clone(), - store.clone(), - blockchain.clone(), - cancel_token.clone(), - tracker.clone(), - ) - .await; - if opts.metrics_enabled { init_metrics(&opts, tracker.clone()); } @@ -127,7 +115,7 @@ async fn main() -> eyre::Result<()> { &opts, &network, &data_dir, - local_p2p_node, + local_p2p_node.clone(), local_node_record.clone(), signer, peer_table.clone(), @@ -142,6 +130,18 @@ async fn main() -> eyre::Result<()> { } } + init_rpc_api( + &opts, + peer_table.clone(), + local_p2p_node, + local_node_record.lock().await.clone(), + store.clone(), + blockchain.clone(), + cancel_token.clone(), + tracker.clone(), + ) + .await; + let mut signal_terminate = signal(SignalKind::terminate())?; tokio::select! 
{ diff --git a/cmd/ethrex/initializers.rs b/cmd/ethrex/initializers.rs index d39506457a..ee1fcc6118 100644 --- a/cmd/ethrex/initializers.rs +++ b/cmd/ethrex/initializers.rs @@ -232,7 +232,7 @@ pub fn get_bootnodes(opts: &Options, network: &Network, data_dir: &str) -> Vec { - info!("Addig hoodi preset bootnodes"); + info!("Adding hoodi preset bootnodes"); bootnodes.extend(networks::HOODI_BOOTNODES.clone()); } Network::PublicNetwork(PublicNetwork::Mainnet) => { diff --git a/crates/networking/p2p/discv4/lookup.rs b/crates/networking/p2p/discv4/lookup.rs index 9e695c3517..77f941da39 100644 --- a/crates/networking/p2p/discv4/lookup.rs +++ b/crates/networking/p2p/discv4/lookup.rs @@ -20,16 +20,11 @@ use tracing::debug; pub struct Discv4LookupHandler { ctx: P2PContext, udp_socket: Arc, - interval_minutes: u64, } impl Discv4LookupHandler { - pub fn new(ctx: P2PContext, udp_socket: Arc, interval_minutes: u64) -> Self { - Self { - ctx, - udp_socket, - interval_minutes, - } + pub fn new(ctx: P2PContext, udp_socket: Arc) -> Self { + Self { ctx, udp_socket } } /// Starts a tokio scheduler that: @@ -52,19 +47,19 @@ impl Discv4LookupHandler { /// doesn't have any node to ask. 
/// /// See more https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup - pub fn start(&self, initial_interval_wait_seconds: u64) { + pub fn start(&self, interval_minutes: u64, initial_interval_wait_seconds: u64) { self.ctx.tracker.spawn({ let self_clone = self.clone(); async move { self_clone - .start_lookup_loop(initial_interval_wait_seconds) + .start_lookup_loop(interval_minutes, initial_interval_wait_seconds) .await; } }); } - async fn start_lookup_loop(&self, initial_interval_wait_seconds: u64) { - let mut interval = tokio::time::interval(Duration::from_secs(self.interval_minutes)); + async fn start_lookup_loop(&self, interval_minutes: u64, initial_interval_wait_seconds: u64) { + let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60)); tokio::time::sleep(Duration::from_secs(initial_interval_wait_seconds)).await; loop { @@ -104,48 +99,77 @@ impl Discv4LookupHandler { async fn recursive_lookup(&self, target: H512) { // lookups start with the closest nodes to the target from our table let target_node_id = node_id(&target); - let mut peers_to_ask: Vec = self - .ctx - .table - .lock() - .await - .get_closest_nodes(target_node_id); + + let mut peers_to_ask = vec![]; // stores the peers in peers_to_ask + the peers that were in peers_to_ask but were replaced by closer targets - let mut seen_peers: HashSet = HashSet::default(); + let mut seen_peers = HashSet::default(); let mut asked_peers = HashSet::default(); - seen_peers.insert(self.ctx.local_node.public_key); - for node in &peers_to_ask { - seen_peers.insert(node.public_key); + loop { + let (_, should_continue) = self + .recursive_lookup_step( + target, + target_node_id, + &mut peers_to_ask, + &mut seen_peers, + &mut asked_peers, + ) + .await; + // the lookup finishes when there are no more queries to do + // that happens when we have asked all the peers + if !should_continue { + break; + } } + } - loop { - let (nodes_found, queries) = self.lookup(target, &mut 
asked_peers, &peers_to_ask).await; + /// Performs a single step of the recursive lookup. + /// Returns the nodes found in this step and whether the lookup should continue + /// (`false` once no further queries were made). + async fn recursive_lookup_step( + &self, + target: H512, + target_node_id: H256, + peers_to_ask: &mut Vec, + seen_peers: &mut HashSet, + asked_peers: &mut HashSet, + ) -> (Vec, bool) { + if peers_to_ask.is_empty() { + let initial_peers = self + .ctx + .table + .lock() + .await + .get_closest_nodes(target_node_id); - for node in nodes_found { - if !seen_peers.contains(&node.public_key) { - seen_peers.insert(node.public_key); - self.peers_to_ask_push(&mut peers_to_ask, target_node_id, node); - } + peers_to_ask.extend(initial_peers.iter().cloned()); + + seen_peers.insert(self.ctx.local_node.public_key); + for node in peers_to_ask { + seen_peers.insert(node.public_key); } + return (initial_peers, true); + } + let (nodes_found, queries) = self.lookup(target, asked_peers, peers_to_ask).await; - // the lookup finishes when there are no more queries to do - // that happens when we have asked all the peers - if queries == 0 { - break; + for node in &nodes_found { + if !seen_peers.contains(&node.public_key) { + seen_peers.insert(node.public_key); + self.peers_to_ask_push(peers_to_ask, target_node_id, node.clone()); } } + + (nodes_found, queries > 0) } /// We use the public key instead of the node_id as target as we might need to perform a FindNode request - async fn lookup( + pub async fn lookup( + &self, target: H512, asked_peers: &mut HashSet, nodes_to_ask: &Vec, ) -> (Vec, u32) { // send FIND_NODE as much as three times - let alpha = 3; + let concurrency_factor = 3; let mut queries = 0; let mut nodes = vec![]; @@ -184,7 +208,7 @@ impl Discv4LookupHandler { } } - if queries == alpha { + if queries == concurrency_factor { break; } } @@ -263,6 +287,67 @@ impl Discv4LookupHandler { } } +#[derive(Debug, Clone)] +pub struct Discv4NodeIterator { + lookup_handler: Discv4LookupHandler, + target: H512, + target_node_id: H256, + 
peers_to_ask: Vec, + seen_peers: HashSet, + asked_peers: HashSet, + buffer: Vec, +} + +impl Discv4NodeIterator { + pub fn new(ctx: P2PContext, udp_socket: Arc) -> Self { + let random_signing_key = SigningKey::random(&mut OsRng); + let target = public_key_from_signing_key(&random_signing_key); + let target_node_id = node_id(&target); + + let lookup_handler = Discv4LookupHandler::new(ctx, udp_socket); + + Discv4NodeIterator { + lookup_handler, + target, + target_node_id, + peers_to_ask: vec![], + seen_peers: Default::default(), + asked_peers: Default::default(), + buffer: vec![], + } + } + + pub async fn next(&mut self) -> Node { + if let Some(node) = self.buffer.pop() { + return node; + } + loop { + let (found_nodes, should_continue) = self + .lookup_handler + .recursive_lookup_step( + self.target, + self.target_node_id, + &mut self.peers_to_ask, + &mut self.seen_peers, + &mut self.asked_peers, + ) + .await; + + self.buffer.extend(found_nodes); + + if !should_continue { + self.peers_to_ask.clear(); + self.seen_peers.clear(); + self.asked_peers.clear(); + } + + if let Some(node) = self.buffer.pop() { + return node; + } + } + } +} + #[cfg(test)] mod tests { use tokio::time::sleep; @@ -277,11 +362,7 @@ mod tests { }; fn lookup_handler_from_server(server: Discv4Server) -> Discv4LookupHandler { - Discv4LookupHandler::new( - server.ctx.clone(), - server.udp_socket.clone(), - server.lookup_interval_minutes, - ) + Discv4LookupHandler::new(server.ctx.clone(), server.udp_socket.clone()) } #[tokio::test] diff --git a/crates/networking/p2p/discv4/server.rs b/crates/networking/p2p/discv4/server.rs index 44d224c99a..4733d0334e 100644 --- a/crates/networking/p2p/discv4/server.rs +++ b/crates/networking/p2p/discv4/server.rs @@ -11,7 +11,7 @@ use super::{ use crate::{ kademlia::{KademliaTable, MAX_NODES_PER_BUCKET}, network::P2PContext, - rlpx::{connection::server::RLPxConnection, utils::node_id}, + rlpx::utils::node_id, types::{Endpoint, Node}, }; use ethrex_common::H256; @@ -25,6 
+25,8 @@ use std::{ use tokio::{net::UdpSocket, sync::MutexGuard}; use tracing::{debug, error}; +pub use crate::discv4::lookup::Discv4NodeIterator; + const MAX_DISC_PACKET_SIZE: usize = 1280; const PROOF_EXPIRATION_IN_HS: u64 = 12; pub const MAX_PEERS_TCP_CONNECTIONS: usize = 100; @@ -74,11 +76,7 @@ impl Discv4Server { /// - Loads bootnodes to establish initial peer connections. /// - Starts the lookup handler via [`Discv4LookupHandler`] to periodically search for new peers. pub async fn start(&self, bootnodes: Vec) -> Result<(), DiscoveryError> { - let lookup_handler = Discv4LookupHandler::new( - self.ctx.clone(), - self.udp_socket.clone(), - self.lookup_interval_minutes, - ); + let lookup_handler = self.new_lookup_handler(); self.ctx.tracker.spawn({ let self_clone = self.clone(); @@ -89,11 +87,20 @@ impl Discv4Server { async move { self_clone.start_revalidation().await } }); self.load_bootnodes(bootnodes).await; - lookup_handler.start(10); + lookup_handler.start(self.lookup_interval_minutes, 10); Ok(()) } + /// Creates an iterator that yields nodes retrieved by doing random lookups. 
+ pub fn new_random_iterator(&self) -> Discv4NodeIterator { + Discv4NodeIterator::new(self.ctx.clone(), self.udp_socket.clone()) + } + + fn new_lookup_handler(&self) -> Discv4LookupHandler { + Discv4LookupHandler::new(self.ctx.clone(), self.udp_socket.clone()) + } + async fn load_bootnodes(&self, bootnodes: Vec) { for node in bootnodes { if let Err(e) = self.try_add_peer_and_ping(node).await { @@ -102,7 +109,7 @@ impl Discv4Server { } } - pub async fn receive(&self) { + async fn receive(&self) { let mut buf = vec![0; MAX_DISC_PACKET_SIZE]; loop { @@ -162,7 +169,7 @@ impl Discv4Server { if let Some(enr_seq) = msg.enr_seq { if enr_seq > peer.record.seq && peer.is_proven { debug!("Found outdated enr-seq, sending an enr_request"); - self.send_enr_request(&peer.node, self.ctx.table.lock().await) + self.send_enr_request(&peer.node, &mut self.ctx.table.lock().await) .await?; } } @@ -193,7 +200,7 @@ impl Discv4Server { )); } - // all validations went well, mark as answered and start a rlpx connection + // all validations went well, mark as answered self.ctx .table .lock() @@ -204,29 +211,11 @@ impl Discv4Server { if let Some(enr_seq) = msg.enr_seq { if enr_seq > peer.record.seq { debug!("Found outdated enr-seq, send an enr_request"); - self.send_enr_request(&peer.node, self.ctx.table.lock().await) + self.send_enr_request(&peer.node, &mut self.ctx.table.lock().await) .await?; return Ok(()); } } - - // We won't initiate a connection if we are already connected. - // This will typically be the case when revalidating a node. - if peer.is_connected { - return Ok(()); - } - - // We won't initiate a connection if we have reached the maximum number of peers. 
- let active_connections = { - let table = self.ctx.table.lock().await; - table.count_connected_peers() - }; - if active_connections >= MAX_PEERS_TCP_CONNECTIONS { - return Ok(()); - } - - RLPxConnection::spawn_as_initiator(self.ctx.clone(), &peer.node).await; - Ok(()) } Message::FindNode(msg) => { @@ -497,29 +486,6 @@ impl Discv4Server { peer.node.public_key ); } - let peer = { - let table = self.ctx.table.lock().await; - table.get_by_node_id(packet.get_node_id()).cloned() - }; - let Some(peer) = peer else { - return Err(DiscoveryError::InvalidMessage("not known node".into())); - }; - // This will typically be the case when revalidating a node. - if peer.is_connected { - return Ok(()); - } - - // We won't initiate a connection if we have reached the maximum number of peers. - let active_connections = { - let table = self.ctx.table.lock().await; - table.count_connected_peers() - }; - if active_connections >= MAX_PEERS_TCP_CONNECTIONS { - return Ok(()); - } - - RLPxConnection::spawn_as_initiator(self.ctx.clone(), &peer.node).await; - Ok(()) } } @@ -689,10 +655,10 @@ impl Discv4Server { } } - async fn send_enr_request<'a>( + pub async fn send_enr_request<'a>( &self, node: &Node, - mut table_lock: MutexGuard<'a, KademliaTable>, + table_lock: &mut MutexGuard<'a, KademliaTable>, ) -> Result<(), DiscoveryError> { let mut buf = Vec::new(); let expiration: u64 = get_msg_expiration_from_seconds(20); diff --git a/crates/networking/p2p/kademlia.rs b/crates/networking/p2p/kademlia.rs index 4bf794cc89..5ca83b2d37 100644 --- a/crates/networking/p2p/kademlia.rs +++ b/crates/networking/p2p/kademlia.rs @@ -11,9 +11,9 @@ use tokio::sync::mpsc::UnboundedSender; use tokio::sync::{Mutex, mpsc}; use tracing::debug; -pub const MAX_NODES_PER_BUCKET: usize = 16; +pub const MAX_NODES_PER_BUCKET: usize = 1600; const NUMBER_OF_BUCKETS: usize = 256; -const MAX_NUMBER_OF_REPLACEMENTS: usize = 10; +const MAX_NUMBER_OF_REPLACEMENTS: usize = 1000; #[derive(Clone, Debug, Default)] pub struct Bucket 
{ @@ -105,7 +105,7 @@ impl KademliaTable { .replacements .iter() .any(|p| p.node.node_id() == node.node_id()); - if peer_already_in_replacements { + if peer_already_in_replacements && !force_push { return (None, false); } diff --git a/crates/networking/p2p/network.rs b/crates/networking/p2p/network.rs index 4b90710778..f043a36acb 100644 --- a/crates/networking/p2p/network.rs +++ b/crates/networking/p2p/network.rs @@ -3,6 +3,7 @@ use crate::kademlia::{self, KademliaTable}; use crate::rlpx::connection::server::{RLPxConnBroadcastSender, RLPxConnection}; use crate::rlpx::message::Message as RLPxMessage; use crate::rlpx::p2p::SUPPORTED_SNAP_CAPABILITIES; +use crate::rlpx::server::RLPxServer; use crate::types::{Node, NodeRecord}; use ethrex_blockchain::Blockchain; use ethrex_common::{H256, H512}; @@ -11,6 +12,7 @@ use k256::{ ecdsa::SigningKey, elliptic_curve::{PublicKey, sec1::ToEncodedPoint}, }; +use spawned_concurrency::error::GenServerError; use std::{io, net::SocketAddr, sync::Arc}; use tokio::{ net::{TcpListener, TcpSocket}, @@ -32,6 +34,7 @@ pub fn peer_table(node_id: H256) -> Arc> { #[derive(Debug)] pub enum NetworkError { DiscoveryStart(DiscoveryError), + RLPxStart(GenServerError), } #[derive(Clone, Debug)] @@ -102,6 +105,11 @@ pub async fn start_network(context: P2PContext, bootnodes: Vec) -> Result< .await .map_err(NetworkError::DiscoveryStart)?; + info!("Starting P2P service"); + RLPxServer::spawn(context.clone(), discovery) + .await + .map_err(NetworkError::RLPxStart)?; + info!( "Listening for requests at {}", context.local_node.tcp_addr() diff --git a/crates/networking/p2p/rlpx.rs b/crates/networking/p2p/rlpx.rs index 1880b39487..4d7afbc25f 100644 --- a/crates/networking/p2p/rlpx.rs +++ b/crates/networking/p2p/rlpx.rs @@ -1,7 +1,9 @@ pub mod connection; pub mod error; pub mod eth; +pub mod lookup; pub mod message; pub mod p2p; +pub mod server; pub mod snap; pub mod utils; diff --git a/crates/networking/p2p/rlpx/connection/handshake.rs 
b/crates/networking/p2p/rlpx/connection/handshake.rs index 7764e0cdff..0060347de4 100644 --- a/crates/networking/p2p/rlpx/connection/handshake.rs +++ b/crates/networking/p2p/rlpx/connection/handshake.rs @@ -66,7 +66,7 @@ pub(crate) async fn perform( ) -> Result<(Established, SplitStream>), RLPxError> { let (context, node, framed, inbound) = match state { InnerState::Initiator(Initiator { context, node }) => { - let addr = SocketAddr::new(node.ip, node.tcp_port); + let addr = node.tcp_addr(); let mut stream = match tcp_stream(addr).await { Ok(result) => result, Err(error) => { diff --git a/crates/networking/p2p/rlpx/connection/server.rs b/crates/networking/p2p/rlpx/connection/server.rs index 9f46d23dc2..21c3e3e8f6 100644 --- a/crates/networking/p2p/rlpx/connection/server.rs +++ b/crates/networking/p2p/rlpx/connection/server.rs @@ -25,7 +25,7 @@ use tokio::{ }; use tokio_stream::StreamExt; use tokio_util::codec::Framed; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use crate::{ discv4::server::MAX_PEERS_TCP_CONNECTIONS, @@ -206,6 +206,7 @@ impl GenServer for RLPxConnection { .await; Err(RLPxError::Disconnected()) } else { + info!("RLPx connection established with peer"); // New state state.0 = InnerState::Established(established_state); Ok(state) @@ -285,9 +286,12 @@ where // Updating the state to establish the backend channel state.backend_channel = Some(sender); + init_capabilities(state, &mut stream).await?; + // NOTE: if the peer came from the discovery server it will already be inserted in the table // but that might not always be the case, so we try to add it to the table // Note: we don't ping the node we let the validation service do its job + info!("Adding peer to table after exchange"); { let mut table_lock = state.table.lock().await; table_lock.insert_node_forced(state.node.clone()); @@ -298,7 +302,6 @@ where state.inbound, ); } - init_capabilities(state, &mut stream).await?; log_peer_debug(&state.node, "Peer connection initialized."); 
// Send transactions transaction hashes from mempool at connection start diff --git a/crates/networking/p2p/rlpx/lookup.rs b/crates/networking/p2p/rlpx/lookup.rs new file mode 100644 index 0000000000..3eea9d41e6 --- /dev/null +++ b/crates/networking/p2p/rlpx/lookup.rs @@ -0,0 +1,95 @@ +use spawned_concurrency::{ + error::GenServerError, + messages::Unused, + tasks::{CastResponse, GenServer, GenServerHandle}, +}; +use tracing::error; + +use crate::{ + discv4::server::Discv4NodeIterator, + network::P2PContext, + rlpx::{connection::server::RLPxConnection, server::RLPxServerHandle}, +}; + +pub type RLPxLookupServerHandle = GenServerHandle; + +#[derive(Debug, Clone)] +pub struct RLPxLookupServerState { + ctx: P2PContext, + node_iterator: Discv4NodeIterator, +} + +#[derive(Debug, Clone)] +pub struct RLPxLookupServer; + +impl RLPxLookupServer { + pub async fn spawn( + ctx: P2PContext, + node_iterator: Discv4NodeIterator, + _consumer: RLPxServerHandle, + ) -> Result, GenServerError> { + let state = RLPxLookupServerState { ctx, node_iterator }; + let mut handle = Self::start(state); + handle.cast(InMessage::FetchPeers).await?; + Ok(handle) + } + + pub async fn stop(handle: &mut RLPxLookupServerHandle) -> Result<(), GenServerError> { + handle.cast(InMessage::Stop).await + } +} + +#[derive(Debug, Clone)] +pub enum InMessage { + FetchPeers, + Stop, +} + +impl GenServer for RLPxLookupServer { + type CallMsg = Unused; + type CastMsg = InMessage; + type OutMsg = Unused; + type State = RLPxLookupServerState; + type Error = std::convert::Infallible; + + fn new() -> Self { + Self + } + + async fn handle_cast( + &mut self, + msg: Self::CastMsg, + handle: &GenServerHandle, + mut state: Self::State, + ) -> CastResponse { + if matches!(msg, InMessage::Stop) { + return CastResponse::Stop; + } + // Stop on error + if handle.clone().cast(InMessage::FetchPeers).await.is_err() { + error!("RLPxLookupServer: failed to send message to self, stopping lookup"); + return CastResponse::Stop; + } + 
let node = state.node_iterator.next().await; + let node_id = node.node_id(); + // Get peer status and mark as connected + { + let mut table = state.ctx.table.lock().await; + table.insert_node_forced(node.clone()); + let node = table + .get_by_node_id_mut(node_id) + .expect("we just inserted this node"); + + // If we already have a connection to this node, we don't need to start a new one + if node.is_connected { + drop(table); + return CastResponse::NoReply(state); + } + node.is_connected = true; + } + // Start a connection + RLPxConnection::spawn_as_initiator(state.ctx.clone(), &node).await; + + CastResponse::NoReply(state) + } +} diff --git a/crates/networking/p2p/rlpx/server.rs b/crates/networking/p2p/rlpx/server.rs new file mode 100644 index 0000000000..436428ef88 --- /dev/null +++ b/crates/networking/p2p/rlpx/server.rs @@ -0,0 +1,171 @@ +use std::time::Duration; + +use spawned_concurrency::{ + error::GenServerError, + messages::Unused, + tasks::{CastResponse, GenServer, GenServerHandle, send_after}, +}; +use tracing::{error, info}; + +use crate::{ + discv4::server::Discv4Server, + network::P2PContext, + rlpx::{ + connection::server::RLPxConnection, + lookup::RLPxLookupServer, + p2p::{SUPPORTED_ETH_CAPABILITIES, SUPPORTED_SNAP_CAPABILITIES}, + }, + types::NodeRecord, +}; + +const MAX_PEER_COUNT: usize = 50; +const MAX_CONCURRENT_LOOKUPS: usize = 1; + +pub type RLPxServerHandle = GenServerHandle; + +#[derive(Debug, Clone)] +pub struct RLPxServerState { + ctx: P2PContext, + discovery_server: Discv4Server, + lookup_servers: Vec>, +} + +#[derive(Clone)] +pub enum InMessage { + BookKeeping, +} + +#[derive(Debug, Clone)] +pub struct RLPxServer; + +impl RLPxServer { + pub async fn spawn( + ctx: P2PContext, + discovery_server: Discv4Server, + ) -> Result, GenServerError> { + let state = RLPxServerState { + ctx, + discovery_server, + lookup_servers: vec![], + }; + // TODO: spawn multiple lookup servers + let mut handle = Self::start(state); + 
handle.cast(InMessage::BookKeeping).await?; + Ok(handle) + } +} + +impl GenServer for RLPxServer { + type CallMsg = Unused; + type CastMsg = InMessage; + type OutMsg = Unused; + type State = RLPxServerState; + type Error = std::convert::Infallible; + + fn new() -> Self { + Self + } + + async fn handle_cast( + &mut self, + msg: Self::CastMsg, + handle: &GenServerHandle, + mut state: Self::State, + ) -> CastResponse { + match msg { + InMessage::BookKeeping => { + bookkeeping(handle, &mut state).await; + } + } + CastResponse::NoReply(state) + } +} + +/// Perform periodic tasks +async fn bookkeeping(handle: &GenServerHandle, state: &mut RLPxServerState) { + send_after( + Duration::from_secs(5), + handle.clone(), + InMessage::BookKeeping, + ); + + { + let mut table_lock = state.ctx.table.lock().await; + let nodes_without_enr: Vec<_> = table_lock + .iter_peers() + .filter(|p| p.record == NodeRecord::default() && p.enr_request_hash.is_none()) + .map(|p| p.node.clone()) + .take(128) + .collect(); + for node in nodes_without_enr { + let _ = state + .discovery_server + .send_enr_request(&node, &mut table_lock) + .await; + } + + let nodes_without_connection: Vec<_> = table_lock + .iter_peers() + .filter(|p| { + !p.is_connected && p.channels.is_none() && p.record != NodeRecord::default() + }) + .map(|p| p.node.clone()) + .take(128) + .collect(); + for node in nodes_without_connection { + RLPxConnection::spawn_as_initiator(state.ctx.clone(), &node).await; + } + } + + // Stop looking for peers if we have enough connections + if got_enough_peers(state).await { + stop_lookup_servers(state).await; + // Otherwise, spawn the lookup servers + } else if state.lookup_servers.is_empty() { + info!("Spawning new lookup servers"); + spawn_lookup_servers(state, handle).await; + } +} + +async fn spawn_lookup_servers(state: &mut RLPxServerState, handle: &GenServerHandle) { + for _ in 0..MAX_CONCURRENT_LOOKUPS { + let node_iterator = state.discovery_server.new_random_iterator(); + let 
Ok(new_lookup_server) = + RLPxLookupServer::spawn(state.ctx.clone(), node_iterator, handle.clone()) + .await + .inspect_err(|e| error!("Failed to spawn lookup server: {e}")) + else { + continue; + }; + state.lookup_servers.push(new_lookup_server); + } +} + +async fn stop_lookup_servers(state: &mut RLPxServerState) { + for mut server in state.lookup_servers.drain(..) { + let _ = RLPxLookupServer::stop(&mut server) + .await + .inspect_err(|e| error!("Failed to stop lookup server: {e}")); + } +} + +async fn got_enough_peers(state: &RLPxServerState) -> bool { + let table = state.ctx.table.lock().await; + // Check we have a good amount of peers that support p2p+eth+snap + let snap_peers = table + .iter_peers() + .filter(|peer| { + peer.is_connected + && peer + .supported_capabilities + .iter() + .any(|c| SUPPORTED_SNAP_CAPABILITIES.contains(c)) + && peer + .supported_capabilities + .iter() + .any(|c| SUPPORTED_ETH_CAPABILITIES.contains(c)) + }) + .count(); + + snap_peers >= MAX_PEER_COUNT +}