📄 ptio.c
字号:
} else wrp = NULL; rv = select(op->arg1.osfd + 1, rdp, wrp, NULL, &tv); if (self->state & PT_THREAD_ABORTED) { self->state &= ~PT_THREAD_ABORTED; op->result.code = -1; op->syserrno = EINTR; op->status = pt_continuation_done; return; } if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN))) continue; /* go around the loop again */ if (rv > 0) { PRInt16 revents = 0; if ((op->event & POLLIN) && FD_ISSET(op->arg1.osfd, &rd)) revents |= POLLIN; if ((op->event & POLLOUT) && FD_ISSET(op->arg1.osfd, &wr)) revents |= POLLOUT; if (op->function(op, revents)) op->status = pt_continuation_done; } else if (rv == -1) { op->result.code = -1; op->syserrno = errno; op->status = pt_continuation_done; } /* else, select timed out */ } while (pt_continuation_done != op->status); break; default: now = epoch = PR_IntervalNow(); remaining = op->timeout; do { PRIntn rv; if (op->event & POLLIN) { FD_ZERO(&rd); FD_SET(op->arg1.osfd, &rd); rdp = &rd; } else rdp = NULL; if (op->event & POLLOUT) { FD_ZERO(&wr); FD_SET(op->arg1.osfd, &wr); wrp = &wr; } else wrp = NULL; wait_for_remaining = PR_TRUE; msecs = (PRInt32)PR_IntervalToMilliseconds(remaining); if (msecs > PT_DEFAULT_POLL_MSEC) { wait_for_remaining = PR_FALSE; msecs = PT_DEFAULT_POLL_MSEC; } tv.tv_sec = msecs/PR_MSEC_PER_SEC; tv.tv_usec = (msecs % PR_MSEC_PER_SEC) * PR_USEC_PER_MSEC; rv = select(op->arg1.osfd + 1, rdp, wrp, NULL, &tv); if (self->state & PT_THREAD_ABORTED) { self->state &= ~PT_THREAD_ABORTED; op->result.code = -1; op->syserrno = EINTR; op->status = pt_continuation_done; return; } if (rv > 0) { PRInt16 revents = 0; if ((op->event & POLLIN) && FD_ISSET(op->arg1.osfd, &rd)) revents |= POLLIN; if ((op->event & POLLOUT) && FD_ISSET(op->arg1.osfd, &wr)) revents |= POLLOUT; if (op->function(op, revents)) op->status = pt_continuation_done; } else if ((rv == 0) || ((errno == EINTR) || (errno == EAGAIN))) { if (rv == 0) { /* select timed out */ if (wait_for_remaining) now += remaining; else now += PR_MillisecondsToInterval(msecs); } 
else now = PR_IntervalNow(); elapsed = (PRIntervalTime) (now - epoch); if (elapsed >= op->timeout) { op->result.code = -1; op->syserrno = ETIMEDOUT; op->status = pt_continuation_done; } else remaining = op->timeout - elapsed; } else { op->result.code = -1; op->syserrno = errno; op->status = pt_continuation_done; } } while (pt_continuation_done != op->status); break; }} /* pt_poll_now_with_select */#endif /* _PR_POLL_WITH_SELECT */static void pt_poll_now(pt_Continuation *op){ PRInt32 msecs; PRIntervalTime epoch, now, elapsed, remaining; PRBool wait_for_remaining; PRThread *self = PR_GetCurrentThread(); PR_ASSERT(PR_INTERVAL_NO_WAIT != op->timeout);#if defined (_PR_POLL_WITH_SELECT) /* * If the fd is small enough call the select-based poll operation */ if (op->arg1.osfd < FD_SETSIZE) { pt_poll_now_with_select(op); return; }#endif switch (op->timeout) { case PR_INTERVAL_NO_TIMEOUT: msecs = PT_DEFAULT_POLL_MSEC; do { PRIntn rv; struct pollfd tmp_pfd; tmp_pfd.revents = 0; tmp_pfd.fd = op->arg1.osfd; tmp_pfd.events = op->event; rv = poll(&tmp_pfd, 1, msecs); if (self->state & PT_THREAD_ABORTED) { self->state &= ~PT_THREAD_ABORTED; op->result.code = -1; op->syserrno = EINTR; op->status = pt_continuation_done; return; } if ((-1 == rv) && ((errno == EINTR) || (errno == EAGAIN))) continue; /* go around the loop again */ if (rv > 0) { PRInt16 events = tmp_pfd.events; PRInt16 revents = tmp_pfd.revents; if ((revents & POLLNVAL) /* busted in all cases */ || ((events & POLLOUT) && (revents & POLLHUP))) /* write op & hup */ { op->result.code = -1; if (POLLNVAL & revents) op->syserrno = EBADF; else if (POLLHUP & revents) op->syserrno = EPIPE; op->status = pt_continuation_done; } else { if (op->function(op, revents)) op->status = pt_continuation_done; } } else if (rv == -1) { op->result.code = -1; op->syserrno = errno; op->status = pt_continuation_done; } /* else, poll timed out */ } while (pt_continuation_done != op->status); break; default: now = epoch = PR_IntervalNow(); 
remaining = op->timeout; do { PRIntn rv; struct pollfd tmp_pfd; tmp_pfd.revents = 0; tmp_pfd.fd = op->arg1.osfd; tmp_pfd.events = op->event; wait_for_remaining = PR_TRUE; msecs = (PRInt32)PR_IntervalToMilliseconds(remaining); if (msecs > PT_DEFAULT_POLL_MSEC) { wait_for_remaining = PR_FALSE; msecs = PT_DEFAULT_POLL_MSEC; } rv = poll(&tmp_pfd, 1, msecs); if (self->state & PT_THREAD_ABORTED) { self->state &= ~PT_THREAD_ABORTED; op->result.code = -1; op->syserrno = EINTR; op->status = pt_continuation_done; return; } if (rv > 0) { PRInt16 events = tmp_pfd.events; PRInt16 revents = tmp_pfd.revents; if ((revents & POLLNVAL) /* busted in all cases */ || ((events & POLLOUT) && (revents & POLLHUP))) /* write op & hup */ { op->result.code = -1; if (POLLNVAL & revents) op->syserrno = EBADF; else if (POLLHUP & revents) op->syserrno = EPIPE; op->status = pt_continuation_done; } else { if (op->function(op, revents)) { op->status = pt_continuation_done; } } } else if ((rv == 0) || ((errno == EINTR) || (errno == EAGAIN))) { if (rv == 0) /* poll timed out */ { if (wait_for_remaining) now += remaining; else now += PR_MillisecondsToInterval(msecs); } else now = PR_IntervalNow(); elapsed = (PRIntervalTime) (now - epoch); if (elapsed >= op->timeout) { op->result.code = -1; op->syserrno = ETIMEDOUT; op->status = pt_continuation_done; } else remaining = op->timeout - elapsed; } else { op->result.code = -1; op->syserrno = errno; op->status = pt_continuation_done; } } while (pt_continuation_done != op->status); break; }} /* pt_poll_now */static PRIntn pt_Continue(pt_Continuation *op){ op->status = pt_continuation_pending; /* set default value */ /* * let each thread call poll directly */ pt_poll_now(op); PR_ASSERT(pt_continuation_done == op->status); return op->result.code;} /* pt_Continue *//*****************************************************************************//*********************** specific continuation functions 
*********************//*****************************************************************************/static PRBool pt_connect_cont(pt_Continuation *op, PRInt16 revents){ op->syserrno = _MD_unix_get_nonblocking_connect_error(op->arg1.osfd); if (op->syserrno != 0) { op->result.code = -1; } else { op->result.code = 0; } return PR_TRUE; /* this one is cooked */} /* pt_connect_cont */static PRBool pt_accept_cont(pt_Continuation *op, PRInt16 revents){ op->syserrno = 0; op->result.code = accept( op->arg1.osfd, op->arg2.buffer, op->arg3.addr_len); if (-1 == op->result.code) { op->syserrno = errno; if (EWOULDBLOCK == errno || EAGAIN == errno || ECONNABORTED == errno) return PR_FALSE; /* do nothing - this one ain't finished */ } return PR_TRUE;} /* pt_accept_cont */static PRBool pt_read_cont(pt_Continuation *op, PRInt16 revents){ /* * Any number of bytes will complete the operation. It need * not (and probably will not) satisfy the request. The only * error we continue is EWOULDBLOCK|EAGAIN. */ op->result.code = read( op->arg1.osfd, op->arg2.buffer, op->arg3.amount); op->syserrno = errno; return ((-1 == op->result.code) && (EWOULDBLOCK == op->syserrno || EAGAIN == op->syserrno)) ? PR_FALSE : PR_TRUE;} /* pt_read_cont */static PRBool pt_recv_cont(pt_Continuation *op, PRInt16 revents){ /* * Any number of bytes will complete the operation. It need * not (and probably will not) satisfy the request. The only * error we continue is EWOULDBLOCK|EAGAIN. */#if defined(SOLARIS) if (0 == op->arg4.flags) op->result.code = read( op->arg1.osfd, op->arg2.buffer, op->arg3.amount); else op->result.code = recv( op->arg1.osfd, op->arg2.buffer, op->arg3.amount, op->arg4.flags);#else op->result.code = recv( op->arg1.osfd, op->arg2.buffer, op->arg3.amount, op->arg4.flags);#endif op->syserrno = errno; return ((-1 == op->result.code) && (EWOULDBLOCK == op->syserrno || EAGAIN == op->syserrno)) ? 
PR_FALSE : PR_TRUE;} /* pt_recv_cont */static PRBool pt_send_cont(pt_Continuation *op, PRInt16 revents){ PRIntn bytes;#if defined(SOLARIS) PRInt32 tmp_amount = op->arg3.amount;#endif /* * We want to write the entire amount out, no matter how many * tries it takes. Keep advancing the buffer and the decrementing * the amount until the amount goes away. Return the total bytes * (which should be the original amount) when finished (or an * error). */#if defined(SOLARIS)retry: bytes = write(op->arg1.osfd, op->arg2.buffer, tmp_amount);#else bytes = send( op->arg1.osfd, op->arg2.buffer, op->arg3.amount, op->arg4.flags);#endif op->syserrno = errno;#if defined(SOLARIS) /* * The write system call has been reported to return the ERANGE error * on occasion. Try to write in smaller chunks to workaround this bug. */ if ((bytes == -1) && (op->syserrno == ERANGE)) { if (tmp_amount > 1) { tmp_amount = tmp_amount/2; /* half the bytes */ goto retry; } }#endif if (bytes >= 0) /* this is progress */ { char *bp = (char*)op->arg2.buffer; bp += bytes; /* adjust the buffer pointer */ op->arg2.buffer = bp; op->result.code += bytes; /* accumulate the number sent */ op->arg3.amount -= bytes; /* and reduce the required count */ return (0 == op->arg3.amount) ? PR_TRUE : PR_FALSE; } else if ((EWOULDBLOCK != op->syserrno) && (EAGAIN != op->syserrno)) { op->result.code = -1; return PR_TRUE; } else return PR_FALSE;} /* pt_send_cont */static PRBool pt_write_cont(pt_Continuation *op, PRInt16 revents){ PRIntn bytes; /* * We want to write the entire amount out, no matter how many * tries it takes. Keep advancing the buffer and the decrementing * the amount until the amount goes away. Return the total bytes * (which should be the original amount) when finished (or an * error). 
*/ bytes = write(op->arg1.osfd, op->arg2.buffer, op->arg3.amount); op->syserrno = errno; if (bytes >= 0) /* this is progress */ { char *bp = (char*)op->arg2.buffer; bp += bytes; /* adjust the buffer pointer */ op->arg2.buffer = bp; op->result.code += bytes; /* accumulate the number sent */ op->arg3.amount -= bytes; /* and reduce the required count */ return (0 == op->arg3.amount) ? PR_TRUE : PR_FALSE; } else if ((EWOULDBLOCK != op->syserrno) && (EAGAIN != op->syserrno)) { op->result.code = -1; return PR_TRUE; } else return PR_FALSE;} /* pt_write_cont */static PRBool pt_writev_cont(pt_Continuation *op, PRInt16 revents){ PRIntn bytes; struct iovec *iov = (struct iovec*)op->arg2.buffer; /* * Same rules as write, but continuing seems to be a bit more * complicated. As the number of bytes sent grows, we have to * redefine the vector we're pointing at. We might have to * modify an individual vector parms or we might have to eliminate * a pair altogether. */ bytes = writev(op->arg1.osfd, iov, op->arg3.amount); op->syserrno = errno; if (bytes >= 0) /* this is progress */ { PRIntn iov_index; op->result.code += bytes; /* accumulate the number sent */ for (iov_index = 0; iov_index < op->arg3.amount; ++iov_index) { /* how much progress did we make in the i/o vector? */ if (bytes < iov[iov_index].iov_len) { /* this element's not done yet */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -