📄 conrpc.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Author: Liang Zhen <liangzhen@clusterfs.com>
 *
 * This file is part of Lustre, http://www.lustre.org
 *
 * Console framework rpcs
 */
#ifdef __KERNEL__

#include <libcfs/libcfs.h>
#include <lnet/lib-lnet.h>
#include "timer.h"
#include "conrpc.h"
#include "console.h"

void lstcon_rpc_stat_reply(int, srpc_msg_t *, lstcon_node_t *,
                           lstcon_trans_stat_t *);

/*
 * Completion callback registered with sfw_create_rpc() by lstcon_rpc_init().
 *
 * The console RPC is recovered from rpc->crpc_priv.  An RPC that no longer
 * belongs to a transaction (crp_trans == NULL) is released immediately.
 * Otherwise the RPC is marked finished, its completion stamp and status are
 * recorded unless an abort already recorded them (crp_stamp != 0), and the
 * transaction's waiter is signalled when tas_remaining drops to zero.
 */
static void
lstcon_rpc_done(srpc_client_rpc_t *rpc)
{
        lstcon_rpc_t *crpc = (lstcon_rpc_t *)rpc->crpc_priv;

        LASSERT (crpc != NULL && rpc == crpc->crp_rpc);
        LASSERT (crpc->crp_posted && !crpc->crp_finished);

        spin_lock(&rpc->crpc_lock);

        if (crpc->crp_trans == NULL) {
                /* Orphan RPC is not in any transaction,
                 * nobody is waiting for it */
                spin_unlock(&rpc->crpc_lock);

                /* release it */
                lstcon_rpc_put(crpc);
                return;
        }

        /* not an orphan RPC */
        crpc->crp_finished = 1;

        if (crpc->crp_stamp == 0) {
                /* not aborted */
                LASSERT (crpc->crp_status == 0);

                crpc->crp_stamp  = cfs_time_current();
                crpc->crp_status = rpc->crpc_status;
        }

        /* wakeup (transaction)thread if I'm the last RPC in the transaction */
        if (atomic_dec_and_test(&crpc->crp_trans->tas_remaining))
                cfs_waitq_signal(&crpc->crp_trans->tas_waitq);

        spin_unlock(&rpc->crpc_lock);
}

/*
 * Initialise the console RPC descriptor \a crpc for node \a nd.
 *
 * Creates the underlying framework RPC for \a service with \a npg pages
 * (npg * CFS_PAGE_SIZE bytes) and lstcon_rpc_done() as completion callback,
 * resets all bookkeeping fields and bumps the session RPC counter.
 * \a cached == 0 marks the descriptor as static, i.e. lstcon_rpc_put()
 * will wipe it instead of returning it to the session freelist.
 *
 * Returns 0 on success, -ENOMEM if the framework RPC cannot be created.
 */
int
lstcon_rpc_init(lstcon_node_t *nd, int service,
                int npg, int cached, lstcon_rpc_t *crpc)
{
        crpc->crp_rpc = sfw_create_rpc(nd->nd_id, service,
                                       npg, npg * CFS_PAGE_SIZE,
                                       lstcon_rpc_done, (void *)crpc);
        if (crpc->crp_rpc == NULL)
                return -ENOMEM;

        crpc->crp_trans    = NULL;
        crpc->crp_node     = nd;
        crpc->crp_posted   = 0;
        crpc->crp_finished = 0;
        crpc->crp_unpacked = 0;
        crpc->crp_status   = 0;
        crpc->crp_stamp    = 0;
        crpc->crp_static   = !cached;
        CFS_INIT_LIST_HEAD(&crpc->crp_link);

        atomic_inc(&console_session.ses_rpc_counter);

        return 0;
}

/*
 * Obtain a cached (non-static) console RPC for node \a nd.
 *
 * A descriptor is taken from the session freelist when one is available,
 * otherwise a new one is allocated.  On success the initialised descriptor
 * is stored in \a *crpcpp and 0 is returned.  On failure the descriptor is
 * freed and a negative errno is returned; \a *crpcpp is left untouched.
 */
