Class UnicastSubject<T>
- java.lang.Object
-
- io.reactivex.rxjava3.core.Observable<T>
-
- io.reactivex.rxjava3.subjects.Subject<T>
-
- io.reactivex.rxjava3.subjects.UnicastSubject<T>
-
- Type Parameters:
T
- the value type received and emitted by this Subject subclass
- All Implemented Interfaces:
ObservableSource<T>, Observer<T>
public final class UnicastSubject<T> extends Subject<T>
A Subject that queues up events until a single Observer subscribes to it, replays those events to it until the Observer catches up and then switches to relaying events live to this single Observer until this UnicastSubject terminates or the Observer disposes.
Note that UnicastSubject holds an unbounded internal buffer.
This subject does not have a public constructor by design; a new empty instance of this UnicastSubject can be created via the following create methods that allow specifying the retention policy for items:
- create() - creates an empty, unbounded UnicastSubject that caches all items and the terminal event it receives.
- create(int) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain.
- create(boolean) - creates an empty, unbounded UnicastSubject that optionally delays an error it receives and replays it after the regular items have been emitted.
- create(int, Runnable) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastSubject gets terminated or the single Observer disposes.
- create(int, Runnable, boolean) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastSubject gets terminated or the single Observer disposes and optionally delays an error it receives and replays it after the regular items have been emitted.
If more than one Observer attempts to subscribe to this UnicastSubject, each Observer after the first receives an IllegalStateException indicating the single-use-only nature of this UnicastSubject, even if the UnicastSubject already terminated with an error.
Since a Subject is conceptually derived from the Processor type in the Reactive Streams specification, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the subject's state is not changed.
Since a UnicastSubject is an Observable, it does not support backpressure.
When this UnicastSubject is terminated via onError(Throwable), the current or late single Observer may receive the Throwable before any available items could be emitted. To make sure an onError event is delivered to the Observer after the normal items, create a UnicastSubject with the create(boolean) or create(int, Runnable, boolean) factory methods.
Even though UnicastSubject implements the Observer interface, calling onSubscribe is not required (Rule 2.12) if the subject is used as a standalone source. However, calling onSubscribe after the UnicastSubject reached its terminal state will result in the given Disposable being disposed immediately.
Calls to onNext(Object), onError(Throwable) and onComplete() must be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
This UnicastSubject supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasObservers().
- Scheduler:
- UnicastSubject does not operate by default on a particular Scheduler and the single Observer gets notified on the thread the respective onXXX methods were invoked.
- Error handling:
- When onError(Throwable) is called, the UnicastSubject enters into a terminal state and emits the same Throwable instance to the current single Observer. During this emission, if the single Observer disposes its respective Disposable, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable). If there were no Observers subscribed to this UnicastSubject when onError() was called, the global error handler is not invoked.
Example usage:
    UnicastSubject<Integer> subject = UnicastSubject.create();

    TestObserver<Integer> to1 = subject.test();

    // fresh UnicastSubjects are empty
    to1.assertEmpty();

    TestObserver<Integer> to2 = subject.test();

    // A UnicastSubject only allows one Observer during its lifetime
    to2.assertFailure(IllegalStateException.class);

    subject.onNext(1);
    to1.assertValue(1);

    subject.onNext(2);
    to1.assertValues(1, 2);

    subject.onComplete();
    to1.assertResult(1, 2);

    // ----------------------------------------------------

    UnicastSubject<Integer> subject2 = UnicastSubject.create();

    // a UnicastSubject caches events until its single Observer subscribes
    subject2.onNext(1);
    subject2.onNext(2);
    subject2.onComplete();

    TestObserver<Integer> to3 = subject2.test();

    // the cached events are emitted in order
    to3.assertResult(1, 2);
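The serialization requirement described in the class documentation can be illustrated with a minimal sketch. It assumes RxJava 3 is on the classpath; the class name SerializedUnicastSketch and the method emitConcurrently are hypothetical names chosen for this illustration. Several threads call onNext on the wrapper returned by Subject.toSerialized(), and the single Observer counts what it receives.

```java
import io.reactivex.rxjava3.subjects.Subject;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of RxJava.
public class SerializedUnicastSketch {

    /** Emits from several threads through toSerialized() and returns how many items arrived. */
    static int emitConcurrently(int threads, int itemsPerThread) {
        // toSerialized() wraps the subject so that overlapping onNext calls are serialized
        Subject<Integer> subject = UnicastSubject.<Integer>create().toSerialized();

        AtomicInteger received = new AtomicInteger();
        subject.subscribe(v -> received.incrementAndGet());

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < itemsPerThread; j++) {
                    subject.onNext(j);
                }
            });
            workers[i].start();
        }
        try {
            // wait for every producer before terminating the subject
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        subject.onComplete();
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println(emitConcurrently(4, 1000));
    }
}
```

Calling onNext directly on the unwrapped UnicastSubject from these threads would violate the serialization requirement; the wrapper is what makes the concurrent calls safe in this sketch.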
- Since:
- 2.0
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description):
- (package private) class UnicastSubject.UnicastQueueDisposable
-
Field Summary
Fields (Modifier and Type, Field, Description):
- (package private) boolean delayError - deliver onNext events before error event.
- (package private) boolean disposed - Indicates the single observer has cancelled.
- (package private) boolean done - Indicates the source has terminated.
- (package private) java.util.concurrent.atomic.AtomicReference<Observer<? super T>> downstream - The single Observer.
- (package private) boolean enableOperatorFusion
- (package private) java.lang.Throwable error - The terminal error if not null.
- (package private) java.util.concurrent.atomic.AtomicBoolean once - Set to true atomically for the first and only Subscriber.
- (package private) java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> onTerminate - The optional callback when the Subject gets cancelled or terminates.
- (package private) SpscLinkedArrayQueue<T> queue - The queue that buffers the source events.
- (package private) BasicIntQueueDisposable<T> wip - The wip counter and QueueDisposable surface.
-
Constructor Summary
Constructors (Constructor, Description):
- UnicastSubject(int capacityHint, java.lang.Runnable onTerminate, boolean delayError) - Creates a UnicastSubject with the given capacity hint, delay error flag and callback for when the Subject is terminated normally or its single Subscriber cancels.
-
Method Summary
Methods (Modifier and Type, Method, Description):
- static <T> @NonNull UnicastSubject<T> create() - Creates a UnicastSubject with an internal buffer capacity hint 16.
- static <T> @NonNull UnicastSubject<T> create(boolean delayError) - Creates a UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.
- static <T> @NonNull UnicastSubject<T> create(int capacityHint) - Creates a UnicastSubject with the given internal buffer capacity hint.
- static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull java.lang.Runnable onTerminate) - Creates a UnicastSubject with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the subject is terminated.
- static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull java.lang.Runnable onTerminate, boolean delayError) - Creates a UnicastSubject with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Observer disposes its Disposable or the subject is terminated.
- (package private) void doTerminate()
- (package private) void drain()
- (package private) void drainFused(Observer<? super T> a)
- (package private) void drainNormal(Observer<? super T> a)
- (package private) void errorOrComplete(Observer<? super T> a)
- (package private) boolean failedFast(SimpleQueue<T> q, Observer<? super T> a)
- @Nullable java.lang.Throwable getThrowable() - Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.
- boolean hasComplete() - Returns true if the subject has reached a terminal state through a complete event.
- boolean hasObservers() - Returns true if the subject has any Observers.
- boolean hasThrowable() - Returns true if the subject has reached a terminal state through an error event.
- void onComplete() - Notifies the Observer that the Observable has finished sending push-based notifications.
- void onError(java.lang.Throwable t) - Notifies the Observer that the Observable has experienced an error condition.
- void onNext(T t) - Provides the Observer with a new item to observe.
- void onSubscribe(Disposable d) - Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
- protected void subscribeActual(Observer<? super T> observer) - Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
-
Methods inherited from class io.reactivex.rxjava3.subjects.Subject
toSerialized
-
Methods inherited from class io.reactivex.rxjava3.core.Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, 
delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, 
replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, 
withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
queue
final SpscLinkedArrayQueue<T> queue
The queue that buffers the source events.
-
downstream
final java.util.concurrent.atomic.AtomicReference<Observer<? super T>> downstream
The single Observer.
-
onTerminate
final java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> onTerminate
The optional callback when the Subject gets cancelled or terminates.
-
delayError
final boolean delayError
deliver onNext events before error event.
-
disposed
volatile boolean disposed
Indicates the single observer has cancelled.
-
done
volatile boolean done
Indicates the source has terminated.
-
error
java.lang.Throwable error
The terminal error if not null. Must be set before writing to done and read after done == true.
-
once
final java.util.concurrent.atomic.AtomicBoolean once
Set to true atomically for the first and only Subscriber.
-
wip
final BasicIntQueueDisposable<T> wip
The wip counter and QueueDisposable surface.
-
enableOperatorFusion
boolean enableOperatorFusion
-
-
Constructor Detail
-
UnicastSubject
UnicastSubject(int capacityHint, java.lang.Runnable onTerminate, boolean delayError)
Creates a UnicastSubject with the given capacity hint, delay error flag and callback for when the Subject is terminated normally or its single Subscriber cancels.
History: 2.0.8 - experimental
- Parameters:
capacityHint
- the capacity hint for the internal, unbounded queue
onTerminate
- the callback to run when the Subject is terminated or cancelled, null not allowed
delayError
- deliver pending onNext events before onError
- Since:
- 2.2
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create()
Creates a UnicastSubject with an internal buffer capacity hint 16.
- Type Parameters:
T
- the value type
- Returns:
- a UnicastSubject instance
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint)
Creates a UnicastSubject with the given internal buffer capacity hint.
- Type Parameters:
T
- the value type
- Parameters:
capacityHint
- the hint to size the internal unbounded buffer
- Returns:
- a UnicastSubject instance
- Throws:
java.lang.IllegalArgumentException
- if capacityHint is non-positive
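As a minimal sketch of what the capacity hint means in practice (assuming RxJava 3 on the classpath; CapacityHintSketch and its method names are hypothetical), the following buffers more items than the hint before the single Observer subscribes, and also checks the documented rejection of a non-positive hint.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
public class CapacityHintSketch {

    /** Buffers itemCount items before subscribing and returns what the late Observer receives. */
    static List<Integer> bufferBeyondHint(int capacityHint, int itemCount) {
        UnicastSubject<Integer> subject = UnicastSubject.create(capacityHint);
        for (int i = 0; i < itemCount; i++) {
            subject.onNext(i); // queued, because no Observer has subscribed yet
        }
        subject.onComplete();

        List<Integer> received = new ArrayList<>();
        subject.subscribe(received::add);
        return received;
    }

    /** Returns true if create(0) is rejected with an IllegalArgumentException. */
    static boolean rejectsNonPositiveHint() {
        try {
            UnicastSubject.<Integer>create(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(bufferBeyondHint(2, 100).size());
        System.out.println(rejectsNonPositiveHint());
    }
}
```

The hint only sizes the internal queue initially; the buffer itself remains unbounded, so all queued items reach the late Observer.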
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull @NonNull java.lang.Runnable onTerminate)
Creates a UnicastSubject with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the subject is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
- Type Parameters:
T
- the value type
- Parameters:
capacityHint
- the hint to size the internal unbounded buffer
onTerminate
- the callback to run when the Subject is terminated or cancelled, null not allowed
- Returns:
- a UnicastSubject instance
- Throws:
java.lang.NullPointerException
- if onTerminate is null
java.lang.IllegalArgumentException
- if capacityHint is non-positive
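The "called exactly once" guarantee for the callback can be sketched as follows, assuming RxJava 3 on the classpath. OnTerminateSketch and its method names are hypothetical; the callback simply increments a counter so that both orderings (dispose first, or terminate first) can be compared.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of RxJava.
public class OnTerminateSketch {

    /** The Observer disposes first, then the subject completes; returns the callback run count. */
    static int disposeThenComplete() {
        AtomicInteger calls = new AtomicInteger();
        UnicastSubject<Integer> subject = UnicastSubject.create(16, calls::incrementAndGet);
        Disposable d = subject.subscribe(v -> { });
        d.dispose();          // first trigger: the single Observer disposes
        subject.onComplete(); // second trigger: should not run the callback again
        return calls.get();
    }

    /** The subject completes first, then the Observer disposes; returns the callback run count. */
    static int completeThenDispose() {
        AtomicInteger calls = new AtomicInteger();
        UnicastSubject<Integer> subject = UnicastSubject.create(16, calls::incrementAndGet);
        Disposable d = subject.subscribe(v -> { });
        subject.onComplete();
        d.dispose();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(disposeThenComplete());
        System.out.println(completeThenDispose());
    }
}
```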
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull @NonNull java.lang.Runnable onTerminate, boolean delayError)
Creates a UnicastSubject with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Observer disposes its Disposable or the subject is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
History: 2.0.8 - experimental
- Type Parameters:
T
- the value type
- Parameters:
capacityHint
- the hint to size the internal unbounded buffer
onTerminate
- the callback to run when the Subject is terminated or cancelled, null not allowed
delayError
- deliver pending onNext events before onError
- Returns:
- a UnicastSubject instance
- Throws:
java.lang.NullPointerException
- if onTerminate is null
java.lang.IllegalArgumentException
- if capacityHint is non-positive
- Since:
- 2.2
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(boolean delayError)
Creates a UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.
History: 2.0.8 - experimental
- Type Parameters:
T
- the value type
- Parameters:
delayError
- deliver pending onNext events before onError
- Returns:
- a UnicastSubject instance
- Since:
- 2.2
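The effect of the delayError flag can be sketched by buffering two items and an error before the single Observer subscribes. This is an illustration under the assumption that RxJava 3 is on the classpath; DelayErrorSketch and replay are hypothetical names, and the flag is always passed explicitly so the sketch does not depend on any default.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
public class DelayErrorSketch {

    /** Buffers two items and an error, then subscribes late and records the observed events. */
    static List<String> replay(boolean delayError) {
        UnicastSubject<Integer> subject = UnicastSubject.create(delayError);
        subject.onNext(1);
        subject.onNext(2);
        subject.onError(new IllegalStateException("boom"));

        List<String> events = new ArrayList<>();
        subject.subscribe(
                v -> events.add("next:" + v),
                e -> events.add("error:" + e.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("delayError=false -> " + replay(false));
        System.out.println("delayError=true  -> " + replay(true));
    }
}
```

With delayError set to true the buffered items are replayed before the error; with false, the late Observer in this single-threaded sketch sees only the error and the buffered items are dropped.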
-
subscribeActual
protected void subscribeActual(Observer<? super T> observer)
Description copied from class: Observable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
There is no need to call any of the plugin hooks on the current Observable instance or the Observer; all hooks and basic safeguards have been applied by Observable.subscribe(Observer) before this method gets called.
- Specified by:
subscribeActual in class Observable<T>
- Parameters:
observer
- the incoming Observer, never null
-
doTerminate
void doTerminate()
-
onSubscribe
public void onSubscribe(Disposable d)
Description copied from interface: Observer
Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
- Parameters:
d
- the Disposable instance whose Disposable.dispose() can be called anytime to cancel the connection
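The class documentation notes that a Disposable handed to onSubscribe after the subject has terminated is disposed immediately. A minimal sketch of that behavior, assuming RxJava 3 on the classpath (OnSubscribeSketch and its method names are hypothetical):

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.UnicastSubject;

// Hypothetical illustration class, not part of RxJava.
public class OnSubscribeSketch {

    /** Calls onSubscribe on an already completed subject and reports the Disposable's state. */
    static boolean disposedWhenTerminated() {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        subject.onComplete();

        Disposable upstream = Disposable.empty();
        subject.onSubscribe(upstream);
        return upstream.isDisposed();
    }

    /** Calls onSubscribe on a still active subject and reports the Disposable's state. */
    static boolean disposedWhenActive() {
        UnicastSubject<Integer> subject = UnicastSubject.create();

        Disposable upstream = Disposable.empty();
        subject.onSubscribe(upstream);
        return upstream.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(disposedWhenTerminated());
        System.out.println(disposedWhenActive());
    }
}
```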
-
onNext
public void onNext(T t)
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onComplete() or Observer.onError(java.lang.Throwable).
- Parameters:
t
- the item emitted by the Observable
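The class documentation states that passing null to onNext throws a NullPointerException and leaves the subject's state unchanged. The following sketch illustrates this, assuming RxJava 3 on the classpath; NullItemSketch and nullThenValue are hypothetical names.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration class, not part of RxJava.
public class NullItemSketch {

    /** Sends a null, then a regular item, then completes; returns the recorded events. */
    static List<String> nullThenValue() {
        UnicastSubject<String> subject = UnicastSubject.create();
        List<String> events = new ArrayList<>();
        subject.subscribe(
                events::add,
                e -> events.add("error"),
                () -> events.add("complete"));

        try {
            subject.onNext(null); // rejected by the subject
        } catch (NullPointerException expected) {
            events.add("NPE");
        }

        // the subject is still usable after the rejected call
        subject.onNext("a");
        subject.onComplete();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(nullThenValue());
    }
}
```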
-
onError
public void onError(java.lang.Throwable t)
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onComplete().
- Parameters:
t
- the exception encountered by the Observable
-
onComplete
public void onComplete()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
-
failedFast
boolean failedFast(SimpleQueue<T> q, Observer<? super T> a)
-
drain
void drain()
-
hasObservers
@CheckReturnValue public boolean hasObservers()
Description copied from class: Subject
Returns true if the subject has any Observers.
The method is thread-safe.
- Specified by:
hasObservers in class Subject<T>
- Returns:
- true if the subject has any Observers
-
getThrowable
@Nullable @CheckReturnValue public @Nullable java.lang.Throwable getThrowable()
Description copied from class: Subject
Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class Subject<T>
- Returns:
- the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class: Subject
Returns true if the subject has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class Subject<T>
- Returns:
- true if the subject has reached a terminal state through an error event
- See Also:
Subject.getThrowable(), Subject.hasComplete()
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class: Subject
Returns true if the subject has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class Subject<T>
- Returns:
- true if the subject has reached a terminal state through a complete event
- See Also:
Subject.hasThrowable()
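The state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasObservers() can be compared side by side in a short sketch. It assumes RxJava 3 on the classpath; StatePeekSketch and its helper methods are hypothetical names used only for illustration.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;

// Hypothetical illustration class, not part of RxJava.
public class StatePeekSketch {

    /** Summarizes the terminal-state accessors of the given subject in one line. */
    static String describe(UnicastSubject<?> s) {
        Throwable t = s.getThrowable();
        return "complete=" + s.hasComplete()
                + ", throwable=" + s.hasThrowable()
                + ", error=" + (t == null ? "null" : t.getMessage());
    }

    static String fresh() {
        return describe(UnicastSubject.<Integer>create());
    }

    static String afterComplete() {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        subject.onComplete();
        return describe(subject);
    }

    static String afterError() {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        subject.onError(new IllegalStateException("boom"));
        return describe(subject);
    }

    /** hasObservers() turns true once the single Observer has subscribed to an active subject. */
    static boolean hasObserversOnceSubscribed() {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        boolean before = subject.hasObservers();
        subject.subscribe(v -> { });
        return !before && subject.hasObservers();
    }

    public static void main(String[] args) {
        System.out.println(fresh());
        System.out.println(afterComplete());
        System.out.println(afterError());
        System.out.println(hasObserversOnceSubscribed());
    }
}
```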
-
-