Skip to content

feat(l1): pull new peers from discovery #3500

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 38 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
055492b
refactor: move parameter to function call
MegaRedHand Jul 4, 2025
0e9961a
refactor: split step logic from loop
MegaRedHand Jul 4, 2025
2d95ec1
feat: add random node lookup iterator
MegaRedHand Jul 4, 2025
e49cc72
refactor: start RPC API after networking
MegaRedHand Jul 4, 2025
a6e4699
wip: start RLPx server
MegaRedHand Jul 4, 2025
d6152de
Merge branch 'main' into p2p/pull-new-peers-from-discovery
MegaRedHand Jul 7, 2025
f7e7f34
wip: add "lookup worker" genserver
MegaRedHand Jul 7, 2025
65a0b19
feat: look for peers in new RLPxServer
MegaRedHand Jul 7, 2025
896bda1
fix: poll future
MegaRedHand Jul 7, 2025
de71b8e
feat: stop looking for peers once limit is reached
MegaRedHand Jul 7, 2025
6827c55
chore: remove spammy log
MegaRedHand Jul 7, 2025
154d6b9
fix: add missing early return
MegaRedHand Jul 7, 2025
0885f01
fix: avoid early return in bookkeeping function
MegaRedHand Jul 7, 2025
2c2254c
Merge branch 'main' into p2p/pull-new-peers-from-discovery
MegaRedHand Jul 7, 2025
d3797fb
feat: prune peers according to supported capabilities
MegaRedHand Jul 7, 2025
1a5b1a8
chore: comment out rlpx connection in discv4 server
MegaRedHand Jul 7, 2025
8df43ee
fix: don't spawn lookup servers if we already spawned them
MegaRedHand Jul 7, 2025
bbe5d41
fix: don't fail if peer has unknown capabilities
MegaRedHand Jul 7, 2025
50ab0a5
fix: don't fail if peer has unknown capabilities
MegaRedHand Jul 7, 2025
8098f3a
refactor: use small strings for capability.protocol
MegaRedHand Jul 7, 2025
94ef036
fix: use static constructors in tests
MegaRedHand Jul 7, 2025
1b3605e
fix: use new function in serialize impl
MegaRedHand Jul 7, 2025
68fcef7
fix: invert condition
MegaRedHand Jul 7, 2025
d6dc362
fix: assert input is less than 8 bytes long
MegaRedHand Jul 7, 2025
5e54807
fix: encode protocol correctly
MegaRedHand Jul 7, 2025
70fcbd2
Merge branch 'p2p/ignore-unknown-capabilities' into p2p/pull-new-peer…
MegaRedHand Jul 7, 2025
91ed06f
fix: initialize table entries before connecting
MegaRedHand Jul 7, 2025
e091616
fix: make force push ignore replacements
MegaRedHand Jul 7, 2025
264df3d
fix: use Kademlia table to fetch server state
MegaRedHand Jul 7, 2025
5265562
chore: remove unused struct
MegaRedHand Jul 7, 2025
8d1dd1e
chore: fix lints
MegaRedHand Jul 8, 2025
caf11fc
Merge branch 'main' into p2p/pull-new-peers-from-discovery
MegaRedHand Jul 8, 2025
f837113
wip: commit some changes
MegaRedHand Jul 8, 2025
708c153
chore: remove `known_node` constant
MegaRedHand Jul 8, 2025
9d47e0f
fix: verify peer before adding to table
MegaRedHand Jul 8, 2025
515d75d
chore: remove unwraps
MegaRedHand Jul 8, 2025
1abf74a
fix: don't check P2P capability
MegaRedHand Jul 8, 2025
9557483
Merge branch 'main' into p2p/pull-new-peers-from-discovery
MegaRedHand Jul 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions cmd/ethrex/ethrex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,6 @@ async fn main() -> eyre::Result<()> {

let cancel_token = tokio_util::sync::CancellationToken::new();

init_rpc_api(
&opts,
peer_table.clone(),
local_p2p_node.clone(),
local_node_record.lock().await.clone(),
store.clone(),
blockchain.clone(),
cancel_token.clone(),
tracker.clone(),
)
.await;

if opts.metrics_enabled {
init_metrics(&opts, tracker.clone());
}
Expand All @@ -127,7 +115,7 @@ async fn main() -> eyre::Result<()> {
&opts,
&network,
&data_dir,
local_p2p_node,
local_p2p_node.clone(),
local_node_record.clone(),
signer,
peer_table.clone(),
Expand All @@ -142,6 +130,18 @@ async fn main() -> eyre::Result<()> {
}
}

init_rpc_api(
&opts,
peer_table.clone(),
local_p2p_node,
local_node_record.lock().await.clone(),
store.clone(),
blockchain.clone(),
cancel_token.clone(),
tracker.clone(),
)
.await;

let mut signal_terminate = signal(SignalKind::terminate())?;

tokio::select! {
Expand Down
2 changes: 1 addition & 1 deletion cmd/ethrex/initializers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ pub fn get_bootnodes(opts: &Options, network: &Network, data_dir: &str) -> Vec<N
bootnodes.extend(networks::HOLESKY_BOOTNODES.clone());
}
Network::PublicNetwork(PublicNetwork::Hoodi) => {
info!("Addig hoodi preset bootnodes");
info!("Adding hoodi preset bootnodes");
bootnodes.extend(networks::HOODI_BOOTNODES.clone());
}
Network::PublicNetwork(PublicNetwork::Mainnet) => {
Expand Down
161 changes: 121 additions & 40 deletions crates/networking/p2p/discv4/lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,11 @@ use tracing::debug;
pub struct Discv4LookupHandler {
ctx: P2PContext,
udp_socket: Arc<UdpSocket>,
interval_minutes: u64,
}

impl Discv4LookupHandler {
pub fn new(ctx: P2PContext, udp_socket: Arc<UdpSocket>, interval_minutes: u64) -> Self {
Self {
ctx,
udp_socket,
interval_minutes,
}
pub fn new(ctx: P2PContext, udp_socket: Arc<UdpSocket>) -> Self {
Self { ctx, udp_socket }
}

/// Starts a tokio scheduler that:
Expand All @@ -52,19 +47,19 @@ impl Discv4LookupHandler {
/// doesn't have any node to ask.
///
/// See more https://github.com/ethereum/devp2p/blob/master/discv4.md#recursive-lookup
pub fn start(&self, initial_interval_wait_seconds: u64) {
pub fn start(&self, interval_minutes: u64, initial_interval_wait_seconds: u64) {
self.ctx.tracker.spawn({
let self_clone = self.clone();
async move {
self_clone
.start_lookup_loop(initial_interval_wait_seconds)
.start_lookup_loop(interval_minutes, initial_interval_wait_seconds)
.await;
}
});
}

async fn start_lookup_loop(&self, initial_interval_wait_seconds: u64) {
let mut interval = tokio::time::interval(Duration::from_secs(self.interval_minutes));
async fn start_lookup_loop(&self, interval_minutes: u64, initial_interval_wait_seconds: u64) {
let mut interval = tokio::time::interval(Duration::from_secs(interval_minutes * 60));
tokio::time::sleep(Duration::from_secs(initial_interval_wait_seconds)).await;

loop {
Expand Down Expand Up @@ -104,48 +99,77 @@ impl Discv4LookupHandler {
async fn recursive_lookup(&self, target: H512) {
// lookups start with the closest nodes to the target from our table
let target_node_id = node_id(&target);
let mut peers_to_ask: Vec<Node> = self
.ctx
.table
.lock()
.await
.get_closest_nodes(target_node_id);

let mut peers_to_ask = vec![];
// stores the peers in peers_to_ask + the peers that were in peers_to_ask but were replaced by closer targets
let mut seen_peers: HashSet<H512> = HashSet::default();
let mut seen_peers = HashSet::default();
let mut asked_peers = HashSet::default();

seen_peers.insert(self.ctx.local_node.public_key);
for node in &peers_to_ask {
seen_peers.insert(node.public_key);
loop {
let (_, should_continue) = self
.recursive_lookup_step(
target,
target_node_id,
&mut peers_to_ask,
&mut seen_peers,
&mut asked_peers,
)
.await;
// the lookup finishes when there are no more queries to do
// that happens when we have asked all the peers
if !should_continue {
break;
}
}
}

loop {
let (nodes_found, queries) = self.lookup(target, &mut asked_peers, &peers_to_ask).await;
/// Performs a single step of the recursive lookup.
/// Returns the nodes found in this step, plus `true` if the lookup should continue, `false` otherwise.
async fn recursive_lookup_step(
&self,
target: H512,
target_node_id: H256,
peers_to_ask: &mut Vec<Node>,
seen_peers: &mut HashSet<H512>,
asked_peers: &mut HashSet<H512>,
) -> (Vec<Node>, bool) {
if peers_to_ask.is_empty() {
let initial_peers = self
.ctx
.table
.lock()
.await
.get_closest_nodes(target_node_id);

for node in nodes_found {
if !seen_peers.contains(&node.public_key) {
seen_peers.insert(node.public_key);
self.peers_to_ask_push(&mut peers_to_ask, target_node_id, node);
}
peers_to_ask.extend(initial_peers.iter().cloned());

seen_peers.insert(self.ctx.local_node.public_key);
for node in peers_to_ask {
seen_peers.insert(node.public_key);
}
return (initial_peers, true);
}
let (nodes_found, queries) = self.lookup(target, asked_peers, peers_to_ask).await;

// the lookup finishes when there are no more queries to do
// that happens when we have asked all the peers
if queries == 0 {
break;
for node in &nodes_found {
if !seen_peers.contains(&node.public_key) {
seen_peers.insert(node.public_key);
self.peers_to_ask_push(peers_to_ask, target_node_id, node.clone());
}
}

(nodes_found, queries > 0)
}

/// We use the public key instead of the node_id as the target, since we might need to perform a FindNode request
async fn lookup(
pub async fn lookup(
&self,
target: H512,
asked_peers: &mut HashSet<H512>,
nodes_to_ask: &Vec<Node>,
) -> (Vec<Node>, u32) {
// send FIND_NODE to at most three peers concurrently
let alpha = 3;
let concurrency_factor = 3;
let mut queries = 0;
let mut nodes = vec![];

Expand Down Expand Up @@ -184,7 +208,7 @@ impl Discv4LookupHandler {
}
}

if queries == alpha {
if queries == concurrency_factor {
break;
}
}
Expand Down Expand Up @@ -263,6 +287,67 @@ impl Discv4LookupHandler {
}
}

#[derive(Debug, Clone)]
pub struct Discv4NodeIterator {
lookup_handler: Discv4LookupHandler,
target: H512,
target_node_id: H256,
peers_to_ask: Vec<Node>,
seen_peers: HashSet<H512>,
asked_peers: HashSet<H512>,
buffer: Vec<Node>,
}

impl Discv4NodeIterator {
pub fn new(ctx: P2PContext, udp_socket: Arc<UdpSocket>) -> Self {
let random_signing_key = SigningKey::random(&mut OsRng);
let target = public_key_from_signing_key(&random_signing_key);
let target_node_id = node_id(&target);

let lookup_handler = Discv4LookupHandler::new(ctx, udp_socket);

Discv4NodeIterator {
lookup_handler,
target,
target_node_id,
peers_to_ask: vec![],
seen_peers: Default::default(),
asked_peers: Default::default(),
buffer: vec![],
}
}

pub async fn next(&mut self) -> Node {
if let Some(node) = self.buffer.pop() {
return node;
}
loop {
let (found_nodes, should_continue) = self
.lookup_handler
.recursive_lookup_step(
self.target,
self.target_node_id,
&mut self.peers_to_ask,
&mut self.seen_peers,
&mut self.asked_peers,
)
.await;

self.buffer.extend(found_nodes);

if !should_continue {
self.peers_to_ask.clear();
self.seen_peers.clear();
self.asked_peers.clear();
}

if let Some(node) = self.buffer.pop() {
return node;
}
}
}
}

#[cfg(test)]
mod tests {
use tokio::time::sleep;
Expand All @@ -277,11 +362,7 @@ mod tests {
};

fn lookup_handler_from_server(server: Discv4Server) -> Discv4LookupHandler {
Discv4LookupHandler::new(
server.ctx.clone(),
server.udp_socket.clone(),
server.lookup_interval_minutes,
)
Discv4LookupHandler::new(server.ctx.clone(), server.udp_socket.clone())
}

#[tokio::test]
Expand Down
Loading
Loading