int
lstcon_rpc_prep(lstcon_node_t *nd, int service,
                int npg, lstcon_rpc_t **crpcpp)
{
        lstcon_rpc_t *crpc = NULL;
        int           rc;

        spin_lock(&console_session.ses_rpc_lock);

        if (!list_empty(&console_session.ses_rpc_freelist)) {
                crpc = list_entry(console_session.ses_rpc_freelist.next,
                                  lstcon_rpc_t, crp_link);
                list_del_init(&crpc->crp_link);
        }

        spin_unlock(&console_session.ses_rpc_lock);

        if (crpc == NULL) {
                LIBCFS_ALLOC(crpc, sizeof(*crpc));
                if (crpc == NULL)
                        return -ENOMEM;
        }

        rc = lstcon_rpc_init(nd, service, npg, 1, crpc);
        if (rc == 0) {
                *crpcpp = crpc;
                return 0;
        }

        LIBCFS_FREE(crpc, sizeof(*crpc));

        return rc;
}

/*
 * Release a console RPC that is not linked on any list.
 *
 * Frees every non-NULL bulk page, drops the reference on the framework RPC,
 * then either wipes the descriptor (static RPC, keeping crp_static set) or
 * puts it back on the session freelist.  Finally the session RPC counter
 * is decremented.
 */
void
lstcon_rpc_put(lstcon_rpc_t *crpc)
{
        srpc_bulk_t *bulk = &crpc->crp_rpc->crpc_bulk;
        int          i;

        LASSERT (list_empty(&crpc->crp_link));

        for (i = 0; i < bulk->bk_niov; i++) {
                if (bulk->bk_iovs[i].kiov_page == NULL)
                        continue;

                cfs_free_page(bulk->bk_iovs[i].kiov_page);
        }

        srpc_client_rpc_decref(crpc->crp_rpc);

        if (crpc->crp_static) {
                /* Static RPC, not allocated */
                memset(crpc, 0, sizeof(*crpc));
                crpc->crp_static = 1;
        } else {
                spin_lock(&console_session.ses_rpc_lock);

                list_add(&crpc->crp_link, &console_session.ses_rpc_freelist);

                spin_unlock(&console_session.ses_rpc_lock);
        }

        /* RPC is not alive now */
        atomic_dec(&console_session.ses_rpc_counter);
}

/*
 * Post a console RPC that already belongs to a transaction.
 *
 * The transaction's outstanding counter is incremented before the RPC is
 * handed to the framework, so that lstcon_rpc_done() can balance it.
 */
void
lstcon_rpc_post(lstcon_rpc_t *crpc)
{
        lstcon_rpc_trans_t *trans = crpc->crp_trans;

        LASSERT (trans != NULL);

        atomic_inc(&trans->tas_remaining);
        crpc->crp_posted = 1;

        sfw_post_rpc(crpc->crp_rpc);
}

/*
 * Map a transaction opcode to a printable name for debug messages.
 * Returns "Unknown" for any opcode not listed here.
 */
static char *
lstcon_rpc_trans_name(int transop)
{
        if (transop == LST_TRANS_SESNEW)
                return "SESNEW";

        if (transop == LST_TRANS_SESEND)
                return "SESEND";

        if (transop == LST_TRANS_SESQRY)
                return "SESQRY";

        if (transop == LST_TRANS_SESPING)
                return "SESPING";

        if (transop == LST_TRANS_TSBCLIADD)
                return "TSBCLIADD";

        if (transop == LST_TRANS_TSBSRVADD)
                return "TSBSRVADD";

        if (transop == LST_TRANS_TSBRUN)
                return "TSBRUN";

        if (transop == LST_TRANS_TSBSTOP)
                return "TSBSTOP";

        if (transop == LST_TRANS_TSBCLIQRY)
                return "TSBCLIQRY";

        if (transop == LST_TRANS_TSBSRVQRY)
                return "TSBSRVQRY";

        if (transop == LST_TRANS_STATQRY)
                return "STATQRY";

        return "Unknown";
}

/*
 * Create a transaction (a group of RPCs) with opcode \a transop.
 *
 * \a translist, when not NULL, is the per-object list of transactions; the
 * new transaction is appended to it through tas_olink.  A second private
 * transaction on the same object is refused with -EPERM.  Every transaction
 * is also linked on the session-wide list through tas_link.
 *
 * Returns 0 and stores the transaction in \a *transpp on success,
 * -EPERM on a private-transaction conflict, -ENOMEM on allocation failure.
 */
