diff --git a/dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java b/dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java index 2432469949c..fed7bfe9d77 100644 --- a/dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java +++ b/dd-java-agent/agent-bootstrap/src/jmh/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStreamBenchmark.java @@ -21,6 +21,11 @@ import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.Warmup; +/* + * Benchmark Mode Cnt Score Error Units + * InjectingPipeOutputStreamBenchmark.withPipe avgt 2 16.802 us/op + * InjectingPipeOutputStreamBenchmark.withoutPipe avgt 2 13.001 us/op + */ @State(Scope.Benchmark) @Warmup(iterations = 1, time = 30, timeUnit = SECONDS) @Measurement(iterations = 2, time = 30, timeUnit = SECONDS) diff --git a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java index ea3322103f7..18d135d77ad 100644 --- a/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java +++ b/dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/instrumentation/buffer/InjectingPipeOutputStream.java @@ -3,11 +3,10 @@ import java.io.FilterOutputStream; import java.io.IOException; import java.io.OutputStream; -import java.util.function.Consumer; /** - * A circular buffer that holds n+1 bytes and with a lookbehind buffer of n bytes. The first time - * that the latest n bytes matches the marker, a content is injected before. + * A circular buffer with a lookbehind buffer of n bytes. The first time that the latest n bytes + * matches the marker, a content is injected before. */ public class InjectingPipeOutputStream extends FilterOutputStream { private final byte[] lookbehind; @@ -17,7 +16,7 @@ public class InjectingPipeOutputStream extends FilterOutputStream { private final byte[] contentToInject; private boolean found = false; private int matchingPos = 0; - private final Consumer onContentInjected; + private final Runnable onContentInjected; /** * @param downstream the delegate output stream @@ -29,7 +28,7 @@ public InjectingPipeOutputStream( final OutputStream downstream, final byte[] marker, final byte[] contentToInject, - final Consumer onContentInjected) { + final Runnable onContentInjected) { super(downstream); this.marker = marker; this.lookbehind = new byte[marker.length + 1]; @@ -44,47 +43,108 @@ public void write(int b) throws IOException { out.write(b); return; } + + if (bufferFilled) { + out.write(lookbehind[pos]); + } + lookbehind[pos] = (byte) b; pos = (pos + 1) % lookbehind.length; + if (!bufferFilled) { + bufferFilled = pos == 0; + } + if (marker[matchingPos++] == b) { if (matchingPos == marker.length) { found = true; + out.write(lookbehind[pos]); + pos = (pos + 1) % lookbehind.length; out.write(contentToInject); if (onContentInjected != null) { - onContentInjected.accept(null); + onContentInjected.run(); } - drain((pos + 1) % lookbehind.length, marker.length); - return; + drain(lookbehind.length - 1); } } else { matchingPos = 0; } + } - if (!bufferFilled) { - bufferFilled = pos == lookbehind.length - 1; + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (found) { + out.write(b, off, len); + return; + } + if (len > marker.length * 2) { + int idx = arrayContains(b, marker); + if (idx >= 0) { + // we have a full match. just write everything + found = true; + drain(lookbehind.length); + out.write(b, off, idx); + out.write(contentToInject); + if (onContentInjected != null) { + onContentInjected.run(); + } + out.write(b, off + idx, len - idx); + } else { + // we don't have a full match. write everything in a bulk except the lookbehind buffer + // sequentially + for (int i = off; i < off + marker.length; i++) { + write(b[i]); + } + drain(lookbehind.length); + out.write(b, off + marker.length, len - marker.length * 2); + for (int i = len - marker.length; i < len; i++) { + write(b[i]); + } + drain(lookbehind.length); + } + } else { + // use slow path because the length to write is small and within the lookbehind buffer size + super.write(b, off, len); } + } - if (bufferFilled) { - super.write(lookbehind[(pos + 1) % lookbehind.length]); + private int arrayContains(byte[] array, byte[] search) { + for (int i = 0; i < array.length - search.length; i++) { + if (array[i] == search[0]) { + boolean found = true; + int k = i; + for (int j = 1; j < search.length; j++) { + k++; + if (array[k] != search[j]) { + found = false; + break; + } + } + if (found) { + return i; + } + } } + return -1; } - private void drain(int from, int size) throws IOException { - while (size-- > 0) { - super.write(Character.valueOf((char) lookbehind[from])); - from = (from + 1) % lookbehind.length; + private void drain(int len) throws IOException { + if (bufferFilled) { + for (int i = 0; i < len; i++) { + out.write(lookbehind[(pos + i) % lookbehind.length]); + } + } else { + out.write(this.lookbehind, 0, pos); } + pos = 0; + matchingPos = 0; + bufferFilled = false; } @Override public void close() throws IOException { if (!found) { - if (bufferFilled) { - drain((pos + 2) % lookbehind.length, marker.length - 1); - } else { - drain(0, pos); - } + drain(lookbehind.length); } super.close(); } diff --git a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java index 826b38355d1..8d8aa1d6971 100644 --- a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java +++ b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/RumHttpServletResponseWrapper.java @@ -70,7 +70,7 @@ public void resetBuffer() { super.resetBuffer(); } - public void onInjected(Void ignored) { + public void onInjected() { try { setHeader("x-datadog-rum-injected", "1"); } catch (Throwable ignored2) { diff --git a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/WrappedServletOutputStream.java b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/WrappedServletOutputStream.java index 32323a1cfff..31635a886e0 100644 --- a/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/WrappedServletOutputStream.java +++ b/dd-java-agent/instrumentation/servlet/request-3/src/main/java/datadog/trace/instrumentation/servlet3/WrappedServletOutputStream.java @@ -5,7 +5,6 @@ import java.io.IOException; import java.io.OutputStream; import java.lang.invoke.MethodHandle; -import java.util.function.Consumer; import javax.servlet.ServletOutputStream; import javax.servlet.WriteListener; @@ -26,10 +25,7 @@ private static final MethodHandle getMh(final String name) { } public WrappedServletOutputStream( - ServletOutputStream delegate, - byte[] marker, - byte[] contentToInject, - Consumer onInjected) { + ServletOutputStream delegate, byte[] marker, byte[] contentToInject, Runnable onInjected) { this.filtered = new InjectingPipeOutputStream(delegate, marker, contentToInject, onInjected); this.delegate = delegate; }