Skip to content

Fixed OutOfMemoryError with CPU scheduler in recursive mode. #643

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 58 additions & 59 deletions rxjava-core/src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ForwardSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;

Expand All @@ -33,7 +34,7 @@
* <p>
* Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events.
*/
public class ExecutorScheduler extends Scheduler {
public class ExecutorScheduler extends Scheduler implements ReentrantSchedulerHelper {
private final Executor executor;

public ExecutorScheduler(Executor executor) {
Expand All @@ -47,18 +48,17 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
@Override
public <T> Subscription schedulePeriodically(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
if (executor instanceof ScheduledExecutorService) {
final CompositeSubscription subscriptions = new CompositeSubscription();
CompositeSubscription subscription = new CompositeSubscription();
final ForwardSubscription scheduleSub = new ForwardSubscription();
final ForwardSubscription actionSub = new ForwardSubscription();
subscription.add(scheduleSub);
subscription.add(actionSub);

ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
Subscription s = action.call(ExecutorScheduler.this, state);
subscriptions.add(s);
}
}, initialDelay, period, unit);
final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);

subscriptions.add(Subscriptions.from(f));
return subscriptions;
_scheduler.schedulePeriodically(state, action, initialDelay, period, unit);

return subscription;

} else {
return super.schedulePeriodically(state, action, initialDelay, period, unit);
Expand All @@ -67,81 +67,80 @@ public void run() {

@Override
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
final Scheduler _scheduler = this;
CompositeSubscription subscription = new CompositeSubscription();
final ForwardSubscription scheduleSub = new ForwardSubscription();
final ForwardSubscription actionSub = new ForwardSubscription();
subscription.add(scheduleSub);
subscription.add(actionSub);

final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);

_scheduler.schedule(state, action, delayTime, unit);

return subscription;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
// all subscriptions that may need to be unsubscribed
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);
CompositeSubscription subscription = new CompositeSubscription();
final ForwardSubscription scheduleSub = new ForwardSubscription();
final ForwardSubscription actionSub = new ForwardSubscription();
subscription.add(scheduleSub);
subscription.add(actionSub);

final Scheduler _scheduler = new ReentrantScheduler(this, scheduleSub, actionSub, subscription);

_scheduler.schedule(state, action);

return subscription;
}

@Override
public void scheduleTask(Runnable r, ForwardSubscription out, long delayTime, TimeUnit unit) {
Subscription before = out.getSubscription();
if (executor instanceof ScheduledExecutorService) {
// we are a ScheduledExecutorService so can do proper scheduling
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).schedule(new Runnable() {
@Override
public void run() {
// when the delay has passed we now do the work on the actual scheduler
Subscription s = discardableAction.call(_scheduler);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
}
}, delayTime, unit);
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).schedule(r, delayTime, unit);
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.from(f));
out.compareExchange(before, Subscriptions.from(f));
} else {
// we are not a ScheduledExecutorService so can't directly schedule
if (delayTime == 0) {
// no delay so put on the thread-pool right now
Subscription s = schedule(state, action);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
scheduleTask(r, out);
} else {
// there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService
// to handle the scheduling and once it's ready then execute on this Executor
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(new Runnable() {

@Override
public void run() {
// now execute on the real Executor (by using the other overload that schedules for immediate execution)
Subscription s = _scheduler.schedule(state, action);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
}
}, delayTime, unit);
ScheduledFuture<?> f = GenericScheduledExecutorService.getInstance().schedule(r, delayTime, unit);
// add the ScheduledFuture as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.from(f));
out.compareExchange(before, Subscriptions.from(f));
}
}
return subscription;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
final Scheduler _scheduler = this;
// all subscriptions that may need to be unsubscribed
final CompositeSubscription subscription = new CompositeSubscription(discardableAction);

// work to be done on a thread
Runnable r = new Runnable() {
@Override
public void run() {
Subscription s = discardableAction.call(_scheduler);
// add the subscription to the CompositeSubscription so it is unsubscribed
subscription.add(s);
}
};

public void scheduleTask(Runnable r, ForwardSubscription out) {
Subscription before = out.getSubscription();
// submit for immediate execution
if (executor instanceof ExecutorService) {
// we are an ExecutorService so get a Future back that supports unsubscribe
Future<?> f = ((ExecutorService) executor).submit(r);
// add the Future as a subscription so we can cancel the scheduled action if an unsubscribe happens
subscription.add(Subscriptions.from(f));
out.compareExchange(before, Subscriptions.from(f));
} else {
// we are the lowest common denominator so can't unsubscribe once we execute
executor.execute(r);
out.compareExchange(before, Subscriptions.empty());
}
}

return subscription;
@Override
public void scheduleTask(Runnable r, ForwardSubscription out, long initialDelay, long period, TimeUnit unit) {
Subscription before = out.getSubscription();
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(r, initialDelay, period, unit);

out.compareExchange(before, Subscriptions.from(f));
}

}
151 changes: 151 additions & 0 deletions rxjava-core/src/main/java/rx/schedulers/ReentrantScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.schedulers;

