@@ -5,6 +5,7 @@ use std::{fmt, io};
5
5
use bytes:: { Buf , BytesMut } ;
6
6
use futures_core:: { ready, Stream } ;
7
7
use futures_sink:: Sink ;
8
+ use pin_project:: pin_project;
8
9
9
10
use crate :: { AsyncRead , AsyncWrite , Decoder , Encoder } ;
10
11
@@ -20,16 +21,16 @@ bitflags::bitflags! {
20
21
21
22
/// A unified `Stream` and `Sink` interface to an underlying I/O object, using
22
23
/// the `Encoder` and `Decoder` traits to encode and decode frames.
24
+ #[ pin_project]
23
25
pub struct Framed < T , U > {
26
+ #[ pin]
24
27
io : T ,
25
28
codec : U ,
26
29
flags : Flags ,
27
30
read_buf : BytesMut ,
28
31
write_buf : BytesMut ,
29
32
}
30
33
31
- impl < T , U > Unpin for Framed < T , U > { }
32
-
33
34
impl < T , U > Framed < T , U >
34
35
where
35
36
T : AsyncRead + AsyncWrite ,
@@ -185,17 +186,18 @@ impl<T, U> Framed<T, U> {
185
186
186
187
impl < T , U > Framed < T , U > {
187
188
/// Serialize item and Write to the inner buffer
188
- pub fn write ( & mut self , item : <U as Encoder >:: Item ) -> Result < ( ) , <U as Encoder >:: Error >
189
+ pub fn write ( mut self : Pin < & mut Self > , item : <U as Encoder >:: Item ) -> Result < ( ) , <U as Encoder >:: Error >
189
190
where
190
191
T : AsyncWrite ,
191
192
U : Encoder ,
192
193
{
193
- let remaining = self . write_buf . capacity ( ) - self . write_buf . len ( ) ;
194
+ let this = self . as_mut ( ) . project ( ) ;
195
+ let remaining = this. write_buf . capacity ( ) - this. write_buf . len ( ) ;
194
196
if remaining < LW {
195
- self . write_buf . reserve ( HW - remaining) ;
197
+ this . write_buf . reserve ( HW - remaining) ;
196
198
}
197
199
198
- self . codec . encode ( item, & mut self . write_buf ) ?;
200
+ this . codec . encode ( item, this . write_buf ) ?;
199
201
Ok ( ( ) )
200
202
}
201
203
@@ -207,21 +209,22 @@ impl<T, U> Framed<T, U> {
207
209
}
208
210
209
211
/// Try to read underlying I/O stream and decode item.
210
- pub fn next_item ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Option < Result < U :: Item , U :: Error > > >
212
+ pub fn next_item ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Result < U :: Item , U :: Error > > >
211
213
where
212
214
T : AsyncRead ,
213
215
U : Decoder ,
214
216
{
215
217
loop {
218
+ let mut this = self . as_mut ( ) . project ( ) ;
216
219
// Repeatedly call `decode` or `decode_eof` as long as it is
217
220
// "readable". Readable is defined as not having returned `None`. If
218
221
// the upstream has returned EOF, and the decoder is no longer
219
222
// readable, it can be assumed that the decoder will never become
220
223
// readable again, at which point the stream is terminated.
221
224
222
- if self . flags . contains ( Flags :: READABLE ) {
223
- if self . flags . contains ( Flags :: EOF ) {
224
- match self . codec . decode_eof ( & mut self . read_buf ) {
225
+ if this . flags . contains ( Flags :: READABLE ) {
226
+ if this . flags . contains ( Flags :: EOF ) {
227
+ match this . codec . decode_eof ( & mut this . read_buf ) {
225
228
Ok ( Some ( frame) ) => return Poll :: Ready ( Some ( Ok ( frame) ) ) ,
226
229
Ok ( None ) => return Poll :: Ready ( None ) ,
227
230
Err ( e) => return Poll :: Ready ( Some ( Err ( e) ) ) ,
@@ -230,7 +233,7 @@ impl<T, U> Framed<T, U> {
230
233
231
234
log:: trace!( "attempting to decode a frame" ) ;
232
235
233
- match self . codec . decode ( & mut self . read_buf ) {
236
+ match this . codec . decode ( & mut this . read_buf ) {
234
237
Ok ( Some ( frame) ) => {
235
238
log:: trace!( "frame decoded from buffer" ) ;
236
239
return Poll :: Ready ( Some ( Ok ( frame) ) ) ;
@@ -239,45 +242,44 @@ impl<T, U> Framed<T, U> {
239
242
_ => ( ) , // Need more data
240
243
}
241
244
242
- self . flags . remove ( Flags :: READABLE ) ;
245
+ this . flags . remove ( Flags :: READABLE ) ;
243
246
}
244
247
245
- debug_assert ! ( !self . flags. contains( Flags :: EOF ) ) ;
248
+ debug_assert ! ( !this . flags. contains( Flags :: EOF ) ) ;
246
249
247
250
// Otherwise, try to read more data and try again. Make sure we've got room
248
- let remaining = self . read_buf . capacity ( ) - self . read_buf . len ( ) ;
251
+ let remaining = this . read_buf . capacity ( ) - this . read_buf . len ( ) ;
249
252
if remaining < LW {
250
- self . read_buf . reserve ( HW - remaining)
253
+ this . read_buf . reserve ( HW - remaining)
251
254
}
252
- let cnt = match unsafe {
253
- Pin :: new_unchecked ( & mut self . io ) . poll_read_buf ( cx, & mut self . read_buf )
254
- } {
255
+ let cnt = match this. io . poll_read_buf ( cx, & mut this. read_buf ) {
255
256
Poll :: Pending => return Poll :: Pending ,
256
257
Poll :: Ready ( Err ( e) ) => return Poll :: Ready ( Some ( Err ( e. into ( ) ) ) ) ,
257
258
Poll :: Ready ( Ok ( cnt) ) => cnt,
258
259
} ;
259
260
260
261
if cnt == 0 {
261
- self . flags . insert ( Flags :: EOF ) ;
262
+ this . flags . insert ( Flags :: EOF ) ;
262
263
}
263
- self . flags . insert ( Flags :: READABLE ) ;
264
+ this . flags . insert ( Flags :: READABLE ) ;
264
265
}
265
266
}
266
267
267
268
/// Flush write buffer to underlying I/O stream.
268
- pub fn flush ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , U :: Error > >
269
+ pub fn flush ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , U :: Error > >
269
270
where
270
271
T : AsyncWrite ,
271
272
U : Encoder ,
272
273
{
274
+ let mut this = self . as_mut ( ) . project ( ) ;
273
275
log:: trace!( "flushing framed transport" ) ;
274
276
275
- while !self . write_buf . is_empty ( ) {
276
- log:: trace!( "writing; remaining={}" , self . write_buf. len( ) ) ;
277
+ while !this . write_buf . is_empty ( ) {
278
+ log:: trace!( "writing; remaining={}" , this . write_buf. len( ) ) ;
277
279
278
- let n = ready ! ( unsafe {
279
- Pin :: new_unchecked ( & mut self . io) . poll_write( cx, & self . write_buf)
280
- } ) ?;
280
+ let n = ready ! (
281
+ this . io. as_mut ( ) . poll_write( cx, this . write_buf)
282
+ ) ?;
281
283
282
284
if n == 0 {
283
285
return Poll :: Ready ( Err ( io:: Error :: new (
@@ -288,26 +290,25 @@ impl<T, U> Framed<T, U> {
288
290
}
289
291
290
292
// remove written data
291
- self . write_buf . advance ( n) ;
293
+ this . write_buf . advance ( n) ;
292
294
}
293
295
294
296
// Try flushing the underlying IO
295
- ready ! ( unsafe { Pin :: new_unchecked ( & mut self . io) . poll_flush( cx) } ) ?;
297
+ ready ! ( this . io. poll_flush( cx) ) ?;
296
298
297
299
log:: trace!( "framed transport flushed" ) ;
298
300
Poll :: Ready ( Ok ( ( ) ) )
299
301
}
300
302
301
303
/// Flush write buffer and shutdown underlying I/O stream.
302
- pub fn close ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , U :: Error > >
304
+ pub fn close ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , U :: Error > >
303
305
where
304
306
T : AsyncWrite ,
305
307
U : Encoder ,
306
308
{
307
- unsafe {
308
- ready ! ( Pin :: new_unchecked( & mut self . io) . poll_flush( cx) ) ?;
309
- ready ! ( Pin :: new_unchecked( & mut self . io) . poll_shutdown( cx) ) ?;
310
- }
309
+ let mut this = self . as_mut ( ) . project ( ) ;
310
+ ready ! ( this. io. as_mut( ) . poll_flush( cx) ) ?;
311
+ ready ! ( this. io. as_mut( ) . poll_shutdown( cx) ) ?;
311
312
Poll :: Ready ( Ok ( ( ) ) )
312
313
}
313
314
}
@@ -319,7 +320,7 @@ where
319
320
{
320
321
type Item = Result < U :: Item , U :: Error > ;
321
322
322
- fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
323
+ fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
323
324
self . next_item ( cx)
324
325
}
325
326
}
@@ -341,21 +342,21 @@ where
341
342
}
342
343
343
344
fn start_send (
344
- mut self : Pin < & mut Self > ,
345
+ self : Pin < & mut Self > ,
345
346
item : <U as Encoder >:: Item ,
346
347
) -> Result < ( ) , Self :: Error > {
347
348
self . write ( item)
348
349
}
349
350
350
351
fn poll_flush (
351
- mut self : Pin < & mut Self > ,
352
+ self : Pin < & mut Self > ,
352
353
cx : & mut Context < ' _ > ,
353
354
) -> Poll < Result < ( ) , Self :: Error > > {
354
355
self . flush ( cx)
355
356
}
356
357
357
358
fn poll_close (
358
- mut self : Pin < & mut Self > ,
359
+ self : Pin < & mut Self > ,
359
360
cx : & mut Context < ' _ > ,
360
361
) -> Poll < Result < ( ) , Self :: Error > > {
361
362
self . close ( cx)
0 commit comments