Package org.ojalgo.concurrent
Class ProcessingService
java.lang.Object
org.ojalgo.concurrent.ProcessingService
A simple wrapper around an ExecutorService that makes it easier to process collections of items in parallel. The work items are processed by a Consumer, Function or TwoStepMapper. In particular, the TwoStepMapper can be used to aggregate/reduce data from the work items and then combine the collected data into a final result.
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
(package private) static final class
(package private) static final class
-
Field Summary
Fields -
Constructor Summary
Constructors -
Method Summary
Modifier and Type | Method | Description
<W, R> Map<W, R> | compute(Collection<W> work, int parallelism, Function<W, R> computer) | Compute an output item for each (unique) input item, and return the results as a Map.
<W, R> Map<W, R> | compute(Collection<W> work, Function<W, R> computer) | Using parallelism Parallelism.CORES.
<W, R> Map<W, R> | compute(Collection<W> work, IntSupplier parallelism, Function<W, R> computer) |
 | divider() |
<W, R> Collection<R> | map(Collection<W> work, int parallelism, Function<W, R> mapper) | Simply map each (unique) input item to an output item - a Collection of input results in a Collection of output.
<W, R> Collection<R> | map(Collection<W> work, Function<W, R> mapper) | Using parallelism Parallelism.CORES.
<W, R> Collection<R> | map(Collection<W> work, IntSupplier parallelism, Function<W, R> mapper) |
static ProcessingService | newInstance(String name) |
<W> void | process(Collection<? extends W> work, int parallelism, Consumer<W> processor) | Will create at most parallelism tasks to work through the work items, processing them with processor.
<W> void | process(Collection<? extends W> work, Consumer<W> processor) | Using parallelism Parallelism.CORES.
<W> void | process(Collection<? extends W> work, IntSupplier parallelism, Consumer<W> processor) |
<W> void | processPair(W work1, W work2, Consumer<W> processor) | Just 2 work items.
<W> void | processTriplet(W work1, W work2, W work3, Consumer<W> processor) | Just 3 work items.
<W, R> R | reduce(Collection<W> work, int parallelism, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer) | Deprecated.
<W, R> R | reduce(Collection<W> work, IntSupplier parallelism, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer) | Deprecated. v54 Use reduceMergeable(Collection<W>, IntSupplier, Supplier<? extends TwoStepMapper.Mergeable<W, R>>)
<W, R> R | reduce(Collection<W> work, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer) | Deprecated. v54 Use reduceMergeable(Collection<W>, Supplier<? extends TwoStepMapper.Mergeable<W, R>>)
<W, R, A extends TwoStepMapper.Combineable<W, R, A>> R | reduceCombineable(Collection<W> work, int parallelism, Supplier<A> reducer) | Will create at most parallelism tasks to work through the work items, processing them with reducer.
<W, R, A extends TwoStepMapper.Combineable<W, R, A>> R | reduceCombineable(Collection<W> work, IntSupplier parallelism, Supplier<A> reducer) |
<W, R, A extends TwoStepMapper.Combineable<W, R, A>> R | reduceCombineable(Collection<W> work, Supplier<A> reducer) | Using parallelism Parallelism.CORES.
<W, R, A extends TwoStepMapper.Mergeable<W, R>> R | reduceMergeable(Collection<W> work, int parallelism, Supplier<A> reducer) | Will create at most parallelism tasks to work through the work items, processing them with reducer.
<W, R, A extends TwoStepMapper.Mergeable<W, R>> R | reduceMergeable(Collection<W> work, IntSupplier parallelism, Supplier<A> reducer) |
<W, R, A extends TwoStepMapper.Mergeable<W, R>> R | reduceMergeable(Collection<W> work, Supplier<A> reducer) | Using parallelism Parallelism.CORES.
void | run | Will create precisely parallelism tasks that each execute the processor.
void | run(IntSupplier parallelism, Runnable processor) |
<T> AtomicBoolean | take(BlockingQueue<T> queue, int parallelism, Consumer<T> processor) | Will submit precisely parallelism tasks that each take from the queue, feeding the items to the processor.
<T> AtomicBoolean | take(BlockingQueue<T> queue, IntSupplier parallelism, Consumer<T> processor) |
-
Field Details
-
INSTANCE
-
myExecutor
-
-
Constructor Details
-
ProcessingService
-
-
Method Details
-
newInstance
-
compute
Using parallelism Parallelism.CORES.
- See Also:
-
compute
Compute an output item for each (unique) input item, and return the results as a Map. If the input contains duplicates, the output will have fewer items. It is therefore vital that the input type implements Object.hashCode() and Object.equals(Object) properly.
Will create at most parallelism tasks to work through the work items, processing them with computer and collecting the results in a Map.
- Type Parameters:
W - The work item type
R - The function return type
- Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
computer - The processing code
- Returns:
- A map of function input to output
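The "at most parallelism tasks" behaviour and the effect of duplicate inputs can be illustrated with a JDK-only sketch. This is not ojAlgo's implementation: the class name ComputeSketch, the dedicated thread pool and the shared work queue are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

/** Hypothetical JDK-only stand-in for the compute idea; not ojAlgo code. */
final class ComputeSketch {

    static <W, R> Map<W, R> compute(Collection<W> work, int parallelism, Function<W, R> computer) {
        // Shared queue of pending items (this sketch does not support null items or null results).
        Queue<W> pending = new ConcurrentLinkedQueue<>(work);
        Map<W, R> results = new ConcurrentHashMap<>();
        // At most 'parallelism' tasks, and never more tasks than work items.
        int tasks = Math.max(1, Math.min(parallelism, work.size()));
        ExecutorService executor = Executors.newFixedThreadPool(tasks);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(executor.submit(() -> {
                    W item;
                    while ((item = pending.poll()) != null) {
                        // Equal inputs (per equals/hashCode) collapse to a single key.
                        results.put(item, computer.apply(item));
                    }
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for every task to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Integer> lengths = compute(Arrays.asList("a", "bb", "a", "ccc"), 2, String::length);
        System.out.println(lengths);
    }
}
```

With the input a, bb, a, ccc the returned map has three entries, since the two equal "a" items share one key.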
-
compute
- See Also:
-
divider
-
getExecutor
- Returns:
- The underlying ExecutorService
-
map
Using parallelism Parallelism.CORES.
- See Also:
-
map
Simply map each (unique) input item to an output item - a Collection of input results in a Collection of output. If the input contains duplicates, the output will have fewer items. It is therefore vital that the input type implements Object.hashCode() and Object.equals(Object) properly.
- Type Parameters:
W - The input item type
R - The output item type
- Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
mapper - The mapper function
- Returns:
- The mapped results
-
map
- See Also:
-
process
Using parallelism Parallelism.CORES.
- See Also:
-
process
Will create at most parallelism tasks to work through the work items, processing them with processor.
- Type Parameters:
W - The work item type
- Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
processor - The processing code
-
process
public <W> void process(Collection<? extends W> work, IntSupplier parallelism, Consumer<W> processor)
- See Also:
-
processPair
Just 2 work items.
- See Also:
-
processTriplet
Just 3 work items.
- See Also:
-
reduce
@Deprecated
public <W, R> R reduce(Collection<W> work, int parallelism, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
Deprecated. v54 Use reduceMergeable(Collection<W>, int, Supplier<? extends TwoStepMapper.Mergeable<W, R>>)
-
reduce
@Deprecated
public <W, R> R reduce(Collection<W> work, IntSupplier parallelism, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
Deprecated. v54 Use reduceMergeable(Collection<W>, IntSupplier, Supplier<? extends TwoStepMapper.Mergeable<W, R>>)
-
reduce
@Deprecated
public <W, R> R reduce(Collection<W> work, Supplier<? extends TwoStepMapper.Mergeable<W, R>> reducer)
Deprecated. v54 Use reduceMergeable(Collection<W>, Supplier<? extends TwoStepMapper.Mergeable<W, R>>)
-
reduceCombineable
public <W, R, A extends TwoStepMapper.Combineable<W, R, A>> R reduceCombineable(Collection<W> work, int parallelism, Supplier<A> reducer)
Will create at most parallelism tasks to work through the work items, processing them with reducer. The state of each task's reducer will be combined into a single instance, and the results of that instance will be returned.
Each TwoStepMapper.Combineable is only worked on by a single thread, and the results are combined into a single instance. The instances are not reused.
- Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
reducer - A TwoStepMapper.Combineable implementation that does what you want.
- Returns:
- The results...
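The two-step idea described here (each task fills its own accumulator, then the accumulators' state is combined) can be sketched with the JDK alone. The Combiner interface, the Mean accumulator and the reduce method below are hypothetical names invented for this illustration; they are loosely modelled on the description above and are not the TwoStepMapper.Combineable API itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

/** Hypothetical JDK-only sketch of "accumulate per task, then combine state". */
final class CombineSketch {

    /** Assumed accumulator shape for this sketch; not ojAlgo's interface. */
    interface Combiner<W, R, A extends Combiner<W, R, A>> {
        void consume(W item);   // step 1: absorb one work item
        void combine(A other);  // step 2: absorb another accumulator's state
        R getResults();
    }

    /** A mean needs sum and count as state; two means alone could not be combined. */
    static final class Mean implements Combiner<Integer, Double, Mean> {
        private long sum = 0;
        private long count = 0;
        public void consume(Integer item) { sum += item; count++; }
        public void combine(Mean other) { sum += other.sum; count += other.count; }
        public Double getResults() { return count == 0 ? Double.NaN : (double) sum / count; }
    }

    static <W, R, A extends Combiner<W, R, A>> R reduce(Collection<W> work, int parallelism, Supplier<A> reducer) {
        Queue<W> pending = new ConcurrentLinkedQueue<>(work);
        int tasks = Math.max(1, Math.min(parallelism, work.size()));
        ExecutorService executor = Executors.newFixedThreadPool(tasks);
        try {
            List<Future<A>> partials = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                Callable<A> task = () -> {
                    A local = reducer.get(); // fresh instance, touched by this task only
                    W item;
                    while ((item = pending.poll()) != null) {
                        local.consume(item);
                    }
                    return local;
                };
                partials.add(executor.submit(task));
            }
            A total = reducer.get();
            for (Future<A> partial : partials) {
                total.combine(partial.get()); // combine on the calling thread
            }
            return total.getResults();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        Double mean = reduce(Arrays.asList(1, 2, 3, 4), 2, Mean::new);
        System.out.println(mean); // prints 2.5
    }
}
```

Because every accumulator is confined to one task until Future.get() hands it back, the accumulator itself needs no synchronisation in this sketch.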
-
reduceCombineable
public <W, R, A extends TwoStepMapper.Combineable<W, R, A>> R reduceCombineable(Collection<W> work, IntSupplier parallelism, Supplier<A> reducer)
- See Also:
-
reduceCombineable
public <W, R, A extends TwoStepMapper.Combineable<W, R, A>> R reduceCombineable(Collection<W> work, Supplier<A> reducer)
Using parallelism Parallelism.CORES.
- See Also:
-
reduceMergeable
public <W, R, A extends TwoStepMapper.Mergeable<W, R>> R reduceMergeable(Collection<W> work, int parallelism, Supplier<A> reducer)
Will create at most parallelism tasks to work through the work items, processing them with reducer. The results of each task's reducer will be merged into a single instance, and the results of that instance will be returned.
Each TwoStepMapper.Mergeable is only worked on by a single thread, and the results are combined into a single instance. The instances are not reused.
- Parameters:
work - The collection of work items
parallelism - The maximum number of concurrent workers that will process the work items
reducer - A TwoStepMapper.Mergeable implementation that does what you want.
- Returns:
- The results...
-
reduceMergeable
public <W, R, A extends TwoStepMapper.Mergeable<W, R>> R reduceMergeable(Collection<W> work, IntSupplier parallelism, Supplier<A> reducer)
- See Also:
-
reduceMergeable
public <W, R, A extends TwoStepMapper.Mergeable<W, R>> R reduceMergeable(Collection<W> work, Supplier<A> reducer)
Using parallelism Parallelism.CORES.
- See Also:
-
run
Will create precisely parallelism tasks that each execute the processor.
- Parameters:
parallelism - The number of concurrent workers/threads that will run
processor - The processing code
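As a rough JDK-only illustration of "precisely parallelism tasks that each execute the processor", the sketch below submits the same Runnable a fixed number of times. The class name RunSketch is hypothetical, and waiting for all tasks to finish before returning is an assumption of this sketch rather than documented behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical JDK-only sketch; not ojAlgo code. */
final class RunSketch {

    /** Submits the same processor exactly 'parallelism' times (must be positive). */
    static void run(int parallelism, Runnable processor) {
        ExecutorService executor = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < parallelism; i++) {
                futures.add(executor.submit(processor));
            }
            for (Future<?> future : futures) {
                future.get(); // assumption: block until every task has completed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        run(4, calls::incrementAndGet);
        System.out.println(calls.get()); // prints 4
    }
}
```

Unlike the collection-based methods, the number of tasks here does not depend on any input size; the processor itself has to decide what each invocation works on.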
-
run
- See Also:
-
take
Will submit precisely parallelism tasks that each take from the queue, feeding the items to the processor. The tasks will continue to run until the returned AtomicBoolean is set to false (or the thread is interrupted).
If the threads of the underlying ExecutorService are daemon threads, the JVM will not wait for them to finish before it exits. The default behaviour, using INSTANCE or newInstance(String), is to make use of ojAlgo's DaemonPoolExecutor.
- Type Parameters:
T - The work item type
- Parameters:
queue - The queue to take from
parallelism - How many parallel workers to create
processor - What to do with each of the work items
- Returns:
- A flag that can be used to signal the tasks to stop
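The stop-flag pattern described for take can be sketched with plain JDK classes. Everything named here (TakeSketch, daemonPool, the explicit executor parameter) is hypothetical, and polling with a short timeout so that the flag is re-checked regularly is a choice made for this sketch; the source does not say how ojAlgo waits on the queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical JDK-only sketch of queue workers controlled by a flag; not ojAlgo code. */
final class TakeSketch {

    /** A pool of daemon threads, so the JVM can exit while workers are still waiting. */
    static ExecutorService daemonPool(int threads) {
        return Executors.newFixedThreadPool(threads, runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            return thread;
        });
    }

    static <T> AtomicBoolean take(ExecutorService executor, BlockingQueue<T> queue, int parallelism, Consumer<T> processor) {
        AtomicBoolean active = new AtomicBoolean(true);
        for (int i = 0; i < parallelism; i++) {
            executor.submit(() -> {
                try {
                    while (active.get()) {
                        // Short timeout (an assumption) so the flag is checked even when the queue is empty.
                        T item = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            processor.accept(item);
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // interruption also ends the worker
                }
            });
        }
        return active;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean active = take(daemonPool(2), queue, 2, item -> System.out.println("got " + item));
        queue.put("a");
        queue.put("b");
        Thread.sleep(200);  // give the workers time to drain the queue
        active.set(false);  // signal the workers to stop
    }
}
```

The caller keeps the returned flag and sets it to false when no more items are expected; until then the workers stay alive and pick up anything added to the queue.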
-
take
public <T> AtomicBoolean take(BlockingQueue<T> queue, IntSupplier parallelism, Consumer<T> processor)
- See Also:
-