From 68a9a6d3b0c39482a0635b7ee7623b6a5ae54236 Mon Sep 17 00:00:00 2001 From: Serkan Erip Date: Mon, 8 Dec 2025 17:02:53 +0300 Subject: [PATCH 1/5] half close right after write --- packages/grpc-js/src/retrying-call.ts | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index 1d49ad337..cb4f7b3f8 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -211,6 +211,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider { private nextRetryBackoffSec = 0; private startTime: Date; private maxAttempts: number; + private halfClosed: boolean = false; constructor( private readonly channel: InternalChannel, private readonly callConfig: CallConfig, @@ -789,8 +790,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider { ); break; case 'HALF_CLOSE': - childCall.nextMessageToSend += 1; - childCall.call.halfClose(); + if (this.halfClosed !== true) { + childCall.nextMessageToSend += 1; + childCall.call.halfClose(); + this.halfClosed = true; + } break; case 'FREED': // Should not be possible @@ -868,12 +872,13 @@ export class RetryingCall implements Call, DeadlineInfoProvider { allocated: false, }); for (const call of this.underlyingCalls) { - if ( - call?.state === 'ACTIVE' && - call.nextMessageToSend === halfCloseIndex - ) { - call.nextMessageToSend += 1; - call.call.halfClose(); + if (call?.state === 'ACTIVE' && this.halfClosed !== true) { + // || call.nextMessageToSend === halfCloseIndex - 1 added to not wait for write callback + if (call.nextMessageToSend === halfCloseIndex || call.nextMessageToSend === halfCloseIndex - 1) { + call.nextMessageToSend += 1; + call.call.halfClose(); + this.halfClosed = true; + } } } } From 298e39a9383b54646cdca8adb518f8c77f2e1ab9 Mon Sep 17 00:00:00 2001 From: Serkan Erip Date: Mon, 8 Dec 2025 20:50:49 +0300 Subject: [PATCH 2/5] revert changes --- packages/grpc-js/src/retrying-call.ts | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index cb4f7b3f8..5a7957fd2 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -211,7 +211,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider { private nextRetryBackoffSec = 0; private startTime: Date; private maxAttempts: number; - private halfClosed: boolean = false; constructor( private readonly channel: InternalChannel, private readonly callConfig: CallConfig, @@ -790,11 +789,8 @@ export class RetryingCall implements Call, DeadlineInfoProvider { ); break; case 'HALF_CLOSE': - if (this.halfClosed !== true) { - childCall.nextMessageToSend += 1; - childCall.call.halfClose(); - this.halfClosed = true; - } + childCall.nextMessageToSend += 1; + childCall.call.halfClose(); break; case 'FREED': // Should not be possible @@ -872,13 +868,12 @@ export class RetryingCall implements Call, DeadlineInfoProvider { allocated: false, }); for (const call of this.underlyingCalls) { - if (call?.state === 'ACTIVE' && this.halfClosed !== true) { - // || call.nextMessageToSend === halfCloseIndex - 1 added to not wait for write callback - if (call.nextMessageToSend === halfCloseIndex || call.nextMessageToSend === halfCloseIndex - 1) { - call.nextMessageToSend += 1; - call.call.halfClose(); - this.halfClosed = true; - } + if ( + call?.state === 'ACTIVE' && + call.nextMessageToSend === halfCloseIndex + ) { + call.nextMessageToSend += 1; + call.call.halfClose(); } } } @@ -900,4 +895,4 @@ export class RetryingCall implements Call, DeadlineInfoProvider { return null; } } -} +} \ No newline at end of file From 1d546eeb1498a64cea0fcb984b7d5d68ad5d62f6 Mon Sep 17 00:00:00 2001 From: Serkan Erip Date: Tue, 9 Dec 2025 22:36:25 +0300 Subject: [PATCH 3/5] Send halfClose immediately after messages to prevent late halfClose issues with Envoy --- packages/grpc-js/src/retrying-call.ts | 56 +++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index 5a7957fd2..cbaec082c 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -123,6 +123,15 @@ interface UnderlyingCall { state: UnderlyingCallState; call: LoadBalancingCall; nextMessageToSend: number; + /** + * Tracks the highest message index that has been sent to the underlying call. + * This is different from nextMessageToSend which tracks completion/acknowledgment. + */ + highestSentMessageIndex: number; + /** + * Tracks whether halfClose has been sent to this child call. + */ + halfCloseSent: boolean; startTime: Date; } @@ -695,6 +704,8 @@ export class RetryingCall implements Call, DeadlineInfoProvider { state: 'ACTIVE', call: child, nextMessageToSend: 0, + highestSentMessageIndex: -1, + halfCloseSent: false, startTime: new Date(), }); const previousAttempts = this.attempts - 1; @@ -778,6 +789,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider { const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend); switch (bufferEntry.entryType) { case 'MESSAGE': + childCall.highestSentMessageIndex = childCall.nextMessageToSend; childCall.call.sendMessageWithContext( { callback: error => { @@ -787,10 +799,26 @@ export class RetryingCall implements Call, DeadlineInfoProvider { }, bufferEntry.message!.message ); + // Optimization: if the next entry is HALF_CLOSE, send it immediately + // without waiting for the message callback. This is safe because the message + // has already been passed to the underlying transport. + const nextEntry = this.getBufferEntry(childCall.nextMessageToSend + 1); + if (nextEntry.entryType === 'HALF_CLOSE' && !childCall.halfCloseSent) { + this.trace( + 'Sending halfClose immediately after message to child [' + + childCall.call.getCallNumber() + + '] - optimizing for unary/final message' + ); + childCall.halfCloseSent = true; + childCall.call.halfClose(); + } break; case 'HALF_CLOSE': - childCall.nextMessageToSend += 1; - childCall.call.halfClose(); + if (!childCall.halfCloseSent) { + childCall.nextMessageToSend += 1; + childCall.halfCloseSent = true; + childCall.call.halfClose(); + } break; case 'FREED': // Should not be possible @@ -819,6 +847,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider { call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex ) { + call.highestSentMessageIndex = messageIndex; call.call.sendMessageWithContext( { callback: error => { @@ -839,6 +868,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider { const call = this.underlyingCalls[this.committedCallIndex]; bufferEntry.callback = context.callback; if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { + call.highestSentMessageIndex = messageIndex; call.call.sendMessageWithContext( { callback: error => { @@ -868,12 +898,22 @@ export class RetryingCall implements Call, DeadlineInfoProvider { allocated: false, }); for (const call of this.underlyingCalls) { - if ( - call?.state === 'ACTIVE' && - call.nextMessageToSend === halfCloseIndex - ) { - call.nextMessageToSend += 1; - call.call.halfClose(); + if (call?.state === 'ACTIVE' && !call.halfCloseSent) { + // Send halfClose immediately if all messages have been sent to this call + // We check highestSentMessageIndex >= halfCloseIndex - 1 because: + // - If halfCloseIndex is 0, there are no messages, so send immediately + // - If halfCloseIndex is N, the last message is at index N-1 + // - If highestSentMessageIndex >= N-1, all messages have been sent + if (halfCloseIndex === 0 || call.highestSentMessageIndex >= halfCloseIndex - 1) { + this.trace( + 'Sending halfClose immediately to child [' + + call.call.getCallNumber() + + '] - all messages already sent' + ); + call.halfCloseSent = true; + call.call.halfClose(); + } + // Otherwise, halfClose will be sent by sendNextChildMessage when messages complete } } } From 699ca49f2a7c976b64fb393d603ee15c32c02e3f Mon Sep 17 00:00:00 2001 From: Serkan Erip Date: Wed, 10 Dec 2025 19:05:58 +0300 Subject: [PATCH 4/5] remove halfCloseSent field --- packages/grpc-js/src/retrying-call.ts | 45 +++++++++++---------------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index cbaec082c..733947200 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -128,10 +128,6 @@ interface UnderlyingCall { * This is different from nextMessageToSend which tracks completion/acknowledgment. */ highestSentMessageIndex: number; - /** - * Tracks whether halfClose has been sent to this child call. - */ - halfCloseSent: boolean; startTime: Date; } @@ -705,7 +701,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider { call: child, nextMessageToSend: 0, highestSentMessageIndex: -1, - halfCloseSent: false, startTime: new Date(), }); const previousAttempts = this.attempts - 1; @@ -771,11 +766,10 @@ export class RetryingCall implements Call, DeadlineInfoProvider { this.maybeStartHedgingTimer(); } - private handleChildWriteCompleted(childIndex: number) { - const childCall = this.underlyingCalls[childIndex]; - const messageIndex = childCall.nextMessageToSend; + private handleChildWriteCompleted(childIndex: number, messageIndex: number) { this.getBufferEntry(messageIndex).callback?.(); this.clearSentMessages(); + const childCall = this.underlyingCalls[childIndex]; childCall.nextMessageToSend += 1; this.sendNextChildMessage(childIndex); } @@ -785,40 +779,38 @@ export class RetryingCall implements Call, DeadlineInfoProvider { if (childCall.state === 'COMPLETED') { return; } - if (this.getBufferEntry(childCall.nextMessageToSend)) { - const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend); + const messageIndex = childCall.nextMessageToSend; + if (this.getBufferEntry(messageIndex)) { + const bufferEntry = this.getBufferEntry(messageIndex); switch (bufferEntry.entryType) { case 'MESSAGE': - childCall.highestSentMessageIndex = childCall.nextMessageToSend; childCall.call.sendMessageWithContext( { callback: error => { // Ignore error - this.handleChildWriteCompleted(childIndex); + this.handleChildWriteCompleted(childIndex, messageIndex); }, }, bufferEntry.message!.message ); + childCall.highestSentMessageIndex = messageIndex; // Optimization: if the next entry is HALF_CLOSE, send it immediately // without waiting for the message callback. This is safe because the message // has already been passed to the underlying transport. - const nextEntry = this.getBufferEntry(childCall.nextMessageToSend + 1); - if (nextEntry.entryType === 'HALF_CLOSE' && !childCall.halfCloseSent) { + const nextEntry = this.getBufferEntry(messageIndex + 1); + if (nextEntry.entryType === 'HALF_CLOSE') { this.trace( 'Sending halfClose immediately after message to child [' + childCall.call.getCallNumber() + '] - optimizing for unary/final message' ); - childCall.halfCloseSent = true; + childCall.nextMessageToSend += 1; childCall.call.halfClose(); } break; case 'HALF_CLOSE': - if (!childCall.halfCloseSent) { - childCall.nextMessageToSend += 1; - childCall.halfCloseSent = true; - childCall.call.halfClose(); - } + childCall.nextMessageToSend += 1; + childCall.call.halfClose(); break; case 'FREED': // Should not be possible @@ -847,16 +839,16 @@ export class RetryingCall implements Call, DeadlineInfoProvider { call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex ) { - call.highestSentMessageIndex = messageIndex; call.call.sendMessageWithContext( { callback: error => { // Ignore error - this.handleChildWriteCompleted(callIndex); + this.handleChildWriteCompleted(callIndex, messageIndex); }, }, message ); + call.highestSentMessageIndex = messageIndex; } } } else { @@ -873,7 +865,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider { { callback: error => { // Ignore error - this.handleChildWriteCompleted(this.committedCallIndex!); + this.handleChildWriteCompleted(this.committedCallIndex!, messageIndex); }, }, message @@ -898,19 +890,18 @@ export class RetryingCall implements Call, DeadlineInfoProvider { allocated: false, }); for (const call of this.underlyingCalls) { - if (call?.state === 'ACTIVE' && !call.halfCloseSent) { + if (call?.state === 'ACTIVE') { // Send halfClose immediately if all messages have been sent to this call // We check highestSentMessageIndex >= halfCloseIndex - 1 because: - // - If halfCloseIndex is 0, there are no messages, so send immediately // - If halfCloseIndex is N, the last message is at index N-1 // - If highestSentMessageIndex >= N-1, all messages have been sent - if (halfCloseIndex === 0 || call.highestSentMessageIndex >= halfCloseIndex - 1) { + if (call.highestSentMessageIndex >= halfCloseIndex - 1) { this.trace( 'Sending halfClose immediately to child [' + call.call.getCallNumber() + '] - all messages already sent' ); - call.halfCloseSent = true; + call.nextMessageToSend += 1; call.call.halfClose(); } // Otherwise, halfClose will be sent by sendNextChildMessage when messages complete From b62f6096804b3071cfba79934a1103bb192ef91e Mon Sep 17 00:00:00 2001 From: Serkan Erip Date: Wed, 10 Dec 2025 20:03:43 +0300 Subject: [PATCH 5/5] Use nextMessageToSend for early half-close --- packages/grpc-js/src/retrying-call.ts | 27 ++++++++++-------------- packages/grpc-js/test/test-end-to-end.ts | 27 +++++++++++++++++++++++- 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/packages/grpc-js/src/retrying-call.ts b/packages/grpc-js/src/retrying-call.ts index 733947200..61ff58fa1 100644 --- a/packages/grpc-js/src/retrying-call.ts +++ b/packages/grpc-js/src/retrying-call.ts @@ -123,11 +123,6 @@ interface UnderlyingCall { state: UnderlyingCallState; call: LoadBalancingCall; nextMessageToSend: number; - /** - * Tracks the highest message index that has been sent to the underlying call. - * This is different from nextMessageToSend which tracks completion/acknowledgment. - */ - highestSentMessageIndex: number; startTime: Date; } @@ -700,7 +695,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider { state: 'ACTIVE', call: child, nextMessageToSend: 0, - highestSentMessageIndex: -1, startTime: new Date(), }); const previousAttempts = this.attempts - 1; @@ -793,7 +787,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider { }, bufferEntry.message!.message ); - childCall.highestSentMessageIndex = messageIndex; // Optimization: if the next entry is HALF_CLOSE, send it immediately // without waiting for the message callback. This is safe because the message // has already been passed to the underlying transport. @@ -833,7 +826,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider { }; this.writeBuffer.push(bufferEntry); if (bufferEntry.allocated) { - context.callback?.(); + // Run this in next tick to avoid suspending the current execution context + // otherwise it might cause half closing the call before sending message + process.nextTick(() => { + context.callback?.(); + }); for (const [callIndex, call] of this.underlyingCalls.entries()) { if ( call.state === 'ACTIVE' && @@ -848,7 +845,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider { }, message ); - call.highestSentMessageIndex = messageIndex; } } } else { @@ -860,7 +856,6 @@ export class RetryingCall implements Call, DeadlineInfoProvider { const call = this.underlyingCalls[this.committedCallIndex]; bufferEntry.callback = context.callback; if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) { - call.highestSentMessageIndex = messageIndex; call.call.sendMessageWithContext( { callback: error => { @@ -891,11 +886,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider { }); for (const call of this.underlyingCalls) { if (call?.state === 'ACTIVE') { - // Send halfClose immediately if all messages have been sent to this call - // We check highestSentMessageIndex >= halfCloseIndex - 1 because: - // - If halfCloseIndex is N, the last message is at index N-1 - // - If highestSentMessageIndex >= N-1, all messages have been sent - if (call.highestSentMessageIndex >= halfCloseIndex - 1) { + // Send halfClose to call when either: + // - nextMessageToSend === halfCloseIndex - 1: last message sent, callback pending (optimization) + // - nextMessageToSend === halfCloseIndex: all messages sent and acknowledged + if (call.nextMessageToSend === halfCloseIndex + || call.nextMessageToSend === halfCloseIndex - 1) { this.trace( 'Sending halfClose immediately to child [' + call.call.getCallNumber() + @@ -904,7 +899,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider { call.nextMessageToSend += 1; call.call.halfClose(); } - // Otherwise, halfClose will be sent by sendNextChildMessage when messages complete + // Otherwise, halfClose will be sent by sendNextChildMessage when message callbacks complete } } } diff --git a/packages/grpc-js/test/test-end-to-end.ts b/packages/grpc-js/test/test-end-to-end.ts index c7de2d6a6..676b4cff0 100644 --- a/packages/grpc-js/test/test-end-to-end.ts +++ b/packages/grpc-js/test/test-end-to-end.ts @@ -18,7 +18,7 @@ import * as assert from 'assert'; import * as path from 'path'; import { loadProtoFile } from './common'; -import { Metadata, Server, ServerDuplexStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, experimental, sendUnaryData } from '../src'; +import { Metadata, Server, ServerCredentials, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, credentials, experimental, sendUnaryData } from '../src'; import { ServiceClient } from '../src/make-client'; const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto'); @@ -36,6 +36,15 @@ const echoServiceImplementation = { call.end(); }); }, + echoClientStream(call: ServerReadableStream, callback: sendUnaryData) { + const messages: any[] = []; + call.on('data', (message: any) => { + messages.push(message); + }); + call.on('end', () => { + callback(null, { value: messages.map(m => m.value).join(','), value2: messages.length }); + }); + }, }; describe('Client should successfully communicate with server', () => { @@ -77,4 +86,20 @@ describe('Client should successfully communicate with server', () => { }); }); }).timeout(5000); + + it('Client streaming with one message should work', done => { + server = new Server(); + server.addService(EchoService.service, echoServiceImplementation); + server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (error, port) => { + assert.ifError(error); + client = new EchoService(`localhost:${port}`, credentials.createInsecure()); + const call = client.echoClientStream((error: ServiceError, response: any) => { + assert.ifError(error); + assert.deepStrictEqual(response, { value: 'test value', value2: 1 }); + done(); + }); + call.write({ value: 'test value', value2: 42 }); + call.end(); + }); + }); });