⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 poll.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
                conn->uc_preq = NULL;        }}/* Process poll request. Update poll data. * Returns 0 on success, <0 else */intusocklnd_process_pollrequest(usock_pollrequest_t *pr,                             usock_pollthread_t *pt_data){        int            type  = pr->upr_type;        short          value = pr->upr_value;        usock_conn_t  *conn  = pr->upr_conn;        int            idx = 0;        struct pollfd *pollfd   = pt_data->upt_pollfd;        int           *fd2idx   = pt_data->upt_fd2idx;        usock_conn_t **idx2conn = pt_data->upt_idx2conn;        int           *skip     = pt_data->upt_skip;                LASSERT(conn != NULL);        LASSERT(conn->uc_fd >=0);        LASSERT(type == POLL_ADD_REQUEST ||                conn->uc_fd < pt_data->upt_nfd2idx);        if (type != POLL_ADD_REQUEST) {                idx = fd2idx[conn->uc_fd];                if (idx > 0 && idx < pt_data->upt_nfds) { /* hot path */                        LASSERT(pollfd[idx].fd == conn->uc_fd);                } else { /* unlikely */                        CWARN("Very unlikely event happend: trying to"                              " handle poll request of type %d but idx=%d"                              " is out of range [1 ... %d]. Is shutdown"                              " in progress (%d)?\n",                              type, idx, pt_data->upt_nfds - 1,                              usock_data.ud_shutdown);                        LIBCFS_FREE (pr, sizeof(*pr));                        usocklnd_conn_decref(conn);                        return 0;                }        }        LIBCFS_FREE (pr, sizeof(*pr));                switch (type) {        case POLL_ADD_REQUEST:                if (pt_data->upt_nfds >= pt_data->upt_npollfd) {                        /* resize pollfd[], idx2conn[] and skip[] */                        struct pollfd *new_pollfd;                        int            new_npollfd = pt_data->upt_npollfd * 2;                        usock_conn_t **new_idx2conn;                        int           *new_skip;                        new_pollfd = LIBCFS_REALLOC(pollfd, new_npollfd *                                                     sizeof(struct pollfd));                        if (new_pollfd == NULL)                                goto process_pollrequest_enomem;                        pt_data->upt_pollfd = pollfd = new_pollfd;                                                new_idx2conn = LIBCFS_REALLOC(idx2conn, new_npollfd *                                                      sizeof(usock_conn_t *));                        if (new_idx2conn == NULL)                                goto process_pollrequest_enomem;                        pt_data->upt_idx2conn = idx2conn = new_idx2conn;                        new_skip = LIBCFS_REALLOC(skip, new_npollfd *                                                  sizeof(int));                        if (new_skip == NULL)                                goto process_pollrequest_enomem;                        pt_data->upt_skip = new_skip;                                                pt_data->upt_npollfd = new_npollfd;                }                if (conn->uc_fd >= pt_data->upt_nfd2idx) {                        /* resize fd2idx[] */                        int *new_fd2idx;                        int  new_nfd2idx = pt_data->upt_nfd2idx * 2;                        while (new_nfd2idx <= conn->uc_fd)                                new_nfd2idx *= 2;                        new_fd2idx = LIBCFS_REALLOC(fd2idx, new_nfd2idx *                                                    sizeof(int));                        if (new_fd2idx == NULL)                                goto process_pollrequest_enomem;                        pt_data->upt_fd2idx = fd2idx = new_fd2idx;                        memset(fd2idx + pt_data->upt_nfd2idx, 0,                               (new_nfd2idx - pt_data->upt_nfd2idx)                               * sizeof(int));                        pt_data->upt_nfd2idx = new_nfd2idx;                }                LASSERT(fd2idx[conn->uc_fd] == 0);                idx = pt_data->upt_nfds++;                idx2conn[idx] = conn;                fd2idx[conn->uc_fd] = idx;                pollfd[idx].fd = conn->uc_fd;                pollfd[idx].events = value;                pollfd[idx].revents = 0;                break;        case POLL_DEL_REQUEST:                fd2idx[conn->uc_fd] = 0; /* invalidate this entry */                                --pt_data->upt_nfds;                if (idx != pt_data->upt_nfds) {                        /* shift last entry into released position */                        memcpy(&pollfd[idx], &pollfd[pt_data->upt_nfds],                               sizeof(struct pollfd));                        idx2conn[idx] = idx2conn[pt_data->upt_nfds];                        fd2idx[pollfd[idx].fd] = idx;                                        }                close(conn->uc_fd);                list_add_tail(&conn->uc_stale_list, &pt_data->upt_stale_list);                break;        case POLL_RX_SET_REQUEST:                pollfd[idx].events = (pollfd[idx].events & ~POLLIN) | value;                break;        case POLL_TX_SET_REQUEST:                pollfd[idx].events = (pollfd[idx].events & ~POLLOUT) | value;                break;        case POLL_SET_REQUEST:                pollfd[idx].events = value;                break;        default:                LBUG(); /* unknown type */                        }        /* In the case of POLL_ADD_REQUEST, idx2conn[idx] takes the         * reference that poll request possesses */        if (type != POLL_ADD_REQUEST)                usocklnd_conn_decref(conn);                return 0;  process_pollrequest_enomem:        usocklnd_conn_decref(conn);        return -ENOMEM;}/* Loop on poll data executing handlers repeatedly until *  fair_limit is reached or all entries are exhausted */voidusocklnd_execute_handlers(usock_pollthread_t *pt_data){        struct pollfd *pollfd   = pt_data->upt_pollfd;        int            nfds     = pt_data->upt_nfds;        usock_conn_t **idx2conn = pt_data->upt_idx2conn;        int           *skip     = pt_data->upt_skip;        int            j;        if (pollfd[0].revents & POLLIN)                while (usocklnd_notifier_handler(pollfd[0].fd) > 0)                        ;        skip[0] = 1; /* always skip notifier fd */        for (j = 0; j < usock_tuns.ut_fair_limit; j++) {                int prev = 0;                int i = skip[0];                                if (i >= nfds) /* nothing ready */                        break;                                do {                        usock_conn_t *conn = idx2conn[i];                        int next;                                                if (j == 0) /* first pass... */                                next = skip[i] = i+1; /* set skip chain */                        else /* later passes... */                                next = skip[i]; /* skip unready pollfds */                        /* kill connection if it's closed by peer and                         * there is no data pending for reading */                        if ((pollfd[i].revents & POLLERR) != 0 ||                            (pollfd[i].revents & POLLHUP) != 0) {                                if ((pollfd[i].events & POLLIN) != 0 &&                                    (pollfd[i].revents & POLLIN) == 0)                                        usocklnd_conn_kill(conn);                                else                                        usocklnd_exception_handler(conn);                        }                                                if ((pollfd[i].revents & POLLIN) != 0 &&                            usocklnd_read_handler(conn) <= 0)                                pollfd[i].revents &= ~POLLIN;                                                if ((pollfd[i].revents & POLLOUT) != 0 &&                            usocklnd_write_handler(conn) <= 0)                                pollfd[i].revents &= ~POLLOUT;                                                if ((pollfd[i].revents & (POLLIN | POLLOUT)) == 0)                                skip[prev] = next; /* skip this entry next pass */                        else                                prev = i;                                                i = next;                } while (i < nfds);        }}intusocklnd_calculate_chunk_size(int num){        const int n     = 4;        const int p     = usock_tuns.ut_poll_timeout;        int       chunk = num;                /* chunk should be big enough to detect a timeout on any         * connection within (n+1)/n times the timeout interval         * if we checks every 'p' seconds 'chunk' conns */                         if (usock_tuns.ut_timeout > n * p)                chunk = (chunk * n * p) / usock_tuns.ut_timeout;                if (chunk == 0)                chunk = 1;        return chunk;}voidusocklnd_wakeup_pollthread(int i){        usock_pollthread_t *pt = &usock_data.ud_pollthreads[i];        int                 notification = 0;        int                 rc;        rc = syscall(SYS_write, pt->upt_notifier_fd, &notification,                     sizeof(notification));        if (rc != sizeof(notification))                CERROR("Very unlikely event happend: "                       "cannot write to notifier fd (rc=%d; errno=%d)\n",                       rc, errno);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -