Class DisruptorBlockingQueue<E>

  • All Implemented Interfaces:
    ConcurrentQueue<E>, java.io.Serializable, java.lang.Iterable<E>, java.util.Collection<E>, java.util.concurrent.BlockingQueue<E>, java.util.Queue<E>

    public final class DisruptorBlockingQueue<E>
    extends MultithreadConcurrentQueue<E>
    implements java.io.Serializable, java.lang.Iterable<E>, java.util.Collection<E>, java.util.concurrent.BlockingQueue<E>, java.util.Queue<E>
    This is a lock-free blocking queue of fixed length, backed by a ring buffer. Access to the ring buffer is sequenced by advancing a pair of atomic sequence numbers, one for the head and one for the tail. When a thread wants to append to the queue, it obtains the sequence number for the tail. When the thread is ready to commit its change, a machine compare-and-set is used to prove that the sequence number still matches the expected value, in other words that no other thread has modified the sequence. If the sequence number does not match, the operation fails. If it matches, the thread can continue to operate on the queue's ring buffer without contention. This check avoids any synchronization, hence the moniker "lock free", and the lack of synchronization results in significant performance advantages.

    For consumers, access to the back of the ring is controlled by a memory barrier mechanism, namely the "volatile" keyword. Spin locks are employed to ensure the ring tail cursor is up to date prior to updating it. Once the ring cursor is updated, the reader/consumer can be assured that there is data available to read. The consumer thread then employs a mechanism similar to the producer's to validate access to the ring: a sequence number for the head of the ring is obtained, and when the reader wants to commit the change to the buffer it uses the machine compare-and-set to prove that no other thread has modified the ring in the interim.

    This pattern of access is roughly an order of magnitude faster than ArrayBlockingQueue and roughly 2x faster than LinkedTransferQueue for similar operations and conditions. Given that LinkedTransferQueue is "state of the art" in terms of Java performance, it is clear that the Disruptor mechanism offers advantages over other strategies.

    The only memory allocation in this object occurs at object creation and in the clone and drainTo methods. Otherwise, calls to the disruptor queue allocate nothing and so never trigger garbage collection. The drainTo method implements an efficient "batch" mechanism and may be used to safely claim all of the available queue entries. drainTo does not perform as well when it faces contention from other reader threads. Overall, the disruptor pattern is weak in dealing with massive thread contention; however, efforts have been made to handle that case here. As always, test your intended strategy.
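
    The sequencing described above can be illustrated with a minimal sketch. It is not this class's source: the class name CasRingSketch, its fields, and the choice to retry (rather than fail) when a compare-and-set loses a race are all assumptions made for illustration. Each side keeps a claim cursor that is advanced by compare-and-set and a published sequence that is written only after the slot has been updated.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified illustration of a CAS-sequenced ring; not the library's code.
final class CasRingSketch<E> {
    private final Object[] buffer;
    private final int size;
    private final int mask;

    // Published sequences: slots below tail are readable, slots below head are free again.
    private final AtomicLong head = new AtomicLong();
    private final AtomicLong tail = new AtomicLong();
    // Claim cursors: winning the compare-and-set grants exclusive use of one slot.
    private final AtomicLong headCursor = new AtomicLong();
    private final AtomicLong tailCursor = new AtomicLong();

    CasRingSketch(int powerOfTwoCapacity) {
        if (powerOfTwoCapacity < 1 || Integer.bitCount(powerOfTwoCapacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        size = powerOfTwoCapacity;
        mask = size - 1;
        buffer = new Object[size];
    }

    /** Returns false when the ring is full; never blocks. */
    boolean offer(E e) {
        for (;;) {
            final long tailSeq = tail.get();
            if (head.get() <= tailSeq - size) {
                return false; // no free slot
            }
            // Succeeds only if no other producer has claimed this sequence.
            if (tailCursor.compareAndSet(tailSeq, tailSeq + 1)) {
                buffer[(int) (tailSeq & mask)] = e;
                tail.set(tailSeq + 1); // volatile write publishes the element
                return true;
            }
            Thread.onSpinWait(); // lost the race; re-read the sequences and retry
        }
    }

    /** Returns null when the ring is empty; never blocks. */
    @SuppressWarnings("unchecked")
    E poll() {
        for (;;) {
            final long headSeq = head.get();
            if (tail.get() <= headSeq) {
                return null; // nothing published yet
            }
            // Succeeds only if no other consumer has claimed this sequence.
            if (headCursor.compareAndSet(headSeq, headSeq + 1)) {
                final int slot = (int) (headSeq & mask);
                final E e = (E) buffer[slot];
                buffer[slot] = null;   // let the element be collected
                head.set(headSeq + 1); // volatile write frees the slot for producers
                return e;
            }
            Thread.onSpinWait();
        }
    }
}
```

    In this sketch neither offer nor poll allocates, which mirrors the allocation claim above; the blocking behavior of put and take is deliberately left out.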
    See Also:
    Serialized Form
    • Field Detail

      • queueNotFullCondition

        protected final Condition queueNotFullCondition
      • queueNotEmptyCondition

        protected final Condition queueNotEmptyCondition
    • Constructor Detail

      • DisruptorBlockingQueue

        public DisruptorBlockingQueue​(int capacity)

        Construct a blocking queue of the given fixed capacity.

        Note: actual capacity will be the next power of two larger than capacity.
        Parameters:
        capacity - maximum capacity of this queue
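
        The rounding in the note above can be sketched as follows. This is an illustration only: the helper names are hypothetical, and the sketch assumes that a request which is already a power of two is kept as is, which this page does not state. A power-of-two size lets a sequence number be mapped to a slot with a bit mask instead of a modulo.

```java
// Hypothetical helpers illustrating power-of-two sizing; not part of the documented API.
final class CapacitySketch {
    private CapacitySketch() {}

    // Smallest power of two that is >= requested (assumption: exact powers are kept).
    static int roundUpToPowerOfTwo(int requested) {
        if (requested < 1 || requested > (1 << 30)) {
            throw new IllegalArgumentException("requested capacity out of range: " + requested);
        }
        int capacity = 1;
        while (capacity < requested) {
            capacity <<= 1;
        }
        return capacity;
    }

    // Maps an ever-increasing sequence number onto a slot of a power-of-two ring.
    static int slotFor(long sequence, int powerOfTwoCapacity) {
        return (int) (sequence & (powerOfTwoCapacity - 1));
    }
}
```

        Under these assumptions, a request for 1000 yields a ring of 1024 slots, and sequence 1025 lands in slot 1.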
      • DisruptorBlockingQueue

        public DisruptorBlockingQueue​(int capacity,
                                      SpinPolicy spinPolicy)

        Construct a blocking queue with a given fixed capacity.

        Note: actual capacity will be the next power of two larger than capacity. Waiting locking may be used in servers that are tuned for it. It provides a high-performance locking implementation with approximately a factor of 2 improvement in throughput (40M/s for 1-1 thread transfers). However, waiting locking is more CPU-aggressive and causes servers configured with far too many threads to show very high load averages. This is probably more annoying than detrimental.
        Parameters:
        capacity - the queue capacity; a power of 2 is suggested
        spinPolicy - determines the level of CPU aggressiveness while waiting
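
        The trade-off described in the note above, throughput against CPU usage while waiting, can be illustrated with a small sketch. The enum Style and the method await are hypothetical and are not the SpinPolicy type or any method of this class; the 50 microsecond park interval is likewise an arbitrary assumption.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

// Hypothetical illustration of CPU-aggressive versus CPU-friendly waiting.
final class WaitSketch {
    // Not the library's SpinPolicy; two made-up styles for comparison.
    enum Style { SPIN, PARK }

    private WaitSketch() {}

    /** Waits until ready reports true or the timeout elapses; returns whether it became ready. */
    static boolean await(BooleanSupplier ready, Style style, long timeoutNanos) {
        final long deadline = System.nanoTime() + timeoutNanos;
        while (!ready.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out
            }
            if (style == Style.SPIN) {
                Thread.onSpinWait();             // stays on the CPU: low latency, high load
            } else {
                LockSupport.parkNanos(50_000L);  // gives up the CPU: higher latency, low load
            }
        }
        return true;
    }
}
```

        A spinning waiter reacts sooner when the condition flips, but every waiting thread keeps a core busy, which is how an oversubscribed server ends up with a high load average.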
      • DisruptorBlockingQueue

        public DisruptorBlockingQueue​(int capacity,
                                      java.util.Collection<? extends E> c)

        Construct a blocking queue of the given fixed capacity.

        Note: actual capacity will be the next power of two larger than capacity.

        The values from the collection, c, are appended to the queue in iteration order. If the number of elements in the collection exceeds the actual capacity, then the additional elements overwrite the previous ones until all elements have been written once.
        Parameters:
        capacity - maximum capacity of this queue
        c - a collection used to populate the initial values
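
        The overwrite behavior described above can be pictured with a short sketch of how elements map onto ring slots. The class and method names are hypothetical, and the sketch only models the slot array; it makes no claim about where this class places its head and tail afterwards.

```java
import java.util.Collection;

// Hypothetical illustration of filling a power-of-two ring in iteration order.
final class RingFillSketch {
    private RingFillSketch() {}

    static Object[] fill(int powerOfTwoCapacity, Collection<?> c) {
        if (powerOfTwoCapacity < 1 || Integer.bitCount(powerOfTwoCapacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        final Object[] ring = new Object[powerOfTwoCapacity];
        final int mask = powerOfTwoCapacity - 1;
        long sequence = 0;
        for (Object o : c) {
            // Once sequence passes the capacity, the mask wraps onto earlier slots.
            ring[(int) (sequence++ & mask)] = o;
        }
        return ring;
    }
}
```

        For example, filling a ring of 4 slots from the values 1 through 6 leaves the slots holding 5, 6, 3, 4: the fifth and sixth elements overwrite the first and second.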
    • Method Detail

      • offer

        public final boolean offer​(E e)
        Description copied from interface: ConcurrentQueue
        Add element e to the ring.
        Specified by:
        offer in interface java.util.concurrent.BlockingQueue<E>
        Specified by:
        offer in interface ConcurrentQueue<E>
        Specified by:
        offer in interface java.util.Queue<E>
        Overrides:
        offer in class MultithreadConcurrentQueue<E>
        Parameters:
        e - element to offer
        Returns:
        boolean - true if the operation succeeded
      • remove

        public int remove​(E[] e)
        Description copied from interface: ConcurrentQueue
        Remove elements from the queue into the provided array, up to the size of the provided array.
        Specified by:
        remove in interface ConcurrentQueue<E>
        Overrides:
        remove in class MultithreadConcurrentQueue<E>
        Parameters:
        e - the array to receive the removed elements
        Returns:
        int - the number of elements added to e
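
        The contract described above (copy out as many elements as fit, report how many were copied) can be sketched against the plain java.util.Queue interface. The helper removeInto is hypothetical and is not how this class implements remove(E[]); it only models the observable result.

```java
import java.util.Queue;

// Hypothetical model of a batch removal into a caller-supplied array.
final class BatchRemoveSketch {
    private BatchRemoveSketch() {}

    /** Moves up to out.length elements from queue into out; returns the number moved. */
    static <E> int removeInto(Queue<E> queue, E[] out) {
        int count = 0;
        while (count < out.length) {
            final E e = queue.poll();
            if (e == null) {
                break; // queue drained before the array was full
            }
            out[count++] = e;
        }
        return count;
    }
}
```

        Callers should read only the first `count` entries of the array; slots beyond the returned count keep whatever they held before the call.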
      • remove

        public E remove()
        Specified by:
        remove in interface java.util.Queue<E>
      • element

        public E element()
        Specified by:
        element in interface java.util.Queue<E>
      • put

        public void put​(E e)
                 throws java.lang.InterruptedException
        Specified by:
        put in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.InterruptedException
      • offer

        public boolean offer​(E e,
                             long timeout,
                             java.util.concurrent.TimeUnit unit)
                      throws java.lang.InterruptedException
        Specified by:
        offer in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.InterruptedException
      • take

        public E take()
               throws java.lang.InterruptedException
        Specified by:
        take in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.InterruptedException
      • poll

        public E poll​(long timeout,
                      java.util.concurrent.TimeUnit unit)
               throws java.lang.InterruptedException
        Specified by:
        poll in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.InterruptedException
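
        Because the timed offer and poll are specified by java.util.concurrent.BlockingQueue, calling code can be written against that interface. The sketch below is one possible way to wrap them; the class and method names are hypothetical, and handling interruption by restoring the interrupt flag is a design choice of this sketch, not something this page prescribes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical wrappers around the timed BlockingQueue operations.
final class TimedOpsSketch {
    private TimedOpsSketch() {}

    /** Tries to enqueue value, giving up after millis; returns false on timeout or interrupt. */
    static <E> boolean tryHandOff(BlockingQueue<E> queue, E value, long millis) {
        try {
            return queue.offer(value, millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Tries to dequeue, giving up after millis; returns null on timeout or interrupt. */
    static <E> E tryReceive(BlockingQueue<E> queue, long millis) {
        try {
            return queue.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }
}
```

        Any BlockingQueue implementation can be passed in, so the same calling code can be benchmarked against ArrayBlockingQueue and against this class.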
      • remainingCapacity

        public int remainingCapacity()
        Specified by:
        remainingCapacity in interface java.util.concurrent.BlockingQueue<E>
      • drainTo

        public int drainTo​(java.util.Collection<? super E> c)
        Specified by:
        drainTo in interface java.util.concurrent.BlockingQueue<E>
      • drainTo

        public int drainTo​(java.util.Collection<? super E> c,
                           int maxElements)
        Specified by:
        drainTo in interface java.util.concurrent.BlockingQueue<E>
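
        A batch consumer built on drainTo might look like the following sketch. It relies only on the java.util.concurrent.BlockingQueue contract; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical batch reader using the standard drainTo contract.
final class DrainSketch {
    private DrainSketch() {}

    /** Claims up to maxBatch queued elements in a single call; the list may be empty. */
    static <E> List<E> nextBatch(BlockingQueue<E> queue, int maxBatch) {
        final List<E> batch = new ArrayList<>(maxBatch);
        queue.drainTo(batch, maxBatch); // never blocks; takes only what is available now
        return batch;
    }
}
```

        Since drainTo does not block, a consumer that receives an empty batch typically falls back to take or a timed poll before draining again.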
      • toArray

        public java.lang.Object[] toArray()
        Specified by:
        toArray in interface java.util.Collection<E>
      • toArray

        public <T> T[] toArray​(T[] a)
        Specified by:
        toArray in interface java.util.Collection<E>
      • add

        public boolean add​(E e)
        Specified by:
        add in interface java.util.concurrent.BlockingQueue<E>
        Specified by:
        add in interface java.util.Collection<E>
        Specified by:
        add in interface java.util.Queue<E>
      • remove

        public boolean remove​(java.lang.Object o)

        Provided for compatibility with the BlockingQueue interface only.

        This method has been fixed to be properly concurrent, but it blocks the entire queue and should not be used!
        Specified by:
        remove in interface java.util.concurrent.BlockingQueue<E>
        Specified by:
        remove in interface java.util.Collection<E>
      • containsAll

        public boolean containsAll​(java.util.Collection<?> c)
        Specified by:
        containsAll in interface java.util.Collection<E>
      • addAll

        public boolean addAll​(java.util.Collection<? extends E> c)
        Specified by:
        addAll in interface java.util.Collection<E>
      • removeAll

        public boolean removeAll​(java.util.Collection<?> c)
        Specified by:
        removeAll in interface java.util.Collection<E>
      • retainAll

        public boolean retainAll​(java.util.Collection<?> c)
        Specified by:
        retainAll in interface java.util.Collection<E>
      • iterator

        public java.util.Iterator<E> iterator()
        Specified by:
        iterator in interface java.util.Collection<E>
        Specified by:
        iterator in interface java.lang.Iterable<E>
      • isFull

        private boolean isFull()