Class BehaviorProcessor<T>
- Type Parameters:
T
- the type of item expected to be observed and emitted by the Processor
- All Implemented Interfaces:
FlowableSubscriber<T>
,org.reactivestreams.Processor<T,
,T> org.reactivestreams.Publisher<T>
,org.reactivestreams.Subscriber<T>
Subscriber
.
This processor does not have a public constructor by design; a new empty instance of this
BehaviorProcessor
can be created via the create()
method and
a new non-empty instance can be created via createDefault(Object)
(named as such to avoid
overload resolution conflict with Flowable.create
that creates a Flowable, not a BehaviorProcessor
).
In accordance with the Reactive Streams specification (Rule 2.13)
null
s are not allowed as default initial values in createDefault(Object)
or as parameters to onNext(Object)
and
onError(Throwable)
.
When this BehaviorProcessor
is terminated via onError(Throwable)
or onComplete()
, the
last observed item (if any) is cleared and late Subscriber
s only receive
the respective terminal event.
The BehaviorProcessor
does not support clearing its cached value (to appear empty again), however, the
effect can be achieved by using a special item and making sure Subscriber
s subscribe through a
filter whose predicate filters out this special item:
BehaviorProcessor<Integer> processor = BehaviorProcessor.create();
final Integer EMPTY = Integer.MIN_VALUE;
Flowable<Integer> flowable = processor.filter(v -> v != EMPTY);
TestSubscriber<Integer> ts1 = flowable.test();
processor.onNext(1);
// this will "clear" the cache
processor.onNext(EMPTY);
TestSubscriber<Integer> ts2 = flowable.test();
processor.onNext(2);
processor.onComplete();
// ts1 received both non-empty items
ts1.assertResult(1, 2);
// ts2 received only 2 even though the current item was EMPTY
// when it got subscribed
ts2.assertResult(2);
// Subscribers coming after the processor was terminated receive
// no items and only the onComplete event in this case.
flowable.test().assertResult();
Even though BehaviorProcessor
implements the Subscriber
interface, calling
onSubscribe
is not required (Rule 2.12)
if the processor is used as a standalone source. However, calling onSubscribe
after the BehaviorProcessor
reached its terminal state will result in the
given Subscription
being cancelled immediately.
Calling onNext(Object)
, offer(Object)
, onError(Throwable)
and onComplete()
is required to be serialized (called from the same thread or called non-overlappingly from different threads
through external means of serialization). The FlowableProcessor.toSerialized()
method available to all FlowableProcessor
s
provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber
consuming this processor also wants to call onNext(Object)
on this processor recursively).
Note that serializing over offer(Object)
is not supported through toSerialized()
because it is a method
available on the PublishProcessor
and BehaviorProcessor
classes only.
This BehaviorProcessor
supports the standard state-peeking methods hasComplete()
, hasThrowable()
,
getThrowable()
and hasSubscribers()
as well as means to read the latest observed value
in a non-blocking and thread-safe manner via hasValue()
or getValue()
.
Note that this processor signals MissingBackpressureException
if a particular Subscriber
is not
ready to receive onNext
events. To avoid this exception being signaled, use offer(Object)
to only
try to emit an item when all Subscriber
s have requested item(s).
- Backpressure:
- The
BehaviorProcessor
does not coordinate requests of its downstreamSubscriber
s and expects each individualSubscriber
is ready to receiveonNext
items whenonNext(Object)
is called. If aSubscriber
is not ready, aMissingBackpressureException
is signalled to it. To avoid overflowing the currentSubscriber
s, the conditionaloffer(Object)
method is available that returns true if any of theSubscriber
s is not ready to receiveonNext
events. If there are noSubscriber
s to the processor,offer()
always succeeds. If theBehaviorProcessor
is (optionally) subscribed to anotherPublisher
, this upstreamPublisher
is consumed in an unbounded fashion (requestingLong.MAX_VALUE
). - Scheduler:
BehaviorProcessor
does not operate by default on a particularScheduler
and theSubscriber
s get notified on the thread the respectiveonXXX
methods were invoked.- Error handling:
- When the
onError(Throwable)
is called, theBehaviorProcessor
enters into a terminal state and emits the sameThrowable
instance to the last set ofSubscriber
s. During this emission, if one or moreSubscriber
s cancel their respectiveSubscription
s, theThrowable
is delivered to the global error handler viaRxJavaPlugins.onError(Throwable)
(multiple times if multipleSubscriber
s cancel at once). If there were noSubscriber
s subscribed to thisBehaviorProcessor
when theonError()
was called, the global error handler is not invoked.
Example usage:
// subscriber will receive all events.
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
processor.subscribe(subscriber);
processor.onNext("one");
processor.onNext("two");
processor.onNext("three");
// subscriber will receive the "one", "two" and "three" events, but not "zero"
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
processor.onNext("zero");
processor.onNext("one");
processor.subscribe(subscriber);
processor.onNext("two");
processor.onNext("three");
// subscriber will receive only onComplete
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
processor.onNext("zero");
processor.onNext("one");
processor.onComplete();
processor.subscribe(subscriber);
// subscriber will receive only onError
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
processor.onNext("zero");
processor.onNext("one");
processor.onError(new RuntimeException("error"));
processor.subscribe(subscriber);
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) static final class
-
Field Summary
FieldsModifier and TypeFieldDescription(package private) static final BehaviorProcessor.BehaviorSubscription[]
(package private) static final Object[]
(package private) long
(package private) final ReadWriteLock
(package private) final Lock
(package private) final AtomicReference
<BehaviorProcessor.BehaviorSubscription<T>[]> (package private) final AtomicReference
<Throwable> (package private) static final BehaviorProcessor.BehaviorSubscription[]
(package private) final AtomicReference
<Object> (package private) final Lock
-
Constructor Summary
ConstructorsConstructorDescriptionConstructs an empty BehaviorProcessor.BehaviorProcessor
(@NonNull T defaultValue) Constructs a BehaviorProcessor with the given initial value. -
Method Summary
Modifier and TypeMethodDescription(package private) boolean
static <T> @NonNull BehaviorProcessor
<T> create()
Creates aBehaviorProcessor
without a default item.static <@NonNull T>
@NonNull BehaviorProcessor<T> createDefault
(@NonNull T defaultValue) Creates aBehaviorProcessor
that emits the last item it observed and all subsequent items to eachSubscriber
that subscribes to it.Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.getValue()
Returns a single value the BehaviorProcessor currently has or null if no such value exists.boolean
Returns true if the FlowableProcessor has reached a terminal state through a complete event.boolean
Returns true if the FlowableProcessor has subscribers.boolean
Returns true if the FlowableProcessor has reached a terminal state through an error event.boolean
hasValue()
Returns true if the BehaviorProcessor has any value.boolean
Tries to emit the item to all currently subscribedSubscriber
s if all of them has requested some value, returnsfalse
otherwise.void
void
void
void
onSubscribe
(@NonNull org.reactivestreams.Subscription s) Implementors of this method should make sure everything that needs to be visible inSubscriber.onNext(Object)
is established before callingSubscription.request(long)
.(package private) void
(package private) void
setCurrent
(Object o) protected void
subscribeActual
(@NonNull org.reactivestreams.Subscriber<? super @NonNull T> s) Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingSubscriber
s.(package private) int
(package private) BehaviorProcessor.BehaviorSubscription<T>[]
Methods inherited from class io.reactivex.rxjava3.processors.FlowableProcessor
toSerialized
Methods inherited from class io.reactivex.rxjava3.core.Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface org.reactivestreams.Publisher
subscribe
-
Field Details
-
subscribers
-
EMPTY_ARRAY
-
EMPTY
-
TERMINATED
-
lock
-
readLock
-
writeLock
-
value
-
terminalEvent
-
index
long index
-
-
Constructor Details
-
BehaviorProcessor
BehaviorProcessor()Constructs an empty BehaviorProcessor.- Since:
- 2.0
-
BehaviorProcessor
Constructs a BehaviorProcessor with the given initial value.- Parameters:
defaultValue
- the initial value, not null (verified)- Throws:
NullPointerException
- ifdefaultValue
isnull
- Since:
- 2.0
-
-
Method Details
-
create
Creates aBehaviorProcessor
without a default item.- Type Parameters:
T
- the type of item the BehaviorProcessor will emit- Returns:
- the constructed
BehaviorProcessor
-
createDefault
@CheckReturnValue @NonNull public static <@NonNull T> @NonNull BehaviorProcessor<T> createDefault(@NonNull T defaultValue) Creates aBehaviorProcessor
that emits the last item it observed and all subsequent items to eachSubscriber
that subscribes to it.- Type Parameters:
T
- the type of item the BehaviorProcessor will emit- Parameters:
defaultValue
- the item that will be emitted first to anySubscriber
as long as theBehaviorProcessor
has not yet observed any items from its sourceObservable
- Returns:
- the constructed
BehaviorProcessor
- Throws:
NullPointerException
- ifdefaultValue
isnull
-
subscribeActual
protected void subscribeActual(@NonNull @NonNull org.reactivestreams.Subscriber<? super @NonNull T> s) Description copied from class:Flowable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingSubscriber
s.There is no need to call any of the plugin hooks on the current
Flowable
instance or theSubscriber
; all hooks and basic safeguards have been applied byFlowable.subscribe(Subscriber)
before this method gets called.- Specified by:
subscribeActual
in classFlowable<T>
- Parameters:
s
- the incomingSubscriber
, nevernull
-
onSubscribe
Description copied from interface:FlowableSubscriber
Implementors of this method should make sure everything that needs to be visible inSubscriber.onNext(Object)
is established before callingSubscription.request(long)
. In practice this means no initialization should happen after therequest()
call and additional behavior is thread safe in respect toonNext
. -
onNext
-
onError
-
onComplete
public void onComplete() -
offer
Tries to emit the item to all currently subscribedSubscriber
s if all of them has requested some value, returnsfalse
otherwise.This method should be called in a sequential manner just like the
onXXX
methods of thisBehaviorProcessor
.History: 2.0.8 - experimental
- Parameters:
t
- the item to emit, notnull
- Returns:
true
if the item was emitted to allSubscriber
s- Throws:
NullPointerException
- ift
isnull
- Since:
- 2.2
-
hasSubscribers
Description copied from class:FlowableProcessor
Returns true if the FlowableProcessor has subscribers.The method is thread-safe.
- Specified by:
hasSubscribers
in classFlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has subscribers
-
subscriberCount
-
getThrowable
Description copied from class:FlowableProcessor
Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.The method is thread-safe.
- Specified by:
getThrowable
in classFlowableProcessor<T>
- Returns:
- the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
-
getValue
Returns a single value the BehaviorProcessor currently has or null if no such value exists.The method is thread-safe.
- Returns:
- a single value the BehaviorProcessor currently has or null if no such value exists
-
hasComplete
Description copied from class:FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through a complete event.The method is thread-safe.
- Specified by:
hasComplete
in classFlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has reached a terminal state through a complete event
- See Also:
-
hasThrowable
Description copied from class:FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through an error event.The method is thread-safe.
- Specified by:
hasThrowable
in classFlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has reached a terminal state through an error event
- See Also:
-
hasValue
Returns true if the BehaviorProcessor has any value.The method is thread-safe.
- Returns:
- true if the BehaviorProcessor has any value
-
add
-
remove
-
terminate
-
setCurrent
-