Skip to content

perf(queriesObserver): fix O(n²) performance issue in batch updates #9467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/query-core/src/queriesObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export class QueriesObserver<
#lastCombine?: CombineFn<TCombinedResult>
#lastResult?: Array<QueryObserverResult>
#observerMatches: Array<QueryObserverMatch> = []
#indexMap: WeakMap<QueryObserver, number> = new WeakMap()

constructor(
client: QueryClient,
Expand Down Expand Up @@ -129,6 +130,11 @@ export class QueriesObserver<
this.#observers = newObservers
this.#result = newResult

this.#indexMap = new WeakMap()
newObservers.forEach((observer, index) => {
this.#indexMap.set(observer, index)
})

if (!this.hasListeners()) {
return
}
Expand Down Expand Up @@ -252,8 +258,8 @@ export class QueriesObserver<
}

#onUpdate(observer: QueryObserver, result: QueryObserverResult): void {
const index = this.#observers.indexOf(observer)
if (index !== -1) {
const index = this.#indexMap.get(observer)
if (index !== undefined) {
Comment on lines -255 to +262
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don’t we just pass the index into onUpdate instead of passing the whole observer and then trying to find the index again? For example:

diff --git a/packages/query-core/src/queriesObserver.ts b/packages/query-core/src/queriesObserver.ts
index 853e490ab..84defa874 100644
--- a/packages/query-core/src/queriesObserver.ts
+++ b/packages/query-core/src/queriesObserver.ts
@@ -63,9 +63,9 @@ export class QueriesObserver<
 
   protected onSubscribe(): void {
     if (this.listeners.size === 1) {
-      this.#observers.forEach((observer) => {
+      this.#observers.forEach((observer, index) => {
         observer.subscribe((result) => {
-          this.#onUpdate(observer, result)
+          this.#onUpdate(index, result)
         })
       })
     }
@@ -137,9 +137,9 @@ export class QueriesObserver<
         observer.destroy()
       })
 
-      difference(newObservers, prevObservers).forEach((observer) => {
+      difference(newObservers, prevObservers).forEach((observer, index) => {
         observer.subscribe((result) => {
-          this.#onUpdate(observer, result)
+          this.#onUpdate(index, result)
         })
       })
 
@@ -251,12 +251,9 @@ export class QueriesObserver<
     return observers
   }
 
-  #onUpdate(observer: QueryObserver, result: QueryObserverResult): void {
-    const index = this.#observers.indexOf(observer)
-    if (index !== -1) {
-      this.#result = replaceAt(this.#result, index, result)
-      this.#notify()
-    }
+  #onUpdate(index: number, result: QueryObserverResult): void {
+    this.#result = replaceAt(this.#result, index, result)
+    this.#notify()
   }
 
   #notify(): void {

Copy link
Contributor Author

@joseph0926 joseph0926 Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the suggestion
I considered passing the index directly, but noticed these potential issues

When setQueries() reorders observers, existing subscriptions keep their old captured indices

// Initial: observers = [A, B, C]
// A subscribes with index=0 in onSubscribe()

// After setQueries(): observers = [D, A, B, C]  
// A still uses index=0, but should use index=1

The index from difference().forEach() is not the actual position in this.#observers

difference(newObservers, prevObservers).forEach((observer, index) => {
  // If difference returns [D, E], index is 0,1
  // But their actual positions in observers might be [A, D, B, E, C] -> 1,3
  observer.subscribe((result) => {
    this.#onUpdate(index, result)  // <- I think there is a problem here.
  })
})

What do you think about these concerns?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, can you then please add a test case where the implementation I suggested would fail.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest'
import { queryKey } from '@tanstack/query-test-utils'
import { QueriesObserver, QueryClient } from '..'
import type { QueryObserverResult } from '..'

