Skip to content

[Logstash] Move elastic_integration plugin usage to ES logstash-bridge. #131486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
b9cdd5b
Testing painless processors extension.
mashhurs Jul 14, 2025
e3310cd
Test paintless spaial, wildcard and constant keyword extensions.
mashhurs Jul 14, 2025
3a6a795
Fix the bug that when null applied for Map.of() which may cause NPE.
mashhurs Jul 15, 2025
e2dd123
Debug
mashhurs Jul 15, 2025
84f38e3
Debugging: require delegate and its metadata non null to get specific…
mashhurs Jul 16, 2025
78a3394
Debugging: instead of Map.copyOf(), send ingest metadata as it is.
mashhurs Jul 16, 2025
38b25ce
Make metadata version field accessible, introduce inverse wrapper wit…
mashhurs Jul 16, 2025
0e70a6c
RedactPlugin, IngestCommonPlugin and IngestUserAgent plugins moved to…
mashhurs Jul 17, 2025
b7ac030
Ingest common plugin simplification.
mashhurs Jul 17, 2025
0d0acef
[CI] Auto commit changes from spotless
Jul 17, 2025
577e0f2
Open an access for the x-pack spatial module resources.
mashhurs Jul 17, 2025
94aa3e2
[CI] Auto commit changes from spotless
Jul 17, 2025
87774ae
Open access to resources in mapper constant keyword.
mashhurs Jul 18, 2025
bfa2d27
Add an access to resource in x-pack wildcard module.
mashhurs Jul 18, 2025
4b1ef11
Export and open access to sub-package spatial modules.
mashhurs Jul 18, 2025
fb835ba
Provide module service implementations with provides keyword in x-pac…
mashhurs Jul 18, 2025
b4346d3
Export x-pack spatial module packages to make accessible to ES server…
mashhurs Jul 18, 2025
a9981f5
Merge branch 'main' into move-to-bridge-stable-api-investigation
mashhurs Jul 18, 2025
290e82a
spike refactor of logstashbridge stable API
yaauie Jul 21, 2025
ba80d06
Merge pull request #1 from yaauie/stable-api-clarity-external-internal
mashhurs Jul 22, 2025
cd44d2f
Format the recent logstash-bridge changes.
mashhurs Jul 22, 2025
4eee343
Wildcard and mapper constant keyword modules open resource access to …
mashhurs Jul 22, 2025
ede5de5
Merge branch 'main' into move-to-bridge-stable-api-investigation
mashhurs Jul 22, 2025
166d4be
Rename constant-keyword, wildcard, redact and spatial modules in a wa…
mashhurs Jul 23, 2025
c9b3be1
Merge branch 'main' into move-to-bridge-stable-api-investigation
mashhurs Jul 23, 2025
7f84caf
Remove unnecessary constant class in the ingest common plugin bridge.
mashhurs Jul 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion libs/logstash-bridge/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@ dependencies {
compileOnly project(':modules:lang-painless:spi')
compileOnly project(':modules:lang-mustache')
compileOnly project(':modules:ingest-common')
// compileOnly project(':modules:ingest-geoip')
compileOnly project(':modules:ingest-geoip')
compileOnly project(':modules:ingest-user-agent')
compileOnly project(':x-pack:plugin:core')
compileOnly project(':x-pack:plugin:mapper-constant-keyword')
compileOnly project(':x-pack:plugin:redact')
compileOnly project(':x-pack:plugin:spatial')
compileOnly project(':x-pack:plugin:wildcard')
}

tasks.named('forbiddenApisMain').configure {
Expand Down
7 changes: 7 additions & 0 deletions libs/logstash-bridge/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@
requires org.elasticsearch.server;
requires org.elasticsearch.painless;
requires org.elasticsearch.painless.spi;
requires org.elasticsearch.ingest.common;
requires org.elasticsearch.ingest.useragent;
requires org.elasticsearch.mustache;
requires org.elasticsearch.xcontent;
requires org.elasticsearch.xcore;
requires org.elasticsearch.constantkeyword;
requires org.elasticsearch.redact;
requires org.elasticsearch.spatial;
requires org.elasticsearch.wildcard;

exports org.elasticsearch.logstashbridge;
exports org.elasticsearch.logstashbridge.common;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,52 @@
/**
* A {@code StableBridgeAPI} is the stable bridge to an Elasticsearch API, and can produce instances
* from the actual API that they mirror. As part of the LogstashBridge project, these classes are relied
* upon by the "Elastic Integration Filter Plugin" for Logstash and their external shapes mut not change
* upon by the "Elastic Integration Filter Plugin" for Logstash and their external shapes must not change
* without coordination with the maintainers of that project.
*
* @param <T> the actual type of the Elasticsearch API being mirrored
* @param <INTERNAL> the actual type of the Elasticsearch API being mirrored
*/
public interface StableBridgeAPI<T> {
T unwrap();
public interface StableBridgeAPI<INTERNAL> {
INTERNAL toInternal();

static <T> T unwrapNullable(final StableBridgeAPI<T> nullableStableBridgeAPI) {
static <T> T toInternalNullable(final StableBridgeAPI<T> nullableStableBridgeAPI) {
if (Objects.isNull(nullableStableBridgeAPI)) {
return null;
}
return nullableStableBridgeAPI.unwrap();
return nullableStableBridgeAPI.toInternal();
}

static <K, T> Map<K, T> unwrap(final Map<K, ? extends StableBridgeAPI<T>> bridgeMap) {
return bridgeMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().unwrap()));
static <K, T> Map<K, T> toInternal(final Map<K, ? extends StableBridgeAPI<T>> bridgeMap) {
return bridgeMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> e.getValue().toInternal()));
}

static <K, T, B extends StableBridgeAPI<T>> Map<K, B> wrap(final Map<K, T> rawMap, final Function<T, B> wrapFunction) {
return rawMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> wrapFunction.apply(e.getValue())));
static <K, T, B extends StableBridgeAPI<T>> Map<K, B> fromInternal(final Map<K, T> rawMap, final Function<T, B> externalizor) {
return rawMap.entrySet().stream().collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, e -> externalizor.apply(e.getValue())));
}

static <T, B extends StableBridgeAPI<T>> B wrap(final T delegate, final Function<T, B> wrapFunction) {
static <T, B extends StableBridgeAPI<T>> B fromInternal(final T delegate, final Function<T, B> externalizor) {
if (Objects.isNull(delegate)) {
return null;
}
return wrapFunction.apply(delegate);
return externalizor.apply(delegate);
}

abstract class Proxy<T> implements StableBridgeAPI<T> {
protected final T delegate;
/**
* An {@code ProxyInternal<INTERNAL>} is an implementation of {@code StableBridgeAPI<INTERNAL>} that
* proxies calls to a delegate that is an actual {@code INTERNAL}.
*
* @param <INTERNAL>
*/
abstract class ProxyInternal<INTERNAL> implements StableBridgeAPI<INTERNAL> {
protected final INTERNAL internalDelegate;

protected Proxy(final T delegate) {
this.delegate = delegate;
protected ProxyInternal(final INTERNAL internalDelegate) {
this.internalDelegate = internalDelegate;
}

@Override
public T unwrap() {
return delegate;
public INTERNAL toInternal() {
return internalDelegate;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,33 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.logstashbridge.StableBridgeAPI;

public class SettingsBridge extends StableBridgeAPI.Proxy<Settings> {
/**
* An external bridge for {@link Settings}
*/
public class SettingsBridge extends StableBridgeAPI.ProxyInternal<Settings> {

public static SettingsBridge wrap(final Settings delegate) {
public static SettingsBridge fromInternal(final Settings delegate) {
return new SettingsBridge(delegate);
}

public static Builder builder() {
return Builder.wrap(Settings.builder());
return Builder.fromInternal(Settings.builder());
}

public SettingsBridge(final Settings delegate) {
super(delegate);
}

@Override
public Settings unwrap() {
return this.delegate;
public Settings toInternal() {
return this.internalDelegate;
}

public static class Builder extends StableBridgeAPI.Proxy<Settings.Builder> {
static Builder wrap(final Settings.Builder delegate) {
/**
* An external bridge for {@link Settings.Builder} that proxies calls to a real {@link Settings.Builder}
*/
public static class Builder extends StableBridgeAPI.ProxyInternal<Settings.Builder> {
static Builder fromInternal(final Settings.Builder delegate) {
return new Builder(delegate);
}

Expand All @@ -40,12 +46,12 @@ private Builder(final Settings.Builder delegate) {
}

public Builder put(final String key, final String value) {
this.delegate.put(key, value);
this.internalDelegate.put(key, value);
return this;
}

public SettingsBridge build() {
return new SettingsBridge(this.delegate.build());
return new SettingsBridge(this.internalDelegate.build());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@

import java.io.Closeable;

/**
* An external bridge for {@link IOUtils}
*/
public class IOUtilsBridge {
public static void closeWhileHandlingException(final Iterable<? extends Closeable> objects) {
IOUtils.closeWhileHandlingException(objects);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@

import java.nio.file.Path;

public class EnvironmentBridge extends StableBridgeAPI.Proxy<Environment> {
public static EnvironmentBridge wrap(final Environment delegate) {
/**
* An external bridge for {@link Environment}
*/
public class EnvironmentBridge extends StableBridgeAPI.ProxyInternal<Environment> {
public static EnvironmentBridge fromInternal(final Environment delegate) {
return new EnvironmentBridge(delegate);
}

public EnvironmentBridge(final SettingsBridge settingsBridge, final Path configPath) {
this(new Environment(settingsBridge.unwrap(), configPath));
this(new Environment(settingsBridge.toInternal(), configPath));
}

private EnvironmentBridge(final Environment delegate) {
super(delegate);
}

@Override
public Environment unwrap() {
return this.delegate;
public Environment toInternal() {
return this.internalDelegate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

import java.util.Map;

/**
* An external bridge for {@link ConfigurationUtils}
*/
public class ConfigurationUtilsBridge {
public static TemplateScriptBridge.Factory compileTemplate(
final String processorType,
Expand All @@ -23,7 +26,7 @@ public static TemplateScriptBridge.Factory compileTemplate(
final ScriptServiceBridge scriptServiceBridge
) {
return new TemplateScriptBridge.Factory(
ConfigurationUtils.compileTemplate(processorType, processorTag, propertyName, propertyValue, scriptServiceBridge.unwrap())
ConfigurationUtils.compileTemplate(processorType, processorTag, propertyName, propertyValue, scriptServiceBridge.toInternal())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,18 @@
import java.util.Set;
import java.util.function.BiConsumer;

public class IngestDocumentBridge extends StableBridgeAPI.Proxy<IngestDocument> {
/**
* An external bridge for {@link IngestDocument} that proxies calls through a real {@link IngestDocument}
*/
public class IngestDocumentBridge extends StableBridgeAPI.ProxyInternal<IngestDocument> {

public static final class Constants {
public static final String METADATA_VERSION_FIELD_NAME = IngestDocument.Metadata.VERSION.getFieldName();

private Constants() {}
}

public static IngestDocumentBridge wrap(final IngestDocument ingestDocument) {
public static IngestDocumentBridge fromInternalNullable(final IngestDocument ingestDocument) {
if (ingestDocument == null) {
return null;
}
Expand All @@ -36,55 +45,56 @@ private IngestDocumentBridge(IngestDocument inner) {
}

public MetadataBridge getMetadata() {
return new MetadataBridge(delegate.getMetadata());
return new MetadataBridge(internalDelegate.getMetadata());
}

public Map<String, Object> getSource() {
return delegate.getSource();
return internalDelegate.getSource();
}

public boolean updateIndexHistory(final String index) {
return delegate.updateIndexHistory(index);
return internalDelegate.updateIndexHistory(index);
}

public Set<String> getIndexHistory() {
return Set.copyOf(delegate.getIndexHistory());
return Set.copyOf(internalDelegate.getIndexHistory());
}

public boolean isReroute() {
return LogstashInternalBridge.isReroute(delegate);
return LogstashInternalBridge.isReroute(internalDelegate);
}

public void resetReroute() {
LogstashInternalBridge.resetReroute(delegate);
LogstashInternalBridge.resetReroute(internalDelegate);
}

public Map<String, Object> getIngestMetadata() {
return Map.copyOf(delegate.getIngestMetadata());
return internalDelegate.getIngestMetadata();
}

public <T> T getFieldValue(final String fieldName, final Class<T> type) {
return delegate.getFieldValue(fieldName, type);
return internalDelegate.getFieldValue(fieldName, type);
}

public <T> T getFieldValue(final String fieldName, final Class<T> type, final boolean ignoreMissing) {
return delegate.getFieldValue(fieldName, type, ignoreMissing);
return internalDelegate.getFieldValue(fieldName, type, ignoreMissing);
}

public String renderTemplate(final TemplateScriptBridge.Factory templateScriptFactory) {
return delegate.renderTemplate(templateScriptFactory.unwrap());
return internalDelegate.renderTemplate(templateScriptFactory.toInternal());
}

public void setFieldValue(final String path, final Object value) {
delegate.setFieldValue(path, value);
internalDelegate.setFieldValue(path, value);
}

public void removeField(final String path) {
delegate.removeField(path);
internalDelegate.removeField(path);
}

// public void executePipeline(Pipeline pipeline, BiConsumer<IngestDocument, Exception> handler) {
public void executePipeline(final PipelineBridge pipelineBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
this.delegate.executePipeline(pipelineBridge.unwrap(), (unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e));
this.internalDelegate.executePipeline(pipelineBridge.toInternal(), (ingestDocument, e) -> {
handler.accept(IngestDocumentBridge.fromInternalNullable(ingestDocument), e);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
import java.util.Map;
import java.util.function.BiConsumer;

public class PipelineBridge extends StableBridgeAPI.Proxy<Pipeline> {
public static PipelineBridge wrap(final Pipeline pipeline) {
/**
* An external bridge for {@link Pipeline}
*/
public class PipelineBridge extends StableBridgeAPI.ProxyInternal<Pipeline> {
public static PipelineBridge fromInternal(final Pipeline pipeline) {
return new PipelineBridge(pipeline);
}

Expand All @@ -28,12 +31,12 @@ public static PipelineBridge create(
Map<String, ProcessorBridge.Factory> processorFactories,
ScriptServiceBridge scriptServiceBridge
) throws Exception {
return wrap(
return fromInternal(
Pipeline.create(
id,
config,
StableBridgeAPI.unwrap(processorFactories),
StableBridgeAPI.unwrapNullable(scriptServiceBridge),
StableBridgeAPI.toInternal(processorFactories),
StableBridgeAPI.toInternalNullable(scriptServiceBridge),
null
)
);
Expand All @@ -44,13 +47,13 @@ public PipelineBridge(final Pipeline delegate) {
}

public String getId() {
return delegate.getId();
return internalDelegate.getId();
}

public void execute(final IngestDocumentBridge ingestDocumentBridge, final BiConsumer<IngestDocumentBridge, Exception> handler) {
this.delegate.execute(
StableBridgeAPI.unwrapNullable(ingestDocumentBridge),
(unwrapped, e) -> handler.accept(IngestDocumentBridge.wrap(unwrapped), e)
this.internalDelegate.execute(
StableBridgeAPI.toInternalNullable(ingestDocumentBridge),
(ingestDocument, e) -> handler.accept(IngestDocumentBridge.fromInternalNullable(ingestDocument), e)
);
}
}
Loading