Class MpscLinkedQueue<T>

java.lang.Object
io.reactivex.rxjava3.internal.queue.MpscLinkedQueue<T>
Type Parameters:
T - the contained value type
All Implemented Interfaces:
SimplePlainQueue<T>, SimpleQueue<T>

public final class MpscLinkedQueue<T> extends Object implements SimplePlainQueue<T>
A multi-producer single consumer unbounded queue.
  • Constructor Details

    • MpscLinkedQueue

      public MpscLinkedQueue()
  • Method Details

    • offer

      public boolean offer(T e)
      Atomically enqueue a single value.

      IMPLEMENTATION NOTES:
      Offer is allowed from multiple threads.
      Offer allocates a new node and:

      1. Swaps it atomically with current producer node (only one producer 'wins')
      2. Sets the new node as the node following from the swapped producer node
      This works because each producer is guaranteed to 'plant' a new node and link the old node to it. No two producers can obtain the same producer node, which the atomic exchange (XCHG) guarantees.
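
      The two steps above can be sketched with java.util.concurrent.atomic.AtomicReference. This is a minimal illustration, not the RxJava source: the names OfferSketch, Node and linkedCount are hypothetical, and getAndSet stands in for the XCHG operation mentioned in the notes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the two-step offer; not the actual RxJava implementation.
final class OfferSketch<T> {

    // A singly linked node; 'next' is written by exactly one producer.
    static final class Node<E> {
        final E value;
        final AtomicReference<Node<E>> next = new AtomicReference<>();

        Node(E value) {
            this.value = value;
        }
    }

    // The queue starts with a single empty stub node.
    private final Node<T> stub = new Node<>(null);

    // Producers atomically exchange this reference.
    private final AtomicReference<Node<T>> producerNode = new AtomicReference<>(stub);

    boolean offer(T e) {
        if (e == null) {
            throw new NullPointerException("e is null");
        }
        Node<T> node = new Node<>(e);
        // Step 1: atomic swap; each caller receives a distinct previous node.
        Node<T> previous = producerNode.getAndSet(node);
        // Step 2: link the previous node to the new one (ordered store).
        previous.next.lazySet(node);
        return true;
    }

    // Assumed helper for inspection only: counts nodes linked after the stub.
    // Call it only while no producer is running.
    int linkedCount() {
        int count = 0;
        for (Node<T> n = stub.next.get(); n != null; n = n.next.get()) {
            count++;
        }
        return count;
    }
}
```

      Between step 1 and step 2 the new node is already the producer node but is not yet reachable from the consumer side, which is why a consumer may briefly see the queue as empty.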
      Specified by:
      offer in interface SimpleQueue<T>
      Parameters:
      e - the value to enqueue, not null
      Returns:
      true if successful, false if the value was not enqueued, likely due to reaching the queue capacity
    • poll

      @Nullable public T poll()
      Tries to dequeue a value (non-null) or returns null if the queue is empty.

      If the producer uses SimpleQueue.offer(Object, Object) and the consumer polls in pairs, then whenever the first poll() returns a non-null item, the second poll() is guaranteed to return a non-null item as well.

      IMPLEMENTATION NOTES:
      Poll is allowed from a SINGLE thread.
      Poll reads the next node from the consumerNode and:

      1. If it is null, the queue is assumed empty (though it might not be).
      2. If it is not null, set it as the consumer node and return its now-evacuated value.
      This means consumerNode.value is always null, which is also the starting state of the queue. Because null values may not be offered, this is the only node whose value is null at any one time.
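
      The poll steps above can be sketched as follows. This is a simplified illustration that follows the notes literally, not the RxJava source: the names PollSketch and Node are hypothetical, and the sketch assumes the queue is safely published to the single consumer thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the single-consumer poll; not the actual RxJava implementation.
final class PollSketch<T> {

    static final class Node<E> {
        E value; // plain field; made visible by the ordered write to 'next'
        final AtomicReference<Node<E>> next = new AtomicReference<>();

        Node(E value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<T>> producerNode;

    // Read and written by the single consumer thread only.
    private Node<T> consumerNode;

    PollSketch() {
        Node<T> stub = new Node<>(null); // starting point: a node with a null value
        producerNode = new AtomicReference<>(stub);
        consumerNode = stub;
    }

    boolean offer(T e) {
        if (e == null) {
            throw new NullPointerException("e is null");
        }
        Node<T> node = new Node<>(e);
        producerNode.getAndSet(node).next.lazySet(node);
        return true;
    }

    T poll() {
        Node<T> next = consumerNode.next.get();
        if (next == null) {
            // Step 1: assumed empty, although a producer may be mid-offer.
            return null;
        }
        // Step 2: evacuate the value and make 'next' the new consumer node.
        T value = next.value;
        next.value = null;
        consumerNode = next;
        return value;
    }
}
```

      After each successful poll the consumer node again holds a null value, which preserves the invariant described in the notes.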
      Specified by:
      poll in interface SimplePlainQueue<T>
      Specified by:
      poll in interface SimpleQueue<T>
      Returns:
      the item or null to indicate an empty queue
    • offer

      public boolean offer(T v1, T v2)
      Description copied from interface: SimpleQueue
      Atomically enqueue two values.
      Specified by:
      offer in interface SimpleQueue<T>
      Parameters:
      v1 - the first value to enqueue, not null
      v2 - the second value to enqueue, not null
      Returns:
      true if successful, false if the values were not enqueued, likely due to reaching the queue capacity
    • clear

      public void clear()
      Description copied from interface: SimpleQueue
      Removes all enqueued items from this queue.
      Specified by:
      clear in interface SimpleQueue<T>
    • lvProducerNode

    • xchgProducerNode

    • lvConsumerNode

    • lpConsumerNode

    • spConsumerNode

      void spConsumerNode(MpscLinkedQueue.LinkedQueueNode<T> node)
    • isEmpty

      public boolean isEmpty()
      Returns true if the queue is empty.

      Note, however, that because of potential fused functions in SimpleQueue.poll(), this method may return false while a subsequent poll() still returns null, because the fused function swallowed the available item(s).

      IMPLEMENTATION NOTES:
      The queue is empty when producerNode is the same as consumerNode. An alternative implementation would be to observe that producerNode.value is null, which also means an empty queue because only consumerNode.value is allowed to be null.
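
      The emptiness check described above amounts to a reference comparison. The sketch below illustrates it under stated assumptions: the names EmptinessSketch and Node are hypothetical, both node references are held in AtomicReference fields so that isEmpty() can be called from any thread, and the code is not the RxJava source.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the producerNode == consumerNode emptiness check.
final class EmptinessSketch<T> {

    static final class Node<E> {
        E value;
        final AtomicReference<Node<E>> next = new AtomicReference<>();

        Node(E value) {
            this.value = value;
        }
    }

    private final AtomicReference<Node<T>> producerNode;
    private final AtomicReference<Node<T>> consumerNode;

    EmptinessSketch() {
        Node<T> stub = new Node<>(null);
        producerNode = new AtomicReference<>(stub);
        consumerNode = new AtomicReference<>(stub);
    }

    boolean offer(T e) {
        if (e == null) {
            throw new NullPointerException("e is null");
        }
        Node<T> node = new Node<>(e);
        producerNode.getAndSet(node).next.lazySet(node);
        return true;
    }

    // Must be called from the single consumer thread only.
    T poll() {
        Node<T> next = consumerNode.get().next.get();
        if (next == null) {
            return null;
        }
        T value = next.value;
        next.value = null;
        consumerNode.lazySet(next);
        return value;
    }

    // Empty when both ends point at the same (value-less) node.
    boolean isEmpty() {
        return consumerNode.get() == producerNode.get();
    }
}
```

      Because a producer swaps producerNode before linking the previous node, isEmpty() can return false for a short window in which poll() in this sketch still finds no linked node and returns null.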

      Specified by:
      isEmpty in interface SimpleQueue<T>
      Returns:
      true if the queue is empty