diff --git a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts index 7187683388da1..9da2518b55b91 100644 --- a/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts +++ b/packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts @@ -136,12 +136,31 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface { } public async getActiveAndToProcess(): Promise { + const active: QueryKeysTuple[] = []; + const toProcess: QueryKeysTuple[] = []; + + const rows = await this.driver.query('QUEUE LIST ?', [ + this.options.redisQueuePrefix + ]); + if (rows.length) { + for (const row of rows) { + if (row.status === 'active') { + active.push([ + row.id as QueryKeyHash, + row.queue_id ? parseInt(row.queue_id, 10) : null, + ]); + } else { + toProcess.push([ + row.id as QueryKeyHash, + row.queue_id ? parseInt(row.queue_id, 10) : null, + ]); + } + } + } + return [ - // We don't return active queries, because it's useless - // There is only one place where it's used, and it's QueryQueue.reconcileQueueImpl - // Cube Store provides strict guarantees that queue item cannot be active & pending in the same time - [], - await this.getToProcessQueries() + active, + toProcess, ]; } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js index 06fbe8f7d49ae..4b4e5b9e70da3 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js @@ -564,7 +564,16 @@ export class QueryQueue { } })); - const [_active, toProcess] = await queueConnection.getActiveAndToProcess(); + const [active, toProcess] = await queueConnection.getActiveAndToProcess(); + + /** + * Important notice: Concurrency configuration works per a specific queue, not per node. + * + * In production clusters where it contains N nodes, it shares the same concurrency. It leads to a point + * where every node tries to pick up jobs as much as concurrency is defined for the whole cluster. To minimize + * the effect of competition between nodes, it's important to reduce the number of tries to process by active jobs. + */ + const toProcessLimit = active.length >= this.concurrency ? 1 : this.concurrency - active.length; await Promise.all( R.pipe( @@ -581,7 +590,7 @@ export class QueryQueue { return false; } }), - R.take(this.concurrency), + R.take(toProcessLimit), R.map((([queryKey, queueId]) => this.sendProcessMessageFn(queryKey, queueId))) )(toProcess) ); @@ -740,8 +749,8 @@ export class QueryQueue { } /** - * Processing query specified by the `queryKey`. This method encapsulate most - * of the logic related with the queues updates, heartbeat, etc. + * Processing query specified by the `queryKey`. This method encapsulates most + * of the logic related to the queue updates, heartbeat, etc. * * @param {QueryKeyHash} queryKeyHashed * @param {QueueId | null} queueId Supported by new Cube Store and Memory