39#include "blocxx/BLOCXX_config.h"
41#if !defined(BLOCXX_WIN32)
56#ifdef BLOCXX_HAVE_UNISTD_H
59#include <sys/socket.h>
66#if defined(BLOCXX_DARWIN)
70#include <sys/utsname.h>
85 }
while (rc < 0 && errno == EINTR);
90 BLOCXX_LOG_ERROR(lgr, Format(
"Closing pipe handle %1 failed: %2", fd, lerrno));
95 ::ssize_t upread(
int fd,
void * buf, std::size_t count)
101 rv = ::read(fd, buf, count);
102 }
while (rv < 0 && errno == EINTR);
106 ::ssize_t upwrite(
int fd,
void const * buf, std::size_t count)
110 SignalScope ss(SIGPIPE, SIG_IGN);
114 rv = ::write(fd, buf, count);
115 }
while (rv < 0 && errno == EINTR);
119 int upaccept(
int s,
struct sockaddr * addr,
socklen_t * addrlen)
124 rv = ::accept(s, addr, addrlen);
125 }
while (rv < 0 && errno == EINTR);
130 E_WRITE_PIPE, E_READ_PIPE
135 void setKernelBufferSize(
Descriptor sockfd,
int bufsz, EDirection edir)
142 int optname = (edir == E_WRITE_PIPE ? SO_SNDBUF : SO_RCVBUF);
145 socklen_t getbufsz_len =
sizeof(getbufsz);
148 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, (
char*)&getbufsz, &getbufsz_len);
150 int errc = ::getsockopt(sockfd, SOL_SOCKET, optname, &getbufsz, &getbufsz_len);
152 if (errc == 0 && getbufsz < bufsz)
155 ::setsockopt(sockfd, SOL_SOCKET, optname, (
char*)&bufsz,
sizeof(bufsz));
157 ::setsockopt(sockfd, SOL_SOCKET, optname, &bufsz,
sizeof(bufsz));
164 int const BUFSZ = 64 * 1024;
165 setKernelBufferSize(sockfd_read, BUFSZ, E_READ_PIPE);
166 setKernelBufferSize(sockfd_write, BUFSZ, E_WRITE_PIPE);
171#if defined(BLOCXX_DARWIN)
174 bool needDescriptorPassingWorkaround =
true;
177 OnceFlag detectDescriptorPassingBugFlag = BLOCXX_ONCE_INIT;
180 void detectDescriptorPassingBug()
183 needDescriptorPassingWorkaround =
true;
187 struct utsname unamerv;
188 if (::uname(&unamerv) == -1)
190 needDescriptorPassingWorkaround =
true;
193 String release(unamerv.release);
194 PosixRegEx re(
"([^.]*)\\..*");
196 if (releaseCapture.size() < 2)
198 needDescriptorPassingWorkaround =
true;
201 String majorRelease = releaseCapture[1];
204 needDescriptorPassingWorkaround = (majorRelease.toInt32() < 9);
206 catch (StringConversionException& e)
208 needDescriptorPassingWorkaround =
true;
223 AcceptThread(
int serversock)
224 : m_serversock(serversock)
229 void acceptConnection();
230 int getConnectFD() {
return m_serverconn; }
237AcceptThread::acceptConnection()
239 struct sockaddr_in sin;
244 ::setsockopt(m_serversock, IPPROTO_TCP, 1,
245 (
char*) &tmp,
sizeof(
int));
247 val =
sizeof(
struct sockaddr_in);
248 if ((m_serverconn = upaccept(m_serversock, (
struct sockaddr*)&sin, &val))
254 ::setsockopt(m_serverconn, IPPROTO_TCP, 1,
255 (
char *) &tmp,
sizeof(
int));
257 ::setsockopt(m_serverconn, SOL_SOCKET, SO_KEEPALIVE,
258 (
char*) &tmp,
sizeof(
int));
262runConnClass(
void* arg)
264 AcceptThread* acceptThread = (AcceptThread*)(arg);
265 acceptThread->acceptConnection();
266 ::pthread_exit(NULL);
273 int svrfd, lerrno, connectfd;
275 struct sockaddr_in sin;
277 svrfd = socket( AF_INET, SOCK_STREAM, 0 );
278 sin.sin_family = AF_INET;
279 sin.sin_addr.s_addr = htonl( 0x7f000001 );
281 memset(sin.sin_zero, 0, 8 );
282 if (bind(svrfd, (
struct sockaddr * )&sin,
sizeof(
struct sockaddr_in ) ) == -1)
286 fprintf(stderr,
"CreateSocket(): Failed to bind on socket" );
289 if (listen(svrfd, 1) == -1)
295 val =
sizeof(
struct sockaddr_in);
296 if (getsockname(svrfd, (
struct sockaddr * )&sin, &val ) == -1)
299 fprintf(stderr,
"CreateSocket(): Failed to obtain socket name" );
304 AcceptThread* pat =
new AcceptThread(svrfd);
308 pthread_create(&athread, NULL, runConnClass, pat);
310 int clientfd = socket(AF_INET, SOCK_STREAM, 0);
318 struct sockaddr_in csin;
319 csin.sin_family = AF_INET;
320 csin.sin_addr.s_addr = htonl(0x7f000001);
321 csin.sin_port = sin.sin_port;
322 if (::connect(clientfd, (
struct sockaddr*)&csin,
sizeof(csin)) == -1)
333 ::setsockopt(clientfd, IPPROTO_TCP, TCP_NODELAY, (
char*)&tmp,
sizeof(
int));
335 ::setsockopt(clientfd, SOL_SOCKET, SO_KEEPALIVE, (
char*)&tmp,
sizeof(
int));
339 ::pthread_join(athread, &threadResult);
342 fds[0] = pat->getConnectFD();
384 void set_desc_blocking(
385 int d, EBlockingMode & bmflag, EBlockingMode blocking_mode)
388 int fdflags = fcntl(d, F_GETFL, 0);
395 fdflags &= ~O_NONBLOCK;
399 fdflags |= O_NONBLOCK;
401 if (fcntl(d, F_SETFL, fdflags) == -1)
405 bmflag = blocking_mode;
414 for (
size_t i = 0;
i < 2; ++
i)
442#if defined(BLOCXX_NETWARE)
450 if (::socketpair(AF_UNIX, SOCK_STREAM, 0,
m_fds) == -1)
455 ::shutdown(
m_fds[0], SHUT_WR);
456 ::shutdown(
m_fds[1], SHUT_RD);
475 rc = upclose(
m_fds[0]);
481 rc = upclose(
m_fds[1]);
503 rc = upclose(
m_fds[0]);
518 rc = upclose(
m_fds[1]);
550 rc = upwrite(
m_fds[1], data, dataLen);
591 rc = upread(
m_fds[0], buffer, bufferLen);
647 rc = blocxx::passDescriptor(
m_fds[1], descriptor);
653#if defined(BLOCXX_DARWIN)
654 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
655 if (rc != -1 && needDescriptorPassingWorkaround)
713 descriptor = blocxx::receiveDescriptor(
m_fds[0]);
715#if defined(BLOCXX_DARWIN)
716 callOnce(detectDescriptorPassingBugFlag, detectDescriptorPassingBug);
717 if (needDescriptorPassingWorkaround)
#define BLOCXX_ASSERT(CON)
BLOCXX_ASSERT works similarly to the assert() macro, but instead of calling abort(),...
#define BLOCXX_THROW(exType, msg)
Throw an exception using __FILE__ and __LINE__.
#define BLOCXX_THROW_ERRNO_MSG(exType, msg)
Throw an exception using __FILE__, __LINE__, errno and strerror(errno).
#define BLOCXX_GLOBAL_STRING_INIT(str)
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
#define BLOCXX_INVALID_HANDLE
PURPOSE: The AutoResource class template is an analog of std::auto_ptr for managing arbitrary resourc...
handle_type get() const
Return handle of resource, retaining ownership.
handle_type release()
Relinquish ownership of resource and return its handle.
virtual int write(const void *dataOut, int dataOutLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Write a specified number of bytes to the device that is exposing the IOIFC interface.
virtual int read(void *dataIn, int dataInLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)=0
Read a specified number of bytes from the device that is exposing the IOIFC interface.
virtual int read(void *buffer, int bufferLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Read a specified number of bytes from the device that is exposing the IOIFC interface.
virtual Select_t getWriteSelectObj() const
Get the write select object.
virtual void setBlocking(EBlockingMode outputIsBlocking=E_BLOCKING)
Set the pipe's blocking mode.
virtual void setReadBlocking(EBlockingMode isBlocking=E_BLOCKING)
Set blocking mode for reading from pipe.
virtual void setWriteBlocking(EBlockingMode isBlocking=E_BLOCKING)
Set blocking mode for writing to pipe.
virtual ~PosixUnnamedPipe()
virtual int close()
Close the pipe.
virtual EBlockingMode getReadBlocking() const
Get the current blocking mode for reading from pipe.
virtual bool isOpen() const
Is the pipe open or closed?
virtual EBlockingMode getWriteBlocking() const
Get the current blocking mode for writing to pipe.
virtual int write(const void *data, int dataLen, ErrorAction errorAsException=E_RETURN_ON_ERROR)
Write a specified number of bytes to the device that is exposing the IOIFC interface.
virtual Select_t getReadSelectObj() const
Get the read select object.
virtual AutoDescriptor receiveDescriptor(const UnnamedPipeRef &ackPipe)
Gets a Descriptor from the peer.
virtual int closeOutputHandle()
virtual Descriptor getOutputDescriptor() const
Get the underlying output descriptor.
virtual Descriptor getInputDescriptor() const
Get the underlying input descriptor.
virtual void open()
Open the pipe.
virtual int closeInputHandle()
virtual void passDescriptor(Descriptor h, const UnnamedPipeRef &ackPipe=0, const ProcessRef &targetProcess=0)
Sends a Descriptor to the peer.
EBlockingMode m_blocking[2]
PosixUnnamedPipe(EOpen doOpen=E_OPEN)
static void testCancel()
Test if this thread has been cancelled.
static Timeout relative(float seconds)
void setTimeouts(const Timeout &timeout)
Sets the read & write timeout values.
virtual Descriptor getInputDescriptor() const =0
Get the underlying input descriptor.
Timeout getReadTimeout()
Gets the read timeout value.
virtual Descriptor getOutputDescriptor() const =0
Get the underlying output descriptor.
Timeout getWriteTimeout()
Gets the write timeout value.
GlobalString COMPONENT_NAME
int waitForIO(SocketHandle_t fd, int timeOutSecs, SocketFlags::EWaitDirectionFlag waitFlag)
Wait for input or output on a socket.
void BLOCXX_COMMON_API callOnce(OnceFlag &flag, FuncT F)
The first time callOnce is called with a given onceFlag argument, it calls func with no argument and ...
LazyGlobal< String, char const *const > GlobalString
Array< String > StringArray
class BLOCXX_COMMON_API Logger