erl_poll.c
来自「OTP是开放电信平台的简称」· C语言 代码 · 共 2,608 行 · 第 1/5 页
C
2,608 行
/* ``The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved via the world wide web at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * The Initial Developer of the Original Code is Ericsson Utvecklings AB. * Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings * AB. All Rights Reserved.'' * * $Id$ * *//* * Description: Poll interface suitable for ERTS with or without * SMP support. * * The interface is currently implemented using: * - select * - poll * - /dev/poll * - epoll with poll or select as fallback * - kqueue with poll or select as fallback * * Some time in the future it will also be * implemented using Solaris ports. * * * * Author: Rickard Green */#ifdef HAVE_CONFIG_H# include "config.h"#endif#define WANT_NONBLOCKING#define ERTS_WANT_GOT_SIGUSR1#include "erl_poll.h"#if ERTS_POLL_USE_KQUEUE# include <sys/types.h># include <sys/event.h># include <sys/time.h>#endif#if ERTS_POLL_USE_SELECT# ifdef SYS_SELECT_H# include <sys/select.h># endif#endif#ifdef NO_SYSCONF# if ERTS_POLL_USE_SELECT# include <sys/param.h># else# include <limits.h># endif#endif#include "erl_driver.h"#include "erl_alloc.h"#if !defined(ERTS_POLL_USE_EPOLL) \ && !defined(ERTS_POLL_USE_DEVPOLL) \ && !defined(ERTS_POLL_USE_POLL) \ && !defined(ERTS_POLL_USE_SELECT)#error "Missing implementation of erts_poll()"#endif#if defined(ERTS_KERNEL_POLL_VERSION) && !ERTS_POLL_USE_KERNEL_POLL#error "Missing kernel poll implementation of erts_poll()"#endif#if defined(ERTS_NO_KERNEL_POLL_VERSION) && ERTS_POLL_USE_KERNEL_POLL#error "Kernel poll used when it shouldn't be used"#endif#if 0#define ERTS_POLL_DEBUG_PRINT#endif#if defined(DEBUG) && 0#define HARD_DEBUG#endif#define ERTS_POLL_USE_BATCH_UPDATE_POLLSET (ERTS_POLL_USE_DEVPOLL \ || ERTS_POLL_USE_KQUEUE)#define ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE \ (defined(ERTS_SMP) || ERTS_POLL_USE_KERNEL_POLL || ERTS_POLL_USE_POLL)#define ERTS_POLL_USE_CONCURRENT_UPDATE \ (defined(ERTS_SMP) && ERTS_POLL_USE_EPOLL)#define ERTS_POLL_COALESCE_KP_RES (ERTS_POLL_USE_KQUEUE || ERTS_POLL_USE_EPOLL)#define FDS_STATUS_EXTRA_FREE_SIZE 128#define POLL_FDS_EXTRA_FREE_SIZE 128#ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT# define ERTS_POLL_ASYNC_INTERRUPT_SUPPORT 1#else# define ERTS_POLL_ASYNC_INTERRUPT_SUPPORT 0#endif#define ERTS_POLL_USE_WAKEUP_PIPE \ (ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP))#ifdef ERTS_SMP#define ERTS_POLLSET_LOCK(PS) \ erts_smp_mtx_lock(&(PS)->mtx)#define ERTS_POLLSET_UNLOCK(PS) \ erts_smp_mtx_unlock(&(PS)->mtx)#define ERTS_POLLSET_SET_POLLED_CHK(PS) \ ((int) erts_smp_atomic_xchg(&(PS)->polled, (long) 1))#define ERTS_POLLSET_SET_POLLED(PS) \ erts_smp_atomic_set(&(PS)->polled, (long) 1)#define ERTS_POLLSET_UNSET_POLLED(PS) \ erts_smp_atomic_set(&(PS)->polled, (long) 0)#define ERTS_POLLSET_IS_POLLED(PS) \ ((int) erts_smp_atomic_read(&(PS)->polled))#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) \ ((int) erts_smp_atomic_xchg(&(PS)->woken, (long) 1))#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) \ erts_smp_atomic_set(&(PS)->woken, (long) 1)#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) \ erts_smp_atomic_set(&(PS)->woken, (long) 0)#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) \ ((int) erts_smp_atomic_read(&(PS)->woken))#else#define ERTS_POLLSET_LOCK(PS)#define ERTS_POLLSET_UNLOCK(PS)#define ERTS_POLLSET_SET_POLLED_CHK(PS) 0#define ERTS_POLLSET_UNSET_POLLED(PS)#define ERTS_POLLSET_IS_POLLED(PS) 0#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT/* * Ideally, the ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) operation would * be atomic. This operation isn't, but we will do okay anyway. The * "woken check" is only an optimization. The only requirement we have: * If (PS)->woken is set to a value != 0 when interrupting, we have to * write on the the wakeup pipe at least once. Multiple writes are okay. */#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) ((PS)->woken++)#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) ((PS)->woken = 1, (void) 0)#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) ((PS)->woken = 0, (void) 0)#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) ((PS)->woken)#else#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) 1#define ERTS_POLLSET_SET_POLLER_WOKEN(PS)#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS)#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) 1#endif#endif#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \ erts_smp_atomic_set(&(PS)->have_update_requests, (long) 1)#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \ erts_smp_atomic_set(&(PS)->have_update_requests, (long) 0)#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \ ((int) erts_smp_atomic_read(&(PS)->have_update_requests))#else#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS)#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS)#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) 0#endif#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS))#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) ((PS)->interrupt = 0, (void) 0)#define ERTS_POLLSET_SET_INTERRUPTED(PS) ((PS)->interrupt = 1, (void) 0)#define ERTS_POLLSET_IS_INTERRUPTED(PS) ((PS)->interrupt)#else#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) \ ((int) erts_smp_atomic_xchg(&(PS)->interrupt, (long) 0))#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) \ erts_smp_atomic_set(&(PS)->interrupt, (long) 0)#define ERTS_POLLSET_SET_INTERRUPTED(PS) \ erts_smp_atomic_set(&(PS)->interrupt, (long) 1)#define ERTS_POLLSET_IS_INTERRUPTED(PS) \ ((int) erts_smp_atomic_read(&(PS)->interrupt))#endif#if ERTS_POLL_USE_FALLBACK# if ERTS_POLL_USE_POLL# define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_poll_fds > 1)# elif ERTS_POLL_USE_SELECT# define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_select_fds > 1)# endif#endif/* * --- Data types ------------------------------------------------------------ */#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE#define ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE 128typedef struct ErtsPollSetUpdateRequestsBlock_ ErtsPollSetUpdateRequestsBlock;struct ErtsPollSetUpdateRequestsBlock_ { ErtsPollSetUpdateRequestsBlock *next; int len; int fds[ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE];};#endif#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE# define ERTS_POLL_FD_FLG_INURQ (((unsigned short) 1) << 0)#endif#if ERTS_POLL_USE_FALLBACK# define ERTS_POLL_FD_FLG_INFLBCK (((unsigned short) 1) << 1)# define ERTS_POLL_FD_FLG_USEFLBCK (((unsigned short) 1) << 2)#endif#if ERTS_POLL_USE_KERNEL_POLL || defined(ERTS_SMP)# define ERTS_POLL_FD_FLG_RST (((unsigned short) 1) << 3)#endiftypedef struct {#if ERTS_POLL_USE_POLL int pix;#endif ErtsPollEvents used_events; ErtsPollEvents events;#if ERTS_POLL_COALESCE_KP_RES unsigned short res_ev_ix;#endif#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE || ERTS_POLL_USE_FALLBACK unsigned short flags;#endif} ErtsFdStatus;#if ERTS_POLL_COALESCE_KP_RES/* res_ev_ix max value */#define ERTS_POLL_MAX_RES ((1 << sizeof(unsigned short)*8) - 1)#endif#if ERTS_POLL_USE_KQUEUE#define ERTS_POLL_KQ_OP_HANDLED 1#define ERTS_POLL_KQ_OP_DEL_R 2#define ERTS_POLL_KQ_OP_DEL_W 3#define ERTS_POLL_KQ_OP_ADD_R 4#define ERTS_POLL_KQ_OP_ADD_W 5#define ERTS_POLL_KQ_OP_ADD2_R 6#define ERTS_POLL_KQ_OP_ADD2_W 7#endifstruct ErtsPollSet_ { ErtsPollSet next; int internal_fd_limit; ErtsFdStatus *fds_status; int no_of_user_fds; int fds_status_len;#if ERTS_POLL_USE_KERNEL_POLL int kp_fd; int res_events_len;#if ERTS_POLL_USE_EPOLL struct epoll_event *res_events;#elif ERTS_POLL_USE_KQUEUE struct kevent *res_events;#elif ERTS_POLL_USE_DEVPOLL struct pollfd *res_events;#endif#endif /* ERTS_POLL_USE_KERNEL_POLL */#if ERTS_POLL_USE_POLL int next_poll_fds_ix; int no_poll_fds; int poll_fds_len; struct pollfd*poll_fds;#elif ERTS_POLL_USE_SELECT int next_sel_fd; int max_fd;#if ERTS_POLL_USE_FALLBACK int no_select_fds;#endif fd_set input_fds; fd_set res_input_fds; fd_set output_fds; fd_set res_output_fds;#endif#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE ErtsPollSetUpdateRequestsBlock update_requests; ErtsPollSetUpdateRequestsBlock *curr_upd_req_block; erts_smp_atomic_t have_update_requests;#endif#ifdef ERTS_SMP erts_smp_atomic_t polled; erts_smp_atomic_t woken; erts_smp_mtx_t mtx;#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT volatile int woken;#endif#if ERTS_POLL_USE_WAKEUP_PIPE int wake_fds[2];#endif#if ERTS_POLL_USE_FALLBACK int fallback_used;#endif#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP) volatile int interrupt;#else erts_smp_atomic_t interrupt;#endif erts_smp_atomic_t timeout;#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS erts_smp_atomic_t no_avoided_wakeups; erts_smp_atomic_t no_avoided_interrupts; erts_smp_atomic_t no_interrupt_timed;#endif};#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)static ERTS_INLINE intunset_interrupted_chk(ErtsPollSet ps){ /* This operation isn't atomic, but we have no need at all for an atomic operation here... */ int res = ps->interrupt; ps->interrupt = 0; return res;}#endifstatic void fatal_error(char *format, ...);static void fatal_error_async_signal_safe(char *error_str);static int max_fds = -1;static ErtsPollSet pollsets;static erts_smp_spinlock_t pollsets_lock;#if ERTS_POLL_USE_POLLstatic ERTS_INLINE shortev2pollev(ErtsPollEvents ev){#if !ERTS_POLL_USE_FALLBACK || ERTS_POLL_USE_KQUEUE return ERTS_POLL_EV_E2N(ev);#else /* Note, we only map events we are interested in */ short res_ev = (short) 0; if (ev & ERTS_POLL_EV_IN) res_ev |= ERTS_POLL_EV_NKP_IN; if (ev & ERTS_POLL_EV_OUT) res_ev |= ERTS_POLL_EV_NKP_OUT; return res_ev;#endif}static ERTS_INLINE ErtsPollEventspollev2ev(short ev){#if !ERTS_POLL_USE_FALLBACK || ERTS_POLL_USE_KQUEUE return ERTS_POLL_EV_N2E(ev);#else /* Note, we only map events we are interested in */ ErtsPollEvents res_ev = (ErtsPollEvents) 0; if (ev & ERTS_POLL_EV_NKP_IN) res_ev |= ERTS_POLL_EV_IN; if (ev & ERTS_POLL_EV_NKP_OUT) res_ev |= ERTS_POLL_EV_OUT; if (ev & ERTS_POLL_EV_NKP_ERR) res_ev |= ERTS_POLL_EV_ERR; if (ev & ERTS_POLL_EV_NKP_NVAL) res_ev |= ERTS_POLL_EV_NVAL; return res_ev;#endif}#endif#ifdef HARD_DEBUGstatic void check_poll_result(ErtsPollResFd pr[], int len);#if ERTS_POLL_USE_DEVPOLLstatic void check_poll_status(ErtsPollSet ps);#endif /* ERTS_POLL_USE_DEVPOLL */#endif /* HARD_DEBUG */#ifdef ERTS_POLL_DEBUG_PRINTstatic void print_misc_debug_info(void);#endif/* * --- Wakeup pipe ----------------------------------------------------------- */#if ERTS_POLL_USE_WAKEUP_PIPEstatic ERTS_INLINE voidwake_poller(ErtsPollSet ps){ /* * NOTE: This function might be called from signal handlers in the * non-smp case; therefore, it has to be async-signal safe in * the non-smp case. */ if (!ERTS_POLLSET_SET_POLLER_WOKEN_CHK(ps)) { ssize_t res; if (ps->wake_fds[1] < 0) return; /* Not initialized yet */ do { /* write() is async-signal safe (according to posix) */ res = write(ps->wake_fds[1], "!", 1); } while (res < 0 && errno == EINTR); if (res <= 0 && errno != ERRNO_BLOCK) { fatal_error_async_signal_safe(__FILE__ ":XXX:wake_poller(): " "Failed to write on wakeup pipe\n"); } }}static ERTS_INLINE voidcleanup_wakeup_pipe(ErtsPollSet ps){ int fd = ps->wake_fds[0]; int res; do { char *buf[32]; res = read(fd, (void *) buf, 32); } while (res > 0 || (res < 0 && errno == EINTR)); if (res < 0 && errno != ERRNO_BLOCK) { fatal_error("%s:%d:cleanup_wakeup_pipe(): " "Failed to read on wakeup pipe fd=%d: " "%s (%d)\n", __FILE__, __LINE__, fd, erl_errno_id(errno), errno); }}static voidcreate_wakeup_pipe(ErtsPollSet ps){ int wake_fds[2]; ps->wake_fds[0] = -1; ps->wake_fds[1] = -1; if (pipe(wake_fds) < 0) { fatal_error("%s:%d:create_wakeup_pipe(): " "Failed to create pipe: %s (%d)\n", __FILE__, __LINE__, erl_errno_id(errno), errno); } SET_NONBLOCKING(wake_fds[0]); SET_NONBLOCKING(wake_fds[1]);#ifdef ERTS_POLL_DEBUG_PRINT erts_printf("wakeup fds = {%d, %d}\n", wake_fds[0], wake_fds[1]);#endif ERTS_POLL_EXPORT(erts_poll_control)(ps, wake_fds[0], ERTS_POLL_EV_IN, 1);#if ERTS_POLL_USE_FALLBACK /* We depend on the wakeup pipe being handled by kernel poll */ if (ps->fds_status[wake_fds[0]].flags & ERTS_POLL_FD_FLG_INFLBCK) fatal_error("%s:%d:create_wakeup_pipe(): Internal error\n", __FILE__, __LINE__);#endif if (ps->internal_fd_limit <= wake_fds[1]) ps->internal_fd_limit = wake_fds[1] + 1; if (ps->internal_fd_limit <= wake_fds[0]) ps->internal_fd_limit = wake_fds[0] + 1; ps->wake_fds[0] = wake_fds[0]; ps->wake_fds[1] = wake_fds[1];}#endif /* ERTS_POLL_USE_WAKEUP_PIPE *//* * --- Poll set update requests ---------------------------------------------- */#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUEstatic ERTS_INLINE voidenqueue_update_request(ErtsPollSet ps, int fd){ ErtsPollSetUpdateRequestsBlock *urqbp; ASSERT(fd < ps->fds_status_len); if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INURQ) return; if (ps->update_requests.len == 0) ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(ps); urqbp = ps->curr_upd_req_block; if (urqbp->len == ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE) { ASSERT(!urqbp->next); urqbp = erts_alloc(ERTS_ALC_T_POLLSET_UPDREQ, sizeof(ErtsPollSetUpdateRequestsBlock)); ps->curr_upd_req_block->next = urqbp; ps->curr_upd_req_block = urqbp; urqbp->next = NULL; urqbp->len = 0; } ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INURQ;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?