Class LinkedTransferQueue<E>

  • Type Parameters:
    E - the type of elements held in this collection.
    All Implemented Interfaces:
    java.io.Serializable, java.lang.Iterable<E>, java.util.Collection<E>, java.util.concurrent.BlockingQueue<E>, java.util.Queue<E>, TransferQueue<E>

    class LinkedTransferQueue<E>
    extends java.util.AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable
    An unbounded TransferQueue based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer. The head of the queue is that element that has been on the queue the longest time for some producer. The tail of the queue is that element that has been on the queue the shortest time for some producer.

    Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal. Additionally, the bulk operations addAll, removeAll, retainAll, containsAll, equals, and toArray are not guaranteed to be performed atomically. For example, an iterator operating concurrently with an addAll operation might view only some of the added elements.

    This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.

    Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a LinkedTransferQueue happen-before actions subsequent to the access or removal of that element from the LinkedTransferQueue in another thread.

    This class is a member of the Java Collections Framework. TODO: Do NOT remove this class. It's referenced from DataStructures.

    • Field Summary

      Fields 
      Modifier and Type Field Description
      private static int ASYNC  
      private static int CHAINED_SPINS
      The number of times to spin before blocking when a node is preceded by another node that is apparently spinning.
      private static int FRONT_SPINS
      The number of times to spin (with randomly interspersed calls to Thread.yield) on multiprocessor before blocking when a node is apparently the first waiter in the queue.
      (package private) LinkedTransferQueue.Node head
      head of the queue; null until first enqueue
      private static long headOffset  
      private static boolean MP
      True if on multiprocessor
      private static int NOW  
      private static long serialVersionUID  
      (package private) static int SWEEP_THRESHOLD
      The maximum number of estimated removal failures (sweepVotes) to tolerate before sweeping through the queue unlinking cancelled nodes that were not unlinked upon initial removal.
      private int sweepVotes
      The number of apparent failures to unsplice removed nodes
      private static long sweepVotesOffset  
      private static int SYNC  
      private LinkedTransferQueue.Node tail
      tail of the queue; null until first append
      private static long tailOffset  
      private static int TIMED  
      private static sun.misc.Unsafe UNSAFE  
    • Constructor Summary

      Constructors 
      Constructor Description
      LinkedTransferQueue()
      Creates an initially empty LinkedTransferQueue.
      LinkedTransferQueue​(java.util.Collection<? extends E> c)
      Creates a LinkedTransferQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.
    • Method Summary

      All Methods Static Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      boolean add​(E e)
      Inserts the specified element at the tail of this queue.
      private E awaitMatch​(LinkedTransferQueue.Node s, LinkedTransferQueue.Node pred, E e, boolean timed, long nanos)
      Spins/yields/blocks until node s is matched or caller gives up.
      private boolean casHead​(LinkedTransferQueue.Node cmp, LinkedTransferQueue.Node val)  
      private boolean casSweepVotes​(int cmp, int val)  
      (package private) static <E> E cast​(java.lang.Object item)  
      private boolean casTail​(LinkedTransferQueue.Node cmp, LinkedTransferQueue.Node val)  
      boolean contains​(java.lang.Object o)
      Returns true if this queue contains the specified element.
      private int countOfMode​(boolean data)
      Traverses and counts unmatched nodes of the given mode.
      int drainTo​(java.util.Collection<? super E> c)  
      int drainTo​(java.util.Collection<? super E> c, int maxElements)  
      private boolean findAndRemove​(java.lang.Object e)
      Main implementation of remove(Object)
      private E firstDataItem()
      Returns the item in the first unmatched node with isData; or null if none.
      private LinkedTransferQueue.Node firstOfMode​(boolean isData)
      Returns the first unmatched node of the given mode, or null if none.
      (package private) static sun.misc.Unsafe getUnsafe()
      Returns a sun.misc.Unsafe.
      int getWaitingConsumerCount()
      Returns an estimate of the number of consumers waiting to dequeue elements via take or poll.
      boolean hasWaitingConsumer()
      Returns true if there is at least one consumer waiting to dequeue an element via take or poll.
      boolean isEmpty()
      Returns true if this queue contains no elements.
      java.util.Iterator<E> iterator()
      Returns an iterator over the elements in this queue in proper sequence.
      boolean offer​(E e)
      Inserts the specified element at the tail of this queue.
      boolean offer​(E e, long timeout, java.util.concurrent.TimeUnit unit)
      Inserts the specified element at the tail of this queue.
      E peek()  
      E poll()  
      E poll​(long timeout, java.util.concurrent.TimeUnit unit)  
      void put​(E e)
      Inserts the specified element at the tail of this queue.
      private void readObject​(java.io.ObjectInputStream s)
      Reconstitutes the Queue instance from a stream (that is, deserializes it).
      int remainingCapacity()
      Always returns Integer.MAX_VALUE because a LinkedTransferQueue is not capacity constrained.
      boolean remove​(java.lang.Object o)
      Removes a single instance of the specified element from this queue, if it is present.
      int size()
      Returns the number of elements in this queue.
      private static int spinsFor​(LinkedTransferQueue.Node pred, boolean haveData)
      Returns spin/yield value for a node with given predecessor and data mode.
      (package private) LinkedTransferQueue.Node succ​(LinkedTransferQueue.Node p)
      Returns the successor of p, or the head node if p.next has been linked to self, which will only be true if traversing with a stale pointer that is now off the list.
      private void sweep()
      Unlinks matched (typically cancelled) nodes encountered in a traversal from head.
      E take()  
      void transfer​(E e)
      Transfers the element to a consumer, waiting if necessary to do so.
      private LinkedTransferQueue.Node tryAppend​(LinkedTransferQueue.Node s, boolean haveData)
      Tries to append node s as tail.
      boolean tryTransfer​(E e)
      Transfers the element to a waiting consumer immediately, if possible.
      boolean tryTransfer​(E e, long timeout, java.util.concurrent.TimeUnit unit)
      Transfers the element to a consumer if it is possible to do so before the timeout elapses.
      (package private) void unsplice​(LinkedTransferQueue.Node pred, LinkedTransferQueue.Node s)
      Unsplices (now or later) the given deleted/cancelled node with the given predecessor.
      private void writeObject​(java.io.ObjectOutputStream s)
      Saves the state to a stream (that is, serializes it).
      private E xfer​(E e, boolean haveData, int how, long nanos)
      Implements all queuing methods.
      • Methods inherited from class java.util.AbstractQueue

        addAll, clear, element, remove
      • Methods inherited from class java.util.AbstractCollection

        containsAll, removeAll, retainAll, toArray, toArray, toString
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
      • Methods inherited from interface java.util.Collection

        addAll, clear, containsAll, equals, hashCode, parallelStream, removeAll, removeIf, retainAll, spliterator, stream, toArray, toArray, toArray
      • Methods inherited from interface java.lang.Iterable

        forEach
      • Methods inherited from interface java.util.Queue

        element, remove
    • Field Detail

      • MP

        private static final boolean MP
        True if on multiprocessor
      • FRONT_SPINS

        private static final int FRONT_SPINS
        The number of times to spin (with randomly interspersed calls to Thread.yield) on multiprocessor before blocking when a node is apparently the first waiter in the queue. See above for explanation. Must be a power of two. The value is empirically derived -- it works pretty well across a variety of processors, numbers of CPUs, and OSes.
        See Also:
        Constant Field Values
      • CHAINED_SPINS

        private static final int CHAINED_SPINS
        The number of times to spin before blocking when a node is preceded by another node that is apparently spinning. Also serves as an increment to FRONT_SPINS on phase changes, and as base average frequency for yielding during spins. Must be a power of two.
        See Also:
        Constant Field Values
      • SWEEP_THRESHOLD

        static final int SWEEP_THRESHOLD
        The maximum number of estimated removal failures (sweepVotes) to tolerate before sweeping through the queue unlinking cancelled nodes that were not unlinked upon initial removal. See above for explanation. The value must be at least two to avoid useless sweeps when removing trailing nodes.
        See Also:
        Constant Field Values
      • sweepVotes

        private transient volatile int sweepVotes
        The number of apparent failures to unsplice removed nodes
      • UNSAFE

        private static final sun.misc.Unsafe UNSAFE
      • headOffset

        private static final long headOffset
      • tailOffset

        private static final long tailOffset
      • sweepVotesOffset

        private static final long sweepVotesOffset
    • Constructor Detail

      • LinkedTransferQueue

        public LinkedTransferQueue()
        Creates an initially empty LinkedTransferQueue.
      • LinkedTransferQueue

        public LinkedTransferQueue​(java.util.Collection<? extends E> c)
        Creates a LinkedTransferQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.
        Parameters:
        c - the collection of elements to initially contain
        Throws:
        java.lang.NullPointerException - if the specified collection or any of its elements are null
    • Method Detail

      • casSweepVotes

        private boolean casSweepVotes​(int cmp,
                                      int val)
      • cast

        static <E> E cast​(java.lang.Object item)
      • xfer

        private E xfer​(E e,
                       boolean haveData,
                       int how,
                       long nanos)
        Implements all queuing methods. See above for explanation.
        Parameters:
        e - the item or null for take
        haveData - true if this is a put, else a take
        how - NOW, ASYNC, SYNC, or TIMED
        nanos - timeout in nanosecs, used only if mode is TIMED
        Returns:
        an item if matched, else e
        Throws:
        java.lang.NullPointerException - if haveData mode but e is null
      • tryAppend

        private LinkedTransferQueue.Node tryAppend​(LinkedTransferQueue.Node s,
                                                   boolean haveData)
        Tries to append node s as tail.
        Parameters:
        s - the node to append
        haveData - true if appending in data mode
        Returns:
        null on failure due to losing race with append in different mode, else s's predecessor, or s itself if no predecessor
      • awaitMatch

        private E awaitMatch​(LinkedTransferQueue.Node s,
                             LinkedTransferQueue.Node pred,
                             E e,
                             boolean timed,
                             long nanos)
        Spins/yields/blocks until node s is matched or caller gives up.
        Parameters:
        s - the waiting node
        pred - the predecessor of s, or s itself if it has no predecessor, or null if unknown (the null case does not occur in any current calls but may in possible future extensions)
        e - the comparison value for checking match
        timed - if true, wait only until timeout elapses
        nanos - timeout in nanosecs, used only if timed is true
        Returns:
        matched item, or e if unmatched on interrupt or timeout
      • spinsFor

        private static int spinsFor​(LinkedTransferQueue.Node pred,
                                    boolean haveData)
        Returns spin/yield value for a node with given predecessor and data mode. See above for explanation.
      • succ

        final LinkedTransferQueue.Node succ​(LinkedTransferQueue.Node p)
        Returns the successor of p, or the head node if p.next has been linked to self, which will only be true if traversing with a stale pointer that is now off the list.
      • firstOfMode

        private LinkedTransferQueue.Node firstOfMode​(boolean isData)
        Returns the first unmatched node of the given mode, or null if none. Used by methods isEmpty, hasWaitingConsumer.
      • firstDataItem

        private E firstDataItem()
        Returns the item in the first unmatched node with isData; or null if none. Used by peek.
      • countOfMode

        private int countOfMode​(boolean data)
        Traverses and counts unmatched nodes of the given mode. Used by methods size and getWaitingConsumerCount.
      • unsplice

        final void unsplice​(LinkedTransferQueue.Node pred,
                            LinkedTransferQueue.Node s)
        Unsplices (now or later) the given deleted/cancelled node with the given predecessor.
        Parameters:
        pred - a node that was at one time known to be the predecessor of s, or null or s itself if s is/was at head
        s - the node to be unspliced
      • sweep

        private void sweep()
        Unlinks matched (typically cancelled) nodes encountered in a traversal from head.
      • findAndRemove

        private boolean findAndRemove​(java.lang.Object e)
        Main implementation of remove(Object)
      • put

        public void put​(E e)
        Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never block.
        Specified by:
        put in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.NullPointerException - if the specified element is null
      • offer

        public boolean offer​(E e,
                             long timeout,
                             java.util.concurrent.TimeUnit unit)
        Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never block or return false.
        Specified by:
        offer in interface java.util.concurrent.BlockingQueue<E>
        Returns:
        true (as specified by BlockingQueue.offer)
        Throws:
        java.lang.NullPointerException - if the specified element is null
      • offer

        public boolean offer​(E e)
        Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never return false.
        Specified by:
        offer in interface java.util.concurrent.BlockingQueue<E>
        Specified by:
        offer in interface java.util.Queue<E>
        Returns:
        true (as specified by Queue.offer(E))
        Throws:
        java.lang.NullPointerException - if the specified element is null
      • add

        public boolean add​(E e)
        Inserts the specified element at the tail of this queue. As the queue is unbounded, this method will never throw IllegalStateException or return false.
        Specified by:
        add in interface java.util.concurrent.BlockingQueue<E>
        Specified by:
        add in interface java.util.Collection<E>
        Specified by:
        add in interface java.util.Queue<E>
        Overrides:
        add in class java.util.AbstractQueue<E>
        Returns:
        true (as specified by Collection.add(E))
        Throws:
        java.lang.NullPointerException - if the specified element is null
      • tryTransfer

        public boolean tryTransfer​(E e)
        Transfers the element to a waiting consumer immediately, if possible.

        More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in take() or timed poll), otherwise returning false without enqueuing the element.

        Specified by:
        tryTransfer in interface TransferQueue<E>
        Parameters:
        e - the element to transfer
        Returns:
        true if the element was transferred, else false
        Throws:
        java.lang.NullPointerException - if the specified element is null
      • transfer

        public void transfer​(E e)
                      throws java.lang.InterruptedException
        Transfers the element to a consumer, waiting if necessary to do so.

        More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in take() or timed poll), else inserts the specified element at the tail of this queue and waits until the element is received by a consumer.

        Specified by:
        transfer in interface TransferQueue<E>
        Parameters:
        e - the element to transfer
        Throws:
        java.lang.NullPointerException - if the specified element is null
        java.lang.InterruptedException - if interrupted while waiting, in which case the element is not enqueued.
      • tryTransfer

        public boolean tryTransfer​(E e,
                                   long timeout,
                                   java.util.concurrent.TimeUnit unit)
                            throws java.lang.InterruptedException
        Transfers the element to a consumer if it is possible to do so before the timeout elapses.

        More precisely, transfers the specified element immediately if there exists a consumer already waiting to receive it (in take() or timed poll), else inserts the specified element at the tail of this queue and waits until the element is received by a consumer, returning false if the specified wait time elapses before the element can be transferred.

        Specified by:
        tryTransfer in interface TransferQueue<E>
        Parameters:
        e - the element to transfer
        timeout - how long to wait before giving up, in units of unit
        unit - a TimeUnit determining how to interpret the timeout parameter
        Returns:
        true if successful, or false if the specified waiting time elapses before completion, in which case the element is not enqueued.
        Throws:
        java.lang.NullPointerException - if the specified element is null
        java.lang.InterruptedException - if interrupted while waiting, in which case the element is not enqueued.
      • take

        public E take()
               throws java.lang.InterruptedException
        Specified by:
        take in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.InterruptedException
      • poll

        public E poll​(long timeout,
                      java.util.concurrent.TimeUnit unit)
               throws java.lang.InterruptedException
        Specified by:
        poll in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.InterruptedException
      • poll

        public E poll()
        Specified by:
        poll in interface java.util.Queue<E>
      • drainTo

        public int drainTo​(java.util.Collection<? super E> c)
        Specified by:
        drainTo in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.NullPointerException
        java.lang.IllegalArgumentException
      • drainTo

        public int drainTo​(java.util.Collection<? super E> c,
                           int maxElements)
        Specified by:
        drainTo in interface java.util.concurrent.BlockingQueue<E>
        Throws:
        java.lang.NullPointerException
        java.lang.IllegalArgumentException
      • iterator

        public java.util.Iterator<E> iterator()
        Returns an iterator over the elements in this queue in proper sequence. The elements will be returned in order from first (head) to last (tail).

        The returned iterator is a "weakly consistent" iterator that will never throw ConcurrentModificationException, and guarantees to traverse elements as they existed upon construction of the iterator, and may (but is not guaranteed to) reflect any modifications subsequent to construction.

        Specified by:
        iterator in interface java.util.Collection<E>
        Specified by:
        iterator in interface java.lang.Iterable<E>
        Specified by:
        iterator in class java.util.AbstractCollection<E>
        Returns:
        an iterator over the elements in this queue in proper sequence
      • peek

        public E peek()
        Specified by:
        peek in interface java.util.Queue<E>
      • isEmpty

        public boolean isEmpty()
        Returns true if this queue contains no elements.
        Specified by:
        isEmpty in interface java.util.Collection<E>
        Overrides:
        isEmpty in class java.util.AbstractCollection<E>
        Returns:
        true if this queue contains no elements
      • hasWaitingConsumer

        public boolean hasWaitingConsumer()
        Description copied from interface: TransferQueue
        Returns true if there is at least one consumer waiting to dequeue an element via take or poll. The return value represents a momentary state of affairs.
        Specified by:
        hasWaitingConsumer in interface TransferQueue<E>
        Returns:
        true if there is at least one waiting consumer
      • size

        public int size()
        Returns the number of elements in this queue. If this queue contains more than Integer.MAX_VALUE elements, returns Integer.MAX_VALUE.

        Beware that, unlike in most collections, this method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires an O(n) traversal.

        Specified by:
        size in interface java.util.Collection<E>
        Specified by:
        size in class java.util.AbstractCollection<E>
        Returns:
        the number of elements in this queue
      • getWaitingConsumerCount

        public int getWaitingConsumerCount()
        Description copied from interface: TransferQueue
        Returns an estimate of the number of consumers waiting to dequeue elements via take or poll. The return value is an approximation of a momentary state of affairs, that may be inaccurate if consumers have completed or given up waiting. The value may be useful for monitoring and heuristics, but not for synchronization control. Implementations of this method are likely to be noticeably slower than those for TransferQueue.hasWaitingConsumer().
        Specified by:
        getWaitingConsumerCount in interface TransferQueue<E>
        Returns:
        the number of consumers waiting to dequeue elements
      • remove

        public boolean remove​(java.lang.Object o)
        Removes a single instance of the specified element from this queue, if it is present. More formally, removes an element e such that o.equals(e), if this queue contains one or more such elements. Returns true if this queue contained the specified element (or equivalently, if this queue changed as a result of the call).
        Specified by:
        remove in interface java.util.concurrent.BlockingQueue<E>
        Specified by:
        remove in interface java.util.Collection<E>
        Overrides:
        remove in class java.util.AbstractCollection<E>
        Parameters:
        o - element to be removed from this queue, if present
        Returns:
        true if this queue changed as a result of the call
      • contains

        public boolean contains​(java.lang.Object o)
        Returns true if this queue contains the specified element. More formally, returns true if and only if this queue contains at least one element e such that o.equals(e).
        Specified by:
        contains in interface java.util.concurrent.BlockingQueue<E>
        Specified by:
        contains in interface java.util.Collection<E>
        Overrides:
        contains in class java.util.AbstractCollection<E>
        Parameters:
        o - object to be checked for containment in this queue
        Returns:
        true if this queue contains the specified element
      • remainingCapacity

        public int remainingCapacity()
        Always returns Integer.MAX_VALUE because a LinkedTransferQueue is not capacity constrained.
        Specified by:
        remainingCapacity in interface java.util.concurrent.BlockingQueue<E>
        Returns:
        Integer.MAX_VALUE (as specified by BlockingQueue.remainingCapacity)
      • writeObject

        private void writeObject​(java.io.ObjectOutputStream s)
                          throws java.io.IOException
        Saves the state to a stream (that is, serializes it).
        Parameters:
        s - the stream
        Throws:
        java.io.IOException
      • readObject

        private void readObject​(java.io.ObjectInputStream s)
                         throws java.io.IOException,
                                java.lang.ClassNotFoundException
        Reconstitutes the Queue instance from a stream (that is, deserializes it).
        Parameters:
        s - the stream
        Throws:
        java.io.IOException
        java.lang.ClassNotFoundException
      • getUnsafe

        static sun.misc.Unsafe getUnsafe()
        Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. Replace with a simple call to Unsafe.getUnsafe when integrating into a jdk.
        Returns:
        a sun.misc.Unsafe