Skip to content

Commit 2e52e89

Browse files
authored
Minor refactoring and regression test change. (#1204)
* Tuning to improve memory use. Trying to address super high volume consuming. * Better tests * fixed test to not run against before 2.9 * fixed failed merge * fixed failed merge
1 parent 93ad1c2 commit 2e52e89

File tree

7 files changed

+81
-95
lines changed

7 files changed

+81
-95
lines changed

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class NatsConnection implements Connection {
101101

102102
private final ServerPool serverPool;
103103
private final DispatcherFactory dispatcherFactory;
104-
private final CancelAction cancelAction;
104+
final CancelAction cancelAction;
105105

106106
private final boolean trace;
107107
private final TimeTraceLogger timeTraceLogger;
@@ -970,9 +970,9 @@ public void publish(Message message) {
970970
publishInternal(message.getSubject(), message.getReplyTo(), message.getHeaders(), message.getData(), false);
971971
}
972972

973-
void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubRep) {
973+
void publishInternal(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo) {
974974
checkPayloadSize(data);
975-
NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubRep);
975+
NatsPublishableMessage npm = new NatsPublishableMessage(subject, replyTo, headers, data, validateSubjectAndReplyTo);
976976
if (npm.hasHeaders && !serverInfo.get().isHeadersSupported()) {
977977
throw new IllegalArgumentException("Headers are not supported by the server, version: " + serverInfo.get().getVersion());
978978
}
@@ -1211,8 +1211,8 @@ public Message request(Message message, Duration timeout) throws InterruptedExce
12111211
return requestInternal(message.getSubject(), message.getHeaders(), message.getData(), timeout, cancelAction, false);
12121212
}
12131213

1214-
Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubRep) throws InterruptedException {
1215-
CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubRep);
1214+
Message requestInternal(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) throws InterruptedException {
1215+
CompletableFuture<Message> incoming = requestFutureInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo);
12161216
try {
12171217
return incoming.get(timeout.toNanos(), TimeUnit.NANOSECONDS);
12181218
} catch (TimeoutException | ExecutionException | CancellationException e) {
@@ -1270,7 +1270,7 @@ public CompletableFuture<Message> request(Message message) {
12701270
return requestFutureInternal(message.getSubject(), message.getHeaders(), message.getData(), null, cancelAction, false);
12711271
}
12721272

1273-
CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction, boolean validateSubRep) {
1273+
CompletableFuture<Message> requestFutureInternal(String subject, Headers headers, byte[] data, Duration futureTimeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) {
12741274
checkPayloadSize(data);
12751275

12761276
if (isClosed()) {
@@ -1322,7 +1322,7 @@ CompletableFuture<Message> requestFutureInternal(String subject, Headers headers
13221322
responsesAwaiting.put(sub.getSID(), future);
13231323
}
13241324

1325-
publishInternal(subject, responseInbox, headers, data, validateSubRep);
1325+
publishInternal(subject, responseInbox, headers, data, validateSubjectAndReplyTo);
13261326
statistics.incrementRequestsSent();
13271327

13281328
return future;

src/main/java/io/nats/client/impl/NatsJetStream.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,29 +142,29 @@ public CompletableFuture<PublishAck> publishAsync(Message message, PublishOption
142142
return publishAsyncInternal(message.getSubject(), message.getHeaders(), message.getData(), options, null, false);
143143
}
144144

145-
private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, boolean validateSubRep) throws IOException, JetStreamApiException {
145+
private PublishAck publishSyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, boolean validateSubjectAndReplyTo) throws IOException, JetStreamApiException {
146146
Headers merged = mergePublishOptions(headers, options);
147147

148148
if (jso.isPublishNoAck()) {
149-
conn.publishInternal(subject, null, merged, data, validateSubRep);
149+
conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo);
150150
return null;
151151
}
152152

153153
Duration timeout = options == null ? jso.getRequestTimeout() : options.getStreamTimeout();
154154

155-
Message resp = makeInternalRequestResponseRequired(subject, merged, data, timeout, CancelAction.COMPLETE, validateSubRep);
155+
Message resp = makeInternalRequestResponseRequired(subject, merged, data, timeout, CancelAction.COMPLETE, validateSubjectAndReplyTo);
156156
return processPublishResponse(resp, options);
157157
}
158158

159-
private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, Duration knownTimeout, boolean validateSubRep) {
159+
private CompletableFuture<PublishAck> publishAsyncInternal(String subject, Headers headers, byte[] data, PublishOptions options, Duration knownTimeout, boolean validateSubjectAndReplyTo) {
160160
Headers merged = mergePublishOptions(headers, options);
161161

162162
if (jso.isPublishNoAck()) {
163-
conn.publishInternal(subject, null, merged, data, validateSubRep);
163+
conn.publishInternal(subject, null, merged, data, validateSubjectAndReplyTo);
164164
return null;
165165
}
166166

167-
CompletableFuture<Message> future = conn.requestFutureInternal(subject, merged, data, knownTimeout, CancelAction.COMPLETE, validateSubRep);
167+
CompletableFuture<Message> future = conn.requestFutureInternal(subject, merged, data, knownTimeout, CancelAction.COMPLETE, validateSubjectAndReplyTo);
168168

169169
return future.thenCompose(resp -> {
170170
try {

src/main/java/io/nats/client/impl/NatsJetStreamImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,9 @@ Message makeRequestResponseRequired(String subject, byte[] bytes, Duration timeo
236236
}
237237
}
238238

239-
Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubRep) throws IOException {
239+
Message makeInternalRequestResponseRequired(String subject, Headers headers, byte[] data, Duration timeout, CancelAction cancelAction, boolean validateSubjectAndReplyTo) throws IOException {
240240
try {
241-
return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubRep));
241+
return responseRequired(conn.requestInternal(subject, headers, data, timeout, cancelAction, validateSubjectAndReplyTo));
242242
} catch (InterruptedException e) {
243243
throw new IOException(e);
244244
}

src/main/java/io/nats/client/impl/NatsJetStreamMetaData.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@
1414
package io.nats.client.impl;
1515

1616
import io.nats.client.support.DateTimeUtils;
17+
1718
import java.time.ZonedDateTime;
1819

1920
/**
20-
* Jetstream meta data about a message, when applicable.
21+
* Jetstream Metadata about a message, when applicable.
2122
*/
2223
public class NatsJetStreamMetaData {
2324

src/main/java/io/nats/client/impl/NatsPublishableMessage.java

Lines changed: 13 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313

1414
package io.nats.client.impl;
1515

16-
import io.nats.client.support.ByteArrayBuilder;
17-
1816
import static io.nats.client.support.Validator.validateReplyTo;
1917
import static io.nats.client.support.Validator.validateSubject;
2018

@@ -25,56 +23,28 @@ public NatsPublishableMessage(boolean hasHeaders) {
2523
this.hasHeaders = hasHeaders;
2624
}
2725

28-
public NatsPublishableMessage(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubRep) {
26+
public NatsPublishableMessage(String subject, String replyTo, Headers headers, byte[] data, boolean validateSubjectAndReplyTo) {
2927
super(data);
30-
this.subject = validateSubRep ? validateSubject(subject, true) : subject;
31-
this.replyTo = validateSubRep ? validateReplyTo(replyTo, false) : replyTo;
28+
if (validateSubjectAndReplyTo) {
29+
this.subject = validateSubject(subject, true);
30+
this.replyTo = validateReplyTo(replyTo, false);
31+
}
32+
else {
33+
this.subject = subject;
34+
this.replyTo = replyTo;
35+
}
3236
if (headers == null || headers.isEmpty()) {
3337
hasHeaders = false;
3438
}
3539
else {
3640
hasHeaders = true;
37-
headers = headers.isReadOnly() ? headers : new Headers(headers, true, null);
41+
this.headers = headers.isReadOnly() ? headers : new Headers(headers, true, null);
3842
}
39-
this.headers = new Headers(headers, false, null);
40-
calculate();
43+
super.calculate();
4144
}
4245

4346
@Override
44-
ByteArrayBuilder getProtocolBab() {
45-
// compared to base class, skips calling calculate()
46-
return protocolBab;
47-
}
48-
49-
@Override
50-
long getSizeInBytes() {
51-
// compared to base class, skips calling calculate()
52-
return sizeInBytes;
53-
}
54-
55-
@Override
56-
byte[] getProtocolBytes() {
57-
// compared to base class, skips calling calculate()
58-
return protocolBab.toByteArray();
59-
}
60-
61-
@Override
62-
int getControlLineLength() {
63-
// compared to base class, skips calling calculate()
64-
return controlLineLength;
65-
}
66-
67-
/**
68-
* @param destPosition the position index in destination byte array to start
69-
* @param dest is the byte array to write to
70-
* @return the length of the header
71-
*/
72-
@Override
73-
int copyNotEmptyHeaders(int destPosition, byte[] dest) {
74-
// compared to base class, skips calling calculate()
75-
if (headerLen > 0) {
76-
return headers.serializeToArray(destPosition, dest);
77-
}
78-
return 0;
47+
protected void calculate() {
48+
// it's already done in the constructor
7949
}
8050
}

src/test/java/io/nats/client/impl/NatsJetStreamMetaDataTests.java

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
package io.nats.client.impl;
1515

1616
import io.nats.client.Message;
17+
import org.junit.jupiter.api.Test;
18+
1719
import java.time.ZoneId;
1820
import java.time.ZonedDateTime;
19-
import org.junit.jupiter.api.Test;
2021

2122
import static org.junit.jupiter.api.Assertions.*;
2223

@@ -29,25 +30,19 @@ public void testMetaData() {
2930
assertNotNull(msg.metaData()); // 2nd time, coverage lazy check is not null
3031
assertNotNull(meta.toString()); // COVERAGE toString
3132

32-
validateMeta(false, false, getTestMessage(TestMetaV0).metaData());
33-
validateMeta(true, false, getTestMessage(TestMetaV1).metaData());
34-
validateMeta(true, true, getTestMessage(TestMetaV2).metaData());
35-
validateMeta(true, true, getTestMessage(TestMetaVFuture).metaData());
36-
37-
assertThrows(IllegalArgumentException.class, () -> getTestMessage(InvalidMetaLt8Tokens).metaData());
38-
assertThrows(IllegalArgumentException.class, () -> getTestMessage(InvalidMeta10Tokens).metaData());
39-
assertThrows(IllegalArgumentException.class, () -> getTestMessage(InvalidMetaData).metaData());
40-
41-
// InvalidMetaNoAck is actually not even a JS message
42-
assertThrows(IllegalStateException.class, () -> getTestMessage(InvalidMetaNoAck).metaData());
33+
validateMeta(false, false, getTestMessage(TestMetaV0));
34+
validateMeta(true, false, getTestMessage(TestMetaV1));
35+
validateMeta(true, true, getTestMessage(TestMetaV2));
36+
validateMeta(true, true, getTestMessage(TestMetaVFuture));
4337

4438
// since I can't make a JS message directly, do it indirectly
4539
NatsMessage nm = getTestMessage(InvalidMetaLt8Tokens);
4640
nm.replyTo = InvalidMetaNoAck;
4741
assertThrows(IllegalArgumentException.class, nm::metaData);
4842
}
4943

50-
private void validateMeta(boolean hasPending, boolean hasDomainHashToken, NatsJetStreamMetaData meta) {
44+
private void validateMeta(boolean hasPending, boolean hasDomainHashToken, Message msg) {
45+
NatsJetStreamMetaData meta = msg.metaData();
5146
assertEquals("test-stream", meta.getStream());
5247
assertEquals("test-consumer", meta.getConsumer());
5348
assertEquals(1, meta.deliveredCount());
@@ -75,20 +70,38 @@ private void validateMeta(boolean hasPending, boolean hasDomainHashToken, NatsJe
7570
}
7671

7772
@Test
78-
public void testInvalidMetaDataConstruction() {
73+
public void testNotInVersion() {
74+
assertEquals(-1, new NatsJetStreamMetaData(getTestMessage(TestMetaV0)).pendingCount());
75+
assertNull(new NatsJetStreamMetaData(getTestMessage(TestMetaV0)).getDomain());
76+
assertNull(new NatsJetStreamMetaData(getTestMessage(TestMetaV0)).getAccountHash());
77+
assertNull(new NatsJetStreamMetaData(getTestMessage(TestMetaV1)).getDomain());
78+
assertNull(new NatsJetStreamMetaData(getTestMessage(TestMetaV1)).getAccountHash());
79+
}
80+
81+
@Test
82+
public void testInvalidMetaData() {
83+
assertThrows(IllegalArgumentException.class, () -> getTestMessage(InvalidMetaLt8Tokens).metaData());
84+
assertThrows(IllegalArgumentException.class, () -> getTestMessage(InvalidMeta10Tokens).metaData());
85+
86+
// InvalidMetaNoAck is actually not even a JS message
87+
assertThrows(IllegalStateException.class, () -> getTestMessage(InvalidMetaNoAck).metaData());
88+
89+
assertThrows(IllegalArgumentException.class,
90+
() -> new NatsJetStreamMetaData(getTestMessage("$JS.invalid.test-stream.test-consumer.1.2.3.1605139610113260000")));
91+
7992
assertThrows(IllegalArgumentException.class,
80-
() -> new NatsJetStreamMetaData(NatsMessage.builder().subject("test").build()));
93+
() -> new NatsJetStreamMetaData(getTestMessage("$JS.ACK.not.enough.parts")));
8194

8295
assertThrows(IllegalArgumentException.class,
83-
() -> new NatsJetStreamMetaData(getTestMessage("$JS.ACK.not.enough.parts")));
96+
() -> new NatsJetStreamMetaData(getTestMessage("$JS.ACK.test-stream.test-consumer.invalid.2.3.1605139610113260000")));
8497

8598
assertThrows(IllegalArgumentException.class,
86-
() -> new NatsJetStreamMetaData(getTestMessage(TestMetaV0 + ".too.many.parts")));
99+
() -> new NatsJetStreamMetaData(getTestMessage("$JS.ACK.test-stream.test-consumer.1.invalid.3.1605139610113260000")));
87100

88101
assertThrows(IllegalArgumentException.class,
89-
() -> new NatsJetStreamMetaData(getTestMessage("$JS.ZZZ.enough.parts.though.need.three.more")));
102+
() -> new NatsJetStreamMetaData(getTestMessage("$JS.ACK.test-stream.test-consumer.1.2.invalid.1605139610113260000")));
90103

91104
assertThrows(IllegalArgumentException.class,
92-
() -> new NatsJetStreamMetaData(new NatsMessage("sub", null, new byte[0])));
105+
() -> new NatsJetStreamMetaData(getTestMessage("$JS.ACK.test-stream.test-consumer.1.2.3.1605139610113260000.invalid")));
93106
}
94107
}

src/test/java/io/nats/client/impl/ReconnectTests.java

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -765,28 +765,30 @@ public boolean includeAllServers() {
765765
@Test
766766
public void testForceReconnectQueueBehaviorCheck() throws Exception {
767767
runInJsCluster((nc0, nc1, nc2) -> {
768-
int pubCount = 100_000;
769-
int subscribeTime = 5000;
770-
int flushWait = 2500;
771-
int port = nc0.getServerInfo().getPort();
768+
if (atLeast2_9_0(nc0)) {
769+
int pubCount = 100_000;
770+
int subscribeTime = 5000;
771+
int flushWait = 2500;
772+
int port = nc0.getServerInfo().getPort();
772773

773-
ForceReconnectQueueCheckDataPort.DELAY = 75;
774+
ForceReconnectQueueCheckDataPort.DELAY = 75;
774775

775-
String subject = subject();
776-
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
777-
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, 0);
776+
String subject = subject();
777+
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
778+
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, 0);
778779

779-
subject = subject();
780-
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
781-
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, flushWait);
780+
subject = subject();
781+
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
782+
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, false, flushWait);
782783

783-
subject = subject();
784-
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
785-
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, 0);
784+
subject = subject();
785+
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
786+
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, 0);
786787

787-
subject = subject();
788-
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
789-
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, flushWait);
788+
subject = subject();
789+
ForceReconnectQueueCheckDataPort.WRITE_CHECK = "PUB " + subject;
790+
_testForceReconnectQueueCheck(subject, pubCount, subscribeTime, port, true, flushWait);
791+
}
790792
});
791793
}
792794

0 commit comments

Comments
 (0)