From bd9ccc0dee2e1ee8979942f8dc1e1d5fef64c8e7 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 8 Jul 2025 15:13:07 -0700 Subject: [PATCH 1/2] Fix empty VALUES with ordinals grouping (#130861) We should not build the sorted structure for the ordinal grouping operator if the requested position is larger than maxGroupId. This situation occurs with nulls. We should benchmark the ordinal blocks and consider removing the ordinal grouping operator if performance is similar; otherwise, we need to integrate this operator with GroupingAggregatorFunctionTestCase. Relates #130576 --- .../aggregation/ValuesBytesRefAggregator.java | 2 +- .../aggregation/ValuesDoubleAggregator.java | 2 +- .../aggregation/ValuesFloatAggregator.java | 2 +- .../aggregation/ValuesIntAggregator.java | 2 +- .../aggregation/ValuesLongAggregator.java | 2 +- .../aggregation/X-ValuesAggregator.java.st | 2 +- .../elasticsearch/compute/OperatorTests.java | 108 ++++++++++++++++++ 7 files changed, 114 insertions(+), 6 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java index 01322f939766f..98d57f78c8625 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java @@ -88,10 +88,10 @@ public static void combineIntermediate(GroupingState state, int groupId, BytesRe } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? 
sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java index 9aebe85c0cd89..3c0dcd58c29ee 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java @@ -67,10 +67,10 @@ public static void combineIntermediate(GroupingState state, int groupId, DoubleB } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? 
sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java index 28c736783122d..a25d69b712538 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java @@ -66,10 +66,10 @@ public static void combineIntermediate(GroupingState state, int groupId, FloatBl } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? 
sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java index 39dbcd155954c..2c8c0f409dd5b 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java @@ -66,10 +66,10 @@ public static void combineIntermediate(GroupingState state, int groupId, IntBloc } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? 
sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java index a7d3a8fe539df..2790a182d5041 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java @@ -67,10 +67,10 @@ public static void combineIntermediate(GroupingState state, int groupId, LongBlo } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? 
sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st index d504b63e3ed30..d28cf3342030e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st @@ -125,10 +125,10 @@ $endif$ } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 9efff55fc51c8..b9587be7d23cc 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.LongField; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -35,6 +36,7 @@ import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import 
org.elasticsearch.compute.aggregation.CountAggregatorFunction; +import org.elasticsearch.compute.aggregation.ValuesLongAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; @@ -252,6 +254,112 @@ public String toString() { assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); } + // TODO: Remove ordinals grouping operator or enable it in GroupingAggregatorFunctionTestCase + public void testValuesWithOrdinalGrouping() throws Exception { + DriverContext driverContext = driverContext(); + BlockFactory blockFactory = driverContext.blockFactory(); + + final int numDocs = between(100, 1000); + Map<BytesRef, Set<Long>> expectedValues = new HashMap<>(); + try (BaseDirectoryWrapper dir = newDirectory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) { + String VAL_NAME = "val"; + String KEY_NAME = "key"; + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + BytesRef key = new BytesRef(Integer.toString(between(1, 100))); + SortedSetDocValuesField keyField = new SortedSetDocValuesField(KEY_NAME, key); + doc.add(keyField); + if (randomBoolean()) { + int numValues = between(0, 2); + for (int v = 0; v < numValues; v++) { + long val = between(1, 1000); + var valuesField = new SortedNumericDocValuesField(VAL_NAME, val); + doc.add(valuesField); + expectedValues.computeIfAbsent(key, k -> new HashSet<>()).add(val); + } + } + writer.addDocument(doc); + } + writer.commit(); + try (DirectoryReader reader = writer.getReader()) { + List<Operator> operators = new ArrayList<>(); + if (randomBoolean()) { + operators.add(new ShuffleDocsOperator(blockFactory)); + } + operators.add( + new ValuesSourceReaderOperator( + blockFactory, + List.of( + new ValuesSourceReaderOperator.FieldInfo( + VAL_NAME, + ElementType.LONG, + unused -> new BlockDocValuesReader.LongsBlockLoader(VAL_NAME) + ) + ), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () 
-> { + throw new UnsupportedOperationException(); + }, 0.2)), + 0 + ) + ); + operators.add( + new OrdinalsGroupingOperator( + shardIdx -> new KeywordFieldMapper.KeywordFieldType(KEY_NAME).blockLoader(mockBlContext()), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)), + ElementType.BYTES_REF, + 0, + KEY_NAME, + List.of(new ValuesLongAggregatorFunctionSupplier().groupingAggregatorFactory(INITIAL, List.of(1))), + randomPageSize(), + driverContext + ) + ); + operators.add( + new HashAggregationOperator( + List.of(new ValuesLongAggregatorFunctionSupplier().groupingAggregatorFactory(FINAL, List.of(1))), + () -> BlockHash.build( + List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF)), + driverContext.blockFactory(), + randomPageSize(), + false + ), + driverContext + ) + ); + Map<BytesRef, Set<Long>> actualValues = new HashMap<>(); + Driver driver = TestDriverFactory.create( + driverContext, + luceneOperatorFactory( + reader, + List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), + LuceneOperator.NO_LIMIT + ).get(driverContext), + operators, + new PageConsumerOperator(page -> { + BytesRefBlock keyBlock = page.getBlock(0); + LongBlock valueBlock = page.getBlock(1); + BytesRef spare = new BytesRef(); + for (int p = 0; p < page.getPositionCount(); p++) { + var key = keyBlock.getBytesRef(p, spare); + int valueCount = valueBlock.getValueCount(p); + for (int i = 0; i < valueCount; i++) { + long val = valueBlock.getLong(valueBlock.getFirstValueIndex(p) + i); + boolean added = actualValues.computeIfAbsent(BytesRef.deepCopyOf(key), k -> new HashSet<>()).add(val); + assertTrue(actualValues.toString(), added); + } + } + page.releaseBlocks(); + }) + ); + OperatorTestCase.runDriver(driver); + assertDriverContext(driverContext); + assertThat(actualValues, equalTo(expectedValues)); + org.elasticsearch.common.util.MockBigArrays.ensureAllArraysAreReleased(); + } + } + assertThat(blockFactory.breaker().getUsed(), 
equalTo(0L)); + } + public void testPushRoundToToQuery() throws IOException { long firstGroupMax = randomLong(); long secondGroupMax = randomLong(); From ef405c30610f590aba29d2f0001814a7202f6486 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Tue, 8 Jul 2025 15:49:11 -0700 Subject: [PATCH 2/2] Fix test --- .../src/test/java/org/elasticsearch/compute/OperatorTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index b9587be7d23cc..142ceb74fe05f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -304,7 +304,7 @@ public void testValuesWithOrdinalGrouping() throws Exception { ); operators.add( new OrdinalsGroupingOperator( - shardIdx -> new KeywordFieldMapper.KeywordFieldType(KEY_NAME).blockLoader(mockBlContext()), + shardIdx -> new KeywordFieldMapper.KeywordFieldType(KEY_NAME).blockLoader(null), List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)), ElementType.BYTES_REF, 0,