Skip to content

KillHandle, BlockedTask, exit code propagation, linked failure #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 27 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
1dbcc8b
std: Remove ThreadPerCore spawn mode. Unused
brson Jul 10, 2013
a2943b8
Add Either::expect_{left,right}
bblum Jul 2, 2013
7c36164
Add Option::take_map{,_default}()
bblum Jul 2, 2013
895e45b
Remove redundant Atomic{Ui,I}nt types from unstable::sync
bblum Jul 2, 2013
149ee9d
Add AtomicOption::fill() and AtomicOption::is_empty()
bblum Jul 2, 2013
a740dc1
Reimplement ARC::unwrap() and friends.
bblum Jul 2, 2013
60990e0
Add UnsafeAtomicRcBox::try_unwrap()
bblum Jul 2, 2013
bb6a225
Add KillHandle and implement exit code propagation to replace join_latch
bblum Jul 3, 2013
1788524
Add kill::Death for task death services and use it in Task.
bblum Jul 2, 2013
01cde9e
Remove join_latch
bblum Jul 2, 2013
2e032cb
Add tests for KillHandle
bblum Jul 3, 2013
46eea31
Implement KillHandle::kill() and friends (unkillable, atomically). Cl…
bblum Jul 8, 2013
1568cb4
Do a task-killed check at the start of task 'timeslices'.
bblum Jul 8, 2013
d3c00fb
Add BlockedTask (wake, try_block, etc) in kill.rs.
bblum Jul 11, 2013
92178d9
Change the HOF context switchers to pass a BlockedTask instead of a ~…
bblum Jul 11, 2013
e332956
Add test::with_test_task() convenience function.
bblum Jul 11, 2013
a4933a8
Add tests for task killing and blocking.
bblum Jul 11, 2013
ba944ac
Stash a spare kill flag inside tasks, to save two atomic xadds in the…
bblum Jul 12, 2013
ceaf7df
Add option::take(), the building block of the option::swap_* family.
bblum Jul 13, 2013
7c67f2f
Replace *rust_task ptrs in taskgroup code with TaskHandle, for transi…
bblum Jul 13, 2013
34dec79
(cleanup) Modernize taskgroup code for the new borrow-checker.
bblum Jul 15, 2013
0f42c2b
(cleanup) Don't check taskgroup generation monotonicity unless cfg(te…
bblum Jul 15, 2013
56050a3
(cleanup) impl TaskSet
bblum Jul 15, 2013
0bee115
Fix linked failure tests to block forever instead of looping around y…
bblum Jul 16, 2013
f2c95e7
Enable taskgroup code for newsched spawns.
bblum Jul 15, 2013
63c29d0
Rename TCB to Taskgroup
bblum Jul 16, 2013
55ddbe1
Add watched and indestructible spawn modes.
bblum Jul 16, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 72 additions & 8 deletions src/libextra/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ use std::borrow;

/// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
pub struct Condvar<'self> {
is_mutex: bool,
failed: &'self mut bool,
cond: &'self sync::Condvar<'self>
priv is_mutex: bool,
priv failed: &'self mut bool,
priv cond: &'self sync::Condvar<'self>
}

impl<'self> Condvar<'self> {
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<'self> Condvar<'self> {
****************************************************************************/

/// An atomically reference counted wrapper for shared immutable state.
pub struct ARC<T> { x: UnsafeAtomicRcBox<T> }
pub struct ARC<T> { priv x: UnsafeAtomicRcBox<T> }

/// Create an atomically reference counted wrapper.
pub fn ARC<T:Freeze + Send>(data: T) -> ARC<T> {
Expand All @@ -123,6 +123,20 @@ impl<T:Freeze+Send> ARC<T> {
pub fn get<'a>(&'a self) -> &'a T {
unsafe { &*self.x.get_immut() }
}

/**
 * Retrieve the data back out of the ARC. This function blocks until the
 * reference given to it is the last existing one, and then unwraps the data
 * instead of destroying it.
 *
 * If multiple tasks call unwrap, all but the first will fail. Do not call
 * unwrap from a task that holds another reference to the same ARC; it is
 * guaranteed to deadlock.
 */
pub fn unwrap(self) -> T {
// Destructure to move the underlying atomic box out of `self`, so that
// unwrapping consumes the ARC wrapper itself.
let ARC { x: x } = self;
// `UnsafeAtomicRcBox::unwrap` blocks until this is the sole reference
// (per the contract documented above), then yields the payload by move.
unsafe { x.unwrap() }
}
}

/**
Expand All @@ -143,9 +157,9 @@ impl<T:Freeze + Send> Clone for ARC<T> {
****************************************************************************/

#[doc(hidden)]
struct MutexARCInner<T> { lock: Mutex, failed: bool, data: T }
struct MutexARCInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
/// An ARC with mutable data protected by a blocking mutex.
struct MutexARC<T> { x: UnsafeAtomicRcBox<MutexARCInner<T>> }
struct MutexARC<T> { priv x: UnsafeAtomicRcBox<MutexARCInner<T>> }

/// Create a mutex-protected ARC with the supplied data.
pub fn MutexARC<T:Send>(user_data: T) -> MutexARC<T> {
Expand Down Expand Up @@ -225,6 +239,22 @@ impl<T:Send> MutexARC<T> {
cond: cond })
}
}

