2525import io .netty .channel .nio .NioEventLoopGroup ;
2626import io .netty .channel .socket .nio .NioServerSocketChannel ;
2727import io .netty .channel .socket .nio .NioSocketChannel ;
28+ import io .netty .util .concurrent .Future ;
2829import io .netty .util .concurrent .GlobalEventExecutor ;
2930import org .slf4j .Logger ;
3031import org .slf4j .LoggerFactory ;
7879import org .terasology .reflection .metadata .ClassLibrary ;
7980import org .terasology .reflection .metadata .ClassMetadata ;
8081import org .terasology .reflection .metadata .FieldMetadata ;
82+ import reactor .core .publisher .Flux ;
83+ import reactor .core .publisher .Mono ;
84+ import reactor .netty .FutureMono ;
8185
8286import java .net .BindException ;
8387import java .net .InetSocketAddress ;
88+ import java .time .Duration ;
89+ import java .util .ArrayList ;
8490import java .util .Collection ;
8591import java .util .List ;
8692import java .util .Map ;
8793import java .util .Optional ;
8894import java .util .Set ;
8995import java .util .concurrent .BlockingQueue ;
96+ import java .util .concurrent .TimeUnit ;
9097
9198import static org .terasology .engine .registry .InjectionHelper .createWithConstructorInjection ;
9299
95102 * Implementation of the Network System using Netty and TCP/IP
96103 */
97104public class NetworkSystemImpl implements EntityChangeSubscriber , NetworkSystem {
105+ public static int shutdownQuietMs = 2_000 ;
106+ public static int shutdownTimeoutMs = 15_000 ;
107+
98108 private static final Logger logger = LoggerFactory .getLogger (NetworkSystemImpl .class );
99109 private static final int OWNER_DEPTH_LIMIT = 50 ;
100110 private static final int NET_TICK_RATE = 50 ;
101111 private static final int NULL_NET_ID = 0 ;
112+
102113 private final Set <Client > clientList = Sets .newLinkedHashSet ();
103114 private final Set <NetClient > netClientList = Sets .newLinkedHashSet ();
104115 // Shared
@@ -231,7 +242,8 @@ public JoinStatus join(String address, int port) throws InterruptedException {
231242 } else {
232243 logger .warn ("Failed to connect to server" , connectCheck .cause ());
233244 connectCheck .channel ().closeFuture ().awaitUninterruptibly ();
234- clientGroup .shutdownGracefully ().syncUninterruptibly ();
245+ clientGroup .shutdownGracefully (shutdownQuietMs , shutdownTimeoutMs , TimeUnit .MILLISECONDS )
246+ .syncUninterruptibly ();
235247 return new JoinStatusImpl ("Failed to connect to server - " + connectCheck .cause ().getMessage ());
236248 }
237249 }
@@ -251,24 +263,28 @@ public JoinStatus join(String address, int port) throws InterruptedException {
251263 @ Override
252264 public void shutdown () {
253265 allChannels .close ().awaitUninterruptibly ();
266+ List <Future <?>> shutdowns = new ArrayList <>(3 );
254267 if (serverChannelFuture != null ) {
255- serverChannelFuture .channel ().closeFuture ();
256268 // Wait until all threads are terminated.
257- try {
258- bossGroup .shutdownGracefully ().sync ();
259- workerGroup .shutdownGracefully ().sync ();
260- bossGroup .terminationFuture ().sync ();
261- workerGroup .terminationFuture ().sync ();
262- } catch (InterruptedException e ) {
263- logger .error ("Cannot terminateFuture - interrupted" );
264- throw new RuntimeException (e );
265- }
266-
269+ shutdowns .add (bossGroup .shutdownGracefully (shutdownQuietMs , shutdownTimeoutMs , TimeUnit .MILLISECONDS ));
270+ shutdowns .add (workerGroup .shutdownGracefully (shutdownQuietMs , shutdownTimeoutMs , TimeUnit .MILLISECONDS ));
267271 }
268272 if (clientGroup != null ) {
269- clientGroup .shutdownGracefully (). syncUninterruptibly ( );
273+ shutdowns . add ( clientGroup .shutdownGracefully (shutdownQuietMs , shutdownTimeoutMs , TimeUnit . MILLISECONDS ) );
270274 }
275+
271276 // Shut down all event loops to terminate all threads.
277+ // I want their timeouts to count in parallel, instead of blocking on one after the other,
278+ // but turning the netty Future into something we can do this with is a bit of a mess until
279+ // we switch to using reactor-netty consistently.
280+ Mono .whenDelayError (
281+ Flux .fromIterable (shutdowns )
282+ .map (x -> {
283+ @ SuppressWarnings ("unchecked" ) Future <Void > f = (Future <Void >) x ;
284+ return FutureMono .from (f );
285+ })
286+ .collectList ()
287+ ).block (Duration .ofMillis (shutdownTimeoutMs ));
272288
273289 processPendingDisconnects ();
274290 clientList .forEach (this ::processRemovedClient );
0 commit comments