/*
 * erl_poll.c — page 1/5 of a 2,608-line source capture
 * (code-viewer chrome removed from the scraped page)
 */
/* ``The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved via the world wide web at http://www.erlang.org/. *  * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. *  * The Initial Developer of the Original Code is Ericsson Utvecklings AB. * Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings * AB. All Rights Reserved.'' *  *     $Id$ * *//* * Description:	Poll interface suitable for ERTS with or without *              SMP support. * *		The interface is currently implemented using: *		- select *		- poll *              - /dev/poll *              - epoll with poll or select as fallback *              - kqueue with poll or select as fallback * *		Some time in the future it will also be *		implemented using Solaris ports. 
* * * * Author: 	Rickard Green */#ifdef HAVE_CONFIG_H#  include "config.h"#endif#define WANT_NONBLOCKING#define ERTS_WANT_GOT_SIGUSR1#include "erl_poll.h"#if ERTS_POLL_USE_KQUEUE#  include <sys/types.h>#  include <sys/event.h>#  include <sys/time.h>#endif#if ERTS_POLL_USE_SELECT#  ifdef SYS_SELECT_H#    include <sys/select.h>#  endif#endif#ifdef NO_SYSCONF#  if ERTS_POLL_USE_SELECT#    include <sys/param.h>#  else#    include <limits.h>#  endif#endif#include "erl_driver.h"#include "erl_alloc.h"#if !defined(ERTS_POLL_USE_EPOLL) \    && !defined(ERTS_POLL_USE_DEVPOLL)  \    && !defined(ERTS_POLL_USE_POLL) \    && !defined(ERTS_POLL_USE_SELECT)#error "Missing implementation of erts_poll()"#endif#if defined(ERTS_KERNEL_POLL_VERSION) && !ERTS_POLL_USE_KERNEL_POLL#error "Missing kernel poll implementation of erts_poll()"#endif#if defined(ERTS_NO_KERNEL_POLL_VERSION) && ERTS_POLL_USE_KERNEL_POLL#error "Kernel poll used when it shouldn't be used"#endif#if 0#define ERTS_POLL_DEBUG_PRINT#endif#if defined(DEBUG) && 0#define HARD_DEBUG#endif#define ERTS_POLL_USE_BATCH_UPDATE_POLLSET (ERTS_POLL_USE_DEVPOLL \					    || ERTS_POLL_USE_KQUEUE)#define ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE \   (defined(ERTS_SMP) || ERTS_POLL_USE_KERNEL_POLL || ERTS_POLL_USE_POLL)#define ERTS_POLL_USE_CONCURRENT_UPDATE \   (defined(ERTS_SMP) && ERTS_POLL_USE_EPOLL)#define ERTS_POLL_COALESCE_KP_RES (ERTS_POLL_USE_KQUEUE || ERTS_POLL_USE_EPOLL)#define FDS_STATUS_EXTRA_FREE_SIZE 128#define POLL_FDS_EXTRA_FREE_SIZE 128#ifdef ERTS_POLL_NEED_ASYNC_INTERRUPT_SUPPORT#  define ERTS_POLL_ASYNC_INTERRUPT_SUPPORT 1#else#  define ERTS_POLL_ASYNC_INTERRUPT_SUPPORT 0#endif#define ERTS_POLL_USE_WAKEUP_PIPE \   (ERTS_POLL_ASYNC_INTERRUPT_SUPPORT || defined(ERTS_SMP))#ifdef ERTS_SMP#define ERTS_POLLSET_LOCK(PS) \  erts_smp_mtx_lock(&(PS)->mtx)#define ERTS_POLLSET_UNLOCK(PS) \  erts_smp_mtx_unlock(&(PS)->mtx)#define ERTS_POLLSET_SET_POLLED_CHK(PS) \  ((int) erts_smp_atomic_xchg(&(PS)->polled, (long) 1))#define 
ERTS_POLLSET_SET_POLLED(PS) \  erts_smp_atomic_set(&(PS)->polled, (long) 1)#define ERTS_POLLSET_UNSET_POLLED(PS) \  erts_smp_atomic_set(&(PS)->polled, (long) 0)#define ERTS_POLLSET_IS_POLLED(PS) \  ((int) erts_smp_atomic_read(&(PS)->polled))#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) \  ((int) erts_smp_atomic_xchg(&(PS)->woken, (long) 1))#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) \  erts_smp_atomic_set(&(PS)->woken, (long) 1)#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) \  erts_smp_atomic_set(&(PS)->woken, (long) 0)#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) \  ((int) erts_smp_atomic_read(&(PS)->woken))#else#define ERTS_POLLSET_LOCK(PS)#define ERTS_POLLSET_UNLOCK(PS)#define ERTS_POLLSET_SET_POLLED_CHK(PS) 0#define ERTS_POLLSET_UNSET_POLLED(PS)#define ERTS_POLLSET_IS_POLLED(PS) 0#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT/* * Ideally, the ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) operation would * be atomic. This operation isn't, but we will do okay anyway. The * "woken check" is only an optimization. The only requirement we have: * If (PS)->woken is set to a value != 0 when interrupting, we have to * write on the the wakeup pipe at least once. Multiple writes are okay. 
*/#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) ((PS)->woken++)#define ERTS_POLLSET_SET_POLLER_WOKEN(PS) ((PS)->woken = 1, (void) 0)#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS) ((PS)->woken = 0, (void) 0)#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) ((PS)->woken)#else#define ERTS_POLLSET_SET_POLLER_WOKEN_CHK(PS) 1#define ERTS_POLLSET_SET_POLLER_WOKEN(PS)#define ERTS_POLLSET_UNSET_POLLER_WOKEN(PS)#define ERTS_POLLSET_IS_POLLER_WOKEN(PS) 1#endif#endif#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS) \  erts_smp_atomic_set(&(PS)->have_update_requests, (long) 1)#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS) \  erts_smp_atomic_set(&(PS)->have_update_requests, (long) 0)#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) \  ((int) erts_smp_atomic_read(&(PS)->have_update_requests))#else#define ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(PS)#define ERTS_POLLSET_UNSET_HAVE_UPDATE_REQUESTS(PS)#define ERTS_POLLSET_HAVE_UPDATE_REQUESTS(PS) 0#endif#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) unset_interrupted_chk((PS))#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) ((PS)->interrupt = 0, (void) 0)#define ERTS_POLLSET_SET_INTERRUPTED(PS) ((PS)->interrupt = 1, (void) 0)#define ERTS_POLLSET_IS_INTERRUPTED(PS) ((PS)->interrupt)#else#define ERTS_POLLSET_UNSET_INTERRUPTED_CHK(PS) \  ((int) erts_smp_atomic_xchg(&(PS)->interrupt, (long) 0))#define ERTS_POLLSET_UNSET_INTERRUPTED(PS) \  erts_smp_atomic_set(&(PS)->interrupt, (long) 0)#define ERTS_POLLSET_SET_INTERRUPTED(PS) \  erts_smp_atomic_set(&(PS)->interrupt, (long) 1)#define ERTS_POLLSET_IS_INTERRUPTED(PS) \  ((int) erts_smp_atomic_read(&(PS)->interrupt))#endif#if ERTS_POLL_USE_FALLBACK#  if ERTS_POLL_USE_POLL#    define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_poll_fds > 1)#  elif ERTS_POLL_USE_SELECT#    define ERTS_POLL_NEED_FALLBACK(PS) ((PS)->no_select_fds > 1)#  endif#endif/* * --- Data types 
------------------------------------------------------------ */#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE#define ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE 128typedef struct ErtsPollSetUpdateRequestsBlock_ ErtsPollSetUpdateRequestsBlock;struct ErtsPollSetUpdateRequestsBlock_ {    ErtsPollSetUpdateRequestsBlock *next;    int len;    int fds[ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE];};#endif#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE#  define ERTS_POLL_FD_FLG_INURQ	(((unsigned short) 1) << 0)#endif#if ERTS_POLL_USE_FALLBACK#  define ERTS_POLL_FD_FLG_INFLBCK	(((unsigned short) 1) << 1)#  define ERTS_POLL_FD_FLG_USEFLBCK	(((unsigned short) 1) << 2)#endif#if ERTS_POLL_USE_KERNEL_POLL || defined(ERTS_SMP)#  define ERTS_POLL_FD_FLG_RST		(((unsigned short) 1) << 3)#endiftypedef struct {#if ERTS_POLL_USE_POLL    int pix;#endif    ErtsPollEvents used_events;    ErtsPollEvents events;#if ERTS_POLL_COALESCE_KP_RES    unsigned short res_ev_ix;#endif#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE || ERTS_POLL_USE_FALLBACK    unsigned short flags;#endif} ErtsFdStatus;#if ERTS_POLL_COALESCE_KP_RES/* res_ev_ix max value */#define ERTS_POLL_MAX_RES ((1 << sizeof(unsigned short)*8) - 1)#endif#if ERTS_POLL_USE_KQUEUE#define ERTS_POLL_KQ_OP_HANDLED			1#define ERTS_POLL_KQ_OP_DEL_R			2#define ERTS_POLL_KQ_OP_DEL_W			3#define ERTS_POLL_KQ_OP_ADD_R			4#define ERTS_POLL_KQ_OP_ADD_W			5#define ERTS_POLL_KQ_OP_ADD2_R			6#define ERTS_POLL_KQ_OP_ADD2_W			7#endifstruct ErtsPollSet_ {    ErtsPollSet next;    int internal_fd_limit;    ErtsFdStatus *fds_status;    int no_of_user_fds;    int fds_status_len;#if ERTS_POLL_USE_KERNEL_POLL    int kp_fd;    int res_events_len;#if ERTS_POLL_USE_EPOLL    struct epoll_event *res_events;#elif ERTS_POLL_USE_KQUEUE    struct kevent *res_events;#elif ERTS_POLL_USE_DEVPOLL    struct pollfd *res_events;#endif#endif /* ERTS_POLL_USE_KERNEL_POLL */#if ERTS_POLL_USE_POLL    int next_poll_fds_ix;    int no_poll_fds;    int poll_fds_len;    struct pollfd*poll_fds;#elif ERTS_POLL_USE_SELECT    
int next_sel_fd;    int max_fd;#if ERTS_POLL_USE_FALLBACK    int no_select_fds;#endif    fd_set input_fds;    fd_set res_input_fds;    fd_set output_fds;    fd_set res_output_fds;#endif#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUE    ErtsPollSetUpdateRequestsBlock update_requests;    ErtsPollSetUpdateRequestsBlock *curr_upd_req_block;    erts_smp_atomic_t have_update_requests;#endif#ifdef ERTS_SMP    erts_smp_atomic_t polled;    erts_smp_atomic_t woken;    erts_smp_mtx_t mtx;#elif ERTS_POLL_ASYNC_INTERRUPT_SUPPORT    volatile int woken;#endif#if ERTS_POLL_USE_WAKEUP_PIPE    int wake_fds[2];#endif#if ERTS_POLL_USE_FALLBACK    int fallback_used;#endif#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)    volatile int interrupt;#else    erts_smp_atomic_t interrupt;#endif    erts_smp_atomic_t timeout;#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS    erts_smp_atomic_t no_avoided_wakeups;    erts_smp_atomic_t no_avoided_interrupts;    erts_smp_atomic_t no_interrupt_timed;#endif};#if ERTS_POLL_ASYNC_INTERRUPT_SUPPORT && !defined(ERTS_SMP)static ERTS_INLINE intunset_interrupted_chk(ErtsPollSet ps){    /* This operation isn't atomic, but we have no need at all for an       atomic operation here... 
*/    int res = ps->interrupt;    ps->interrupt = 0;    return res;}#endifstatic void fatal_error(char *format, ...);static void fatal_error_async_signal_safe(char *error_str);static int max_fds = -1;static ErtsPollSet pollsets;static erts_smp_spinlock_t pollsets_lock;#if ERTS_POLL_USE_POLLstatic ERTS_INLINE shortev2pollev(ErtsPollEvents ev){#if !ERTS_POLL_USE_FALLBACK || ERTS_POLL_USE_KQUEUE    return ERTS_POLL_EV_E2N(ev);#else /* Note, we only map events we are interested in */    short res_ev = (short) 0;    if (ev & ERTS_POLL_EV_IN)	res_ev |= ERTS_POLL_EV_NKP_IN;    if (ev & ERTS_POLL_EV_OUT)	res_ev |= ERTS_POLL_EV_NKP_OUT;    return res_ev;#endif}static ERTS_INLINE ErtsPollEventspollev2ev(short ev){#if !ERTS_POLL_USE_FALLBACK || ERTS_POLL_USE_KQUEUE    return ERTS_POLL_EV_N2E(ev);#else /* Note, we only map events we are interested in */    ErtsPollEvents res_ev = (ErtsPollEvents) 0;    if (ev & ERTS_POLL_EV_NKP_IN)	res_ev |= ERTS_POLL_EV_IN;    if (ev & ERTS_POLL_EV_NKP_OUT)	res_ev |= ERTS_POLL_EV_OUT;    if (ev & ERTS_POLL_EV_NKP_ERR)	res_ev |= ERTS_POLL_EV_ERR;    if (ev & ERTS_POLL_EV_NKP_NVAL)	res_ev |= ERTS_POLL_EV_NVAL;    return res_ev;#endif}#endif#ifdef HARD_DEBUGstatic void check_poll_result(ErtsPollResFd pr[], int len);#if ERTS_POLL_USE_DEVPOLLstatic void check_poll_status(ErtsPollSet ps);#endif /* ERTS_POLL_USE_DEVPOLL */#endif /* HARD_DEBUG */#ifdef ERTS_POLL_DEBUG_PRINTstatic void print_misc_debug_info(void);#endif/* * --- Wakeup pipe ----------------------------------------------------------- */#if ERTS_POLL_USE_WAKEUP_PIPEstatic ERTS_INLINE voidwake_poller(ErtsPollSet ps){    /*     * NOTE: This function might be called from signal handlers in the     *       non-smp case; therefore, it has to be async-signal safe in     *       the non-smp case.     
*/    if (!ERTS_POLLSET_SET_POLLER_WOKEN_CHK(ps)) {	ssize_t res;	if (ps->wake_fds[1] < 0)	    return; /* Not initialized yet */	do {	    /* write() is async-signal safe (according to posix) */	    res = write(ps->wake_fds[1], "!", 1);	} while (res < 0 && errno == EINTR);	if (res <= 0 && errno != ERRNO_BLOCK) {	    fatal_error_async_signal_safe(__FILE__					  ":XXX:wake_poller(): "					  "Failed to write on wakeup pipe\n");	}    }}static ERTS_INLINE voidcleanup_wakeup_pipe(ErtsPollSet ps){    int fd = ps->wake_fds[0];    int res;    do {	char *buf[32];	res = read(fd, (void *) buf, 32);    } while (res > 0 || (res < 0 && errno == EINTR));    if (res < 0 && errno != ERRNO_BLOCK) {	fatal_error("%s:%d:cleanup_wakeup_pipe(): "		    "Failed to read on wakeup pipe fd=%d: "		    "%s (%d)\n",		    __FILE__, __LINE__,		    fd,		    erl_errno_id(errno), errno);    }}static voidcreate_wakeup_pipe(ErtsPollSet ps){    int wake_fds[2];    ps->wake_fds[0] = -1;    ps->wake_fds[1] = -1;    if (pipe(wake_fds) < 0) {	fatal_error("%s:%d:create_wakeup_pipe(): "		    "Failed to create pipe: %s (%d)\n",		    __FILE__,		    __LINE__,		    erl_errno_id(errno),		    errno);    }    SET_NONBLOCKING(wake_fds[0]);    SET_NONBLOCKING(wake_fds[1]);#ifdef ERTS_POLL_DEBUG_PRINT    erts_printf("wakeup fds = {%d, %d}\n", wake_fds[0], wake_fds[1]);#endif    ERTS_POLL_EXPORT(erts_poll_control)(ps,					wake_fds[0],					ERTS_POLL_EV_IN,					1);#if ERTS_POLL_USE_FALLBACK    /* We depend on the wakeup pipe being handled by kernel poll */    if (ps->fds_status[wake_fds[0]].flags & ERTS_POLL_FD_FLG_INFLBCK)	fatal_error("%s:%d:create_wakeup_pipe(): Internal error\n",		    __FILE__, __LINE__);#endif    if (ps->internal_fd_limit <= wake_fds[1])	ps->internal_fd_limit = wake_fds[1] + 1;    if (ps->internal_fd_limit <= wake_fds[0])	ps->internal_fd_limit = wake_fds[0] + 1;    ps->wake_fds[0] = wake_fds[0];    ps->wake_fds[1] = wake_fds[1];}#endif /* ERTS_POLL_USE_WAKEUP_PIPE *//* * --- Poll set update requests 
---------------------------------------------- */#if ERTS_POLL_USE_UPDATE_REQUESTS_QUEUEstatic ERTS_INLINE voidenqueue_update_request(ErtsPollSet ps, int fd){    ErtsPollSetUpdateRequestsBlock *urqbp;    ASSERT(fd < ps->fds_status_len);    if (ps->fds_status[fd].flags & ERTS_POLL_FD_FLG_INURQ)	return;    if (ps->update_requests.len == 0)	ERTS_POLLSET_SET_HAVE_UPDATE_REQUESTS(ps);    urqbp = ps->curr_upd_req_block;    if (urqbp->len == ERTS_POLLSET_UPDATE_REQ_BLOCK_SIZE) {	ASSERT(!urqbp->next);	urqbp = erts_alloc(ERTS_ALC_T_POLLSET_UPDREQ,			   sizeof(ErtsPollSetUpdateRequestsBlock));	ps->curr_upd_req_block->next = urqbp;	ps->curr_upd_req_block = urqbp;	urqbp->next = NULL;	urqbp->len = 0;    }    ps->fds_status[fd].flags |= ERTS_POLL_FD_FLG_INURQ;

/* (code-viewer UI text removed; the source continues on the next captured page) */