Class QueueDrainHelper


  • public final class QueueDrainHelper
    extends java.lang.Object
    Utility class to help with the queue-drain serialization idiom.
    • Field Summary

      Fields 
      Modifier and Type Field Description
      (package private) static long COMPLETED_MASK  
      (package private) static long REQUESTED_MASK  
    • Constructor Summary

      Constructors 
      Modifier Constructor Description
      private QueueDrainHelper()
      Utility class.
    • Method Summary

      All Methods Static Methods Concrete Methods 
      Modifier and Type Method Description
      static <T,U> boolean checkTerminated(boolean d, boolean empty, Observer<?> observer, boolean delayError, SimpleQueue<?> q, Disposable disposable, ObservableQueueDrain<T,U> qd)
      static <T,U> boolean checkTerminated(boolean d, boolean empty, org.reactivestreams.Subscriber<?> s, boolean delayError, SimpleQueue<?> q, QueueDrain<T,U> qd)
      static <T> SimpleQueue<T> createQueue(int capacityHint)
      Creates a queue: spsc-array if capacityHint is positive and spsc-linked-array if capacityHint is negative; in both cases, the capacity is the absolute value of capacityHint.
      static <T,U> void drainLoop(SimplePlainQueue<T> q, Observer<? super U> a, boolean delayError, Disposable dispose, ObservableQueueDrain<T,U> qd)
      static <T,U> void drainMaxLoop(SimplePlainQueue<T> q, org.reactivestreams.Subscriber<? super U> a, boolean delayError, Disposable dispose, QueueDrain<T,U> qd)
      Drains the queue, but gives up with an error if there aren't enough requests.
      (package private) static boolean isCancelled(BooleanSupplier cancelled)
      static <T> void postComplete(org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
      Signals the completion of the main sequence and switches to post-completion replay mode.
      (package private) static <T> boolean postCompleteDrain(long n, org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
      Drains the queue based on the outstanding requests in post-completed mode (only!).
      static <T> boolean postCompleteRequest(long n, org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
      Accumulates requests (not validated) and handles the completed mode draining of the queue based on the requests.
      static void request(org.reactivestreams.Subscription s, int prefetch)
      Requests Long.MAX_VALUE if prefetch is negative or the exact amount if prefetch is positive.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • QueueDrainHelper

        private QueueDrainHelper()
        Utility class.
    • Method Detail

      • drainMaxLoop

        public static <T,U> void drainMaxLoop(SimplePlainQueue<T> q,
                                              org.reactivestreams.Subscriber<? super U> a,
                                              boolean delayError,
                                              Disposable dispose,
                                              QueueDrain<T,U> qd)
        Drains the queue, but gives up with an error if there aren't enough requests.
        Type Parameters:
        T - the queue value type
        U - the emission value type
        Parameters:
        q - the queue
        a - the subscriber
        delayError - true if errors should be delayed until all normal items have been emitted
        dispose - the disposable to call when termination happens and cleanup is necessary
        qd - the QueueDrain instance that gives status information to the drain logic
      • checkTerminated

        public static <T,U> boolean checkTerminated(boolean d,
                                                    boolean empty,
                                                    org.reactivestreams.Subscriber<?> s,
                                                    boolean delayError,
                                                    SimpleQueue<?> q,
                                                    QueueDrain<T,U> qd)
      • createQueue

        public static <T> SimpleQueue<T> createQueue(int capacityHint)
        Creates a queue: spsc-array if capacityHint is positive and spsc-linked-array if capacityHint is negative; in both cases, the capacity is the absolute value of capacityHint.
        Type Parameters:
        T - the value type of the queue
        Parameters:
        capacityHint - the capacity hint; a negative value will create a linked-array-based SPSC queue
        Returns:
        the queue instance
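
        The sign convention of capacityHint can be illustrated with a minimal sketch. The class CreateQueueSketch, the Kind enum, and both helper methods are hypothetical names used only for illustration; they describe which queue type the documentation says would be chosen and do not construct the real RxJava queues. Treating a hint of zero as array-based is an assumption, since the description only covers positive and negative values.

```java
final class CreateQueueSketch {
    /** Hypothetical labels for the two queue flavours named in the description. */
    enum Kind { SPSC_ARRAY, SPSC_LINKED_ARRAY }

    /** Negative hints select the linked-array flavour; others select the array flavour. */
    static Kind kindFor(int capacityHint) {
        return capacityHint < 0 ? Kind.SPSC_LINKED_ARRAY : Kind.SPSC_ARRAY;
    }

    /** The capacity is the absolute value of the hint, whatever its sign. */
    static int capacityFor(int capacityHint) {
        return Math.abs(capacityHint);
    }

    public static void main(String[] args) {
        System.out.println(kindFor(128) + " " + capacityFor(128));
        System.out.println(kindFor(-128) + " " + capacityFor(-128));
    }
}
```

        Encoding the queue flavour in the sign lets a single int parameter carry both decisions, at the cost of a convention that callers have to remember.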
      • request

        public static void request(org.reactivestreams.Subscription s,
                                   int prefetch)
        Requests Long.MAX_VALUE if prefetch is negative or the exact amount if prefetch is positive.
        Parameters:
        s - the Subscription to request from
        prefetch - the prefetch value
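
        The rule described above could be sketched as follows. To keep the example free of external dependencies, java.util.concurrent.Flow.Subscription stands in for org.reactivestreams.Subscription; RequestSketch and RecordingSubscription are hypothetical names. A prefetch of zero is not covered by the description and is not handled specially here.

```java
import java.util.concurrent.Flow;

final class RequestSketch {
    /** Requests an unbounded amount for a negative prefetch, else the exact amount. */
    static void request(Flow.Subscription s, int prefetch) {
        s.request(prefetch < 0 ? Long.MAX_VALUE : prefetch);
    }

    /** Hypothetical test double that records the most recent request amount. */
    static final class RecordingSubscription implements Flow.Subscription {
        long lastRequest;

        @Override
        public void request(long n) {
            lastRequest = n;
        }

        @Override
        public void cancel() {
            // nothing to cancel in this sketch
        }
    }

    public static void main(String[] args) {
        RecordingSubscription s = new RecordingSubscription();
        request(s, 16);
        System.out.println(s.lastRequest);
        request(s, -1);
        System.out.println(s.lastRequest == Long.MAX_VALUE);
    }
}
```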
      • postCompleteRequest

        public static <T> boolean postCompleteRequest(long n,
                                                      org.reactivestreams.Subscriber<? super T> actual,
                                                      java.util.Queue<T> queue,
                                                      java.util.concurrent.atomic.AtomicLong state,
                                                      BooleanSupplier isCancelled)
        Accumulates requests (not validated) and handles the completed mode draining of the queue based on the requests.

        Post-completion backpressure handles the case where a source produces values in response to requests while it is active, but still has values available after it completes. In this case, onComplete() cannot simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through to the upstream and the queue is not accessed. In completed mode, requests no longer reach the upstream and instead drive the draining of the queue.

        Type Parameters:
        T - the value type emitted
        Parameters:
        n - the request amount, positive (not validated)
        actual - the target Subscriber to send events to
        queue - the queue to drain if in the post-complete state
        state - holds the request amount and the post-completed flag
        isCancelled - a supplier that returns true if the drain has been cancelled
        Returns:
        true if the state indicates a completion state.
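
        The request accumulation step can be sketched as a compare-and-set loop. This is a simplified illustration under stated assumptions, not the library's implementation: the mask values are assumed (bit 63 for the completed flag, bits 0-62 for the amount), addCap and accumulate are hypothetical helpers, and the sketch only updates the state without draining the queue.

```java
import java.util.concurrent.atomic.AtomicLong;

final class PostCompleteRequestSketch {
    // Assumed values: the flag lives in bit 63, the request amount in bits 0-62.
    static final long COMPLETED_MASK = Long.MIN_VALUE;
    static final long REQUESTED_MASK = Long.MAX_VALUE;

    /** Adds two non-negative amounts, capping at Long.MAX_VALUE on overflow. */
    static long addCap(long a, long b) {
        long u = a + b;
        return u < 0L ? Long.MAX_VALUE : u;
    }

    /**
     * Adds n to the requested amount while preserving the completed flag.
     * Returns true if the state was already in completed mode, in which case
     * the caller would drain the queue instead of requesting from upstream.
     */
    static boolean accumulate(AtomicLong state, long n) {
        for (;;) {
            long current = state.get();
            long requested = current & REQUESTED_MASK;
            long next = (current & COMPLETED_MASK) | addCap(requested, n);
            if (state.compareAndSet(current, next)) {
                return (current & COMPLETED_MASK) != 0L;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong state = new AtomicLong(5L);
        System.out.println(accumulate(state, 3L));        // active mode
        System.out.println(state.get() & REQUESTED_MASK); // accumulated amount
    }
}
```

        Capping the sum keeps an overflow from spilling into bit 63, which would otherwise be misread as the completed flag.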
      • postCompleteDrain

        static <T> boolean postCompleteDrain(long n,
                                             org.reactivestreams.Subscriber<? super T> actual,
                                             java.util.Queue<T> queue,
                                             java.util.concurrent.atomic.AtomicLong state,
                                             BooleanSupplier isCancelled)
        Drains the queue based on the outstanding requests in post-completed mode (only!).
        Type Parameters:
        T - the value type
        Parameters:
        n - the current request amount
        actual - the target Subscriber to send events to
        queue - the queue to drain if in the post-complete state
        state - holds the request amount and the post-completed flag
        isCancelled - a supplier that returns true if the drain has been cancelled
        Returns:
        true if the queue was completely drained or the drain process was cancelled
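
        A request-bounded drain can be sketched in a single-threaded form as shown below. This is an illustration of the idea only and omits the atomic state handling of the real method. The names are hypothetical: a Consumer and a Runnable stand in for the Subscriber's onNext and onComplete, and java.util.function.BooleanSupplier stands in for the BooleanSupplier type in the signature.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;

final class PostCompleteDrainSketch {
    /**
     * Emits at most 'requested' items from the queue.
     * Returns true if the queue was fully drained (onComplete is then
     * signalled) or if the drain was cancelled; false if items remain.
     */
    static <T> boolean drain(long requested, Queue<T> queue,
                             Consumer<? super T> onNext, Runnable onComplete,
                             BooleanSupplier isCancelled) {
        long emitted = 0L;
        while (emitted != requested) {
            if (isCancelled.getAsBoolean()) {
                return true;
            }
            T item = queue.poll();
            if (item == null) {
                onComplete.run();
                return true;
            }
            onNext.accept(item);
            emitted++;
        }
        if (isCancelled.getAsBoolean()) {
            return true;
        }
        if (queue.isEmpty()) {
            onComplete.run();
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>(List.of(1, 2, 3));
        List<Integer> out = new ArrayList<>();
        // Only two items were requested, so one item stays in the queue.
        System.out.println(drain(2L, queue, out::add, () -> { }, () -> false));
        System.out.println(out);
    }
}
```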
      • postComplete

        public static <T> void postComplete(org.reactivestreams.Subscriber<? super T> actual,
                                            java.util.Queue<T> queue,
                                            java.util.concurrent.atomic.AtomicLong state,
                                            BooleanSupplier isCancelled)
        Signals the completion of the main sequence and switches to post-completion replay mode.

        Don't modify the queue after calling this method!

        Post-completion backpressure handles the case where a source produces values in response to requests while it is active, but still has values available after it completes. In this case, onComplete() cannot simply emit the contents of the queue; it has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through to the upstream and the queue is not accessed. In completed mode, requests no longer reach the upstream and instead drive the draining of the queue.

        The algorithm uses the most significant bit (bit 63) of a long value (held in an AtomicLong) as the completed flag. This is possible because request amounts only go up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.
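
        The bit layout can be illustrated with a short sketch. The constant values are assumptions consistent with the description (the values of COMPLETED_MASK and REQUESTED_MASK are not stated on this page), and StateLayoutSketch, isCompleted, requested, and markCompleted are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;

final class StateLayoutSketch {
    // Assumed layout: bit 63 is the completed flag, bits 0-62 hold the amount.
    static final long COMPLETED_MASK = 0x8000_0000_0000_0000L;
    static final long REQUESTED_MASK = 0x7FFF_FFFF_FFFF_FFFFL;

    /** True if the completed flag (bit 63) is set. */
    static boolean isCompleted(long state) {
        return (state & COMPLETED_MASK) != 0L;
    }

    /** Extracts the outstanding request amount from bits 0-62. */
    static long requested(long state) {
        return state & REQUESTED_MASK;
    }

    /**
     * Atomically switches from active to completed mode while keeping the
     * request amount. Returns false if the flag was already set.
     */
    static boolean markCompleted(AtomicLong state) {
        for (;;) {
            long current = state.get();
            if (isCompleted(current)) {
                return false;
            }
            if (state.compareAndSet(current, current | COMPLETED_MASK)) {
                return true;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong state = new AtomicLong(7L);
        markCompleted(state);
        System.out.println(isCompleted(state.get()));
        System.out.println(requested(state.get()));
    }
}
```

        Packing both pieces of information into one long lets a single compare-and-set update the mode and the request amount together, so no separate lock is needed to keep them consistent.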

        Type Parameters:
        T - the value type emitted
        Parameters:
        actual - the target Subscriber to send events to
        queue - the queue to drain if in the post-complete state
        state - holds the request amount and the post-completed flag
        isCancelled - a supplier that returns true if the drain has been cancelled