int
lstcon_rpc_trans_prep(struct list_head *translist,
                      int transop, lstcon_rpc_trans_t **transpp)
{
        lstcon_rpc_trans_t *trans;

        if (translist != NULL) {
                /* Entries are linked on \a translist through tas_olink
                 * (see list_add_tail() below), so the walk must use the
                 * same member; walking with tas_link would compute the
                 * container from the wrong offset. */
                list_for_each_entry(trans, translist, tas_olink) {
                        /* Can't enqueue two private transaction on
                         * the same object */
                        if ((trans->tas_opc & transop) == LST_TRANS_PRIVATE)
                                return -EPERM;
                }
        }

        /* create a trans group */
        LIBCFS_ALLOC(trans, sizeof(*trans));
        if (trans == NULL)
                return -ENOMEM;

        trans->tas_opc = transop;

        if (translist == NULL)
                CFS_INIT_LIST_HEAD(&trans->tas_olink);
        else
                list_add_tail(&trans->tas_olink, translist);

        list_add_tail(&trans->tas_link, &console_session.ses_trans_list);

        CFS_INIT_LIST_HEAD(&trans->tas_rpcs_list);
        atomic_set(&trans->tas_remaining, 0);
        cfs_waitq_init(&trans->tas_waitq);

        *transpp = trans;

        return 0;
}

/*
 * Append \a crpc to the RPC list of \a trans and record the owning
 * transaction in the RPC.
 */
void
lstcon_rpc_trans_addreq(lstcon_rpc_trans_t *trans, lstcon_rpc_t *crpc)
{
        list_add_tail(&crpc->crp_link, &trans->tas_rpcs_list);
        crpc->crp_trans = trans;
}

/*
 * Abort every RPC of \a trans that has been posted and has neither
 * completed nor been aborted yet, recording \a error as its status.
 *
 * When the abort reason is a timeout, the peer node is additionally marked
 * LST_NODE_DOWN unless the node already carries a newer stamp.
 */
void
lstcon_rpc_trans_abort(lstcon_rpc_trans_t *trans, int error)
{
        srpc_client_rpc_t *rpc;
        lstcon_rpc_t      *crpc;
        lstcon_node_t     *nd;

        list_for_each_entry (crpc, &trans->tas_rpcs_list, crp_link) {
                rpc = crpc->crp_rpc;

                spin_lock(&rpc->crpc_lock);

                if (!crpc->crp_posted || crpc->crp_stamp != 0) {
                        /* rpc done or aborted already */
                        spin_unlock(&rpc->crpc_lock);
                        continue;
                }

                crpc->crp_stamp  = cfs_time_current();
                crpc->crp_status = error;

                spin_unlock(&rpc->crpc_lock);

                sfw_abort_rpc(rpc);

                /* lstcon_rpc_trans_postwait() passes a negative errno, so
                 * comparing against positive ETIMEDOUT alone never matched
                 * and timed-out nodes were never marked down.  Both signs
                 * are accepted to stay compatible with any other caller. */
                if (error != -ETIMEDOUT && error != ETIMEDOUT)
                        continue;

                nd = crpc->crp_node;
                if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
                        continue;

                nd->nd_stamp = crpc->crp_stamp;
                nd->nd_state = LST_NODE_DOWN;
        }
}

/*
 * Wait condition for lstcon_rpc_trans_postwait().
 *
 * Returns 1 when the waiter should stop waiting: either the session is
 * shutting down and the transaction is linked on an object list, or no RPC
 * of the transaction is outstanding any more.  Returns 0 otherwise.
 */
static int
lstcon_rpc_trans_check(lstcon_rpc_trans_t *trans)
{
        if (console_session.ses_shutdown &&
            !list_empty(&trans->tas_olink)) /* It's not an end session RPC */
                return 1;

        return (atomic_read(&trans->tas_remaining) == 0) ? 1 : 0;
}

/*
 * Post all RPCs of \a trans and wait up to \a timeout seconds for them.
 *
 * \a timeout is raised to LST_TRANS_MIN_TIMEOUT if smaller.  The session
 * mutex is released while waiting and re-acquired afterwards.  On
 * interruption, timeout or session shutdown the outstanding RPCs are
 * aborted.  Transaction statistics are collected before returning.
 *
 * Returns 0 on success (or when the transaction has no RPC), -EINTR,
 * -ETIMEDOUT or -ESHUTDOWN otherwise.
 */
