
Commit 14aefc8

SNAPSHOT - Refactor to move substream check earlier in indexing process
1 parent cc64df8 commit 14aefc8

File tree

2 files changed, +54 -27 lines


server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java

Lines changed: 23 additions & 27 deletions
@@ -43,7 +43,6 @@
 import org.elasticsearch.cluster.routing.IndexRouting;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.collect.Iterators;
-import org.elasticsearch.common.streams.StreamType;
 import org.elasticsearch.common.streams.StreamsPermissionsUtils;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@@ -61,8 +60,6 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -74,7 +71,6 @@
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongSupplier;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.bulk.TransportBulkAction.LAZY_ROLLOVER_ORIGIN;
 import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY;
@@ -283,29 +279,29 @@ private long buildTookInMillis(long startTimeNanos) {
     }
 
     private Map<ShardId, List<BulkItemRequest>> groupBulkRequestsByShards(ClusterState clusterState) {
-        ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState);
-
-        Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
-            .filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata))
-            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
-
-        for (StreamType streamType : enabledStreamTypes) {
-            for (int i = 0; i < bulkRequest.requests.size(); i++) {
-                DocWriteRequest<?> req = bulkRequest.requests.get(i);
-                String prefix = streamType.getStreamName() + ".";
-                if (req != null && req.index().startsWith(prefix)) {
-                    IllegalArgumentException exception = new IllegalArgumentException(
-                        "Writes to child stream ["
-                            + req.index()
-                            + "] are not allowed, use the parent stream instead: ["
-                            + streamType.getStreamName()
-                            + "]"
-                    );
-                    IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception);
-                    addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus);
-                }
-            }
-        }
+        // ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState);
+        //
+        // Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
+        //     .filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata))
+        //     .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
+        //
+        // for (StreamType streamType : enabledStreamTypes) {
+        //     for (int i = 0; i < bulkRequest.requests.size(); i++) {
+        //         DocWriteRequest<?> req = bulkRequest.requests.get(i);
+        //         String prefix = streamType.getStreamName() + ".";
+        //         if (req != null && req.index().startsWith(prefix)) {
+        //             IllegalArgumentException exception = new IllegalArgumentException(
+        //                 "Writes to child stream ["
+        //                     + req.index()
+        //                     + "] are not allowed, use the parent stream instead: ["
+        //                     + streamType.getStreamName()
+        //                     + "]"
+        //             );
+        //             IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception);
+        //             addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus);
+        //         }
+        //     }
+        // }
 
         return groupRequestsByShards(
             clusterState,
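The block removed above (kept commented out, in line with the SNAPSHOT label) first collected the enabled stream types into an EnumSet before scanning the bulk items. A minimal, self-contained sketch of that filter-and-collect idiom, with a hypothetical Feature enum and isEnabled check standing in for StreamType and StreamsPermissionsUtils:

import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;
import java.util.stream.Collectors;

public class EnumSetFilterSketch {
    // Hypothetical enum standing in for StreamType.
    enum Feature { LOGS, METRICS, TRACES }

    // Hypothetical stand-in for streamsPermissionsUtils.streamTypeIsEnabled(...).
    static boolean isEnabled(Feature f) {
        return f != Feature.TRACES;
    }

    public static void main(String[] args) {
        // Same pattern as the diff: filter the enum values and collect them
        // into an EnumSet via Collectors.toCollection.
        Set<Feature> enabled = Arrays.stream(Feature.values())
            .filter(EnumSetFilterSketch::isEnabled)
            .collect(Collectors.toCollection(() -> EnumSet.noneOf(Feature.class)));
        System.out.println(enabled); // [LOGS, METRICS]
    }
}

EnumSet is a natural target collection here: membership tests are bit operations, so the per-request scan over enabled stream types stays cheap.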

server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java

Lines changed: 31 additions & 0 deletions
@@ -32,6 +32,8 @@
 import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.streams.StreamType;
+import org.elasticsearch.common.streams.StreamsPermissionsUtils;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.FixForMultiProject;
@@ -46,12 +48,16 @@
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.LongSupplier;
+import java.util.stream.Collectors;
 
 /**
  * This is an abstract base class for bulk actions. It traverses all indices that the request gets routed to, executes all applicable
@@ -400,6 +406,31 @@ private void applyPipelinesAndDoInternalExecute(
         ActionListener<BulkResponse> listener
     ) throws IOException {
         final long relativeStartTimeNanos = relativeTimeNanos();
+
+        // Validate child stream writes before processing pipelines
+        ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state());
+        Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
+            .filter(t -> StreamsPermissionsUtils.getInstance().streamTypeIsEnabled(t, projectMetadata))
+            .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));
+
+        BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest);
+
+        for (StreamType streamType : enabledStreamTypes) {
+            for (int i = 0; i < bulkRequest.requests.size(); i++) {
+                DocWriteRequest<?> req = bulkRequest.requests.get(i);
+                String prefix = streamType.getStreamName() + ".";
+                if (req != null && req.index() != null && req.index().startsWith(prefix)) {
+                    IllegalArgumentException e = new IllegalArgumentException("Can't write to child stream");
+                    Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis());
+                    if (failureStore != null && failureStore) {
+                        bulkRequestModifier.markItemForFailureStore(i, req.index(), e);
+                    } else {
+                        bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN);
+                    }
+                }
+            }
+        }
+
         if (applyPipelines(task, bulkRequest, executor, listener) == false) {
             doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos);
         }
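The relocated check, now in TransportAbstractBulkAction ahead of pipeline processing, rejects any write whose target index starts with an enabled stream name followed by a dot. A minimal, self-contained sketch of just that prefix rule, using a hypothetical ChildStreamCheckSketch class and "logs" as a stand-in stream name (the real code iterates the enabled StreamType values and routes failures through BulkRequestModifier rather than throwing):

import java.util.List;

public class ChildStreamCheckSketch {
    // Hypothetical stand-in for the enabled stream names; the real code derives
    // these from StreamType.values() filtered by StreamsPermissionsUtils.
    private static final List<String> ENABLED_STREAM_NAMES = List.of("logs");

    // Mirrors the prefix test in the diffs: an index named "<stream>.<anything>"
    // is a child stream and must not be written to directly.
    static void validate(String targetIndex) {
        for (String streamName : ENABLED_STREAM_NAMES) {
            String prefix = streamName + ".";
            if (targetIndex != null && targetIndex.startsWith(prefix)) {
                throw new IllegalArgumentException(
                    "Writes to child stream ["
                        + targetIndex
                        + "] are not allowed, use the parent stream instead: ["
                        + streamName
                        + "]"
                );
            }
        }
    }

    public static void main(String[] args) {
        validate("logs");         // the parent stream itself: allowed
        validate("metrics-2024"); // unrelated index: allowed
        validate("logs.apache");  // child stream: throws IllegalArgumentException
    }
}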
