diff --git a/src/libstd/macros.rs b/src/libstd/macros.rs
index fda48b6ffb7d9..b01bd8f993c01 100644
--- a/src/libstd/macros.rs
+++ b/src/libstd/macros.rs
@@ -38,16 +38,18 @@ macro_rules! rtassert (
         }
     )
 )
+
+// The do_abort function was originally inside the abort macro, but
+// this was ICEing the compiler so it has been moved outside. Now this
+// seems to work?
+pub fn do_abort() -> ! {
+    unsafe { ::libc::abort(); }
+}
+
 macro_rules! abort(
     ($( $msg:expr),+) => (
         {
             rtdebug!($($msg),+);
-
-            do_abort();
-
-            // NB: This is in a fn to avoid putting the `unsafe` block in a macro,
-            // which causes spurious 'unnecessary unsafe block' warnings.
-            fn do_abort() -> ! {
-                unsafe { ::libc::abort(); }
-            }
+            ::macros::do_abort();
         }
     )
 )
+
diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index b00df78f433d7..88c7b9a2bf268 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -120,13 +120,13 @@ impl ChanOne {
         match oldstate {
             STATE_BOTH => {
                 // Port is not waiting yet. Nothing to do
-                do Local::borrow::<Scheduler> |sched| {
+                do Local::borrow::<Scheduler, ()> |sched| {
                     rtdebug!("non-rendezvous send");
                     sched.metrics.non_rendezvous_sends += 1;
                 }
             }
             STATE_ONE => {
-                do Local::borrow::<Scheduler> |sched| {
+                do Local::borrow::<Scheduler, ()> |sched| {
                     rtdebug!("rendezvous send");
                     sched.metrics.rendezvous_sends += 1;
                 }
diff --git a/src/libstd/rt/local.rs b/src/libstd/rt/local.rs
index e6988c538881a..6e0fbda5ec9a7 100644
--- a/src/libstd/rt/local.rs
+++ b/src/libstd/rt/local.rs
@@ -18,7 +18,7 @@ pub trait Local {
     fn put(value: ~Self);
     fn take() -> ~Self;
     fn exists() -> bool;
-    fn borrow(f: &fn(&mut Self));
+    fn borrow<T>(f: &fn(&mut Self) -> T) -> T;
     unsafe fn unsafe_borrow() -> *mut Self;
     unsafe fn try_unsafe_borrow() -> Option<*mut Self>;
 }
@@ -27,7 +27,20 @@ impl Local for Scheduler {
     fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
     fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
     fn exists() -> bool { local_ptr::exists() }
-    fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } }
+    fn borrow<T>(f: &fn(&mut Scheduler) -> T) -> T {
+        let mut res: Option<T> = None;
+        let res_ptr: *mut Option<T> = &mut res;
+        unsafe {
+            do local_ptr::borrow |sched| {
+                let result = f(sched);
+                *res_ptr = Some(result);
+            }
+        }
+        match res {
+            Some(r) => { r }
+            None => abort!("function failed!")
+        }
+    }
     unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
     unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") }
 }
@@ -36,8 +49,8 @@ impl Local for Task {
     fn put(_value: ~Task) { abort!("unimpl") }
     fn take() -> ~Task { abort!("unimpl") }
     fn exists() -> bool { abort!("unimpl") }
-    fn borrow(f: &fn(&mut Task)) {
-        do Local::borrow::<Scheduler> |sched| {
+    fn borrow<T>(f: &fn(&mut Task) -> T) -> T {
+        do Local::borrow::<Scheduler, T> |sched| {
             match sched.current_task {
                 Some(~ref mut task) => {
                     f(&mut *task.task)
                 }
@@ -74,7 +87,7 @@ impl Local for IoFactoryObject {
     fn put(_value: ~IoFactoryObject) { abort!("unimpl") }
     fn take() -> ~IoFactoryObject { abort!("unimpl") }
     fn exists() -> bool { abort!("unimpl") }
-    fn borrow(_f: &fn(&mut IoFactoryObject)) { abort!("unimpl") }
+    fn borrow<T>(_f: &fn(&mut IoFactoryObject) -> T) -> T { abort!("unimpl") }
     unsafe fn unsafe_borrow() -> *mut IoFactoryObject {
         let sched = Local::unsafe_borrow::<Scheduler>();
         let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
@@ -115,4 +128,16 @@ mod test {
         }
         let _scheduler: ~Scheduler = Local::take();
     }
+
+    #[test]
+    fn borrow_with_return() {
+        let scheduler = ~new_test_uv_sched();
+        Local::put(scheduler);
+        let res = do Local::borrow::<Scheduler, bool> |_sched| {
+            true
+        };
+ assert!(res) + let _scheduler: ~Scheduler = Local::take(); + } + } diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index caf3e15e535af..3198b2858763a 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -208,7 +208,7 @@ pub fn context() -> RuntimeContext { } else { if Local::exists::() { let context = ::cell::empty_cell(); - do Local::borrow:: |sched| { + do Local::borrow:: |sched| { if sched.in_task_context() { context.put_back(TaskContext); } else { diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index df231f6d88aec..3b8a31d1840b3 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -26,6 +26,11 @@ use rt::local::Local; use rt::rtio::RemoteCallback; use rt::metrics::SchedMetrics; +//use to_str::ToStr; + +/// To allow for using pointers as scheduler ids +use ptr::{to_uint}; + /// The Scheduler is responsible for coordinating execution of Coroutines /// on a single thread. When the scheduler is running it is owned by /// thread local storage and the running task is owned by the @@ -65,12 +70,15 @@ pub struct Scheduler { /// An action performed after a context switch on behalf of the /// code running before the context switch priv cleanup_job: Option, - metrics: SchedMetrics + metrics: SchedMetrics, + /// Should this scheduler run any task, or only pinned tasks? + run_anything: bool } pub struct SchedHandle { priv remote: ~RemoteCallbackObject, - priv queue: MessageQueue + priv queue: MessageQueue, + sched_id: uint } pub struct Coroutine { @@ -81,12 +89,20 @@ pub struct Coroutine { /// the task is dead priv saved_context: Context, /// The heap, GC, unwinding, local storage, logging - task: ~Task + task: ~Task, +} + +// A scheduler home is either a handle to the home scheduler, or an +// explicit "AnySched". +pub enum SchedHome { + AnySched, + Sched(SchedHandle) } pub enum SchedMessage { Wake, - Shutdown + Shutdown, + PinnedTask(~Coroutine) } enum CleanupJob { @@ -96,6 +112,8 @@ enum CleanupJob { pub impl Scheduler { + pub fn sched_id(&self) -> uint { to_uint(self) } + fn in_task_context(&self) -> bool { self.current_task.is_some() } fn new(event_loop: ~EventLoopObject, @@ -103,6 +121,16 @@ pub impl Scheduler { sleeper_list: SleeperList) -> Scheduler { + Scheduler::new_special(event_loop, work_queue, sleeper_list, true) + + } + + fn new_special(event_loop: ~EventLoopObject, + work_queue: WorkQueue<~Coroutine>, + sleeper_list: SleeperList, + run_anything: bool) + -> Scheduler { + // Lazily initialize the runtime TLS key local_ptr::init_tls_key(); @@ -117,7 +145,8 @@ pub impl Scheduler { saved_context: Context::empty(), current_task: None, cleanup_job: None, - metrics: SchedMetrics::new() + metrics: SchedMetrics::new(), + run_anything: run_anything } } @@ -147,11 +176,13 @@ pub impl Scheduler { (*event_loop).run(); } + rtdebug!("run taking sched"); let sched = Local::take::(); // XXX: Reenable this once we're using a per-task queue. With a shared // queue this is not true //assert!(sched.work_queue.is_empty()); - rtdebug!("scheduler metrics: %s\n", sched.metrics.to_str()); +// let out = sched.metrics.to_str(); +// rtdebug!("scheduler metrics: %s\n", out); return sched; } @@ -167,6 +198,7 @@ pub impl Scheduler { if sched.interpret_message_queue() { // We performed a scheduling action. There may be other work // to do yet, so let's try again later. 
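
The Local::borrow change above is the core plumbing for the rest of this patch: the closure passed to borrow can now return a value, which the Scheduler impl smuggles out of the unsafe TLS borrow through an Option behind a raw pointer, aborting if the closure never ran. A minimal sketch of a call site follows; the ::<Scheduler, uint> spelling of the two type parameters (the Local impl and the closure's return type) and the helper name current_sched_id are assumptions, not part of the patch.

// Sketch only: read a value out of the thread-local scheduler in one
// expression; the closure's result is propagated back to the caller.
fn current_sched_id() -> uint {
    do Local::borrow::<Scheduler, uint> |sched| {
        sched.sched_id()
    }
}
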
+ rtdebug!("run_sched_once, interpret_message_queue taking sched"); let mut sched = Local::take::(); sched.metrics.messages_received += 1; sched.event_loop.callback(Scheduler::run_sched_once); @@ -175,6 +207,7 @@ pub impl Scheduler { } // Now, look in the work queue for tasks to run + rtdebug!("run_sched_once taking"); let sched = Local::take::(); if sched.resume_task_from_queue() { // We performed a scheduling action. There may be other work @@ -209,33 +242,56 @@ pub impl Scheduler { return SchedHandle { remote: remote, - queue: self.message_queue.clone() + queue: self.message_queue.clone(), + sched_id: self.sched_id() }; } /// Schedule a task to be executed later. /// - /// Pushes the task onto the work stealing queue and tells the event loop - /// to run it later. Always use this instead of pushing to the work queue - /// directly. - fn enqueue_task(&mut self, task: ~Coroutine) { - self.work_queue.push(task); - self.event_loop.callback(Scheduler::run_sched_once); - - // We've made work available. Notify a sleeping scheduler. - // XXX: perf. Check for a sleeper without synchronizing memory. - // It's not critical that we always find it. - // XXX: perf. If there's a sleeper then we might as well just send - // it the task directly instead of pushing it to the - // queue. That is essentially the intent here and it is less - // work. - match self.sleeper_list.pop() { + /// Pushes the task onto the work stealing queue and tells the + /// event loop to run it later. Always use this instead of pushing + /// to the work queue directly. + + fn enqueue_task(&mut self, mut task: ~Coroutine) { + + // We don't want to queue tasks that belong on other threads, + // so we send them home at enqueue time. + + // The borrow checker doesn't like our disassembly of the + // Coroutine struct and partial use and mutation of the + // fields. So completely disassemble here and stop using? + + // XXX perf: I think we might be able to shuffle this code to + // only destruct when we need to. + + rtdebug!("a task was queued on: %u", self.sched_id()); + + let this = self; + + // We push the task onto our local queue clone. + this.work_queue.push(task); + this.event_loop.callback(Scheduler::run_sched_once); + + // We've made work available. Notify a + // sleeping scheduler. + + // XXX: perf. Check for a sleeper without + // synchronizing memory. It's not critical + // that we always find it. + + // XXX: perf. If there's a sleeper then we + // might as well just send it the task + // directly instead of pushing it to the + // queue. That is essentially the intent here + // and it is less work. + match this.sleeper_list.pop() { Some(handle) => { let mut handle = handle; handle.send(Wake) } - None => (/* pass */) - } + None => { (/* pass */) } + }; } // * Scheduler-context operations @@ -247,6 +303,15 @@ pub impl Scheduler { let mut this = self; match this.message_queue.pop() { + Some(PinnedTask(task)) => { + rtdebug!("recv BiasedTask message in sched: %u", + this.sched_id()); + let mut task = task; + task.task.home = Some(Sched(this.make_handle())); + this.resume_task_immediately(task); + return true; + } + Some(Wake) => { rtdebug!("recv Wake message"); this.sleepy = false; @@ -256,8 +321,9 @@ pub impl Scheduler { Some(Shutdown) => { rtdebug!("recv Shutdown message"); if this.sleepy { - // There may be an outstanding handle on the sleeper list. - // Pop them all to make sure that's not the case. + // There may be an outstanding handle on the + // sleeper list. Pop them all to make sure that's + // not the case. 
loop { match this.sleeper_list.pop() { Some(handle) => { @@ -268,8 +334,8 @@ pub impl Scheduler { } } } - // No more sleeping. After there are no outstanding event loop - // references we will shut down. + // No more sleeping. After there are no outstanding + // event loop references we will shut down. this.no_sleep = true; this.sleepy = false; Local::put(this); @@ -282,23 +348,93 @@ pub impl Scheduler { } } + /// Given an input Coroutine sends it back to its home scheduler. + fn send_task_home(task: ~Coroutine) { + let mut task = task; + let mut home = task.task.home.swap_unwrap(); + match home { + Sched(ref mut home_handle) => { + home_handle.send(PinnedTask(task)); + } + AnySched => { + abort!("error: cannot send anysched task home"); + } + } + } + + // Resume a task from the queue - but also take into account that + // it might not belong here. fn resume_task_from_queue(~self) -> bool { assert!(!self.in_task_context()); rtdebug!("looking in work queue for task to schedule"); - let mut this = self; + + // The borrow checker imposes the possibly absurd requirement + // that we split this into two match expressions. This is due + // to the inspection of the internal bits of task, as that + // can't be in scope when we act on task. match this.work_queue.pop() { Some(task) => { - rtdebug!("resuming task from work queue"); - this.resume_task_immediately(task); - return true; + let action_id = { + let home = &task.task.home; + match home { + &Some(Sched(ref home_handle)) + if home_handle.sched_id != this.sched_id() => { + 0 + } + &Some(AnySched) if this.run_anything => { + 1 + } + &Some(AnySched) => { + 2 + } + &Some(Sched(_)) => { + 3 + } + &None => { + 4 + } + } + }; + + match action_id { + 0 => { + rtdebug!("sending task home"); + Scheduler::send_task_home(task); + Local::put(this); + return false; + } + 1 => { + rtdebug!("resuming now"); + this.resume_task_immediately(task); + return true; + } + 2 => { + rtdebug!("re-queueing") + this.enqueue_task(task); + Local::put(this); + return false; + } + 3 => { + rtdebug!("resuming now"); + this.resume_task_immediately(task); + return true; + } + 4 => { + abort!("task home was None!"); + } + _ => { + abort!("literally, you should not be here"); + } + } } + None => { - rtdebug!("no tasks in queue"); - Local::put(this); - return false; - } + rtdebug!("no tasks in queue"); + Local::put(this); + return false; + } } } @@ -319,21 +455,32 @@ pub impl Scheduler { abort!("control reached end of task"); } - fn schedule_new_task(~self, task: ~Coroutine) { + pub fn schedule_task(~self, task: ~Coroutine) { assert!(self.in_task_context()); - do self.switch_running_tasks_and_then(task) |sched, last_task| { - let last_task = Cell(last_task); - sched.enqueue_task(last_task.take()); - } - } + // is the task home? + let is_home = task.is_home_no_tls(&self); - fn schedule_task(~self, task: ~Coroutine) { - assert!(self.in_task_context()); + // does the task have a home? 
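
resume_task_from_queue above classifies a dequeued task into the integer codes 0 through 4 only to keep the borrow of the task's home out of scope when the task itself is moved. The same decision table is sketched below as a small enum; the type and variant names are invented for illustration and do not appear in the patch.

// Illustrative sketch of the classification encoded as 0..4 above.
enum QueuedTaskAction {
    SendHome,     // 0: homed on some other scheduler
    RunHere,      // 1: AnySched task, and this scheduler runs anything
    Requeue,      // 2: AnySched task, but this scheduler is pinned-only
    RunHomedHere, // 3: homed on this scheduler
}

fn classify(home: &Option<SchedHome>, this_id: uint, run_anything: bool)
            -> QueuedTaskAction {
    match *home {
        Some(Sched(ref handle)) if handle.sched_id != this_id => SendHome,
        Some(Sched(_)) => RunHomedHere,
        Some(AnySched) if run_anything => RunHere,
        Some(AnySched) => Requeue,
        None => abort!("task home was None!") // case 4 in the patch
    }
}
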
+ let homed = task.homed(); - do self.switch_running_tasks_and_then(task) |sched, last_task| { - let last_task = Cell(last_task); - sched.enqueue_task(last_task.take()); + let mut this = self; + + if is_home || (!homed && this.run_anything) { + // here we know we are home, execute now OR we know we + // aren't homed, and that this sched doesn't care + do this.switch_running_tasks_and_then(task) |sched, last_task| { + let last_task = Cell(last_task); + sched.enqueue_task(last_task.take()); + } + } else if !homed && !this.run_anything { + // the task isn't homed, but it can't be run here + this.enqueue_task(task); + Local::put(this); + } else { + // task isn't home, so don't run it here, send it home + Scheduler::send_task_home(task); + Local::put(this); } } @@ -515,13 +662,83 @@ impl SchedHandle { } pub impl Coroutine { + + /// This function checks that a coroutine is running "home". + fn is_home(&self) -> bool { + rtdebug!("checking if coroutine is home"); + do Local::borrow:: |sched| { + match self.task.home { + Some(AnySched) => { false } + Some(Sched(SchedHandle { sched_id: ref id, _ })) => { + *id == sched.sched_id() + } + None => { abort!("error: homeless task!"); } + } + } + } + + /// Without access to self, but with access to the "expected home + /// id", see if we are home. + fn is_home_using_id(id: uint) -> bool { + rtdebug!("checking if coroutine is home using id"); + do Local::borrow:: |sched| { + if sched.sched_id() == id { + true + } else { + false + } + } + } + + /// Check if this coroutine has a home + fn homed(&self) -> bool { + rtdebug!("checking if this coroutine has a home"); + match self.task.home { + Some(AnySched) => { false } + Some(Sched(_)) => { true } + None => { abort!("error: homeless task!"); + } + } + } + + /// A version of is_home that does not need to use TLS, it instead + /// takes local scheduler as a parameter. + fn is_home_no_tls(&self, sched: &~Scheduler) -> bool { + rtdebug!("checking if coroutine is home without tls"); + match self.task.home { + Some(AnySched) => { true } + Some(Sched(SchedHandle { sched_id: ref id, _})) => { + *id == sched.sched_id() + } + None => { abort!("error: homeless task!"); } + } + } + + /// Check TLS for the scheduler to see if we are on a special + /// scheduler. + pub fn on_special() -> bool { + rtdebug!("checking if coroutine is executing on special sched"); + do Local::borrow::() |sched| { + !sched.run_anything + } + } + + // Created new variants of "new" that takes a home scheduler + // parameter. The original with_task now calls with_task_homed + // using the AnySched paramter. 
+ + fn new_homed(stack_pool: &mut StackPool, home: SchedHome, start: ~fn()) -> Coroutine { + Coroutine::with_task_homed(stack_pool, ~Task::new(), start, home) + } + fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine { Coroutine::with_task(stack_pool, ~Task::new(), start) } - fn with_task(stack_pool: &mut StackPool, - task: ~Task, - start: ~fn()) -> Coroutine { + fn with_task_homed(stack_pool: &mut StackPool, + task: ~Task, + start: ~fn(), + home: SchedHome) -> Coroutine { static MIN_STACK_SIZE: uint = 10000000; // XXX: Too much stack @@ -529,11 +746,22 @@ pub impl Coroutine { let mut stack = stack_pool.take_segment(MIN_STACK_SIZE); // NB: Context holds a pointer to that ~fn let initial_context = Context::new(start, &mut stack); - return Coroutine { + let mut crt = Coroutine { current_stack_segment: stack, saved_context: initial_context, - task: task + task: task, }; + crt.task.home = Some(home); + return crt; + } + + fn with_task(stack_pool: &mut StackPool, + task: ~Task, + start: ~fn()) -> Coroutine { + Coroutine::with_task_homed(stack_pool, + task, + start, + AnySched) } priv fn build_start_wrapper(start: ~fn()) -> ~fn() { @@ -549,17 +777,20 @@ pub impl Coroutine { let sched = Local::unsafe_borrow::(); let task = (*sched).current_task.get_mut_ref(); - // FIXME #6141: shouldn't neet to put `start()` in another closure + // FIXME #6141: shouldn't neet to put `start()` in + // another closure let start_cell = Cell(start_cell.take()); do task.task.run { - // N.B. Removing `start` from the start wrapper closure - // by emptying a cell is critical for correctness. The ~Task - // pointer, and in turn the closure used to initialize the first - // call frame, is destroyed in scheduler context, not task context. - // So any captured closures must not contain user-definable dtors - // that expect to be in task context. By moving `start` out of - // the closure, all the user code goes out of scope while - // the task is still running. + // N.B. Removing `start` from the start wrapper + // closure by emptying a cell is critical for + // correctness. The ~Task pointer, and in turn the + // closure used to initialize the first call + // frame, is destroyed in scheduler context, not + // task context. So any captured closures must + // not contain user-definable dtors that expect to + // be in task context. By moving `start` out of + // the closure, all the user code goes out of + // scope while the task is still running. let start = start_cell.take(); start(); }; @@ -603,6 +834,305 @@ mod test { use rt::test::*; use super::*; use rt::thread::Thread; + use ptr::to_uint; + + // Confirm that a sched_id actually is the uint form of the + // pointer to the scheduler struct. + + #[test] + fn simple_sched_id_test() { + do run_in_bare_thread { + let sched = ~new_test_uv_sched(); + assert!(to_uint(sched) == sched.sched_id()); + } + } + + // Compare two scheduler ids that are different, this should never + // fail but may catch a mistake someday. + + #[test] + fn compare_sched_id_test() { + do run_in_bare_thread { + let sched_one = ~new_test_uv_sched(); + let sched_two = ~new_test_uv_sched(); + assert!(sched_one.sched_id() != sched_two.sched_id()); + } + } + + // A simple test to check if a homed task run on a single + // scheduler ends up executing while home. 
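
new_homed and with_task_homed above thread the SchedHome through Coroutine construction and into the Task. A short usage sketch, assuming a scheduler that hands out its own handle; the helper name pin_to is invented here.

// Sketch: build a coroutine pinned to `sched`. If it is later started on a
// different scheduler, schedule_task / PinnedTask will route it back here.
fn pin_to(sched: &mut Scheduler, work: ~fn()) -> ~Coroutine {
    let handle = sched.make_handle();
    ~Coroutine::new_homed(&mut sched.stack_pool, Sched(handle), work)
}
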
+ + #[test] + fn test_home_sched() { + do run_in_bare_thread { + let mut task_ran = false; + let task_ran_ptr: *mut bool = &mut task_ran; + let mut sched = ~new_test_uv_sched(); + + let sched_handle = sched.make_handle(); + let sched_id = sched.sched_id(); + + let task = ~do Coroutine::new_homed(&mut sched.stack_pool, + Sched(sched_handle)) { + unsafe { *task_ran_ptr = true }; + let sched = Local::take::(); + assert!(sched.sched_id() == sched_id); + Local::put::(sched); + }; + sched.enqueue_task(task); + sched.run(); + assert!(task_ran); + } + } + + // A test for each state of schedule_task + + #[test] + fn test_schedule_home_states() { + + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + + do run_in_bare_thread { +// let nthreads = 2; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + // our normal scheduler + let mut normal_sched = ~Scheduler::new( + ~UvEventLoop::new(), + work_queue.clone(), + sleepers.clone()); + + let normal_handle = Cell(normal_sched.make_handle()); + + // our special scheduler + let mut special_sched = ~Scheduler::new_special( + ~UvEventLoop::new(), + work_queue.clone(), + sleepers.clone(), + true); + + let special_handle = Cell(special_sched.make_handle()); + let special_handle2 = Cell(special_sched.make_handle()); + let special_id = special_sched.sched_id(); + let t1_handle = special_sched.make_handle(); + let t4_handle = special_sched.make_handle(); + + let t1f = ~do Coroutine::new_homed(&mut special_sched.stack_pool, + Sched(t1_handle)) { + let is_home = Coroutine::is_home_using_id(special_id); + rtdebug!("t1 should be home: %b", is_home); + assert!(is_home); + }; + let t1f = Cell(t1f); + + let t2f = ~do Coroutine::new(&mut normal_sched.stack_pool) { + let on_special = Coroutine::on_special(); + rtdebug!("t2 should not be on special: %b", on_special); + assert!(!on_special); + }; + let t2f = Cell(t2f); + + let t3f = ~do Coroutine::new(&mut normal_sched.stack_pool) { + // not on special + let on_special = Coroutine::on_special(); + rtdebug!("t3 should not be on special: %b", on_special); + assert!(!on_special); + }; + let t3f = Cell(t3f); + + let t4f = ~do Coroutine::new_homed(&mut special_sched.stack_pool, + Sched(t4_handle)) { + // is home + let home = Coroutine::is_home_using_id(special_id); + rtdebug!("t4 should be home: %b", home); + assert!(home); + }; + let t4f = Cell(t4f); + + // we have four tests, make them as closures + let t1: ~fn() = || { + // task is home on special + let task = t1f.take(); + let sched = Local::take::(); + sched.schedule_task(task); + }; + let t2: ~fn() = || { + // not homed, task doesn't care + let task = t2f.take(); + let sched = Local::take::(); + sched.schedule_task(task); + }; + let t3: ~fn() = || { + // task not homed, must leave + let task = t3f.take(); + let sched = Local::take::(); + sched.schedule_task(task); + }; + let t4: ~fn() = || { + // task not home, send home + let task = t4f.take(); + let sched = Local::take::(); + sched.schedule_task(task); + }; + + let t1 = Cell(t1); + let t2 = Cell(t2); + let t3 = Cell(t3); + let t4 = Cell(t4); + + // build a main task that runs our four tests + let main_task = ~do Coroutine::new(&mut normal_sched.stack_pool) { + // the two tasks that require a normal start location + t2.take()(); + t4.take()(); + normal_handle.take().send(Shutdown); + special_handle.take().send(Shutdown); + }; + + // task to run the two "special start" tests + let special_task = ~do 
Coroutine::new_homed( + &mut special_sched.stack_pool, + Sched(special_handle2.take())) { + t1.take()(); + t3.take()(); + }; + + // enqueue the main tasks + normal_sched.enqueue_task(special_task); + normal_sched.enqueue_task(main_task); + + let nsched_cell = Cell(normal_sched); + let normal_thread = do Thread::start { + let sched = nsched_cell.take(); + sched.run(); + }; + + let ssched_cell = Cell(special_sched); + let special_thread = do Thread::start { + let sched = ssched_cell.take(); + sched.run(); + }; + + // wait for the end + let _thread1 = normal_thread; + let _thread2 = special_thread; + + } + } + + // The following test is a bit of a mess, but it trys to do + // something tricky so I'm not sure how to get around this in the + // short term. + + // A number of schedulers are created, and then a task is created + // and assigned a home scheduler. It is then "started" on a + // different scheduler. The scheduler it is started on should + // observe that the task is not home, and send it home. + + // This test is light in that it does very little. + + #[test] + fn test_transfer_task_home() { + + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + use rt::sleeper_list::SleeperList; + use rt::work_queue::WorkQueue; + use uint; + use container::Container; + use old_iter::MutableIter; + use vec::OwnedVector; + + do run_in_bare_thread { + + static N: uint = 8; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + for uint::range(0, N) |_| { + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new(loop_, + work_queue.clone(), + sleepers.clone()); + let handle = sched.make_handle(); + rtdebug!("sched id: %u", handle.sched_id); + handles.push(handle); + scheds.push(sched); + }; + + let handles = Cell(handles); + + let home_handle = scheds[6].make_handle(); + let home_id = home_handle.sched_id; + let home = Sched(home_handle); + + let main_task = ~do Coroutine::new_homed(&mut scheds[1].stack_pool, home) { + + // Here we check if the task is running on its home. + let sched = Local::take::(); + rtdebug!("run location scheduler id: %u, home: %u", + sched.sched_id(), + home_id); + assert!(sched.sched_id() == home_id); + Local::put::(sched); + + let mut handles = handles.take(); + for handles.each_mut |handle| { + handle.send(Shutdown); + } + }; + + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + sched.run(); + }; + threads.push(thread); + } + + let _threads = threads; + } + } + + // Do it a lot + + #[test] + fn test_stress_schedule_task_states() { + let n = stress_factor() * 120; + for int::range(0,n as int) |_| { + test_schedule_home_states(); + } + } + + // The goal is that this is the high-stress test for making sure + // homing is working. It allocates RUST_RT_STRESS tasks that + // do nothing but assert that they are home at execution + // time. These tasks are queued to random schedulers, so sometimes + // they are home and sometimes not. It also runs RUST_RT_STRESS + // times. 
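
The homing tests above share one skeleton: every scheduler is built over the same WorkQueue and SleeperList, each scheduler runs on its own Thread, and the final task sends Shutdown to every handle so the event loops can exit. A condensed sketch of that skeleton, reduced to a single scheduler; the helper name run_with_shutdown is invented, and the use items mirror those in the tests above.

// Sketch of the shared test scaffolding, reduced to one scheduler.
fn run_with_shutdown(work: ~fn()) {
    let sleepers = SleeperList::new();
    let queue = WorkQueue::new();
    let mut sched = ~Scheduler::new(~UvEventLoop::new(),
                                    queue.clone(), sleepers.clone());
    let handle = Cell(sched.make_handle());
    let work = Cell(work);
    let task = ~do Coroutine::new(&mut sched.stack_pool) {
        let work = work.take();
        work();                        // run the test body
        handle.take().send(Shutdown);  // then let the scheduler drain and exit
    };
    sched.enqueue_task(task);
    let sched = Cell(sched);
    // Thread joins when the binding is dropped, so this waits for the run.
    let _thread = do Thread::start { sched.take().run(); };
}
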
+ + #[test] + fn test_stress_homed_tasks() { + let n = stress_factor(); + for int::range(0,n as int) |_| { + run_in_mt_newsched_task_random_homed(); + } + } #[test] fn test_simple_scheduling() { @@ -683,7 +1213,7 @@ mod test { assert_eq!(count, MAX); fn run_task(count_ptr: *mut int) { - do Local::borrow:: |sched| { + do Local::borrow:: |sched| { let task = ~do Coroutine::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -859,8 +1389,8 @@ mod test { fn start_closure_dtor() { use ops::Drop; - // Regression test that the `start` task entrypoint can contain dtors - // that use task resources + // Regression test that the `start` task entrypoint can + // contain dtors that use task resources do run_in_newsched_task { struct S { field: () } @@ -875,6 +1405,7 @@ mod test { do spawntask { let _ss = &s; } - } + } } + } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index cf4967b12b304..06318ac6623b0 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -19,6 +19,7 @@ use cast::transmute; use rt::local::Local; use super::local_heap::LocalHeap; use rt::logging::StdErrLogger; +use rt::sched::{SchedHome, AnySched}; pub struct Task { heap: LocalHeap, @@ -26,7 +27,8 @@ pub struct Task { storage: LocalStorage, logger: StdErrLogger, unwinder: Option, - destroyed: bool + destroyed: bool, + home: Option } pub struct GarbageCollector; @@ -44,7 +46,8 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: Some(Unwinder { unwinding: false }), - destroyed: false + destroyed: false, + home: Some(AnySched) } } @@ -55,14 +58,19 @@ impl Task { storage: LocalStorage(ptr::null(), None), logger: StdErrLogger, unwinder: None, - destroyed: false + destroyed: false, + home: Some(AnySched) } } + pub fn give_home(&mut self, new_home: SchedHome) { + self.home = Some(new_home); + } + pub fn run(&mut self, f: &fn()) { // This is just an assertion that `run` was called unsafely // and this instance of Task is still accessible. - do Local::borrow:: |task| { + do Local::borrow:: |task| { assert!(ptr::ref_eq(task, self)); } @@ -87,7 +95,7 @@ impl Task { fn destroy(&mut self) { // This is just an assertion that `destroy` was called unsafely // and this instance of Task is still accessible. - do Local::borrow:: |task| { + do Local::borrow:: |task| { assert!(ptr::ref_eq(task, self)); } match self.storage { diff --git a/src/libstd/rt/test.rs b/src/libstd/rt/test.rs index c8df3a6120338..bb284c0254179 100644 --- a/src/libstd/rt/test.rs +++ b/src/libstd/rt/test.rs @@ -88,6 +88,7 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { let loop_ = ~UvEventLoop::new(); let mut sched = ~Scheduler::new(loop_, work_queue.clone(), sleepers.clone()); let handle = sched.make_handle(); + handles.push(handle); scheds.push(sched); } @@ -128,15 +129,128 @@ pub fn run_in_mt_newsched_task(f: ~fn()) { } } +// THIS IS AWFUL. Copy-pasted the above initialization function but +// with a number of hacks to make it spawn tasks on a variety of +// schedulers with a variety of homes using the new spawn. + +pub fn run_in_mt_newsched_task_random_homed() { + use libc; + use os; + use from_str::FromStr; + use rt::uv::uvio::UvEventLoop; + use rt::sched::Shutdown; + + do run_in_bare_thread { + let nthreads = match os::getenv("RUST_TEST_THREADS") { + Some(nstr) => FromStr::from_str(nstr).get(), + None => unsafe { + // Using more threads than cores in test code to force + // the OS to preempt them frequently. Assuming that + // this help stress test concurrent types. 
+ rust_get_num_cpus() * 2 + } + }; + + let sleepers = SleeperList::new(); + let work_queue = WorkQueue::new(); + + let mut handles = ~[]; + let mut scheds = ~[]; + + // create a few special schedulers, those with even indicies + // will be pinned-only + for uint::range(0, nthreads) |i| { + let special = (i % 2) == 0; + let loop_ = ~UvEventLoop::new(); + let mut sched = ~Scheduler::new_special( + loop_, work_queue.clone(), sleepers.clone(), special); + let handle = sched.make_handle(); + handles.push(handle); + scheds.push(sched); + } + + // Schedule a pile o tasks + let n = 5*stress_factor(); + for uint::range(0,n) |_i| { + rtdebug!("creating task: %u", _i); + let hf: ~fn() = || { assert!(true) }; + spawntask_homed(&mut scheds, hf); + } + + // Now we want another pile o tasks that do not ever run on a + // special scheduler, because they are normal tasks. Because + // we can we put these in the "main" task. + + let n = 5*stress_factor(); + + let f: ~fn() = || { + for uint::range(0,n) |_| { + let f: ~fn() = || { + // Borrow the scheduler we run on and check if it is + // privileged. + do Local::borrow:: |sched| { + assert!(sched.run_anything); + }; + }; + spawntask_random(f); + }; + }; + + let f_cell = Cell(f); + let handles = Cell(handles); + + rtdebug!("creating main task"); + + let main_task = ~do Coroutine::new(&mut scheds[0].stack_pool) { + f_cell.take()(); + let mut handles = handles.take(); + // Tell schedulers to exit + for handles.each_mut |handle| { + handle.send(Shutdown); + } + }; + + rtdebug!("queuing main task") + + scheds[0].enqueue_task(main_task); + + let mut threads = ~[]; + + while !scheds.is_empty() { + let sched = scheds.pop(); + let sched_cell = Cell(sched); + let thread = do Thread::start { + let sched = sched_cell.take(); + rtdebug!("running sched: %u", sched.sched_id()); + sched.run(); + }; + + threads.push(thread); + } + + rtdebug!("waiting on scheduler threads"); + + // Wait for schedulers + let _threads = threads; + } + + extern { + fn rust_get_num_cpus() -> libc::uintptr_t; + } +} + + /// Test tasks will abort on failure instead of unwinding pub fn spawntask(f: ~fn()) { use super::sched::*; + rtdebug!("spawntask taking the scheduler from TLS") let mut sched = Local::take::(); let task = ~Coroutine::with_task(&mut sched.stack_pool, ~Task::without_unwinding(), f); - sched.schedule_new_task(task); + rtdebug!("spawntask scheduling the new task"); + sched.schedule_task(task); } /// Create a new task and run it right now. Aborts on failure @@ -188,6 +302,39 @@ pub fn spawntask_random(f: ~fn()) { } } +/// Spawn a task, with the current scheduler as home, and queue it to +/// run later. 
+pub fn spawntask_homed(scheds: &mut ~[~Scheduler], f: ~fn()) { + use super::sched::*; + use rand::{rng, RngUtil}; + let mut rng = rng(); + + let task = { + let sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; + let handle = sched.make_handle(); + let home_id = handle.sched_id; + + // now that we know where this is going, build a new function + // that can assert it is in the right place + let af: ~fn() = || { + do Local::borrow::() |sched| { + rtdebug!("home_id: %u, runtime loc: %u", + home_id, + sched.sched_id()); + assert!(home_id == sched.sched_id()); + }; + f() + }; + + ~Coroutine::with_task_homed(&mut sched.stack_pool, + ~Task::without_unwinding(), + af, + Sched(handle)) + }; + let dest_sched = &mut scheds[rng.gen_int_range(0,scheds.len() as int)]; + // enqueue it for future execution + dest_sched.enqueue_task(task); +} /// Spawn a task and wait for it to finish, returning whether it completed successfully or failed pub fn spawntask_try(f: ~fn()) -> Result<(), ()> { @@ -266,3 +413,4 @@ pub fn stress_factor() -> uint { } } + diff --git a/src/libstd/rt/tube.rs b/src/libstd/rt/tube.rs index 4482a92d916aa..c94b0bd642362 100644 --- a/src/libstd/rt/tube.rs +++ b/src/libstd/rt/tube.rs @@ -155,7 +155,7 @@ mod test { if i == 100 { return; } let tube = Cell(Cell(tube)); - do Local::borrow:: |sched| { + do Local::borrow:: |sched| { let tube = tube.take(); do sched.event_loop.callback { let mut tube = tube.take(); diff --git a/src/libstd/rt/uv/uvio.rs b/src/libstd/rt/uv/uvio.rs index 0f98ab11513d6..ebeb1c204514f 100644 --- a/src/libstd/rt/uv/uvio.rs +++ b/src/libstd/rt/uv/uvio.rs @@ -167,7 +167,7 @@ mod test_remote { let mut tube = Tube::new(); let tube_clone = tube.clone(); let remote_cell = cell::empty_cell(); - do Local::borrow::() |sched| { + do Local::borrow::() |sched| { let tube_clone = tube_clone.clone(); let tube_clone_cell = Cell(tube_clone); let remote = do sched.event_loop.remote_callback { diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index f24d2327358be..df5b88207eccf 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -514,7 +514,7 @@ pub fn failing() -> bool { } _ => { let mut unwinding = false; - do Local::borrow:: |local| { + do Local::borrow:: |local| { unwinding = match local.unwinder { Some(unwinder) => { unwinder.unwinding diff --git a/src/libstd/task/spawn.rs b/src/libstd/task/spawn.rs index 5941221821a85..5e507238f671f 100644 --- a/src/libstd/task/spawn.rs +++ b/src/libstd/task/spawn.rs @@ -578,7 +578,7 @@ fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { let mut sched = Local::take::(); let task = ~Coroutine::new(&mut sched.stack_pool, f); - sched.schedule_new_task(task); + sched.schedule_task(task); } fn spawn_raw_oldsched(mut opts: TaskOpts, f: ~fn()) { diff --git a/src/libstd/unstable/lang.rs b/src/libstd/unstable/lang.rs index 350b18d454169..21ef347874468 100644 --- a/src/libstd/unstable/lang.rs +++ b/src/libstd/unstable/lang.rs @@ -244,7 +244,7 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char { } _ => { let mut alloc = ::ptr::null(); - do Local::borrow:: |task| { + do Local::borrow:: |task| { alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char; } return alloc; @@ -262,7 +262,7 @@ pub unsafe fn local_free(ptr: *c_char) { rustrt::rust_upcall_free_noswitch(ptr); } _ => { - do Local::borrow:: |task| { + do Local::borrow:: |task| { task.heap.free(ptr as *c_void); } }
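
A note on the pattern used by spawntask_homed above: the wrapper closure captures the handle's sched_id and re-checks it at run time through the value-returning Local::borrow. The same check as a free-standing sketch; the function name and the ::<Scheduler, ()> type-argument spelling are assumptions.

// Sketch: assert at run time that the current task is executing on the
// scheduler it was homed to. `expected_home_id` would come from the
// SchedHandle used to home the task.
fn assert_running_at_home(expected_home_id: uint) {
    do Local::borrow::<Scheduler, ()> |sched| {
        assert!(sched.sched_id() == expected_home_id);
    }
}
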