/* event.c */
    _ST_POLL_OSFD_CNT += npds;
    return 0;
}

void _st_poll_pollset_del(struct pollfd *pds, int npds)
{
    _ST_POLL_OSFD_CNT -= npds;
    ST_ASSERT(_ST_POLL_OSFD_CNT >= 0);
}

void _st_poll_dispatch(void)
{
    int timeout, nfd;
    _st_clist_t *q;
    st_utime_t min_timeout;
    _st_pollq_t *pq;
    struct pollfd *pds, *epds, *pollfds;

    /*
     * Build up the array of struct pollfd to wait on.
     * If the existing array is not big enough, release it and
     * allocate a new one.
     */
    ST_ASSERT(_ST_POLL_OSFD_CNT >= 0);
    if (_ST_POLL_OSFD_CNT > _ST_POLLFDS_SIZE) {
        free(_ST_POLLFDS);
        _ST_POLLFDS = (struct pollfd *) malloc((_ST_POLL_OSFD_CNT + 10) * sizeof(struct pollfd));
        ST_ASSERT(_ST_POLLFDS != NULL);
        _ST_POLLFDS_SIZE = _ST_POLL_OSFD_CNT + 10;
    }
    pollfds = _ST_POLLFDS;

    /* Gather all descriptors into one array */
    for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
        pq = _ST_POLLQUEUE_PTR(q);
        memcpy(pollfds, pq->pds, sizeof(struct pollfd) * pq->npds);
        pollfds += pq->npds;
    }
    ST_ASSERT(pollfds <= _ST_POLLFDS + _ST_POLLFDS_SIZE);

    if (_ST_SLEEPQ == NULL) {
        timeout = -1;
    } else {
        min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
        timeout = (int) (min_timeout / 1000);
    }

    /* Check for I/O operations */
    nfd = poll(_ST_POLLFDS, _ST_POLL_OSFD_CNT, timeout);

    /* Notify threads that are associated with the selected descriptors */
    if (nfd > 0) {
        pollfds = _ST_POLLFDS;
        for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
            pq = _ST_POLLQUEUE_PTR(q);
            epds = pollfds + pq->npds;
            for (pds = pollfds; pds < epds; pds++) {
                if (pds->revents)
                    break;
            }
            if (pds < epds) {
                memcpy(pq->pds, pollfds, sizeof(struct pollfd) * pq->npds);
                ST_REMOVE_LINK(&pq->links);
                pq->on_ioq = 0;
                if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
                    _ST_DEL_SLEEPQ(pq->thread);
                pq->thread->state = _ST_ST_RUNNABLE;
                _ST_ADD_RUNQ(pq->thread);

                _ST_POLL_OSFD_CNT -= pq->npds;
                ST_ASSERT(_ST_POLL_OSFD_CNT >= 0);
            }
            pollfds = epds;
        }
    }
}

int _st_poll_fd_new(int osfd)
{
    return 0;
}

int _st_poll_fd_close(int osfd)
{
    /*
     * We don't maintain I/O counts for the poll event system,
     * so there is nothing to check here.
     */
    return 0;
}

int _st_poll_fd_getlimit(void)
{
    /* zero means no specific limit */
    return 0;
}

static _st_eventsys_t _st_poll_eventsys = {
    "poll",
    ST_EVENTSYS_POLL,
    _st_poll_init,
    _st_poll_dispatch,
    _st_poll_pollset_add,
    _st_poll_pollset_del,
    _st_poll_fd_new,
    _st_poll_fd_close,
    _st_poll_fd_getlimit
};
#endif  /* MD_HAVE_POLL */


#ifdef MD_HAVE_KQUEUE
/*****************************************
 * kqueue event system
 */

int _st_kq_init(void)
{
    int err = 0;
    int rv = 0;

    _st_kq_data = (struct _st_kqdata *) calloc(1, sizeof(*_st_kq_data));
    if (!_st_kq_data)
        return -1;

    if ((_st_kq_data->kq = kqueue()) < 0) {
        err = errno;
        rv = -1;
        goto cleanup_kq;
    }
    fcntl(_st_kq_data->kq, F_SETFD, FD_CLOEXEC);
    _st_kq_data->pid = getpid();

    /*
     * Allocate the file descriptor data array.
     * FD_SETSIZE looks like a good initial size.
     */
    _st_kq_data->fd_data_size = FD_SETSIZE;
    _st_kq_data->fd_data = (_kq_fd_data_t *) calloc(_st_kq_data->fd_data_size, sizeof(_kq_fd_data_t));
    if (!_st_kq_data->fd_data) {
        err = errno;
        rv = -1;
        goto cleanup_kq;
    }

    /* Allocate event lists */
    _st_kq_data->evtlist_size = ST_KQ_MIN_EVTLIST_SIZE;
    _st_kq_data->evtlist = (struct kevent *) malloc(_st_kq_data->evtlist_size * sizeof(struct kevent));
    _st_kq_data->addlist_size = ST_KQ_MIN_EVTLIST_SIZE;
    _st_kq_data->addlist = (struct kevent *) malloc(_st_kq_data->addlist_size * sizeof(struct kevent));
    _st_kq_data->dellist_size = ST_KQ_MIN_EVTLIST_SIZE;
    _st_kq_data->dellist = (struct kevent *) malloc(_st_kq_data->dellist_size * sizeof(struct kevent));
    if (!_st_kq_data->evtlist || !_st_kq_data->addlist || !_st_kq_data->dellist) {
        err = ENOMEM;
        rv = -1;
    }

 cleanup_kq:
    if (rv < 0) {
        if (_st_kq_data->kq >= 0)
            close(_st_kq_data->kq);
        free(_st_kq_data->fd_data);
        free(_st_kq_data->evtlist);
        free(_st_kq_data->addlist);
        free(_st_kq_data->dellist);
        free(_st_kq_data);
        _st_kq_data = NULL;
        errno = err;
    }

    return rv;
}
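/*
 * The two expand helpers below grow their arrays geometrically: the size is
 * doubled until it covers the request, so the amortized cost of growth stays
 * constant per descriptor. A minimal sketch of that rule follows (the guard
 * macro ST_EVENT_EXAMPLES and the helper name are illustrative only, not
 * part of this file's API):
 */
#ifdef ST_EVENT_EXAMPLES
static int example_grow_size(int maxfd, int size)
{
    /* Same doubling loop as _st_kq_fd_data_expand() below */
    int n = size;
    while (maxfd >= n)
        n <<= 1;
    return n;  /* e.g. example_grow_size(1500, 1024) == 2048 */
}
#endif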
int _st_kq_fd_data_expand(int maxfd)
{
    _kq_fd_data_t *ptr;
    int n = _st_kq_data->fd_data_size;

    while (maxfd >= n)
        n <<= 1;

    ptr = (_kq_fd_data_t *) realloc(_st_kq_data->fd_data, n * sizeof(_kq_fd_data_t));
    if (!ptr)
        return -1;

    memset(ptr + _st_kq_data->fd_data_size, 0, (n - _st_kq_data->fd_data_size) * sizeof(_kq_fd_data_t));

    _st_kq_data->fd_data = ptr;
    _st_kq_data->fd_data_size = n;

    return 0;
}

int _st_kq_addlist_expand(int avail)
{
    struct kevent *ptr;
    int n = _st_kq_data->addlist_size;

    while (avail > n - _st_kq_data->addlist_cnt)
        n <<= 1;

    ptr = (struct kevent *) realloc(_st_kq_data->addlist, n * sizeof(struct kevent));
    if (!ptr)
        return -1;

    _st_kq_data->addlist = ptr;
    _st_kq_data->addlist_size = n;

    /*
     * Try to expand the result event list too
     * (although we don't have to do it).
     */
    ptr = (struct kevent *) realloc(_st_kq_data->evtlist, n * sizeof(struct kevent));
    if (ptr) {
        _st_kq_data->evtlist = ptr;
        _st_kq_data->evtlist_size = n;
    }

    return 0;
}

void _st_kq_addlist_add(const struct kevent *kev)
{
    ST_ASSERT(_st_kq_data->addlist_cnt < _st_kq_data->addlist_size);
    memcpy(_st_kq_data->addlist + _st_kq_data->addlist_cnt, kev, sizeof(struct kevent));
    _st_kq_data->addlist_cnt++;
}

void _st_kq_dellist_add(const struct kevent *kev)
{
    int n = _st_kq_data->dellist_size;

    if (_st_kq_data->dellist_cnt >= n) {
        struct kevent *ptr;

        n <<= 1;
        ptr = (struct kevent *) realloc(_st_kq_data->dellist, n * sizeof(struct kevent));
        if (!ptr) {
            /* See comment in _st_kq_pollset_del() */
            return;
        }

        _st_kq_data->dellist = ptr;
        _st_kq_data->dellist_size = n;
    }

    memcpy(_st_kq_data->dellist + _st_kq_data->dellist_cnt, kev, sizeof(struct kevent));
    _st_kq_data->dellist_cnt++;
}

int _st_kq_pollset_add(struct pollfd *pds, int npds)
{
    struct kevent kev;
    struct pollfd *pd;
    struct pollfd *epd = pds + npds;

    /*
     * Pollset adding is "atomic". That is, either it succeeds for
     * all descriptors in the set or it fails. This means we need to
     * do all the checks up front so we don't have to "unwind" if
     * adding one of the descriptors fails.
     */
    for (pd = pds; pd < epd; pd++) {
        /* POLLIN and/or POLLOUT must be set, but nothing else */
        if (pd->fd < 0 || !pd->events || (pd->events & ~(POLLIN | POLLOUT))) {
            errno = EINVAL;
            return -1;
        }
        if (pd->fd >= _st_kq_data->fd_data_size && _st_kq_fd_data_expand(pd->fd) < 0)
            return -1;
    }

    /*
     * Make sure we have enough room in the addlist for twice as many
     * descriptors as in the pollset (for both READ and WRITE filters).
     */
    npds <<= 1;
    if (npds > _st_kq_data->addlist_size - _st_kq_data->addlist_cnt && _st_kq_addlist_expand(npds) < 0)
        return -1;

    for (pd = pds; pd < epd; pd++) {
        if ((pd->events & POLLIN) && (_ST_KQ_READ_CNT(pd->fd)++ == 0)) {
            memset(&kev, 0, sizeof(kev));
            kev.ident = pd->fd;
            kev.filter = EVFILT_READ;
#ifdef NOTE_EOF
            /* Make it behave like select() and poll() */
            kev.fflags = NOTE_EOF;
#endif
            kev.flags = (EV_ADD | EV_ONESHOT);
            _st_kq_addlist_add(&kev);
        }
        if ((pd->events & POLLOUT) && (_ST_KQ_WRITE_CNT(pd->fd)++ == 0)) {
            memset(&kev, 0, sizeof(kev));
            kev.ident = pd->fd;
            kev.filter = EVFILT_WRITE;
            kev.flags = (EV_ADD | EV_ONESHOT);
            _st_kq_addlist_add(&kev);
        }
    }

    return 0;
}
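/*
 * For reference, a single pollfd asking for POLLIN|POLLOUT expands into two
 * one-shot kevent changes, one per filter, which is why the addlist above is
 * sized for 2 * npds entries. A minimal sketch using the standard EV_SET()
 * macro (the guard macro and helper name are illustrative, not st API):
 */
#ifdef ST_EVENT_EXAMPLES
static void example_expand_pollfd(int fd, struct kevent kev[2])
{
    /* POLLIN -> EVFILT_READ, POLLOUT -> EVFILT_WRITE, both EV_ADD|EV_ONESHOT */
    EV_SET(&kev[0], fd, EVFILT_READ,  EV_ADD | EV_ONESHOT, 0, 0, NULL);
    EV_SET(&kev[1], fd, EVFILT_WRITE, EV_ADD | EV_ONESHOT, 0, 0, NULL);
}
#endif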
void _st_kq_pollset_del(struct pollfd *pds, int npds)
{
    struct kevent kev;
    struct pollfd *pd;
    struct pollfd *epd = pds + npds;

    /*
     * It's OK if deleting fails because a descriptor will either be
     * closed or fire only once (we set EV_ONESHOT flag).
     */
    _st_kq_data->dellist_cnt = 0;
    for (pd = pds; pd < epd; pd++) {
        if ((pd->events & POLLIN) && (--_ST_KQ_READ_CNT(pd->fd) == 0)) {
            memset(&kev, 0, sizeof(kev));
            kev.ident = pd->fd;
            kev.filter = EVFILT_READ;
            kev.flags = EV_DELETE;
            _st_kq_dellist_add(&kev);
        }
        if ((pd->events & POLLOUT) && (--_ST_KQ_WRITE_CNT(pd->fd) == 0)) {
            memset(&kev, 0, sizeof(kev));
            kev.ident = pd->fd;
            kev.filter = EVFILT_WRITE;
            kev.flags = EV_DELETE;
            _st_kq_dellist_add(&kev);
        }
    }

    if (_st_kq_data->dellist_cnt > 0) {
        /*
         * We do "synchronous" kqueue deletes to avoid deleting
         * closed descriptors and other possible problems.
         */
        int rv;
        do {
            /* This kevent() won't block since result list size is 0 */
            rv = kevent(_st_kq_data->kq, _st_kq_data->dellist, _st_kq_data->dellist_cnt, NULL, 0, NULL);
        } while (rv < 0 && errno == EINTR);
    }
}
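/*
 * The "synchronous" delete above relies on a property of kevent(): when the
 * result list size is 0, the call only applies the changelist and returns
 * immediately; the timeout argument is never consulted. A minimal sketch of
 * that pattern (guard macro and helper name are illustrative, not st API):
 */
#ifdef ST_EVENT_EXAMPLES
static int example_apply_changes(int kq, struct kevent *changes, int nchanges)
{
    int rv;
    do {
        /* nevents == 0: apply changes only, never block */
        rv = kevent(kq, changes, nchanges, NULL, 0, NULL);
    } while (rv < 0 && errno == EINTR);
    return rv;
}
#endif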
void _st_kq_dispatch(void)
{
    struct timespec timeout, *tsp;
    struct kevent kev;
    st_utime_t min_timeout;
    _st_clist_t *q;
    _st_pollq_t *pq;
    struct pollfd *pds, *epds;
    int nfd, i, osfd, notify, filter;
    short events, revents;

    if (_ST_SLEEPQ == NULL) {
        tsp = NULL;
    } else {
        min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
        timeout.tv_sec = (time_t) (min_timeout / 1000000);
        timeout.tv_nsec = (long) ((min_timeout % 1000000) * 1000);
        tsp = &timeout;
    }

 retry_kevent:
    /* Check for I/O operations */
    nfd = kevent(_st_kq_data->kq, _st_kq_data->addlist, _st_kq_data->addlist_cnt,
                 _st_kq_data->evtlist, _st_kq_data->evtlist_size, tsp);

    _st_kq_data->addlist_cnt = 0;

    if (nfd > 0) {
        for (i = 0; i < nfd; i++) {
            osfd = _st_kq_data->evtlist[i].ident;
            filter = _st_kq_data->evtlist[i].filter;

            if (filter == EVFILT_READ) {
                _ST_KQ_REVENTS(osfd) |= POLLIN;
            } else if (filter == EVFILT_WRITE) {
                _ST_KQ_REVENTS(osfd) |= POLLOUT;
            }
            if (_st_kq_data->evtlist[i].flags & EV_ERROR) {
                if (_st_kq_data->evtlist[i].data == EBADF) {
                    _ST_KQ_REVENTS(osfd) |= POLLNVAL;
                } else {
                    _ST_KQ_REVENTS(osfd) |= POLLERR;
                }
            }
        }

        _st_kq_data->dellist_cnt = 0;

        for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
            pq = _ST_POLLQUEUE_PTR(q);
            notify = 0;
            epds = pq->pds + pq->npds;

            for (pds = pq->pds; pds < epds; pds++) {
                osfd = pds->fd;
                events = pds->events;
                revents = (short) (_ST_KQ_REVENTS(osfd) & ~(POLLIN | POLLOUT));
                if ((events & POLLIN) && (_ST_KQ_REVENTS(osfd) & POLLIN)) {
                    revents |= POLLIN;
                }
                if ((events & POLLOUT) && (_ST_KQ_REVENTS(osfd) & POLLOUT)) {
                    revents |= POLLOUT;
                }
                pds->revents = revents;
                if (revents) {
                    notify = 1;
                }
            }
            if (notify) {
                ST_REMOVE_LINK(&pq->links);
                pq->on_ioq = 0;

                for (pds = pq->pds; pds < epds; pds++) {
                    osfd = pds->fd;
                    events = pds->events;
                    /*
                     * We set the EV_ONESHOT flag, so we only need to
                     * delete a descriptor if it didn't fire.
                     */
                    if ((events & POLLIN) && (--_ST_KQ_READ_CNT(osfd) == 0) && ((_ST_KQ_REVENTS(osfd) & POLLIN) == 0)) {
                        memset(&kev, 0, sizeof(kev));
                        kev.ident = osfd;
                        kev.filter = EVFILT_READ;
                        kev.flags = EV_DELETE;
                        _st_kq_dellist_add(&kev);
                    }
                    if ((events & POLLOUT) && (--_ST_KQ_WRITE_CNT(osfd) == 0) && ((_ST_KQ_REVENTS(osfd) & POLLOUT) == 0)) {
                        memset(&kev, 0, sizeof(kev));
                        kev.ident = osfd;
                        kev.filter = EVFILT_WRITE;
                        kev.flags = EV_DELETE;
                        _st_kq_dellist_add(&kev);
                    }
                }

                if (pq->thread->flags & _ST_FL_ON_SLEEPQ)
                    _ST_DEL_SLEEPQ(pq->thread);
                pq->thread->state = _ST_ST_RUNNABLE;
                _ST_ADD_RUNQ(pq->thread);
            }
        }

        if (_st_kq_data->dellist_cnt > 0) {
            int rv;
            do {
                /* This kevent() won't block since result list size is 0 */
                rv = kevent(_st_kq_data->kq, _st_kq_data->dellist, _st_kq_data->dellist_cnt, NULL, 0, NULL);
            } while (rv < 0 && errno == EINTR);
        }

        for (i = 0; i < nfd; i++) {
            osfd = _st_kq_data->evtlist[i].ident;
            _ST_KQ_REVENTS(osfd) = 0;
        }
    } else if (nfd < 0) {
        if (errno == EBADF && _st_kq_data->pid != getpid()) {
            /* We probably forked; reinitialize the kqueue */
            if ((_st_kq_data->kq = kqueue()) < 0) {
                /* There is nothing we can do here, will retry later */
                return;
            }
            fcntl(_st_kq_data->kq, F_SETFD, FD_CLOEXEC);
            _st_kq_data->pid = getpid();
            /* Re-register all descriptors on ioq with the new kqueue */
            memset(_st_kq_data->fd_data, 0, _st_kq_data->fd_data_size * sizeof(_kq_fd_data_t));
            for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
                pq = _ST_POLLQUEUE_PTR(q);
                _st_kq_pollset_add(pq->pds, pq->npds);
            }
            goto retry_kevent;
        }
    }
}
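/*
 * Both dispatchers derive their wait time from the sleep queue in
 * microseconds (st_utime_t): _st_poll_dispatch() divides by 1000 for
 * poll()'s millisecond argument, while _st_kq_dispatch() splits it into a
 * struct timespec. A minimal sketch of the latter conversion (guard macro
 * and helper name are illustrative, not st API):
 */
#ifdef ST_EVENT_EXAMPLES
static struct timespec example_usec_to_timespec(st_utime_t usecs)
{
    struct timespec ts;
    ts.tv_sec = (time_t) (usecs / 1000000);          /* whole seconds */
    ts.tv_nsec = (long) ((usecs % 1000000) * 1000);  /* remainder, in ns */
    return ts;  /* e.g. 2500000 us -> { 2, 500000000 } */
}
#endif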