-
Notifications
You must be signed in to change notification settings - Fork 182
Add batch direct get #1229
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add batch direct get #1229
Conversation
|
||
CachedStreamInfo csi = getCachedStreamInfo(streamName); | ||
if (!csi.allowDirect) { | ||
throw new IllegalArgumentException("Allow direct not enabled for stream: " + streamName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two non-server error conditions:
- either
AllowDirect
is not enabled, in which case we'd get a timeout from the server - the server is not 2.11+ and doesn't support this feature yet
Currently made both of them IllegalArgumentException
, but not sure if that's the right fit..
JetStreamApiException
doesn't fit either as it's for server errors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is fine. Are you going to update NatsJetStreamImpl feature flags to know if the server supports? See lines 48/49 and 71/72. What's a good name for this feature? "direct211"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you going to update NatsJetStreamImpl feature flags to know if the server supports?
Will do 👍
Wondering whether those fields will be updated if we were initially connected to a server that supports it, and we reconnect to another server that doesn't?
What's a good name for this feature? "direct211"?
Yeah, something like that. directBatchGet211Available
, maybe
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as directBatchGet211Available
.
And added exceptions for these cases: JsDirectBatchGet211NotAvailable
and JsAllowDirectRequired
.
Signed-off-by: Maurice van Veen <[email protected]>
c3dac04
to
00a8a02
Compare
Signed-off-by: Maurice van Veen <[email protected]>
* server such as timeout or interruption | ||
* @throws JetStreamApiException the request had an error related to the data | ||
*/ | ||
void getMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, Consumer<MessageInfo> consumer) throws IOException, JetStreamApiException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't use "Consumer". You should make something like MessageInfoHandler, similar to MessageHandler. The justification for this is that we have a class called Consumer and I don't want there to be any confusion, especially when it comes to user's code and their imports. This is not the first time I've made this choice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, added MessageInfoHandler
.
} | ||
JsonUtils.addField(sb, TIME, time); | ||
JsonUtils.addField(sb, STREAM, stream); | ||
JsonUtils.addField(sb, "last_seq", lastSeq); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just noticed there is an ApiConstant.LAST_SEQ. Can you update this since you are here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, using LAST_SEQ
and also ERROR
above existed already.
public static final NatsJetStreamClientError JsConsumerNameDurableMismatch = new NatsJetStreamClientError(CON, 90302, "Name must match durable if both are supplied."); | ||
public static final NatsJetStreamClientError JsMultipleFilterSubjects210NotAvailable = new NatsJetStreamClientError(CON, 90303, "Multiple filter subjects not available until server version 2.10.0."); | ||
public static final NatsJetStreamClientError JsAllowDirectRequired = new NatsJetStreamClientError(CON, 90304, "Stream must have allow direct set."); | ||
public static final NatsJetStreamClientError JsDirectBatchGet211NotAvailable = new NatsJetStreamClientError(CON, 90305, "Batch direct get not available until server version 2.11.0."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please update the README
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
long maxTimeMillis = messageBatchGetRequest.getTimeout().toMillis(); | ||
long timeLeft = maxTimeMillis; | ||
while (true) { | ||
Message msg = responseRequired(sub.nextMessage(timeLeft)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's talk through a few things.
- The user processing the message takes time away from your wait.
- Do we //how would we notify the user about when we've asked for 10 messages, the server has plenty in the range, but there is a failure after less than 10, like a real disconnection.
- Are there other ways to give the user messages, meaning should they have the choice to fetch, iterate or consume (callback is consume)
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changes discussed
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming nitpick
* server such as timeout or interruption | ||
* @throws JetStreamApiException the request had an error related to the data | ||
*/ | ||
void gatherMessageBatch(String streamName, MessageBatchGetRequest messageBatchGetRequest, MessageInfoHandler handler) throws IOException, JetStreamApiException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I got rid of "gather" and just did "request" so as not to make it seem like I only support "scatter gather". I also changed "iterate" to "queue" since it seems more accurate and also since we aren't giving them an iterator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done 👌
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Implements batch direct get as defined in ADR-31.