diff --git a/src/logger.ts b/src/logger.ts index 8157324b..7adf1263 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -25,6 +25,7 @@ export const LogId = { telemetryMetadataError: mongoLogId(1_002_005), telemetryDeviceIdFailure: mongoLogId(1_002_006), telemetryDeviceIdTimeout: mongoLogId(1_002_007), + telemetryContainerEnvFailure: mongoLogId(1_002_008), toolExecute: mongoLogId(1_003_001), toolExecuteFailure: mongoLogId(1_003_002), diff --git a/src/server.ts b/src/server.ts index b0e8e19c..9012fdf5 100644 --- a/src/server.ts +++ b/src/server.ts @@ -130,7 +130,7 @@ export class Server { } } - this.telemetry.emitEvents([event]).catch(() => {}); + this.telemetry.emitEvents([event]); } private registerTools() { diff --git a/src/telemetry/telemetry.ts b/src/telemetry/telemetry.ts index ccf0eb41..c6dfae51 100644 --- a/src/telemetry/telemetry.ts +++ b/src/telemetry/telemetry.ts @@ -7,114 +7,151 @@ import { MACHINE_METADATA } from "./constants.js"; import { EventCache } from "./eventCache.js"; import nodeMachineId from "node-machine-id"; import { getDeviceId } from "@mongodb-js/device-id"; +import fs from "fs/promises"; + +async function fileExists(filePath: string): Promise { + try { + await fs.access(filePath, fs.constants.F_OK); + return true; // File exists + } catch (e: unknown) { + if ( + e instanceof Error && + ( + e as Error & { + code: string; + } + ).code === "ENOENT" + ) { + return false; // File does not exist + } + throw e; // Re-throw unexpected errors + } +} -type EventResult = { - success: boolean; - error?: Error; -}; +async function isContainerized(): Promise { + if (process.env.container) { + return true; + } + + const exists = await Promise.all(["/.dockerenv", "/run/.containerenv", "/var/run/.containerenv"].map(fileExists)); -export const DEVICE_ID_TIMEOUT = 3000; + return exists.includes(true); +} export class Telemetry { - private isBufferingEvents: boolean = true; - /** Resolves when the device ID is retrieved or timeout occurs */ - public deviceIdPromise: Promise | undefined; private deviceIdAbortController = new AbortController(); private eventCache: EventCache; private getRawMachineId: () => Promise; + private getContainerEnv: () => Promise; + private cachedCommonProperties?: CommonProperties; + private flushing: boolean = false; private constructor( private readonly session: Session, private readonly userConfig: UserConfig, - private readonly commonProperties: CommonProperties, - { eventCache, getRawMachineId }: { eventCache: EventCache; getRawMachineId: () => Promise } + { + eventCache, + getRawMachineId, + getContainerEnv, + }: { + eventCache: EventCache; + getRawMachineId: () => Promise; + getContainerEnv: () => Promise; + } ) { this.eventCache = eventCache; this.getRawMachineId = getRawMachineId; + this.getContainerEnv = getContainerEnv; } static create( session: Session, userConfig: UserConfig, { - commonProperties = { ...MACHINE_METADATA }, eventCache = EventCache.getInstance(), getRawMachineId = () => nodeMachineId.machineId(true), + getContainerEnv = isContainerized, }: { eventCache?: EventCache; getRawMachineId?: () => Promise; - commonProperties?: CommonProperties; + getContainerEnv?: () => Promise; } = {} ): Telemetry { - const instance = new Telemetry(session, userConfig, commonProperties, { eventCache, getRawMachineId }); - - void instance.start(); - return instance; - } - - private async start(): Promise { - if (!this.isTelemetryEnabled()) { - return; - } - this.deviceIdPromise = getDeviceId({ - getMachineId: () => this.getRawMachineId(), - onError: (reason, error) => { - switch (reason) { - case "resolutionError": - logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error)); - break; - case "timeout": - logger.debug(LogId.telemetryDeviceIdTimeout, "telemetry", "Device ID retrieval timed out"); - break; - case "abort": - // No need to log in the case of aborts - break; - } - }, - abortSignal: this.deviceIdAbortController.signal, + const instance = new Telemetry(session, userConfig, { + eventCache, + getRawMachineId, + getContainerEnv, }); - this.commonProperties.device_id = await this.deviceIdPromise; - - this.isBufferingEvents = false; + return instance; } public async close(): Promise { this.deviceIdAbortController.abort(); - this.isBufferingEvents = false; - await this.emitEvents(this.eventCache.getEvents()); + await this.flush(); } /** * Emits events through the telemetry pipeline * @param events - The events to emit */ - public async emitEvents(events: BaseEvent[]): Promise { - try { - if (!this.isTelemetryEnabled()) { - logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`); - return; - } - - await this.emit(events); - } catch { - logger.debug(LogId.telemetryEmitFailure, "telemetry", `Error emitting telemetry events.`); - } + public emitEvents(events: BaseEvent[]): void { + void this.flush(events); } /** * Gets the common properties for events * @returns Object containing common properties for all events */ - public getCommonProperties(): CommonProperties { - return { - ...this.commonProperties, - mcp_client_version: this.session.agentRunner?.version, - mcp_client_name: this.session.agentRunner?.name, - session_id: this.session.sessionId, - config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false", - config_connection_string: this.userConfig.connectionString ? "true" : "false", - }; + private async getCommonProperties(): Promise { + if (!this.cachedCommonProperties) { + let deviceId: string | undefined; + try { + deviceId = await getDeviceId({ + getMachineId: () => this.getRawMachineId(), + onError: (reason, error) => { + switch (reason) { + case "resolutionError": + logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", String(error)); + break; + case "timeout": + logger.debug( + LogId.telemetryDeviceIdTimeout, + "telemetry", + "Device ID retrieval timed out" + ); + break; + case "abort": + // No need to log in the case of aborts + break; + } + }, + abortSignal: this.deviceIdAbortController.signal, + }); + } catch (error: unknown) { + const err = error instanceof Error ? error : new Error(String(error)); + logger.debug(LogId.telemetryDeviceIdFailure, "telemetry", err.message); + } + let containerEnv: boolean | undefined; + try { + containerEnv = await this.getContainerEnv(); + } catch (error: unknown) { + const err = error instanceof Error ? error : new Error(String(error)); + logger.debug(LogId.telemetryContainerEnvFailure, "telemetry", err.message); + } + this.cachedCommonProperties = { + ...MACHINE_METADATA, + mcp_client_version: this.session.agentRunner?.version, + mcp_client_name: this.session.agentRunner?.name, + session_id: this.session.sessionId, + config_atlas_auth: this.session.apiClient.hasCredentials() ? "true" : "false", + config_connection_string: this.userConfig.connectionString ? "true" : "false", + is_container_env: containerEnv ? "true" : "false", + device_id: deviceId, + }; + } + + return this.cachedCommonProperties; } /** @@ -135,60 +172,62 @@ export class Telemetry { } /** - * Attempts to emit events through authenticated and unauthenticated clients + * Attempts to flush events through authenticated and unauthenticated clients * Falls back to caching if both attempts fail */ - private async emit(events: BaseEvent[]): Promise { - if (this.isBufferingEvents) { - this.eventCache.appendEvents(events); + public async flush(events?: BaseEvent[]): Promise { + if (!this.isTelemetryEnabled()) { + logger.info(LogId.telemetryEmitFailure, "telemetry", `Telemetry is disabled.`); + return; + } + + if (this.flushing) { + this.eventCache.appendEvents(events ?? []); return; } - const cachedEvents = this.eventCache.getEvents(); - const allEvents = [...cachedEvents, ...events]; + this.flushing = true; - logger.debug( - LogId.telemetryEmitStart, - "telemetry", - `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)` - ); + try { + const cachedEvents = this.eventCache.getEvents(); + const allEvents = [...cachedEvents, ...(events ?? [])]; + + logger.debug( + LogId.telemetryEmitStart, + "telemetry", + `Attempting to send ${allEvents.length} events (${cachedEvents.length} cached)` + ); - const result = await this.sendEvents(this.session.apiClient, allEvents); - if (result.success) { + await this.sendEvents(this.session.apiClient, allEvents); this.eventCache.clearEvents(); logger.debug( LogId.telemetryEmitSuccess, "telemetry", `Sent ${allEvents.length} events successfully: ${JSON.stringify(allEvents, null, 2)}` ); - return; + } catch (error: unknown) { + logger.debug( + LogId.telemetryEmitFailure, + "telemetry", + `Error sending event to client: ${error instanceof Error ? error.message : String(error)}` + ); + this.eventCache.appendEvents(events ?? []); } - logger.debug( - LogId.telemetryEmitFailure, - "telemetry", - `Error sending event to client: ${result.error instanceof Error ? result.error.message : String(result.error)}` - ); - this.eventCache.appendEvents(events); + this.flushing = false; } /** * Attempts to send events through the provided API client */ - private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise { - try { - await client.sendEvents( - events.map((event) => ({ - ...event, - properties: { ...this.getCommonProperties(), ...event.properties }, - })) - ); - return { success: true }; - } catch (error) { - return { - success: false, - error: error instanceof Error ? error : new Error(String(error)), - }; - } + private async sendEvents(client: ApiClient, events: BaseEvent[]): Promise { + const commonProperties = await this.getCommonProperties(); + + await client.sendEvents( + events.map((event) => ({ + ...event, + properties: { ...commonProperties, ...event.properties }, + })) + ); } } diff --git a/src/telemetry/types.ts b/src/telemetry/types.ts index d77cc010..05ce8f3f 100644 --- a/src/telemetry/types.ts +++ b/src/telemetry/types.ts @@ -71,4 +71,5 @@ export type CommonProperties = { config_atlas_auth?: TelemetryBoolSet; config_connection_string?: TelemetryBoolSet; session_id?: string; + is_container_env?: TelemetryBoolSet; } & CommonStaticProperties; diff --git a/src/tools/tool.ts b/src/tools/tool.ts index bcf886ea..1957acc7 100644 --- a/src/tools/tool.ts +++ b/src/tools/tool.ts @@ -74,12 +74,12 @@ export abstract class ToolBase { logger.debug(LogId.toolExecute, "tool", `Executing ${this.name} with args: ${JSON.stringify(args)}`); const result = await this.execute(...args); - await this.emitToolEvent(startTime, result, ...args).catch(() => {}); + this.emitToolEvent(startTime, result, ...args); return result; } catch (error: unknown) { logger.error(LogId.toolExecuteFailure, "tool", `Error executing ${this.name}: ${error as string}`); const toolResult = await this.handleError(error, args[0] as ToolArgs); - await this.emitToolEvent(startTime, toolResult, ...args).catch(() => {}); + this.emitToolEvent(startTime, toolResult, ...args); return toolResult; } }; @@ -179,11 +179,11 @@ export abstract class ToolBase { * @param result - Whether the command succeeded or failed * @param args - The arguments passed to the tool */ - private async emitToolEvent( + private emitToolEvent( startTime: number, result: CallToolResult, ...args: Parameters> - ): Promise { + ): void { if (!this.telemetry.isTelemetryEnabled()) { return; } @@ -209,6 +209,6 @@ export abstract class ToolBase { event.properties.project_id = metadata.projectId; } - await this.telemetry.emitEvents([event]); + this.telemetry.emitEvents([event]); } } diff --git a/tests/integration/telemetry.test.ts b/tests/integration/telemetry.test.ts deleted file mode 100644 index 522c1154..00000000 --- a/tests/integration/telemetry.test.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { createHmac } from "crypto"; -import { Telemetry } from "../../src/telemetry/telemetry.js"; -import { Session } from "../../src/session.js"; -import { config } from "../../src/config.js"; -import nodeMachineId from "node-machine-id"; - -describe("Telemetry", () => { - it("should resolve the actual machine ID", async () => { - const actualId: string = await nodeMachineId.machineId(true); - - const actualHashedId = createHmac("sha256", actualId.toUpperCase()).update("atlascli").digest("hex"); - - const telemetry = Telemetry.create( - new Session({ - apiBaseUrl: "", - }), - config - ); - - expect(telemetry.getCommonProperties().device_id).toBe(undefined); - expect(telemetry["isBufferingEvents"]).toBe(true); - - await telemetry.deviceIdPromise; - - expect(telemetry.getCommonProperties().device_id).toBe(actualHashedId); - expect(telemetry["isBufferingEvents"]).toBe(false); - }); -}); diff --git a/tests/unit/telemetry.test.ts b/tests/unit/telemetry.test.ts index c1ae28ea..fc4e8f8b 100644 --- a/tests/unit/telemetry.test.ts +++ b/tests/unit/telemetry.test.ts @@ -1,6 +1,6 @@ import { ApiClient } from "../../src/common/atlas/apiClient.js"; import { Session } from "../../src/session.js"; -import { DEVICE_ID_TIMEOUT, Telemetry } from "../../src/telemetry/telemetry.js"; +import { Telemetry } from "../../src/telemetry/telemetry.js"; import { BaseEvent, TelemetryResult } from "../../src/telemetry/types.js"; import { EventCache } from "../../src/telemetry/eventCache.js"; import { config } from "../../src/config.js"; @@ -16,6 +16,8 @@ const MockApiClient = ApiClient as jest.MockedClass; jest.mock("../../src/telemetry/eventCache.js"); const MockEventCache = EventCache as jest.MockedClass; +const nextTick = () => new Promise((resolve) => process.nextTick(resolve)); + describe("Telemetry", () => { const machineId = "test-machine-id"; const hashedMachineId = createHmac("sha256", machineId.toUpperCase()).update("atlascli").digest("hex"); @@ -24,6 +26,11 @@ describe("Telemetry", () => { let mockEventCache: jest.Mocked; let session: Session; let telemetry: Telemetry; + let telemetryConfig: { + eventCache: EventCache; + getRawMachineId: () => Promise; + getContainerEnv: () => Promise; + }; // Helper function to create properly typed test events function createTestEvent(options?: { @@ -77,11 +84,10 @@ describe("Telemetry", () => { expect(appendEvents.length).toBe(appendEventsCalls); if (sendEventsCalledWith) { - expect(sendEvents[0]?.[0]).toEqual( + expect(sendEvents[0]?.[0]).toMatchObject( sendEventsCalledWith.map((event) => ({ ...event, properties: { - ...telemetry.getCommonProperties(), ...event.properties, }, })) @@ -125,10 +131,13 @@ describe("Telemetry", () => { setAgentRunner: jest.fn().mockResolvedValue(undefined), } as unknown as Session; - telemetry = Telemetry.create(session, config, { + telemetryConfig = { eventCache: mockEventCache, getRawMachineId: () => Promise.resolve(machineId), - }); + getContainerEnv: () => Promise.resolve(false), + }; + + telemetry = Telemetry.create(session, config, telemetryConfig); config.telemetry = "enabled"; }); @@ -138,7 +147,8 @@ describe("Telemetry", () => { it("should send events successfully", async () => { const testEvent = createTestEvent(); - await telemetry.emitEvents([testEvent]); + telemetry.emitEvents([testEvent]); + await nextTick(); // wait for the event to be sent verifyMockCalls({ sendEventsCalls: 1, @@ -152,7 +162,8 @@ describe("Telemetry", () => { const testEvent = createTestEvent(); - await telemetry.emitEvents([testEvent]); + telemetry.emitEvents([testEvent]); + await nextTick(); // wait for the event to be sent verifyMockCalls({ sendEventsCalls: 1, @@ -175,7 +186,8 @@ describe("Telemetry", () => { // Set up mock to return cached events mockEventCache.getEvents.mockReturnValueOnce([cachedEvent]); - await telemetry.emitEvents([newEvent]); + telemetry.emitEvents([newEvent]); + await nextTick(); // wait for the event to be sent verifyMockCalls({ sendEventsCalls: 1, @@ -184,9 +196,7 @@ describe("Telemetry", () => { }); }); - it("should correctly add common properties to events", () => { - const commonProps = telemetry.getCommonProperties(); - + it("should correctly add common properties to events", async () => { // Use explicit type assertion const expectedProps: Record = { mcp_client_version: "1.0.0", @@ -197,48 +207,61 @@ describe("Telemetry", () => { device_id: hashedMachineId, }; - expect(commonProps).toMatchObject(expectedProps); - }); + const testEvent = createTestEvent(); - describe("machine ID resolution", () => { - beforeEach(() => { - jest.clearAllMocks(); - jest.useFakeTimers(); - }); + telemetry.emitEvents([testEvent]); + await nextTick(); // wait for the event to be sent - afterEach(() => { - jest.clearAllMocks(); - jest.useRealTimers(); + const checkEvent = { + ...testEvent, + properties: { + ...testEvent.properties, + ...expectedProps, + }, + }; + + verifyMockCalls({ + sendEventsCalls: 1, + clearEventsCalls: 1, + sendEventsCalledWith: [checkEvent], }); + }); + describe("machine ID resolution", () => { it("should successfully resolve the machine ID", async () => { - telemetry = Telemetry.create(session, config, { - getRawMachineId: () => Promise.resolve(machineId), + const testEvent = createTestEvent(); + + telemetry.emitEvents([testEvent]); + await nextTick(); // wait for the event to be sent + + const checkEvent = { + ...testEvent, + properties: { + ...testEvent.properties, + device_id: hashedMachineId, + }, + }; + + verifyMockCalls({ + sendEventsCalls: 1, + clearEventsCalls: 1, + sendEventsCalledWith: [checkEvent], }); - - expect(telemetry["isBufferingEvents"]).toBe(true); - expect(telemetry.getCommonProperties().device_id).toBe(undefined); - - await telemetry.deviceIdPromise; - - expect(telemetry["isBufferingEvents"]).toBe(false); - expect(telemetry.getCommonProperties().device_id).toBe(hashedMachineId); }); it("should handle machine ID resolution failure", async () => { const loggerSpy = jest.spyOn(logger, "debug"); telemetry = Telemetry.create(session, config, { + ...telemetryConfig, getRawMachineId: () => Promise.reject(new Error("Failed to get device ID")), }); - expect(telemetry["isBufferingEvents"]).toBe(true); - expect(telemetry.getCommonProperties().device_id).toBe(undefined); + const testEvent = createTestEvent(); - await telemetry.deviceIdPromise; + telemetry.emitEvents([testEvent]); - expect(telemetry["isBufferingEvents"]).toBe(false); - expect(telemetry.getCommonProperties().device_id).toBe("unknown"); + await nextTick(); // wait for the event to be sent expect(loggerSpy).toHaveBeenCalledWith( LogId.telemetryDeviceIdFailure, @@ -247,27 +270,28 @@ describe("Telemetry", () => { ); }); - it("should timeout if machine ID resolution takes too long", async () => { + it("should timeout if machine ID resolution takes too long", () => { const loggerSpy = jest.spyOn(logger, "debug"); - telemetry = Telemetry.create(session, config, { getRawMachineId: () => new Promise(() => {}) }); + jest.useFakeTimers(); - expect(telemetry["isBufferingEvents"]).toBe(true); - expect(telemetry.getCommonProperties().device_id).toBe(undefined); + telemetry = Telemetry.create(session, config, { + ...telemetryConfig, + getRawMachineId: () => new Promise(() => {}), // Never resolves + }); + + const testEvent = createTestEvent(); - jest.advanceTimersByTime(DEVICE_ID_TIMEOUT / 2); + telemetry.emitEvents([testEvent]); - // Make sure the timeout doesn't happen prematurely. - expect(telemetry["isBufferingEvents"]).toBe(true); - expect(telemetry.getCommonProperties().device_id).toBe(undefined); + jest.advanceTimersByTime(5000); - jest.advanceTimersByTime(DEVICE_ID_TIMEOUT); + jest.useRealTimers(); - await telemetry.deviceIdPromise; + expect(loggerSpy).toHaveBeenCalledTimes(2); - expect(telemetry.getCommonProperties().device_id).toBe("unknown"); - expect(telemetry["isBufferingEvents"]).toBe(false); - expect(loggerSpy).toHaveBeenCalledWith( + expect(loggerSpy).toHaveBeenNthCalledWith( + 2, LogId.telemetryDeviceIdTimeout, "telemetry", "Device ID retrieval timed out" @@ -288,9 +312,12 @@ describe("Telemetry", () => { it("should not send events", async () => { const testEvent = createTestEvent(); - await telemetry.emitEvents([testEvent]); + telemetry.emitEvents([testEvent]); + await nextTick(); // wait for the event to be sent - verifyMockCalls(); + verifyMockCalls({ + sendEventsCalls: 0, + }); }); }); @@ -313,9 +340,12 @@ describe("Telemetry", () => { it("should not send events", async () => { const testEvent = createTestEvent(); - await telemetry.emitEvents([testEvent]); + telemetry.emitEvents([testEvent]); + await nextTick(); // wait for the event to be sent - verifyMockCalls(); + verifyMockCalls({ + sendEventsCalls: 0, + }); }); }); });