⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 conrpc.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 3 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Author: Liang Zhen <liangzhen@clusterfs.com>
 *
 * This file is part of Lustre, http://www.lustre.org
 *
 * Console framework rpcs
 */

#ifdef __KERNEL__

#include <libcfs/libcfs.h>
#include <lnet/lib-lnet.h>
#include "timer.h"
#include "conrpc.h"
#include "console.h"

void lstcon_rpc_stat_reply(int, srpc_msg_t *,
                           lstcon_node_t *, lstcon_trans_stat_t *);

/*
 * Completion callback registered with sfw_create_rpc() (see
 * lstcon_rpc_init()).
 *
 * An RPC that no longer belongs to a transaction is released immediately.
 * Otherwise the RPC is marked finished, its completion stamp/status are
 * recorded unless an abort already recorded them, and the waiter on the
 * transaction is signalled once the last outstanding RPC has completed.
 */
static void
lstcon_rpc_done(srpc_client_rpc_t *rpc)
{
        lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv;

        LASSERT (crpc != NULL && rpc == crpc->crp_rpc);
        LASSERT (crpc->crp_posted && !crpc->crp_finished);

        spin_lock(&rpc->crpc_lock);

        if (crpc->crp_trans == NULL) {
                /* Orphan RPC is not in any transaction,
                 * I'm just a poor body and nobody loves me */
                spin_unlock(&rpc->crpc_lock);

                /* release it */
                lstcon_rpc_put(crpc);
                return;
        }

        /* not an orphan RPC */
        crpc->crp_finished = 1;

        if (crpc->crp_stamp == 0) {
                /* not aborted: a non-zero stamp means
                 * lstcon_rpc_trans_abort() got here first and its
                 * status must be preserved */
                LASSERT (crpc->crp_status == 0);

                crpc->crp_stamp  = cfs_time_current();
                crpc->crp_status = rpc->crpc_status;
        }

        /* wakeup (transaction)thread if I'm the last RPC in the transaction */
        if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
                cfs_waitq_signal(&crpc->crp_trans->tas_waitq);

        spin_unlock(&rpc->crpc_lock);
}

/*
 * Initialise \a crpc for a request to node \a nd on \a service.
 *
 * \param nd      target node
 * \param service service identifier handed to sfw_create_rpc()
 * \param npg     number of bulk pages (bulk length is npg * CFS_PAGE_SIZE)
 * \param cached  non-zero if \a crpc may be recycled through the session
 *                freelist; zero marks it as static (crp_static = 1)
 * \param crpc    descriptor to initialise
 *
 * \return 0 on success, -ENOMEM if the underlying RPC cannot be created.
 *
 * On success the session's live-RPC counter is incremented.
 */
int
lstcon_rpc_init(lstcon_node_t *nd, int service,
                int npg, int cached, lstcon_rpc_t *crpc)
{
        crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service,
                                       npg, npg * CFS_PAGE_SIZE,
                                       lstcon_rpc_done, (void *)crpc);
        if (crpc->crp_rpc == NULL)
                return -ENOMEM;

        crpc->crp_trans    = NULL;
        crpc->crp_node     = nd;
        crpc->crp_posted   = 0;
        crpc->crp_finished = 0;
        crpc->crp_unpacked = 0;
        crpc->crp_status   = 0;
        crpc->crp_stamp    = 0;
        crpc->crp_static   = !cached;
        CFS_INIT_LIST_HEAD(&crpc->crp_link);

        atomic_inc(&console_session.ses_rpc_counter);

        return 0;
}

/*
 * Obtain and initialise a cached RPC descriptor.
 *
 * A descriptor is taken from the session freelist if one is available,
 * otherwise a new one is allocated.  It is then initialised with
 * lstcon_rpc_init(..., cached = 1, ...).
 *
 * \param nd      target node
 * \param service service identifier
 * \param npg     number of bulk pages
 * \param crpcpp  receives the descriptor on success (untouched on failure)
 *
 * \return 0 on success, -ENOMEM on allocation failure, or the error
 *         returned by lstcon_rpc_init() (the descriptor is freed then).
 */
int
lstcon_rpc_prep(lstcon_node_t *nd, int service,
                int npg, lstcon_rpc_t **crpcpp)
{
        lstcon_rpc_t  *crpc = NULL;
        int            rc;

        spin_lock(&console_session.ses_rpc_lock);

        if (!list_empty(&console_session.ses_rpc_freelist)) {
                crpc = list_entry(console_session.ses_rpc_freelist.next,
                                  lstcon_rpc_t, crp_link);
                list_del_init(&crpc->crp_link);
        }

        spin_unlock(&console_session.ses_rpc_lock);

        if (crpc == NULL) {
                LIBCFS_ALLOC(crpc, sizeof(*crpc));
                if (crpc == NULL)
                        return -ENOMEM;
        }

        rc = lstcon_rpc_init(nd, service, npg, 1, crpc);
        if (rc == 0) {
                *crpcpp = crpc;
                return 0;
        }

        LIBCFS_FREE(crpc, sizeof(*crpc));

        return rc;
}

/*
 * Release an RPC descriptor.
 *
 * Frees every non-NULL bulk page, drops the reference on the underlying
 * RPC, then either wipes the descriptor (static) or returns it to the
 * session freelist (cached).  Finally decrements the live-RPC counter.
 *
 * \a crpc must not be linked on any list (asserted).
 */
void
lstcon_rpc_put(lstcon_rpc_t *crpc)
{
        srpc_bulk_t *bulk = &crpc->crp_rpc->crpc_bulk;
        int          i;

        LASSERT (list_empty(&crpc->crp_link));

        for (i = 0; i < bulk->bk_niov; i++) {
                if (bulk->bk_iovs[i].kiov_page == NULL)
                        continue;

                cfs_free_page(bulk->bk_iovs[i].kiov_page);
        }

        srpc_client_rpc_decref(crpc->crp_rpc);

        if (crpc->crp_static) {
                /* Static RPC, not allocated: clear it but keep the
                 * static marker so a later put takes this path again */
                memset(crpc, 0, sizeof(*crpc));
                crpc->crp_static = 1;

        } else {
                spin_lock(&console_session.ses_rpc_lock);

                list_add(&crpc->crp_link, &console_session.ses_rpc_freelist);

                spin_unlock(&console_session.ses_rpc_lock);
        }

        /* RPC is not alive now */
        atomic_dec(&console_session.ses_rpc_counter);
}

/*
 * Post \a crpc, which must already belong to a transaction (asserted).
 *
 * The transaction's outstanding count is raised before the RPC is handed
 * to sfw_post_rpc(); lstcon_rpc_done() lowers it again on completion.
 */
void
lstcon_rpc_post(lstcon_rpc_t *crpc)
{
        lstcon_rpc_trans_t *trans = crpc->crp_trans;

        LASSERT (trans != NULL);

        atomic_inc(&trans->tas_remaining);
        crpc->crp_posted = 1;

        sfw_post_rpc(crpc->crp_rpc);
}

/*
 * Map a transaction opcode to a printable name for debug messages.
 *
 * \return a string literal; "Unknown" for any unrecognised opcode.
 */
static char *
lstcon_rpc_trans_name(int transop)
{
        if (transop == LST_TRANS_SESNEW)
                return "SESNEW";

        if (transop == LST_TRANS_SESEND)
                return "SESEND";

        if (transop == LST_TRANS_SESQRY)
                return "SESQRY";

        if (transop == LST_TRANS_SESPING)
                return "SESPING";

        if (transop == LST_TRANS_TSBCLIADD)
                return "TSBCLIADD";

        if (transop == LST_TRANS_TSBSRVADD)
                return "TSBSRVADD";

        if (transop == LST_TRANS_TSBRUN)
                return "TSBRUN";

        if (transop == LST_TRANS_TSBSTOP)
                return "TSBSTOP";

        if (transop == LST_TRANS_TSBCLIQRY)
                return "TSBCLIQRY";

        if (transop == LST_TRANS_TSBSRVQRY)
                return "TSBSRVQRY";

        if (transop == LST_TRANS_STATQRY)
                return "STATQRY";

        return "Unknown";
}

/*
 * Create a transaction (a group of RPCs) for operation \a transop.
 *
 * \param translist owner's transaction list, or NULL if the transaction
 *                  has no owner list
 * \param transop   transaction opcode
 * \param transpp   receives the new transaction on success
 *
 * \return 0 on success, -EPERM if \a translist already holds a
 *         conflicting private transaction, -ENOMEM on allocation failure.
 *
 * The new transaction is always appended to the session-wide
 * ses_trans_list, and to \a translist when one is given.
 */
int
lstcon_rpc_trans_prep(struct list_head *translist,
                      int transop, lstcon_rpc_trans_t **transpp)
{
        lstcon_rpc_trans_t *trans;

        if (translist != NULL) {
                /* NOTE(review): entries are added to translist through
                 * tas_olink below, yet this walk uses tas_link; unless
                 * both members share an offset this inspects the wrong
                 * object - confirm against the struct definition. */
                list_for_each_entry(trans, translist, tas_link) {
                        /* Can't enqueue two private transaction on
                         * the same object */
                        if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE)
                                return -EPERM;
                }
        }

        /* create a trans group */
        LIBCFS_ALLOC(trans, sizeof(*trans));
        if (trans == NULL)
                return -ENOMEM;

        trans->tas_opc = transop;

        if (translist == NULL)
                CFS_INIT_LIST_HEAD(&trans->tas_olink);
        else
                list_add_tail(&trans->tas_olink, translist);

        list_add_tail(&trans->tas_link, &console_session.ses_trans_list);

        CFS_INIT_LIST_HEAD(&trans->tas_rpcs_list);
        atomic_set(&trans->tas_remaining, 0);
        cfs_waitq_init(&trans->tas_waitq);

        *transpp = trans;

        return 0;
}

/*
 * Append \a crpc to \a trans and record the transaction as its owner.
 */
void
lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *crpc)
{
        list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list);
        crpc->crp_trans = trans;
}

/*
 * Abort every RPC of \a trans that was posted and has neither completed
 * nor been aborted yet.
 *
 * \param trans transaction to abort
 * \param error negative errno stored as the status of each aborted RPC
 *              (lstcon_rpc_trans_postwait() passes -ETIMEDOUT, -EINTR or
 *              -ESHUTDOWN)
 *
 * When the abort reason is a timeout, the target node is additionally
 * marked LST_NODE_DOWN unless the node carries a newer state stamp.
 */
void
lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error)
{
        srpc_client_rpc_t *rpc;
        lstcon_rpc_t      *crpc;
        lstcon_node_t     *nd;

        list_for_each_entry (crpc, &trans->tas_rpcs_list, crp_link) {
                rpc = crpc->crp_rpc;

                spin_lock(&rpc->crpc_lock);

                if (!crpc->crp_posted || crpc->crp_stamp != 0) {
                        /* rpc done or aborted already */
                        spin_unlock(&rpc->crpc_lock);
                        continue;
                }

                crpc->crp_stamp  = cfs_time_current();
                crpc->crp_status = error;

                spin_unlock(&rpc->crpc_lock);

                sfw_abort_rpc(rpc);

                /* Error codes arrive as negative errnos, so the timeout
                 * case is -ETIMEDOUT.  Comparing against the positive
                 * constant never matched and nodes were never marked
                 * down. */
                if (error != -ETIMEDOUT)
                        continue;

                nd = crpc->crp_node;
                if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
                        continue;

                nd->nd_stamp = crpc->crp_stamp;
                nd->nd_state = LST_NODE_DOWN;
        }
}

/*
 * Wait condition for lstcon_rpc_trans_postwait().
 *
 * \return 1 if the waiter should stop waiting: either the session is
 *         shutting down and the transaction is linked on an owner list,
 *         or no RPC of the transaction is outstanding; 0 otherwise.
 */
static int
lstcon_rpc_trans_check(lstcon_rpc_trans_t *trans)
{
        if (console_session.ses_shutdown &&
            !list_empty(&trans->tas_olink)) /* It's not an end session RPC */
                return 1;

        return (atomic_read(&trans->tas_remaining) == 0) ? 1: 0;
}

/*
 * Post every RPC of \a trans and wait for them to complete.
 *
 * \param trans   transaction to run
 * \param timeout timeout in seconds; raised to LST_TRANS_MIN_TIMEOUT if
 *                smaller
 *
 * \return 0 if all RPCs completed (or the transaction is empty),
 *         -EINTR if the wait was interrupted, -ETIMEDOUT on timeout,
 *         -ESHUTDOWN if the session is shutting down.
 *
 * console_session.ses_mutex is released with mutex_up() for the duration
 * of the wait and re-acquired with mutex_down() afterwards, so the caller
 * is presumably expected to hold it on entry - verify against callers.
 * On any failure the outstanding RPCs are aborted.  Statistics are
 * collected into the buffer returned by lstcon_trans_stat() in every
 * case.
 */
