Class QueueDrainHelper
- java.lang.Object
-
- io.reactivex.rxjava3.internal.util.QueueDrainHelper
-
public final class QueueDrainHelper extends java.lang.Object
Utility class to help with the queue-drain serialization idiom.
-
-
Field Summary
Fields Modifier and Type Field Description (package private) static long
COMPLETED_MASK
(package private) static long
REQUESTED_MASK
-
Constructor Summary
Constructors Modifier Constructor Description private
QueueDrainHelper()
Utility class.
-
Method Summary
All Methods Static Methods Concrete Methods Modifier and Type Method Description static <T,U>
booleancheckTerminated(boolean d, boolean empty, Observer<?> observer, boolean delayError, SimpleQueue<?> q, Disposable disposable, ObservableQueueDrain<T,U> qd)
static <T,U>
booleancheckTerminated(boolean d, boolean empty, org.reactivestreams.Subscriber<?> s, boolean delayError, SimpleQueue<?> q, QueueDrain<T,U> qd)
static <T> SimpleQueue<T>
createQueue(int capacityHint)
Creates a queue: spsc-array if capacityHint is positive and spsc-linked-array if capacityHint is negative; in both cases, the capacity is the absolute value of prefetch.static <T,U>
voiddrainLoop(SimplePlainQueue<T> q, Observer<? super U> a, boolean delayError, Disposable dispose, ObservableQueueDrain<T,U> qd)
static <T,U>
voiddrainMaxLoop(SimplePlainQueue<T> q, org.reactivestreams.Subscriber<? super U> a, boolean delayError, Disposable dispose, QueueDrain<T,U> qd)
Drain the queue but give up with an error if there aren't enough requests.(package private) static boolean
isCancelled(BooleanSupplier cancelled)
static <T> void
postComplete(org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
Signals the completion of the main sequence and switches to post-completion replay mode.(package private) static <T> boolean
postCompleteDrain(long n, org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
Drains the queue based on the outstanding requests in post-completed mode (only!).static <T> boolean
postCompleteRequest(long n, org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
Accumulates requests (not validated) and handles the completed mode draining of the queue based on the requests.static void
request(org.reactivestreams.Subscription s, int prefetch)
RequestsLong.MAX_VALUE
if prefetch is negative or the exact amount if prefetch is positive.
-
-
-
Field Detail
-
COMPLETED_MASK
static final long COMPLETED_MASK
- See Also:
- Constant Field Values
-
REQUESTED_MASK
static final long REQUESTED_MASK
- See Also:
- Constant Field Values
-
-
Method Detail
-
drainMaxLoop
public static <T,U> void drainMaxLoop(SimplePlainQueue<T> q, org.reactivestreams.Subscriber<? super U> a, boolean delayError, Disposable dispose, QueueDrain<T,U> qd)
Drain the queue but give up with an error if there aren't enough requests.- Type Parameters:
T
- the queue value typeU
- the emission value type- Parameters:
q
- the queuea
- the subscriberdelayError
- true if errors should be delayed after all normal itemsdispose
- the disposable to call when termination happens and cleanup is necessaryqd
- the QueueDrain instance that gives status information to the drain logic
-
checkTerminated
public static <T,U> boolean checkTerminated(boolean d, boolean empty, org.reactivestreams.Subscriber<?> s, boolean delayError, SimpleQueue<?> q, QueueDrain<T,U> qd)
-
drainLoop
public static <T,U> void drainLoop(SimplePlainQueue<T> q, Observer<? super U> a, boolean delayError, Disposable dispose, ObservableQueueDrain<T,U> qd)
-
checkTerminated
public static <T,U> boolean checkTerminated(boolean d, boolean empty, Observer<?> observer, boolean delayError, SimpleQueue<?> q, Disposable disposable, ObservableQueueDrain<T,U> qd)
-
createQueue
public static <T> SimpleQueue<T> createQueue(int capacityHint)
Creates a queue: spsc-array if capacityHint is positive and spsc-linked-array if capacityHint is negative; in both cases, the capacity is the absolute value of prefetch.- Type Parameters:
T
- the value type of the queue- Parameters:
capacityHint
- the capacity hint, negative value will create an array-based SPSC queue- Returns:
- the queue instance
-
request
public static void request(org.reactivestreams.Subscription s, int prefetch)
RequestsLong.MAX_VALUE
if prefetch is negative or the exact amount if prefetch is positive.- Parameters:
s
- the Subscription to request fromprefetch
- the prefetch value
-
postCompleteRequest
public static <T> boolean postCompleteRequest(long n, org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
Accumulates requests (not validated) and handles the completed mode draining of the queue based on the requests.Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onComplete() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed but in completed mode, requests no-longer reach the upstream but help in draining the queue.
- Type Parameters:
T
- the value type emitted- Parameters:
n
- the request amount, positive (not validated)actual
- the target Subscriber to send events toqueue
- the queue to drain if in the post-complete statestate
- holds the request amount and the post-completed flagisCancelled
- a supplier that returns true if the drain has been cancelled- Returns:
- true if the state indicates a completion state.
-
isCancelled
static boolean isCancelled(BooleanSupplier cancelled)
-
postCompleteDrain
static <T> boolean postCompleteDrain(long n, org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
Drains the queue based on the outstanding requests in post-completed mode (only!).- Type Parameters:
T
- the value type- Parameters:
n
- the current request amountactual
- the target Subscriber to send events toqueue
- the queue to drain if in the post-complete statestate
- holds the request amount and the post-completed flagisCancelled
- a supplier that returns true if the drain has been cancelled- Returns:
- true if the queue was completely drained or the drain process was cancelled
-
postComplete
public static <T> void postComplete(org.reactivestreams.Subscriber<? super T> actual, java.util.Queue<T> queue, java.util.concurrent.atomic.AtomicLong state, BooleanSupplier isCancelled)
Signals the completion of the main sequence and switches to post-completion replay mode.Don't modify the queue after calling this method!
Post-completion backpressure handles the case when a source produces values based on requests when it is active but more values are available even after its completion. In this case, the onComplete() can't just emit the contents of the queue but has to coordinate with the requested amounts. This requires two distinct modes: active and completed. In active mode, requests flow through and the queue is not accessed but in completed mode, requests no-longer reach the upstream but help in draining the queue.
The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since request amount only goes up to
Long.MAX_VALUE
(bits 0-62) and negative values aren't allowed.- Type Parameters:
T
- the value type emitted- Parameters:
actual
- the target Subscriber to send events toqueue
- the queue to drain if in the post-complete statestate
- holds the request amount and the post-completed flagisCancelled
- a supplier that returns true if the drain has been cancelled
-
-