Class BehaviorSubject<T>
- Type Parameters:
T
- the type of item expected to be observed by the Subject
- All Implemented Interfaces:
ObservableSource<T>
,Observer<T>
Observer
.
This subject does not have a public constructor by design; a new empty instance of this
BehaviorSubject
can be created via the create()
method and
a new non-empty instance can be created via createDefault(Object)
(named as such to avoid
overload resolution conflict with Observable.create
that creates an Observable, not a BehaviorSubject
).
Since a Subject
is conceptionally derived from the Processor
type in the Reactive Streams specification,
null
s are not allowed (Rule 2.13) as
default initial values in createDefault(Object)
or as parameters to onNext(Object)
and
onError(Throwable)
. Such calls will result in a
NullPointerException
being thrown and the subject's state is not changed.
Since a BehaviorSubject
is an Observable
, it does not support backpressure.
When this BehaviorSubject
is terminated via onError(Throwable)
or onComplete()
, the
last observed item (if any) is cleared and late Observer
s only receive
the respective terminal event.
The BehaviorSubject
does not support clearing its cached value (to appear empty again), however, the
effect can be achieved by using a special item and making sure Observer
s subscribe through a
filter whose predicate filters out this special item:
BehaviorSubject<Integer> subject = BehaviorSubject.create();
final Integer EMPTY = Integer.MIN_VALUE;
Observable<Integer> observable = subject.filter(v -> v != EMPTY);
TestObserver<Integer> to1 = observable.test();
subject.onNext(1);
// this will "clear" the cache
subject.onNext(EMPTY);
TestObserver<Integer> to2 = observable.test();
subject.onNext(2);
subject.onComplete();
// to1 received both non-empty items
to1.assertResult(1, 2);
// to2 received only 2 even though the current item was EMPTY
// when it got subscribed
to2.assertResult(2);
// Observers coming after the subject was terminated receive
// no items and only the onComplete event in this case.
observable.test().assertResult();
Even though BehaviorSubject
implements the Observer
interface, calling
onSubscribe
is not required (Rule 2.12)
if the subject is used as a standalone source. However, calling onSubscribe
after the BehaviorSubject
reached its terminal state will result in the
given Disposable
being disposed immediately.
Calling onNext(Object)
, onError(Throwable)
and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The Subject.toSerialized()
method available to all Subject
s
provides such serialization and also protects against reentrance (i.e., when a downstream Observer
consuming this subject also wants to call onNext(Object)
on this subject recursively).
This BehaviorSubject
supports the standard state-peeking methods hasComplete()
, hasThrowable()
,
getThrowable()
and hasObservers()
as well as means to read the latest observed value
in a non-blocking and thread-safe manner via hasValue()
or getValue()
.
- Scheduler:
BehaviorSubject
does not operate by default on a particularScheduler
and theObserver
s get notified on the thread the respectiveonXXX
methods were invoked.- Error handling:
- When the
onError(Throwable)
is called, theBehaviorSubject
enters into a terminal state and emits the sameThrowable
instance to the last set ofObserver
s. During this emission, if one or moreObserver
s dispose their respectiveDisposable
s, theThrowable
is delivered to the global error handler viaRxJavaPlugins.onError(Throwable)
(multiple times if multipleObserver
s cancel at once). If there were noObserver
s subscribed to thisBehaviorSubject
when theonError()
was called, the global error handler is not invoked.
Example usage:
// observer will receive all 4 events (including "default").
BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
// observer will receive only onComplete
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onComplete();
subject.subscribe(observer);
// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) static final class
-
Field Summary
FieldsModifier and TypeFieldDescription(package private) static final BehaviorSubject.BehaviorDisposable[]
(package private) long
(package private) final ReadWriteLock
(package private) final AtomicReference
<BehaviorSubject.BehaviorDisposable<T>[]> (package private) final Lock
(package private) final AtomicReference
<Throwable> (package private) static final BehaviorSubject.BehaviorDisposable[]
(package private) final AtomicReference
<Object> (package private) final Lock
-
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescription(package private) boolean
static <T> @NonNull BehaviorSubject
<T> create()
Creates aBehaviorSubject
without a default item.static <@NonNull T>
@NonNull BehaviorSubject<T> createDefault
(@NonNull T defaultValue) Creates aBehaviorSubject
that emits the last item it observed and all subsequent items to eachObserver
that subscribes to it.Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.getValue()
Returns a single value the Subject currently has or null if no such value exists.boolean
Returns true if the subject has reached a terminal state through a complete event.boolean
Returns true if the subject has any Observers.boolean
Returns true if the subject has reached a terminal state through an error event.boolean
hasValue()
Returns true if the subject has any value.void
Notifies theObserver
that theObservable
has finished sending push-based notifications.void
Notifies theObserver
that theObservable
has experienced an error condition.void
Provides theObserver
with a new item to observe.void
Provides theObserver
with the means of cancelling (disposing) the connection (channel) with theObservable
in both synchronous (from withinObserver.onNext(Object)
) and asynchronous manner.(package private) void
(package private) void
setCurrent
(Object o) protected void
subscribeActual
(Observer<? super T> observer) Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObserver
s.(package private) int
(package private) BehaviorSubject.BehaviorDisposable<T>[]
Methods inherited from class io.reactivex.rxjava3.subjects.Subject
toSerialized
Methods inherited from class io.reactivex.rxjava3.core.Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
Field Details
-
value
-
observers
-
EMPTY
-
TERMINATED
-
lock
-
readLock
-
writeLock
-
terminalEvent
-
index
long index
-
-
Constructor Details
-
BehaviorSubject
BehaviorSubject(T defaultValue) Constructs an empty BehaviorSubject.- Parameters:
defaultValue
- the initial value, not null (verified)- Since:
- 2.0
-
-
Method Details
-
create
Creates aBehaviorSubject
without a default item.- Type Parameters:
T
- the type of item the Subject will emit- Returns:
- the constructed
BehaviorSubject
-
createDefault
@CheckReturnValue @NonNull public static <@NonNull T> @NonNull BehaviorSubject<T> createDefault(@NonNull T defaultValue) Creates aBehaviorSubject
that emits the last item it observed and all subsequent items to eachObserver
that subscribes to it.- Type Parameters:
T
- the type of item the Subject will emit- Parameters:
defaultValue
- the item that will be emitted first to anyObserver
as long as theBehaviorSubject
has not yet observed any items from its sourceObservable
- Returns:
- the constructed
BehaviorSubject
- Throws:
NullPointerException
- ifdefaultValue
isnull
-
subscribeActual
Description copied from class:Observable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObserver
s.There is no need to call any of the plugin hooks on the current
Observable
instance or theObserver
; all hooks and basic safeguards have been applied byObservable.subscribe(Observer)
before this method gets called.- Specified by:
subscribeActual
in classObservable<T>
- Parameters:
observer
- the incomingObserver
, nevernull
-
onSubscribe
Description copied from interface:Observer
Provides theObserver
with the means of cancelling (disposing) the connection (channel) with theObservable
in both synchronous (from withinObserver.onNext(Object)
) and asynchronous manner.- Parameters:
d
- theDisposable
instance whoseDisposable.dispose()
can be called anytime to cancel the connection
-
onNext
Description copied from interface:Observer
Provides theObserver
with a new item to observe.The
Observable
may call this method 0 or more times.The
Observable
will not call this method again after it calls eitherObserver.onComplete()
orObserver.onError(java.lang.Throwable)
.- Parameters:
t
- the item emitted by the Observable
-
onError
Description copied from interface:Observer
Notifies theObserver
that theObservable
has experienced an error condition.If the
Observable
calls this method, it will not thereafter callObserver.onNext(T)
orObserver.onComplete()
.- Parameters:
t
- the exception encountered by the Observable
-
onComplete
public void onComplete()Description copied from interface:Observer
Notifies theObserver
that theObservable
has finished sending push-based notifications.The
Observable
will not call this method if it callsObserver.onError(java.lang.Throwable)
. -
hasObservers
Description copied from class:Subject
Returns true if the subject has any Observers.The method is thread-safe.
- Specified by:
hasObservers
in classSubject<T>
- Returns:
- true if the subject has any Observers
-
subscriberCount
-
getThrowable
Description copied from class:Subject
Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.The method is thread-safe.
- Specified by:
getThrowable
in classSubject<T>
- Returns:
- the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
-
getValue
Returns a single value the Subject currently has or null if no such value exists.The method is thread-safe.
- Returns:
- a single value the Subject currently has or null if no such value exists
-
hasComplete
Description copied from class:Subject
Returns true if the subject has reached a terminal state through a complete event.The method is thread-safe.
- Specified by:
hasComplete
in classSubject<T>
- Returns:
- true if the subject has reached a terminal state through a complete event
- See Also:
-
hasThrowable
Description copied from class:Subject
Returns true if the subject has reached a terminal state through an error event.The method is thread-safe.
- Specified by:
hasThrowable
in classSubject<T>
- Returns:
- true if the subject has reached a terminal state through an error event
- See Also:
-
hasValue
Returns true if the subject has any value.The method is thread-safe.
- Returns:
- true if the subject has any value
-
add
-
remove
-
terminate
-
setCurrent
-