Skip to content

Improve rum injection output stream #9107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

/*
* Benchmark Mode Cnt Score Error Units
* InjectingPipeOutputStreamBenchmark.withPipe avgt 2 16.802 us/op
* InjectingPipeOutputStreamBenchmark.withoutPipe avgt 2 13.001 us/op
*/
@State(Scope.Benchmark)
@Warmup(iterations = 1, time = 30, timeUnit = SECONDS)
@Measurement(iterations = 2, time = 30, timeUnit = SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Consumer;

/**
* A circular buffer that holds n+1 bytes and with a lookbehind buffer of n bytes. The first time
* that the latest n bytes matches the marker, a content is injected before.
* A circular buffer with a lookbehind buffer of n bytes. The first time that the latest n bytes
* matches the marker, a content is injected before.
*/
public class InjectingPipeOutputStream extends FilterOutputStream {
private final byte[] lookbehind;
Expand All @@ -17,7 +16,7 @@ public class InjectingPipeOutputStream extends FilterOutputStream {
private final byte[] contentToInject;
private boolean found = false;
private int matchingPos = 0;
private final Consumer<Void> onContentInjected;
private final Runnable onContentInjected;

/**
* @param downstream the delegate output stream
Expand All @@ -29,7 +28,7 @@ public InjectingPipeOutputStream(
final OutputStream downstream,
final byte[] marker,
final byte[] contentToInject,
final Consumer<Void> onContentInjected) {
final Runnable onContentInjected) {
super(downstream);
this.marker = marker;
this.lookbehind = new byte[marker.length + 1];
Expand All @@ -44,47 +43,108 @@ public void write(int b) throws IOException {
out.write(b);
return;
}

if (bufferFilled) {
out.write(lookbehind[pos]);
}

lookbehind[pos] = (byte) b;
pos = (pos + 1) % lookbehind.length;

if (!bufferFilled) {
bufferFilled = pos == 0;
}

if (marker[matchingPos++] == b) {
if (matchingPos == marker.length) {
found = true;
out.write(lookbehind[pos]);
pos = (pos + 1) % lookbehind.length;
out.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.accept(null);
onContentInjected.run();
}
drain((pos + 1) % lookbehind.length, marker.length);
return;
drain(lookbehind.length - 1);
}
} else {
matchingPos = 0;
}
}

if (!bufferFilled) {
bufferFilled = pos == lookbehind.length - 1;
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (found) {
out.write(b, off, len);
return;
}
if (len > marker.length * 2) {
int idx = arrayContains(b, marker);
if (idx >= 0) {
// we have a full match. just write everything
found = true;
drain(lookbehind.length);
out.write(b, off, idx);
out.write(contentToInject);
if (onContentInjected != null) {
onContentInjected.run();
}
out.write(b, off + idx, len - idx);
} else {
// we don't have a full match. write everything in a bulk except the lookbehind buffer
// sequentially
for (int i = off; i < off + marker.length; i++) {
write(b[i]);
}
drain(lookbehind.length);
out.write(b, off + marker.length, len - marker.length * 2);
for (int i = len - marker.length; i < len; i++) {
write(b[i]);
}
drain(lookbehind.length);
}
} else {
// use slow path because the length to write is small and within the lookbehind buffer size
super.write(b, off, len);
}
}

if (bufferFilled) {
super.write(lookbehind[(pos + 1) % lookbehind.length]);
private int arrayContains(byte[] array, byte[] search) {
for (int i = 0; i < array.length - search.length; i++) {
if (array[i] == search[0]) {
boolean found = true;
int k = i;
for (int j = 1; j < search.length; j++) {
k++;
if (array[k] != search[j]) {
found = false;
break;
}
}
if (found) {
return i;
}
}
}
return -1;
}

private void drain(int from, int size) throws IOException {
while (size-- > 0) {
super.write(Character.valueOf((char) lookbehind[from]));
from = (from + 1) % lookbehind.length;
private void drain(int len) throws IOException {
if (bufferFilled) {
for (int i = 0; i < len; i++) {
out.write(lookbehind[(pos + i) % lookbehind.length]);
}
} else {
out.write(this.lookbehind, 0, pos);
}
pos = 0;
matchingPos = 0;
bufferFilled = false;
}

@Override
public void close() throws IOException {
if (!found) {
if (bufferFilled) {
drain((pos + 2) % lookbehind.length, marker.length - 1);
} else {
drain(0, pos);
}
drain(lookbehind.length);
}
super.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public void resetBuffer() {
super.resetBuffer();
}

public void onInjected(Void ignored) {
public void onInjected() {
try {
setHeader("x-datadog-rum-injected", "1");
} catch (Throwable ignored2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.io.IOException;
import java.io.OutputStream;
import java.lang.invoke.MethodHandle;
import java.util.function.Consumer;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;

Expand All @@ -26,10 +25,7 @@ private static final MethodHandle getMh(final String name) {
}

public WrappedServletOutputStream(
ServletOutputStream delegate,
byte[] marker,
byte[] contentToInject,
Consumer<Void> onInjected) {
ServletOutputStream delegate, byte[] marker, byte[] contentToInject, Runnable onInjected) {
this.filtered = new InjectingPipeOutputStream(delegate, marker, contentToInject, onInjected);
this.delegate = delegate;
}
Expand Down