Skip to content

8358764: (sc) SocketChannel.close when thread blocked in read causes connection to be reset (win) #25700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/java.base/share/classes/sun/nio/ch/Net.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,15 @@ static boolean useExclusiveBind() {
return EXCLUSIVE_BIND;
}

private static final StableValue<Boolean> SHUTDOWN_WRITE_BEFORE_CLOSE = StableValue.of();

/**
* Tells whether a TCP connection should be shutdown for writing before closing.
*/
static boolean shouldShutdownWriteBeforeClose() {
return SHUTDOWN_WRITE_BEFORE_CLOSE.orElseSet(Net::shouldShutdownWriteBeforeClose0);
}

/**
* Tells whether both IPV6_XXX and IP_XXX socket options should be set on
* IPv6 sockets. On some kernels, both IPV6_XXX and IP_XXX socket options
Expand Down Expand Up @@ -462,6 +471,8 @@ private static boolean isFastTcpLoopbackRequested() {
*/
private static native int isExclusiveBindAvailable();

private static native boolean shouldShutdownWriteBeforeClose0();

private static native boolean shouldSetBothIPv4AndIPv6Options0();

private static native boolean canIPv6SocketJoinIPv4Group0();
Expand Down
42 changes: 26 additions & 16 deletions src/java.base/share/classes/sun/nio/ch/SocketChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ public boolean isConnectionPending() {
/**
* Marks the beginning of a connect operation that might block.
* @param blocking true if configured blocking
* @param isa the remote address
* @param sa the remote socket address
* @throws ClosedChannelException if the channel is closed
* @throws AlreadyConnectedException if already connected
* @throws ConnectionPendingException is a connection is pending
Expand Down Expand Up @@ -1070,8 +1070,8 @@ public boolean finishConnect() throws IOException {
}

/**
* Closes the socket if there are no I/O operations in progress and the
* channel is not registered with a Selector.
* Closes the socket if there are no I/O operations in progress (or no I/O
* operations tracked), and the channel is not registered with a Selector.
*/
private boolean tryClose() throws IOException {
assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
Expand All @@ -1096,11 +1096,21 @@ private void tryFinishClose() {
}

/**
* Closes this channel when configured in blocking mode.
* Closes this channel when configured in blocking mode. If there are no I/O
* operations in progress (or tracked), then the channel's socket is closed. If
* there are I/O operations in progress then the behavior is platform specific.
*
* If there is an I/O operation in progress then the socket is pre-closed
* and the I/O threads signalled, in which case the final close is deferred
* until all I/O operations complete.
* On Unix systems, the channel's socket is pre-closed. This unparks any virtual
* threads that are blocked in I/O operations on this channel. If there are
* platform threads blocked on the channel's socket then the socket is dup'ed
* and the platform threads signalled. The final close is deferred until all I/O
* operations complete.
*
* On Windows, the channel's socket is pre-closed. This unparks any virtual
* threads that are blocked in I/O operations on this channel. If there are no
* virtual threads blocked in I/O operations on this channel then the channel's
* socket is closed. If there are virtual threads in I/O then the final close is
* deferred until all I/O operations on virtual threads complete.
*
* Note that a channel configured blocking may be registered with a Selector
* This arises when a key is canceled and the channel configured to blocking
Expand All @@ -1112,17 +1122,17 @@ private void implCloseBlockingMode() throws IOException {
boolean connected = (state == ST_CONNECTED);
state = ST_CLOSING;

if (!tryClose()) {
if (connected && Net.shouldShutdownWriteBeforeClose()) {
// shutdown output when linger interval not set to 0
if (connected) {
try {
var SO_LINGER = StandardSocketOptions.SO_LINGER;
if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
Net.shutdown(fd, Net.SHUT_WR);
}
} catch (IOException ignore) { }
}
try {
var SO_LINGER = StandardSocketOptions.SO_LINGER;
if ((int) Net.getSocketOption(fd, SO_LINGER) != 0) {
Net.shutdown(fd, Net.SHUT_WR);
}
} catch (IOException ignore) { }
}

if (!tryClose()) {
// prepare file descriptor for closing
nd.preClose(fd, readerThread, writerThread);
}
Expand Down
5 changes: 5 additions & 0 deletions src/java.base/unix/native/libnio/ch/Net.c
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ Java_sun_nio_ch_Net_isExclusiveBindAvailable(JNIEnv *env, jclass clazz) {
return -1;
}

JNIEXPORT jboolean JNICALL
Java_sun_nio_ch_Net_shouldShutdownWriteBeforeClose0(JNIEnv *env, jclass clazz) {
return JNI_FALSE;
}

JNIEXPORT jboolean JNICALL
Java_sun_nio_ch_Net_shouldSetBothIPv4AndIPv6Options0(JNIEnv* env, jclass cl)
{
Expand Down
7 changes: 6 additions & 1 deletion src/java.base/windows/native/libnio/ch/Net.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2001, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2001, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -117,6 +117,11 @@ Java_sun_nio_ch_Net_isExclusiveBindAvailable(JNIEnv *env, jclass clazz) {
return 1;
}

JNIEXPORT jboolean JNICALL
Java_sun_nio_ch_Net_shouldShutdownWriteBeforeClose0(JNIEnv *env, jclass clazz) {
return JNI_TRUE;
}

JNIEXPORT jboolean JNICALL
Java_sun_nio_ch_Net_shouldSetBothIPv4AndIPv6Options0(JNIEnv* env, jclass cl)
{
Expand Down
195 changes: 195 additions & 0 deletions test/jdk/java/nio/channels/SocketChannel/PeerReadsAfterAsyncClose.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright (c) 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/

/*
* @test
* @bug 8358764
* @summary Test closing a socket while a thread is blocked in read. The connection
* should be closed gracefuly so that the peer reads EOF.
* @run junit PeerReadsAfterAsyncClose
*/

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static org.junit.jupiter.api.Assertions.*;

class PeerReadsAfterAsyncClose {

static Stream<ThreadFactory> factories() {
return Stream.of(Thread.ofPlatform().factory(), Thread.ofVirtual().factory());
}

/**
* Close SocketChannel while a thread is blocked reading from the channel's socket.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseDuringSocketChannelRead(ThreadFactory factory) throws Exception {
var loopback = InetAddress.getLoopbackAddress();
try (var listener = new ServerSocket()) {
listener.bind(new InetSocketAddress(loopback, 0));

try (SocketChannel sc = SocketChannel.open(listener.getLocalSocketAddress());
Socket peer = listener.accept()) {

// start thread to read from channel
var cceThrown = new AtomicBoolean();
Thread thread = factory.newThread(() -> {
try {
sc.read(ByteBuffer.allocate(1));
fail();
} catch (ClosedChannelException e) {
cceThrown.set(true);
} catch (Throwable e) {
e.printStackTrace();
}
});
thread.start();
try {
// close SocketChannel when thread sampled in implRead
onReach(thread, "sun.nio.ch.SocketChannelImpl.implRead", () -> {
try {
sc.close();
} catch (IOException ignore) { }
});

// peer should read EOF
int n = peer.getInputStream().read();
assertEquals(-1, n);
} finally {
thread.join();
}
assertEquals(true, cceThrown.get(), "ClosedChannelException not thrown");
}
}
}

/**
* Close Socket while a thread is blocked reading from the socket.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseDuringSocketUntimedRead(ThreadFactory factory) throws Exception {
testCloseDuringSocketRead(factory, 0);
}

/**
* Close Socket while a thread is blocked reading from the socket with a timeout.
*/
@ParameterizedTest
@MethodSource("factories")
void testCloseDuringSockeTimedRead(ThreadFactory factory) throws Exception {
testCloseDuringSocketRead(factory, 60_000);
}

private void testCloseDuringSocketRead(ThreadFactory factory, int timeout) throws Exception {
var loopback = InetAddress.getLoopbackAddress();
try (var listener = new ServerSocket()) {
listener.bind(new InetSocketAddress(loopback, 0));

try (Socket s = new Socket(loopback, listener.getLocalPort());
Socket peer = listener.accept()) {

// start thread to read from socket
var seThrown = new AtomicBoolean();
Thread thread = factory.newThread(() -> {
try {
s.setSoTimeout(timeout);
s.getInputStream().read();
fail();
} catch (SocketException e) {
seThrown.set(true);
} catch (Throwable e) {
e.printStackTrace();
}
});
thread.start();
try {
// close Socket when thread sampled in implRead
onReach(thread, "sun.nio.ch.NioSocketImpl.implRead", () -> {
try {
s.close();
} catch (IOException ignore) { }
});

// peer should read EOF
int n = peer.getInputStream().read();
assertEquals(-1, n);
} finally {
thread.join();
}
assertEquals(true, seThrown.get(), "SocketException not thrown");
}
}
}

/**
* Runs the given action when the given target thread is sampled at the given
* location. The location takes the form "{@code c.m}" where
* {@code c} is the fully qualified class name and {@code m} is the method name.
*/
private void onReach(Thread target, String location, Runnable action) {
int index = location.lastIndexOf('.');
String className = location.substring(0, index);
String methodName = location.substring(index + 1);
Thread.ofPlatform().daemon(true).start(() -> {
try {
boolean found = false;
while (!found) {
found = contains(target.getStackTrace(), className, methodName);
if (!found) {
Thread.sleep(20);
}
}
action.run();
} catch (Exception e) {
e.printStackTrace();
}
});
}

/**
* Returns true if the given stack trace contains an element for the given class
* and method name.
*/
private boolean contains(StackTraceElement[] stack, String className, String methodName) {
return Arrays.stream(stack)
.anyMatch(e -> className.equals(e.getClassName())
&& methodName.equals(e.getMethodName()));
}
}