Class MpscLinkedQueue<T>
- java.lang.Object
-
- io.reactivex.rxjava3.internal.queue.MpscLinkedQueue<T>
-
- Type Parameters:
T
- the contained value type
- All Implemented Interfaces:
SimplePlainQueue<T>
,SimpleQueue<T>
public final class MpscLinkedQueue<T> extends java.lang.Object implements SimplePlainQueue<T>
A multi-producer single consumer unbounded queue.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) static class
MpscLinkedQueue.LinkedQueueNode<E>
-
Field Summary
Fields Modifier and Type Field Description private java.util.concurrent.atomic.AtomicReference<MpscLinkedQueue.LinkedQueueNode<T>>
consumerNode
private java.util.concurrent.atomic.AtomicReference<MpscLinkedQueue.LinkedQueueNode<T>>
producerNode
-
Constructor Summary
Constructors Constructor Description MpscLinkedQueue()
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
clear()
Removes all enqueued items from this queue.boolean
isEmpty()
Returns true if the queue is empty.(package private) MpscLinkedQueue.LinkedQueueNode<T>
lpConsumerNode()
(package private) MpscLinkedQueue.LinkedQueueNode<T>
lvConsumerNode()
(package private) MpscLinkedQueue.LinkedQueueNode<T>
lvProducerNode()
boolean
offer(T e)
Atomically enqueue a single value.boolean
offer(T v1, T v2)
Atomically enqueue two values.T
poll()
Tries to dequeue a value (non-null) or returns null if the queue is empty.(package private) void
spConsumerNode(MpscLinkedQueue.LinkedQueueNode<T> node)
(package private) MpscLinkedQueue.LinkedQueueNode<T>
xchgProducerNode(MpscLinkedQueue.LinkedQueueNode<T> node)
-
-
-
Field Detail
-
producerNode
private final java.util.concurrent.atomic.AtomicReference<MpscLinkedQueue.LinkedQueueNode<T>> producerNode
-
consumerNode
private final java.util.concurrent.atomic.AtomicReference<MpscLinkedQueue.LinkedQueueNode<T>> consumerNode
-
-
Method Detail
-
offer
public boolean offer(T e)
Atomically enqueue a single value.
IMPLEMENTATION NOTES:
Offer is allowed from multiple threads.
Offer allocates a new node and:- Swaps it atomically with current producer node (only one producer 'wins')
- Sets the new node as the node following from the swapped producer node
- Specified by:
offer
in interfaceSimpleQueue<T>
- Parameters:
e
- the value to enqueue, not null- Returns:
- true if successful, false if the value was not enqueued likely due to reaching the queue capacity)
- See Also:
Queue.offer(java.lang.Object)
-
poll
@Nullable public T poll()
Tries to dequeue a value (non-null) or returns null if the queue is empty.If the producer uses
SimpleQueue.offer(Object, Object)
and when polling in pairs, if the first poll() returns a non-null item, the second poll() is guaranteed to return a non-null item as well.
IMPLEMENTATION NOTES:
Poll is allowed from a SINGLE thread.
Poll reads the next node from the consumerNode and:- If it is null, the queue is assumed empty (though it might not be).
- If it is not null set it as the consumer node and return it's now evacuated value.
- Specified by:
poll
in interfaceSimplePlainQueue<T>
- Specified by:
poll
in interfaceSimpleQueue<T>
- Returns:
- the item or null to indicate an empty queue
- See Also:
Queue.poll()
-
offer
public boolean offer(T v1, T v2)
Description copied from interface:SimpleQueue
Atomically enqueue two values.- Specified by:
offer
in interfaceSimpleQueue<T>
- Parameters:
v1
- the first value to enqueue, not nullv2
- the second value to enqueue, not null- Returns:
- true if successful, false if the value was not enqueued likely due to reaching the queue capacity)
-
clear
public void clear()
Description copied from interface:SimpleQueue
Removes all enqueued items from this queue.- Specified by:
clear
in interfaceSimpleQueue<T>
-
lvProducerNode
MpscLinkedQueue.LinkedQueueNode<T> lvProducerNode()
-
xchgProducerNode
MpscLinkedQueue.LinkedQueueNode<T> xchgProducerNode(MpscLinkedQueue.LinkedQueueNode<T> node)
-
lvConsumerNode
MpscLinkedQueue.LinkedQueueNode<T> lvConsumerNode()
-
lpConsumerNode
MpscLinkedQueue.LinkedQueueNode<T> lpConsumerNode()
-
spConsumerNode
void spConsumerNode(MpscLinkedQueue.LinkedQueueNode<T> node)
-
isEmpty
public boolean isEmpty()
Returns true if the queue is empty.Note however that due to potential fused functions in
SimpleQueue.poll()
it is possible this method returns false but then poll() returns null because the fused function swallowed the available item(s).
IMPLEMENTATION NOTES:
Queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe the producerNode.value is null, which also means an empty queue because only the consumerNode.value is allowed to be null.- Specified by:
isEmpty
in interfaceSimpleQueue<T>
- Returns:
- true if the queue is empty
-
-