Skip to content

Transport action changes for streams write restrictions #130742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

lukewhiting
Copy link
Contributor

@lukewhiting lukewhiting commented Jul 7, 2025

Very early PR for restricting writes to substreams (<streamName>.*) via the single document, bulk document and reroute pipeline processors.

Discussion to be had with product on if additional restrictions (Such as changing routing in painless scripts) also needs to be restricted and the additional cost of that.

Fixes ES-11941

@lukewhiting lukewhiting changed the title Es 11941 streams logs bulk transport changes Transport action changes for streams write restrictions Jul 7, 2025
@lukewhiting lukewhiting requested a review from Copilot July 7, 2025 16:04
Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR introduces write restrictions for substreams (e.g., <streamName>.*) across single-document indexing, bulk operations, and ingest pipeline reroute processors. It adds a StreamsPermissionsUtils helper for checking and enforcing these restrictions, integrates checks in BulkOperation and RerouteProcessor, and extends tests and REST specs for validating the new behavior.

  • Added StreamType enum and StreamsPermissionsUtils singleton with streamTypeIsEnabled and throwIfRetrouteToSubstreamNotAllowed.
  • Updated BulkOperation and its factory/tests to inject and apply StreamsPermissionsUtils for blocking writes to child streams when enabled.
  • Enhanced RerouteProcessor and its factory/tests to enforce reroute restrictions, and added YAML REST tests under modules/streams.

Reviewed Changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
server/src/main/java/org/elasticsearch/common/streams/StreamType.java Introduces a new enum for stream names (currently only LOGS).
server/src/main/java/org/elasticsearch/common/streams/StreamsPermissionsUtils.java Adds singleton utility to check stream enablement and block reroutes.
server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java Injects and uses StreamsPermissionsUtils to filter bulk substream writes.
server/src/test/java/org/elasticsearch/common/streams/StreamsPermissionsUtilsTests.java Unit tests for new permissions utility.
server/src/test/java/org/elasticsearch/action/bulk/BulkOperationTests.java Extended tests for bulk write restrictions and mocks for new utility.
modules/streams/src/yamlRestTest/resources/rest-api-spec/test/streams/logs/20_substream_restrictions.yml REST YAML tests validating substream write errors.
modules/streams/src/yamlRestTest/java/org/elasticsearch/streams/StreamsYamlTestSuiteIT.java Adjusts test cluster modules to include ingest-common.
modules/streams/build.gradle Exports common.streams package and updates REST resource includes.
modules/ingest-common/src/java/.../RerouteProcessor*.java Updates RerouteProcessor and factory to invoke StreamsPermissionsUtils.

Comment on lines 286 to 305
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(clusterState);

Set<StreamType> enabledStreamTypes = Arrays.stream(StreamType.values())
.filter(t -> streamsPermissionsUtils.streamTypeIsEnabled(t, projectMetadata))
.collect(Collectors.toCollection(() -> EnumSet.noneOf(StreamType.class)));

for (StreamType streamType : enabledStreamTypes) {
for (int i = 0; i < bulkRequest.requests.size(); i++) {
DocWriteRequest<?> req = bulkRequest.requests.get(i);
String prefix = streamType.getStreamName() + ".";
if (req != null && req.index().startsWith(prefix)) {
IllegalArgumentException exception = new IllegalArgumentException(
"Writes to child stream ["
+ req.index()
+ "] are not allowed, use the parent stream instead: ["
+ streamType.getStreamName()
+ "]"
);
IndexDocFailureStoreStatus failureStoreStatus = processFailure(new BulkItemRequest(i, req), projectMetadata, exception);
addFailureAndDiscardRequest(req, i, req.index(), exception, failureStoreStatus);
}
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're going to want to move this logic to somewhere earlier, like in TransportAbstractBulkAction::doRun, just before calling applyPipelinesAndDoInternalExecute. The reason is that at this point in BulkOperation the req.index() will already be the final destination, which could very well be a substream. And we have no history. Take this example:

PUT _ingest/pipeline/pipeline1
{
   "processors" : [
    {
      "reroute" : {
        "destination": "logs.abc"
      }
    }
  ]
}

PUT _ingest/pipeline/pipeline2
{
   "processors" : [
    {
      "reroute" : {
        "destination": "logs.abc.def"
      }
    }
  ]
}

PUT logs
{
    "settings": {
    "index.default_pipeline": "pipeline1",
    "number_of_replicas": 0
  }
}

PUT logs.abc
{
    "settings": {
    "index.default_pipeline": "pipeline2",
    "number_of_replicas": 0
  }
}

PUT logs/_bulk?_source=true
{ "create":{ } }
{ "@timestamp": "2099-05-06T16:21:15.000Z", "message": "192.0.2.42 - - [06/May/2099:16:21:15 +0000] \"GET /images/bg.jpg HTTP/1.0\" 200 24736" }
{ "delete" : { "_id" : "vYQUpJcBmbnJKL4xPLSK" } }

I would expect the first entry to successfully write into logs.abc.def, and the second one to fail (since we don't allow delete). But the first one fails (and so does the 2nd one, but only b/c the entry doesn't exist).

@lukewhiting lukewhiting force-pushed the es-11941-streams-logs-bulk-transport-changes branch from 5b26058 to 5b3f2c6 Compare July 10, 2025 12:43
@lukewhiting lukewhiting force-pushed the es-11941-streams-logs-bulk-transport-changes branch from 5b3f2c6 to 14aefc8 Compare July 14, 2025 15:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants