Class WorkQueue<T>
java.lang.Object
    nonapi.io.github.classgraph.concurrency.WorkQueue<T>

Type Parameters:
T - The work unit type.
All Implemented Interfaces:
java.lang.AutoCloseable
public class WorkQueue<T> extends java.lang.Object implements java.lang.AutoCloseable
A parallel work queue.
-
-
Nested Class Summary
Nested Classes
static interface WorkQueue.WorkUnitProcessor<T>
A work unit processor.
private static class WorkQueue.WorkUnitWrapper<T>
A wrapper for work units (needed so that a null work unit can serve as the poison pill, since BlockingQueue does not accept null elements).
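The reason for the wrapper can be illustrated with a minimal sketch. This is not the ClassGraph source: the names PoisonPillSketch, Wrapper and drain are hypothetical, and the sketch assumes only that a wrapper holding a null payload marks the poison pill.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillSketch {
    /** Hypothetical wrapper: a null payload marks the poison pill. */
    static final class Wrapper<T> {
        final T workUnit;

        Wrapper(T workUnit) {
            this.workUnit = workUnit;
        }
    }

    /** Takes wrappers until the poison pill arrives and returns the payloads seen before it. */
    static <T> List<T> drain(BlockingQueue<Wrapper<T>> queue) throws InterruptedException {
        List<T> seen = new ArrayList<>();
        while (true) {
            Wrapper<T> wrapper = queue.take();
            if (wrapper.workUnit == null) {
                return seen; // poison pill: stop consuming
            }
            seen.add(wrapper.workUnit);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Wrapper<String>> queue = new LinkedBlockingQueue<>();
        queue.add(new Wrapper<>("a"));
        queue.add(new Wrapper<>("b"));
        // queue.add(null) would throw NullPointerException, so the null is wrapped instead.
        queue.add(new Wrapper<String>(null));
        System.out.println(drain(queue)); // prints [a, b]
    }
}
```

The wrapper itself is never null, so the queue accepts it, while consumers can still recognise the end-of-work signal by inspecting the payload.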
-
Field Summary
Fields
private InterruptionChecker interruptionChecker
The shared InterruptionChecker, used to detect thread interruption and execution exceptions, and to shut down all threads if either of these occurs.
private LogNode log
The log node.
private java.util.concurrent.atomic.AtomicInteger numIncompleteWorkUnits
The number of work units remaining to be processed, plus the number of currently running threads working on a work unit.
private int numWorkers
The number of workers.
private java.util.concurrent.ConcurrentLinkedQueue<java.util.concurrent.Future<?>> workerFutures
The Future object added for each worker, used to detect worker completion.
private WorkQueue.WorkUnitProcessor<T> workUnitProcessor
The work unit processor.
private java.util.concurrent.BlockingQueue<WorkQueue.WorkUnitWrapper<T>> workUnits
The queue of work units.
-
Constructor Summary
Constructors
private WorkQueue(java.util.Collection<T> initialWorkUnits, WorkQueue.WorkUnitProcessor<T> workUnitProcessor, int numWorkers, InterruptionChecker interruptionChecker, LogNode log)
A parallel work queue.
-
Method Summary
Methods
void addWorkUnit(T workUnit)
Add a unit of work.
void addWorkUnits(java.util.Collection<T> workUnits)
Add multiple units of work.
void close()
Completion barrier for work queue.
private void runWorkLoop()
Start a worker.
static <U> void runWorkQueue(java.util.Collection<U> elements, java.util.concurrent.ExecutorService executorService, InterruptionChecker interruptionChecker, int numParallelTasks, LogNode log, WorkQueue.WorkUnitProcessor<U> workUnitProcessor)
Start a work queue on the elements in the provided collection, blocking until all work units have been completed.
private void sendPoisonPills()
Send poison pills to workers.
private void startWorkers(java.util.concurrent.ExecutorService executorService, int numTasks)
Start worker threads with a shared log.
-
-
-
Field Detail
-
workUnitProcessor
private final WorkQueue.WorkUnitProcessor<T> workUnitProcessor
The work unit processor.
-
workUnits
private final java.util.concurrent.BlockingQueue<WorkQueue.WorkUnitWrapper<T>> workUnits
The queue of work units.
-
numWorkers
private final int numWorkers
The number of workers.
-
numIncompleteWorkUnits
private final java.util.concurrent.atomic.AtomicInteger numIncompleteWorkUnits
The number of work units remaining to be processed, plus the number of currently running threads working on a work unit.
-
workerFutures
private final java.util.concurrent.ConcurrentLinkedQueue<java.util.concurrent.Future<?>> workerFutures
The Future object added for each worker, used to detect worker completion.
-
interruptionChecker
private final InterruptionChecker interruptionChecker
The shared InterruptionChecker, used to detect thread interruption and execution exceptions, and to shut down all threads if either of these occurs.
-
log
private final LogNode log
The log node.
-
-
Constructor Detail
-
WorkQueue
private WorkQueue(java.util.Collection<T> initialWorkUnits, WorkQueue.WorkUnitProcessor<T> workUnitProcessor, int numWorkers, InterruptionChecker interruptionChecker, LogNode log)
A parallel work queue.
Parameters:
initialWorkUnits - the initial work units
workUnitProcessor - the work unit processor
numWorkers - the number of workers
interruptionChecker - the interruption checker
log - the log
-
-
Method Detail
-
runWorkQueue
public static <U> void runWorkQueue(java.util.Collection<U> elements, java.util.concurrent.ExecutorService executorService, InterruptionChecker interruptionChecker, int numParallelTasks, LogNode log, WorkQueue.WorkUnitProcessor<U> workUnitProcessor) throws java.lang.InterruptedException, java.util.concurrent.ExecutionException
Start a work queue on the elements in the provided collection, blocking until all work units have been completed.
Type Parameters:
U - The type of the work queue units.
Parameters:
elements - The work queue units to process.
executorService - The ExecutorService.
interruptionChecker - The interruption checker.
numParallelTasks - The number of parallel tasks.
log - The log.
workUnitProcessor - The WorkQueue.WorkUnitProcessor.
Throws:
java.lang.InterruptedException - If the work was interrupted.
java.util.concurrent.ExecutionException - If a worker throws an uncaught exception.
-
startWorkers
private void startWorkers(java.util.concurrent.ExecutorService executorService, int numTasks)
Start worker threads with a shared log.
Parameters:
executorService - the executor service
numTasks - the number of worker tasks to start
-
sendPoisonPills
private void sendPoisonPills()
Send poison pills to workers.
-
runWorkLoop
private void runWorkLoop() throws java.lang.InterruptedException, java.util.concurrent.ExecutionException
Start a worker. Called by startWorkers(), but should also be called by the main thread so that it does some of the work itself; this prevents deadlock when the ExecutorService has fewer threads available than numParallelTasks. When this method returns, either all the work has been completed, or this thread or some other thread was interrupted. If InterruptedException is thrown, this thread or another thread was interrupted.
Throws:
java.lang.InterruptedException - if a worker thread was interrupted
java.util.concurrent.ExecutionException - if a worker thread throws an uncaught exception
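The deadlock-avoidance idea can be sketched in isolation. The following is a simplified illustration under stated assumptions, not the ClassGraph implementation: WorkLoopSketch, run and demo are hypothetical names, Optional.empty() stands in for the poison pill, a plain Consumer replaces the work unit processor, worker Futures are discarded, and interruption and exception handling are omitted. In demo(), the caller occupies the only thread of the pool, so none of the submitted workers can start; the work still completes because the caller runs the loop itself.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class WorkLoopSketch<T> {
    // Optional.empty() stands in for the poison pill in this sketch.
    private final BlockingQueue<Optional<T>> workUnits = new LinkedBlockingQueue<>();
    private final AtomicInteger numIncomplete = new AtomicInteger();
    private final Consumer<T> processor;
    private final int numWorkers;

    WorkLoopSketch(Collection<T> initial, Consumer<T> processor, int numWorkers) {
        this.processor = processor;
        this.numWorkers = numWorkers;
        for (T unit : initial) {
            numIncomplete.incrementAndGet();
            workUnits.add(Optional.of(unit));
        }
    }

    /** Runs on pool threads and on the calling thread alike. */
    void runWorkLoop() throws InterruptedException {
        while (true) {
            Optional<T> next = workUnits.take();
            if (!next.isPresent()) {
                return; // poison pill
            }
            processor.accept(next.get());
            if (numIncomplete.decrementAndGet() == 0) {
                // Last unit finished: release every loop, including this one.
                for (int i = 0; i < numWorkers; i++) {
                    workUnits.add(Optional.empty());
                }
            }
        }
    }

    static <U> void run(Collection<U> units, ExecutorService executor, int numParallelTasks,
            Consumer<U> processor) throws InterruptedException {
        if (units.isEmpty()) {
            return; // with no work, nothing would ever send the poison pills
        }
        WorkLoopSketch<U> queue = new WorkLoopSketch<>(units, processor, numParallelTasks);
        for (int i = 0; i < numParallelTasks - 1; i++) {
            executor.submit(() -> {
                queue.runWorkLoop();
                return null;
            });
        }
        queue.runWorkLoop(); // the caller does a share of the work too
    }

    /** The caller occupies the pool's only thread, so the queued workers cannot start. */
    static int demo() throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger sum = new AtomicInteger();
        try {
            pool.submit(() -> {
                run(Arrays.asList(1, 2, 3, 4), pool, 4, n -> sum.addAndGet(n));
                return null;
            }).get();
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        return sum.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints 10
    }
}
```

If run() only submitted workers and then waited for them, the call in demo() would block forever, since the single pool thread is the one doing the waiting. The queued workers in the sketch start only after the caller returns, at which point each consumes a leftover poison pill and exits.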
-
addWorkUnit
public void addWorkUnit(T workUnit)
Add a unit of work. May be called by workers to add more work units to the tail of the queue.
Parameters:
workUnit - the work unit
Throws:
java.lang.NullPointerException - if the work unit is null.
-
addWorkUnits
public void addWorkUnits(java.util.Collection<T> workUnits)
Add multiple units of work. May be called by workers to add more work units to the tail of the queue.
Parameters:
workUnits - The work units to add to the tail of the queue.
Throws:
java.lang.NullPointerException - if any of the work units are null.
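How workers can add further work while the queue is being processed can be sketched as follows. This is an illustration only, with hypothetical names (AddWorkUnitSketch, drain, demo) and a single-threaded drain loop in place of real workers. It assumes, in line with the numIncompleteWorkUnits description, that the counter is incremented before a unit is enqueued and decremented only after the unit has been processed, so the counter cannot reach zero while a running unit may still add more work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

class AddWorkUnitSketch<T> {
    private final Queue<T> workUnits = new ConcurrentLinkedQueue<>();
    private final AtomicInteger numIncomplete = new AtomicInteger();

    /** Rejects null, counts the unit, then appends it to the tail of the queue. */
    void addWorkUnit(T workUnit) {
        Objects.requireNonNull(workUnit, "workUnit");
        numIncomplete.incrementAndGet(); // count before enqueueing
        workUnits.add(workUnit);
    }

    void addWorkUnits(Collection<T> units) {
        for (T unit : units) {
            addWorkUnit(unit);
        }
    }

    /** Single-threaded drain, for illustration; the processor may add more work. */
    void drain(BiConsumer<T, AddWorkUnitSketch<T>> processor) {
        while (numIncomplete.get() > 0) {
            T unit = workUnits.poll();
            processor.accept(unit, this);
            numIncomplete.decrementAndGet(); // only after processing has finished
        }
    }

    /** Walks a small tree: processing a node enqueues its children. */
    static List<String> demo() {
        Map<String, List<String>> children = new HashMap<>();
        children.put("root", Arrays.asList("a", "b"));
        children.put("a", Arrays.asList("a1", "a2"));
        List<String> visited = new ArrayList<>();
        AddWorkUnitSketch<String> queue = new AddWorkUnitSketch<>();
        queue.addWorkUnit("root");
        queue.drain((node, q) -> {
            visited.add(node);
            q.addWorkUnits(children.getOrDefault(node, Collections.<String>emptyList()));
        });
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [root, a, b, a1, a2]
    }
}
```

Because new units go to the tail of the queue, the tree in demo() is visited breadth-first.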
-
close
public void close() throws java.util.concurrent.ExecutionException
Completion barrier for work queue. This should be called after runWorkLoop() exits on the main thread (e.g. using try-with-resources).
Specified by:
close in interface java.lang.AutoCloseable
Throws:
java.util.concurrent.ExecutionException - If a worker threw an uncaught exception.
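A completion barrier of this kind can be sketched with try-with-resources. The class below is a hypothetical illustration (CloseBarrierSketch, startWorker and demo are not ClassGraph names) and may differ from the actual close() in how it treats interruption and cancellation. It assumes only what is described above: one Future is recorded per worker, and close() blocks on each of them and surfaces an uncaught worker exception as an ExecutionException.

```java
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CloseBarrierSketch implements AutoCloseable {
    private final Queue<Future<?>> workerFutures = new ConcurrentLinkedQueue<>();

    /** Submits a worker and records its Future so close() can wait for it. */
    void startWorker(ExecutorService executor, Callable<Void> task) {
        workerFutures.add(executor.submit(task));
    }

    /** Waits for every worker, then rethrows the first worker failure, if any. */
    @Override
    public void close() throws ExecutionException {
        ExecutionException first = null;
        Future<?> future;
        while ((future = workerFutures.poll()) != null) {
            try {
                future.get(); // blocks until this worker has finished
            } catch (ExecutionException e) {
                if (first == null) {
                    first = e;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            } catch (CancellationException e) {
                // A cancelled worker has nothing left to wait for.
            }
        }
        if (first != null) {
            throw first;
        }
    }

    static String demo(boolean fail) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try (CloseBarrierSketch queue = new CloseBarrierSketch()) {
            queue.startWorker(executor, () -> null);
            queue.startWorker(executor, () -> {
                if (fail) {
                    throw new IllegalStateException("boom");
                }
                return null;
            });
            // The work loop would run here on the calling thread; close() follows automatically.
        } catch (ExecutionException e) {
            return "worker failed: " + e.getCause().getMessage();
        } finally {
            executor.shutdown();
        }
        return "completed";
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints completed
        System.out.println(demo(true));  // prints worker failed: boom
    }
}
```

Placing the barrier in close() means that the try-with-resources block cannot be left, normally or exceptionally, until every worker has finished.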
-
-