⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 poll.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2001, 2002 Cluster File Systems, Inc. *   Author: Maxim Patlasov <maxim@clusterfs.com> * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * */#include "usocklnd.h"#include <unistd.h>#include <syscall.h>voidusocklnd_process_stale_list(usock_pollthread_t *pt_data){        while (!list_empty(&pt_data->upt_stale_list)) {                usock_conn_t *conn;                                        conn = list_entry(pt_data->upt_stale_list.next,                                  usock_conn_t, uc_stale_list);                                list_del(&conn->uc_stale_list);                                usocklnd_tear_peer_conn(conn);                usocklnd_conn_decref(conn); /* -1 for idx2conn[idx] or pr */        }}intusocklnd_poll_thread(void *arg){        int                 rc = 0;        usock_pollthread_t *pt_data = (usock_pollthread_t *)arg;        cfs_time_t          current_time;        cfs_time_t          planned_time;        int                 idx;        int                 idx_start;        int                 idx_finish;        int                 chunk;        int                 saved_nfds;        int                 extra;        int                 times;        /* mask signals to avoid SIGPIPE, etc */        sigset_t  sigs;        sigfillset (&sigs);        pthread_sigmask (SIG_SETMASK, &sigs, 0);                LASSERT(pt_data != NULL);                planned_time = cfs_time_shift(usock_tuns.ut_poll_timeout);        chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);        saved_nfds = pt_data->upt_nfds;        idx_start = 1;                /* Main loop */        while (usock_data.ud_shutdown == 0) {                rc = 0;                /* Process all enqueued poll requests */                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);                while (!list_empty(&pt_data->upt_pollrequests)) {                        usock_pollrequest_t *pr;                        pr = list_entry(pt_data->upt_pollrequests.next,                                        usock_pollrequest_t, upr_list);                                                list_del(&pr->upr_list);                        rc = usocklnd_process_pollrequest(pr, pt_data);                        if (rc)                                break;                                        }                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);                if (rc)                        break;                /* Delete conns orphaned due to POLL_DEL_REQUESTs */                usocklnd_process_stale_list(pt_data);                                /* Actual polling for events */                rc = poll(pt_data->upt_pollfd,                          pt_data->upt_nfds,                          usock_tuns.ut_poll_timeout * 1000);                if (rc < 0) {                        CERROR("Cannot poll(2): errno=%d\n", errno);                        break;                }                if (rc > 0)                        usocklnd_execute_handlers(pt_data);                current_time = cfs_time_current();                if (pt_data->upt_nfds < 2 ||                    cfs_time_before(current_time, planned_time))                        continue;                /* catch up growing pollfd[] */                if (pt_data->upt_nfds > saved_nfds) {                        extra = pt_data->upt_nfds - saved_nfds;                        saved_nfds = pt_data->upt_nfds;                } else {                        extra = 0;                }                times = cfs_duration_sec(cfs_time_sub(current_time, planned_time)) + 1;                                idx_finish = MIN(idx_start + chunk*times + extra, pt_data->upt_nfds);                for (idx = idx_start; idx < idx_finish; idx++) {                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];                        pthread_mutex_lock(&conn->uc_lock);                        if (usocklnd_conn_timed_out(conn, current_time) &&                            conn->uc_state != UC_DEAD) {                                conn->uc_errored = 1;                                usocklnd_conn_kill_locked(conn);                        }                        pthread_mutex_unlock(&conn->uc_lock);                }                if (idx_finish == pt_data->upt_nfds) {                                                chunk = usocklnd_calculate_chunk_size(pt_data->upt_nfds);                        saved_nfds = pt_data->upt_nfds;                        idx_start = 1;                }                else {                        idx_start = idx_finish;                }                                planned_time = cfs_time_add(current_time,                                            cfs_time_seconds(usock_tuns.ut_poll_timeout));        }                /* All conns should be deleted by POLL_DEL_REQUESTs while shutdown */        LASSERT (rc != 0 || pt_data->upt_nfds == 1);        if (rc) {                pthread_mutex_lock(&pt_data->upt_pollrequests_lock);                /* Block new poll requests to be enqueued */                pt_data->upt_errno = rc;                                while (!list_empty(&pt_data->upt_pollrequests)) {                        usock_pollrequest_t *pr;                        pr = list_entry(pt_data->upt_pollrequests.next,                                        usock_pollrequest_t, upr_list);                                                list_del(&pr->upr_list);                        if (pr->upr_type == POLL_ADD_REQUEST) {                                close(pr->upr_conn->uc_fd);                                list_add_tail(&pr->upr_conn->uc_stale_list,                                              &pt_data->upt_stale_list);                        } else {                                usocklnd_conn_decref(pr->upr_conn);                        }                                                LIBCFS_FREE (pr, sizeof(*pr));                }                pthread_mutex_unlock(&pt_data->upt_pollrequests_lock);                usocklnd_process_stale_list(pt_data);                                for (idx = 1; idx < pt_data->upt_nfds; idx++) {                        usock_conn_t *conn = pt_data->upt_idx2conn[idx];                        LASSERT(conn != NULL);                        close(conn->uc_fd);                        usocklnd_tear_peer_conn(conn);                        usocklnd_conn_decref(conn);                }        }                /* unblock usocklnd_shutdown() */        cfs_complete(&pt_data->upt_completion);        return 0;}/* Returns 0 on success, <0 else */intusocklnd_add_pollrequest(usock_conn_t *conn, int type, short value){        int                  pt_idx = conn->uc_pt_idx;        usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];        usock_pollrequest_t *pr;        LIBCFS_ALLOC(pr, sizeof(*pr));        if (pr == NULL) {                CERROR ("Cannot allocate poll request\n");                return -ENOMEM;        }        pr->upr_conn = conn;        pr->upr_type = type;        pr->upr_value = value;        usocklnd_conn_addref(conn); /* +1 for poll request */                pthread_mutex_lock(&pt->upt_pollrequests_lock);        if (pt->upt_errno) { /* very rare case: errored poll thread */                int rc = pt->upt_errno;                pthread_mutex_unlock(&pt->upt_pollrequests_lock);                usocklnd_conn_decref(conn);                LIBCFS_FREE(pr, sizeof(*pr));                return rc;        }                list_add_tail(&pr->upr_list, &pt->upt_pollrequests);        pthread_mutex_unlock(&pt->upt_pollrequests_lock);        return 0;}voidusocklnd_add_killrequest(usock_conn_t *conn){        int                  pt_idx = conn->uc_pt_idx;        usock_pollthread_t  *pt     = &usock_data.ud_pollthreads[pt_idx];        usock_pollrequest_t *pr     = conn->uc_preq;                /* Use preallocated poll request because there is no good         * workaround for ENOMEM error while killing connection */        if (pr) {                pr->upr_conn  = conn;                pr->upr_type  = POLL_DEL_REQUEST;                pr->upr_value = 0;                usocklnd_conn_addref(conn); /* +1 for poll request */                                pthread_mutex_lock(&pt->upt_pollrequests_lock);                                if (pt->upt_errno) { /* very rare case: errored poll thread */                        pthread_mutex_unlock(&pt->upt_pollrequests_lock);                        usocklnd_conn_decref(conn);                        return; /* conn will be killed in poll thread anyway */                }                list_add_tail(&pr->upr_list, &pt->upt_pollrequests);                pthread_mutex_unlock(&pt->upt_pollrequests_lock);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -