Class UnicastProcessor<T>
- Type Parameters:
T - the value type received and emitted by this Processor subclass
- All Implemented Interfaces:
FlowableSubscriber<T>, org.reactivestreams.Processor<T,T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>
A FlowableProcessor variant that queues up events until a single Subscriber subscribes to it, replays those events to it until the Subscriber catches up, and then switches to relaying events live to this single Subscriber until this UnicastProcessor terminates or the Subscriber cancels its subscription.
This processor does not have a public constructor by design; a new empty instance of this UnicastProcessor can be created via the following create methods that allow specifying the retention policy for items:
- create() - creates an empty, unbounded UnicastProcessor that caches all items and the terminal event it receives.
- create(int) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain.
- create(boolean) - creates an empty, unbounded UnicastProcessor that optionally delays an error it receives and replays it after the regular items have been emitted.
- create(int, Runnable) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastProcessor gets terminated or the single Subscriber cancels.
- create(int, Runnable, boolean) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastProcessor gets terminated or the single Subscriber cancels, and that optionally delays an error it receives and replays it after the regular items have been emitted.
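The "called exactly once" contract of the onTerminate callback can be sketched as follows. This is a minimal illustration, assuming RxJava 3 is on the classpath; the class and method names (OnTerminateSketch, callsAfterCancelThenComplete, callsAfterComplete) are hypothetical and not part of the library.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only UnicastProcessor and TestSubscriber come from RxJava.
class OnTerminateSketch {

    // The Subscriber cancels first, then the producer tries to complete.
    static int callsAfterCancelThenComplete() {
        AtomicInteger calls = new AtomicInteger();
        UnicastProcessor<Integer> processor =
                UnicastProcessor.create(16, calls::incrementAndGet);
        TestSubscriber<Integer> ts = processor.test();
        processor.onNext(1);
        ts.cancel();             // cancellation triggers the callback
        processor.onComplete();  // a second termination cause must not re-run it
        return calls.get();
    }

    // No Subscriber at all; the processor is terminated by the producer.
    static int callsAfterComplete() {
        AtomicInteger calls = new AtomicInteger();
        UnicastProcessor<Integer> processor =
                UnicastProcessor.create(16, calls::incrementAndGet);
        processor.onNext(1);
        processor.onComplete();  // termination triggers the callback
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsAfterCancelThenComplete());
        System.out.println(callsAfterComplete());
    }
}
```

A counter is used instead of a boolean flag so that a repeated invocation would be visible.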
If more than one Subscriber attempts to subscribe to this Processor, the additional Subscribers will receive an IllegalStateException if this UnicastProcessor hasn't terminated yet, or the terminal event (error or completion) if this Processor has terminated.
The UnicastProcessor buffers notifications and replays them to the single Subscriber as requested; to do so, it holds upstream items in an unbounded internal buffer until they can be emitted.
Since a UnicastProcessor is a Reactive Streams Processor, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the processor's state is not changed.
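The null-rejection rule above can be checked with a short sketch. It assumes RxJava 3 on the classpath; NullRejectionSketch and its method are hypothetical names used only for illustration.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

// Hypothetical demo class, not part of RxJava.
class NullRejectionSketch {

    // Returns true if onNext(null) threw and the processor still worked afterwards.
    static boolean nullIsRejectedAndStateKept() {
        UnicastProcessor<String> processor = UnicastProcessor.create();
        boolean threw = false;
        try {
            processor.onNext(null);          // violates Rule 2.13
        } catch (NullPointerException expected) {
            threw = true;
        }
        // The rejected call should not have terminated the processor.
        boolean stillLive = !processor.hasComplete() && !processor.hasThrowable();

        processor.onNext("a");
        processor.onComplete();
        TestSubscriber<String> ts = processor.test();
        ts.assertResult("a");                // only the valid item was buffered
        return threw && stillLive;
    }

    public static void main(String[] args) {
        System.out.println(nullIsRejectedAndStateKept());
    }
}
```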
Since a UnicastProcessor is a Flowable as well as a FlowableProcessor, it honors the downstream backpressure but consumes an upstream source in an unbounded manner (requesting Long.MAX_VALUE).
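Downstream backpressure can be observed by subscribing with an initial request of zero and then requesting in steps. The sketch below assumes RxJava 3 and uses TestSubscriber's test(long) and requestMore(long); BackpressureSketch and deliveredCounts are hypothetical names.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
class BackpressureSketch {

    // Records how many items the Subscriber has seen after each request step.
    static List<Integer> deliveredCounts() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        processor.onNext(1);
        processor.onNext(2);
        processor.onNext(3);                              // all three are buffered

        TestSubscriber<Integer> ts = processor.test(0L);  // subscribe, request nothing
        List<Integer> counts = new ArrayList<>();
        counts.add(ts.values().size());                   // nothing requested yet

        ts.requestMore(2);
        counts.add(ts.values().size());                   // two items released

        ts.requestMore(1);
        counts.add(ts.values().size());                   // last item released
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(deliveredCounts());
    }
}
```

All calls happen on one thread here, so each requestMore call returns only after the released items have been delivered.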
When this UnicastProcessor is terminated via onError(Throwable), the current or late single Subscriber may receive the Throwable before any available items could be emitted. To make sure an onError event is delivered to the Subscriber after the normal items, create a UnicastProcessor with the create(boolean) or create(int, Runnable, boolean) factory methods.
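The effect of the delayError flag can be compared side by side. This sketch assumes RxJava 3 and passes the flag explicitly to create(boolean) so that it does not depend on the default of the no-argument factory; DelayErrorSketch and itemsBeforeError are hypothetical names. The scenario (items buffered and the error signalled before the Subscriber arrives) is one chosen case, not the only possible interleaving.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.io.IOException;

// Hypothetical demo class, not part of RxJava.
class DelayErrorSketch {

    // Buffers two items and an error, then subscribes and counts
    // how many items arrived ahead of the error.
    static int itemsBeforeError(boolean delayError) {
        UnicastProcessor<Integer> processor = UnicastProcessor.create(delayError);
        processor.onNext(1);
        processor.onNext(2);
        processor.onError(new IOException("boom"));

        TestSubscriber<Integer> ts = processor.test();  // late single Subscriber
        ts.assertError(IOException.class);              // the error arrives either way
        return ts.values().size();
    }

    public static void main(String[] args) {
        System.out.println("delayError=false: " + itemsBeforeError(false));
        System.out.println("delayError=true:  " + itemsBeforeError(true));
    }
}
```

With delayError set to true the buffered items are expected to precede the error; with false the error may overtake them, which is the situation the paragraph above warns about.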
Even though UnicastProcessor implements the Subscriber interface, calling onSubscribe is not required (Rule 2.12) if the processor is used as a standalone source. However, calling onSubscribe after the UnicastProcessor reached its terminal state will result in the given Subscription being canceled immediately.
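Both onSubscribe behaviors (the unbounded request on a live processor, the immediate cancellation on a terminated one) can be made visible with a hand-written Subscription. The sketch assumes RxJava 3 and the Reactive Streams API on the classpath; OnSubscribeSketch and RecordingSubscription are hypothetical helpers.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.reactivestreams.Subscription;

// Hypothetical demo class, not part of RxJava.
class OnSubscribeSketch {

    // Minimal Subscription that only records what the processor asks of it.
    static final class RecordingSubscription implements Subscription {
        final AtomicLong lastRequest = new AtomicLong();
        final AtomicBoolean cancelled = new AtomicBoolean();

        @Override public void request(long n) { lastRequest.set(n); }
        @Override public void cancel() { cancelled.set(true); }
    }

    // A live processor is expected to request Long.MAX_VALUE from its upstream.
    static long requestedByLiveProcessor() {
        RecordingSubscription upstream = new RecordingSubscription();
        UnicastProcessor.<Integer>create().onSubscribe(upstream);
        return upstream.lastRequest.get();
    }

    // A terminated processor is expected to cancel the Subscription right away.
    static boolean cancelledByTerminatedProcessor() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        processor.onComplete();
        RecordingSubscription upstream = new RecordingSubscription();
        processor.onSubscribe(upstream);
        return upstream.cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println(requestedByLiveProcessor() == Long.MAX_VALUE);
        System.out.println(cancelledByTerminatedProcessor());
    }
}
```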
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).
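When several threads need to emit, the processor can be wrapped once with toSerialized() and shared. The following sketch assumes RxJava 3; SerializedSketch, emitFromTwoThreads, and the item counts are hypothetical choices for illustration.

```java
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

// Hypothetical demo class, not part of RxJava.
class SerializedSketch {

    // Two threads call onNext concurrently on the serialized wrapper;
    // returns the number of items the single Subscriber received.
    static int emitFromTwoThreads(int perThread) {
        FlowableProcessor<Integer> processor =
                UnicastProcessor.<Integer>create().toSerialized();
        TestSubscriber<Integer> ts = processor.test();

        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                processor.onNext(i);   // safe only because of toSerialized()
            }
        };
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        processor.onComplete();        // called after both producers finished
        ts.assertComplete();
        return ts.values().size();
    }

    public static void main(String[] args) {
        System.out.println(emitFromTwoThreads(1000));
    }
}
```

Calling onNext from both threads directly on the unwrapped UnicastProcessor would violate the serialization requirement stated above.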
This UnicastProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers().
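The state-peeking methods can be sampled at different points of the processor's lifecycle. This is a small sketch assuming RxJava 3; StatePeekingSketch, describe, and lifecycle are hypothetical names.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
class StatePeekingSketch {

    static String describe(UnicastProcessor<?> p) {
        return "subscribers=" + p.hasSubscribers()
                + ", complete=" + p.hasComplete()
                + ", error=" + p.hasThrowable();
    }

    // Samples the state when fresh, after subscribing, and after onError.
    static List<String> lifecycle() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        List<String> states = new ArrayList<>();

        states.add(describe(processor));   // fresh processor
        processor.test();                  // the single Subscriber arrives
        states.add(describe(processor));

        processor.onError(new IllegalStateException("boom"));
        states.add(describe(processor)
                + ", cause=" + processor.getThrowable().getMessage());
        return states;
    }

    public static void main(String[] args) {
        lifecycle().forEach(System.out::println);
    }
}
```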
- Backpressure:
UnicastProcessor honors the downstream backpressure but consumes an upstream source (if any) in an unbounded manner (requesting Long.MAX_VALUE).
- Scheduler:
UnicastProcessor does not operate by default on a particular Scheduler and the single Subscriber gets notified on the thread the respective onXXX methods were invoked.
- Error handling:
When onError(Throwable) is called, the UnicastProcessor enters into a terminal state and emits the same Throwable instance to the current single Subscriber. During this emission, if the single Subscriber cancels its Subscription, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable). If there was no Subscriber subscribed to this UnicastProcessor when onError() was called, the global error handler is not invoked.
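The two error-handling cases can be contrasted with a temporary global error handler. The sketch assumes RxJava 3 and covers one specific situation: the Subscriber has already cancelled before onError is called, so the error cannot be delivered to it. The names UndeliverableErrorSketch and handlerCalls are hypothetical, and the handler is reset afterwards because it is process-wide state.

```java
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class, not part of RxJava.
class UndeliverableErrorSketch {

    // Returns how many errors reached the global handler.
    // cancelFirst=true: a Subscriber subscribes and cancels before onError.
    // cancelFirst=false: nobody ever subscribes; the error is simply cached.
    static int handlerCalls(boolean cancelFirst) {
        List<Throwable> seen = new CopyOnWriteArrayList<>();
        RxJavaPlugins.setErrorHandler(seen::add);   // temporary, process-wide
        try {
            UnicastProcessor<Integer> processor = UnicastProcessor.create();
            if (cancelFirst) {
                TestSubscriber<Integer> ts = processor.test();
                ts.cancel();
            }
            processor.onError(new IOException("boom"));
            return seen.size();
        } finally {
            RxJavaPlugins.setErrorHandler(null);    // restore the default handling
        }
    }

    public static void main(String[] args) {
        System.out.println("cancelled first:  " + handlerCalls(true));
        System.out.println("never subscribed: " + handlerCalls(false));
    }
}
```

In an application the global handler is normally installed once at startup; swapping it inside a method, as done here, is only reasonable for a demonstration or a test.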
Example usage:
UnicastProcessor<Integer> processor = UnicastProcessor.create();
TestSubscriber<Integer> ts1 = processor.test();
// fresh UnicastProcessors are empty
ts1.assertEmpty();
TestSubscriber<Integer> ts2 = processor.test();
// A UnicastProcessor only allows one Subscriber during its lifetime
ts2.assertFailure(IllegalStateException.class);
processor.onNext(1);
ts1.assertValue(1);
processor.onNext(2);
ts1.assertValues(1, 2);
processor.onComplete();
ts1.assertResult(1, 2);
// ----------------------------------------------------
UnicastProcessor<Integer> processor2 = UnicastProcessor.create();
// a UnicastProcessor caches events until its single Subscriber subscribes
processor2.onNext(1);
processor2.onNext(2);
processor2.onComplete();
TestSubscriber<Integer> ts3 = processor2.test();
// the cached events are emitted in order
ts3.assertResult(1, 2);
- Since:
2.0
Nested Class Summary
Nested Classes
Modifier and Type    Class    Description
(package private) final class
Field Summary
Fields
Modifier and Type    Field
(package private) boolean cancelled
(package private) final boolean delayError
(package private) boolean done
(package private) final AtomicReference<org.reactivestreams.Subscriber<? super T>> downstream
(package private) boolean enableOperatorFusion
(package private) Throwable error
(package private) final AtomicBoolean once
(package private) final AtomicReference<Runnable> onTerminate
(package private) final SpscLinkedArrayQueue<T> queue
(package private) final AtomicLong requested
(package private) final BasicIntQueueSubscription<T> wip
Constructor Summary
Constructors
UnicastProcessor(int capacityHint, Runnable onTerminate, boolean delayError)
- Creates a UnicastProcessor with the given capacity hint and callback for when the Processor is terminated normally or its single Subscriber cancels.
Method Summary
Modifier and Type    Method    Description
(package private) boolean checkTerminated(boolean failFast, boolean d, boolean empty, org.reactivestreams.Subscriber<? super @NonNull T> a, SpscLinkedArrayQueue<@NonNull T> q)
static <T> @NonNull UnicastProcessor<T> create()
- Creates a UnicastProcessor with an internal buffer capacity hint of 16.
static <T> @NonNull UnicastProcessor<T> create(boolean delayError)
- Creates a UnicastProcessor with default internal buffer capacity hint and delay error flag.
static <T> @NonNull UnicastProcessor<T> create(int capacityHint)
- Creates a UnicastProcessor with the given internal buffer capacity hint.
static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull Runnable onTerminate)
- Creates a UnicastProcessor with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull Runnable onTerminate, boolean delayError)
- Creates a UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
(package private) void doTerminate()
(package private) void drain()
(package private) void drainFused(org.reactivestreams.Subscriber<? super @NonNull T> a)
(package private) void drainRegular(org.reactivestreams.Subscriber<? super @NonNull T> a)
Throwable getThrowable()
- Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
boolean hasComplete()
- Returns true if the FlowableProcessor has reached a terminal state through a complete event.
boolean hasSubscribers()
- Returns true if the FlowableProcessor has subscribers.
boolean hasThrowable()
- Returns true if the FlowableProcessor has reached a terminal state through an error event.
void onComplete()
void onError(Throwable t)
void onNext(T t)
void onSubscribe(org.reactivestreams.Subscription s)
- Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long).
protected void subscribeActual(org.reactivestreams.Subscriber<? super @NonNull T> s)
- Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
Methods inherited from class io.reactivex.rxjava3.processors.FlowableProcessor
toSerialized
Methods inherited from class io.reactivex.rxjava3.core.Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, 
contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, 
onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, 
throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.reactivestreams.Publisher
subscribe
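Field Details
queue
final SpscLinkedArrayQueue<T> queue
onTerminate
final AtomicReference<Runnable> onTerminate
delayError
final boolean delayError
done
volatile boolean done
error
Throwable error
downstream
final AtomicReference<org.reactivestreams.Subscriber<? super T>> downstream
cancelled
volatile boolean cancelled
once
final AtomicBoolean once
wip
final BasicIntQueueSubscription<T> wip
requested
final AtomicLong requested
enableOperatorFusion
boolean enableOperatorFusion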
Constructor Details
UnicastProcessor
UnicastProcessor(int capacityHint, Runnable onTerminate, boolean delayError)
Creates a UnicastProcessor with the given capacity hint and callback for when the Processor is terminated normally or its single Subscriber cancels.
History: 2.0.8 - experimental
- Parameters:
capacityHint - the capacity hint for the internal, unbounded queue
onTerminate - the callback to run when the Processor is terminated or cancelled, null not allowed
delayError - deliver pending onNext events before onError
- Since:
2.2
Method Details
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create()
Creates a UnicastProcessor with an internal buffer capacity hint of 16.
- Type Parameters:
T - the value type
- Returns:
a UnicastProcessor instance
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint)
Creates a UnicastProcessor with the given internal buffer capacity hint.
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
- Returns:
a UnicastProcessor instance
- Throws:
IllegalArgumentException - if capacityHint is non-positive
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(boolean delayError)
Creates a UnicastProcessor with default internal buffer capacity hint and delay error flag.
History: 2.0.8 - experimental
- Type Parameters:
T - the value type
- Parameters:
delayError - deliver pending onNext events before onError
- Returns:
a UnicastProcessor instance
- Since:
2.2
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull Runnable onTerminate)
Creates a UnicastProcessor with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
onTerminate - the non-null callback
- Returns:
a UnicastProcessor instance
- Throws:
NullPointerException - if onTerminate is null
IllegalArgumentException - if capacityHint is non-positive
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull Runnable onTerminate, boolean delayError)
Creates a UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
History: 2.0.8 - experimental
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
onTerminate - the non-null callback
delayError - deliver pending onNext events before onError
- Returns:
a UnicastProcessor instance
- Throws:
NullPointerException - if onTerminate is null
IllegalArgumentException - if capacityHint is non-positive
- Since:
2.2
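doTerminate
void doTerminate()
drainRegular
void drainRegular(org.reactivestreams.Subscriber<? super @NonNull T> a)
drainFused
void drainFused(org.reactivestreams.Subscriber<? super @NonNull T> a)
drain
void drain()
checkTerminated
boolean checkTerminated(boolean failFast, boolean d, boolean empty, org.reactivestreams.Subscriber<? super @NonNull T> a, SpscLinkedArrayQueue<@NonNull T> q)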
onSubscribe
public void onSubscribe(org.reactivestreams.Subscription s)
Description copied from interface: FlowableSubscriber
Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call and additional behavior is thread safe with respect to onNext.
onNext
public void onNext(T t)
onError
public void onError(Throwable t)
onComplete
public void onComplete()
subscribeActual
protected void subscribeActual(org.reactivestreams.Subscriber<? super @NonNull T> s)
Description copied from class: Flowable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
There is no need to call any of the plugin hooks on the current Flowable instance or the Subscriber; all hooks and basic safeguards have been applied by Flowable.subscribe(Subscriber) before this method gets called.
- Specified by:
subscribeActual in class Flowable<T>
- Parameters:
s - the incoming Subscriber, never null
hasSubscribers
public boolean hasSubscribers()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has subscribers.
The method is thread-safe.
- Specified by:
hasSubscribers in class FlowableProcessor<T>
- Returns:
true if the FlowableProcessor has subscribers
getThrowable
public Throwable getThrowable()
Description copied from class: FlowableProcessor
Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class FlowableProcessor<T>
- Returns:
the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
hasComplete
public boolean hasComplete()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class FlowableProcessor<T>
- Returns:
true if the FlowableProcessor has reached a terminal state through a complete event
- See Also:
-
hasThrowable
public boolean hasThrowable()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class FlowableProcessor<T>
- Returns:
true if the FlowableProcessor has reached a terminal state through an error event
- See Also:
-