From 3cae3af8806fad20e1808baed693c515c18c0a4d Mon Sep 17 00:00:00 2001 From: Przemyslaw Witek Date: Mon, 21 Jul 2025 11:39:17 +0200 Subject: [PATCH] Add "emitEmptyBuckets" parameter to the "Bucket" function. --- .../compute/operator/AggregatorBenchmark.java | 3 +- .../operator/ValuesAggregatorBenchmark.java | 1 + .../_snippets/functions/parameters/bucket.md | 3 + .../esql/_snippets/functions/types/bucket.md | 120 +++---- .../esql/images/functions/bucket.svg | 2 +- .../org/elasticsearch/TransportVersions.java | 1 + .../aggregation/GroupingAggregator.java | 4 + .../aggregation/blockhash/BlockHash.java | 18 +- .../operator/HashAggregationOperator.java | 57 ++++ .../TimeSeriesAggregationOperator.java | 5 +- .../GroupingAggregatorFunctionTestCase.java | 2 +- .../blockhash/TopNBlockHashTests.java | 2 +- .../HashAggregationOperatorTests.java | 6 +- .../src/main/resources/bucket.csv-spec | 311 ++++++++++++++++++ .../xpack/esql/action/EsqlCapabilities.java | 5 + .../function/EsqlFunctionRegistry.java | 38 ++- .../function/ThreeOptionalArguments.java | 16 + .../expression/function/grouping/Bucket.java | 194 +++++++++-- .../AbstractPhysicalOperationProviders.java | 41 ++- .../expression/function/RailRoadDiagram.java | 18 +- .../grouping/BucketSerializationTests.java | 9 +- .../function/grouping/BucketTests.java | 6 +- .../rules/logical/FoldNullTests.java | 2 +- .../TestPhysicalOperationProviders.java | 3 +- 24 files changed, 746 insertions(+), 121 deletions(-) create mode 100644 x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/ThreeOptionalArguments.java diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java index d5fe1b4a697e0..b447fd29bffa5 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/AggregatorBenchmark.java @@ -191,11 +191,12 @@ private static Operator operator(DriverContext driverContext, String grouping, S new BlockHash.GroupSpec(2, ElementType.BYTES_REF) ); case TOP_N_LONGS -> List.of( - new BlockHash.GroupSpec(0, ElementType.LONG, null, new BlockHash.TopNDef(0, true, true, TOP_N_LIMIT)) + new BlockHash.GroupSpec(0, ElementType.LONG, null, new BlockHash.TopNDef(0, true, true, TOP_N_LIMIT), null) ); default -> throw new IllegalArgumentException("unsupported grouping [" + grouping + "]"); }; return new HashAggregationOperator( + groups, List.of(supplier(op, dataType, filter).groupingAggregatorFactory(AggregatorMode.SINGLE, List.of(groups.size()))), () -> BlockHash.build(groups, driverContext.blockFactory(), 16 * 1024, false), driverContext diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java index def9c58160002..6bf451514aa1a 100644 --- a/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesAggregatorBenchmark.java @@ -122,6 +122,7 @@ private static Operator operator(DriverContext driverContext, int groups, String } List groupSpec = List.of(new BlockHash.GroupSpec(0, ElementType.LONG)); return new HashAggregationOperator( + groupSpec, List.of(supplier(dataType).groupingAggregatorFactory(mode, List.of(1))), () -> BlockHash.build(groupSpec, driverContext.blockFactory(), 16 * 1024, false), driverContext diff --git a/docs/reference/query-languages/esql/_snippets/functions/parameters/bucket.md b/docs/reference/query-languages/esql/_snippets/functions/parameters/bucket.md index cadd93c20be11..9bda78c46e8fe 100644 --- a/docs/reference/query-languages/esql/_snippets/functions/parameters/bucket.md +++ b/docs/reference/query-languages/esql/_snippets/functions/parameters/bucket.md @@ -14,3 +14,6 @@ `to` : End of the range. Can be a number, a date or a date expressed as a string. +`emitEmptyBuckets` +: Whether or not empty buckets should be emitted. + diff --git a/docs/reference/query-languages/esql/_snippets/functions/types/bucket.md b/docs/reference/query-languages/esql/_snippets/functions/types/bucket.md index 658d11d6f1130..578f527efb4ce 100644 --- a/docs/reference/query-languages/esql/_snippets/functions/types/bucket.md +++ b/docs/reference/query-languages/esql/_snippets/functions/types/bucket.md @@ -2,64 +2,64 @@ **Supported types** -| field | buckets | from | to | result | -| --- | --- | --- | --- | --- | -| date | date_period | | | date | -| date | integer | date | date | date | -| date | integer | date | keyword | date | -| date | integer | date | text | date | -| date | integer | keyword | date | date | -| date | integer | keyword | keyword | date | -| date | integer | keyword | text | date | -| date | integer | text | date | date | -| date | integer | text | keyword | date | -| date | integer | text | text | date | -| date | time_duration | | | date | -| date_nanos | date_period | | | date_nanos | -| date_nanos | integer | date | date | date_nanos | -| date_nanos | integer | date | keyword | date_nanos | -| date_nanos | integer | date | text | date_nanos | -| date_nanos | integer | keyword | date | date_nanos | -| date_nanos | integer | keyword | keyword | date_nanos | -| date_nanos | integer | keyword | text | date_nanos | -| date_nanos | integer | text | date | date_nanos | -| date_nanos | integer | text | keyword | date_nanos | -| date_nanos | integer | text | text | date_nanos | -| date_nanos | time_duration | | | date_nanos | -| double | double | | | double | -| double | integer | double | double | double | -| double | integer | double | integer | double | -| double | integer | double | long | double | -| double | integer | integer | double | double | -| double | integer | integer | integer | double | -| double | integer | integer | long | double | -| double | integer | long | double | double | -| double | integer | long | integer | double | -| double | integer | long | long | double | -| double | integer | | | double | -| double | long | | | double | -| integer | double | | | double | -| integer | integer | double | double | double | -| integer | integer | double | integer | double | -| integer | integer | double | long | double | -| integer | integer | integer | double | double | -| integer | integer | integer | integer | double | -| integer | integer | integer | long | double | -| integer | integer | long | double | double | -| integer | integer | long | integer | double | -| integer | integer | long | long | double | -| integer | integer | | | double | -| integer | long | | | double | -| long | double | | | double | -| long | integer | double | double | double | -| long | integer | double | integer | double | -| long | integer | double | long | double | -| long | integer | integer | double | double | -| long | integer | integer | integer | double | -| long | integer | integer | long | double | -| long | integer | long | double | double | -| long | integer | long | integer | double | -| long | integer | long | long | double | -| long | integer | | | double | -| long | long | | | double | +| field | buckets | from | to | emitEmptyBuckets | result | +| --- | --- | --- | --- | --- | --- | +| date | date_period | | | | date | +| date | integer | date | date | | date | +| date | integer | date | keyword | | date | +| date | integer | date | text | | date | +| date | integer | keyword | date | | date | +| date | integer | keyword | keyword | | date | +| date | integer | keyword | text | | date | +| date | integer | text | date | | date | +| date | integer | text | keyword | | date | +| date | integer | text | text | | date | +| date | time_duration | | | | date | +| date_nanos | date_period | | | | date_nanos | +| date_nanos | integer | date | date | | date_nanos | +| date_nanos | integer | date | keyword | | date_nanos | +| date_nanos | integer | date | text | | date_nanos | +| date_nanos | integer | keyword | date | | date_nanos | +| date_nanos | integer | keyword | keyword | | date_nanos | +| date_nanos | integer | keyword | text | | date_nanos | +| date_nanos | integer | text | date | | date_nanos | +| date_nanos | integer | text | keyword | | date_nanos | +| date_nanos | integer | text | text | | date_nanos | +| date_nanos | time_duration | | | | date_nanos | +| double | double | | | | double | +| double | integer | double | double | | double | +| double | integer | double | integer | | double | +| double | integer | double | long | | double | +| double | integer | integer | double | | double | +| double | integer | integer | integer | | double | +| double | integer | integer | long | | double | +| double | integer | long | double | | double | +| double | integer | long | integer | | double | +| double | integer | long | long | | double | +| double | integer | | | | double | +| double | long | | | | double | +| integer | double | | | | double | +| integer | integer | double | double | | double | +| integer | integer | double | integer | | double | +| integer | integer | double | long | | double | +| integer | integer | integer | double | | double | +| integer | integer | integer | integer | | double | +| integer | integer | integer | long | | double | +| integer | integer | long | double | | double | +| integer | integer | long | integer | | double | +| integer | integer | long | long | | double | +| integer | integer | | | | double | +| integer | long | | | | double | +| long | double | | | | double | +| long | integer | double | double | | double | +| long | integer | double | integer | | double | +| long | integer | double | long | | double | +| long | integer | integer | double | | double | +| long | integer | integer | integer | | double | +| long | integer | integer | long | | double | +| long | integer | long | double | | double | +| long | integer | long | integer | | double | +| long | integer | long | long | | double | +| long | integer | | | | double | +| long | long | | | | double | diff --git a/docs/reference/query-languages/esql/images/functions/bucket.svg b/docs/reference/query-languages/esql/images/functions/bucket.svg index 78694296922ed..900db7701480f 100644 --- a/docs/reference/query-languages/esql/images/functions/bucket.svg +++ b/docs/reference/query-languages/esql/images/functions/bucket.svg @@ -1 +1 @@ -BUCKET(field,buckets,from,to) \ No newline at end of file +BUCKET(field,buckets,from,to,emitEmptyBuckets) \ No newline at end of file diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index dad300ae72744..6c56a5321058b 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -352,6 +352,7 @@ static TransportVersion def(int id) { public static final TransportVersion ESQL_TOPN_TIMINGS = def(9_128_0_00); public static final TransportVersion NODE_WEIGHTS_ADDED_TO_NODE_BALANCE_STATS = def(9_129_0_00); public static final TransportVersion RERANK_SNIPPETS = def(9_130_0_00); + public static final TransportVersion ESQL_EMIT_EMPTY_BUCKETS = def(9_131_0_00); /* * STOP! READ THIS FIRST! No, really, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregator.java index e84560a39cd4f..b0edca4ae25ba 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/GroupingAggregator.java @@ -23,6 +23,10 @@ public class GroupingAggregator implements Releasable { private final AggregatorMode mode; + public AggregatorMode getMode() { + return mode; + } + public interface Factory extends Function, Describable {} public GroupingAggregator(GroupingAggregatorFunction aggregatorFunction, AggregatorMode mode) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java index 63f4d9c96bcd0..2c10b248082de 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/blockhash/BlockHash.java @@ -127,6 +127,12 @@ public abstract class BlockHash implements Releasable, SeenGroupIds { */ public record TopNDef(int order, boolean asc, boolean nullsFirst, int limit) {} + public interface EmptyBucketGenerator { + int getEmptyBucketCount(); + + void generate(Block.Builder blockBuilder); + } + /** * Configuration for a BlockHash group spec that is doing text categorization. */ @@ -137,13 +143,19 @@ public enum OutputFormat { } } - public record GroupSpec(int channel, ElementType elementType, @Nullable CategorizeDef categorizeDef, @Nullable TopNDef topNDef) { + public record GroupSpec( + int channel, + ElementType elementType, + @Nullable CategorizeDef categorizeDef, + @Nullable TopNDef topNDef, + @Nullable EmptyBucketGenerator emptyBucketGenerator + ) { public GroupSpec(int channel, ElementType elementType) { - this(channel, elementType, null, null); + this(channel, elementType, null, null, null); } public GroupSpec(int channel, ElementType elementType, CategorizeDef categorizeDef) { - this(channel, elementType, categorizeDef, null); + this(channel, elementType, categorizeDef, null, null); } public boolean isCategorize() { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java index cbce712ed9cdb..20734fe8c1a44 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java @@ -20,6 +20,7 @@ import org.elasticsearch.compute.aggregation.GroupingAggregatorFunction; import org.elasticsearch.compute.aggregation.blockhash.BlockHash; import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DocBlock; import org.elasticsearch.compute.data.IntArrayBlock; import org.elasticsearch.compute.data.IntBigArrayBlock; import org.elasticsearch.compute.data.IntVector; @@ -34,6 +35,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -52,6 +54,7 @@ public record HashAggregationOperatorFactory( public Operator get(DriverContext driverContext) { if (groups.stream().anyMatch(BlockHash.GroupSpec::isCategorize)) { return new HashAggregationOperator( + groups, aggregators, () -> BlockHash.buildCategorizeBlockHash( groups, @@ -64,6 +67,7 @@ public Operator get(DriverContext driverContext) { ); } return new HashAggregationOperator( + groups, aggregators, () -> BlockHash.build(groups, driverContext.blockFactory(), maxPageSize, false), driverContext @@ -83,6 +87,7 @@ public String describe() { private boolean finished; private Page output; + private final List groups; private final BlockHash blockHash; protected final List aggregators; @@ -117,10 +122,12 @@ public String describe() { @SuppressWarnings("this-escape") public HashAggregationOperator( + List groups, List aggregators, Supplier blockHash, DriverContext driverContext ) { + this.groups = groups; this.aggregators = new ArrayList<>(aggregators.size()); this.driverContext = driverContext; boolean success = false; @@ -142,8 +149,22 @@ public boolean needsInput() { return finished == false; } + private final AtomicBoolean isInitialPage = new AtomicBoolean(true); + @Override public void addInput(Page page) { + if (isInitialPage.compareAndSet(true, false) + && (aggregators.size() == 0 || AggregatorMode.INITIAL.equals(aggregators.get(0).getMode()))) { + Page initialPage = createInitialPage(page); + if (initialPage != null) { + addInputInternal(initialPage); + return; + } + } + addInputInternal(page); + } + + private void addInputInternal(Page page) { try { GroupingAggregatorFunction.AddInput[] prepared = new GroupingAggregatorFunction.AddInput[aggregators.size()]; class AddInput implements GroupingAggregatorFunction.AddInput { @@ -289,6 +310,42 @@ protected Page wrapPage(Page page) { return page; } + private Page createInitialPage(Page page) { + // If no groups are generating bucket keys, move on + if (groups.stream().allMatch(g -> g.emptyBucketGenerator() == null)) { + return page; + } + Block.Builder[] blockBuilders = new Block.Builder[page.getBlockCount()]; + for (int channel = 0; channel < page.getBlockCount(); channel++) { + Block block = page.getBlock(channel); + blockBuilders[channel] = block.elementType().newBlockBuilder(block.getPositionCount(), driverContext.blockFactory()); + blockBuilders[channel].copyFrom(block, 0, block.getPositionCount()); + } + for (BlockHash.GroupSpec group : groups) { + BlockHash.EmptyBucketGenerator emptyBucketGenerator = group.emptyBucketGenerator(); + if (emptyBucketGenerator != null) { + for (int channel = 0; channel < page.getBlockCount(); channel++) { + if (group.channel() == channel) { + emptyBucketGenerator.generate(blockBuilders[channel]); + } else { + for (int i = 0; i < emptyBucketGenerator.getEmptyBucketCount(); i++) { + if (page.getBlock(channel) instanceof DocBlock) { + // TODO: DocBlock doesn't allow appending nulls + ((DocBlock.Builder) blockBuilders[channel]).appendShard(0).appendSegment(0).appendDoc(0); + } else { + blockBuilders[channel].appendNull(); + } + } + } + } + } + } + Block[] blocks = Arrays.stream(blockBuilders).map(Block.Builder::build).toArray(Block[]::new); + Releasables.closeExpectNoException(blockBuilders); + page.releaseBlocks(); + return new Page(blocks); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java index 6ab0291c718a7..9a5f78132b266 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperator.java @@ -40,7 +40,7 @@ public record Factory( @Override public Operator get(DriverContext driverContext) { // TODO: use TimeSeriesBlockHash when possible - return new TimeSeriesAggregationOperator(timeBucket, aggregators, () -> { + return new TimeSeriesAggregationOperator(timeBucket, groups, aggregators, () -> { if (sortedInput && groups.size() == 2) { return new TimeSeriesBlockHash(groups.get(0).channel(), groups.get(1).channel(), driverContext.blockFactory()); } else { @@ -68,11 +68,12 @@ public String describe() { public TimeSeriesAggregationOperator( Rounding.Prepared timeBucket, + List groups, List aggregators, Supplier blockHash, DriverContext driverContext ) { - super(aggregators, blockHash, driverContext); + super(groups, aggregators, blockHash, driverContext); this.timeBucket = timeBucket; } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java index dda9671b3b242..f35935e9b5e9b 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/GroupingAggregatorFunctionTestCase.java @@ -910,7 +910,7 @@ public void close() { }; }; - return new HashAggregationOperator(aggregators, blockHashSupplier, driverContext); + return new HashAggregationOperator(groups, aggregators, blockHashSupplier, driverContext); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/TopNBlockHashTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/TopNBlockHashTests.java index 0ebfa7e72b805..8d3b662cb0023 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/TopNBlockHashTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/TopNBlockHashTests.java @@ -363,7 +363,7 @@ private void hashBatchesCallbackOnLast(Consumer callback, Block[].. private BlockHash buildBlockHash(int emitBatchSize, Block... values) { List specs = new ArrayList<>(values.length); for (int c = 0; c < values.length; c++) { - specs.add(new BlockHash.GroupSpec(c, values[c].elementType(), null, topNDef(c))); + specs.add(new BlockHash.GroupSpec(c, values[c].elementType(), null, topNDef(c), null)); } assert forcePackedHash == false : "Packed TopN hash not implemented yet"; /*return forcePackedHash diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java index a4072754fae10..7fdb8c1235d17 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/HashAggregationOperatorTests.java @@ -114,7 +114,7 @@ public void testTopNNullsLast() { try ( var operator = new HashAggregationOperator.HashAggregationOperatorFactory( - List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, false, 3))), + List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, false, 3), null)), mode, List.of( new SumLongAggregatorFunctionSupplier().groupingAggregatorFactory(mode, aggregatorChannels), @@ -191,7 +191,7 @@ public void testTopNNullsFirst() { try ( var operator = new HashAggregationOperator.HashAggregationOperatorFactory( - List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, true, 3))), + List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, true, 3), null)), mode, List.of( new SumLongAggregatorFunctionSupplier().groupingAggregatorFactory(mode, aggregatorChannels), @@ -277,7 +277,7 @@ public void testTopNNullsIntermediateDiscards() { var maxAggregatorChannels = mode.isInputPartial() ? List.of(3, 4) : List.of(1); return new HashAggregationOperator.HashAggregationOperatorFactory( - List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, false, 3))), + List.of(new BlockHash.GroupSpec(groupChannel, ElementType.LONG, null, new BlockHash.TopNDef(0, ascOrder, false, 3), null)), mode, List.of( new SumLongAggregatorFunctionSupplier().groupingAggregatorFactory(mode, sumAggregatorChannels), diff --git a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec index 49b16baf30f58..c2aa481dea541 100644 --- a/x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec +++ b/x-pack/plugin/esql/qa/testFixtures/src/main/resources/bucket.csv-spec @@ -145,6 +145,317 @@ AVG(salary):double | bucket:date // end::bucket_in_agg-result[] ; +bucketMonthWithEmpty +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1986-01-01T00:00:00Z" +| STATS BY bucket = BUCKET(hire_date, 20, "1985-01-01T00:00:00Z", "1986-01-01T00:00:00Z", true) +| SORT bucket +; + +bucket:datetime +1985-01-01T00:00:00.000Z +1985-02-01T00:00:00.000Z +1985-03-01T00:00:00.000Z +1985-04-01T00:00:00.000Z +1985-05-01T00:00:00.000Z +1985-06-01T00:00:00.000Z +1985-07-01T00:00:00.000Z +1985-08-01T00:00:00.000Z +1985-09-01T00:00:00.000Z +1985-10-01T00:00:00.000Z +1985-11-01T00:00:00.000Z +1985-12-01T00:00:00.000Z +; + +bucketHeightWithEmpty +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1986-01-01T00:00:00Z" +| STATS BY heightBucket = ROUND(BUCKET(height, 10, 1.0, 2.0, true), 1) +| SORT heightBucket +; + +heightBucket:double +1.0 +1.1 +1.2 +1.3 +1.4 +1.5 +1.6 +1.7 +1.8 +1.9 +2.0 +; + +bucketsWithEmptyYear +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1980-01-01T00:00:00Z" AND hire_date < "1990-01-01T00:00:00Z" +| STATS BY + yearBucket = BUCKET(hire_date, 1 year, "1980-01-01T00:00:00Z", "1990-01-01T00:00:00Z", true), + heightBucket = ROUND(BUCKET(height, 10, 1.0, 2.0), 1) +| SORT yearBucket, heightBucket +; + +yearBucket:datetime | heightBucket:double +1980-01-01T00:00:00.000Z | null +1981-01-01T00:00:00.000Z | null +1982-01-01T00:00:00.000Z | null +1983-01-01T00:00:00.000Z | null +1984-01-01T00:00:00.000Z | null +1985-01-01T00:00:00.000Z | 1.4 +1985-01-01T00:00:00.000Z | 1.7 +1985-01-01T00:00:00.000Z | 1.8 +1985-01-01T00:00:00.000Z | 1.9 +1985-01-01T00:00:00.000Z | 2.0 +1985-01-01T00:00:00.000Z | null +1986-01-01T00:00:00.000Z | 1.4 +1986-01-01T00:00:00.000Z | 1.5 +1986-01-01T00:00:00.000Z | 1.7 +1986-01-01T00:00:00.000Z | 1.8 +1986-01-01T00:00:00.000Z | 2.0 +1986-01-01T00:00:00.000Z | 2.1 +1986-01-01T00:00:00.000Z | null +1987-01-01T00:00:00.000Z | 1.4 +1987-01-01T00:00:00.000Z | 1.5 +1987-01-01T00:00:00.000Z | 1.6 +1987-01-01T00:00:00.000Z | 1.7 +1987-01-01T00:00:00.000Z | 1.8 +1987-01-01T00:00:00.000Z | 1.9 +1987-01-01T00:00:00.000Z | 2.0 +1987-01-01T00:00:00.000Z | 2.1 +1987-01-01T00:00:00.000Z | null +1988-01-01T00:00:00.000Z | 1.4 +1988-01-01T00:00:00.000Z | 1.5 +1988-01-01T00:00:00.000Z | 1.7 +1988-01-01T00:00:00.000Z | 1.8 +1988-01-01T00:00:00.000Z | 1.9 +1988-01-01T00:00:00.000Z | null +1989-01-01T00:00:00.000Z | 1.5 +1989-01-01T00:00:00.000Z | 1.7 +1989-01-01T00:00:00.000Z | 2.0 +1989-01-01T00:00:00.000Z | null +; + +bucketsWithEmptyHeight +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1990-01-01T00:00:00Z" +| STATS BY + yearBucket = BUCKET(hire_date, 1 year, "1985-01-01T00:00:00Z", "1990-01-01T00:00:00Z"), + heightBucket = ROUND(BUCKET(height, 10, 1.0, 2.0, true), 1) +| SORT yearBucket, heightBucket +; + +yearBucket:datetime | heightBucket:double +1985-01-01T00:00:00.000Z | 1.4 +1985-01-01T00:00:00.000Z | 1.7 +1985-01-01T00:00:00.000Z | 1.8 +1985-01-01T00:00:00.000Z | 1.9 +1985-01-01T00:00:00.000Z | 2.0 +1986-01-01T00:00:00.000Z | 1.4 +1986-01-01T00:00:00.000Z | 1.5 +1986-01-01T00:00:00.000Z | 1.7 +1986-01-01T00:00:00.000Z | 1.8 +1986-01-01T00:00:00.000Z | 2.0 +1986-01-01T00:00:00.000Z | 2.1 +1987-01-01T00:00:00.000Z | 1.4 +1987-01-01T00:00:00.000Z | 1.5 +1987-01-01T00:00:00.000Z | 1.6 +1987-01-01T00:00:00.000Z | 1.7 +1987-01-01T00:00:00.000Z | 1.8 +1987-01-01T00:00:00.000Z | 1.9 +1987-01-01T00:00:00.000Z | 2.0 +1987-01-01T00:00:00.000Z | 2.1 +1988-01-01T00:00:00.000Z | 1.4 +1988-01-01T00:00:00.000Z | 1.5 +1988-01-01T00:00:00.000Z | 1.7 +1988-01-01T00:00:00.000Z | 1.8 +1988-01-01T00:00:00.000Z | 1.9 +1989-01-01T00:00:00.000Z | 1.5 +1989-01-01T00:00:00.000Z | 1.7 +1989-01-01T00:00:00.000Z | 2.0 +null | 1.0 +null | 1.1 +null | 1.2 +null | 1.3 +null | 1.4 +null | 1.5 +null | 1.6 +null | 1.7 +null | 1.8 +null | 1.9 +; + +bucketMonthInAggWithEmpty +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1986-01-01T00:00:00Z" +| STATS MAX(salary) BY bucket = BUCKET(hire_date, 20, "1985-01-01T00:00:00Z", "1986-01-01T00:00:00Z", true) +| SORT bucket +; + +MAX(salary):integer | bucket:datetime +null | 1985-01-01T00:00:00.000Z +66174 | 1985-02-01T00:00:00.000Z +null | 1985-03-01T00:00:00.000Z +null | 1985-04-01T00:00:00.000Z +44817 | 1985-05-01T00:00:00.000Z +null | 1985-06-01T00:00:00.000Z +62405 | 1985-07-01T00:00:00.000Z +null | 1985-08-01T00:00:00.000Z +49095 | 1985-09-01T00:00:00.000Z +54329 | 1985-10-01T00:00:00.000Z +74999 | 1985-11-01T00:00:00.000Z +null | 1985-12-01T00:00:00.000Z +; + +bucketMonthInAggWithEmpty2 +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1986-01-01T00:00:00Z" +| STATS MAX(salary) BY bucket = BUCKET(hire_date, 1 month, "1985-01-01T00:00:00Z", "1986-01-01T00:00:00Z", true) +| SORT bucket +; + +MAX(salary):integer | bucket:datetime +null | 1985-01-01T00:00:00.000Z +66174 | 1985-02-01T00:00:00.000Z +null | 1985-03-01T00:00:00.000Z +null | 1985-04-01T00:00:00.000Z +44817 | 1985-05-01T00:00:00.000Z +null | 1985-06-01T00:00:00.000Z +62405 | 1985-07-01T00:00:00.000Z +null | 1985-08-01T00:00:00.000Z +49095 | 1985-09-01T00:00:00.000Z +54329 | 1985-10-01T00:00:00.000Z +74999 | 1985-11-01T00:00:00.000Z +null | 1985-12-01T00:00:00.000Z +; + +bucketMonthInAggWithEmpty3 +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1986-01-01T00:00:00Z" +| STATS MAX(salary) BY bucket = BUCKET(hire_date, 1 month, "1985-01-01T00:00:00Z", "1987-01-01T00:00:00Z", true) +| SORT bucket +; + +MAX(salary):integer | bucket:datetime +null | 1985-01-01T00:00:00.000Z +66174 | 1985-02-01T00:00:00.000Z +null | 1985-03-01T00:00:00.000Z +null | 1985-04-01T00:00:00.000Z +44817 | 1985-05-01T00:00:00.000Z +null | 1985-06-01T00:00:00.000Z +62405 | 1985-07-01T00:00:00.000Z +null | 1985-08-01T00:00:00.000Z +49095 | 1985-09-01T00:00:00.000Z +54329 | 1985-10-01T00:00:00.000Z +74999 | 1985-11-01T00:00:00.000Z +null | 1985-12-01T00:00:00.000Z +null | 1986-01-01T00:00:00.000Z +null | 1986-02-01T00:00:00.000Z +null | 1986-03-01T00:00:00.000Z +null | 1986-04-01T00:00:00.000Z +null | 1986-05-01T00:00:00.000Z +null | 1986-06-01T00:00:00.000Z +null | 1986-07-01T00:00:00.000Z +null | 1986-08-01T00:00:00.000Z +null | 1986-09-01T00:00:00.000Z +null | 1986-10-01T00:00:00.000Z +null | 1986-11-01T00:00:00.000Z +null | 1986-12-01T00:00:00.000Z +; + +bucketMonthInAggWithEmpty4 +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1989-01-01T00:00:00Z" +| STATS MAX(salary) BY bucket = BUCKET(hire_date, 1 month, "1985-01-01T00:00:00Z", "1987-01-01T00:00:00Z", true) +| SORT bucket +; + +MAX(salary):integer | bucket:datetime +null | 1985-01-01T00:00:00.000Z +66174 | 1985-02-01T00:00:00.000Z +null | 1985-03-01T00:00:00.000Z +null | 1985-04-01T00:00:00.000Z +44817 | 1985-05-01T00:00:00.000Z +null | 1985-06-01T00:00:00.000Z +62405 | 1985-07-01T00:00:00.000Z +null | 1985-08-01T00:00:00.000Z +49095 | 1985-09-01T00:00:00.000Z +54329 | 1985-10-01T00:00:00.000Z +74999 | 1985-11-01T00:00:00.000Z +null | 1985-12-01T00:00:00.000Z +null | 1986-01-01T00:00:00.000Z +54462 | 1986-02-01T00:00:00.000Z +44956 | 1986-03-01T00:00:00.000Z +null | 1986-04-01T00:00:00.000Z +null | 1986-05-01T00:00:00.000Z +57305 | 1986-06-01T00:00:00.000Z +37702 | 1986-07-01T00:00:00.000Z +61805 | 1986-08-01T00:00:00.000Z +32272 | 1986-09-01T00:00:00.000Z +50128 | 1986-10-01T00:00:00.000Z +null | 1986-11-01T00:00:00.000Z +36174 | 1986-12-01T00:00:00.000Z +70011 | 1987-03-01T00:00:00.000Z +66817 | 1987-04-01T00:00:00.000Z +69904 | 1987-05-01T00:00:00.000Z +25324 | 1987-07-01T00:00:00.000Z +47411 | 1987-08-01T00:00:00.000Z +68431 | 1987-09-01T00:00:00.000Z +40612 | 1987-10-01T00:00:00.000Z +29175 | 1987-11-01T00:00:00.000Z +36051 | 1988-01-01T00:00:00.000Z +60408 | 1988-02-01T00:00:00.000Z +55360 | 1988-05-01T00:00:00.000Z +54518 | 1988-07-01T00:00:00.000Z +39878 | 1988-09-01T00:00:00.000Z +73578 | 1988-10-01T00:00:00.000Z +; + +bucketMonthInAggsWithEmpty +required_capability: bucket_emit_empty + +FROM employees +| WHERE hire_date >= "1985-01-01T00:00:00Z" AND hire_date < "1986-01-01T00:00:00Z" +| STATS MIN(salary), MAX(salary), AVG(salary) BY bucket = BUCKET(hire_date, 20, "1985-01-01T00:00:00Z", "1986-01-01T00:00:00Z", true) +| SORT bucket +; +warningRegex:evaluation of \[AVG\(salary\)\] failed, treating result as null. Only first 20 failures recorded +warningRegex:java.lang.ArithmeticException: / by zero + +// tag::bucket_in_aggs_with_empty-result[] +MIN(salary):integer | MAX(salary):integer | AVG(salary):double | bucket:datetime +null | null | null | 1985-01-01T00:00:00.000Z +26436 | 66174 | 46305.0 | 1985-02-01T00:00:00.000Z +null | null | null | 1985-03-01T00:00:00.000Z +null | null | null | 1985-04-01T00:00:00.000Z +44817 | 44817 | 44817.0 | 1985-05-01T00:00:00.000Z +null | null | null | 1985-06-01T00:00:00.000Z +62405 | 62405 | 62405.0 | 1985-07-01T00:00:00.000Z +null | null | null | 1985-08-01T00:00:00.000Z +49095 | 49095 | 49095.0 | 1985-09-01T00:00:00.000Z +48735 | 54329 | 51532.0 | 1985-10-01T00:00:00.000Z +33956 | 74999 | 54539.75 | 1985-11-01T00:00:00.000Z +null | null | null | 1985-12-01T00:00:00.000Z +// end::bucket_in_aggs_with_empty-result[] +; + bucketWithOffset#[skip:-8.13.99, reason:BUCKET renamed in 8.14] // tag::bucketWithOffset[] FROM employees diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java index 5491ba58887f7..b59f7e043f67d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java @@ -653,6 +653,11 @@ public enum Cap { */ BUCKET_WHOLE_NUMBER_AS_SPAN, + /** + * Support for the BUCKET function emitting empty buckets. + */ + BUCKET_EMIT_EMPTY(Build.current().isSnapshot()), + /** * Allow mixed numeric types in coalesce */ diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index 707de5b31ffb7..f2cd11fa8281f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -305,7 +305,7 @@ private static FunctionDefinition[][] functions() { return new FunctionDefinition[][] { // grouping functions new FunctionDefinition[] { - def(Bucket.class, Bucket::new, "bucket", "bin"), + def(Bucket.class, quin(Bucket::new), "bucket", "bin"), def(Categorize.class, Categorize::new, "categorize") }, // aggregate functions // since they declare two public constructors - one with filter (for nested where) and one without @@ -1018,6 +1018,39 @@ protected static FunctionDefinition def(Class function, return def(function, builder, names); } + /** + * Build a {@linkplain FunctionDefinition} for a quinary function. + */ + @SuppressWarnings("overloads") // These are ambiguous if you aren't using ctor references but we always do + protected static FunctionDefinition def(Class function, QuinaryBuilder ctorRef, String... names) { + FunctionBuilder builder = (source, children, cfg) -> { + if (OptionalArgument.class.isAssignableFrom(function)) { + if (children.size() > 5 || children.size() < 4) { + throw new QlIllegalArgumentException("expects four or five arguments"); + } + } else if (TwoOptionalArguments.class.isAssignableFrom(function)) { + if (children.size() > 5 || children.size() < 3) { + throw new QlIllegalArgumentException("expects minimum three, maximum five arguments"); + } + } else if (ThreeOptionalArguments.class.isAssignableFrom(function)) { + if (children.size() > 5 || children.size() < 2) { + throw new QlIllegalArgumentException("expects minimum two, maximum five arguments"); + } + } else if (children.size() != 5) { + throw new QlIllegalArgumentException("expects exactly five arguments"); + } + return ctorRef.build( + source, + children.get(0), + children.get(1), + children.size() > 2 ? children.get(2) : null, + children.size() > 3 ? children.get(3) : null, + children.size() > 4 ? children.get(4) : null + ); + }; + return def(function, builder, names); + } + protected interface QuaternaryBuilder { T build(Source source, Expression one, Expression two, Expression three, Expression four); } @@ -1216,4 +1249,7 @@ private static QuaternaryBuilder quad(QuaternaryBuilder< return function; } + private static QuinaryBuilder quin(QuinaryBuilder function) { + return function; + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/ThreeOptionalArguments.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/ThreeOptionalArguments.java new file mode 100644 index 0000000000000..be464c05ef6cb --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/ThreeOptionalArguments.java @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.expression.function; + +/** + * Marker interface indicating that a function accepts three optional arguments (the last three). + * This is used by the {@link EsqlFunctionRegistry} to perform validation of function declaration. + */ +public interface ThreeOptionalArguments { + +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java index 01bc4dd2b4eec..163e804bc44ba 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/grouping/Bucket.java @@ -8,10 +8,15 @@ package org.elasticsearch.xpack.esql.expression.function.grouping; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.Rounding; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.compute.aggregation.blockhash.BlockHash; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.DoubleBlock; +import org.elasticsearch.compute.data.LongBlock; import org.elasticsearch.compute.operator.EvalOperator.ExpressionEvaluator; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; @@ -30,7 +35,7 @@ import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.FunctionType; import org.elasticsearch.xpack.esql.expression.function.Param; -import org.elasticsearch.xpack.esql.expression.function.TwoOptionalArguments; +import org.elasticsearch.xpack.esql.expression.function.ThreeOptionalArguments; import org.elasticsearch.xpack.esql.expression.function.scalar.date.DateTrunc; import org.elasticsearch.xpack.esql.expression.function.scalar.math.Floor; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Div; @@ -39,6 +44,8 @@ import org.elasticsearch.xpack.esql.stats.SearchStats; import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.ArrayList; import java.util.List; @@ -63,8 +70,9 @@ public class Bucket extends GroupingFunction.EvaluatableGroupingFunction implements PostOptimizationVerificationAware, - TwoOptionalArguments, + ThreeOptionalArguments, LocalSurrogateExpression { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Bucket", Bucket::new); // TODO maybe we should just cover the whole of representable dates here - like ten years, 100 years, 1000 years, all the way up. @@ -94,6 +102,7 @@ public class Bucket extends GroupingFunction.EvaluatableGroupingFunction private final Expression buckets; private final Expression from; private final Expression to; + private final Expression emitEmptyBuckets; @FunctionInfo( returnType = { "double", "date", "date_nanos" }, @@ -212,13 +221,20 @@ public Bucket( type = { "integer", "long", "double", "date", "keyword", "text" }, optional = true, description = "End of the range. Can be a number, a date or a date expressed as a string." - ) Expression to + ) Expression to, + @Param( + name = "emitEmptyBuckets", + type = { "boolean" }, + optional = true, + description = "Whether or not empty buckets should be emitted." + ) Expression emitEmptyBuckets ) { - super(source, fields(field, buckets, from, to)); + super(source, fields(field, buckets, from, to, emitEmptyBuckets)); this.field = field; this.buckets = buckets; this.from = from; this.to = to; + this.emitEmptyBuckets = emitEmptyBuckets; } private Bucket(StreamInput in) throws IOException { @@ -227,11 +243,20 @@ private Bucket(StreamInput in) throws IOException { in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), in.readOptionalNamedWriteable(Expression.class), - in.readOptionalNamedWriteable(Expression.class) + in.readOptionalNamedWriteable(Expression.class), + in.getTransportVersion().onOrAfter(TransportVersions.ESQL_EMIT_EMPTY_BUCKETS) + ? in.readOptionalNamedWriteable(Expression.class) + : null ); } - private static List fields(Expression field, Expression buckets, Expression from, Expression to) { + private static List fields( + Expression field, + Expression buckets, + Expression from, + Expression to, + Expression emitEmptyBuckets + ) { List list = new ArrayList<>(4); list.add(field); list.add(buckets); @@ -241,6 +266,9 @@ private static List fields(Expression field, Expression buckets, Exp list.add(to); } } + if (emitEmptyBuckets != null) { + list.add(emitEmptyBuckets); + } return list; } @@ -251,6 +279,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeNamedWriteable(buckets); out.writeOptionalNamedWriteable(from); out.writeOptionalNamedWriteable(to); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_EMIT_EMPTY_BUCKETS)) { + out.writeOptionalNamedWriteable(emitEmptyBuckets); + } } @Override @@ -260,25 +291,21 @@ public String getWriteableName() { @Override public boolean foldable() { - return field.foldable() && buckets.foldable() && (from == null || from.foldable()) && (to == null || to.foldable()); + return field.foldable() + && buckets.foldable() + && (from == null || from.foldable()) + && (to == null || to.foldable()) + && (emitEmptyBuckets == null || emitEmptyBuckets.foldable()); } @Override public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { if (field.dataType() == DataType.DATETIME || field.dataType() == DataType.DATE_NANOS) { - Rounding.Prepared preparedRounding = getDateRounding(toEvaluator.foldCtx()); + Rounding.Prepared preparedRounding = getDateRounding(field, buckets, from, to, toEvaluator.foldCtx()); return DateTrunc.evaluator(field.dataType(), source(), toEvaluator.apply(field), preparedRounding); } if (field.dataType().isNumeric()) { - double roundTo; - if (from != null) { - int b = ((Number) buckets.fold(toEvaluator.foldCtx())).intValue(); - double f = ((Number) from.fold(toEvaluator.foldCtx())).doubleValue(); - double t = ((Number) to.fold(toEvaluator.foldCtx())).doubleValue(); - roundTo = pickRounding(b, f, t); - } else { - roundTo = ((Number) buckets.fold(toEvaluator.foldCtx())).doubleValue(); - } + double roundTo = determineRounding(buckets, from, to, toEvaluator.foldCtx()); Literal rounding = new Literal(source(), roundTo, DataType.DOUBLE); // We could make this more efficient, either by generating the evaluators with byte code or hand rolling this one. @@ -290,22 +317,49 @@ public ExpressionEvaluator.Factory toEvaluator(ToEvaluator toEvaluator) { throw EsqlIllegalArgumentException.illegalDataType(field.dataType()); } + public BlockHash.EmptyBucketGenerator createEmptyBucketGenerator() { + assert emitEmptyBuckets() != null; + FoldContext foldContext = new FoldContext(128); + Boolean emit = (Boolean) emitEmptyBuckets.fold(foldContext); + if (Boolean.TRUE.equals(emit) == false) { + return null; + } else if (field.dataType() == DataType.DATETIME || field.dataType() == DataType.DATE_NANOS) { + return new DatetimeEmptyBucketGenerator(field, buckets, from, to, foldContext); + } else { + return new NumericEmptyBucketGenerator(buckets, from, to, foldContext); + } + } + /** * Returns the date rounding from this bucket function if the target field is a date type; otherwise, returns null. */ public Rounding.Prepared getDateRoundingOrNull(FoldContext foldCtx) { if (field.dataType() == DataType.DATETIME || field.dataType() == DataType.DATE_NANOS) { - return getDateRounding(foldCtx); + return getDateRounding(field, buckets, from, to, foldCtx); } else { return null; } } - private Rounding.Prepared getDateRounding(FoldContext foldContext) { - return getDateRounding(foldContext, null, null); + private static Rounding.Prepared getDateRounding( + Expression field, + Expression buckets, + Expression from, + Expression to, + FoldContext foldContext + ) { + return getDateRounding(field, buckets, from, to, foldContext, null, null); } - private Rounding.Prepared getDateRounding(FoldContext foldContext, Long min, Long max) { + private static Rounding.Prepared getDateRounding( + Expression field, + Expression buckets, + Expression from, + Expression to, + FoldContext foldContext, + Long min, + Long max + ) { assert field.dataType() == DataType.DATETIME || field.dataType() == DataType.DATE_NANOS : "expected date type; got " + field; if (buckets.dataType().isWholeNumber()) { int b = ((Number) buckets.fold(foldContext)).intValue(); @@ -352,7 +406,18 @@ boolean roundingIsOk(Rounding rounding) { } } - private double pickRounding(int buckets, double from, double to) { + static double determineRounding(Expression buckets, Expression from, Expression to, FoldContext foldContext) { + if (from != null) { + int b = ((Number) buckets.fold(foldContext)).intValue(); + double f = ((Number) from.fold(foldContext)).doubleValue(); + double t = ((Number) to.fold(foldContext)).doubleValue(); + return pickRounding(b, f, t); + } else { + return ((Number) buckets.fold(foldContext)).doubleValue(); + } + } + + private static double pickRounding(int buckets, double from, double to) { double precise = (to - from) / buckets; double nextPowerOfTen = Math.pow(10, Math.ceil(Math.log10(precise))); double halfPower = nextPowerOfTen / 2; @@ -410,9 +475,8 @@ protected TypeResolution resolveType() { private TypeResolution checkArgsCount(int expectedCount) { String expected = null; - if (expectedCount == 2 && (from != null || to != null)) { - expected = "two"; - } else if (expectedCount == 4 && (from == null || to == null)) { + + if (expectedCount == 4 && (from == null || to == null)) { expected = "four"; } else if ((from == null && to != null) || (from != null && to == null)) { expected = "two or four"; @@ -451,7 +515,7 @@ public void postOptimizationVerification(Failures failures) { .add(to != null ? isFoldable(to, operation, FOURTH) : null); } - private long foldToLong(FoldContext ctx, Expression e) { + private static long foldToLong(FoldContext ctx, Expression e) { Object value = Foldables.valueOf(ctx, e); return DataType.isDateTime(e.dataType()) ? ((Number) value).longValue() : dateTimeToLong(((BytesRef) value).utf8ToString()); } @@ -468,12 +532,13 @@ public DataType dataType() { public Expression replaceChildren(List newChildren) { Expression from = newChildren.size() > 2 ? newChildren.get(2) : null; Expression to = newChildren.size() > 3 ? newChildren.get(3) : null; - return new Bucket(source(), newChildren.get(0), newChildren.get(1), from, to); + Expression emitEmptyBuckets = newChildren.size() > 4 ? newChildren.get(4) : null; + return new Bucket(source(), newChildren.get(0), newChildren.get(1), from, to, emitEmptyBuckets); } @Override protected NodeInfo info() { - return NodeInfo.create(this, Bucket::new, field, buckets, from, to); + return NodeInfo.create(this, Bucket::new, field, buckets, from, to, emitEmptyBuckets); } public Expression field() { @@ -492,9 +557,24 @@ public Expression to() { return to; } + public Expression emitEmptyBuckets() { + return emitEmptyBuckets; + } + @Override public String toString() { - return "Bucket{" + "field=" + field + ", buckets=" + buckets + ", from=" + from + ", to=" + to + '}'; + return "Bucket{" + + "field=" + + field + + ", buckets=" + + buckets + + ", from=" + + from + + ", to=" + + to + + ", emitEmptyBuckets=" + + emitEmptyBuckets + + '}'; } @Override @@ -506,7 +586,61 @@ public Expression surrogate(SearchStats searchStats) { field(), buckets(), searchStats, - (interval, minValue, maxValue) -> getDateRounding(FoldContext.small(), minValue, maxValue) + (interval, minValue, maxValue) -> getDateRounding(field, buckets, from, to, FoldContext.small(), minValue, maxValue) ); } + + record DatetimeEmptyBucketGenerator(long from, long to, Rounding.Prepared rounding) implements BlockHash.EmptyBucketGenerator { + + DatetimeEmptyBucketGenerator(Expression field, Expression buckets, Expression from, Expression to, FoldContext foldContext) { + this(foldToLong(foldContext, from), foldToLong(foldContext, to), getDateRounding(field, buckets, from, to, foldContext)); + } + + @Override + public int getEmptyBucketCount() { + int i = 0; + for (long bucket = rounding.round(from); bucket < to; bucket = rounding.nextRoundingValue(bucket)) { + i++; + } + return i; + } + + @Override + public void generate(Block.Builder blockBuilder) { + for (long bucket = rounding.round(from); bucket < to; bucket = rounding.nextRoundingValue(bucket)) { + ((LongBlock.Builder) blockBuilder).appendLong(bucket); + } + } + } + + record NumericEmptyBucketGenerator(double from, double to, double roundTo) implements BlockHash.EmptyBucketGenerator { + + NumericEmptyBucketGenerator(Expression buckets, Expression from, Expression to, FoldContext foldContext) { + this( + ((Number) from.fold(foldContext)).doubleValue(), + ((Number) to.fold(foldContext)).doubleValue(), + determineRounding(buckets, from, to, foldContext) + ); + } + + @Override + public int getEmptyBucketCount() { + int i = 0; + for (double bucket = round(Math.floor(from / roundTo) * roundTo, 2); bucket < to; bucket = round(bucket + roundTo, 2)) { + i++; + } + return i; + } + + @Override + public void generate(Block.Builder blockBuilder) { + for (double bucket = round(Math.floor(from / roundTo) * roundTo, 2); bucket < to; bucket = round(bucket + roundTo, 2)) { + ((DoubleBlock.Builder) blockBuilder).appendDouble(bucket); + } + } + + private static double round(double value, int n) { + return new BigDecimal(value).setScale(n, RoundingMode.HALF_UP).doubleValue(); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java index e45fe2b0e81d8..53e5829e3c281 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/AbstractPhysicalOperationProviders.java @@ -18,6 +18,7 @@ import org.elasticsearch.compute.operator.EvalOperator; import org.elasticsearch.compute.operator.HashAggregationOperator.HashAggregationOperatorFactory; import org.elasticsearch.compute.operator.Operator; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.analysis.AnalysisRegistry; import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException; import org.elasticsearch.xpack.esql.core.InvalidArgumentException; @@ -28,10 +29,13 @@ import org.elasticsearch.xpack.esql.core.expression.FoldContext; import org.elasticsearch.xpack.esql.core.expression.NameId; import org.elasticsearch.xpack.esql.core.expression.NamedExpression; +import org.elasticsearch.xpack.esql.core.util.Holder; import org.elasticsearch.xpack.esql.evaluator.EvalMapper; import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.aggregate.Count; +import org.elasticsearch.xpack.esql.expression.function.grouping.Bucket; import org.elasticsearch.xpack.esql.expression.function.grouping.Categorize; +import org.elasticsearch.xpack.esql.expression.function.scalar.math.Round; import org.elasticsearch.xpack.esql.plan.physical.AggregateExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner.LocalExecutionPlannerContext; @@ -56,6 +60,23 @@ public abstract class AbstractPhysicalOperationProviders implements PhysicalOper this.analysisRegistry = analysisRegistry; } + private static Bucket findBucket(AggregateExec aggregateExec, NameId bucketId) { + Holder foundBucket = new Holder<>(); + aggregateExec.forEachExpressionDown(NamedExpression.class, ne -> { + if (ne.id().equals(bucketId)) { + if (ne.children().size() > 0 && ne.children().get(0) instanceof Bucket bucket) { + foundBucket.set(bucket); + // TODO: Hack used when BUCKET is wrapped with ROUND. How to generalize it? + } else if (ne.children().size() > 0 + && ne.children().get(0) instanceof Round round + && round.field() instanceof Bucket bucket) { + foundBucket.set(bucket); + } + } + }); + return foundBucket.get(); + } + @Override public final PhysicalOperation groupingPhysicalOperation( AggregateExec aggregateExec, @@ -100,6 +121,7 @@ public final PhysicalOperation groupingPhysicalOperation( List groupSpecs = new ArrayList<>(aggregateExec.groupings().size()); for (Expression group : aggregateExec.groupings()) { Attribute groupAttribute = Expressions.attribute(group); + Bucket bucket = findBucket(aggregateExec, groupAttribute.id()); // In case of `... BY groupAttribute = CATEGORIZE(sourceGroupAttribute)` the actual source attribute is different. Attribute sourceGroupAttribute = (aggregatorMode.isInputPartial() == false && group instanceof Alias as @@ -143,7 +165,7 @@ else if (aggregatorMode.isOutputPartial()) { } layout.append(groupAttributeLayout); Layout.ChannelAndType groupInput = source.layout.get(sourceGroupAttribute.id()); - groupSpecs.add(new GroupSpec(groupInput == null ? null : groupInput.channel(), sourceGroupAttribute, group)); + groupSpecs.add(new GroupSpec(groupInput == null ? null : groupInput.channel(), sourceGroupAttribute, group, bucket)); } if (aggregatorMode == AggregatorMode.FINAL) { @@ -338,17 +360,20 @@ private static AggregatorFunctionSupplier supplier(AggregateFunction aggregateFu * @param attribute The attribute, source of this group * @param expression The expression being used to group */ - private record GroupSpec(Integer channel, Attribute attribute, Expression expression) { + private record GroupSpec(Integer channel, Attribute attribute, Expression expression, @Nullable Bucket bucket) { BlockHash.GroupSpec toHashGroupSpec() { if (channel == null) { throw new EsqlIllegalArgumentException("planned to use ordinals but tried to use the hash instead"); } - return new BlockHash.GroupSpec( - channel, - elementType(), - Alias.unwrap(expression) instanceof Categorize categorize ? categorize.categorizeDef() : null, - null - ); + + Expression unwrappedExpression = Alias.unwrap(expression); + if (unwrappedExpression instanceof Categorize categorize) { + return new BlockHash.GroupSpec(channel, elementType(), categorize.categorizeDef()); + } else if (bucket != null && bucket.emitEmptyBuckets() != null) { + return new BlockHash.GroupSpec(channel, elementType(), null, null, bucket.createEmptyBucketGenerator()); + } else { + return new BlockHash.GroupSpec(channel, elementType()); + } } ElementType elementType() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/RailRoadDiagram.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/RailRoadDiagram.java index dba6facde2b25..86adee31559d9 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/RailRoadDiagram.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/RailRoadDiagram.java @@ -69,10 +69,20 @@ static String functionSignature(FunctionDefinition definition) throws IOExceptio } else { if (arg.optional) { if (definition.name().equals("bucket")) { - // BUCKET requires optional args to be optional together, so we need custom code to do that - var nextArg = args.get(++i); - assert nextArg.optional(); - Sequence seq = new Sequence(new Literal(argName), new Syntax(","), new Literal(nextArg.name)); + // BUCKET requires optional args to be optional together, so we need custom code to do that. + // After "from", we expect 2 more optional args: "to" and "emitEmptyBuckets". + var toArg = args.get(++i); + assert toArg.optional(); + var emitEmptyBucketsArg = args.get(++i); + assert emitEmptyBucketsArg.optional(); + // TODO: Should it be possible to be able to specify "emitEmptyBuckets" but not "from" and "to"? + Sequence seq = new Sequence( + new Literal(argName), + new Syntax(","), + new Literal(toArg.name), + new Syntax(","), + new Literal(emitEmptyBucketsArg.name) + ); argExpressions.add(new Repetition(seq, 0, 1)); } else if (i < args.size() - 1 && args.get(i + 1).optional() == false) { // Special case with leading optional args diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java index 5fe270a4cce42..c30c842bad3b0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketSerializationTests.java @@ -25,7 +25,8 @@ public static Bucket createRandomBucket() { Expression buckets = randomChild(); Expression from = randomChild(); Expression to = randomChild(); - return new Bucket(source, field, buckets, from, to); + Expression emitEmptyBuckets = randomChild(); + return new Bucket(source, field, buckets, from, to, emitEmptyBuckets); } @Override @@ -35,12 +36,14 @@ protected Bucket mutateInstance(Bucket instance) throws IOException { Expression buckets = instance.buckets(); Expression from = instance.from(); Expression to = instance.to(); - switch (between(0, 3)) { + Expression emitEmptyBuckets = instance.emitEmptyBuckets(); + switch (between(0, 4)) { case 0 -> field = randomValueOtherThan(field, AbstractExpressionSerializationTests::randomChild); case 1 -> buckets = randomValueOtherThan(buckets, AbstractExpressionSerializationTests::randomChild); case 2 -> from = randomValueOtherThan(from, AbstractExpressionSerializationTests::randomChild); case 3 -> to = randomValueOtherThan(to, AbstractExpressionSerializationTests::randomChild); + case 4 -> emitEmptyBuckets = randomValueOtherThan(emitEmptyBuckets, AbstractExpressionSerializationTests::randomChild); } - return new Bucket(source, field, buckets, from, to); + return new Bucket(source, field, buckets, from, to, emitEmptyBuckets); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketTests.java index f01b06c23e8a8..cf4f7e0f8f015 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/grouping/BucketTests.java @@ -319,10 +319,14 @@ private static Matcher resultsMatcher(List t protected Expression build(Source source, List args) { Expression from = null; Expression to = null; + Expression emitEmptyBuckets = null; if (args.size() > 2) { from = args.get(2); to = args.get(3); } - return new Bucket(source, args.get(0), args.get(1), from, to); + if (args.size() > 4) { + emitEmptyBuckets = args.get(4); + } + return new Bucket(source, args.get(0), args.get(1), from, to, emitEmptyBuckets); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNullTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNullTests.java index ae30dce97ce5a..2db996651d9f4 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNullTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/rules/logical/FoldNullTests.java @@ -265,7 +265,7 @@ public void testNullFoldableDoesNotApplyToIsNullAndNotNull() { } public void testNullBucketGetsFolded() { - assertEquals(NULL, foldNull(new Bucket(EMPTY, NULL, NULL, NULL, NULL))); + assertEquals(NULL, foldNull(new Bucket(EMPTY, NULL, NULL, NULL, NULL, NULL))); } public void testNullCategorizeGroupingNotFolded() { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java index ff15f3cc1e4ba..1fc05b9d3f2db 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/TestPhysicalOperationProviders.java @@ -366,12 +366,13 @@ private class TestHashAggregationOperator extends HashAggregationOperator { private final Attribute attribute; TestHashAggregationOperator( + List groups, List aggregators, Supplier blockHash, Attribute attribute, DriverContext driverContext ) { - super(aggregators, blockHash, driverContext); + super(groups, aggregators, blockHash, driverContext); this.attribute = attribute; }