Class PublishSubject<T>
- java.lang.Object
-
- io.reactivex.rxjava3.core.Observable<T>
-
- io.reactivex.rxjava3.subjects.Subject<T>
-
- io.reactivex.rxjava3.subjects.PublishSubject<T>
-
- Type Parameters:
T
- the type of items observed and emitted by the Subject
- All Implemented Interfaces:
ObservableSource<T>
,Observer<T>
public final class PublishSubject<T> extends Subject<T>
A Subject that emits (multicasts) items to currently subscribedObserver
s and terminal events to current or lateObserver
s.This subject does not have a public constructor by design; a new empty instance of this
PublishSubject
can be created via thecreate()
method.Since a
Subject
is conceptionally derived from theProcessor
type in the Reactive Streams specification,null
s are not allowed (Rule 2.13) as parameters toonNext(Object)
andonError(Throwable)
. Such calls will result in aNullPointerException
being thrown and the subject's state is not changed.Since a
PublishSubject
is anObservable
, it does not support backpressure.When this
PublishSubject
is terminated viaonError(Throwable)
oronComplete()
, lateObserver
s only receive the respective terminal event.Unlike a
BehaviorSubject
, aPublishSubject
doesn't retain/cache items, therefore, a newObserver
won't receive any past items.Even though
PublishSubject
implements theObserver
interface, callingonSubscribe
is not required (Rule 2.12) if the subject is used as a standalone source. However, callingonSubscribe
after thePublishSubject
reached its terminal state will result in the givenDisposable
being disposed immediately.Calling
onNext(Object)
,onError(Throwable)
andonComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). TheSubject.toSerialized()
method available to allSubject
s provides such serialization and also protects against reentrance (i.e., when a downstreamObserver
consuming this subject also wants to callonNext(Object)
on this subject recursively).This
PublishSubject
supports the standard state-peeking methodshasComplete()
,hasThrowable()
,getThrowable()
andhasObservers()
.- Scheduler:
PublishSubject
does not operate by default on a particularScheduler
and theObserver
s get notified on the thread the respectiveonXXX
methods were invoked.- Error handling:
- When the
onError(Throwable)
is called, thePublishSubject
enters into a terminal state and emits the sameThrowable
instance to the last set ofObserver
s. During this emission, if one or moreObserver
s dispose their respectiveDisposable
s, theThrowable
is delivered to the global error handler viaRxJavaPlugins.onError(Throwable)
(multiple times if multipleObserver
s cancel at once). If there were noObserver
s subscribed to thisPublishSubject
when theonError()
was called, the global error handler is not invoked.
Example usage:
PublishSubject<Object> subject = PublishSubject.create(); // observer1 will receive all onNext and onComplete events subject.subscribe(observer1); subject.onNext("one"); subject.onNext("two"); // observer2 will only receive "three" and onComplete subject.subscribe(observer2); subject.onNext("three"); subject.onComplete(); // late Observers only receive the terminal event subject.test().assertEmpty();
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) static class
PublishSubject.PublishDisposable<T>
Wraps the actual subscriber, tracks its requests and makes cancellation to remove itself from the current subscribers array.
-
Field Summary
Fields Modifier and Type Field Description (package private) static PublishSubject.PublishDisposable[]
EMPTY
An empty subscribers array to avoid allocating it all the time.(package private) java.lang.Throwable
error
The error, write before terminating and read after checking subscribers.(package private) java.util.concurrent.atomic.AtomicReference<PublishSubject.PublishDisposable<T>[]>
subscribers
The array of currently subscribed subscribers.(package private) static PublishSubject.PublishDisposable[]
TERMINATED
The terminated indicator for the subscribers array.
-
Constructor Summary
Constructors Constructor Description PublishSubject()
Constructs a PublishSubject.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description (package private) boolean
add(PublishSubject.PublishDisposable<T> ps)
Tries to add the given subscriber to the subscribers array atomically or returns false if the subject has terminated.static <T> @NonNull PublishSubject<T>
create()
Constructs a PublishSubject.@Nullable java.lang.Throwable
getThrowable()
Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.boolean
hasComplete()
Returns true if the subject has reached a terminal state through a complete event.boolean
hasObservers()
Returns true if the subject has any Observers.boolean
hasThrowable()
Returns true if the subject has reached a terminal state through an error event.void
onComplete()
Notifies theObserver
that theObservable
has finished sending push-based notifications.void
onError(java.lang.Throwable t)
Notifies theObserver
that theObservable
has experienced an error condition.void
onNext(T t)
Provides theObserver
with a new item to observe.void
onSubscribe(Disposable d)
Provides theObserver
with the means of cancelling (disposing) the connection (channel) with theObservable
in both synchronous (from withinObserver.onNext(Object)
) and asynchronous manner.(package private) void
remove(PublishSubject.PublishDisposable<T> ps)
Atomically removes the given subscriber if it is subscribed to the subject.protected void
subscribeActual(Observer<? super T> t)
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObserver
s.-
Methods inherited from class io.reactivex.rxjava3.subjects.Subject
toSerialized
-
Methods inherited from class io.reactivex.rxjava3.core.Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
TERMINATED
static final PublishSubject.PublishDisposable[] TERMINATED
The terminated indicator for the subscribers array.
-
EMPTY
static final PublishSubject.PublishDisposable[] EMPTY
An empty subscribers array to avoid allocating it all the time.
-
subscribers
final java.util.concurrent.atomic.AtomicReference<PublishSubject.PublishDisposable<T>[]> subscribers
The array of currently subscribed subscribers.
-
error
java.lang.Throwable error
The error, write before terminating and read after checking subscribers.
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull PublishSubject<T> create()
Constructs a PublishSubject.- Type Parameters:
T
- the value type- Returns:
- the new PublishSubject
-
subscribeActual
protected void subscribeActual(Observer<? super T> t)
Description copied from class:Observable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObserver
s.There is no need to call any of the plugin hooks on the current
Observable
instance or theObserver
; all hooks and basic safeguards have been applied byObservable.subscribe(Observer)
before this method gets called.- Specified by:
subscribeActual
in classObservable<T>
- Parameters:
t
- the incomingObserver
, nevernull
-
add
boolean add(PublishSubject.PublishDisposable<T> ps)
Tries to add the given subscriber to the subscribers array atomically or returns false if the subject has terminated.- Parameters:
ps
- the subscriber to add- Returns:
- true if successful, false if the subject has terminated
-
remove
void remove(PublishSubject.PublishDisposable<T> ps)
Atomically removes the given subscriber if it is subscribed to the subject.- Parameters:
ps
- the subject to remove
-
onSubscribe
public void onSubscribe(Disposable d)
Description copied from interface:Observer
Provides theObserver
with the means of cancelling (disposing) the connection (channel) with theObservable
in both synchronous (from withinObserver.onNext(Object)
) and asynchronous manner.- Parameters:
d
- theDisposable
instance whoseDisposable.dispose()
can be called anytime to cancel the connection
-
onNext
public void onNext(T t)
Description copied from interface:Observer
Provides theObserver
with a new item to observe.The
Observable
may call this method 0 or more times.The
Observable
will not call this method again after it calls eitherObserver.onComplete()
orObserver.onError(java.lang.Throwable)
.- Parameters:
t
- the item emitted by the Observable
-
onError
public void onError(java.lang.Throwable t)
Description copied from interface:Observer
Notifies theObserver
that theObservable
has experienced an error condition.If the
Observable
calls this method, it will not thereafter callObserver.onNext(T)
orObserver.onComplete()
.- Parameters:
t
- the exception encountered by the Observable
-
onComplete
public void onComplete()
Description copied from interface:Observer
Notifies theObserver
that theObservable
has finished sending push-based notifications.The
Observable
will not call this method if it callsObserver.onError(java.lang.Throwable)
.
-
hasObservers
@CheckReturnValue public boolean hasObservers()
Description copied from class:Subject
Returns true if the subject has any Observers.The method is thread-safe.
- Specified by:
hasObservers
in classSubject<T>
- Returns:
- true if the subject has any Observers
-
getThrowable
@Nullable @CheckReturnValue public @Nullable java.lang.Throwable getThrowable()
Description copied from class:Subject
Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.The method is thread-safe.
- Specified by:
getThrowable
in classSubject<T>
- Returns:
- the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class:Subject
Returns true if the subject has reached a terminal state through an error event.The method is thread-safe.
- Specified by:
hasThrowable
in classSubject<T>
- Returns:
- true if the subject has reached a terminal state through an error event
- See Also:
Subject.getThrowable()
,Subject.hasComplete()
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class:Subject
Returns true if the subject has reached a terminal state through a complete event.The method is thread-safe.
- Specified by:
hasComplete
in classSubject<T>
- Returns:
- true if the subject has reached a terminal state through a complete event
- See Also:
Subject.hasThrowable()
-
-