Class WorkQueue<T>

  • Type Parameters:
    T - The work unit type.
    All Implemented Interfaces:
    java.lang.AutoCloseable

    public class WorkQueue<T>
    extends java.lang.Object
    implements java.lang.AutoCloseable
    A parallel work queue.
    • Nested Class Summary

      Nested Classes 
      Modifier and Type Class Description
      static interface  WorkQueue.WorkUnitProcessor<T>
      A work unit processor.
      private static class  WorkQueue.WorkUnitWrapper<T>
      A wrapper for work units (needed to send a poison pill as a null value, since BlockingQueue does not accept null values).
    • Field Summary

      Fields 
      Modifier and Type Field Description
      private InterruptionChecker interruptionChecker
      The shared InterruptionChecker, used to detect thread interruption and execution exceptions, and to shut down all threads if either of these occurs.
      private LogNode log
      The log node.
      private java.util.concurrent.atomic.AtomicInteger numIncompleteWorkUnits
      The number of work units remaining to be processed, plus the number of currently running threads working on a work unit.
      private int numWorkers
      The number of workers.
      private java.util.concurrent.ConcurrentLinkedQueue<java.util.concurrent.Future<?>> workerFutures
      The Future object added for each worker, used to detect worker completion.
      private WorkQueue.WorkUnitProcessor<T> workUnitProcessor
      The work unit processor.
      private java.util.concurrent.BlockingQueue<WorkQueue.WorkUnitWrapper<T>> workUnits
      The queue of work units.
    • Method Summary

      All Methods Static Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      void addWorkUnit​(T workUnit)
      Add a unit of work.
      void addWorkUnits​(java.util.Collection<T> workUnits)
      Add multiple units of work.
      void close()
      Completion barrier for work queue.
      private void runWorkLoop()
      Start a worker.
      static <U> void runWorkQueue​(java.util.Collection<U> elements, java.util.concurrent.ExecutorService executorService, InterruptionChecker interruptionChecker, int numParallelTasks, LogNode log, WorkQueue.WorkUnitProcessor<U> workUnitProcessor)
      Start a work queue on the elements in the provided collection, blocking until all work units have been completed.
      private void sendPoisonPills()
      Send poison pills to workers.
      private void startWorkers​(java.util.concurrent.ExecutorService executorService, int numTasks)
      Start worker threads with a shared log.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Field Detail

      • numWorkers

        private final int numWorkers
        The number of workers.
      • numIncompleteWorkUnits

        private final java.util.concurrent.atomic.AtomicInteger numIncompleteWorkUnits
        The number of work units remaining to be processed, plus the number of currently running threads working on a work unit.
      • workerFutures

        private final java.util.concurrent.ConcurrentLinkedQueue<java.util.concurrent.Future<?>> workerFutures
        The Future object added for each worker, used to detect worker completion.
      • interruptionChecker

        private final InterruptionChecker interruptionChecker
        The shared InterruptionChecker, used to detect thread interruption and execution exceptions, and to shut down all threads if either of these occurs.
      • log

        private final LogNode log
        The log node.
    • Constructor Detail

      • WorkQueue

        private WorkQueue​(java.util.Collection<T> initialWorkUnits,
                          WorkQueue.WorkUnitProcessor<T> workUnitProcessor,
                          int numWorkers,
                          InterruptionChecker interruptionChecker,
                          LogNode log)
        A parallel work queue.
        Parameters:
        initialWorkUnits - the initial work units
        workUnitProcessor - the work unit processor
        numWorkers - the number of workers
        interruptionChecker - the interruption checker
        log - the log
    • Method Detail

      • runWorkQueue

        public static <U> void runWorkQueue​(java.util.Collection<U> elements,
                                            java.util.concurrent.ExecutorService executorService,
                                            InterruptionChecker interruptionChecker,
                                            int numParallelTasks,
                                            LogNode log,
                                            WorkQueue.WorkUnitProcessor<U> workUnitProcessor)
                                     throws java.lang.InterruptedException,
                                            java.util.concurrent.ExecutionException
        Start a work queue on the elements in the provided collection, blocking until all work units have been completed.
        Type Parameters:
        U - The type of the work queue units.
        Parameters:
        elements - The work queue units to process.
        executorService - The ExecutorService.
        interruptionChecker - the interruption checker
        numParallelTasks - The number of parallel tasks.
        log - The log.
        workUnitProcessor - The WorkQueue.WorkUnitProcessor.
        Throws:
        java.lang.InterruptedException - If the work was interrupted.
        java.util.concurrent.ExecutionException - If a worker throws an uncaught exception.
      • startWorkers

        private void startWorkers​(java.util.concurrent.ExecutorService executorService,
                                  int numTasks)
        Start worker threads with a shared log.
        Parameters:
        executorService - the executor service
        numTasks - the number of worker tasks to start
      • sendPoisonPills

        private void sendPoisonPills()
        Send poison pills to workers.
      • runWorkLoop

        private void runWorkLoop()
                          throws java.lang.InterruptedException,
                                 java.util.concurrent.ExecutionException
        Start a worker. Called by startWorkers(), but should also be called by the main thread to do some of the work on that thread, to prevent deadlock in the case that the ExecutorService doesn't have as many threads available as numParallelTasks. When this method returns, either all the work has been completed, or this or some other thread was interrupted. If InterruptedException is thrown, this thread or another was interrupted.
        Throws:
        java.lang.InterruptedException - if a worker thread was interrupted
        java.util.concurrent.ExecutionException - if a worker thread throws an uncaught exception
      • addWorkUnit

        public void addWorkUnit​(T workUnit)
        Add a unit of work. May be called by workers to add more work units to the tail of the queue.
        Parameters:
        workUnit - the work unit
        Throws:
        java.lang.NullPointerException - if the work unit is null.
      • addWorkUnits

        public void addWorkUnits​(java.util.Collection<T> workUnits)
        Add multiple units of work. May be called by workers to add more work units to the tail of the queue.
        Parameters:
        workUnits - The work units to add to the tail of the queue.
        Throws:
        java.lang.NullPointerException - if any of the work units are null.
      • close

        public void close()
                   throws java.util.concurrent.ExecutionException
        Completion barrier for work queue. This should be called after runWorkLoop() exits on the main thread (e.g. using try-with-resources).
        Specified by:
        close in interface java.lang.AutoCloseable
        Throws:
        java.util.concurrent.ExecutionException - If a worker threw an uncaught exception.