Class MpscRelaxedArrayQueue<E>

    • Field Detail

      • mask

        private final long mask
        Note on terminology: - position/id: overall progress indicator, not an array index or offset at which to lookup/write. - index: for looking up within an array (including the inlined producerCycleClaim array) - offset: for pointer like access using Unsafe The producer in this queue operates on cycleId and the producerCycleClaim array: - The cycleId grow monotonically, and the parity bit (cycleIndex) indicated which claim to use - The producerCycleClaim indicate position in a cycle as well as the originating cycleId. From a claim we can calculate the producer overall position as well as the position within a cycle. The buffer is split into 2 cycles (matching cycleIndex 0 and 1), allowing the above indicators to control producer progress on separate counters while maintaining the appearance of a contiguous buffer to the consumer.
      • cycleLength

        private final int cycleLength
      • cycleLengthLog2

        private final int cycleLengthLog2
      • buffer

        private final E[] buffer
      • positionWithinCycleMask

        private final int positionWithinCycleMask
      • cycleIdBitShift

        private final int cycleIdBitShift
      • maxCycleId

        private final long maxCycleId
    • Constructor Detail

      • MpscRelaxedArrayQueue

        public MpscRelaxedArrayQueue​(int capacity)
    • Method Detail

      • iterator

        public java.util.Iterator<E> iterator()
        Specified by:
        iterator in interface java.util.Collection<E>
        Specified by:
        iterator in interface java.lang.Iterable<E>
        Specified by:
        iterator in class java.util.AbstractCollection<E>
      • offer

        public boolean offer​(E e)
        Description copied from interface: MessagePassingQueue
        Called from a producer thread subject to the restrictions appropriate to the implementation and according to the Queue.offer(Object) interface.
        Specified by:
        offer in interface MessagePassingQueue<E>
        Specified by:
        offer in interface java.util.Queue<E>
        Parameters:
        e - not null, will throw NPE if it is
        Returns:
        true if element was inserted into the queue, false iff full
      • isFull

        private boolean isFull​(long producerPosition)
        Given the nature of getAndAdd progress on producerPosition and given the potential risk for over claiming it is quite possible for this method to report a queue which is not full as full.
      • rotateCycle

        private void rotateCycle​(long claimCycleId,
                                 int cycleIdBitShift,
                                 long maxCycleId)
      • detectSlowRotation

        private long detectSlowRotation​(long claimCycleId,
                                        long nextCycleId)
      • validateProducerClaim

        private boolean validateProducerClaim​(int activeCycleIndex,
                                              long producerCycleClaim,
                                              long cycleId,
                                              int positionOnCycle,
                                              int cycleLengthLog2,
                                              boolean slowProducer)
        Validate a producer claim to find out if is an overclaim (beyond the producer limit).
        Returns:
        true if the claim is valid, false otherwise.
      • fixProducerOverClaim

        private boolean fixProducerOverClaim​(int activeCycleIndex,
                                             long producerCycleClaim,
                                             boolean slowProducer)
        It tries to fix a producer overclaim.
        Returns:
        true if the claim is now safe to be used,false otherwise and is needed to retry the claim.
      • validateSlowProducerOverClaim

        private void validateSlowProducerOverClaim​(int activeCycleIndex,
                                                   long producerCycleClaim)
        Validates a slow producer over-claim throwing IllegalStateException if the offer on it can't continue.
      • soCycleElement

        private void soCycleElement​(E[] buffer,
                                    E e,
                                    int activeCycleIndex,
                                    int positionWithinCycle,
                                    int cycleLengthLog2)
      • poll

        public E poll()
        Description copied from interface: MessagePassingQueue
        Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.poll() interface.
        Specified by:
        poll in interface MessagePassingQueue<E>
        Specified by:
        poll in interface java.util.Queue<E>
        Returns:
        a message from the queue if one is available, null iff empty
      • signalConsumerProgress

        private void signalConsumerProgress​(long consumerPosition,
                                            E[] buffer,
                                            long offset)
      • pollSlowPath

        private E pollSlowPath​(E[] buffer,
                               long offset,
                               long consumerPosition)
      • peek

        public E peek()
        Description copied from interface: MessagePassingQueue
        Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Queue.peek() interface.
        Specified by:
        peek in interface MessagePassingQueue<E>
        Specified by:
        peek in interface java.util.Queue<E>
        Returns:
        a message from the queue if one is available, null iff empty
      • peekSlowPath

        private E peekSlowPath​(E[] buffer,
                               long consumerPosition,
                               long offset)
      • spinForElement

        private E spinForElement​(E[] buffer,
                                 long offset)
      • size

        public int size()
        Description copied from interface: MessagePassingQueue
        This method's accuracy is subject to concurrent modifications happening as the size is estimated and as such is a best effort rather than absolute value. For some implementations this method may be O(n) rather than O(1).
        Specified by:
        size in interface java.util.Collection<E>
        Specified by:
        size in interface MessagePassingQueue<E>
        Specified by:
        size in class java.util.AbstractCollection<E>
        Returns:
        number of messages in the queue, between 0 and Integer.MAX_VALUE but less or equals to capacity (if bounded).
      • clear

        public void clear()
        Description copied from interface: MessagePassingQueue
        Removes all items from the queue. Called from the consumer thread subject to the restrictions appropriate to the implementation and according to the Collection.clear() interface.
        Specified by:
        clear in interface java.util.Collection<E>
        Specified by:
        clear in interface MessagePassingQueue<E>
        Overrides:
        clear in class java.util.AbstractQueue<E>
      • isEmpty

        public boolean isEmpty()
        Description copied from interface: MessagePassingQueue
        This method's accuracy is subject to concurrent modifications happening as the observation is carried out.
        Specified by:
        isEmpty in interface java.util.Collection<E>
        Specified by:
        isEmpty in interface MessagePassingQueue<E>
        Overrides:
        isEmpty in class java.util.AbstractCollection<E>
        Returns:
        true if empty, false otherwise
      • relaxedOffer

        public boolean relaxedOffer​(E e)
        Description copied from interface: MessagePassingQueue
        Called from a producer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.offer(Object) this method may return false without the queue being full.
        Specified by:
        relaxedOffer in interface MessagePassingQueue<E>
        Parameters:
        e - not null, will throw NPE if it is
        Returns:
        true if element was inserted into the queue, false if unable to offer
      • relaxedPoll

        public E relaxedPoll()
        Description copied from interface: MessagePassingQueue
        Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.poll() this method may return null without the queue being empty.
        Specified by:
        relaxedPoll in interface MessagePassingQueue<E>
        Returns:
        a message from the queue if one is available, null if unable to poll
      • relaxedPeek

        public E relaxedPeek()
        Description copied from interface: MessagePassingQueue
        Called from the consumer thread subject to the restrictions appropriate to the implementation. As opposed to Queue.peek() this method may return null without the queue being empty.
        Specified by:
        relaxedPeek in interface MessagePassingQueue<E>
        Returns:
        a message from the queue if one is available, null if unable to peek
      • drain

        public int drain​(MessagePassingQueue.Consumer<E> c)
        Description copied from interface: MessagePassingQueue
        Remove all available item from the queue and hand to consume. This should be semantically similar to:
         M m;
         while((m = relaxedPoll()) != null){
         c.accept(m);
         }
         
        There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.

        WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Consumer.accept(T) make sure you have read and understood these before using this method.

        Specified by:
        drain in interface MessagePassingQueue<E>
        Returns:
        the number of polled elements
      • fill

        public int fill​(MessagePassingQueue.Supplier<E> s)
        Description copied from interface: MessagePassingQueue
        Stuff the queue with elements from the supplier. Semantically similar to:
         while(relaxedOffer(s.get());
         
        There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation.

        Unbounded queues will fill up the queue with a fixed amount rather than fill up to oblivion. WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Supplier.get() make sure you have read and understood these before using this method.

        Specified by:
        fill in interface MessagePassingQueue<E>
        Returns:
        the number of offered elements
      • drain

        public int drain​(MessagePassingQueue.Consumer<E> c,
                         int limit)
        Description copied from interface: MessagePassingQueue
        Remove up to limit elements from the queue and hand to consume. This should be semantically similar to:

        
           M m;
           int i = 0;
           for(;i < limit && (m = relaxedPoll()) != null; i++){
             c.accept(m);
           }
           return i;
         

        There's no strong commitment to the queue being empty at the end of a drain. Called from a consumer thread subject to the restrictions appropriate to the implementation.

        WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Consumer.accept(T) make sure you have read and understood these before using this method.

        Specified by:
        drain in interface MessagePassingQueue<E>
        Returns:
        the number of polled elements
      • fill

        public int fill​(MessagePassingQueue.Supplier<E> s,
                        int limit)
        Description copied from interface: MessagePassingQueue
        Stuff the queue with up to limit elements from the supplier. Semantically similar to:

        
           for(int i=0; i < limit && relaxedOffer(s.get()); i++);
         

        There's no strong commitment to the queue being full at the end of a fill. Called from a producer thread subject to the restrictions appropriate to the implementation. WARNING: Explicit assumptions are made with regards to MessagePassingQueue.Supplier.get() make sure you have read and understood these before using this method.

        Specified by:
        fill in interface MessagePassingQueue<E>
        Returns:
        the number of offered elements
      • positionWithinCycle

        private static int positionWithinCycle​(long producerCycleClaim,
                                               int positionOnCycleMask)
      • producerClaimCycleId

        private static long producerClaimCycleId​(long producerCycleClaim,
                                                 int cycleIdBitShift)
      • producerPositionFromClaim

        private static long producerPositionFromClaim​(long producerCycleClaim,
                                                      int positionOnCycleMask,
                                                      int cycleIdBitShift,
                                                      int cycleLengthLog2)
      • producerPosition

        private static long producerPosition​(int positionWithinCycle,
                                             long cycleId,
                                             int cycleLengthLog2)
        Convert position in cycle and cycleId into a producer position (monotonically increasing reflection of offers that is comparable with the consumerPosition to determine size/empty/full)
      • calcElementIndexInBuffer

        private static int calcElementIndexInBuffer​(int positionWithinCycle,
                                                    int cycleIndex,
                                                    int cycleLengthLog2)
        Convert [position within cycle, cycleIndex] to index in buffer.
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.util.AbstractCollection<E>