Skip to content

Commit f734b3e

Browse files
committed
src: give StreamBases the capability to ask for data
Add a `OnStreamWantsWrite()` event that allows streams to ask for more input data if they want some. PR-URL: #18936 Reviewed-By: James M Snell <[email protected]> Reviewed-By: Matteo Collina <[email protected]>
1 parent c412150 commit f734b3e

File tree

4 files changed

+26
-0
lines changed

4 files changed

+26
-0
lines changed

src/node_http2.cc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2219,6 +2219,11 @@ ssize_t Http2Stream::Provider::Stream::OnRead(nghttp2_session* handle,
22192219
if (amount == 0 && stream->IsWritable()) {
22202220
CHECK(stream->queue_.empty());
22212221
DEBUG_HTTP2SESSION2(session, "deferring stream %d", id);
2222+
stream->EmitWantsWrite(length);
2223+
if (stream->available_outbound_length_ > 0 || !stream->IsWritable()) {
2224+
// EmitWantsWrite() did something interesting synchronously, restart:
2225+
return OnRead(handle, id, buf, length, flags, source, user_data);
2226+
}
22222227
return NGHTTP2_ERR_DEFERRED;
22232228
}
22242229

src/node_http2.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,8 @@ class Http2Stream : public AsyncWrap,
573573
// Required for StreamBase
574574
int DoShutdown(ShutdownWrap* req_wrap) override;
575575

576+
bool HasWantsWrite() const override { return true; }
577+
576578
// Initiate a response on this stream.
577579
inline int SubmitResponse(nghttp2_nv* nva,
578580
size_t len,

src/stream_base-inl.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,13 @@ inline void StreamResource::EmitAfterShutdown(ShutdownWrap* w, int status) {
136136
listener_->OnStreamAfterShutdown(w, status);
137137
}
138138

139+
inline void StreamResource::EmitWantsWrite(size_t suggested_size) {
140+
#ifdef DEBUG
141+
v8::SealHandleScope handle_scope(v8::Isolate::GetCurrent());
142+
#endif
143+
listener_->OnStreamWantsWrite(suggested_size);
144+
}
145+
139146
inline StreamBase::StreamBase(Environment* env) : env_(env) {
140147
PushStreamListener(&default_listener_);
141148
}

src/stream_base.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,13 @@ class StreamListener {
131131
// (and raises an assertion if there is none).
132132
virtual void OnStreamAfterShutdown(ShutdownWrap* w, int status);
133133

134+
// This is called by the stream if it determines that it wants more data
135+
// to be written to it. Not all streams support this.
136+
// This callback will not be called as long as there are active writes.
137+
// It is not supported by all streams; `stream->HasWantsWrite()` returns
138+
// true if it is supported by a stream.
139+
virtual void OnStreamWantsWrite(size_t suggested_size) {}
140+
134141
// This is called immediately before the stream is destroyed.
135142
virtual void OnStreamDestroy() {}
136143

@@ -199,6 +206,9 @@ class StreamResource {
199206
size_t count,
200207
uv_stream_t* send_handle) = 0;
201208

209+
// Returns true if the stream supports the `OnStreamWantsWrite()` interface.
210+
virtual bool HasWantsWrite() const { return false; }
211+
202212
// Optionally, this may provide an error message to be used for
203213
// failing writes.
204214
virtual const char* Error() const;
@@ -222,6 +232,8 @@ class StreamResource {
222232
void EmitAfterWrite(WriteWrap* w, int status);
223233
// Call the current listener's OnStreamAfterShutdown() method.
224234
void EmitAfterShutdown(ShutdownWrap* w, int status);
235+
// Call the current listener's OnStreamWantsWrite() method.
236+
void EmitWantsWrite(size_t suggested_size);
225237

226238
StreamListener* listener_ = nullptr;
227239
uint64_t bytes_read_ = 0;

0 commit comments

Comments
 (0)