Class UnicastSubject<T>

  • Type Parameters:
    T - the value type received and emitted by this Subject subclass
    All Implemented Interfaces:
    ObservableSource<T>, Observer<T>

    public final class UnicastSubject<T>
    extends Subject<T>
    A Subject that queues up events until a single Observer subscribes to it, replays those events to it until the Observer catches up and then switches to relaying events live to this single Observer until this UnicastSubject terminates or the Observer disposes.

    Note that UnicastSubject holds an unbounded internal buffer.

    This subject does not have a public constructor by design; a new empty instance of this UnicastSubject can be created via the following create methods that allow specifying the retention policy for items:

    • create() - creates an empty, unbounded UnicastSubject that caches all items and the terminal event it receives.
    • create(int) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain.
    • create(boolean) - creates an empty, unbounded UnicastSubject that optionally delays an error it receives and replays it after the regular items have been emitted.
    • create(int, Runnable) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastSubject gets terminated or the single Observer disposes.
    • create(int, Runnable, boolean) - creates an empty, unbounded UnicastSubject with a hint about how many total items one expects to retain, a callback that will be called exactly once when the UnicastSubject gets terminated or the single Observer disposes, and a flag that optionally delays an error it receives and replays it after the regular items have been emitted.
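
    As a minimal sketch of the callback-taking factory methods listed above, the following assumes the RxJava 3.x package names (io.reactivex.rxjava3); the class UnicastFactorySketch and its method names are hypothetical and exist only for illustration. It counts how often the termination callback runs when the subject completes and when the single Observer disposes.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; only the UnicastSubject calls come from the library.
final class UnicastFactorySketch {

    // Completing twice should still run the callback only once.
    static int callbackRunsOnTerminate() {
        AtomicInteger calls = new AtomicInteger();
        UnicastSubject<String> subject = UnicastSubject.create(8, calls::incrementAndGet);
        subject.onNext("a");
        subject.onComplete();
        subject.onComplete(); // ignored: the subject is already terminated
        return calls.get();
    }

    // Disposing the single Observer also triggers the callback.
    static int callbackRunsOnDispose() {
        AtomicInteger calls = new AtomicInteger();
        UnicastSubject<String> subject = UnicastSubject.create(8, calls::incrementAndGet, true);
        Disposable d = subject.subscribe();
        d.dispose();
        subject.onComplete(); // ignored: the Observer has already disposed
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("calls after terminate: " + callbackRunsOnTerminate());
        System.out.println("calls after dispose: " + callbackRunsOnDispose());
    }
}
```

    The capacity hint of 8 is arbitrary; it only sizes the internal buffer segments and does not bound the number of retained items.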

    If more than one Observer attempts to subscribe to this UnicastSubject, every Observer after the first will receive an IllegalStateException indicating the single-use-only nature of this UnicastSubject, even if the UnicastSubject has already terminated with an error.

    Since a Subject is conceptually derived from the Processor type in the Reactive Streams specification, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the subject's state is not changed.
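
    The null rule can be illustrated with a short sketch, assuming the RxJava 3.x package names; the class UnicastNullSketch is hypothetical. It records what the Observer sees around a rejected onNext(null) call.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class illustrating the null check on onNext.
final class UnicastNullSketch {

    static List<String> eventsAroundNull() {
        UnicastSubject<String> subject = UnicastSubject.create();
        List<String> events = new ArrayList<>();
        subject.subscribe(events::add);

        try {
            subject.onNext(null); // rejected before it reaches the Observer
        } catch (NullPointerException ex) {
            events.add("NPE");
        }

        subject.onNext("a"); // the subject's state is unchanged, so this still works
        return events;
    }

    public static void main(String[] args) {
        System.out.println(eventsAroundNull());
    }
}
```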

    Since a UnicastSubject is an Observable, it does not support backpressure.

    When this UnicastSubject is terminated via onError(Throwable) the current or late single Observer may receive the Throwable before any available items could be emitted. To make sure an onError event is delivered to the Observer after the normal items, create a UnicastSubject with the create(boolean) or create(int, Runnable, boolean) factory methods.
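
    The difference between the two error modes can be sketched as follows, assuming the RxJava 3.x package names; the class UnicastDelayErrorSketch is hypothetical. The delayError flag is passed explicitly through create(boolean) so the sketch does not rely on whatever default the no-argument factory uses.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class comparing fail-fast and delayed error delivery.
final class UnicastDelayErrorSketch {

    // Caches two items and an error, then attaches a late Observer.
    static List<String> lateObserverEvents(boolean delayError) {
        UnicastSubject<Integer> subject = UnicastSubject.create(delayError);
        subject.onNext(1);
        subject.onNext(2);
        subject.onError(new IllegalStateException("boom"));

        List<String> events = new ArrayList<>();
        subject.subscribe(
                v -> events.add("next:" + v),
                e -> events.add("error:" + e.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        System.out.println("delayError=false: " + lateObserverEvents(false));
        System.out.println("delayError=true:  " + lateObserverEvents(true));
    }
}
```

    With delayError set to true, the late Observer should see both cached items followed by the error; with false, the error can overtake the cached items, which are then dropped.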

    Even though UnicastSubject implements the Observer interface, calling onSubscribe is not required (Rule 2.12) if the subject is used as a standalone source. However, calling onSubscribe after the UnicastSubject reached its terminal state will result in the given Disposable being disposed immediately.
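
    A small sketch of the onSubscribe behavior, assuming the RxJava 3.x package names and the Disposable.empty() factory from that version; the class UnicastOnSubscribeSketch is hypothetical.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.UnicastSubject;

// Hypothetical demo class showing how onSubscribe treats the given Disposable.
final class UnicastOnSubscribeSketch {

    // Before termination the Disposable is left alone.
    static boolean disposedBeforeTermination() {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        Disposable upstream = Disposable.empty();
        subject.onSubscribe(upstream);
        return upstream.isDisposed();
    }

    // After termination the Disposable is disposed immediately.
    static boolean disposedAfterTermination() {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        subject.onComplete();
        Disposable upstream = Disposable.empty();
        subject.onSubscribe(upstream);
        return upstream.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println("before termination: " + disposedBeforeTermination());
        System.out.println("after termination: " + disposedAfterTermination());
    }
}
```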

    Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
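
    One way to satisfy the serialization requirement with concurrent producers is sketched below, assuming the RxJava 3.x package names; the class UnicastSerializedSketch and the two-thread setup are illustrative assumptions, not a recommended architecture.

```java
import io.reactivex.rxjava3.subjects.Subject;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: two threads emit through a serialized wrapper.
final class UnicastSerializedSketch {

    static int emitFromTwoThreads(int perThread) {
        // toSerialized() wraps the subject so that overlapping onNext calls are safe.
        Subject<Integer> subject = UnicastSubject.<Integer>create().toSerialized();
        AtomicInteger received = new AtomicInteger();
        subject.subscribe(v -> received.incrementAndGet());

        Runnable producer = () -> {
            for (int i = 0; i < perThread; i++) {
                subject.onNext(i);
            }
        };
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }

        subject.onComplete(); // terminate only after both producers have finished
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("received: " + emitFromTwoThreads(1000));
    }
}
```

    Calling onNext on the raw UnicastSubject from both threads instead would violate the contract described above.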

    This UnicastSubject supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasObservers().
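
    The state-peeking methods can be exercised as in the following sketch, assuming the RxJava 3.x package names; the class UnicastStatePeekSketch is hypothetical.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.util.Arrays;

// Hypothetical demo class sampling the state-peeking methods.
final class UnicastStatePeekSketch {

    static boolean[] peek() {
        UnicastSubject<Integer> subject = UnicastSubject.create();
        boolean observersBefore = subject.hasObservers();

        subject.subscribe(v -> { }, e -> { });
        boolean observersAfter = subject.hasObservers();

        Throwable failure = new RuntimeException("boom");
        subject.onError(failure);

        return new boolean[] {
            observersBefore,                  // no Observer yet
            observersAfter,                   // the single Observer is attached
            subject.hasThrowable(),           // terminated with an error
            subject.hasComplete(),            // not a normal completion
            subject.getThrowable() == failure // the same Throwable instance
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(peek()));
    }
}
```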

    Scheduler:
    UnicastSubject does not operate by default on a particular Scheduler and the single Observer gets notified on the thread on which the respective onXXX methods were invoked.
    Error handling:
    When onError(Throwable) is called, the UnicastSubject enters into a terminal state and emits the same Throwable instance to the current single Observer. During this emission, if the single Observer disposes its respective Disposable, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable). If there were no Observers subscribed to this UnicastSubject when onError(Throwable) was called, the global error handler is not invoked.
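
    The routing to the global error handler can be observed with a sketch like the one below, assuming the RxJava 3.x package names; the class UnicastErrorRoutingSketch is hypothetical. For simplicity it disposes the Observer before onError is called rather than during the emission, and it temporarily replaces the process-wide error handler, which a real application would rarely do outside of tests.

```java
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.subjects.UnicastSubject;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical demo class counting errors that reach the global handler.
final class UnicastErrorRoutingSketch {

    static int undeliverable(boolean subscribeAndDispose) {
        List<Throwable> global = new CopyOnWriteArrayList<>();
        RxJavaPlugins.setErrorHandler(global::add);
        try {
            UnicastSubject<Integer> subject = UnicastSubject.create();
            if (subscribeAndDispose) {
                // The single Observer subscribes and disposes right away.
                subject.subscribe(v -> { }, e -> { }).dispose();
            }
            subject.onError(new IOException("boom"));
            return global.size();
        } finally {
            RxJavaPlugins.setErrorHandler(null); // restore the default handler
        }
    }

    public static void main(String[] args) {
        System.out.println("after dispose: " + undeliverable(true));
        System.out.println("no observers: " + undeliverable(false));
    }
}
```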

    Example usage:

    
     UnicastSubject<Integer> subject = UnicastSubject.create();
    
     TestObserver<Integer> to1 = subject.test();
    
     // fresh UnicastSubjects are empty
     to1.assertEmpty();
    
     TestObserver<Integer> to2 = subject.test();
    
     // A UnicastSubject only allows one Observer during its lifetime
     to2.assertFailure(IllegalStateException.class);
    
     subject.onNext(1);
     to1.assertValue(1);
    
     subject.onNext(2);
     to1.assertValues(1, 2);
    
     subject.onComplete();
     to1.assertResult(1, 2);
    
     // ----------------------------------------------------
    
     UnicastSubject<Integer> subject2 = UnicastSubject.create();
    
     // a UnicastSubject caches events until its single Observer subscribes
     subject2.onNext(1);
     subject2.onNext(2);
     subject2.onComplete();
    
     TestObserver<Integer> to3 = subject2.test();
    
     // the cached events are emitted in order
     to3.assertResult(1, 2);
     
    Since:
    2.0
    • Field Detail

      • downstream

        final java.util.concurrent.atomic.AtomicReference<Observer<? super T>> downstream
        The single Observer.
      • onTerminate

        final java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> onTerminate
        The optional callback when the Subject gets cancelled or terminates.
      • delayError

        final boolean delayError
        Deliver pending onNext events before the error event.
      • disposed

        volatile boolean disposed
        Indicates the single observer has cancelled.
      • done

        volatile boolean done
        Indicates the source has terminated.
      • error

        java.lang.Throwable error
        The terminal error if not null. Must be set before writing to done and read after done == true.
      • once

        final java.util.concurrent.atomic.AtomicBoolean once
        Set to true atomically for the first and only Observer.
      • enableOperatorFusion

        boolean enableOperatorFusion
    • Constructor Detail

      • UnicastSubject

        UnicastSubject​(int capacityHint,
                       java.lang.Runnable onTerminate,
                       boolean delayError)
        Creates a UnicastSubject with the given capacity hint, delay error flag and callback for when the Subject is terminated normally or its single Observer disposes.

        History: 2.0.8 - experimental

        Parameters:
        capacityHint - the capacity hint for the internal, unbounded queue
        onTerminate - the callback to run when the Subject is terminated or cancelled, null not allowed
        delayError - deliver pending onNext events before onError
        Since:
        2.2
    • Method Detail

      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull UnicastSubject<T> create​(int capacityHint)
        Creates a UnicastSubject with the given internal buffer capacity hint.
        Type Parameters:
        T - the value type
        Parameters:
        capacityHint - the hint to size the internal unbounded buffer
        Returns:
        a UnicastSubject instance
        Throws:
        java.lang.IllegalArgumentException - if capacityHint is non-positive
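
        The validation of capacityHint can be checked with a short sketch, assuming the RxJava 3.x package names; the class UnicastCapacityHintSketch is hypothetical.

```java
import io.reactivex.rxjava3.subjects.UnicastSubject;

// Hypothetical demo class probing which capacity hints are rejected.
final class UnicastCapacityHintSketch {

    static boolean rejects(int capacityHint) {
        try {
            UnicastSubject.<Integer>create(capacityHint);
            return false;
        } catch (IllegalArgumentException ex) {
            return true; // non-positive hints are refused up front
        }
    }

    public static void main(String[] args) {
        System.out.println("0 rejected: " + rejects(0));
        System.out.println("16 rejected: " + rejects(16));
    }
}
```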
      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull UnicastSubject<T> create​(int capacityHint,
                                                            @NonNull
                                                            @NonNull java.lang.Runnable onTerminate)
        Creates a UnicastSubject with the given internal buffer capacity hint and a callback for the case when the single Observer disposes its Disposable or the subject is terminated.

        The callback is called exactly once and does not overlap with any active replay.

        Type Parameters:
        T - the value type
        Parameters:
        capacityHint - the hint to size the internal unbounded buffer
        onTerminate - the callback to run when the Subject is terminated or cancelled, null not allowed
        Returns:
        a UnicastSubject instance
        Throws:
        java.lang.NullPointerException - if onTerminate is null
        java.lang.IllegalArgumentException - if capacityHint is non-positive
      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull UnicastSubject<T> create​(int capacityHint,
                                                            @NonNull
                                                            @NonNull java.lang.Runnable onTerminate,
                                                            boolean delayError)
        Creates a UnicastSubject with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Observer disposes its Disposable or the subject is terminated.

        The callback is called exactly once and does not overlap with any active replay.

        History: 2.0.8 - experimental

        Type Parameters:
        T - the value type
        Parameters:
        capacityHint - the hint to size the internal unbounded buffer
        onTerminate - the callback to run when the Subject is terminated or cancelled, null not allowed
        delayError - deliver pending onNext events before onError
        Returns:
        a UnicastSubject instance
        Throws:
        java.lang.NullPointerException - if onTerminate is null
        java.lang.IllegalArgumentException - if capacityHint is non-positive
        Since:
        2.2
      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull UnicastSubject<T> create​(boolean delayError)
        Creates a UnicastSubject with an internal buffer capacity hint of 16 and the given delay error flag.

        History: 2.0.8 - experimental

        Type Parameters:
        T - the value type
        Parameters:
        delayError - deliver pending onNext events before onError
        Returns:
        a UnicastSubject instance
        Since:
        2.2
      • subscribeActual

        protected void subscribeActual​(Observer<? super T> observer)
        Description copied from class: Observable
        Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.

        There is no need to call any of the plugin hooks on the current Observable instance or the Observer; all hooks and basic safeguards have been applied by Observable.subscribe(Observer) before this method gets called.

        Specified by:
        subscribeActual in class Observable<T>
        Parameters:
        observer - the incoming Observer, never null
      • doTerminate

        void doTerminate()
      • onError

        public void onError​(java.lang.Throwable t)
        Description copied from interface: Observer
        Notifies the Observer that the Observable has experienced an error condition.

        If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onComplete().

        Parameters:
        t - the exception encountered by the Observable
      • drainNormal

        void drainNormal​(Observer<? super T> a)
      • drainFused

        void drainFused​(Observer<? super T> a)
      • errorOrComplete

        void errorOrComplete​(Observer<? super T> a)
      • drain

        void drain()
      • hasObservers

        @CheckReturnValue
        public boolean hasObservers()
        Description copied from class: Subject
        Returns true if the subject has any Observers.

        The method is thread-safe.

        Specified by:
        hasObservers in class Subject<T>
        Returns:
        true if the subject has any Observers
      • getThrowable

        @Nullable
        @CheckReturnValue
        public @Nullable java.lang.Throwable getThrowable()
        Description copied from class: Subject
        Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.

        The method is thread-safe.

        Specified by:
        getThrowable in class Subject<T>
        Returns:
        the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
      • hasComplete

        @CheckReturnValue
        public boolean hasComplete()
        Description copied from class: Subject
        Returns true if the subject has reached a terminal state through a complete event.

        The method is thread-safe.

        Specified by:
        hasComplete in class Subject<T>
        Returns:
        true if the subject has reached a terminal state through a complete event
        See Also:
        Subject.hasThrowable()