blocxx
ThreadPool.cpp
Go to the documentation of this file.
1/*******************************************************************************
2* Copyright (C) 2005, Vintela, Inc. All rights reserved.
3* Copyright (C) 2006, Novell, Inc. All rights reserved.
4*
5* Redistribution and use in source and binary forms, with or without
6* modification, are permitted provided that the following conditions are met:
7*
8* * Redistributions of source code must retain the above copyright notice,
9* this list of conditions and the following disclaimer.
10* * Redistributions in binary form must reproduce the above copyright
11* notice, this list of conditions and the following disclaimer in the
12* documentation and/or other materials provided with the distribution.
13* * Neither the name of
14* Vintela, Inc.,
15* nor Novell, Inc.,
16* nor the names of its contributors or employees may be used to
17* endorse or promote products derived from this software without
18* specific prior written permission.
19*
20* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30* POSSIBILITY OF SUCH DAMAGE.
31*******************************************************************************/
32
33
37
38#include "blocxx/BLOCXX_config.h"
39#include "blocxx/ThreadPool.hpp"
40#include "blocxx/Array.hpp"
41#include "blocxx/Thread.hpp"
44#include "blocxx/Condition.hpp"
45#include "blocxx/Format.hpp"
46#include "blocxx/Mutex.hpp"
47#include "blocxx/MutexLock.hpp"
48#include "blocxx/NullLogger.hpp"
49#include "blocxx/Timeout.hpp"
52
53#include <deque>
54
55#ifdef BLOCXX_DEBUG
56#include <iostream> // for cerr
57#endif
58
59namespace BLOCXX_NAMESPACE
60{
61
63
64// logger can be null
65#define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + arg); } while (0)
66#define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + arg); } while (0)
67#define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + arg); } while (0)
68#define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + arg); } while (0)
69#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + arg); } while (0)
70
73{
74public:
75 // returns true if work is placed in the queue to be run and false if not.
76 virtual bool addWork(const RunnableRef& work, const Timeout& timeout) = 0;
77 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout) = 0;
78 virtual void waitForEmptyQueue() = 0;
80 {
81 }
82};
83namespace {
84
85GlobalString COMPONENT_NAME = BLOCXX_GLOBAL_STRING_INIT("blocxx.ThreadPool");
86
87class FixedSizePoolImpl;
89class FixedSizePoolWorkerThread : public Thread
90{
91public:
92 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
93 : Thread()
94 , m_thePool(thePool)
95 {
96 }
97 virtual Int32 run();
98private:
99 virtual void doShutdown()
100 {
101 MutexLock lock(m_guard);
102 if (m_currentRunnable)
103 {
104 m_currentRunnable->doShutdown();
105 }
106 }
107 virtual void doCooperativeCancel()
108 {
109 MutexLock lock(m_guard);
110 if (m_currentRunnable)
111 {
112 m_currentRunnable->doCooperativeCancel();
113 }
114 }
115 virtual void doDefinitiveCancel()
116 {
117 MutexLock lock(m_guard);
118 if (m_currentRunnable)
119 {
120 m_currentRunnable->doDefinitiveCancel();
121 }
122 }
123
124 FixedSizePoolImpl* m_thePool;
125
126 Mutex m_guard;
127 RunnableRef m_currentRunnable;
128
129 // non-copyable
130 FixedSizePoolWorkerThread(const FixedSizePoolWorkerThread&);
131 FixedSizePoolWorkerThread& operator=(const FixedSizePoolWorkerThread&);
132};
134class CommonPoolImpl : public ThreadPoolImpl
135{
136protected:
137 CommonPoolImpl(UInt32 maxQueueSize, const Logger& logger, const String& poolName)
138 : m_maxQueueSize(maxQueueSize)
139 , m_queueClosed(false)
140 , m_shutdown(false)
141 , m_logger(logger)
142 , m_poolName(poolName)
143 {
144 }
145
146 virtual ~CommonPoolImpl()
147 {
148 }
149
150 // assumes that m_queueLock is locked. DynamicSizeNoQueuePoolImpl overrides this.
151 virtual bool queueIsFull() const
152 {
153 return ((m_maxQueueSize > 0) && (m_queue.size() == m_maxQueueSize));
154 }
155
156 // assumes that m_queueLock is locked
157 bool queueClosed() const
158 {
159 return m_shutdown || m_queueClosed;
160 }
161
162 bool finishOffWorkInQueue(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout)
163 {
164 NonRecursiveMutexLock l(m_queueLock);
165 // the pool is in the process of being destroyed
166 if (queueClosed())
167 {
168 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue is already closed. Why are you trying to shutdown again?");
169 return false;
170 }
171 m_queueClosed = true;
172 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue closed");
173
174 if (finishWorkInQueue)
175 {
176 TimeoutTimer timer(timeout);
177 while (m_queue.size() != 0)
178 {
179 if (timer.infinite())
180 {
181 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting forever for queue to empty");
182 m_queueEmpty.wait(l);
183 }
184 else
185 {
186 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting w/timout for queue to empty");
187 if (!m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
188 {
189 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Wait timed out. Work in queue will be discarded.");
190 break; // timed out
191 }
192 }
193 }
194 }
195 m_shutdown = true;
196 return true;
197 }
198
199 virtual void waitForEmptyQueue()
200 {
201 NonRecursiveMutexLock l(m_queueLock);
202 while (m_queue.size() != 0)
203 {
204 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Waiting for empty queue");
205 m_queueEmpty.wait(l);
206 }
207 BLOCXX_POOL_LOG_DEBUG2(m_logger, "Queue empty: the wait is over");
208 }
209
210 void shutdownThreads(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
211 {
212 TimeoutTimer shutdownTimer(shutdownTimeout);
213 TimeoutTimer dTimer(definitiveCancelTimeout);
214 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
215 {
216 return;
217 }
218
219 // Wake up any workers so they recheck shutdown flag
220 m_queueNotEmpty.notifyAll();
221 m_queueNotFull.notifyAll();
222
223 if (!shutdownTimer.infinite())
224 {
225 // Tell all the threads to shutdown
226 for (UInt32 i = 0; i < m_threads.size(); ++i)
227 {
228 BLOCXX_POOL_LOG_DEBUG(m_logger, Format("Calling shutdown on thread %1", i));
229 m_threads[i]->shutdown();
230 }
231
232 // Wait until shutdownTimeout for the threads to finish
233 Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
234 for (UInt32 i = 0; i < m_threads.size(); ++i)
235 {
236 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Waiting for thread %1 to exit.", i));
237 m_threads[i]->timedWait(absoluteShutdownTimeout);
238 }
239
240 // Tell all the threads to cooperative cancel
241 for (UInt32 i = 0; i < m_threads.size(); ++i)
242 {
243 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling cooperativeCancel on thread %1", i));
244 m_threads[i]->cooperativeCancel();
245 }
246
247 if (!dTimer.infinite())
248 {
249 // If any still haven't shut down, definitiveCancel will kill them.
250 Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
251 for (UInt32 i = 0; i < m_threads.size(); ++i)
252 {
253 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("Calling definitiveCancel on thread %1", i));
254 try
255 {
256 if (!m_threads[i]->definitiveCancel(absoluteDefinitiveTimeout))
257 {
258 BLOCXX_POOL_LOG_FATAL_ERROR(m_logger, Format("Thread %1 was forcibly cancelled.", i));
259 }
260 }
261 catch (CancellationDeniedException& e)
262 {
263 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Caught CanacellationDeniedException: %1 for thread %2. Pool shutdown may hang.", e, i));
264 }
265 }
266 }
267
268 }
269
270 // Clean up after the threads and/or wait for them to exit.
271 for (UInt32 i = 0; i < m_threads.size(); ++i)
272 {
273 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("calling join() on thread %1", i));
274 m_threads[i]->join();
275 BLOCXX_POOL_LOG_DEBUG2(m_logger, Format("join() finished for thread %1", i));
276 }
277 }
278
279 RunnableRef getWorkFromQueue(bool waitForWork)
280 {
281 NonRecursiveMutexLock l(m_queueLock);
282 while ((m_queue.size() == 0) && (!m_shutdown))
283 {
284 if (waitForWork)
285 {
286 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waiting for work");
287 m_queueNotEmpty.wait(l);
288 }
289 else
290 {
291 // wait 1 sec for work, to more efficiently handle a stream
292 // of single requests.
293 if (!m_queueNotEmpty.timedWait(l,Timeout::relative(1)))
294 {
295 BLOCXX_POOL_LOG_DEBUG3(m_logger, "No work after 1 sec. I'm not waiting any longer");
296 return RunnableRef();
297 }
298 }
299 }
300 // check to see if a shutdown started while the thread was sleeping
301 if (m_shutdown)
302 {
303 BLOCXX_POOL_LOG_DEBUG(m_logger, "The pool is shutdown, not getting any more work");
304 return RunnableRef();
305 }
306
307 RunnableRef work = m_queue.front();
308 m_queue.pop_front();
309
310 // This needs to happen before the call to queueIsFull() because the worker count can affect the result of queueIsFull()
311 incrementWorkerCount();
312 // handle threads waiting in addWork().
313 if (!queueIsFull())
314 {
315 m_queueNotFull.notifyAll();
316 }
317
318 // handle waiting shutdown thread or callers of waitForEmptyQueue()
319 if (m_queue.size() == 0)
320 {
321 m_queueEmpty.notifyAll();
322 }
323 BLOCXX_POOL_LOG_DEBUG3(m_logger, "A thread got some work to do");
324 return work;
325 }
326
327 // hooks for DynamicSizeNoQueuePoolImpl subclass. Yes this is a horrible design, it just saves code duplication.
328 virtual void incrementWorkerCount()
329 {
330 }
331
332 virtual void decrementWorkerCount()
333 {
334 }
335
336 // pool characteristics
337 UInt32 m_maxQueueSize;
338 // pool state
339 Array<ThreadRef> m_threads;
340 std::deque<RunnableRef> m_queue;
341 bool m_queueClosed;
342 bool m_shutdown;
343 // pool synchronization
344 NonRecursiveMutex m_queueLock;
345 Condition m_queueNotFull;
346 Condition m_queueEmpty;
347 Condition m_queueNotEmpty;
348 Logger m_logger;
349 String m_poolName;
350};
351class FixedSizePoolImpl : public CommonPoolImpl
352{
353public:
354 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
355 : CommonPoolImpl(maxQueueSize, logger, poolName)
356 {
357 // create the threads and start them up.
358 m_threads.reserve(numThreads);
359 for (UInt32 i = 0; i < numThreads; ++i)
360 {
361 m_threads.push_back(ThreadRef(new FixedSizePoolWorkerThread(this)));
362 }
363 for (UInt32 i = 0; i < numThreads; ++i)
364 {
365 try
366 {
367 m_threads[i]->start();
368 }
369 catch (ThreadException& e)
370 {
371 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread #%1: %2", i, e));
372 m_threads.resize(i); // remove non-started threads
373 // shutdown the rest
374 this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
375 throw;
376 }
377 }
378 BLOCXX_POOL_LOG_DEBUG(m_logger, "Threads are started and ready to go");
379 }
380 // returns true if work is placed in the queue to be run and false if not.
381 virtual bool addWork(const RunnableRef& work, const Timeout& timeout)
382 {
383 // check precondition: work != NULL
384 if (!work)
385 {
386 BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
387 return false;
388 }
389
390 NonRecursiveMutexLock l(m_queueLock);
391 TimeoutTimer timer(timeout);
392 while ( queueIsFull() && !queueClosed() )
393 {
394 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
395 if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
396 {
397 // timed out
398 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false");
399 return false;
400 }
401 }
402
403 // the pool is in the process of being destroyed
404 if (queueClosed())
405 {
406 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
407 return false;
408 }
409
410 m_queue.push_back(work);
411
412 // if the queue was empty, there may be workers just sitting around, so we need to wake them up!
413 if (m_queue.size() == 1)
414 {
415 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Waking up sleepy workers");
416 m_queueNotEmpty.notifyAll();
417 }
418
419 BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
420 return true;
421 }
422
423 // we keep this around so it can be called in the destructor
424 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
425 {
426 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
427 }
428 virtual ~FixedSizePoolImpl()
429 {
430 // can't let exception escape the destructor
431 try
432 {
433 // don't need a lock here, because we're the only thread left.
434 if (!queueClosed())
435 {
436 // Make sure the pool is shutdown.
437 // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor.
438 this->FixedSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
439 }
440 }
441 catch (...)
442 {
443 }
444 }
445private:
446 friend class FixedSizePoolWorkerThread;
447};
448void runRunnable(const RunnableRef& work)
449{
450 // don't let exceptions escape, we need to keep going, except for ThreadCancelledException, in which case we need to stop.
451 try
452 {
453 work->run();
454 }
456 {
457 throw;
458 }
459 catch (Exception& ex)
460 {
461#ifdef BLOCXX_DEBUG
462 std::clog << "!!! Exception: " << ex.type() << " caught in ThreadPool worker: " << ex << std::endl;
463#endif
464 Logger logger(COMPONENT_NAME);
465 BLOCXX_LOG_ERROR(logger, Format("!!! Exception caught in ThreadPool worker: %1", ex));
466 }
467 catch(std::exception& ex)
468 {
469#ifdef BLOCXX_DEBUG
470 std::clog << "!!! std::exception what = \"" << ex.what() << "\" caught in ThreadPool worker" << std::endl;
471#endif
472 Logger logger(COMPONENT_NAME);
473 BLOCXX_LOG_ERROR(logger, Format("!!! std::exception caught in ThreadPool worker: %1", ex.what()));
474 }
475 catch (...)
476 {
477#ifdef BLOCXX_DEBUG
478 std::clog << "!!! Unknown Exception caught in ThreadPool worker" << std::endl;
479#endif
480 Logger logger(COMPONENT_NAME);
481 BLOCXX_LOG_ERROR(logger, "!!! Unknown Exception caught in ThreadPool worker.");
482 }
483}
484Int32 FixedSizePoolWorkerThread::run()
485{
486 while (true)
487 {
488 // check queue for work
489 RunnableRef work = m_thePool->getWorkFromQueue(true);
490 if (!work)
491 {
492 return 0;
493 }
494 // save this off so it can be cancelled by another thread.
495 {
496 MutexLock lock(m_guard);
497 m_currentRunnable = work;
498 }
499 runRunnable(work);
500 {
501 MutexLock lock(m_guard);
502 m_currentRunnable = 0;
503 }
504 }
505 return 0;
506}
507class DynamicSizePoolImpl;
509class DynamicSizePoolWorkerThread : public Thread
510{
511public:
512 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
513 : Thread()
514 , m_thePool(thePool)
515 {
516 }
517 virtual Int32 run();
518private:
519 virtual void doShutdown()
520 {
521 MutexLock lock(m_guard);
522 if (m_currentRunnable)
523 {
524 m_currentRunnable->doShutdown();
525 }
526 }
527 virtual void doCooperativeCancel()
528 {
529 MutexLock lock(m_guard);
530 if (m_currentRunnable)
531 {
532 m_currentRunnable->doCooperativeCancel();
533 }
534 }
535 virtual void doDefinitiveCancel()
536 {
537 MutexLock lock(m_guard);
538 if (m_currentRunnable)
539 {
540 m_currentRunnable->doDefinitiveCancel();
541 }
542 }
543
544 DynamicSizePoolImpl* m_thePool;
545
546 Mutex m_guard;
547 RunnableRef m_currentRunnable;
548
549 // non-copyable
550 DynamicSizePoolWorkerThread(const DynamicSizePoolWorkerThread&);
551 DynamicSizePoolWorkerThread& operator=(const DynamicSizePoolWorkerThread&);
552};
554class DynamicSizePoolImpl : public CommonPoolImpl
555{
556public:
557 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
558 : CommonPoolImpl(maxQueueSize, logger, poolName)
559 , m_maxThreads(maxThreads)
560 {
561 }
562 // returns true if work is placed in the queue to be run and false if not.
563 virtual bool addWork(const RunnableRef& work, const Timeout& timeout)
564 {
565 // check precondition: work != NULL
566 if (!work)
567 {
568 BLOCXX_POOL_LOG_DEBUG(m_logger, "Trying to add NULL work! Shame on you.");
569 return false;
570 }
571 NonRecursiveMutexLock l(m_queueLock);
572
573 // the pool is in the process of being destroyed
574 if (queueClosed())
575 {
576 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
577 return false;
578 }
579
580 // Can't touch m_threads until *after* we check for the queue being closed, shutdown
581 // requires that m_threads not change after the queue is closed.
582 // Now clean up dead threads (before we add the new one, so we don't need to check it)
583 size_t i = 0;
584 while (i < m_threads.size())
585 {
586 if (!m_threads[i]->isRunning())
587 {
588 BLOCXX_POOL_LOG_DEBUG3(m_logger, Format("Thread %1 is finished. Cleaning up it's remains.", i));
589 m_threads[i]->join();
590 m_threads.remove(i);
591 }
592 else
593 {
594 ++i;
595 }
596 }
597
598 TimeoutTimer timer(timeout);
599 while ( queueIsFull() && !queueClosed() )
600 {
601 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full. Waiting until a spot opens up so we can add some work");
602 if (!m_queueNotFull.timedWait(l, timer.asAbsoluteTimeout()))
603 {
604 // timed out
605 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue is full and timeout expired. Not adding work and returning false");
606 return false;
607 }
608 }
609
610 // The previous loop could have ended because a spot opened in the queue or it was closed. Check for the close.
611 if (queueClosed())
612 {
613 BLOCXX_POOL_LOG_DEBUG3(m_logger, "Queue was closed out from underneath us. Not adding work and returning false");
614 return false;
615 }
616
617 m_queue.push_back(work);
618
619 BLOCXX_POOL_LOG_DEBUG(m_logger, "Work has been added to the queue");
620
621 // Release the lock and wake up a thread waiting for work in the queue
622 // This bit of code is a race condition with the thread,
623 // but if we acquire the lock again before it does, then we
624 // properly handle that case. The only disadvantage if we win
625 // the "race" is that we'll unnecessarily start a new thread.
626 // In practice it works all the time.
627 l.release();
628 m_queueNotEmpty.notifyOne();
629 Thread::yield(); // give the thread a chance to run
630 l.lock();
631
632 // Start up a new thread to handle the work in the queue.
633 if (!m_queue.empty() && m_threads.size() < m_maxThreads)
634 {
635 ThreadRef theThread(new DynamicSizePoolWorkerThread(this));
636 m_threads.push_back(theThread);
637 BLOCXX_POOL_LOG_DEBUG3(m_logger, "About to start a new thread");
638 try
639 {
640 theThread->start();
641 }
642 catch (ThreadException& e)
643 {
644 BLOCXX_POOL_LOG_ERROR(m_logger, Format("Failed to start thread: %1", e));
645 m_threads.pop_back();
646 throw;
647 }
648 BLOCXX_POOL_LOG_DEBUG2(m_logger, "New thread started");
649 }
650 return true;
651 }
652
653 // we keep this around so it can be called in the destructor
654 virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
655 {
656 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
657 }
658 virtual ~DynamicSizePoolImpl()
659 {
660 // can't let exception escape the destructor
661 try
662 {
663 // don't need a lock here, because we're the only thread left.
664 if (!queueClosed())
665 {
666 // Make sure the pool is shutdown.
667 // Specify which shutdown() we want so we don't get undefined behavior calling a virtual function from the destructor.
668 this->DynamicSizePoolImpl::shutdown(ThreadPool::E_DISCARD_WORK_IN_QUEUE, Timeout::relative(0.5), Timeout::relative(0.5));
669 }
670 }
671 catch (...)
672 {
673 }
674 }
675
676protected:
677 UInt32 getMaxThreads() const
678 {
679 return m_maxThreads;
680 }
681
682private:
683 // pool characteristics
684 UInt32 m_maxThreads;
685 friend class DynamicSizePoolWorkerThread;
686};
687Int32 DynamicSizePoolWorkerThread::run()
688{
689 while (true)
690 {
691 // check queue for work
692 RunnableRef work = m_thePool->getWorkFromQueue(false);
693 if (!work)
694 {
695 return 0;
696 }
697 // save this off so it can be cancelled by another thread.
698 {
699 MutexLock lock(m_guard);
700 m_currentRunnable = work;
701 }
702 runRunnable(work);
703 m_thePool->decrementWorkerCount();
704 {
705 MutexLock lock(m_guard);
706 m_currentRunnable = 0;
707 }
708 }
709 return 0;
710}
711
713class DynamicSizeNoQueuePoolImpl : public DynamicSizePoolImpl
714{
715public:
716 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads, const Logger& logger, const String& poolName)
717 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName) // allow queue in superclass, but prevent it from having any backlog
718 , m_workingThreads(0)
719 {
720 }
721
722 virtual ~DynamicSizeNoQueuePoolImpl()
723 {
724 }
725
726 virtual void incrementWorkerCount()
727 {
728 ++m_workingThreads;
729 }
730
731 virtual void decrementWorkerCount()
732 {
733 NonRecursiveMutexLock lock(m_queueLock);
734 --m_workingThreads;
735 // wake up any threads waiting to start some work
736 m_queueNotFull.notifyAll();
737 }
738
739 // One difference between this class and DynamicSizePoolImpl is that we change the definition of queueIsFull()
740 virtual bool queueIsFull() const
741 {
742 // don't let the queue get bigger than the number of free threads. This effectively prevents work from being
743 // queued up which can't be immediately serviced.
744 size_t freeThreads = getMaxThreads() - AtomicGet(m_workingThreads);
745 return (freeThreads <= m_queue.size());
746 }
747
748private:
749 // Keep track of the number of threads doing work. Protected by m_guard
750 size_t m_workingThreads;
751
752};
753
754} // end anonymous namespace
756ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const String& poolName)
757{
758 NullLogger logger;
759 switch (poolType)
760 {
761 case FIXED_SIZE:
762 m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
763 break;
764 case DYNAMIC_SIZE:
765 m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
766 break;
768 m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
769 break;
770 }
771}
772
773ThreadPool::ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger& logger, const String& poolName)
774{
775 switch (poolType)
776 {
777 case FIXED_SIZE:
778 m_impl = new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
779 break;
780 case DYNAMIC_SIZE:
781 m_impl = new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
782 break;
784 m_impl = new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
785 break;
786 }
787}
788
790{
791 return m_impl->addWork(work, Timeout::infinite);
792}
793
795{
796 return m_impl->addWork(work, Timeout::relative(0));
797}
798
799bool ThreadPool::tryAddWork(const RunnableRef& work, const Timeout& timeout)
800{
801 return m_impl->addWork(work, timeout);
802}
803
804void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, int shutdownSecs)
805{
806 m_impl->shutdown(finishWorkInQueue, Timeout::relative(shutdownSecs), Timeout::relative(shutdownSecs));
807}
808
809void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& timeout)
810{
811 m_impl->shutdown(finishWorkInQueue, timeout, timeout);
812}
813
814void ThreadPool::shutdown(EShutdownQueueFlag finishWorkInQueue, const Timeout& shutdownTimeout, const Timeout& definitiveCancelTimeout)
815{
816 m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
817}
818
820{
821 m_impl->waitForEmptyQueue();
822}
823
827
833
835{
836 m_impl = x.m_impl;
837 return *this;
838}
839
840} // end namespace BLOCXX_NAMESPACE
841
#define BLOCXX_DEFINE_EXCEPTION(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
#define BLOCXX_GLOBAL_STRING_INIT(str)
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
Definition Logger.hpp:433
#define BLOCXX_POOL_LOG_DEBUG(logger, arg)
#define BLOCXX_POOL_LOG_ERROR(logger, arg)
#define BLOCXX_POOL_LOG_DEBUG2(logger, arg)
#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg)
#define BLOCXX_POOL_LOG_DEBUG3(logger, arg)
This class is the base of all exceptions thrown by BloCxx code.
Definition Exception.hpp:66
Logging interface.
Definition Logger.hpp:87
This logger just discards all log messages.
virtual void doShutdown()
This function is available for subclasses of Thread to override if they wish to be notified when shut...
Definition Runnable.cpp:53
virtual void doCooperativeCancel()
This function is available for subclasses to override if they wish to be notified when a cooperative ...
Definition Runnable.cpp:59
virtual void doDefinitiveCancel()
See the documentation for doCooperativeCancel().
Definition Runnable.cpp:65
This String class is an abstract data type that represents as NULL terminated string of characters.
Definition String.hpp:67
The ThreadPool class is used to coordinate a group of threads.
bool addWork(const RunnableRef &work)
Add an RunnableRef for the pool to execute.
ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger &logger, const String &poolName="")
Constructor.
bool tryAddWork(const RunnableRef &work)
Add an RunnableRef for the pool to execute.
ThreadPool & operator=(const ThreadPool &x)
void shutdown(EShutdownQueueFlag finishWorkInQueue=E_FINISH_WORK_IN_QUEUE, const Timeout &timeout=Timeout::infinite)
Instruct all threads to exit and stop working.
void waitForEmptyQueue()
Wait for the queue to empty out.
IntrusiveReference< ThreadPoolImpl > m_impl
virtual void waitForEmptyQueue()=0
virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout &shutdownTimeout, const Timeout &definitiveCancelTimeout)=0
virtual bool addWork(const RunnableRef &work, const Timeout &timeout)=0
A timeout can be absolute, which means that it will happen at the specified DateTime.
Definition Timeout.hpp:56
static Timeout relative(float seconds)
Definition Timeout.cpp:58
static Timeout infinite
Definition Timeout.hpp:62
Taken from RFC 1321.
LazyGlobal< String, char const *const > GlobalString
IntrusiveReference< Runnable > RunnableRef
IntrusiveReference< Thread > ThreadRef
Definition CommonFwd.hpp:92
class BLOCXX_COMMON_API Logger
Definition CommonFwd.hpp:63
int AtomicGet(Atomic_t const &v)
Definition AtomicOps.cpp:70
In the event a thread has been cancelled, a ThreadCancelledException will be thrown.