Class LinkedTransferQueue<E>
- java.lang.Object
-
- java.util.AbstractCollection<E>
-
- java.util.AbstractQueue<E>
-
- com.google.code.yanf4j.util.LinkedTransferQueue<E>
-
- Type Parameters:
E - the type of elements held in this collection
- All Implemented Interfaces:
java.lang.Iterable<E>, java.util.Collection<E>, java.util.concurrent.BlockingQueue<E>, java.util.Queue<E>
- Direct Known Subclasses:
FlowControlLinkedTransferQueue
public class LinkedTransferQueue<E> extends java.util.AbstractQueue<E> implements java.util.concurrent.BlockingQueue<E>
An unbounded TransferQueue based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer. The head of the queue is the element that has been on the queue the longest time for some producer. The tail of the queue is the element that has been on the queue the shortest time for some producer.
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements.
This class and its iterator implement all of the optional methods of the Collection and Iterator interfaces.
Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a LinkedTransferQueue happen-before actions subsequent to the access or removal of that element from the LinkedTransferQueue in another thread.
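The hand-off behaviour described above can be illustrated with a short sketch. It assumes the JDK class java.util.concurrent.LinkedTransferQueue as a stand-in, since that class exposes the same transfer, tryTransfer and take methods; the class name TransferDemo and its helper are hypothetical.

```java
import java.util.concurrent.LinkedTransferQueue;

// Assumption: the JDK's java.util.concurrent.LinkedTransferQueue stands in
// for the class documented here. TransferDemo is a hypothetical name.
class TransferDemo {

    // Hands one element directly to a consumer thread and returns
    // what the consumer received.
    static String handOff(String message) {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
        String[] received = new String[1];

        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks until an element arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        try {
            queue.transfer(message); // blocks until a consumer takes the element
            consumer.join();         // join makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));

        // With no consumer waiting, the untimed tryTransfer gives up at once
        // and does not leave the element in the queue.
        LinkedTransferQueue<String> idle = new LinkedTransferQueue<>();
        System.out.println(idle.tryTransfer("dropped"));
        System.out.println(idle.isEmpty());
    }
}
```

Unlike put, which returns as soon as the element is enqueued, transfer returns only after a consumer has received the element.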
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
(package private) class LinkedTransferQueue.Itr
Iterators.
static class LinkedTransferQueue.PaddedAtomicReference<T>
Padded version of AtomicReference used for head, tail and cleanMe, to alleviate contention across threads CASing one vs the other.
private static class LinkedTransferQueue.QNode
Node class for LinkedTransferQueue.
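The padding idea behind PaddedAtomicReference can be sketched as follows. This is a hypothetical illustration, not the library's source: the class name PaddedRef, the number of padding fields, and the field names are all assumptions, and whether the padding actually separates cache lines depends on the JVM's field layout.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: unused fields enlarge the object so that two
// frequently CASed references are less likely to share a cache line.
class PaddedRef<T> extends AtomicReference<T> {
    // Padding only; never read or written. The count (15) is an assumption.
    Object p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, pa, pb, pc, pd, pe;

    PaddedRef(T initial) {
        super(initial);
    }
}

class PaddedRefDemo {
    // Behaves exactly like a plain AtomicReference from the caller's side.
    static boolean swap() {
        PaddedRef<String> head = new PaddedRef<>("a");
        return head.compareAndSet("a", "b") && "b".equals(head.get());
    }

    public static void main(String[] args) {
        System.out.println(swap());
    }
}
```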
-
Field Summary
Fields (Modifier and Type, Field, Description)
private LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> cleanMe
Reference to a cancelled node that might not yet have been unlinked from queue because it was the last inserted node when it cancelled.
private LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> head
head of the queue
private static int maxTimedSpins
The number of times to spin before blocking in timed waits.
private static int maxUntimedSpins
The number of times to spin before blocking in untimed waits.
private static int NCPUS
The number of CPUs, for spin control
private static int NOWAIT
private static long spinForTimeoutThreshold
The number of nanoseconds for which it is faster to spin rather than to use timed park.
private LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> tail
tail of the queue
private static int TIMEOUT
private static int WAIT
-
Constructor Summary
Constructors (Constructor, Description)
LinkedTransferQueue()
Creates an initially empty LinkedTransferQueue.
LinkedTransferQueue(java.util.Collection<? extends E> c)
Creates a LinkedTransferQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.
-
Method Summary
Methods (Modifier and Type, Method, Description)
private boolean advanceHead(LinkedTransferQueue.QNode h, LinkedTransferQueue.QNode nh)
Tries to cas nh as new head; if successful, unlink old head's next node to avoid garbage retention.
private java.lang.Object awaitFulfill(LinkedTransferQueue.QNode pred, LinkedTransferQueue.QNode s, java.lang.Object e, int mode, long nanos)
Spins/blocks until node s is fulfilled or caller gives up, depending on wait mode.
(package private) E cast(java.lang.Object e)
(package private) void clean(LinkedTransferQueue.QNode pred, LinkedTransferQueue.QNode s)
Gets rid of cancelled node s with original predecessor pred.
int drainTo(java.util.Collection<? super E> c)
int drainTo(java.util.Collection<? super E> c, int maxElements)
private java.lang.Object fulfill(java.lang.Object e)
Version of xfer for poll() and tryTransfer, which simplifies control paths both here and in xfer.
private LinkedTransferQueue.QNode getValidatedTail()
Returns validated tail for use in cleaning methods.
int getWaitingConsumerCount()
boolean hasWaitingConsumer()
boolean isEmpty()
java.util.Iterator<E> iterator()
boolean offer(E e)
boolean offer(E e, long timeout, java.util.concurrent.TimeUnit unit)
E peek()
E poll()
E poll(long timeout, java.util.concurrent.TimeUnit unit)
void put(E e)
private LinkedTransferQueue.QNode reclean()
Tries to unsplice the cancelled node held in cleanMe that was previously uncleanable because it was at tail.
int remainingCapacity()
int size()
Returns the number of elements in this queue.
E take()
void transfer(E e)
(package private) LinkedTransferQueue.QNode traversalHead()
Return head after performing any outstanding helping steps.
boolean tryTransfer(E e)
boolean tryTransfer(E e, long timeout, java.util.concurrent.TimeUnit unit)
private java.lang.Object xfer(java.lang.Object e, int mode, long nanos)
Puts or takes an item.
-
Methods inherited from class java.util.AbstractCollection
contains, containsAll, remove, removeAll, retainAll, toArray, toArray, toString
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
-
-
-
Field Detail
-
NOWAIT
private static final int NOWAIT
- See Also:
- Constant Field Values
-
TIMEOUT
private static final int TIMEOUT
- See Also:
- Constant Field Values
-
WAIT
private static final int WAIT
- See Also:
- Constant Field Values
-
NCPUS
private static final int NCPUS
The number of CPUs, for spin control
-
maxTimedSpins
private static final int maxTimedSpins
The number of times to spin before blocking in timed waits. The value is empirically derived -- it works well across a variety of processors and OSes. Empirically, the best value seems not to vary with number of CPUs (beyond 2) so is just a constant.
-
maxUntimedSpins
private static final int maxUntimedSpins
The number of times to spin before blocking in untimed waits. This is greater than timed value because untimed waits spin faster since they don't need to check times on each spin.
-
spinForTimeoutThreshold
private static final long spinForTimeoutThreshold
The number of nanoseconds for which it is faster to spin rather than to use timed park. A rough estimate suffices.
- See Also:
- Constant Field Values
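How the spin counts and the timeout threshold might interact in a timed wait can be sketched as below. This is a simplified illustration under stated assumptions, not the code of awaitFulfill: the class SpinThenParkSlot, its single-waiter design, and both constant values are hypothetical, and interrupts are not handled.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Hypothetical single-waiter slot showing a spin-then-park timed wait.
class SpinThenParkSlot<T> {
    // Illustrative values only; the documented constants are not given here.
    static final int MAX_TIMED_SPINS = 32;
    static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1_000L; // nanoseconds

    private final AtomicReference<T> value = new AtomicReference<>();
    private volatile Thread waiter;

    // Publishes a value and wakes the waiter if one has registered.
    void fulfill(T v) {
        value.set(v);
        Thread t = waiter;
        if (t != null) {
            LockSupport.unpark(t);
        }
    }

    // Returns the value, or null if none arrives within the given nanoseconds.
    T await(long nanos) {
        long deadline = System.nanoTime() + nanos;
        int spins = MAX_TIMED_SPINS;
        for (;;) {
            T v = value.get();
            if (v != null) {
                return v;
            }
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return null; // timed out
            }
            if (spins > 0) {
                spins--; // cheap busy-wait first
            } else if (waiter == null) {
                waiter = Thread.currentThread(); // register, then re-check value
            } else if (remaining > SPIN_FOR_TIMEOUT_THRESHOLD) {
                LockSupport.parkNanos(this, remaining); // long wait: block
            }
            // Otherwise the remaining time is too short to be worth parking.
        }
    }

    public static void main(String[] args) {
        SpinThenParkSlot<String> slot = new SpinThenParkSlot<>();
        new Thread(() -> slot.fulfill("done")).start();
        System.out.println(slot.await(1_000_000_000L));
    }
}
```

The waiter registers itself before parking and re-checks the value afterwards, so a fulfill that races with registration is either seen on the re-check or delivers an unpark permit.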
-
head
private final LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> head
head of the queue
-
tail
private final LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> tail
tail of the queue
-
cleanMe
private final LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> cleanMe
Reference to a cancelled node that might not yet have been unlinked from queue because it was the last inserted node when it cancelled.
-
-
Constructor Detail
-
LinkedTransferQueue
public LinkedTransferQueue()
Creates an initially empty LinkedTransferQueue.
-
LinkedTransferQueue
public LinkedTransferQueue(java.util.Collection<? extends E> c)
Creates a LinkedTransferQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.
- Parameters:
c - the collection of elements to initially contain
- Throws:
java.lang.NullPointerException - if the specified collection or any of its elements are null
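A short sketch of the collection constructor, assuming the JDK class java.util.concurrent.LinkedTransferQueue as a stand-in with the same constructor contract; CopyConstructorDemo and its helpers are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

// Assumption: the JDK's LinkedTransferQueue stands in for the documented class.
class CopyConstructorDemo {

    // The head of the new queue is the first element the source iterator returned.
    static String headOf(List<String> source) {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>(source);
        return queue.peek();
    }

    // A null element in the source collection is rejected.
    static boolean rejectsNullElement() {
        try {
            new LinkedTransferQueue<>(Arrays.asList("a", null));
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(headOf(Arrays.asList("first", "second")));
        System.out.println(rejectsNullElement());
    }
}
```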
-
-
Method Detail
-
advanceHead
private boolean advanceHead(LinkedTransferQueue.QNode h, LinkedTransferQueue.QNode nh)
Tries to CAS nh as the new head; if successful, unlinks the old head's next node to avoid garbage retention.
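The head-advance step can be sketched with a plain AtomicReference. The Node class and HeadAdvanceSketch are hypothetical simplifications, and clearing the old next pointer to null is an assumption; the actual implementation may unlink it differently.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of advancing a queue head with compare-and-set.
class HeadAdvanceSketch {

    // Minimal stand-in node; the real QNode carries more state.
    static final class Node {
        final Object item;
        volatile Node next;

        Node(Object item) {
            this.item = item;
        }
    }

    final AtomicReference<Node> head = new AtomicReference<>();

    // Swings head from h to nh. Only the thread whose CAS succeeds clears
    // the old head's next pointer, so the retired node no longer keeps
    // its successors reachable.
    boolean advanceHead(Node h, Node nh) {
        if (h == head.get() && head.compareAndSet(h, nh)) {
            h.next = null; // assumption: null-out rather than self-link
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        HeadAdvanceSketch queue = new HeadAdvanceSketch();
        Node first = new Node("first");
        Node second = new Node("second");
        first.next = second;
        queue.head.set(first);
        System.out.println(queue.advanceHead(first, second));
        System.out.println(queue.advanceHead(first, second)); // stale head
    }
}
```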
-
xfer
private java.lang.Object xfer(java.lang.Object e, int mode, long nanos)
Puts or takes an item. Used for most queue operations (except poll() and tryTransfer()). See the similar code in SynchronousQueue for detailed explanation.
- Parameters:
e - the item or, if null, signifies that this is a take
mode - the wait mode: NOWAIT, TIMEOUT, WAIT
nanos - timeout in nanoseconds, used only if mode is TIMEOUT
- Returns:
- an item, or null on failure
-
fulfill
private java.lang.Object fulfill(java.lang.Object e)
Version of xfer for poll() and tryTransfer, which simplifies control paths both here and in xfer
-
awaitFulfill
private java.lang.Object awaitFulfill(LinkedTransferQueue.QNode pred, LinkedTransferQueue.QNode s, java.lang.Object e, int mode, long nanos)
Spins/blocks until node s is fulfilled or caller gives up, depending on wait mode.
- Parameters:
pred - the predecessor of waiting node
s - the waiting node
e - the comparison value for checking match
mode - mode
nanos - timeout value
- Returns:
- matched item, or s if cancelled
-
getValidatedTail
private LinkedTransferQueue.QNode getValidatedTail()
Returns validated tail for use in cleaning methods
-
clean
void clean(LinkedTransferQueue.QNode pred, LinkedTransferQueue.QNode s)
Gets rid of cancelled node s with original predecessor pred.
- Parameters:
pred - predecessor of cancelled node
s - the cancelled node
-
reclean
private LinkedTransferQueue.QNode reclean()
Tries to unsplice the cancelled node held in cleanMe that was previously uncleanable because it was at tail.
- Returns:
- current cleanMe node (or null)
-
cast
E cast(java.lang.Object e)
-
put
public void put(E e) throws java.lang.InterruptedException
- Specified by:
put
in interface java.util.concurrent.BlockingQueue<E>
- Throws:
java.lang.InterruptedException
-
offer
public boolean offer(E e, long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Specified by:
offer
in interface java.util.concurrent.BlockingQueue<E>
- Throws:
java.lang.InterruptedException
-
offer
public boolean offer(E e)
-
transfer
public void transfer(E e) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
tryTransfer
public boolean tryTransfer(E e, long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
tryTransfer
public boolean tryTransfer(E e)
-
take
public E take() throws java.lang.InterruptedException
- Specified by:
take
in interface java.util.concurrent.BlockingQueue<E>
- Throws:
java.lang.InterruptedException
-
poll
public E poll(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Specified by:
poll
in interface java.util.concurrent.BlockingQueue<E>
- Throws:
java.lang.InterruptedException
-
drainTo
public int drainTo(java.util.Collection<? super E> c)
- Specified by:
drainTo
in interface java.util.concurrent.BlockingQueue<E>
-
drainTo
public int drainTo(java.util.Collection<? super E> c, int maxElements)
- Specified by:
drainTo
in interface java.util.concurrent.BlockingQueue<E>
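A brief sketch of the bounded drainTo, assuming the JDK class java.util.concurrent.LinkedTransferQueue as a stand-in that follows the same BlockingQueue contract; DrainDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;

// Assumption: the JDK's LinkedTransferQueue stands in for the documented class.
class DrainDemo {

    // Moves at most maxElements from the head of a queue into a new list.
    static List<Integer> drainAtMost(List<Integer> source, int maxElements) {
        LinkedTransferQueue<Integer> queue = new LinkedTransferQueue<>(source);
        List<Integer> batch = new ArrayList<>();
        queue.drainTo(batch, maxElements); // returns the number of elements moved
        return batch;
    }

    public static void main(String[] args) {
        System.out.println(drainAtMost(Arrays.asList(1, 2, 3, 4, 5), 3));
    }
}
```

Draining in batches lets a consumer pull several elements with one call instead of repeated poll() calls.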
-
traversalHead
LinkedTransferQueue.QNode traversalHead()
Returns the head after performing any outstanding helping steps.
-
iterator
public java.util.Iterator<E> iterator()
-
isEmpty
public boolean isEmpty()
-
hasWaitingConsumer
public boolean hasWaitingConsumer()
-
size
public int size()
Returns the number of elements in this queue. If this queue contains more than Integer.MAX_VALUE elements, returns Integer.MAX_VALUE.
Beware that, unlike in most collections, this method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires an O(n) traversal.
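Because size() walks the whole queue, an emptiness check is usually better written with isEmpty(). The sketch below assumes the JDK class java.util.concurrent.LinkedTransferQueue as a stand-in; SizeCostDemo and its helpers are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedTransferQueue;

// Assumption: the JDK's LinkedTransferQueue stands in for the documented class.
class SizeCostDemo {

    // Preferred emptiness check: does not need to count every element.
    static boolean hasWork(LinkedTransferQueue<String> queue) {
        return !queue.isEmpty();
    }

    // O(n) traversal; under concurrent updates the result is only a snapshot.
    static int snapshotCount(LinkedTransferQueue<String> queue) {
        return queue.size();
    }

    public static void main(String[] args) {
        LinkedTransferQueue<String> queue =
                new LinkedTransferQueue<>(Arrays.asList("a", "b"));
        System.out.println(hasWork(queue));
        System.out.println(snapshotCount(queue));
    }
}
```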
-
getWaitingConsumerCount
public int getWaitingConsumerCount()
-
remainingCapacity
public int remainingCapacity()
- Specified by:
remainingCapacity
in interface java.util.concurrent.BlockingQueue<E>
-
-