/**
 * Retrieves the data, blocking until all other references are dropped,
 * exactly as arc::unwrap.
 *
 * Will additionally fail if another task has failed while accessing the arc.
 */
pub fn unwrap(self) -> T {
// Move the underlying atomic box out of the wrapper.
let MutexARC { x: x } = self;
// Blocks until this is the last reference, then moves the whole
// (lock, failed, data) record out.
let inner = unsafe { x.unwrap() };
// Discard the now-unneeded mutex; keep the poison flag and the payload.
let MutexARCInner { failed: failed, data: data, _ } = inner;
if failed {
// Poisoned: another task failed while it had access to the data
// (presumably via the poison-on-fail guard used by `access` — the
// flag is only readable here, not settable).
fail!(~"Can't unwrap poisoned MutexARC - another task failed inside!");
}
data
}
}

// Common code for {mutex.access,rwlock.write}{,_cond}.
Expand Down Expand Up @@ -268,7 +298,7 @@ fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
****************************************************************************/

#[doc(hidden)]
struct RWARCInner<T> { lock: RWlock, failed: bool, data: T }
struct RWARCInner<T> { priv lock: RWlock, priv failed: bool, priv data: T }
/**
* A dual-mode ARC protected by a reader-writer lock. The data can be accessed
* mutably or immutably, and immutably-accessing tasks may run concurrently.
Expand All @@ -278,7 +308,7 @@ struct RWARCInner<T> { lock: RWlock, failed: bool, data: T }
#[mutable] // XXX remove after snap
#[no_freeze]
struct RWARC<T> {
x: UnsafeAtomicRcBox<RWARCInner<T>>,
priv x: UnsafeAtomicRcBox<RWARCInner<T>>,
}

/// Create a reader/writer ARC with the supplied data.
Expand Down Expand Up @@ -429,6 +459,23 @@ impl<T:Freeze + Send> RWARC<T> {
}
}
}

/**
 * Retrieves the data, blocking until all other references are dropped,
 * exactly as arc::unwrap.
 *
 * Will additionally fail if another task has failed while accessing the arc
 * in write mode.
 */
pub fn unwrap(self) -> T {
// Move the underlying atomic box out of the wrapper (`_` ignores any
// remaining fields).
let RWARC { x: x, _ } = self;
// Blocks until this is the last reference, then moves the inner
// (lock, failed, data) record out.
let inner = unsafe { x.unwrap() };
// Drop the reader-writer lock; keep the poison flag and the payload.
let RWARCInner { failed: failed, data: data, _ } = inner;
if failed {
// Poisoned: another task failed while holding write access.
fail!(~"Can't unwrap poisoned RWARC - another task failed inside!")
}
data
}
}

