From 689e95d84c28ac42e7ffd6dce6500c61c8407fc4 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 1 Jul 2025 14:27:10 +0100 Subject: [PATCH 01/11] Reroute code complete --- .../ingest/common/IngestCommonPlugin.java | 2 +- .../ingest/common/RerouteProcessor.java | 35 ++++++++++- .../common/RerouteProcessorFactoryTests.java | 7 ++- .../ingest/common/RerouteProcessorTests.java | 41 ++++++++++++- server/src/main/java/module-info.java | 1 + .../common/streams/StreamTypes.java | 25 ++++++++ .../streams/StreamsPermissionsUtils.java | 61 +++++++++++++++++++ 7 files changed, 166 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/common/streams/StreamTypes.java create mode 100644 server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 6e517d644cadb..47fcf4f89bfae 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -65,7 +65,7 @@ public Map getProcessors(Processor.Parameters paramet entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()), entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)), entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)), - entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()), + entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory(parameters.ingestService)), entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)), entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)), entry(SortProcessor.TYPE, new SortProcessor.Factory()), diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java index 6580a5af3d005..1fb7205b0856c 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java @@ -10,10 +10,14 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Processor; import java.util.List; @@ -45,6 +49,9 @@ public final class RerouteProcessor extends AbstractProcessor { private final List dataset; private final List namespace; private final String destination; + private final ClusterService clusterService; + private final ProjectId projectId; + private final StreamsPermissionsUtils streamsPermissionsUtils; RerouteProcessor( String tag, @@ -52,7 +59,10 @@ public final class RerouteProcessor extends AbstractProcessor { List type, List dataset, List namespace, - String destination + String destination, + ClusterService clusterService, + ProjectId projectId, + 
StreamsPermissionsUtils streamsPermissionsUtils ) { super(tag, description); if (type.isEmpty()) { @@ -71,11 +81,16 @@ public final class RerouteProcessor extends AbstractProcessor { this.namespace = namespace; } this.destination = destination; + this.clusterService = clusterService; + this.projectId = projectId; + this.streamsPermissionsUtils = streamsPermissionsUtils; } @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { if (destination != null) { + ProjectMetadata projectMetadata = clusterService.state().projectState(projectId).metadata(); + streamsPermissionsUtils.throwIfRetrouteToSubstreamNotAllowed(projectMetadata, ingestDocument.getIndexHistory(), destination); ingestDocument.reroute(destination); return ingestDocument; } @@ -171,6 +186,12 @@ String getDestination() { public static final class Factory implements Processor.Factory { + private final IngestService ingestService; + + public Factory(IngestService ingestService) { + this.ingestService = ingestService; + } + @Override public RerouteProcessor create( Map processorFactories, @@ -212,7 +233,17 @@ public RerouteProcessor create( throw newConfigurationException(TYPE, tag, "destination", "can only be set if type, dataset, and namespace are not set"); } - return new RerouteProcessor(tag, description, type, dataset, namespace, destination); + return new RerouteProcessor( + tag, + description, + type, + dataset, + namespace, + destination, + ingestService.getClusterService(), + projectId, + StreamsPermissionsUtils.getInstance() + ); } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java index 4906cf7057cec..57a7d0e09853c 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java @@ -10,6 +10,8 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource; import org.elasticsearch.test.ESTestCase; @@ -18,6 +20,8 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Answers.RETURNS_SMART_NULLS; +import static org.mockito.Mockito.mock; public class RerouteProcessorFactoryTests extends ESTestCase { @@ -74,6 +78,7 @@ private static RerouteProcessor create(String dataset, String namespace) throws } private static RerouteProcessor create(Map config) throws Exception { - return new RerouteProcessor.Factory().create(null, null, null, new HashMap<>(config), null); + IngestService ingestService = mock(IngestService.class, RETURNS_SMART_NULLS); + return new RerouteProcessor.Factory(ingestService).create(null, null, null, new HashMap<>(config), ProjectId.DEFAULT); } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java index 55eebbc160486..411b495b975ab 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java @@ -9,6 +9,11 @@ 
package org.elasticsearch.ingest.common; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -16,14 +21,33 @@ import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.WrappingProcessor; import org.elasticsearch.test.ESTestCase; +import org.junit.Before; import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class RerouteProcessorTests extends ESTestCase { + private final StreamsPermissionsUtils streamsPermissionsUtilsMock = mock(StreamsPermissionsUtils.class); + private final ClusterService clusterServiceMock = mock(ClusterService.class, RETURNS_DEEP_STUBS); + + @Before + public void setUpStreamsPermissionsUtils() { + ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) + .putProjectMetadata(ProjectMetadata.builder(ProjectId.DEFAULT).build()) + .build(); + when(clusterServiceMock.state()).thenReturn(clusterState); + doNothing().when(streamsPermissionsUtilsMock).throwIfRetrouteToSubstreamNotAllowed(any(), any(), anyString()); + } + public void testDefaults() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); @@ -291,12 +315,25 @@ private RerouteProcessor createRerouteProcessor(List type, List type.stream().map(RerouteProcessor.DataStreamValueSource::type).toList(), dataset.stream().map(RerouteProcessor.DataStreamValueSource::dataset).toList(), namespace.stream().map(RerouteProcessor.DataStreamValueSource::namespace).toList(), - null + null, + clusterServiceMock, + ProjectId.DEFAULT, + streamsPermissionsUtilsMock ); } private RerouteProcessor createRerouteProcessor(String destination) { - return new RerouteProcessor(null, null, List.of(), List.of(), List.of(), destination); + return new RerouteProcessor( + null, + null, + List.of(), + List.of(), + List.of(), + destination, + clusterServiceMock, + ProjectId.DEFAULT, + streamsPermissionsUtilsMock + ); } private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) { diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index ef3d5da1c9531..b797932367218 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -213,6 +213,7 @@ exports org.elasticsearch.common.regex; exports org.elasticsearch.common.scheduler; exports org.elasticsearch.common.settings; + exports org.elasticsearch.common.streams; exports org.elasticsearch.common.text; exports org.elasticsearch.common.time; exports org.elasticsearch.common.transport; diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamTypes.java b/server/src/main/java/org/elasticsearch/common/streams/StreamTypes.java new file mode 100644 index 0000000000000..e193742d1586c --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamTypes.java @@ -0,0 +1,25 @@ +/* + * Copyright 
Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.streams; + +public enum StreamTypes { + + LOGS("logs"); + + private final String streamName; + + StreamTypes(String streamName) { + this.streamName = streamName; + } + + public String getStreamName() { + return streamName; + } +} diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java b/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java new file mode 100644 index 0000000000000..91ef0c5c2db9d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.streams; + +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.StreamsMetadata; + +import java.util.Set; + +public class StreamsPermissionsUtils { + + private static volatile StreamsPermissionsUtils INSTANCE = null; + + // Visible for testing only + StreamsPermissionsUtils() {} + + public static StreamsPermissionsUtils getInstance() { + if (INSTANCE == null) { + synchronized (StreamsPermissionsUtils.class) { + if (INSTANCE == null) { + INSTANCE = new StreamsPermissionsUtils(); + } + } + } + return INSTANCE; + } + + public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set indexHistory, String destination) + throws IllegalArgumentException { + for (StreamTypes streamType : StreamTypes.values()) { + String streamName = streamType.getStreamName(); + if (streamTypeIsEnabled(streamType, projectMetadata) + && destination.startsWith(streamName + ".") + && indexHistory.contains(streamName) == false) { + throw new IllegalArgumentException( + "Cannot reroute to substream [" + + destination + + "] as only the stream itself can reroute to substreams. " + + "Please reroute to the stream [" + + streamName + + "] instead." 
+ ); + } + } + } + + public boolean streamTypeIsEnabled(StreamTypes streamType, ProjectMetadata projectMetadata) { + StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); + return switch (streamType) { + case LOGS -> metadata.isLogsEnabled(); + }; + } + +} From a8da9a1325c3fa310c44a2bec6e0ec2298e4c552 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 2 Jul 2025 09:32:50 +0100 Subject: [PATCH 02/11] SNAPSHOT --- .../streams/StreamsPermissionsUtilsTests.java | 106 ++++++++++++++++++ 1 file changed, 106 insertions(+) create mode 100644 server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java diff --git a/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java b/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java new file mode 100644 index 0000000000000..42bb7063ed9e9 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.common.streams; + +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.metadata.StreamsMetadata; +import org.elasticsearch.test.ESTestCase; +import org.junit.Before; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class StreamsPermissionsUtilsTests extends ESTestCase { + + private StreamsPermissionsUtils utils; + private ProjectMetadata projectMetadataMock; + private StreamsMetadata streamsMetadataMock; + + @Before + public void setup() { + utils = StreamsPermissionsUtils.getInstance(); + projectMetadataMock = mock(ProjectMetadata.class); + streamsMetadataMock = mock(StreamsMetadata.class); + when(projectMetadataMock.custom(eq(StreamsMetadata.TYPE), any())).thenReturn(streamsMetadataMock); + } + + public void testGetInstanceReturnsSingleton() { + StreamsPermissionsUtils instance1 = StreamsPermissionsUtils.getInstance(); + StreamsPermissionsUtils instance2 = StreamsPermissionsUtils.getInstance(); + assertThat(instance1, sameInstance(instance2)); + } + + public void testStreamTypeIsEnabledReturnsTrueWhenLogsEnabled() { + when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); + + boolean result = utils.streamTypeIsEnabled(StreamTypes.LOGS, projectMetadataMock); + assertTrue(result); + } + + public void testStreamTypeIsEnabledReturnsFalseWhenLogsDisabled() { + when(streamsMetadataMock.isLogsEnabled()).thenReturn(false); + + boolean result = utils.streamTypeIsEnabled(StreamTypes.LOGS, projectMetadataMock); + assertFalse(result); + } + + public void testIfRetrouteToSubstreamNotAllowedThrows() { + when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); + + Set indexHistory = new HashSet<>(); // empty, so reroute not allowed + String destination = 
StreamTypes.LOGS.getStreamName() + ".substream"; + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination) + ); + + assertTrue(ex.getMessage().contains("Cannot reroute to substream")); + assertTrue(ex.getMessage().contains(destination)); + } + + public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenStreamTypeDisabled() { + when(streamsMetadataMock.isLogsEnabled()).thenReturn(false); + + Set indexHistory = Collections.emptySet(); + String destination = StreamTypes.LOGS.getStreamName() + ".substream"; + + // Should not throw since stream type is disabled + utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); + } + + public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenDestinationNotSubstream() { + when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); + + Set indexHistory = Collections.emptySet(); + String destination = StreamTypes.LOGS.getStreamName(); // not a substream + + // Should not throw since destination is not a substream + utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); + } + + public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenIndexHistoryContainsStream() { + when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); + + Set indexHistory = new HashSet<>(); + indexHistory.add(StreamTypes.LOGS.getStreamName()); + String destination = StreamTypes.LOGS.getStreamName() + ".substream"; + + // Should not throw since indexHistory contains the stream name + utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); + } +} From 723281c516d49483b7f11db5f558000f5d1c492d Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 2 Jul 2025 16:53:44 +0100 Subject: [PATCH 03/11] SNAPSHOT - Bulk Ops --- .../action/bulk/BulkOperation.java | 37 ++++++++++++++++++- .../{StreamTypes.java => StreamType.java} | 4 +- .../streams/StreamsPermissionsUtils.java | 4 +- .../action/bulk/BulkOperationTests.java | 19 ++++++++-- .../streams/StreamsPermissionsUtilsTests.java | 14 +++---- 5 files changed, 61 insertions(+), 17 deletions(-) rename server/src/main/java/org/elasticsearch/common/streams/{StreamTypes.java => StreamType.java} (91%) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index 9c5f40577b32c..db49a4ec89205 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -43,6 +43,8 @@ import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.streams.StreamType; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; @@ -59,6 +61,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -70,6 +74,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongSupplier; +import 
java.util.stream.Collectors; import static org.elasticsearch.action.bulk.TransportBulkAction.LAZY_ROLLOVER_ORIGIN; import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; @@ -104,6 +109,7 @@ final class BulkOperation extends ActionRunnable { private final FailureStoreMetrics failureStoreMetrics; private final DataStreamFailureStoreSettings dataStreamFailureStoreSettings; private final boolean clusterHasFailureStoreFeature; + private final StreamsPermissionsUtils streamsPermissionsUtils; BulkOperation( Task task, @@ -139,7 +145,8 @@ final class BulkOperation extends ActionRunnable { new FailureStoreDocumentConverter(), failureStoreMetrics, dataStreamFailureStoreSettings, - clusterHasFailureStoreFeature + clusterHasFailureStoreFeature, + StreamsPermissionsUtils.getInstance() ); } @@ -160,7 +167,8 @@ final class BulkOperation extends ActionRunnable { FailureStoreDocumentConverter failureStoreDocumentConverter, FailureStoreMetrics failureStoreMetrics, DataStreamFailureStoreSettings dataStreamFailureStoreSettings, - boolean clusterHasFailureStoreFeature + boolean clusterHasFailureStoreFeature, + StreamsPermissionsUtils streamsPermissionsUtils ) { super(listener); this.task = task; @@ -182,6 +190,7 @@ final class BulkOperation extends ActionRunnable { this.failureStoreMetrics = failureStoreMetrics; this.dataStreamFailureStoreSettings = dataStreamFailureStoreSettings; this.clusterHasFailureStoreFeature = clusterHasFailureStoreFeature; + this.streamsPermissionsUtils = streamsPermissionsUtils; } @Override @@ -274,6 +283,30 @@ private long buildTookInMillis(long startTimeNanos) { } private Map> groupBulkRequestsByShards(ClusterState clusterState) { + ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState); + + Set enabledStreamTypes = Arrays.stream(StreamType.values()) + .filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata)) + .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); + + for (StreamType streamType : enabledStreamTypes) { + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest req = bulkRequest.requests.get(i); + String prefix = streamType.getStreamName() + "."; + if (req != null && req.index().startsWith(prefix)) { + IllegalArgumentException exception = new IllegalArgumentException( + "Bulk requests for streams with type [" + + streamType.getStreamName() + + "] are not supported, use the [" + + streamType.getStreamName() + + "] API instead." 
+ ); + IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception); + addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus); + } + } + } + return groupRequestsByShards( clusterState, Iterators.enumerate(bulkRequest.requests.iterator(), BulkItemRequest::new), diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamTypes.java b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java similarity index 91% rename from server/src/main/java/org/elasticsearch/common/streams/StreamTypes.java rename to server/src/main/java/org/elasticsearch/common/streams/StreamType.java index e193742d1586c..afcbdb4120e09 100644 --- a/server/src/main/java/org/elasticsearch/common/streams/StreamTypes.java +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamType.java @@ -9,13 +9,13 @@ package org.elasticsearch.common.streams; -public enum StreamTypes { +public enum StreamType { LOGS("logs"); private final String streamName; - StreamTypes(String streamName) { + StreamType(String streamName) { this.streamName = streamName; } diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java b/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java index 91ef0c5c2db9d..af1ee0cb741fe 100644 --- a/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java @@ -34,7 +34,7 @@ public static StreamsPermissionsUtils getInstance() { public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set indexHistory, String destination) throws IllegalArgumentException { - for (StreamTypes streamType : StreamTypes.values()) { + for (StreamType streamType : StreamType.values()) { String streamName = streamType.getStreamName(); if (streamTypeIsEnabled(streamType, projectMetadata) && destination.startsWith(streamName + ".") @@ -51,7 +51,7 @@ public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata } } - public boolean streamTypeIsEnabled(StreamTypes streamType, ProjectMetadata projectMetadata) { + public boolean streamTypeIsEnabled(StreamType streamType, ProjectMetadata projectMetadata) { StreamsMetadata metadata = projectMetadata.custom(StreamsMetadata.TYPE, StreamsMetadata.EMPTY); return switch (streamType) { case LOGS -> metadata.isLogsEnabled(); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java index 63e05c21d3ca3..91540cdac6ecb 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java @@ -42,10 +42,12 @@ import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; import 
org.elasticsearch.common.util.concurrent.ThreadContext; @@ -215,10 +217,18 @@ public class BulkOperationTests extends ESTestCase { private TestThreadPool threadPool; + private StreamsPermissionsUtils streamsPermissionsUtilsMock; + private ProjectResolver projectResolverMock; + private IndexNameExpressionResolver indexNameExpressionResolverMock; + @Before - public void setupThreadpool() { + public void setupTest() { threadPool = new TestThreadPool(getClass().getName()); threadPool.getThreadContext().putHeader(Task.X_ELASTIC_PROJECT_ID_HTTP_HEADER, projectId.id()); + streamsPermissionsUtilsMock = mock(StreamsPermissionsUtils.class); + when(streamsPermissionsUtilsMock.streamTypeIsEnabled(any(), any())).thenReturn(false); + projectResolverMock = mock(ProjectResolver.class); + indexNameExpressionResolverMock = mock(IndexNameExpressionResolver.class); } @After @@ -688,8 +698,8 @@ public void testRetryableBlockAcceptsFailureStoreDocument() throws Exception { } /** - * A bulk operation to a data stream with a failure store enabled may still partially fail if the cluster is experiencing a - * non-retryable block when the redirected documents would be sent to the shard-level action. + * A bulk operation to a data stream with a failure store enabled may still partially fail if the redirected documents experience + * a shard-level failure while writing to the failure store indices. */ public void testBlockedClusterRejectsFailureStoreDocument() throws Exception { // Requests that go to two separate shards @@ -1239,7 +1249,8 @@ private BulkOperation newBulkOperation( failureStoreDocumentConverter, FailureStoreMetrics.NOOP, dataStreamFailureStoreSettings, - failureStoreNodeFeatureEnabled + failureStoreNodeFeatureEnabled, + streamsPermissionsUtilsMock ); } diff --git a/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java b/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java index 42bb7063ed9e9..10d1b3cf016de 100644 --- a/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java @@ -47,14 +47,14 @@ public void testGetInstanceReturnsSingleton() { public void testStreamTypeIsEnabledReturnsTrueWhenLogsEnabled() { when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); - boolean result = utils.streamTypeIsEnabled(StreamTypes.LOGS, projectMetadataMock); + boolean result = utils.streamTypeIsEnabled(StreamType.LOGS, projectMetadataMock); assertTrue(result); } public void testStreamTypeIsEnabledReturnsFalseWhenLogsDisabled() { when(streamsMetadataMock.isLogsEnabled()).thenReturn(false); - boolean result = utils.streamTypeIsEnabled(StreamTypes.LOGS, projectMetadataMock); + boolean result = utils.streamTypeIsEnabled(StreamType.LOGS, projectMetadataMock); assertFalse(result); } @@ -62,7 +62,7 @@ public void testIfRetrouteToSubstreamNotAllowedThrows() { when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); Set indexHistory = new HashSet<>(); // empty, so reroute not allowed - String destination = StreamTypes.LOGS.getStreamName() + ".substream"; + String destination = StreamType.LOGS.getStreamName() + ".substream"; IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, @@ -77,7 +77,7 @@ public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenStreamTypeDi when(streamsMetadataMock.isLogsEnabled()).thenReturn(false); Set indexHistory = Collections.emptySet(); - String destination = 
StreamTypes.LOGS.getStreamName() + ".substream"; + String destination = StreamType.LOGS.getStreamName() + ".substream"; // Should not throw since stream type is disabled utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); @@ -87,7 +87,7 @@ public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenDestinationN when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); Set indexHistory = Collections.emptySet(); - String destination = StreamTypes.LOGS.getStreamName(); // not a substream + String destination = StreamType.LOGS.getStreamName(); // not a substream // Should not throw since destination is not a substream utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); @@ -97,8 +97,8 @@ public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenIndexHistory when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); Set indexHistory = new HashSet<>(); - indexHistory.add(StreamTypes.LOGS.getStreamName()); - String destination = StreamTypes.LOGS.getStreamName() + ".substream"; + indexHistory.add(StreamType.LOGS.getStreamName()); + String destination = StreamType.LOGS.getStreamName() + ".substream"; // Should not throw since indexHistory contains the stream name utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); From bf09df29abe0260f188cbbfc81a8e4cdf557b119 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Thu, 3 Jul 2025 13:42:32 +0100 Subject: [PATCH 04/11] SNAPSHOT - Bulk Ops tests --- .../action/bulk/BulkOperation.java | 8 +- .../action/bulk/BulkOperationTests.java | 156 +++++++++++++++--- 2 files changed, 137 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index db49a4ec89205..b3a540edce922 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -295,11 +295,11 @@ private Map> groupBulkRequestsByShards(ClusterSta String prefix = streamType.getStreamName() + "."; if (req != null && req.index().startsWith(prefix)) { IllegalArgumentException exception = new IllegalArgumentException( - "Bulk requests for streams with type [" + "Writes to child stream [" + + req.index() + + "] are not allowed, use the parent stream instead: [" + streamType.getStreamName() - + "] are not supported, use the [" - + streamType.getStreamName() - + "] API instead." 
+ + "]" ); IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception); addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus); diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java index 91540cdac6ecb..bf35d2b1f529a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java @@ -47,6 +47,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -66,6 +67,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -75,12 +77,14 @@ import java.util.function.BiConsumer; import java.util.function.Supplier; +import static org.hamcrest.CoreMatchers.containsStringIgnoringCase; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.notNullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -93,10 +97,14 @@ public class BulkOperationTests extends ESTestCase { private final long millis = randomMillisUpToYear9999(); private final String indexName = "my_index"; private final String dataStreamName = "my_data_stream"; + private final String fsDataStreamName = "my_failure_store_data_stream"; private final String fsRolloverDataStreamName = "my_failure_store_to_be_rolled_over_data_stream"; private final String fsBySettingsDataStreamName = "my_failure_store_enabled_by_setting_data_stream"; + private final String logsStreamDsName = StreamType.LOGS.getStreamName(); + private final String logsChildStreamDsName = StreamType.LOGS.getStreamName() + ".child"; + private final IndexMetadata indexMetadata = IndexMetadata.builder(indexName) .settings( Settings.builder() @@ -134,6 +142,20 @@ public class BulkOperationTests extends ESTestCase { .numberOfShards(1) .build(); + private final IndexMetadata streamBackingIndex = DataStreamTestHelper.createFailureStore(logsStreamDsName, 1, millis) + .numberOfShards(1) + .build(); + private final IndexMetadata streamFailureStore = DataStreamTestHelper.createFailureStore(logsStreamDsName, 1, millis) + .numberOfShards(1) + .build(); + + private final IndexMetadata streamChildBackingIndex = DataStreamTestHelper.createFailureStore(logsChildStreamDsName, 1, millis) + .numberOfShards(1) + .build(); + private final IndexMetadata streamChildFailureStore = DataStreamTestHelper.createFailureStore(logsChildStreamDsName, 1, millis) + .numberOfShards(1) + .build(); + private final DataStream dataStream1 = DataStreamTestHelper.newInstance( dataStreamName, List.of(ds1BackingIndex1.getIndex(), ds1BackingIndex2.getIndex()) @@ -156,6 +178,18 @@ public class BulkOperationTests 
extends ESTestCase { .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(List.of(ds4FailureStore1.getIndex())).build()) .build(); + private final DataStream logsDataStream = DataStream.builder(logsStreamDsName, List.of(streamChildBackingIndex.getIndex())) + .setGeneration(1) + .setDataStreamOptions(DataStreamOptions.EMPTY) + .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(List.of(streamFailureStore.getIndex())).build()) + .build(); + + private final DataStream logsChildDataStream = DataStream.builder(logsChildStreamDsName, List.of(streamChildBackingIndex.getIndex())) + .setGeneration(1) + .setDataStreamOptions(DataStreamOptions.EMPTY) + .setFailureIndices(DataStream.DataStreamIndices.failureIndicesBuilder(List.of(streamChildFailureStore.getIndex())).build()) + .build(); + private final ProjectId projectId = randomProjectIdOrDefault(); private final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) .putProjectMetadata( @@ -176,28 +210,23 @@ public class BulkOperationTests extends ESTestCase { .build() ) ) - .indices( - Map.of( - indexName, - indexMetadata, - ds1BackingIndex1.getIndex().getName(), - ds1BackingIndex1, - ds1BackingIndex2.getIndex().getName(), - ds1BackingIndex2, - ds2BackingIndex1.getIndex().getName(), - ds2BackingIndex1, - ds2FailureStore1.getIndex().getName(), - ds2FailureStore1, - ds3BackingIndex1.getIndex().getName(), - ds3BackingIndex1, - ds3FailureStore1.getIndex().getName(), - ds3FailureStore1, - ds4BackingIndex1.getIndex().getName(), - ds4BackingIndex1, - ds4FailureStore1.getIndex().getName(), - ds4FailureStore1 - ) - ) + .indices(new HashMap<>() { + { + put(indexName, indexMetadata); + put(ds1BackingIndex1.getIndex().getName(), ds1BackingIndex1); + put(ds1BackingIndex2.getIndex().getName(), ds1BackingIndex2); + put(ds2BackingIndex1.getIndex().getName(), ds2BackingIndex1); + put(ds2FailureStore1.getIndex().getName(), ds2FailureStore1); + put(ds3BackingIndex1.getIndex().getName(), ds3BackingIndex1); + put(ds3FailureStore1.getIndex().getName(), ds3FailureStore1); + put(ds4BackingIndex1.getIndex().getName(), ds4BackingIndex1); + put(ds4FailureStore1.getIndex().getName(), ds4FailureStore1); + put(streamBackingIndex.getIndex().getName(), streamBackingIndex); + put(streamFailureStore.getIndex().getName(), streamFailureStore); + put(streamChildBackingIndex.getIndex().getName(), streamChildBackingIndex); + put(streamChildFailureStore.getIndex().getName(), streamChildFailureStore); + } + }) .dataStreams( Map.of( dataStreamName, @@ -207,7 +236,11 @@ public class BulkOperationTests extends ESTestCase { fsRolloverDataStreamName, dataStream3, fsBySettingsDataStreamName, - dataStream4 + dataStream4, + logsStreamDsName, + logsDataStream, + logsChildStreamDsName, + logsChildDataStream ), Map.of() ) @@ -956,6 +989,83 @@ public void testFailureWhileRollingOverFailureStore() throws Exception { assertThat(failedItem.getFailureStoreStatus(), equalTo(IndexDocFailureStoreStatus.FAILED)); } + public void testIndexWriteSucceededWhenStreamsEnabled() throws Exception { + // Requests that go to two separate shards + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(indexName).id("1").source(Map.of("key", "val"))); + + NodeClient client = getNodeClient(acceptAllShardWrites()); + + when(streamsPermissionsUtilsMock.streamTypeIsEnabled(eq(StreamType.LOGS), any())).thenReturn(true); + + BulkResponse bulkItemResponses = safeAwait(l -> newBulkOperation(client, bulkRequest, l).run()); + 
assertThat(bulkItemResponses.hasFailures(), is(false)); + } + + public void testLogsDatastreamWriteSucceededWhenStreamsEnabled() throws Exception { + // Requests that go to two separate shards + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(logsStreamDsName).id("1").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + bulkRequest.add(new IndexRequest(logsStreamDsName).id("3").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + + NodeClient client = getNodeClient(acceptAllShardWrites()); + + when(streamsPermissionsUtilsMock.streamTypeIsEnabled(eq(StreamType.LOGS), any())).thenReturn(true); + + BulkResponse bulkItemResponses = safeAwait(l -> newBulkOperation(client, bulkRequest, l).run()); + assertThat(bulkItemResponses.hasFailures(), is(false)); + } + + public void testLogsDatastreamWriteSucceededWhenStreamsDisabled() throws Exception { + // Requests that go to two separate shards + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(logsStreamDsName).id("1").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + bulkRequest.add(new IndexRequest(logsStreamDsName).id("3").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + + NodeClient client = getNodeClient(acceptAllShardWrites()); + + when(streamsPermissionsUtilsMock.streamTypeIsEnabled(eq(StreamType.LOGS), any())).thenReturn(false); + + BulkResponse bulkItemResponses = safeAwait(l -> newBulkOperation(client, bulkRequest, l).run()); + assertThat(bulkItemResponses.hasFailures(), is(false)); + } + + public void testLogsChildDatastreamWriteRejectedWhenStreamsEnabled() throws Exception { + // Requests that go to two separate shards + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(logsChildStreamDsName).id("1").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + bulkRequest.add(new IndexRequest(logsChildStreamDsName).id("3").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + + NodeClient client = getNodeClient(acceptAllShardWrites()); + + when(streamsPermissionsUtilsMock.streamTypeIsEnabled(eq(StreamType.LOGS), any())).thenReturn(true); + + BulkResponse bulkItemResponses = safeAwait(l -> newBulkOperation(client, bulkRequest, l).run()); + assertThat(bulkItemResponses.hasFailures(), is(true)); + + for (int i = 0; i < bulkItemResponses.getItems().length; i++) { + assertThat(bulkItemResponses.getItems()[i].getFailure().getCause(), instanceOf(IllegalArgumentException.class)); + assertThat( + bulkItemResponses.getItems()[i].getFailure().getCause().getMessage(), + is(containsStringIgnoringCase("Writes to child stream [" + logsChildStreamDsName + "] are not allowed")) + ); + } + } + + public void testLogsChildDatastreamWriteSucceededWhenStreamsDisabled() throws Exception { + // Requests that go to two separate shards + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.add(new IndexRequest(logsChildStreamDsName).id("1").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + bulkRequest.add(new IndexRequest(logsChildStreamDsName).id("3").source(Map.of("key", "val")).opType(DocWriteRequest.OpType.CREATE)); + + NodeClient client = getNodeClient(acceptAllShardWrites()); + + when(streamsPermissionsUtilsMock.streamTypeIsEnabled(eq(StreamType.LOGS), any())).thenReturn(false); + + BulkResponse bulkItemResponses = safeAwait(l -> newBulkOperation(client, bulkRequest, l).run()); + assertThat(bulkItemResponses.hasFailures(), is(false)); + 
} + /** * Throws an assertion error with the given message if the client operation executes */ From 40649ccdb860ab6f088df9c5d328a19c5a7c12ab Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 7 Jul 2025 15:14:41 +0100 Subject: [PATCH 05/11] SNAPSHOT - YAML tests --- modules/streams/build.gradle | 3 +- .../streams/StreamsYamlTestSuiteIT.java | 6 +- .../logs/20_substream_restrictions.yml | 72 +++++++++++++++++++ 3 files changed, 79 insertions(+), 2 deletions(-) create mode 100644 modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml diff --git a/modules/streams/build.gradle b/modules/streams/build.gradle index fd56a627026b6..506a51f36a245 100644 --- a/modules/streams/build.gradle +++ b/modules/streams/build.gradle @@ -20,7 +20,7 @@ esplugin { restResources { restApi { - include '_common', 'streams' + include '_common', 'streams', "bulk", "index", "ingest", "indices" } } @@ -38,4 +38,5 @@ artifacts { dependencies { testImplementation project(path: ':test:test-clusters') + clusterModules project(':modules:ingest-common') } diff --git a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java index 9d5a1033faf57..d5b306a6cf9b4 100644 --- a/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java +++ b/modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java @@ -29,7 +29,11 @@ public static Iterable parameters() throws Exception { } @ClassRule - public static ElasticsearchCluster cluster = ElasticsearchCluster.local().module("streams").feature(FeatureFlag.LOGS_STREAM).build(); + public static ElasticsearchCluster cluster = ElasticsearchCluster.local() + .module("streams") + .module("ingest-common") + .feature(FeatureFlag.LOGS_STREAM) + .build(); @Override protected String getTestRestCluster() { diff --git a/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml new file mode 100644 index 0000000000000..001fdb118f53b --- /dev/null +++ b/modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml @@ -0,0 +1,72 @@ +--- +"Check User Can't Write To Substream Directly": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + bulk: + body: | + { "index": { "_index": "logs.foo" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "Writes to child stream [logs.foo] are not allowed, use the parent stream instead: [logs]" } + +--- +"Check User Can't Write To Substream Directly With Single Doc": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - is_true: logs.enabled + + - do: + catch: bad_request + index: + index: logs.foo + id: "1" + body: + foo: bar + - match: { error.type: "illegal_argument_exception" } + - match: { error.reason: "Writes to child stream [logs.foo] are not allowed, use the parent stream instead: [logs]" } + +--- +"Check Bulk Index With Reroute Processor To Substream Is Rejected": + - do: + streams.logs_enable: { } + - is_true: acknowledged + + - do: + streams.status: { } + - 
is_true: logs.enabled + + - do: + ingest.put_pipeline: + id: "reroute-to-logs-foo" + body: + processors: + - reroute: + destination: "logs.foo" + - do: + indices.create: + index: "bad-index" + body: + settings: + index.default_pipeline: "reroute-to-logs-foo" + - do: + bulk: + body: | + { "index": { "_index": "bad-index" } } + { "foo": "bar" } + - match: { errors: true } + - match: { items.0.index.status: 400 } + - match: { items.0.index.error.type: "illegal_argument_exception" } + - match: { items.0.index.error.reason: "Cannot reroute to substream [logs.foo] as only the stream itself can reroute to substreams. Please reroute to the stream [logs] instead." } From bce3a1bcaef38657fe1c0cedd777effab42da2bb Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 8 Jul 2025 09:22:29 +0100 Subject: [PATCH 06/11] Update server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../elasticsearch/common/streams/StreamsPermissionsUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java b/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java index af1ee0cb741fe..26d266d537243 100644 --- a/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java +++ b/server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java @@ -32,7 +32,7 @@ public static StreamsPermissionsUtils getInstance() { return INSTANCE; } - public void throwIfRetrouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set indexHistory, String destination) + public void throwIfRerouteToSubstreamNotAllowed(ProjectMetadata projectMetadata, Set indexHistory, String destination) throws IllegalArgumentException { for (StreamType streamType : StreamType.values()) { String streamName = streamType.getStreamName(); From be4515ac22dae10cbb40c413cdaedd402b467fdf Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 8 Jul 2025 09:22:59 +0100 Subject: [PATCH 07/11] Update server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../java/org/elasticsearch/action/bulk/BulkOperationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java index bf35d2b1f529a..a3ba535c924b1 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java @@ -251,7 +251,7 @@ public class BulkOperationTests extends ESTestCase { private TestThreadPool threadPool; private StreamsPermissionsUtils streamsPermissionsUtilsMock; - private ProjectResolver projectResolverMock; +// Removed the unused projectResolverMock field. 
private IndexNameExpressionResolver indexNameExpressionResolverMock; @Before From 35ad613246eb4e7cb817373e22ece440e77de61e Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Tue, 8 Jul 2025 08:30:41 +0000 Subject: [PATCH 08/11] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/action/bulk/BulkOperationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java index a3ba535c924b1..e5a248b2be109 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java @@ -251,7 +251,7 @@ public class BulkOperationTests extends ESTestCase { private TestThreadPool threadPool; private StreamsPermissionsUtils streamsPermissionsUtilsMock; -// Removed the unused projectResolverMock field. + // Removed the unused projectResolverMock field. private IndexNameExpressionResolver indexNameExpressionResolverMock; @Before From 0c8cbcdbff95b74745712b45638d0e2ad2094c40 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Tue, 8 Jul 2025 09:47:21 +0100 Subject: [PATCH 09/11] SNAPSHOT - Revert copilot suggestions --- .../ingest/common/RerouteProcessor.java | 2 +- .../ingest/common/RerouteProcessorTests.java | 2 +- .../action/bulk/BulkOperationTests.java | 2 +- .../streams/StreamsPermissionsUtilsTests.java | 14 +++++++------- 4 files changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java index 1fb7205b0856c..29bc72f70af3a 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java @@ -90,7 +90,7 @@ public final class RerouteProcessor extends AbstractProcessor { public IngestDocument execute(IngestDocument ingestDocument) throws Exception { if (destination != null) { ProjectMetadata projectMetadata = clusterService.state().projectState(projectId).metadata(); - streamsPermissionsUtils.throwIfRetrouteToSubstreamNotAllowed(projectMetadata, ingestDocument.getIndexHistory(), destination); + streamsPermissionsUtils.throwIfRerouteToSubstreamNotAllowed(projectMetadata, ingestDocument.getIndexHistory(), destination); ingestDocument.reroute(destination); return ingestDocument; } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java index 411b495b975ab..728facf9a932a 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java @@ -45,7 +45,7 @@ public void setUpStreamsPermissionsUtils() { .putProjectMetadata(ProjectMetadata.builder(ProjectId.DEFAULT).build()) .build(); when(clusterServiceMock.state()).thenReturn(clusterState); - doNothing().when(streamsPermissionsUtilsMock).throwIfRetrouteToSubstreamNotAllowed(any(), any(), anyString()); + doNothing().when(streamsPermissionsUtilsMock).throwIfRerouteToSubstreamNotAllowed(any(), any(), anyString()); } public void testDefaults() throws Exception { diff --git 
a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java index e5a248b2be109..bf35d2b1f529a 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java @@ -251,7 +251,7 @@ public class BulkOperationTests extends ESTestCase { private TestThreadPool threadPool; private StreamsPermissionsUtils streamsPermissionsUtilsMock; - // Removed the unused projectResolverMock field. + private ProjectResolver projectResolverMock; private IndexNameExpressionResolver indexNameExpressionResolverMock; @Before diff --git a/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java b/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java index 10d1b3cf016de..42f8caf1b37b7 100644 --- a/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java +++ b/server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java @@ -66,34 +66,34 @@ public void testIfRetrouteToSubstreamNotAllowedThrows() { IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination) + () -> utils.throwIfRerouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination) ); assertTrue(ex.getMessage().contains("Cannot reroute to substream")); assertTrue(ex.getMessage().contains(destination)); } - public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenStreamTypeDisabled() { + public void testthrowIfRerouteToSubstreamNotAllowedDoesNotThrowWhenStreamTypeDisabled() { when(streamsMetadataMock.isLogsEnabled()).thenReturn(false); Set indexHistory = Collections.emptySet(); String destination = StreamType.LOGS.getStreamName() + ".substream"; // Should not throw since stream type is disabled - utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); + utils.throwIfRerouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); } - public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenDestinationNotSubstream() { + public void testthrowIfRerouteToSubstreamNotAllowedDoesNotThrowWhenDestinationNotSubstream() { when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); Set indexHistory = Collections.emptySet(); String destination = StreamType.LOGS.getStreamName(); // not a substream // Should not throw since destination is not a substream - utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); + utils.throwIfRerouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); } - public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenIndexHistoryContainsStream() { + public void testthrowIfRerouteToSubstreamNotAllowedDoesNotThrowWhenIndexHistoryContainsStream() { when(streamsMetadataMock.isLogsEnabled()).thenReturn(true); Set indexHistory = new HashSet<>(); @@ -101,6 +101,6 @@ public void testThrowIfRetrouteToSubstreamNotAllowedDoesNotThrowWhenIndexHistory String destination = StreamType.LOGS.getStreamName() + ".substream"; // Should not throw since indexHistory contains the stream name - utils.throwIfRetrouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); + utils.throwIfRerouteToSubstreamNotAllowed(projectMetadataMock, indexHistory, destination); } } From 
cc64df89564602e4ef5ba2f2f32f59799afca443 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Wed, 9 Jul 2025 15:28:32 +0100 Subject: [PATCH 10/11] SNAPSHOT - Refactor for ingest service to handle reroute path checks --- .../runConfigurations/Debug_Elasticsearch.xml | 3 +- .../ingest/common/IngestCommonPlugin.java | 2 +- .../ingest/common/RerouteProcessor.java | 35 +------------ .../common/RerouteProcessorFactoryTests.java | 6 +-- .../ingest/common/RerouteProcessorTests.java | 41 +--------------- .../elasticsearch/ingest/IngestService.java | 49 ++++++++++++++++--- .../elasticsearch/node/NodeConstruction.java | 4 +- .../ingest/IngestServiceTests.java | 21 +++++--- .../ingest/SimulateIngestServiceTests.java | 9 +++- .../snapshots/SnapshotResiliencyTests.java | 10 +++- ...sportGetTrainedModelsStatsActionTests.java | 9 +++- 11 files changed, 92 insertions(+), 97 deletions(-) diff --git a/.idea/runConfigurations/Debug_Elasticsearch.xml b/.idea/runConfigurations/Debug_Elasticsearch.xml index 051b746a1a497..9c062d3cfd790 100644 --- a/.idea/runConfigurations/Debug_Elasticsearch.xml +++ b/.idea/runConfigurations/Debug_Elasticsearch.xml @@ -1,5 +1,6 @@ + - + \ No newline at end of file diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java index 47fcf4f89bfae..6e517d644cadb 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/IngestCommonPlugin.java @@ -65,7 +65,7 @@ public Map getProcessors(Processor.Parameters paramet entry(RegisteredDomainProcessor.TYPE, new RegisteredDomainProcessor.Factory()), entry(RemoveProcessor.TYPE, new RemoveProcessor.Factory(parameters.scriptService)), entry(RenameProcessor.TYPE, new RenameProcessor.Factory(parameters.scriptService)), - entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory(parameters.ingestService)), + entry(RerouteProcessor.TYPE, new RerouteProcessor.Factory()), entry(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService)), entry(SetProcessor.TYPE, new SetProcessor.Factory(parameters.scriptService)), entry(SortProcessor.TYPE, new SortProcessor.Factory()), diff --git a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java index 29bc72f70af3a..6580a5af3d005 100644 --- a/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java +++ b/modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java @@ -10,14 +10,10 @@ package org.elasticsearch.ingest.common; import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.ingest.AbstractProcessor; import org.elasticsearch.ingest.ConfigurationUtils; import org.elasticsearch.ingest.IngestDocument; -import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.Processor; import java.util.List; @@ -49,9 +45,6 @@ public final class RerouteProcessor extends AbstractProcessor { private final List dataset; private final List namespace; private final String destination; - private final 
ClusterService clusterService; - private final ProjectId projectId; - private final StreamsPermissionsUtils streamsPermissionsUtils; RerouteProcessor( String tag, @@ -59,10 +52,7 @@ public final class RerouteProcessor extends AbstractProcessor { List type, List dataset, List namespace, - String destination, - ClusterService clusterService, - ProjectId projectId, - StreamsPermissionsUtils streamsPermissionsUtils + String destination ) { super(tag, description); if (type.isEmpty()) { @@ -81,16 +71,11 @@ public final class RerouteProcessor extends AbstractProcessor { this.namespace = namespace; } this.destination = destination; - this.clusterService = clusterService; - this.projectId = projectId; - this.streamsPermissionsUtils = streamsPermissionsUtils; } @Override public IngestDocument execute(IngestDocument ingestDocument) throws Exception { if (destination != null) { - ProjectMetadata projectMetadata = clusterService.state().projectState(projectId).metadata(); - streamsPermissionsUtils.throwIfRerouteToSubstreamNotAllowed(projectMetadata, ingestDocument.getIndexHistory(), destination); ingestDocument.reroute(destination); return ingestDocument; } @@ -186,12 +171,6 @@ String getDestination() { public static final class Factory implements Processor.Factory { - private final IngestService ingestService; - - public Factory(IngestService ingestService) { - this.ingestService = ingestService; - } - @Override public RerouteProcessor create( Map processorFactories, @@ -233,17 +212,7 @@ public RerouteProcessor create( throw newConfigurationException(TYPE, tag, "destination", "can only be set if type, dataset, and namespace are not set"); } - return new RerouteProcessor( - tag, - description, - type, - dataset, - namespace, - destination, - ingestService.getClusterService(), - projectId, - StreamsPermissionsUtils.getInstance() - ); + return new RerouteProcessor(tag, description, type, dataset, namespace, destination); } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java index 57a7d0e09853c..8cd87d78c57c5 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java @@ -11,7 +11,6 @@ import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.ingest.IngestService; import org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource; import org.elasticsearch.test.ESTestCase; @@ -20,8 +19,6 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Answers.RETURNS_SMART_NULLS; -import static org.mockito.Mockito.mock; public class RerouteProcessorFactoryTests extends ESTestCase { @@ -78,7 +75,6 @@ private static RerouteProcessor create(String dataset, String namespace) throws } private static RerouteProcessor create(Map config) throws Exception { - IngestService ingestService = mock(IngestService.class, RETURNS_SMART_NULLS); - return new RerouteProcessor.Factory(ingestService).create(null, null, null, new HashMap<>(config), ProjectId.DEFAULT); + return new RerouteProcessor.Factory().create(null, null, null, new HashMap<>(config), ProjectId.DEFAULT); } } diff --git a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java 
b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java index 728facf9a932a..55eebbc160486 100644 --- a/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java +++ b/modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorTests.java @@ -9,11 +9,6 @@ package org.elasticsearch.ingest.common; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.metadata.ProjectId; -import org.elasticsearch.cluster.metadata.ProjectMetadata; -import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.ingest.CompoundProcessor; import org.elasticsearch.ingest.IngestDocument; import org.elasticsearch.ingest.Processor; @@ -21,33 +16,14 @@ import org.elasticsearch.ingest.TestProcessor; import org.elasticsearch.ingest.WrappingProcessor; import org.elasticsearch.test.ESTestCase; -import org.junit.Before; import java.util.List; import java.util.Map; import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Mockito.RETURNS_DEEP_STUBS; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyString; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; public class RerouteProcessorTests extends ESTestCase { - private final StreamsPermissionsUtils streamsPermissionsUtilsMock = mock(StreamsPermissionsUtils.class); - private final ClusterService clusterServiceMock = mock(ClusterService.class, RETURNS_DEEP_STUBS); - - @Before - public void setUpStreamsPermissionsUtils() { - ClusterState clusterState = ClusterState.builder(ClusterState.EMPTY_STATE) - .putProjectMetadata(ProjectMetadata.builder(ProjectId.DEFAULT).build()) - .build(); - when(clusterServiceMock.state()).thenReturn(clusterState); - doNothing().when(streamsPermissionsUtilsMock).throwIfRerouteToSubstreamNotAllowed(any(), any(), anyString()); - } - public void testDefaults() throws Exception { IngestDocument ingestDocument = createIngestDocument("logs-generic-default"); @@ -315,25 +291,12 @@ private RerouteProcessor createRerouteProcessor(List type, List type.stream().map(RerouteProcessor.DataStreamValueSource::type).toList(), dataset.stream().map(RerouteProcessor.DataStreamValueSource::dataset).toList(), namespace.stream().map(RerouteProcessor.DataStreamValueSource::namespace).toList(), - null, - clusterServiceMock, - ProjectId.DEFAULT, - streamsPermissionsUtilsMock + null ); } private RerouteProcessor createRerouteProcessor(String destination) { - return new RerouteProcessor( - null, - null, - List.of(), - List.of(), - List.of(), - destination, - clusterServiceMock, - ProjectId.DEFAULT, - streamsPermissionsUtilsMock - ); + return new RerouteProcessor(null, null, List.of(), List.of(), List.of(), destination); } private void assertDataSetFields(IngestDocument ingestDocument, String type, String dataset, String namespace) { diff --git a/server/src/main/java/org/elasticsearch/ingest/IngestService.java b/server/src/main/java/org/elasticsearch/ingest/IngestService.java index b4233cb94e21b..e2facd56a4b69 100644 --- a/server/src/main/java/org/elasticsearch/ingest/IngestService.java +++ b/server/src/main/java/org/elasticsearch/ingest/IngestService.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.ingest.DeletePipelineRequest; import org.elasticsearch.action.ingest.PutPipelineRequest; 
+import org.elasticsearch.action.ingest.ReservedPipelineAction; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; @@ -55,6 +56,8 @@ import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.streams.StreamType; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.CollectionUtils; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -75,6 +78,7 @@ import org.elasticsearch.node.ReportingService; import org.elasticsearch.plugins.IngestPlugin; import org.elasticsearch.plugins.internal.XContentParserDecorator; +import org.elasticsearch.script.Metadata; import org.elasticsearch.script.ScriptService; import org.elasticsearch.threadpool.Scheduler; import org.elasticsearch.threadpool.ThreadPool; @@ -154,6 +158,7 @@ public static boolean locallySupportedIngestFeature(NodeFeature nodeFeature) { private volatile ClusterState state; private final ProjectResolver projectResolver; private final FeatureService featureService; + private final StreamsPermissionsUtils streamsPermissionsUtils; private static BiFunction createScheduler(ThreadPool threadPool) { return (delay, command) -> threadPool.schedule(command, TimeValue.timeValueMillis(delay), threadPool.generic()); @@ -241,7 +246,8 @@ public IngestService( MatcherWatchdog matcherWatchdog, FailureStoreMetrics failureStoreMetrics, ProjectResolver projectResolver, - FeatureService featureService + FeatureService featureService, + StreamsPermissionsUtils streamsPermissionsUtils ) { this.clusterService = clusterService; this.scriptService = scriptService; @@ -265,6 +271,7 @@ public IngestService( this.failureStoreMetrics = failureStoreMetrics; this.projectResolver = projectResolver; this.featureService = featureService; + this.streamsPermissionsUtils = streamsPermissionsUtils; } /** @@ -283,6 +290,7 @@ public IngestService( this.failureStoreMetrics = ingestService.failureStoreMetrics; this.projectResolver = ingestService.projectResolver; this.featureService = ingestService.featureService; + streamsPermissionsUtils = ingestService.streamsPermissionsUtils; } private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) { @@ -301,7 +309,7 @@ private static Map processorFactories(List * Also, this method marks the request as `isPipelinesResolved = true`: Due to the request could be rerouted from a coordinating node * to an ingest node, we have to be able to avoid double resolving the pipelines and also able to distinguish that either the pipeline @@ -309,7 +317,7 @@ private static Map processorFactories(List documentListener = ActionListener.runAfter( @@ -1198,6 +1206,31 @@ private void executePipelines( return; // document failed! 
} + StreamsPermissionsUtils permissionsUtils = StreamsPermissionsUtils.getInstance(); + for (StreamType streamType : StreamType.values()) { + if (permissionsUtils.streamTypeIsEnabled(streamType, project)) { + if (newIndex.startsWith(streamType.getStreamName() + ".") + && ingestDocument.getIndexHistory().stream().noneMatch(s -> s.equals(streamType.getStreamName()))) { + exceptionHandler.accept( + new IngestPipelineException( + pipelineId, + new IllegalArgumentException( + format( + "Pipelines can't re-route documents to child streams, but pipeline [%s] tried to reroute " + + "this document from index [%s] to index [%s]. Reroute history: %s", + pipelineId, + originalIndex, + newIndex, + String.join(" -> ", ingestDocument.getIndexHistory()) + ) + ) + ) + ); + return; // document failed! + } + } + } + // add the index to the document's index history, and check for cycles in the visited indices boolean cycle = ingestDocument.updateIndexHistory(newIndex) == false; if (cycle) { @@ -1352,7 +1385,7 @@ private static IngestDocument newIngestDocument(final IndexRequest request) { /** * Updates an index request based on the metadata of an ingest document. */ - private static void updateIndexRequestMetadata(final IndexRequest request, final org.elasticsearch.script.Metadata metadata) { + private static void updateIndexRequestMetadata(final IndexRequest request, final Metadata metadata) { // it's fine to set all metadata fields all the time, as ingest document holds their starting values // before ingestion, which might also get modified during ingestion. request.index(metadata.getIndex()); diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index bb28ed4a8aff5..f85fd27aa769d 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -77,6 +77,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.set.Sets; @@ -716,7 +717,8 @@ private void construct( IngestService.createGrokThreadWatchdog(environment, threadPool), failureStoreMetrics, projectResolver, - featureService + featureService, + StreamsPermissionsUtils.getInstance() ); SystemIndices systemIndices = createSystemIndices(settings); diff --git a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java index 15ff956c598dc..179ca02eca5b5 100644 --- a/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/IngestServiceTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.common.TriConsumer; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.util.Maps; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -143,12 +144,14 @@ public Map getProcessors(Processor.Parameters paramet }; private ThreadPool threadPool; + private static StreamsPermissionsUtils streamsPermissionsUtils = 
mock(StreamsPermissionsUtils.class); @Before public void setup() { threadPool = mock(ThreadPool.class); when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); when(threadPool.executor(anyString())).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE); + when(streamsPermissionsUtils.streamTypeIsEnabled(any(), any())).thenReturn(false); } public void testIngestPlugin() { @@ -169,7 +172,8 @@ public void testIngestPlugin() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + streamsPermissionsUtils ); Map factories = ingestService.getProcessorFactories(); assertTrue(factories.containsKey("foo")); @@ -196,7 +200,8 @@ public void testIngestPluginDuplicate() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + streamsPermissionsUtils ) ); assertTrue(e.getMessage(), e.getMessage().contains("already registered")); @@ -220,7 +225,8 @@ public void testExecuteIndexPipelineDoesNotExist() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + streamsPermissionsUtils ); final IndexRequest indexRequest = new IndexRequest("_index").id("_id") .source(Map.of()) @@ -2505,7 +2511,8 @@ public Map getProcessors(Processor.Parameters paramet public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + streamsPermissionsUtils ); ingestService.addIngestClusterStateListener(ingestClusterStateListener); @@ -2998,7 +3005,8 @@ private void testUpdatingPipeline(String pipelineString) throws Exception { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + streamsPermissionsUtils ); ingestService.applyClusterState(new ClusterChangedEvent("", clusterState, clusterState)); @@ -3342,7 +3350,8 @@ public Map getProcessors(final Processor.Parameters p public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return featureTest.test(feature); } - } + }, + streamsPermissionsUtils ); if (randomBoolean()) { /* diff --git a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java index 9cd3a74d9e73a..174c3ecccb9e5 100644 --- a/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java +++ b/server/src/test/java/org/elasticsearch/ingest/SimulateIngestServiceTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.features.FeatureService; import org.elasticsearch.features.NodeFeature; @@ -34,6 +35,7 @@ import static org.elasticsearch.test.LambdaMatchers.transformedMatch; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -123,6 +125,10 @@ private static 
IngestService createWithProcessors(ProjectId projectId, Map getProcessors(final Processor.Parameters parameters) { @@ -145,7 +151,8 @@ public Map getProcessors(final Processor.Parameters p public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + streamsPermissionsUtils ); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 81f4f743d821a..2389c699da9af 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -126,6 +126,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; @@ -248,7 +249,9 @@ import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class SnapshotResiliencyTests extends ESTestCase { @@ -2628,6 +2631,10 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings); final IndexingPressure indexingMemoryLimits = new IndexingPressure(settings); mappingUpdatedAction.setClient(client); + + StreamsPermissionsUtils streamsPermissionsUtils = mock(StreamsPermissionsUtils.class); + when(streamsPermissionsUtils.streamTypeIsEnabled(any(), any())).thenReturn(false); + actions.put( TransportBulkAction.TYPE, new TransportBulkAction( @@ -2650,7 +2657,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + streamsPermissionsUtils ), client, actionFilters, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java index 06ba7ba113d4e..59fc34fd8e67a 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetTrainedModelsStatsActionTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.features.FeatureService; @@ -54,6 +55,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static 
org.mockito.Mockito.when; @@ -143,6 +145,10 @@ public void setUpVariables() { ) ); clusterService = new ClusterService(settings, clusterSettings, tp, null); + + StreamsPermissionsUtils permissionsUtils = mock(StreamsPermissionsUtils.class); + when(permissionsUtils.streamTypeIsEnabled(any(), any())).thenReturn(false); + ingestService = new IngestService( clusterService, tp, @@ -159,7 +165,8 @@ public void setUpVariables() { public boolean clusterHasFeature(ClusterState state, NodeFeature feature) { return DataStream.DATA_STREAM_FAILURE_STORE_FEATURE.equals(feature); } - } + }, + permissionsUtils ); } From 14aefc8e952a2b75072f47e9c36a226f3773f140 Mon Sep 17 00:00:00 2001 From: Luke Whiting Date: Mon, 14 Jul 2025 16:11:46 +0100 Subject: [PATCH 11/11] SNAPSHOT - Refactor to move substream check earlier in indexing process --- .../action/bulk/BulkOperation.java | 50 +++++++++---------- .../bulk/TransportAbstractBulkAction.java | 31 ++++++++++++ 2 files changed, 54 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java index b3a540edce922..2309275b43c7c 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java @@ -43,7 +43,6 @@ import org.elasticsearch.cluster.routing.IndexRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Iterators; -import org.elasticsearch.common.streams.StreamType; import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.concurrent.AtomicArray; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; @@ -61,8 +60,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -74,7 +71,6 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.LongSupplier; -import java.util.stream.Collectors; import static org.elasticsearch.action.bulk.TransportBulkAction.LAZY_ROLLOVER_ORIGIN; import static org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.EXCLUDED_DATA_STREAMS_KEY; @@ -283,29 +279,29 @@ private long buildTookInMillis(long startTimeNanos) { } private Map<ShardId, List<BulkItemRequest>> groupBulkRequestsByShards(ClusterState clusterState) { - ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState); - - Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values()) - .filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata)) - .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); - - for (StreamType streamType : enabledStreamTypes) { - for (int i = 0; i < bulkRequest.requests.size(); i++) { - DocWriteRequest<?> req = bulkRequest.requests.get(i); - String prefix = streamType.getStreamName() + "."; - if (req != null && req.index().startsWith(prefix)) { - IllegalArgumentException exception = new IllegalArgumentException( - "Writes to child stream [" - + req.index() - + "] are not allowed, use the parent stream instead: [" - + streamType.getStreamName() - + "]" - ); - IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception); - addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus); - } - } - } + // ProjectMetadata projectMetadata = 
projectResolver.getProjectMetadata(clusterState); + // + // Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values()) + // .filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata)) + // .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); + // + // for (StreamType streamType : enabledStreamTypes) { + // for (int i = 0; i < bulkRequest.requests.size(); i++) { + // DocWriteRequest<?> req = bulkRequest.requests.get(i); + // String prefix = streamType.getStreamName() + "."; + // if (req != null && req.index().startsWith(prefix)) { + // IllegalArgumentException exception = new IllegalArgumentException( + // "Writes to child stream [" + // + req.index() + // + "] are not allowed, use the parent stream instead: [" + // + streamType.getStreamName() + // + "]" + // ); + // IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception); + // addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus); + // } + // } + // } return groupRequestsByShards( clusterState, diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java index 96fbb2c5d6649..ee35a41ce782a 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportAbstractBulkAction.java @@ -32,6 +32,8 @@ import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.streams.StreamType; +import org.elasticsearch.common.streams.StreamsPermissionsUtils; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Assertions; import org.elasticsearch.core.FixForMultiProject; @@ -46,12 +48,16 @@ import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.LongSupplier; +import java.util.stream.Collectors; /** * This is an abstract base class for bulk actions. 
It traverses all indices that the request gets routed to, executes all applicable @@ -400,6 +406,31 @@ private void applyPipelinesAndDoInternalExecute( ActionListener<BulkResponse> listener ) throws IOException { final long relativeStartTimeNanos = relativeTimeNanos(); + + // Validate child stream writes before processing pipelines + ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterService.state()); + Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values()) + .filter(t -> StreamsPermissionsUtils.getInstance().streamTypeIsEnabled(t, projectMetadata)) + .collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class))); + + BulkRequestModifier bulkRequestModifier = new BulkRequestModifier(bulkRequest); + + for (StreamType streamType : enabledStreamTypes) { + for (int i = 0; i < bulkRequest.requests.size(); i++) { + DocWriteRequest<?> req = bulkRequest.requests.get(i); + String prefix = streamType.getStreamName() + "."; + if (req != null && req.index() != null && req.index().startsWith(prefix)) { + IllegalArgumentException e = new IllegalArgumentException("Can't write to child stream"); + Boolean failureStore = resolveFailureStore(req.index(), projectMetadata, threadPool.absoluteTimeInMillis()); + if (failureStore != null && failureStore) { + bulkRequestModifier.markItemForFailureStore(i, req.index(), e); + } else { + bulkRequestModifier.markItemAsFailed(i, e, IndexDocFailureStoreStatus.NOT_APPLICABLE_OR_UNKNOWN); + } + } + } + } + if (applyPipelines(task, bulkRequest, executor, listener) == false) { doInternalExecute(task, bulkRequest, executor, listener, relativeStartTimeNanos); }
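Note on the last hunk above: the guard added to TransportAbstractBulkAction reduces to a prefix check of each request's target index against the enabled stream names. The standalone sketch below illustrates only that check; the class name, the hard-coded "logs" stream, and the isChildStreamWrite helper are illustrative assumptions and are not code from this patch.

import java.util.List;
import java.util.Set;

// Minimal illustration of the child-stream prefix check (assumed names, not patch code).
public class ChildStreamGuardSketch {

    // Mirrors the req.index().startsWith(streamType.getStreamName() + ".") test from the patch.
    static boolean isChildStreamWrite(String targetIndex, Set<String> enabledStreamNames) {
        if (targetIndex == null) {
            return false;
        }
        return enabledStreamNames.stream().anyMatch(name -> targetIndex.startsWith(name + "."));
    }

    public static void main(String[] args) {
        Set<String> enabled = Set.of("logs");
        for (String index : List.of("logs.nginx", "logs", "metrics.cpu")) {
            // Only "logs.nginx" is flagged: it targets a child stream of an enabled stream type.
            System.out.println(index + " -> child stream write? " + isChildStreamWrite(index, enabled));
        }
    }
}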