Package com.rabbitmq.client.impl
Class WorkPool<K,W>
- java.lang.Object
-
- com.rabbitmq.client.impl.WorkPool<K,W>
-
- Type Parameters:
K
- Key -- type of clientW
- Work -- type of work item
public class WorkPool<K,W> extends java.lang.Object
This is a generic implementation of the channels specification in Channeling Work, Nov 2010 (channels.pdf). Objects of type K must be registered, with
Each client has a state which is exactly one of dormant, in progress or ready. Immediately after registration a client is dormant. Items may be (singly) added to (the end of) a client's queue withregisterKey(K)
, and then they become clients and a queue of items (type W) is stored for each client.addWorkItem(Object, Object)
. If the client is dormant it becomes ready thereby. All other states remain unchanged. The next ready client, together with a collection of its items, may be retrieved withnextWorkBlock(collection,max)
(making that client in progress). An in progress client can finish (processing a batch of items) withfinishWorkBlock(K)
. It then becomes either dormant or ready, depending if its queue of work items is empty or no. If a client has items queued, it is either in progress or ready but cannot be both. When work is finished it may be marked ready if there is further work, or dormant if there is not. There is never any work for a dormant client. A client may be unregistered, withunregisterKey(K)
, which removes the client from all parts of the state, and any queue of items stored with it. All clients may be unregistered withunregisterAllKeys()
.Concurrent Semantics
This implementation is thread-safe.
-
-
Field Summary
Fields Modifier and Type Field Description private java.util.function.BiConsumer<VariableLinkedBlockingQueue<W>,W>
enqueueingCallback
private java.util.Set<K>
inProgress
The set of clients which have work in progress.private static int
MAX_QUEUE_LENGTH
private java.util.Map<K,VariableLinkedBlockingQueue<W>>
pool
The pool of registered clients, with their work queues.private SetQueue<K>
ready
An injective queue of ready clients.private java.util.Set<K>
unlimited
Those keys which want limits to be removed.
-
Constructor Summary
Constructors Constructor Description WorkPool(int queueingTimeout)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description boolean
addWorkItem(K key, W item)
Add (enqueue) an item for a specific client.private void
dormantToReady(K key)
private int
drainTo(VariableLinkedBlockingQueue<W> deList, java.util.Collection<W> c, int maxElements)
Private implementation ofdrainTo
(not implemented forLinkedList<W>
s).boolean
finishWorkBlock(K key)
Set client no longer in progress.private void
inProgressToDormant(K key)
private void
inProgressToReady(K key)
private boolean
isDormant(K key)
private boolean
isInProgress(K key)
private boolean
isReady(K key)
private boolean
isRegistered(K key)
void
limit(K key)
private boolean
moreWorkItems(K key)
K
nextWorkBlock(java.util.Collection<W> to, int size)
Return the next ready client, and transfer a collection of that client's items to process.private K
readyToInProgress()
void
registerKey(K key)
Add clientkey
to pool of item queues, with an empty queue.private void
setCapacities(int capacity)
void
unlimit(K key)
void
unregisterAllKeys()
Remove all clients from pool and from any other state.void
unregisterKey(K key)
Remove client from pool and from any other state.
-
-
-
Field Detail
-
MAX_QUEUE_LENGTH
private static final int MAX_QUEUE_LENGTH
- See Also:
- Constant Field Values
-
inProgress
private final java.util.Set<K> inProgress
The set of clients which have work in progress.
-
pool
private final java.util.Map<K,VariableLinkedBlockingQueue<W>> pool
The pool of registered clients, with their work queues.
-
unlimited
private final java.util.Set<K> unlimited
Those keys which want limits to be removed. We do not limit queue size if this is non-empty.
-
enqueueingCallback
private final java.util.function.BiConsumer<VariableLinkedBlockingQueue<W>,W> enqueueingCallback
-
-
Method Detail
-
registerKey
public void registerKey(K key)
Add clientkey
to pool of item queues, with an empty queue. A client is initially dormant. No-op ifkey
already present.- Parameters:
key
- client to add to pool
-
limit
public void limit(K key)
-
unlimit
public void unlimit(K key)
-
setCapacities
private void setCapacities(int capacity)
-
unregisterKey
public void unregisterKey(K key)
Remove client from pool and from any other state. Has no effect if client already absent.- Parameters:
key
- of client to unregister
-
unregisterAllKeys
public void unregisterAllKeys()
Remove all clients from pool and from any other state.
-
nextWorkBlock
public K nextWorkBlock(java.util.Collection<W> to, int size)
Return the next ready client, and transfer a collection of that client's items to process. Mark client in progress. If there is no ready client, returnnull
.- Parameters:
to
- collection object in which to transfer itemssize
- max number of items to transfer- Returns:
- key of client to whom items belong, or
null
if there is none.
-
drainTo
private int drainTo(VariableLinkedBlockingQueue<W> deList, java.util.Collection<W> c, int maxElements)
Private implementation ofdrainTo
(not implemented forLinkedList<W>
s).- Parameters:
deList
- to take (poll) elements fromc
- to add elements tomaxElements
- to take from deList- Returns:
- number of elements actually taken
-
addWorkItem
public boolean addWorkItem(K key, W item)
Add (enqueue) an item for a specific client. No change and returnsfalse
if client not registered. If dormant, the client will be marked ready.- Parameters:
key
- the client to add to the work item toitem
- the work item to add to the client queue- Returns:
true
if and only if the client is marked ready — as a result of this work item
-
finishWorkBlock
public boolean finishWorkBlock(K key)
Set client no longer in progress. Ignore unknown clients (and returnfalse
).- Parameters:
key
- client that has finished work- Returns:
true
if and only if client becomes ready- Throws:
java.lang.IllegalStateException
- if registered client not in progress
-
moreWorkItems
private boolean moreWorkItems(K key)
-
isInProgress
private boolean isInProgress(K key)
-
isReady
private boolean isReady(K key)
-
isRegistered
private boolean isRegistered(K key)
-
isDormant
private boolean isDormant(K key)
-
inProgressToReady
private void inProgressToReady(K key)
-
inProgressToDormant
private void inProgressToDormant(K key)
-
dormantToReady
private void dormantToReady(K key)
-
readyToInProgress
private K readyToInProgress()
-
-