// Borrowck rightly complains about immutably aliasing the rwlock in order to
Expand Down Expand Up @@ -611,6 +658,23 @@ mod tests {
}
}
// Verifies that `MutexARC::unwrap` propagates poison: a child task fails
// while holding the lock, so the parent's unwrap() must also fail — hence
// the `should_fail` expectation on this test.
#[test] #[should_fail] #[ignore(cfg(windows))]
pub fn test_mutex_arc_unwrap_poison() {
let arc = MutexARC(1);
let arc2 = ~(&arc).clone();
let (p, c) = comm::stream();
do task::spawn {
unsafe {
do arc2.access |one| {
// Signal the parent that we are inside `access`, then fail on
// purpose: the arc holds 1, so this assertion poisons the MutexARC.
c.send(());
assert!(*one == 2);
}
}
}
// Wait until the child actually holds the lock before unwrapping.
let _ = p.recv();
// Blocks until the child drops its reference, then fails on the poison.
let one = arc.unwrap();
assert!(one == 1);
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_rw_arc_poison_wr() {
let arc = ~RWARC(1);
let arc2 = (*arc).clone();
Expand Down
32 changes: 26 additions & 6 deletions src/libstd/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use kinds::Copy;
use iterator::IteratorUtil;
use result::Result;
use result;
use str::StrSlice;
use vec;
use vec::{OwnedVector, ImmutableVector};

Expand Down Expand Up @@ -121,24 +122,37 @@ pub fn is_right<T, U>(eith: &Either<T, U>) -> bool {
}
}

/// Retrieves the value in the left branch. Fails if the either is Right.
/// Retrieves the value in the left branch.
/// Fails with a specified reason if the either is Right.
#[inline]
pub fn unwrap_left<T,U>(eith: Either<T,U>) -> T {
pub fn expect_left<T,U>(eith: Either<T,U>, reason: &str) -> T {
match eith {
Left(x) => x,
Right(_) => fail!("either::unwrap_left Right")
Right(_) => fail!(reason.to_owned())
}
}

/// Retrieves the value in the right branch. Fails if the either is Left.
/// Retrieves the value in the left branch. Fails if the either is Right.
#[inline]
pub fn unwrap_right<T,U>(eith: Either<T,U>) -> U {
// Delegates to `expect_left`, supplying the historical failure message so
// existing behavior (message text included) is unchanged.
pub fn unwrap_left<T,U>(eith: Either<T,U>) -> T {
expect_left(eith, "either::unwrap_left Right")
}

/// Retrieves the value in the right branch.
/// Fails with a specified reason if the either is Left.
#[inline]
pub fn expect_right<T,U>(eith: Either<T,U>, reason: &str) -> U {
match eith {
Right(x) => x,
Left(_) => fail!("either::unwrap_right Left")
Left(_) => fail!(reason.to_owned())
}
}

/// Retrieves the value in the right branch. Fails if the either is Left.
#[inline]
pub fn unwrap_right<T,U>(eith: Either<T,U>) -> U {
// Delegates to `expect_right`, supplying the historical failure message.
// `#[inline]` added for consistency: every sibling in this family
// (expect_left, unwrap_left, expect_right) carries it.
expect_right(eith, "either::unwrap_right Left")
}

impl<T, U> Either<T, U> {
#[inline]
pub fn either<V>(&self, f_left: &fn(&T) -> V, f_right: &fn(&U) -> V) -> V {
Expand All @@ -157,9 +171,15 @@ impl<T, U> Either<T, U> {
#[inline]
pub fn is_right(&self) -> bool { is_right(self) }

/// Method form of `expect_left`: consumes self, fails with `reason` if Right.
#[inline]
pub fn expect_left(self, reason: &str) -> T { expect_left(self, reason) }

/// Method form of `unwrap_left`: consumes self, fails if the either is Right.
#[inline]
pub fn unwrap_left(self) -> T { unwrap_left(self) }

/// Method form of `expect_right`: consumes self, fails with `reason` if Left.
#[inline]
pub fn expect_right(self, reason: &str) -> U { expect_right(self, reason) }

/// Method form of `unwrap_right`: consumes self, fails if the either is Left.
#[inline]
pub fn unwrap_right(self) -> U { unwrap_right(self) }
}
Expand Down
29 changes: 28 additions & 1 deletion src/libstd/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ impl<T> Option<T> {
match *self { Some(ref mut x) => Some(f(x)), None => None }
}

/// Maps a `some` value from one type to another by a mutable reference,
/// or returns a default value.
#[inline]
pub fn map_mut_default<'a, U>(&'a mut self, def: U, f: &fn(&'a mut T) -> U) -> U {
// NOTE(review): `def` is constructed eagerly by the caller and is simply
// dropped in the Some case.
match *self { Some(ref mut x) => f(x), None => def }
}

/// As `map`, but consumes the option and gives `f` ownership to avoid
/// copying.
#[inline]
Expand All @@ -200,6 +207,26 @@ impl<T> Option<T> {
match self { None => def, Some(v) => f(v) }
}

/// Take the value out of the option, leaving a `None` in its place.
#[inline]
pub fn take(&mut self) -> Option<T> {
// Swap `None` into `self` and hand back whatever was there before.
util::replace(self, None)
}

/// As `map_consume`, but swaps a None into the original option rather
/// than consuming it by-value.
#[inline]
pub fn take_map<U>(&mut self, blk: &fn(T) -> U) -> Option<U> {
// `take` leaves None behind in `self`; `map_consume` then applies `blk`
// to the moved-out value, if any.
self.take().map_consume(blk)
}

/// As `map_consume_default`, but swaps a None into the original option
/// rather than consuming it by-value.
#[inline]
pub fn take_map_default<U> (&mut self, def: U, blk: &fn(T) -> U) -> U {
// Returns `def` when the option was None; otherwise applies `blk` to the
// moved-out value. `self` is left as None either way.
self.take().map_consume_default(def, blk)
}

/// Apply a function to the contained value or do nothing
pub fn mutate(&mut self, f: &fn(T) -> T) {
if self.is_some() {
Expand Down Expand Up @@ -295,7 +322,7 @@ impl<T> Option<T> {
#[inline]
pub fn swap_unwrap(&mut self) -> T {
if self.is_none() { fail!("option::swap_unwrap none") }
util::replace(self, None).unwrap()
self.take().unwrap()
}

/**
Expand Down
37 changes: 22 additions & 15 deletions src/libstd/rt/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use option::*;
use cast;
use util;
use ops::Drop;
use rt::task::Task;
use rt::kill::BlockedTask;
use kinds::Send;
use rt::sched::Scheduler;
use rt::local::Local;
Expand All @@ -30,13 +30,13 @@ use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cell::Cell;
use clone::Clone;

/// A combined refcount / ~Task pointer.
/// A combined refcount / BlockedTask-as-uint pointer.
///
/// Can be equal to the following values:
///
/// * 2 - both endpoints are alive
/// * 1 - either the sender or the receiver is dead, determined by context
/// * <ptr> - A pointer to a blocked Task that can be transmuted to ~Task
/// * <ptr> - A pointer to a blocked Task (see BlockedTask::cast_{to,from}_uint)
type State = uint;

static STATE_BOTH: State = 2;
Expand Down Expand Up @@ -137,11 +137,13 @@ impl<T> ChanOne<T> {
}
task_as_state => {
// Port is blocked. Wake it up.
let recvr: ~Task = cast::transmute(task_as_state);
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
sched.schedule_task(recvr);
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
let mut sched = Local::take::<Scheduler>();
rtdebug!("rendezvous send");
sched.metrics.rendezvous_sends += 1;
sched.schedule_task(woken_task);
};
}
}
}
Expand Down Expand Up @@ -177,7 +179,7 @@ impl<T> PortOne<T> {
// an acquire barrier to prevent reordering of the subsequent read
// of the payload. Also issues a release barrier to prevent reordering
// of any previous writes to the task structure.
let task_as_state: State = cast::transmute(task);
let task_as_state = task.cast_to_uint();
let oldstate = (*packet).state.swap(task_as_state, SeqCst);
match oldstate {
STATE_BOTH => {
Expand All @@ -193,8 +195,8 @@ impl<T> PortOne<T> {
// NB: We have to drop back into the scheduler event loop here
// instead of switching immediately back or we could end up
// triggering infinite recursion on the scheduler's stack.
let task: ~Task = cast::transmute(task_as_state);
sched.enqueue_task(task);
let recvr = BlockedTask::cast_from_uint(task_as_state);
sched.enqueue_blocked_task(recvr);
}
_ => util::unreachable()
}
Expand Down Expand Up @@ -258,9 +260,11 @@ impl<T> Drop for ChanOneHack<T> {
task_as_state => {
// The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr: ~Task = cast::transmute(task_as_state);
let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr);
let recvr = BlockedTask::cast_from_uint(task_as_state);
do recvr.wake().map_consume |woken_task| {
let sched = Local::take::<Scheduler>();
sched.schedule_task(woken_task);
};
}
}
}
Expand All @@ -283,7 +287,10 @@ impl<T> Drop for PortOneHack<T> {
let _packet: ~Packet<T> = cast::transmute(this.void_packet);
}
_ => {
util::unreachable()
// This case occurs during unwinding, when the blocked
// receiver was killed awake. The send-end will free it.
// It sure would be nice if we could assert the task is
// failing, but that would need a fragile local borrow.
}
}
}
Expand Down
Loading