describe('queriesObserver - index tracking issue', () => {
  let queryClient: QueryClient

  beforeEach(() => {
    vi.useFakeTimers()
    queryClient = new QueryClient()
    queryClient.mount()
  })

  afterEach(() => {
    queryClient.clear()
    vi.useRealTimers()
  })

  test('should fail when using forEach index with dynamic query changes', async () => {
    const key1 = queryKey()
    const key2 = queryKey()
    const key3 = queryKey()
    const key4 = queryKey()

    const queryFn1 = vi.fn().mockReturnValue('data1')
    const queryFn2 = vi.fn().mockReturnValue('data2')
    const queryFn3 = vi.fn().mockReturnValue('data3')
    const queryFn4 = vi.fn().mockReturnValue('data4')

    const observer = new QueriesObserver(queryClient, [
      { queryKey: key1, queryFn: queryFn1 },
      { queryKey: key2, queryFn: queryFn2 },
    ])

    const results: Array<Array<QueryObserverResult>> = []
    const unsubscribe = observer.subscribe((result) => {
      results.push([...result])
    })

    await vi.advanceTimersByTimeAsync(0)

    results.length = 0

    observer.setQueries([
      { queryKey: key3, queryFn: queryFn3 },
      { queryKey: key1, queryFn: queryFn1 },
      { queryKey: key4, queryFn: queryFn4 },
      { queryKey: key2, queryFn: queryFn2 },
    ])

    await vi.advanceTimersByTimeAsync(0)

    queryClient.setQueryData(key3, 'updated3')
    queryClient.setQueryData(key4, 'updated4')

    queryClient.setQueryData(key1, 'updated1')

    unsubscribe()

    const finalResult = results[results.length - 1]

    expect(finalResult).toHaveLength(4)
    expect(finalResult?.[0]).toMatchObject({ data: 'updated3' })
    expect(finalResult?.[1]).toMatchObject({ data: 'updated1' })
    expect(finalResult?.[2]).toMatchObject({ data: 'updated4' })
    expect(finalResult?.[3]).toMatchObject({ data: 'data2' })
  })

  test('should fail when reordering queries with existing subscriptions', async () => {
    const key1 = queryKey()
    const key2 = queryKey()
    const key3 = queryKey()

    let updateCount = 0
    const queryFn1 = vi.fn().mockImplementation(() => `data1-${++updateCount}`)
    const queryFn2 = vi.fn().mockImplementation(() => `data2-${++updateCount}`)
    const queryFn3 = vi.fn().mockImplementation(() => `data3-${++updateCount}`)

    const observer = new QueriesObserver(queryClient, [
      { queryKey: key1, queryFn: queryFn1 },
      { queryKey: key2, queryFn: queryFn2 },
      { queryKey: key3, queryFn: queryFn3 },
    ])

    const results: Array<Array<QueryObserverResult>> = []
    const unsubscribe = observer.subscribe((result) => {
      results.push([...result])
    })

    await vi.advanceTimersByTimeAsync(0)

    results.length = 0

    observer.setQueries([
      { queryKey: key3, queryFn: queryFn3 },
      { queryKey: key1, queryFn: queryFn1 },
      { queryKey: key2, queryFn: queryFn2 },
    ])

    await queryClient.invalidateQueries({ queryKey: key1 })
    await vi.advanceTimersByTimeAsync(0)

    const resultAfterInvalidate = results[results.length - 1]

    expect(resultAfterInvalidate?.[1]?.data).toMatch(/data1-\d+/)
    expect(resultAfterInvalidate?.[0]?.data).toBe('data3-3')
    expect(resultAfterInvalidate?.[2]?.data).toBe('data2-2')

    unsubscribe()
  })
})
queries-test

the failure only occurs with the version that captures the index in the subscription callback. When I run the same tests against either the current main branch’s queriesObserver.ts or the queriesObserver.ts in my PR, all of the tests pass without any issues.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you, but then please add the test case to the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following your suggestion, I've created and run various performance tests, and I'd like to share the results. While theoretically replacing indexOf O(n) with WeakMap O(1) should improve performance, the actual test results were different from what I expected.

  • Even in extreme cases with 10,000 queries, there was no meaningful performance difference
  • In some cases, the WeakMap version was actually slightly slower

It seems that other optimizations like using Set in the difference function and caching in findMatchingObservers have already resolved the main O(n²) issues.
I apologize for taking up your valuable time reviewing a PR that ultimately doesn't provide substantial improvements. I would appreciate it if you could close the PR.
This has been a great learning experience, and I'm grateful for your time and feedback. Thank you for the opportunity to contribute to the project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For reference, I am sharing one of the codes I tested.

