diff --git a/doc/tutorial-tasks.md b/doc/tutorial-tasks.md index 7831dc1d80fc0..bed696748306e 100644 --- a/doc/tutorial-tasks.md +++ b/doc/tutorial-tasks.md @@ -236,7 +236,7 @@ Instead we can use a `SharedChan`, a type that allows a single use core::comm::{stream, SharedChan}; let (port, chan) = stream(); -let chan = SharedChan(chan); +let chan = SharedChan::new(chan); for uint::range(0, 3) |init_val| { // Create a new channel handle to distribute to the child task diff --git a/src/compiletest/procsrv.rs b/src/compiletest/procsrv.rs index ca10aa0da7d45..7d78b84dc8005 100644 --- a/src/compiletest/procsrv.rs +++ b/src/compiletest/procsrv.rs @@ -73,7 +73,7 @@ pub fn run(lib_path: ~str, writeclose(pipe_in.out, input); - let p = comm::PortSet(); + let p = comm::PortSet::new(); let ch = p.chan(); do task::spawn_sched(task::SingleThreaded) || { let errput = readclose(pipe_err.in); diff --git a/src/libcore/comm.rs b/src/libcore/comm.rs index f8b046e5b8c4f..fc13463bd1c2a 100644 --- a/src/libcore/comm.rs +++ b/src/libcore/comm.rs @@ -19,6 +19,7 @@ use option::{Option, Some, None}; use uint; use unstable; use vec; +use unstable::Exclusive; use pipes::{recv, try_recv, wait_many, peek, PacketHeader}; @@ -56,49 +57,79 @@ pub trait Peekable { fn peek(&self) -> bool; } -/// Returns the index of an endpoint that is ready to receive. -pub fn selecti(endpoints: &[T]) -> uint { - wait_many(endpoints) -} - -/// Returns 0 or 1 depending on which endpoint is ready to receive -pub fn select2i(a: &A, b: &B) -> - Either<(), ()> { - match wait_many([a.header(), b.header()]) { - 0 => Left(()), - 1 => Right(()), - _ => fail!(~"wait returned unexpected index") - } -} // Streams - Make pipes a little easier in general. -proto! streamp ( +/*proto! streamp ( Open:send { data(T) -> Open } -) +)*/ -#[doc(hidden)] -struct Chan_ { - mut endp: Option> -} +#[allow(non_camel_case_types)] +pub mod streamp { + priv use core::kinds::Owned; -/// An endpoint that can send many messages. -pub enum Chan { - Chan_(Chan_) + pub fn init() -> (client::Open, server::Open) { + pub use core::pipes::HasBuffer; + ::core::pipes::entangle() + } + + #[allow(non_camel_case_types)] + pub enum Open { pub data(T, server::Open), } + + #[allow(non_camel_case_types)] + pub mod client { + priv use core::kinds::Owned; + + #[allow(non_camel_case_types)] + pub fn try_data(pipe: Open, x_0: T) -> + ::core::option::Option> { + { + use super::data; + let (c, s) = ::core::pipes::entangle(); + let message = data(x_0, s); + if ::core::pipes::send(pipe, message) { + ::core::pipes::rt::make_some(c) + } else { ::core::pipes::rt::make_none() } + } + } + + #[allow(non_camel_case_types)] + pub fn data(pipe: Open, x_0: T) -> Open { + { + use super::data; + let (c, s) = ::core::pipes::entangle(); + let message = data(x_0, s); + ::core::pipes::send(pipe, message); + c + } + } + + #[allow(non_camel_case_types)] + pub type Open = ::core::pipes::SendPacket>; + } + + #[allow(non_camel_case_types)] + pub mod server { + priv use core::kinds::Owned; + + #[allow(non_camel_case_types)] + pub type Open = ::core::pipes::RecvPacket>; + } } -struct Port_ { - mut endp: Option>, +/// An endpoint that can send many messages. +pub struct Chan { + mut endp: Option> } /// An endpoint that can receive many messages. -pub enum Port { - Port_(Port_) +pub struct Port { + mut endp: Option>, } -/** Creates a `(chan, port)` pair. +/** Creates a `(Port, Chan)` pair. These allow sending or receiving an unlimited number of messages. @@ -106,96 +137,70 @@ These allow sending or receiving an unlimited number of messages. pub fn stream() -> (Port, Chan) { let (c, s) = streamp::init(); - (Port_(Port_ { endp: Some(s) }), Chan_(Chan_{ endp: Some(c) })) -} - -// Add an inherent method so that imports of GenericChan are not -// required. -pub impl Chan { - fn send(&self, x: T) { chan_send(self, x) } - fn try_send(&self, x: T) -> bool { chan_try_send(self, x) } + (Port { endp: Some(s) }, Chan { endp: Some(c) }) } impl GenericChan for Chan { - fn send(&self, x: T) { chan_send(self, x) } -} - -#[inline(always)] -fn chan_send(self: &Chan, x: T) { - let mut endp = None; - endp <-> self.endp; - self.endp = Some( - streamp::client::data(endp.unwrap(), x)) + #[inline(always)] + fn send(&self, x: T) { + let mut endp = None; + endp <-> self.endp; + self.endp = Some( + streamp::client::data(endp.unwrap(), x)) + } } impl GenericSmartChan for Chan { + #[inline(always)] fn try_send(&self, x: T) -> bool { - chan_try_send(self, x) - } -} - -#[inline(always)] -fn chan_try_send(self: &Chan, x: T) -> bool { - let mut endp = None; - endp <-> self.endp; - match streamp::client::try_data(endp.unwrap(), x) { - Some(next) => { - self.endp = Some(next); - true + let mut endp = None; + endp <-> self.endp; + match streamp::client::try_data(endp.unwrap(), x) { + Some(next) => { + self.endp = Some(next); + true + } + None => false } - None => false } } -// Use an inherent impl so that imports are not required: -pub impl Port { - fn recv(&self) -> T { port_recv(self) } - fn try_recv(&self) -> Option { port_try_recv(self) } - fn peek(&self) -> bool { port_peek(self) } -} - impl GenericPort for Port { - // These two calls will prefer the inherent versions above: - fn recv(&self) -> T { port_recv(self) } - fn try_recv(&self) -> Option { port_try_recv(self) } -} - -#[inline(always)] -fn port_recv(self: &Port) -> T { - let mut endp = None; - endp <-> self.endp; - let streamp::data(x, endp) = recv(endp.unwrap()); - self.endp = Some(endp); - x -} - -#[inline(always)] -fn port_try_recv(self: &Port) -> Option { - let mut endp = None; - endp <-> self.endp; - match try_recv(endp.unwrap()) { - Some(streamp::data(x, endp)) => { - self.endp = Some(endp); - Some(x) + #[inline(always)] + fn recv(&self) -> T { + let mut endp = None; + endp <-> self.endp; + let streamp::data(x, endp) = recv(endp.unwrap()); + self.endp = Some(endp); + x + } + + #[inline(always)] + fn try_recv(&self) -> Option { + let mut endp = None; + endp <-> self.endp; + match try_recv(endp.unwrap()) { + Some(streamp::data(x, endp)) => { + self.endp = Some(endp); + Some(x) + } + None => None } - None => None } } impl Peekable for Port { - fn peek(&self) -> bool { port_peek(self) } -} - -#[inline(always)] -fn port_peek(self: &Port) -> bool { - let mut endp = None; - endp <-> self.endp; - let peek = match &endp { - &Some(ref endp) => peek(endp), - &None => fail!(~"peeking empty stream") - }; - self.endp <-> endp; - peek + #[inline(always)] + fn peek(&self) -> bool { + let mut endp = None; + endp <-> self.endp; + let peek = match &endp { + &Some(ref endp) => peek(endp), + &None => fail!(~"peeking empty stream") + }; + self.endp <-> endp; + peek + } } impl Selectable for Port { @@ -214,20 +219,14 @@ pub struct PortSet { mut ports: ~[Port], } -pub fn PortSet() -> PortSet{ - PortSet { - ports: ~[] - } -} +pub impl PortSet { -// Use an inherent impl so that imports are not required: -pub impl PortSet { - fn recv(&self) -> T { port_set_recv(self) } - fn try_recv(&self) -> Option { port_set_try_recv(self) } - fn peek(&self) -> bool { port_set_peek(self) } -} + fn new() -> PortSet { + PortSet { + ports: ~[] + } + } -pub impl PortSet { fn add(&self, port: Port) { self.ports.push(port) } @@ -240,135 +239,163 @@ pub impl PortSet { } impl GenericPort for PortSet { - fn try_recv(&self) -> Option { port_set_try_recv(self) } - fn recv(&self) -> T { port_set_recv(self) } -} - -#[inline(always)] -fn port_set_recv(self: &PortSet) -> T { - port_set_try_recv(self).expect("port_set: endpoints closed") -} - -#[inline(always)] -fn port_set_try_recv(self: &PortSet) -> Option { - let mut result = None; - // we have to swap the ports array so we aren't borrowing - // aliasable mutable memory. - let mut ports = ~[]; - ports <-> self.ports; - while result.is_none() && ports.len() > 0 { - let i = wait_many(ports); - match ports[i].try_recv() { - Some(m) => { - result = Some(m); - } - None => { - // Remove this port. - let _ = ports.swap_remove(i); + fn try_recv(&self) -> Option { + let mut result = None; + // we have to swap the ports array so we aren't borrowing + // aliasable mutable memory. + let mut ports = ~[]; + ports <-> self.ports; + while result.is_none() && ports.len() > 0 { + let i = wait_many(ports); + match ports[i].try_recv() { + Some(m) => { + result = Some(m); + } + None => { + // Remove this port. + let _ = ports.swap_remove(i); + } } } + ports <-> self.ports; + result + } + fn recv(&self) -> T { + self.try_recv().expect("port_set: endpoints closed") } - ports <-> self.ports; - result } impl Peekable for PortSet { - fn peek(&self) -> bool { port_set_peek(self) } -} - -#[inline(always)] -fn port_set_peek(self: &PortSet) -> bool { - // It'd be nice to use self.port.each, but that version isn't - // pure. - for uint::range(0, vec::uniq_len(&const self.ports)) |i| { - // XXX: Botch pending demuting. - unsafe { - let port: &Port = cast::transmute(&mut self.ports[i]); - if port.peek() { return true } + fn peek(&self) -> bool { + // It'd be nice to use self.port.each, but that version isn't + // pure. + for uint::range(0, vec::uniq_len(&const self.ports)) |i| { + // XXX: Botch pending demuting. + unsafe { + let port: &Port = cast::transmute(&mut self.ports[i]); + if port.peek() { return true } + } } + false } - false } - /// A channel that can be shared between many senders. -pub type SharedChan = unstable::Exclusive>; - -pub impl SharedChan { - fn send(&self, x: T) { shared_chan_send(self, x) } - fn try_send(&self, x: T) -> bool { shared_chan_try_send(self, x) } +pub struct SharedChan { + ch: Exclusive> } -impl GenericChan for SharedChan { - fn send(&self, x: T) { shared_chan_send(self, x) } +impl SharedChan { + /// Converts a `chan` into a `shared_chan`. + pub fn new(c: Chan) -> SharedChan { + SharedChan { ch: unstable::exclusive(c) } + } } -#[inline(always)] -fn shared_chan_send(self: &SharedChan, x: T) { - let mut xx = Some(x); - do self.with_imm |chan| { - let mut x = None; - x <-> xx; - chan.send(x.unwrap()) +impl GenericChan for SharedChan { + fn send(&self, x: T) { + let mut xx = Some(x); + do self.ch.with_imm |chan| { + let mut x = None; + x <-> xx; + chan.send(x.unwrap()) + } } } impl GenericSmartChan for SharedChan { - fn try_send(&self, x: T) -> bool { shared_chan_try_send(self, x) } + fn try_send(&self, x: T) -> bool { + let mut xx = Some(x); + do self.ch.with_imm |chan| { + let mut x = None; + x <-> xx; + chan.try_send(x.unwrap()) + } + } } -#[inline(always)] -fn shared_chan_try_send(self: &SharedChan, x: T) -> bool { - let mut xx = Some(x); - do self.with_imm |chan| { - let mut x = None; - x <-> xx; - chan.try_send(x.unwrap()) +impl ::clone::Clone for SharedChan { + fn clone(&self) -> SharedChan { + SharedChan { ch: self.ch.clone() } } } -/// Converts a `chan` into a `shared_chan`. -pub fn SharedChan(c: Chan) -> SharedChan { - unstable::exclusive(c) -} +/*proto! oneshot ( + Oneshot:send { + send(T) -> ! + } +)*/ + +#[allow(non_camel_case_types)] +pub mod oneshot { + priv use core::kinds::Owned; + + pub fn init() -> (client::Oneshot, server::Oneshot) { + pub use core::pipes::HasBuffer; + + let buffer = + ~::core::pipes::Buffer{ + header: ::core::pipes::BufferHeader(), + data: __Buffer{ + Oneshot: ::core::pipes::mk_packet::>() + }, + }; + do ::core::pipes::entangle_buffer(buffer) |buffer, data| { + { + data.Oneshot.set_buffer(buffer); + ::ptr::addr_of(&(data.Oneshot)) + } + } + } + #[allow(non_camel_case_types)] + pub enum Oneshot { pub send(T), } + #[allow(non_camel_case_types)] + pub struct __Buffer { + Oneshot: ::core::pipes::Packet>, + } -/// Receive a message from one of two endpoints. -pub trait Select2 { - /// Receive a message or return `None` if a connection closes. - fn try_select(&self) -> Either, Option>; - /// Receive a message or fail if a connection closes. - fn select(&self) -> Either; -} + #[allow(non_camel_case_types)] + pub mod client { -impl, - Right: Selectable + GenericPort> - Select2 for (Left, Right) { + priv use core::kinds::Owned; - fn select(&self) -> Either { - match *self { - (ref lp, ref rp) => match select2i(lp, rp) { - Left(()) => Left (lp.recv()), - Right(()) => Right(rp.recv()) - } + #[allow(non_camel_case_types)] + pub fn try_send(pipe: Oneshot, x_0: T) -> + ::core::option::Option<()> { + { + use super::send; + let message = send(x_0); + if ::core::pipes::send(pipe, message) { + ::core::pipes::rt::make_some(()) + } else { ::core::pipes::rt::make_none() } + } } - } - fn try_select(&self) -> Either, Option> { - match *self { - (ref lp, ref rp) => match select2i(lp, rp) { - Left(()) => Left (lp.try_recv()), - Right(()) => Right(rp.try_recv()) - } + #[allow(non_camel_case_types)] + pub fn send(pipe: Oneshot, x_0: T) { + { + use super::send; + let message = send(x_0); + ::core::pipes::send(pipe, message); + } } + + #[allow(non_camel_case_types)] + pub type Oneshot = + ::core::pipes::SendPacketBuffered, + super::__Buffer>; } -} -proto! oneshot ( - Oneshot:send { - send(T) -> ! + #[allow(non_camel_case_types)] + pub mod server { + priv use core::kinds::Owned; + + #[allow(non_camel_case_types)] + pub type Oneshot = + ::core::pipes::RecvPacketBuffered, + super::__Buffer>; } -) +} /// The send end of a oneshot pipe. pub type ChanOne = oneshot::client::Oneshot; @@ -425,6 +452,55 @@ pub fn try_send_one(chan: ChanOne, data: T) oneshot::client::try_send(chan, data).is_some() } + + +/// Returns the index of an endpoint that is ready to receive. +pub fn selecti(endpoints: &[T]) -> uint { + wait_many(endpoints) +} + +/// Returns 0 or 1 depending on which endpoint is ready to receive +pub fn select2i(a: &A, b: &B) -> + Either<(), ()> { + match wait_many([a.header(), b.header()]) { + 0 => Left(()), + 1 => Right(()), + _ => fail!(~"wait returned unexpected index") + } +} + +/// Receive a message from one of two endpoints. +pub trait Select2 { + /// Receive a message or return `None` if a connection closes. + fn try_select(&self) -> Either, Option>; + /// Receive a message or fail if a connection closes. + fn select(&self) -> Either; +} + +impl, + Right: Selectable + GenericPort> + Select2 for (Left, Right) { + + fn select(&self) -> Either { + match *self { + (ref lp, ref rp) => match select2i(lp, rp) { + Left(()) => Left (lp.recv()), + Right(()) => Right(rp.recv()) + } + } + } + + fn try_select(&self) -> Either, Option> { + match *self { + (ref lp, ref rp) => match select2i(lp, rp) { + Left(()) => Left (lp.try_recv()), + Right(()) => Right(rp.try_recv()) + } + } + } +} + #[cfg(test)] mod test { use either::Right; diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index fddb2af558709..36cfdbf5617aa 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -83,7 +83,6 @@ bounded and unbounded protocols allows for less code duplication. */ use cast::{forget, reinterpret_cast, transmute}; -use cell::Cell; use either::{Either, Left, Right}; use kinds::Owned; use libc; @@ -93,14 +92,12 @@ use ptr; use task; use vec; -#[doc(hidden)] static SPIN_COUNT: uint = 0; macro_rules! move_it ( { $x:expr } => ( unsafe { let y = *ptr::addr_of(&($x)); y } ) ) -#[doc(hidden)] #[deriving(Eq)] enum State { Empty, @@ -125,7 +122,6 @@ pub fn BufferHeader() -> BufferHeader { } // This is for protocols to associate extra data to thread around. -#[doc(hidden)] pub struct Buffer { header: BufferHeader, data: T, @@ -186,13 +182,11 @@ pub impl PacketHeader { } } -#[doc(hidden)] pub struct Packet { header: PacketHeader, mut payload: Option, } -#[doc(hidden)] pub trait HasBuffer { fn set_buffer(&self, b: *libc::c_void); } @@ -203,14 +197,12 @@ impl HasBuffer for Packet { } } -#[doc(hidden)] pub fn mk_packet() -> Packet { Packet { header: PacketHeader(), payload: None, } } -#[doc(hidden)] fn unibuffer() -> ~Buffer> { let b = ~Buffer { header: BufferHeader(), @@ -226,7 +218,6 @@ fn unibuffer() -> ~Buffer> { b } -#[doc(hidden)] pub fn packet() -> *Packet { let b = unibuffer(); let p = ptr::addr_of(&(b.data)); @@ -235,7 +226,6 @@ pub fn packet() -> *Packet { p } -#[doc(hidden)] pub fn entangle_buffer( buffer: ~Buffer, init: &fn(*libc::c_void, x: &T) -> *Packet) @@ -246,30 +236,6 @@ pub fn entangle_buffer( (SendPacketBuffered(p), RecvPacketBuffered(p)) } -// If I call the rusti versions directly from a polymorphic function, -// I get link errors. This is a bug that needs investigated more. -#[doc(hidden)] -pub fn atomic_xchng_rel(dst: &mut int, src: int) -> int { - unsafe { - intrinsics::atomic_xchg_rel(dst, src) - } -} - -#[doc(hidden)] -pub fn atomic_add_acq(dst: &mut int, src: int) -> int { - unsafe { - intrinsics::atomic_xadd_acq(dst, src) - } -} - -#[doc(hidden)] -pub fn atomic_sub_rel(dst: &mut int, src: int) -> int { - unsafe { - intrinsics::atomic_xsub_rel(dst, src) - } -} - -#[doc(hidden)] pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task { // It might be worth making both acquire and release versions of // this. @@ -278,11 +244,9 @@ pub fn swap_task(dst: &mut *rust_task, src: *rust_task) -> *rust_task { } } -#[doc(hidden)] #[allow(non_camel_case_types)] pub type rust_task = libc::c_void; -#[doc(hidden)] pub mod rustrt { use libc; use super::rust_task; @@ -304,7 +268,6 @@ pub mod rustrt { } } -#[doc(hidden)] fn wait_event(this: *rust_task) -> *libc::c_void { unsafe { let mut event = ptr::null(); @@ -317,21 +280,18 @@ fn wait_event(this: *rust_task) -> *libc::c_void { } } -#[doc(hidden)] fn swap_state_acq(dst: &mut State, src: State) -> State { unsafe { transmute(intrinsics::atomic_xchg_acq(transmute(dst), src as int)) } } -#[doc(hidden)] fn swap_state_rel(dst: &mut State, src: State) -> State { unsafe { transmute(intrinsics::atomic_xchg_rel(transmute(dst), src as int)) } } -#[doc(hidden)] pub unsafe fn get_buffer(p: *PacketHeader) -> ~Buffer { transmute((*p).buf_header()) } @@ -349,7 +309,7 @@ impl ::ops::Drop for BufferResource { let b = move_it!(self.buffer); //let p = ptr::addr_of(*b); //error!("drop %?", p); - let old_count = atomic_sub_rel(&mut b.header.ref_count, 1); + let old_count = intrinsics::atomic_xsub_rel(&mut b.header.ref_count, 1); //let old_count = atomic_xchng_rel(b.header.ref_count, 0); if old_count == 1 { // The new count is 0. @@ -366,7 +326,7 @@ impl ::ops::Drop for BufferResource { fn BufferResource(b: ~Buffer) -> BufferResource { //let p = ptr::addr_of(*b); //error!("take %?", p); - atomic_add_acq(&mut b.header.ref_count, 1); + unsafe { intrinsics::atomic_xadd_acq(&mut b.header.ref_count, 1) }; BufferResource { // tjc: ???? @@ -374,7 +334,6 @@ fn BufferResource(b: ~Buffer) -> BufferResource { } } -#[doc(hidden)] pub fn send(p: SendPacketBuffered, payload: T) -> bool { let header = p.header(); let p_ = p.unwrap(); @@ -551,7 +510,6 @@ pub fn peek(p: &RecvPacketBuffered) -> bool { } } -#[doc(hidden)] fn sender_terminate(p: *Packet) { let p = unsafe { &*p }; match swap_state_rel(&mut p.header.state, Terminated) { @@ -582,7 +540,6 @@ fn sender_terminate(p: *Packet) { } } -#[doc(hidden)] fn receiver_terminate(p: *Packet) { let p = unsafe { &*p }; match swap_state_rel(&mut p.header.state, Terminated) { @@ -670,94 +627,12 @@ pub fn wait_many(pkts: &[T]) -> uint { ready_packet } -/** Receives a message from one of two endpoints. - -The return value is `left` if the first endpoint received something, -or `right` if the second endpoint receives something. In each case, -the result includes the other endpoint as well so it can be used -again. Below is an example of using `select2`. - -~~~ -match select2(a, b) { - left((none, b)) { - // endpoint a was closed. - } - right((a, none)) { - // endpoint b was closed. - } - left((Some(_), b)) { - // endpoint a received a message - } - right(a, Some(_)) { - // endpoint b received a message. - } -} -~~~ - -Sometimes messages will be available on both endpoints at once. In -this case, `select2` may return either `left` or `right`. - -*/ -pub fn select2( - a: RecvPacketBuffered, - b: RecvPacketBuffered) - -> Either<(Option, RecvPacketBuffered), - (RecvPacketBuffered, Option)> -{ - let i = wait_many([a.header(), b.header()]); - - match i { - 0 => Left((try_recv(a), b)), - 1 => Right((a, try_recv(b))), - _ => fail!(~"select2 return an invalid packet") - } -} - -#[doc(hidden)] -pub trait Selectable { - fn header(&self) -> *PacketHeader; -} - -impl Selectable for *PacketHeader { - fn header(&self) -> *PacketHeader { *self } -} - -/// Returns the index of an endpoint that is ready to receive. -pub fn selecti(endpoints: &[T]) -> uint { - wait_many(endpoints) -} - -/// Returns 0 or 1 depending on which endpoint is ready to receive -pub fn select2i(a: &A, b: &B) -> - Either<(), ()> { - match wait_many([a.header(), b.header()]) { - 0 => Left(()), - 1 => Right(()), - _ => fail!(~"wait returned unexpected index") - } -} - -/** Waits on a set of endpoints. Returns a message, its index, and a - list of the remaining endpoints. - -*/ -pub fn select(endpoints: ~[RecvPacketBuffered]) - -> (uint, Option, ~[RecvPacketBuffered]) -{ - let ready = wait_many(endpoints.map(|p| p.header())); - let mut remaining = endpoints; - let port = remaining.swap_remove(ready); - let result = try_recv(port); - (ready, result, remaining) -} - /** The sending end of a pipe. It can be used to send exactly one message. */ pub type SendPacket = SendPacketBuffered>; -#[doc(hidden)] pub fn SendPacket(p: *Packet) -> SendPacket { SendPacketBuffered(p) } @@ -828,7 +703,6 @@ pub impl SendPacketBuffered { /// message. pub type RecvPacket = RecvPacketBuffered>; -#[doc(hidden)] pub fn RecvPacket(p: *Packet) -> RecvPacket { RecvPacketBuffered(p) } @@ -896,55 +770,89 @@ pub fn RecvPacketBuffered(p: *Packet) } } -#[doc(hidden)] pub fn entangle() -> (SendPacket, RecvPacket) { let p = packet(); (SendPacket(p), RecvPacket(p)) } -/** Spawn a task to provide a service. +/** Receives a message from one of two endpoints. + +The return value is `left` if the first endpoint received something, +or `right` if the second endpoint receives something. In each case, +the result includes the other endpoint as well so it can be used +again. Below is an example of using `select2`. + +~~~ +match select2(a, b) { + left((none, b)) { + // endpoint a was closed. + } + right((a, none)) { + // endpoint b was closed. + } + left((Some(_), b)) { + // endpoint a received a message + } + right(a, Some(_)) { + // endpoint b received a message. + } +} +~~~ -It takes an initialization function that produces a send and receive -endpoint. The send endpoint is returned to the caller and the receive -endpoint is passed to the new task. +Sometimes messages will be available on both endpoints at once. In +this case, `select2` may return either `left` or `right`. */ -pub fn spawn_service( - init: extern fn() -> (SendPacketBuffered, - RecvPacketBuffered), - service: ~fn(v: RecvPacketBuffered)) - -> SendPacketBuffered { - let (client, server) = init(); +pub fn select2( + a: RecvPacketBuffered, + b: RecvPacketBuffered) + -> Either<(Option, RecvPacketBuffered), + (RecvPacketBuffered, Option)> +{ + let i = wait_many([a.header(), b.header()]); - // This is some nasty gymnastics required to safely move the pipe - // into a new task. - let server = Cell(server); - do task::spawn { - service(server.take()); + match i { + 0 => Left((try_recv(a), b)), + 1 => Right((a, try_recv(b))), + _ => fail!(~"select2 return an invalid packet") } +} - client +pub trait Selectable { + fn header(&self) -> *PacketHeader; } -/** Like `spawn_service_recv`, but for protocols that start in the -receive state. +impl Selectable for *PacketHeader { + fn header(&self) -> *PacketHeader { *self } +} -*/ -pub fn spawn_service_recv( - init: extern fn() -> (RecvPacketBuffered, - SendPacketBuffered), - service: ~fn(v: SendPacketBuffered)) - -> RecvPacketBuffered { - let (client, server) = init(); +/// Returns the index of an endpoint that is ready to receive. +pub fn selecti(endpoints: &[T]) -> uint { + wait_many(endpoints) +} - // This is some nasty gymnastics required to safely move the pipe - // into a new task. - let server = Cell(server); - do task::spawn { - service(server.take()) +/// Returns 0 or 1 depending on which endpoint is ready to receive +pub fn select2i(a: &A, b: &B) -> + Either<(), ()> { + match wait_many([a.header(), b.header()]) { + 0 => Left(()), + 1 => Right(()), + _ => fail!(~"wait returned unexpected index") } +} + +/** Waits on a set of endpoints. Returns a message, its index, and a + list of the remaining endpoints. - client +*/ +pub fn select(endpoints: ~[RecvPacketBuffered]) + -> (uint, Option, ~[RecvPacketBuffered]) +{ + let ready = wait_many(endpoints.map(|p| p.header())); + let mut remaining = endpoints; + let port = remaining.swap_remove(ready); + let result = try_recv(port); + (ready, result, remaining) } pub mod rt { diff --git a/src/libcore/run.rs b/src/libcore/run.rs index 9e6524c25cb29..8b18cc3c6968c 100644 --- a/src/libcore/run.rs +++ b/src/libcore/run.rs @@ -405,7 +405,7 @@ pub fn program_output(prog: &str, args: &[~str]) -> ProgramOutput { // or the other. FIXME (#2625): Surely there's a much more // clever way to do this. let (p, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); let ch_clone = ch.clone(); do task::spawn_sched(task::SingleThreaded) { let errput = readclose(pipe_err.in); diff --git a/src/libcore/task/mod.rs b/src/libcore/task/mod.rs index 2975f3d5e4660..55546514e4fa5 100644 --- a/src/libcore/task/mod.rs +++ b/src/libcore/task/mod.rs @@ -657,7 +657,7 @@ fn test_cant_dup_task_builder() { #[test] #[ignore(cfg(windows))] fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port let (po, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); do spawn_unlinked { let ch = ch.clone(); do spawn_unlinked { @@ -881,7 +881,7 @@ fn test_spawn_sched_no_threads() { #[test] fn test_spawn_sched() { let (po, ch) = stream::<()>(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); fn f(i: int, ch: SharedChan<()>) { let parent_sched_id = unsafe { rt::rust_get_sched_id() }; diff --git a/src/libcore/unstable/weak_task.rs b/src/libcore/unstable/weak_task.rs index 6eabb0629d1fc..4e2174fd5d24c 100644 --- a/src/libcore/unstable/weak_task.rs +++ b/src/libcore/unstable/weak_task.rs @@ -69,7 +69,7 @@ fn create_global_service() -> ~WeakTaskService { debug!("creating global weak task service"); let (port, chan) = stream::(); let port = Cell(port); - let chan = SharedChan(chan); + let chan = SharedChan::new(chan); let chan_clone = chan.clone(); do task().unlinked().spawn { diff --git a/src/librustc/rustc.rc b/src/librustc/rustc.rc index 3fbe1b96ef7aa..0e242805e13ae 100644 --- a/src/librustc/rustc.rc +++ b/src/librustc/rustc.rc @@ -307,7 +307,7 @@ bug and need to present an error. pub fn monitor(+f: ~fn(diagnostic::Emitter)) { use core::comm::*; let (p, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); let ch_capture = ch.clone(); match do task::try || { let ch = ch_capture.clone(); diff --git a/src/librustdoc/astsrv.rs b/src/librustdoc/astsrv.rs index 3b905c612a5c6..c4c81b5ec2c70 100644 --- a/src/librustdoc/astsrv.rs +++ b/src/librustdoc/astsrv.rs @@ -69,7 +69,7 @@ fn run(owner: SrvOwner, source: ~str, parse: Parser) -> T { } let srv_ = Srv { - ch: SharedChan(ch) + ch: SharedChan::new(ch) }; let res = owner(srv_.clone()); diff --git a/src/librustdoc/markdown_writer.rs b/src/librustdoc/markdown_writer.rs index fcf7011cbc309..13f598ce70781 100644 --- a/src/librustdoc/markdown_writer.rs +++ b/src/librustdoc/markdown_writer.rs @@ -232,7 +232,7 @@ fn write_file(path: &Path, s: ~str) { pub fn future_writer_factory( ) -> (WriterFactory, Port<(doc::Page, ~str)>) { let (markdown_po, markdown_ch) = stream(); - let markdown_ch = SharedChan(markdown_ch); + let markdown_ch = SharedChan::new(markdown_ch); let writer_factory: WriterFactory = |page| { let (writer_po, writer_ch) = comm::stream(); let markdown_ch = markdown_ch.clone(); diff --git a/src/librustdoc/page_pass.rs b/src/librustdoc/page_pass.rs index c620e20530e66..93e7d8c808cda 100644 --- a/src/librustdoc/page_pass.rs +++ b/src/librustdoc/page_pass.rs @@ -50,7 +50,7 @@ pub fn run( let (result_port, result_chan) = stream(); let (page_port, page_chan) = stream(); - let page_chan = SharedChan(page_chan); + let page_chan = SharedChan::new(page_chan); do task::spawn { result_chan.send(make_doc_from_pages(&page_port)); }; diff --git a/src/libstd/arc.rs b/src/libstd/arc.rs index da1e4688939cc..8abe0262314b5 100644 --- a/src/libstd/arc.rs +++ b/src/libstd/arc.rs @@ -499,7 +499,7 @@ mod tests { let (p, c) = comm::stream(); do task::spawn() || { - let p = comm::PortSet(); + let p = comm::PortSet::new(); c.send(p.chan()); let arc_v = p.recv(); diff --git a/src/libstd/net_ip.rs b/src/libstd/net_ip.rs index e920ff20ac5dc..e58b17b83218f 100644 --- a/src/libstd/net_ip.rs +++ b/src/libstd/net_ip.rs @@ -113,7 +113,7 @@ enum IpGetAddrErr { pub fn get_addr(node: &str, iotask: &iotask) -> result::Result<~[IpAddr], IpGetAddrErr> { let (output_po, output_ch) = stream(); - let mut output_ch = Some(SharedChan(output_ch)); + let mut output_ch = Some(SharedChan::new(output_ch)); do str::as_buf(node) |node_ptr, len| { let output_ch = output_ch.swap_unwrap(); debug!("slice len %?", len); diff --git a/src/libstd/net_tcp.rs b/src/libstd/net_tcp.rs index b32df75063d97..6bf97843fa178 100644 --- a/src/libstd/net_tcp.rs +++ b/src/libstd/net_tcp.rs @@ -150,16 +150,16 @@ pub fn connect(input_ip: ip::IpAddr, port: uint, -> result::Result { unsafe { let (result_po, result_ch) = stream::(); - let result_ch = SharedChan(result_ch); + let result_ch = SharedChan::new(result_ch); let (closed_signal_po, closed_signal_ch) = stream::<()>(); - let closed_signal_ch = SharedChan(closed_signal_ch); + let closed_signal_ch = SharedChan::new(closed_signal_ch); let conn_data = ConnectReqData { result_ch: result_ch, closed_signal_ch: closed_signal_ch }; let conn_data_ptr = ptr::addr_of(&conn_data); let (reader_po, reader_ch) = stream::>(); - let reader_ch = SharedChan(reader_ch); + let reader_ch = SharedChan::new(reader_ch); let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = uv::ll::tcp_t(); let socket_data = @TcpSocketData { @@ -517,7 +517,7 @@ pub fn accept(new_conn: TcpNewConnection) server_handle_ptr) as *TcpListenFcData; let (reader_po, reader_ch) = stream::< Result<~[u8], TcpErrData>>(); - let reader_ch = SharedChan(reader_ch); + let reader_ch = SharedChan::new(reader_ch); let iotask = &(*server_data_ptr).iotask; let stream_handle_ptr = malloc_uv_tcp_t(); *(stream_handle_ptr as *mut uv::ll::uv_tcp_t) = @@ -537,7 +537,7 @@ pub fn accept(new_conn: TcpNewConnection) (*client_socket_data_ptr).stream_handle_ptr; let (result_po, result_ch) = stream::>(); - let result_ch = SharedChan(result_ch); + let result_ch = SharedChan::new(result_ch); // UNSAFE LIBUV INTERACTION BEGIN // .. normally this happens within the context of @@ -646,9 +646,9 @@ fn listen_common(host_ip: ip::IpAddr, on_connect_cb: ~fn(*uv::ll::uv_tcp_t)) -> result::Result<(), TcpListenErrData> { let (stream_closed_po, stream_closed_ch) = stream::<()>(); - let stream_closed_ch = SharedChan(stream_closed_ch); + let stream_closed_ch = SharedChan::new(stream_closed_ch); let (kill_po, kill_ch) = stream::>(); - let kill_ch = SharedChan(kill_ch); + let kill_ch = SharedChan::new(kill_ch); let server_stream = uv::ll::tcp_t(); let server_stream_ptr = ptr::addr_of(&server_stream); let server_data: TcpListenFcData = TcpListenFcData { @@ -997,7 +997,7 @@ impl io::Writer for TcpSocketBuf { fn tear_down_socket_data(socket_data: @TcpSocketData) { unsafe { let (closed_po, closed_ch) = stream::<()>(); - let closed_ch = SharedChan(closed_ch); + let closed_ch = SharedChan::new(closed_ch); let close_data = TcpSocketCloseData { closed_ch: closed_ch }; @@ -1147,7 +1147,7 @@ fn write_common_impl(socket_data_ptr: *TcpSocketData, vec::len(raw_write_data)) ]; let write_buf_vec_ptr = ptr::addr_of(&write_buf_vec); let (result_po, result_ch) = stream::(); - let result_ch = SharedChan(result_ch); + let result_ch = SharedChan::new(result_ch); let write_data = WriteReqData { result_ch: result_ch }; @@ -1554,7 +1554,7 @@ mod test { let (server_result_po, server_result_ch) = stream::<~str>(); let (cont_po, cont_ch) = stream::<()>(); - let cont_ch = SharedChan(cont_ch); + let cont_ch = SharedChan::new(cont_ch); // server let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { @@ -1592,7 +1592,7 @@ mod test { let expected_resp = ~"pong"; let (cont_po, cont_ch) = stream::<()>(); - let cont_ch = SharedChan(cont_ch); + let cont_ch = SharedChan::new(cont_ch); // server let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { @@ -1652,7 +1652,7 @@ mod test { let expected_resp = ~"pong"; let (cont_po, cont_ch) = stream::<()>(); - let cont_ch = SharedChan(cont_ch); + let cont_ch = SharedChan::new(cont_ch); // server let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { @@ -1717,7 +1717,7 @@ mod test { let (server_result_po, server_result_ch) = stream::<~str>(); let (cont_po, cont_ch) = stream::<()>(); - let cont_ch = SharedChan(cont_ch); + let cont_ch = SharedChan::new(cont_ch); // server let iotask_clone = iotask.clone(); do task::spawn_sched(task::ManualThreads(1u)) { @@ -1764,7 +1764,7 @@ mod test { let expected_resp = ~"A string\nwith multiple lines\n"; let (cont_po, cont_ch) = stream::<()>(); - let cont_ch = SharedChan(cont_ch); + let cont_ch = SharedChan::new(cont_ch); // server let hl_loop_clone = hl_loop.clone(); do task::spawn_sched(task::ManualThreads(1u)) { @@ -1813,7 +1813,7 @@ mod test { cont_ch: SharedChan<()>, iotask: &IoTask) -> ~str { let (server_po, server_ch) = stream::<~str>(); - let server_ch = SharedChan(server_ch); + let server_ch = SharedChan::new(server_ch); let server_ip_addr = ip::v4::parse_addr(server_ip); let listen_result = listen(server_ip_addr, server_port, 128, iotask, diff --git a/src/libstd/test.rs b/src/libstd/test.rs index ee83a0c9bd652..4ccbf207170e9 100644 --- a/src/libstd/test.rs +++ b/src/libstd/test.rs @@ -446,7 +446,7 @@ fn run_tests(opts: &TestOpts, let mut pending = 0; let (p, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); while pending > 0 || !remaining.is_empty() { while pending < concurrency && !remaining.is_empty() { @@ -797,7 +797,7 @@ mod tests { testfn: DynTestFn(|| f()), }; let (p, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); run_test(false, desc, ch); let (_, res) = p.recv(); assert!(res != TrOk); @@ -815,7 +815,7 @@ mod tests { testfn: DynTestFn(|| f()), }; let (p, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); run_test(false, desc, ch); let (_, res) = p.recv(); assert!(res == TrIgnored); @@ -834,7 +834,7 @@ mod tests { testfn: DynTestFn(|| f()), }; let (p, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); run_test(false, desc, ch); let (_, res) = p.recv(); assert!(res == TrOk); @@ -852,7 +852,7 @@ mod tests { testfn: DynTestFn(|| f()), }; let (p, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); run_test(false, desc, ch); let (_, res) = p.recv(); assert!(res == TrFailed); diff --git a/src/libstd/timer.rs b/src/libstd/timer.rs index c229e72ae5d4a..e862fe6077cb6 100644 --- a/src/libstd/timer.rs +++ b/src/libstd/timer.rs @@ -43,7 +43,7 @@ pub fn delayed_send(iotask: &IoTask, ch: &Chan, val: T) { let (timer_done_po, timer_done_ch) = stream::<()>(); - let timer_done_ch = SharedChan(timer_done_ch); + let timer_done_ch = SharedChan::new(timer_done_ch); let timer = uv::ll::timer_t(); let timer_ptr = ptr::addr_of(&timer); do iotask::interact(iotask) |loop_ptr| { @@ -199,7 +199,7 @@ mod test { #[test] fn test_gl_timer_sleep_stress2() { let (po, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); let hl_loop = &uv::global_loop::get(); let repeat = 20u; diff --git a/src/libstd/uv_global_loop.rs b/src/libstd/uv_global_loop.rs index 3130abdc7cd99..ac762d7d801fe 100644 --- a/src/libstd/uv_global_loop.rs +++ b/src/libstd/uv_global_loop.rs @@ -211,7 +211,7 @@ mod test { #[ignore] fn test_stress_gl_uv_global_loop_high_level_global_timer() { let (exit_po, exit_ch) = stream::<()>(); - let exit_ch = SharedChan(exit_ch); + let exit_ch = SharedChan::new(exit_ch); let cycles = 5000u; for iter::repeat(cycles) { let exit_ch_clone = exit_ch.clone(); diff --git a/src/libstd/uv_iotask.rs b/src/libstd/uv_iotask.rs index 7a9d2438e6ada..c7a78f3891923 100644 --- a/src/libstd/uv_iotask.rs +++ b/src/libstd/uv_iotask.rs @@ -126,7 +126,7 @@ fn run_loop(iotask_ch: &Chan) { // while we dwell in the I/O loop let iotask = IoTask{ async_handle: async_handle, - op_chan: SharedChan(msg_ch) + op_chan: SharedChan::new(msg_ch) }; iotask_ch.send(iotask); @@ -230,7 +230,7 @@ fn impl_uv_iotask_async(iotask: &IoTask) { let (exit_po, exit_ch) = stream::<()>(); let ah_data = AhData { iotask: iotask.clone(), - exit_ch: SharedChan(exit_ch) + exit_ch: SharedChan::new(exit_ch) }; let ah_data_ptr: *AhData = unsafe { ptr::to_unsafe_ptr(&ah_data) @@ -293,7 +293,7 @@ fn test_uv_iotask_async() { // loop lives until, at least, all of the // impl_uv_hl_async() runs have been called, at least. let (work_exit_po, work_exit_ch) = stream::<()>(); - let work_exit_ch = SharedChan(work_exit_ch); + let work_exit_ch = SharedChan::new(work_exit_ch); for iter::repeat(7u) { let iotask_clone = iotask.clone(); let work_exit_ch_clone = work_exit_ch.clone(); diff --git a/src/libstd/uv_ll.rs b/src/libstd/uv_ll.rs index 740ecec001f83..ab3074e49dd68 100644 --- a/src/libstd/uv_ll.rs +++ b/src/libstd/uv_ll.rs @@ -1717,12 +1717,12 @@ mod test { let kill_server_msg = ~"does a dog have buddha nature?"; let server_resp_msg = ~"mu!"; let (client_port, client_chan) = stream::<~str>(); - let client_chan = SharedChan(client_chan); + let client_chan = SharedChan::new(client_chan); let (server_port, server_chan) = stream::<~str>(); - let server_chan = SharedChan(server_chan); + let server_chan = SharedChan::new(server_chan); let (continue_port, continue_chan) = stream::(); - let continue_chan = SharedChan(continue_chan); + let continue_chan = SharedChan::new(continue_chan); let kill_server_msg_copy = copy kill_server_msg; let server_resp_msg_copy = copy server_resp_msg; diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index 50647e8c100dc..dbfd38ccf2624 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -58,7 +58,7 @@ fn run(args: &[~str]) { let (from_child, to_parent) = comm::stream(); let (from_parent, to_child) = comm::stream(); - let to_child = SharedChan(to_child); + let to_child = SharedChan::new(to_child); let size = uint::from_str(args[1]).get(); let workers = uint::from_str(args[2]).get(); diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 77e64818f7e6c..492b13f570869 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -53,7 +53,7 @@ fn server(requests: PortSet, responses: Chan) { fn run(args: &[~str]) { let (from_child, to_parent) = stream(); let (from_parent_, to_child) = stream(); - let from_parent = PortSet(); + let from_parent = PortSet::new(); from_parent.add(from_parent_); let size = uint::from_str(args[1]).get(); diff --git a/src/test/bench/pingpong.rs b/src/test/bench/pingpong.rs index 731605e82bd1f..64fb9652ceae0 100644 --- a/src/test/bench/pingpong.rs +++ b/src/test/bench/pingpong.rs @@ -14,7 +14,8 @@ extern mod std; -use core::pipes::{spawn_service, recv}; +use core::cell::Cell; +use core::pipes::*; use std::time::precise_time_s; proto! pingpong ( @@ -70,6 +71,52 @@ macro_rules! follow ( ) ) + +/** Spawn a task to provide a service. + +It takes an initialization function that produces a send and receive +endpoint. The send endpoint is returned to the caller and the receive +endpoint is passed to the new task. + +*/ +pub fn spawn_service( + init: extern fn() -> (SendPacketBuffered, + RecvPacketBuffered), + service: ~fn(v: RecvPacketBuffered)) + -> SendPacketBuffered { + let (client, server) = init(); + + // This is some nasty gymnastics required to safely move the pipe + // into a new task. + let server = Cell(server); + do task::spawn { + service(server.take()); + } + + client +} + +/** Like `spawn_service_recv`, but for protocols that start in the +receive state. + +*/ +pub fn spawn_service_recv( + init: extern fn() -> (RecvPacketBuffered, + SendPacketBuffered), + service: ~fn(v: SendPacketBuffered)) + -> RecvPacketBuffered { + let (client, server) = init(); + + // This is some nasty gymnastics required to safely move the pipe + // into a new task. + let server = Cell(server); + do task::spawn { + service(server.take()) + } + + client +} + fn switch(+endp: core::pipes::RecvPacketBuffered, f: &fn(+v: Option) -> U) -> U { f(core::pipes::try_recv(endp)) diff --git a/src/test/bench/shootout-chameneos-redux.rs b/src/test/bench/shootout-chameneos-redux.rs index a81f7fd76e77b..9dad24646ded2 100644 --- a/src/test/bench/shootout-chameneos-redux.rs +++ b/src/test/bench/shootout-chameneos-redux.rs @@ -137,9 +137,9 @@ fn rendezvous(nn: uint, set: ~[color]) { // these ports will allow us to hear from the creatures let (from_creatures, to_rendezvous) = stream::(); - let to_rendezvous = SharedChan(to_rendezvous); + let to_rendezvous = SharedChan::new(to_rendezvous); let (from_creatures_log, to_rendezvous_log) = stream::<~str>(); - let to_rendezvous_log = SharedChan(to_rendezvous_log); + let to_rendezvous_log = SharedChan::new(to_rendezvous_log); // these channels will be passed to the creatures so they can talk to us diff --git a/src/test/bench/shootout-mandelbrot.rs b/src/test/bench/shootout-mandelbrot.rs index f5d1661fa52bd..4909d05b35b16 100644 --- a/src/test/bench/shootout-mandelbrot.rs +++ b/src/test/bench/shootout-mandelbrot.rs @@ -173,7 +173,7 @@ fn main() { else { uint::from_str(args[1]).get() }; let (pport, pchan) = comm::stream(); - let pchan = comm::SharedChan(pchan); + let pchan = comm::SharedChan::new(pchan); for uint::range(0_u, size) |j| { let cchan = pchan.clone(); do task::spawn { cchan.send(chanmb(j, size, depth)) }; diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index a90afd418d881..f7bd779a8d89e 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -38,7 +38,7 @@ fn fib(n: int) -> int { } else if n <= 2 { c.send(1); } else { - let p = PortSet(); + let p = PortSet::new(); let ch = p.chan(); task::spawn(|| pfib(ch, n - 1) ); let ch = p.chan(); diff --git a/src/test/bench/task-perf-linked-failure.rs b/src/test/bench/task-perf-linked-failure.rs index d7514320e36d5..90c9d6b33e4ad 100644 --- a/src/test/bench/task-perf-linked-failure.rs +++ b/src/test/bench/task-perf-linked-failure.rs @@ -26,7 +26,7 @@ use core::comm::*; fn grandchild_group(num_tasks: uint) { let (po, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); for num_tasks.times { let ch = ch.clone(); diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs index 910708b710602..bca4cbafc6cc4 100644 --- a/src/test/run-pass/hashmap-memory.rs +++ b/src/test/run-pass/hashmap-memory.rs @@ -60,7 +60,7 @@ mod map_reduce { pub fn map_reduce(inputs: ~[~str]) { let (ctrl_port, ctrl_chan) = stream(); - let ctrl_chan = SharedChan(ctrl_chan); + let ctrl_chan = SharedChan::new(ctrl_chan); // This task becomes the master control task. It spawns others // to do the rest. diff --git a/src/test/run-pass/pipe-detect-term.rs b/src/test/run-pass/pipe-detect-term.rs index 6afa9e29349f3..bd0ffa6459067 100644 --- a/src/test/run-pass/pipe-detect-term.rs +++ b/src/test/run-pass/pipe-detect-term.rs @@ -18,7 +18,7 @@ extern mod std; use std::timer::sleep; use std::uv; -use core::pipes; +use core::cell::Cell; use core::pipes::{try_recv, recv}; proto! oneshot ( @@ -30,12 +30,14 @@ proto! oneshot ( pub fn main() { let iotask = &uv::global_loop::get(); - pipes::spawn_service(oneshot::init, |p| { - match try_recv(p) { + let (chan, port) = oneshot::init(); + let port = Cell(port); + do spawn { + match try_recv(port.take()) { Some(*) => { fail!() } None => { } } - }); + } sleep(iotask, 100); diff --git a/src/test/run-pass/pipe-select.rs b/src/test/run-pass/pipe-select.rs index 221ecbfcf02e5..12d60c9d6ab01 100644 --- a/src/test/run-pass/pipe-select.rs +++ b/src/test/run-pass/pipe-select.rs @@ -17,8 +17,9 @@ extern mod std; use std::timer::sleep; use std::uv; +use core::cell::Cell; use core::pipes; -use core::pipes::{recv, select}; +use core::pipes::*; proto! oneshot ( waiting:send { @@ -32,13 +33,30 @@ proto! stream ( } ) +pub fn spawn_service( + init: extern fn() -> (SendPacketBuffered, + RecvPacketBuffered), + service: ~fn(v: RecvPacketBuffered)) + -> SendPacketBuffered { + let (client, server) = init(); + + // This is some nasty gymnastics required to safely move the pipe + // into a new task. + let server = Cell(server); + do task::spawn { + service(server.take()); + } + + client +} + pub fn main() { use oneshot::client::*; use stream::client::*; let iotask = &uv::global_loop::get(); - let c = pipes::spawn_service(stream::init, |p| { + let c = spawn_service(stream::init, |p| { error!("waiting for pipes"); let stream::send(x, p) = recv(p); error!("got pipes"); diff --git a/src/test/run-pass/pipe-sleep.rs b/src/test/run-pass/pipe-sleep.rs index 57d72edd0a46a..86ffc96e89aec 100644 --- a/src/test/run-pass/pipe-sleep.rs +++ b/src/test/run-pass/pipe-sleep.rs @@ -13,8 +13,9 @@ extern mod std; use std::timer::sleep; use std::uv; +use core::cell::Cell; use core::pipes; -use core::pipes::recv; +use core::pipes::*; proto! oneshot ( waiting:send { @@ -22,10 +23,35 @@ proto! oneshot ( } ) + +/** Spawn a task to provide a service. + +It takes an initialization function that produces a send and receive +endpoint. The send endpoint is returned to the caller and the receive +endpoint is passed to the new task. + +*/ +pub fn spawn_service( + init: extern fn() -> (SendPacketBuffered, + RecvPacketBuffered), + service: ~fn(v: RecvPacketBuffered)) + -> SendPacketBuffered { + let (client, server) = init(); + + // This is some nasty gymnastics required to safely move the pipe + // into a new task. + let server = Cell(server); + do task::spawn { + service(server.take()); + } + + client +} + pub fn main() { use oneshot::client::*; - let c = pipes::spawn_service(oneshot::init, |p| { recv(p); }); + let c = spawn_service(oneshot::init, |p| { recv(p); }); let iotask = &uv::global_loop::get(); sleep(iotask, 500); diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs index 4f0eb7c101f55..8b4855deaa563 100644 --- a/src/test/run-pass/task-comm-14.rs +++ b/src/test/run-pass/task-comm-14.rs @@ -11,7 +11,7 @@ // xfail-fast pub fn main() { - let po = comm::PortSet(); + let po = comm::PortSet::new(); // Spawn 10 tasks each sending us back one int. let mut i = 10; diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index 9d5cb75804408..cf06deb1923a6 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -32,7 +32,7 @@ fn test00() { debug!("Creating tasks"); - let po = comm::PortSet(); + let po = comm::PortSet::new(); let mut i: int = 0; diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs index af004e0f34d44..67ef5fb190526 100644 --- a/src/test/run-pass/task-comm-6.rs +++ b/src/test/run-pass/task-comm-6.rs @@ -15,7 +15,7 @@ pub fn main() { test00(); } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = comm::PortSet(); + let p = comm::PortSet::new(); let c0 = p.chan(); let c1 = p.chan(); let c2 = p.chan(); diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs index 980ded6aecc0d..81b4988852559 100644 --- a/src/test/run-pass/task-comm-7.rs +++ b/src/test/run-pass/task-comm-7.rs @@ -23,7 +23,7 @@ fn test00_start(c: comm::Chan, start: int, number_of_messages: int) { fn test00() { let mut r: int = 0; let mut sum: int = 0; - let p = comm::PortSet(); + let p = comm::PortSet::new(); let number_of_messages: int = 10; let c = p.chan(); diff --git a/src/test/run-pass/task-comm-9.rs b/src/test/run-pass/task-comm-9.rs index 1661792c6ac36..a3c8dc554a663 100644 --- a/src/test/run-pass/task-comm-9.rs +++ b/src/test/run-pass/task-comm-9.rs @@ -22,7 +22,7 @@ fn test00_start(c: &comm::Chan, number_of_messages: int) { fn test00() { let r: int = 0; let mut sum: int = 0; - let p = comm::PortSet(); + let p = comm::PortSet::new(); let number_of_messages: int = 10; let ch = p.chan(); diff --git a/src/test/run-pass/unique-send-2.rs b/src/test/run-pass/unique-send-2.rs index b655f86bdbb11..df1c8708acd11 100644 --- a/src/test/run-pass/unique-send-2.rs +++ b/src/test/run-pass/unique-send-2.rs @@ -16,7 +16,7 @@ fn child(c: &SharedChan<~uint>, i: uint) { pub fn main() { let (p, ch) = stream(); - let ch = SharedChan(ch); + let ch = SharedChan::new(ch); let n = 100u; let mut expected = 0u; for uint::range(0u, n) |i| { diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs index f76868b28bef4..4c7b2e6370242 100644 --- a/src/test/run-pass/unwind-resource.rs +++ b/src/test/run-pass/unwind-resource.rs @@ -39,7 +39,7 @@ fn f(c: SharedChan) { pub fn main() { let (p, c) = stream(); - let c = SharedChan(c); + let c = SharedChan::new(c); task::spawn_unlinked(|| f(c.clone()) ); error!("hiiiiiiiii"); assert!(p.recv());