Class WorkPool<K,W>

java.lang.Object
com.rabbitmq.client.impl.WorkPool<K,W>
Type Parameters:
K - Key -- type of client
W - Work -- type of work item

public class WorkPool<K,W> extends Object

This is a generic implementation of the channels specification in Channeling Work, Nov 2010 (channels.pdf). Objects of type K must be registered with registerKey(K); they then become clients, and a queue of items (type W) is stored for each client.

Each client has a state which is exactly one of dormant, in progress or ready. Immediately after registration a client is dormant. Items may be added, one at a time, to the end of a client's queue with addWorkItem(K, W). If the client is dormant, it thereby becomes ready; a client in any other state keeps its state.

The next ready client, together with a collection of its items, may be retrieved with nextWorkBlock(Collection, int), which makes that client in progress. An in progress client can finish processing a batch of items with finishWorkBlock(K). It then becomes ready if its queue still holds work items, or dormant if the queue is empty.

If a client has items queued, it is either in progress or ready, but never both. There is never any work for a dormant client.

A client may be unregistered with unregisterKey(K), which removes the client from all parts of the state, along with any queue of items stored for it. All clients may be unregistered with unregisterAllKeys().
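The state transitions described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: SketchWorkPool is a hypothetical class written for this example, not the RabbitMQ implementation. It uses plain java.util collections in place of SetQueue and VariableLinkedBlockingQueue, and it leaves out queue limits and the enqueueing timeout.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical, simplified model of the client state machine (not the real WorkPool).
class SketchWorkPool<K, W> {
    private final Set<K> ready = new LinkedHashSet<>();   // FIFO order, no duplicates
    private final Set<K> inProgress = new HashSet<>();
    private final Map<K, Queue<W>> pool = new HashMap<>();

    synchronized void registerKey(K key) {
        pool.putIfAbsent(key, new ArrayDeque<>());        // a new client starts dormant
    }

    synchronized boolean addWorkItem(K key, W item) {
        Queue<W> queue = pool.get(key);
        if (queue == null) return false;                  // unregistered: no change
        queue.add(item);
        boolean dormant = !ready.contains(key) && !inProgress.contains(key);
        if (dormant) ready.add(key);                      // dormant -> ready
        return dormant;
    }

    synchronized K nextWorkBlock(Collection<W> to, int size) {
        Iterator<K> it = ready.iterator();
        if (!it.hasNext()) return null;                   // no ready client
        K key = it.next();
        it.remove();
        inProgress.add(key);                              // ready -> in progress
        Queue<W> queue = pool.get(key);
        for (int i = 0; i < size && !queue.isEmpty(); i++) {
            to.add(queue.poll());
        }
        return key;
    }

    synchronized boolean finishWorkBlock(K key) {
        if (!pool.containsKey(key)) return false;         // unknown clients are ignored
        if (!inProgress.remove(key)) {
            throw new IllegalStateException("Client " + key + " not in progress");
        }
        boolean moreWork = !pool.get(key).isEmpty();
        if (moreWork) ready.add(key);                     // in progress -> ready
        return moreWork;                                  // false: in progress -> dormant
    }

    synchronized void unregisterKey(K key) {
        pool.remove(key);
        ready.remove(key);
        inProgress.remove(key);
    }
}
```

In this model a client with queued items is always in exactly one of the ready or inProgress sets, which mirrors the invariant stated above.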

Concurrent Semantics

This implementation is thread-safe.
  • Field Details

    • MAX_QUEUE_LENGTH

      private static final int MAX_QUEUE_LENGTH
    • ready

      private final SetQueue<K> ready
      An injective queue of ready clients: each client appears in the queue at most once.
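To make the "injective" property concrete, here is a minimal sketch of a FIFO queue that holds each element at most once. InjectiveQueueSketch is a hypothetical class backed by a LinkedHashSet; it is an assumption made for illustration and not the SetQueue implementation used by this class.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

// Hypothetical stand-in for an injective (duplicate-free) FIFO queue.
final class InjectiveQueueSketch<T> {
    private final LinkedHashSet<T> members = new LinkedHashSet<>();

    // Enqueue at the tail unless the item is already queued; returns true if added.
    boolean addIfNotPresent(T item) {
        return members.add(item);
    }

    // Dequeue from the head, or return null if the queue is empty.
    T poll() {
        Iterator<T> it = members.iterator();
        if (!it.hasNext()) return null;
        T head = it.next();
        it.remove();
        return head;
    }

    boolean contains(T item) {
        return members.contains(item);
    }

    boolean isEmpty() {
        return members.isEmpty();
    }
}
```

Because a ready client can be queued only once, adding further work items for it never changes its position in the queue.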
    • inProgress

      private final Set<K> inProgress
      The set of clients which have work in progress.
    • pool

      private final Map<K,VariableLinkedBlockingQueue<W>> pool
      The pool of registered clients, with their work queues.
    • unlimited

      private final Set<K> unlimited
      Those keys which want limits to be removed. We do not limit queue size if this is non-empty.
    • enqueueingCallback

      private final BiConsumer<VariableLinkedBlockingQueue<W>,W> enqueueingCallback
  • Constructor Details

    • WorkPool

      public WorkPool(int queueingTimeout)
  • Method Details

    • registerKey

      public void registerKey(K key)
      Add client key to pool of item queues, with an empty queue. A client is initially dormant. No-op if key already present.
      Parameters:
      key - client to add to pool
    • limit

      public void limit(K key)
    • unlimit

      public void unlimit(K key)
    • setCapacities

      private void setCapacities(int capacity)
    • unregisterKey

      public void unregisterKey(K key)
      Remove client from pool and from any other state. Has no effect if client already absent.
      Parameters:
      key - of client to unregister
    • unregisterAllKeys

      public void unregisterAllKeys()
      Remove all clients from pool and from any other state.
    • nextWorkBlock

      public K nextWorkBlock(Collection<W> to, int size)
      Return the next ready client, and transfer a collection of that client's items to process. Mark client in progress. If there is no ready client, return null.
      Parameters:
      to - collection object in which to transfer items
      size - max number of items to transfer
      Returns:
      key of client to whom items belong, or null if there is none.
    • drainTo

      private int drainTo(VariableLinkedBlockingQueue<W> deList, Collection<W> c, int maxElements)
      Private implementation of drainTo (not implemented for LinkedList<W>s).
      Parameters:
      deList - to take (poll) elements from
      c - to add elements to
      maxElements - to take from deList
      Returns:
      number of elements actually taken
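A bounded drain of this kind can be sketched as a simple poll loop. The example below is an illustration only: DrainSketch is a hypothetical helper, and it accepts any java.util.Queue as a stand-in for VariableLinkedBlockingQueue.

```java
import java.util.Collection;
import java.util.Queue;

// Hypothetical helper illustrating a bounded drain from a queue.
final class DrainSketch {
    // Moves up to maxElements items from 'from' into 'to'; returns how many were moved.
    static <W> int drainTo(Queue<W> from, Collection<W> to, int maxElements) {
        int moved = 0;
        while (moved < maxElements) {
            W item = from.poll();
            if (item == null) break;   // source queue is exhausted
            to.add(item);
            moved++;
        }
        return moved;
    }
}
```

The return value can be smaller than maxElements when the source queue runs out of items first.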
    • addWorkItem

      public boolean addWorkItem(K key, W item)
      Add (enqueue) an item for a specific client. No change and returns false if client not registered. If dormant, the client will be marked ready.
      Parameters:
      key - the client to add the work item to
      item - the work item to add to the client queue
      Returns:
      true if and only if the client is marked ready as a result of this work item
    • finishWorkBlock

      public boolean finishWorkBlock(K key)
      Set client no longer in progress. Ignore unknown clients (and return false).
      Parameters:
      key - client that has finished work
      Returns:
      true if and only if client becomes ready
      Throws:
      IllegalStateException - if registered client not in progress
    • moreWorkItems

      private boolean moreWorkItems(K key)
    • isInProgress

      private boolean isInProgress(K key)
    • isReady

      private boolean isReady(K key)
    • isRegistered

      private boolean isRegistered(K key)
    • isDormant

      private boolean isDormant(K key)
    • inProgressToReady

      private void inProgressToReady(K key)
    • inProgressToDormant

      private void inProgressToDormant(K key)
    • dormantToReady

      private void dormantToReady(K key)
    • readyToInProgress

      private K readyToInProgress()