Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
522 changes: 518 additions & 4 deletions protos/protos.d.ts

Large diffs are not rendered by default.

1,821 changes: 1,747 additions & 74 deletions protos/protos.js

Large diffs are not rendered by default.

215 changes: 193 additions & 22 deletions protos/protos.json

Large diffs are not rendered by default.

65 changes: 64 additions & 1 deletion src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,24 @@
*/

import {EventEmitter} from 'events';

import {AckError, Message, Subscriber} from './subscriber';
import {defaultOptions} from './default-options';
import {Duration} from './temporal';
import {DebugMessage} from './debug';
import {logs as baseLogs} from './logs';

/**
* Loggers. Exported for unit tests.
*
* @private
*/
export const logs = {
callbackDelivery: baseLogs.pubsub.sublog('callback-delivery'),
callbackExceptions: baseLogs.pubsub.sublog('callback-exceptions'),
expiry: baseLogs.pubsub.sublog('expiry'),
subscriberFlowControl: baseLogs.pubsub.sublog('subscriber-flow-control'),
};

export interface FlowControlOptions {
allowExcessMessages?: boolean;
Expand Down Expand Up @@ -106,6 +121,12 @@ export class LeaseManager extends EventEmitter {
if (allowExcessMessages! || !wasFull) {
this._dispense(message);
} else {
if (this.pending === 0) {
logs.subscriberFlowControl.info(
'subscriber for %s is client-side flow blocked',
this._subscriber.name,
);
}
this._pending.push(message);
}

Expand All @@ -125,6 +146,13 @@ export class LeaseManager extends EventEmitter {
clear(): Message[] {
const wasFull = this.isFull();

if (this.pending > 0) {
logs.subscriberFlowControl.info(
'subscriber for %s is unblocking client-side flow due to clear()',
this._subscriber.name,
);
}

this._pending = [];
const remaining = Array.from(this._messages);
this._messages.clear();
Expand Down Expand Up @@ -173,6 +201,18 @@ export class LeaseManager extends EventEmitter {
const index = this._pending.indexOf(message);
this._pending.splice(index, 1);
} else if (this.pending > 0) {
if (this.pending > 1) {
logs.subscriberFlowControl.info(
'subscriber for %s dispensing one blocked message',
this._subscriber.name,
);
} else {
logs.subscriberFlowControl.info(
'subscriber for %s fully unblocked on client-side flow control',
this._subscriber.name,
);
}

this._dispense(this._pending.shift()!);
}

Expand Down Expand Up @@ -225,8 +265,26 @@ export class LeaseManager extends EventEmitter {
if (this._subscriber.isOpen) {
message.subSpans.flowEnd();
process.nextTick(() => {
logs.callbackDelivery.info(
'message (ID %s, ackID %s) delivery to user callbacks',
message.id,
message.ackId,
);
message.subSpans.processingStart(this._subscriber.name);
this._subscriber.emit('message', message);
try {
this._subscriber.emit('message', message);
} catch (e: unknown) {
logs.callbackExceptions.error(
'message (ID %s, ackID %s) caused a user callback exception: %o',
message.id,
message.ackId,
e,
);
this._subscriber.emit(
'debug',
new DebugMessage('error during user callback', e as Error),
);
}
});
}
}
Expand Down Expand Up @@ -265,6 +323,11 @@ export class LeaseManager extends EventEmitter {
message.subSpans.modAckStart(deadline, false);
}
} else {
logs.expiry.warn(
'message (ID %s, ackID %s) has been dropped from leasing due to a timeout',
message.id,
message.ackId,
);
this.remove(message);
}
}
Expand Down
24 changes: 24 additions & 0 deletions src/logs.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {loggingUtils} from 'google-gax';

/**
* Base logger. Other loggers will derive from this one.
*
* @private
*/
export const logs = {
pubsub: loggingUtils.log('pubsub'),
};
61 changes: 57 additions & 4 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ import {Duration} from './temporal';
import {addToBucket} from './util';
import {DebugMessage} from './debug';
import * as tracing from './telemetry-tracing';
import {logs as baseLogs} from './logs';

/**
* Loggers. Exported for unit tests.
*
* @private
*/
export const logs = {
ackBatch: baseLogs.pubsub.sublog('ack-batch'),
};

export interface ReducedMessage {
ackId: string;
Expand Down Expand Up @@ -147,6 +157,8 @@ export abstract class MessageQueue {
this.setOptions(options);
}

protected abstract getType(): string;

/**
* Shuts down this message queue gracefully. Any acks/modAcks pending in
* the queue or waiting for retry will be removed. If exactly-once delivery
Expand Down Expand Up @@ -188,6 +200,20 @@ export abstract class MessageQueue {
return this._options!.maxMilliseconds!;
}

/*!
* Logs about a batch being sent, and why.
* @private
*/
private logBatch(reason: string) {
logs.ackBatch.info(
'%s triggered %s batch of %i messages, a total of %i bytes',
reason,
this.getType(),
this._requests.length,
this.bytes,
);
}

/**
* Adds a message to the queue.
*
Expand All @@ -213,7 +239,11 @@ export abstract class MessageQueue {
this._requests.length + 1 >= maxMessages! ||
this.bytes + size >= MAX_BATCH_BYTES
) {
await this.flush();
const reason =
this._requests.length + 1 >= maxMessages!
? 'going over count'
: 'going over size';
await this.flush(reason);
}

// Add the message to the current batch.
Expand All @@ -233,7 +263,10 @@ export abstract class MessageQueue {

// Ensure that we are counting toward maxMilliseconds by timer.
if (!this._timer) {
this._timer = setTimeout(() => this.flush(), maxMilliseconds!);
this._timer = setTimeout(
() => this.flush('batch timer'),
maxMilliseconds!,
);
}

return responsePromise.promise;
Expand Down Expand Up @@ -263,7 +296,7 @@ export abstract class MessageQueue {
// Make sure we actually do have another batch scheduled.
if (!this._timer) {
this._timer = setTimeout(
() => this.flush(),
() => this.flush('retry timer'),
this._options.maxMilliseconds!,
);
}
Expand All @@ -283,7 +316,11 @@ export abstract class MessageQueue {
* Sends a batch of messages.
* @private
*/
async flush(): Promise<void> {
async flush(reason?: string): Promise<void> {
if (reason) {
this.logBatch(reason);
}

if (this._timer) {
clearTimeout(this._timer);
delete this._timer;
Expand Down Expand Up @@ -493,6 +530,14 @@ export abstract class MessageQueue {
* @class
*/
export class AckQueue extends MessageQueue {
/**
* @private
* @returns The name of the items in this queue.
*/
protected getType(): string {
return 'ack';
}

/**
* Sends a batch of ack requests.
*
Expand Down Expand Up @@ -555,6 +600,14 @@ export class AckQueue extends MessageQueue {
* @class
*/
export class ModAckQueue extends MessageQueue {
/**
* @private
* @returns The name of the items in this queue.
*/
protected getType(): string {
return 'modack/nack';
}

/**
* Sends a batch of modAck requests. Each deadline requires its own request,
* so we have to group all the ackIds by deadline and send multiple requests.
Expand Down
Loading
Loading