diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java index 992fff70f90a0..cb0dff8a86dc5 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java @@ -90,10 +90,10 @@ public static void combineIntermediate(GroupingState state, int groupId, BytesRe } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java index 9aebe85c0cd89..3c0dcd58c29ee 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesDoubleAggregator.java @@ -67,10 +67,10 @@ public static void combineIntermediate(GroupingState state, int groupId, DoubleB } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = 
statePosition > 0 ? sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java index 28c736783122d..a25d69b712538 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesFloatAggregator.java @@ -66,10 +66,10 @@ public static void combineIntermediate(GroupingState state, int groupId, FloatBl } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? 
sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java index 39dbcd155954c..2c8c0f409dd5b 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesIntAggregator.java @@ -66,10 +66,10 @@ public static void combineIntermediate(GroupingState state, int groupId, IntBloc } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? 
sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java index a7d3a8fe539df..2790a182d5041 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesLongAggregator.java @@ -67,10 +67,10 @@ public static void combineIntermediate(GroupingState state, int groupId, LongBlo } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? 
sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st index 6a6adca143e94..fa8ffecea052d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st @@ -129,10 +129,10 @@ $endif$ } public static void combineStates(GroupingState current, int currentGroupId, GroupingState state, int statePosition) { - var sorted = state.sortedForOrdinalMerging(current); if (statePosition > state.maxGroupId) { return; } + var sorted = state.sortedForOrdinalMerging(current); var start = statePosition > 0 ? sorted.counts[statePosition - 1] : 0; var end = sorted.counts[statePosition]; for (int i = start; i < end; i++) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index 6d345d510f4b0..e3ddbe0b58aed 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -11,6 +11,7 @@ import org.apache.lucene.document.Field; import org.apache.lucene.document.LongField; import org.apache.lucene.document.LongPoint; +import org.apache.lucene.document.SortedNumericDocValuesField; import org.apache.lucene.document.SortedSetDocValuesField; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; @@ -35,6 +36,7 @@ import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import 
org.elasticsearch.compute.aggregation.CountAggregatorFunction; +import org.elasticsearch.compute.aggregation.ValuesLongAggregatorFunctionSupplier; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.Block; import org.elasticsearch.compute.data.BlockFactory; @@ -254,6 +256,112 @@ public String toString() { assertThat(blockFactory.breaker().getUsed(), equalTo(0L)); } + // TODO: Remove ordinals grouping operator or enable it in GroupingAggregatorFunctionTestCase + public void testValuesWithOrdinalGrouping() throws Exception { + DriverContext driverContext = driverContext(); + BlockFactory blockFactory = driverContext.blockFactory(); + + final int numDocs = between(100, 1000); + Map> expectedValues = new HashMap<>(); + try (BaseDirectoryWrapper dir = newDirectory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) { + String VAL_NAME = "val"; + String KEY_NAME = "key"; + for (int i = 0; i < numDocs; i++) { + Document doc = new Document(); + BytesRef key = new BytesRef(Integer.toString(between(1, 100))); + SortedSetDocValuesField keyField = new SortedSetDocValuesField(KEY_NAME, key); + doc.add(keyField); + if (randomBoolean()) { + int numValues = between(0, 2); + for (int v = 0; v < numValues; v++) { + long val = between(1, 1000); + var valuesField = new SortedNumericDocValuesField(VAL_NAME, val); + doc.add(valuesField); + expectedValues.computeIfAbsent(key, k -> new HashSet<>()).add(val); + } + } + writer.addDocument(doc); + } + writer.commit(); + try (DirectoryReader reader = writer.getReader()) { + List operators = new ArrayList<>(); + if (randomBoolean()) { + operators.add(new ShuffleDocsOperator(blockFactory)); + } + operators.add( + new ValuesSourceReaderOperator( + blockFactory, + List.of( + new ValuesSourceReaderOperator.FieldInfo( + VAL_NAME, + ElementType.LONG, + unused -> new BlockDocValuesReader.LongsBlockLoader(VAL_NAME) + ) + ), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () 
-> { + throw new UnsupportedOperationException(); + }, 0.2)), + 0 + ) + ); + operators.add( + new OrdinalsGroupingOperator( + shardIdx -> new KeywordFieldMapper.KeywordFieldType(KEY_NAME).blockLoader(mockBlContext()), + List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)), + ElementType.BYTES_REF, + 0, + KEY_NAME, + List.of(new ValuesLongAggregatorFunctionSupplier().groupingAggregatorFactory(INITIAL, List.of(1))), + randomPageSize(), + driverContext + ) + ); + operators.add( + new HashAggregationOperator( + List.of(new ValuesLongAggregatorFunctionSupplier().groupingAggregatorFactory(FINAL, List.of(1))), + () -> BlockHash.build( + List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF)), + driverContext.blockFactory(), + randomPageSize(), + false + ), + driverContext + ) + ); + Map> actualValues = new HashMap<>(); + Driver driver = TestDriverFactory.create( + driverContext, + luceneOperatorFactory( + reader, + List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())), + LuceneOperator.NO_LIMIT + ).get(driverContext), + operators, + new PageConsumerOperator(page -> { + BytesRefBlock keyBlock = page.getBlock(0); + LongBlock valueBlock = page.getBlock(1); + BytesRef spare = new BytesRef(); + for (int p = 0; p < page.getPositionCount(); p++) { + var key = keyBlock.getBytesRef(p, spare); + int valueCount = valueBlock.getValueCount(p); + for (int i = 0; i < valueCount; i++) { + long val = valueBlock.getLong(valueBlock.getFirstValueIndex(p) + i); + boolean added = actualValues.computeIfAbsent(BytesRef.deepCopyOf(key), k -> new HashSet<>()).add(val); + assertTrue(actualValues.toString(), added); + } + } + page.releaseBlocks(); + }) + ); + OperatorTestCase.runDriver(driver); + assertDriverContext(driverContext); + assertThat(actualValues, equalTo(expectedValues)); + org.elasticsearch.common.util.MockBigArrays.ensureAllArraysAreReleased(); + } + } + assertThat(blockFactory.breaker().getUsed(), 
equalTo(0L)); + } + public void testPushRoundToToQuery() throws IOException { long firstGroupMax = randomLong(); long secondGroupMax = randomLong();