Class Scheduler
- java.lang.Object
  - io.reactivex.rxjava3.core.Scheduler
Direct Known Subclasses:
ComputationScheduler, ExecutorScheduler, ImmediateThinScheduler, IoScheduler, NewThreadScheduler, SchedulerWhen, SingleScheduler, TestScheduler, TrampolineScheduler
public abstract class Scheduler extends java.lang.Object
A Scheduler is an object that specifies an API for scheduling units of work, provided in the form of Runnables, to be executed without delay (effectively as soon as possible), after a specified time delay, or periodically. It represents an abstraction over an asynchronous boundary that ensures these units of work get executed by some underlying task-execution scheme (such as custom Threads, an event loop, an Executor or an Actor system) with some uniform properties and guarantees regardless of the particular underlying scheme.

You can get various standard, RxJava-specific instances of this class via the static methods of the Schedulers utility class.

The so-called Scheduler.Workers of a Scheduler can be created via the createWorker() method, which allows the scheduling of multiple Runnable tasks in an isolated manner. Runnable tasks scheduled on a Worker are guaranteed to be executed sequentially and in a non-overlapping fashion. Non-delayed Runnable tasks are guaranteed to execute in First-In-First-Out order, but their execution may be interleaved with delayed tasks. In addition, outstanding or running tasks can be cancelled together via Disposable.dispose() without affecting any other Worker instances of the same Scheduler.

Implementations of the scheduleDirect(java.lang.Runnable) and Scheduler.Worker.schedule(java.lang.Runnable) methods are encouraged to call the RxJavaPlugins.onSchedule(Runnable) method to allow a scheduler hook to manipulate (wrap or replace) the original Runnable task before it is submitted to the underlying task-execution scheme.

The default implementations of the scheduleDirect methods provided by this abstract class delegate to the respective schedule methods in the Scheduler.Worker instance created via createWorker() for each individual Runnable task submitted. Implementors of this class are encouraged to provide a more efficient direct scheduling implementation to avoid the time and memory overhead of creating such Workers for every task. This delegation is done via special wrapper instances around the original Runnable before calling the respective Worker.schedule method. Note that this can lead to multiple RxJavaPlugins.onSchedule calls and potentially multiple hooks applied. Therefore, the default implementations of scheduleDirect (and Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit)) wrap the incoming Runnable into a class that implements the SchedulerRunnableIntrospection interface, which can grant access to the original or hooked Runnable. Thus, a repeated RxJavaPlugins.onSchedule call can detect the earlier hook and not apply a new one over again.

The default implementations of the now(TimeUnit) and Scheduler.Worker.now(TimeUnit) methods return the current System.currentTimeMillis() value in the desired time unit, unless the rx3.scheduler.use-nanotime (boolean) system property is set. When the property is set to true, the methods use System.nanoTime() as their basis instead. Custom Scheduler implementations can override this to provide specialized time accounting (such as virtual time to be advanced programmatically). Note that operators requiring a Scheduler may rely on either of the now() calls provided by Scheduler or Worker respectively; therefore, it is recommended that they represent a logically consistent source of the current time.

The default implementation of the Scheduler.Worker.schedulePeriodically(Runnable, long, long, TimeUnit) method uses Scheduler.Worker.schedule(Runnable, long, TimeUnit) for scheduling the Runnable task periodically. The algorithm calculates the next absolute time when the task should run again and schedules this execution based on the relative time between it and Scheduler.Worker.now(TimeUnit). However, drifts or changes in the system clock could affect this calculation by scheduling subsequent runs either too frequently or too far apart. Therefore, the default implementation uses the clockDriftTolerance() value (set via rx3.scheduler.drift-tolerance and rx3.scheduler.drift-tolerance-unit) to detect a drift in Scheduler.Worker.now(TimeUnit) and re-adjust the absolute/relative time calculation accordingly.

The default implementations of start() and shutdown() do nothing and should be overridden if the underlying task-execution scheme supports stopping and restarting itself.

If the Scheduler is shut down or a Worker is disposed, the schedule methods should return the Disposable.disposed() singleton instance, indicating the shut down/disposed state to the caller. Since the shutdown or dispose can happen from any thread, the schedule implementations should make a best effort to cancel tasks immediately after those tasks have been submitted to the underlying task-execution scheme if the shutdown/dispose was detected after this submission.

All methods on the Scheduler and Worker classes should be thread safe.
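The drift handling described above for periodic scheduling can be illustrated with a small, JDK-only model. This is only a sketch under stated assumptions, not the library's actual code: the class name PeriodicDriftSketch, its fields and the nextDelay method are hypothetical, the clock readings are passed in by hand, and the tolerance parameter stands in for the clockDriftTolerance() value.

```java
/** Hypothetical, JDK-only model of drift-tolerant fixed-rate rescheduling. */
final class PeriodicDriftSketch {
    private final long period;     // repeat period
    private final long tolerance;  // assumed stand-in for clockDriftTolerance()
    private long start;            // nominal start of the periodic sequence
    private long lastNow;          // clock reading seen at the previous call
    private long count;            // number of runs planned so far

    PeriodicDriftSketch(long firstStart, long firstNow, long period, long tolerance) {
        this.start = firstStart;
        this.lastNow = firstNow;
        this.period = period;
        this.tolerance = tolerance;
    }

    /** Returns the relative delay until the next run for the given clock reading. */
    long nextDelay(long now) {
        boolean jumpedBack = now + tolerance < lastNow;
        boolean jumpedForward = now >= lastNow + period + tolerance;
        long nextTick;
        if (jumpedBack || jumpedForward) {
            // Drift detected: rebase so that the next run is one period from now.
            nextTick = now + period;
            start = nextTick - period * (++count);
        } else {
            // Normal case: stay on the absolute grid start + n * period.
            nextTick = start + (++count) * period;
        }
        lastNow = now;
        return nextTick - now;
    }

    public static void main(String[] args) {
        PeriodicDriftSketch task = new PeriodicDriftSketch(0, 0, 1_000, 5_000);
        System.out.println(task.nextDelay(10));       // 990
        System.out.println(task.nextDelay(1_005));    // 995
        System.out.println(task.nextDelay(100_000));  // clock jumped forward: 1000
        System.out.println(task.nextDelay(101_002));  // 998, on the rebased grid
    }
}
```

In this model, small scheduling delays are absorbed because each run is planned against the absolute grid, while a clock jump larger than the tolerance rebases the grid instead of triggering a burst of catch-up runs or a very long wait.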
-
-
Nested Class Summary
Modifier and Type | Class | Description
(package private) static class | Scheduler.DisposeTask |
(package private) static class | Scheduler.PeriodicDirectTask |
static class | Scheduler.Worker | Represents an isolated, sequential worker of a parent Scheduler for executing Runnable tasks on an underlying task-execution scheme (such as custom Threads, an event loop, an Executor or an Actor system).
-
Field Summary
Modifier and Type | Field | Description
(package private) static long | CLOCK_DRIFT_TOLERANCE_NANOSECONDS | The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
(package private) static boolean | IS_DRIFT_USE_NANOTIME | Value representing whether to use System.nanoTime() rather than the default clock for now(TimeUnit) and Scheduler.Worker.now(TimeUnit).
-
Constructor Summary
Constructor | Description
Scheduler() |
-
Method Summary
Modifier and Type | Method | Description
static long | clockDriftTolerance() | Returns the clock drift tolerance in nanoseconds.
(package private) static long | computeClockDrift(long time, java.lang.String timeUnit) | Returns the clock drift tolerance in nanoseconds based on the input selection.
(package private) static long | computeNow(java.util.concurrent.TimeUnit unit) | Returns the current clock time in the given unit, depending on the state of IS_DRIFT_USE_NANOTIME.
abstract @NonNull Scheduler.Worker | createWorker() | Retrieves or creates a new Scheduler.Worker that represents sequential execution of actions.
long | now(@NonNull java.util.concurrent.TimeUnit unit) | Returns the 'current time' of the Scheduler in the specified time unit.
@NonNull Disposable | scheduleDirect(@NonNull java.lang.Runnable run) | Schedules the given task on this Scheduler without any time delay.
@NonNull Disposable | scheduleDirect(@NonNull java.lang.Runnable run, long delay, @NonNull java.util.concurrent.TimeUnit unit) | Schedules the execution of the given task with the given time delay.
@NonNull Disposable | schedulePeriodicallyDirect(@NonNull java.lang.Runnable run, long initialDelay, long period, @NonNull java.util.concurrent.TimeUnit unit) | Schedules a periodic execution of the given task with the given initial time delay and repeat period.
void | shutdown() | Instructs the Scheduler instance to stop threads, stop accepting tasks on any outstanding Scheduler.Worker instances and clean up any associated resources with this Scheduler.
void | start() | Allows the Scheduler instance to start threads and accept tasks on them.
<S extends Scheduler & Disposable> S | when(@NonNull Function<Flowable<Flowable<Completable>>,Completable> combine) | Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done.
-
-
-
Field Detail
-
IS_DRIFT_USE_NANOTIME
static boolean IS_DRIFT_USE_NANOTIME
Value representing whether to use System.nanoTime() rather than the default clock for now(TimeUnit) and Scheduler.Worker.now(TimeUnit).
Associated system parameter:
rx3.scheduler.use-nanotime, boolean, default false
-
CLOCK_DRIFT_TOLERANCE_NANOSECONDS
static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS
The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
Associated system parameters:
rx3.scheduler.drift-tolerance, long, default 15
rx3.scheduler.drift-tolerance-unit, string, default minutes, supports seconds and milliseconds.
-
-
Method Detail
-
computeNow
static long computeNow(java.util.concurrent.TimeUnit unit)
Returns the current clock time in the given unit, depending on the state of IS_DRIFT_USE_NANOTIME.
By default, System.currentTimeMillis() is used as the clock. When the property is set, System.nanoTime() is used instead.
- Parameters:
  unit - the time unit
- Returns:
  the 'current time' in the given unit
- Throws:
  java.lang.NullPointerException - if unit is null
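The clock selection described for computeNow can be sketched with the JDK alone. This is a minimal illustration rather than the library's code: the class ComputeNowSketch is hypothetical, and the useNanoTime parameter is an assumed stand-in for the IS_DRIFT_USE_NANOTIME field.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, JDK-only sketch of choosing between the two clocks. */
final class ComputeNowSketch {

    /** Converts the selected clock reading into the requested unit. */
    static long computeNow(TimeUnit unit, boolean useNanoTime) {
        if (useNanoTime) {
            // Monotonic clock; the value is only meaningful relative to other readings.
            return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        // Wall clock; milliseconds since the epoch.
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // Reads the same system property name the documentation mentions.
        boolean useNanoTime = Boolean.getBoolean("rx3.scheduler.use-nanotime");
        System.out.println(computeNow(TimeUnit.SECONDS, useNanoTime));
    }
}
```

Because System.nanoTime() has an arbitrary origin, readings taken with the two clocks should not be mixed; this is one reason the Scheduler and Worker now() methods are recommended to agree on a single time source.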
-
computeClockDrift
static long computeClockDrift(long time, java.lang.String timeUnit)
Returns the clock drift tolerance in nanoseconds based on the input selection.
- Parameters:
  time - the time value
  timeUnit - the time unit string
- Returns:
  the time amount in nanoseconds
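A conversion of this kind might look like the following JDK-only sketch. The class ClockDriftSketch and the method toNanos are hypothetical names; the case-insensitive matching and the fallback to minutes for any unrecognized unit string are assumptions made for illustration, based only on the documented default of minutes.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, JDK-only sketch of converting a (time, unit string) pair to nanoseconds. */
final class ClockDriftSketch {

    static long toNanos(long time, String timeUnit) {
        if ("seconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.SECONDS.toNanos(time);
        }
        if ("milliseconds".equalsIgnoreCase(timeUnit)) {
            return TimeUnit.MILLISECONDS.toNanos(time);
        }
        // Assumption: anything else is treated as the documented default, minutes.
        return TimeUnit.MINUTES.toNanos(time);
    }

    public static void main(String[] args) {
        System.out.println(toNanos(15, "minutes"));      // 900000000000
        System.out.println(toNanos(2, "seconds"));       // 2000000000
        System.out.println(toNanos(3, "milliseconds"));  // 3000000
    }
}
```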
-
clockDriftTolerance
public static long clockDriftTolerance()
Returns the clock drift tolerance in nanoseconds.
Related system properties:
rx3.scheduler.drift-tolerance, long, default 15
rx3.scheduler.drift-tolerance-unit, string, default minutes, supports seconds and milliseconds.
- Returns:
  the tolerance in nanoseconds
- Since:
  2.0
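How the two related system properties could combine into a single tolerance value is sketched below using only the JDK. The class DriftTolerancePropertiesSketch and the method toleranceNanos are hypothetical, and the parsing details (including the fallback to minutes for an unrecognized unit) are assumptions; only the property names and defaults come from the documentation above.

```java
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/** Hypothetical, JDK-only sketch of deriving the tolerance from properties. */
final class DriftTolerancePropertiesSketch {

    static long toleranceNanos(Properties props) {
        // Documented defaults: 15 and "minutes".
        long amount = Long.parseLong(props.getProperty("rx3.scheduler.drift-tolerance", "15"));
        String unit = props.getProperty("rx3.scheduler.drift-tolerance-unit", "minutes");
        switch (unit.toLowerCase(Locale.ROOT)) {
            case "seconds":
                return TimeUnit.SECONDS.toNanos(amount);
            case "milliseconds":
                return TimeUnit.MILLISECONDS.toNanos(amount);
            default:
                // Assumption: unrecognized units fall back to minutes.
                return TimeUnit.MINUTES.toNanos(amount);
        }
    }

    public static void main(String[] args) {
        // Try: java -Drx3.scheduler.drift-tolerance=30 -Drx3.scheduler.drift-tolerance-unit=seconds ...
        System.out.println(toleranceNanos(System.getProperties()));
    }
}
```

Since the documented field is static final, the properties would have to be set before the Scheduler class is initialized, for example on the command line, for a changed value to be picked up.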
-
createWorker
@NonNull public abstract Scheduler.Worker createWorker()
Retrieves or creates a new Scheduler.Worker that represents sequential execution of actions.
When work is completed, the Worker instance should be released by calling Disposable.dispose() to avoid potential resource leaks in the underlying task-execution scheme.
Work on a Scheduler.Worker is guaranteed to be sequential and non-overlapping.
- Returns:
  a Worker representing a serial queue of actions to be executed
-
now
public long now(@NonNull java.util.concurrent.TimeUnit unit)
Returns the 'current time' of the Scheduler in the specified time unit.
- Parameters:
  unit - the time unit
- Returns:
  the 'current time'
- Throws:
  java.lang.NullPointerException - if unit is null
- Since:
  2.0
-
start
public void start()
Allows the Scheduler instance to start threads and accept tasks on them.
Implementations should make sure the call is idempotent and thread-safe, and should not throw any RuntimeException if they do not support this functionality.
- Since:
  2.0
-
shutdown
public void shutdown()
Instructs the Scheduler instance to stop threads, stop accepting tasks on any outstanding Scheduler.Worker instances and clean up any associated resources with this Scheduler.
Implementations should make sure the call is idempotent and thread-safe, and should not throw any RuntimeException if they do not support this functionality.
- Since:
  2.0
-
scheduleDirect
@NonNull public Disposable scheduleDirect(@NonNull java.lang.Runnable run)
Schedules the given task on this Scheduler without any time delay.
This method is safe to be called from multiple threads, but there are no ordering or non-overlapping guarantees between tasks.
- Parameters:
  run - the task to execute
- Returns:
  the Disposable instance that lets one cancel this particular task.
- Throws:
  java.lang.NullPointerException - if run is null
- Since:
  2.0
-
scheduleDirect
@NonNull public Disposable scheduleDirect(@NonNull java.lang.Runnable run, long delay, @NonNull java.util.concurrent.TimeUnit unit)
Schedules the execution of the given task with the given time delay.
This method is safe to be called from multiple threads, but there are no ordering guarantees between tasks.
- Parameters:
  run - the task to schedule
  delay - the delay amount, non-positive values indicate non-delayed scheduling
  unit - the unit of measure of the delay amount
- Returns:
  the Disposable that lets one cancel this particular delayed task.
- Throws:
  java.lang.NullPointerException - if run or unit is null
- Since:
  2.0
-
schedulePeriodicallyDirect
@NonNull public Disposable schedulePeriodicallyDirect(@NonNull java.lang.Runnable run, long initialDelay, long period, @NonNull java.util.concurrent.TimeUnit unit)
Schedules a periodic execution of the given task with the given initial time delay and repeat period.
This method is safe to be called from multiple threads, but there are no ordering guarantees between tasks.
The periodic execution is at a fixed rate, that is, the first execution will be after the initialDelay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
- Parameters:
  run - the task to schedule
  initialDelay - the initial delay amount, non-positive values indicate non-delayed scheduling
  period - the period at which the task should be re-executed
  unit - the unit of measure of the delay amount
- Returns:
  the Disposable that lets one cancel this particular periodic task.
- Throws:
  java.lang.NullPointerException - if run or unit is null
- Since:
  2.0
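As a worked example of the fixed-rate rule above, the nominal execution offsets can be computed with plain arithmetic. The class FixedRateOffsetsSketch and the method nominalOffsetsMillis are hypothetical helpers, not part of the library; they only model the intended timeline and ignore any scheduling jitter or clock drift.

```java
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/** Hypothetical, JDK-only helper that lists nominal fixed-rate execution offsets. */
final class FixedRateOffsetsSketch {

    /** Returns the offsets (in milliseconds from scheduling) of the first {@code runs} executions. */
    static long[] nominalOffsetsMillis(long initialDelay, long period, TimeUnit unit, int runs) {
        // Non-positive initial delays mean "run without delay".
        long first = Math.max(0L, unit.toMillis(initialDelay));
        long step = unit.toMillis(period);
        long[] offsets = new long[runs];
        for (int n = 0; n < runs; n++) {
            offsets[n] = first + n * step; // initialDelay + n * period
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Prints [1000, 3000, 5000, 7000]
        System.out.println(Arrays.toString(nominalOffsetsMillis(1, 2, TimeUnit.SECONDS, 4)));
    }
}
```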
-
when
@NonNull public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>,Completable> combine)
Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. This makes it possible to layer additional behavior on this Scheduler. The only parameter is a function that flattens a Flowable of Flowable of Completables into just one Completable. There must be a chain of operators connecting the returned value to the source Flowable, otherwise any work scheduled on the returned Scheduler will not be executed.

When createWorker() is invoked, a Flowable of Completables is onNext'd to the combinator to be flattened. If the inner Flowable is not immediately subscribed to, calls to Scheduler.Worker.schedule(java.lang.Runnable) are buffered. Once the Flowable is subscribed to, actions are then onNext'd as Completables.

Finally, the actions are scheduled on the parent Scheduler when the innermost Completables are subscribed to.

When the Scheduler.Worker is unsubscribed, the Completable emits an onComplete and triggers any behavior in the flattening operator. The Flowable and all Completables given to the flattening function never onError.

Limit the amount of concurrency to two at a time without creating a new fixed-size thread pool:

    Scheduler limitScheduler = Schedulers.computation().when(workers -> {
        // use merge max concurrent to limit the number of concurrent
        // callbacks to two at a time
        return Completable.merge(Flowable.merge(workers), 2);
    });
This is a slightly different way to limit the concurrency, but it has some interesting benefits and drawbacks compared to the method above. It works by limiting the number of concurrent Scheduler.Workers rather than individual actions. Generally each Flowable uses its own Scheduler.Worker. This means that this will essentially limit the number of concurrent subscribes. The danger comes from using operators like Flowable.zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.rxjava3.functions.BiFunction), where subscribing to the first Flowable could deadlock the subscription to the second.

    Scheduler limitScheduler = Schedulers.computation().when(workers -> {
        // use merge max concurrent to limit the number of concurrent
        // Flowables to two at a time
        return Completable.merge(Flowable.merge(workers, 2));
    });
Slowing down the rate to no more than 1 a second. This suffers from the same problem as the one above; the author could not find a Flowable operator that limits the rate without dropping the values (aka leaky bucket algorithm).

    Scheduler slowScheduler = Schedulers.computation().when(workers -> {
        // use concatenate to make each worker happen one at a time.
        return Completable.concat(workers.map(actions -> {
            // delay the starting of the next worker by 1 second.
            return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
        }));
    });
History: 2.0.1 - experimental
- Type Parameters:
  S - a type that is both a Scheduler and a Disposable
- Parameters:
  combine - the function that takes a two-level nested Flowable sequence of Completables and returns the Completable that will be subscribed to and should trigger the execution of the scheduled actions.
- Returns:
  the Scheduler with the customized execution behavior
- Throws:
  java.lang.NullPointerException - if combine is null
- Since:
  2.1
-
-