Class SchedulerWhen
- java.lang.Object
-
- io.reactivex.rxjava3.core.Scheduler
-
- io.reactivex.rxjava3.internal.schedulers.SchedulerWhen
-
- All Implemented Interfaces:
Disposable
public class SchedulerWhen extends Scheduler implements Disposable
Allows the use of operators for controlling the timing around when actions scheduled on workers are actually done. This makes it possible to layer additional behavior on thisScheduler
. The only parameter is a function that flattens anObservable
ofObservable
ofCompletable
s into just oneCompletable
. There must be a chain of operators connecting the returned value to the sourceObservable
otherwise any work scheduled on the returnedScheduler
will not be executed.When
Scheduler.createWorker()
is invoked aObservable
ofCompletable
s is onNext'd to the combinator to be flattened. If the innerObservable
is not immediately subscribed to an calls toScheduler.Worker.schedule(java.lang.Runnable)
are buffered. Once theObservable
is subscribed to actions are then onNext'd asCompletable
s.Finally the actions scheduled on the parent
Scheduler
when the inner mostCompletable
s are subscribed to.When the
Worker
is unsubscribed theCompletable
emits an onComplete and triggers any behavior in the flattening operator. TheObservable
and allCompletable
s give to the flattening function never onError.Limit the amount concurrency two at a time without creating a new fix size thread pool:
Scheduler limitScheduler = Schedulers.computation().when(workers -> { // use merge max concurrent to limit the number of concurrent // callbacks two at a time return Completable.merge(Observable.merge(workers), 2); });
This is a slightly different way to limit the concurrency but it has some interesting benefits and drawbacks to the method above. It works by limited the number of concurrent
Worker
s rather than individual actions. Generally eachObservable
uses its ownWorker
. This means that this will essentially limit the number of concurrent subscribes. The danger comes from using operators likeFlowable.zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.rxjava3.functions.BiFunction)
where subscribing to the firstObservable
could deadlock the subscription to the second.Scheduler limitScheduler = Schedulers.computation().when(workers -> { // use merge max concurrent to limit the number of concurrent // Observables two at a time return Completable.merge(Observable.merge(workers, 2)); });
Observable
operator that limits the rate without dropping the values (aka leaky bucket algorithm).Scheduler slowScheduler = Schedulers.computation().when(workers -> { // use concatenate to make each worker happen one at a time. return Completable.concat(workers.map(actions -> { // delay the starting of the next worker by 1 second. return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS)); })); });
History 2.0.1 - experimental
- Since:
- 2.1
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) static class
SchedulerWhen.CreateWorkerFunction
(package private) static class
SchedulerWhen.DelayedAction
(package private) static class
SchedulerWhen.ImmediateAction
(package private) static class
SchedulerWhen.OnCompletedAction
(package private) static class
SchedulerWhen.QueueWorker
(package private) static class
SchedulerWhen.ScheduledAction
(package private) static class
SchedulerWhen.SubscribedDisposable
-
Nested classes/interfaces inherited from class io.reactivex.rxjava3.core.Scheduler
Scheduler.Worker
-
-
Field Summary
Fields Modifier and Type Field Description private Scheduler
actualScheduler
private Disposable
disposable
(package private) static Disposable
DISPOSED
(package private) static Disposable
SUBSCRIBED
private FlowableProcessor<Flowable<Completable>>
workerProcessor
-
Constructor Summary
Constructors Constructor Description SchedulerWhen(Function<Flowable<Flowable<Completable>>,Completable> combine, Scheduler actualScheduler)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description @NonNull Scheduler.Worker
createWorker()
Retrieves or creates a newScheduler.Worker
that represents sequential execution of actions.void
dispose()
Dispose the resource, the operation should be idempotent.boolean
isDisposed()
Returns true if this resource has been disposed.-
Methods inherited from class io.reactivex.rxjava3.core.Scheduler
clockDriftTolerance, now, scheduleDirect, scheduleDirect, schedulePeriodicallyDirect, shutdown, start, when
-
-
-
-
Field Detail
-
actualScheduler
private final Scheduler actualScheduler
-
workerProcessor
private final FlowableProcessor<Flowable<Completable>> workerProcessor
-
disposable
private Disposable disposable
-
SUBSCRIBED
static final Disposable SUBSCRIBED
-
DISPOSED
static final Disposable DISPOSED
-
-
Constructor Detail
-
SchedulerWhen
public SchedulerWhen(Function<Flowable<Flowable<Completable>>,Completable> combine, Scheduler actualScheduler)
-
-
Method Detail
-
dispose
public void dispose()
Description copied from interface:Disposable
Dispose the resource, the operation should be idempotent.- Specified by:
dispose
in interfaceDisposable
-
isDisposed
public boolean isDisposed()
Description copied from interface:Disposable
Returns true if this resource has been disposed.- Specified by:
isDisposed
in interfaceDisposable
- Returns:
- true if this resource has been disposed
-
createWorker
@NonNull public @NonNull Scheduler.Worker createWorker()
Description copied from class:Scheduler
Retrieves or creates a newScheduler.Worker
that represents sequential execution of actions.When work is completed, the
Worker
instance should be released by callingDisposable.dispose()
to avoid potential resource leaks in the underlying task-execution scheme.Work on a
Scheduler.Worker
is guaranteed to be sequential and non-overlapping.- Specified by:
createWorker
in classScheduler
- Returns:
- a Worker representing a serial queue of actions to be executed
-
-