int
lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout)
{
        lstcon_rpc_t  *crpc;
        int            rc;

        if (list_empty(&trans->tas_rpcs_list))
                return 0;

        if (timeout < LST_TRANS_MIN_TIMEOUT)
                timeout = LST_TRANS_MIN_TIMEOUT;

        CDEBUG(D_NET, "Transaction %s started\n",
               lstcon_rpc_trans_name(trans->tas_opc));

        /* post all requests */
        list_for_each_entry (crpc, &trans->tas_rpcs_list, crp_link) {
                LASSERT (!crpc->crp_posted);

                lstcon_rpc_post(crpc);
        }

        mutex_up(&console_session.ses_mutex);

        rc = cfs_waitq_wait_event_interruptible_timeout(trans->tas_waitq,
                                              lstcon_rpc_trans_check(trans),
                                              timeout * HZ);

        /* positive: condition met; negative: interrupted; zero: timed out */
        rc = (rc > 0)? 0: ((rc < 0)? -EINTR: -ETIMEDOUT);

        mutex_down(&console_session.ses_mutex);

        if (console_session.ses_shutdown)
                rc = -ESHUTDOWN;

        if (rc != 0) {
                /* treat short timeout as canceled */
                if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2)
                        rc = -EINTR;

                lstcon_rpc_trans_abort(trans, rc);
        }

        CDEBUG(D_NET, "Transaction %s stopped: %d\n",
               lstcon_rpc_trans_name(trans->tas_opc), rc);

        lstcon_rpc_trans_stat(trans, lstcon_trans_stat());

        return rc;
}

/*
 * Fetch the reply of a completed (or aborted) RPC and refresh the state
 * of its node.
 *
 * \param crpc  RPC whose completion stamp has been set (asserted)
 * \param msgpp receives the reply message, or NULL if the RPC failed
 *
 * \return 0 on success, otherwise the RPC's recorded status.
 *
 * The reply is unpacked at most once per RPC.  The node state is updated
 * from the session id in the reply unless the node already carries a
 * newer stamp.
 */
int
lstcon_rpc_get_reply(lstcon_rpc_t *crpc, srpc_msg_t **msgpp)
{
        lstcon_node_t        *nd  = crpc->crp_node;
        srpc_client_rpc_t    *rpc = crpc->crp_rpc;
        srpc_generic_reply_t *rep;

        LASSERT (nd != NULL && rpc != NULL);
        LASSERT (crpc->crp_stamp != 0);

        if (crpc->crp_status != 0) {
                *msgpp = NULL;
                return crpc->crp_status;
        }

        *msgpp = &rpc->crpc_replymsg;
        if (!crpc->crp_unpacked) {
                sfw_unpack_message(*msgpp);
                crpc->crp_unpacked = 1;
        }

        /* node state was updated by something more recent than this RPC */
        if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
                return 0;

        nd->nd_stamp = crpc->crp_stamp;
        rep = &(*msgpp)->msg_body.reply;

        if (rep->sid.ses_nid == LNET_NID_ANY)
                nd->nd_state = LST_NODE_UNKNOWN;
        else if (lstcon_session_match(rep->sid))
                nd->nd_state = LST_NODE_ACTIVE;
        else
                nd->nd_state = LST_NODE_BUSY;

        return 0;
}

/*
 * Summarise the outcome of every RPC in \a trans into \a stat.
 *
 * \param trans transaction whose RPCs have all been stamped (asserted
 *              per RPC)
 * \param stat  output buffer; must not be NULL (asserted); zeroed first
 *
 * The first RPC failure seen is recorded, sign-flipped, in
 * stat->trs_rpc_errno.  Successful replies are passed on to
 * lstcon_rpc_stat_reply() for opcode-specific accounting.
 */
void
lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat)
{
        lstcon_rpc_t      *crpc;
        srpc_msg_t        *rep;
        int                error;

        LASSERT (stat != NULL);

        memset(stat, 0, sizeof(*stat));

        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
                lstcon_rpc_stat_total(stat, 1);

                LASSERT (crpc->crp_stamp != 0);

                error = lstcon_rpc_get_reply(crpc, &rep);
                if (error != 0) {
                        lstcon_rpc_stat_failure(stat, 1);
                        if (stat->trs_rpc_errno == 0)
                                stat->trs_rpc_errno = -error;

                        continue;
                }

                lstcon_rpc_stat_success(stat, 1);

                lstcon_rpc_stat_reply(trans->tas_opc, rep,
                                      crpc->crp_node, stat);
        }

        CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, "
                      "RPC error(%d), Framework error(%d)\n",
               lstcon_rpc_trans_name(trans->tas_opc),
               lstcon_rpc_stat_success(stat, 0),
               lstcon_rpc_stat_failure(stat, 0),
               lstcon_rpc_stat_total(stat, 0),
               stat->trs_rpc_errno, stat->trs_fwk_errno);

        return;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -