Class MultithreadConcurrentQueue<E>
java.lang.Object
com.conversantmedia.util.concurrent.MultithreadConcurrentQueue<E>
- All Implemented Interfaces:
ConcurrentQueue<E>
- Direct Known Subclasses:
DisruptorBlockingQueue
This is a disruptor implementation for multiple simultaneous reader and writer threads.
This data structure approaches 20-40 ns per transfer on fast hardware.
This code is optimized and tested using a 64-bit HotSpot JVM on an Intel x86-64 environment. Other
environments should be carefully tested before use in production.
Created by jcairns on 5/29/14.
-
Field Summary
Fields
Modifier and Type                            Field         Description
(package private) long                       a1
(package private) long                       a2
(package private) long                       a3
(package private) long                       a4
(package private) long                       a5
(package private) long                       a6
(package private) long                       a7
(package private) long                       a8
(package private) final E[]                  buffer
(package private) long                       c1
(package private) long                       c2
(package private) long                       c3
(package private) long                       c4
(package private) long                       c5
(package private) long                       c6
(package private) long                       c7
(package private) long                       c8
(package private) final LongAdder            head
(package private) long                       headCache
(package private) final ContendedAtomicLong  headCursor
(package private) final long                 mask
(package private) long                       p1
(package private) long                       p2
(package private) long                       p3
(package private) long                       p4
(package private) long                       p5
(package private) long                       p6
(package private) long                       p7
(package private) long                       r1
(package private) long                       r2
(package private) long                       r3
(package private) long                       r4
(package private) long                       r5
(package private) long                       r6
(package private) long                       r7
protected final int                          size
(package private) final LongAdder            tail
(package private) long                       tailCache
(package private) final ContendedAtomicLong  tailCursor
-
Constructor Summary
Constructors
Constructor                               Description
MultithreadConcurrentQueue(int capacity)  Construct a blocking queue of the given fixed capacity.
-
Method Summary
Modifier and Type       Method                    Description
int                     capacity()
void                    clear()                   clear the queue of all elements
final boolean           contains(Object o)
final boolean           isEmpty()
boolean                 offer(E e)                Add element e to the ring
final E                 peek()                    return the first element in the queue
E                       poll()                    remove the first element from the queue and return it
int                     remove(E[] e)             return all elements in the queue to the provided array, up to the size of the provided array.
final int               size()                    This implementation is known to be broken if preemption occurs after reading the tail pointer.
(package private) long  sumToAvoidOptimization()
-
Field Details
-
size
protected final int size -
mask
final long mask -
tail
final LongAdder tail -
tailCursor
final ContendedAtomicLong tailCursor -
p1
long p1 -
p2
long p2 -
p3
long p3 -
p4
long p4 -
p5
long p5 -
p6
long p6 -
p7
long p7 -
tailCache
long tailCache -
a1
long a1 -
a2
long a2 -
a3
long a3 -
a4
long a4 -
a5
long a5 -
a6
long a6 -
a7
long a7 -
a8
long a8 -
buffer
final E[] buffer -
r1
long r1 -
r2
long r2 -
r3
long r3 -
r4
long r4 -
r5
long r5 -
r6
long r6 -
r7
long r7 -
headCache
long headCache -
c1
long c1 -
c2
long c2 -
c3
long c3 -
c4
long c4 -
c5
long c5 -
c6
long c6 -
c7
long c7 -
c8
long c8 -
head
final LongAdder head -
headCursor
final ContendedAtomicLong headCursor -
-
Constructor Details
-
MultithreadConcurrentQueue
public MultithreadConcurrentQueue(int capacity)
Construct a blocking queue of the given fixed capacity. Note: actual capacity will be the next power of two larger than capacity.
- Parameters:
capacity - maximum capacity of this queue
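The capacity note above can be illustrated with a small sketch. It assumes the rounding rule is "smallest power of two that is at least the requested capacity" and that the mask field is size - 1, used to map a sequence number to a slot. Neither assumption is confirmed by this page, and CapacitySketch, roundUpToPowerOfTwo and slot are hypothetical names, not part of the library.

```java
// Sketch only: illustrates power-of-two capacity rounding and mask indexing.
// CapacitySketch and its methods are hypothetical, not the library's code.
final class CapacitySketch {

    // Smallest power of two >= requested (assumed rounding rule).
    static int roundUpToPowerOfTwo(int requested) {
        if (requested < 1 || requested > (1 << 30)) {
            throw new IllegalArgumentException("requested capacity out of range: " + requested);
        }
        int c = 1;
        while (c < requested) {
            c <<= 1;
        }
        return c;
    }

    // Map an ever-increasing sequence number to a slot in the ring.
    // With a power-of-two size, (sequence & mask) equals (sequence % size).
    static int slot(long sequence, long mask) {
        return (int) (sequence & mask);
    }

    public static void main(String[] args) {
        int size = roundUpToPowerOfTwo(1000);
        long mask = size - 1L;
        System.out.println("size = " + size);
        System.out.println("slot for sequence 1030 = " + slot(1030L, mask));
    }
}
```

A power-of-two size lets the index computation use a bitwise AND instead of a division, which is the usual reason ring buffers round the requested capacity up.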
-
-
Method Details
-
offer
public boolean offer(E e)
Description copied from interface: ConcurrentQueue
Add element e to the ring
- Specified by:
offer in interface ConcurrentQueue<E>
- Parameters:
e - element to offer
- Returns:
boolean - true if the operation succeeded
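Because offer reports failure through its return value, callers need to decide what to do when it returns false. The sketch below shows one possible bounded-retry policy. It uses java.util.concurrent.ArrayBlockingQueue as a stand-in (its offer also returns false when full) so the example runs without the library; OfferRetrySketch and offerWithRetry are hypothetical names.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Sketch only: OfferRetrySketch and offerWithRetry are hypothetical names.
// ArrayBlockingQueue stands in for the ring buffer in this example.
final class OfferRetrySketch {

    // Try to enqueue up to maxAttempts times, yielding between failed attempts.
    static <E> boolean offerWithRetry(Queue<E> queue, E element, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (queue.offer(element)) {
                return true;
            }
            Thread.yield(); // give consumers a chance to free a slot
        }
        return false;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(offerWithRetry(queue, "first", 3));
        System.out.println(offerWithRetry(queue, "second", 3));
    }
}
```

In the single-threaded main above, the first call succeeds and the second gives up after three attempts because nothing drains the queue. Whether to retry, drop, or block on failure is an application decision.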
-
poll
public E poll()
Description copied from interface: ConcurrentQueue
remove the first element from the queue and return it
- Specified by:
poll in interface ConcurrentQueue<E>
- Returns:
E - the removed element
-
peek
public final E peek()
Description copied from interface: ConcurrentQueue
return the first element in the queue
- Specified by:
peek in interface ConcurrentQueue<E>
- Returns:
E - The element
-
remove
public int remove(E[] e)
Description copied from interface: ConcurrentQueue
return all elements in the queue to the provided array, up to the size of the provided array.
- Specified by:
remove in interface ConcurrentQueue<E>
- Parameters:
e - The element array
- Returns:
int - the number of elements added to e
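The batch semantics described above (fill the caller's array, stop at its length, return the count) can be sketched against a plain java.util.Queue. This is an illustration of the contract only, not the library's implementation; BatchRemoveSketch and drainInto are hypothetical names, and ArrayDeque is a stand-in queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch only: mimics the "remove into array" contract on a stand-in queue.
final class BatchRemoveSketch {

    // Move up to out.length elements from the queue into out; return the count moved.
    static <E> int drainInto(Queue<E> queue, E[] out) {
        int n = 0;
        while (n < out.length) {
            E element = queue.poll();
            if (element == null) {
                break; // stand-in queue is empty
            }
            out[n++] = element;
        }
        return n;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>();
        for (String s : new String[] {"a", "b", "c", "d", "e"}) {
            queue.add(s);
        }
        String[] batch = new String[3];
        int moved = drainInto(queue, batch);
        System.out.println(moved + " moved, " + queue.size() + " left");
    }
}
```

Callers should process only the first n slots of the array, where n is the returned count; the remaining slots keep whatever they held before the call.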
-
size
public final int size()
This implementation is known to be broken if preemption occurs after reading the tail pointer. Code should not depend on size for a correct result.
- Specified by:
size in interface ConcurrentQueue<E>
- Returns:
int - possibly the size, or possibly any value less than capacity()
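Since size() may not be accurate, one way to avoid depending on it is to let the result of poll() drive a consumer loop. The sketch below assumes poll() returns null when nothing is available, as in the java.util.Queue contract; this page does not state that for this class. PollUntilEmptySketch and consumeAll are hypothetical names, and ArrayDeque is a stand-in queue.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Sketch only: consume by checking poll() results instead of trusting size().
final class PollUntilEmptySketch {

    // Hand every currently available element to handler; return how many were handled.
    // Assumes poll() returns null when the queue has nothing to give.
    static <E> int consumeAll(Queue<E> queue, Consumer<? super E> handler) {
        int handled = 0;
        E element;
        while ((element = queue.poll()) != null) {
            handler.accept(element);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<Integer> queue = new ArrayDeque<>();
        queue.add(1);
        queue.add(2);
        queue.add(3);
        int handled = consumeAll(queue, value -> System.out.println("got " + value));
        System.out.println("handled " + handled);
    }
}
```

The loop never asks how many elements exist, so a stale or wrong size value cannot cause it to skip elements or read past the end.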
-
capacity
public int capacity()
- Specified by:
capacity in interface ConcurrentQueue<E>
- Returns:
int - the capacity of the queue
-
isEmpty
public final boolean isEmpty()
- Specified by:
isEmpty in interface ConcurrentQueue<E>
- Returns:
boolean - true if the queue is currently empty
-
clear
public void clear()
Description copied from interface: ConcurrentQueue
clear the queue of all elements
- Specified by:
clear in interface ConcurrentQueue<E>
-
contains
public final boolean contains(Object o)
- Specified by:
contains in interface ConcurrentQueue<E>
- Parameters:
o - the object to test
- Returns:
boolean - true if specified object is contained in the queue
-
sumToAvoidOptimization
long sumToAvoidOptimization()
-