Class UnicastProcessor<T>
java.lang.Object
  io.reactivex.rxjava3.core.Flowable<T>
    io.reactivex.rxjava3.processors.FlowableProcessor<T>
      io.reactivex.rxjava3.processors.UnicastProcessor<T>
- Type Parameters:
T - the value type received and emitted by this Processor subclass
- All Implemented Interfaces:
FlowableSubscriber<T>, org.reactivestreams.Processor<T,T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>
public final class UnicastProcessor<@NonNull T> extends FlowableProcessor<T>
A FlowableProcessor variant that queues up events until a single Subscriber subscribes to it, replays those events to it until the Subscriber catches up, and then switches to relaying events live to this single Subscriber until this UnicastProcessor terminates or the Subscriber cancels its subscription.
This processor does not have a public constructor by design; a new empty instance of this UnicastProcessor can be created via the following create methods that allow specifying the retention policy for items:
- create() - creates an empty, unbounded UnicastProcessor that caches all items and the terminal event it receives.
- create(int) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain.
- create(boolean) - creates an empty, unbounded UnicastProcessor that optionally delays an error it receives and replays it after the regular items have been emitted.
- create(int, Runnable) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastProcessor gets terminated or the single Subscriber cancels.
- create(int, Runnable, boolean) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastProcessor gets terminated or the single Subscriber cancels, and that optionally delays an error it receives and replays it after the regular items have been emitted.
If more than one Subscriber attempts to subscribe to this Processor, they will receive an IllegalStateException if this UnicastProcessor hasn't terminated yet, or the Subscribers receive the terminal event (error or completion) if this Processor has terminated.
The UnicastProcessor buffers notifications and replays them to the single Subscriber as requested, for which it holds upstream items in an unbounded internal buffer until they can be emitted.
Since a UnicastProcessor is a Reactive Streams Processor, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the processor's state is not changed.
Since a UnicastProcessor is a Flowable as well as a FlowableProcessor, it honors the downstream backpressure but consumes an upstream source in an unbounded manner (requesting Long.MAX_VALUE).
When this UnicastProcessor is terminated via onError(Throwable), the current or late single Subscriber may receive the Throwable before any available items could be emitted. To make sure an onError event is delivered to the Subscriber after the normal items, create a UnicastProcessor with the create(boolean) or create(int, Runnable, boolean) factory methods.
Even though UnicastProcessor implements the Subscriber interface, calling onSubscribe is not required (Rule 2.12) if the processor is used as a standalone source. However, calling onSubscribe after the UnicastProcessor reached its terminal state will result in the given Subscription being canceled immediately.
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).
This UnicastProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers().
- Backpressure: UnicastProcessor honors the downstream backpressure but consumes an upstream source (if any) in an unbounded manner (requesting Long.MAX_VALUE).
- Scheduler: UnicastProcessor does not operate by default on a particular Scheduler and the single Subscriber gets notified on the thread the respective onXXX methods were invoked.
- Error handling: When onError(Throwable) is called, the UnicastProcessor enters into a terminal state and emits the same Throwable instance to the current single Subscriber. During this emission, if the single Subscriber cancels its Subscription, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable). If there was no Subscriber subscribed to this UnicastProcessor when onError() was called, the global error handler is not invoked.
Example usage:
    UnicastProcessor<Integer> processor = UnicastProcessor.create();

    TestSubscriber<Integer> ts1 = processor.test();

    // fresh UnicastProcessors are empty
    ts1.assertEmpty();

    TestSubscriber<Integer> ts2 = processor.test();

    // A UnicastProcessor only allows one Subscriber during its lifetime
    ts2.assertFailure(IllegalStateException.class);

    processor.onNext(1);
    ts1.assertValue(1);

    processor.onNext(2);
    ts1.assertValues(1, 2);

    processor.onComplete();
    ts1.assertResult(1, 2);

    // ----------------------------------------------------

    UnicastProcessor<Integer> processor2 = UnicastProcessor.create();

    // a UnicastProcessor caches events until its single Subscriber subscribes
    processor2.onNext(1);
    processor2.onNext(2);
    processor2.onComplete();

    TestSubscriber<Integer> ts3 = processor2.test();

    // the cached events are emitted in order
    ts3.assertResult(1, 2);
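The class description states that the processor honors downstream backpressure. The following is a minimal sketch of that behavior, assuming RxJava 3 is on the classpath; the class name BackpressureSketch and its run() helper are hypothetical and exist only for illustration. It subscribes with an initial request of zero via test(long) and records how many items have arrived after each request.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
class BackpressureSketch {

    /** Returns the number of items received before any request, after request(2), and after request(1). */
    static List<Integer> run() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();

        // test(0) subscribes a TestSubscriber that requests nothing up front
        TestSubscriber<Integer> ts = processor.test(0);

        // items are buffered because the Subscriber has not requested anything yet
        processor.onNext(1);
        processor.onNext(2);
        processor.onNext(3);

        List<Integer> receivedCounts = new ArrayList<>();
        receivedCounts.add(ts.values().size());

        // each request releases at most that many buffered items
        ts.request(2);
        receivedCounts.add(ts.values().size());

        ts.request(1);
        receivedCounts.add(ts.values().size());

        return receivedCounts;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Items pushed through onNext stay in the internal buffer until the single Subscriber requests them, so a slow consumer paired with a fast producer can grow that buffer without bound.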
- Since:
- 2.0
-
-
Nested Class Summary
(package private) class UnicastProcessor.UnicastQueueSubscription
-
Field Summary
(package private) boolean cancelled
(package private) boolean delayError
(package private) boolean done
(package private) java.util.concurrent.atomic.AtomicReference<org.reactivestreams.Subscriber<? super T>> downstream
(package private) boolean enableOperatorFusion
(package private) java.lang.Throwable error
(package private) java.util.concurrent.atomic.AtomicBoolean once
(package private) java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> onTerminate
(package private) SpscLinkedArrayQueue<T> queue
(package private) java.util.concurrent.atomic.AtomicLong requested
(package private) BasicIntQueueSubscription<T> wip
-
Constructor Summary
UnicastProcessor(int capacityHint, java.lang.Runnable onTerminate, boolean delayError)
- Creates a UnicastProcessor with the given capacity hint and callback for when the Processor is terminated normally or its single Subscriber cancels.
-
Method Summary
(package private) boolean checkTerminated(boolean failFast, boolean d, boolean empty, org.reactivestreams.Subscriber<? super @NonNull T> a, SpscLinkedArrayQueue<@NonNull T> q)
static <T> @NonNull UnicastProcessor<T> create()
- Creates a UnicastProcessor with an internal buffer capacity hint 16.
static <T> @NonNull UnicastProcessor<T> create(boolean delayError)
- Creates a UnicastProcessor with default internal buffer capacity hint and delay error flag.
static <T> @NonNull UnicastProcessor<T> create(int capacityHint)
- Creates a UnicastProcessor with the given internal buffer capacity hint.
static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull java.lang.Runnable onTerminate)
- Creates a UnicastProcessor with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull java.lang.Runnable onTerminate, boolean delayError)
- Creates a UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
(package private) void doTerminate()
(package private) void drain()
(package private) void drainFused(org.reactivestreams.Subscriber<? super @NonNull T> a)
(package private) void drainRegular(org.reactivestreams.Subscriber<? super @NonNull T> a)
@Nullable java.lang.Throwable getThrowable()
- Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
boolean hasComplete()
- Returns true if the FlowableProcessor has reached a terminal state through a complete event.
boolean hasSubscribers()
- Returns true if the FlowableProcessor has subscribers.
boolean hasThrowable()
- Returns true if the FlowableProcessor has reached a terminal state through an error event.
void onComplete()
void onError(java.lang.Throwable t)
void onNext(@NonNull T t)
void onSubscribe(org.reactivestreams.Subscription s)
- Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long).
protected void subscribeActual(org.reactivestreams.Subscriber<? super @NonNull T> s)
- Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
-
Methods inherited from class io.reactivex.rxjava3.processors.FlowableProcessor
toSerialized
-
Methods inherited from class io.reactivex.rxjava3.core.Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, 
contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, 
onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, 
throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
queue
final SpscLinkedArrayQueue<T> queue
-
onTerminate
final java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> onTerminate
-
delayError
final boolean delayError
-
done
volatile boolean done
-
error
java.lang.Throwable error
-
downstream
final java.util.concurrent.atomic.AtomicReference<org.reactivestreams.Subscriber<? super T>> downstream
-
cancelled
volatile boolean cancelled
-
once
final java.util.concurrent.atomic.AtomicBoolean once
-
wip
final BasicIntQueueSubscription<T> wip
-
requested
final java.util.concurrent.atomic.AtomicLong requested
-
enableOperatorFusion
boolean enableOperatorFusion
-
-
Constructor Detail
-
UnicastProcessor
UnicastProcessor(int capacityHint, java.lang.Runnable onTerminate, boolean delayError)
Creates a UnicastProcessor with the given capacity hint and callback for when the Processor is terminated normally or its single Subscriber cancels.
History: 2.0.8 - experimental
- Parameters:
capacityHint - the capacity hint for the internal, unbounded queue
onTerminate - the callback to run when the Processor is terminated or cancelled, null not allowed
delayError - deliver pending onNext events before onError
- Since:
- 2.2
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create()
Creates a UnicastProcessor with an internal buffer capacity hint 16.
- Type Parameters:
T - the value type
- Returns:
- a UnicastProcessor instance
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint)
Creates a UnicastProcessor with the given internal buffer capacity hint.
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
- Returns:
- a UnicastProcessor instance
- Throws:
java.lang.IllegalArgumentException - if capacityHint is non-positive
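The documented IllegalArgumentException for a non-positive capacityHint can be checked with a small sketch like the one below, assuming RxJava 3 is on the classpath. The class CapacityHintSketch and its helper method are hypothetical names used only for illustration.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;

// Hypothetical illustration class, not part of RxJava.
class CapacityHintSketch {

    /** Returns true if create(int) rejects the given hint with an IllegalArgumentException. */
    static boolean rejects(int capacityHint) {
        try {
            UnicastProcessor.<Integer>create(capacityHint);
            return false;
        } catch (IllegalArgumentException expected) {
            // documented behavior for a non-positive capacity hint
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("0 rejected: " + rejects(0));
        System.out.println("-1 rejected: " + rejects(-1));
        System.out.println("8 rejected: " + rejects(8));
    }
}
```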
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(boolean delayError)
Creates a UnicastProcessor with default internal buffer capacity hint and delay error flag.
History: 2.0.8 - experimental
- Type Parameters:
T - the value type
- Parameters:
delayError - deliver pending onNext events before onError
- Returns:
- a UnicastProcessor instance
- Since:
- 2.2
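To illustrate the delayError flag, here is a minimal sketch, assuming RxJava 3 is on the classpath; DelayErrorSketch and valuesBeforeError are hypothetical names. It buffers two items, signals an error, and only then subscribes, returning whatever items the late Subscriber received before the error.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.io.IOException;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
class DelayErrorSketch {

    /** Returns the items a late Subscriber received before the error was delivered. */
    static List<Integer> valuesBeforeError(boolean delayError) {
        UnicastProcessor<Integer> processor = UnicastProcessor.create(delayError);

        // buffer two items, then terminate with an error before anyone subscribes
        processor.onNext(1);
        processor.onNext(2);
        processor.onError(new IOException("boom"));

        TestSubscriber<Integer> ts = processor.test();

        // the error reaches the Subscriber in both modes; only the items differ
        ts.assertError(IOException.class);
        return ts.values();
    }

    public static void main(String[] args) {
        System.out.println("delayError = true:  " + valuesBeforeError(true));
        System.out.println("delayError = false: " + valuesBeforeError(false));
    }
}
```

With delayError set to true, the buffered items are delivered ahead of the error. With false, the error may overtake them, and in this late-subscriber scenario the buffered items are expected to be dropped.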
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull java.lang.Runnable onTerminate)
Creates a UnicastProcessor with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
onTerminate - the non null callback
- Returns:
- a UnicastProcessor instance
- Throws:
java.lang.NullPointerException - if onTerminate is null
java.lang.IllegalArgumentException - if capacityHint is non-positive
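The "called exactly once" guarantee for the onTerminate callback can be sketched as follows, assuming RxJava 3 is on the classpath. OnTerminateSketch and its two helper methods are hypothetical names; each counts how often the callback runs when both a terminal event and a cancellation occur.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of RxJava.
class OnTerminateSketch {

    /** Terminates first, then cancels; returns how many times the callback ran. */
    static int terminateThenCancel() {
        AtomicInteger calls = new AtomicInteger();
        UnicastProcessor<String> processor =
                UnicastProcessor.create(16, () -> calls.incrementAndGet());

        processor.onNext("a");
        processor.onComplete();          // the terminal event triggers the callback

        TestSubscriber<String> ts = processor.test();
        ts.cancel();                     // a later cancel must not trigger it again
        return calls.get();
    }

    /** Cancels first, then terminates; returns how many times the callback ran. */
    static int cancelThenTerminate() {
        AtomicInteger calls = new AtomicInteger();
        UnicastProcessor<String> processor =
                UnicastProcessor.create(16, () -> calls.incrementAndGet());

        processor.test().cancel();       // cancellation triggers the callback
        processor.onComplete();          // a later terminal event must not trigger it again
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(terminateThenCancel());
        System.out.println(cancelThenTerminate());
    }
}
```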
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull java.lang.Runnable onTerminate, boolean delayError)
Creates a UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
History: 2.0.8 - experimental
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
onTerminate - the non null callback
delayError - deliver pending onNext events before onError
- Returns:
- a UnicastProcessor instance
- Throws:
java.lang.NullPointerException - if onTerminate is null
java.lang.IllegalArgumentException - if capacityHint is non-positive
- Since:
- 2.2
-
doTerminate
void doTerminate()
-
drain
void drain()
-
checkTerminated
boolean checkTerminated(boolean failFast, boolean d, boolean empty, org.reactivestreams.Subscriber<? super @NonNull T> a, SpscLinkedArrayQueue<@NonNull T> q)
-
onSubscribe
public void onSubscribe(org.reactivestreams.Subscription s)
Description copied from interface: FlowableSubscriber
Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call and additional behavior is thread safe in respect to onNext.
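When the processor is subscribed to an upstream source, onSubscribe is invoked by that source rather than by user code. The sketch below assumes RxJava 3 is on the classpath and uses the hypothetical class name UpstreamSketch. It subscribes a UnicastProcessor to a finite Flowable and then attaches the single Subscriber afterwards.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
class UpstreamSketch {

    /** Drains a finite upstream into the processor, then replays it to a late Subscriber. */
    static List<Integer> run() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();

        // the processor acts as a Subscriber here; the source calls onSubscribe on it
        Flowable.range(1, 5).subscribe(processor);

        // the single Subscriber arrives after the upstream has already completed
        return processor.test()
                .assertComplete()
                .values();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because the upstream is consumed in an unbounded manner, all five items are buffered before the Subscriber arrives and are then replayed in order.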
-
onError
public void onError(java.lang.Throwable t)
-
onComplete
public void onComplete()
-
subscribeActual
protected void subscribeActual(org.reactivestreams.Subscriber<? super @NonNull T> s)
Description copied from class: Flowable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
There is no need to call any of the plugin hooks on the current Flowable instance or the Subscriber; all hooks and basic safeguards have been applied by Flowable.subscribe(Subscriber) before this method gets called.
- Specified by:
subscribeActual in class Flowable<T>
- Parameters:
s - the incoming Subscriber, never null
-
hasSubscribers
@CheckReturnValue public boolean hasSubscribers()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has subscribers.
The method is thread-safe.
- Specified by:
hasSubscribers in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has subscribers
-
getThrowable
@Nullable @CheckReturnValue public @Nullable java.lang.Throwable getThrowable()
Description copied from class: FlowableProcessor
Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class FlowableProcessor<T>
- Returns:
- the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has reached a terminal state through a complete event
- See Also:
FlowableProcessor.hasThrowable()
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has reached a terminal state through an error event
- See Also:
FlowableProcessor.getThrowable(), FlowableProcessor.hasComplete()
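The four state-peeking methods described above can be exercised together. This is a minimal sketch, assuming RxJava 3 is on the classpath; StatePeekSketch and run() are hypothetical names. It records the observed state before subscription, after subscription, and after an error.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
class StatePeekSketch {

    /** Collects the results of the state-peeking methods at three points in the lifecycle. */
    static List<Boolean> run() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        List<Boolean> observed = new ArrayList<>();

        // fresh processor: no Subscriber and no terminal event
        observed.add(processor.hasSubscribers());
        observed.add(processor.getThrowable() == null);

        // after the single Subscriber attaches
        processor.test();
        observed.add(processor.hasSubscribers());

        // after terminating with an error
        IllegalStateException failure = new IllegalStateException("boom");
        processor.onError(failure);
        observed.add(processor.hasThrowable());
        observed.add(processor.hasComplete());
        observed.add(processor.getThrowable() == failure);

        return observed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```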
-
-