Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 49 additions & 9 deletions packages/grpc-js/src/retrying-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ interface UnderlyingCall {
state: UnderlyingCallState;
call: LoadBalancingCall;
nextMessageToSend: number;
/**
* Tracks the highest message index that has been sent to the underlying call.
* This is different from nextMessageToSend which tracks completion/acknowledgment.
*/
highestSentMessageIndex: number;
/**
* Tracks whether halfClose has been sent to this child call.
*/
halfCloseSent: boolean;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think either of these fields is necessary. nextMessageToSend should be enough to track all of the relevant state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we half close + increment nextMessageTosend, this line would't not work so callback of the message would not be executed, this is why it's added.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding a new field, just make the message index a second argument to handleChildWriteCompleted. In sendNextChildMessage, be careful to capture childCall.nextMessageToSend in a message index variable before potentially incrementing it when sending the immediate half close.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sent a commit for this feedback.

startTime: Date;
}

Expand Down Expand Up @@ -695,6 +704,8 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
state: 'ACTIVE',
call: child,
nextMessageToSend: 0,
highestSentMessageIndex: -1,
halfCloseSent: false,
startTime: new Date(),
});
const previousAttempts = this.attempts - 1;
Expand Down Expand Up @@ -778,6 +789,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
switch (bufferEntry.entryType) {
case 'MESSAGE':
childCall.highestSentMessageIndex = childCall.nextMessageToSend;
childCall.call.sendMessageWithContext(
{
callback: error => {
Expand All @@ -787,10 +799,26 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
},
bufferEntry.message!.message
);
// Optimization: if the next entry is HALF_CLOSE, send it immediately
// without waiting for the message callback. This is safe because the message
// has already been passed to the underlying transport.
const nextEntry = this.getBufferEntry(childCall.nextMessageToSend + 1);
if (nextEntry.entryType === 'HALF_CLOSE' && !childCall.halfCloseSent) {
this.trace(
'Sending halfClose immediately after message to child [' +
childCall.call.getCallNumber() +
'] - optimizing for unary/final message'
);
childCall.halfCloseSent = true;
childCall.call.halfClose();
}
break;
case 'HALF_CLOSE':
childCall.nextMessageToSend += 1;
childCall.call.halfClose();
if (!childCall.halfCloseSent) {
childCall.nextMessageToSend += 1;
childCall.halfCloseSent = true;
childCall.call.halfClose();
}
break;
case 'FREED':
// Should not be possible
Expand Down Expand Up @@ -819,6 +847,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
call.state === 'ACTIVE' &&
call.nextMessageToSend === messageIndex
) {
call.highestSentMessageIndex = messageIndex;
call.call.sendMessageWithContext(
{
callback: error => {
Expand All @@ -839,6 +868,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
const call = this.underlyingCalls[this.committedCallIndex];
bufferEntry.callback = context.callback;
if (call.state === 'ACTIVE' && call.nextMessageToSend === messageIndex) {
call.highestSentMessageIndex = messageIndex;
call.call.sendMessageWithContext(
{
callback: error => {
Expand Down Expand Up @@ -868,12 +898,22 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
allocated: false,
});
for (const call of this.underlyingCalls) {
if (
call?.state === 'ACTIVE' &&
call.nextMessageToSend === halfCloseIndex
) {
call.nextMessageToSend += 1;
call.call.halfClose();
if (call?.state === 'ACTIVE' && !call.halfCloseSent) {
// Send halfClose immediately if all messages have been sent to this call
// We check highestSentMessageIndex >= halfCloseIndex - 1 because:
// - If halfCloseIndex is 0, there are no messages, so send immediately
// - If halfCloseIndex is N, the last message is at index N-1
// - If highestSentMessageIndex >= N-1, all messages have been sent
if (halfCloseIndex === 0 || call.highestSentMessageIndex >= halfCloseIndex - 1) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If halfCloseIndex is 0, then call.nextMessageToSend had better be >= -1, so there's no need to check halfCloseIndex === 0 separately.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’ll leave my comments here regarding the need for highestSentMessageIndex.

The condition
call.nextMessageToSend === halfCloseIndex || call.nextMessageToSend === halfCloseIndex - 1
was added as a quick way to confirm that the problem is related to half-close timing.

When testing this with client-streaming calls, keeping call.nextMessageToSend += 1; inside that if block results in
gRPC Error: 13 INTERNAL: Write error: write after end.
If that line is removed, no messages are sent from the client at all.

This behavior can be reproduced when the client sends only one message and then closes the stream. If multiple messages are sent before closing, the if condition does not match and the issue does not occur.

Additionally, the “Cardinality violations” tests in test-server-errors.ts also fail.

The root cause is that, for clientUnaryStreaming RPCs, a callback (Stream.onwrite) is attached to the call context and sent along with the message. When this callback is invoked, execution inside sendMessageWithContext is paused, and the next line in user code (call.end()) is executed.

At that point, underlyingCall.nextMessageToSend is 0 and halfCloseIndex is 1, making the following condition true:
call.nextMessageToSend === halfCloseIndex - 1.

As a result, halfClose is called on the stream before any message is actually sent. When execution of sendMessageWithContext later resumes, it attempts to write to an already ended stream, which leads to the write error.

Example reproducer, if you uncomment second write you'll see that issue is not reproducable:

app.post('/api/policies', (req, res) => {
  // call client.UploadPolicies which is client-side streaming
  const call = client.UploadPolicies((error, response) => {
    if (error) {
      console.error('gRPC Error:', error);
      return res.status(500).json({
        error: 'Failed to upload policies',
        details: error.message
      });
    }
    
    console.log('UploadPolicies response:', response);
  });
  
  let i = 1;
  let policy = { id: `policy-${i}`, name: `Policy ${i}` };
  const resp = call.write({ policy });
  console.log('call.write called!');
  // i++;
  // policy = { id: `policy-${i}`, name: `Policy ${i}` };
  // call.write({ policy });
  call.end();
  
  res.json({ message: 'Policies upload initiated' });
});

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it solves the problem to move that context.callback?.(); line into a process.nextTick call, and I think that makes sense anyway. That callback is called asynchronously in other cases, so it would be better to be consistent and make it asynchronous here too.

Copy link
Contributor Author

@serkanerip serkanerip Dec 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try that out, another solution came to my mind which might simplify the changes:

Since this issue happens with unary rpc calls, we could use sendMessageWithContext in here, and passing a new flag like WriteFlags.Endwith context so that in retrying-call the halfClose will be called right after sending message when the flag is set? Unrelated to current solution/changes, different changes will be introduced.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sent a commit to use nextMessageToSend and defer callback execution.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that will make it more complicated, because it adds an extra case to consider without removing the need to handle any other case.

this.trace(
'Sending halfClose immediately to child [' +
call.call.getCallNumber() +
'] - all messages already sent'
);
call.halfCloseSent = true;
call.call.halfClose();
}
// Otherwise, halfClose will be sent by sendNextChildMessage when messages complete
}
}
}
Expand All @@ -895,4 +935,4 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
return null;
}
}
}
}