diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java
index 879418e7f954c..dfd56996e1c15 100644
--- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java
+++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java
@@ -95,8 +95,7 @@ static void selfTest() {
         try {
             for (String groups : ValuesAggregatorBenchmark.class.getField("groups").getAnnotationsByType(Param.class)[0].value()) {
                 for (String dataType : ValuesAggregatorBenchmark.class.getField("dataType").getAnnotationsByType(Param.class)[0].value()) {
-                    run(Integer.parseInt(groups), dataType, 10, 0);
-                    run(Integer.parseInt(groups), dataType, 10, 1);
+                    run(Integer.parseInt(groups), dataType, 10);
                 }
             }
         } catch (NoSuchFieldException e) {
@@ -114,10 +113,7 @@ static void selfTest() {
     @Param({ BYTES_REF, INT, LONG })
     public String dataType;
 
-    @Param({ "0", "1" })
-    public int numOrdinalMerges;
-
-    private static Operator operator(DriverContext driverContext, int groups, String dataType, int numOrdinalMerges) {
+    private static Operator operator(DriverContext driverContext, int groups, String dataType) {
         if (groups == 1) {
             return new AggregationOperator(
                 List.of(supplier(dataType).aggregatorFactory(AggregatorMode.SINGLE, List.of(0)).apply(driverContext)),
@@ -132,20 +128,8 @@ private static Operator operator(DriverContext driverContext, int groups, String
         ) {
             @Override
             public Page getOutput() {
-                mergeOrdinal();
                 return super.getOutput();
             }
-
-            // simulate OrdinalsGroupingOperator
-            void mergeOrdinal() {
-                var merged = supplier(dataType).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(1)).apply(driverContext);
-                for (int i = 0; i < numOrdinalMerges; i++) {
-                    for (int p = 0; p < groups; p++) {
-                        merged.addIntermediateRow(p, aggregators.getFirst(), p);
-                    }
-                }
-                aggregators.set(0, merged);
-            }
         };
     }
 
@@ -352,12 +336,12 @@ private static Block groupingBlock(int groups) {
 
     @Benchmark
     public void run() {
-        run(groups, dataType, OP_COUNT, numOrdinalMerges);
+        run(groups, dataType, OP_COUNT);
     }
 
-    private static void run(int groups, String dataType, int opCount, int numOrdinalMerges) {
+    private static void run(int groups, String dataType, int opCount) {
         DriverContext driverContext = driverContext();
-        try (Operator operator = operator(driverContext, groups, dataType, numOrdinalMerges)) {
+        try (Operator operator = operator(driverContext, groups, dataType)) {
             Page page = page(groups, dataType);
             for (int i = 0; i < opCount; i++) {
                 operator.addInput(page.shallowCopy());
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregator.java
index 931ebcb7afe49..e21f6205fb690 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregator.java
@@ -70,13 +70,6 @@ public void close() {}
         }
     }
 
-    /**
-     * Add the position-th row from the intermediate output of the given aggregator to this aggregator at the groupId position
-     */
-    public void addIntermediateRow(int groupId, GroupingAggregator input, int position) {
-        aggregatorFunction.addIntermediateRowInput(groupId, input.aggregatorFunction, position);
-    }
-
     /**
      * Build the results for this aggregation.
      * @param selected the groupIds that have been selected to be included in
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunction.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunction.java
index 0de8bb9896d64..4bc7d91ad6037 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunction.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunction.java
@@ -130,6 +130,7 @@ default void add(int positionOffset, IntBlock groupIds) {
 
     /**
      * Add the position-th row from the intermediate output of the given aggregator function to the groupId
+     * TODO: Remove this method as the grouping operator has been removed
      */
     void addIntermediateRowInput(int groupId, GroupingAggregatorFunction input, int position);
 
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
deleted file mode 100644
index 58466cffee78e..0000000000000
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
+++ /dev/null
@@ -1,647 +0,0 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License
- * 2.0; you may not use this file except in compliance with the Elastic License
- * 2.0.
- */
-
-package org.elasticsearch.compute.operator;
-
-import org.apache.lucene.index.DocValues;
-import org.apache.lucene.index.SortedDocValues;
-import org.apache.lucene.index.SortedSetDocValues;
-import org.apache.lucene.util.BytesRef;
-import org.apache.lucene.util.BytesRefBuilder;
-import org.apache.lucene.util.PriorityQueue;
-import org.elasticsearch.common.CheckedSupplier;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.BitArray;
-import org.elasticsearch.compute.Describable;
-import org.elasticsearch.compute.aggregation.GroupingAggregator;
-import org.elasticsearch.compute.aggregation.GroupingAggregator.Factory;
-import org.elasticsearch.compute.aggregation.GroupingAggregatorEvaluationContext;
-import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction;
-import org.elasticsearch.compute.aggregation.SeenGroupIds;
-import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
-import org.elasticsearch.compute.aggregation.blockhash.BlockHash.GroupSpec;
-import org.elasticsearch.compute.data.Block;
-import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.DocBlock;
-import org.elasticsearch.compute.data.DocVector;
-import org.elasticsearch.compute.data.ElementType;
-import org.elasticsearch.compute.data.IntBlock;
-import org.elasticsearch.compute.data.IntVector;
-import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
-import org.elasticsearch.core.RefCounted;
-import org.elasticsearch.core.Releasable;
-import org.elasticsearch.core.Releasables;
-import org.elasticsearch.index.mapper.BlockLoader;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.function.IntFunction;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
-import static java.util.Objects.requireNonNull;
-import static java.util.stream.Collectors.joining;
-
-/**
- * Unlike {@link HashAggregationOperator}, this hash operator also extracts values or ordinals of the input documents.
- */
-public class OrdinalsGroupingOperator implements Operator {
-    public record OrdinalsGroupingOperatorFactory(
-        IntFunction<BlockLoader> blockLoaders,
-        List<ValuesSourceReaderOperator.ShardContext> shardContexts,
-        ElementType groupingElementType,
-        int docChannel,
-        String groupingField,
-        List<Factory> aggregators,
-        int maxPageSize
-    ) implements OperatorFactory {
-
-        @Override
-        public Operator get(DriverContext driverContext) {
-            return new OrdinalsGroupingOperator(
-                blockLoaders,
-                shardContexts,
-                groupingElementType,
-                docChannel,
-                groupingField,
-                aggregators,
-                maxPageSize,
-                driverContext
-            );
-        }
-
-        @Override
-        public String describe() {
-            return "OrdinalsGroupingOperator(aggs = " + aggregators.stream().map(Describable::describe).collect(joining(", ")) + ")";
-        }
-    }
-
-    private final IntFunction<BlockLoader> blockLoaders;
-    private final List<ValuesSourceReaderOperator.ShardContext> shardContexts;
-    private final int docChannel;
-    private final String groupingField;
-
-    private final List<Factory> aggregatorFactories;
-    private final ElementType groupingElementType;
-    private final Map<SegmentID, OrdinalSegmentAggregator> ordinalAggregators;
-
-    private final DriverContext driverContext;
-
-    private boolean finished = false;
-
-    // used to extract and aggregate values
-    private final int maxPageSize;
-    private ValuesAggregator valuesAggregator;
-
-    public OrdinalsGroupingOperator(
-        IntFunction<BlockLoader> blockLoaders,
-        List<ValuesSourceReaderOperator.ShardContext> shardContexts,
-        ElementType groupingElementType,
-        int docChannel,
-        String groupingField,
-        List<Factory> aggregatorFactories,
-        int maxPageSize,
-        DriverContext driverContext
-    ) {
-        Objects.requireNonNull(aggregatorFactories);
-        this.blockLoaders = blockLoaders;
-        this.shardContexts = shardContexts;
-        this.groupingElementType = groupingElementType;
-        this.docChannel = docChannel;
-        this.groupingField = groupingField;
-        this.aggregatorFactories = aggregatorFactories;
-        this.ordinalAggregators = new HashMap<>();
-        this.maxPageSize = maxPageSize;
-        this.driverContext = driverContext;
-    }
-
-    @Override
-    public boolean needsInput() {
-        return finished == false;
-    }
-
-    @Override
-    public void addInput(Page page) {
-        checkState(needsInput(), "Operator is already finishing");
-        requireNonNull(page, "page is null");
-        DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
-        final int shardIndex = docVector.shards().getInt(0);
-        RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex);
-        final var blockLoader = blockLoaders.apply(shardIndex);
-        boolean pagePassed = false;
-        try {
-            if (docVector.singleSegmentNonDecreasing() && blockLoader.supportsOrdinals()) {
-                final IntVector segmentIndexVector = docVector.segments();
-                assert segmentIndexVector.isConstant();
-                final OrdinalSegmentAggregator ordinalAggregator = this.ordinalAggregators.computeIfAbsent(
-                    new SegmentID(shardIndex, segmentIndexVector.getInt(0)),
-                    k -> {
-                        try {
-                            return new OrdinalSegmentAggregator(
-                                driverContext.blockFactory(),
-                                this::createGroupingAggregators,
-                                () -> blockLoader.ordinals(shardContexts.get(k.shardIndex).reader().leaves().get(k.segmentIndex)),
-                                driverContext.bigArrays(),
-                                shardRefCounter
-                            );
-                        } catch (IOException e) {
-                            throw new UncheckedIOException(e);
-                        }
-                    }
-                );
-                pagePassed = true;
-                ordinalAggregator.addInput(docVector.docs(), page);
-            } else {
-                if (valuesAggregator == null) {
-                    int channelIndex = page.getBlockCount(); // extractor will append a new block at the end
-                    valuesAggregator = new ValuesAggregator(
-                        blockLoaders,
-                        shardContexts,
-                        groupingElementType,
-                        docChannel,
-                        groupingField,
-                        channelIndex,
-                        aggregatorFactories,
-                        maxPageSize,
-                        driverContext
-                    );
-                }
-                pagePassed = true;
-                valuesAggregator.addInput(page);
-            }
-        } finally {
-            if (pagePassed == false) {
-                Releasables.closeExpectNoException(page::releaseBlocks);
-            }
-        }
-    }
-
-    private List<GroupingAggregator> createGroupingAggregators() {
-        boolean success = false;
-        List<GroupingAggregator> aggregators = new ArrayList<>(aggregatorFactories.size());
-        try {
-            for (GroupingAggregator.Factory aggregatorFactory : aggregatorFactories) {
-                aggregators.add(aggregatorFactory.apply(driverContext));
-            }
-            success = true;
-            return aggregators;
-        } finally {
-            if (success == false) {
-                Releasables.close(aggregators);
-            }
-        }
-    }
-
-    @Override
-    public Page getOutput() {
-        if (finished == false) {
-            return null;
-        }
-        if (valuesAggregator != null) {
-            try {
-                return valuesAggregator.getOutput();
-            } finally {
-                final ValuesAggregator aggregator = this.valuesAggregator;
-                this.valuesAggregator = null;
-                Releasables.close(aggregator);
-            }
-        }
-        if (ordinalAggregators.isEmpty() == false) {
-            try {
-                return mergeOrdinalsSegmentResults();
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            } finally {
-                Releasables.close(() -> Releasables.close(ordinalAggregators.values()), ordinalAggregators::clear);
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public void finish() {
-        finished = true;
-        if (valuesAggregator != null) {
-            valuesAggregator.finish();
-        }
-    }
-
-    private Page mergeOrdinalsSegmentResults() throws IOException {
-        // TODO: Should we also combine from the results from ValuesAggregator
-        final PriorityQueue<AggregatedResultIterator> pq = new PriorityQueue<>(ordinalAggregators.size()) {
-            @Override
-            protected boolean lessThan(AggregatedResultIterator a, AggregatedResultIterator b) {
-                return a.currentTerm.compareTo(b.currentTerm) < 0;
-            }
-        };
-        final List<GroupingAggregator> aggregators = createGroupingAggregators();
-        try {
-            boolean seenNulls = false;
-            for (OrdinalSegmentAggregator agg : ordinalAggregators.values()) {
-                if (agg.seenNulls()) {
-                    seenNulls = true;
-                    for (int i = 0; i < aggregators.size(); i++) {
-                        aggregators.get(i).addIntermediateRow(0, agg.aggregators.get(i), 0);
-                    }
-                }
-            }
-            for (OrdinalSegmentAggregator agg : ordinalAggregators.values()) {
-                final AggregatedResultIterator it = agg.getResultIterator();
-                if (it.next()) {
-                    pq.add(it);
-                }
-            }
-            final int startPosition = seenNulls ? 0 : -1;
-            int position = startPosition;
-            final BytesRefBuilder lastTerm = new BytesRefBuilder();
-            final Block[] blocks;
-            final int[] aggBlockCounts;
-            try (var keysBuilder = driverContext.blockFactory().newBytesRefBlockBuilder(1)) {
-                if (seenNulls) {
-                    keysBuilder.appendNull();
-                }
-                while (pq.size() > 0) {
-                    final AggregatedResultIterator top = pq.top();
-                    if (position == startPosition || lastTerm.get().equals(top.currentTerm) == false) {
-                        position++;
-                        lastTerm.copyBytes(top.currentTerm);
-                        keysBuilder.appendBytesRef(top.currentTerm);
-                    }
-                    for (int i = 0; i < top.aggregators.size(); i++) {
-                        aggregators.get(i).addIntermediateRow(position, top.aggregators.get(i), top.currentPosition());
-                    }
-                    if (top.next()) {
-                        pq.updateTop();
-                    } else {
-                        pq.pop();
-                    }
-                }
-                aggBlockCounts = aggregators.stream().mapToInt(GroupingAggregator::evaluateBlockCount).toArray();
-                blocks = new Block[1 + Arrays.stream(aggBlockCounts).sum()];
-                blocks[0] = keysBuilder.build();
-            }
-            boolean success = false;
-            try {
-                try (IntVector selected = IntVector.range(0, blocks[0].getPositionCount(), driverContext.blockFactory())) {
-                    int offset = 1;
-                    for (int i = 0; i < aggregators.size(); i++) {
-                        aggregators.get(i).evaluate(blocks, offset, selected, new GroupingAggregatorEvaluationContext(driverContext));
-                        offset += aggBlockCounts[i];
-                    }
-                }
-                success = true;
-                return new Page(blocks);
-            } finally {
-                if (success == false) {
-                    Releasables.closeExpectNoException(blocks);
-                }
-            }
-        } finally {
-            Releasables.close(() -> Releasables.close(aggregators));
-        }
-    }
-
-    @Override
-    public boolean isFinished() {
-        return finished && valuesAggregator == null && ordinalAggregators.isEmpty();
-    }
-
-    @Override
-    public void close() {
-        Releasables.close(() -> Releasables.close(ordinalAggregators.values()), valuesAggregator);
-    }
-
-    private static void checkState(boolean condition, String msg) {
-        if (condition == false) {
-            throw new IllegalArgumentException(msg);
-        }
-    }
-
-    @Override
-    public String toString() {
-        String aggregatorDescriptions = aggregatorFactories.stream()
-            .map(factory -> "\"" + factory.describe() + "\"")
-            .collect(Collectors.joining(", "));
-
-        return this.getClass().getSimpleName() + "[" + "aggregators=[" + aggregatorDescriptions + "]]";
-    }
-
-    record SegmentID(int shardIndex, int segmentIndex) {
-
-    }
-
-    static final class OrdinalSegmentAggregator implements Releasable, SeenGroupIds {
-        private final BlockFactory blockFactory;
-        private final List<GroupingAggregator> aggregators;
-        private final CheckedSupplier<SortedSetDocValues, IOException> docValuesSupplier;
-        private final BitArray visitedOrds;
-        private final RefCounted shardRefCounted;
-        private BlockOrdinalsReader currentReader;
-
-        OrdinalSegmentAggregator(
-            BlockFactory blockFactory,
-            Supplier<List<GroupingAggregator>> aggregatorsSupplier,
-            CheckedSupplier<SortedSetDocValues, IOException> docValuesSupplier,
-            BigArrays bigArrays,
-            RefCounted shardRefCounted
-        ) throws IOException {
-            boolean success = false;
-            this.shardRefCounted = shardRefCounted;
-            this.shardRefCounted.mustIncRef();
-            List<GroupingAggregator> groupingAggregators = null;
-            BitArray bitArray = null;
-            try {
-                final SortedSetDocValues sortedSetDocValues = docValuesSupplier.get();
-                bitArray = new BitArray(sortedSetDocValues.getValueCount(), bigArrays);
-                groupingAggregators = aggregatorsSupplier.get();
-                this.currentReader = BlockOrdinalsReader.newReader(blockFactory, sortedSetDocValues);
-                this.blockFactory = blockFactory;
-                this.docValuesSupplier = docValuesSupplier;
-                this.aggregators = groupingAggregators;
-                this.visitedOrds = bitArray;
-                success = true;
-            } finally {
-                if (success == false) {
-                    if (bitArray != null) Releasables.close(bitArray);
-                    if (groupingAggregators != null) Releasables.close(groupingAggregators);
-                    // There is no danger of double decRef here, since this decRef is called only if the constructor throws, so it would be
-                    // impossible to call close on the instance.
-                    shardRefCounted.decRef();
-                }
-            }
-        }
-
-        void addInput(IntVector docs, Page page) {
-            GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()];
-            try {
-                for (int i = 0; i < prepared.length; i++) {
-                    prepared[i] = aggregators.get(i).prepareProcessPage(this, page);
-                }
-
-                if (BlockOrdinalsReader.canReuse(currentReader, docs.getInt(0)) == false) {
-                    currentReader = BlockOrdinalsReader.newReader(blockFactory, docValuesSupplier.get());
-                }
-                try (IntBlock ordinals = currentReader.readOrdinalsAdded1(docs)) {
-                    final IntVector ordinalsVector = ordinals.asVector();
-                    if (ordinalsVector != null) {
-                        addOrdinalsInput(ordinalsVector, prepared);
-                    } else {
-                        addOrdinalsInput(ordinals, prepared);
-                    }
-                }
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
-            } finally {
-                Releasables.close(page::releaseBlocks, Releasables.wrap(prepared));
-            }
-        }
-
-        void addOrdinalsInput(IntBlock ordinals, GroupingAggregatorFunction.AddInput[] prepared) {
-            for (int p = 0; p < ordinals.getPositionCount(); p++) {
-                int start = ordinals.getFirstValueIndex(p);
-                int end = start + ordinals.getValueCount(p);
-                for (int i = start; i < end; i++) {
-                    long ord = ordinals.getInt(i);
-                    visitedOrds.set(ord);
-                }
-            }
-            for (GroupingAggregatorFunction.AddInput addInput : prepared) {
-                addInput.add(0, ordinals);
-            }
-        }
-
-        void addOrdinalsInput(IntVector ordinals, GroupingAggregatorFunction.AddInput[] prepared) {
-            for (int p = 0; p < ordinals.getPositionCount(); p++) {
-                long ord = ordinals.getInt(p);
-                visitedOrds.set(ord);
-            }
-            for (GroupingAggregatorFunction.AddInput addInput : prepared) {
-                addInput.add(0, ordinals);
-            }
-        }
-
-        AggregatedResultIterator getResultIterator() throws IOException {
-            return new AggregatedResultIterator(aggregators, visitedOrds, docValuesSupplier.get());
-        }
-
-        boolean seenNulls() {
-            return visitedOrds.get(0);
-        }
-
-        @Override
-        public BitArray seenGroupIds(BigArrays bigArrays) {
-            final BitArray seen = new BitArray(0, bigArrays);
-            boolean success = false;
-            try {
-                // the or method can grow the `seen` bits
-                seen.or(visitedOrds);
-                success = true;
-                return seen;
-            } finally {
-                if (success == false) {
-                    Releasables.close(seen);
-                }
-            }
-        }
-
-        @Override
-        public void close() {
-            Releasables.close(visitedOrds, () -> Releasables.close(aggregators), Releasables.fromRefCounted(shardRefCounted));
-        }
-    }
-
-    private static class AggregatedResultIterator {
-        private BytesRef currentTerm;
-        private long currentOrd = 0;
-        private final List<GroupingAggregator> aggregators;
-        private final BitArray ords;
-        private final SortedSetDocValues dv;
-
-        AggregatedResultIterator(List<GroupingAggregator> aggregators, BitArray ords, SortedSetDocValues dv) {
-            this.aggregators = aggregators;
-            this.ords = ords;
-            this.dv = dv;
-        }
-
-        int currentPosition() {
-            assert currentOrd != Long.MAX_VALUE : "Must not read position when iterator is exhausted";
-            return Math.toIntExact(currentOrd);
-        }
-
-        boolean next() throws IOException {
-            currentOrd = ords.nextSetBit(currentOrd + 1);
-            assert currentOrd > 0 : currentOrd;
-            if (currentOrd < Long.MAX_VALUE) {
-                currentTerm = dv.lookupOrd(currentOrd - 1);
-                return true;
-            } else {
-                currentTerm = null;
-                return false;
-            }
-        }
-    }
-
-    private static class ValuesAggregator implements Releasable {
-        private final ValuesSourceReaderOperator extractor;
-        private final HashAggregationOperator aggregator;
-
-        ValuesAggregator(
-            IntFunction<BlockLoader> blockLoaders,
-            List<ValuesSourceReaderOperator.ShardContext> shardContexts,
-            ElementType groupingElementType,
-            int docChannel,
-            String groupingField,
-            int channelIndex,
-            List<GroupingAggregator.Factory> aggregatorFactories,
-            int maxPageSize,
-            DriverContext driverContext
-        ) {
-            this.extractor = new ValuesSourceReaderOperator(
-                driverContext.blockFactory(),
-                List.of(new ValuesSourceReaderOperator.FieldInfo(groupingField, groupingElementType, blockLoaders)),
-                shardContexts,
-                docChannel
-            );
-            this.aggregator = new HashAggregationOperator(
-                aggregatorFactories,
-                () -> BlockHash.build(
-                    List.of(new GroupSpec(channelIndex, groupingElementType)),
-                    driverContext.blockFactory(),
-                    maxPageSize,
-                    false
-                ),
-                driverContext
-            );
-        }
-
-        void addInput(Page page) {
-            extractor.addInput(page);
-            Page out = extractor.getOutput();
-            if (out != null) {
-                aggregator.addInput(out);
-            }
-        }
-
-        void finish() {
-            aggregator.finish();
-        }
-
-        Page getOutput() {
-            return aggregator.getOutput();
-        }
-
-        @Override
-        public void close() {
-            Releasables.close(extractor, aggregator);
-        }
-    }
-
-    abstract static class BlockOrdinalsReader {
-        protected final Thread creationThread;
-        protected final BlockFactory blockFactory;
-
-        BlockOrdinalsReader(BlockFactory blockFactory) {
-            this.blockFactory = blockFactory;
-            this.creationThread = Thread.currentThread();
-        }
-
-        static BlockOrdinalsReader newReader(BlockFactory blockFactory, SortedSetDocValues sortedSetDocValues) {
-            SortedDocValues singleValues = DocValues.unwrapSingleton(sortedSetDocValues);
-            if (singleValues != null) {
-                return new SortedDocValuesBlockOrdinalsReader(blockFactory, singleValues);
-            } else {
-                return new SortedSetDocValuesBlockOrdinalsReader(blockFactory, sortedSetDocValues);
-            }
-        }
-
-        abstract IntBlock readOrdinalsAdded1(IntVector docs) throws IOException;
-
-        abstract int docID();
-
-        /**
-         * Checks if the reader can be used to read a range documents starting with the given docID by the current thread.
-         */
-        static boolean canReuse(BlockOrdinalsReader reader, int startingDocID) {
-            return reader != null && reader.creationThread == Thread.currentThread() && reader.docID() <= startingDocID;
-        }
-    }
-
-    private static class SortedSetDocValuesBlockOrdinalsReader extends BlockOrdinalsReader {
-        private final SortedSetDocValues sortedSetDocValues;
-
-        SortedSetDocValuesBlockOrdinalsReader(BlockFactory blockFactory, SortedSetDocValues sortedSetDocValues) {
-            super(blockFactory);
-            this.sortedSetDocValues = sortedSetDocValues;
-        }
-
-        @Override
-        IntBlock readOrdinalsAdded1(IntVector docs) throws IOException {
-            final int positionCount = docs.getPositionCount();
-            try (IntBlock.Builder builder = blockFactory.newIntBlockBuilder(positionCount)) {
-                for (int p = 0; p < positionCount; p++) {
-                    int doc = docs.getInt(p);
-                    if (false == sortedSetDocValues.advanceExact(doc)) {
-                        builder.appendInt(0);
-                        continue;
-                    }
-                    int count = sortedSetDocValues.docValueCount();
-                    if (count == 1) {
-                        builder.appendInt(Math.toIntExact(sortedSetDocValues.nextOrd() + 1));
-                        continue;
-                    }
-                    builder.beginPositionEntry();
-                    for (int i = 0; i < count; i++) {
-                        builder.appendInt(Math.toIntExact(sortedSetDocValues.nextOrd() + 1));
-                    }
-                    builder.endPositionEntry();
-                }
-                return builder.build();
-            }
-        }
-
-        @Override
-        int docID() {
-            return sortedSetDocValues.docID();
-        }
-    }
-
-    private static class SortedDocValuesBlockOrdinalsReader extends BlockOrdinalsReader {
-        private final SortedDocValues sortedDocValues;
-
-        SortedDocValuesBlockOrdinalsReader(BlockFactory blockFactory, SortedDocValues sortedDocValues) {
-            super(blockFactory);
-            this.sortedDocValues = sortedDocValues;
-        }
-
-        @Override
-        IntBlock readOrdinalsAdded1(IntVector docs) throws IOException {
-            final int positionCount = docs.getPositionCount();
-            try (IntVector.FixedBuilder builder = blockFactory.newIntVectorFixedBuilder(positionCount)) {
-                for (int p = 0; p < positionCount; p++) {
-                    if (sortedDocValues.advanceExact(docs.getInt(p))) {
-                        builder.appendInt(p, sortedDocValues.ordValue() + 1);
-                    } else {
-                        builder.appendInt(p, 0);
-                    }
-                }
-                return builder.build().asBlock();
-            }
-        }
-
-        @Override
-        int docID() {
-            return sortedDocValues.docID();
-        }
-    }
-}
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
index e3ddbe0b58aed..db4febdf8ddca 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java
@@ -11,8 +11,6 @@
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.LongField;
 import org.apache.lucene.document.LongPoint;
-import org.apache.lucene.document.SortedNumericDocValuesField;
-import org.apache.lucene.document.SortedSetDocValuesField;
 import org.apache.lucene.index.DirectoryReader;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.LeafReaderContext;
@@ -20,13 +18,11 @@
 import org.apache.lucene.search.Collector;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.LeafCollector;
-import org.apache.lucene.search.MatchAllDocsQuery;
 import org.apache.lucene.search.Query;
 import org.apache.lucene.search.Scorable;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.tests.index.RandomIndexWriter;
-import org.apache.lucene.tests.store.BaseDirectoryWrapper;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.settings.Settings;
@@ -35,12 +31,8 @@
 import org.elasticsearch.common.util.MockBigArrays;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.compute.aggregation.CountAggregatorFunction;
-import org.elasticsearch.compute.aggregation.ValuesLongAggregatorFunctionSupplier;
-import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
 import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
-import org.elasticsearch.compute.data.BytesRefBlock;
 import org.elasticsearch.compute.data.DocBlock;
 import org.elasticsearch.compute.data.DocVector;
 import org.elasticsearch.compute.data.ElementType;
@@ -55,15 +47,10 @@
 import org.elasticsearch.compute.lucene.LuceneSourceOperatorTests;
 import org.elasticsearch.compute.lucene.ShardContext;
 import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
-import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverContext;
-import org.elasticsearch.compute.operator.HashAggregationOperator;
-import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.PageConsumerOperator;
 import org.elasticsearch.compute.operator.RowInTableLookupOperator;
-import org.elasticsearch.compute.operator.ShuffleDocsOperator;
 import org.elasticsearch.compute.test.BlockTestUtils;
 import org.elasticsearch.compute.test.OperatorTestCase;
 import org.elasticsearch.compute.test.SequenceLongBlockSourceOperator;
@@ -77,7 +64,6 @@
 import org.elasticsearch.index.mapper.KeywordFieldMapper;
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.MapperServiceTestCase;
-import org.elasticsearch.index.mapper.SourceLoader;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.search.lookup.SearchLookup;
@@ -92,8 +78,6 @@
 import java.util.Set;
 import java.util.TreeMap;
 
-import static org.elasticsearch.compute.aggregation.AggregatorMode.FINAL;
-import static org.elasticsearch.compute.aggregation.AggregatorMode.INITIAL;
 import static org.elasticsearch.compute.test.OperatorTestCase.randomPageSize;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -168,200 +152,6 @@ public void testQueryOperator() throws IOException {
         }
     }
 
-    public void testGroupingWithOrdinals() throws Exception {
-        DriverContext driverContext = driverContext();
-        BlockFactory blockFactory = driverContext.blockFactory();
-
-        final String gField = "g";
-        final int numDocs = 2856; // between(100, 10000);
-        final Map<BytesRef, Long> expectedCounts = new HashMap<>();
-        int keyLength = randomIntBetween(1, 10);
-        try (BaseDirectoryWrapper dir = newDirectory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
-            for (int i = 0; i < numDocs; i++) {
-                Document doc = new Document();
-                BytesRef key = new BytesRef(randomByteArrayOfLength(keyLength));
-                SortedSetDocValuesField docValuesField = new SortedSetDocValuesField(gField, key);
-                doc.add(docValuesField);
-                writer.addDocument(doc);
-                expectedCounts.compute(key, (k, v) -> v == null ? 1 : v + 1);
-            }
-            writer.commit();
-            Map<BytesRef, Long> actualCounts = new HashMap<>();
-
-            try (DirectoryReader reader = writer.getReader()) {
-                List<Operator> operators = new ArrayList<>();
-                if (randomBoolean()) {
-                    operators.add(new ShuffleDocsOperator(blockFactory));
-                }
-                operators.add(new AbstractPageMappingOperator() {
-                    @Override
-                    protected Page process(Page page) {
-                        return page.appendBlock(driverContext.blockFactory().newConstantIntBlockWith(1, page.getPositionCount()));
-                    }
-
-                    @Override
-                    public String toString() {
-                        return "Add(1)";
-                    }
-                });
-                operators.add(
-                    new OrdinalsGroupingOperator(
-                        shardIdx -> new KeywordFieldMapper.KeywordFieldType("g").blockLoader(mockBlContext()),
-                        List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
-                        ElementType.BYTES_REF,
-                        0,
-                        gField,
-                        List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(INITIAL, List.of(1))),
-                        randomPageSize(),
-                        driverContext
-                    )
-                );
-                operators.add(
-                    new HashAggregationOperator(
-                        List.of(CountAggregatorFunction.supplier().groupingAggregatorFactory(FINAL, List.of(1, 2))),
-                        () -> BlockHash.build(
-                            List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF)),
-                            driverContext.blockFactory(),
-                            randomPageSize(),
-                            false
-                        ),
-                        driverContext
-                    )
-                );
-                Driver driver = TestDriverFactory.create(
-                    driverContext,
-                    luceneOperatorFactory(
-                        reader,
-                        List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
-                        LuceneOperator.NO_LIMIT
-                    ).get(driverContext),
-                    operators,
-                    new PageConsumerOperator(page -> {
-                        BytesRefBlock keys = page.getBlock(0);
-                        LongBlock counts = page.getBlock(1);
-                        for (int i = 0; i < keys.getPositionCount(); i++) {
-                            BytesRef spare = new BytesRef();
-                            keys.getBytesRef(i, spare);
-                            actualCounts.put(BytesRef.deepCopyOf(spare), counts.getLong(i));
-                        }
-                        page.releaseBlocks();
-                    })
-                );
-                OperatorTestCase.runDriver(driver);
-                assertThat(actualCounts, equalTo(expectedCounts));
-                assertDriverContext(driverContext);
-                org.elasticsearch.common.util.MockBigArrays.ensureAllArraysAreReleased();
-            }
-        }
-        assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
-    }
-
-    // TODO: Remove ordinals grouping operator or enable it GroupingAggregatorFunctionTestCase
-    public void testValuesWithOrdinalGrouping() throws Exception {
-        DriverContext driverContext = driverContext();
-        BlockFactory blockFactory = driverContext.blockFactory();
-
-        final int numDocs = between(100, 1000);
-        Map<BytesRef, Set<Long>> expectedValues = new HashMap<>();
-        try (BaseDirectoryWrapper dir = newDirectory(); RandomIndexWriter writer = new RandomIndexWriter(random(), dir)) {
-            String VAL_NAME = "val";
-            String KEY_NAME = "key";
-            for (int i = 0; i < numDocs; i++) {
-                Document doc = new Document();
-                BytesRef key = new BytesRef(Integer.toString(between(1, 100)));
-                SortedSetDocValuesField keyField = new SortedSetDocValuesField(KEY_NAME, key);
-                doc.add(keyField);
-                if (randomBoolean()) {
-                    int numValues = between(0, 2);
-                    for (int v = 0; v < numValues; v++) {
-                        long val = between(1, 1000);
-                        var valuesField = new SortedNumericDocValuesField(VAL_NAME, val);
-                        doc.add(valuesField);
-                        expectedValues.computeIfAbsent(key, k -> new HashSet<>()).add(val);
-                    }
-                }
-                writer.addDocument(doc);
-            }
-            writer.commit();
-            try (DirectoryReader reader = writer.getReader()) {
-                List<Operator> operators = new ArrayList<>();
-                if (randomBoolean()) {
-                    operators.add(new ShuffleDocsOperator(blockFactory));
-                }
-                operators.add(
-                    new ValuesSourceReaderOperator(
-                        blockFactory,
-                        List.of(
-                            new ValuesSourceReaderOperator.FieldInfo(
-                                VAL_NAME,
-                                ElementType.LONG,
-                                unused -> new BlockDocValuesReader.LongsBlockLoader(VAL_NAME)
-                            )
-                        ),
-                        List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
-                            throw new UnsupportedOperationException();
-                        }, 0.2)),
-                        0
-                    )
-                );
-                operators.add(
-                    new OrdinalsGroupingOperator(
-                        shardIdx -> new KeywordFieldMapper.KeywordFieldType(KEY_NAME).blockLoader(mockBlContext()),
-                        List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
-                        ElementType.BYTES_REF,
-                        0,
-                        KEY_NAME,
-                        List.of(new ValuesLongAggregatorFunctionSupplier().groupingAggregatorFactory(INITIAL, List.of(1))),
-                        randomPageSize(),
-                        driverContext
-                    )
-                );
-                operators.add(
-                    new HashAggregationOperator(
-                        List.of(new ValuesLongAggregatorFunctionSupplier().groupingAggregatorFactory(FINAL, List.of(1))),
-                        () -> BlockHash.build(
-                            List.of(new BlockHash.GroupSpec(0, ElementType.BYTES_REF)),
-                            driverContext.blockFactory(),
-                            randomPageSize(),
-                            false
-                        ),
-                        driverContext
-                    )
-                );
-                Map<BytesRef, Set<Long>> actualValues = new HashMap<>();
-                Driver driver = TestDriverFactory.create(
-                    driverContext,
-                    luceneOperatorFactory(
-                        reader,
-                        List.of(new LuceneSliceQueue.QueryAndTags(new MatchAllDocsQuery(), List.of())),
-                        LuceneOperator.NO_LIMIT
-                    ).get(driverContext),
-                    operators,
-                    new PageConsumerOperator(page -> {
-                        BytesRefBlock keyBlock = page.getBlock(0);
-                        LongBlock valueBlock = page.getBlock(1);
-                        BytesRef spare = new BytesRef();
-                        for (int p = 0; p < page.getPositionCount(); p++) {
-                            var key = keyBlock.getBytesRef(p, spare);
-                            int valueCount = valueBlock.getValueCount(p);
-                            for (int i = 0; i < valueCount; i++) {
-                                long val = valueBlock.getLong(valueBlock.getFirstValueIndex(p) + i);
-                                boolean added = actualValues.computeIfAbsent(BytesRef.deepCopyOf(key), k -> new HashSet<>()).add(val);
-                                assertTrue(actualValues.toString(), added);
-                            }
-                        }
-                        page.releaseBlocks();
-                    })
-                );
-                OperatorTestCase.runDriver(driver);
-                assertDriverContext(driverContext);
-                assertThat(actualValues, equalTo(expectedValues));
-                org.elasticsearch.common.util.MockBigArrays.ensureAllArraysAreReleased();
-            }
-        }
-        assertThat(blockFactory.breaker().getUsed(), equalTo(0L));
-    }
-
     public void testPushRoundToToQuery() throws IOException {
         long firstGroupMax = randomLong();
         long secondGroupMax = randomLong();
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredGroupingAggregatorFunctionTests.java
index 407056f429dad..948aca115f829 100644
--- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredGroupingAggregatorFunctionTests.java
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/FilteredGroupingAggregatorFunctionTests.java
@@ -11,14 +11,12 @@
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BooleanVector;
 import org.elasticsearch.compute.data.IntBlock;
-import org.elasticsearch.compute.data.IntVector;
 import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.EvalOperator;
 import org.elasticsearch.compute.operator.LongIntBlockSourceOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.Tuple;
 import org.junit.After;
 
@@ -105,43 +103,6 @@ protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
         );
     }
 
-    /**
-     * Tests {@link GroupingAggregator#addIntermediateRow} by building results using the traditional
-     * add mechanism and using {@link GroupingAggregator#addIntermediateRow} then asserting that they
-     * produce the same output.
-     */
-    public void testAddIntermediateRowInput() {
-        DriverContext ctx = driverContext();
-        AggregatorFunctionSupplier supplier = aggregatorFunction();
-        List<Integer> channels = channels(AggregatorMode.SINGLE);
-        Block[] results = new Block[2];
-        try (
-            GroupingAggregatorFunction main = supplier.groupingAggregator(ctx, channels);
-            GroupingAggregatorFunction leaf = supplier.groupingAggregator(ctx, channels);
-            SourceOperator source = simpleInput(ctx.blockFactory(), 10);
-        ) {
-            Page p;
-            while ((p = source.getOutput()) != null) {
-                try (
-                    IntVector group = ctx.blockFactory().newConstantIntVector(0, p.getPositionCount());
-                    GroupingAggregatorFunction.AddInput addInput = leaf.prepareProcessRawInputPage(null, p)
-                ) {
-                    addInput.add(0, group);
-                } finally {
-                    p.releaseBlocks();
-                }
-            }
-            main.addIntermediateRowInput(0, leaf, 0);
-            try (IntVector selected = ctx.blockFactory().newConstantIntVector(0, 1)) {
-                main.evaluateFinal(results, 0, selected, new GroupingAggregatorEvaluationContext(ctx));
-                leaf.evaluateFinal(results, 1, selected, new GroupingAggregatorEvaluationContext(ctx));
-            }
-            assertThat(results[0], equalTo(results[1]));
-        } finally {
-            Releasables.close(results);
-        }
-    }
-
     @After
     public void checkUnclosed() {
         for (Exception tracker : unclosed) {
diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java
index 68c606f2e3fa2..e8153673adb89 100644
--- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java
+++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java
@@ -71,7 +71,6 @@
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.oneOf;
@@ -466,34 +465,6 @@ public void assertDriverData(Map<String, Object> driverMetadata, Map<String, Ob
         assertThat(((List<?>) driverSliceArgs.get("operators")), not(empty()));
     }
 
-    public void testProfileOrdinalsGroupingOperator() throws IOException {
-        assumeTrue("requires pragmas", Build.current().isSnapshot());
-        indexTimestampData(1);
-
-        RequestObjectBuilder builder = requestObjectBuilder().query(fromIndex() + " | STATS AVG(value) BY test.keyword");
-        builder.profile(true);
-        // Lock to shard level partitioning, so we get consistent profile output
-        builder.pragmas(Settings.builder().put("data_partitioning", "shard").build());
-        Map<String, Object> result = runEsql(builder);
-
-        List<List<String>> signatures = new ArrayList<>();
-        @SuppressWarnings("unchecked")
-        List<Map<String, Object>> profiles = (List<Map<String, Object>>) ((Map<String, Object>) result.get("profile")).get("drivers");
-        for (Map<String, Object> p : profiles) {
-            fixTypesOnProfile(p);
-            assertThat(p, commonProfile());
-            List<String> sig = new ArrayList<>();
-            @SuppressWarnings("unchecked")
-            List<Map<String, Object>> operators = (List<Map<String, Object>>) p.get("operators");
-            for (Map<String, Object> o : operators) {
-                sig.add((String) o.get("operator"));
-            }
-            signatures.add(sig);
-        }
-
-        assertThat(signatures, hasItem(hasItem("OrdinalsGroupingOperator[aggregators=[\"sum of longs\", \"count\"]]")));
-    }
-
     @AwaitsFix(bugUrl = "disabled until JOIN infrastructrure properly lands")
     public void testInlineStatsProfile() throws IOException {
         assumeTrue("INLINESTATS only available on snapshots", Build.current().isSnapshot());
diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/unmapped_fields.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/unmapped_fields.csv-spec
index a0828ff628a6d..c2d02d8d60d51 100644
--- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/unmapped_fields.csv-spec
+++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/unmapped_fields.csv-spec
@@ -495,8 +495,8 @@ required_capability: source_field_mapping
 required_capability: unmapped_fields
 FROM partial_mapping_sample_data,partial_mapping_excluded_source_sample_data METADATA _index
 | INSIST_🐔 message
-| SORT message, @timestamp
 | STATS max(@timestamp), count(*) BY message
+| SORT message NULLS FIRST
 ;
 
 max(@timestamp):date | count(*):long | message:keyword
diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
index efefde8871546..5d3586b689832 100644
--- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
+++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java
@@ -597,8 +597,8 @@ public void testTaskContentsForGroupingStatsQuery() throws Exception {
                 equalTo(
                     """
                         \\_LuceneSourceOperator[sourceStatus]
-                        \\_ValuesSourceReaderOperator[fields = [foo]]
-                        \\_OrdinalsGroupingOperator(aggs = max of longs)
+                        \\_ValuesSourceReaderOperator[fields = [pause_me, foo]]
+                        \\_HashAggregationOperator[mode = <not-needed>, aggs = max of longs]
                         \\_ExchangeSinkOperator""".replace("sourceStatus", sourceStatus)
                 )
             );
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java
index a4ec64b004a0c..69e20e4895e48 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/InsertFieldExtraction.java
@@ -8,13 +8,11 @@
 package org.elasticsearch.xpack.esql.optimizer.rules.physical.local;
 
 import org.elasticsearch.xpack.esql.core.expression.Attribute;
-import org.elasticsearch.xpack.esql.core.expression.Expressions;
 import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
 import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
 import org.elasticsearch.xpack.esql.optimizer.LocalPhysicalOptimizerContext;
 import org.elasticsearch.xpack.esql.optimizer.PhysicalOptimizerRules;
 import org.elasticsearch.xpack.esql.optimizer.rules.physical.ProjectAwayColumns;
-import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
 import org.elasticsearch.xpack.esql.plan.physical.LeafExec;
@@ -48,16 +46,6 @@ public PhysicalPlan rule(PhysicalPlan plan, LocalPhysicalOptimizerContext contex
 
         var missing = missingAttributes(p);
 
-        /*
-         * If there is a single grouping then we'll try to use ords. Either way
-         * it loads the field lazily. If we have more than one field we need to
-         * make sure the fields are loaded for the standard hash aggregator.
-         */
-        if (p instanceof AggregateExec agg) {
-            var ordinalAttributes = agg.ordinalAttributes();
-            missing.removeAll(Expressions.references(ordinalAttributes));
-        }
-
         // add extractor
         if (missing.isEmpty() == false) {
             // identify child (for binary nodes) that exports _doc and place the field extractor there
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExec.java
index 6d0991a24a36c..8b1bebad97cef 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExec.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/AggregateExec.java
@@ -18,13 +18,10 @@
 import org.elasticsearch.xpack.esql.core.expression.NamedExpression;
 import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
 import org.elasticsearch.xpack.esql.core.tree.Source;
-import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize;
 import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput;
 import org.elasticsearch.xpack.esql.plan.logical.Aggregate;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 
@@ -191,27 +188,7 @@ public List<Attribute> output() {
 
     @Override
     protected AttributeSet computeReferences() {
-        return mode.isInputPartial()
-            ? AttributeSet.of(intermediateAttributes)
-            : Aggregate.computeReferences(aggregates, groupings).subtract(AttributeSet.of(ordinalAttributes()));
-    }
-
-    /** Returns the attributes that can be loaded from ordinals -- no explicit extraction is needed */
-    public List<Attribute> ordinalAttributes() {
-        List<Attribute> orginalAttributs = new ArrayList<>(groupings.size());
-        // Ordinals can be leveraged just for a single grouping. If there are multiple groupings, fields need to be laoded for the
-        // hash aggregator.
-        // CATEGORIZE requires the standard hash aggregator as well.
-        if (groupings().size() == 1 && groupings.get(0).anyMatch(e -> e instanceof Categorize) == false) {
-            var leaves = new HashSet<>();
-            aggregates.stream().filter(a -> groupings.contains(a) == false).forEach(a -> leaves.addAll(a.collectLeaves()));
-            groupings.forEach(g -> {
-                if (leaves.contains(g) == false) {
-                    orginalAttributs.add((Attribute) g);
-                }
-            });
-        }
-        return orginalAttributs;
+        return mode.isInputPartial() ? AttributeSet.of(intermediateAttributes) : Aggregate.computeReferences(aggregates, groupings);
     }
 
     @Override
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
index a78a15bf1ca48..a5d19fcc3fb14 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java
@@ -174,16 +174,6 @@ else if (aggregatorMode.isOutputPartial()) {
                 groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
                 context
             );
-            // ordinal grouping
-        } else if (groupSpecs.size() == 1 && groupSpecs.get(0).channel == null) {
-            operatorFactory = ordinalGroupingOperatorFactory(
-                source,
-                aggregateExec,
-                aggregatorFactories,
-                groupSpecs.get(0).attribute,
-                groupSpecs.get(0).elementType(),
-                context
-            );
         } else {
             operatorFactory = new HashAggregationOperatorFactory(
                 groupSpecs.stream().map(GroupSpec::toHashGroupSpec).toList(),
@@ -362,18 +352,6 @@ ElementType elementType() {
         }
     }
 
-    /**
-     * Build a grouping operator that operates on ordinals if possible.
-     */
-    public abstract Operator.OperatorFactory ordinalGroupingOperatorFactory(
-        PhysicalOperation source,
-        AggregateExec aggregateExec,
-        List<GroupingAggregator.Factory> aggregatorFactories,
-        Attribute attrSource,
-        ElementType groupType,
-        LocalExecutionPlannerContext context
-    );
-
     public abstract Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
         TimeSeriesAggregateExec ts,
         AggregatorMode aggregatorMode,
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
index 3f403d3e4fcd2..47deb3d78ca72 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java
@@ -32,7 +32,6 @@
 import org.elasticsearch.compute.lucene.read.TimeSeriesExtractFieldOperator;
 import org.elasticsearch.compute.lucene.read.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
 import org.elasticsearch.core.AbstractRefCounted;
@@ -67,7 +66,6 @@
 import org.elasticsearch.xpack.esql.core.type.MultiTypeEsField;
 import org.elasticsearch.xpack.esql.core.type.PotentiallyUnmappedKeywordEsField;
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
-import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec.Sort;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
@@ -89,7 +87,6 @@
 
 import static org.elasticsearch.common.lucene.search.Queries.newNonNestedFilter;
 import static org.elasticsearch.compute.lucene.LuceneSourceOperator.NO_LIMIT;
-import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.NONE;
 
 public class EsPhysicalOperationProviders extends AbstractPhysicalOperationProviders {
     private static final Logger logger = LogManager.getLogger(EsPhysicalOperationProviders.class);
@@ -350,39 +347,6 @@ public LuceneCountOperator.Factory countSource(LocalExecutionPlannerContext cont
         );
     }
 
-    @Override
-    public final Operator.OperatorFactory ordinalGroupingOperatorFactory(
-        LocalExecutionPlanner.PhysicalOperation source,
-        AggregateExec aggregateExec,
-        List<GroupingAggregator.Factory> aggregatorFactories,
-        Attribute attrSource,
-        ElementType groupElementType,
-        LocalExecutionPlannerContext context
-    ) {
-        var sourceAttribute = FieldExtractExec.extractSourceAttributesFrom(aggregateExec.child());
-        int docChannel = source.layout.get(sourceAttribute.id()).channel();
-        List<ValuesSourceReaderOperator.ShardContext> vsShardContexts = shardContexts.stream()
-            .map(
-                s -> new ValuesSourceReaderOperator.ShardContext(
-                    s.searcher().getIndexReader(),
-                    s::newSourceLoader,
-                    s.storedFieldsSequentialProportion()
-                )
-            )
-            .toList();
-        // The grouping-by values are ready, let's group on them directly.
-        // Costin: why are they ready and not already exposed in the layout?
-        return new OrdinalsGroupingOperator.OrdinalsGroupingOperatorFactory(
-            shardIdx -> getBlockLoaderFor(shardIdx, attrSource, NONE),
-            vsShardContexts,
-            groupElementType,
-            docChannel,
-            attrSource.name(),
-            aggregatorFactories,
-            context.pageSize(aggregateExec.estimatedRowSize())
-        );
-    }
-
     @Override
     public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
         TimeSeriesAggregateExec ts,
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
index 6d7f818d922ca..79d58341783aa 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java
@@ -773,7 +773,17 @@ public void testExtractorsOverridingFields() {
         assertThat(names(extract.attributesToExtract()), contains("emp_no"));
     }
 
-    public void testDoNotExtractGroupingFields() {
+    /**
+     * LimitExec[1000[INTEGER],58]
+     * \_AggregateExec[[first_name{f}#3520],[SUM(salary{f}#3524,true[BOOLEAN]) AS x#3518, first_name{f}#3520],FINAL,[first_name{f}#3520,
+     *   $$x$sum{r}#3530, $$x$seen{r}#3531],58]
+     *   \_ExchangeExec[[first_name{f}#3520, $$x$sum{r}#3530, $$x$seen{r}#3531],true]
+     *     \_AggregateExec[[first_name{f}#3520],[SUM(salary{f}#3524,true[BOOLEAN]) AS x#3518, first_name{f}#3520],INITIAL,[first_name{f}#352
+     *       0, $$x$sum{r}#3546, $$x$seen{r}#3547],58]
+     *       \_FieldExtractExec[first_name{f}#3520, salary{f}#3524]
+     *         \_EsQueryExec[test], indexMode[standard], query[][_doc{f}#3548], limit[], sort[] estimatedRowSize[58]
+     */
+    public void testDoExtractGroupingFields() {
         var plan = physicalPlan("""
             from test
             | stats x = sum(salary) by first_name
@@ -791,12 +801,12 @@ public void testDoExtractGroupingFields() {
         assertThat(aggregate.groupings(), hasSize(1));
 
         var extract = as(aggregate.child(), FieldExtractExec.class);
-        assertThat(names(extract.attributesToExtract()), equalTo(List.of("salary")));
+        assertThat(names(extract.attributesToExtract()), equalTo(List.of("first_name", "salary")));
 
         var source = source(extract.child());
         // doc id and salary are ints. salary isn't extracted.
        // TODO salary kind of is extracted. At least sometimes it is. should it count?
-        assertThat(source.estimatedRowSize(), equalTo(Integer.BYTES * 2));
+        assertThat(source.estimatedRowSize(), equalTo(Integer.BYTES * 2 + 50));
     }
 
     public void testExtractGroupingFieldsIfAggd() {
@@ -4045,7 +4055,7 @@ public void testSpatialTypesAndStatsUseDocValuesMultiAggregationsGrouped() {
      * \_AggregateExec[[scalerank{f}#16],[SPATIALCENTROID(location{f}#18) AS centroid, COUNT([2a][KEYWORD]) AS count],FINAL,58]
      *   \_ExchangeExec[[scalerank{f}#16, xVal{r}#19, xDel{r}#20, yVal{r}#21, yDel{r}#22, count{r}#23, count{r}#24, seen{r}#25],true]
      *     \_AggregateExec[[scalerank{f}#16],[SPATIALCENTROID(location{f}#18) AS centroid, COUNT([2a][KEYWORD]) AS count],PARTIAL,58]
-     *       \_FieldExtractExec[location{f}#18][location{f}#18]
+     *       \_FieldExtractExec[scalerank{f}#16][location{f}#18][location{f}#18]
      *         \_EsQueryExec[airports], query[][_doc{f}#42], limit[], sort[] estimatedRowSize[54]
      *
      * Note the FieldExtractExec has 'location' set for stats: FieldExtractExec[location{f}#9][location{f}#9]
@@ -5474,6 +5484,7 @@ public void testPushSpatialDistanceEvalWithSimpleStatsToSource() {
     *   \_ExchangeExec[[country{f}#21, count{r}#24, seen{r}#25, xVal{r}#26, xDel{r}#27, yVal{r}#28, yDel{r}#29, count{r}#30],true]
     *     \_AggregateExec[[country{f}#21],[COUNT([2a][KEYWORD]) AS count, SPATIALCENTROID(location{f}#20) AS centroid, country{f}#21],INIT
     * IAL,[country{f}#21, count{r}#49, seen{r}#50, xVal{r}#51, xDel{r}#52, yVal{r}#53, yDel{r}#54, count{r}#55],79]
+    *       \_FieldExtractExec[country{f}#15254]
     *       \_EvalExec[[STDISTANCE(location{f}#20,[1 1 0 0 0 e1 7a 14 ae 47 21 29 40 a0 1a 2f dd 24 d6 4b 40][GEO_POINT])
     *         AS distance]]
    *         \_FieldExtractExec[location{f}#20][location{f}#20]
@@ -5550,7 +5561,8 @@ public void testPushSpatialDistanceEvalWithStatsToSource() {
         var exchangeExec = as(aggExec.child(), ExchangeExec.class);
         var aggExec2 = as(exchangeExec.child(), AggregateExec.class);
         // TODO: Remove the eval entirely, since the distance is no longer required after filter pushdown
-        var evalExec = as(aggExec2.child(), EvalExec.class);
+        var extract = as(aggExec2.child(), FieldExtractExec.class);
+        var evalExec = as(extract.child(), EvalExec.class);
         var stDistance = as(evalExec.fields().get(0).child(), StDistance.class);
         assertThat("Expect distance function to expect doc-values", stDistance.leftDocValues(), is(true));
         var source = assertChildIsGeoPointExtract(evalExec, FieldExtractPreference.DOC_VALUES);
@@ -8110,7 +8122,8 @@ private static EsQueryExec assertChildIsExtractedAs(
             "Expect field attribute to be extracted as " + fieldExtractPreference,
             extract.attributesToExtract()
                 .stream()
-                .allMatch(attr -> extract.fieldExtractPreference(attr) == fieldExtractPreference && attr.dataType() == dataType)
+                .filter(t -> t.dataType() == dataType)
+                .allMatch(attr -> extract.fieldExtractPreference(attr) == fieldExtractPreference)
         );
         return source(extract.child());
     }
diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java
index a8916f140ea1f..dbd888ed29fd2 100644
--- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java
+++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java
@@ -9,10 +9,7 @@
 
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
-import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.compute.Describable;
 import org.elasticsearch.compute.aggregation.AggregatorMode;
 import org.elasticsearch.compute.aggregation.GroupingAggregator;
 import org.elasticsearch.compute.aggregation.blockhash.BlockHash;
@@ -30,7 +27,6 @@
 import org.elasticsearch.compute.operator.DriverContext;
 import org.elasticsearch.compute.operator.HashAggregationOperator;
 import org.elasticsearch.compute.operator.Operator;
-import org.elasticsearch.compute.operator.OrdinalsGroupingOperator;
 import org.elasticsearch.compute.operator.SourceOperator;
 import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory;
 import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
@@ -58,7 +54,6 @@
 import org.elasticsearch.xpack.esql.core.util.SpatialCoordinateTypes;
 import org.elasticsearch.xpack.esql.expression.function.UnsupportedAttribute;
 import org.elasticsearch.xpack.esql.expression.function.scalar.convert.AbstractConvertFunction;
-import org.elasticsearch.xpack.esql.plan.physical.AggregateExec;
 import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
 import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
 import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
@@ -71,7 +66,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
-import java.util.Random;
 import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
@@ -79,8 +73,6 @@
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
-import static com.carrotsearch.randomizedtesting.generators.RandomNumbers.randomIntBetween;
-import static java.util.stream.Collectors.joining;
 import static org.apache.lucene.tests.util.LuceneTestCase.createTempDir;
 import static org.elasticsearch.compute.aggregation.spatial.SpatialAggregationUtils.encodeLongitude;
 import static org.elasticsearch.index.mapper.MappedFieldType.FieldExtractPreference.DOC_VALUES;
@@ -138,25 +130,6 @@ public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, Loca
         throw new UnsupportedOperationException("time-series source is not supported in CSV tests");
     }
 
-    @Override
-    public Operator.OperatorFactory ordinalGroupingOperatorFactory(
-        PhysicalOperation source,
-        AggregateExec aggregateExec,
-        List<GroupingAggregator.Factory> aggregatorFactories,
-        Attribute attrSource,
-        ElementType groupElementType,
-        LocalExecutionPlannerContext context
-    ) {
-        int channelIndex = source.layout.numberOfChannels();
-        return new TestOrdinalsGroupingAggregationOperatorFactory(
-            channelIndex,
-            aggregatorFactories,
-            groupElementType,
-            context.bigArrays(),
-            attrSource
-        );
-    }
-
     @Override
     public Operator.OperatorFactory timeSeriesAggregatorOperatorFactory(
         TimeSeriesAggregateExec ts,
@@ -408,58 +381,6 @@ protected Page wrapPage(Page page) {
         }
     }
 
-    /**
-     * Pretends to be the {@link OrdinalsGroupingOperator} but always delegates to the
-     * {@link HashAggregationOperator}.
-     */
-    private class TestOrdinalsGroupingAggregationOperatorFactory implements Operator.OperatorFactory {
-        private final int groupByChannel;
-        private final List<GroupingAggregator.Factory> aggregators;
-        private final ElementType groupElementType;
-        private final BigArrays bigArrays;
-        private final Attribute attribute;
-
-        TestOrdinalsGroupingAggregationOperatorFactory(
-            int channelIndex,
-            List<GroupingAggregator.Factory> aggregatorFactories,
-            ElementType groupElementType,
-            BigArrays bigArrays,
-            Attribute attribute
-        ) {
-            this.groupByChannel = channelIndex;
-            this.aggregators = aggregatorFactories;
-            this.groupElementType = groupElementType;
-            this.bigArrays = bigArrays;
-            this.attribute = attribute;
-        }
-
-        @Override
-        public Operator get(DriverContext driverContext) {
-            Random random = Randomness.get();
-            int pageSize = random.nextBoolean() ? randomIntBetween(random, 1, 16) : randomIntBetween(random, 1, 10 * 1024);
-            return new TestHashAggregationOperator(
-                aggregators,
-                () -> BlockHash.build(
-                    List.of(new BlockHash.GroupSpec(groupByChannel, groupElementType)),
-                    driverContext.blockFactory(),
-                    pageSize,
-                    false
-                ),
-                attribute,
-                driverContext
-            );
-        }
-
-        @Override
-        public String describe() {
-            return "TestHashAggregationOperator(mode = "
-                + "<not-needed>"
-                + ", aggs = "
-                + aggregators.stream().map(Describable::describe).collect(joining(", "))
-                + ")";
-        }
-    }
-
     private Block extractBlockForColumn(
         DocBlock docBlock,
         DataType dataType,