Class Scheduler

  • Direct Known Subclasses:
    ComputationScheduler, ExecutorScheduler, ImmediateThinScheduler, IoScheduler, NewThreadScheduler, SchedulerWhen, SingleScheduler, TestScheduler, TrampolineScheduler

    public abstract class Scheduler
    extends java.lang.Object
    A Scheduler specifies an API for scheduling units of work, provided in the form of Runnables, to be executed without delay (effectively as soon as possible), after a specified time delay, or periodically. It represents an abstraction over an asynchronous boundary that ensures these units of work get executed by some underlying task-execution scheme (such as custom Threads, an event loop, an Executor or an Actor system) with uniform properties and guarantees regardless of the particular underlying scheme.

    You can get various standard, RxJava-specific instances of this class via the static methods of the Schedulers utility class.

    The so-called Scheduler.Workers of a Scheduler can be created via the createWorker() method. Each Worker allows the scheduling of multiple Runnable tasks in an isolated manner. Runnable tasks scheduled on a Worker are guaranteed to be executed sequentially and in a non-overlapping fashion. Non-delayed Runnable tasks are guaranteed to execute in First-In-First-Out order, but their execution may be interleaved with delayed tasks. In addition, outstanding or running tasks can be cancelled together via Disposable.dispose() without affecting any other Worker instances of the same Scheduler.
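
    The sequential, non-overlapping, FIFO guarantee described above can be pictured as a queue-drain loop layered over an arbitrary java.util.concurrent.Executor. The sketch below is a simplified illustration under stated assumptions, not RxJava's own Worker implementation: SerialWorkerSketch and its methods are hypothetical names, delayed tasks are not modeled, and schedule returns a boolean rather than a Disposable.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a Worker: serializes tasks on top of any Executor. */
final class SerialWorkerSketch {
    private final Executor executor;
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    /** Work-in-progress counter; a 0 -> 1 transition starts a single drain loop. */
    private final AtomicInteger wip = new AtomicInteger();
    private volatile boolean disposed;

    SerialWorkerSketch(Executor executor) {
        this.executor = executor;
    }

    /** Enqueues a non-delayed task; returns false if the worker was already disposed. */
    boolean schedule(Runnable task) {
        if (disposed) {
            return false;
        }
        queue.offer(task);
        if (wip.getAndIncrement() == 0) {
            // Only the caller that moves wip from 0 to 1 starts a drain,
            // so at most one drain loop is active at any time.
            executor.execute(this::drain);
        }
        return true;
    }

    /** Drops all outstanding tasks of this worker only. */
    void dispose() {
        disposed = true;
        queue.clear();
    }

    private void drain() {
        int missed = 1;
        for (;;) {
            Runnable task;
            while ((task = queue.poll()) != null) {
                if (disposed) {
                    queue.clear();
                    break;
                }
                try {
                    task.run(); // tasks run one after another, in FIFO order
                } catch (Throwable ex) {
                    // Keep draining; report the failure to the thread's handler.
                    Thread t = Thread.currentThread();
                    t.getUncaughtExceptionHandler().uncaughtException(t, ex);
                }
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

    Even when the backing Executor is a multi-threaded pool, tasks submitted through one SerialWorkerSketch never overlap, while two separate instances can run concurrently with each other.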

    Implementations of the scheduleDirect(java.lang.Runnable) and Scheduler.Worker.schedule(java.lang.Runnable) methods are encouraged to call the RxJavaPlugins.onSchedule(Runnable) method to allow a scheduler hook to manipulate (wrap or replace) the original Runnable task before it is submitted to the underlying task-execution scheme.

    The default implementations of the scheduleDirect methods provided by this abstract class delegate to the respective schedule methods in the Scheduler.Worker instance created via createWorker() for each individual Runnable task submitted. Implementors of this class are encouraged to provide a more efficient direct scheduling implementation to avoid the time and memory overhead of creating such Workers for every task. This delegation is done via special wrapper instances around the original Runnable before calling the respective Worker.schedule method. Note that this can lead to multiple RxJavaPlugins.onSchedule calls and potentially multiple hooks applied. Therefore, the default implementations of scheduleDirect (and the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit)) wrap the incoming Runnable into a class that implements the SchedulerRunnableIntrospection interface which can grant access to the original or hooked Runnable, thus, a repeated RxJavaPlugins.onSchedule can detect the earlier hook and not apply a new one over again.

    The default implementations of the now(TimeUnit) and Scheduler.Worker.now(TimeUnit) methods return the current System.currentTimeMillis() value in the desired time unit, unless the rx3.scheduler.use-nanotime (boolean) system property is set. When the property is set to true, the methods use System.nanoTime() as their basis instead. Custom Scheduler implementations can override this to provide specialized time accounting (such as virtual time to be advanced programmatically). Note that operators requiring a Scheduler may rely on either of the now() calls provided by Scheduler or Worker respectively; therefore, it is recommended that they represent a logically consistent source of the current time.

    The default implementation of the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit) method uses the Scheduler.Worker.schedule(Runnable, long, TimeUnit) for scheduling the Runnable task periodically. The algorithm calculates the next absolute time when the task should run again and schedules this execution based on the relative time between it and Scheduler.Worker.now(TimeUnit). However, drifts or changes in the system clock could affect this calculation either by scheduling subsequent runs too frequently or too far apart. Therefore, the default implementation uses the clockDriftTolerance() value (set via rx3.scheduler.drift-tolerance and rx3.scheduler.drift-tolerance-unit) to detect a drift in Scheduler.Worker.now(TimeUnit) and re-adjust the absolute/relative time calculation accordingly.
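
    The absolute/relative time calculation and the drift re-adjustment described above can be sketched as a small state machine. This is a simplified illustration under stated assumptions, not a copy of the library's code: PeriodicTimingSketch, its fields and nextDelay are hypothetical names, and all times are plain nanosecond values supplied by the caller instead of being read from Scheduler.Worker.now(TimeUnit).

```java
/** Hypothetical sketch of fixed-rate timing with clock-drift rebasing. */
final class PeriodicTimingSketch {
    private final long periodNanos;
    private final long toleranceNanos;
    private long startNanos;   // nominal absolute time of the first run
    private long lastNowNanos; // clock reading at the previous calculation
    private long count;        // number of runs accounted for so far

    PeriodicTimingSketch(long nowNanos, long initialDelayNanos,
                         long periodNanos, long toleranceNanos) {
        this.startNanos = nowNanos + initialDelayNanos;
        this.lastNowNanos = nowNanos;
        this.periodNanos = periodNanos;
        this.toleranceNanos = toleranceNanos;
    }

    /**
     * Called when a run has just happened at clock reading nowNanos;
     * returns the relative delay to use for scheduling the next run.
     */
    long nextDelay(long nowNanos) {
        long nextTick;
        boolean jumpedBack = nowNanos + toleranceNanos < lastNowNanos;
        boolean jumpedAhead = nowNanos >= lastNowNanos + periodNanos + toleranceNanos;
        if (jumpedBack || jumpedAhead) {
            // The clock moved by more than the tolerance: rebase so that the
            // next run is exactly one period away from the new "now".
            nextTick = nowNanos + periodNanos;
            startNanos = nextTick - periodNanos * (++count);
        } else {
            // Normal case: stay on the absolute grid start + n * period,
            // which compensates for runs that started slightly late.
            nextTick = startNanos + (++count) * periodNanos;
        }
        lastNowNanos = nowNanos;
        return nextTick - nowNanos;
    }
}
```

    With a start reading of 0, an initial delay of 100, a period of 1000 and a tolerance of 500, a run observed at 1150 (50 late) yields a delay of 950, keeping the task on the original grid. A reading of 10000 exceeds the tolerance and yields a rebased delay of 1000.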

    The default implementations of start() and shutdown() do nothing and should be overridden if the underlying task-execution scheme supports stopping and restarting itself.

    If the Scheduler is shut down or a Worker is disposed, the schedule methods should return the Disposable.disposed() singleton instance indicating the shut down/disposed state to the caller. Since the shutdown or dispose can happen from any thread, the schedule implementations should make a best effort to cancel tasks immediately after those tasks have been submitted to the underlying task-execution scheme if the shutdown/dispose was detected after this submission.
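
    The check, submit, re-check pattern implied by the paragraph above can be sketched with plain JDK types. This is a minimal illustration under stated assumptions, not RxJava code: DisposeAwareSchedulingSketch, Handle and ALREADY_DISPOSED are hypothetical stand-ins for a Worker, a Disposable and the Disposable.disposed() singleton, and an ExecutorService plays the role of the underlying task-execution scheme.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

final class DisposeAwareSchedulingSketch {

    /** Hypothetical minimal handle standing in for a Disposable. */
    interface Handle {
        void cancel();
        boolean isCancelled();
    }

    /** Shared "already disposed" instance, similar in spirit to Disposable.disposed(). */
    static final Handle ALREADY_DISPOSED = new Handle() {
        @Override public void cancel() { }
        @Override public boolean isCancelled() { return true; }
    };

    private final ExecutorService executor;
    private final AtomicBoolean disposed = new AtomicBoolean();

    DisposeAwareSchedulingSketch(ExecutorService executor) {
        this.executor = executor;
    }

    Handle schedule(Runnable task) {
        // 1. Fast path: refuse work if already disposed.
        if (disposed.get()) {
            return ALREADY_DISPOSED;
        }
        final Future<?> future;
        try {
            // 2. Hand the task to the underlying task-execution scheme.
            future = executor.submit(task);
        } catch (RejectedExecutionException ex) {
            // The executor itself was shut down in the meantime.
            return ALREADY_DISPOSED;
        }
        // 3. Re-check: dispose() may have raced with the submission,
        //    so make a best effort to cancel what was just submitted.
        if (disposed.get()) {
            future.cancel(true);
            return ALREADY_DISPOSED;
        }
        return new Handle() {
            @Override public void cancel() { future.cancel(true); }
            @Override public boolean isCancelled() { return future.isCancelled(); }
        };
    }

    /** Marks this worker as disposed; may be called from any thread. */
    void dispose() {
        disposed.set(true);
    }
}
```

    The sketch leaves out the tracking of outstanding tasks that a full implementation would need in order to cancel them together on dispose. It only shows why the disposed state is checked both before and after submission.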

    All methods on the Scheduler and Worker classes should be thread safe.

    • Field Detail

      • IS_DRIFT_USE_NANOTIME

        static boolean IS_DRIFT_USE_NANOTIME
        Indicates whether System.nanoTime() is used instead of the default System.currentTimeMillis() as the clock for now(TimeUnit) and Scheduler.Worker.now(TimeUnit).

        Associated system parameter:

        • rx3.scheduler.use-nanotime, boolean, default false
      • CLOCK_DRIFT_TOLERANCE_NANOSECONDS

        static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS
        The clock drift tolerance, in nanoseconds, beyond which the periodic scheduler will rebase its timing.

        Associated system parameters:

        • rx3.scheduler.drift-tolerance, long, default 15
        • rx3.scheduler.drift-tolerance-unit, string, default minutes, supports seconds and milliseconds.
    • Constructor Detail

      • Scheduler

        public Scheduler()
    • Method Detail

      • computeNow

        static long computeNow​(java.util.concurrent.TimeUnit unit)
        Returns the current clock time in the given unit, depending on the state of IS_DRIFT_USE_NANOTIME.

        By default, System.currentTimeMillis() is used as the clock. When the rx3.scheduler.use-nanotime property is set to true, System.nanoTime() is used instead.

        Parameters:
        unit - the time unit
        Returns:
        the 'current time' in given unit
        Throws:
        java.lang.NullPointerException - if unit is null
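
        The clock selection described for computeNow can be sketched as follows. This is an illustrative approximation, not the library source: ClockSketch is a hypothetical class, and the useNanoTime parameter is an assumption standing in for the IS_DRIFT_USE_NANOTIME field.

```java
import java.util.concurrent.TimeUnit;

final class ClockSketch {
    /**
     * Returns the "current time" in the given unit.
     * useNanoTime is a hypothetical parameter replacing the static flag.
     */
    static long computeNow(TimeUnit unit, boolean useNanoTime) {
        if (useNanoTime) {
            // Monotonic clock with an arbitrary origin: good for measuring
            // elapsed time, not comparable to wall-clock timestamps.
            return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        // Wall-clock time since the Unix epoch; may jump if the system
        // clock is adjusted.
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}
```

        Passing a null unit fails with a NullPointerException when convert is invoked, which is consistent with the Throws clause above.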
      • computeClockDrift

        static long computeClockDrift​(long time,
                                      java.lang.String timeUnit)
        Returns the clock drift tolerance in nanoseconds based on the input selection.
        Parameters:
        time - the time value
        timeUnit - the time unit string
        Returns:
        the time amount in nanoseconds
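
        The conversion from a time value and a unit string to nanoseconds might look like the sketch below. It assumes, based on the property description (default minutes, with seconds and milliseconds supported), that any unrecognized unit falls back to minutes. Case-insensitive matching is also an assumption, and DriftToleranceSketch, toNanos and fromSystemProperties are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

final class DriftToleranceSketch {

    /** Converts the tolerance to nanoseconds; unknown units fall back to minutes (assumed). */
    static long toNanos(long time, String timeUnit) {
        if ("seconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.SECONDS.toNanos(time);
        }
        if ("milliseconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.MILLISECONDS.toNanos(time);
        }
        return TimeUnit.MINUTES.toNanos(time);
    }

    /** Reads the two documented system properties, applying the documented defaults. */
    static long fromSystemProperties() {
        long time = Long.getLong("rx3.scheduler.drift-tolerance", 15L);
        String unit = System.getProperty("rx3.scheduler.drift-tolerance-unit", "minutes");
        return toNanos(time, unit);
    }
}
```

        With the defaults this gives 15 minutes, that is 900,000,000,000 nanoseconds. Because CLOCK_DRIFT_TOLERANCE_NANOSECONDS is a static final field, the properties presumably need to be set before the Scheduler class is initialized, for example with -Drx3.scheduler.drift-tolerance=30 on the command line.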
      • clockDriftTolerance

        public static long clockDriftTolerance()
        Returns the clock drift tolerance in nanoseconds.

        Related system properties:

        • rx3.scheduler.drift-tolerance, long, default 15
        • rx3.scheduler.drift-tolerance-unit, string, default minutes, supports seconds and milliseconds.
        Returns:
        the tolerance in nanoseconds
        Since:
        2.0
      • createWorker

        @NonNull
        public abstract @NonNull Scheduler.Worker createWorker()
        Retrieves or creates a new Scheduler.Worker that represents sequential execution of actions.

        When work is completed, the Worker instance should be released by calling Disposable.dispose() to avoid potential resource leaks in the underlying task-execution scheme.

        Work on a Scheduler.Worker is guaranteed to be sequential and non-overlapping.

        Returns:
        a Worker representing a serial queue of actions to be executed
      • now

        public long now​(@NonNull
                        @NonNull java.util.concurrent.TimeUnit unit)
        Returns the 'current time' of the Scheduler in the specified time unit.
        Parameters:
        unit - the time unit
        Returns:
        the 'current time'
        Throws:
        java.lang.NullPointerException - if unit is null
        Since:
        2.0
      • start

        public void start()
        Allows the Scheduler instance to start threads and accept tasks on them.

        Implementations should make sure the call is idempotent, thread-safe and should not throw any RuntimeException if it doesn't support this functionality.

        Since:
        2.0
      • shutdown

        public void shutdown()
        Instructs the Scheduler instance to stop threads, stop accepting tasks on any outstanding Scheduler.Worker instances and clean up any associated resources with this Scheduler.

        Implementations should make sure the call is idempotent, thread-safe and should not throw any RuntimeException if it doesn't support this functionality.

        Since:
        2.0
      • scheduleDirect

        @NonNull
        public @NonNull Disposable scheduleDirect​(@NonNull
                                                  @NonNull java.lang.Runnable run)
        Schedules the given task on this Scheduler without any time delay.

        This method is safe to be called from multiple threads but there are no ordering or non-overlapping guarantees between tasks.

        Parameters:
        run - the task to execute
        Returns:
        the Disposable instance that lets one cancel this particular task.
        Throws:
        java.lang.NullPointerException - if run is null
        Since:
        2.0
      • scheduleDirect

        @NonNull
        public @NonNull Disposable scheduleDirect​(@NonNull
                                                  @NonNull java.lang.Runnable run,
                                                  long delay,
                                                  @NonNull
                                                  @NonNull java.util.concurrent.TimeUnit unit)
        Schedules the execution of the given task with the given time delay.

        This method is safe to be called from multiple threads but there are no ordering guarantees between tasks.

        Parameters:
        run - the task to schedule
        delay - the delay amount, non-positive values indicate non-delayed scheduling
        unit - the unit of measure of the delay amount
        Returns:
        the Disposable that lets one cancel this particular delayed task.
        Throws:
        java.lang.NullPointerException - if run or unit is null
        Since:
        2.0
      • schedulePeriodicallyDirect

        @NonNull
        public @NonNull Disposable schedulePeriodicallyDirect​(@NonNull
                                                              @NonNull java.lang.Runnable run,
                                                              long initialDelay,
                                                              long period,
                                                              @NonNull
                                                              @NonNull java.util.concurrent.TimeUnit unit)
        Schedules a periodic execution of the given task with the given initial time delay and repeat period.

        This method is safe to be called from multiple threads but there are no ordering guarantees between tasks.

        The periodic execution is at a fixed rate, that is, the first execution will be after the initialDelay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.

        Parameters:
        run - the task to schedule
        initialDelay - the initial delay amount, non-positive values indicate non-delayed scheduling
        period - the period at which the task should be re-executed
        unit - the unit of measure of the delay amount
        Returns:
        the Disposable that lets one cancel this particular periodic task.
        Throws:
        java.lang.NullPointerException - if run or unit is null
        Since:
        2.0
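
        The fixed-rate timing described for schedulePeriodicallyDirect can be made concrete with a little arithmetic. The helper below is a hypothetical illustration, not part of RxJava: it lists the nominal offsets (relative to the scheduling call) at which runs are due, treating a non-positive initialDelay as zero per the parameter description.

```java
final class FixedRateSketch {

    /** Nominal offsets of the first `runs` executions, in the caller's time unit. */
    static long[] nominalRunOffsets(long initialDelay, long period, int runs) {
        long first = Math.max(0L, initialDelay); // non-positive means "no delay"
        long[] offsets = new long[runs];
        for (int i = 0; i < runs; i++) {
            // Fixed rate: each run is anchored to the first one,
            // not to the completion time of the previous run.
            offsets[i] = first + i * period;
        }
        return offsets;
    }
}
```

        For an initialDelay of 100 and a period of 250, the first four nominal offsets are 100, 350, 600 and 850. Actual execution times can lag behind these values if a run takes longer than the period or the underlying thread is busy.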
      • when

        @NonNull
        public <S extends Scheduler & Disposable> S when​(@NonNull
                                                         @NonNull Function<Flowable<Flowable<Completable>>,​Completable> combine)
        Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. This makes it possible to layer additional behavior on this Scheduler. The only parameter is a function that flattens a Flowable of Flowable of Completables into just one Completable. There must be a chain of operators connecting the returned value to the source Flowable, otherwise any work scheduled on the returned Scheduler will not be executed.

        When createWorker() is invoked, a Flowable of Completables is onNext'd to the combinator to be flattened. If the inner Flowable is not immediately subscribed to, calls to Scheduler.Worker.schedule(java.lang.Runnable) are buffered. Once the Flowable is subscribed to, actions are then onNext'd as Completables.

        Finally, the actions are scheduled on the parent Scheduler when the innermost Completables are subscribed to.

        When the Scheduler.Worker is disposed, the Completable emits an onComplete and triggers any behavior in the flattening operator. The Flowable and all Completables given to the flattening function never signal onError.

        Limit the amount of concurrency to two at a time without creating a new fixed-size thread pool:

         Scheduler limitScheduler = Schedulers.computation().when(workers -> {
          // use merge max concurrent to limit the number of concurrent
          // callbacks two at a time
          return Completable.merge(Flowable.merge(workers), 2);
         });
         

        This is a slightly different way to limit the concurrency, with some interesting benefits and drawbacks compared to the method above. It works by limiting the number of concurrent Scheduler.Workers rather than individual actions. Generally, each Flowable uses its own Scheduler.Worker, so this essentially limits the number of concurrent subscribes. The danger comes from using operators like Flowable.zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.rxjava3.functions.BiFunction), where subscribing to the first Flowable could deadlock the subscription to the second.

         Scheduler limitScheduler = Schedulers.computation().when(workers -> {
          // use merge max concurrent to limit the number of concurrent
          // Flowables two at a time
          return Completable.merge(Flowable.merge(workers, 2));
         });
         
        Slowing down the rate to no more than one per second. This suffers from the same problem as the example above, because no Flowable operator was found that limits the rate without dropping values (the leaky bucket algorithm).
         Scheduler slowScheduler = Schedulers.computation().when(workers -> {
          // use concatenate to make each worker happen one at a time.
          return Completable.concat(workers.map(actions -> {
              // delay the starting of the next worker by 1 second.
              return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
          }));
         });
         

        History: 2.0.1 - experimental

        Type Parameters:
        S - a Scheduler and a Disposable
        Parameters:
        combine - the function that takes a two-level nested Flowable sequence of Completables and returns the Completable that will be subscribed to and should trigger the execution of the scheduled actions.
        Returns:
        the Scheduler with the customized execution behavior
        Throws:
        java.lang.NullPointerException - if combine is null
        Since:
        2.1