diff --git a/.circleci/config.yml b/.circleci/config.yml index d690389..295d01c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -7,7 +7,7 @@ jobs: build: docker: # specify the version you desire here - - image: circleci/node:8.11 + - image: circleci/node:10.0.0 working_directory: ~/repo diff --git a/README.md b/README.md index ae7190f..a9ee162 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,31 @@ async function logMessagesFromFooBar() { logMessagesFromFooBar(); ``` +If you're dealing with large rosbag files and need flow control to process messages async, use the Async Generator `iterateMessages`. + +```js +const { open } = require('rosbag'); + +async function readWithAsyncConsumer () { + const options = { topics: ['/unicorns'] }; + const bag = await open("filename.bag"); + for await (const msg of bag.iterateMessages(options)) { + // each message is read separately + // respecting async behavior in the for loop + await doSomethingAsync(msg); + } +} + +readWithAsyncConsumer() + .then(() => console.log("DONE")); + +``` + +That way you can also create a readable message stream +```js + require("stream").Readable.from(bag.iterateMessages(options)) +``` + ## API ### Opening a new rosbag reader diff --git a/package.json b/package.json index 1d45e96..cbbccda 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "rosbag", - "version": "2.6.1", + "version": "2.6.3", "license": "Apache-2.0", "repository": "cruise-automation/rosbag.js", "dependencies": { @@ -9,7 +9,7 @@ "int53": "1.0.0" }, "engines": { - "node": ">=8.0.0" + "node": ">=10.0.0" }, "main": "dist/node", "browser": "dist/web", diff --git a/src/bag.js b/src/bag.js index 383a86a..c37c157 100644 --- a/src/bag.js +++ b/src/bag.js @@ -74,6 +74,12 @@ export default class Bag { } async readMessages(opts: ReadOptions, callback: (msg: ReadResult) => void) { + for await (const msg of this.iterateMessages(opts)) { + callback(msg); + } + } + + async *iterateMessages(opts: ReadOptions): AsyncGenerator, void, number> { const connections = this.connections; const startTime = opts.startTime || { sec: 0, nsec: 0 }; @@ -121,7 +127,9 @@ export default class Bag { endTime, decompress ); - messages.forEach((msg) => callback(parseMsg(msg, i))); + for (const msg of messages) { + if (yield parseMsg(msg, i)) return; + } } } } diff --git a/src/bag.test.js b/src/bag.test.js index b3be8ac..0bd26e6 100644 --- a/src/bag.test.js +++ b/src/bag.test.js @@ -211,6 +211,23 @@ describe("rosbag - high-level api", () => { expect(messages).toHaveLength(9); }); + it("reads at consumer speed and abort reading on demand", async () => { + const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms)); + const opts = { topics: ["/tf"] }; + const bag = await Bag.open(getFixture()); + const messages = bag.iterateMessages(opts || {}); + const r1 = await messages.next(); + expect(r1.value.timestamp.nsec).toBe(56251251); + expect(r1.done).toBe(false); + await delay(100); + const r2 = await messages.next(); + expect(r2.value.timestamp.nsec).toBe(56262848); + expect(r2.done).toBe(false); + await delay(100); + const r3 = await messages.next(true); + expect(r3.done).toBe(true); + }); + describe("compression", () => { it("throws if compression scheme is not registered", async () => { let errorThrown = false; diff --git a/src/fields.js b/src/fields.js index ca5d6c1..ab1fc13 100644 --- a/src/fields.js +++ b/src/fields.js @@ -11,6 +11,7 @@ import type { Time } from "./types"; // reads through a buffer and extracts { [key: string]: value: string } // pairs - the buffer is expected to have length prefixed utf8 strings // with a '=' separating the key and value +const EQUALS_CHARCODE = "=".charCodeAt(0); export function extractFields(buffer: Buffer) { if (buffer.length < 4) { throw new Error("Header fields are truncated."); @@ -27,8 +28,10 @@ export function extractFields(buffer: Buffer) { throw new Error("Header fields are corrupt."); } + // Passing a number into "indexOf" explicitly to avoid Buffer polyfill + // slow path. See issue #87. const field = buffer.slice(i, i + length); - const index = field.indexOf("="); + const index = field.indexOf(EQUALS_CHARCODE); if (index === -1) { throw new Error("Header field is missing equals sign."); } diff --git a/src/types.js b/src/types.js index a658407..3a37215 100644 --- a/src/types.js +++ b/src/types.js @@ -23,25 +23,19 @@ export interface Filelike { size(): number; } -export type RosMsgField = - | {| - type: string, - name: string, - isConstant?: boolean, - isComplex?: boolean, - value?: mixed, - isArray?: false, - arrayLength?: void, - |} - | {| - type: string, - name: string, - isConstant?: boolean, - isComplex?: boolean, - value?: mixed, - isArray: true, - arrayLength: ?number, - |}; +export type RosMsgField = {| + type: string, + name: string, + isComplex?: boolean, + + // For arrays + isArray?: boolean, + arrayLength?: ?number, + + // For constants + isConstant?: boolean, + value?: mixed, +|}; export type RosMsgDefinition = {| name?: string,