import { afterEach, beforeEach, describe, expect, test } from 'vitest'
import { QueriesObserver, QueryClient } from '..'
import type { QueryObserverOptions } from '..'

describe('QueriesObserver extreme cases - Real-time Dashboard', () => {
  let queryClient: QueryClient

  beforeEach(() => {
    queryClient = new QueryClient({
      defaultOptions: {
        queries: {
          retry: false,
        },
      },
    })
    queryClient.mount()
  })

  afterEach(() => {
    queryClient.clear()
  })

  test('should handle real-time dashboard with 10,000+ queries where last ones update frequently', async () => {
    const TOTAL_QUERIES = 10000
    const ACTIVE_QUERIES = 100
    const UPDATE_ROUNDS = 10

    console.log(`\n Real-time Dashboard Simulation:`)
    console.log(`Total queries: ${TOTAL_QUERIES}`)
    console.log(`Active queries (frequently updating): ${ACTIVE_QUERIES}`)
    console.log(`Update rounds: ${UPDATE_ROUNDS}\n`)

    const createQueries = (round: number = 0): Array<QueryObserverOptions> => {
      return Array.from({ length: TOTAL_QUERIES }, (_, i) => {
        const isActive = i >= TOTAL_QUERIES - ACTIVE_QUERIES

        return {
          queryKey: ['dashboard', i, round],
          queryFn: async () => {
            await new Promise((resolve) => setTimeout(resolve, 0))
            return {
              id: i,
              data: `${round === 0 ? 'initial' : `update-${round}`}-${i}`,
              timestamp: Date.now(),
              isActive,
            }
          },

          staleTime: isActive ? 0 : Infinity,
        }
      })
    }

    const startSetup = performance.now()
    const observer = new QueriesObserver(queryClient, createQueries(0))

    let totalUpdates = 0
    const updateTimes: Array<number> = []
    let lastUpdateTime = performance.now()
    const updateMetrics: Array<{
      round: number
      updateCount: number
      duration: number
      avgTimePerUpdate: number
    }> = []

    const unsubscribe = observer.subscribe(() => {
      const now = performance.now()
      updateTimes.push(now - lastUpdateTime)
      lastUpdateTime = now
      totalUpdates++
    })

    const setupTime = performance.now() - startSetup
    console.log(`Setup time: ${setupTime.toFixed(1)}ms`)

    console.log('\n Initial load...')
    const initialLoadStart = performance.now()

    await new Promise((resolve) => setTimeout(resolve, 100))

    const initialLoadTime = performance.now() - initialLoadStart
    const initialUpdateCount = totalUpdates
    console.log(
      `Initial load completed: ${initialLoadTime.toFixed(1)}ms, ${initialUpdateCount} updates`,
    )

    console.log('\n Starting frequent updates on active queries...')

    for (let round = 1; round <= UPDATE_ROUNDS; round++) {
      const roundStart = performance.now()
      const updateCountBefore = totalUpdates

      const updatedQueries = Array.from({ length: TOTAL_QUERIES }, (_, i) => {
        const isActive = i >= TOTAL_QUERIES - ACTIVE_QUERIES

        if (isActive) {
          return {
            queryKey: ['dashboard', i, round],
            queryFn: async () => {
              await new Promise((resolve) => setTimeout(resolve, 0))
              return {
                id: i,
                data: `update-${round}-${i}`,
                timestamp: Date.now(),
                isActive: true,
              }
            },
            staleTime: 0,
          }
        } else {
          return {
            queryKey: ['dashboard', i, 0],
            queryFn: async () => {
              return {
                id: i,
                data: `initial-${i}`,
                timestamp: Date.now(),
                isActive: false,
              }
            },
            staleTime: Infinity,
          }
        }
      })

      observer.setQueries(updatedQueries)

      await new Promise((resolve) => setTimeout(resolve, 50))

      const roundDuration = performance.now() - roundStart
      const roundUpdates = totalUpdates - updateCountBefore

      updateMetrics.push({
        round,
        updateCount: roundUpdates,
        duration: roundDuration,
        avgTimePerUpdate: roundDuration / ACTIVE_QUERIES,
      })
    }

    console.log('\n Performance Analysis:')
    console.log('Round | Duration | Updates | Avg/Update')
    console.log('------|----------|---------|------------')

    updateMetrics.forEach(
      ({ round, duration, updateCount, avgTimePerUpdate }) => {
        console.log(
          `${round.toString().padEnd(5)} | ${duration.toFixed(1).padEnd(8)}ms | ${updateCount.toString().padEnd(7)} | ${avgTimePerUpdate.toFixed(2)}ms`,
        )
      },
    )

    const validMetrics = updateMetrics.filter((m) => m.updateCount > 0)
    if (validMetrics.length >= 2) {
      const firstRoundAvg = validMetrics[0].avgTimePerUpdate
      const lastRoundAvg =
        validMetrics[validMetrics.length - 1].avgTimePerUpdate
      const degradation = lastRoundAvg / firstRoundAvg

      console.log(`\n Performance degradation: ${degradation.toFixed(2)}x`)
      console.log(`Total updates processed: ${totalUpdates}`)

      if (updateTimes.length > 100) {
        const sortedTimes = [...updateTimes].sort((a, b) => a - b)
        const p50 = sortedTimes[Math.floor(sortedTimes.length * 0.5)]
        const p95 = sortedTimes[Math.floor(sortedTimes.length * 0.95)]
        const p99 = sortedTimes[Math.floor(sortedTimes.length * 0.99)]

        console.log('\n Update time distribution:')
        console.log(`P50: ${p50?.toFixed(2)}ms`)
        console.log(`P95: ${p95?.toFixed(2)}ms`)
        console.log(`P99: ${p99?.toFixed(2)}ms`)
      }

      expect(degradation).toBeLessThan(3.0)
    }

    unsubscribe()
  })

  test('should demonstrate O(n) vs O(n²) behavior with increasing observer counts', async () => {
    const testSizes = [1000, 2000, 4000, 8000]
    const results: Array<{
      size: number
      setupTime: number
      updateTime: number
      timePerQuery: number
    }> = []

    console.log('\n Scalability Analysis:')

    for (const size of testSizes) {
      const activeCount = Math.min(100, size / 10)

      const queries = Array.from({ length: size }, (_, i) => ({
        queryKey: ['scale-test', size, i],
        queryFn: async () => ({ id: i, data: `result-${i}` }),
        staleTime: i >= size - activeCount ? 0 : Infinity,
      }))

      const setupStart = performance.now()
      const observer = new QueriesObserver(queryClient, queries)

      let updateCount = 0
      const unsubscribe = observer.subscribe(() => {
        updateCount++
      })

      const setupTime = performance.now() - setupStart

      await new Promise((resolve) => setTimeout(resolve, 50))

      const updateStart = performance.now()

      const updatedQueries = queries.map((q, i) => {
        if (i >= size - activeCount) {
          return {
            ...q,
            queryKey: ['scale-test', size, i, 'updated'],
          }
        }
        return q
      })

      observer.setQueries(updatedQueries)
      await new Promise((resolve) => setTimeout(resolve, 50))

      const updateTime = performance.now() - updateStart

      results.push({
        size,
        setupTime,
        updateTime,
        timePerQuery: updateTime / activeCount,
      })

      unsubscribe()
      await queryClient.clear()
    }

    console.log('Size  | Setup Time | Update Time | Time/Query | Growth')
    console.log('------|------------|-------------|------------|--------')

    results.forEach((result, i) => {
      const growth =
        i > 0
          ? (result.timePerQuery / results[0].timePerQuery).toFixed(2)
          : '1.00'
      console.log(
        `${result.size.toString().padEnd(5)} | ${result.setupTime.toFixed(1).padEnd(10)}ms | ${result.updateTime.toFixed(1).padEnd(11)}ms | ${result.timePerQuery.toFixed(2).padEnd(10)}ms | ${growth}x`,
      )
    })

    const timePerQueryRatios = results.map((r) => r.timePerQuery)
    const maxRatio = Math.max(...timePerQueryRatios)
    const minRatio = Math.min(...timePerQueryRatios)
    const varianceRatio = maxRatio / minRatio

    console.log(
      `\nTime per query variance: ${varianceRatio.toFixed(2)}x (should be close to 1.0 for O(n))`,
    )

    expect(varianceRatio).toBeLessThan(3.0)
  })
})

this.#result = replaceAt(this.#result, index, result)
this.#notify()
}
Expand Down
Loading