int
lstcon_rpc_trans_postwait(lstcon_rpc_trans_t *trans, int timeout)
{
        lstcon_rpc_t *crpc;
        int           rc;

        if (list_empty(&trans->tas_rpcs_list))
                return 0;

        if (timeout < LST_TRANS_MIN_TIMEOUT)
                timeout = LST_TRANS_MIN_TIMEOUT;

        CDEBUG(D_NET, "Transaction %s started\n",
               lstcon_rpc_trans_name(trans->tas_opc));

        /* post all requests */
        list_for_each_entry (crpc, &trans->tas_rpcs_list, crp_link) {
                LASSERT (!crpc->crp_posted);

                lstcon_rpc_post(crpc);
        }

        mutex_up(&console_session.ses_mutex);

        rc = cfs_waitq_wait_event_interruptible_timeout(trans->tas_waitq,
                                              lstcon_rpc_trans_check(trans),
                                              timeout * HZ);

        /* positive: condition met; negative: interrupted; zero: timed out */
        rc = (rc > 0)? 0: ((rc < 0)? -EINTR: -ETIMEDOUT);

        mutex_down(&console_session.ses_mutex);

        if (console_session.ses_shutdown)
                rc = -ESHUTDOWN;

        if (rc != 0) {
                /* treat short timeout as canceled */
                if (rc == -ETIMEDOUT && timeout < LST_TRANS_MIN_TIMEOUT * 2)
                        rc = -EINTR;

                lstcon_rpc_trans_abort(trans, rc);
        }

        CDEBUG(D_NET, "Transaction %s stopped: %d\n",
               lstcon_rpc_trans_name(trans->tas_opc), rc);

        lstcon_rpc_trans_stat(trans, lstcon_trans_stat());

        return rc;
}

/*
 * Fetch the reply of a completed (or aborted) console RPC.
 *
 * If the RPC carries a non-zero status, \a *msgpp is set to NULL and that
 * status is returned.  Otherwise \a *msgpp points at the reply message,
 * which is unpacked on first access only, and 0 is returned.  Unless the
 * node already has a stamp newer than this RPC, the node's stamp and state
 * are refreshed from the session id found in the reply.
 */
int
lstcon_rpc_get_reply(lstcon_rpc_t *crpc, srpc_msg_t **msgpp)
{
        lstcon_node_t        *nd  = crpc->crp_node;
        srpc_client_rpc_t    *rpc = crpc->crp_rpc;
        srpc_generic_reply_t *rep;

        LASSERT (nd != NULL && rpc != NULL);
        LASSERT (crpc->crp_stamp != 0);

        if (crpc->crp_status != 0) {
                *msgpp = NULL;
                return crpc->crp_status;
        }

        *msgpp = &rpc->crpc_replymsg;
        if (!crpc->crp_unpacked) {
                sfw_unpack_message(*msgpp);
                crpc->crp_unpacked = 1;
        }

        /* node state was updated by something newer than this RPC */
        if (cfs_time_after(nd->nd_stamp, crpc->crp_stamp))
                return 0;

        nd->nd_stamp = crpc->crp_stamp;
        rep = &(*msgpp)->msg_body.reply;

        if (rep->sid.ses_nid == LNET_NID_ANY)
                nd->nd_state = LST_NODE_UNKNOWN;
        else if (lstcon_session_match(rep->sid))
                nd->nd_state = LST_NODE_ACTIVE;
        else
                nd->nd_state = LST_NODE_BUSY;

        return 0;
}

/*
 * Summarise the outcome of every RPC of \a trans into \a stat.
 *
 * \a stat is zeroed first.  Each RPC counts towards the total; an RPC whose
 * reply cannot be fetched counts as a failure and the first such error is
 * stored (sign-flipped) in trs_rpc_errno.  Successful replies are passed to
 * lstcon_rpc_stat_reply() for opcode-specific accounting.
 */
void
lstcon_rpc_trans_stat(lstcon_rpc_trans_t *trans, lstcon_trans_stat_t *stat)
{
        lstcon_rpc_t *crpc;
        srpc_msg_t   *rep;
        int           error;

        LASSERT (stat != NULL);

        memset(stat, 0, sizeof(*stat));

        list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) {
                lstcon_rpc_stat_total(stat, 1);

                LASSERT (crpc->crp_stamp != 0);

                error = lstcon_rpc_get_reply(crpc, &rep);
                if (error != 0) {
                        lstcon_rpc_stat_failure(stat, 1);
                        if (stat->trs_rpc_errno == 0)
                                stat->trs_rpc_errno = -error;

                        continue;
                }

                lstcon_rpc_stat_success(stat, 1);

                lstcon_rpc_stat_reply(trans->tas_opc, rep,
                                      crpc->crp_node, stat);
        }

        CDEBUG(D_NET, "transaction %s : success %d, failure %d, total %d, "
                      "RPC error(%d), Framework error(%d)\n",
               lstcon_rpc_trans_name(trans->tas_opc),
               lstcon_rpc_stat_success(stat, 0),
               lstcon_rpc_stat_failure(stat, 0),
               lstcon_rpc_stat_total(stat, 0),
               stat->trs_rpc_errno, stat->trs_fwk_errno);

        return;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -