Class FlowableCache<T>
java.lang.Object
io.reactivex.rxjava3.core.Flowable<T>
io.reactivex.rxjava3.internal.operators.flowable.AbstractFlowableWithUpstream<T,T>
io.reactivex.rxjava3.internal.operators.flowable.FlowableCache<T>
- Type Parameters:
T - the source element type
- All Implemented Interfaces:
FlowableSubscriber<T>, HasUpstreamPublisher<T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>
public final class FlowableCache<T>
extends AbstractFlowableWithUpstream<T,T>
implements FlowableSubscriber<T>
A Flowable that auto-connects to a source Flowable and caches the elements
from that source, while still allowing the connection to be terminated and the cache to be completed.
-
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
(package private) static final class FlowableCache.CacheSubscription<T>
- Hosts the downstream consumer and its current requested and replay states.
(package private) static final class FlowableCache.Node<T>
- Represents a segment of the cached item list as part of a linked-node-list structure.
-
Field Summary
Fields
Modifier and Type, Field, Description
(package private) final int capacityHint
- The number of items per cached node.
(package private) boolean done
- True if the source has terminated.
(package private) static final FlowableCache.CacheSubscription[] EMPTY
- A shared instance of an empty array of subscribers to avoid creating a new empty array when all subscribers cancel.
(package private) Throwable error
- If subscribers is TERMINATED, this holds the terminal error if not null.
(package private) final FlowableCache.Node<T> head
- The starting point of the cached items.
(package private) final AtomicBoolean once
- The subscription to the source should happen at most once.
(package private) long size
- The total number of elements in the list available for reads.
(package private) final AtomicReference<FlowableCache.CacheSubscription<T>[]> subscribers
- The current known array of subscriber state to notify.
(package private) FlowableCache.Node<T> tail
- The current tail of the linked structure holding the items.
(package private) int tailOffset
- How many items have been put into the tail node so far.
(package private) static final FlowableCache.CacheSubscription[] TERMINATED
- A shared instance indicating the source has no more events and there is no need to remember subscribers anymore.
Fields inherited from class io.reactivex.rxjava3.internal.operators.flowable.AbstractFlowableWithUpstream
source
-
Constructor Summary
Constructors
Constructor, Description
FlowableCache(Flowable<T> source, int capacityHint)
- Constructs an empty, non-connected cache.
-
Method Summary
Modifier and Type, Method, Description
(package private) void add(FlowableCache.CacheSubscription<T> consumer)
- Atomically adds the consumer to the subscribers copy-on-write array if the source has not yet terminated.
(package private) long cachedEventCount()
- Returns the number of events currently cached.
(package private) boolean hasSubscribers()
- Returns true if there are Subscribers subscribed to this Flowable.
(package private) boolean isConnected()
- Checks if this cached Flowable is connected to its source.
void onComplete()
void onError(Throwable t)
void onNext(T t)
void onSubscribe(org.reactivestreams.Subscription s)
- Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long).
(package private) void remove(FlowableCache.CacheSubscription<T> consumer)
- Atomically removes the consumer from the subscribers copy-on-write array.
(package private) void replay(FlowableCache.CacheSubscription<T> consumer)
- Replays the contents of this cache to the given consumer based on its current state and number of items requested by it.
protected void subscribeActual(org.reactivestreams.Subscriber<? super T> t)
- Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
Methods inherited from class io.reactivex.rxjava3.internal.operators.flowable.AbstractFlowableWithUpstream
source
Methods inherited from class io.reactivex.rxjava3.core.Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, 
contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, 
onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, 
throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
Field Details
-
once
final AtomicBoolean once
The subscription to the source should happen at most once.
-
capacityHint
final int capacityHint
The number of items per cached node.
-
subscribers
final AtomicReference<FlowableCache.CacheSubscription<T>[]> subscribers
The current known array of subscriber state to notify.
-
EMPTY
static final FlowableCache.CacheSubscription[] EMPTY
A shared instance of an empty array of subscribers to avoid creating a new empty array when all subscribers cancel.
-
TERMINATED
static final FlowableCache.CacheSubscription[] TERMINATED
A shared instance indicating the source has no more events and there is no need to remember subscribers anymore.
-
size
volatile long size
The total number of elements in the list available for reads.
-
head
final FlowableCache.Node<T> head
The starting point of the cached items.
-
tail
FlowableCache.Node<T> tail
The current tail of the linked structure holding the items.
-
tailOffset
int tailOffset
How many items have been put into the tail node so far.
-
error
Throwable error
If subscribers is TERMINATED, this holds the terminal error if not null.
-
done
volatile boolean done
True if the source has terminated.
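The head, tail, tailOffset, size and capacityHint fields describe a linked list of fixed-size segments. The following is a minimal, single-threaded sketch of that layout, not the library's code: the class name SegmentedCacheSketch and the methods append and snapshot are hypothetical, and the real class additionally has to publish new items safely to concurrent readers (note the volatile size field).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: a cache stored as a linked list of fixed-size array segments. */
final class SegmentedCacheSketch<T> {

    /** One segment: up to capacityHint items plus a link to the next segment. */
    static final class Node {
        final Object[] values;
        Node next;

        Node(int capacityHint) {
            this.values = new Object[capacityHint];
        }
    }

    final int capacityHint; // number of items per node
    final Node head;        // starting point of the cached items
    Node tail;              // node currently being filled
    int tailOffset;         // items already stored in the tail node
    long size;              // total number of items available for reads

    SegmentedCacheSketch(int capacityHint) {
        this.capacityHint = capacityHint;
        this.head = new Node(capacityHint);
        this.tail = head;
    }

    /** Appends an item, linking a fresh node first when the tail node is full. */
    void append(T item) {
        if (tailOffset == capacityHint) {
            Node n = new Node(capacityHint);
            tail.next = n;
            tail = n;
            tailOffset = 0;
        }
        tail.values[tailOffset++] = item;
        size++;
    }

    /** Walks the segments from head and copies every cached item into a list. */
    @SuppressWarnings("unchecked")
    List<T> snapshot() {
        List<T> out = new ArrayList<>();
        Node node = head;
        int offset = 0;
        for (long i = 0; i < size; i++) {
            if (offset == capacityHint) {
                node = node.next;
                offset = 0;
            }
            out.add((T) node.values[offset++]);
        }
        return out;
    }
}
```

With a capacityHint of 2, appending five items fills two nodes completely and leaves one item in a third node, so a larger hint trades memory per node for fewer allocations.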
-
-
Constructor Details
-
FlowableCache
FlowableCache(Flowable<T> source, int capacityHint)
Constructs an empty, non-connected cache.
- Parameters:
source - the source to subscribe to for the first incoming subscriber
capacityHint - the number of items expected (used to reduce allocation frequency)
-
-
Method Details
-
subscribeActual
protected void subscribeActual(org.reactivestreams.Subscriber<? super T> t)
Description copied from class: Flowable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
There is no need to call any of the plugin hooks on the current Flowable instance or the Subscriber; all hooks and basic safeguards have been applied by Flowable.subscribe(Subscriber) before this method gets called.
- Specified by:
subscribeActual in class Flowable<T>
- Parameters:
t - the incoming Subscriber, never null
-
isConnected
boolean isConnected()
Checks if this cached Flowable is connected to its source.
- Returns:
true if already connected
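The once field and isConnected() together describe a "connect at most once" guard. One common way to build such a guard is a compare-and-set on an AtomicBoolean, sketched below. This is an assumption about the technique rather than a copy of the library's code; ConnectOnceSketch, subscribe, connectToSource and upstreamSubscriptions are hypothetical names introduced only for illustration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: subscribe to the source on the first consumer only. */
final class ConnectOnceSketch {

    // Flips from false to true exactly once, when the first consumer arrives.
    final AtomicBoolean once = new AtomicBoolean();

    // Demo-only counter showing how many times the source was subscribed to.
    final AtomicInteger upstreamSubscriptions = new AtomicInteger();

    /** Called for every incoming consumer; only the first call connects to the source. */
    void subscribe() {
        // The plain read avoids a CAS on every call after the first one.
        if (!once.get() && once.compareAndSet(false, true)) {
            connectToSource();
        }
    }

    /** Returns true once the connection attempt has been made. */
    boolean isConnected() {
        return once.get();
    }

    private void connectToSource() {
        upstreamSubscriptions.incrementAndGet();
    }
}
```

Even if several threads call subscribe() at the same time, only one compareAndSet can succeed, so the source is subscribed to a single time.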
-
hasSubscribers
boolean hasSubscribers()
Returns true if there are Subscribers subscribed to this Flowable.
- Returns:
true if the cache has Subscribers
-
cachedEventCount
long cachedEventCount()
Returns the number of events currently cached.
- Returns:
the number of currently cached events
-
add
void add(FlowableCache.CacheSubscription<T> consumer)
Atomically adds the consumer to the subscribers copy-on-write array if the source has not yet terminated.
- Parameters:
consumer - the consumer to add
-
remove
void remove(FlowableCache.CacheSubscription<T> consumer)
Atomically removes the consumer from the subscribers copy-on-write array.
- Parameters:
consumer - the consumer to remove
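The add and remove descriptions, together with the EMPTY and TERMINATED fields, point to a copy-on-write array held in an AtomicReference. The sketch below illustrates that general technique using only the JDK. It is not the library's implementation: CopyOnWriteConsumersSketch and terminate are hypothetical names, consumers are plain Objects, and add returns a boolean here (the documented method returns void) purely so the outcome is observable.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch: copy-on-write consumer array with EMPTY and TERMINATED sentinels. */
final class CopyOnWriteConsumersSketch {

    // Two distinct zero-length arrays, distinguished by identity rather than content.
    static final Object[] EMPTY = new Object[0];
    static final Object[] TERMINATED = new Object[0];

    final AtomicReference<Object[]> consumers = new AtomicReference<>(EMPTY);

    /** Adds the consumer unless the TERMINATED sentinel is already installed. */
    boolean add(Object consumer) {
        for (;;) {
            Object[] current = consumers.get();
            if (current == TERMINATED) {
                return false;
            }
            Object[] next = new Object[current.length + 1];
            System.arraycopy(current, 0, next, 0, current.length);
            next[current.length] = consumer;
            if (consumers.compareAndSet(current, next)) {
                return true;
            }
            // Another thread changed the array in the meantime: retry with the new state.
        }
    }

    /** Removes the consumer, reusing the shared EMPTY array when none remain. */
    void remove(Object consumer) {
        for (;;) {
            Object[] current = consumers.get();
            int n = current.length;
            if (n == 0) {
                return; // EMPTY or TERMINATED: nothing to remove
            }
            int index = -1;
            for (int i = 0; i < n; i++) {
                if (current[i] == consumer) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                return; // not registered
            }
            Object[] next;
            if (n == 1) {
                next = EMPTY;
            } else {
                next = new Object[n - 1];
                System.arraycopy(current, 0, next, 0, index);
                System.arraycopy(current, index + 1, next, index, n - index - 1);
            }
            if (consumers.compareAndSet(current, next)) {
                return;
            }
        }
    }

    /** Installs the TERMINATED sentinel and returns the consumers registered until then. */
    Object[] terminate() {
        return consumers.getAndSet(TERMINATED);
    }
}
```

Because every update replaces the whole array, a thread that iterates over a previously read array never observes a partial modification.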
-
replay
void replay(FlowableCache.CacheSubscription<T> consumer)
Replays the contents of this cache to the given consumer based on its current state and number of items requested by it.
- Parameters:
consumer - the consumer to continue replaying items to
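To illustrate what replaying "based on its current state and number of items requested" can mean, here is a deliberately simplified, single-threaded sketch. It assumes each consumer tracks its outstanding demand and its position in the cache. ReplaySketch, ConsumerState and their fields are hypothetical, a plain List stands in for the linked nodes, and the concurrency handling a real implementation needs is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: replay cached items to a consumer, limited by its demand. */
final class ReplaySketch<T> {

    /** Per-consumer state: outstanding demand and position reached in the cache. */
    static final class ConsumerState<T> {
        final Consumer<T> onNext;
        final Runnable onComplete;
        long requested;   // items the consumer has asked for but not yet received
        int index;        // position of the next cached item to deliver
        boolean finished; // true once onComplete has been signalled

        ConsumerState(Consumer<T> onNext, Runnable onComplete) {
            this.onNext = onNext;
            this.onComplete = onComplete;
        }
    }

    final List<T> cached = new ArrayList<>(); // stands in for the linked nodes
    boolean done;                             // true once the source has terminated

    /** Delivers as many cached items as the consumer has requested, then completes if possible. */
    void replay(ConsumerState<T> c) {
        if (c.finished) {
            return;
        }
        while (c.requested > 0 && c.index < cached.size()) {
            c.onNext.accept(cached.get(c.index++));
            c.requested--;
        }
        // Completion needs no demand: signal it once everything cached was delivered.
        if (done && c.index == cached.size()) {
            c.finished = true;
            c.onComplete.run();
        }
    }
}
```

Calling replay again after the consumer requests more, or after new items arrive, continues from the stored index, which is why the state lives with the consumer rather than with the cache.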
-
onSubscribe
public void onSubscribe(org.reactivestreams.Subscription s)
Description copied from interface: FlowableSubscriber
Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call and additional behavior is thread safe in respect to onNext.
- Specified by:
onSubscribe in interface FlowableSubscriber<T>
- Specified by:
onSubscribe in interface org.reactivestreams.Subscriber<T>
-
onNext
public void onNext(T t)
- Specified by:
onNext in interface org.reactivestreams.Subscriber<T>
-
onError
public void onError(Throwable t)
- Specified by:
onError in interface org.reactivestreams.Subscriber<T>
-
onComplete
public void onComplete()
- Specified by:
onComplete in interface org.reactivestreams.Subscriber<T>
-