Skip to content

Commit dce83c7

Browse files
feywindgcf-owl-bot[bot]hongalex
authored
feat: ad-hoc, targeted debug logging (#2062)
* feat: first set of logging changes * tests: add unit test for publisher queue logs * feat: more work on logging * feat: client-side subscriber flow control logs * chore: lint fix * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: move all of the logs to sub-logs of pubsub * feat: disambiguate ack vs modack queue logs * fix: a few more log integration fixups * fix: clear up what 99th percentile means * fix: use the grpc constants instead of a number Co-authored-by: Alex Hong <[email protected]> * fix: tweak wording on 99th percentile log * fix: make the method signature changes extra-non-breaking --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Alex Hong <[email protected]>
1 parent 7a85349 commit dce83c7

16 files changed

+3162
-211
lines changed

protos/protos.d.ts

Lines changed: 518 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protos/protos.js

Lines changed: 1747 additions & 74 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protos/protos.json

Lines changed: 193 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/lease-manager.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,24 @@
1515
*/
1616

1717
import {EventEmitter} from 'events';
18+
1819
import {AckError, Message, Subscriber} from './subscriber';
1920
import {defaultOptions} from './default-options';
2021
import {Duration} from './temporal';
22+
import {DebugMessage} from './debug';
23+
import {logs as baseLogs} from './logs';
24+
25+
/**
26+
* Loggers. Exported for unit tests.
27+
*
28+
* @private
29+
*/
30+
export const logs = {
31+
callbackDelivery: baseLogs.pubsub.sublog('callback-delivery'),
32+
callbackExceptions: baseLogs.pubsub.sublog('callback-exceptions'),
33+
expiry: baseLogs.pubsub.sublog('expiry'),
34+
subscriberFlowControl: baseLogs.pubsub.sublog('subscriber-flow-control'),
35+
};
2136

2237
export interface FlowControlOptions {
2338
allowExcessMessages?: boolean;
@@ -106,6 +121,12 @@ export class LeaseManager extends EventEmitter {
106121
if (allowExcessMessages! || !wasFull) {
107122
this._dispense(message);
108123
} else {
124+
if (this.pending === 0) {
125+
logs.subscriberFlowControl.info(
126+
'subscriber for %s is client-side flow blocked',
127+
this._subscriber.name,
128+
);
129+
}
109130
this._pending.push(message);
110131
}
111132

@@ -125,6 +146,13 @@ export class LeaseManager extends EventEmitter {
125146
clear(): Message[] {
126147
const wasFull = this.isFull();
127148

149+
if (this.pending > 0) {
150+
logs.subscriberFlowControl.info(
151+
'subscriber for %s is unblocking client-side flow due to clear()',
152+
this._subscriber.name,
153+
);
154+
}
155+
128156
this._pending = [];
129157
const remaining = Array.from(this._messages);
130158
this._messages.clear();
@@ -173,6 +201,18 @@ export class LeaseManager extends EventEmitter {
173201
const index = this._pending.indexOf(message);
174202
this._pending.splice(index, 1);
175203
} else if (this.pending > 0) {
204+
if (this.pending > 1) {
205+
logs.subscriberFlowControl.info(
206+
'subscriber for %s dispensing one blocked message',
207+
this._subscriber.name,
208+
);
209+
} else {
210+
logs.subscriberFlowControl.info(
211+
'subscriber for %s fully unblocked on client-side flow control',
212+
this._subscriber.name,
213+
);
214+
}
215+
176216
this._dispense(this._pending.shift()!);
177217
}
178218

@@ -225,8 +265,26 @@ export class LeaseManager extends EventEmitter {
225265
if (this._subscriber.isOpen) {
226266
message.subSpans.flowEnd();
227267
process.nextTick(() => {
268+
logs.callbackDelivery.info(
269+
'message (ID %s, ackID %s) delivery to user callbacks',
270+
message.id,
271+
message.ackId,
272+
);
228273
message.subSpans.processingStart(this._subscriber.name);
229-
this._subscriber.emit('message', message);
274+
try {
275+
this._subscriber.emit('message', message);
276+
} catch (e: unknown) {
277+
logs.callbackExceptions.error(
278+
'message (ID %s, ackID %s) caused a user callback exception: %o',
279+
message.id,
280+
message.ackId,
281+
e,
282+
);
283+
this._subscriber.emit(
284+
'debug',
285+
new DebugMessage('error during user callback', e as Error),
286+
);
287+
}
230288
});
231289
}
232290
}
@@ -265,6 +323,11 @@ export class LeaseManager extends EventEmitter {
265323
message.subSpans.modAckStart(deadline, false);
266324
}
267325
} else {
326+
logs.expiry.warn(
327+
'message (ID %s, ackID %s) has been dropped from leasing due to a timeout',
328+
message.id,
329+
message.ackId,
330+
);
268331
this.remove(message);
269332
}
270333
}

src/logs.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import {loggingUtils} from 'google-gax';
16+
17+
/**
18+
* Base logger. Other loggers will derive from this one.
19+
*
20+
* @private
21+
*/
22+
export const logs = {
23+
pubsub: loggingUtils.log('pubsub'),
24+
};

src/message-queues.ts

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ import {Duration} from './temporal';
3434
import {addToBucket} from './util';
3535
import {DebugMessage} from './debug';
3636
import * as tracing from './telemetry-tracing';
37+
import {logs as baseLogs} from './logs';
38+
39+
/**
40+
* Loggers. Exported for unit tests.
41+
*
42+
* @private
43+
*/
44+
export const logs = {
45+
ackBatch: baseLogs.pubsub.sublog('ack-batch'),
46+
};
3747

3848
export interface ReducedMessage {
3949
ackId: string;
@@ -147,6 +157,8 @@ export abstract class MessageQueue {
147157
this.setOptions(options);
148158
}
149159

160+
protected abstract getType(): string;
161+
150162
/**
151163
* Shuts down this message queue gracefully. Any acks/modAcks pending in
152164
* the queue or waiting for retry will be removed. If exactly-once delivery
@@ -188,6 +200,20 @@ export abstract class MessageQueue {
188200
return this._options!.maxMilliseconds!;
189201
}
190202

203+
/*!
204+
* Logs about a batch being sent, and why.
205+
* @private
206+
*/
207+
private logBatch(reason: string) {
208+
logs.ackBatch.info(
209+
'%s triggered %s batch of %i messages, a total of %i bytes',
210+
reason,
211+
this.getType(),
212+
this._requests.length,
213+
this.bytes,
214+
);
215+
}
216+
191217
/**
192218
* Adds a message to the queue.
193219
*
@@ -213,7 +239,11 @@ export abstract class MessageQueue {
213239
this._requests.length + 1 >= maxMessages! ||
214240
this.bytes + size >= MAX_BATCH_BYTES
215241
) {
216-
await this.flush();
242+
const reason =
243+
this._requests.length + 1 >= maxMessages!
244+
? 'going over count'
245+
: 'going over size';
246+
await this.flush(reason);
217247
}
218248

219249
// Add the message to the current batch.
@@ -233,7 +263,10 @@ export abstract class MessageQueue {
233263

234264
// Ensure that we are counting toward maxMilliseconds by timer.
235265
if (!this._timer) {
236-
this._timer = setTimeout(() => this.flush(), maxMilliseconds!);
266+
this._timer = setTimeout(
267+
() => this.flush('batch timer'),
268+
maxMilliseconds!,
269+
);
237270
}
238271

239272
return responsePromise.promise;
@@ -263,7 +296,7 @@ export abstract class MessageQueue {
263296
// Make sure we actually do have another batch scheduled.
264297
if (!this._timer) {
265298
this._timer = setTimeout(
266-
() => this.flush(),
299+
() => this.flush('retry timer'),
267300
this._options.maxMilliseconds!,
268301
);
269302
}
@@ -283,7 +316,11 @@ export abstract class MessageQueue {
283316
* Sends a batch of messages.
284317
* @private
285318
*/
286-
async flush(): Promise<void> {
319+
async flush(reason?: string): Promise<void> {
320+
if (reason) {
321+
this.logBatch(reason);
322+
}
323+
287324
if (this._timer) {
288325
clearTimeout(this._timer);
289326
delete this._timer;
@@ -493,6 +530,14 @@ export abstract class MessageQueue {
493530
* @class
494531
*/
495532
export class AckQueue extends MessageQueue {
533+
/**
534+
* @private
535+
* @returns The name of the items in this queue.
536+
*/
537+
protected getType(): string {
538+
return 'ack';
539+
}
540+
496541
/**
497542
* Sends a batch of ack requests.
498543
*
@@ -555,6 +600,14 @@ export class AckQueue extends MessageQueue {
555600
* @class
556601
*/
557602
export class ModAckQueue extends MessageQueue {
603+
/**
604+
* @private
605+
* @returns The name of the items in this queue.
606+
*/
607+
protected getType(): string {
608+
return 'modack/nack';
609+
}
610+
558611
/**
559612
* Sends a batch of modAck requests. Each deadline requires its own request,
560613
* so we have to group all the ackIds by deadline and send multiple requests.

0 commit comments

Comments
 (0)