From 91563833e93870f483f344fbc3439dd747c77cd4 Mon Sep 17 00:00:00 2001 From: tabcat Date: Sun, 29 Sep 2024 22:33:09 +0700 Subject: [PATCH 1/4] fix: prevent duplicate connections to multiaddr After opening a new connection, check if there is an existing connection to the same peer. If the existing connection is not limited or the new connection is also limit, return the existing connection. This will prevent multiple connections to the same peer in cases where the multiaddr being dialed does not contain a peer id. --- .../src/connection-manager/dial-queue.ts | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index 2c3c8b5a34..f789bd4ee4 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -135,11 +135,12 @@ export class DialQueue { */ async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise { const { peerId, multiaddrs } = getPeerAddress(peerIdOrMultiaddr) + const { force } = options // make sure we don't have an existing connection to any of the addresses we // are about to dial const existingConnection = Array.from(this.connections.values()).flat().find(conn => { - if (options.force === true) { + if (force === true) { return false } @@ -260,6 +261,30 @@ export class DialQueue { this.log.error('could not update last dial failure key for %p', peerId, err) } + const { remotePeer } = conn + + // make sure we don't have an existing connection to the address we dialed + const existingConnection = Array.from(this.connections.values()).flat().find(_conn => { + if (force === true) { + return false + } + + if (_conn.remotePeer.equals(remotePeer) && _conn !== conn) { + return true + } + + return false + }) + + // return existing, open connection to peer if equal or better limits + if (existingConnection?.status === 'open' && (existingConnection?.limits == null || conn?.limits != null)) { + this.log('already connected to %a', existingConnection.remoteAddr) + options?.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) + this.log('closing duplicate connection to %p', remotePeer) + await conn.close() + return existingConnection + } + return conn } catch (err: any) { this.log.error('dial failed to %a', address.multiaddr, err) From d627b12b7d23c784f4993d8c68d49f8b08c21ad8 Mon Sep 17 00:00:00 2001 From: tabcat Date: Wed, 2 Oct 2024 21:08:05 +0700 Subject: [PATCH 2/4] test: prevent duplicate connections to multiaddr Checks that the dialer is returning existing connections to the same peer when given a multiaddr without a peer id. --- .../connection-manager/dial-queue.spec.ts | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/packages/libp2p/test/connection-manager/dial-queue.spec.ts b/packages/libp2p/test/connection-manager/dial-queue.spec.ts index dc1d07df62..cddf7e4cb8 100644 --- a/packages/libp2p/test/connection-manager/dial-queue.spec.ts +++ b/packages/libp2p/test/connection-manager/dial-queue.spec.ts @@ -5,6 +5,7 @@ import { NotFoundError } from '@libp2p/interface' import { matchMultiaddr } from '@libp2p/interface-compliance-tests/matchers' import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks' import { peerLogger } from '@libp2p/logger' +import { PeerMap } from '@libp2p/peer-collections' import { peerIdFromPrivateKey } from '@libp2p/peer-id' import { multiaddr, resolvers } from '@multiformats/multiaddr' import { WebRTC } from '@multiformats/multiaddr-matcher' @@ -325,4 +326,74 @@ describe('dial queue', () => { dialer = new DialQueue(components) await expect(dialer.dial(remotePeer)).to.eventually.equal(connection) }) + + it('should return existing connection when dialing a multiaddr without a peer id', async () => { + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const ip = multiaddr('/ip4/123.123.123.123') + const addr1 = ip.encapsulate('/tcp/123') + const addr2 = ip.encapsulate('/tcp/321') + + const existingConnection = stubInterface({ + limits: { + bytes: 100n + }, + remotePeer, + remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const newConnection = stubInterface({ + limits: { + bytes: 100n + }, + remotePeer, + remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const connections = new PeerMap() + connections.set(remotePeer, [existingConnection]) + + components.transportManager.dialTransportForMultiaddr.callsFake(ma => { + return stubInterface() + }) + components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection) + dialer = new DialQueue(components, { connections }) + + await expect(dialer.dial(addr2)).to.eventually.equal(existingConnection) + }) + + it('should return new connection when existing connection to same peer is worse', async () => { + const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) + const ip = multiaddr('/ip4/123.123.123.123') + const addr1 = ip.encapsulate('/tcp/123') + const addr2 = ip.encapsulate('/tcp/321') + + const existingConnection = stubInterface({ + limits: { + bytes: 100n + }, + remotePeer, + remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const newConnection = stubInterface({ + limits: undefined, + remotePeer, + remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`), + status: 'open' + }) + + const connections = new PeerMap() + connections.set(remotePeer, [existingConnection]) + + components.transportManager.dialTransportForMultiaddr.callsFake(ma => { + return stubInterface() + }) + components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection) + dialer = new DialQueue(components, { connections }) + + await expect(dialer.dial(addr2)).to.eventually.equal(newConnection) + }) }) From cdfb408a536533f7a733392f2c68d2c628cd46ea Mon Sep 17 00:00:00 2001 From: tabcat Date: Fri, 13 Jun 2025 14:32:22 -0500 Subject: [PATCH 3/4] add some newlines --- packages/libp2p/src/connection-manager/dial-queue.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index a8cb5fa726..edc1a86a12 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -338,8 +338,10 @@ export class DialQueue { if (existingConnection?.status === 'open' && (existingConnection?.limits == null || conn?.limits != null)) { this.log('already connected to %a', existingConnection.remoteAddr) options?.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) + this.log('closing duplicate connection to %p', remotePeer) await conn.close() + return existingConnection } From 71ea20e66c141430ee9f9fc92bf00e337123df9e Mon Sep 17 00:00:00 2001 From: tabcat Date: Fri, 13 Jun 2025 14:33:38 -0500 Subject: [PATCH 4/4] undo different conn return based on limits --- .../src/connection-manager/dial-queue.ts | 3 +- .../connection-manager/dial-queue.spec.ts | 34 ------------------- 2 files changed, 1 insertion(+), 36 deletions(-) diff --git a/packages/libp2p/src/connection-manager/dial-queue.ts b/packages/libp2p/src/connection-manager/dial-queue.ts index edc1a86a12..4c8ee7a406 100644 --- a/packages/libp2p/src/connection-manager/dial-queue.ts +++ b/packages/libp2p/src/connection-manager/dial-queue.ts @@ -334,8 +334,7 @@ export class DialQueue { return false }) - // return existing, open connection to peer if equal or better limits - if (existingConnection?.status === 'open' && (existingConnection?.limits == null || conn?.limits != null)) { + if (existingConnection?.status === 'open') { this.log('already connected to %a', existingConnection.remoteAddr) options?.onProgress?.(new CustomProgressEvent('dial-queue:already-connected')) diff --git a/packages/libp2p/test/connection-manager/dial-queue.spec.ts b/packages/libp2p/test/connection-manager/dial-queue.spec.ts index a1de20030e..ec235b3e8f 100644 --- a/packages/libp2p/test/connection-manager/dial-queue.spec.ts +++ b/packages/libp2p/test/connection-manager/dial-queue.spec.ts @@ -414,40 +414,6 @@ describe('dial queue', () => { await expect(dialer.dial(addr2)).to.eventually.equal(existingConnection) }) - it('should return new connection when existing connection to same peer is worse', async () => { - const remotePeer = peerIdFromPrivateKey(await generateKeyPair('Ed25519')) - const ip = multiaddr('/ip4/123.123.123.123') - const addr1 = ip.encapsulate('/tcp/123') - const addr2 = ip.encapsulate('/tcp/321') - - const existingConnection = stubInterface({ - limits: { - bytes: 100n - }, - remotePeer, - remoteAddr: addr1.encapsulate(`/p2p/${remotePeer}`), - status: 'open' - }) - - const newConnection = stubInterface({ - limits: undefined, - remotePeer, - remoteAddr: addr2.encapsulate(`/p2p/${remotePeer}`), - status: 'open' - }) - - const connections = new PeerMap() - connections.set(remotePeer, [existingConnection]) - - components.transportManager.dialTransportForMultiaddr.callsFake(ma => { - return stubInterface() - }) - components.transportManager.dial.callsFake(async (ma, opts = {}) => newConnection) - dialer = new DialQueue(components, { connections }) - - await expect(dialer.dial(addr2)).to.eventually.equal(newConnection) - }) - it('should respect user dial signal over default timeout if it is passed', async () => { const dialTimeout = 10 const userTimeout = 500