Skip to content

Commit 164d14f

Browse files
authored
Merge pull request #3031 from serkanerip/master
Send halfClose immediately after messages to prevent late halfClose issues with Envoy
2 parents ec81924 + b62f609 commit 164d14f

File tree

2 files changed

+68
-17
lines changed

2 files changed

+68
-17
lines changed

packages/grpc-js/src/retrying-call.ts

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -760,11 +760,10 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
760760
this.maybeStartHedgingTimer();
761761
}
762762

763-
private handleChildWriteCompleted(childIndex: number) {
764-
const childCall = this.underlyingCalls[childIndex];
765-
const messageIndex = childCall.nextMessageToSend;
763+
private handleChildWriteCompleted(childIndex: number, messageIndex: number) {
766764
this.getBufferEntry(messageIndex).callback?.();
767765
this.clearSentMessages();
766+
const childCall = this.underlyingCalls[childIndex];
768767
childCall.nextMessageToSend += 1;
769768
this.sendNextChildMessage(childIndex);
770769
}
@@ -774,19 +773,33 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
774773
if (childCall.state === 'COMPLETED') {
775774
return;
776775
}
777-
if (this.getBufferEntry(childCall.nextMessageToSend)) {
778-
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
776+
const messageIndex = childCall.nextMessageToSend;
777+
if (this.getBufferEntry(messageIndex)) {
778+
const bufferEntry = this.getBufferEntry(messageIndex);
779779
switch (bufferEntry.entryType) {
780780
case 'MESSAGE':
781781
childCall.call.sendMessageWithContext(
782782
{
783783
callback: error => {
784784
// Ignore error
785-
this.handleChildWriteCompleted(childIndex);
785+
this.handleChildWriteCompleted(childIndex, messageIndex);
786786
},
787787
},
788788
bufferEntry.message!.message
789789
);
790+
// Optimization: if the next entry is HALF_CLOSE, send it immediately
791+
// without waiting for the message callback. This is safe because the message
792+
// has already been passed to the underlying transport.
793+
const nextEntry = this.getBufferEntry(messageIndex + 1);
794+
if (nextEntry.entryType === 'HALF_CLOSE') {
795+
this.trace(
796+
'Sending halfClose immediately after message to child [' +
797+
childCall.call.getCallNumber() +
798+
'] - optimizing for unary/final message'
799+
);
800+
childCall.nextMessageToSend += 1;
801+
childCall.call.halfClose();
802+
}
790803
break;
791804
case 'HALF_CLOSE':
792805
childCall.nextMessageToSend += 1;
@@ -813,7 +826,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
813826
};
814827
this.writeBuffer.push(bufferEntry);
815828
if (bufferEntry.allocated) {
816-
context.callback?.();
829+
// Run this in next tick to avoid suspending the current execution context
830+
// otherwise it might cause half closing the call before sending message
831+
process.nextTick(() => {
832+
context.callback?.();
833+
});
817834
for (const [callIndex, call] of this.underlyingCalls.entries()) {
818835
if (
819836
call.state === 'ACTIVE' &&
@@ -823,7 +840,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
823840
{
824841
callback: error => {
825842
// Ignore error
826-
this.handleChildWriteCompleted(callIndex);
843+
this.handleChildWriteCompleted(callIndex, messageIndex);
827844
},
828845
},
829846
message
@@ -843,7 +860,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
843860
{
844861
callback: error => {
845862
// Ignore error
846-
this.handleChildWriteCompleted(this.committedCallIndex!);
863+
this.handleChildWriteCompleted(this.committedCallIndex!, messageIndex);
847864
},
848865
},
849866
message
@@ -868,12 +885,21 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
868885
allocated: false,
869886
});
870887
for (const call of this.underlyingCalls) {
871-
if (
872-
call?.state === 'ACTIVE' &&
873-
call.nextMessageToSend === halfCloseIndex
874-
) {
875-
call.nextMessageToSend += 1;
876-
call.call.halfClose();
888+
if (call?.state === 'ACTIVE') {
889+
// Send halfClose to call when either:
890+
// - nextMessageToSend === halfCloseIndex - 1: last message sent, callback pending (optimization)
891+
// - nextMessageToSend === halfCloseIndex: all messages sent and acknowledged
892+
if (call.nextMessageToSend === halfCloseIndex
893+
|| call.nextMessageToSend === halfCloseIndex - 1) {
894+
this.trace(
895+
'Sending halfClose immediately to child [' +
896+
call.call.getCallNumber() +
897+
'] - all messages already sent'
898+
);
899+
call.nextMessageToSend += 1;
900+
call.call.halfClose();
901+
}
902+
// Otherwise, halfClose will be sent by sendNextChildMessage when message callbacks complete
877903
}
878904
}
879905
}
@@ -895,4 +921,4 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
895921
return null;
896922
}
897923
}
898-
}
924+
}

packages/grpc-js/test/test-end-to-end.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import * as assert from 'assert';
1919
import * as path from 'path';
2020
import { loadProtoFile } from './common';
21-
import { Metadata, Server, ServerDuplexStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, experimental, sendUnaryData } from '../src';
21+
import { Metadata, Server, ServerCredentials, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, credentials, experimental, sendUnaryData } from '../src';
2222
import { ServiceClient } from '../src/make-client';
2323

2424
const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
@@ -36,6 +36,15 @@ const echoServiceImplementation = {
3636
call.end();
3737
});
3838
},
39+
echoClientStream(call: ServerReadableStream<any, any>, callback: sendUnaryData<any>) {
40+
const messages: any[] = [];
41+
call.on('data', (message: any) => {
42+
messages.push(message);
43+
});
44+
call.on('end', () => {
45+
callback(null, { value: messages.map(m => m.value).join(','), value2: messages.length });
46+
});
47+
},
3948
};
4049

4150
describe('Client should successfully communicate with server', () => {
@@ -77,4 +86,20 @@ describe('Client should successfully communicate with server', () => {
7786
});
7887
});
7988
}).timeout(5000);
89+
90+
it('Client streaming with one message should work', done => {
91+
server = new Server();
92+
server.addService(EchoService.service, echoServiceImplementation);
93+
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (error, port) => {
94+
assert.ifError(error);
95+
client = new EchoService(`localhost:${port}`, credentials.createInsecure());
96+
const call = client.echoClientStream((error: ServiceError, response: any) => {
97+
assert.ifError(error);
98+
assert.deepStrictEqual(response, { value: 'test value', value2: 1 });
99+
done();
100+
});
101+
call.write({ value: 'test value', value2: 42 });
102+
call.end();
103+
});
104+
});
80105
});

0 commit comments

Comments
 (0)