blocxx
Condition.cpp
Go to the documentation of this file.
1/*******************************************************************************
2* Copyright (C) 2005, Vintela, Inc. All rights reserved.
3* Copyright (C) 2006, Novell, Inc. All rights reserved.
4*
5* Redistribution and use in source and binary forms, with or without
6* modification, are permitted provided that the following conditions are met:
7*
8* * Redistributions of source code must retain the above copyright notice,
9* this list of conditions and the following disclaimer.
10* * Redistributions in binary form must reproduce the above copyright
11* notice, this list of conditions and the following disclaimer in the
12* documentation and/or other materials provided with the distribution.
13* * Neither the name of
14* Vintela, Inc.,
15* nor Novell, Inc.,
16* nor the names of its contributors or employees may be used to
17* endorse or promote products derived from this software without
18* specific prior written permission.
19*
20* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30* POSSIBILITY OF SUCH DAMAGE.
31*******************************************************************************/
32
33
37#include "blocxx/BLOCXX_config.h"
38#include "blocxx/Condition.hpp"
41#include "blocxx/Timeout.hpp"
43#include "blocxx/ThreadImpl.hpp"
44
45#include <cassert>
46#include <cerrno>
47#include <limits>
48#ifdef BLOCXX_HAVE_SYS_TIME_H
49#include <sys/time.h>
50#endif
51
52namespace BLOCXX_NAMESPACE
53{
54
57#if defined(BLOCXX_USE_PTHREAD)
// Constructor body for the pthread build: initializes m_condition with
// pthread_cond_init (attributes: PTHREAD_COND_ATTR_DEFAULT) and throws
// ConditionResourceException when that call reports a non-zero result.
// NOTE(review): the signature line is absent from this listing; presumably
// this is Condition::Condition() -- confirm against Condition.hpp.
60{
61 int res = pthread_cond_init(&m_condition, PTHREAD_COND_ATTR_DEFAULT);
62 if (res != 0)
63 {
64 BLOCXX_THROW(ConditionResourceException, "Failed initializing condition variable");
65 }
66}
// Destructor body for the pthread build: releases m_condition with
// pthread_cond_destroy. The result is only checked through assert, so a
// failure is not reported when assertions are compiled out (NDEBUG).
// NOTE(review): the signature line is absent from this listing; presumably
// this is Condition::~Condition() -- confirm against Condition.hpp.
69{
70 int res = pthread_cond_destroy(&m_condition);
71 assert(res == 0);
72}
// pthread build: signals m_condition with pthread_cond_signal. The result is
// only checked through assert.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::notifyOne() -- confirm against Condition.hpp.
74void
76{
77 int res = pthread_cond_signal(&m_condition);
78 assert(res == 0);
79}
// pthread build: broadcasts on m_condition with pthread_cond_broadcast. The
// result is only checked through assert.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::notifyAll() -- confirm against Condition.hpp.
81void
83{
84 int res = pthread_cond_broadcast(&m_condition);
85 assert(res == 0);
86}
// pthread build: blocks on m_condition using the native mutex exposed through
// a NonRecursiveMutexLockState. Any result other than 0 or EINTR trips the
// assert.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::doWait(NonRecursiveMutex& mutex) -- confirm.
88void
90{
92 int res;
93 NonRecursiveMutexLockState state;
// conditionPreWait fills in 'state'; state.pmutex is what pthread_cond_wait
// is given as its mutex argument.
94 mutex.conditionPreWait(state);
95 res = pthread_cond_wait(&m_condition, state.pmutex);
// Hand the same state back to the mutex once the wait has returned.
96 mutex.conditionPostWait(state);
97 assert(res == 0 || res == EINTR);
// NOTE(review): this branch is empty as shown; the listing numbering skips a
// line here (and one after the opening brace), so statements may be missing
// from this view -- verify against the real source.
98 if (res == EINTR)
99 {
101 }
102}
104namespace
105{
// Strict "earlier than" comparison for two timespec values.
//
// x, y: the time values to compare.
// Returns true when x is before y: the seconds fields are compared first and
// the nanoseconds fields only break a tie in seconds.
inline
bool timespec_less(struct timespec const & x, struct timespec const & y)
{
	// && already binds tighter than ||, but the grouping is spelled out so the
	// intent is obvious and compilers do not emit a -Wparentheses diagnostic.
	return (x.tv_sec < y.tv_sec) ||
		(x.tv_sec == y.tv_sec && x.tv_nsec < y.tv_nsec);
}
112
113 int check_timedwait(
114 int rc, pthread_cond_t * cond, pthread_mutex_t * mtx,
115 struct timespec const * abstime
116 )
117 {
118#ifdef BLOCXX_NCR
119 if (rc == -1 && errno == EAGAIN)
120 {
121 return ETIMEDOUT;
122 }
123#endif
124 if (rc != EINVAL)
125 {
126 return rc;
127 }
128 // Solaris won't let you wait more than 10 ** 8 seconds.
129 time_t const max_future = 99999999;
130 time_t const max_time = std::numeric_limits<time_t>::max();
131 time_t now_sec = DateTime::getCurrent().get();
132 struct timespec new_abstime;
133 new_abstime.tv_sec = (
134 now_sec <= max_time - max_future
135 ? now_sec + max_future
136 : max_time
137 );
138 new_abstime.tv_nsec = 0;
139 bool early = timespec_less(new_abstime, *abstime);
140 if (!early)
141 {
142 new_abstime = *abstime;
143 }
144 int newrc = pthread_cond_timedwait(cond, mtx, &new_abstime);
145 return (newrc == ETIMEDOUT && early ? EINTR : newrc);
146 }
147}
148
// pthread build: waits on m_condition until it is signalled or the timeout
// elapses. Returns false only when the final result is ETIMEDOUT; results of
// 0 or EINTR both yield true. Any other result trips the assert.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::doTimedWait(NonRecursiveMutex& mutex,
// const Timeout& timeout) -- confirm against Condition.hpp.
149bool
151{
153 int res;
154 NonRecursiveMutexLockState state;
// conditionPreWait fills in 'state'; state.pmutex is the mutex argument for
// the timed wait below.
155 mutex.conditionPreWait(state);
156 bool ret = false;
157
158 timespec ts;
159 TimeoutTimer timer(timeout);
160
// The value returned by timer.asTimespec(ts) is the deadline for the wait.
// NOTE(review): check_timedwait is then given &ts directly, which assumes
// asTimespec always fills in ts -- confirm for every Timeout kind.
161 res = pthread_cond_timedwait(&m_condition, state.pmutex, timer.asTimespec(ts));
// check_timedwait passes non-EINVAL results through and retries on EINVAL.
162 res = check_timedwait(res, &m_condition, state.pmutex, &ts);
163 mutex.conditionPostWait(state);
164 assert(res == 0 || res == ETIMEDOUT || res == EINTR);
// NOTE(review): this branch is empty as shown; the listing numbering skips a
// line here (and one after the opening brace), so statements may be missing
// from this view -- verify against the real source.
165 if (res == EINTR)
166 {
168 }
169 ret = res != ETIMEDOUT;
170 return ret;
171}
172#elif defined (BLOCXX_WIN32)
// Win32 build constructor: allocates the ConditionInfo_t bookkeeping record
// and creates the kernel objects it refers to: a counting semaphore (queue),
// a critical section guarding waitersCount, and an auto-reset event
// (waitersDone).
// NOTE(review): the line naming the constructor is absent from this listing,
// and the handles returned by CreateSemaphore/CreateEvent are not checked
// for failure here.
175 : m_condition(new ConditionInfo_t)
176{
177 m_condition->waitersCount = 0;
178 m_condition->wasBroadcast = false;
179 m_condition->queue = ::CreateSemaphore(
180 NULL, // No security
181 0, // initially 0
182 0x7fffffff, // max count
183 NULL); // Unnamed
184 ::InitializeCriticalSection(&m_condition->waitersCountLock);
185 m_condition->waitersDone = ::CreateEvent(
186 NULL, // No security
187 false, // auto-reset
188 false, // non-signaled initially
189 NULL); // Unnamed
190}
// Win32 build destructor body: closes the semaphore and event handles,
// deletes the critical section, then frees the ConditionInfo_t record.
// NOTE(review): the signature line is absent from this listing; presumably
// this is Condition::~Condition() -- confirm against Condition.hpp.
193{
194 ::CloseHandle(m_condition->queue);
195 ::DeleteCriticalSection(&m_condition->waitersCountLock);
196 ::CloseHandle(m_condition->waitersDone);
197 delete m_condition;
198}
// Win32 build: releases one unit of the queue semaphore when at least one
// waiter is registered; otherwise does nothing.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::notifyOne() -- confirm against Condition.hpp.
200void
202{
// waitersCount is only read while holding waitersCountLock; the semaphore
// release itself happens after the lock has been dropped.
203 ::EnterCriticalSection(&m_condition->waitersCountLock);
204 bool haveWaiters = m_condition->waitersCount > 0;
205 ::LeaveCriticalSection(&m_condition->waitersCountLock);
206
207 // If no threads waiting, then this is a no-op
208 if (haveWaiters)
209 {
210 ::ReleaseSemaphore(m_condition->queue, 1, 0);
211 }
212}
// Win32 build: when waiters are registered, marks the condition as being in
// a broadcast, releases the queue semaphore once per registered waiter, and
// then blocks until waitersDone is signalled. With no waiters it only takes
// and releases waitersCountLock.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::notifyAll() -- confirm against Condition.hpp.
214void
216{
217 ::EnterCriticalSection(&m_condition->waitersCountLock);
218 bool haveWaiters = false;
219 if (m_condition->waitersCount > 0)
220 {
221 // It's gonna be a broadcast, even if there's only one waiting thread.
222 haveWaiters = m_condition->wasBroadcast = true;
223 }
224
225 if (haveWaiters)
226 {
227 // Wake up all the waiting threads atomically
228 ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0);
229 ::LeaveCriticalSection(&m_condition->waitersCountLock);
230
231 // Wait for all the threads to acquire the counting semaphore
232 ::WaitForSingleObject(m_condition->waitersDone, INFINITE);
// wasBroadcast is cleared here without holding waitersCountLock.
233 m_condition->wasBroadcast = false;
234 }
235 else
236 {
237 ::LeaveCriticalSection(&m_condition->waitersCountLock);
238 }
239}
// Win32 build.
// NOTE(review): both the line naming this function and its body are absent
// from this listing (the listing numbering skips lines on either side of the
// opening brace). Presumably this is Condition::doWait(NonRecursiveMutex&)
// and it contains a statement not shown here -- verify against the real
// source before relying on this view.
241void
243{
245}
// Win32 build: registers the caller as a waiter, releases the external mutex
// and waits on the queue semaphore for at most the given timeout, then
// re-acquires the external mutex. Returns false when the wait on the
// semaphore reported WAIT_TIMEOUT and true otherwise.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::doTimedWait(NonRecursiveMutex& mutex,
// const Timeout& timeout). The commented-out line below is an older signature.
247bool
248//Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
250{
252 bool cc = true;
253 NonRecursiveMutexLockState state;
254 mutex.conditionPreWait(state);
255
// Register this thread as a waiter under waitersCountLock.
256 ::EnterCriticalSection(&m_condition->waitersCountLock);
257 m_condition->waitersCount++;
258 ::LeaveCriticalSection(&m_condition->waitersCountLock);
259
260 TimeoutTimer timer(timeout);
261 // Atomically release the mutex and wait on the
262 // queue until signal/broadcast.
263 if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, timer.asDWORDMs(),
264 false) == WAIT_TIMEOUT)
265 {
266 cc = false;
267 }
268
269 ::EnterCriticalSection(&m_condition->waitersCountLock);
270 m_condition->waitersCount--;
271
272 // Check to see if we're the last waiter after the broadcast
// NOTE(review): a thread that timed out (cc == false) never counts as the
// last waiter, so if it is the one that brings waitersCount to 0 during a
// broadcast, waitersDone is not signalled from here -- confirm the
// broadcasting thread cannot block indefinitely in that case.
273 bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0
274 && cc == true);
275
276 ::LeaveCriticalSection(&m_condition->waitersCountLock);
277
278 // If this is the last thread waiting for this broadcast, then let all the
279 // other threads proceed.
280 if (isLastWaiter)
281 {
282 // Atomically signal the waitersDone event and wait to acquire
283 // the external mutex. Ensures fairness
284 ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex,
285 INFINITE, false);
286 }
287 else
288 {
289 // Re-gain ownership of the external mutex
290 ::WaitForSingleObject(mutex.m_mutex, INFINITE);
291 }
292 mutex.conditionPostWait(state);
293 return cc;
294}
295#else
296#error "port me!"
297#endif
// Platform-independent entry point: rejects a lock that is not currently
// held by throwing ConditionLockException, otherwise forwards to doWait with
// the mutex owned by the lock.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::wait(NonRecursiveMutexLock& lock) -- confirm against
// Condition.hpp.
299void
301{
302 if (!lock.isLocked())
303 {
304 BLOCXX_THROW(ConditionLockException, "Lock must be locked");
305 }
306 doWait(*(lock.m_mutex));
307}
308
309bool
310Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout)
311{
312 return timedWait(lock, Timeout::relative(sTimeout + static_cast<float>(usTimeout) / 1000000.0));
313}
314
// Platform-independent entry point for timed waits: rejects a lock that is
// not currently held by throwing ConditionLockException, otherwise forwards
// to doTimedWait with the mutex owned by the lock and returns its result.
// NOTE(review): the line naming this function is absent from this listing;
// presumably Condition::timedWait(NonRecursiveMutexLock& lock,
// const Timeout& timeout) -- confirm against Condition.hpp.
316bool
318{
319 if (!lock.isLocked())
320 {
321 BLOCXX_THROW(ConditionLockException, "Lock must be locked");
322 }
323 return doTimedWait(*(lock.m_mutex), timeout);
324}
325
326} // end namespace BLOCXX_NAMESPACE
327
#define BLOCXX_DEFINE_EXCEPTION_WITH_ID(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
#define BLOCXX_THROW(exType, msg)
Throw an exception using FILE and LINE.
#define ETIMEDOUT
~Condition()
Destroy this Condition object.
void wait(NonRecursiveMutexLock &lock)
Atomically unlock a given mutex and wait for this Condition object to get signalled.
void notifyAll()
Signal all threads that are currently waiting on the Condition object.
bool timedWait(NonRecursiveMutexLock &lock, const Timeout &timeout)
Atomically unlock a given mutex and wait for a given amount of time for this Condition object to get ...
Condition()
Construct a new Condition object.
bool doTimedWait(NonRecursiveMutex &mutex, const Timeout &timeout)
void notifyOne()
Signal one thread that is currently waiting on the Condition object through the wait or timedWait met...
void doWait(NonRecursiveMutex &mutex)
ConditionResourceException(const char *file, int line, const char *msg, int errorCode=::BLOCXX_NAMESPACE::Exception::UNKNOWN_ERROR_CODE, const Exception *otherException=0, int subClassId=::BLOCXX_NAMESPACE::Exception::UNKNOWN_SUBCLASS_ID)
Definition Condition.cpp:56
static DateTime getCurrent()
Gets a DateTime instance set to the current system time.
Note that descriptions of what exceptions may be thrown assumes that object is used correctly,...
Note that descriptions of what exceptions may be thrown assumes that object is used correctly,...
A timeout can be absolute, which means that it will happen at the specified DateTime.
Definition Timeout.hpp:56
static Timeout relative(float seconds)
Definition Timeout.cpp:58
static Timeout infinite
Definition Timeout.hpp:62
BLOCXX_COMMON_API void testCancel()
Test if this thread has been cancelled.
Taken from RFC 1321.