Class Schedulers
- java.lang.Object
-
- io.reactivex.rxjava3.schedulers.Schedulers
-
public final class Schedulers extends java.lang.Object
Static factory methods for returning standardScheduler
instances.The initial and runtime values of the various scheduler types can be overridden via the
RxJavaPlugins.setInit(scheduler name)SchedulerHandler()
andRxJavaPlugins.set(scheduler name)SchedulerHandler()
respectively. Note that overriding any initialScheduler
via theRxJavaPlugins
has to happen before theSchedulers
class is accessed.Supported system properties (
System.getProperty()
):rx3.io-keep-alive-time
(long): sets the keep-alive time of theio()
Scheduler
workers, default isIoScheduler.KEEP_ALIVE_TIME_DEFAULT
rx3.io-priority
(int): sets the thread priority of theio()
Scheduler
, default isThread.NORM_PRIORITY
rx3.io-scheduled-release
(boolean):true
sets the worker release mode of theio()
Scheduler
to scheduled, default isfalse
for eager mode.rx3.computation-threads
(int): sets the number of threads in thecomputation()
Scheduler
, default is the number of available CPUsrx3.computation-priority
(int): sets the thread priority of thecomputation()
Scheduler
, default isThread.NORM_PRIORITY
rx3.newthread-priority
(int): sets the thread priority of thenewThread()
Scheduler
, default isThread.NORM_PRIORITY
rx3.single-priority
(int): sets the thread priority of thesingle()
Scheduler
, default isThread.NORM_PRIORITY
rx3.purge-enabled
(boolean): enables purging of allScheduler
's backing thread pools, default istrue
rx3.scheduler.use-nanotime
(boolean):true
instructsScheduler
to useSystem.nanoTime()
forScheduler.now(TimeUnit)
, instead of defaultSystem.currentTimeMillis()
(false
)
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) static class
Schedulers.ComputationHolder
(package private) static class
Schedulers.ComputationTask
(package private) static class
Schedulers.IoHolder
(package private) static class
Schedulers.IOTask
(package private) static class
Schedulers.NewThreadHolder
(package private) static class
Schedulers.NewThreadTask
(package private) static class
Schedulers.SingleHolder
(package private) static class
Schedulers.SingleTask
-
Field Summary
Fields Modifier and Type Field Description (package private) static @NonNull Scheduler
COMPUTATION
(package private) static @NonNull Scheduler
IO
(package private) static @NonNull Scheduler
NEW_THREAD
(package private) static @NonNull Scheduler
SINGLE
(package private) static @NonNull Scheduler
TRAMPOLINE
-
Constructor Summary
Constructors Modifier Constructor Description private
Schedulers()
Utility class.
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static @NonNull Scheduler
computation()
Returns a default, sharedScheduler
instance intended for computational work.static @NonNull Scheduler
from(@NonNull java.util.concurrent.Executor executor)
static @NonNull Scheduler
from(@NonNull java.util.concurrent.Executor executor, boolean interruptibleWorker)
static @NonNull Scheduler
from(@NonNull java.util.concurrent.Executor executor, boolean interruptibleWorker, boolean fair)
static @NonNull Scheduler
io()
Returns a default, sharedScheduler
instance intended for IO-bound work.static @NonNull Scheduler
newThread()
Returns a default, sharedScheduler
instance that creates a newThread
for each unit of work.static void
shutdown()
Shuts down the standardScheduler
s.static @NonNull Scheduler
single()
Returns a default, shared, single-thread-backedScheduler
instance for work requiring strongly-sequential execution on the same background thread.static void
start()
Starts the standardScheduler
s.static @NonNull Scheduler
trampoline()
Returns a default, sharedScheduler
instance whoseScheduler.Worker
instances queue work and execute them in a FIFO manner on one of the participating threads.
-
-
-
Method Detail
-
computation
@NonNull public static @NonNull Scheduler computation()
Returns a default, sharedScheduler
instance intended for computational work.This can be used for event-loops, processing callbacks and other computational work.
It is not recommended to perform blocking, IO-bound work on this scheduler. Use
io()
instead.The default instance has a backing pool of single-threaded
ScheduledExecutorService
instances equal to the number of available processors (Runtime.availableProcessors()
) to the Java VM.Unhandled errors will be delivered to the scheduler Thread's
Thread.UncaughtExceptionHandler
.This type of scheduler is less sensitive to leaking
Scheduler.Worker
instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".If the
RxJavaPlugins.setFailOnNonBlockingScheduler(boolean)
is set totrue
, attempting to execute operators that block while running on this scheduler will throw anIllegalStateException
.You can control certain properties of this standard scheduler via system properties that have to be set before the
Schedulers
class is referenced in your code.Supported system properties (
System.getProperty()
):rx3.computation-threads
(int): sets the number of threads in thecomputation()
Scheduler
, default is the number of available CPUsrx3.computation-priority
(int): sets the thread priority of thecomputation()
Scheduler
, default isThread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the
RxJavaPlugins.setInitComputationSchedulerHandler(io.reactivex.rxjava3.functions.Function)
plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in aNullPointerException
. Once theSchedulers
class has been initialized, you can override the returnedScheduler
instance via theRxJavaPlugins.setComputationSchedulerHandler(io.reactivex.rxjava3.functions.Function)
method.It is possible to create a fresh instance of this scheduler with a custom
ThreadFactory
, via theRxJavaPlugins.createComputationScheduler(ThreadFactory)
method. Note that such custom instances require a manual call toScheduler.shutdown()
to allow the JVM to exit or the (J2EE) container to unload properly.Operators on the base reactive classes that use this scheduler are marked with the @
SchedulerSupport
(COMPUTATION
) annotation.- Returns:
- a
Scheduler
meant for computation-bound work
-
io
@NonNull public static @NonNull Scheduler io()
Returns a default, sharedScheduler
instance intended for IO-bound work.This can be used for asynchronously performing blocking IO.
The implementation is backed by a pool of single-threaded
ScheduledExecutorService
instances that will try to reuse previously started instances used by the worker returned byScheduler.createWorker()
but otherwise will start a new backingScheduledExecutorService
instance. Note that this scheduler may create an unbounded number of worker threads that can result in system slowdowns orOutOfMemoryError
. Therefore, for casual uses or when implementing an operator, the Worker instances must be disposed viaDisposable.dispose()
.It is not recommended to perform computational work on this scheduler. Use
computation()
instead.Unhandled errors will be delivered to the scheduler Thread's
Thread.UncaughtExceptionHandler
.You can control certain properties of this standard scheduler via system properties that have to be set before the
Schedulers
class is referenced in your code.Supported system properties (
System.getProperty()
):rx3.io-keep-alive-time
(long): sets the keep-alive time of theio()
Scheduler
workers, default isIoScheduler.KEEP_ALIVE_TIME_DEFAULT
rx3.io-priority
(int): sets the thread priority of theio()
Scheduler
, default isThread.NORM_PRIORITY
rx3.io-scheduled-release
(boolean):true
sets the worker release mode of the#io()
Scheduler
to scheduled, default isfalse
for eager mode.
The default value of this scheduler can be overridden at initialization time via the
RxJavaPlugins.setInitIoSchedulerHandler(io.reactivex.rxjava3.functions.Function)
plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in aNullPointerException
. Once theSchedulers
class has been initialized, you can override the returnedScheduler
instance via theRxJavaPlugins.setIoSchedulerHandler(io.reactivex.rxjava3.functions.Function)
method.It is possible to create a fresh instance of this scheduler with a custom
ThreadFactory
, via theRxJavaPlugins.createIoScheduler(ThreadFactory)
method. Note that such custom instances require a manual call toScheduler.shutdown()
to allow the JVM to exit or the (J2EE) container to unload properly.Operators on the base reactive classes that use this scheduler are marked with the @
SchedulerSupport
(IO
) annotation.When the
Scheduler.Worker
is disposed, the underlying worker can be released to the cached worker pool in two modes:- In eager mode (default), the underlying worker is returned immediately to the cached worker pool and can be reused much quicker by operators. The drawback is that if the currently running task doesn't respond to interruption in time or at all, this may lead to delays or deadlock with the reuse use of the underlying worker.
- In scheduled mode (enabled via the system parameter
rx3.io-scheduled-release
set totrue
), the underlying worker is returned to the cached worker pool only after the currently running task has finished. This can help prevent premature reuse of the underlying worker and likely won't lead to delays or deadlock with such reuses. The drawback is that the delay in release may lead to an excess amount of underlying workers being created.
- Returns:
- a
Scheduler
meant for IO-bound work
-
trampoline
@NonNull public static @NonNull Scheduler trampoline()
Returns a default, sharedScheduler
instance whoseScheduler.Worker
instances queue work and execute them in a FIFO manner on one of the participating threads.The default implementation's
Scheduler.scheduleDirect(Runnable)
methods execute the tasks on the current thread without any queueing and the timed overloads use blocking sleep as well.Note that this scheduler can't be reliably used to return the execution of tasks to the "main" thread. Such behavior requires a blocking-queueing scheduler currently not provided by RxJava itself but may be found in external libraries.
This scheduler can't be overridden via an
RxJavaPlugins
method.- Returns:
- a
Scheduler
that queues work on the current thread
-
newThread
@NonNull public static @NonNull Scheduler newThread()
Returns a default, sharedScheduler
instance that creates a newThread
for each unit of work.The default implementation of this scheduler creates a new, single-threaded
ScheduledExecutorService
for each invocation of theScheduler.scheduleDirect(Runnable)
(plus its overloads) andScheduler.createWorker()
methods, thus an unbounded number of worker threads may be created that can result in system slowdowns orOutOfMemoryError
. Therefore, for casual uses or when implementing an operator, the Worker instances must be disposed viaDisposable.dispose()
.Unhandled errors will be delivered to the scheduler Thread's
Thread.UncaughtExceptionHandler
.You can control certain properties of this standard scheduler via system properties that have to be set before the
Schedulers
class is referenced in your code.Supported system properties (
System.getProperty()
):rx3.newthread-priority
(int): sets the thread priority of thenewThread()
Scheduler
, default isThread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the
RxJavaPlugins.setInitNewThreadSchedulerHandler(io.reactivex.rxjava3.functions.Function)
plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in aNullPointerException
. Once theSchedulers
class has been initialized, you can override the returnedScheduler
instance via theRxJavaPlugins.setNewThreadSchedulerHandler(io.reactivex.rxjava3.functions.Function)
method.It is possible to create a fresh instance of this scheduler with a custom
ThreadFactory
, via theRxJavaPlugins.createNewThreadScheduler(ThreadFactory)
method. Note that such custom instances require a manual call toScheduler.shutdown()
to allow the JVM to exit or the (J2EE) container to unload properly.Operators on the base reactive classes that use this scheduler are marked with the @
SchedulerSupport
(NEW_TRHEAD
) annotation.- Returns:
- a
Scheduler
that creates new threads
-
single
@NonNull public static @NonNull Scheduler single()
Returns a default, shared, single-thread-backedScheduler
instance for work requiring strongly-sequential execution on the same background thread.Uses:
- event loop
- support
Schedulers.from(
Executor
)
andfrom(
ExecutorService
)
with delayed scheduling - support benchmarks that pipeline data from some thread to another thread and avoid core-bashing of computation's round-robin nature
Unhandled errors will be delivered to the scheduler Thread's
Thread.UncaughtExceptionHandler
.This type of scheduler is less sensitive to leaking
Scheduler.Worker
instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".If the
RxJavaPlugins.setFailOnNonBlockingScheduler(boolean)
is set totrue
, attempting to execute operators that block while running on this scheduler will throw anIllegalStateException
.You can control certain properties of this standard scheduler via system properties that have to be set before the
Schedulers
class is referenced in your code.Supported system properties (
System.getProperty()
):rx3.single-priority
(int): sets the thread priority of thesingle()
Scheduler
, default isThread.NORM_PRIORITY
The default value of this scheduler can be overridden at initialization time via the
RxJavaPlugins.setInitSingleSchedulerHandler(io.reactivex.rxjava3.functions.Function)
plugin method. Note that due to possible initialization cycles, using any of the other scheduler-returning methods will result in aNullPointerException
. Once theSchedulers
class has been initialized, you can override the returnedScheduler
instance via theRxJavaPlugins.setSingleSchedulerHandler(io.reactivex.rxjava3.functions.Function)
method.It is possible to create a fresh instance of this scheduler with a custom
ThreadFactory
, via theRxJavaPlugins.createSingleScheduler(ThreadFactory)
method. Note that such custom instances require a manual call toScheduler.shutdown()
to allow the JVM to exit or the (J2EE) container to unload properly.Operators on the base reactive classes that use this scheduler are marked with the @
SchedulerSupport
(SINGLE
) annotation.- Returns:
- a
Scheduler
that shares a single backing thread. - Since:
- 2.0
-
from
@NonNull public static @NonNull Scheduler from(@NonNull @NonNull java.util.concurrent.Executor executor)
Wraps anExecutor
into a newScheduler
instance and delegatesschedule()
calls to it.If the provided executor doesn't support any of the more specific standard Java executor APIs, tasks scheduled by this scheduler can't be interrupted when they are executing but only prevented from running prior to that. In addition, tasks scheduled with a time delay or periodically will use the
single()
scheduler for the timed waiting before posting the actual task to the given executor.Tasks submitted to the
Scheduler.Worker
of thisScheduler
are also not interruptible. Use thefrom(Executor, boolean)
overload to enable task interruption via this wrapper.If the provided executor supports the standard Java
ExecutorService
API, tasks scheduled by this scheduler can be cancelled/interrupted by callingDisposable.dispose()
. In addition, tasks scheduled with a time delay or periodically will use thesingle()
scheduler for the timed waiting before posting the actual task to the given executor.If the provided executor supports the standard Java
ScheduledExecutorService
API, tasks scheduled by this scheduler can be cancelled/interrupted by callingDisposable.dispose()
. In addition, tasks scheduled with a time delay or periodically will use the provided executor. Note, however, if the providedScheduledExecutorService
instance is not single threaded, tasks scheduled with a time delay close to each other may end up executing in different order than the original schedule() call was issued. This limitation may be lifted in a future patch.The implementation of the Worker of this wrapper
Scheduler
is eager and will execute as many non-delayed tasks as it can, which may result in a longer than expected occupation of a thread of the given backingExecutor
. In other terms, it does not allow per-Runnable
fairness in case the worker runs on a shared underlying thread of theExecutor
. Seefrom(Executor, boolean, boolean)
to create a wrapper that uses the underlyingExecutor
more fairly.Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
ExecutorService exec = Executors.newSingleThreadedExecutor(); try { Scheduler scheduler = Schedulers.from(exec); Flowable.just(1) .subscribeOn(scheduler) .map(v -> v + 1) .observeOn(scheduler) .blockingSubscribe(System.out::println); } finally { exec.shutdown(); }
Note that the provided
Executor
should avoid throwing aRejectedExecutionException
(for example, by shutting it down prematurely or using a bounded-queueExecutorService
) because such circumstances prevent RxJava from progressing flow-related activities correctly. If theExecutor.execute(Runnable)
orExecutorService.submit(Callable)
throws, theRejectedExecutionException
is routed to the global error handler viaRxJavaPlugins.onError(Throwable)
. To avoid shutdown-related problems, it is recommended all flows using the returnedScheduler
to be canceled/disposed before the underlyingExecutor
is shut down. To avoid problems due to theExecutor
having a bounded-queue, it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.This type of scheduler is less sensitive to leaking
Scheduler.Worker
instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".Note that this method returns a new
Scheduler
instance, even for the sameExecutor
instance.It is possible to wrap an
Executor
into aScheduler
without triggering the initialization of all the standard schedulers by using theRxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean)
method before theSchedulers
class itself is accessed.- Parameters:
executor
- the executor to wrap- Returns:
- the new
Scheduler
wrapping theExecutor
- See Also:
from(Executor, boolean, boolean)
-
from
@NonNull public static @NonNull Scheduler from(@NonNull @NonNull java.util.concurrent.Executor executor, boolean interruptibleWorker)
Wraps anExecutor
into a newScheduler
instance and delegatesschedule()
calls to it.The tasks scheduled by the returned
Scheduler
and itsScheduler.Worker
can be optionally interrupted.If the provided executor doesn't support any of the more specific standard Java executor APIs, tasks scheduled with a time delay or periodically will use the
single()
scheduler for the timed waiting before posting the actual task to the given executor.If the provided executor supports the standard Java
ExecutorService
API, tasks scheduled by this scheduler can be cancelled/interrupted by callingDisposable.dispose()
. In addition, tasks scheduled with a time delay or periodically will use thesingle()
scheduler for the timed waiting before posting the actual task to the given executor.If the provided executor supports the standard Java
ScheduledExecutorService
API, tasks scheduled by this scheduler can be cancelled/interrupted by callingDisposable.dispose()
. In addition, tasks scheduled with a time delay or periodically will use the provided executor. Note, however, if the providedScheduledExecutorService
instance is not single threaded, tasks scheduled with a time delay close to each other may end up executing in different order than the original schedule() call was issued. This limitation may be lifted in a future patch.The implementation of the
Worker
of this wrapperScheduler
is eager and will execute as many non-delayed tasks as it can, which may result in a longer than expected occupation of a thread of the given backingExecutor
. In other terms, it does not allow per-Runnable
fairness in case the worker runs on a shared underlying thread of theExecutor
. Seefrom(Executor, boolean, boolean)
to create a wrapper that uses the underlyingExecutor
more fairly.Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
ExecutorService exec = Executors.newSingleThreadedExecutor(); try { Scheduler scheduler = Schedulers.from(exec, true); Flowable.just(1) .subscribeOn(scheduler) .map(v -> v + 1) .observeOn(scheduler) .blockingSubscribe(System.out::println); } finally { exec.shutdown(); }
Note that the provided
Executor
should avoid throwing aRejectedExecutionException
(for example, by shutting it down prematurely or using a bounded-queueExecutorService
) because such circumstances prevent RxJava from progressing flow-related activities correctly. If theExecutor.execute(Runnable)
orExecutorService.submit(Callable)
throws, theRejectedExecutionException
is routed to the global error handler viaRxJavaPlugins.onError(Throwable)
. To avoid shutdown-related problems, it is recommended all flows using the returnedScheduler
to be canceled/disposed before the underlyingExecutor
is shut down. To avoid problems due to theExecutor
having a bounded-queue, it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.This type of scheduler is less sensitive to leaking
Scheduler.Worker
instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".Note that this method returns a new
Scheduler
instance, even for the sameExecutor
instance.It is possible to wrap an
Executor
into aScheduler
without triggering the initialization of all the standard schedulers by using theRxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean)
method before theSchedulers
class itself is accessed.History: 2.2.6 - experimental
- Parameters:
executor
- the executor to wrapinterruptibleWorker
- iftrue
, the tasks submitted to theScheduler.Worker
will be interrupted when the task is disposed.- Returns:
- the new
Scheduler
wrapping theExecutor
- Since:
- 3.0.0
- See Also:
from(Executor, boolean, boolean)
-
from
@NonNull public static @NonNull Scheduler from(@NonNull @NonNull java.util.concurrent.Executor executor, boolean interruptibleWorker, boolean fair)
Wraps anExecutor
into a newScheduler
instance and delegatesschedule()
calls to it.The tasks scheduled by the returned
Scheduler
and itsScheduler.Worker
can be optionally interrupted.If the provided executor doesn't support any of the more specific standard Java executor APIs, tasks scheduled with a time delay or periodically will use the
single()
scheduler for the timed waiting before posting the actual task to the given executor.If the provided executor supports the standard Java
ExecutorService
API, tasks scheduled by this scheduler can be cancelled/interrupted by callingDisposable.dispose()
. In addition, tasks scheduled with a time delay or periodically will use thesingle()
scheduler for the timed waiting before posting the actual task to the given executor.If the provided executor supports the standard Java
ScheduledExecutorService
API, tasks scheduled by this scheduler can be cancelled/interrupted by callingDisposable.dispose()
. In addition, tasks scheduled with a time delay or periodically will use the provided executor. Note, however, if the providedScheduledExecutorService
instance is not single threaded, tasks scheduled with a time delay close to each other may end up executing in different order than the original schedule() call was issued. This limitation may be lifted in a future patch.The implementation of the Worker of this wrapper
Scheduler
can operate in both eager (non-fair) and fair modes depending on the specified parameter. In eager mode, it will execute as many non-delayed tasks as it can, which may result in a longer than expected occupation of a thread of the given backingExecutor
. In other terms, it does not allow per-Runnable
fairness in case the worker runs on a shared underlying thread of theExecutor
. In fair mode, non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task, the execution for the next task is rescheduled with the same underlyingExecutor
, allowing interleaving from both the sameScheduler
or other external usages of the underlyingExecutor
.Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
ExecutorService exec = Executors.newSingleThreadedExecutor(); try { Scheduler scheduler = Schedulers.from(exec, true, true); Flowable.just(1) .subscribeOn(scheduler) .map(v -> v + 1) .observeOn(scheduler) .blockingSubscribe(System.out::println); } finally { exec.shutdown(); }
Note that the provided
Executor
should avoid throwing aRejectedExecutionException
(for example, by shutting it down prematurely or using a bounded-queueExecutorService
) because such circumstances prevent RxJava from progressing flow-related activities correctly. If theExecutor.execute(Runnable)
orExecutorService.submit(Callable)
throws, theRejectedExecutionException
is routed to the global error handler viaRxJavaPlugins.onError(Throwable)
. To avoid shutdown-related problems, it is recommended all flows using the returnedScheduler
to be canceled/disposed before the underlyingExecutor
is shut down. To avoid problems due to theExecutor
having a bounded-queue, it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.This type of scheduler is less sensitive to leaking
Scheduler.Worker
instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".Note that this method returns a new
Scheduler
instance, even for the sameExecutor
instance.It is possible to wrap an
Executor
into aScheduler
without triggering the initialization of all the standard schedulers by using theRxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean)
method before theSchedulers
class itself is accessed.- Parameters:
executor
- the executor to wrapinterruptibleWorker
- iftrue
, the tasks submitted to theScheduler.Worker
will be interrupted when the task is disposed.fair
- iftrue
, tasks submitted to theScheduler
orWorker
will be executed by the underlyingExecutor
one after the other, still in a FIFO and non-overlapping manner, but allows interleaving with other tasks submitted to the underlyingExecutor
. Iffalse
, the underlying FIFO scheme will execute as many tasks as it can before giving up the underlyingExecutor
thread.- Returns:
- the new
Scheduler
wrapping theExecutor
- Since:
- 3.0.0
-
shutdown
public static void shutdown()
Shuts down the standardScheduler
s.The operation is idempotent and thread-safe.
-
start
public static void start()
Starts the standardScheduler
s.The operation is idempotent and thread-safe.
-
-