Skip to content

Commit b2ceb1a

Browse files
authored
Tuning futures used for internal tracking (#1216)
1 parent 0e4f39c commit b2ceb1a

File tree

1 file changed

+25
-9
lines changed

1 file changed

+25
-9
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,9 +1143,11 @@ String getResponseToken(String responseInbox) {
11431143

11441144
void cleanResponses(boolean closing) {
11451145
ArrayList<String> toRemove = new ArrayList<>();
1146+
boolean wasInterrupted = false;
11461147

1147-
responsesAwaiting.forEach((key, future) -> {
1148+
for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesAwaiting.entrySet()) {
11481149
boolean remove = false;
1150+
NatsRequestCompletableFuture future = entry.getValue();
11491151
if (future.hasExceededTimeout()) {
11501152
remove = true;
11511153
future.cancelTimedOut();
@@ -1158,25 +1160,39 @@ else if (future.isDone()) {
11581160
// done should have already been removed, not sure if
11591161
// this even needs checking, but it won't hurt
11601162
remove = true;
1163+
try {
1164+
future.get();
1165+
}
1166+
catch (InterruptedException e) {
1167+
Thread.currentThread().interrupt();
1168+
// we might have collected some entries already, but were interrupted
1169+
// break out so we finish as quick as possible
1170+
// cleanResponses will be called again anyway
1171+
wasInterrupted = true;
1172+
break;
1173+
}
1174+
catch (ExecutionException ignore) {}
11611175
}
11621176

11631177
if (remove) {
1164-
toRemove.add(key);
1178+
toRemove.add(entry.getKey());
11651179
statistics.decrementOutstandingRequests();
11661180
}
1167-
});
1181+
}
11681182

1169-
for (String token : toRemove) {
1170-
responsesAwaiting.remove(token);
1183+
for (String key : toRemove) {
1184+
responsesAwaiting.remove(key);
11711185
}
11721186

1173-
if (advancedTracking) {
1187+
if (advancedTracking && !wasInterrupted) {
11741188
toRemove.clear(); // just reuse this
1175-
responsesRespondedTo.forEach((key, future) -> {
1189+
for (Map.Entry<String, NatsRequestCompletableFuture> entry : responsesRespondedTo.entrySet()) {
1190+
NatsRequestCompletableFuture future = entry.getValue();
11761191
if (future.hasExceededTimeout()) {
1177-
toRemove.add(key);
1192+
toRemove.add(entry.getKey());
1193+
future.cancelTimedOut();
11781194
}
1179-
});
1195+
}
11801196

11811197
for (String token : toRemove) {
11821198
responsesRespondedTo.remove(token);

0 commit comments

Comments
 (0)