📄 ncbi_pipe.cpp
字号:
}EIO_Status CPipeHandle::Write(const void* buf, size_t count, size_t* n_written, const STimeout* timeout){ EIO_Status status = eIO_Unknown; try { if (m_ProcHandle == INVALID_HANDLE_VALUE) { status = eIO_Closed; throw string("Pipe is closed"); } if (m_ChildStdIn == INVALID_HANDLE_VALUE) { throw string("Pipe I/O handle is closed"); } if ( !count ) { return eIO_Success; } DWORD x_timeout = x_TimeoutToMSec(timeout); DWORD bytes_written = 0; // Wait for data from the pipe with timeout. // NOTE: The function WaitForSingleObject() does not work with pipes. do { if ( !WriteFile(m_ChildStdIn, (char*)buf, count, &bytes_written, NULL) ) { if ( n_written ) { *n_written = bytes_written; } throw string("Failed to write data into pipe"); } if ( bytes_written ) { break; } DWORD x_sleep = kSleepTime; if (x_timeout != INFINITE) { if (x_timeout < kSleepTime) { x_sleep = x_timeout; } x_timeout -= x_sleep; } SleepMilliSec(x_sleep); } while (x_timeout == INFINITE || x_timeout); if ( !bytes_written ) { return eIO_Timeout; } if ( n_written ) { *n_written = bytes_written; } status = eIO_Success; } catch (string& what) { ERR_POST(s_FormatErrorMessage("Write", what)); } return status;}void CPipeHandle::x_Clear(void){ if (m_ChildStdIn != INVALID_HANDLE_VALUE) { ::CloseHandle(m_ChildStdIn); m_ChildStdIn = INVALID_HANDLE_VALUE; } if (m_ChildStdOut != INVALID_HANDLE_VALUE) { ::CloseHandle(m_ChildStdOut); m_ChildStdOut = INVALID_HANDLE_VALUE; } if (m_ChildStdErr != INVALID_HANDLE_VALUE) { ::CloseHandle(m_ChildStdErr); m_ChildStdErr = INVALID_HANDLE_VALUE; } m_ProcHandle = INVALID_HANDLE_VALUE; m_Pid = 0;}HANDLE CPipeHandle::x_GetHandle(CPipe::EChildIOHandle from_handle) const{ switch (from_handle) { case CPipe::eStdIn: return m_ChildStdIn; case CPipe::eStdOut: return m_ChildStdOut; case CPipe::eStdErr: return m_ChildStdErr; } return INVALID_HANDLE_VALUE;}long CPipeHandle::x_TimeoutToMSec(const STimeout* timeout) const{ return timeout ? (timeout->sec * 1000) + (timeout->usec / 1000) : INFINITE;}bool CPipeHandle::x_SetNonBlockingMode(HANDLE fd, bool nonblock) const{ // Pipe is in the byte-mode. // NOTE: We cannot get a state of a pipe handle opened for writing. // We cannot set a state of a pipe handle opened for reading. DWORD state = nonblock ? PIPE_READMODE_BYTE | PIPE_NOWAIT : PIPE_READMODE_BYTE; return SetNamedPipeHandleState(fd, &state, NULL, NULL) != 0; }#elif defined(NCBI_OS_UNIX)////////////////////////////////////////////////////////////////////////////////// CPipeHandle -- Unix version//class CPipeHandle{public: CPipeHandle(); ~CPipeHandle(); EIO_Status Open(const string& cmd, const vector<string>& args, CPipe::TCreateFlags create_flags); EIO_Status Close(int* exitcode, const STimeout* timeout); EIO_Status CloseHandle (CPipe::EChildIOHandle handle); EIO_Status Read(void* buf, size_t count, size_t* read, const CPipe::EChildIOHandle from_handle, const STimeout* timeout); EIO_Status Write(const void* buf, size_t count, size_t* written, const STimeout* timeout); TProcessHandle GetProcessHandle(void) const { return m_Pid; };private: // Clear object state. void x_Clear(void); // Get child's I/O handle. int x_GetHandle(CPipe::EChildIOHandle from_handle) const; // Trigger blocking mode on specified I/O handle. bool x_SetNonBlockingMode(int fd, bool nonblock = true) const; // Wait on the file descriptor I/O. // Return eIO_Success when "fd" is found ready for the I/O. // Return eIO_Timeout, if timeout expired before I/O became available. // Throw an exception on error. EIO_Status x_Wait(int fd, EIO_Event direction, const STimeout* timeout) const;private: // I/O handles for child process. int m_ChildStdIn; int m_ChildStdOut; int m_ChildStdErr; // Child process pid. TPid m_Pid; // Pipe flags CPipe::TCreateFlags m_Flags;};CPipeHandle::CPipeHandle() : m_ChildStdIn(-1), m_ChildStdOut(-1), m_ChildStdErr(-1), m_Pid((pid_t)(-1)), m_Flags(0){ return;}CPipeHandle::~CPipeHandle(){ static const STimeout kZeroTimeout = {0, 0}; Close(0, &kZeroTimeout); x_Clear();}EIO_Status CPipeHandle::Open(const string& cmd, const vector<string>& args, CPipe::TCreateFlags create_flags){ x_Clear(); bool need_delete_handles = false; int fd_pipe_in[2], fd_pipe_out[2], fd_pipe_err[2]; m_Flags = create_flags; // Child process I/O handles fd_pipe_in[0] = -1; fd_pipe_out[1] = -1; fd_pipe_err[1] = -1; try { if (m_Pid != (pid_t)(-1)) { throw string("Pipe is already open"); } need_delete_handles = true; // Create pipe for child's stdin assert(CPipe::fStdIn_Close); if ( !IS_SET(create_flags, CPipe::fStdIn_Close) ) { if (pipe(fd_pipe_in) == -1) { throw string("Failed to create pipe for stdin"); } m_ChildStdIn = fd_pipe_in[1]; x_SetNonBlockingMode(m_ChildStdIn); } // Create pipe for child's stdout assert(CPipe::fStdOut_Close); if ( !IS_SET(create_flags, CPipe::fStdOut_Close) ) { if (pipe(fd_pipe_out) == -1) { throw string("Failed to create pipe for stdout"); } fflush(stdout); m_ChildStdOut = fd_pipe_out[0]; x_SetNonBlockingMode(m_ChildStdOut); } // Create pipe for child's stderr assert(CPipe::fStdErr_Open); if ( IS_SET(create_flags, CPipe::fStdErr_Open) ) { if (pipe(fd_pipe_err) == -1) { throw string("Failed to create pipe for stderr"); } fflush(stderr); m_ChildStdErr = fd_pipe_err[0]; x_SetNonBlockingMode(m_ChildStdErr); } // Fork child process switch (m_Pid = fork()) { case (pid_t)(-1): // Fork failed throw string("Failed to fork process"); case 0: // Now we are in the child process int status = -1; // Bind child's standard I/O file handles to pipe assert(CPipe::fStdIn_Close); if ( !IS_SET(create_flags, CPipe::fStdIn_Close) ) { if (dup2(fd_pipe_in[0], STDIN_FILENO) < 0) { _exit(status); } close(fd_pipe_in[0]); close(fd_pipe_in[1]); } else { fclose(stdin); } assert(CPipe::fStdOut_Close); if ( !IS_SET(create_flags, CPipe::fStdOut_Close) ) { if (dup2(fd_pipe_out[1], STDOUT_FILENO) < 0) { _exit(status); } close(fd_pipe_out[0]); close(fd_pipe_out[1]); } else { fclose(stdout); } assert(!CPipe::fStdErr_Close); if ( IS_SET(create_flags, CPipe::fStdErr_Open) ) { if (dup2(fd_pipe_err[1], STDERR_FILENO) < 0) { _exit(status); } close(fd_pipe_err[0]); close(fd_pipe_err[1]); } else { fclose(stderr); } // Prepare program arguments size_t cnt = args.size(); size_t i = 0; const char** x_args = new const char*[cnt + 2]; typedef ArrayDeleter<const char*> TArgsDeleter; AutoPtr<const char*, TArgsDeleter> p_args = x_args; ITERATE (vector<string>, arg, args) { x_args[++i] = arg->c_str(); } x_args[0] = cmd.c_str(); x_args[cnt + 1] = 0; // Execute the program status = execvp(cmd.c_str(), const_cast<char**> (x_args)); _exit(status); } // Close unused pipe handles assert(CPipe::fStdIn_Close); if ( !IS_SET(create_flags, CPipe::fStdIn_Close) ) { close(fd_pipe_in[0]); } assert(CPipe::fStdOut_Close); if ( !IS_SET(create_flags, CPipe::fStdOut_Close) ) { close(fd_pipe_out[1]); } assert(!CPipe::fStdErr_Close); if ( IS_SET(create_flags, CPipe::fStdErr_Open) ) { close(fd_pipe_err[1]); } return eIO_Success; } catch (string& what) { if ( need_delete_handles ) { close(fd_pipe_in[0]); close(fd_pipe_out[1]); close(fd_pipe_err[1]); } // Close all opened file descriptors (close timeout doesn't apply here) Close(0, 0); ERR_POST(s_FormatErrorMessage("Open", what)); return eIO_Unknown; }}EIO_Status CPipeHandle::Close(int* exitcode, const STimeout* timeout){ EIO_Status status = eIO_Unknown; unsigned long x_timeout = 1; int x_options = 0; int x_exitcode = -1; if (m_Pid == (pid_t)(-1)) { status = eIO_Closed; } else { // If timeout is not infinite if ( timeout ) { x_timeout = (timeout->sec * 1000) + (timeout->usec / 1000); x_options = WNOHANG; } // Retry if interrupted by signal for (;;) { pid_t ws = waitpid(m_Pid, &x_exitcode, x_options); if (ws > 0) { // Process has terminated status = eIO_Success; break; } else if (ws == 0) { // Process is still running assert(timeout); if ( !x_timeout ) { status = eIO_Timeout; break; } unsigned long x_sleep = kSleepTime; if (x_timeout < kSleepTime) { x_sleep = x_timeout; } x_timeout -= x_sleep; SleepMilliSec(x_sleep); } else { // Some error if (errno != EINTR) { break; } } } } // Is the process running? if (status == eIO_Timeout) { x_exitcode = -1; assert(CPipe::fKillOnClose); if ( IS_SET(m_Flags, CPipe::fKillOnClose) ) { status = CProcess(m_Pid, CProcess::ePid).Kill() ? eIO_Success : eIO_Unknown; } // Active by default. assert(!CPipe::fCloseOnClose); if ( !IS_SET(m_Flags, CPipe::fCloseOnClose) ) { status = eIO_Success; } // fKeepOnClose -- nothing to do. assert(CPipe::fKeepOnClose); } // Is the process still running? Nothing to do. if (status != eIO_Timeout) { x_Clear(); } if ( exitcode ) { // Get real exit code or -1 on error *exitcode = (status == eIO_Success && x_exitcode != -1) ? WEXITSTATUS(x_exitcode) : -1; } return status;}EIO_Status CPipeHandle::CloseHandle(CPipe::EChildIOHandle handle){ switch ( handle ) { case CPipe::eStdIn: if (m_ChildStdIn == -1) { return eIO_Closed; } close(m_ChildStdIn); m_ChildStdIn = -1; break; case CPipe::eStdOut: if (m_ChildStdOut == -1) { return eIO_Closed; } close(m_ChildStdOut); m_ChildStdOut = -1; break; case CPipe::eStdErr: if (m_ChildStdErr == -1) { return eIO_Closed; } close(m_ChildStdErr); m_ChildStdErr = -1; break; default: return eIO_InvalidArg; } return eIO_Success;}EIO_Status CPipeHandle::Read(void* buf, size_t count, size_t* n_read, const CPipe::EChildIOHandle from_handle, const STimeout* timeout){ EIO_Status status = eIO_Unknown; try { if (m_Pid == (pid_t)(-1)) { status = eIO_Closed; throw string("Pipe is closed"); } int fd = x_GetHandle(from_handle); if (fd == -1) { throw string("Pipe I/O handle is closed"); } if ( !count ) { return eIO_Success; } // Retry if either blocked or interrupted for (;;) { // Try to read ssize_t bytes_read = read(fd, buf, count); if (bytes_read >= 0) { if ( n_read ) { *n_read = (size_t)bytes_read; } status = bytes_read ? eIO_Success : eIO_Closed; break; } // Blocked -- wait for data to come; exit if timeout/error if (errno == EAGAIN || errno == EWOULDBLOCK) { status = x_Wait(fd, eIO_Read, timeout); if (status != eIO_Success) { break; } continue; } // Interrupted read -- restart if (errno != EINTR) { throw string("Failed to read data from pipe"); } } } catch (string& what) { ERR_POST(s_FormatErrorMessage("Read", what)); } return status;}EIO_Status CPipeHandle::Write(const void* buf, size_t count, size_t* n_written, const STimeout* timeout){ EIO_Status status = eIO_Unknown; try { if (m_Pid == (pid_t)(-1)) { status = eIO_Closed; throw string("Pipe is closed"); } if (m_ChildStdIn == -1) { throw string("Pipe I/O handle is closed"); } if ( !count ) { return eIO_Success;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -