CHANGELOG.md (2 additions, 0 deletions)
@@ -16,6 +16,8 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2
 
 ### :bug: Bug Fixes
 
+* fix(sdk-logs): Fix the `batchLogProcessor` exporting only upon `_scheduledDelayMillis` and ignoring `maxExportBatchSize` [#5961](https://github.com/open-telemetry/opentelemetry-js/pull/5961) @jacksonweber
+
 ### :books: Documentation
 
### :house: Internal
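For context on the entry above, a minimal usage sketch of the behavior being fixed, assuming the 2.x `@opentelemetry/sdk-logs` API where processors are passed to the `LoggerProvider` constructor; the scope name and configuration values are illustrative:

```ts
import {
  BatchLogRecordProcessor,
  InMemoryLogRecordExporter,
  LoggerProvider,
} from '@opentelemetry/sdk-logs';

const exporter = new InMemoryLogRecordExporter();
const processor = new BatchLogRecordProcessor(exporter, {
  maxExportBatchSize: 10, // a full batch should be exported right away
  scheduledDelayMillis: 5000, // partial batches wait for this timer
});
const provider = new LoggerProvider({ processors: [processor] });
const logger = provider.getLogger('example-scope');

// Before the fix, these 10 records sat in the buffer until the 5 s timer
// fired; with the fix, filling the batch triggers an export immediately.
for (let i = 0; i < 10; i++) {
  logger.emit({ body: `log ${i}` });
}
```

Batches smaller than `maxExportBatchSize` still wait for `scheduledDelayMillis` (or an explicit `forceFlush()`), as exercised by the tests below.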
@@ -147,10 +147,7 @@ export abstract class BatchLogRecordProcessorBase<T extends BufferConfig>
   }
 
   private _maybeStartTimer() {
-    if (this._timer !== undefined) {
-      return;
-    }
-    this._timer = setTimeout(() => {
+    const flush = () => {
       this._flushOneBatch()
         .then(() => {
           if (this._finishedLogRecords.length > 0) {
@@ -161,7 +158,15 @@ export abstract class BatchLogRecordProcessorBase<T extends BufferConfig>
         .catch(e => {
           globalErrorHandler(e);
         });
-    }, this._scheduledDelayMillis);
+    };
+    // we only wait if the queue doesn't have enough elements yet
+    if (this._finishedLogRecords.length >= this._maxExportBatchSize) {
+      return flush();
+    }
+    if (this._timer !== undefined) {
+      return;
+    }
+    this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
     unrefTimer(this._timer);
   }

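Read together, the two hunks above make `_maybeStartTimer` flush as soon as the buffer holds `_maxExportBatchSize` records, and fall back to the `_scheduledDelayMillis` timer only while the batch is still partial. A simplified, self-contained sketch of that pattern (not the SDK implementation; the class, its string records, and the constants below are illustrative):

```ts
// Flush immediately once the buffer is full; otherwise (re)arm a timer.
class TinyBatcher {
  private buffer: string[] = [];
  private timer?: ReturnType<typeof setTimeout>;

  constructor(
    private readonly maxExportBatchSize: number,
    private readonly scheduledDelayMillis: number,
    private readonly exportBatch: (batch: string[]) => void
  ) {}

  add(record: string): void {
    this.buffer.push(record);
    this.maybeStartTimer();
  }

  private flush(): void {
    // Cancel any pending timer before exporting the current batch.
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
    this.exportBatch(this.buffer.splice(0, this.maxExportBatchSize));
  }

  private maybeStartTimer(): void {
    // Full batch: export right away instead of waiting for the delay.
    if (this.buffer.length >= this.maxExportBatchSize) {
      return this.flush();
    }
    // Partial batch: keep an already-armed timer, or start one.
    if (this.timer !== undefined) {
      return;
    }
    this.timer = setTimeout(() => this.flush(), this.scheduledDelayMillis);
  }
}

// With maxExportBatchSize = 3, the third add() exports immediately.
const batcher = new TinyBatcher(3, 5000, batch => console.log('export', batch));
batcher.add('a');
batcher.add('b');
batcher.add('c'); // logs: export [ 'a', 'b', 'c' ] without waiting 5 s
```

The real processor additionally unrefs the timer (`unrefTimer`) and does further work inside the `.then` handler when records remain in the queue; those lines are collapsed in the hunk above.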
@@ -141,47 +141,175 @@ describe('BatchLogRecordProcessorBase', () => {

   describe('onEmit', () => {
     it('should export the log records with buffer size reached', done => {
-      const clock = sinon.useFakeTimers();
       const processor = new BatchLogRecordProcessor(
         exporter,
         defaultBufferConfig
       );
-      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
+      // Add logs up to maxExportBatchSize - 1 (should not trigger export yet)
+      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize - 1; i++) {
         const logRecord = createLogRecord();
         assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
         processor.onEmit(logRecord);
         assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
       }
+      // Add the final log that reaches maxExportBatchSize (should trigger immediate export)
       const logRecord = createLogRecord();
       processor.onEmit(logRecord);
       setTimeout(async () => {
+        // Should now have exported the batch immediately
         assert.strictEqual(
           exporter.getFinishedLogRecords().length,
           defaultBufferConfig.maxExportBatchSize
         );
         await processor.shutdown();
         assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
         done();
-      }, defaultBufferConfig.scheduledDelayMillis + 1000);
-      clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
-      clock.restore();
+      }, 10); // Small delay to allow async export to complete
     });
 
-    it('should force flush when timeout exceeded', done => {
-      const clock = sinon.useFakeTimers();
+    it('should export immediately when maxExportBatchSize is reached', async () => {
+      const processor = new BatchLogRecordProcessor(
+        exporter,
+        defaultBufferConfig
+      );
+      const exportSpy = sinon.spy(exporter, 'export');
+
+      // Add logs up to maxExportBatchSize - 1
+      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize - 1; i++) {
+        const logRecord = createLogRecord();
+        processor.onEmit(logRecord);
+        assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
+        sinon.assert.notCalled(exportSpy);
+      }
+
+      // Add the final log that should trigger immediate export
+      const logRecord = createLogRecord();
+      processor.onEmit(logRecord);
+
+      // Allow async operations to complete
+      await new Promise(resolve => setTimeout(resolve, 0));
+
+      // Verify export was called immediately
+      sinon.assert.calledOnce(exportSpy);
+      assert.strictEqual(
+        exporter.getFinishedLogRecords().length,
+        defaultBufferConfig.maxExportBatchSize
+      );
+
+      await processor.shutdown();
+    });
+
+    it('should export immediately without waiting for timer when batch size reached', async () => {
+      const processor = new BatchLogRecordProcessor(
+        exporter,
+        { ...defaultBufferConfig, scheduledDelayMillis: 10000 } // Long delay
+      );
+      const exportSpy = sinon.spy(exporter, 'export');
+
+      // Fill the batch completely
+      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
+        const logRecord = createLogRecord();
+        processor.onEmit(logRecord);
+      }
+
+      // Allow async operations to complete
+      await new Promise(resolve => setTimeout(resolve, 0));
+
+      // Verify export happened without waiting for the timer
+      sinon.assert.calledOnce(exportSpy);
+      assert.strictEqual(
+        exporter.getFinishedLogRecords().length,
+        defaultBufferConfig.maxExportBatchSize
+      );
+
+      await processor.shutdown();
+    });
+
+    it('should handle multiple immediate exports when batches fill up', async () => {
+      const processor = new BatchLogRecordProcessor(
+        exporter,
+        defaultBufferConfig
+      );
+      const exportSpy = sinon.spy(exporter, 'export');
+
+      // Fill up exactly 2 batches worth of logs
+      const totalLogs = defaultBufferConfig.maxExportBatchSize * 2;
+      for (let i = 0; i < totalLogs; i++) {
+        const logRecord = createLogRecord();
+        processor.onEmit(logRecord);
+      }
+
+      // Allow async operations to complete
+      await new Promise(resolve => setTimeout(resolve, 10));
+
+      // Should have called export twice (once per filled batch)
+      sinon.assert.calledTwice(exportSpy);
+      assert.strictEqual(exporter.getFinishedLogRecords().length, totalLogs);
+
+      await processor.shutdown();
+    });
+
+    it('should still use timer for partial batches after immediate export', async () => {
+      const processor = new BatchLogRecordProcessor(
+        exporter,
+        { ...defaultBufferConfig, scheduledDelayMillis: 100 } // Shorter delay for testing
+      );
+      const exportSpy = sinon.spy(exporter, 'export');
+
+      // Fill one complete batch - should trigger immediate export
+      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
+        const logRecord = createLogRecord();
+        processor.onEmit(logRecord);
+      }
+
+      // Allow immediate export to complete
+      await new Promise(resolve => setTimeout(resolve, 0));
+
+      // Verify first batch was exported immediately
+      sinon.assert.calledOnce(exportSpy);
+      const exportedSoFar = exporter.getFinishedLogRecords().length;
+      assert.strictEqual(exportedSoFar, defaultBufferConfig.maxExportBatchSize);
+
+      // Add one more log (partial batch)
+      const logRecord = createLogRecord();
+      processor.onEmit(logRecord);
+
+      // Should not export yet (still waiting for timer)
+      assert.strictEqual(exportSpy.callCount, 1); // Still only one call from the immediate export
+
+      // Wait for timer to trigger the partial batch export
+      await new Promise(resolve => setTimeout(resolve, 200)); // Wait longer than scheduledDelayMillis
+
+      // Should have exported the partial batch
+      sinon.assert.calledTwice(exportSpy);
+      assert.strictEqual(
+        exporter.getFinishedLogRecords().length,
+        exportedSoFar + 1
+      );
+
+      await processor.shutdown();
+    });
+
+    it('should force flush when timeout exceeded for partial batches', done => {
+      const clock = sinon.useFakeTimers();
       const processor = new BatchLogRecordProcessor(
         exporter,
         defaultBufferConfig
       );
+      // Add only a partial batch (less than maxExportBatchSize)
+      const partialBatchSize = Math.floor(
+        defaultBufferConfig.maxExportBatchSize / 2
+      );
+      for (let i = 0; i < partialBatchSize; i++) {
         const logRecord = createLogRecord();
         processor.onEmit(logRecord);
         assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
       }
       setTimeout(() => {
+        // Should export the partial batch after timeout
         assert.strictEqual(
           exporter.getFinishedLogRecords().length,
-          defaultBufferConfig.maxExportBatchSize
+          partialBatchSize
         );
         done();
       }, defaultBufferConfig.scheduledDelayMillis + 1000);
@@ -265,32 +393,58 @@ describe('BatchLogRecordProcessorBase', () => {
       });
     });
 
-    it('should drop logRecords when there are more logRecords then "maxQueueSize"', () => {
+    it('should drop logRecords when there are more logRecords than "maxQueueSize"', () => {
+      // Use a large batch size to prevent automatic exports during this test
       const maxQueueSize = 6;
-      const processor = new BatchLogRecordProcessor(exporter, { maxQueueSize });
+      const maxExportBatchSize = 20; // Will be clamped to maxQueueSize (6) by constructor
+      const processor = new BatchLogRecordProcessor(exporter, {
+        maxQueueSize,
+        maxExportBatchSize,
+      });
+
+      // Verify that maxExportBatchSize was adjusted to maxQueueSize
+      assert.strictEqual(processor['_maxExportBatchSize'], maxQueueSize);
+
+      // Disable the processor temporarily to prevent exports during testing
+      const originalMaybeStartTimer = processor['_maybeStartTimer'];
+      processor['_maybeStartTimer'] = () => {}; // Do nothing
+
       const logRecord = createLogRecord();
-      for (let i = 0; i < maxQueueSize + 10; i++) {
+
+      // Add more logs than the queue can hold
+      for (let i = 0; i < maxQueueSize + 5; i++) {
         processor.onEmit(logRecord);
       }
-      assert.strictEqual(processor['_finishedLogRecords'].length, 6);
+
+      // Should only have maxQueueSize logs in the buffer, others should be dropped
+      assert.strictEqual(processor['_finishedLogRecords'].length, maxQueueSize);
+
+      // Restore original method
+      processor['_maybeStartTimer'] = originalMaybeStartTimer;
+
       processor.shutdown();
     });
   });
 
   describe('forceFlush', () => {
-    it('should force flush on demand', () => {
+    it('should force flush on demand', async () => {
       const processor = new BatchLogRecordProcessor(
         exporter,
         defaultBufferConfig
       );
-      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
+      // Add partial batch (less than maxExportBatchSize) so it doesn't export automatically
+      const partialBatchSize = Math.floor(
+        defaultBufferConfig.maxExportBatchSize / 2
+      );
+      for (let i = 0; i < partialBatchSize; i++) {
         const logRecord = createLogRecord();
         processor.onEmit(logRecord);
       }
       assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
-      processor.forceFlush();
+      await processor.forceFlush();
       assert.strictEqual(
         exporter.getFinishedLogRecords().length,
-        defaultBufferConfig.maxExportBatchSize
+        partialBatchSize
       );
     });
