Stream data loss after multiple write calls #3973

Closed as not planned
@mStirner

Description

Details

When I make two write calls directly after each other, the second payload gets lost.

The following is the output I get. The line marked with * is the last message from the first pipeline run. Everything after that line comes from the second write call.

[dispatcher:server] received no pending uuid
Server local #1 Initiated by dispatcher 1
[dispatcher:disp1] received pending uuid
*Dispatcher 1 chain done: #1 Initiated by dispatcher
[dispatcher:server] received no pending uuid
Server local #2 Initiated by dispatcher 1

But as you can see, the second pipeline execution is incomplete compared to the first one. I don't know why; it seems that in the dispatcher function the call cb(null, JSON.stringify(chunk)); does nothing.

When I run this with a debugger attached, the debugger disconnects after the first chain/write call completes. The last message shown is "Server local #2 Initiated by dispatcher 1", which means the data from the second call is ready to be passed further along the chain, but the Transform stream in the dispatcher does not do what it should.
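One possible explanation for this symptom, not confirmed anywhere in this report, is that a Transform stream only hands the next written chunk to transform() after the callback of the previous call has been invoked. The sketch below illustrates that behaviour in isolation; makeRecorder and the two arrays are hypothetical names used only for this illustration.

```javascript
const { Transform } = require("stream");

// Hypothetical helper: builds a Transform that records every chunk it is
// handed and only invokes the callback when `callCallback` is true.
const makeRecorder = (seen, callCallback) => new Transform({
    transform(chunk, enc, cb) {
        seen.push(String(chunk));
        if (callCallback) {
            cb(); // consume the chunk and let the next one through
        }
        // Without cb(), the stream keeps waiting for this chunk to finish.
    }
});

const seenWithoutCallback = [];
const seenWithCallback = [];

const stalled = makeRecorder(seenWithoutCallback, false);
const flowing = makeRecorder(seenWithCallback, true);

for (const stream of [stalled, flowing]) {
    stream.write("first");
    stream.write("second");
}

setTimeout(() => {
    console.log("without callback:", seenWithoutCallback);
    console.log("with callback:", seenWithCallback);
}, 10);
```

The stream that never calls the callback is only ever handed "first"; "second" stays in its write buffer. With nothing else keeping the event loop busy, the process then simply exits, which would be consistent with the debugger disconnecting.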

The logic behind the data flow is actually very simple:

-> [dispatcher] -> [dispatcher] -> [dispatcher] -> |
|                                                  |
|---------------------------------------------------

If I wait until the first round completes ("Dispatcher 1 chain done: #1 Initiated by dispatcher" is logged) and only then make the second t1 call with "#2 Initiated by dispatcher 1", everything works as expected.

Node.js version

v16.17.0

Example code

Minimal reproducible example:

const { randomUUID } = require("crypto");
const { PassThrough, Transform } = require("stream");


const dispatcher = (local, name) => {

    let PENDING_EVENTS = new Map();

    let middleware = new Transform({
        transform(chunk, enc, cb) {

            chunk = String(chunk);
            chunk = JSON.parse(chunk);

            debugger;

            //console.log(`[dispatcher]`, chunk);

            if (PENDING_EVENTS.has(chunk.uuid)) {

                console.log(`[dispatcher:${name}] received pending uuid`);

                debugger;

                let fnc = PENDING_EVENTS.get(chunk.uuid);
                PENDING_EVENTS.delete(chunk.uuid);

                fnc(chunk.data);

            } else {

                console.log(`[dispatcher:${name}] received no pending uuid`);

                debugger;

                local(chunk.data, (data) => {

                    debugger;

                    chunk.data = data;
                    cb(null, JSON.stringify(chunk));
                    //this.push(JSON.stringify(chunk));

                });

            }

        }
    });

    let transmit = (data, cb) => {

        let msg = {
            uuid: randomUUID(),
            data
        };

        PENDING_EVENTS.set(msg.uuid, cb);
        middleware.push(JSON.stringify(msg));

    };

    return {
        middleware,
        transmit
    }

};


// PIPELINE GENERATION ---------------------

const { middleware: sm, transmit: st } = dispatcher((data, next) => {

    console.log("Server local", data);

    next(data)

}, "server");

const { middleware: m1, transmit: t1 } = dispatcher((data, next) => {

    console.log("Dispatcher1 local", data);

    next(data)

}, "disp1");

sm.pipe(m1).pipe(sm);

// PIPELINE GENERATION ---------------------


setTimeout(() => {

    t1("#1 Initiated by dispatcher 1", (data) => {
        console.log("Dispatcher 1 chain done:", data);
    });

    t1("#2 Initiated by dispatcher 1", (data) => {
        console.log("Dispatcher 1 chain done:", data);
    });

}, 1000);
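A possible fix, sketched under the assumption that the stall comes from the branch that handles a pending uuid never calling cb: the variant below is a condensed version of the example above with a single added cb() call in that branch. The results array and the shortened local handlers are hypothetical additions for the sketch and are not part of the original code.

```javascript
const { randomUUID } = require("crypto");
const { Transform } = require("stream");

const dispatcher = (local, name) => {
    const pending = new Map();

    const middleware = new Transform({
        transform(chunk, enc, cb) {
            const msg = JSON.parse(String(chunk));

            if (pending.has(msg.uuid)) {
                // Reply to a message this dispatcher sent earlier: hand the
                // data to the waiting callback and consume the chunk.
                const fnc = pending.get(msg.uuid);
                pending.delete(msg.uuid);
                fnc(msg.data);
                cb(); // assumed fix: tell the stream this chunk is done
            } else {
                // Message from another dispatcher: process it locally and
                // forward the result down the pipeline.
                local(msg.data, (data) => {
                    msg.data = data;
                    cb(null, JSON.stringify(msg));
                });
            }
        }
    });

    const transmit = (data, done) => {
        const msg = { uuid: randomUUID(), data };
        pending.set(msg.uuid, done);
        middleware.push(JSON.stringify(msg));
    };

    return { middleware, transmit };
};

const { middleware: sm } = dispatcher((data, next) => next(data), "server");
const { middleware: m1, transmit: t1 } = dispatcher((data, next) => next(data), "disp1");

sm.pipe(m1).pipe(sm);

// Hypothetical collector so that the outcome can be inspected afterwards.
const results = [];

setTimeout(() => {
    t1("#1 Initiated by dispatcher 1", (data) => results.push(data));
    t1("#2 Initiated by dispatcher 1", (data) => results.push(data));
}, 10);

setTimeout(() => console.log("chains done:", results), 100);
```

With the extra cb() call, both back-to-back t1 calls should reach their completion callbacks, because the Transform in disp1 is free to accept the second reply once the first has been handled.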

Operating system

Ubuntu 18.04 LTS

Scope

code

Module and version

Not applicable.
