Class SpscLinkedArrayQueue<T>

java.lang.Object
io.reactivex.rxjava3.operators.SpscLinkedArrayQueue<T>
Type Parameters:
T - the contained value type
All Implemented Interfaces:
SimplePlainQueue<T>, SimpleQueue<T>

public final class SpscLinkedArrayQueue<T> extends Object implements SimplePlainQueue<T>
A single-producer single-consumer array-backed queue which can allocate new arrays in case the consumer is slower than the producer.
Since:
3.1.1
  • Field Details

    • MAX_LOOK_AHEAD_STEP

      static final int MAX_LOOK_AHEAD_STEP
    • producerIndex

      final AtomicLong producerIndex
    • producerLookAheadStep

      int producerLookAheadStep
    • producerLookAhead

      long producerLookAhead
    • producerMask

      final int producerMask
    • producerBuffer

      AtomicReferenceArray<Object> producerBuffer
    • consumerMask

      final int consumerMask
    • consumerBuffer

      AtomicReferenceArray<Object> consumerBuffer
    • consumerIndex

      final AtomicLong consumerIndex
    • HAS_NEXT

      private static final Object HAS_NEXT
  • Constructor Details

    • SpscLinkedArrayQueue

      public SpscLinkedArrayQueue(int bufferSize)
      Constructs a linked array-based queue instance with the given island size rounded up to the next power of 2.
      Parameters:
      bufferSize - the maximum number of elements per island
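The rounding mentioned above can be illustrated with a small, JDK-only sketch. It is not the library's code: the class and method names (PowerOfTwoSketch, roundUpToPowerOfTwo, wrap) are hypothetical, and the sketch assumes that a value that is already a power of two is kept unchanged. A power-of-two capacity is convenient because an ever-growing index can then be mapped to an array slot with a bit mask (capacity - 1) rather than a modulo, which is presumably what the producerMask and consumerMask fields are for.

```java
final class PowerOfTwoSketch {

    /** Rounds a positive value up to a power of two; exact powers of two are kept as-is (assumption). */
    static int roundUpToPowerOfTwo(int value) {
        if (value < 1 || value > (1 << 30)) {
            throw new IllegalArgumentException("value out of range: " + value);
        }
        // For value == 1 the shift distance is 0, so the result is 1.
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

    /** Maps a monotonically increasing index onto a slot of a power-of-two sized array. */
    static int wrap(long index, int mask) {
        return (int) index & mask;
    }

    public static void main(String[] args) {
        int capacity = roundUpToPowerOfTwo(100);
        int mask = capacity - 1;
        System.out.println(capacity);         // prints 128
        System.out.println(wrap(130L, mask)); // prints 2
    }
}
```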
  • Method Details

    • offer

      public boolean offer(T e)
      Atomically enqueues a single value.

      This implementation is correct for single producer thread use only.

      Specified by:
      offer in interface SimpleQueue<T>
      Parameters:
      e - the value to enqueue, not null
      Returns:
      true if successful, false if the value was not enqueued, likely due to reaching the queue capacity
    • writeToQueue

      private boolean writeToQueue(AtomicReferenceArray<Object> buffer, T e, long index, int offset)
    • resize

      private void resize(AtomicReferenceArray<Object> oldBuffer, long currIndex, int offset, T e, long mask)
    • soNext

      private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Object> next)
    • lvNextBufferAndUnlink

      private AtomicReferenceArray<Object> lvNextBufferAndUnlink(AtomicReferenceArray<Object> curr, int nextIndex)
    • poll

      @Nullable public T poll()
      Tries to dequeue a value (non-null) or returns null if the queue is empty.

      If the producer uses SimpleQueue.offer(Object, Object) and the consumer polls in pairs, then whenever the first poll() returns a non-null item, the second poll() is guaranteed to return a non-null item as well.

      This implementation is correct for single consumer thread use only.

      Specified by:
      poll in interface SimplePlainQueue<T>
      Specified by:
      poll in interface SimpleQueue<T>
      Returns:
      the item or null to indicate an empty queue
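The single-producer, single-consumer discipline that offer and poll require can be sketched as follows. To keep the example runnable on the JDK alone, java.util.concurrent.ConcurrentLinkedQueue stands in for SpscLinkedArrayQueue; this is an assumption made for illustration only, chosen because its offer and poll calls have the same shape (poll returns null when the queue is empty). The class name SpscHandoffSketch and the method transfer are hypothetical.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class SpscHandoffSketch {

    /** Sends the values 1..count from one producer thread to one consumer thread and returns their sum. */
    static long transfer(int count) {
        // Stand-in queue (assumption): same offer/poll shape, null signals "empty".
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        long[] sum = new long[1];

        // Exactly one thread ever calls offer().
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= count; i++) {
                queue.offer(i);
            }
        });

        // Exactly one thread ever calls poll().
        Thread consumer = new Thread(() -> {
            int received = 0;
            while (received < count) {
                Integer value = queue.poll();
                if (value == null) {
                    Thread.yield(); // empty for now, try again
                    continue;
                }
                sum[0] += value;
                received++;
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join makes the consumer's writes to sum visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(transfer(1000));
    }
}
```

The consumer treats a null result as "nothing available yet" and retries, rather than consulting size() or isEmpty() first.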
    • newBufferPoll

      private T newBufferPoll(AtomicReferenceArray<Object> nextBuffer, long index, int mask)
    • peek

      @Nullable public T peek()
      Returns the next element in this queue without removing it, or null if this queue is empty.
      Returns:
      the next element or null
    • newBufferPeek

      private T newBufferPeek(AtomicReferenceArray<Object> nextBuffer, long index, int mask)
    • clear

      public void clear()
      Description copied from interface: SimpleQueue
      Removes all enqueued items from this queue.
      Specified by:
      clear in interface SimpleQueue<T>
    • size

      public int size()
      Returns the number of elements in the queue.
      Returns:
      the number of elements in the queue
    • isEmpty

      public boolean isEmpty()
      Description copied from interface: SimpleQueue
      Returns true if the queue is empty.

      Note however that due to potential fused functions in SimpleQueue.poll() it is possible this method returns false but then poll() returns null because the fused function swallowed the available item(s).

      Specified by:
      isEmpty in interface SimpleQueue<T>
      Returns:
      true if the queue is empty
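The caveat about fused functions, which is inherited from the SimpleQueue description, can be made concrete with a hypothetical queue whose poll() applies a filter. The sketch below is not part of RxJava and is not thread-safe; FilteringQueueSketch and its members are invented names that only illustrate why isEmpty() returning false does not guarantee a non-null poll().

```java
import java.util.ArrayDeque;
import java.util.function.Predicate;

final class FilteringQueueSketch<T> {
    private final ArrayDeque<T> upstream = new ArrayDeque<>();
    private final Predicate<T> filter;

    FilteringQueueSketch(Predicate<T> filter) {
        this.filter = filter;
    }

    boolean offer(T value) {
        return upstream.offer(value);
    }

    /** Reports on the raw upstream items, before the filter is applied. */
    boolean isEmpty() {
        return upstream.isEmpty();
    }

    /** Returns the next item that passes the filter, dropping the ones that do not. */
    T poll() {
        T value;
        while ((value = upstream.poll()) != null) {
            if (filter.test(value)) {
                return value;
            }
        }
        return null; // every remaining item was swallowed by the filter
    }

    public static void main(String[] args) {
        Predicate<Integer> even = x -> x % 2 == 0;
        FilteringQueueSketch<Integer> queue = new FilteringQueueSketch<>(even);
        queue.offer(1);
        System.out.println(queue.isEmpty()); // prints false
        System.out.println(queue.poll());    // prints null
    }
}
```

A consumer is therefore safer looping on poll() until it returns null than guarding each poll() with isEmpty().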
    • adjustLookAheadStep

      private void adjustLookAheadStep(int capacity)
    • lvProducerIndex

      private long lvProducerIndex()
    • lvConsumerIndex

      private long lvConsumerIndex()
    • lpProducerIndex

      private long lpProducerIndex()
    • lpConsumerIndex

      private long lpConsumerIndex()
    • soProducerIndex

      private void soProducerIndex(long v)
    • soConsumerIndex

      private void soConsumerIndex(long v)
    • calcWrappedOffset

      private static int calcWrappedOffset(long index, int mask)
    • calcDirectOffset

      private static int calcDirectOffset(int index)
    • soElement

      private static void soElement(AtomicReferenceArray<Object> buffer, int offset, Object e)
    • lvElement

      private static Object lvElement(AtomicReferenceArray<Object> buffer, int offset)
    • offer

      public boolean offer(T first, T second)
      Offer two elements at the same time.

      Do not mix calls to the single-value offer(T) with this method on the same queue.

      Specified by:
      offer in interface SimpleQueue<T>
      Parameters:
      first - the first value, not null
      second - the second value, not null
      Returns:
      true if the queue accepted the two new values
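One way a pair-wise offer can support the "if the first poll() is non-null, so is the second" guarantee is to publish the second value before the first. The following is a minimal sketch of that idea on a fixed-size ring. It is not the library's implementation: PairRingSketch is a hypothetical class, it never grows, and it assumes one producer thread, one consumer thread, and a power-of-two capacity of at least 2.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;

final class PairRingSketch<T> {
    private final AtomicReferenceArray<T> slots;
    private final int mask;
    private long producerIndex; // touched by the producer thread only
    private long consumerIndex; // touched by the consumer thread only

    PairRingSketch(int capacity) {
        if (capacity < 2 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        }
        slots = new AtomicReferenceArray<>(capacity);
        mask = capacity - 1;
    }

    /** Enqueues two values, or neither if there is no room for both. */
    boolean offer(T first, T second) {
        int offset = (int) producerIndex & mask; // always even in this sketch
        // Slots are freed in order, so a free second slot implies a free first slot.
        if (slots.get(offset + 1) != null) {
            return false;
        }
        slots.lazySet(offset + 1, second); // publish the second value first
        slots.lazySet(offset, first);      // once first is visible, second is too
        producerIndex += 2;
        return true;
    }

    /** Returns the next value, or null if the queue is empty. */
    T poll() {
        int offset = (int) consumerIndex & mask;
        T value = slots.get(offset);
        if (value != null) {
            slots.lazySet(offset, null); // free the slot for the producer
            consumerIndex++;
        }
        return value;
    }
}
```

In this sketch, a single-value offer would shift the producer index to an odd position, so later pairs would no longer line up with offset and offset + 1. That shows one plausible reason for keeping the two styles of offer apart, though the library's own reasons may differ.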