blocxx
Condition.cpp
Go to the documentation of this file.
1/*******************************************************************************
2* Copyright (C) 2005, Vintela, Inc. All rights reserved.
3* Copyright (C) 2006, Novell, Inc. All rights reserved.
4*
5* Redistribution and use in source and binary forms, with or without
6* modification, are permitted provided that the following conditions are met:
7*
8* * Redistributions of source code must retain the above copyright notice,
9* this list of conditions and the following disclaimer.
10* * Redistributions in binary form must reproduce the above copyright
11* notice, this list of conditions and the following disclaimer in the
12* documentation and/or other materials provided with the distribution.
13* * Neither the name of
14* Vintela, Inc.,
15* nor Novell, Inc.,
16* nor the names of its contributors or employees may be used to
17* endorse or promote products derived from this software without
18* specific prior written permission.
19*
20* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30* POSSIBILITY OF SUCH DAMAGE.
31*******************************************************************************/
32
33
37#include "blocxx/BLOCXX_config.h"
38#include "blocxx/Condition.hpp"
41#include "blocxx/Timeout.hpp"
43#include "blocxx/ThreadImpl.hpp"
44
45#include <cassert>
46#include <cerrno>
47#include <limits>
48#ifdef BLOCXX_HAVE_SYS_TIME_H
49#include <sys/time.h>
50#endif
51
52namespace BLOCXX_NAMESPACE
53{
54
57#if defined(BLOCXX_USE_PTHREAD)
// pthread build: initialises m_condition via pthread_cond_init using
// PTHREAD_COND_ATTR_DEFAULT and throws ConditionResourceException when the
// call returns a non-zero result.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is the body of Condition::Condition() -- confirm against the original.
60{
61 int res = pthread_cond_init(&m_condition, PTHREAD_COND_ATTR_DEFAULT);
62 if (res != 0)
63 {
// Any non-zero return from pthread_cond_init is reported as an exception.
64 BLOCXX_THROW(ConditionResourceException, "Failed initializing condition variable");
65 }
66}
// pthread build: releases m_condition via pthread_cond_destroy.
// A failure is only checked with assert, so it goes unnoticed when
// assertions are compiled out (NDEBUG).
// NOTE(review): the declarator line is absent from this listing; presumably
// this is the body of Condition::~Condition() -- confirm against the original.
69{
70 int res = pthread_cond_destroy(&m_condition);
71 assert(res == 0);
72}
// pthread build: calls pthread_cond_signal on m_condition and asserts that
// the call succeeded; no error is reported when assertions are disabled.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is Condition::notifyOne() -- confirm against the original.
74void
76{
77 int res = pthread_cond_signal(&m_condition);
78 assert(res == 0);
79}
// pthread build: calls pthread_cond_broadcast on m_condition and asserts that
// the call succeeded; no error is reported when assertions are disabled.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is Condition::notifyAll() -- confirm against the original.
81void
83{
84 int res = pthread_cond_broadcast(&m_condition);
85 assert(res == 0);
86}
88void
89Condition::doWait(NonRecursiveMutex& mutex)
90{
92 int res;
93 NonRecursiveMutexLockState state;
94 mutex.conditionPreWait(state);
95 res = pthread_cond_wait(&m_condition, state.pmutex);
96 mutex.conditionPostWait(state);
97 assert(res == 0 || res == EINTR);
98 if (res == EINTR)
99 {
101 }
102}
104namespace
105{
/// Strict "earlier than" comparison of two timespec values: the seconds
/// field decides, and the nanoseconds field breaks ties.
inline
bool timespec_less(struct timespec const & x, struct timespec const & y)
{
	if (x.tv_sec != y.tv_sec)
	{
		return x.tv_sec < y.tv_sec;
	}
	return x.tv_nsec < y.tv_nsec;
}
112
113 int check_timedwait(
114 int rc, pthread_cond_t * cond, pthread_mutex_t * mtx,
115 struct timespec const * abstime
116 )
117 {
118#ifdef BLOCXX_NCR
119 if (rc == -1 && errno == EAGAIN)
120 {
121 return ETIMEDOUT;
122 }
123#endif
124 if (rc != EINVAL)
125 {
126 return rc;
127 }
128 // Solaris won't let you wait more than 10 ** 8 seconds.
129 time_t const max_future = 99999999;
130 time_t const max_time = std::numeric_limits<time_t>::max();
131 time_t now_sec = DateTime::getCurrent().get();
132 struct timespec new_abstime;
133 new_abstime.tv_sec = (
134 now_sec <= max_time - max_future
135 ? now_sec + max_future
136 : max_time
137 );
138 new_abstime.tv_nsec = 0;
139 bool early = timespec_less(new_abstime, *abstime);
140 if (!early)
141 {
142 new_abstime = *abstime;
143 }
144 int newrc = pthread_cond_timedwait(cond, mtx, &new_abstime);
145 return (newrc == ETIMEDOUT && early ? EINTR : newrc);
146 }
147}
148
149bool
150Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout)
151{
153 int res;
154 NonRecursiveMutexLockState state;
155 mutex.conditionPreWait(state);
156 bool ret = false;
157
158 timespec ts;
159 TimeoutTimer timer(timeout);
160
161 res = pthread_cond_timedwait(&m_condition, state.pmutex, timer.asTimespec(ts));
162 res = check_timedwait(res, &m_condition, state.pmutex, &ts);
163 mutex.conditionPostWait(state);
164 assert(res == 0 || res == ETIMEDOUT || res == EINTR);
165 if (res == EINTR)
166 {
168 }
169 ret = res != ETIMEDOUT;
170 return ret;
171}
172#elif defined (BLOCXX_WIN32)
// Win32 build: allocates the ConditionInfo_t state and initialises it --
// zero waiters, no broadcast in progress, a semaphore ("queue") with initial
// count 0, a critical section guarding waitersCount, and an auto-reset event
// ("waitersDone") that starts non-signaled.
// NOTE(review): the results of CreateSemaphore/CreateEvent are not checked here.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is Condition::Condition() -- confirm against the original.
175 : m_condition(new ConditionInfo_t)
176{
177 m_condition->waitersCount = 0;
178 m_condition->wasBroadcast = false;
179 m_condition->queue = ::CreateSemaphore(
180 NULL, // No security
181 0, // initially 0
182 0x7fffffff, // max count
183 NULL); // Unnamed
184 ::InitializeCriticalSection(&m_condition->waitersCountLock);
185 m_condition->waitersDone = ::CreateEvent(
186 NULL, // No security
187 false, // auto-reset
188 false, // non-signaled initially
189 NULL); // Unnamed
190}
// Win32 build: closes the semaphore and event handles, deletes the critical
// section, and frees the ConditionInfo_t allocated for m_condition.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is Condition::~Condition() -- confirm against the original.
193{
194 ::CloseHandle(m_condition->queue);
195 ::DeleteCriticalSection(&m_condition->waitersCountLock);
196 ::CloseHandle(m_condition->waitersDone);
197 delete m_condition;
198}
// Win32 build: releases one unit of the "queue" semaphore when at least one
// waiter is registered. waitersCount is read under waitersCountLock; the
// semaphore is released after the lock has been left.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is Condition::notifyOne() -- confirm against the original.
200void
202{
203 ::EnterCriticalSection(&m_condition->waitersCountLock);
204 bool haveWaiters = m_condition->waitersCount > 0;
205 ::LeaveCriticalSection(&m_condition->waitersCountLock);
206
207 // If no threads waiting, then this is a no-op
208 if (haveWaiters)
209 {
210 ::ReleaseSemaphore(m_condition->queue, 1, 0);
211 }
212}
// Win32 build: when waiters are registered, marks a broadcast as in progress,
// releases the "queue" semaphore once per registered waiter while holding
// waitersCountLock, then blocks on the waitersDone event before clearing
// wasBroadcast. With no waiters it only enters and leaves the lock.
// NOTE(review): wasBroadcast is reset after waitersCountLock has been left.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is Condition::notifyAll() -- confirm against the original.
214void
216{
217 ::EnterCriticalSection(&m_condition->waitersCountLock);
218 bool haveWaiters = false;
219 if (m_condition->waitersCount > 0)
220 {
221 // It's gonna be a broadcast, even if there's only one waiting thread.
222 haveWaiters = m_condition->wasBroadcast = true;
223 }
224
225 if (haveWaiters)
226 {
227 // Wake up all the waiting threads atomically
228 ::ReleaseSemaphore(m_condition->queue, m_condition->waitersCount, 0);
229 ::LeaveCriticalSection(&m_condition->waitersCountLock);
230
231 // Wait for all the threads to acquire the counting semaphore
232 ::WaitForSingleObject(m_condition->waitersDone, INFINITE);
233 m_condition->wasBroadcast = false;
234 }
235 else
236 {
// Nothing to wake; just drop the lock taken at the top.
237 ::LeaveCriticalSection(&m_condition->waitersCountLock);
238 }
239}
241void
242Condition::doWait(NonRecursiveMutex& mutex)
243{
245}
247bool
248//Condition::doTimedWait(NonRecursiveMutex& mutex, UInt32 sTimeout, UInt32 usTimeout)
249Condition::doTimedWait(NonRecursiveMutex& mutex, const Timeout& timeout)
250{
252 bool cc = true;
253 NonRecursiveMutexLockState state;
254 mutex.conditionPreWait(state);
255
256 ::EnterCriticalSection(&m_condition->waitersCountLock);
257 m_condition->waitersCount++;
258 ::LeaveCriticalSection(&m_condition->waitersCountLock);
259
260 TimeoutTimer timer(timeout);
261 // Atomically release the mutex and wait on the
262 // queue until signal/broadcast.
263 if (::SignalObjectAndWait(mutex.m_mutex, m_condition->queue, timer.asDWORDMs(),
264 false) == WAIT_TIMEOUT)
265 {
266 cc = false;
267 }
268
269 ::EnterCriticalSection(&m_condition->waitersCountLock);
270 m_condition->waitersCount--;
271
272 // Check to see if we're the last waiter after the broadcast
273 bool isLastWaiter = (m_condition->wasBroadcast && m_condition->waitersCount == 0
274 && cc == true);
275
276 ::LeaveCriticalSection(&m_condition->waitersCountLock);
277
278 // If this is the last thread waiting for this broadcast, then let all the
279 // other threads proceed.
280 if (isLastWaiter)
281 {
282 // Atomically signal the waitersDone event and wait to acquire
283 // the external mutex. Enusres fairness
284 ::SignalObjectAndWait(m_condition->waitersDone, mutex.m_mutex,
285 INFINITE, false);
286 }
287 else
288 {
289 // Re-gain ownership of the external mutex
290 ::WaitForSingleObject(mutex.m_mutex, INFINITE);
291 }
292 mutex.conditionPostWait(state);
293 return cc;
294}
295#else
296#error "port me!"
297#endif
// Checks that the supplied lock is currently locked and then waits via
// doWait() on the lock's underlying mutex.
// Throws ConditionLockException ("Lock must be locked") when the lock is not held.
// NOTE(review): the declarator line is absent from this listing; presumably
// this is Condition::wait(NonRecursiveMutexLock& lock) -- confirm against the original.
299void
301{
302 if (!lock.isLocked())
303 {
304 BLOCXX_THROW(ConditionLockException, "Lock must be locked");
305 }
306 doWait(*(lock.m_mutex));
307}
309bool
310Condition::timedWait(NonRecursiveMutexLock& lock, UInt32 sTimeout, UInt32 usTimeout)
311{
312 return timedWait(lock, Timeout::relative(sTimeout + static_cast<float>(usTimeout) / 1000000.0));
313}
314
// Checks that the supplied lock is currently locked and then waits via
// doTimedWait() on the lock's underlying mutex, returning its result.
// Throws ConditionLockException ("Lock must be locked") when the lock is not held.
// NOTE(review): the declarator line is absent from this listing; presumably this is
// Condition::timedWait(NonRecursiveMutexLock& lock, const Timeout& timeout) -- confirm.
316bool
318{
319 if (!lock.isLocked())
320 {
321 BLOCXX_THROW(ConditionLockException, "Lock must be locked");
322 }
323 return doTimedWait(*(lock.m_mutex), timeout);
324}
325
326} // end namespace BLOCXX_NAMESPACE
327
#define BLOCXX_DEFINE_EXCEPTION_WITH_ID(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
#define BLOCXX_THROW(exType, msg)
Throw an exception using FILE and LINE.
EParserState state
#define ETIMEDOUT
~Condition()
Destroy this Condition object.
void wait(NonRecursiveMutexLock &lock)
Atomically unlock a given mutex and wait for this Condition object to get signalled.
void notifyAll()
Signal all threads that are currently waiting on the Condition object.
bool timedWait(NonRecursiveMutexLock &lock, const Timeout &timeout)
Atomically unlock a given mutex and wait for a given amount of time for this Condition object to get ...
Condition()
Construct a new Condition object.
bool doTimedWait(NonRecursiveMutex &mutex, const Timeout &timeout)
void notifyOne()
Signal one thread that is currently waiting on the Condition object through the wait or timedWait met...
void doWait(NonRecursiveMutex &mutex)
static DateTime getCurrent()
Gets a DateTime instance set to the current system time.
Note that descriptions of what exceptions may be thrown assumes that object is used correctly,...
A timeout can be absolute, which means that it will happen at the specified DateTime.
Definition Timeout.hpp:56
static Timeout relative(float seconds)
Definition Timeout.cpp:58
static Timeout infinite
Definition Timeout.hpp:62
BLOCXX_COMMON_API void testCancel()
Test if this thread has been cancelled.
Taken from RFC 1321.