Skip to content

Commit b479ce7

Browse files
authored
add experimental async/await support. (#582)
This patch adds experimental async/await support to Tokio. It does this by adding feature flags to existing libs only where necessary in order to add nightly specific code (mostly `Unpin` implementations). It then provides a new crate: `tokio-async-await` which is a shim layer on top of `tokio`. The `tokio-async-await` crate is expected to look exactly like `tokio` does, but with async / await support. This strategy reduces the amount of cfg guarding in the main libraries. This patch also adds `tokio-channel`, which is copied from futures-rs 0.1 and adds the necessary `Unpin` implementations. In general, futures 0.1 is mostly unmaintained, so it will make sense for Tokio to take over maintainership of key components regardless of async / await support.
1 parent 6e45e0a commit b479ce7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+3736
-3
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ keywords = ["io", "async", "non-blocking", "futures"]
2424

2525
members = [
2626
"./",
27+
"tokio-channel",
2728
"tokio-codec",
2829
"tokio-current-thread",
2930
"tokio-executor",

tokio-async-await/Cargo.toml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
cargo-features = ["edition", "rename-dependency"]
2+
3+
[package]
4+
name = "tokio-async-await"
5+
edition = "2018"
6+
7+
# When releasing to crates.io:
8+
# - Update html_root_url.
9+
version = "0.1.0"
10+
authors = ["Carl Lerche <[email protected]>"]
11+
license = "MIT"
12+
repository = "https://github.com/tokio-rs/tokio"
13+
homepage = "https://tokio.rs"
14+
documentation = "https://docs.rs/tokio-async-await/0.1.0"
15+
description = """
16+
Experimental async/await support for Tokio
17+
"""
18+
categories = ["asynchronous"]
19+
20+
[workspace]
21+
22+
[lib]
23+
name = "tokio"
24+
25+
[dependencies]
26+
futures = "0.1.23"
27+
tokio_main = { package = "tokio", version = "0.1.7", path = ".." }
28+
tokio-io = { version = "0.1.7", path = "../tokio-io" }
29+
tokio-channel = { version = "0.1.0", path = "../tokio-channel", features = ["async-await-preview"] }
30+
tokio-reactor = { version = "0.1.5", path = "../tokio-reactor", features = ["async-await-preview"] }
31+
futures-core-preview = { version = "0.3.0-alpha.2" }
32+
futures-util-preview = { version = "0.3.0-alpha.2" }
33+
34+
[dev-dependencies]
35+
bytes = "0.4.9"
36+
tokio-codec = { version = "0.1.0", path = "../tokio-codec" }
37+
hyper = "0.12.8"
38+
39+
[patch.crates-io]
40+
tokio = { path = "../" }
41+
tokio-executor = { path = "../tokio-executor" }
42+
tokio-io = { path = "../tokio-io" }
43+
tokio-reactor = { path = "../tokio-reactor" }
44+
tokio-tcp = { path = "../tokio-tcp" }
45+
tokio-timer = { path = "../tokio-timer" }

tokio-async-await/LICENSE

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
Copyright (c) 2018 Tokio Contributors
2+
3+
Permission is hereby granted, free of charge, to any
4+
person obtaining a copy of this software and associated
5+
documentation files (the "Software"), to deal in the
6+
Software without restriction, including without
7+
limitation the rights to use, copy, modify, merge,
8+
publish, distribute, sublicense, and/or sell copies of
9+
the Software, and to permit persons to whom the Software
10+
is furnished to do so, subject to the following
11+
conditions:
12+
13+
The above copyright notice and this permission notice
14+
shall be included in all copies or substantial portions
15+
of the Software.
16+
17+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
18+
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
19+
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
20+
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
21+
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
22+
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
23+
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
24+
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25+
DEALINGS IN THE SOFTWARE.
26+
27+
Copyright (c) 2016 futures-rs authors
28+
29+
Permission is hereby granted, free of charge, to any
30+
person obtaining a copy of this software and associated
31+
documentation files (the "Software"), to deal in the
32+
Software without restriction, including without
33+
limitation the rights to use, copy, modify, merge,
34+
publish, distribute, sublicense, and/or sell copies of
35+
the Software, and to permit persons to whom the Software
36+
is furnished to do so, subject to the following
37+
conditions:
38+
39+
The above copyright notice and this permission notice
40+
shall be included in all copies or substantial portions
41+
of the Software.
42+
43+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
44+
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
45+
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
46+
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
47+
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
48+
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
49+
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
50+
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
51+
DEALINGS IN THE SOFTWARE.
52+

tokio-async-await/README.md

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Tokio async/await preview
2+
3+
This crate provides a preview of Tokio with async / await support. It is a shim
4+
layer on top of `tokio`.
5+
6+
**This crate requires Rust nightly and does not provide API stability
7+
guarantees. You are living on the here.**
8+
9+
## Usage
10+
11+
To use this crate, you need need to start with a Rust 2018 edition crate.
12+
13+
Add this to your `Cargo.toml`:
14+
15+
```toml
16+
# At the very top of the file
17+
cargo-features = ["edition"]
18+
19+
# In the `[packages]` section
20+
edition = "2018"
21+
22+
# In the `[dependencies]` section
23+
tokio-async-await = "0.1.0"
24+
```
25+
26+
Then, get started. In your application, add:
27+
28+
```rust
29+
// The nightly features that are commonly needed with async / await
30+
#![feature(await_macro, async_await, futures_api)]
31+
32+
// This pulls in the `tokio-async-await` crate. While Rust 2018 doesn't require
33+
// `extern crate`, we need to pull in the macros.
34+
#[macro_use]
35+
extern crate tokio;
36+
37+
fn main() {
38+
// And we are async...
39+
tokio::run_async(async {
40+
println!("Hello");
41+
});
42+
}
43+
```
44+
45+
Because nightly is required, run the app with `cargo +nightly run`
46+
47+
Check the [examples](examples) directory for more.
48+
49+
## License
50+
51+
This project is licensed under the [MIT license](LICENSE).
52+
53+
### Contribution
54+
55+
Unless you explicitly state otherwise, any contribution intentionally submitted
56+
for inclusion in Tokio by you, shall be licensed as MIT, without any additional
57+
terms or conditions.

tokio-async-await/examples/chat.rs

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
#![feature(await_macro, async_await, futures_api)]
2+
3+
#[macro_use]
4+
extern crate tokio;
5+
6+
use tokio::codec::{LinesCodec, Decoder};
7+
use tokio::net::{TcpListener, TcpStream};
8+
use tokio::prelude::*;
9+
use tokio::sync::mpsc;
10+
11+
use std::collections::HashMap;
12+
use std::io;
13+
use std::net::SocketAddr;
14+
use std::sync::{Arc, Mutex};
15+
16+
/// Shorthand for the transmit half of the message channel.
17+
type Tx = mpsc::UnboundedSender<String>;
18+
19+
struct Shared {
20+
peers: HashMap<SocketAddr, Tx>,
21+
}
22+
23+
impl Shared {
24+
/// Create a new, empty, instance of `Shared`.
25+
fn new() -> Self {
26+
Shared {
27+
peers: HashMap::new(),
28+
}
29+
}
30+
}
31+
32+
async fn process(stream: TcpStream, state: Arc<Mutex<Shared>>) -> io::Result<()> {
33+
let addr = stream.peer_addr().unwrap();
34+
let mut lines = LinesCodec::new().framed(stream);
35+
36+
// Extract the peer's name
37+
let name = match await!(lines.next()) {
38+
Some(name) => name?,
39+
None => {
40+
// Disconnected early
41+
return Ok(());
42+
}
43+
};
44+
45+
println!("`{}` is joining the chat", name);
46+
47+
let (tx, mut rx) = mpsc::unbounded();
48+
49+
// Register the socket
50+
state.lock().unwrap()
51+
.peers.insert(addr, tx);
52+
53+
// Split the `lines` handle into send and recv handles. This allows spawning
54+
// separate tasks.
55+
let (mut lines_tx, mut lines_rx) = lines.split();
56+
57+
// Spawn a task that receives all lines broadcasted to us from other peers
58+
// and writes it to the client.
59+
tokio::spawn_async(async move {
60+
while let Some(line) = await!(rx.next()) {
61+
let line = line.unwrap();
62+
await!(lines_tx.send_async(line));
63+
}
64+
});
65+
66+
// Use the current task to read lines from the socket and broadcast them to
67+
// other peers.
68+
while let Some(message) = await!(lines_rx.next()) {
69+
// TODO: Error handling
70+
let message = message.unwrap();
71+
72+
let mut line = name.clone();
73+
line.push_str(": ");
74+
line.push_str(&message);
75+
line.push_str("\r\n");
76+
77+
let state = state.lock().unwrap();
78+
79+
for (peer_addr, tx) in &state.peers {
80+
if *peer_addr != addr {
81+
// TODO: Error handling
82+
tx.unbounded_send(line.clone()).unwrap();
83+
}
84+
}
85+
}
86+
87+
// Remove the client from the shared state. Doing so will also result in the
88+
// tx task to terminate.
89+
state.lock().unwrap()
90+
.peers.remove(&addr)
91+
.expect("bug");
92+
93+
Ok(())
94+
}
95+
96+
fn main() {
97+
// Create the shared state. This is how all the peers communicate.
98+
//
99+
// The server task will hold a handle to this. For every new client, the
100+
// `state` handle is cloned and passed into the task that processes the
101+
// client connection.
102+
let state = Arc::new(Mutex::new(Shared::new()));
103+
104+
let addr = "127.0.0.1:6142".parse().unwrap();
105+
106+
// Bind a TCP listener to the socket address.
107+
//
108+
// Note that this is the Tokio TcpListener, which is fully async.
109+
let listener = TcpListener::bind(&addr).unwrap();
110+
111+
println!("server running on localhost:6142");
112+
113+
// Start the Tokio runtime.
114+
tokio::run_async(async move {
115+
let mut incoming = listener.incoming();
116+
117+
while let Some(stream) = await!(incoming.next()) {
118+
let stream = match stream {
119+
Ok(stream) => stream,
120+
Err(_) => continue,
121+
};
122+
123+
let state = state.clone();
124+
125+
tokio::spawn_async(async move {
126+
if let Err(_) = await!(process(stream, state)) {
127+
eprintln!("failed to process connection");
128+
}
129+
});
130+
}
131+
});
132+
}
133+
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
#![feature(await_macro, async_await, futures_api)]
2+
3+
#[macro_use]
4+
extern crate tokio;
5+
6+
use tokio::net::TcpStream;
7+
use tokio::prelude::*;
8+
9+
use std::io;
10+
use std::net::SocketAddr;
11+
12+
const MESSAGES: &[&str] = &[
13+
"hello",
14+
"world",
15+
"one two three",
16+
];
17+
18+
async fn run_client(addr: &SocketAddr) -> io::Result<()> {
19+
let mut stream = await!(TcpStream::connect(addr))?;
20+
21+
// Buffer to read into
22+
let mut buf = [0; 128];
23+
24+
for msg in MESSAGES {
25+
println!(" > write = {:?}", msg);
26+
27+
// Write the message to the server
28+
await!(stream.write_all_async(msg.as_bytes()))?;
29+
30+
// Read the message back from the server
31+
await!(stream.read_exact_async(&mut buf[..msg.len()]))?;
32+
33+
assert_eq!(&buf[..msg.len()], msg.as_bytes());
34+
}
35+
36+
Ok(())
37+
}
38+
39+
fn main() {
40+
use std::env;
41+
42+
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
43+
let addr = addr.parse::<SocketAddr>().unwrap();
44+
45+
// Connect to the echo serveer
46+
47+
tokio::run_async(async move {
48+
match await!(run_client(&addr)) {
49+
Ok(_) => println!("done."),
50+
Err(e) => eprintln!("echo client failed; error = {:?}", e),
51+
}
52+
});
53+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#![feature(await_macro, async_await)]
2+
3+
#[macro_use]
4+
extern crate tokio;
5+
6+
use tokio::net::{TcpListener, TcpStream};
7+
use tokio::prelude::*;
8+
9+
use std::net::SocketAddr;
10+
11+
fn handle(mut stream: TcpStream) {
12+
tokio::spawn_async(async move {
13+
let mut buf = [0; 1024];
14+
15+
loop {
16+
match await!(stream.read_async(&mut buf)).unwrap() {
17+
0 => break, // Socket closed
18+
n => {
19+
// Send the data back
20+
await!(stream.write_all_async(&buf[0..n])).unwrap();
21+
}
22+
}
23+
}
24+
});
25+
}
26+
27+
fn main() {
28+
use std::env;
29+
30+
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
31+
let addr = addr.parse::<SocketAddr>().unwrap();
32+
33+
// Bind the TCP listener
34+
let listener = TcpListener::bind(&addr).unwrap();
35+
println!("Listening on: {}", addr);
36+
37+
tokio::run_async(async {
38+
let mut incoming = listener.incoming();
39+
40+
while let Some(stream) = await!(incoming.next()) {
41+
let stream = stream.unwrap();
42+
handle(stream);
43+
}
44+
});
45+
}

0 commit comments

Comments
 (0)