diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java
index 70727715a91c0..3f807c86701a8 100644
--- a/server/src/main/java/org/elasticsearch/TransportVersions.java
+++ b/server/src/main/java/org/elasticsearch/TransportVersions.java
@@ -211,6 +211,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion ESQL_DOCUMENTS_FOUND_AND_VALUES_LOADED_8_19 = def(8_841_0_61);
     public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN_8_19 = def(8_841_0_62);
     public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_8_19 = def(8_841_0_63);
+    public static final TransportVersion ESQL_ORDINALS_OPERATOR_STATUS_8_19 = def(8_841_0_64);
     public static final TransportVersion V_9_0_0 = def(9_000_0_09);
     public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_1 = def(9_000_0_10);
     public static final TransportVersion INITIAL_ELASTICSEARCH_9_0_2 = def(9_000_0_11);
@@ -328,6 +329,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion ESQL_PROFILE_INCLUDE_PLAN = def(9_111_0_00);
     public static final TransportVersion MAPPINGS_IN_DATA_STREAMS = def(9_112_0_00);
     public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES_9_1 = def(9_112_0_01);
+    public static final TransportVersion ESQL_ORDINALS_OPERATOR_STATUS_9_1 = def(9_112_0_02);
     // Below is the first version in 9.2 and NOT in 9.1.
     public static final TransportVersion PROJECT_STATE_REGISTRY_RECORDS_DELETIONS = def(9_113_0_00);
     public static final TransportVersion ESQL_SERIALIZE_TIMESERIES_FIELD_TYPE = def(9_114_0_00);
@@ -338,6 +340,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion ESQL_FIXED_INDEX_LIKE = def(9_119_0_00);
     public static final TransportVersion LOOKUP_JOIN_CCS = def(9_120_0_00);
     public static final TransportVersion NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO = def(9_121_0_00);
+    public static final TransportVersion ESQL_ORDINALS_OPERATOR_STATUS = def(9_122_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java
index cbce712ed9cdb..413ae248e4c07 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/HashAggregationOperator.java
@@ -113,7 +113,7 @@ public String describe() {
     /**
      * Total nanos for emitting the output
      */
-    protected long emitNanos;
+    private long emitNanos;
 
     @SuppressWarnings("this-escape")
     public HashAggregationOperator(
diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
index 58466cffee78e..f8ac6259cbf28 100644
--- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
+++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperator.java
@@ -13,7 +13,13 @@
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefBuilder;
 import org.apache.lucene.util.PriorityQueue;
+import org.elasticsearch.TransportVersion;
+import org.elasticsearch.TransportVersions;
 import org.elasticsearch.common.CheckedSupplier;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.BitArray;
 import org.elasticsearch.compute.Describable;
@@ -36,7 +42,9 @@
 import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.mapper.BlockLoader;
+import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -100,6 +108,32 @@ public String describe() {
 
     private boolean finished = false;
 
+    /**
+     * Nanoseconds this operator has spent processing the rows.
+     */
+    private long totalProcessNanos;
+    private long ordinalsProcessNanos;
+    private long valuesProcessNanos;
+    /**
+     * Count of pages this operator has processed.
+     */
+    private int pagesProcessed;
+    /**
+     * Count of rows this operator has received.
+     */
+    private long rowsReceived;
+    /**
+     * Count of rows this operator has emitted.
+     */
+    private long rowsEmitted;
+
+    /**
+     * Nanoseconds this operator has spent emitting the results.
+     */
+    private long totalEmitNanos;
+    private long ordinalsEmitNanos;
+    private long valuesEmitNanos;
+
     // used to extract and aggregate values
     private final int maxPageSize;
     private ValuesAggregator valuesAggregator;
@@ -135,6 +169,7 @@ public boolean needsInput() {
     public void addInput(Page page) {
         checkState(needsInput(), "Operator is already finishing");
         requireNonNull(page, "page is null");
+        long startNanos = System.nanoTime();
         DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
         final int shardIndex = docVector.shards().getInt(0);
         RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex);
@@ -142,6 +177,7 @@ public void addInput(Page page) {
         boolean pagePassed = false;
         try {
             if (docVector.singleSegmentNonDecreasing() && blockLoader.supportsOrdinals()) {
+                long ordinalsStartNanos = System.nanoTime();
                 final IntVector segmentIndexVector = docVector.segments();
                 assert segmentIndexVector.isConstant();
                 final OrdinalSegmentAggregator ordinalAggregator = this.ordinalAggregators.computeIfAbsent(
@@ -162,7 +198,9 @@ public void addInput(Page page) {
                 );
                 pagePassed = true;
                 ordinalAggregator.addInput(docVector.docs(), page);
+                ordinalsProcessNanos += System.nanoTime() - ordinalsStartNanos;
             } else {
+                long valuesStartNanos = System.nanoTime();
                 if (valuesAggregator == null) {
                     int channelIndex = page.getBlockCount(); // extractor will append a new block at the end
                     valuesAggregator = new ValuesAggregator(
@@ -179,11 +217,15 @@ public void addInput(Page page) {
                 }
                 pagePassed = true;
                 valuesAggregator.addInput(page);
+                valuesProcessNanos += System.nanoTime() - valuesStartNanos;
             }
         } finally {
             if (pagePassed == false) {
                 Releasables.closeExpectNoException(page::releaseBlocks);
             }
+            pagesProcessed++;
+            rowsReceived += page.getPositionCount();
+            totalProcessNanos += System.nanoTime() - startNanos;
         }
     }
 
@@ -208,32 +250,41 @@ public Page getOutput() {
         if (finished == false) {
             return null;
         }
+        long startNanos = System.nanoTime();
+        Page page = null;
         if (valuesAggregator != null) {
             try {
-                return valuesAggregator.getOutput();
+                page = valuesAggregator.getOutput();
             } finally {
                 final ValuesAggregator aggregator = this.valuesAggregator;
                 this.valuesAggregator = null;
                 Releasables.close(aggregator);
+                valuesEmitNanos += System.nanoTime() - startNanos;
             }
-        }
-        if (ordinalAggregators.isEmpty() == false) {
+        } else if (ordinalAggregators.isEmpty() == false) {
             try {
-                return mergeOrdinalsSegmentResults();
+                page = mergeOrdinalsSegmentResults();
             } catch (IOException e) {
                 throw new UncheckedIOException(e);
             } finally {
                 Releasables.close(() -> Releasables.close(ordinalAggregators.values()), ordinalAggregators::clear);
+                ordinalsEmitNanos += System.nanoTime() - startNanos;
             }
         }
-        return null;
+        if (page != null) {
+            rowsEmitted += page.getPositionCount();
+        }
+        totalEmitNanos += System.nanoTime() - startNanos;
+        return page;
     }
 
     @Override
     public void finish() {
         finished = true;
         if (valuesAggregator != null) {
+            long startNanos = System.nanoTime();
             valuesAggregator.finish();
+            valuesEmitNanos += System.nanoTime() - startNanos;
         }
     }
 
@@ -322,6 +373,21 @@ public void close() {
         Releasables.close(() -> Releasables.close(ordinalAggregators.values()), valuesAggregator);
     }
 
+    @Override
+    public Operator.Status status() {
+        return new Status(
+            totalProcessNanos,
+            ordinalsProcessNanos,
+            valuesProcessNanos,
+            totalEmitNanos,
+            ordinalsEmitNanos,
+            valuesEmitNanos,
+            pagesProcessed,
+            rowsReceived,
+            rowsEmitted
+        );
+    }
+
     private static void checkState(boolean condition, String msg) {
         if (condition == false) {
             throw new IllegalArgumentException(msg);
@@ -337,6 +403,214 @@ public String toString() {
         return this.getClass().getSimpleName() + "[" + "aggregators=[" + aggregatorDescriptions + "]]";
     }
 
+    public static class Status implements Operator.Status {
+        public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
+            Operator.Status.class,
+            "ordinals_grouping",
+            Status::new
+        );
+
+        private final long totalProcessNanos;
+        private final long ordinalsProcessNanos;
+        private final long valuesProcessNanos;
+        private final long totalEmitNanos;
+        private final long ordinalsEmitNanos;
+        private final long valuesEmitNanos;
+        private final int pagesProcessed;
+        private final long rowsReceived;
+        private final long rowsEmitted;
+
+        /**
+         * Build.
+         * @param totalProcessNanos Nanoseconds this operator has spent processing the rows.
+         * @param pagesProcessed Count of pages this operator has processed.
+         * @param rowsReceived Count of rows this operator has received.
+         * @param rowsEmitted Count of rows this operator has emitted.
+         */
+        public Status(
+            long totalProcessNanos,
+            long ordinalsProcessNanos,
+            long valuesProcessNanos,
+            long totalEmitNanos,
+            long ordinalsEmitNanos,
+            long valuesEmitNanos,
+            int pagesProcessed,
+            long rowsReceived,
+            long rowsEmitted
+        ) {
+            this.totalProcessNanos = totalProcessNanos;
+            this.ordinalsProcessNanos = ordinalsProcessNanos;
+            this.valuesProcessNanos = valuesProcessNanos;
+
+            this.totalEmitNanos = totalEmitNanos;
+            this.ordinalsEmitNanos = ordinalsEmitNanos;
+            this.valuesEmitNanos = valuesEmitNanos;
+
+            this.pagesProcessed = pagesProcessed;
+            this.rowsReceived = rowsReceived;
+            this.rowsEmitted = rowsEmitted;
+        }
+
+        protected Status(StreamInput in) throws IOException {
+            totalProcessNanos = in.readVLong();
+            ordinalsProcessNanos = in.readVLong();
+            valuesProcessNanos = in.readVLong();
+
+            totalEmitNanos = in.readVLong();
+            ordinalsEmitNanos = in.readVLong();
+            valuesEmitNanos = in.readVLong();
+
+            pagesProcessed = in.readVInt();
+            rowsReceived = in.readVLong();
+            rowsEmitted = in.readVLong();
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeVLong(totalProcessNanos);
+            out.writeVLong(ordinalsProcessNanos);
+            out.writeVLong(valuesProcessNanos);
+
+            out.writeVLong(totalEmitNanos);
+            out.writeVLong(ordinalsEmitNanos);
+            out.writeVLong(valuesEmitNanos);
+
+            out.writeVInt(pagesProcessed);
+            out.writeVLong(rowsReceived);
+            out.writeVLong(rowsEmitted);
+        }
+
+        @Override
+        public String getWriteableName() {
+            return ENTRY.name;
+        }
+
+        public long totalProcessNanos() {
+            return totalProcessNanos;
+        }
+
+        public long ordinalsProcessNanos() {
+            return ordinalsProcessNanos;
+        }
+
+        public long valuesProcessNanos() {
+            return valuesProcessNanos;
+        }
+
+        public long totalEmitNanos() {
+            return totalEmitNanos;
+        }
+
+        public long ordinalsEmitNanos() {
+            return ordinalsEmitNanos;
+        }
+
+        public long valuesEmitNanos() {
+            return valuesEmitNanos;
+        }
+
+        public int pagesProcessed() {
+            return pagesProcessed;
+        }
+
+        public long rowsReceived() {
+            return rowsReceived;
+        }
+
+        public long rowsEmitted() {
+            return rowsEmitted;
+        }
+
+        @Override
+        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+            builder.startObject();
+
+            // Process nanos
+            builder.field("total_process_nanos", totalProcessNanos);
+            if (builder.humanReadable()) {
+                builder.field("total_process_time", TimeValue.timeValueNanos(totalProcessNanos));
+            }
+            builder.field("ordinals_process_nanos", ordinalsProcessNanos);
+            if (builder.humanReadable()) {
+                builder.field("ordinals_process_time", TimeValue.timeValueNanos(ordinalsProcessNanos));
+            }
+            builder.field("values_process_nanos", valuesProcessNanos);
+            if (builder.humanReadable()) {
+                builder.field("values_process_time", TimeValue.timeValueNanos(valuesProcessNanos));
+            }
+
+            // Emit nanos
+            builder.field("total_emit_nanos", totalEmitNanos);
+            if (builder.humanReadable()) {
+                builder.field("total_emit_time", TimeValue.timeValueNanos(totalEmitNanos));
+            }
+            builder.field("ordinals_emit_nanos", ordinalsEmitNanos);
+            if (builder.humanReadable()) {
+                builder.field("ordinals_emit_time", TimeValue.timeValueNanos(ordinalsEmitNanos));
+            }
+            builder.field("values_emit_nanos", valuesEmitNanos);
+            if (builder.humanReadable()) {
+                builder.field("values_emit_time", TimeValue.timeValueNanos(valuesEmitNanos));
+            }
+
+            // Page/row counts
+            builder.field("pages_processed", pagesProcessed);
+            builder.field("rows_received", rowsReceived);
+            builder.field("rows_emitted", rowsEmitted);
+            return builder.endObject();
+
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Status status = (Status) o;
+            return totalProcessNanos == status.totalProcessNanos
+                && ordinalsProcessNanos == status.ordinalsProcessNanos
+                && valuesProcessNanos == status.valuesProcessNanos
+                && totalEmitNanos == status.totalEmitNanos
+                && ordinalsEmitNanos == status.ordinalsEmitNanos
+                && valuesEmitNanos == status.valuesEmitNanos
+                && pagesProcessed == status.pagesProcessed
+                && rowsReceived == status.rowsReceived
+                && rowsEmitted == status.rowsEmitted;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(
+                totalProcessNanos,
+                ordinalsProcessNanos,
+                valuesProcessNanos,
+                totalEmitNanos,
+                ordinalsEmitNanos,
+                valuesEmitNanos,
+                pagesProcessed,
+                rowsReceived,
+                rowsEmitted
+            );
+        }
+
+        @Override
+        public String toString() {
+            return Strings.toString(this);
+        }
+
+        @Override
+        public TransportVersion getMinimalSupportedVersion() {
+            assert false : "should never be called when supportsVersion is used";
+            return TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS;
+        }
+
+        @Override
+        public boolean supportsVersion(TransportVersion version) {
+            return version.onOrAfter(TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS)
+                || version.isPatchFrom(TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS_9_1)
+                || version.isPatchFrom(TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS_8_19);
+        }
+    }
+
     record SegmentID(int shardIndex, int segmentIndex) {
 
     }
diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java
new file mode 100644
index 0000000000000..2e339923d0151
--- /dev/null
+++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OrdinalsGroupingOperatorStatusTests.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.compute.operator;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+import org.elasticsearch.test.ESTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class OrdinalsGroupingOperatorStatusTests extends AbstractWireSerializingTestCase<OrdinalsGroupingOperator.Status> {
+    public static OrdinalsGroupingOperator.Status simple() {
+        return new OrdinalsGroupingOperator.Status(200012, 100010, 100011, 600012, 300010, 300011, 123, 111, 222);
+    }
+
+    public static String simpleToJson() {
+        return """
+            {
+              "total_process_nanos" : 200012,
+              "total_process_time" : "200micros",
+              "ordinals_process_nanos" : 100010,
+              "ordinals_process_time" : "100micros",
+              "values_process_nanos" : 100011,
+              "values_process_time" : "100micros",
+              "total_emit_nanos" : 600012,
+              "total_emit_time" : "600micros",
+              "ordinals_emit_nanos" : 300010,
+              "ordinals_emit_time" : "300micros",
+              "values_emit_nanos" : 300011,
+              "values_emit_time" : "300micros",
+              "pages_processed" : 123,
+              "rows_received" : 111,
+              "rows_emitted" : 222
+            }""";
+    }
+
+    public void testToXContent() {
+        assertThat(Strings.toString(simple(), true, true), equalTo(simpleToJson()));
+    }
+
+    @Override
+    protected Writeable.Reader<OrdinalsGroupingOperator.Status> instanceReader() {
+        return OrdinalsGroupingOperator.Status::new;
+    }
+
+    @Override
+    public OrdinalsGroupingOperator.Status createTestInstance() {
+        return new OrdinalsGroupingOperator.Status(
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong(),
+            randomNonNegativeInt(),
+            randomNonNegativeLong(),
+            randomNonNegativeLong()
+        );
+    }
+
+    @Override
+    protected OrdinalsGroupingOperator.Status mutateInstance(OrdinalsGroupingOperator.Status instance) {
+        long totalProcessNanos = instance.totalProcessNanos();
+        long ordinalsProcessNanos = instance.ordinalsProcessNanos();
+        long valuesProcessNanos = instance.valuesProcessNanos();
+        long totalEmitNanos = instance.totalEmitNanos();
+        long ordinalsEmitNanos = instance.ordinalsEmitNanos();
+        long valuesEmitNanos = instance.valuesEmitNanos();
+        int pagesProcessed = instance.pagesProcessed();
+        long rowsReceived = instance.rowsReceived();
+        long rowsEmitted = instance.rowsEmitted();
+        switch (between(0, 8)) {
+            case 0 -> totalProcessNanos = randomValueOtherThan(totalProcessNanos, ESTestCase::randomNonNegativeLong);
+            case 1 -> ordinalsProcessNanos = randomValueOtherThan(ordinalsProcessNanos, ESTestCase::randomNonNegativeLong);
+            case 2 -> valuesProcessNanos = randomValueOtherThan(valuesProcessNanos, ESTestCase::randomNonNegativeLong);
+            case 3 -> totalEmitNanos = randomValueOtherThan(totalEmitNanos, ESTestCase::randomNonNegativeLong);
+            case 4 -> ordinalsEmitNanos = randomValueOtherThan(ordinalsEmitNanos, ESTestCase::randomNonNegativeLong);
+            case 5 -> valuesEmitNanos = randomValueOtherThan(valuesEmitNanos, ESTestCase::randomNonNegativeLong);
+            case 6 -> pagesProcessed = randomValueOtherThan(pagesProcessed, ESTestCase::randomNonNegativeInt);
+            case 7 -> rowsReceived = randomValueOtherThan(rowsReceived, ESTestCase::randomNonNegativeLong);
+            case 8 -> rowsEmitted = randomValueOtherThan(rowsEmitted, ESTestCase::randomNonNegativeLong);
+            default -> throw new UnsupportedOperationException();
+        }
+        return new OrdinalsGroupingOperator.Status(
+            totalProcessNanos,
+            ordinalsProcessNanos,
+            valuesProcessNanos,
+            totalEmitNanos,
+            ordinalsEmitNanos,
+            valuesEmitNanos,
+            pagesProcessed,
+            rowsReceived,
+            rowsEmitted
+        );
+    }
+}
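
Reviewer note, not part of the patch: the subtlest piece above is the transport-version gating in OrdinalsGroupingOperator.Status#supportsVersion. Assuming the usual TransportVersion#isPatchFrom semantics (the wire version sits on the same patch line as the given constant), the new status is sent to 9.2+ nodes and to 9.1.x / 8.19.x patch releases that carry the backported constants, while older nodes simply never receive it. A hypothetical extra test method for the new test class above (it would also need an import of org.elasticsearch.TransportVersions) could assert that behaviour; the method name and assertions are illustrative only:

    // Hypothetical test method, illustrative only -- not part of the patch above.
    public void testSupportsVersionGating() {
        OrdinalsGroupingOperator.Status status = simple();
        // 9.2+ transport versions receive the new status.
        assertTrue(status.supportsVersion(TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS));
        // So do the 9.1.x and 8.19.x patch releases carrying the backported constants.
        assertTrue(status.supportsVersion(TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS_9_1));
        assertTrue(status.supportsVersion(TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS_8_19));
        // Anything older is skipped rather than failing serialization.
        assertFalse(status.supportsVersion(TransportVersions.ESQL_SPLIT_ON_BIG_VALUES_9_1));
    }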