Class MemoryAwareThreadPoolExecutor
- All Implemented Interfaces: Executor, ExecutorService
- Direct Known Subclasses: FairOrderedMemoryAwareThreadPoolExecutor, OrderedMemoryAwareThreadPoolExecutor
A ThreadPoolExecutor which blocks task submission when there are too many tasks in the queue. Both per-Channel and per-Executor limits can be applied.
When a task (i.e. a Runnable) is submitted, MemoryAwareThreadPoolExecutor calls ObjectSizeEstimator.estimateSize(Object) to get the estimated size of the task in bytes, and uses it to calculate the amount of memory occupied by the unprocessed tasks.
If the total size of the unprocessed tasks exceeds either the per-Channel or the per-Executor threshold, any further execute(Runnable) call will block until enough queued tasks have been processed for the total size to fall below the threshold.
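The blocking behaviour described above can be pictured with a small byte counter guarded by a monitor. The following is only a minimal sketch that uses nothing but the JDK; `SizeLimiter` and `SizeLimiterDemo` are hypothetical names introduced for illustration, the task sizes are made up rather than estimated, and this is not the executor's actual code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical byte counter: callers block while the counted size is at or above the limit. */
final class SizeLimiter {
    private final long limit;
    private long counter;

    SizeLimiter(long limit) {
        this.limit = limit;
    }

    /** Adds the size of a newly submitted task, waiting first if the limit has been reached. */
    synchronized void increase(long amount) {
        boolean interrupted = false;
        while (counter >= limit) {
            try {
                wait();
            } catch (InterruptedException e) {
                interrupted = true; // keep waiting, restore the flag afterwards
            }
        }
        counter += amount;
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Subtracts the size of a processed task and wakes up blocked submitters. */
    synchronized void decrease(long amount) {
        counter -= amount;
        if (counter < limit) {
            notifyAll();
        }
    }

    synchronized long current() {
        return counter;
    }

    synchronized boolean isOverLimit() {
        return counter >= limit;
    }
}

final class SizeLimiterDemo {
    public static void main(String[] args) throws InterruptedException {
        SizeLimiter limiter = new SizeLimiter(1024); // assumed limit in bytes
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 10; i++) {
            final long size = 512; // assumed task size; a real pool would estimate it
            limiter.increase(size); // may block the submitting thread
            pool.execute(() -> {
                try {
                    // process the task here
                } finally {
                    limiter.decrease(size);
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("outstanding bytes: " + limiter.current());
    }
}
```

In this model the submitting thread is the one that pays for a full queue, which is the same back-pressure effect that a blocking execute(Runnable) call has.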
Using an alternative task size estimation strategy
Although the default implementation does its best to guess the size of an object of unknown type, it is always a good idea to use an alternative ObjectSizeEstimator implementation instead of the DefaultObjectSizeEstimator to avoid incorrect task size calculation, especially when:
- you are using MemoryAwareThreadPoolExecutor independently from ExecutionHandler,
- you are submitting a task whose type is not ChannelEventRunnable, or
- the message type of the MessageEvent in the ChannelEventRunnable is not ChannelBuffer.
The following example shows an ObjectSizeEstimator which understands a user-defined object:
    public class MyRunnable implements Runnable {

        private final byte[] data;

        public MyRunnable(byte[] data) {
            this.data = data;
        }

        public void run() {
            // Process 'data' ..
        }
    }

    public class MyObjectSizeEstimator extends DefaultObjectSizeEstimator {

        @Override
        public int estimateSize(Object o) {
            if (o instanceof MyRunnable) {
                return ((MyRunnable) o).data.length + 8;
            }
            return super.estimateSize(o);
        }
    }

    ThreadPoolExecutor pool = new MemoryAwareThreadPoolExecutor(
            16, 65536, 1048576, 30, TimeUnit.SECONDS,
            new MyObjectSizeEstimator(),
            Executors.defaultThreadFactory());

    pool.execute(new MyRunnable(data));
Event execution order
Please note that this executor does not maintain the order of the ChannelEvents for the same Channel. For example, you can even receive a "channelClosed" event before a "messageReceived" event. The events can be processed as depicted below:

           --------------------------------> Timeline -------------------------------->

 Thread X: --- Channel A (Event 1) --- Channel A (Event 2) --------------------------->

 Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->

 Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->

To maintain the event order, you must use OrderedMemoryAwareThreadPoolExecutor.
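To make the ordering problem concrete, the sketch below shows one generic way to keep tasks for the same key in submission order while still sharing a thread pool: each key gets its own serial queue, following the serial-executor pattern from the java.util.concurrent.Executor documentation. It uses only the JDK; `SerialExecutor`, `KeyedOrderedExecutor`, `OrderedDemo` and the "channel-A" key are hypothetical names, and the sketch is not meant to describe how OrderedMemoryAwareThreadPoolExecutor is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Runs tasks one at a time, in submission order, on a shared executor. */
final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor delegate;
    private Runnable active;

    SerialExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void execute(Runnable task) {
        // Wrap the task so that finishing it schedules the next queued one.
        tasks.add(() -> {
            try {
                task.run();
            } finally {
                scheduleNext();
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            delegate.execute(active);
        }
    }
}

/** Hypothetical helper: one SerialExecutor per key (for example, per channel). */
final class KeyedOrderedExecutor {
    private final Map<Object, SerialExecutor> perKey = new ConcurrentHashMap<>();
    private final Executor delegate;

    KeyedOrderedExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    void execute(Object key, Runnable task) {
        perKey.computeIfAbsent(key, k -> new SerialExecutor(delegate)).execute(task);
    }
}

final class OrderedDemo {
    /** Submits events 0..count-1 for a single key and returns the order in which they ran. */
    static List<Integer> runEvents(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        KeyedOrderedExecutor ordered = new KeyedOrderedExecutor(pool);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(count);
        for (int i = 0; i < count; i++) {
            final int event = i;
            ordered.execute("channel-A", () -> {
                seen.add(event);
                done.countDown();
            });
        }
        try {
            done.await(); // wait for every event before shutting the pool down
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runEvents(5));
    }
}
```

Tasks submitted under different keys can still run concurrently on the shared pool; only tasks that share a key are serialized.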
Nested Class Summary
Nested Classes
Modifier and Type, Class, Description
private static class
private static final class
private static final class
private static final class
Nested classes/interfaces inherited from class java.util.concurrent.ThreadPoolExecutor
ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy
-
Field Summary
Fields
Modifier and Type, Field, Description
- private final ConcurrentMap<Channel, AtomicLong> channelCounters
- private static final InternalLogger logger
- private static final SharedResourceMisuseDetector misuseDetector
- private boolean notifyOnShutdown
- private final MemoryAwareThreadPoolExecutor.Limiter totalLimiter
-
Constructor Summary
Constructors
Constructor, Description
- MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize): Creates a new instance.
- MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit): Creates a new instance.
- MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory): Creates a new instance.
- MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory): Creates a new instance.
-
Method Summary
Modifier and Type, Method, Description
- protected void beforeExecute(Thread t, Runnable r)
- protected void decreaseCounter(Runnable task)
- protected void doExecute(Runnable): Put the actual execution logic here.
- protected final void doUnorderedExecute(Runnable task): Executes the specified task without maintaining the event order.
- void execute(Runnable)
- private AtomicLong getChannelCounter(Channel channel)
- long getMaxChannelMemorySize(): Returns the maximum total size of the queued events per channel.
- long getMaxTotalMemorySize(): Returns the maximum total size of the queued events for this pool.
- boolean getNotifyChannelFuturesOnShutdown(): Returns whether the ChannelFutures of the ChannelEventRunnables should be notified about the shutdown of this MemoryAwareThreadPoolExecutor.
- ObjectSizeEstimator getObjectSizeEstimator(): Returns the ObjectSizeEstimator of this pool.
- protected void increaseCounter(Runnable task)
- boolean remove(Runnable)
- void setMaxChannelMemorySize(long maxChannelMemorySize): Sets the maximum total size of the queued events per channel.
- void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown): If set to false, no queued ChannelEventRunnable's ChannelFuture will get notified once shutdownNow() is called.
- void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator): Sets the ObjectSizeEstimator of this pool.
- protected boolean shouldCount(Runnable task): Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption.
- List<Runnable> shutdownNow(): This will call shutdownNow(boolean) with the value of getNotifyChannelFuturesOnShutdown().
- List<Runnable> shutdownNow(boolean notify): See ThreadPoolExecutor.shutdownNow() for how it handles the shutdown.
- protected void terminated()
Methods inherited from class java.util.concurrent.ThreadPoolExecutor
afterExecute, allowCoreThreadTimeOut, allowsCoreThreadTimeOut, awaitTermination, finalize, getActiveCount, getCompletedTaskCount, getCorePoolSize, getKeepAliveTime, getLargestPoolSize, getMaximumPoolSize, getPoolSize, getQueue, getRejectedExecutionHandler, getTaskCount, getThreadFactory, isShutdown, isTerminated, isTerminating, prestartAllCoreThreads, prestartCoreThread, purge, setCorePoolSize, setKeepAliveTime, setMaximumPoolSize, setRejectedExecutionHandler, setThreadFactory, shutdown, toString
Methods inherited from class java.util.concurrent.AbstractExecutorService
invokeAll, invokeAll, invokeAny, invokeAny, newTaskFor, newTaskFor, submit, submit, submit
-
Field Details
-
logger
-
misuseDetector
-
settings
-
channelCounters
-
totalLimiter
-
notifyOnShutdown
private volatile boolean notifyOnShutdown
-
-
Constructor Details
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize)
Creates a new instance.
- Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit)
Creates a new instance.
- Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory)
Creates a new instance.
- Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
threadFactory - the ThreadFactory of this pool
-
MemoryAwareThreadPoolExecutor
public MemoryAwareThreadPoolExecutor(int corePoolSize, long maxChannelMemorySize, long maxTotalMemorySize, long keepAliveTime, TimeUnit unit, ObjectSizeEstimator objectSizeEstimator, ThreadFactory threadFactory)
Creates a new instance.
- Parameters:
corePoolSize - the maximum number of active threads
maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
keepAliveTime - the amount of time for an inactive thread to shut itself down
unit - the TimeUnit of keepAliveTime
objectSizeEstimator - the ObjectSizeEstimator of this pool
threadFactory - the ThreadFactory of this pool
-
-
Method Details
-
terminated
protected void terminated()
- Overrides: terminated in class ThreadPoolExecutor
-
shutdownNow
This will call shutdownNow(boolean) with the value of getNotifyChannelFuturesOnShutdown().
- Specified by: shutdownNow in interface ExecutorService
- Overrides: shutdownNow in class ThreadPoolExecutor
-
shutdownNow
See ThreadPoolExecutor.shutdownNow() for how it handles the shutdown. If true is given to this method, it also notifies all ChannelFutures of the ChannelEventRunnables that were not executed.
Be aware that if you call this with false, you will need to handle the notification of the ChannelFutures yourself, so only use this if you really have a use case for it.
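The idea of notifying the futures of tasks that never ran can be illustrated with the plain JDK, where ThreadPoolExecutor.shutdownNow() returns the list of tasks that were still queued. The following is a rough analogy only: `NotifiableTask` and `ShutdownNotifyDemo` are hypothetical names, and CompletableFuture stands in for a future attached to a queued task, so none of this is the Netty API itself.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical task that carries a future to be completed or failed. */
final class NotifiableTask implements Runnable {
    final CompletableFuture<Void> future = new CompletableFuture<>();

    @Override
    public void run() {
        future.complete(null);
    }
}

final class ShutdownNotifyDemo {
    /** Shuts a busy pool down and returns how many queued tasks had their future failed. */
    static int shutdownAndCountFailed(boolean notify) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch started = new CountDownLatch(1);

        // Occupy the only worker thread so that later tasks stay in the queue.
        pool.execute(() -> {
            started.countDown();
            try {
                new CountDownLatch(1).await(); // parks until shutdownNow() interrupts it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        for (int i = 0; i < 3; i++) {
            pool.execute(new NotifiableTask());
        }

        // shutdownNow() hands back the tasks that never started.
        List<Runnable> neverRun = pool.shutdownNow();
        int failed = 0;
        for (Runnable r : neverRun) {
            if (!(r instanceof NotifiableTask)) {
                continue;
            }
            NotifiableTask task = (NotifiableTask) r;
            if (notify) {
                task.future.completeExceptionally(
                        new IllegalStateException("executor was shut down"));
            }
            if (task.future.isCompletedExceptionally()) {
                failed++;
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println(shutdownAndCountFailed(true));  // prints 3
        System.out.println(shutdownAndCountFailed(false)); // prints 0
    }
}
```

With `notify` set to false the three futures are simply left incomplete, which is why the caller then has to deal with them.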
getObjectSizeEstimator
Returns the ObjectSizeEstimator of this pool.
-
setObjectSizeEstimator
Sets the ObjectSizeEstimator of this pool.
-
getMaxChannelMemorySize
public long getMaxChannelMemorySize()
Returns the maximum total size of the queued events per channel.
-
setMaxChannelMemorySize
public void setMaxChannelMemorySize(long maxChannelMemorySize)
Sets the maximum total size of the queued events per channel. Specify 0 to disable.
-
getMaxTotalMemorySize
public long getMaxTotalMemorySize()
Returns the maximum total size of the queued events for this pool.
-
setNotifyChannelFuturesOnShutdown
public void setNotifyChannelFuturesOnShutdown(boolean notifyOnShutdown)
If set to false, no queued ChannelEventRunnable's ChannelFuture will get notified once shutdownNow() is called. If set to true, every queued ChannelEventRunnable will get marked as failed via ChannelFuture.setFailure(Throwable).
Please only set this to false if you want to handle the notification yourself and know what you are doing. Default is true.
-
getNotifyChannelFuturesOnShutdown
public boolean getNotifyChannelFuturesOnShutdown()
Returns whether the ChannelFutures of the ChannelEventRunnables should be notified about the shutdown of this MemoryAwareThreadPoolExecutor.
-
execute
- Specified by: execute in interface Executor
- Overrides: execute in class ThreadPoolExecutor
-
doExecute
Put the actual execution logic here. The default implementation simply calls doUnorderedExecute(Runnable).
-
doUnorderedExecute
Executes the specified task without maintaining the event order.
-
remove
- Overrides: remove in class ThreadPoolExecutor
-
beforeExecute
- Overrides: beforeExecute in class ThreadPoolExecutor
-
increaseCounter
-
decreaseCounter
-
getChannelCounter
-
shouldCount
Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption. To override this method, you must call super.shouldCount() to make sure important tasks are not counted.
-