1 change: 1 addition & 0 deletions experimental/CHANGELOG.md
@@ -17,6 +17,7 @@ For notes on migrating to 2.x / 0.200.x see [the upgrade guide](doc/upgrade-to-2

### :bug: Bug Fixes

* fix(sdk-logs): Fix the `batchLogProcessor` exporting only upon `_scheduledDelayMillis` and ignoring `maxExportBatchSize` [#5961](https://github.com/open-telemetry/opentelemetry-js/pull/5961) @jacksonweber
* fix(otlp-grpc-exporter-base): fix GRPC exporter not sending the user-agent header [#5687](https://github.com/open-telemetry/opentelemetry-js/issues/5867) @david-luna

### :books: Documentation
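The sketch below illustrates, at the user level, the behavior this fix restores: with a small `maxExportBatchSize` and a deliberately long `scheduledDelayMillis`, a full batch is exported as soon as the batch size is reached instead of only when the delay elapses. This is a minimal sketch assuming the public `@opentelemetry/sdk-logs` API (`LoggerProvider` with a `processors` constructor option, `BatchLogRecordProcessor`, `InMemoryLogRecordExporter`); it is not part of the changelog or of this PR.

```ts
import {
  BatchLogRecordProcessor,
  InMemoryLogRecordExporter,
  LoggerProvider,
} from '@opentelemetry/sdk-logs';

async function main() {
  const exporter = new InMemoryLogRecordExporter();
  const provider = new LoggerProvider({
    processors: [
      // Long delay: before the fix, nothing would be exported until it elapsed.
      new BatchLogRecordProcessor(exporter, {
        maxExportBatchSize: 5,
        scheduledDelayMillis: 60_000,
      }),
    ],
  });

  const logger = provider.getLogger('example');
  for (let i = 0; i < 5; i++) {
    logger.emit({ body: `log ${i}` });
  }

  // Give the asynchronous export a tick to complete.
  await new Promise(resolve => setTimeout(resolve, 0));
  console.log(exporter.getFinishedLogRecords().length); // expected: 5 with the fix

  await provider.shutdown();
}

main().catch(console.error);
```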
@@ -38,6 +38,7 @@ export abstract class BatchLogRecordProcessorBase<T extends BufferConfig>
   private readonly _scheduledDelayMillis: number;
   private readonly _exportTimeoutMillis: number;
 
+  private _isExporting = false;
   private _finishedLogRecords: SdkLogRecord[] = [];
   private _timer: NodeJS.Timeout | number | undefined;
   private _shutdownOnce: BindOnceFuture<void>;
@@ -146,22 +147,28 @@ export abstract class BatchLogRecordProcessorBase<T extends BufferConfig>
}

   private _maybeStartTimer() {
-    if (this._timer !== undefined) {
-      return;
-    }
-    this._timer = setTimeout(() => {
+    if (this._isExporting) return;
+    const flush = () => {
+      this._isExporting = true;
       this._flushOneBatch()
         .then(() => {
+          this._isExporting = false;
           if (this._finishedLogRecords.length > 0) {
             this._clearTimer();
             this._maybeStartTimer();
           }
         })
         .catch(e => {
+          this._isExporting = false;
           globalErrorHandler(e);
         });
-    }, this._scheduledDelayMillis);
+    };
+    // we only wait if the queue doesn't have enough elements yet
+    if (this._finishedLogRecords.length >= this._maxExportBatchSize) {
+      return flush();
+    }
+    if (this._timer !== undefined) return;
+    this._timer = setTimeout(() => flush(), this._scheduledDelayMillis);
     // depending on runtime, this may be a 'number' or NodeJS.Timeout
     if (typeof this._timer !== 'number') {
       this._timer.unref();
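In the reworked `_maybeStartTimer`, the export is wrapped in a `flush` closure: when the queue already holds `maxExportBatchSize` records it flushes immediately, otherwise it arms the delay timer, and the new `_isExporting` flag keeps a second batch from starting while one is in flight. Below is a standalone sketch of that pattern, not the SDK's actual internals; the `Batcher` class and its member names are illustrative only.

```ts
// Standalone sketch of the "flush when full, otherwise wait for the timer" pattern.
// `Batcher` and its members are illustrative names, not part of @opentelemetry/sdk-logs.
type ExportFn<T> = (batch: T[]) => Promise<void>;

class Batcher<T> {
  private queue: T[] = [];
  private timer?: ReturnType<typeof setTimeout>;
  private isExporting = false;

  constructor(
    private exportBatch: ExportFn<T>,
    private maxExportBatchSize = 512,
    private scheduledDelayMillis = 5000
  ) {}

  add(item: T): void {
    this.queue.push(item);
    this.maybeStartTimer();
  }

  private maybeStartTimer(): void {
    if (this.isExporting) return; // only one batch in flight at a time
    const flush = () => {
      this.isExporting = true;
      this.clearTimer();
      this.exportBatch(this.queue.splice(0, this.maxExportBatchSize))
        .catch(() => {
          /* a real implementation would report the error */
        })
        .finally(() => {
          this.isExporting = false;
          // records that arrived while exporting get their own batch or timer
          if (this.queue.length > 0) {
            this.maybeStartTimer();
          }
        });
    };
    // a full batch is exported immediately instead of waiting for the delay
    if (this.queue.length >= this.maxExportBatchSize) {
      flush();
      return;
    }
    if (this.timer !== undefined) return;
    this.timer = setTimeout(flush, this.scheduledDelayMillis);
  }

  private clearTimer(): void {
    if (this.timer !== undefined) {
      clearTimeout(this.timer);
      this.timer = undefined;
    }
  }
}
```

Note that `clearTimer()` runs at the start of `flush()`, so a pending timer cannot fire into an export that the full-batch path has already started.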
@@ -17,6 +17,7 @@
import * as assert from 'assert';
import * as sinon from 'sinon';
import {
ExportResult,
ExportResultCode,
loggingErrorHandler,
setGlobalErrorHandler,
@@ -27,6 +28,7 @@ import {
LogRecordLimits,
SdkLogRecord,
InMemoryLogRecordExporter,
LogRecordExporter,
} from '../../../src';
import { BatchLogRecordProcessorBase } from '../../../src/export/BatchLogRecordProcessorBase';
import { reconfigureLimits } from '../../../src/config';
@@ -141,47 +143,175 @@ describe('BatchLogRecordProcessorBase', () => {

describe('onEmit', () => {
     it('should export the log records with buffer size reached', done => {
-      const clock = sinon.useFakeTimers();
       const processor = new BatchLogRecordProcessor(
         exporter,
         defaultBufferConfig
       );
-      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
+      // Add logs up to maxExportBatchSize - 1 (should not trigger export yet)
+      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize - 1; i++) {
         const logRecord = createLogRecord();
         assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
         processor.onEmit(logRecord);
         assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
       }
+      // Add the final log that reaches maxExportBatchSize (should trigger immediate export)
       const logRecord = createLogRecord();
       processor.onEmit(logRecord);
       setTimeout(async () => {
+        // Should now have exported the batch immediately
         assert.strictEqual(
           exporter.getFinishedLogRecords().length,
           defaultBufferConfig.maxExportBatchSize
         );
         await processor.shutdown();
         assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
         done();
-      }, defaultBufferConfig.scheduledDelayMillis + 1000);
-      clock.tick(defaultBufferConfig.scheduledDelayMillis + 1000);
-      clock.restore();
+      }, 10); // Small delay to allow async export to complete
     });

-    it('should force flush when timeout exceeded', done => {
-      const clock = sinon.useFakeTimers();
it('should export immediately when maxExportBatchSize is reached', async () => {
const processor = new BatchLogRecordProcessor(
exporter,
defaultBufferConfig
);
const exportSpy = sinon.spy(exporter, 'export');

// Add logs up to maxExportBatchSize - 1
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize - 1; i++) {
const logRecord = createLogRecord();
processor.onEmit(logRecord);
assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
sinon.assert.notCalled(exportSpy);
}

// Add the final log that should trigger immediate export
const logRecord = createLogRecord();
processor.onEmit(logRecord);

// Allow async operations to complete
await new Promise(resolve => setTimeout(resolve, 0));

// Verify export was called immediately
sinon.assert.calledOnce(exportSpy);
assert.strictEqual(
exporter.getFinishedLogRecords().length,
defaultBufferConfig.maxExportBatchSize
);

await processor.shutdown();
});

it('should export immediately without waiting for timer when batch size reached', async () => {
const processor = new BatchLogRecordProcessor(
exporter,
{ ...defaultBufferConfig, scheduledDelayMillis: 10000 } // Long delay
);
const exportSpy = sinon.spy(exporter, 'export');

// Fill the batch completely
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
const logRecord = createLogRecord();
processor.onEmit(logRecord);
}

// Allow async operations to complete
await new Promise(resolve => setTimeout(resolve, 0));

// Verify export happened without waiting for the timer
sinon.assert.calledOnce(exportSpy);
assert.strictEqual(
exporter.getFinishedLogRecords().length,
defaultBufferConfig.maxExportBatchSize
);

await processor.shutdown();
});

it('should handle multiple immediate exports when batches fill up', async () => {
const processor = new BatchLogRecordProcessor(
exporter,
defaultBufferConfig
);
const exportSpy = sinon.spy(exporter, 'export');

// Fill up exactly 2 batches worth of logs
const totalLogs = defaultBufferConfig.maxExportBatchSize * 2;
for (let i = 0; i < totalLogs; i++) {
const logRecord = createLogRecord();
processor.onEmit(logRecord);
}

// Allow async operations to complete
await new Promise(resolve => setTimeout(resolve, 10));

// Should have called export twice (once per filled batch)
sinon.assert.calledTwice(exportSpy);
assert.strictEqual(exporter.getFinishedLogRecords().length, totalLogs);

await processor.shutdown();
});

it('should still use timer for partial batches after immediate export', async () => {
const processor = new BatchLogRecordProcessor(
exporter,
{ ...defaultBufferConfig, scheduledDelayMillis: 100 } // Shorter delay for testing
);
const exportSpy = sinon.spy(exporter, 'export');

// Fill one complete batch - should trigger immediate export
for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
const logRecord = createLogRecord();
processor.onEmit(logRecord);
}

// Allow immediate export to complete
await new Promise(resolve => setTimeout(resolve, 0));

// Verify first batch was exported immediately
sinon.assert.calledOnce(exportSpy);
const exportedSoFar = exporter.getFinishedLogRecords().length;
assert.strictEqual(exportedSoFar, defaultBufferConfig.maxExportBatchSize);

// Add one more log (partial batch)
const logRecord = createLogRecord();
processor.onEmit(logRecord);

// Should not export yet (still waiting for timer)
assert.strictEqual(exportSpy.callCount, 1); // Still only one call from the immediate export

// Wait for timer to trigger the partial batch export
await new Promise(resolve => setTimeout(resolve, 200)); // Wait longer than scheduledDelayMillis

// Should have exported the partial batch
sinon.assert.calledTwice(exportSpy);
assert.strictEqual(
exporter.getFinishedLogRecords().length,
exportedSoFar + 1
);

await processor.shutdown();
});

it('should force flush when timeout exceeded for partial batches', done => {
const clock = sinon.useFakeTimers();
const processor = new BatchLogRecordProcessor(
exporter,
defaultBufferConfig
);
// Add only a partial batch (less than maxExportBatchSize)
const partialBatchSize = Math.floor(
defaultBufferConfig.maxExportBatchSize / 2
);
for (let i = 0; i < partialBatchSize; i++) {
const logRecord = createLogRecord();
processor.onEmit(logRecord);
assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
}
setTimeout(() => {
// Should export the partial batch after timeout
assert.strictEqual(
exporter.getFinishedLogRecords().length,
-          defaultBufferConfig.maxExportBatchSize
+          partialBatchSize
);
done();
}, defaultBufferConfig.scheduledDelayMillis + 1000);
@@ -265,32 +395,58 @@ describe('BatchLogRecordProcessorBase', () => {
});
});

-    it('should drop logRecords when there are more logRecords then "maxQueueSize"', () => {
+    it('should drop logRecords when there are more logRecords than "maxQueueSize"', () => {
// Use a large batch size to prevent automatic exports during this test
const maxQueueSize = 6;
-      const processor = new BatchLogRecordProcessor(exporter, { maxQueueSize });
+      const maxExportBatchSize = 20; // Will be clamped to maxQueueSize (6) by constructor
+      const processor = new BatchLogRecordProcessor(exporter, {
+        maxQueueSize,
+        maxExportBatchSize,
+      });

// Verify that maxExportBatchSize was adjusted to maxQueueSize
assert.strictEqual(processor['_maxExportBatchSize'], maxQueueSize);

// Disable the processor temporarily to prevent exports during testing
const originalMaybeStartTimer = processor['_maybeStartTimer'];
processor['_maybeStartTimer'] = () => {}; // Do nothing

const logRecord = createLogRecord();
-      for (let i = 0; i < maxQueueSize + 10; i++) {
+      // Add more logs than the queue can hold
+      for (let i = 0; i < maxQueueSize + 5; i++) {
        processor.onEmit(logRecord);
      }
-      assert.strictEqual(processor['_finishedLogRecords'].length, 6);
+      // Should only have maxQueueSize logs in the buffer, others should be dropped
+      assert.strictEqual(processor['_finishedLogRecords'].length, maxQueueSize);

// Restore original method
processor['_maybeStartTimer'] = originalMaybeStartTimer;

processor.shutdown();
});
});
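The test above depends on the constructor reconciling the two limits: a batch can never be larger than the queue that feeds it, so `maxExportBatchSize: 20` with `maxQueueSize: 6` resolves to a batch size of 6. A minimal sketch of that clamping follows; the function name and the default values are illustrative assumptions, not the SDK's actual implementation.

```ts
// Sketch of the clamping assumed by the test above (illustrative, not SDK code).
interface BufferConfigSketch {
  maxQueueSize?: number;
  maxExportBatchSize?: number;
}

function resolveBufferConfig(config: BufferConfigSketch = {}) {
  const maxQueueSize = config.maxQueueSize ?? 2048; // illustrative default
  const requested = config.maxExportBatchSize ?? 512; // illustrative default
  return {
    maxQueueSize,
    // e.g. { maxQueueSize: 6, maxExportBatchSize: 20 } resolves to a batch size of 6
    maxExportBatchSize: Math.min(requested, maxQueueSize),
  };
}
```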

describe('forceFlush', () => {
-    it('should force flush on demand', () => {
+    it('should force flush on demand', async () => {
const processor = new BatchLogRecordProcessor(
exporter,
defaultBufferConfig
);
-      for (let i = 0; i < defaultBufferConfig.maxExportBatchSize; i++) {
+      // Add partial batch (less than maxExportBatchSize) so it doesn't export automatically
+      const partialBatchSize = Math.floor(
+        defaultBufferConfig.maxExportBatchSize / 2
+      );
+      for (let i = 0; i < partialBatchSize; i++) {
const logRecord = createLogRecord();
processor.onEmit(logRecord);
}
assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
-      processor.forceFlush();
+      await processor.forceFlush();
assert.strictEqual(
exporter.getFinishedLogRecords().length,
-        defaultBufferConfig.maxExportBatchSize
+        partialBatchSize
);
});

@@ -346,4 +502,44 @@ describe('BatchLogRecordProcessorBase', () => {
assert.strictEqual(exportedLogRecords, 1);
});
});

describe('Concurrency', () => {
it('should only send a single batch at a time', async () => {
const callbacks: ((result: ExportResult) => void)[] = [];
const logRecords: SdkLogRecord[] = [];
const exporter: LogRecordExporter = {
export: async (
exportedLogRecords: SdkLogRecord[],
resultCallback: (result: ExportResult) => void
) => {
callbacks.push(resultCallback);
logRecords.push(...exportedLogRecords);
},
shutdown: async () => {},
};
const processor = new BatchLogRecordProcessor(exporter, {
maxExportBatchSize: 5,
maxQueueSize: 6,
});
const totalLogRecords = 50;
for (let i = 0; i < totalLogRecords; i++) {
const logRecord = createLogRecord();
processor.onEmit(logRecord);
}
assert.equal(callbacks.length, 1);
assert.equal(logRecords.length, 5);
callbacks[0]({ code: ExportResultCode.SUCCESS });
await new Promise(resolve => setTimeout(resolve, 0));
// After the first batch completes we will have dropped a number
// of log records and the next batch will be smaller
assert.equal(callbacks.length, 2);
assert.equal(logRecords.length, 10);
callbacks[1]({ code: ExportResultCode.SUCCESS });

// We expect that all the other log records have been dropped
await new Promise(resolve => setTimeout(resolve, 0));
assert.equal(callbacks.length, 2);
assert.equal(logRecords.length, 10);
});
});
});
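The concurrency test documents two interacting behaviors: only one export is in flight at a time, and while it is in flight the queue stays capped at `maxQueueSize`, so surplus records are dropped rather than buffered. The sketch below shows that admission check under those assumptions; the names are illustrative, and a real processor may also count and report the drops.

```ts
// Illustrative sketch of the queue admission check the concurrency test observes.
function addToBuffer<T>(
  queue: T[],
  maxQueueSize: number,
  record: T,
  onDropped: (count: number) => void
): void {
  if (queue.length >= maxQueueSize) {
    // While an export is in flight, the queue stays capped and new records are discarded.
    onDropped(1);
    return;
  }
  queue.push(record);
}
```

With `maxExportBatchSize: 5` and `maxQueueSize: 6`, the 50 emitted records resolve to two batches of 5, one leftover record waiting on the timer, and the rest dropped, which matches the assertions above.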