Class QueueDrainHelper

java.lang.Object
io.reactivex.rxjava3.internal.util.QueueDrainHelper

public final class QueueDrainHelper extends Object
Utility class to help with the queue-drain serialization idiom.
  • Field Details

  • Constructor Details

    • QueueDrainHelper

      private QueueDrainHelper()
      Utility class.
  • Method Details

    • drainMaxLoop

      public static <T, U> void drainMaxLoop(SimplePlainQueue<T> q, org.reactivestreams.Subscriber<? super U> a, boolean delayError, Disposable dispose, QueueDrain<T,U> qd)
      Drains the queue, but gives up with an error if the downstream has not requested enough items.
      Type Parameters:
      T - the queue value type
      U - the emission value type
      Parameters:
      q - the queue
      a - the subscriber
      delayError - true if errors should be delayed until all normal items have been emitted
      dispose - the disposable to call when termination happens and cleanup is necessary
      qd - the QueueDrain instance that gives status information to the drain logic
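      The queue-drain serialization idiom that this drain loop builds on can be sketched in plain Java as follows. This is a minimal illustration, not the library's code: the class QueueDrainSketch, its wip counter, and the Consumer standing in for the subscriber are all hypothetical names, and request accounting, error handling, and termination checks are deliberately left out.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of the queue-drain idiom; not part of RxJava.
final class QueueDrainSketch<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    // "work in progress" counter: non-zero means some thread is draining
    private final AtomicInteger wip = new AtomicInteger();
    private final Consumer<? super T> downstream;

    QueueDrainSketch(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void offer(T item) {
        queue.offer(item);
        // Only the caller that moves wip from 0 to 1 enters the drain loop;
        // everyone else just records that more work arrived.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        drainLoop();
    }

    private void drainLoop() {
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item);
            }
            // Subtract the work we have accounted for; a non-zero result
            // means offer() was called in the meantime, so loop again.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

      Because only one thread at a time runs drainLoop(), the downstream sees items one after another even when offer() is called concurrently.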
    • checkTerminated

      public static <T, U> boolean checkTerminated(boolean d, boolean empty, org.reactivestreams.Subscriber<?> s, boolean delayError, SimpleQueue<?> q, QueueDrain<T,U> qd)
    • drainLoop

      public static <T, U> void drainLoop(SimplePlainQueue<T> q, Observer<? super U> a, boolean delayError, Disposable dispose, ObservableQueueDrain<T,U> qd)
    • checkTerminated

      public static <T, U> boolean checkTerminated(boolean d, boolean empty, Observer<?> observer, boolean delayError, SimpleQueue<?> q, Disposable disposable, ObservableQueueDrain<T,U> qd)
    • createQueue

      public static <T> SimpleQueue<T> createQueue(int capacityHint)
      Creates a queue: spsc-array if capacityHint is positive and spsc-linked-array if capacityHint is negative; in both cases, the capacity is the absolute value of capacityHint.
      Type Parameters:
      T - the value type of the queue
      Parameters:
      capacityHint - the capacity hint; a negative value creates a linked-array-based SPSC queue
      Returns:
      the queue instance
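      The sign convention of capacityHint can be illustrated with JDK queues as stand-ins. This is only a sketch under stated assumptions: CreateQueueSketch is a hypothetical class, ArrayBlockingQueue stands in for the fixed-capacity spsc-array queue, and ConcurrentLinkedQueue stands in for the growable spsc-linked-array queue (it ignores the capacity value, which a linked-array queue would use).

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the sign convention; not the RxJava implementation.
final class CreateQueueSketch {
    static <T> Queue<T> createQueue(int capacityHint) {
        if (capacityHint < 0) {
            // Negative hint: growable queue. A linked-array queue would use
            // -capacityHint as its capacity; this JDK stand-in has no such setting.
            return new ConcurrentLinkedQueue<>();
        }
        // Positive hint: fixed-capacity, array-backed queue.
        return new ArrayBlockingQueue<>(capacityHint);
    }
}
```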
    • request

      public static void request(org.reactivestreams.Subscription s, int prefetch)
      Requests Long.MAX_VALUE if prefetch is negative or the exact amount if prefetch is positive.
      Parameters:
      s - the Subscription to request from
      prefetch - the prefetch value
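      The mapping from prefetch to the requested amount described above amounts to a one-line decision. The sketch below isolates it in a hypothetical helper, PrefetchRequestSketch.requestAmount, so it can be tried without a Subscription; the behavior for a prefetch of zero is not covered by the description and is simply passed through here.

```java
// Hypothetical helper; it only computes the amount that would be requested.
final class PrefetchRequestSketch {
    static long requestAmount(int prefetch) {
        // Negative prefetch means "unbounded"; otherwise request exactly that many.
        return prefetch < 0 ? Long.MAX_VALUE : prefetch;
    }
}
```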
    • postCompleteRequest

      public static <T> boolean postCompleteRequest(long n, org.reactivestreams.Subscriber<? super T> actual, Queue<T> queue, AtomicLong state, BooleanSupplier isCancelled)
      Accumulates requests (not validated) and handles the completed mode draining of the queue based on the requests.

      Post-completion backpressure handles the case where a source produces values based on requests while it is active, but more values remain available after its completion. In this case, onComplete() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help in draining the queue.

      Type Parameters:
      T - the value type emitted
      Parameters:
      n - the request amount, positive (not validated)
      actual - the target Subscriber to send events to
      queue - the queue to drain if in the post-complete state
      state - holds the request amount and the post-completed flag
      isCancelled - a supplier that returns true if the drain has been cancelled
      Returns:
      true if the state indicates the completed mode.
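      The request accumulation step can be sketched as a compare-and-set loop over the AtomicLong state. This is a simplified illustration rather than the library's code: PostCompleteRequestSketch, addRequest, and the two mask constants are names assumed for the example, the flag is assumed to live in bit 63 with the request amount in bits 0-62, and the actual draining of the queue in completed mode is omitted.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of request accumulation with a mode flag in bit 63.
final class PostCompleteRequestSketch {
    static final long COMPLETED_MASK = 0x8000_0000_0000_0000L; // bit 63
    static final long REQUESTED_MASK = 0x7FFF_FFFF_FFFF_FFFFL; // bits 0-62

    /**
     * Adds n to the requested amount, capping at Long.MAX_VALUE and
     * preserving the completed flag.
     * Returns true if the state was in completed mode when updated.
     */
    static boolean addRequest(AtomicLong state, long n) {
        for (;;) {
            long current = state.get();
            long requested = current & REQUESTED_MASK;
            long sum = requested + n;
            if (sum < 0L) {
                sum = Long.MAX_VALUE; // overflow: cap instead of wrapping
            }
            long next = (current & COMPLETED_MASK) | sum;
            if (state.compareAndSet(current, next)) {
                // In completed mode the caller would drain the queue;
                // in active mode it would forward n to the upstream.
                return (current & COMPLETED_MASK) != 0L;
            }
        }
    }
}
```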
    • isCancelled

      static boolean isCancelled(BooleanSupplier cancelled)
    • postCompleteDrain

      static <T> boolean postCompleteDrain(long n, org.reactivestreams.Subscriber<? super T> actual, Queue<T> queue, AtomicLong state, BooleanSupplier isCancelled)
      Drains the queue based on the outstanding requests in post-completed mode (only!).
      Type Parameters:
      T - the value type
      Parameters:
      n - the current request amount
      actual - the target Subscriber to send events to
      queue - the queue to drain if in the post-complete state
      state - holds the request amount and the post-completed flag
      isCancelled - a supplier that returns true if the drain has been cancelled
      Returns:
      true if the queue was completely drained or the drain process was cancelled
    • postComplete

      public static <T> void postComplete(org.reactivestreams.Subscriber<? super T> actual, Queue<T> queue, AtomicLong state, BooleanSupplier isCancelled)
      Signals the completion of the main sequence and switches to post-completion replay mode.

      Don't modify the queue after calling this method!

      Post-completion backpressure handles the case where a source produces values based on requests while it is active, but more values remain available after its completion. In this case, onComplete() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed; in completed mode, requests no longer reach the upstream but help in draining the queue.

      The algorithm uses the most significant bit (bit 63) of a long value (AtomicLong) as the post-completed flag, since the request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't allowed.

      Type Parameters:
      T - the value type emitted
      Parameters:
      actual - the target Subscriber to send events to
      queue - the queue to drain if in the post-complete state
      state - holds the request amount and the post-completed flag
      isCancelled - a supplier that returns true if the drain has been cancelled
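      The switch from active to completed mode via bit 63 can be sketched as follows. The names PostCompleteFlagSketch, isCompleted, requested, and markCompleted are assumptions made for this illustration, and the replay of queued items that follows the switch is not shown.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of packing a mode flag and a request amount into one long.
final class PostCompleteFlagSketch {
    static final long COMPLETED_MASK = 1L << 63;       // bit 63: post-completed flag
    static final long REQUESTED_MASK = Long.MAX_VALUE; // bits 0-62: request amount

    static boolean isCompleted(long state) {
        return (state & COMPLETED_MASK) != 0L;
    }

    static long requested(long state) {
        return state & REQUESTED_MASK;
    }

    /**
     * Atomically sets the completed flag.
     * Returns the amount still requested at the moment of the switch,
     * or -1 if the state was already in completed mode.
     */
    static long markCompleted(AtomicLong state) {
        for (;;) {
            long current = state.get();
            if (isCompleted(current)) {
                return -1L;
            }
            if (state.compareAndSet(current, current | COMPLETED_MASK)) {
                return requested(current);
            }
        }
    }
}
```

      Since a valid request amount never sets bit 63, the flag and the amount can be updated together in a single compare-and-set, without a separate lock or a second field.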