Skip to content

Commit 83f79d0

Browse files
committed
attach processes to a slave pty
The tty crate provides a TtyServer type but it's has blocking IO, whereas we need to integrate with Tokio. For this reason, we only use the libc bindings tty-rs provides, and implement the tokio integration ourself. The implementation is mostly taken of tokio-process.
1 parent 79ab4b8 commit 83f79d0

File tree

3 files changed

+230
-119
lines changed

3 files changed

+230
-119
lines changed

Cargo.toml

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ version = "0.1.0"
44
authors = ["Corentin Henry <[email protected]>"]
55

66
[dependencies]
7-
tokio-core = "0.1.12"
8-
tokio-io = "0.1.4"
9-
tokio-process = "0.1.5"
107
futures = "0.1.17"
11-
tokio-timer = "0.1.2"
12-
regex = "0.2.5"
8+
libc = "*"
139
log = "0.4.1"
14-
bytes = "0.4.6"
10+
mio = "*"
11+
regex = "0.2.5"
12+
tokio-core = "0.1.12"
13+
tokio-io = "0.1.4"
14+
tokio-proto = "*"
15+
tty = "*"
1516

1617
[dev-dependencies]
1718
env_logger = "0.4.3"

src/lib.rs

Lines changed: 68 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,32 @@
1-
#[macro_use] extern crate log;
2-
extern crate bytes;
31
extern crate futures;
2+
extern crate libc;
3+
#[macro_use]
4+
extern crate log;
5+
extern crate mio;
46
extern crate regex;
57
extern crate tokio_core;
68
extern crate tokio_io;
7-
extern crate tokio_process;
9+
// extern crate pty;
10+
extern crate tty;
811

912
use std::error::Error;
1013
use std::fmt;
11-
use std::io::{self, Cursor, Read};
14+
use std::io::{self, Read, Write};
1215
use std::collections::VecDeque;
13-
use std::process::{Command, Stdio};
16+
use std::process::Command;
1417
use std::time::Duration;
1518

16-
use bytes::Buf;
17-
use bytes::BytesMut;
18-
1919
use futures::{Async, Canceled, Future, Poll, Stream};
2020
use futures::sync::{mpsc, oneshot};
2121

2222
use regex::Regex;
2323

24-
use tokio_process::{Child, ChildStderr, ChildStdin, ChildStdout, CommandExt};
25-
use tokio_io::{codec, AsyncWrite};
2624
use tokio_core::reactor::Handle as TokioHandle;
2725
use tokio_core::reactor::Timeout;
2826

