Skip to content

ESQL: Added status to OrdinalsGroupingOperator #130720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ static TransportVersion def(int id) {
public static final TransportVersion ESQL_SPLIT_ON_BIG_VALUES = def(9_116_0_00);
public static final TransportVersion ESQL_LOCAL_RELATION_WITH_NEW_BLOCKS = def(9_117_0_00);
public static final TransportVersion ML_INFERENCE_CUSTOM_SERVICE_EMBEDDING_TYPE = def(9_118_0_00);
public static final TransportVersion ESQL_ORDINALS_OPERATOR_STATUS = def(9_119_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public String describe() {
/**
* Total nanos for emitting the output
*/
protected long emitNanos;
private long emitNanos;

@SuppressWarnings("this-escape")
public HashAggregationOperator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,13 @@
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.PriorityQueue;
import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.CheckedSupplier;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BitArray;
import org.elasticsearch.compute.Describable;
Expand All @@ -36,7 +42,9 @@
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.mapper.BlockLoader;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -100,6 +108,32 @@ public String describe() {

private boolean finished = false;

/**
 * Nanoseconds this operator has spent processing the rows.
 * Accumulated once per {@code addInput} call and covers the whole call,
 * whichever of the two aggregation paths was taken.
 */
private long totalProcessNanos;
/**
 * Nanoseconds spent in {@code addInput} on the ordinals path, i.e. when the doc vector
 * is single-segment non-decreasing and the block loader supports ordinals.
 */
private long ordinalsProcessNanos;
/**
 * Nanoseconds spent in {@code addInput} on the values path, taken whenever
 * the ordinals path is not.
 */
private long valuesProcessNanos;
/**
 * Count of pages this operator has processed.
 * Incremented once per {@code addInput} call.
 */
private int pagesProcessed;
/**
 * Count of rows this operator has received.
 * Sum of the position counts of the pages passed to {@code addInput}.
 */
private long rowsReceived;
/**
 * Count of rows this operator has emitted.
 * Sum of the position counts of the non-null pages returned by {@code getOutput}.
 */
private long rowsEmitted;

/**
 * Nanoseconds this operator has spent emitting the results.
 */
private long totalEmitNanos;
/**
 * Nanoseconds spent emitting results by merging the per-segment ordinal aggregators.
 */
private long ordinalsEmitNanos;
/**
 * Nanoseconds spent emitting results through the values aggregator, including
 * the time spent finishing it in {@code finish}.
 */
private long valuesEmitNanos;

// used to extract and aggregate values
private final int maxPageSize;
private ValuesAggregator valuesAggregator;
Expand Down Expand Up @@ -135,13 +169,15 @@ public boolean needsInput() {
public void addInput(Page page) {
checkState(needsInput(), "Operator is already finishing");
requireNonNull(page, "page is null");
long startNanos = System.nanoTime();
DocVector docVector = page.<DocBlock>getBlock(docChannel).asVector();
final int shardIndex = docVector.shards().getInt(0);
RefCounted shardRefCounter = docVector.shardRefCounted().get(shardIndex);
final var blockLoader = blockLoaders.apply(shardIndex);
boolean pagePassed = false;
try {
if (docVector.singleSegmentNonDecreasing() && blockLoader.supportsOrdinals()) {
long ordinalsStartNanos = System.nanoTime();
final IntVector segmentIndexVector = docVector.segments();
assert segmentIndexVector.isConstant();
final OrdinalSegmentAggregator ordinalAggregator = this.ordinalAggregators.computeIfAbsent(
Expand All @@ -162,7 +198,9 @@ public void addInput(Page page) {
);
pagePassed = true;
ordinalAggregator.addInput(docVector.docs(), page);
ordinalsProcessNanos += System.nanoTime() - ordinalsStartNanos;
} else {
long valuesStartNanos = System.nanoTime();
if (valuesAggregator == null) {
int channelIndex = page.getBlockCount(); // extractor will append a new block at the end
valuesAggregator = new ValuesAggregator(
Expand All @@ -179,11 +217,15 @@ public void addInput(Page page) {
}
pagePassed = true;
valuesAggregator.addInput(page);
valuesProcessNanos += System.nanoTime() - valuesStartNanos;
}
} finally {
if (pagePassed == false) {
Releasables.closeExpectNoException(page::releaseBlocks);
}
pagesProcessed++;
rowsReceived += page.getPositionCount();
totalProcessNanos += System.nanoTime() - startNanos;
}
}

Expand All @@ -208,32 +250,41 @@ public Page getOutput() {
if (finished == false) {
return null;
}
long startNanos = System.nanoTime();
Page page = null;
if (valuesAggregator != null) {
try {
return valuesAggregator.getOutput();
page = valuesAggregator.getOutput();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you count the time of this one?

} finally {
final ValuesAggregator aggregator = this.valuesAggregator;
this.valuesAggregator = null;
Releasables.close(aggregator);
valuesEmitNanos += System.nanoTime() - startNanos;
}
}
if (ordinalAggregators.isEmpty() == false) {
} else if (ordinalAggregators.isEmpty() == false) {
try {
return mergeOrdinalsSegmentResults();
page = mergeOrdinalsSegmentResults();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this one?

That way in the profile output we can tell which of the two we used.

} catch (IOException e) {
throw new UncheckedIOException(e);
} finally {
Releasables.close(() -> Releasables.close(ordinalAggregators.values()), ordinalAggregators::clear);
ordinalsEmitNanos += System.nanoTime() - startNanos;
}
}
return null;
if (page != null) {
rowsEmitted += page.getPositionCount();
}
totalEmitNanos += System.nanoTime() - startNanos;
return page;
}

/**
 * Marks this operator as finished and, when a values aggregator exists, finishes it too.
 * <p>
 * The time spent finishing the values aggregator is attributed to the values emit counter
 * and to the total emit counter, so the total never reports less than its components.
 */
@Override
public void finish() {
    finished = true;
    if (valuesAggregator != null) {
        long startNanos = System.nanoTime();
        valuesAggregator.finish();
        long elapsedNanos = System.nanoTime() - startNanos;
        valuesEmitNanos += elapsedNanos;
        // Keep the total in step with the values component; otherwise the status could
        // report values_emit_nanos greater than total_emit_nanos.
        totalEmitNanos += elapsedNanos;
    }
}

Expand Down Expand Up @@ -322,6 +373,21 @@ public void close() {
Releasables.close(() -> Releasables.close(ordinalAggregators.values()), valuesAggregator);
}

/**
 * Captures the current values of this operator's timing and page/row counters.
 *
 * @return a new {@link Status} holding a copy of the counters at the time of the call
 */
@Override
public Operator.Status status() {
    final Status snapshot = new Status(
        totalProcessNanos,
        ordinalsProcessNanos,
        valuesProcessNanos,
        totalEmitNanos,
        ordinalsEmitNanos,
        valuesEmitNanos,
        pagesProcessed,
        rowsReceived,
        rowsEmitted
    );
    return snapshot;
}

private static void checkState(boolean condition, String msg) {
if (condition == false) {
throw new IllegalArgumentException(msg);
Expand All @@ -337,6 +403,206 @@ public String toString() {
return this.getClass().getSimpleName() + "[" + "aggregators=[" + aggregatorDescriptions + "]]";
}

/**
 * Immutable snapshot of the enclosing operator's timing and page/row counters.
 * It can be written to and read from a stream, and rendered as an x-content object.
 */
public static class Status implements Operator.Status {
    public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
        Operator.Status.class,
        "ordinals_grouping",
        Status::new
    );

    private final long totalProcessNanos;
    private final long ordinalsProcessNanos;
    private final long valuesProcessNanos;
    private final long totalEmitNanos;
    private final long ordinalsEmitNanos;
    private final long valuesEmitNanos;
    private final int pagesProcessed;
    private final long rowsReceived;
    private final long rowsEmitted;

    /**
     * Build.
     * @param totalProcessNanos Nanoseconds this operator has spent processing the rows.
     * @param ordinalsProcessNanos Nanoseconds spent processing rows on the ordinals path.
     * @param valuesProcessNanos Nanoseconds spent processing rows on the values path.
     * @param totalEmitNanos Nanoseconds this operator has spent emitting the results.
     * @param ordinalsEmitNanos Nanoseconds spent emitting results on the ordinals path.
     * @param valuesEmitNanos Nanoseconds spent emitting results on the values path.
     * @param pagesProcessed Count of pages this operator has processed.
     * @param rowsReceived Count of rows this operator has received.
     * @param rowsEmitted Count of rows this operator has emitted.
     */
    public Status(
        long totalProcessNanos,
        long ordinalsProcessNanos,
        long valuesProcessNanos,
        long totalEmitNanos,
        long ordinalsEmitNanos,
        long valuesEmitNanos,
        int pagesProcessed,
        long rowsReceived,
        long rowsEmitted
    ) {
        this.totalProcessNanos = totalProcessNanos;
        this.ordinalsProcessNanos = ordinalsProcessNanos;
        this.valuesProcessNanos = valuesProcessNanos;
        this.totalEmitNanos = totalEmitNanos;
        this.ordinalsEmitNanos = ordinalsEmitNanos;
        this.valuesEmitNanos = valuesEmitNanos;
        this.pagesProcessed = pagesProcessed;
        this.rowsReceived = rowsReceived;
        this.rowsEmitted = rowsEmitted;
    }

    /**
     * Read a status from the stream. Values are read in the same order
     * {@link #writeTo(StreamOutput)} writes them; Java evaluates the arguments
     * left to right, so the read order is the textual order below.
     */
    protected Status(StreamInput in) throws IOException {
        this(
            in.readVLong(), // totalProcessNanos
            in.readVLong(), // ordinalsProcessNanos
            in.readVLong(), // valuesProcessNanos
            in.readVLong(), // totalEmitNanos
            in.readVLong(), // ordinalsEmitNanos
            in.readVLong(), // valuesEmitNanos
            in.readVInt(),  // pagesProcessed
            in.readVLong(), // rowsReceived
            in.readVLong()  // rowsEmitted
        );
    }

    /**
     * Write every counter to the stream: process nanos, then emit nanos, then page/row counts.
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // process timings
        out.writeVLong(totalProcessNanos);
        out.writeVLong(ordinalsProcessNanos);
        out.writeVLong(valuesProcessNanos);
        // emit timings
        out.writeVLong(totalEmitNanos);
        out.writeVLong(ordinalsEmitNanos);
        out.writeVLong(valuesEmitNanos);
        // page/row counts
        out.writeVInt(pagesProcessed);
        out.writeVLong(rowsReceived);
        out.writeVLong(rowsEmitted);
    }

    @Override
    public String getWriteableName() {
        return ENTRY.name;
    }

    /** Nanoseconds spent processing rows, across both paths. */
    public long totalProcessNanos() {
        return totalProcessNanos;
    }

    /** Nanoseconds spent processing rows on the ordinals path. */
    public long ordinalsProcessNanos() {
        return ordinalsProcessNanos;
    }

    /** Nanoseconds spent processing rows on the values path. */
    public long valuesProcessNanos() {
        return valuesProcessNanos;
    }

    /** Nanoseconds spent emitting the results. */
    public long totalEmitNanos() {
        return totalEmitNanos;
    }

    /** Nanoseconds spent emitting results on the ordinals path. */
    public long ordinalsEmitNanos() {
        return ordinalsEmitNanos;
    }

    /** Nanoseconds spent emitting results on the values path. */
    public long valuesEmitNanos() {
        return valuesEmitNanos;
    }

    /** Count of pages processed. */
    public int pagesProcessed() {
        return pagesProcessed;
    }

    /** Count of rows received. */
    public long rowsReceived() {
        return rowsReceived;
    }

    /** Count of rows emitted. */
    public long rowsEmitted() {
        return rowsEmitted;
    }

    /**
     * Render the status as a single object. Each timing is written as a raw nanosecond
     * field, followed by a time-valued field when the builder asks for human readable output.
     */
    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();

        // Process nanos
        nanosField(builder, "total_process_nanos", "total_process_time", totalProcessNanos);
        nanosField(builder, "ordinals_process_nanos", "ordinals_process_time", ordinalsProcessNanos);
        nanosField(builder, "values_process_nanos", "values_process_time", valuesProcessNanos);

        // Emit nanos
        nanosField(builder, "total_emit_nanos", "total_emit_time", totalEmitNanos);
        nanosField(builder, "ordinals_emit_nanos", "ordinals_emit_time", ordinalsEmitNanos);
        nanosField(builder, "values_emit_nanos", "values_emit_time", valuesEmitNanos);

        // Page/row counts
        builder.field("pages_processed", pagesProcessed);
        builder.field("rows_received", rowsReceived);
        builder.field("rows_emitted", rowsEmitted);

        return builder.endObject();
    }

    /**
     * Write {@code nanos} under {@code nanosName} and, when the builder is human readable,
     * the same duration as a {@link TimeValue} under {@code timeName}.
     */
    private static void nanosField(XContentBuilder builder, String nanosName, String timeName, long nanos) throws IOException {
        builder.field(nanosName, nanos);
        if (builder.humanReadable()) {
            builder.field(timeName, TimeValue.timeValueNanos(nanos));
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final Status that = (Status) o;
        return totalProcessNanos == that.totalProcessNanos
            && ordinalsProcessNanos == that.ordinalsProcessNanos
            && valuesProcessNanos == that.valuesProcessNanos
            && totalEmitNanos == that.totalEmitNanos
            && ordinalsEmitNanos == that.ordinalsEmitNanos
            && valuesEmitNanos == that.valuesEmitNanos
            && pagesProcessed == that.pagesProcessed
            && rowsReceived == that.rowsReceived
            && rowsEmitted == that.rowsEmitted;
    }

    @Override
    public int hashCode() {
        return Objects.hash(
            totalProcessNanos,
            ordinalsProcessNanos,
            valuesProcessNanos,
            totalEmitNanos,
            ordinalsEmitNanos,
            valuesEmitNanos,
            pagesProcessed,
            rowsReceived,
            rowsEmitted
        );
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }

    /**
     * The transport version that introduced this status.
     */
    @Override
    public TransportVersion getMinimalSupportedVersion() {
        return TransportVersions.ESQL_ORDINALS_OPERATOR_STATUS;
    }
}

/**
 * Pair of a shard index and a segment index.
 *
 * @param shardIndex   index of the shard
 * @param segmentIndex index of the segment
 */
record SegmentID(int shardIndex, int segmentIndex) {}
Expand Down
Loading