38#include "blocxx/BLOCXX_config.h"
65#define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
66#define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + arg); } while (0)
67#define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + arg); } while (0)
68#define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + arg); } while (0)
69#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
87class FixedSizePoolImpl;
89class FixedSizePoolWorkerThread :
public Thread
92 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
99 virtual void doShutdown()
101 MutexLock lock(m_guard);
102 if (m_currentRunnable)
107 virtual void doCooperativeCancel()
109 MutexLock lock(m_guard);
110 if (m_currentRunnable)
115 virtual void doDefinitiveCancel()
117 MutexLock lock(m_guard);
118 if (m_currentRunnable)
130 FixedSizePoolWorkerThread(
const FixedSizePoolWorkerThread&);
131 FixedSizePoolWorkerThread& operator=(
const FixedSizePoolWorkerThread&);
134class CommonPoolImpl :
public ThreadPoolImpl
137 CommonPoolImpl(UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
146 virtual ~CommonPoolImpl()
151 virtual bool queueIsFull()
const
153 return ((m_maxQueueSize > 0) && (
m_queue.size() == m_maxQueueSize));
157 bool queueClosed()
const
164 NonRecursiveMutexLock l(m_queueLock);
174 if (finishWorkInQueue)
176 TimeoutTimer timer(timeout);
179 if (timer.infinite())
187 if (!
m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
199 virtual void waitForEmptyQueue()
201 NonRecursiveMutexLock l(m_queueLock);
212 TimeoutTimer shutdownTimer(shutdownTimeout);
213 TimeoutTimer dTimer(definitiveCancelTimeout);
214 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
223 if (!shutdownTimer.infinite())
233 Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
237 m_threads[
i]->timedWait(absoluteShutdownTimeout);
247 if (!dTimer.infinite())
250 Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
256 if (!m_threads[
i]->definitiveCancel(absoluteDefinitiveTimeout))
261 catch (CancellationDeniedException& e)
263 BLOCXX_POOL_LOG_ERROR(m_logger, Format(
"Caught CanacellationDeniedException: %1 for thread %2. Pool shutdown may hang.", e,
i));
281 NonRecursiveMutexLock l(m_queueLock);
282 while ((
m_queue.size() == 0) && (!m_shutdown))
311 incrementWorkerCount();
328 virtual void incrementWorkerCount()
332 virtual void decrementWorkerCount()
351class FixedSizePoolImpl :
public CommonPoolImpl
354 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
355 : CommonPoolImpl(maxQueueSize, logger, poolName)
359 for (UInt32
i = 0;
i < numThreads; ++
i)
363 for (UInt32
i = 0;
i < numThreads; ++
i)
369 catch (ThreadException& e)
381 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
391 TimeoutTimer timer(timeout);
392 while ( queueIsFull() && !queueClosed() )
426 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
428 virtual ~FixedSizePoolImpl()
446 friend class FixedSizePoolWorkerThread;
455 catch (ThreadCancelledException&)
459 catch (Exception& ex)
462 std::clog <<
"!!! Exception: " << ex.type() <<
" caught in ThreadPool worker: " << ex << std::endl;
464 Logger logger(COMPONENT_NAME);
465 BLOCXX_LOG_ERROR(logger, Format(
"!!! Exception caught in ThreadPool worker: %1", ex));
467 catch(std::exception& ex)
470 std::clog <<
"!!! std::exception what = \"" << ex.what() <<
"\" caught in ThreadPool worker" << std::endl;
472 Logger logger(COMPONENT_NAME);
473 BLOCXX_LOG_ERROR(logger, Format(
"!!! std::exception caught in ThreadPool worker: %1", ex.what()));
478 std::clog <<
"!!! Unknown Exception caught in ThreadPool worker" << std::endl;
480 Logger logger(COMPONENT_NAME);
481 BLOCXX_LOG_ERROR(logger,
"!!! Unknown Exception caught in ThreadPool worker.");
484Int32 FixedSizePoolWorkerThread::run()
507class DynamicSizePoolImpl;
509class DynamicSizePoolWorkerThread :
public Thread
512 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
519 virtual void doShutdown()
527 virtual void doCooperativeCancel()
535 virtual void doDefinitiveCancel()
550 DynamicSizePoolWorkerThread(
const DynamicSizePoolWorkerThread&);
551 DynamicSizePoolWorkerThread& operator=(
const DynamicSizePoolWorkerThread&);
554class DynamicSizePoolImpl :
public CommonPoolImpl
557 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
558 : CommonPoolImpl(maxQueueSize, logger, poolName)
563 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
598 TimeoutTimer timer(timeout);
599 while ( queueIsFull() && !queueClosed() )
635 ThreadRef theThread(
new DynamicSizePoolWorkerThread(
this));
642 catch (ThreadException& e)
654 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue,
const Timeout& shutdownTimeout,
const Timeout& definitiveCancelTimeout)
656 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
658 virtual ~DynamicSizePoolImpl()
668 this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
677 UInt32 getMaxThreads()
const
685 friend class DynamicSizePoolWorkerThread;
687Int32 DynamicSizePoolWorkerThread::run()
713class DynamicSizeNoQueuePoolImpl :
public DynamicSizePoolImpl
716 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads,
const Logger& logger,
const String& poolName)
717 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName)
722 virtual ~DynamicSizeNoQueuePoolImpl()
726 virtual void incrementWorkerCount()
731 virtual void decrementWorkerCount()
740 virtual bool queueIsFull()
const
745 return (freeThreads <=
m_queue.size());
762 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
765 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
767 case DYNAMIC_SIZE_NO_QUEUE:
768 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
778 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
781 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
783 case DYNAMIC_SIZE_NO_QUEUE:
784 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
801 return m_impl->addWork(work, timeout);
811 m_impl->shutdown(finishWorkInQueue, timeout, timeout);
816 m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
821 m_impl->waitForEmptyQueue();
#define BLOCXX_DEFINE_EXCEPTION(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
#define BLOCXX_GLOBAL_STRING_INIT(str)
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
std::deque< RunnableRef > m_queue
FixedSizePoolImpl * m_thePool
#define BLOCXX_POOL_LOG_DEBUG(logger, arg)
NonRecursiveMutex m_queueLock
RunnableRef m_currentRunnable
#define BLOCXX_POOL_LOG_ERROR(logger, arg)
Array< ThreadRef > m_threads
#define BLOCXX_POOL_LOG_DEBUG2(logger, arg)
Condition m_queueNotEmpty
#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg)
#define BLOCXX_POOL_LOG_DEBUG3(logger, arg)
This logger just discards all log messages.
This String class is an abstract data type that represents as NULL terminated string of characters.
The ThreadPool class is used to coordinate a group of threads.
bool addWork(const RunnableRef &work)
Add an RunnableRef for the pool to execute.
ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger &logger, const String &poolName="")
Constructor.
bool tryAddWork(const RunnableRef &work)
Add an RunnableRef for the pool to execute.
ThreadPool & operator=(const ThreadPool &x)
void shutdown(EShutdownQueueFlag finishWorkInQueue=E_FINISH_WORK_IN_QUEUE, const Timeout &timeout=Timeout::infinite)
Instruct all threads to exit and stop working.
@ E_DISCARD_WORK_IN_QUEUE
void waitForEmptyQueue()
Wait for the queue to empty out.
IntrusiveReference< ThreadPoolImpl > m_impl
virtual void waitForEmptyQueue()=0
virtual ~ThreadPoolImpl()
virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout &shutdownTimeout, const Timeout &definitiveCancelTimeout)=0
virtual bool addWork(const RunnableRef &work, const Timeout &timeout)=0
A timeout can be absolute, which means that it will happen at the specified DateTime.
static Timeout relative(float seconds)
LazyGlobal< String, char const *const > GlobalString
IntrusiveReference< Runnable > RunnableRef
IntrusiveReference< Thread > ThreadRef
class BLOCXX_COMMON_API Logger
int AtomicGet(Atomic_t const &v)