27+
mod pty;
28+
use pty::*;
29+
2930
#[derive(Debug, Clone)]
3031
pub enum Match {
3132
/// Match a regular expression
@@ -56,25 +57,12 @@ impl PartialEq for Match {
5657

5758
struct InputRequest(Vec<u8>, oneshot::Sender<()>);
5859

59-
struct ActiveInputRequest(Cursor<Vec<u8>>, oneshot::Sender<()>);
60-
61-
impl From<InputRequest> for ActiveInputRequest {
62-
fn from(req: InputRequest) -> Self {
63-
ActiveInputRequest(Cursor::new(req.0), req.1)
64-
}
65-
}
66-
6760
pub struct Session {
6861
/// A tokio core handle used to spawn asynchronous tasks.
6962
handle: TokioHandle,
7063

71-
/// process we're interacting with
72-
#[allow(dead_code)]
73-
process: Child,
74-
/// process stdin async writer
75-
stdin: ChildStdin,
7664
/// process stdout async reader
77-
stdout: ChildStdout,
65+
pty: Pty,
7866

7967
/// buffer where we store bytes read from stdout
8068
buffer: Vec<u8>,
@@ -83,7 +71,7 @@ pub struct Session {
8371
input_requests_rx: mpsc::UnboundedReceiver<InputRequest>,
8472
/// FIFO storage for the input requests. Requests are processed one after another, not
8573
/// concurrently.
86-
input_requests: VecDeque<ActiveInputRequest>,
74+
input_requests: VecDeque<InputRequest>,
8775

8876
/// Receiver for the matching requests coming from the expect handle.
8977
match_requests_rx: mpsc::UnboundedReceiver<MatchRequest>,
@@ -134,8 +122,8 @@ pub enum MatchError {
134122
impl Error for MatchError {
135123
fn description(&self) -> &str {
136124
match *self {
137-
MatchError::Eof => "met unexpected EOF while reading stdout",
138-
MatchError::Timeout => "timeout while trying to find matches in stdout",
125+
MatchError::Eof => "met unexpected EOF while reading output",
126+
MatchError::Timeout => "timeout while trying to find matches output",
139127
}
140128
}
141129

@@ -158,51 +146,22 @@ pub struct Handle {
158146
input_requests_tx: mpsc::UnboundedSender<InputRequest>,
159147
}
160148

161-
struct LineCodec;
162-
163-
impl codec::Decoder for LineCodec {
164-
type Item = String;
165-
type Error = io::Error;
166-
167-
fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<String>, io::Error> {
168-
if let Some(n) = buf.as_ref().iter().position(|b| *b == b'\n') {
169-
let line = buf.split_to(n);
170-
buf.split_to(1);
171-
return match ::std::str::from_utf8(line.as_ref()) {
172-
Ok(s) => Ok(Some(s.to_string())),
173-
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid string")),
174-
};
175-
}
176-
Ok(None)
177-
}
178-
}
179-
180-
pub struct Stderr(codec::FramedRead<ChildStderr, LineCodec>);
181-
182-
impl Stderr {
183-
fn new(stderr: ChildStderr) -> Self {
184-
Stderr(codec::FramedRead::new(stderr, LineCodec {}))
185-
}
186-
}
187-
188-
impl Stream for Stderr {
189-
type Item = String;
190-
type Error = io::Error;
191-
192-
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
193-
self.0.poll()
194-
}
195-
}
196-
197149
impl Handle {
198150
pub fn send(&self, bytes: Vec<u8>) -> Box<Future<Item = (), Error = Canceled>> {
199151
let handle = self.clone();
200152
let (response_tx, response_rx) = oneshot::channel::<()>();
201-
handle.input_requests_tx.unbounded_send(InputRequest(bytes, response_tx)).unwrap();
153+
handle
154+
.input_requests_tx
155+
.unbounded_send(InputRequest(bytes, response_tx))
156+
.unwrap();
202157
Box::new(response_rx)
203158
}
204159

205-
pub fn expect(&mut self, matches: Vec<Match>, timeout: Option<Duration>) -> Box<Future<Item = MatchOutcome, Error = ()>> {
160+
pub fn expect(
161+
&mut self,
162+
matches: Vec<Match>,
163+
timeout: Option<Duration>,
164+
) -> Box<Future<Item = MatchOutcome, Error = ()>> {
206165
let (response_tx, response_rx) = oneshot::channel::<MatchOutcome>();
207166
let request = MatchRequest {
208167
matches,
@@ -215,30 +174,24 @@ impl Handle {
215174
}
216175
}
217176

177+
// TODO:
178+
//
179+
// - Make stdin evented PollEvented?
180+
// - Configure stding with Termios so that it's non-blocking. For other options (echo, nowait,
181+
// etc.) see pexpect.
182+
//
183+
// NOTE: the man page of termios says it is unspecified whether e O_NONBLOCK file status flag takes
184+
// precedence over the MIN and TIME settings.
185+
218186
impl Session {
219-
pub fn spawn(cmd: &mut Command, handle: TokioHandle) -> Result<Handle, ()> {
187+
pub fn spawn(cmd: Command, handle: &TokioHandle) -> Result<Handle, ()> {
220188
debug!("spawning new command {:?}", cmd);
221-
let mut process = cmd
222-
.stdout(Stdio::piped())
223-
.stdin(Stdio::piped())
224-
.stderr(Stdio::piped())
225-
.env("RUST_BACKTRACE", "1")
226-
.spawn_async(&handle.clone())
227-
.unwrap();
228-
229189
let (input_tx, input_rx) = mpsc::unbounded::<InputRequest>();
230190
let (match_tx, match_rx) = mpsc::unbounded::<MatchRequest>();
231-
232-
handle.clone().spawn(
233-
Stderr::new(process.stderr().take().unwrap()).for_each(|msg| {
234-
error!("spawned process stderr: {}", msg);
235-
Ok(())
236-
}).map_err(|_| ()));
237-
191+
let mut pty = Pty::new::<::std::fs::File>(None, handle).unwrap();
192+
let mut _child = pty.spawn(cmd).unwrap();
238193
let session = Session {
239-
stdout: process.stdout().take().unwrap(),
240-
stdin: process.stdin().take().unwrap(),
241-
process: process,
194+
pty: pty,
242195
handle: handle.clone(),
243196
buffer: Vec::new(),
244197
input_requests_rx: input_rx,
@@ -247,35 +200,36 @@ impl Session {
247200
match_requests: VecDeque::new(),
248201
};
249202
handle.spawn(session);
250-
251203
Ok(Handle {
252-
match_requests_tx: match_tx,
253-
input_requests_tx: input_tx,
204+
match_requests_tx: match_tx.clone(),
205+
input_requests_tx: input_tx.clone(),
254206
})
255207
}
256208

257-
fn poll_pending_input(req: &mut ActiveInputRequest, stdin: &mut ChildStdin) -> Poll<(), io::Error> {
209+
fn poll_pending_input<W: Write>(req: &mut InputRequest, input: &mut W) -> Poll<(), io::Error> {
258210
debug!("processing pending input request");
211+
let mut size = 0;
259212
loop {
260-
match stdin.write_buf(&mut req.0) {
261-
Ok(Async::Ready(_)) => {
262-
if !req.0.has_remaining() {
213+
match input.write(&req.0[size..]) {
214+
Ok(i) => {
215+
size += i;
216+
if size == req.0.len() {
263217
return Ok(Async::Ready(()));
264218
}
265219
// FIXME: do we need to check if we wrote 0 bytes to avoid infinite looping?
266220
continue;
267221
}
268-
// Cannot happen according to the docs https://docs.rs/tokio-io/0.1.4/tokio_io/trait.AsyncWrite.html
269-
Ok(Async::NotReady) => unreachable!(),
270222
Err(e) => {
271223
if e.kind() == io::ErrorKind::WouldBlock {
272-
return Ok(Async::NotReady);
224+
break;
273225
} else {
274226
return Err(e);
275227
}
276228
}
277229
}
278230
}
231+
req.0 = req.0.split_off(size);
232+
Ok(Async::NotReady)
279233
}
280234

281235
fn poll_pending_match(
@@ -286,7 +240,10 @@ impl Session {
286240
debug!("checking pending match request: {:?}", req);
287241

288242
if let Some((match_index, match_start, match_end)) = Self::try_match(req, buffer) {
289-
debug!("found match (mach index {}, match range: [{}, {}]", match_index, match_start, match_end);
243+
debug!(
244+
"found match (mach index {}, match range: [{}, {}]",
245+
match_index, match_start, match_end
246+
);
290247
let match_buf = Self::extract_match(buffer, match_start, match_end);
291248
return Ok(Async::Ready(Ok((match_index, match_buf))));
292249
}
@@ -331,17 +288,15 @@ impl Session {
331288
Ok(Async::NotReady)
332289
}
333290

334-
fn read_stdout(&mut self) -> Result<usize, io::Error> {
335-
debug!("reading from stdout");
291+
fn read_output(&mut self) -> Result<usize, io::Error> {
292+
debug!("reading from pty");
336293
let mut buf = [0; 4096];
337294
let mut size = 0;
338295
loop {
339-
// read is supposed to be sync, but since ChildStdout implement AsyncRead, it's async.
340-
// See the doc about that: https://docs.rs/tokio-io/0.1.2/tokio_io/trait.AsyncRead.html
341-
match self.stdout.read(&mut buf[..]) {
296+
match self.pty.read(&mut buf[..]) {
342297
Ok(i) => {
343298
if i == 0 {
344-
warn!("met EOF while reading stdout");
299+
warn!("met EOF while reading from pty");
345300
return Err(io::ErrorKind::UnexpectedEof.into());
346301
}
347302
size += i;
@@ -350,7 +305,7 @@ impl Session {
350305
}
351306
Err(e) => {
352307
if e.kind() == io::ErrorKind::WouldBlock {
353-
debug!("done reading from stdout");
308+
debug!("done reading from pty");
354309
debug!("buffer so far: {}", String::from_utf8_lossy(&self.buffer));
355310
return Ok(size);
356311
}
@@ -389,7 +344,7 @@ impl Session {
389344
loop {
390345
match self.input_requests_rx.poll() {
391346
Ok(Async::Ready(Some(req))) => {
392-
self.input_requests.push_back(req.into());
347+
self.input_requests.push_back(req);
393348
debug!("got input request");
394349
}
395350
Ok(Async::Ready(None)) => {
@@ -428,21 +383,24 @@ impl Session {
428383
}
429384

430385
fn extract_match(buffer: &mut Vec<u8>, start: usize, end: usize) -> Vec<u8> {
386+
let new_buf = buffer.split_off(end);
431387
let matched = buffer.split_off(start);
432-
let _ = buffer.split_off(end - start);
433-
matched
388+
// buffer now contains what we want to return
389+
let ret = buffer.clone();
390+
*buffer = new_buf;
391+
ret
434392
}
435393

436394
fn process_input(&mut self) {
437395
debug!("processing input requests");
438396
let Session {
439397
ref mut input_requests,
440-
ref mut stdin,
398+
ref mut pty,
441399
..
442400
} = *self;
443401
let mut n_requests_processed = 0;
444402
while let Some(req) = input_requests.get_mut(n_requests_processed) {
445-
match Self::poll_pending_input(req, stdin) {
403+
match Self::poll_pending_input(req, pty) {
446404
Ok(Async::Ready(())) => {
447405
debug!("processed input request");
448406
n_requests_processed += 1;
@@ -459,13 +417,13 @@ impl Session {
459417

460418
fn process_matches(&mut self) -> Result<(), io::Error> {
461419
let mut eof = false;
462-
match self.read_stdout() {
420+
match self.read_output() {
463421
Ok(i) => {
464-
debug!("read {} bytes from stdout", i);
422+
debug!("read {} bytes from pty", i);
465423
}
466424
Err(e) => {
467425
if e.kind() == io::ErrorKind::UnexpectedEof {
468-
debug!("EOF in stdout");
426+
debug!("EOF in pty");
469427
eof = true;
470428
} else {
471429
return Err(e);
@@ -505,9 +463,6 @@ impl Future for Session {
505463
self.get_match_requests().unwrap();
506464
self.process_input();
507465
self.process_matches().unwrap();
508-
509-
510-
511466
Ok(Async::NotReady)
512467
}
513468
}

0 commit comments

Comments
 (0)