From 34988784a55471342c1ef7ab4965acd40f89221b Mon Sep 17 00:00:00 2001 From: Viktor Debulat Date: Sun, 17 Aug 2025 21:32:23 +0200 Subject: [PATCH] feat: propagate error later for streams --- .../src/orchestrator/QueryCache.ts | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index d24bc065663be..bf754359453c0 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -498,11 +498,27 @@ export class QueryCache { if (client.stream) { tableData = await client.stream(q.query, q.values, q); const errors = []; - await pipeline(tableData.rowStream, writer, (err) => { - if (err) { - errors.push(err); + + const iterator = tableData.rowStream[Symbol.asyncIterator](); + try { + let result = await iterator.next(); + while (!result.done) { + try { + writer.write(result.value); + } catch (writeErr) { + errors.push(writeErr); + } + result = await iterator.next(); } - }); + } catch (streamErr) { + errors.push(streamErr); + } + try { + writer.end(); + } catch (endErr) { + errors.push(endErr); + } + if (errors.length > 0) { throw new Error(`Lambda query errors ${errors.join(', ')}`); }