Class MulticastProcessor<T>

  • Type Parameters:
    T - the input and output value type
    All Implemented Interfaces:
    FlowableSubscriber<T>, org.reactivestreams.Processor<T,T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>

    @BackpressureSupport(FULL)
    @SchedulerSupport("none")
    public final class MulticastProcessor<@NonNull T>
    extends FlowableProcessor<T>
    A FlowableProcessor implementation that coordinates downstream requests through a front-buffer and stable-prefetching, optionally cancelling the upstream once all subscribers have cancelled.

    This processor does not have a public constructor by design; a new empty instance of this MulticastProcessor can be created via the following create methods that allow configuring it:

    • create(): create an empty MulticastProcessor with Flowable.bufferSize() prefetch amount and no reference counting behavior.
    • create(int): create an empty MulticastProcessor with the given prefetch amount and no reference counting behavior.
    • create(boolean): create an empty MulticastProcessor with Flowable.bufferSize() prefetch amount and an optional reference counting behavior.
    • create(int, boolean): create an empty MulticastProcessor with the given prefetch amount and an optional reference counting behavior.

    When the reference counting behavior is enabled, the MulticastProcessor cancels its upstream when all Subscribers have cancelled. Late Subscribers will then be immediately completed.
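    The reference counting behavior described above can be illustrated with a minimal sketch. It assumes RxJava 3 (the io.reactivex.rxjava3 packages) is on the classpath; the class name RefCountSketch and the use of a PublishProcessor as the upstream are illustrative choices, not part of this class's documentation.

```java
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.processors.PublishProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

// Hypothetical demo class; only the RxJava types come from the library.
final class RefCountSketch {

    /** Returns {upstream connected before cancel, upstream connected after cancel}. */
    static boolean[] run() {
        PublishProcessor<Integer> source = PublishProcessor.create();

        // true enables the reference counting behavior
        MulticastProcessor<Integer> mp = MulticastProcessor.create(true);
        source.subscribe(mp);

        TestSubscriber<Integer> first = mp.test();
        boolean connectedBefore = source.hasSubscribers();

        // The only Subscriber cancels, so the processor cancels its upstream.
        first.cancel();
        boolean connectedAfter = source.hasSubscribers();

        // A late Subscriber is completed immediately without any items.
        mp.test().assertResult();

        return new boolean[] { connectedBefore, connectedAfter };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("connected before cancel: " + r[0]);
        System.out.println("connected after cancel: " + r[1]);
    }
}
```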

    Because MulticastProcessor implements the Subscriber interface, calling onSubscribe is mandatory (Rule 2.12). If the MulticastProcessor should run standalone, i.e., without being subscribed to another Publisher, use the start() or startUnbounded() method to initialize the internal buffer. Failing to do so will lead to a NullPointerException at runtime.
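    A minimal sketch of standalone use, assuming RxJava 3 on the classpath; the class name StandaloneSketch is hypothetical. It calls start() before any onNext so that the internal buffer exists.

```java
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; only the RxJava types come from the library.
final class StandaloneSketch {

    /** Drives the processor by hand and returns what a Subscriber received. */
    static List<Integer> run() {
        MulticastProcessor<Integer> mp = MulticastProcessor.create();

        // No upstream Publisher here, so initialize the processor explicitly.
        mp.start();

        TestSubscriber<Integer> ts = mp.test();

        mp.onNext(1);
        mp.onNext(2);
        mp.onComplete();

        ts.assertResult(1, 2);
        return new ArrayList<>(ts.values());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```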

    Use offer(Object) to try to emit an item without failing when the internal buffer is full; in that case it returns false.

    A MulticastProcessor is a Processor type in the Reactive Streams specification; as such, nulls are not allowed (Rule 2.13) as parameters to onSubscribe(Subscription), offer(Object), onNext(Object) and onError(Throwable). Such calls result in a NullPointerException being thrown, and the processor's state is not changed.
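    The null handling can be sketched as follows, assuming RxJava 3 on the classpath; the class name NullRejectionSketch is hypothetical. The sketch checks that a null argument is rejected and that the processor remains usable afterwards.

```java
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

// Hypothetical demo class; only the RxJava types come from the library.
final class NullRejectionSketch {

    /** Returns true if both onNext(null) and offer(null) throw NullPointerException. */
    static boolean rejectsNull() {
        MulticastProcessor<String> mp = MulticastProcessor.create();
        mp.start();

        boolean onNextRejected = false;
        try {
            mp.onNext(null);
        } catch (NullPointerException expected) {
            onNextRejected = true;
        }

        boolean offerRejected = false;
        try {
            mp.offer(null);
        } catch (NullPointerException expected) {
            offerRejected = true;
        }

        // The rejected calls did not change the state: normal use still works.
        TestSubscriber<String> ts = mp.test();
        mp.onNext("a");
        mp.onComplete();
        ts.assertResult("a");

        return onNextRejected && offerRejected;
    }

    public static void main(String[] args) {
        System.out.println("nulls rejected: " + rejectsNull());
    }
}
```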

    Since a MulticastProcessor is a Flowable, it supports backpressure. The backpressure from the currently subscribed Subscribers is coordinated by emitting upstream items only if all of those Subscribers have requested at least one item. This behavior is also called lockstep-mode because even if some Subscribers can take any number of items, other Subscribers requesting fewer items or requesting infrequently will slow down the overall throughput of the flow.
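    A minimal sketch of the lockstep behavior, assuming RxJava 3 on the classpath; the class name LockstepSketch is hypothetical. One Subscriber requests an unbounded amount and the other requests nothing at first, so neither receives an item until the slow one issues a request.

```java
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; only the RxJava types come from the library.
final class LockstepSketch {

    /** Returns the fast Subscriber's items before and after the slow one requests. */
    static List<List<Integer>> run() {
        MulticastProcessor<Integer> mp = MulticastProcessor.create();
        mp.start();

        TestSubscriber<Integer> fast = mp.test();   // requests Long.MAX_VALUE
        TestSubscriber<Integer> slow = mp.test(0L); // requests nothing yet

        mp.onNext(1);
        mp.onNext(2);

        // The slow Subscriber has not requested, so the fast one is held back too.
        List<Integer> before = new ArrayList<>(fast.values());

        // One request from the slow Subscriber releases one item to both.
        slow.requestMore(1);
        List<Integer> after = new ArrayList<>(fast.values());

        List<List<Integer>> result = new ArrayList<>();
        result.add(before);
        result.add(after);
        return result;
    }

    public static void main(String[] args) {
        List<List<Integer>> r = run();
        System.out.println("fast before slow requests: " + r.get(0));
        System.out.println("fast after slow requests 1: " + r.get(1));
    }
}
```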

    Calling onNext(Object), offer(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).
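    The serialization requirement can be met with toSerialized(), as in the following sketch. It assumes RxJava 3 on the classpath; the class name SerializedSketch, the two producer threads and the item counts are illustrative. Note that start() and startUnbounded() are declared on MulticastProcessor, so the sketch initializes the processor before wrapping it.

```java
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

// Hypothetical demo class; only the RxJava types come from the library.
final class SerializedSketch {

    /** Emits from two threads through the serialized view and returns the item count. */
    static int run() throws InterruptedException {
        MulticastProcessor<Integer> mp = MulticastProcessor.create();
        mp.startUnbounded(); // initialize before wrapping

        FlowableProcessor<Integer> serialized = mp.toSerialized();
        TestSubscriber<Integer> ts = serialized.test();

        // Both threads call onNext concurrently; the wrapper serializes the calls.
        Runnable producer = () -> {
            for (int i = 0; i < 100; i++) {
                serialized.onNext(i);
            }
        };
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        serialized.onComplete();
        ts.assertComplete();
        return ts.values().size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("items received: " + run());
    }
}
```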

    This MulticastProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers(). This processor doesn't allow peeking into its buffer.
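    The state-peeking methods can be exercised as in this sketch, which assumes RxJava 3 on the classpath; the class name StatePeekSketch and the IOException used as the error are illustrative.

```java
import io.reactivex.rxjava3.processors.MulticastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.io.IOException;

// Hypothetical demo class; only the RxJava types come from the library.
final class StatePeekSketch {

    /** Returns the observed states as a comma-separated string. */
    static String run() {
        MulticastProcessor<Integer> mp = MulticastProcessor.create();
        mp.start();

        StringBuilder sb = new StringBuilder();
        sb.append(mp.hasSubscribers()).append(','); // nobody subscribed yet

        TestSubscriber<Integer> ts = mp.test();
        sb.append(mp.hasSubscribers()).append(','); // one Subscriber present

        mp.onError(new IOException("boom"));
        ts.assertError(IOException.class);

        sb.append(mp.hasThrowable()).append(',');   // terminated with an error
        sb.append(mp.hasComplete()).append(',');    // not a normal completion
        sb.append(mp.getThrowable() instanceof IOException);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```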

    When this MulticastProcessor is terminated via onError(Throwable) or onComplete(), all previously signaled but not yet consumed items will still be available to Subscribers, and the respective terminal event is only emitted when all previous items have been successfully delivered to Subscribers. If there are no Subscribers, the remaining items will be buffered indefinitely.

    The MulticastProcessor does not support clearing its cached events (to appear empty again).

    Backpressure:
    The backpressure from the currently subscribed Subscribers is coordinated by emitting upstream items only if all of those Subscribers have requested at least one item. This behavior is also called lockstep-mode because even if some Subscribers can take any number of items, other Subscribers requesting fewer items or requesting infrequently will slow down the overall throughput of the flow.
    Scheduler:
    MulticastProcessor does not operate by default on a particular Scheduler and the Subscribers get notified on an arbitrary thread in a serialized fashion.

    Example:

    
        MulticastProcessor<Integer> mp = Flowable.range(1, 10)
        .subscribeWith(MulticastProcessor.create());
    
        mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    
        // --------------------
    
        MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
        mp2.start();
    
        assertTrue(mp2.offer(1));
        assertTrue(mp2.offer(2));
        assertTrue(mp2.offer(3));
        assertTrue(mp2.offer(4));
    
        assertFalse(mp2.offer(5));
    
        mp2.onComplete();
    
        mp2.test().assertResult(1, 2, 3, 4);
     

    History: 2.1.14 - experimental

    Since:
    2.2
    • Constructor Detail

      • MulticastProcessor

        MulticastProcessor(int bufferSize,
                           boolean refCount)
        Constructs a fresh instance with the given prefetch amount and the optional refCount-behavior.
        Parameters:
        bufferSize - the prefetch amount
        refCount - if true and if all Subscribers have canceled, the upstream is cancelled
    • Method Detail

      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull MulticastProcessor<T> create()
        Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and no refCount-behavior.
        Type Parameters:
        T - the input and output value type
        Returns:
        the new MulticastProcessor instance
      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull MulticastProcessor<T> create(boolean refCount)
        Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and the optional refCount-behavior.
        Type Parameters:
        T - the input and output value type
        Parameters:
        refCount - if true and if all Subscribers have canceled, the upstream is cancelled
        Returns:
        the new MulticastProcessor instance
      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull MulticastProcessor<T> create(int bufferSize)
        Constructs a fresh instance with the given prefetch amount and no refCount behavior.
        Type Parameters:
        T - the input and output value type
        Parameters:
        bufferSize - the prefetch amount
        Returns:
        the new MulticastProcessor instance
        Throws:
        java.lang.IllegalArgumentException - if bufferSize is non-positive
      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull MulticastProcessor<T> create(int bufferSize,
                                                                boolean refCount)
        Constructs a fresh instance with the given prefetch amount and the optional refCount-behavior.
        Type Parameters:
        T - the input and output value type
        Parameters:
        bufferSize - the prefetch amount
        refCount - if true and if all Subscribers have canceled, the upstream is cancelled
        Returns:
        the new MulticastProcessor instance
        Throws:
        java.lang.IllegalArgumentException - if bufferSize is non-positive
      • start

        public void start()
        Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses a fixed buffer and allows using the onXXX and offer methods afterwards.
      • startUnbounded

        public void startUnbounded()
        Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses an unbounded buffer and allows using the onXXX and offer methods afterwards.
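        The difference between start() and startUnbounded() can be sketched by counting how many offer calls succeed when no Subscriber consumes the items. The sketch assumes RxJava 3 on the classpath; the class name StartModesSketch, the prefetch amount of 4 and the number of attempts are illustrative.

```java
import io.reactivex.rxjava3.processors.MulticastProcessor;

// Hypothetical demo class; only the RxJava types come from the library.
final class StartModesSketch {

    /** Offers the values 1..attempts with no Subscriber and counts the accepted ones. */
    static int acceptedOffers(boolean unbounded, int attempts) {
        MulticastProcessor<Integer> mp = MulticastProcessor.create(4);
        if (unbounded) {
            mp.startUnbounded(); // unbounded buffer
        } else {
            mp.start();          // fixed buffer sized by the prefetch amount
        }

        int accepted = 0;
        for (int i = 1; i <= attempts; i++) {
            if (mp.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("start(): " + acceptedOffers(false, 10));
        System.out.println("startUnbounded(): " + acceptedOffers(true, 10));
    }
}
```

        With start(), offers beyond the prefetch amount are rejected until a Subscriber drains the buffer; with startUnbounded(), every offer is accepted and the items accumulate in memory.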
      • onSubscribe

        public void onSubscribe(@NonNull org.reactivestreams.Subscription s)
        Description copied from interface: FlowableSubscriber
        Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call and any additional behavior is thread-safe with respect to onNext.
      • offer

        @CheckReturnValue
        public boolean offer(@NonNull T t)
        Tries to offer an item into the internal queue and returns false if the queue is full.
        Parameters:
        t - the item to offer, not null
        Returns:
        true if successful, false if the queue is full
        Throws:
        java.lang.NullPointerException - if t is null
        java.lang.IllegalStateException - if the processor is in fusion mode
      • onComplete

        public void onComplete()
      • getThrowable

        @CheckReturnValue
        public java.lang.Throwable getThrowable()
        Description copied from class: FlowableProcessor
        Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.

        The method is thread-safe.

        Specified by:
        getThrowable in class FlowableProcessor<T>
        Returns:
        the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
      • subscribeActual

        protected void subscribeActual(@NonNull org.reactivestreams.Subscriber<? super @NonNull T> s)
        Description copied from class: Flowable
        Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.

        There is no need to call any of the plugin hooks on the current Flowable instance or the Subscriber; all hooks and basic safeguards have been applied by Flowable.subscribe(Subscriber) before this method gets called.

        Specified by:
        subscribeActual in class Flowable<T>
        Parameters:
        s - the incoming Subscriber, never null
      • drain

        void drain()