📄 pth_high.c
字号:
if (rfds != NULL) memcpy(rfds, &rspare, sizeof(fd_set)); if (wfds != NULL) memcpy(wfds, &wspare, sizeof(fd_set)); if (efds != NULL) memcpy(efds, &espare, sizeof(fd_set)); return rc; } /* suspend current thread until one filedescriptor is ready or the timeout occurred */ rc = -1; ev = ev_select = pth_event(PTH_EVENT_SELECT|PTH_MODE_STATIC, &ev_key_select, &rc, nfd, rfds, wfds, efds); ev_timeout = NULL; if (timeout != NULL) { ev_timeout = pth_event(PTH_EVENT_TIME|PTH_MODE_STATIC, &ev_key_timeout, pth_timeout(timeout->tv_sec, timeout->tv_usec)); pth_event_concat(ev, ev_timeout, NULL); } if (ev_extra != NULL) pth_event_concat(ev, ev_extra, NULL); pth_wait(ev); if (ev_extra != NULL) pth_event_isolate(ev_extra); if (timeout != NULL) pth_event_isolate(ev_timeout); /* select return code semantics */ if (pth_event_status(ev_select) == PTH_STATUS_FAILED) return pth_error(-1, EBADF); selected = FALSE; if (pth_event_status(ev_select) == PTH_STATUS_OCCURRED) selected = TRUE; if ( timeout != NULL && pth_event_status(ev_timeout) == PTH_STATUS_OCCURRED) { selected = TRUE; /* POSIX.1-2001/SUSv3 compliance */ if (rfds != NULL) FD_ZERO(rfds); if (wfds != NULL) FD_ZERO(wfds); if (efds != NULL) FD_ZERO(efds); rc = 0; } if (ev_extra != NULL && !selected) return pth_error(-1, EINTR); return rc;}/* Pth variant of pth_pselect(2) */int pth_pselect(int nfds, fd_set *rfds, fd_set *wfds, fd_set *efds, const struct timespec *ts, const sigset_t *mask){ sigset_t omask; struct timeval tv; struct timeval *tvp; int rv; /* convert timeout */ if (ts != NULL) { tv.tv_sec = ts->tv_sec; tv.tv_usec = ts->tv_nsec / 1000; tvp = &tv; } else tvp = NULL; /* optionally set signal mask */ if (mask != NULL) if (pth_sc(sigprocmask)(SIG_SETMASK, mask, &omask) < 0) return pth_error(-1, errno); rv = pth_select(nfds, rfds, wfds, efds, tvp); /* optionally set signal mask */ if (mask != NULL) pth_shield { pth_sc(sigprocmask)(SIG_SETMASK, &omask, NULL); } return rv;}/* Pth variant of poll(2) */int pth_poll(struct pollfd *pfd, nfds_t nfd, int timeout){ return pth_poll_ev(pfd, nfd, timeout, NULL);}/* Pth variant of poll(2) with extra events: NOTICE: THIS HAS TO BE BASED ON pth_select(2) BECAUSE INTERNALLY THE SCHEDULER IS ONLY select(2) BASED!! */int pth_poll_ev(struct pollfd *pfd, nfds_t nfd, int timeout, pth_event_t ev_extra){ fd_set rfds, wfds, efds, xfds; struct timeval tv, *ptv; int maxfd, rc, n; unsigned int i; char data[64]; pth_implicit_init(); pth_debug2("pth_poll_ev: called from thread \"%s\"", pth_current->name); /* argument sanity checks */ if (pfd == NULL) return pth_error(-1, EFAULT); if (nfd < 0 || nfd > FD_SETSIZE) return pth_error(-1, EINVAL); /* convert timeout number into a timeval structure */ ptv = &tv; if (timeout == 0) { /* return immediately */ ptv->tv_sec = 0; ptv->tv_usec = 0; } else if (timeout == INFTIM /* (-1) */) { /* wait forever */ ptv = NULL; } else if (timeout > 0) { /* return after timeout */ ptv->tv_sec = (timeout / 1000); ptv->tv_usec = (timeout % 1000) * 1000; } else return pth_error(-1, EINVAL); /* create fd sets and determine max fd */ maxfd = -1; FD_ZERO(&rfds); FD_ZERO(&wfds); FD_ZERO(&efds); FD_ZERO(&xfds); for (i = 0; i < nfd; i++) { /* convert into fd_sets but remember that BSD select(2) says "the only exceptional condition detectable is out-of-band data received on a socket", hence we push POLLWRBAND events onto wfds instead of efds. Additionally, remember invalid filedescriptors in an extra fd_set xfds. */ if (!pth_util_fd_valid(pfd[i].fd)) { FD_SET(pfd[i].fd, &xfds); continue; } if (pfd[i].events & (POLLIN|POLLRDNORM)) FD_SET(pfd[i].fd, &rfds); if (pfd[i].events & (POLLOUT|POLLWRNORM|POLLWRBAND)) FD_SET(pfd[i].fd, &wfds); if (pfd[i].events & (POLLPRI|POLLRDBAND)) FD_SET(pfd[i].fd, &efds); if ( pfd[i].fd >= maxfd && (pfd[i].events & (POLLIN|POLLOUT|POLLPRI| POLLRDNORM|POLLRDBAND| POLLWRNORM|POLLWRBAND))) maxfd = pfd[i].fd; } /* examine fd sets with pth_select(3) */ rc = -1; if (maxfd != -1) { rc = pth_select_ev(maxfd+1, &rfds, &wfds, &efds, ptv, ev_extra); if (rc < 0) return pth_error(-1, errno); else if (rc == 0) return 0; } /* POSIX.1-2001/SUSv3 compliant result establishment */ n = 0; for (i = 0; i < nfd; i++) { pfd[i].revents = 0; if (FD_ISSET(pfd[i].fd, &xfds)) { if (pfd[i].fd >= 0) { pfd[i].revents |= POLLNVAL; n++; } continue; } if (maxfd == -1) continue; if (FD_ISSET(pfd[i].fd, &rfds)) { if (pfd[i].events & POLLIN) pfd[i].revents |= POLLIN; if (pfd[i].events & POLLRDNORM) pfd[i].revents |= POLLRDNORM; n++; /* support for POLLHUP */ if ( recv(pfd[i].fd, data, sizeof(data), MSG_PEEK) == -1 && ( errno == ESHUTDOWN || errno == ECONNRESET || errno == ECONNABORTED || errno == ENETRESET )) { pfd[i].revents &= ~(POLLIN); pfd[i].revents &= ~(POLLRDNORM); pfd[i].revents |= POLLHUP; } } else if (FD_ISSET(pfd[i].fd, &wfds)) { if (pfd[i].events & POLLOUT) pfd[i].revents |= POLLOUT; if (pfd[i].events & POLLWRNORM) pfd[i].revents |= POLLWRNORM; if (pfd[i].events & POLLWRBAND) pfd[i].revents |= POLLWRBAND; n++; } else if (FD_ISSET(pfd[i].fd, &efds)) { if (pfd[i].events & POLLPRI) pfd[i].revents |= POLLPRI; if (pfd[i].events & POLLRDBAND) pfd[i].revents |= POLLRDBAND; n++; } } return n;}/* Pth variant of connect(2) */int pth_connect(int s, const struct sockaddr *addr, socklen_t addrlen){ return pth_connect_ev(s, addr, addrlen, NULL);}/* Pth variant of connect(2) with extra events */int pth_connect_ev(int s, const struct sockaddr *addr, socklen_t addrlen, pth_event_t ev_extra){ pth_event_t ev; static pth_key_t ev_key = PTH_KEY_INIT; int rv, err; socklen_t errlen; int fdmode; pth_implicit_init(); pth_debug2("pth_connect_ev: enter from thread \"%s\"", pth_current->name); /* POSIX compliance */ if (!pth_util_fd_valid(s)) return pth_error(-1, EBADF); /* force filedescriptor into non-blocking mode */ if ((fdmode = pth_fdmode(s, PTH_FDMODE_NONBLOCK)) == PTH_FDMODE_ERROR) return pth_error(-1, EBADF); /* try to connect */ while ( (rv = pth_sc(connect)(s, (struct sockaddr *)addr, addrlen)) == -1 && errno == EINTR) ; /* restore filedescriptor mode */ pth_shield { pth_fdmode(s, fdmode); } /* if it is still on progress wait until socket is really writeable */ if (rv == -1 && errno == EINPROGRESS && fdmode != PTH_FDMODE_NONBLOCK) { if ((ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_WRITEABLE|PTH_MODE_STATIC, &ev_key, s)) == NULL) return pth_error(-1, errno); if (ev_extra != NULL) pth_event_concat(ev, ev_extra, NULL); pth_wait(ev); if (ev_extra != NULL) { pth_event_isolate(ev); if (pth_event_status(ev) != PTH_STATUS_OCCURRED) return pth_error(-1, EINTR); } errlen = sizeof(err); if (getsockopt(s, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen) == -1) return -1; if (err == 0) return 0; return pth_error(rv, err); } pth_debug2("pth_connect_ev: leave to thread \"%s\"", pth_current->name); return rv;}/* Pth variant of accept(2) */int pth_accept(int s, struct sockaddr *addr, socklen_t *addrlen){ return pth_accept_ev(s, addr, addrlen, NULL);}/* Pth variant of accept(2) with extra events */int pth_accept_ev(int s, struct sockaddr *addr, socklen_t *addrlen, pth_event_t ev_extra){ pth_event_t ev; static pth_key_t ev_key = PTH_KEY_INIT; int fdmode; int rv; pth_implicit_init(); pth_debug2("pth_accept_ev: enter from thread \"%s\"", pth_current->name); /* POSIX compliance */ if (!pth_util_fd_valid(s)) return pth_error(-1, EBADF); /* force filedescriptor into non-blocking mode */ if ((fdmode = pth_fdmode(s, PTH_FDMODE_NONBLOCK)) == PTH_FDMODE_ERROR) return pth_error(-1, EBADF); /* poll socket via accept */ ev = NULL; while ((rv = pth_sc(accept)(s, addr, addrlen)) == -1 && (errno == EAGAIN || errno == EWOULDBLOCK) && fdmode != PTH_FDMODE_NONBLOCK) { /* do lazy event allocation */ if (ev == NULL) { if ((ev = pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE|PTH_MODE_STATIC, &ev_key, s)) == NULL) return pth_error(-1, errno); if (ev_extra != NULL) pth_event_concat(ev, ev_extra, NULL); } /* wait until accept has a chance */ pth_wait(ev); /* check for the extra events */ if (ev_extra != NULL) { pth_event_isolate(ev); if (pth_event_status(ev) != PTH_STATUS_OCCURRED) { pth_fdmode(s, fdmode); return pth_error(-1, EINTR); } } } /* restore filedescriptor mode */ pth_shield { pth_fdmode(s, fdmode); if (rv != -1) pth_fdmode(rv, fdmode); } pth_debug2("pth_accept_ev: leave to thread \"%s\"", pth_current->name); return rv;}/* Pth variant of read(2) */ssize_t pth_read(int fd, void *buf, size_t nbytes){ return pth_read_ev(fd, buf, nbytes, NULL);}/* Pth variant of read(2) with extra event(s) */ssize_t pth_read_ev(int fd, void *buf, size_t nbytes, pth_event_t ev_extra){ struct timeval delay; pth_event_t ev; static pth_key_t ev_key = PTH_KEY_INIT; fd_set fds; int fdmode; int n; pth_implicit_init(); pth_debug2("pth_read_ev: enter from thread \"%s\"", pth_current->name); /* POSIX compliance */ if (nbytes == 0) return 0; if (!pth_util_fd_valid(fd)) return pth_error(-1, EBADF); /* check mode of filedescriptor */ if ((fdmode = pth_fdmode(fd, PTH_FDMODE_POLL)) == PTH_FDMODE_ERROR) return pth_error(-1, EBADF); /* poll filedescriptor if not already in non-blocking operation */ if (fdmode == PTH_FDMODE_BLOCK) { /* now directly poll filedescriptor for readability to avoid unneccessary (and resource consuming because of context switches, etc) event handling through the scheduler */ FD_ZERO(&fds); FD_SET(fd, &fds); delay.tv_sec = 0; delay.tv_usec = 0; while ((n = pth_sc(select)(fd+1, &fds, NULL, NULL, &delay)) < 0 && errno == EINTR) ; if (n < 0 && (errno == EINVAL || errno == EBADF)) return pth_error(-1, errno);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -