Commit 7706c7b

feat: make "get providers" non-blocking when there are closer peers

Let's assume there's one (or zero) providers for a record and everyone is looking for 10 providers.

- Before, peers would wait for all peers along the path to return this one provider. However, because there's only one provider, peers won't be able to short-circuit anyway.
- Now, peers will go to the end of the path before waiting. This may make some queries slower, but it attempts to give "priority" to peers that actually _need_ responses, as opposed to peers that are "optimistically" waiting for responses.
1 parent 08d6e8e commit 7706c7b

File tree: 2 files changed (+68, -12 lines)

  handlers.go
  providers/providers_manager.go


handlers.go

Lines changed: 33 additions & 9 deletions
@@ -9,6 +9,7 @@ import (
 
 	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/peerstore"
+	kb "github.com/libp2p/go-libp2p-kbucket"
 	pstore "github.com/libp2p/go-libp2p-peerstore"
 
 	"github.com/gogo/protobuf/proto"
@@ -317,23 +318,46 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
 
 	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
 
+	// Find closer peers.
+	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
+	myBucket := true
+	if len(closer) > 0 {
+		// Fill out peer infos.
+		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
+		infos := pstore.PeerInfos(dht.peerstore, closer)
+		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
+
+		// If we have a full bucket of closer peers, check to see if _we're_ in the closest
+		// set.
+		if len(closer) >= dht.bucketSize {
+			// Check to see if _we're_ in the "close" bucket.
+			// If not, we _may_ answer without blocking on the provider query.
+			peers := append(closer, dht.self)
+			peers = kb.SortClosestPeers(peers, kb.ConvertKey(string(pmes.GetKey())))
+			myBucket = peers[len(peers)-1] != dht.self
+		}
+	}
+
 	// setup providers
-	providers := dht.ProviderManager.GetProviders(ctx, key)
+	var providers []peer.ID
+	if myBucket {
+		// If we're in the closest set, block getting providers.
+		providers = dht.ProviderManager.GetProviders(ctx, key)
+	} else {
+		// Otherwise, don't block. The peer will find a closer peer.
+		var err error
+		providers, err = dht.ProviderManager.GetProvidersNonBlocking(ctx, key)
+		if err != nil {
+			logger.Debugw("dropping get providers request", "error", err)
+		}
+	}
 
 	if len(providers) > 0 {
 		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
 		infos := pstore.PeerInfos(dht.peerstore, providers)
 		resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
 	}
 
-	// Also send closer peers.
-	closer := dht.betterPeersToQuery(pmes, p, dht.bucketSize)
-	if closer != nil {
-		// TODO: pstore.PeerInfos should move to core (=> peerstore.AddrInfos).
-		infos := pstore.PeerInfos(dht.peerstore, closer)
-		resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
-	}
-
 	return resp, nil
 }
 
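The closeness check added above is compact: append ourselves to a full bucket of closer peers, sort everyone by XOR distance to the key with go-libp2p-kbucket, and treat ourselves as outside the closest set only if we sort last. Below is a minimal standalone sketch of that check, assuming the same go-libp2p-core and go-libp2p-kbucket APIs the diff already uses; the package and helper name are invented for illustration, and the handler itself inlines this logic rather than calling a helper.

package sketch

import (
	"github.com/libp2p/go-libp2p-core/peer"
	kb "github.com/libp2p/go-libp2p-kbucket"
)

// inClosestSet reports whether self is among the closest peers to key, given a
// bucket-sized list of the closest other peers we know about. (Hypothetical
// helper, for illustration only.)
func inClosestSet(self peer.ID, closer []peer.ID, key []byte) bool {
	// Copy-on-append (full slice expression) so we don't clobber the caller's slice.
	peers := append(closer[:len(closer):len(closer)], self)
	peers = kb.SortClosestPeers(peers, kb.ConvertKey(string(key)))
	// If self sorts last, every peer we are about to hand back is closer to the
	// key than we are, so we can answer without blocking.
	return peers[len(peers)-1] != self
}

Sorting bucketSize+1 peers is cheap, and it only has to answer one question: is every closer peer we return actually closer to the key than this node?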

providers/providers_manager.go

Lines changed: 35 additions & 3 deletions
@@ -36,6 +36,8 @@ var lruCacheSize = 256
 var batchBufferSize = 256
 var log = logging.Logger("providers")
 var defaultProvideBufferSize = 256
+var defaultGetProvidersBufferSize = 16
+var defaultGetProvidersNonBlockingBufferSize = defaultGetProvidersBufferSize / 4
 
 // ProviderManager adds and pulls providers out of the datastore,
 // caching them in between
@@ -107,9 +109,7 @@ type getProv struct {
 func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) {
 	pm := new(ProviderManager)
 	pm.nonBlocking = true
-	// buffer size of one to reduce context switching.
-	pm.getprovs = make(chan *getProv, 1)
-	// buffer so we can handle bursts.
+	pm.getprovs = make(chan *getProv, defaultGetProvidersBufferSize)
 	pm.newprovs = make(chan *addProv, defaultProvideBufferSize)
 	pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize)
 	cache, err := lru.NewLRU(lruCacheSize, nil)
@@ -336,6 +336,38 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID
 	}
 }
 
+// GetProvidersNonBlocking returns the set of providers for the given key. If the "get providers"
+// queue is full, it returns immediately.
+//
+// This method _does not_ copy the set. Do not modify it.
+func (pm *ProviderManager) GetProvidersNonBlocking(ctx context.Context, k []byte) ([]peer.ID, error) {
+	// If we're "busy", don't even try. This is clearly racy, but it's mostly an "optimistic"
+	// check anyways and it should stabilize pretty quickly when we're under load.
+	//
+	// This helps leave some space for peers that actually need responses.
+	if len(pm.getprovs) > defaultGetProvidersNonBlockingBufferSize {
+		return nil, ErrWouldBlock
+	}
+
+	gp := &getProv{
+		key:  k,
+		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
+	}
+	select {
+	case pm.getprovs <- gp:
+	default:
+		return nil, ErrWouldBlock
+	}
+	select {
+	case <-pm.proc.Closing():
+		return nil, ErrClosing
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case peers := <-gp.resp:
+		return peers, nil
+	}
+}
+
 func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
 	pset, err := pm.getProviderSetForKey(k)
 	if err != nil {
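GetProvidersNonBlocking leans on a standard Go pattern: peek at the queue depth as a rough load signal, then attempt the send inside a select with a default case, so a full buffer yields ErrWouldBlock instead of parking the goroutine. Here is a self-contained toy version of that pattern, with invented stand-ins (request, trySubmit, errWouldBlock) for getProv, the manager's getprovs channel, and ErrWouldBlock; it is a sketch of the technique, not the manager's code.

package sketch

import "errors"

// errWouldBlock mirrors the ErrWouldBlock sentinel used by the provider manager.
var errWouldBlock = errors.New("queue full: would block")

// request stands in for getProv: a key plus a buffered reply channel.
type request struct {
	key  []byte
	resp chan []string // buffered (size 1) so the worker never blocks on the reply
}

// trySubmit enqueues a request only when the queue looks idle enough.
// The len() check is racy, but it is only a heuristic: under load it keeps
// most of the buffer free for callers that are willing to block.
func trySubmit(queue chan *request, threshold int, key []byte) (*request, error) {
	if len(queue) > threshold {
		return nil, errWouldBlock
	}
	r := &request{key: key, resp: make(chan []string, 1)}
	select {
	case queue <- r:
		return r, nil
	default: // buffer filled up between the length check and the send
		return nil, errWouldBlock
	}
}

With defaultGetProvidersBufferSize at 16 and the non-blocking threshold at a quarter of that, roughly three quarters of the queue stays available to blocking callers, which is exactly the "leave some space for peers that actually need responses" comment in the diff.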
