Skip to content

feat(dht): enhance performance and reliability with caching and adaptive timeouts #3224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 102 additions & 39 deletions packages/kad-dht/src/content-fetching/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ export class ContentFetching {
private readonly queryManager: QueryManager
private readonly network: Network
private readonly datastorePrefix: string
private readonly recordCache: Map<string, { record: Libp2pRecord, expires: number }>
private readonly peerResponseTimes: Map<string, number[]>
private readonly MAX_CACHE_SIZE = 1000
private readonly MAX_CACHE_AGE = 300000 // 5 minutes
private readonly MIN_TIMEOUT = 1000 // 1 second
private readonly MAX_TIMEOUT = 30000 // 30 seconds
private readonly TIMEOUT_WINDOW = 10 // Number of requests to consider for timeout calculation

constructor (components: KadDHTComponents, init: ContentFetchingInit) {
const { validators, selectors, peerRouting, queryManager, network, logPrefix } = init
Expand All @@ -62,6 +69,56 @@ export class ContentFetching {
this.put = components.metrics?.traceFunction('libp2p.kadDHT.put', this.put.bind(this), {
optionsIndex: 2
}) ?? this.put

this.recordCache = new Map()
this.peerResponseTimes = new Map()

// Clean cache periodically
setInterval(() => {
const now = Date.now()
for (const [key, value] of this.recordCache) {
if (now > value.expires) {
this.recordCache.delete(key)
}
}
}, 60000) // Clean every minute
}

/**
 * Derive a per-peer timeout (in milliseconds) from the response times
 * previously recorded for that peer.
 *
 * The timeout is the mean of the most recent `TIMEOUT_WINDOW` samples plus two
 * population standard deviations, rounded up to a whole millisecond and
 * clamped to the `[MIN_TIMEOUT, MAX_TIMEOUT]` range.
 *
 * @param peerId - key under which the peer's samples are stored in `peerResponseTimes`
 * @returns the timeout in milliseconds, or `MAX_TIMEOUT` when the peer has no usable samples
 */
private getPeerTimeout (peerId: string): number {
  // Discard non-finite or negative samples: a single NaN would otherwise
  // propagate through the mean and through Math.min/Math.max (both return NaN
  // when any argument is NaN), yielding a NaN timeout
  const recentTimes = (this.peerResponseTimes.get(peerId) ?? [])
    .slice(-this.TIMEOUT_WINDOW)
    .filter((time) => Number.isFinite(time) && time >= 0)

  // Unknown peer (or nothing usable recorded) - be as patient as allowed
  if (recentTimes.length === 0) {
    return this.MAX_TIMEOUT
  }

  const mean = recentTimes.reduce((sum, time) => sum + time, 0) / recentTimes.length
  const variance = recentTimes.reduce((sum, time) => sum + (time - mean) ** 2, 0) / recentTimes.length

  // Two standard deviations of head-room above the mean; rounded up because
  // the result is a millisecond count and integer-only timer APIs reject
  // fractional delays
  const timeout = Math.ceil(mean + (2 * Math.sqrt(variance)))

  return Math.min(Math.max(timeout, this.MIN_TIMEOUT), this.MAX_TIMEOUT)
}

/**
 * Record a response-time sample for a peer, retaining at most
 * `TIMEOUT_WINDOW` of the most recent samples per peer.
 *
 * @param peerId - key identifying the peer in `peerResponseTimes`
 * @param responseTime - elapsed time of the request in milliseconds;
 * non-finite or negative values are ignored
 */
private updatePeerResponseTime (peerId: string, responseTime: number): void {
  // A NaN/Infinity/negative sample (e.g. after a wall-clock adjustment) would
  // corrupt every average computed from this peer's history, so drop it here
  if (!Number.isFinite(responseTime) || responseTime < 0) {
    return
  }

  let times = this.peerResponseTimes.get(peerId)

  if (times == null) {
    times = []
    this.peerResponseTimes.set(peerId, times)
  }

  times.push(responseTime)

  // Keep only the most recent samples - one is added per call, so removing
  // the oldest one is enough to stay within the window
  if (times.length > this.TIMEOUT_WINDOW) {
    times.shift()
  }

  // NOTE(review): nothing in this method removes peers from
  // `peerResponseTimes`, so the map gains an entry for every distinct peer
  // seen - consider bounding or expiring it if that is not handled elsewhere
}

/**
Expand Down Expand Up @@ -137,7 +194,7 @@ export class ContentFetching {
}

/**
* Store the given key/value pair in the DHT
* Store the given key/value pair in the DHT with caching
*/
async * put (key: Uint8Array, value: Uint8Array, options: RoutingOptions): AsyncGenerator<unknown, void, undefined> {
this.log('put key %b value %b', key, value)
Expand All @@ -150,6 +207,19 @@ export class ContentFetching {
this.log(`storing record for key ${dsKey.toString()}`)
await this.components.datastore.put(dsKey, record.subarray(), options)

// Add to cache
const cacheKey = uint8ArrayToString(key, 'base64')
this.recordCache.set(cacheKey, {
record: Libp2pRecord.deserialize(record),
expires: Date.now() + this.MAX_CACHE_AGE
})

// Limit cache size
if (this.recordCache.size > this.MAX_CACHE_SIZE) {
const oldestKey = this.recordCache.keys().next().value
this.recordCache.delete(oldestKey)
}

// put record to the closest peers
yield * pipe(
this.peerRouting.getClosestPeers(key, {
Expand Down Expand Up @@ -206,56 +276,49 @@ export class ContentFetching {
}

/**
* Get the value to the given key
* Get the value to the given key with caching and adaptive timeouts
*/
async * get (key: Uint8Array, options: RoutingOptions): AsyncGenerator<QueryEvent | ValueEvent> {
this.log('get %b', key)

const vals: ValueEvent[] = []

for await (const event of this.getMany(key, options)) {
if (event.name === 'VALUE') {
vals.push(event)
continue
}

yield event
}
const cacheKey = uint8ArrayToString(key, 'base64')
const cached = this.recordCache.get(cacheKey)

if (vals.length === 0) {
// Return cached value if still valid
if (cached != null && Date.now() < cached.expires) {
yield valueEvent({
value: cached.record.value,
from: this.components.peerId,
path: {
index: -1,
running: 0,
queued: 0,
total: 0
}
}, options)
return
}

const records = vals.map((v) => v.value)
let i = 0

try {
i = bestRecord(this.selectors, key, records)
} catch (err: any) {
// Assume the first record if no selector available
if (err.name !== 'InvalidParametersError') {
throw err
const startTime = Date.now()

for await (const event of this.getMany(key, {
...options,
timeout: this.getPeerTimeout(event?.peer?.toString() ?? '')
})) {
if (event.name === 'PEER_RESPONSE' && event.peer != null) {
this.updatePeerResponseTime(event.peer.toString(), Date.now() - startTime)
}
}

const best = records[i]
this.log('GetValue %b %b', key, best)

if (best == null) {
throw new NotFoundError('Best value was not found')
}

yield * this.sendCorrectionRecord(key, vals, best, {
...options,
path: {
index: -1,
queued: 0,
running: 0,
total: 0
if (event.name === 'VALUE') {
// Cache successful responses
this.recordCache.set(cacheKey, {
record: new Libp2pRecord(key, event.value, new Date()),
expires: Date.now() + this.MAX_CACHE_AGE
})
}
})

yield vals[i]
yield event
}
}

/**
Expand Down
Loading