Skip to content

Tracking Search API Calls #18601

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 3.x]
### Added
- Add search API tracker ([#18601](https://github.com/opensearch-project/OpenSearch/pull/18601))
- Add support for linux riscv64 platform ([#18156](https://github.com/opensearch-project/OpenSearch/pull/18156))
- [Rule based auto-tagging] Add get rule API ([#17336](https://github.com/opensearch-project/OpenSearch/pull/17336))
- [Rule based auto-tagging] Add Delete Rule API ([#18184](https://github.com/opensearch-project/OpenSearch/pull/18184))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

package org.opensearch.action.search;

import org.opensearch.ExceptionsHelper;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
Expand Down Expand Up @@ -69,6 +70,8 @@
import org.opensearch.core.indices.breaker.CircuitBreakerService;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.index.query.Rewriteable;
import org.opensearch.index.search.stats.SearchStats.Stats.SearchResponseStatusStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.SearchShardTarget;
Expand Down Expand Up @@ -176,6 +179,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
private final SearchPipelineService searchPipelineService;
private final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory;
private final Tracer tracer;
private final IndicesService indicesService;

private final MetricsRegistry metricsRegistry;

Expand All @@ -198,7 +202,8 @@ public TransportSearchAction(
MetricsRegistry metricsRegistry,
SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory,
Tracer tracer,
TaskResourceTrackingService taskResourceTrackingService
TaskResourceTrackingService taskResourceTrackingService,
IndicesService indicesService
) {
super(SearchAction.NAME, transportService, actionFilters, (Writeable.Reader<SearchRequest>) SearchRequest::new);
this.client = client;
Expand All @@ -217,6 +222,7 @@ public TransportSearchAction(
this.searchRequestOperationsCompositeListenerFactory = searchRequestOperationsCompositeListenerFactory;
this.tracer = tracer;
this.taskResourceTrackingService = taskResourceTrackingService;
this.indicesService = indicesService;
}

private Map<String, AliasFilter> buildPerIndexAliasFilter(
Expand Down Expand Up @@ -305,20 +311,38 @@ long buildTookInMillis() {

@Override
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
// searchStatusStatsUpdateListener will execute the logic within the original listener but
// also track the response status of the searchResponse.
ActionListener<SearchResponse> searchStatusStatsUpdateListener;
// only if task is of type CancellableTask and support cancellation on timeout, treat this request eligible for timeout based
// cancellation. There may be other top level requests like AsyncSearch which is using SearchRequest internally and has it's own
// cancellation mechanism. For such cases, the SearchRequest when created can override the createTask and set the
// cancelAfterTimeInterval to NO_TIMEOUT and bypass this mechanism
if (task instanceof CancellableTask) {
listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(
ActionListener<SearchResponse> cancellationListener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(
client,
(CancellableTask) task,
clusterService.getClusterSettings().get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING),
listener,
e -> {}
);
searchStatusStatsUpdateListener = ActionListener.wrap((searchResponse) -> {
cancellationListener.onResponse(searchResponse);
indicesService.getSearchResponseStatusStats().inc(searchResponse.status());
}, (e) -> {
cancellationListener.onFailure(e);
indicesService.getSearchResponseStatusStats().inc(ExceptionsHelper.status(e));
});
} else {
searchStatusStatsUpdateListener = ActionListener.wrap((searchResponse) -> {
listener.onResponse(searchResponse);
indicesService.getSearchResponseStatusStats().inc(searchResponse.status());
}, (e) -> {
listener.onFailure(e);
indicesService.getSearchResponseStatusStats().inc(ExceptionsHelper.status(e));
});
}
executeRequest(task, searchRequest, this::searchAsyncAction, listener);
executeRequest(task, searchRequest, this::searchAsyncAction, searchStatusStatsUpdateListener);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentFragment;
Expand All @@ -51,6 +52,7 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Encapsulates stats for search time
Expand Down Expand Up @@ -169,12 +171,16 @@ public static class Stats implements Writeable, ToXContentFragment {
@Nullable
private RequestStatsLongHolder requestStatsLongHolder;

@Nullable
private SearchResponseStatusStats searchResponseStatusStats;

public RequestStatsLongHolder getRequestStatsLongHolder() {
return requestStatsLongHolder;
}

private Stats() {
// for internal use, initializes all counts to 0
this.searchResponseStatusStats = new SearchResponseStatusStats();
}

public Stats(
Expand All @@ -197,7 +203,8 @@ public Stats(
long suggestCount,
long suggestTimeInMillis,
long suggestCurrent,
long searchIdleReactivateCount
long searchIdleReactivateCount,
SearchResponseStatusStats searchResponseStatusStats
) {
this.requestStatsLongHolder = new RequestStatsLongHolder();
this.queryCount = queryCount;
Expand Down Expand Up @@ -226,6 +233,7 @@ public Stats(
this.pitCurrent = pitCurrent;

this.searchIdleReactivateCount = searchIdleReactivateCount;
this.searchResponseStatusStats = searchResponseStatusStats;
}

private Stats(StreamInput in) throws IOException {
Expand Down Expand Up @@ -265,6 +273,12 @@ private Stats(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_2_14_0)) {
searchIdleReactivateCount = in.readVLong();
}

if (in.getVersion().onOrAfter(Version.V_3_1_0)) {
searchResponseStatusStats = in.readOptionalWriteable(SearchResponseStatusStats::new);
} else {
searchResponseStatusStats = null;
}
}

public void add(Stats stats) {
Expand Down Expand Up @@ -294,6 +308,10 @@ public void add(Stats stats) {
pitCurrent += stats.pitCurrent;

searchIdleReactivateCount += stats.searchIdleReactivateCount;

if (getSearchResponseStatusStats() != null) {
getSearchResponseStatusStats().add(stats.getSearchResponseStatusStats());
}
}

public void addForClosingShard(Stats stats) {
Expand All @@ -320,6 +338,10 @@ public void addForClosingShard(Stats stats) {
queryConcurrency += stats.queryConcurrency;

searchIdleReactivateCount += stats.searchIdleReactivateCount;

if (getSearchResponseStatusStats() != null) {
getSearchResponseStatusStats().add(stats.getSearchResponseStatusStats());
}
}

public long getQueryCount() {
Expand Down Expand Up @@ -430,6 +452,10 @@ public long getSearchIdleReactivateCount() {
return searchIdleReactivateCount;
}

public SearchResponseStatusStats getSearchResponseStatusStats() {
return searchResponseStatusStats;
}

public static Stats readStats(StreamInput in) throws IOException {
return new Stats(in);
}
Expand Down Expand Up @@ -479,6 +505,10 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_14_0)) {
out.writeVLong(searchIdleReactivateCount);
}

if (out.getVersion().onOrAfter(Version.V_3_1_0)) {
out.writeOptionalWriteable(searchResponseStatusStats);
}
}

@Override
Expand Down Expand Up @@ -535,8 +565,95 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endObject();
}

if (getSearchResponseStatusStats() != null) {
getSearchResponseStatusStats().toXContent(builder, params);
}

return builder;
}

/**
* Tracks rest category class codes from search requests
*
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public static class SearchResponseStatusStats implements Writeable, ToXContentFragment {
final AtomicLong[] searchResponseStatusCounter;

public SearchResponseStatusStats() {
searchResponseStatusCounter = new AtomicLong[5];
for (int i = 0; i < searchResponseStatusCounter.length; i++) {
searchResponseStatusCounter[i] = new AtomicLong();
}
}

public SearchResponseStatusStats(StreamInput in) throws IOException {
searchResponseStatusCounter = in.readArray(i -> new AtomicLong(i.readLong()), AtomicLong[]::new);

assert searchResponseStatusCounter.length == 5 : "Length of incoming array should be 5! Got " + searchResponseStatusCounter.length;
}

/**
* Increment counter for status
*
* @param status {@link RestStatus}
*/
public void inc(final RestStatus status) {
add(status, 1L);
}

/**
* Increment counter for status by count
*
* @param status {@link RestStatus}
* @param delta The value to add
*/
void add(final RestStatus status, final long delta) {
searchResponseStatusCounter[status.getStatusFamilyCode() - 1].addAndGet(delta);
}

/**
* Accumulate stats from the passed Object
*
* @param stats Instance storing {@link SearchResponseStatusStats}
*/
public void add(final SearchResponseStatusStats stats) {
if (null == stats) {
return;
}

for (int i = 0; i < searchResponseStatusCounter.length; ++i) {
searchResponseStatusCounter[i].addAndGet(stats.searchResponseStatusCounter[i].longValue());
}
}

public AtomicLong[] getSearchResponseStatusCounter() {
return searchResponseStatusCounter;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.SEARCH_RESPONSE_STATUS);

for (int i = 0; i < searchResponseStatusCounter.length; ++i) {
long value = searchResponseStatusCounter[i].longValue();

if (value > 0) {
String key = i + 1 + "xx";
builder.field(key, value);
}
}

return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeArray((o, v) -> o.writeLong(v.longValue()), searchResponseStatusCounter);
}
}
}

private final Stats totalStats;
Expand Down Expand Up @@ -700,6 +817,7 @@ static final class Fields {
static final String TOTAL = "total";
static final String SEARCH_IDLE_REACTIVATE_COUNT_TOTAL = "search_idle_reactivate_count_total";
static final String TOOK = "took";
static final String SEARCH_RESPONSE_STATUS = "search_response_status";

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ SearchStats.Stats stats() {
suggestMetric.count(),
TimeUnit.NANOSECONDS.toMillis(suggestMetric.sum()),
suggestCurrent.count(),
searchIdleMetric.count()
searchIdleMetric.count(),
new SearchStats.Stats.SearchResponseStatusStats()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.opensearch.index.refresh.RefreshStats;
import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory;
import org.opensearch.index.search.stats.SearchStats;
import org.opensearch.index.search.stats.SearchStats.Stats.SearchResponseStatusStats;
import org.opensearch.index.seqno.RetentionLeaseStats;
import org.opensearch.index.seqno.RetentionLeaseSyncer;
import org.opensearch.index.seqno.SeqNoStats;
Expand Down Expand Up @@ -1317,6 +1318,13 @@ public void addDocStatusStats(final DocStatusStats stats) {
oldShardsStats.indexingStats.getTotal().getDocStatusStats().add(stats);
}

/**
* Retrieves the current statistics for search response.
*/
public SearchResponseStatusStats getSearchResponseStatusStats() {
return oldShardsStats.searchStats.getTotal().getSearchResponseStatusStats();
}

/**
* Statistics for old shards
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.search.stats.SearchStats.Stats;
import org.opensearch.index.search.stats.SearchStats.Stats.SearchResponseStatusStats;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;
Expand All @@ -58,9 +59,9 @@ public void testShardLevelSearchGroupStats() throws Exception {
// let's create two dummy search stats with groups
Map<String, Stats> groupStats1 = new HashMap<>();
Map<String, Stats> groupStats2 = new HashMap<>();
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1), 0, groupStats2);
groupStats2.put("group1", new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, new SearchResponseStatusStats()));
SearchStats searchStats1 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, new SearchResponseStatusStats()), 0, groupStats1);
SearchStats searchStats2 = new SearchStats(new Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, new SearchResponseStatusStats()), 0, groupStats2);

// adding these two search stats and checking group stats are correct
searchStats1.add(searchStats2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2398,7 +2398,8 @@ public void onFailure(final Exception e) {
NoopMetricsRegistry.INSTANCE,
searchRequestOperationsCompositeListenerFactory,
NoopTracer.INSTANCE,
new TaskResourceTrackingService(settings, clusterSettings, threadPool)
new TaskResourceTrackingService(settings, clusterSettings, threadPool),
mock(IndicesService.class)
)
);
actions.put(
Expand Down
Loading