blocxx
Select.cpp
Go to the documentation of this file.
1/*******************************************************************************
2* Copyright (C) 2005, Vintela, Inc. All rights reserved.
3* Copyright (C) 2006, Novell, Inc. All rights reserved.
4*
5* Redistribution and use in source and binary forms, with or without
6* modification, are permitted provided that the following conditions are met:
7*
8* * Redistributions of source code must retain the above copyright notice,
9* this list of conditions and the following disclaimer.
10* * Redistributions in binary form must reproduce the above copyright
11* notice, this list of conditions and the following disclaimer in the
12* documentation and/or other materials provided with the distribution.
13* * Neither the name of
14* Vintela, Inc.,
15* nor Novell, Inc.,
16* nor the names of its contributors or employees may be used to
17* endorse or promote products derived from this software without
18* specific prior written permission.
19*
20* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30* POSSIBILITY OF SUCH DAMAGE.
31*******************************************************************************/
32
33
38
39#include "blocxx/BLOCXX_config.h"
40#include "blocxx/Select.hpp"
41#include "blocxx/AutoPtr.hpp"
42#include "blocxx/Assertion.hpp"
43#include "blocxx/Thread.hpp" // for testCancel()
46
47#if defined(BLOCXX_WIN32)
48#include <cassert>
49#endif
50
51extern "C"
52{
53
54#ifndef BLOCXX_WIN32
55 #ifdef BLOCXX_HAVE_SYS_EPOLL_H
56 #include <sys/epoll.h>
57 #endif
58 #if defined (BLOCXX_HAVE_SYS_POLL_H)
59 #include <sys/poll.h>
60 #endif
61 #if defined (BLOCXX_HAVE_SYS_SELECT_H)
62 #include <sys/select.h>
63 #endif
64#endif
65
66#ifdef BLOCXX_HAVE_SYS_TIME_H
67 #include <sys/time.h>
68#endif
69
70#include <sys/types.h>
71
72#ifdef BLOCXX_HAVE_UNISTD_H
73 #include <unistd.h>
74#endif
75
76#include <errno.h>
77}
78
79namespace BLOCXX_NAMESPACE
80{
81
82namespace Select
83{
84
namespace
{
	// Value handed to the timer as maxWaitSec by the epoll/poll/select loops
	// in this file, i.e. the cap applied to a single blocking wait before
	// the loop comes around again.
	float const LOOP_TIMEOUT = 10.0;
}
90// deprecated in 4.0.0
91int
92selectRW(SelectObjectArray& selarray, UInt32 ms)
93{
94 return selectRW(selarray, Timeout::relative(static_cast<float>(ms) * 1000));
95}
96
97#if defined(BLOCXX_WIN32)
99int
100selectRW(SelectObjectArray& selarray, const Timeout& timeout)
101{
102 int rc;
103 size_t hcount = static_cast<DWORD>(selarray.size());
104 AutoPtrVec<HANDLE> hdls(new HANDLE[hcount]);
105
106 size_t handleidx = 0;
107 for (size_t i = 0; i < selarray.size(); i++, handleidx++)
108 {
109 if(selarray[i].s.isSocket && selarray[i].s.networkevents)
110 {
111 ::WSAEventSelect(selarray[i].s.sockfd,
112 selarray[i].s.event, selarray[i].s.networkevents);
113 }
114
115 hdls[handleidx] = selarray[i].s.event;
116 }
117
118 TimeoutTimer timer(timeout);
119 timer.start();
120 DWORD cc = ::WaitForMultipleObjects(hcount, hdls.get(), FALSE, timer.asDWORDMs());
121
122 assert(cc != WAIT_ABANDONED);
123
124 switch (cc)
125 {
126 case WAIT_FAILED:
128 break;
129 case WAIT_TIMEOUT:
131 break;
132 default:
133 rc = cc - WAIT_OBJECT_0;
134
135 // If this is a socket, set it back to
136 // blocking mode
137 if(selarray[rc].s.isSocket)
138 {
139 if(selarray[rc].s.networkevents
140 && selarray[rc].s.doreset == false)
141 {
142 ::WSAEventSelect(selarray[rc].s.sockfd,
143 selarray[rc].s.event, selarray[rc].s.networkevents);
144 }
145 else
146 {
147 // Set socket back to blocking
148 ::WSAEventSelect(selarray[rc].s.sockfd,
149 selarray[rc].s.event, 0);
150 u_long ioctlarg = 0;
151 ::ioctlsocket(selarray[rc].s.sockfd, FIONBIO, &ioctlarg);
152 }
153 }
154 break;
155 }
156
157 if( rc < 0 )
158 return rc;
159
160 int availableCount = 0;
161 for (size_t i = 0; i < selarray.size(); i++)
162 {
163 if( WaitForSingleObject(selarray[i].s.event, 0) == WAIT_OBJECT_0 )
164 {
165 if( selarray[i].waitForRead )
166 selarray[i].readAvailable = true;
167 if( selarray[i].waitForWrite )
168 selarray[i].writeAvailable = true;
169 ++availableCount;
170 }
171 else
172 {
173 selarray[i].readAvailable = false;
174 selarray[i].writeAvailable = false;
175 }
176 }
177 return availableCount;
178}
179
180
181#else
182
184// epoll version
185int
186selectRWEpoll(SelectObjectArray& selarray, const Timeout& timeout)
187{
188#ifdef BLOCXX_HAVE_SYS_EPOLL_H
189 int ecc = 0;
190 AutoPtrVec<epoll_event> events(new epoll_event[selarray.size()]);
191 AutoDescriptor epfd(epoll_create(selarray.size()));
192 if(epfd.get() == -1)
193 {
194 if (errno == ENOSYS) // kernel doesn't support it
195 {
197 }
198 // Need to return something else?
200 }
201
202 UInt32 const read_events = EPOLLIN | EPOLLPRI | EPOLLERR | EPOLLHUP;
203 UInt32 const write_events = EPOLLOUT | EPOLLERR | EPOLLHUP;
204 for (size_t i = 0; i < selarray.size(); i++)
205 {
206 BLOCXX_ASSERT(selarray[i].s >= 0);
207 selarray[i].readAvailable = false;
208 selarray[i].writeAvailable = false;
209 selarray[i].wasError = false;
210 events[i].data = epoll_data_t(); // zero-init to make valgrind happy
211 events[i].data.u32 = i;
212 events[i].events = 0;
213 if(selarray[i].waitForRead)
214 {
215 events[i].events |= read_events;
216 }
217 if(selarray[i].waitForWrite)
218 {
219 events[i].events |= write_events;
220 }
221
222 if(epoll_ctl(epfd.get(), EPOLL_CTL_ADD, selarray[i].s, &events[i]) != 0)
223 {
224 return errno == EPERM ? SELECT_NOT_IMPLEMENTED : SELECT_ERROR;
225 }
226 }
227
228 // here we spin checking for thread cancellation every so often.
229
230 TimeoutTimer timer(timeout);
231 timer.start();
232 int savedErrno;
233 do
234 {
236 const float maxWaitSec = LOOP_TIMEOUT;
237 ecc = epoll_wait(epfd.get(), events.get(), selarray.size(), timer.asIntMs(maxWaitSec));
238 savedErrno = errno;
239 if (ecc < 0 && errno == EINTR)
240 {
241 ecc = 0;
242 errno = 0;
244 }
245 timer.loop();
246 } while ((ecc == 0) && !timer.expired());
247
248 if (ecc < 0)
249 {
250 errno = savedErrno;
252 }
253 if (ecc == 0)
254 {
256 }
257
258 for(int i = 0; i < ecc; i++)
259 {
260 SelectObject & so = selarray[events[i].data.u32];
261 so.readAvailable = so.waitForRead && (events[i].events & read_events);
262 so.writeAvailable = so.waitForWrite && (events[i].events & write_events);
263 }
264
265 return ecc;
266#else
268#endif
269}
270
272// poll() version
273int
274selectRWPoll(SelectObjectArray& selarray, const Timeout& timeout)
275{
276#if defined (BLOCXX_HAVE_SYS_POLL_H)
277 int rc = 0;
278
279 AutoPtrVec<pollfd> pfds(new pollfd[selarray.size()]);
280
281 // here we spin checking for thread cancellation every so often.
282 TimeoutTimer timer(timeout);
283 timer.start();
284
285 int savedErrno;
286 do
287 {
288 for (size_t i = 0; i < selarray.size(); i++)
289 {
290 BLOCXX_ASSERT(selarray[i].s >= 0);
291 selarray[i].readAvailable = false;
292 selarray[i].writeAvailable = false;
293 selarray[i].wasError = false;
294 pfds[i].revents = 0;
295 pfds[i].fd = selarray[i].s;
296 pfds[i].events = selarray[i].waitForRead ? (POLLIN | POLLPRI) : 0;
297 if(selarray[i].waitForWrite)
298 pfds[i].events |= POLLOUT;
299 }
300
302 const float maxWaitSec = LOOP_TIMEOUT;
303 rc = ::poll(pfds.get(), selarray.size(), timer.asIntMs(maxWaitSec));
304 savedErrno = errno;
305 if (rc < 0 && errno == EINTR)
306 {
307 rc = 0;
308 errno = 0;
310#ifdef BLOCXX_NETWARE
311 // When the NetWare server is shutting down, select will
312 // set errno to EINTR on return. If this thread does not
313 // yield control (cooperative multitasking) then we end
314 // up in a very tight loop and get a CPUHog server abbend.
315 pthread_yield();
316#endif
317 }
318
319 timer.loop();
320 } while ((rc == 0) && !timer.expired());
321
322 if (rc < 0)
323 {
324 errno = savedErrno;
326 }
327 if (rc == 0)
328 {
330 }
331 for (size_t i = 0; i < selarray.size(); i++)
332 {
333 if (pfds[i].revents & (POLLERR | POLLNVAL))
334 {
335 selarray[i].wasError = true;
336 }
337
338 if(selarray[i].waitForRead)
339 {
340 selarray[i].readAvailable = (pfds[i].revents &
341 (POLLIN | POLLPRI | POLLHUP));
342 }
343
344 if(selarray[i].waitForWrite)
345 {
346 selarray[i].writeAvailable = (pfds[i].revents &
347 (POLLOUT | POLLHUP));
348 }
349 }
350
351 return rc;
352#else
354#endif
355}
356
357// ::select() version
358int
359selectRWSelect(SelectObjectArray& selarray, const Timeout& timeout)
360{
361#if defined (BLOCXX_HAVE_SYS_SELECT_H)
362 int rc = 0;
363 fd_set ifds;
364 fd_set ofds;
365
366 // here we spin checking for thread cancellation every so often.
367 TimeoutTimer timer(timeout);
368 timer.start();
369
370 int savedErrno;
371 do
372 {
373 int maxfd = 0;
374 FD_ZERO(&ifds);
375 FD_ZERO(&ofds);
376 for (size_t i = 0; i < selarray.size(); ++i)
377 {
378 int fd = selarray[i].s;
379 BLOCXX_ASSERT(fd >= 0);
380 if (maxfd < fd)
381 {
382 maxfd = fd;
383 }
384 if (fd < 0 || fd >= FD_SETSIZE)
385 {
386 errno = EINVAL;
388 }
389 if (selarray[i].waitForRead)
390 {
391 FD_SET(fd, &ifds);
392 }
393 if (selarray[i].waitForWrite)
394 {
395 FD_SET(fd, &ofds);
396 }
397 }
398
400 struct timeval tv;
401 const float maxWaitSec = LOOP_TIMEOUT;
402 rc = ::select(maxfd+1, &ifds, &ofds, NULL, timer.asTimeval(tv, maxWaitSec));
403 savedErrno = errno;
404 if (rc < 0 && errno == EINTR)
405 {
406 rc = 0;
407 errno = 0;
409#ifdef BLOCXX_NETWARE
410 // When the NetWare server is shutting down, select will
411 // set errno to EINTR on return. If this thread does not
412 // yield control (cooperative multitasking) then we end
413 // up in a very tight loop and get a CPUHog server abbend.
414 pthread_yield();
415#endif
416 }
417
418 timer.loop();
419 } while ((rc == 0) && !timer.expired());
420
421 if (rc < 0)
422 {
423 errno = savedErrno;
425 }
426 if (rc == 0)
427 {
429 }
430 int availableCount = 0;
431 int cval;
432 for (size_t i = 0; i < selarray.size(); i++)
433 {
434 selarray[i].wasError = false;
435 cval = 0;
436 if (FD_ISSET(selarray[i].s, &ifds))
437 {
438 selarray[i].readAvailable = true;
439 cval = 1;
440 }
441 else
442 {
443 selarray[i].readAvailable = false;
444 }
445
446 if (FD_ISSET(selarray[i].s, &ofds))
447 {
448 selarray[i].writeAvailable = true;
449 cval = 1;
450 }
451 else
452 {
453 selarray[i].writeAvailable = false;
454 }
455
456 availableCount += cval;
457
458 }
459
460 return availableCount;
461#else
463#endif
464}
465
466int
467selectRW(SelectObjectArray& selarray, const Timeout& timeout)
468{
469 int rv = selectRWEpoll(selarray, timeout);
470 if (rv != SELECT_NOT_IMPLEMENTED)
471 {
472 return rv;
473 }
474
475 rv = selectRWPoll(selarray, timeout);
476 if (rv != SELECT_NOT_IMPLEMENTED)
477 {
478 return rv;
479 }
480
481 rv = selectRWSelect(selarray, timeout);
483 return rv;
484}
485
487#endif // #else BLOCXX_WIN32
488
489int
490select(const SelectTypeArray& selarray, UInt32 ms)
491{
492 return select(selarray, Timeout::relative(static_cast<float>(ms) * 1000.0));
493}
494
496int
497select(const SelectTypeArray& selarray, const Timeout& timeout)
498{
500 soa.reserve(selarray.size());
501 for (size_t i = 0; i < selarray.size(); ++i)
502 {
503 SelectObject curObj(selarray[i]);
504 curObj.waitForRead = true;
505 soa.push_back(curObj);
506 }
507 int rv = selectRW(soa, timeout);
508 if (rv < 0)
509 {
510 return rv;
511 }
512
513 // find the first selected object
514 for (size_t i = 0; i < soa.size(); ++i)
515 {
516 if (soa[i].readAvailable)
517 {
518 return i;
519 }
520 }
521 errno = 0;
522 return SELECT_ERROR;
523}
524
525} // end namespace Select
526
527} // end namespace BLOCXX_NAMESPACE
528
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similarly to the assert() macro, but instead of calling abort(),...
Definition Assertion.hpp:57
void push_back(const T &x)
Append an element to the end of the Array.
size_type size() const
void reserve(size_type n)
Ensure the capacity is at least the size of a given value.
The AutoPtrVec class provides a simple class for smart pointers to a dynamically allocated array of o...
Definition AutoPtr.hpp:185
handle_type get() const
Return handle of resource, retaining ownership.
static void testCancel()
Test if this thread has been cancelled.
Definition Thread.cpp:432
A timeout can be absolute, which means that it will happen at the specified DateTime.
Definition Timeout.hpp:56
static Timeout relative(float seconds)
Definition Timeout.cpp:58
A TimeoutTimer is used by an algorithm to determine when a timeout has expired.
bool expired() const
Indicates whether the last loop time has exceeded the timeout.
void start()
Meant to be called by timeout functions which loop.
void loop()
Meant to be called by timeout functions which loop, but don't want to reset the interval.
int selectRWSelect(SelectObjectArray &selarray, const Timeout &timeout)
Definition Select.cpp:359
Array< SelectObject > SelectObjectArray
Definition Select.hpp:113
int selectRWPoll(SelectObjectArray &selarray, const Timeout &timeout)
Definition Select.cpp:274
int select(const SelectTypeArray &selarray, UInt32 ms)
Select returns as soon as input is available on any of Select_t objects that are in given array.
Definition Select.cpp:490
const int SELECT_TIMEOUT
The value returned from select when the timeout value has expired.
Definition Select.hpp:59
const int SELECT_NOT_IMPLEMENTED
Used internally, but listed here to prevent conflicts.
Definition Select.hpp:67
int selectRW(SelectObjectArray &selarray, UInt32 ms)
Definition Select.cpp:92
int selectRWEpoll(SelectObjectArray &selarray, const Timeout &timeout)
Definition Select.cpp:186
const int SELECT_ERROR
The value returned from select when any error occurs other than timeout.
Definition Select.hpp:63
Taken from RFC 1321.
Array< Select_t > SelectTypeArray
Definition Select.hpp:53
AutoResource< AutoDescriptorPolicy > AutoDescriptor
An analog of std::auto_ptr for descriptors.
bool waitForRead
Input parameter. Set it to true to indicate that waiting for read availability on s is desired.
Definition Select.hpp:103
bool writeAvailable
Output parameter. Will be set to true to indicate that s has become available for writing.
Definition Select.hpp:109
bool waitForWrite
Input parameter. Set it to true to indicate that waiting for write availability on s is desired.
Definition Select.hpp:105
bool readAvailable
Output parameter. Will be set to true to indicate that s has become available for reading.
Definition Select.hpp:107