Class ManyToManyConcurrentArrayQueue<E>
- java.lang.Object
- org.agrona.concurrent.AbstractConcurrentArrayQueuePadding1
- org.agrona.concurrent.AbstractConcurrentArrayQueueProducer
- org.agrona.concurrent.AbstractConcurrentArrayQueuePadding2
- org.agrona.concurrent.AbstractConcurrentArrayQueueConsumer
- org.agrona.concurrent.AbstractConcurrentArrayQueuePadding3
- org.agrona.concurrent.AbstractConcurrentArrayQueue<E>
- org.agrona.concurrent.ManyToManyConcurrentArrayQueue<E>
- Type Parameters:
E - type of the elements stored in the Queue.
- All Implemented Interfaces:
java.lang.Iterable<E>, java.util.Collection<E>, java.util.Queue<E>, Pipe<E>, QueuedPipe<E>
public class ManyToManyConcurrentArrayQueue<E> extends AbstractConcurrentArrayQueue<E>
Many-producer to many-consumer concurrent queue that is array backed. This is a Java port of Dmitry Vyukov's MPMC queue.
Note: This queue breaks the contract for peek and poll in that either can return null when the queue has no item available, even though size could be greater than zero if an offer is in progress. This is because an offer is a multi-step process that can start and then be interrupted before completion; the thread is later resumed and the offer completes. Other methods, such as peek and poll, could spin internally waiting for the offer to complete in order to provide sequential consistency across methods, but this can have a detrimental effect in a resource-starved system. Such internal spinning consumes a CPU core and prevents other threads from making progress, resulting in latency spikes. To avoid this, a more relaxed approach is taken: an in-progress offer is not waited on to complete. The poll method has similar properties for the multi-consumer implementation.
If you wish to check for empty then call AbstractConcurrentArrayQueue.isEmpty() rather than calling AbstractConcurrentArrayQueue.size() and checking for zero.
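The note above can be illustrated with a minimal sketch written against the java.util.Queue interface, which this class implements. It treats a null from poll() as "nothing available right now" and uses isEmpty() for the emptiness check. The class name RelaxedPollSketch and the helper pollAvailable are hypothetical, and a java.util.concurrent.ConcurrentLinkedQueue is used as a stand-in so that the example runs on the JDK alone; it does not reproduce the in-progress offer behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class RelaxedPollSketch
{
    // Hypothetical helper: polls until poll() returns null. With a relaxed
    // queue, null only means "nothing available right now"; it does not
    // imply that size() would report zero at that moment.
    static <E> int pollAvailable(final Queue<E> queue, final List<E> sink)
    {
        int count = 0;
        E element;
        while (null != (element = queue.poll()))
        {
            sink.add(element);
            count++;
        }

        return count;
    }

    public static void main(final String[] args)
    {
        // Stand-in queue (assumption) so the sketch has no external dependencies.
        final Queue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("a");
        queue.offer("b");

        final List<String> sink = new ArrayList<>();
        final int polled = pollAvailable(queue, sink);

        // Check for emptiness with isEmpty() rather than size() == 0.
        System.out.println(polled + " polled, empty=" + queue.isEmpty());
    }
}
```

A caller that must not miss elements would typically call such a helper again later rather than treating a single null as proof that the queue is empty.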
-
-
Field Summary
Fields
Modifier and Type    Field
private long[]       sequences
private static int   SEQUENCES_ARRAY_BASE
-
Fields inherited from class org.agrona.concurrent.AbstractConcurrentArrayQueue
buffer, BUFFER_ARRAY_BASE, capacity, HEAD_OFFSET, SHARED_HEAD_CACHE_OFFSET, SHIFT_FOR_SCALE, TAIL_OFFSET
-
Fields inherited from class org.agrona.concurrent.AbstractConcurrentArrayQueuePadding3
p128, p129, p130, p131, p132, p133, p134, p135, p136, p137, p138, p139, p140, p142, p143, p144, p145, p146, p147, p148, p149, p150, p151, p152, p153, p154, p155, p156, p157, p158, p159, p160, p161, p162, p163, p164, p165, p166, p167, p168, p169, p170, p171, p172, p173, p174, p175, p176, p177, p178, p179, p180, p181, p182, p183, p184, p185, p186, p187, p189, p190, p191, p192, p193
-
Fields inherited from class org.agrona.concurrent.AbstractConcurrentArrayQueueConsumer
head
-
Fields inherited from class org.agrona.concurrent.AbstractConcurrentArrayQueuePadding2
p064, p065, p066, p067, p068, p069, p070, p071, p072, p073, p074, p075, p076, p077, p078, p079, p080, p081, p082, p083, p084, p085, p086, p087, p088, p089, p090, p091, p092, p093, p094, p095, p096, p097, p098, p099, p100, p101, p102, p103, p104, p105, p106, p107, p108, p109, p110, p111, p112, p113, p114, p115, p116, p117, p118, p119, p120, p121, p122, p123, p124, p125, p126, p127
-
Fields inherited from class org.agrona.concurrent.AbstractConcurrentArrayQueueProducer
headCache, sharedHeadCache, tail
-
Fields inherited from class org.agrona.concurrent.AbstractConcurrentArrayQueuePadding1
p000, p001, p002, p003, p004, p005, p006, p007, p008, p009, p010, p011, p012, p013, p014, p015, p016, p017, p018, p019, p020, p021, p022, p023, p024, p025, p026, p027, p028, p029, p030, p031, p032, p033, p034, p035, p036, p037, p038, p039, p040, p041, p042, p043, p044, p045, p046, p047, p048, p049, p050, p051, p052, p053, p054, p055, p056, p057, p058, p059, p060, p061, p062, p063
-
-
Constructor Summary
Constructors
Constructor and Description
ManyToManyConcurrentArrayQueue(int requestedCapacity)
    Create a new queue with a bounded capacity.
-
Method Summary
Modifier and Type    Method and Description
int                  drain(java.util.function.Consumer<E> elementConsumer)
                     Drain the number of elements present in a collection at the time the operation starts.
int                  drain(java.util.function.Consumer<E> elementConsumer, int limit)
                     Drain the minimum of a limit and the number of elements present in a collection at the time the operation starts.
int                  drainTo(java.util.Collection<? super E> target, int limit)
                     Drain available elements into the provided Collection up to a provided maximum limit of elements.
boolean              offer(E e)
E                    peek()
E                    poll()
private static long  sequenceArrayOffset(long sequence, long mask)
-
Methods inherited from class org.agrona.concurrent.AbstractConcurrentArrayQueue
add, addAll, addedCount, capacity, clear, contains, containsAll, element, isEmpty, iterator, remainingCapacity, remove, remove, removeAll, removedCount, retainAll, sequenceToBufferOffset, size, toArray, toArray
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Constructor Detail
-
ManyToManyConcurrentArrayQueue
public ManyToManyConcurrentArrayQueue(int requestedCapacity)
Create a new queue with a bounded capacity. The requested capacity will be rounded up to the next positive power-of-two in size. That is, if you request a capacity of 1000 then you will get 1024. If you request 1024 then that is what you will get.
- Parameters:
requestedCapacity - of the queue which must be >= 2.
- Throws:
java.lang.IllegalArgumentException - if the requestedCapacity < 2.
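The rounding rule described for the constructor can be sketched in plain Java as follows. This illustrates the documented behaviour (1000 becomes 1024, 1024 stays 1024, values below 2 are rejected) and is not necessarily how the library computes it. The class name CapacitySketch and the method roundUpCapacity are hypothetical, and the sketch assumes the requested capacity does not exceed 2^30.

```java
final class CapacitySketch
{
    // Hypothetical helper mirroring the documented rule: reject values below 2,
    // otherwise round up to the next power of two (exact powers are unchanged).
    // Assumes requestedCapacity <= 2^30 so that the shift does not overflow.
    static int roundUpCapacity(final int requestedCapacity)
    {
        if (requestedCapacity < 2)
        {
            throw new IllegalArgumentException(
                "requestedCapacity must be >= 2: " + requestedCapacity);
        }

        return 1 << (32 - Integer.numberOfLeadingZeros(requestedCapacity - 1));
    }

    public static void main(final String[] args)
    {
        System.out.println(roundUpCapacity(1000));
        System.out.println(roundUpCapacity(1024));
    }
}
```

Because the actual capacity may be larger than the value requested, code that depends on the bound can read it back through the inherited capacity() method instead of assuming the requested value.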
-
-
Method Detail
-
offer
public boolean offer(E e)
-
poll
public E poll()
-
peek
public E peek()
- Specified by:
peek
in interface java.util.Queue<E>
- Overrides:
peek
in class AbstractConcurrentArrayQueue<E>
-
drain
public int drain(java.util.function.Consumer<E> elementConsumer)
Drain the number of elements present in a collection at the time the operation starts. If possible, implementations should use smart batching to best handle burst traffic.
- Parameters:
elementConsumer - Consumer for processing elements.
- Returns:
the number of elements drained.
-
drain
public int drain(java.util.function.Consumer<E> elementConsumer, int limit)
Drain the minimum of a limit and the number of elements present in a collection at the time the operation starts. If possible, implementations should use smart batching to best handle burst traffic.
- Parameters:
elementConsumer - Consumer for processing elements.
limit - maximum number of elements to be drained in a drain operation.
- Returns:
the number of elements drained.
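The "minimum of a limit and the number of elements present when the operation starts" contract can be sketched against a generic java.util.Queue. This is an illustration of the documented semantics only, not the library's implementation. The class name DrainSketch is hypothetical, and an ArrayDeque stands in for the queue so that the example runs on the JDK alone.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

final class DrainSketch
{
    // Hypothetical helper: drains at most min(limit, size at start) elements.
    static <E> int drain(final Queue<E> queue, final Consumer<E> elementConsumer, final int limit)
    {
        // Snapshot the element count when the operation starts, capped by the limit.
        final int target = Math.min(queue.size(), limit);
        int count = 0;

        while (count < target)
        {
            final E element = queue.poll();
            if (null == element)
            {
                break; // nothing available right now, so stop early
            }

            elementConsumer.accept(element);
            count++;
        }

        return count;
    }

    public static void main(final String[] args)
    {
        final Queue<Integer> queue = new ArrayDeque<>();
        for (int i = 1; i <= 5; i++)
        {
            queue.offer(i);
        }

        final List<Integer> seen = new ArrayList<>();
        final int drained = drain(queue, seen::add, 3);
        System.out.println("drained=" + drained + ", remaining=" + queue.size());
    }
}
```

Bounding the loop by the count taken at the start keeps a single drain call from running indefinitely while producers continue to offer elements.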
-
drainTo
public int drainTo(java.util.Collection<? super E> target, int limit)
Drain available elements into the provided Collection up to a provided maximum limit of elements. If possible, implementations should use smart batching to best handle burst traffic.
- Parameters:
target - into which elements are drained.
limit - maximum number of elements to be drained in a drain operation.
- Returns:
the number of elements actually drained.
-
sequenceArrayOffset
private static long sequenceArrayOffset(long sequence, long mask)
-
-