blocxx
WaitpidThreadFix.cpp
Go to the documentation of this file.
1/*******************************************************************************
2* Copyright (C) 2005, Quest Software, Inc. All rights reserved.
3*
4* Redistribution and use in source and binary forms, with or without
5* modification, are permitted provided that the following conditions are met:
6*
7* * Redistributions of source code must retain the above copyright notice,
8* this list of conditions and the following disclaimer.
9* * Redistributions in binary form must reproduce the above copyright
10* notice, this list of conditions and the following disclaimer in the
11* documentation and/or other materials provided with the distribution.
12* * Neither the name of the Network Associates, nor Quest Software, Inc., nor the
13* names of its contributors or employees may be used to endorse or promote
14* products derived from this software without specific prior written
15* permission.
16*
17* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27* POSSIBILITY OF SUCH DAMAGE.
28*******************************************************************************/
29
33
34#include "blocxx/Thread.hpp"
36#include "blocxx/Exec.hpp"
38#include "blocxx/ThreadOnce.hpp"
41#include "blocxx/Condition.hpp"
42#include "blocxx/Reference.hpp"
44#include <queue>
45#include <sys/types.h>
46#ifndef BLOCXX_WIN32
47#include <sys/wait.h>
48#endif
49
50using namespace blocxx;
51
52namespace BLOCXX_NAMESPACE
53{
54
55namespace
56{
// Runtime switch for the waitpid threading workaround. Its initial value is
// true only when the build defines BLOCXX_WAITPID_THREADING_PROBLEM; code
// later in this file reads and overwrites it at run time.
#ifdef BLOCXX_WAITPID_THREADING_PROBLEM
bool g_useWaitpidThreadFix = true;
#else
bool g_useWaitpidThreadFix = false;
#endif
63
// Worker thread type; the class itself is defined further down in this file.
64 class ProcessThread;
65
// Once-flag handed to callOnce() so that initThread() runs a single time.
66 OnceFlag g_initThreadGuard = BLOCXX_ONCE_INIT;
// The worker thread object; stays null until initThread() has assigned it.
67 ProcessThread* g_processThread = 0;
68
// Allocates the worker thread, stores it in g_processThread and starts it.
69 void initThread();
70
// Runs initThread() through callOnce() and returns the worker thread's id.
71 Thread_t getWorkerThreadId();
72
73}
74
// NOTE(review): the signature line that should precede this body is missing
// from this listing. Presumably it is setWaitpidThreadFixEnabled(bool enabled),
// as named in the symbol index at the end of the listing -- confirm, including
// any namespace qualification.
//
// Stores `enabled` in g_useWaitpidThreadFix and returns the value the flag
// held before the call.
76{
77 bool rv = g_useWaitpidThreadFix;
78 g_useWaitpidThreadFix = enabled;
79 return rv;
80}
81
// NOTE(review): the signature line that should precede this body is missing
// from this listing. Presumably it is shouldUseWaitpidThreadFix(), as named in
// the symbol index at the end of the listing -- confirm.
//
// Returns false when g_useWaitpidThreadFix is off, or when the calling thread
// is the worker thread itself; returns true otherwise.
83{
84 if (!g_useWaitpidThreadFix)
85 {
86 return false;
87 }
88 Thread_t currThread = ThreadImpl::currentThread();
// getWorkerThreadId() goes through callOnce(g_initThreadGuard, initThread), so
// this call can be the one that creates and starts the worker thread.
89 Thread_t workerThread = getWorkerThreadId();
90
91 // If we are already in the WaitpidThreadFix worker thread
92 // then we don't want to cause an infinite loop
93 if (ThreadImpl::sameThreads(currThread, workerThread))
94 {
95 return false;
96 }
97 return true;
98}
99
100namespace
101{
// Reference-wrapped Exception; WorkItem keeps the exception saved by the
// worker thread in a member of this type.
102 typedef Reference<Exception> ExceptionPtr;
103
104
105 class WorkSignal
106 {
107 public:
108 WorkSignal()
109 : m_signal(false)
110 {
111 }
112
113 ~WorkSignal()
114 {
115 }
116
117 void signal()
118 {
119 NonRecursiveMutexLock lock(m_mutex);
120 m_signal = true;
121 m_cond.notifyAll();
122 }
123
124 void waitForSignal()
125 {
126 NonRecursiveMutexLock lock(m_mutex);
127
128 while(!m_signal)
129 {
130 m_cond.wait(lock);
131 }
132 }
133
134 private:
135 bool m_signal;
136 Condition m_cond;
137 NonRecursiveMutex m_mutex;
138 };
139
140 //***************************************************************************
141 // - This base class represents the work to be performed by ControlledAccessThread
142 // - This class and all derived classes must be thread safe
143 class WorkItem : public IntrusiveCountableBase
144 {
145 public:
146 virtual ~WorkItem()
147 {
148 }
149
150 virtual void doWork() = 0;
151
152 void signalDone()
153 {
154 m_doneSig.signal();
155 }
156
157 void saveException(Exception* err)
158 {
159 NonRecursiveMutexLock lock(m_errMutex);
160 m_err = err;
161 }
162
163 Exception* getException()
164 {
165 NonRecursiveMutexLock lock(m_errMutex);
166 return m_err.getPtr();
167 }
168
169 protected:
170 ExceptionPtr m_err;
171 NonRecursiveMutex m_errMutex;
172 WorkSignal m_doneSig;
173 };
174
175
176 //***************************************************************************
177 class SpawnWorkItem : public WorkItem
178 {
179 public:
180 SpawnWorkItem(char const * execPath, char const * const argv[],
181 char const * const envp[], Exec::PreExec & preExec)
182 : m_execPath(execPath)
183 , m_argv(argv)
184 , m_envp(envp)
185 , m_preExec(preExec)
186 {
187 }
188
189 virtual ~SpawnWorkItem()
190 {
191 }
192
193 virtual void doWork()
194 {
195 NonRecursiveMutexLock lock(m_resultMutex);
196 m_result = Exec::spawnImpl(m_execPath, m_argv, m_envp, m_preExec);
197 }
198
199 ProcessRef waitTillDone()
200 {
201 m_doneSig.waitForSignal();
202
203 NonRecursiveMutexLock lock(m_resultMutex);
204 return m_result;
205 }
206
207 protected:
208 ProcessRef m_result;
209 NonRecursiveMutex m_resultMutex;
210
211 const char * m_execPath;
212 const char * const * m_argv;
213 const char * const * m_envp;
214 Exec::PreExec& m_preExec;
215 };
216
217
218 //***************************************************************************
219 class WaitpidWorkItem : public WorkItem
220 {
221 public:
222 WaitpidWorkItem(const ::pid_t& pid)
223 : m_pid(pid)
224 {
225 }
226
227 virtual ~WaitpidWorkItem()
228 {
229 }
230
231 virtual void doWork()
232 {
233 NonRecursiveMutexLock lock(m_resultMutex);
234 m_result = pollStatusImpl(m_pid);
235 }
236
237 Process::Status waitTillDone()
238 {
239 m_doneSig.waitForSignal();
240
241 NonRecursiveMutexLock lock(m_resultMutex);
242 return m_result;
243 }
244
245
246 protected:
247 Process::Status m_result;
248 NonRecursiveMutex m_resultMutex;
249
250 const ::pid_t& m_pid;
251 };
252
// Intrusive-reference handles for the two concrete work item types; the
// ProcessThread::spawn() and ProcessThread::waitPid() definitions hold their
// work item through these.
253 typedef IntrusiveReference<SpawnWorkItem> SpawnWorkItemPtr;
254 typedef IntrusiveReference<WaitpidWorkItem> WaitpidWorkItemPtr;
255
256 class WorkQueue
257 {
258 public:
259 WorkQueue() {}
260 virtual ~WorkQueue() {}
261
262 WorkItem* getWork()
263 {
264 NonRecursiveMutexLock lock(m_workMutex);
265
266 // Wait for some work to show up
267 // by checking the predicate in a loop
268 while(m_work.empty())
269 {
270 m_workNotEmpty.wait(lock);
271 }
272
273 WorkItem* newWork = m_work.front();
274 m_work.pop();
275
276 return newWork;
277 }
278
279 void addWork(WorkItem* newWork)
280 {
281 NonRecursiveMutexLock lock(m_workMutex);
282 m_work.push(newWork);
283 m_workNotEmpty.notifyAll();
284 }
285
286 private:
287 std::queue<WorkItem*> m_work;
288 Condition m_workNotEmpty;
289 NonRecursiveMutex m_workMutex;
290 };
291
292 //***************************************************************************
293 // This is the worker thread that launches processes and/or calls
294 // waitpid on them when BLOCXX_WAITPID_THREADING_PROBLEM is defined
295 //***************************************************************************
296 class ProcessThread : public Thread
297 {
298 public:
299 ProcessThread();
300 virtual ~ProcessThread();
301
302 virtual Int32 run();
303
305 char const * exec_path,
306 char const * const argv[],
307 char const * const envp[],
308 Exec::PreExec & pre_exec
309 );
310
311 Process::Status waitPid(const ProcId& pid);
312
313 protected:
314 WorkQueue m_workQueue;
315
316 NonRecursiveMutex m_idMutex;
317 };
318
319 ProcessThread::ProcessThread()
320 {
321 }
322
323 ProcessThread::~ProcessThread()
324 {
325 }
326
327 // This function will never exit until the process terminates itself.
328 Int32 ProcessThread::run()
329 {
330 // Infinite loop.
331 while(true)
332 {
333 WorkItem* newWork;
334 newWork = m_workQueue.getWork();
335
336 try
337 {
338 newWork->doWork();
339 }
340 catch(Exception& e)
341 {
342 newWork->saveException(e.clone());
343 }
344 newWork->signalDone();
345 }
346
347 // A return (never reached) to make various compilers happy.
348 return 0;
349 }
350
351 ProcessRef ProcessThread::spawn(char const * exec_path, char const * const argv[],
352 char const * const envp[], Exec::PreExec & pre_exec)
353 {
354 SpawnWorkItemPtr newWork(new SpawnWorkItem(exec_path, argv, envp, pre_exec));
355 m_workQueue.addWork(newWork.getPtr());
356
357 ProcessRef result = newWork->waitTillDone();
358
359 Exception* err = newWork->getException();
360 if(err != 0)
361 {
362 err->rethrow();
363 }
364
365 return result;
366 }
367
368 Process::Status ProcessThread::waitPid(const ProcId& pid)
369 {
370 WaitpidWorkItemPtr newWork(new WaitpidWorkItem(pid));
371 m_workQueue.addWork(newWork.getPtr());
372
373 Process::Status result = newWork->waitTillDone();
374
375 Exception* err = newWork->getException();
376 if(err != 0)
377 {
378 err->rethrow();
379 }
380
381 return result;
382 }
383
384
385 void initThread()
386 {
387 // create the worker thread
388 g_processThread = new ProcessThread();
389 g_processThread->start();
390 }
391
392 Thread_t getWorkerThreadId()
393 {
394 callOnce(g_initThreadGuard, initThread);
395 return g_processThread->getId();
396 }
397
398} // namespace (anon)
399
400
// NOTE(review): the first line of this signature (return type, function name
// and the exec_path parameter) is missing from this listing. Presumably this
// is spawnProcess(), as named in the symbol index at the end of the listing
// -- confirm, including any namespace qualification.
//
// Runs initThread() through callOnce(), then forwards the request to the
// worker thread's spawn() and returns its result.
402 char const * const argv[], char const * const envp[], Exec::PreExec & pre_exec)
403{
404 callOnce(g_initThreadGuard, initThread);
405 return g_processThread->spawn(exec_path, argv, envp, pre_exec);
406}
407
// NOTE(review): the signature line that should precede this body is missing
// from this listing. Presumably it is the public waitPid(const ProcId& pid)
// named in the symbol index at the end of the listing -- confirm, including
// any namespace qualification.
//
// Runs initThread() through callOnce(), then forwards the request to the
// worker thread's waitPid() and returns its result.
409{
410 callOnce(g_initThreadGuard, initThread);
411 return g_processThread->waitPid(pid);
412}
413
414} //namespace BLOCXX_NAMESPACE
415
This class is used to specify what spawn() should do between fork and exec.
Definition Exec.hpp:106
Portable process status.
Definition Process.hpp:123
ProcessRef spawn(char const *exec_path, char const *const argv[], char const *const envp[], PreExec &pre_exec)
Run the executable exec_path in a child process, with argv for the program arguments and envp for the...
Definition Exec.cpp:133
bool sameThreads(const volatile Thread_t &handle1, const volatile Thread_t &handle2)
Check two platform-dependent thread types for equality.
BLOCXX_COMMON_API ProcessRef spawnProcess(char const *exec_path, char const *const argv[], char const *const envp[], Exec::PreExec &pre_exec)
BLOCXX_COMMON_API Process::Status waitPid(const ProcId &pid)
BLOCXX_COMMON_API bool setWaitpidThreadFixEnabled(bool enabled)
If a program is single threaded (like the monitor code is), then this function can be called to ensur...
BLOCXX_COMMON_API bool shouldUseWaitpidThreadFix()
Taken from RFC 1321.
void BLOCXX_COMMON_API callOnce(OnceFlag &flag, FuncT F)
The first time callOnce is called with a given onceFlag argument, it calls func with no argument and ...
Process::Status pollStatusImpl(ProcId pid)
Definition Process.cpp:637
IntrusiveReference< Process > ProcessRef