Class MemoryAwareThreadPoolExecutor

  • All Implemented Interfaces:
    java.util.concurrent.Executor, java.util.concurrent.ExecutorService
    Direct Known Subclasses:
    FairOrderedMemoryAwareThreadPoolExecutor, OrderedMemoryAwareThreadPoolExecutor

    public class MemoryAwareThreadPoolExecutor
    extends java.util.concurrent.ThreadPoolExecutor
    A ThreadPoolExecutor which blocks task submission when there are too many tasks in the queue. Both per-Channel and per-Executor limits can be applied.

    When a task (i.e. Runnable) is submitted, MemoryAwareThreadPoolExecutor calls ObjectSizeEstimator.estimateSize(Object) to get the estimated size of the task in bytes to calculate the amount of memory occupied by the unprocessed tasks.

    If the total size of the unprocessed tasks exceeds either the per-Channel or the per-Executor threshold, any further execute(Runnable) call will block until enough queued tasks have been processed for the total size to drop below the threshold.
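
The blocking behaviour described above can be sketched with the JDK alone. The following is a minimal illustration under stated assumptions, not Netty's implementation: ByteBudgetExecutor, its fields, and the explicit estimatedSize parameter are hypothetical names introduced for this example, and only a single executor-wide limit is modelled (no per-Channel limit).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; it is not part of Netty.
class ByteBudgetExecutor {
    private final ExecutorService delegate;
    private final long maxBytes;   // 0 disables the limit
    private long pendingBytes;     // estimated size of unprocessed tasks, guarded by 'this'

    ByteBudgetExecutor(ExecutorService delegate, long maxBytes) {
        this.delegate = delegate;
        this.maxBytes = maxBytes;
    }

    // Blocks the caller while the unprocessed tasks exceed the threshold.
    void execute(Runnable task, long estimatedSize) throws InterruptedException {
        synchronized (this) {
            while (maxBytes != 0 && pendingBytes > maxBytes) {
                wait();
            }
            pendingBytes += estimatedSize;
        }
        try {
            delegate.execute(() -> {
                try {
                    task.run();
                } finally {
                    release(estimatedSize);
                }
            });
        } catch (RuntimeException e) {
            release(estimatedSize);   // the task was rejected, so give its budget back
            throw e;
        }
    }

    // Returns the budget of a finished task and wakes up blocked submitters.
    private synchronized void release(long size) {
        pendingBytes -= size;
        notifyAll();
    }

    synchronized long pendingBytes() {
        return pendingBytes;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ByteBudgetExecutor bounded = new ByteBudgetExecutor(pool, 1024);
        for (int i = 0; i < 5; i++) {
            bounded.execute(() -> { }, 512);
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("pending bytes after drain: " + bounded.pendingBytes());
    }
}
```

In this sketch a submitter is admitted as long as the running total has not yet gone over the threshold, so the total can overshoot by at most one task before further calls block.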

    Using an alternative task size estimation strategy

    Although the default implementation does its best to guess the size of an object of unknown type, it is always a good idea to use an alternative ObjectSizeEstimator implementation instead of the DefaultObjectSizeEstimator to avoid incorrect task size calculation.

    Here is an example that demonstrates how to implement an ObjectSizeEstimator which understands a user-defined object:
     public class MyRunnable implements Runnable {
    
         // Package-private so that MyObjectSizeEstimator can read it.
         final byte[] data;
    
         public MyRunnable(byte[] data) {
             this.data = data;
         }
    
         public void run() {
             // Process 'data' ..
         }
     }
    
     public class MyObjectSizeEstimator extends DefaultObjectSizeEstimator {
    
         @Override
         public int estimateSize(Object o) {
             if (o instanceof MyRunnable) {
                 return ((MyRunnable) o).data.length + 8;
             }
             return super.estimateSize(o);
         }
     }
    
     ThreadPoolExecutor pool = new MemoryAwareThreadPoolExecutor(
             16, 65536, 1048576, 30, TimeUnit.SECONDS,
             new MyObjectSizeEstimator(),
             Executors.defaultThreadFactory());
    
     pool.execute(new MyRunnable(data));
     

    Event execution order

    Please note that this executor does not maintain the order of the ChannelEvents for the same Channel. For example, you can even receive a "channelClosed" event before a "messageReceived" event. The events can be processed as depicted below:
               --------------------------------> Timeline -------------------------------->
    
     Thread X: --- Channel A (Event 1) --- Channel A (Event 2) --------------------------->
    
     Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) --->
    
     Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->
     
    To maintain the event order, you must use OrderedMemoryAwareThreadPoolExecutor.
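
To make the ordering problem concrete, here is a small JDK-only sketch of one way to keep tasks for the same key in submission order on top of a shared thread pool. It is an illustration under assumptions, not taken from the source of OrderedMemoryAwareThreadPoolExecutor: KeyedSerialExecutor and its methods are hypothetical names, and the key plays the role of a Channel.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; it is not part of Netty.
class KeyedSerialExecutor {
    private final Executor pool;
    // A key is present in this map exactly while a drain for it is scheduled or running.
    private final Map<Object, Queue<Runnable>> queues = new HashMap<>();

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    void execute(Object key, Runnable task) {
        boolean startDrain;
        synchronized (queues) {
            Queue<Runnable> queue = queues.get(key);
            startDrain = (queue == null);
            if (startDrain) {
                queue = new ArrayDeque<>();
                queues.put(key, queue);
            }
            queue.add(task);
        }
        if (startDrain) {
            pool.execute(() -> drain(key));
        }
    }

    // Runs the queued tasks of one key one after another, in FIFO order.
    private void drain(Object key) {
        for (;;) {
            Runnable next;
            synchronized (queues) {
                next = queues.get(key).poll();
                if (next == null) {
                    queues.remove(key);
                    return;
                }
            }
            try {
                next.run();
            } catch (RuntimeException e) {
                e.printStackTrace();   // report the failure and keep draining
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        KeyedSerialExecutor ordered = new KeyedSerialExecutor(pool);
        List<String> seenForA = Collections.synchronizedList(new ArrayList<>());
        for (int i = 1; i <= 4; i++) {
            String event = "Event " + i;
            ordered.execute("Channel A", () -> seenForA.add(event));
            ordered.execute("Channel B", () -> { });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(seenForA);   // prints [Event 1, Event 2, Event 3, Event 4]
    }
}
```

Tasks for different keys may still run concurrently on different threads; only tasks that share a key are serialized.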
    • Constructor Detail

      • MemoryAwareThreadPoolExecutor

        public MemoryAwareThreadPoolExecutor​(int corePoolSize,
                                             long maxChannelMemorySize,
                                             long maxTotalMemorySize)
        Creates a new instance.
        Parameters:
        corePoolSize - the maximum number of active threads
        maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
        maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
      • MemoryAwareThreadPoolExecutor

        public MemoryAwareThreadPoolExecutor​(int corePoolSize,
                                             long maxChannelMemorySize,
                                             long maxTotalMemorySize,
                                             long keepAliveTime,
                                             java.util.concurrent.TimeUnit unit)
        Creates a new instance.
        Parameters:
        corePoolSize - the maximum number of active threads
        maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
        maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
        keepAliveTime - the amount of time for an inactive thread to shut itself down
        unit - the TimeUnit of keepAliveTime
      • MemoryAwareThreadPoolExecutor

        public MemoryAwareThreadPoolExecutor​(int corePoolSize,
                                             long maxChannelMemorySize,
                                             long maxTotalMemorySize,
                                             long keepAliveTime,
                                             java.util.concurrent.TimeUnit unit,
                                             java.util.concurrent.ThreadFactory threadFactory)
        Creates a new instance.
        Parameters:
        corePoolSize - the maximum number of active threads
        maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
        maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
        keepAliveTime - the amount of time for an inactive thread to shut itself down
        unit - the TimeUnit of keepAliveTime
        threadFactory - the ThreadFactory of this pool
      • MemoryAwareThreadPoolExecutor

        public MemoryAwareThreadPoolExecutor​(int corePoolSize,
                                             long maxChannelMemorySize,
                                             long maxTotalMemorySize,
                                             long keepAliveTime,
                                             java.util.concurrent.TimeUnit unit,
                                             ObjectSizeEstimator objectSizeEstimator,
                                             java.util.concurrent.ThreadFactory threadFactory)
        Creates a new instance.
        Parameters:
        corePoolSize - the maximum number of active threads
        maxChannelMemorySize - the maximum total size of the queued events per channel. Specify 0 to disable.
        maxTotalMemorySize - the maximum total size of the queued events for this pool. Specify 0 to disable.
        keepAliveTime - the amount of time for an inactive thread to shut itself down
        unit - the TimeUnit of keepAliveTime
        objectSizeEstimator - the ObjectSizeEstimator of this pool
        threadFactory - the ThreadFactory of this pool
    • Method Detail

      • terminated

        protected void terminated()
        Overrides:
        terminated in class java.util.concurrent.ThreadPoolExecutor
      • shutdownNow

        public java.util.List<java.lang.Runnable> shutdownNow()
        Specified by:
        shutdownNow in interface java.util.concurrent.ExecutorService
        Overrides:
        shutdownNow in class java.util.concurrent.ThreadPoolExecutor
      • shutdownNow

        public java.util.List<java.lang.Runnable> shutdownNow​(boolean notify)
        See ThreadPoolExecutor.shutdownNow() for how it handles the shutdown. If true is given to this method, it also notifies the ChannelFutures of all ChannelEventRunnables that were not executed.

        Be aware that if you call this with false, you will need to handle the notification of the ChannelFutures yourself. Only use this if you really have a use case for it.
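
As a rough, JDK-only illustration of what handling the notification yourself involves, the sketch below fails the future of every task returned by ThreadPoolExecutor.shutdownNow(). NotifiableTask and ManualShutdownNotification are hypothetical names, and CompletableFuture is only a stand-in for ChannelFuture; the Netty types themselves are not used here.

```java
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a task that carries a future to be notified.
class NotifiableTask implements Runnable {
    final CompletableFuture<Void> future = new CompletableFuture<>();

    public void run() {
        future.complete(null);
    }
}

// Hypothetical helper; it is not part of Netty.
class ManualShutdownNotification {

    // Shuts the pool down and fails the future of every task that never ran.
    // Tasks must have been submitted with execute(), so that the queue holds them unwrapped.
    static int shutdownAndNotify(ThreadPoolExecutor pool) {
        List<Runnable> neverRan = pool.shutdownNow();
        int notified = 0;
        for (Runnable r : neverRan) {
            if (r instanceof NotifiableTask) {
                ((NotifiableTask) r).future.completeExceptionally(
                        new CancellationException("executor was shut down"));
                notified++;
            }
        }
        return notified;
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch never = new CountDownLatch(1);
        // Occupies the only worker thread until shutdownNow() interrupts it.
        pool.execute(() -> {
            try {
                never.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        NotifiableTask queued = new NotifiableTask();
        pool.execute(queued);
        System.out.println("notified: " + shutdownAndNotify(pool));
        System.out.println("failed: " + queued.future.isCompletedExceptionally());
    }
}
```

If the returned tasks are simply dropped, anything waiting on their futures would wait forever, which is why the notification step matters.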

      • getMaxChannelMemorySize

        public long getMaxChannelMemorySize()
        Returns the maximum total size of the queued events per channel.
      • setMaxChannelMemorySize

        public void setMaxChannelMemorySize​(long maxChannelMemorySize)
        Sets the maximum total size of the queued events per channel. Specify 0 to disable.
      • getMaxTotalMemorySize

        public long getMaxTotalMemorySize()
        Returns the maximum total size of the queued events for this pool.
      • execute

        public void execute​(java.lang.Runnable command)
        Specified by:
        execute in interface java.util.concurrent.Executor
        Overrides:
        execute in class java.util.concurrent.ThreadPoolExecutor
      • doExecute

        protected void doExecute​(java.lang.Runnable task)
        Put the actual execution logic here. The default implementation simply calls doUnorderedExecute(Runnable).
      • doUnorderedExecute

        protected final void doUnorderedExecute​(java.lang.Runnable task)
        Executes the specified task without maintaining the event order.
      • remove

        public boolean remove​(java.lang.Runnable task)
        Overrides:
        remove in class java.util.concurrent.ThreadPoolExecutor
      • beforeExecute

        protected void beforeExecute​(java.lang.Thread t,
                                     java.lang.Runnable r)
        Overrides:
        beforeExecute in class java.util.concurrent.ThreadPoolExecutor
      • increaseCounter

        protected void increaseCounter​(java.lang.Runnable task)
      • decreaseCounter

        protected void decreaseCounter​(java.lang.Runnable task)
      • getChannelCounter

        private java.util.concurrent.atomic.AtomicLong getChannelCounter​(Channel channel)
      • shouldCount

        protected boolean shouldCount​(java.lang.Runnable task)
        Returns true if and only if the specified task should be counted to limit the global and per-channel memory consumption. If you override this method, you must call super.shouldCount(Runnable) to make sure important tasks are not counted.