Class ReplayProcessor<T>

  • Type Parameters:
    T - the value type
    All Implemented Interfaces:
    FlowableSubscriber<T>, org.reactivestreams.Processor<T,​T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>

    public final class ReplayProcessor<@NonNull T>
    extends FlowableProcessor<T>
    Replays events to Subscribers.

    The ReplayProcessor supports the following item retention strategies:

    The ReplayProcessor can be created in bounded and unbounded mode. It can be bounded by size (maximum number of elements retained at most) and/or time (maximum age of elements replayed).

    Since a ReplayProcessor is a Reactive Streams Processor, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the processor's state is not changed.

    This ReplayProcessor respects the individual backpressure behavior of its Subscribers but does not coordinate their request amounts towards the upstream (because there might not be any) and consumes the upstream in an unbounded manner (requesting Long.MAX_VALUE). Note that Subscribers receive a continuous sequence of values after they subscribed even if an individual item gets delayed due to backpressure. Due to concurrency requirements, a size-bounded ReplayProcessor may hold strong references to more source emissions than specified.

    When this ReplayProcessor is terminated via onError(Throwable) or onComplete(), late Subscribers will receive the retained/cached items first (if any) followed by the respective terminal event. If the ReplayProcessor has a time-bound, the age of the retained/cached items are still considered when replaying and thus it may result in no items being emitted before the terminal event.

    Once an Subscriber has subscribed, it will receive items continuously from that point on. Bounds only affect how many past items a new Subscriber will receive before it catches up with the live event feed.

    Even though ReplayProcessor implements the Subscriber interface, calling onSubscribe is not required (Rule 2.12) if the processor is used as a standalone source. However, calling onSubscribe after the ReplayProcessor reached its terminal state will result in the given Subscription being canceled immediately.

    Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).

    This ReplayProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers() as well as means to read the retained/cached items in a non-blocking and thread-safe manner via hasValue(), getValue(), getValues() or getValues(Object[]).

    Note that due to concurrency requirements, a size- and time-bounded ReplayProcessor may hold strong references to more source emissions than specified while it isn't terminated yet. Use the cleanupBuffer() to allow such inaccessible items to be cleaned up by GC once no consumer references them anymore.

    Backpressure:
    This ReplayProcessor respects the individual backpressure behavior of its Subscribers but does not coordinate their request amounts towards the upstream (because there might not be any) and consumes the upstream in an unbounded manner (requesting Long.MAX_VALUE). Note that Subscribers receive a continuous sequence of values after they subscribed even if an individual item gets delayed due to backpressure.
    Scheduler:
    ReplayProcessor does not operate by default on a particular Scheduler and the Subscribers get notified on the thread the respective onXXX methods were invoked. Time-bound ReplayProcessors use the given Scheduler in their create methods as time source to timestamp of items received for the age checks.
    Error handling:
    When the onError(Throwable) is called, the ReplayProcessor enters into a terminal state and emits the same Throwable instance to the last set of Subscribers. During this emission, if one or more Subscribers cancel their respective Subscriptions, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable) (multiple times if multiple Subscribers cancel at once). If there were no Subscribers subscribed to this ReplayProcessor when the onError() was called, the global error handler is not invoked.

    Example usage:

     
    
      ReplayProcessor<Object> processor = new ReplayProcessor<T>();
      processor.onNext("one");
      processor.onNext("two");
      processor.onNext("three");
      processor.onComplete();
    
      // both of the following will get the onNext/onComplete calls from above
      processor.subscribe(subscriber1);
      processor.subscribe(subscriber2);
    
       
    • Constructor Detail

      • ReplayProcessor

        ReplayProcessor​(ReplayProcessor.ReplayBuffer<@NonNull T> buffer)
        Constructs a ReplayProcessor with the given custom ReplayBuffer instance.
        Parameters:
        buffer - the ReplayBuffer instance, not null (not verified)
    • Method Detail

      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull ReplayProcessor<T> create()
        Creates an unbounded ReplayProcessor.

        The internal buffer is backed by an ArrayList and starts with an initial capacity of 16. Once the number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the number of items grows, this causes frequent array reallocation and copying, and may hurt performance and latency. This can be avoided with the create(int) overload which takes an initial capacity parameter and can be tuned to reduce the array reallocation frequency as needed.

        Type Parameters:
        T - the type of items observed and emitted by the ReplayProcessor
        Returns:
        the created ReplayProcessor
      • create

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull ReplayProcessor<T> create​(int capacityHint)
        Creates an unbounded ReplayProcessor with the specified initial buffer capacity.

        Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new items. For example, if you know that the buffer will hold 32k items, you can ask the ReplayProcessor to preallocate its internal array with a capacity to hold that many items. Once the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead due to frequent array-copying.

        Type Parameters:
        T - the type of items observed and emitted by this type of processor
        Parameters:
        capacityHint - the initial buffer capacity
        Returns:
        the created processor
        Throws:
        java.lang.IllegalArgumentException - if capacityHint is non-positive
      • createWithSize

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull ReplayProcessor<T> createWithSize​(int maxSize)
        Creates a size-bounded ReplayProcessor.

        In this setting, the ReplayProcessor holds at most size items in its internal buffer and discards the oldest item.

        When Subscribers subscribe to a terminated ReplayProcessor, they are guaranteed to see at most size onNext events followed by a termination event.

        If a Subscriber subscribes while the ReplayProcessor is active, it will observe all items in the buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to the size constraint in the mean time. In other words, once a Subscriber subscribes, it will receive items without gaps in the sequence.

        Type Parameters:
        T - the type of items observed and emitted by this type of processor
        Parameters:
        maxSize - the maximum number of buffered items
        Returns:
        the created processor
        Throws:
        java.lang.IllegalArgumentException - if maxSize is non-positive
      • createUnbounded

        @CheckReturnValue
        static <T> ReplayProcessor<T> createUnbounded()
        Creates an unbounded ReplayProcessor with the bounded-implementation for testing purposes.

        This variant behaves like the regular unbounded ReplayProcessor created via create() but uses the structures of the bounded-implementation. This is by no means intended for the replacement of the original, array-backed and unbounded ReplayProcessor due to the additional overhead of the linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior of the bounded implementations without the interference of the eviction policies.

        Type Parameters:
        T - the type of items observed and emitted by this type of processor
        Returns:
        the created processor
      • createWithTime

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull ReplayProcessor<T> createWithTime​(long maxAge,
                                                                     @NonNull
                                                                     @NonNull java.util.concurrent.TimeUnit unit,
                                                                     @NonNull
                                                                     @NonNull Scheduler scheduler)
        Creates a time-bounded ReplayProcessor.

        In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.

        Once the processor is terminated, Subscribers subscribing to it will receive items that remained in the buffer after the terminal event, regardless of their age.

        If a Subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the mean time. In other words, once a Subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.

        Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete notification.

        Type Parameters:
        T - the type of items observed and emitted by this type of processor
        Parameters:
        maxAge - the maximum age of the contained items
        unit - the time unit of time
        scheduler - the Scheduler that provides the current time
        Returns:
        the created processor
        Throws:
        java.lang.NullPointerException - if unit or scheduler is null
        java.lang.IllegalArgumentException - if maxAge is non-positive
      • createWithTimeAndSize

        @CheckReturnValue
        @NonNull
        public static <T> @NonNull ReplayProcessor<T> createWithTimeAndSize​(long maxAge,
                                                                            @NonNull
                                                                            @NonNull java.util.concurrent.TimeUnit unit,
                                                                            @NonNull
                                                                            @NonNull Scheduler scheduler,
                                                                            int maxSize)
        Creates a time- and size-bounded ReplayProcessor.

        In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Scheduler and holds at most size items in its internal buffer. It evicts items from the start of the buffer if their age becomes less-than or equal to the supplied age in milliseconds or the buffer reaches its size limit.

        When Subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal notification, regardless of their age, but at most size items.

        If a Subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the mean time. In other words, once a Subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.

        Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete notification.

        Type Parameters:
        T - the type of items observed and emitted by this type of processor
        Parameters:
        maxAge - the maximum age of the contained items
        unit - the time unit of time
        maxSize - the maximum number of buffered items
        scheduler - the Scheduler that provides the current time
        Returns:
        the created processor
        Throws:
        java.lang.NullPointerException - if unit or scheduler is null
        java.lang.IllegalArgumentException - if maxAge or maxSize is non-positive
      • subscribeActual

        protected void subscribeActual​(org.reactivestreams.Subscriber<? super @NonNull T> s)
        Description copied from class: Flowable
        Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.

        There is no need to call any of the plugin hooks on the current Flowable instance or the Subscriber; all hooks and basic safeguards have been applied by Flowable.subscribe(Subscriber) before this method gets called.

        Specified by:
        subscribeActual in class Flowable<T>
        Parameters:
        s - the incoming Subscriber, never null
      • onSubscribe

        public void onSubscribe​(org.reactivestreams.Subscription s)
        Description copied from interface: FlowableSubscriber
        Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call and additional behavior is thread safe in respect to onNext.
      • onNext

        public void onNext​(@NonNull T t)
      • onError

        public void onError​(java.lang.Throwable t)
      • onComplete

        public void onComplete()
      • getThrowable

        @Nullable
        @CheckReturnValue
        public @Nullable java.lang.Throwable getThrowable()
        Description copied from class: FlowableProcessor
        Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.

        The method is thread-safe.

        Specified by:
        getThrowable in class FlowableProcessor<T>
        Returns:
        the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
      • cleanupBuffer

        public void cleanupBuffer()
        Makes sure the item cached by the head node in a bounded ReplayProcessor is released (as it is never part of a replay).

        By default, live bounded buffers will remember one item before the currently receivable one to ensure subscribers can always receive a continuous sequence of items. A terminated ReplayProcessor automatically releases this inaccessible item.

        The method must be called sequentially, similar to the standard onXXX methods.

        History: 2.1.11 - experimental

        Since:
        2.2
      • getValue

        @CheckReturnValue
        public T getValue()
        Returns the latest value this processor has or null if no such value exists.

        The method is thread-safe.

        Returns:
        the latest value this processor currently has or null if no such value exists
      • getValues

        @CheckReturnValue
        public java.lang.Object[] getValues()
        Returns an Object array containing snapshot all values of this processor.

        The method is thread-safe.

        Returns:
        the array containing the snapshot of all values of this processor
      • getValues

        @CheckReturnValue
        public T[] getValues​(@NonNull T[] array)
        Returns a typed array containing a snapshot of all values of this processor.

        The method follows the conventions of Collection.toArray by setting the array element after the last value to null (if the capacity permits).

        The method is thread-safe.

        Parameters:
        array - the target array to copy values into if it fits
        Returns:
        the given array if the values fit into it or a new array containing all values
      • hasValue

        @CheckReturnValue
        public boolean hasValue()
        Returns true if this processor has any value.

        The method is thread-safe.

        Returns:
        true if the processor has any value