38#include "blocxx/BLOCXX_config.h"
65#define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
66#define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + arg); } while (0)
67#define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + arg); } while (0)
68#define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + arg); } while (0)
69#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
87class FixedSizePoolImpl;
89class FixedSizePoolWorkerThread :
public Thread
92 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
99 virtual void doShutdown()
101 MutexLock lock(m_guard);
102 if (m_currentRunnable)
104 m_currentRunnable->doShutdown();
107 virtual void doCooperativeCancel()
109 MutexLock lock(m_guard);
110 if (m_currentRunnable)
112 m_currentRunnable->doCooperativeCancel();
115 virtual void doDefinitiveCancel()
117 MutexLock lock(m_guard);
118 if (m_currentRunnable)
120 m_currentRunnable->doDefinitiveCancel();
124 FixedSizePoolImpl* m_thePool;
130 FixedSizePoolWorkerThread(
const FixedSizePoolWorkerThread&);
131 FixedSizePoolWorkerThread& operator=(
const FixedSizePoolWorkerThread&);
137 CommonPoolImpl(UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
138 : m_maxQueueSize(maxQueueSize)
139 , m_queueClosed(false)
142 , m_poolName(poolName)
146 virtual ~CommonPoolImpl()
151 virtual bool queueIsFull()
const
153 return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
157 bool queueClosed()
const
159 return m_shutdown || m_queueClosed;
164 NonRecursiveMutexLock l(m_queueLock);
171 m_queueClosed =
true;
174 if (finishWorkInQueue)
176 TimeoutTimer timer(timeout);
177 while (m_queue.size() != 0)
179 if (timer.infinite())
182 m_queueEmpty.wait(l);
187 if (!m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
199 virtual void waitForEmptyQueue()
201 NonRecursiveMutexLock l(m_queueLock);
202 while (m_queue.size() != 0)
205 m_queueEmpty.wait(l);
212 TimeoutTimer shutdownTimer(shutdownTimeout);
213 TimeoutTimer dTimer(definitiveCancelTimeout);
214 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
220 m_queueNotEmpty.notifyAll();
221 m_queueNotFull.notifyAll();
223 if (!shutdownTimer.infinite())
226 for (UInt32 i = 0; i < m_threads.size(); ++i)
229 m_threads[i]->shutdown();
233 Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
234 for (UInt32 i = 0; i < m_threads.size(); ++i)
237 m_threads[i]->timedWait(absoluteShutdownTimeout);
241 for (UInt32 i = 0; i < m_threads.size(); ++i)
244 m_threads[i]->cooperativeCancel();
247 if (!dTimer.infinite())
250 Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
251 for (UInt32 i = 0; i < m_threads.size(); ++i)
256 if (!m_threads[i]->definitiveCancel(absoluteDefinitiveTimeout))
261 catch (CancellationDeniedException& e)
263 BLOCXX_POOL_LOG_ERROR(m_logger, Format(
"Caught CanacellationDeniedException: %1 for thread %2. Pool shutdown may hang.", e, i));
271 for (UInt32 i = 0; i < m_threads.size(); ++i)
274 m_threads[i]->join();
281 NonRecursiveMutexLock l(m_queueLock);
282 while ((m_queue.size() == 0) && (!m_shutdown))
287 m_queueNotEmpty.wait(l);
311 incrementWorkerCount();
315 m_queueNotFull.notifyAll();
319 if (m_queue.size() == 0)
321 m_queueEmpty.notifyAll();
328 virtual void incrementWorkerCount()
332 virtual void decrementWorkerCount()
337 UInt32 m_maxQueueSize;
339 Array<ThreadRef> m_threads;
340 std::deque<RunnableRef> m_queue;
344 NonRecursiveMutex m_queueLock;
345 Condition m_queueNotFull;
346 Condition m_queueEmpty;
347 Condition m_queueNotEmpty;
351class FixedSizePoolImpl :
public CommonPoolImpl
354 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
355 : CommonPoolImpl(maxQueueSize, logger, poolName)
358 m_threads.reserve(numThreads);
359 for (UInt32 i = 0; i < numThreads; ++i)
361 m_threads.push_back(
ThreadRef(
new FixedSizePoolWorkerThread(
this)));
363 for (UInt32 i = 0; i < numThreads; ++i)
367 m_threads[i]->start();
369 catch (ThreadException& e)
381 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
390 NonRecursiveMutexLock l(m_queueLock);
391 TimeoutTimer timer(timeout);
392 while ( queueIsFull() && !queueClosed() )
395 if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
398 BLOCXX_POOL_LOG_DEBUG3(m_logger,
"Queue is full and timeout expired. Not adding work and returning false");
406 BLOCXX_POOL_LOG_DEBUG3(m_logger,
"Queue was closed out from underneath us. Not adding work and returning false");
410 m_queue.push_back(work);
413 if (m_queue.size() == 1)
416 m_queueNotEmpty.notifyAll();
426 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
428 virtual ~FixedSizePoolImpl()
446 friend class FixedSizePoolWorkerThread;
462 std::clog <<
"!!! Exception: " << ex.type() <<
" caught in ThreadPool worker: " << ex << std::endl;
464 Logger logger(COMPONENT_NAME);
467 catch(std::exception& ex)
470 std::clog <<
"!!! std::exception what = \"" << ex.what() <<
"\" caught in ThreadPool worker" << std::endl;
472 Logger logger(COMPONENT_NAME);
478 std::clog <<
"!!! Unknown Exception caught in ThreadPool worker" << std::endl;
480 Logger logger(COMPONENT_NAME);
481 BLOCXX_LOG_ERROR(logger,
"!!! Unknown Exception caught in ThreadPool worker.");
484Int32 FixedSizePoolWorkerThread::run()
489 RunnableRef work = m_thePool->getWorkFromQueue(
true);
496 MutexLock lock(m_guard);
497 m_currentRunnable = work;
501 MutexLock lock(m_guard);
502 m_currentRunnable = 0;
507class DynamicSizePoolImpl;
509class DynamicSizePoolWorkerThread :
public Thread
512 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
519 virtual void doShutdown()
521 MutexLock lock(m_guard);
522 if (m_currentRunnable)
527 virtual void doCooperativeCancel()
529 MutexLock lock(m_guard);
530 if (m_currentRunnable)
535 virtual void doDefinitiveCancel()
537 MutexLock lock(m_guard);
538 if (m_currentRunnable)
544 DynamicSizePoolImpl* m_thePool;
550 DynamicSizePoolWorkerThread(
const DynamicSizePoolWorkerThread&);
551 DynamicSizePoolWorkerThread& operator=(
const DynamicSizePoolWorkerThread&);
554class DynamicSizePoolImpl :
public CommonPoolImpl
557 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
558 : CommonPoolImpl(maxQueueSize, logger, poolName)
559 , m_maxThreads(maxThreads)
563 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
571 NonRecursiveMutexLock l(m_queueLock);
576 BLOCXX_POOL_LOG_DEBUG3(m_logger,
"Queue was closed out from underneath us. Not adding work and returning false");
584 while (i < m_threads.size())
586 if (!m_threads[i]->isRunning())
589 m_threads[i]->join();
598 TimeoutTimer timer(timeout);
599 while ( queueIsFull() && !queueClosed() )
602 if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
605 BLOCXX_POOL_LOG_DEBUG3(m_logger,
"Queue is full and timeout expired. Not adding work and returning false");
613 BLOCXX_POOL_LOG_DEBUG3(m_logger,
"Queue was closed out from underneath us. Not adding work and returning false");
617 m_queue.push_back(work);
628 m_queueNotEmpty.notifyOne();
633 if (!m_queue.empty() && m_threads.size() < m_maxThreads)
635 ThreadRef theThread(
new DynamicSizePoolWorkerThread(
this));
636 m_threads.push_back(theThread);
642 catch (ThreadException& e)
645 m_threads.pop_back();
654 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue,
const Timeout& shutdownTimeout,
const Timeout& definitiveCancelTimeout)
656 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
658 virtual ~DynamicSizePoolImpl()
668 this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
677 UInt32 getMaxThreads()
const
685 friend class DynamicSizePoolWorkerThread;
687Int32 DynamicSizePoolWorkerThread::run()
692 RunnableRef work = m_thePool->getWorkFromQueue(
false);
699 MutexLock lock(m_guard);
700 m_currentRunnable = work;
703 m_thePool->decrementWorkerCount();
705 MutexLock lock(m_guard);
706 m_currentRunnable = 0;
713class DynamicSizeNoQueuePoolImpl :
public DynamicSizePoolImpl
716 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads,
const Logger& logger,
const String& poolName)
717 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName)
718 , m_workingThreads(0)
722 virtual ~DynamicSizeNoQueuePoolImpl()
726 virtual void incrementWorkerCount()
731 virtual void decrementWorkerCount()
733 NonRecursiveMutexLock lock(m_queueLock);
736 m_queueNotFull.notifyAll();
740 virtual bool queueIsFull()
const
744 size_t freeThreads = getMaxThreads() -
AtomicGet(m_workingThreads);
745 return (freeThreads <= m_queue.size());
750 size_t m_workingThreads;
762 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
765 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
768 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
778 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
781 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
784 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
801 return m_impl->addWork(work, timeout);
811 m_impl->shutdown(finishWorkInQueue, timeout, timeout);
816 m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
821 m_impl->waitForEmptyQueue();
#define BLOCXX_DEFINE_EXCEPTION(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
#define BLOCXX_GLOBAL_STRING_INIT(str)
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
#define BLOCXX_POOL_LOG_DEBUG(logger, arg)
#define BLOCXX_POOL_LOG_ERROR(logger, arg)
#define BLOCXX_POOL_LOG_DEBUG2(logger, arg)
#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg)
#define BLOCXX_POOL_LOG_DEBUG3(logger, arg)
This class is the base of all exceptions thrown by BloCxx code.
This logger just discards all log messages.
virtual void doShutdown()
This function is available for subclasses of Thread to override if they wish to be notified when shut...
virtual void doCooperativeCancel()
This function is available for subclasses to override if they wish to be notified when a cooperative ...
virtual void doDefinitiveCancel()
See the documentation for doCooperativeCancel().
This String class is an abstract data type that represents as NULL terminated string of characters.
The ThreadPool class is used to coordinate a group of threads.
bool addWork(const RunnableRef &work)
Add an RunnableRef for the pool to execute.
ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger &logger, const String &poolName="")
Constructor.
bool tryAddWork(const RunnableRef &work)
Add an RunnableRef for the pool to execute.
ThreadPool & operator=(const ThreadPool &x)
void shutdown(EShutdownQueueFlag finishWorkInQueue=E_FINISH_WORK_IN_QUEUE, const Timeout &timeout=Timeout::infinite)
Instruct all threads to exit and stop working.
@ E_DISCARD_WORK_IN_QUEUE
void waitForEmptyQueue()
Wait for the queue to empty out.
IntrusiveReference< ThreadPoolImpl > m_impl
virtual void waitForEmptyQueue()=0
virtual ~ThreadPoolImpl()
virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout &shutdownTimeout, const Timeout &definitiveCancelTimeout)=0
virtual bool addWork(const RunnableRef &work, const Timeout &timeout)=0
A timeout can be absolute, which means that it will happen at the specified DateTime.
static Timeout relative(float seconds)
LazyGlobal< String, char const *const > GlobalString
IntrusiveReference< Runnable > RunnableRef
IntrusiveReference< Thread > ThreadRef
class BLOCXX_COMMON_API Logger
int AtomicGet(Atomic_t const &v)
In the event a thread has been cancelled, a ThreadCancelledException will be thrown.