📄 ncbi_namedpipe.cpp
字号:
DWORD x_timeout = TimeoutToMSec(timeout); DWORD bytes_written = 0; // Wait a data from the pipe with timeout. // NOTE: The function WaitForSingleObject() do not work with pipe. do { if (!WriteFile(m_Pipe, (char*)buf, count, &bytes_written, NULL)) { if ( n_written ) { *n_written = bytes_written; } throw string("Failed to write data into the named pipe"); } if ( bytes_written ) { break; } DWORD x_sleep = kSleepTime; if (x_timeout != INFINITE) { if (x_timeout < kSleepTime) { x_sleep = x_timeout; } x_timeout -= x_sleep; } SleepMilliSec(x_sleep); } while (x_timeout == INFINITE || x_timeout); if ( !bytes_written ) { return eIO_Timeout; } if ( n_written ) { *n_written = bytes_written; } status = eIO_Success; } catch (string& what) { ERR_POST(s_FormatErrorMessage("Write", what)); } m_WriteStatus = status; return status;}EIO_Status CNamedPipeHandle::Wait(EIO_Event, const STimeout*){ return eIO_Success;}EIO_Status CNamedPipeHandle::Status(EIO_Event direction) const{ switch ( direction ) { case eIO_Read: return m_ReadStatus; case eIO_Write: return m_WriteStatus; default: // Should never get here assert(0); break; } return eIO_InvalidArg;}// Convert STimeout value to number of millisecondslong CNamedPipeHandle::TimeoutToMSec(const STimeout* timeout) const{ return timeout ? timeout->sec * 1000 + timeout->usec / 1000 : INFINITE;}#elif defined(NCBI_OS_UNIX)////////////////////////////////////////////////////////////////////////////////// CNamedPipeHandle -- Unix version//// The maximum length the queue of pending connections may grow toconst int kListenQueueSize = 32;class CNamedPipeHandle{public: CNamedPipeHandle(void); ~CNamedPipeHandle(void); // client-side EIO_Status Open(const string& pipename, const STimeout* timeout, size_t pipebufsize); // server-side EIO_Status Create(const string& pipename, size_t pipebufsize); EIO_Status Listen(const STimeout* timeout); EIO_Status Disconnect(void); // common EIO_Status Close(void); EIO_Status Read (void* buf, size_t count, size_t* n_read, const STimeout* timeout); EIO_Status Write(const void* buf, size_t count, size_t* n_written, const STimeout* timeout); EIO_Status Wait(EIO_Event event, const STimeout* timeout); EIO_Status Status(EIO_Event direction) const;private: // Close socket persistently bool x_CloseSocket(int sock); // Set socket i/o buffer size (dir: SO_SNDBUF, SO_RCVBUF) bool x_SetSocketBufSize(int sock, size_t pipebufsize, int dir);private: int m_LSocket; // listening socket SOCK m_IoSocket; // I/O socket size_t m_PipeBufSize; // pipe buffer size};CNamedPipeHandle::CNamedPipeHandle(void) : m_LSocket(-1), m_IoSocket(0), m_PipeBufSize(0){ return;}CNamedPipeHandle::~CNamedPipeHandle(void){ Close();}EIO_Status CNamedPipeHandle::Open(const string& pipename, const STimeout* timeout, size_t pipebufsize){ EIO_Status status = eIO_Unknown; struct sockaddr_un addr; int sock = -1; try { if (m_LSocket >= 0 || m_IoSocket) { throw string("Pipe is already open"); } if (sizeof(addr.sun_path) <= pipename.length()) { status = eIO_InvalidArg; throw "Pipe name too long: \"" + pipename + '"'; } m_PipeBufSize = pipebufsize; // Create a UNIX socket if ((sock = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { throw string("UNIX socket() failed: ") + strerror(errno); } // Set buffer size if ( m_PipeBufSize ) { if ( !x_SetSocketBufSize(sock, m_PipeBufSize, SO_SNDBUF) || !x_SetSocketBufSize(sock, m_PipeBufSize, SO_RCVBUF) ) { throw string("UNIX socket set buffer size failed: ") + strerror(errno); } } // Set non-blocking mode if (fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK) == -1) { throw string("UNIX socket set to non-blocking failed: ") + strerror(errno); } // Connect to server memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX;#ifdef HAVE_SIN_LEN addr.sun_len = (socklen_t) sizeof(addr);#endif strcpy(addr.sun_path, pipename.c_str()); int n; int x_errno = 0; // Auto-resume if interrupted by a signal for (n = 0; ; n = 1) { if (connect(sock, (struct sockaddr*) &addr, sizeof(addr)) == 0) { break; } x_errno = errno; if (x_errno != EINTR) { break; } } // If not connected if ( x_errno ) { if ((n != 0 || x_errno != EINPROGRESS) && (n == 0 || x_errno != EALREADY) && x_errno != EWOULDBLOCK) { if (x_errno == EINTR) { status = eIO_Interrupt; } throw "UNIX socket connect(\"" + pipename + "\") failed: " + strerror(x_errno); } if (!timeout || timeout->sec || timeout->usec) { // Wait for socket to connect (if timeout is set or infinite) for (;;) { // Auto-resume if interrupted by a signal struct timeval* tmp; struct timeval tm; if ( !timeout ) { // NB: Timeout has been normalized already tm.tv_sec = timeout->sec; tm.tv_usec = timeout->usec; tmp = &tm; } else tmp = 0; fd_set wfds; fd_set efds; FD_ZERO(&wfds); FD_ZERO(&efds); FD_SET(sock, &wfds); FD_SET(sock, &efds); n = select(sock + 1, 0, &wfds, &efds, tmp); if (n == 0) { x_CloseSocket(sock); Close(); return eIO_Timeout; } if (n > 0) { if ( FD_ISSET(sock, &wfds) ) { break; } assert( FD_ISSET(sock, &efds) ); } if (n < 0 && errno == EINTR) { continue; } throw string("UNIX socket select() failed: ") + strerror(errno); } // Check connection x_errno = 0; socklen_t x_len = (socklen_t) sizeof(x_errno); if ((getsockopt(sock, SOL_SOCKET, SO_ERROR, &x_errno, &x_len) != 0 || x_errno != 0)) { throw string("UNIX socket getsockopt() failed: ") + strerror(x_errno ? x_errno : errno); } if (x_errno == ECONNREFUSED) { status = eIO_Closed; throw "Connection refused in \"" + pipename + '"'; } } } // Create I/O socket if (SOCK_CreateOnTop(&sock, sizeof(sock), &m_IoSocket) != eIO_Success){ throw string("UNIX socket cannot convert to SOCK"); } } catch (string& what) { if (sock >= 0) { x_CloseSocket(sock); } Close(); ERR_POST(s_FormatErrorMessage("Open", what)); return status; } return eIO_Success;}EIO_Status CNamedPipeHandle::Create(const string& pipename, size_t pipebufsize){ EIO_Status status = eIO_Unknown; struct sockaddr_un addr; try { if (m_LSocket >= 0 || m_IoSocket) { throw string("Pipe is already open"); } if (sizeof(addr.sun_path) <= pipename.length()) { status = eIO_InvalidArg; throw "Pipe name too long: \"" + pipename + '"'; } m_PipeBufSize = pipebufsize; // Create a UNIX socket if ((m_LSocket = socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { throw string("UNIX socket() failed: ") + strerror(errno); } // Remove any pre-existing socket (or other file) if (unlink(pipename.c_str()) != 0 && errno != ENOENT) { throw "UNIX socket unlink(\"" + pipename + "\") failed: " + strerror(errno); } // Bind socket memset(&addr, 0, sizeof(addr)); addr.sun_family = AF_UNIX;#ifdef HAVE_SIN_LEN addr.sun_len = (socklen_t) sizeof(addr);#endif strcpy(addr.sun_path, pipename.c_str()); mode_t u = umask(0); if (bind(m_LSocket, (struct sockaddr*) &addr, sizeof(addr)) != 0) { umask(u); throw "UNIX socket bind(\"" + pipename + "\") failed: " + strerror(errno); } umask(u);#ifndef NCBI_OS_IRIX fchmod(m_LSocket, S_IRWXU | S_IRWXG | S_IRWXO);#endif // Listen for connections on a socket if (listen(m_LSocket, kListenQueueSize) != 0) { throw "UNIX socket listen(\"" + pipename + "\") failed: " + strerror(errno); } if (fcntl(m_LSocket, F_SETFL, fcntl(m_LSocket, F_GETFL, 0) | O_NONBLOCK) == -1) { throw string("UNIX socket set to non-blocking failed: ") + strerror(errno); } } catch (string& what) { Close(); ERR_POST(s_FormatErrorMessage("Create", what)); return status; } return eIO_Success;}EIO_Status CNamedPipeHandle::Listen(const STimeout* timeout){ EIO_Status status = eIO_Unknown; int sock = -1; try { if (m_LSocket < 0) { status = eIO_Closed; throw string("Pipe is closed"); } // Wait for the client to connect for (;;) { // Auto-resume if interrupted by a signal struct timeval* tmp; struct timeval tm; if (timeout) { // NB: Timeout has been normalized already tm.tv_sec = timeout->sec; tm.tv_usec = timeout->usec; tmp = &tm; } else { tmp = 0; } fd_set rfds; fd_set efds; FD_ZERO(&rfds); FD_ZERO(&efds); FD_SET(m_LSocket, &rfds); FD_SET(m_LSocket, &efds); int n = select(m_LSocket + 1, &rfds, 0, &efds, tmp); if (n == 0) { return eIO_Timeout; } if (n > 0) { if ( FD_ISSET(m_LSocket, &rfds) ) break; assert( FD_ISSET(m_LSocket, &efds) ); } if (n < 0 && errno == EINTR) { continue; } throw string("UNIX socket select() failed: ") + strerror(errno); } // Can accept next connection from the list of waiting ones struct sockaddr_un addr; socklen_t addrlen = (socklen_t) sizeof(addr); memset(&addr, 0, sizeof(addr));# ifdef HAVE_SIN_LEN addr.sun_len = sizeof(addr);# endif if ((sock = accept(m_LSocket, (struct sockaddr*)&addr, &addrlen)) < 0){ throw string("UNIX socket accept() failed: ") + strerror(errno); } // Set buffer size if ( m_PipeBufSize ) { if ( !x_SetSocketBufSize(sock, m_PipeBufSize, SO_SNDBUF) || !x_SetSocketBufSize(sock, m_PipeBufSize, SO_RCVBUF) ) { throw string("UNIX socket set buffer size failed: ") + strerror(errno); } } // Create new I/O socket if (SOCK_CreateOnTop(&sock, sizeof(sock), &m_IoSocket) != eIO_Success){ throw string("UNIX socket cannot convert to SOCK"); } } catch (string& what) { if (sock >= 0) { x_CloseSocket(sock); } Close(); ERR_POST(s_FormatErrorMessage("Listen", what)); return status; } return eIO_Success;}EIO_Status CNamedPipeHandle::Disconnect(void){ if ( !m_IoSocket ) { return eIO_Closed; } // Close I/O socket EIO_Status status = SOCK_Close(m_IoSocket); m_IoSocket = 0; return status;}EIO_Status CNamedPipeHandle::Close(void){ // Disconnect current client EIO_Status status = Disconnect(); // Close listening socket
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -