Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@
import io.helidon.http.Headers;
import io.helidon.http.WritableHeaders;
import io.helidon.http.http2.Http2FrameData;
import io.helidon.http.http2.Http2FrameHeader;
import io.helidon.http.http2.Http2Headers;
import io.helidon.http.http2.Http2Ping;
import io.helidon.http.http2.Http2StreamState;
import io.helidon.webclient.http2.Http2ClientStream;
import io.helidon.webclient.http2.StreamTimeoutException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -184,58 +180,6 @@ private boolean isStreamOpen() {
&& clientStream.streamState() != Http2StreamState.CLOSED;
}

/**
* Send a ping to the server.
* <p>
* Do NOT use Http2ClientStream.sendPing()! It works once. A second ping results in sending garbage frames
* to the server (indirectly), and the server closes the connection. The exact cause is still unknown, but it may
* be related to the usage of this connection's flowControl object for sending the pings which may
* interfere with the regular data transfers occurring via this same connection concurrently with the ping.
* Another reason for this may be the fact that it uses a static HTTP2_PING object for sending pings, but
* it never rewind()'s the buffer that holds the ping payload, so the server may read bytes from a subsequent
* regular data frame and interpret them as the ping payload, which should break the HTTP2 connection as a whole.
* <p>
* There's Http2ClientConnection.ping() method that explicitly uses the FlowControl.Outbound.NOOP for sending
* new ping objects. However, that method is package-private.
* <p>
* So we implement our own sendPing() here that uses new Http2Ping objects and doesn't use the flowControl.
* <p>
* NOTE: Http2ClientStream methods use an Http2ConnectionWriter object via Http2ClientConnection.writer()
* to write data, and it's a wrapper around the ClientConnection's DataWriter object.
* And the Http2ConnectionWriter has some additional synchronization around DataWriter.write() calls.
* However, ironically, it doesn't synchronize access to the flowControl object. Regardless, there's no public
* methods to obtain a reference to the Http2ConnectionWriter or its internal lock. So we have to write
* to the ClientConnection's DataWriter object directly. Stress-testing hasn't revealed any thread-races so far.
* <p>
* It's difficult to imagine a situation where the thread-race could occur. Perhaps a single PbjGrpcClient
* (aka a single HTTP2 connection) and two streaming PbjGrpcCalls (aka HTTP2 streams) open concurrently,
* one being very chatty and another one being very silent. The latter may start sending pings while the former
* is sending requests to the server. However, this scenario seems very rare. If we ever encounter this issue,
* then it's easy to work-around by creating separate PbjGrpcClients for the two calls on the client side.
* To fix it, ideally we'd work with Helidon to expose the necessary APIs for synchronous writes. Alternatively,
* we could introduce a PbjGrpcClient-level outgoing queue and send all requests and pings through it as
* a work-around. However, this work-around may not fully cover the issue because Helidon can write window update
* frames for the flowControl changes concurrently still as it reads data from the stream/socket.
*/
private void sendPing() {
final Http2Ping ping = Http2Ping.create();
final Http2FrameData frameData = ping.toFrameData();
final Http2FrameHeader frameHeader = frameData.header();
if (frameHeader.length() == 0) {
throw new IllegalStateException("Ping with zero length. This should never happen.");
} else {
final BufferData headerData = frameHeader.write();
final BufferData data = frameData.data().copy();
try {
grpcClient.getClientConnection().writer().writeNow(BufferData.create(headerData, data));
} catch (IllegalStateException e) {
// It may throw IllegalStateException: Attempt to call writer() on a closed connection
// But callers usually expect an UncheckedIOException:
throw new UncheckedIOException(new IOException("sendPing failed", e));
}
}
}

private void receiveRepliesLoop() {
try {
Http2Headers http2Headers = null;
Expand All @@ -251,7 +195,7 @@ private void receiveRepliesLoop() {
// if the server died.
// FUTURE WORK: consider a separate KeepAlive timeout for these pings, so that we don't flood the
// network.
sendPing();
clientStream.sendPing();
}
} while (http2Headers == null && isStreamOpen());

Expand All @@ -269,7 +213,7 @@ private void receiveRepliesLoop() {
frameData = clientStream.readOne(grpcClient.getConfig().readTimeout());
} catch (StreamTimeoutException e) {
// Check if the connection is alive. See a comment above about the KeepAlive timeout.
sendPing();
clientStream.sendPing();
// FUTURE WORK: implement an uber timeout to return
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,7 @@ public void testReceiveRepliesLoopPingAndStreamClosed() {

runnable.run();

// A ping:
verify(dataWriter, times(1)).writeNow(any(BufferData.class));
verify(grpcClientStream, times(1)).sendPing();
verify(pipeline, times(1)).onComplete();
verifyNoMoreInteractions(pipeline);
}
Expand Down Expand Up @@ -292,7 +291,7 @@ public void testReceiveRepliesLoopSingleReply(final boolean isTimeout) throws Ex
verify(pipeline, times(1)).onNext(reply);
verify(pipeline, times(1)).onComplete();
// A ping:
if (isTimeout) verify(dataWriter, times(1)).writeNow(any(BufferData.class));
if (isTimeout) verify(grpcClientStream, times(1)).sendPing();
verifyNoMoreInteractions(pipeline);
}

Expand Down
Loading