Skip to content

http2: read from connection stream before session create #17132

Closed
@webcarrot

Description

@webcarrot

Version: v9.2.0
Platform: Linux develop 4.9.0-4-amd64 SMP Debian 4.9.51-1 (2017-09-28) x86_64 GNU/Linux
Subsystem: http2

Sorry for my english.

I am trying to perform an action on the stream (read some data, then unshift it back) in the "connection" event, before the http2 connectionListener does its job (creating the session, etc.). Unfortunately, after the read, the stream is no longer handled properly by the http2 module, and I have to wrap it in an ugly Duplex stream.

My question: how can I read data from the stream, push it back, and keep the stream usable by the http2 module?

Or maybe it's a bug: the https and http modules handle such streams properly.

In this example I do a pseudo-ALPN negotiation, but what I really need this for is PROXY protocol support.

"use strict";

const http2 = require("http2");
const { Duplex } = require("stream");

// ...
// Obtain the HTTP/2 server class from the constructor of a throwaway server
// instance returned by http2.createServer().
// NOTE(review): presumably done because the class is not exported by "http2"
// directly — confirm against the targeted Node.js version.
const Http2Server = http2.createServer().constructor;

// HTTP/2 client connection preface (same byte sequence node-spdy uses).
// Incoming connections are compared against it to detect HTTP/2 traffic.
const PREFACE_BUFFER = Buffer.from("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n", "utf8");
// Number of bytes that must be received before the preface check is conclusive.
const PREFACE_BUFFER_LENGTH = PREFACE_BUFFER.byteLength;

// Duplex proxy that replays bytes already consumed from a socket and then
// relays the remaining traffic in both directions.
const kWait = Symbol("wait");     // true while the socket may still deliver data
const kNread = Symbol("nread");   // size of an unanswered _read() request, -1 if none
const kSocket = Symbol("socket"); // the wrapped socket
const kBuffer = Symbol("buffer"); // bytes received but not yet pushed to the reader

/**
 * Wraps a socket whose first bytes have already been read.
 *
 * The readable side yields `buffer` first and then whatever the socket emits
 * afterwards; the writable side forwards everything to the socket.
 */
class SocketProxy extends Duplex {

  /**
   * @param {object} socket - Duplex-like socket to wrap (must support
   *   write/end/destroy/pause/resume/cork/uncork and the usual stream events).
   * @param {Buffer} [buffer] - Bytes already consumed from `socket` that must
   *   be delivered to the reader first.
   */
  constructor(socket, buffer) {
    super({
      allowHalfOpen: true,
      decodeStrings: false
    });
    this[kWait] = true;
    this[kNread] = -1;
    this[kSocket] = socket;
    this[kBuffer] = buffer || Buffer.alloc(0);

    // The readable side ends through push(null) once the buffer is drained,
    // never by emitting "end" by hand, so buffered bytes are not lost.
    const onSocketDone = () => {
      this[kWait] = false;
      this.tryRead();
    };
    // Route socket errors through destroy() so the proxy is torn down with them.
    socket.on("error", err => this.destroy(err));
    socket.on("data", chunk => this.addChunk(chunk));
    socket.once("end", onSocketDone);
    // A socket closed without "end" will not deliver data either.
    socket.once("close", onSocketDone);
  }

  /**
   * Forwards one chunk to the socket; `cb` fires when the socket has handled
   * it, which propagates both backpressure and write errors.
   */
  _write(data, encoding, cb) {
    try {
      this[kSocket].write(data, encoding, cb);
    } catch (err) {
      cb(err);
    }
  }

  /**
   * Forwards a batch of buffered chunks. The socket is corked so it can
   * coalesce them; `cb` is attached to the last write only.
   *
   * @param {Array<{chunk: *, encoding: string}>} chunks
   */
  _writev(chunks, cb) {
    if (chunks.length === 0) {
      cb();
      return;
    }
    const socket = this[kSocket];
    try {
      socket.cork();
      const last = chunks.length - 1;
      for (let i = 0; i < last; i++) {
        socket.write(chunks[i].chunk, chunks[i].encoding);
      }
      socket.write(chunks[last].chunk, chunks[last].encoding, cb);
      socket.uncork();
    } catch (err) {
      cb(err);
    }
  }

  /**
   * Tears down the wrapped socket. The socket reference is kept so that
   * remoteAddress/remotePort remain readable after destruction.
   */
  _destroy(e, cb) {
    this[kWait] = false;
    this[kNread] = -1;
    this[kBuffer] = Buffer.alloc(0); // release any undelivered bytes
    try {
      this[kSocket].destroy(e);
    } catch (err) {
      cb(err);
      return;
    }
    cb(e);
  }

  /**
   * Ends the writable side of the socket once all proxy writes are flushed.
   */
  _final(cb) {
    const socket = this[kSocket];
    if (!socket.writable) {
      // Already ended or destroyed: nothing left to flush.
      cb();
      return;
    }
    try {
      socket.end(cb);
    } catch (err) {
      cb(err);
    }
  }

  /**
   * Serves up to `nread` bytes from the buffer, or remembers the request
   * until the socket delivers more data, or signals EOF.
   */
  _read(nread) {
    const buffer = this[kBuffer];
    if (buffer.length > 0) {
      this[kBuffer] = buffer.slice(nread);
      return this.push(buffer.slice(0, nread));
    }
    if (this[kWait]) {
      this[kNread] = nread;
      // The reader wants more: let the socket flow again (see addChunk).
      this[kSocket].resume();
      return;
    }
    return this.push(null);
  }

  /**
   * Appends data received from the socket. With no read pending the socket
   * is paused so the buffer cannot grow without bound.
   */
  addChunk(chunk) {
    const buffer = this[kBuffer];
    this[kBuffer] = buffer.length > 0 ? Buffer.concat([buffer, chunk]) : chunk;
    if (this[kNread] === -1) {
      this[kSocket].pause();
    } else {
      this.tryRead();
    }
  }

  /**
   * Answers a pending _read() request, if there is one.
   */
  tryRead() {
    const nread = this[kNread];
    if (nread === -1 || this.destroyed) {
      return;
    }
    this[kNread] = -1;
    this._read(nread);
  }

  get remoteAddress() {
    return this[kSocket].remoteAddress;
  }

  get remotePort() {
    return this[kSocket].remotePort;
  }
}

// Replacement "connection" listener: sniffs the first bytes of every new
// socket, then re-emits the connection as "postAlpnConnection".
/**
 * Decides whether a new connection speaks HTTP/2 by comparing its first bytes
 * with the client connection preface.
 *
 * - Data that cannot be the preface: the socket is tagged with
 *   `alpnProtocol = false`, re-emitted as-is and the consumed bytes are
 *   unshifted back onto it.
 * - A complete preface: the socket is wrapped in a SocketProxy that replays
 *   the consumed bytes, and the proxy is re-emitted instead.
 * - Fewer bytes than the preface that match so far: keep waiting for the next
 *   "readable" event instead of giving up on the connection.
 * - The socket ends before a decision was possible: the socket is re-emitted
 *   and then destroyed with a "No data" error.
 *
 * Must be called with `this` bound to the server, as EventEmitter does for
 * listeners.
 *
 * @param {object} socket - The newly accepted socket.
 */
function connectionListener(socket) {
  // Bytes consumed so far. Kept outside the handler so that a preface split
  // across several "readable" events is still recognised.
  let buffer;

  const stopSniffing = () => {
    socket.removeListener("readable", onReadable);
    socket.removeListener("end", onEnd);
  };

  const onReadable = () => {
    let chunk = socket.read();
    while (chunk !== null) {
      buffer = buffer ? Buffer.concat([buffer, chunk]) : chunk;
      const bufferLength = buffer.length;
      const hasFullPreface = bufferLength >= PREFACE_BUFFER_LENGTH;
      const isH2 = hasFullPreface
        ? PREFACE_BUFFER.equals(buffer.slice(0, PREFACE_BUFFER_LENGTH))
        : buffer.equals(PREFACE_BUFFER.slice(0, bufferLength));

      if (!isH2) {
        stopSniffing();
        // ... pseudo ALPN negotiation: mark the socket as "no protocol agreed"
        Object.defineProperty(socket, "alpnProtocol", {
          value: false
        });
        this.emit("postAlpnConnection", socket);
        // Hand the consumed bytes back to the socket for its new consumer.
        socket.unshift(buffer);
        return;
      }

      if (hasFullPreface) {
        stopSniffing();
        // The socket itself is no longer usable by http2 after being read,
        // so hand over a proxy that replays the consumed bytes.
        this.emit("postAlpnConnection", new SocketProxy(socket, buffer));
        return;
      }

      // Matches so far but is still incomplete: try to read more.
      chunk = socket.read();
    }
    // Nothing more buffered right now; wait for the next "readable" or "end".
  };

  const onEnd = () => {
    // The peer finished before the protocol could be determined.
    stopSniffing();
    this.emit("postAlpnConnection", socket);
    socket.destroy(new Error("No data"));
  };

  socket.on("readable", onReadable);
  socket.once("end", onEnd);
}

// Session shutdown handling: one shared wrapper, built on first use.
let shutdownWrapper;

/**
 * Returns the shared replacement for `session.shutdown`.
 *
 * The wrapper is created once, from the `shutdown` method of the first
 * session passed in, and reused for every later call. It invokes the
 * original method with the same `this` and options, but hides any argument
 * (such as an error) from the caller's callback.
 *
 * @param {object} session - Object whose `shutdown` method is wrapped on first use.
 * @returns {Function} The shared `(options, callback)` wrapper.
 */
function getShutdownWrapper(session) {
  if (shutdownWrapper) {
    return shutdownWrapper;
  }
  const originalShutdown = session.shutdown;
  shutdownWrapper = function wrapper(options, callback) {
    // callback === stream.destroy; drop the err argument, which is apparently
    // not recognised by JSStreamWrap(?)
    let withoutArgs;
    if (callback) {
      withoutArgs = () => callback();
    }
    originalShutdown.call(this, options, withoutArgs);
  };
  return shutdownWrapper;
}

/**
 * HTTP/2 server that sniffs each connection before the stock listeners see it.
 *
 * Every "connection" listener registered by the base constructor is moved to
 * the "postAlpnConnection" event; "connection" itself is then handled by
 * connectionListener. Each new session gets its `shutdown` method replaced
 * by the shared shutdown wrapper.
 */
class Server extends Http2Server {

  /**
   * @param {object} options - Passed through to the base server constructor.
   * @param {Function} [handler] - Passed through to the base server constructor.
   */
  constructor(options, handler) {
    super(options, handler);

    // Re-home the listeners installed by the base class.
    for (const listener of this.listeners("connection")) {
      this.on("postAlpnConnection", listener);
    }
    this.removeAllListeners("connection");

    this.on("connection", connectionListener);
    this.on("session", session => {
      session.shutdown = getShutdownWrapper(session);
    });
  }

}

/**
 * Creates a sniffing Server.
 *
 * Accepts either `(options, handler)` or just `(handler)`; in the latter
 * case an empty, prototype-less options object is used.
 *
 * @param {object|Function} options - Server options, or the handler itself.
 * @param {Function} [handler] - Handler, when options are given.
 * @returns {Server} The new server instance.
 */
const createServer = (options, handler) => {
  const handlerOnly = typeof options === "function";
  return handlerOnly
    ? new Server(Object.create(null), options)
    : new Server(options, handler);
};

// Public API: the Server class and its createServer factory.
module.exports = {
  Server,
  createServer
};

PS. This example works, but it probably leaks memory.

Metadata

Metadata

Assignees

No one assigned

    Labels

    http2Issues or PRs related to the http2 subsystem.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions