Class ConnectableObservable<T>
- Type Parameters:
T - the type of items emitted by the ConnectableObservable
- All Implemented Interfaces:
ObservableSource<T>
- Direct Known Subclasses:
ObservablePublish, ObservableReplay
A ConnectableObservable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect(Consumer<? super Disposable>) method is called. In this way you can wait for all intended Observers to Observable.subscribe() to the Observable before the Observable begins emitting items.
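The deferred start described above can be sketched as follows. This is a minimal illustration, assuming RxJava 3 is on the classpath and using publish() to obtain a ConnectableObservable; the class name ConnectBasics and the observer labels A and B are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: two observers subscribe first, emission starts on connect().
final class ConnectBasics {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ConnectableObservable<Integer> source = Observable.range(1, 3).publish();

        // Subscribing alone does not start the upstream.
        source.subscribe(v -> log.add("A" + v));
        source.subscribe(v -> log.add("B" + v));
        log.add("before connect");

        // The synchronous range runs to completion inside connect().
        source.connect();
        return log;
    }

    public static void main(String[] args) {
        // prints [before connect, A1, B1, A2, B2, A3, B3]
        System.out.println(run());
    }
}
```

Both observers see every item because they were subscribed before connect() was called.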
When the upstream terminates, the ConnectableObservable remains in this terminated state and, depending on the actual underlying implementation, relays cached events to late Observers. In order to reuse and restart this ConnectableObservable, the reset() method has to be called. When called, this ConnectableObservable will appear as a fresh, unconnected source to new Observers. Disposing the connection will reset the ConnectableObservable to its fresh state and there is no need to call reset() in this case.
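The terminated state and the effect of reset() can be sketched as below. This assumes RxJava 3 and the publish() implementation, which relays only the terminal event to late Observers (a replay()-based source would relay cached items instead); the class name ResetSketch and the log labels are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: terminate, observe the terminated state, then reset and reconnect.
final class ResetSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ConnectableObservable<Integer> source = Observable.range(1, 2).publish();

        source.subscribe(v -> log.add("first " + v), e -> { }, () -> log.add("first done"));
        source.connect(); // the upstream runs and completes here

        // Still terminated: with publish(), a late observer only receives the completion.
        source.subscribe(v -> log.add("late " + v), e -> { }, () -> log.add("late done"));

        // reset() makes the source appear fresh and unconnected again.
        source.reset();
        source.subscribe(v -> log.add("fresh " + v), e -> { }, () -> log.add("fresh done"));
        source.connect();
        return log;
    }

    public static void main(String[] args) {
        // prints [first 1, first 2, first done, late done, fresh 1, fresh 2, fresh done]
        System.out.println(run());
    }
}
```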
Note that although connect() and reset() are safe to call from multiple threads, it is recommended that a dedicated thread or business logic manage the connection or resetting of a ConnectableObservable so that there is no unwanted signal loss due to early connect() or reset() calls while Observers are still being subscribed to this ConnectableObservable to receive signals from the get-go.
-
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
@NonNull Observable<T> | autoConnect() | Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the first Observer subscribes.
@NonNull Observable<T> | autoConnect(int numberOfObservers) | Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the specified number of Observers subscribe to it.
@NonNull Observable<T> | autoConnect(int numberOfObservers, @NonNull Consumer<? super Disposable> connection) | Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the specified number of Observers subscribe to it and calls the specified callback with the Disposable associated with the established connection.
final @NonNull Disposable | connect() | Instructs the ConnectableObservable to begin emitting the items from its underlying Observable to its Observers.
abstract void | connect(@NonNull Consumer<? super Disposable> connection) | Instructs the ConnectableObservable to begin emitting the items from its underlying Observable to its Observers.
@NonNull Observable<T> | refCount() | Returns an Observable that stays connected to this ConnectableObservable as long as there is at least one subscription to this ConnectableObservable.
final @NonNull Observable<T> | refCount(int observerCount) | Connects to the upstream ConnectableObservable if the number of subscribed observers reaches the specified count and disconnects if all Observers have unsubscribed.
final @NonNull Observable<T> | refCount(int observerCount, long timeout, @NonNull TimeUnit unit) | Connects to the upstream ConnectableObservable if the number of subscribed observers reaches the specified count and disconnects after the specified timeout if all Observers have unsubscribed.
final @NonNull Observable<T> | refCount(int observerCount, long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) | Connects to the upstream ConnectableObservable if the number of subscribed observers reaches the specified count and disconnects after the specified timeout if all Observers have unsubscribed.
final @NonNull Observable<T> | refCount(long timeout, @NonNull TimeUnit unit) | Connects to the upstream ConnectableObservable if the number of subscribed observers reaches 1 and disconnects after the specified timeout if all Observers have unsubscribed.
final @NonNull Observable<T> | refCount(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) | Connects to the upstream ConnectableObservable if the number of subscribed observers reaches 1 and disconnects after the specified timeout if all Observers have unsubscribed.
abstract void | reset() | Resets this ConnectableObservable into its fresh state if it has terminated or has been disposed.
Methods inherited from class io.reactivex.rxjava3.core.Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, 
delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, 
replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeActual, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, 
withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
Constructor Details
-
ConnectableObservable
public ConnectableObservable()
-
-
Method Details
-
connect
@SchedulerSupport("none") public abstract void connect(@NonNull Consumer<? super Disposable> connection)
Instructs the ConnectableObservable to begin emitting the items from its underlying Observable to its Observers.
- Scheduler:
- The behavior is determined by the implementor of this abstract class.
- Parameters:
connection - the action that receives the connection subscription before the subscription to source happens, allowing the caller to synchronously disconnect a synchronous source
- Throws:
NullPointerException - if connection is null
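Why the callback receives the connection before the source is subscribed can be sketched as follows: a synchronous source would otherwise run to completion before any Disposable is returned. This assumes RxJava 3 and publish(); the class name SyncDisconnect and the cut-off value 3 are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: stop a synchronous source from inside onNext.
final class SyncDisconnect {
    static List<Integer> run() {
        List<Integer> seen = new ArrayList<>();
        AtomicReference<Disposable> connection = new AtomicReference<>();
        ConnectableObservable<Integer> source = Observable.range(1, 1_000_000).publish();

        source.subscribe(v -> {
            seen.add(v);
            if (v == 3) {
                // The connection was handed over before the range started emitting.
                connection.get().dispose();
            }
        });

        // The callback stores the connection; only then is the upstream subscribed.
        source.connect(connection::set);
        return seen;
    }

    public static void main(String[] args) {
        // prints [1, 2, 3]
        System.out.println(run());
    }
}
```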
-
reset
Resets this ConnectableObservable into its fresh state if it has terminated or has been disposed.
Calling this method on a fresh or active ConnectableObservable has no effect.
- Scheduler:
- The behavior is determined by the implementor of this abstract class.
- Since:
- 3.0.0
-
connect
Instructs the ConnectableObservable to begin emitting the items from its underlying Observable to its Observers.
To disconnect from a synchronous source, use the connect(Consumer) method.
- Scheduler:
- The behavior is determined by the implementor of this abstract class.
- Returns:
- the Disposable representing the connection
-
refCount
Returns an Observable that stays connected to this ConnectableObservable as long as there is at least one subscription to this ConnectableObservable.
- Scheduler:
- This refCount overload does not operate on any particular Scheduler.
- Returns:
- a new Observable instance
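A minimal sketch of the connection lifetime under refCount(), assuming RxJava 3. It uses a PublishSubject as the upstream only so that hasObservers() can reveal whether a connection is currently active; the class name RefCountSketch is hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: the connection follows the number of active subscriptions.
final class RefCountSketch {
    static List<Boolean> run() {
        List<Boolean> connected = new ArrayList<>();
        PublishSubject<Integer> upstream = PublishSubject.create();
        Observable<Integer> shared = upstream.publish().refCount();

        connected.add(upstream.hasObservers()); // no subscription yet
        Disposable a = shared.subscribe();
        connected.add(upstream.hasObservers()); // first subscription connects
        Disposable b = shared.subscribe();
        a.dispose();
        connected.add(upstream.hasObservers()); // one subscription remains
        b.dispose();
        connected.add(upstream.hasObservers()); // last subscription gone
        return connected;
    }

    public static void main(String[] args) {
        // prints [false, true, true, false]
        System.out.println(run());
    }
}
```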
-
refCount
@CheckReturnValue @SchedulerSupport("none") public final @NonNull Observable<T> refCount(int observerCount)
Connects to the upstream ConnectableObservable if the number of subscribed observers reaches the specified count and disconnects if all Observers have unsubscribed.
- Scheduler:
- This refCount overload does not operate on any particular Scheduler.
History: 2.1.14 - experimental
- Parameters:
observerCount - the number of Observers required to connect to the upstream
- Returns:
- the new Observable instance
- Throws:
IllegalArgumentException - if observerCount is non-positive
- Since:
- 2.2
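The observer threshold can be sketched as below, assuming RxJava 3. As an illustration device, a PublishSubject upstream exposes the connection state through hasObservers(); the class name RefCountThreshold is hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: connect only once two observers have subscribed.
final class RefCountThreshold {
    static List<Boolean> run() {
        List<Boolean> connected = new ArrayList<>();
        PublishSubject<Integer> upstream = PublishSubject.create();
        Observable<Integer> shared = upstream.publish().refCount(2);

        Disposable a = shared.subscribe();
        connected.add(upstream.hasObservers()); // one observer: below the threshold
        Disposable b = shared.subscribe();
        connected.add(upstream.hasObservers()); // second observer triggers the connection
        a.dispose();
        connected.add(upstream.hasObservers()); // still one observer left
        b.dispose();
        connected.add(upstream.hasObservers()); // all observers gone: disconnected
        return connected;
    }

    public static void main(String[] args) {
        // prints [false, true, true, false]
        System.out.println(run());
    }
}
```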
-
refCount
@CheckReturnValue @SchedulerSupport("io.reactivex:computation") public final @NonNull Observable<T> refCount(long timeout, @NonNull TimeUnit unit)
Connects to the upstream ConnectableObservable if the number of subscribed observers reaches 1 and disconnects after the specified timeout if all Observers have unsubscribed.
- Scheduler:
- This refCount overload operates on the computation Scheduler.
History: 2.1.14 - experimental
- Parameters:
timeout - the time to wait before disconnecting after all Observers unsubscribed
unit - the time unit of the timeout
- Returns:
- the new Observable instance
- Throws:
NullPointerException - if unit is null
- Since:
- 2.2
-
refCount
@CheckReturnValue @SchedulerSupport("custom") public final @NonNull Observable<T> refCount(long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler)
Connects to the upstream ConnectableObservable if the number of subscribed observers reaches 1 and disconnects after the specified timeout if all Observers have unsubscribed.
- Scheduler:
- This refCount overload operates on the specified Scheduler.
History: 2.1.14 - experimental
- Parameters:
timeout - the time to wait before disconnecting after all Observers unsubscribed
unit - the time unit of the timeout
scheduler - the target scheduler to wait on before disconnecting
- Returns:
- the new Observable instance
- Throws:
NullPointerException - if unit or scheduler is null
- Since:
- 2.2
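The grace period before disconnecting can be sketched deterministically with a TestScheduler, assuming RxJava 3. The PublishSubject upstream, the one-second timeout, and the class name RefCountTimeoutSketch are illustrative choices, not part of the API description above.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a resubscription within the timeout keeps the connection alive.
final class RefCountTimeoutSketch {
    static List<Boolean> run() {
        List<Boolean> connected = new ArrayList<>();
        TestScheduler scheduler = new TestScheduler(); // virtual time, advanced manually
        PublishSubject<Integer> upstream = PublishSubject.create();
        Observable<Integer> shared = upstream.publish().refCount(1L, TimeUnit.SECONDS, scheduler);

        Disposable a = shared.subscribe();
        a.dispose();
        connected.add(upstream.hasObservers()); // still connected during the grace period

        scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
        Disposable b = shared.subscribe();       // arrives before the timeout elapses
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        connected.add(upstream.hasObservers()); // pending disconnect was cancelled

        b.dispose();
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        connected.add(upstream.hasObservers()); // timeout elapsed with no observers
        return connected;
    }

    public static void main(String[] args) {
        // prints [true, true, false]
        System.out.println(run());
    }
}
```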
-
refCount
@CheckReturnValue @SchedulerSupport("io.reactivex:computation") public final @NonNull Observable<T> refCount(int observerCount, long timeout, @NonNull TimeUnit unit)
Connects to the upstream ConnectableObservable if the number of subscribed observers reaches the specified count and disconnects after the specified timeout if all Observers have unsubscribed.
- Scheduler:
- This refCount overload operates on the computation Scheduler.
History: 2.1.14 - experimental
- Parameters:
observerCount - the number of Observers required to connect to the upstream
timeout - the time to wait before disconnecting after all Observers unsubscribed
unit - the time unit of the timeout
- Returns:
- the new Observable instance
- Throws:
NullPointerException - if unit is null
IllegalArgumentException - if observerCount is non-positive
- Since:
- 2.2
-
refCount
@CheckReturnValue @SchedulerSupport("custom") public final @NonNull Observable<T> refCount(int observerCount, long timeout, @NonNull TimeUnit unit, @NonNull Scheduler scheduler)
Connects to the upstream ConnectableObservable if the number of subscribed observers reaches the specified count and disconnects after the specified timeout if all Observers have unsubscribed.
- Scheduler:
- This refCount overload operates on the specified Scheduler.
History: 2.1.14 - experimental
- Parameters:
observerCount - the number of Observers required to connect to the upstream
timeout - the time to wait before disconnecting after all Observers unsubscribed
unit - the time unit of the timeout
scheduler - the target scheduler to wait on before disconnecting
- Returns:
- the new Observable instance
- Throws:
NullPointerException - if unit or scheduler is null
IllegalArgumentException - if observerCount is non-positive
- Since:
- 2.2
-
autoConnect
Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the first Observer subscribes.
The connection happens after the first subscription and happens at most once during the lifetime of the returned Observable. If this ConnectableObservable terminates, the connection is never renewed, no matter how Observers come and go. Use refCount() to renew a connection or dispose an active connection when all Observers have disposed their Disposables.
This overload does not allow disconnecting the connection established via connect(Consumer). Use the autoConnect(int, Consumer) overload to gain access to the Disposable representing the only connection.
- Scheduler:
- This autoConnect overload does not operate on any particular Scheduler.
- Returns:
- a new Observable instance that automatically connects to this ConnectableObservable when the first Observer subscribes
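The "at most once" behavior can be sketched as follows, assuming RxJava 3 and publish() as the underlying implementation. The class name AutoConnectOnce and the subscription counter are hypothetical additions used only to make the single connection visible.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: the connection is made once and never renewed.
final class AutoConnectOnce {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicInteger upstreamSubscriptions = new AtomicInteger();

        Observable<Integer> auto = Observable.range(1, 2)
                .doOnSubscribe(d -> upstreamSubscriptions.incrementAndGet())
                .publish()
                .autoConnect();

        // The first observer triggers the one and only connection.
        auto.subscribe(v -> log.add("first " + v));

        // The source has already completed; with publish(), this observer only sees completion.
        auto.subscribe(v -> log.add("second " + v), e -> { }, () -> log.add("second done"));

        log.add("upstream subscriptions: " + upstreamSubscriptions.get());
        return log;
    }

    public static void main(String[] args) {
        // prints [first 1, first 2, second done, upstream subscriptions: 1]
        System.out.println(run());
    }
}
```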
-
autoConnect
@CheckReturnValue @SchedulerSupport("none") public @NonNull Observable<T> autoConnect(int numberOfObservers)
Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the specified number of Observers subscribe to it.
The connection happens after the given number of subscriptions and happens at most once during the lifetime of the returned Observable. If this ConnectableObservable terminates, the connection is never renewed, no matter how Observers come and go. Use refCount() to renew a connection or dispose an active connection when all Observers have disposed their Disposables.
This overload does not allow disconnecting the connection established via connect(Consumer). Use the autoConnect(int, Consumer) overload to gain access to the Disposable representing the only connection.
- Scheduler:
- This autoConnect overload does not operate on any particular Scheduler.
- Parameters:
numberOfObservers - the number of subscribers to await before calling connect on the ConnectableObservable. A non-positive value indicates an immediate connection.
- Returns:
- a new Observable instance that automatically connects to this ConnectableObservable when the specified number of Observers subscribe to it
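Waiting for a given number of Observers can be sketched as below, assuming RxJava 3 and publish(); the class name AutoConnectTwo and the observer labels are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: nothing is emitted until the second observer subscribes.
final class AutoConnectTwo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Integer> auto = Observable.range(1, 2).publish().autoConnect(2);

        auto.subscribe(v -> log.add("A" + v));
        log.add("waiting");                    // one observer so far, no connection yet
        auto.subscribe(v -> log.add("B" + v)); // second observer triggers connect
        return log;
    }

    public static void main(String[] args) {
        // prints [waiting, A1, B1, A2, B2]
        System.out.println(run());
    }
}
```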
-
autoConnect
@CheckReturnValue @SchedulerSupport("none") public @NonNull Observable<T> autoConnect(int numberOfObservers, @NonNull Consumer<? super Disposable> connection)
Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the specified number of Observers subscribe to it and calls the specified callback with the Disposable associated with the established connection.
The connection happens after the given number of subscriptions and happens at most once during the lifetime of the returned Observable. If this ConnectableObservable terminates, the connection is never renewed, no matter how Observers come and go. Use refCount() to renew a connection or dispose an active connection when all Observers have disposed their Disposables.
- Scheduler:
- This autoConnect overload does not operate on any particular Scheduler.
- Parameters:
numberOfObservers - the number of subscribers to await before calling connect on the ConnectableObservable. A non-positive value indicates an immediate connection.
connection - the callback Consumer that will receive the Disposable representing the established connection
- Returns:
- a new Observable instance that automatically connects to this ConnectableObservable when the specified number of Observers subscribe to it and calls the specified callback with the Disposable associated with the established connection
- Throws:
NullPointerException - if connection is null
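Capturing the connection through the callback can be sketched as follows, assuming RxJava 3. The PublishSubject upstream, the AtomicReference holder, and the class name AutoConnectDisposable are illustrative assumptions.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: keep the connection Disposable so it can be cut later.
final class AutoConnectDisposable {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        PublishSubject<Integer> upstream = PublishSubject.create();
        AtomicReference<Disposable> connection = new AtomicReference<>();

        Observable<Integer> auto = upstream.publish().autoConnect(1, connection::set);
        auto.subscribe(v -> log.add("item " + v));

        upstream.onNext(1);
        log.add("connected: " + upstream.hasObservers());

        // Disposing the captured Disposable disconnects from the upstream.
        connection.get().dispose();
        upstream.onNext(2); // no longer delivered
        log.add("connected: " + upstream.hasObservers());
        return log;
    }

    public static void main(String[] args) {
        // prints [item 1, connected: true, connected: false]
        System.out.println(run());
    }
}
```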
-