diff --git a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java index b2fdff50d2..7e460923a6 100644 --- a/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/schedulers/NewThreadScheduler.java @@ -52,7 +52,9 @@ private EventLoopScheduler() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); + Thread t = new Thread(r, "RxNewThreadScheduler-" + count.incrementAndGet()); + t.setDaemon(true); + return t; } }); } diff --git a/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java b/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java index 907993ef10..7872ae59d5 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationParallelMergeTest.java @@ -58,24 +58,8 @@ public void testParallelMerge() { @Test public void testNumberOfThreads() { - final ConcurrentHashMap threads = new ConcurrentHashMap(); - Observable.merge(getStreams()) - .toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String o) { - System.out.println("o: " + o + " Thread: " + Thread.currentThread()); - threads.put(Thread.currentThread().getName(), Thread.currentThread().getName()); - } - }); - - // without injecting anything, the getStream() method uses Interval which runs on a default scheduler - assertEquals(Runtime.getRuntime().availableProcessors(), threads.keySet().size()); - - // clear - threads.clear(); - - // now we parallelMerge into 3 streams and observeOn for each + final ConcurrentHashMap threads = new ConcurrentHashMap(); + // parallelMerge into 3 streams and observeOn for each // we expect 3 threads in the output OperationParallelMerge.parallelMerge(getStreams(), 3) .flatMap(new Func1, Observable>() { @@ -90,8 +74,8 @@ public Observable call(Observable o) { @Override public void call(String o) { - System.out.println("o: " + o + " Thread: " + Thread.currentThread()); - threads.put(Thread.currentThread().getName(), Thread.currentThread().getName()); + System.out.println("o: " + o + " Thread: " + Thread.currentThread().getId()); + threads.put(Thread.currentThread().getId(), Thread.currentThread().getId()); } }); @@ -100,7 +84,7 @@ public void call(String o) { @Test public void testNumberOfThreadsOnScheduledMerge() { - final ConcurrentHashMap threads = new ConcurrentHashMap(); + final ConcurrentHashMap threads = new ConcurrentHashMap(); // now we parallelMerge into 3 streams and observeOn for each // we expect 3 threads in the output @@ -109,8 +93,8 @@ public void testNumberOfThreadsOnScheduledMerge() { @Override public void call(String o) { - System.out.println("o: " + o + " Thread: " + Thread.currentThread()); - threads.put(Thread.currentThread().getName(), Thread.currentThread().getName()); + System.out.println("o: " + o + " Thread: " + Thread.currentThread().getId()); + threads.put(Thread.currentThread().getId(), Thread.currentThread().getId()); } });