Class ObservableCache<T>
java.lang.Object
  io.reactivex.rxjava3.core.Observable<U>
    io.reactivex.rxjava3.internal.operators.observable.AbstractObservableWithUpstream<T,T>
      io.reactivex.rxjava3.internal.operators.observable.ObservableCache<T>
Type Parameters:
T - the source element type
All Implemented Interfaces:
ObservableSource<T>, Observer<T>, HasUpstreamObservableSource<T>
public final class ObservableCache<T> extends AbstractObservableWithUpstream<T,T> implements Observer<T>
An observable which auto-connects to another observable, caches the elements from that observable but allows terminating the connection and completing the cache.
-
-
Nested Class Summary
Modifier and Type | Class | Description
(package private) static class | ObservableCache.CacheDisposable<T> | Hosts the downstream consumer and its current requested and replay states.
(package private) static class | ObservableCache.Node<T> | Represents a segment of the cached item list as part of a linked-node-list structure.
-
Field Summary
Modifier and Type | Field | Description
(package private) int | capacityHint | The number of items per cached node.
(package private) boolean | done | True if the source has terminated.
(package private) static ObservableCache.CacheDisposable[] | EMPTY | A shared instance of an empty array of observers to avoid creating a new empty array when all observers dispose.
(package private) java.lang.Throwable | error | If observers is TERMINATED, this holds the terminal error if not null.
(package private) ObservableCache.Node<T> | head | The starting point of the cached items.
(package private) java.util.concurrent.atomic.AtomicReference<ObservableCache.CacheDisposable<T>[]> | observers | The current known array of observer state to notify.
(package private) java.util.concurrent.atomic.AtomicBoolean | once | The subscription to the source should happen at most once.
(package private) long | size | The total number of elements in the list available for reads.
(package private) ObservableCache.Node<T> | tail | The current tail of the linked structure holding the items.
(package private) int | tailOffset | How many items have been put into the tail node so far.
(package private) static ObservableCache.CacheDisposable[] | TERMINATED | A shared instance indicating the source has no more events and there is no need to remember observers anymore.
Fields inherited from class io.reactivex.rxjava3.internal.operators.observable.AbstractObservableWithUpstream
source
-
-
Constructor Summary
Constructor | Description
ObservableCache(Observable<T> source, int capacityHint) | Constructs an empty, non-connected cache.
-
Method Summary
Modifier and Type | Method | Description
(package private) void | add(ObservableCache.CacheDisposable<T> consumer) | Atomically adds the consumer to the observers copy-on-write array if the source has not yet terminated.
(package private) long | cachedEventCount() | Returns the number of events currently cached.
(package private) boolean | hasObservers() | Returns true if there are observers subscribed to this observable.
(package private) boolean | isConnected() | Checks if this cached observable is connected to its source.
void | onComplete() | Notifies the Observer that the Observable has finished sending push-based notifications.
void | onError(java.lang.Throwable t) | Notifies the Observer that the Observable has experienced an error condition.
void | onNext(T t) | Provides the Observer with a new item to observe.
void | onSubscribe(Disposable d) | Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
(package private) void | remove(ObservableCache.CacheDisposable<T> consumer) | Atomically removes the consumer from the observers copy-on-write array.
(package private) void | replay(ObservableCache.CacheDisposable<T> consumer) | Replays the contents of this cache to the given consumer based on its current state and number of items requested by it.
protected void | subscribeActual(Observer<? super T> t) | Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
Methods inherited from class io.reactivex.rxjava3.internal.operators.observable.AbstractObservableWithUpstream
source
-
Methods inherited from class io.reactivex.rxjava3.core.Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, 
delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, 
replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, 
withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
Field Detail
-
once
final java.util.concurrent.atomic.AtomicBoolean once
The subscription to the source should happen at most once.
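The at-most-once guarantee described above can be illustrated with a small JDK-only sketch. The class name `ConnectOnceSketch`, the `Runnable` standing in for the upstream subscription, and the boolean return value are all hypothetical and chosen for illustration; this is not the library's source, only one plausible way an `AtomicBoolean` gate can enforce a single connection.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: connects to a source at most once, however many subscribers arrive. */
final class ConnectOnceSketch {
    private final AtomicBoolean once = new AtomicBoolean();
    private final Runnable connect; // stands in for subscribing to the upstream source

    ConnectOnceSketch(Runnable connect) {
        this.connect = connect;
    }

    /** Called for every incoming subscriber; returns true only for the call that connected. */
    boolean subscribe() {
        // Plain read first to skip the CAS on the common already-connected path,
        // then compareAndSet so that exactly one racing caller wins.
        if (!once.get() && once.compareAndSet(false, true)) {
            connect.run();
            return true;
        }
        return false;
    }

    boolean isConnected() {
        return once.get();
    }

    public static void main(String[] args) {
        AtomicInteger connections = new AtomicInteger();
        ConnectOnceSketch cache = new ConnectOnceSketch(connections::incrementAndGet);
        cache.subscribe();
        cache.subscribe();
        cache.subscribe();
        System.out.println(connections.get()); // prints 1
    }
}
```

Later subscribers skip the connection step and are served from whatever has been cached so far.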
-
capacityHint
final int capacityHint
The number of items per cached node.
-
observers
final java.util.concurrent.atomic.AtomicReference<ObservableCache.CacheDisposable<T>[]> observers
The current known array of observer state to notify.
-
EMPTY
static final ObservableCache.CacheDisposable[] EMPTY
A shared instance of an empty array of observers to avoid creating a new empty array when all observers dispose.
-
TERMINATED
static final ObservableCache.CacheDisposable[] TERMINATED
A shared instance indicating the source has no more events and there is no need to remember observers anymore.
-
size
volatile long size
The total number of elements in the list available for reads.
-
head
final ObservableCache.Node<T> head
The starting point of the cached items.
-
tail
ObservableCache.Node<T> tail
The current tail of the linked structure holding the items.
-
tailOffset
int tailOffset
How many items have been put into the tail node so far.
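To make the relationship between `head`, `tail`, `tailOffset`, `size` and `capacityHint` concrete, here is a minimal single-threaded sketch of a linked list of fixed-size array nodes. The class `NodeListSketch` and its methods `append`, `toList` and `nodeCount` are hypothetical names for illustration; the real class also handles concurrent readers, which this sketch deliberately omits.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of a linked list of array nodes, each holding capacityHint items. */
final class NodeListSketch<T> {
    static final class Node<T> {
        final T[] values;
        Node<T> next;

        @SuppressWarnings("unchecked")
        Node(int capacityHint) {
            values = (T[]) new Object[capacityHint];
        }
    }

    final int capacityHint; // items per node
    final Node<T> head;     // starting point of the cached items
    Node<T> tail;           // node currently being filled
    int tailOffset;         // how many slots of the tail node are used
    long size;              // total number of items available for reads

    NodeListSketch(int capacityHint) {
        this.capacityHint = capacityHint;
        this.head = new Node<>(capacityHint);
        this.tail = head;
    }

    void append(T item) {
        if (tailOffset == capacityHint) {
            // Tail is full: link a fresh node and start filling it.
            Node<T> n = new Node<>(capacityHint);
            n.values[0] = item;
            tailOffset = 1;
            tail.next = n;
            tail = n;
        } else {
            tail.values[tailOffset++] = item;
        }
        size++;
    }

    /** Walks the nodes from head and collects the first size items. */
    List<T> toList() {
        List<T> out = new ArrayList<>();
        Node<T> node = head;
        int offset = 0;
        for (long i = 0; i < size; i++) {
            if (offset == capacityHint) {
                node = node.next;
                offset = 0;
            }
            out.add(node.values[offset++]);
        }
        return out;
    }

    int nodeCount() {
        int count = 0;
        for (Node<T> n = head; n != null; n = n.next) {
            count++;
        }
        return count;
    }
}
```

With a `capacityHint` of 2, appending five items allocates three nodes and leaves `tailOffset` at 1. A larger hint trades memory per node for fewer allocations.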
-
error
java.lang.Throwable error
If observers is TERMINATED, this holds the terminal error if not null.
-
done
volatile boolean done
True if the source has terminated.
-
-
Constructor Detail
-
ObservableCache
public ObservableCache(Observable<T> source, int capacityHint)
Constructs an empty, non-connected cache.
Parameters:
source - the source to subscribe to for the first incoming observer
capacityHint - the number of items expected (reduce allocation frequency)
-
-
Method Detail
-
subscribeActual
protected void subscribeActual(Observer<? super T> t)
Description copied from class: Observable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
There is no need to call any of the plugin hooks on the current Observable instance or the Observer; all hooks and basic safeguards have been applied by Observable.subscribe(Observer) before this method gets called.
Specified by:
subscribeActual in class Observable<T>
Parameters:
t - the incoming Observer, never null
-
isConnected
boolean isConnected()
Checks if this cached observable is connected to its source.
Returns:
true if already connected
-
hasObservers
boolean hasObservers()
Returns true if there are observers subscribed to this observable.
Returns:
true if the cache has observers
-
cachedEventCount
long cachedEventCount()
Returns the number of events currently cached.
Returns:
the number of currently cached events
-
add
void add(ObservableCache.CacheDisposable<T> consumer)
Atomically adds the consumer to the observers copy-on-write array if the source has not yet terminated.
Parameters:
consumer - the consumer to add
-
remove
void remove(ObservableCache.CacheDisposable<T> consumer)
Atomically removes the consumer from the observers copy-on-write array.
Parameters:
consumer - the consumer to remove
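The copy-on-write pattern behind add and remove can be sketched with plain JDK classes. Everything here is an assumption for illustration: the class `CopyOnWriteObserversSketch`, the use of `Object` in place of `CacheDisposable<T>`, the `terminate()` helper, and the boolean returned by `add` (the documented method returns void). The sketch shows the general technique, with EMPTY and TERMINATED as shared sentinel arrays compared by identity.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of a copy-on-write observer array with EMPTY/TERMINATED sentinels. */
final class CopyOnWriteObserversSketch {
    static final Object[] EMPTY = new Object[0];
    static final Object[] TERMINATED = new Object[0];

    final AtomicReference<Object[]> observers = new AtomicReference<>(EMPTY);

    /** Adds the consumer unless the source has terminated; returns false in that case. */
    boolean add(Object consumer) {
        for (;;) {
            Object[] current = observers.get();
            if (current == TERMINATED) {
                return false;
            }
            int n = current.length;
            Object[] next = new Object[n + 1];
            System.arraycopy(current, 0, next, 0, n);
            next[n] = consumer;
            // Retry from a fresh snapshot if another thread changed the array meanwhile.
            if (observers.compareAndSet(current, next)) {
                return true;
            }
        }
    }

    /** Removes the consumer (matched by identity) if it is present. */
    void remove(Object consumer) {
        for (;;) {
            Object[] current = observers.get();
            int n = current.length;
            if (n == 0) {
                return; // EMPTY or TERMINATED: nothing to remove
            }
            int j = -1;
            for (int i = 0; i < n; i++) {
                if (current[i] == consumer) {
                    j = i;
                    break;
                }
            }
            if (j < 0) {
                return;
            }
            Object[] next;
            if (n == 1) {
                next = EMPTY; // reuse the shared empty array
            } else {
                next = new Object[n - 1];
                System.arraycopy(current, 0, next, 0, j);
                System.arraycopy(current, j + 1, next, j, n - j - 1);
            }
            if (observers.compareAndSet(current, next)) {
                return;
            }
        }
    }

    /** Hypothetical helper: marks the source as terminated so later adds are rejected. */
    void terminate() {
        observers.set(TERMINATED);
    }
}
```

Because every change installs a new array, a thread iterating over a snapshot obtained from `observers.get()` never sees a partially updated array.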
-
replay
void replay(ObservableCache.CacheDisposable<T> consumer)
Replays the contents of this cache to the given consumer based on its current state and number of items requested by it.
Parameters:
consumer - the consumer to continue replaying items to
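The idea of replaying "based on its current state" can be sketched as a resumable drain loop in which each consumer remembers its own node, offset and delivered count. This is a simplified illustration under stated assumptions, not the library's implementation: the names `ReplaySketch`, `Consumer`, `wip`, `received` and `completed` are hypothetical, items are collected into a list instead of being delivered to an `Observer`, and error handling and disposal are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a cache whose consumers each resume replay from their own position. */
final class ReplaySketch {
    static final class Node {
        final Object[] values;
        volatile Node next;
        Node(int capacity) { values = new Object[capacity]; }
    }

    static final class Consumer {
        final AtomicInteger wip = new AtomicInteger(); // guards against overlapping drains
        final List<Object> received = new ArrayList<>();
        Node node;         // node this consumer is currently reading
        int offset;        // next slot to read within node
        long index;        // items delivered so far
        boolean completed;
        Consumer(Node head) { node = head; }
    }

    final int capacity;
    final Node head;
    Node tail;
    int tailOffset;
    volatile long size;
    volatile boolean done;

    ReplaySketch(int capacity) {
        this.capacity = capacity;
        this.head = new Node(capacity);
        this.tail = head;
    }

    Consumer newConsumer() { return new Consumer(head); }

    /** Single-writer append; the item is stored before size is published. */
    void add(Object item) {
        if (tailOffset == capacity) {
            Node n = new Node(capacity);
            n.values[0] = item;
            tailOffset = 1;
            tail.next = n;
            tail = n;
        } else {
            tail.values[tailOffset++] = item;
        }
        size = size + 1;
    }

    void complete() { done = true; }

    void replay(Consumer c) {
        if (c.wip.getAndIncrement() != 0) {
            return; // a drain is already running for this consumer and will pick up new items
        }
        int missed = 1;
        for (;;) {
            boolean isDone = done;               // read done before size
            boolean caughtUp = size == c.index;
            if (isDone && caughtUp) {
                c.completed = true;
                c.node = null;                   // release the reference to the cached nodes
                return;
            }
            if (!caughtUp) {
                if (c.offset == capacity) {      // current node exhausted: move to the next one
                    c.node = c.node.next;
                    c.offset = 0;
                }
                c.received.add(c.node.values[c.offset++]);
                c.index++;
            } else {
                missed = c.wip.addAndGet(-missed);
                if (missed == 0) {
                    return;                      // nothing new arrived while draining
                }
            }
        }
    }
}
```

A consumer that subscribes late starts from `head` and receives everything cached so far, while an existing consumer continues from where its previous `replay` call stopped.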
-
onSubscribe
public void onSubscribe(Disposable d)
Description copied from interface: Observer
Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
Specified by:
onSubscribe in interface Observer<T>
Parameters:
d - the Disposable instance whose Disposable.dispose() can be called anytime to cancel the connection
-
onNext
public void onNext(T t)
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onComplete() or Observer.onError(java.lang.Throwable).
-
onError
public void onError(java.lang.Throwable t)
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onComplete().
-
onComplete
public void onComplete()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
Specified by:
onComplete in interface Observer<T>