Skip to content
This repository was archived by the owner on Sep 5, 2019. It is now read-only.

start tutorial #1

Merged
merged 11 commits into from
Aug 14, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
use std::{collections::HashMap, net::ToSocketAddrs};

use futures::{
io::{BufReader, WriteHalf},
stream::FuturesUnordered,
io::{BufReader, WriteHalf, AsyncRead},
stream::{FuturesUnordered, Stream},
channel::mpsc::{self, unbounded},
SinkExt,
select,
Expand Down
178 changes: 178 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
#![feature(async_await)]

use std::{
net::ToSocketAddrs,
sync::Arc,
collections::hash_map::{HashMap, Entry},
};

use futures::{
channel::mpsc,
SinkExt,
select,
};

use async_std::{
io::BufReader,
prelude::*,
task,
net::{TcpListener, TcpStream},
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;

#[derive(Debug)]
enum Void {}

fn main() -> Result<()> {
task::block_on(server("127.0.0.1:8080"))
}

async fn server(addr: impl ToSocketAddrs) -> Result<()> {
let listener = TcpListener::bind(addr).await?;

let (broker_sender, broker_receiver) = mpsc::unbounded();
let broker = task::spawn(broker(broker_receiver));
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepting from: {}", stream.peer_addr()?);
spawn_and_log_error(client(broker_sender.clone(), stream));
}
drop(broker_sender);
broker.await;
Ok(())
}

async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
let stream = Arc::new(stream);
let reader = BufReader::new(&*stream);
let mut lines = reader.lines();

let name = match lines.next().await {
None => Err("peer disconnected immediately")?,
Some(line) => line?,
};
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
broker.send(Event::NewPeer {
name: name.clone(),
stream: Arc::clone(&stream),
shutdown: shutdown_receiver,
}).await.unwrap();

while let Some(line) = lines.next().await {
let line = line?;
let (dest, msg) = match line.find(':') {
None => continue,
Some(idx) => (&line[..idx], line[idx + 1 ..].trim()),
};
let dest: Vec<String> = dest.split(',').map(|name| name.trim().to_string()).collect();
let msg: String = msg.trim().to_string();

broker.send(Event::Message {
from: name.clone(),
to: dest,
msg,
}).await.unwrap();
}

Ok(())
}

async fn client_writer(
messages: &mut Receiver<String>,
stream: Arc<TcpStream>,
mut shutdown: Receiver<Void>,
) -> Result<()> {
let mut stream = &*stream;
loop {
select! {
msg = messages.next() => match msg {
Some(msg) => stream.write_all(msg.as_bytes()).await?,
None => break,
},
void = shutdown.next() => match void {
Some(void) => match void {},
None => break,
}
}
}
Ok(())
}

#[derive(Debug)]
enum Event {
NewPeer {
name: String,
stream: Arc<TcpStream>,
shutdown: Receiver<Void>,
},
Message {
from: String,
to: Vec<String>,
msg: String,
},
}

async fn broker(mut events: Receiver<Event>) {
let (disconnect_sender, mut disconnect_receiver) =
mpsc::unbounded::<(String, Receiver<String>)>();
let mut peers: HashMap<String, Sender<String>> = HashMap::new();

loop {
let event = select! {
event = events.next() => match event {
None => break,
Some(event) => event,
},
disconnect = disconnect_receiver.next() => {
let (name, _pending_messages) = disconnect.unwrap();
assert!(peers.remove(&name).is_some());
continue;
},
};
match event {
Event::Message { from, to, msg } => {
for addr in to {
if let Some(peer) = peers.get_mut(&addr) {
peer.send(format!("from {}: {}\n", from, msg)).await
.unwrap()
}
}
}
Event::NewPeer { name, stream, shutdown } => {
match peers.entry(name.clone()) {
Entry::Occupied(..) => (),
Entry::Vacant(entry) => {
let (client_sender, mut client_receiver) = mpsc::unbounded();
entry.insert(client_sender);
let mut disconnect_sender = disconnect_sender.clone();
spawn_and_log_error(async move {
let res = client_writer(&mut client_receiver, stream, shutdown).await;
disconnect_sender.send((name, client_receiver)).await
.unwrap();
res
});
}
}
}
}
}
drop(peers);
drop(disconnect_sender);
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {
}
}

fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
F: Future<Output = Result<()>> + Send + 'static,
{
task::spawn(async move {
if let Err(e) = fut.await {
eprintln!("{}", e)
}
})
}
Loading