Class ConnectableObservable<T>

  • Type Parameters:
    T - the type of items emitted by the ConnectableObservable
    All Implemented Interfaces:
    ObservableSource<T>
    Direct Known Subclasses:
    ObservablePublish, ObservableReplay

    public abstract class ConnectableObservable<T>
    extends Observable<T>
    A ConnectableObservable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect(Consumer<? super Disposable>) method is called. This lets you wait for all intended Observers to subscribe to the ConnectableObservable before it begins emitting items.

    When the upstream terminates, the ConnectableObservable remains in this terminated state and, depending on the actual underlying implementation, relays cached events to late Observers. To reuse and restart this ConnectableObservable, the reset() method has to be called. When called, this ConnectableObservable will appear as a fresh, unconnected source to new Observers. Disposing the connection also resets the ConnectableObservable to its fresh state, so there is no need to call reset() in that case.

    Note that although connect() and reset() are safe to call from multiple threads, it is recommended that a dedicated thread or piece of business logic manages the connection and resetting of a ConnectableObservable. Otherwise, an early connect() or reset() call made while Observers are still subscribing to this ConnectableObservable can cause those Observers to miss signals they expected to receive from the start.
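
    The deferred start described above can be illustrated with a minimal sketch. It assumes RxJava 3 is on the classpath and uses Observable.publish() to obtain a ConnectableObservable; the class name ConnectBasics and the log list are hypothetical and exist only for illustration.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: two Observers subscribe first, items flow only after connect().
final class ConnectBasics {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ConnectableObservable<Integer> source = Observable.just(1, 2, 3).publish();

        // Subscribing alone does not start the upstream.
        source.subscribe(v -> log.add("A:" + v));
        source.subscribe(v -> log.add("B:" + v));
        log.add("before connect");

        // Both Observers are in place, so each one receives every item.
        source.connect();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

    In this sketch the "before connect" entry is logged ahead of any item, because the synchronous source only runs inside the connect() call.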

    See Also:
    RxJava Wiki: Connectable Observable Operators
    • Constructor Detail

      • ConnectableObservable

        public ConnectableObservable()
    • Method Detail

      • connect

        @SchedulerSupport("none")
        public abstract void connect(@NonNull Consumer<? super Disposable> connection)
        Instructs the ConnectableObservable to begin emitting the items from its underlying Observable to its Observers.
        Scheduler:
        The behavior is determined by the implementor of this abstract class.
        Parameters:
        connection - the action that receives the connection Disposable before the subscription to the source happens, allowing the caller to synchronously disconnect a synchronous source
        Throws:
        java.lang.NullPointerException - if connection is null
        See Also:
        ReactiveX documentation: Connect
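
        Why the callback receives the Disposable before the source is subscribed can be seen in the following sketch. It assumes RxJava 3 and a publish()-based ConnectableObservable over a synchronous range; SyncDisconnect and the AtomicReference holder are hypothetical names. Because the source emits everything inside connect(...), a Disposable returned afterwards would arrive too late to stop it.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: disconnect a synchronous source from inside onNext.
final class SyncDisconnect {
    static List<Integer> run() {
        List<Integer> log = new ArrayList<>();
        AtomicReference<Disposable> connection = new AtomicReference<>();
        ConnectableObservable<Integer> source = Observable.range(1, 5).publish();

        source.subscribe(v -> {
            log.add(v);
            if (v == 2) {
                // The connection was stored before the source started emitting.
                connection.get().dispose();
            }
        });

        // The consumer is called first, then the synchronous range is subscribed.
        source.connect(connection::set);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```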
      • reset

        @SchedulerSupport("none")
        public abstract void reset()
        Resets this ConnectableObservable into its fresh state if it has terminated or has been disposed.

        Calling this method on a fresh or active ConnectableObservable has no effect.

        Scheduler:
        The behavior is determined by the implementor of this abstract class.
        Since:
        3.0.0
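
        A minimal sketch of the terminated, reset, and reconnected states, assuming RxJava 3 and the publish() implementation (which relays only the terminal event to late Observers; other implementations may replay cached items). ResetExample and the label strings are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: a terminated source must be reset() before new Observers see items.
final class ResetExample {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ConnectableObservable<Integer> source = Observable.just(1, 2).publish();

        source.subscribe(v -> log.add("first:" + v), e -> { }, () -> log.add("first:done"));
        source.connect(); // runs to completion synchronously

        // Still terminated: a late Observer only gets the completion signal.
        source.subscribe(v -> log.add("late:" + v), e -> { }, () -> log.add("late:done"));

        // After reset(), the source looks fresh and unconnected again.
        source.reset();
        source.subscribe(v -> log.add("fresh:" + v), e -> { }, () -> log.add("fresh:done"));
        source.connect();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```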
      • refCount

        @CheckReturnValue
        @SchedulerSupport("none")
        @NonNull
        public final @NonNull Observable<T> refCount(int observerCount)
        Connects to the upstream ConnectableObservable when the number of subscribed Observers reaches the specified count and disconnects once all Observers have unsubscribed.
        Scheduler:
        This refCount overload does not operate on any particular Scheduler.

        History: 2.1.14 - experimental

        Parameters:
        observerCount - the number of Observers required to connect to the upstream
        Returns:
        the new Observable instance
        Throws:
        java.lang.IllegalArgumentException - if observerCount is non-positive
        Since:
        2.2
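
        As a rough illustration of the count-based connection, the sketch below assumes RxJava 3 and a publish()-based source; RefCountByCount is a hypothetical name. With observerCount set to 2, the first subscription alone does not trigger the upstream.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: the upstream starts only when the second Observer arrives.
final class RefCountByCount {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Integer> shared = Observable.just(1, 2).publish().refCount(2);

        shared.subscribe(v -> log.add("A:" + v));
        log.add("one observer"); // nothing emitted yet

        // The second subscription reaches the required count and connects.
        shared.subscribe(v -> log.add("B:" + v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```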
      • refCount

        @CheckReturnValue
        @SchedulerSupport("io.reactivex:computation")
        @NonNull
        public final @NonNull Observable<T> refCount(long timeout,
                                                     @NonNull java.util.concurrent.TimeUnit unit)
        Connects to the upstream ConnectableObservable when the number of subscribed Observers reaches 1 and disconnects after the specified timeout once all Observers have unsubscribed.
        Scheduler:
        This refCount overload operates on the computation Scheduler.

        History: 2.1.14 - experimental

        Parameters:
        timeout - the time to wait before disconnecting after all Observers unsubscribed
        unit - the time unit of the timeout
        Returns:
        the new Observable instance
        Throws:
        java.lang.NullPointerException - if unit is null
        Since:
        2.2
        See Also:
        refCount(long, TimeUnit, Scheduler)
      • refCount

        @CheckReturnValue
        @SchedulerSupport("custom")
        @NonNull
        public final @NonNull Observable<T> refCount(long timeout,
                                                     @NonNull java.util.concurrent.TimeUnit unit,
                                                     @NonNull Scheduler scheduler)
        Connects to the upstream ConnectableObservable when the number of subscribed Observers reaches 1 and disconnects after the specified timeout once all Observers have unsubscribed.
        Scheduler:
        This refCount overload operates on the specified Scheduler.

        History: 2.1.14 - experimental

        Parameters:
        timeout - the time to wait before disconnecting after all Observers unsubscribed
        unit - the time unit of the timeout
        scheduler - the target scheduler to wait on before disconnecting
        Returns:
        the new Observable instance
        Throws:
        java.lang.NullPointerException - if unit or scheduler is null
        Since:
        2.2
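
        The delayed disconnect can be made deterministic with a TestScheduler, as in this sketch. It assumes RxJava 3; the PublishSubject upstream, the doOnSubscribe/doOnDispose probes, and the class name RefCountTimeout are illustrative choices rather than part of this API.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: the upstream is disposed one virtual second after the last Observer leaves.
final class RefCountTimeout {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<Integer> upstream = PublishSubject.create();

        Observable<Integer> shared = upstream
                .doOnSubscribe(d -> log.add("connected"))
                .doOnDispose(() -> log.add("disconnected"))
                .publish()
                .refCount(1, TimeUnit.SECONDS, scheduler);

        Disposable only = shared.subscribe(); // first Observer connects
        only.dispose();                       // timeout starts counting

        scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
        log.add("after 500ms");               // still connected
        scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
        log.add("after 1s");                  // timeout elapsed, upstream disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```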
      • refCount

        @CheckReturnValue
        @SchedulerSupport("io.reactivex:computation")
        @NonNull
        public final @NonNull Observable<T> refCount(int observerCount,
                                                     long timeout,
                                                     @NonNull java.util.concurrent.TimeUnit unit)
        Connects to the upstream ConnectableObservable when the number of subscribed Observers reaches the specified count and disconnects after the specified timeout once all Observers have unsubscribed.
        Scheduler:
        This refCount overload operates on the computation Scheduler.

        History: 2.1.14 - experimental

        Parameters:
        observerCount - the number of Observers required to connect to the upstream
        timeout - the time to wait before disconnecting after all Observers unsubscribed
        unit - the time unit of the timeout
        Returns:
        the new Observable instance
        Throws:
        java.lang.NullPointerException - if unit is null
        java.lang.IllegalArgumentException - if observerCount is non-positive
        Since:
        2.2
        See Also:
        refCount(int, long, TimeUnit, Scheduler)
      • refCount

        @CheckReturnValue
        @SchedulerSupport("custom")
        @NonNull
        public final @NonNull Observable<T> refCount(int observerCount,
                                                     long timeout,
                                                     @NonNull java.util.concurrent.TimeUnit unit,
                                                     @NonNull Scheduler scheduler)
        Connects to the upstream ConnectableObservable when the number of subscribed Observers reaches the specified count and disconnects after the specified timeout once all Observers have unsubscribed.
        Scheduler:
        This refCount overload operates on the specified Scheduler.

        History: 2.1.14 - experimental

        Parameters:
        observerCount - the number of Observers required to connect to the upstream
        timeout - the time to wait before disconnecting after all Observers unsubscribed
        unit - the time unit of the timeout
        scheduler - the target scheduler to wait on before disconnecting
        Returns:
        the new Observable instance
        Throws:
        java.lang.NullPointerException - if unit or scheduler is null
        java.lang.IllegalArgumentException - if observerCount is non-positive
        Since:
        2.2
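
        The following sketch combines both parameters and shows that a subscription arriving within the timeout keeps the existing connection alive. It assumes RxJava 3 and uses a TestScheduler for virtual time; RefCountCountAndTimeout, the PublishSubject upstream, and the logging probes are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: connect at two Observers, disconnect one second after the last one leaves.
final class RefCountCountAndTimeout {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<Integer> upstream = PublishSubject.create();

        Observable<Integer> shared = upstream
                .doOnSubscribe(d -> log.add("connected"))
                .doOnDispose(() -> log.add("disconnected"))
                .publish()
                .refCount(2, 1, TimeUnit.SECONDS, scheduler);

        Disposable a = shared.subscribe();
        log.add("one observer");           // count not reached yet
        Disposable b = shared.subscribe(); // second Observer connects
        a.dispose();
        b.dispose();                       // timeout starts

        scheduler.advanceTimeBy(900, TimeUnit.MILLISECONDS);
        Disposable c = shared.subscribe(); // arrives in time, pending disconnect is cancelled
        scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
        log.add("still connected");

        c.dispose();                       // timeout starts again
        scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```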
      • autoConnect

        @NonNull
        @CheckReturnValue
        @SchedulerSupport("none")
        public @NonNull Observable<T> autoConnect()
        Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the first Observer subscribes.

        The connection happens after the first subscription and happens at most once during the lifetime of the returned Observable. If this ConnectableObservable terminates, the connection is never renewed, no matter how Observers come and go. Use refCount() to renew a connection or dispose an active connection when all Observers have disposed their Disposables.

        This overload does not allow disconnecting the connection established via connect(Consumer). Use the autoConnect(int, Consumer) overload to gain access to the Disposable representing the only connection.

        Scheduler:
        This autoConnect overload does not operate on any particular Scheduler.
        Returns:
        a new Observable instance that automatically connects to this ConnectableObservable when the first Observer subscribes
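
        The "at most once" behavior can be sketched as follows, assuming RxJava 3 and a publish()-based source over a synchronous just(...); AutoConnectFirst is a hypothetical name. The first Observer triggers the only connection, so a later Observer finds the source already terminated.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: the connection is made once and never renewed.
final class AutoConnectFirst {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Integer> auto = Observable.just(1, 2, 3).publish().autoConnect();

        // The first subscription connects; the synchronous source runs to completion.
        auto.subscribe(v -> log.add("A:" + v), e -> { }, () -> log.add("A:done"));

        // No reconnection happens, so B only observes the terminal event.
        auto.subscribe(v -> log.add("B:" + v), e -> { }, () -> log.add("B:done"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```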
      • autoConnect

        @NonNull
        @CheckReturnValue
        @SchedulerSupport("none")
        public @NonNull Observable<T> autoConnect(int numberOfObservers)
        Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the specified number of Observers subscribe to it.

        The connection happens after the given number of subscriptions and happens at most once during the lifetime of the returned Observable. If this ConnectableObservable terminates, the connection is never renewed, no matter how Observers come and go. Use refCount() to renew a connection or dispose an active connection when all Observers have disposed their Disposables.

        This overload does not allow disconnecting the connection established via connect(Consumer). Use the autoConnect(int, Consumer) overload to gain access to the Disposable representing the only connection.

        Scheduler:
        This autoConnect overload does not operate on any particular Scheduler.
        Parameters:
        numberOfObservers - the number of subscribers to await before calling connect on the ConnectableObservable. A non-positive value indicates an immediate connection.
        Returns:
        a new Observable instance that automatically connects to this ConnectableObservable when the specified number of Observers subscribe to it
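
        A small sketch of waiting for a given number of Observers, assuming RxJava 3 and a publish()-based source; AutoConnectCount and the doOnSubscribe probe are hypothetical additions used to make the connection moment visible.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class: the connection is established when the second Observer subscribes.
final class AutoConnectCount {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable<Integer> auto = Observable.just(1, 2)
                .doOnSubscribe(d -> log.add("connected"))
                .publish()
                .autoConnect(2);

        auto.subscribe(v -> log.add("A:" + v));
        log.add("one observer"); // upstream not subscribed yet

        auto.subscribe(v -> log.add("B:" + v));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```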
      • autoConnect

        @NonNull
        @CheckReturnValue
        @SchedulerSupport("none")
        public @NonNull Observable<T> autoConnect(int numberOfObservers,
                                                  @NonNull Consumer<? super Disposable> connection)
        Returns an Observable that automatically connects (at most once) to this ConnectableObservable when the specified number of Observers subscribe to it and calls the specified callback with the Disposable associated with the established connection.

        The connection happens after the given number of subscriptions and happens at most once during the lifetime of the returned Observable. If this ConnectableObservable terminates, the connection is never renewed, no matter how Observers come and go. Use refCount() to renew a connection or dispose an active connection when all Observers have disposed their Disposables.

        Scheduler:
        This autoConnect overload does not operate on any particular Scheduler.
        Parameters:
        numberOfObservers - the number of subscribers to await before calling connect on the ConnectableObservable. A non-positive value indicates an immediate connection.
        connection - the callback Consumer that will receive the Disposable representing the established connection
        Returns:
        a new Observable instance that automatically connects to this ConnectableObservable when the specified number of Observers subscribe to it and calls the specified callback with the Disposable associated with the established connection
        Throws:
        java.lang.NullPointerException - if connection is null
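
        One possible way to use the connection callback is to keep the Disposable and dispose it later to shut down the single connection. The sketch assumes RxJava 3; AutoConnectDispose, the PublishSubject upstream, and the AtomicReference holder are hypothetical.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: capture the connection Disposable and use it to disconnect.
final class AutoConnectDispose {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        AtomicReference<Disposable> connection = new AtomicReference<>();
        PublishSubject<Integer> upstream = PublishSubject.create();

        Observable<Integer> auto = upstream.publish().autoConnect(1, connection::set);
        auto.subscribe(v -> log.add("value " + v)); // first Observer connects

        upstream.onNext(1);
        connection.get().dispose(); // tears down the only connection
        upstream.onNext(2);         // no longer delivered

        log.add("upstream has observers: " + upstream.hasObservers());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```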