From b45c672ae3f3edd07baa5609e4f57431170337ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Mon, 7 Jul 2025 15:16:10 +0200 Subject: [PATCH 1/3] ESQL: Added status to OrdinalsGroupingOperator --- .../org/elasticsearch/TransportVersions.java | 1 + .../operator/HashAggregationOperator.java | 2 +- .../operator/OrdinalsGroupingOperator.java | 174 +++++++++++++++++- .../OrdinalsGroupingOperatorStatusTests.java | 67 +++++++ 4 files changed, 238 insertions(+), 6 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 486804a1f57ee..3c30391e2b8f4 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -329,6 +329,7 @@ static TransportVersion def(int id) { public static final TransportVersion PROJECT_STATE_REGISTRY_RECORDS_DELETIONS = def(9_113_0_00); public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_114_0_00); public static final TransportVersion ML_INFERENCE_IBM_WATSONX_COMPLETION_ADDED = def(9_115_0_00); + public static final TransportVersion ESQL_ORDINALS_OPERATOR_STATUS = def(9_116_0_00); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java index 9c4b9dd360062..d0318529ae636 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java @@ -113,7 +113,7 @@ public String describe() { /** * Total nanos for emitting the output */ - protected long emitNanos; + private long emitNanos; @SuppressWarnings("this-escape") public HashAggregationOperator( diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 9c15b0f3fc7d5..52e8ea0829e68 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -13,7 +13,13 @@ import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.TransportVersion; +import org.elasticsearch.TransportVersions; import org.elasticsearch.common.CheckedSupplier; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BitArray; import org.elasticsearch.compute.Describable; @@ -36,7 +42,9 @@ import org.elasticsearch.core.RefCounted; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.mapper.BlockLoader; +import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.io.UncheckedIOException; @@ -100,6 +108,23 @@ public String describe() { private boolean finished = false; + /** + * Nanoseconds this operator has spent processing the rows. + */ + private long processNanos; + /** + * Count of pages this operator has processed. + */ + private int pagesProcessed; + /** + * Count of rows this operator has received. + */ + private long rowsReceived; + /** + * Count of rows this operator has emitted. + */ + private long rowsEmitted; + // used to extract and aggregate values private final int maxPageSize; private ValuesAggregator valuesAggregator; @@ -135,6 +160,7 @@ public boolean needsInput() { public void addInput(Page page) { checkState(needsInput(), "Operator is already finishing"); requireNonNull(page, "page is null"); + long start = System.nanoTime(); DocVector docVector = page.getBlock(docChannel).asVector(); final int shardIndex = docVector.shards().getInt(0); RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex); @@ -184,6 +210,9 @@ public void addInput(Page page) { if (pagePassed == false) { Releasables.closeExpectNoException(page::releaseBlocks); } + pagesProcessed++; + rowsReceived += page.getPositionCount(); + processNanos += System.nanoTime() - start; } } @@ -208,25 +237,28 @@ public Page getOutput() { if (finished == false) { return null; } + Page page = null; if (valuesAggregator != null) { try { - return valuesAggregator.getOutput(); + page = valuesAggregator.getOutput(); } finally { final ValuesAggregator aggregator = this.valuesAggregator; this.valuesAggregator = null; Releasables.close(aggregator); } - } - if (ordinalAggregators.isEmpty() == false) { + } else if (ordinalAggregators.isEmpty() == false) { try { - return mergeOrdinalsSegmentResults(); + page = mergeOrdinalsSegmentResults(); } catch (IOException e) { throw new UncheckedIOException(e); } finally { Releasables.close(() -> Releasables.close(ordinalAggregators.values()), ordinalAggregators::clear); } } - return null; + if (page != null) { + rowsEmitted += page.getPositionCount(); + } + return page; } @Override @@ -322,6 +354,11 @@ public void close() { Releasables.close(() -> Releasables.close(ordinalAggregators.values()), valuesAggregator); } + @Override + public Operator.Status status() { + return new Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted); + } + private static void checkState(boolean condition, String msg) { if (condition == false) { throw new IllegalArgumentException(msg); @@ -337,6 +374,133 @@ public String toString() { return this.getClass().getSimpleName() + "[" + "aggregators=[" + aggregatorDescriptions + "]]"; } + public static class Status implements Operator.Status { + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( + Operator.Status.class, + "ordinals_grouping", + Status::new + ); + + /** + * Nanoseconds this operator has spent processing the rows. + */ + private final long processNanos; + /** + * Count of pages this operator has processed. + */ + private final int pagesProcessed; + /** + * Count of rows this operator has received. + */ + private final long rowsReceived; + /** + * Count of rows this operator has emitted. + */ + private final long rowsEmitted; + + /** + * Build. + * @param processNanos Nanoseconds this operator has spent processing the rows. + * @param pagesProcessed Count of pages this operator has processed. + * @param rowsReceived Count of rows this operator has received. + * @param rowsEmitted Count of rows this operator has emitted. + */ + public Status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { + this.processNanos = processNanos; + this.pagesProcessed = pagesProcessed; + this.rowsReceived = rowsReceived; + this.rowsEmitted = rowsEmitted; + } + + protected Status(StreamInput in) throws IOException { + processNanos = in.readVLong(); + pagesProcessed = in.readVInt(); + rowsReceived = in.readVLong(); + rowsEmitted = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(processNanos); + out.writeVInt(pagesProcessed); + out.writeVLong(rowsReceived); + out.writeVLong(rowsEmitted); + } + + @Override + public String getWriteableName() { + return ENTRY.name; + } + + /** + * Nanoseconds this operator has spent processing the rows. + */ + public long processNanos() { + return processNanos; + } + + /** + * Count of pages this operator has processed. + */ + public int pagesProcessed() { + return pagesProcessed; + } + + /** + * Count of rows this operator has received. + */ + public long rowsReceived() { + return rowsReceived; + } + + /** + * Count of rows this operator has emitted. + */ + public long rowsEmitted() { + return rowsEmitted; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field("process_nanos", processNanos); + if (builder.humanReadable()) { + builder.field("process_time", TimeValue.timeValueNanos(processNanos)); + } + builder.field("pages_processed", pagesProcessed); + builder.field("rows_received", rowsReceived); + builder.field("rows_emitted", rowsEmitted); + return builder.endObject(); + + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Status status = (Status) o; + return processNanos == status.processNanos + && pagesProcessed == status.pagesProcessed + && rowsReceived == status.rowsReceived + && rowsEmitted == status.rowsEmitted; + } + + @Override + public int hashCode() { + return Objects.hash(processNanos, pagesProcessed, rowsReceived, rowsEmitted); + } + + @Override + public String toString() { + return Strings.toString(this); + } + + @Override + public TransportVersion getMinimalSupportedVersion() { + return TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS; + } + } + record SegmentID(int shardIndex, int segmentIndex) { } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java new file mode 100644 index 0000000000000..1ed65f8c6c4c1 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.test.ESTestCase; + +import static org.hamcrest.Matchers.equalTo; + +public class OrdinalsGroupingOperatorStatusTests extends AbstractWireSerializingTestCase { + public static OrdinalsGroupingOperator.Status simple() { + return new OrdinalsGroupingOperator.Status(200012, 123, 111, 222); + } + + public static String simpleToJson() { + return """ + { + "process_nanos" : 200012, + "process_time" : "200micros", + "pages_processed" : 123, + "rows_received" : 111, + "rows_emitted" : 222 + }"""; + } + + public void testToXContent() { + assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson())); + } + + @Override + protected Writeable.Reader instanceReader() { + return OrdinalsGroupingOperator.Status::new; + } + + @Override + public OrdinalsGroupingOperator.Status createTestInstance() { + return new OrdinalsGroupingOperator.Status( + randomNonNegativeLong(), + randomNonNegativeInt(), + randomNonNegativeLong(), + randomNonNegativeLong() + ); + } + + @Override + protected OrdinalsGroupingOperator.Status mutateInstance(OrdinalsGroupingOperator.Status instance) { + long processNanos = instance.processNanos(); + int pagesProcessed = instance.pagesProcessed(); + long rowsReceived = instance.rowsReceived(); + long rowsEmitted = instance.rowsEmitted(); + switch (between(0, 3)) { + case 0 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); + case 1 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); + case 2 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 3 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + default -> throw new UnsupportedOperationException(); + } + return new OrdinalsGroupingOperator.Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted); + } +} From 76fbdc74514083ae5ca4bc1f9e61812563baca64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Tue, 8 Jul 2025 15:27:24 +0200 Subject: [PATCH 2/3] Added emit times and specific times per branch --- .../operator/OrdinalsGroupingOperator.java | 163 +++++++++++++----- .../OrdinalsGroupingOperatorStatusTests.java | 53 ++++-- 2 files changed, 166 insertions(+), 50 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 66ba58b359a0c..15ff819090f3c 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -111,7 +111,9 @@ public String describe() { /** * Nanoseconds this operator has spent processing the rows. */ - private long processNanos; + private long totalProcessNanos; + private long ordinalsProcessNanos; + private long valuesProcessNanos; /** * Count of pages this operator has processed. */ @@ -125,6 +127,13 @@ public String describe() { */ private long rowsEmitted; + /** + * Nanoseconds this operator has spent emitting the results. + */ + private long totalEmitNanos; + private long ordinalsEmitNanos; + private long valuesEmitNanos; + // used to extract and aggregate values private final int maxPageSize; private ValuesAggregator valuesAggregator; @@ -160,7 +169,7 @@ public boolean needsInput() { public void addInput(Page page) { checkState(needsInput(), "Operator is already finishing"); requireNonNull(page, "page is null"); - long start = System.nanoTime(); + long startNanos = System.nanoTime(); DocVector docVector = page.getBlock(docChannel).asVector(); final int shardIndex = docVector.shards().getInt(0); RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex); @@ -168,6 +177,7 @@ public void addInput(Page page) { boolean pagePassed = false; try { if (docVector.singleSegmentNonDecreasing() && blockLoader.supportsOrdinals()) { + long ordinalsStartNanos = System.nanoTime(); final IntVector segmentIndexVector = docVector.segments(); assert segmentIndexVector.isConstant(); final OrdinalSegmentAggregator ordinalAggregator = this.ordinalAggregators.computeIfAbsent( @@ -188,7 +198,9 @@ public void addInput(Page page) { ); pagePassed = true; ordinalAggregator.addInput(docVector.docs(), page); + ordinalsProcessNanos += System.nanoTime() - ordinalsStartNanos; } else { + long valuesStartNanos = System.nanoTime(); if (valuesAggregator == null) { int channelIndex = page.getBlockCount(); // extractor will append a new block at the end valuesAggregator = new ValuesAggregator( @@ -205,6 +217,7 @@ public void addInput(Page page) { } pagePassed = true; valuesAggregator.addInput(page); + valuesProcessNanos += System.nanoTime() - valuesStartNanos; } } finally { if (pagePassed == false) { @@ -212,7 +225,7 @@ public void addInput(Page page) { } pagesProcessed++; rowsReceived += page.getPositionCount(); - processNanos += System.nanoTime() - start; + totalProcessNanos += System.nanoTime() - startNanos; } } @@ -237,6 +250,7 @@ public Page getOutput() { if (finished == false) { return null; } + long startNanos = System.nanoTime(); Page page = null; if (valuesAggregator != null) { try { @@ -245,6 +259,7 @@ public Page getOutput() { final ValuesAggregator aggregator = this.valuesAggregator; this.valuesAggregator = null; Releasables.close(aggregator); + valuesEmitNanos += System.nanoTime() - startNanos; } } else if (ordinalAggregators.isEmpty() == false) { try { @@ -253,11 +268,13 @@ public Page getOutput() { throw new UncheckedIOException(e); } finally { Releasables.close(() -> Releasables.close(ordinalAggregators.values()), ordinalAggregators::clear); + ordinalsEmitNanos += System.nanoTime() - startNanos; } } if (page != null) { rowsEmitted += page.getPositionCount(); } + totalEmitNanos += System.nanoTime() - startNanos; return page; } @@ -265,7 +282,9 @@ public Page getOutput() { public void finish() { finished = true; if (valuesAggregator != null) { + long startNanos = System.nanoTime(); valuesAggregator.finish(); + valuesEmitNanos += System.nanoTime() - startNanos; } } @@ -356,7 +375,10 @@ public void close() { @Override public Operator.Status status() { - return new Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted); + return new Status( + totalProcessNanos, ordinalsProcessNanos, valuesProcessNanos, + totalEmitNanos, ordinalsEmitNanos, valuesEmitNanos, + pagesProcessed, rowsReceived, rowsEmitted); } private static void checkState(boolean condition, String msg) { @@ -381,39 +403,50 @@ public static class Status implements Operator.Status { Status::new ); - /** - * Nanoseconds this operator has spent processing the rows. - */ - private final long processNanos; - /** - * Count of pages this operator has processed. - */ + private final long totalProcessNanos; + private final long ordinalsProcessNanos; + private final long valuesProcessNanos; + private final long totalEmitNanos; + private final long ordinalsEmitNanos; + private final long valuesEmitNanos; private final int pagesProcessed; - /** - * Count of rows this operator has received. - */ private final long rowsReceived; - /** - * Count of rows this operator has emitted. - */ private final long rowsEmitted; /** * Build. - * @param processNanos Nanoseconds this operator has spent processing the rows. + * @param totalProcessNanos Nanoseconds this operator has spent processing the rows. * @param pagesProcessed Count of pages this operator has processed. * @param rowsReceived Count of rows this operator has received. * @param rowsEmitted Count of rows this operator has emitted. */ - public Status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) { - this.processNanos = processNanos; + public Status( + long totalProcessNanos, long ordinalsProcessNanos, long valuesProcessNanos, + long totalEmitNanos, long ordinalsEmitNanos, long valuesEmitNanos, + int pagesProcessed, long rowsReceived, long rowsEmitted + ) { + this.totalProcessNanos = totalProcessNanos; + this.ordinalsProcessNanos = ordinalsProcessNanos; + this.valuesProcessNanos = valuesProcessNanos; + + this.totalEmitNanos = totalEmitNanos; + this.ordinalsEmitNanos = ordinalsEmitNanos; + this.valuesEmitNanos = valuesEmitNanos; + this.pagesProcessed = pagesProcessed; this.rowsReceived = rowsReceived; this.rowsEmitted = rowsEmitted; } protected Status(StreamInput in) throws IOException { - processNanos = in.readVLong(); + totalProcessNanos = in.readVLong(); + ordinalsProcessNanos = in.readVLong(); + valuesProcessNanos = in.readVLong(); + + totalEmitNanos = in.readVLong(); + ordinalsEmitNanos = in.readVLong(); + valuesEmitNanos = in.readVLong(); + pagesProcessed = in.readVInt(); rowsReceived = in.readVLong(); rowsEmitted = in.readVLong(); @@ -421,7 +454,14 @@ protected Status(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(processNanos); + out.writeVLong(totalProcessNanos); + out.writeVLong(ordinalsProcessNanos); + out.writeVLong(valuesProcessNanos); + + out.writeVLong(totalEmitNanos); + out.writeVLong(ordinalsEmitNanos); + out.writeVLong(valuesEmitNanos); + out.writeVInt(pagesProcessed); out.writeVLong(rowsReceived); out.writeVLong(rowsEmitted); @@ -432,30 +472,38 @@ public String getWriteableName() { return ENTRY.name; } - /** - * Nanoseconds this operator has spent processing the rows. - */ - public long processNanos() { - return processNanos; + public long totalProcessNanos() { + return totalProcessNanos; + } + + public long ordinalsProcessNanos() { + return ordinalsProcessNanos; + } + + public long valuesProcessNanos() { + return valuesProcessNanos; + } + + public long totalEmitNanos() { + return totalEmitNanos; + } + + public long ordinalsEmitNanos() { + return ordinalsEmitNanos; + } + + public long valuesEmitNanos() { + return valuesEmitNanos; } - /** - * Count of pages this operator has processed. - */ public int pagesProcessed() { return pagesProcessed; } - /** - * Count of rows this operator has received. - */ public long rowsReceived() { return rowsReceived; } - /** - * Count of rows this operator has emitted. - */ public long rowsEmitted() { return rowsEmitted; } @@ -463,10 +511,36 @@ public long rowsEmitted() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("process_nanos", processNanos); + + // Process nanos + builder.field("total_process_nanos", totalProcessNanos); if (builder.humanReadable()) { - builder.field("process_time", TimeValue.timeValueNanos(processNanos)); + builder.field("total_process_time", TimeValue.timeValueNanos(totalProcessNanos)); } + builder.field("ordinals_process_nanos", ordinalsProcessNanos); + if (builder.humanReadable()) { + builder.field("ordinals_process_time", TimeValue.timeValueNanos(ordinalsProcessNanos)); + } + builder.field("values_process_nanos", valuesProcessNanos); + if (builder.humanReadable()) { + builder.field("values_process_time", TimeValue.timeValueNanos(valuesProcessNanos)); + } + + // Emit nanos + builder.field("total_emit_nanos", totalEmitNanos); + if (builder.humanReadable()) { + builder.field("total_emit_time", TimeValue.timeValueNanos(totalEmitNanos)); + } + builder.field("ordinals_emit_nanos", ordinalsEmitNanos); + if (builder.humanReadable()) { + builder.field("ordinals_emit_time", TimeValue.timeValueNanos(ordinalsEmitNanos)); + } + builder.field("values_emit_nanos", valuesEmitNanos); + if (builder.humanReadable()) { + builder.field("values_emit_time", TimeValue.timeValueNanos(valuesEmitNanos)); + } + + // Page/row counts builder.field("pages_processed", pagesProcessed); builder.field("rows_received", rowsReceived); builder.field("rows_emitted", rowsEmitted); @@ -479,7 +553,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Status status = (Status) o; - return processNanos == status.processNanos + return totalProcessNanos == status.totalProcessNanos + && ordinalsProcessNanos == status.ordinalsProcessNanos + && valuesProcessNanos == status.valuesProcessNanos + && totalEmitNanos == status.totalEmitNanos + && ordinalsEmitNanos == status.ordinalsEmitNanos + && valuesEmitNanos == status.valuesEmitNanos && pagesProcessed == status.pagesProcessed && rowsReceived == status.rowsReceived && rowsEmitted == status.rowsEmitted; @@ -487,7 +566,11 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(processNanos, pagesProcessed, rowsReceived, rowsEmitted); + return Objects.hash( + totalProcessNanos, ordinalsProcessNanos, valuesProcessNanos, + totalEmitNanos, ordinalsEmitNanos, valuesEmitNanos, + pagesProcessed, rowsReceived, rowsEmitted + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java index 1ed65f8c6c4c1..6b80631136dae 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java @@ -16,14 +16,28 @@ public class OrdinalsGroupingOperatorStatusTests extends AbstractWireSerializingTestCase { public static OrdinalsGroupingOperator.Status simple() { - return new OrdinalsGroupingOperator.Status(200012, 123, 111, 222); + return new OrdinalsGroupingOperator.Status( + 200012, 100010, 100011, + 600012, 300010, 300011, + 123, 111, 222 + ); } public static String simpleToJson() { return """ { - "process_nanos" : 200012, - "process_time" : "200micros", + "total_process_nanos" : 200012, + "total_process_time" : "200micros", + "ordinals_process_nanos" : 100010, + "ordinals_process_time" : "100micros", + "values_process_nanos" : 100011, + "values_process_time" : "100micros", + "total_emit_nanos" : 600012, + "total_emit_time" : "600micros", + "ordinals_emit_nanos" : 300010, + "ordinals_emit_time" : "300micros", + "values_emit_nanos" : 300011, + "values_emit_time" : "300micros", "pages_processed" : 123, "rows_received" : 111, "rows_emitted" : 222 @@ -42,6 +56,11 @@ protected Writeable.Reader instanceReader() { @Override public OrdinalsGroupingOperator.Status createTestInstance() { return new OrdinalsGroupingOperator.Status( + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), + randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeInt(), randomNonNegativeLong(), @@ -51,17 +70,31 @@ public OrdinalsGroupingOperator.Status createTestInstance() { @Override protected OrdinalsGroupingOperator.Status mutateInstance(OrdinalsGroupingOperator.Status instance) { - long processNanos = instance.processNanos(); + long totalProcessNanos = instance.totalProcessNanos(); + long ordinalsProcessNanos = instance.ordinalsProcessNanos(); + long valuesProcessNanos = instance.valuesProcessNanos(); + long totalEmitNanos = instance.totalEmitNanos(); + long ordinalsEmitNanos = instance.ordinalsEmitNanos(); + long valuesEmitNanos = instance.valuesEmitNanos(); int pagesProcessed = instance.pagesProcessed(); long rowsReceived = instance.rowsReceived(); long rowsEmitted = instance.rowsEmitted(); - switch (between(0, 3)) { - case 0 -> processNanos = randomValueOtherThan(processNanos, ESTestCase::randomNonNegativeLong); - case 1 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); - case 2 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); - case 3 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); + switch (between(0, 8)) { + case 0 -> totalProcessNanos = randomValueOtherThan(totalProcessNanos, ESTestCase::randomNonNegativeLong); + case 1 -> ordinalsProcessNanos = randomValueOtherThan(ordinalsProcessNanos, ESTestCase::randomNonNegativeLong); + case 2 -> valuesProcessNanos = randomValueOtherThan(valuesProcessNanos, ESTestCase::randomNonNegativeLong); + case 3 -> totalEmitNanos = randomValueOtherThan(totalEmitNanos, ESTestCase::randomNonNegativeLong); + case 4 -> ordinalsEmitNanos = randomValueOtherThan(ordinalsEmitNanos, ESTestCase::randomNonNegativeLong); + case 5 -> valuesEmitNanos = randomValueOtherThan(valuesEmitNanos, ESTestCase::randomNonNegativeLong); + case 6 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt); + case 7 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong); + case 8 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong); default -> throw new UnsupportedOperationException(); } - return new OrdinalsGroupingOperator.Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted); + return new OrdinalsGroupingOperator.Status( + totalProcessNanos, ordinalsProcessNanos, valuesProcessNanos, + totalEmitNanos, ordinalsEmitNanos, valuesEmitNanos, + pagesProcessed, rowsReceived, rowsEmitted + ); } } From 2bb1a40b38232bc87f6c435f317e88fed9a8d8f1 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 8 Jul 2025 13:36:26 +0000 Subject: [PATCH 3/3] [CI] Auto commit changes from spotless --- .../operator/OrdinalsGroupingOperator.java | 37 ++++++++++++++----- .../OrdinalsGroupingOperatorStatusTests.java | 18 +++++---- 2 files changed, 38 insertions(+), 17 deletions(-) diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java index 15ff819090f3c..879d6f2b8bada 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java @@ -376,9 +376,16 @@ public void close() { @Override public Operator.Status status() { return new Status( - totalProcessNanos, ordinalsProcessNanos, valuesProcessNanos, - totalEmitNanos, ordinalsEmitNanos, valuesEmitNanos, - pagesProcessed, rowsReceived, rowsEmitted); + totalProcessNanos, + ordinalsProcessNanos, + valuesProcessNanos, + totalEmitNanos, + ordinalsEmitNanos, + valuesEmitNanos, + pagesProcessed, + rowsReceived, + rowsEmitted + ); } private static void checkState(boolean condition, String msg) { @@ -421,9 +428,15 @@ public static class Status implements Operator.Status { * @param rowsEmitted Count of rows this operator has emitted. */ public Status( - long totalProcessNanos, long ordinalsProcessNanos, long valuesProcessNanos, - long totalEmitNanos, long ordinalsEmitNanos, long valuesEmitNanos, - int pagesProcessed, long rowsReceived, long rowsEmitted + long totalProcessNanos, + long ordinalsProcessNanos, + long valuesProcessNanos, + long totalEmitNanos, + long ordinalsEmitNanos, + long valuesEmitNanos, + int pagesProcessed, + long rowsReceived, + long rowsEmitted ) { this.totalProcessNanos = totalProcessNanos; this.ordinalsProcessNanos = ordinalsProcessNanos; @@ -567,9 +580,15 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash( - totalProcessNanos, ordinalsProcessNanos, valuesProcessNanos, - totalEmitNanos, ordinalsEmitNanos, valuesEmitNanos, - pagesProcessed, rowsReceived, rowsEmitted + totalProcessNanos, + ordinalsProcessNanos, + valuesProcessNanos, + totalEmitNanos, + ordinalsEmitNanos, + valuesEmitNanos, + pagesProcessed, + rowsReceived, + rowsEmitted ); } diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java index 6b80631136dae..2e339923d0151 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java @@ -16,11 +16,7 @@ public class OrdinalsGroupingOperatorStatusTests extends AbstractWireSerializingTestCase { public static OrdinalsGroupingOperator.Status simple() { - return new OrdinalsGroupingOperator.Status( - 200012, 100010, 100011, - 600012, 300010, 300011, - 123, 111, 222 - ); + return new OrdinalsGroupingOperator.Status(200012, 100010, 100011, 600012, 300010, 300011, 123, 111, 222); } public static String simpleToJson() { @@ -92,9 +88,15 @@ protected OrdinalsGroupingOperator.Status mutateInstance(OrdinalsGroupingOperato default -> throw new UnsupportedOperationException(); } return new OrdinalsGroupingOperator.Status( - totalProcessNanos, ordinalsProcessNanos, valuesProcessNanos, - totalEmitNanos, ordinalsEmitNanos, valuesEmitNanos, - pagesProcessed, rowsReceived, rowsEmitted + totalProcessNanos, + ordinalsProcessNanos, + valuesProcessNanos, + totalEmitNanos, + ordinalsEmitNanos, + valuesEmitNanos, + pagesProcessed, + rowsReceived, + rowsEmitted ); } }