import java.util.concurrent.TimeUnit;
import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.ForwardSubscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Do not re-enter the main scheduler's schedule() method as it will
* unnecessarily chain the subscriptions of every invocation.
*/
public final class ReentrantScheduler extends Scheduler {
final ReentrantSchedulerHelper scheduler;
final ForwardSubscription scheduleSub;
final ForwardSubscription actionSub;
final CompositeSubscription composite;

public ReentrantScheduler(
ReentrantSchedulerHelper scheduler,
ForwardSubscription scheduleSub,
ForwardSubscription actionSub,
CompositeSubscription composite) {
this.scheduler = scheduler;
this.scheduleSub = scheduleSub;
this.actionSub = actionSub;
this.composite = composite;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
if (composite.isUnsubscribed()) {
// don't bother scheduling a task which wouldn't run anyway
return Subscriptions.empty();
}
Subscription before = actionSub.getSubscription();
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);

actionSub.compareExchange(before, discardableAction);

Runnable r = new Runnable() {
@Override
public void run() {
Subscription sbefore = actionSub.getSubscription();
Subscription s = discardableAction.call(ReentrantScheduler.this);
actionSub.compareExchange(sbefore, s);
}
};

scheduler.scheduleTask(r, scheduleSub);

return composite;
}

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
if (composite.isUnsubscribed()) {
// don't bother scheduling a task which wouldn't run anyway
return Subscriptions.empty();
}

Subscription before = actionSub.getSubscription();
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
actionSub.compareExchange(before, discardableAction);

Runnable r = new Runnable() {
@Override
public void run() {
Subscription sbefore = actionSub.getSubscription();
Subscription s = discardableAction.call(ReentrantScheduler.this);
actionSub.compareExchange(sbefore, s);
}
};
scheduler.scheduleTask(r, scheduleSub, delayTime, unit);

return composite;
}

@Override
public <T> Subscription schedulePeriodically(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
if (composite.isUnsubscribed()) {
// don't bother scheduling a task which wouldn't run anyway
return Subscriptions.empty();
}

Subscription before = actionSub.getSubscription();
final PeriodicAction<T> periodicAction = new PeriodicAction<T>(state, action);
actionSub.compareExchange(before, periodicAction);

Runnable r = new Runnable() {
@Override
public void run() {
Subscription sbefore = actionSub.getSubscription();
Subscription s = periodicAction.call(ReentrantScheduler.this);
actionSub.compareExchange(sbefore, s);
}
};
scheduler.scheduleTask(r, scheduleSub, initialDelay, period, unit);

return composite;
}
/**
* An action that calls the underlying function in a periodic environment.
* @param <T> the state value type
*/
private static final class PeriodicAction<T> implements Subscription, Func1<Scheduler, Subscription> {
final T state;
final Func2<? super Scheduler, ? super T, ? extends Subscription> underlying;
final SerialSubscription ssub;

public PeriodicAction(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> underlying) {
this.state = state;
this.underlying = underlying;
this.ssub = new SerialSubscription();
}

@Override
public Subscription call(Scheduler scheduler) {
if (!ssub.isUnsubscribed()) {
Subscription s = underlying.call(scheduler, state);
ssub.setSubscription(s);
return ssub;
}
return Subscriptions.empty();
}

@Override
public void unsubscribe() {
ssub.unsubscribe();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.schedulers;

import java.util.concurrent.TimeUnit;
import rx.subscriptions.ForwardSubscription;

/**
* Simple scheduler API used by the ReentrantScheduler to
* communicate with the actual scheduler implementation.
*/
public interface ReentrantSchedulerHelper {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels wrong that we have a "helper" that schedulers extend from. That implies that the Scheduler interface is wrong.

/cc @headinthebox

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I needed this as the scheduler working with a reentrant scheduler should provide some core scheduling operations without creating chained subscriptions through the standard API. But ExecutorScheduler could implement this privately so it doesn't show up in the signature.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the default (Executor)Scheduler would schedule the stateless tasks without turning them into a recursive call, there were no need to use a helper:

/**
 * Schedules an action to be executed.
 * 
 * @param action
 *            action
 * @return a subscription to be able to unsubscribe from action.
 */
public Subscription schedule(final Action0 action) {
    return schedule(null, new Func2<Scheduler, Void, Subscription>() {
        @Override
        public Subscription call(Scheduler scheduler, Void state) {
            action.call();
            return Subscriptions.empty();
        }
    });
}

/**
* Schedule a task to be run immediately and update the subscription
* describing the schedule.
* @param r the task to run immediately
* @param out the subscription holding the current schedule subscription
*/
void scheduleTask(Runnable r, ForwardSubscription out);

/**
* Schedule a task to be run after the delay time and update the subscription
* describing the schedule.
* @param r the task to schedule
* @param out the subscription holding the current schedule subscription
* @param delayTime the time to delay the execution
* @param unit the time unit
*/
void scheduleTask(Runnable r, ForwardSubscription out, long delayTime, TimeUnit unit);

/**
* Schedule a task to be run after the delay time and after
* each period, then update the subscription describing the schedule.
* @param r the task to schedule
* @param out the subscription holding the current schedule subscription
* @param initialDelay the initial delay of the schedule
* @param period the between period of the schedule
* @param unit the time unit
*/
void scheduleTask(Runnable r, ForwardSubscription out, long initialDelay, long period, TimeUnit unit);
}
Loading