Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
build:
docker:
# specify the version you desire here
- image: circleci/node:8.11
- image: circleci/node:10.0.0

working_directory: ~/repo

Expand Down
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,31 @@ async function logMessagesFromFooBar() {
logMessagesFromFooBar();
```

If you're dealing with large rosbag files and need flow control to process messages async, use the Async Generator `iterateMessages`.

```js
const { open } = require('rosbag');

async function readWithAsyncConsumer () {
const options = { topics: ['/unicorns'] };
const bag = await open("filename.bag");
for await (const msg of bag.iterateMessages(options)) {
// each message is read separately
// respecting async behavior in the for loop
await doSomethingAsync(msg);
}
}

readWithAsyncConsumer()
.then(() => console.log("DONE"));

```

That way you can also create a readable message stream
```js
require("stream").Readable.from(bag.iterateMessages(options))
```

## API

### Opening a new rosbag reader
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"int53": "1.0.0"
},
"engines": {
"node": ">=8.0.0"
"node": ">=10.0.0"
},
"main": "dist/node",
"browser": "dist/web",
Expand Down
10 changes: 9 additions & 1 deletion src/bag.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ export default class Bag {
}

async readMessages(opts: ReadOptions, callback: (msg: ReadResult<any>) => void) {
for await (const msg of this.iterateMessages(opts)) {
callback(msg);
}
}

async *iterateMessages(opts: ReadOptions): AsyncGenerator<ReadResult<any>, void, number> {
const connections = this.connections;

const startTime = opts.startTime || { sec: 0, nsec: 0 };
Expand Down Expand Up @@ -119,7 +125,9 @@ export default class Bag {
endTime,
decompress
);
messages.forEach((msg) => callback(parseMsg(msg, i)));
for (const msg of messages) {
if (yield parseMsg(msg, i)) return;
}
}
}
}
17 changes: 17 additions & 0 deletions src/bag.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,23 @@ describe("rosbag - high-level api", () => {
expect(messages).toHaveLength(9);
});

it("reads at consumer speed and abort reading on demand", async () => {
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const opts = { topics: ["/tf"] };
const bag = await Bag.open(getFixture());
const messages = bag.iterateMessages(opts || {});
const r1 = await messages.next();
expect(r1.value.timestamp.nsec).toBe(56251251);
expect(r1.done).toBe(false);
await delay(100);
const r2 = await messages.next();
expect(r2.value.timestamp.nsec).toBe(56262848);
expect(r2.done).toBe(false);
await delay(100);
const r3 = await messages.next(true);
expect(r3.done).toBe(true);
});

describe("compression", () => {
it("throws if compression scheme is not registered", async () => {
let errorThrown = false;
Expand Down