📄 conrpc.c
字号:
intlstcon_rpc_trans_interpreter(lstcon_rpc_trans_t *trans, struct list_head *head_up, lstcon_rpc_readent_func_t readent){ struct list_head tmp; struct list_head *next; lstcon_rpc_ent_t *ent; srpc_generic_reply_t *rep; srpc_client_rpc_t *rpc; lstcon_rpc_t *crpc; srpc_msg_t *msg; lstcon_node_t *nd; cfs_duration_t dur; struct timeval tv; int error; LASSERT (head_up != NULL); next = head_up; list_for_each_entry(crpc, &trans->tas_rpcs_list, crp_link) { if (copy_from_user(&tmp, next, sizeof(struct list_head))) return -EFAULT; if (tmp.next == head_up) return 0; next = tmp.next; ent = list_entry(next, lstcon_rpc_ent_t, rpe_link); rpc = crpc->crp_rpc; LASSERT (crpc->crp_stamp != 0); error = lstcon_rpc_get_reply(crpc, &msg); nd = crpc->crp_node; dur = cfs_time_sub(crpc->crp_stamp, console_session.ses_id.ses_stamp); cfs_duration_usec(dur, &tv); if (copy_to_user(&ent->rpe_peer, &nd->nd_id, sizeof(lnet_process_id_t)) || copy_to_user(&ent->rpe_stamp, &tv, sizeof(tv)) || copy_to_user(&ent->rpe_state, &nd->nd_state, sizeof(nd->nd_state)) || copy_to_user(&ent->rpe_rpc_errno, &error, sizeof(error))) return -EFAULT; if (error != 0) continue; /* RPC is done */ rep = (srpc_generic_reply_t *)&msg->msg_body.reply; if (copy_to_user(&ent->rpe_sid, &rep->sid, sizeof(lst_sid_t)) || copy_to_user(&ent->rpe_fwk_errno, &rep->status, sizeof(rep->status))) return -EFAULT; if (readent == NULL) continue; if ((error = readent(trans->tas_opc, msg, ent)) != 0) return error; } return 0;}voidlstcon_rpc_trans_destroy(lstcon_rpc_trans_t *trans){ srpc_client_rpc_t *rpc; lstcon_rpc_t *crpc; lstcon_rpc_t *tmp; int count = 0; list_for_each_entry_safe(crpc, tmp, &trans->tas_rpcs_list, crp_link) { rpc = crpc->crp_rpc; spin_lock(&rpc->crpc_lock); /* free it if not posted or finished already */ if (!crpc->crp_posted || crpc->crp_finished) { spin_unlock(&rpc->crpc_lock); list_del_init(&crpc->crp_link); lstcon_rpc_put(crpc); continue; } /* rpcs can be still not callbacked (even LNetMDUnlink is called) * because huge timeout for inaccessible network, don't make * user wait for them, just abandon them, they will be recycled * in callback */ LASSERT (crpc->crp_status != 0); crpc->crp_node = NULL; crpc->crp_trans = NULL; list_del_init(&crpc->crp_link); count ++; spin_unlock(&rpc->crpc_lock); atomic_dec(&trans->tas_remaining); } LASSERT (atomic_read(&trans->tas_remaining) == 0); list_del(&trans->tas_link); if (!list_empty(&trans->tas_olink)) list_del(&trans->tas_olink); CDEBUG(D_NET, "Transaction %s destroyed with %d pending RPCs\n", lstcon_rpc_trans_name(trans->tas_opc), count); LIBCFS_FREE(trans, sizeof(*trans)); return;}intlstcon_sesrpc_prep(lstcon_node_t *nd, int transop, lstcon_rpc_t **crpc){ srpc_mksn_reqst_t *msrq; srpc_rmsn_reqst_t *rsrq; int rc; switch (transop) { case LST_TRANS_SESNEW: rc = lstcon_rpc_prep(nd, SRPC_SERVICE_MAKE_SESSION, 0, crpc); if (rc != 0) return rc; msrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.mksn_reqst; msrq->mksn_sid = console_session.ses_id; msrq->mksn_force = console_session.ses_force; strncpy(msrq->mksn_name, console_session.ses_name, strlen(console_session.ses_name)); break; case LST_TRANS_SESEND: rc = lstcon_rpc_prep(nd, SRPC_SERVICE_REMOVE_SESSION, 0, crpc); if (rc != 0) return rc; rsrq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.rmsn_reqst; rsrq->rmsn_sid = console_session.ses_id; break; default: LBUG(); } return 0;}intlstcon_dbgrpc_prep(lstcon_node_t *nd, lstcon_rpc_t **crpc){ srpc_debug_reqst_t *drq; int rc; rc = lstcon_rpc_prep(nd, SRPC_SERVICE_DEBUG, 0, crpc); if (rc != 0) return rc; drq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.dbg_reqst; drq->dbg_sid = console_session.ses_id; drq->dbg_flags = 0; return rc;}intlstcon_batrpc_prep(lstcon_node_t *nd, int transop, lstcon_tsb_hdr_t *tsb, lstcon_rpc_t **crpc){ lstcon_batch_t *batch; srpc_batch_reqst_t *brq; int rc; rc = lstcon_rpc_prep(nd, SRPC_SERVICE_BATCH, 0, crpc); if (rc != 0) return rc; brq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.bat_reqst; brq->bar_sid = console_session.ses_id; brq->bar_bid = tsb->tsb_id; brq->bar_testidx = tsb->tsb_index; brq->bar_opc = transop == LST_TRANS_TSBRUN ? SRPC_BATCH_OPC_RUN : (transop == LST_TRANS_TSBSTOP ? SRPC_BATCH_OPC_STOP: SRPC_BATCH_OPC_QUERY); if (transop != LST_TRANS_TSBRUN && transop != LST_TRANS_TSBSTOP) return 0; LASSERT (tsb->tsb_index == 0); batch = (lstcon_batch_t *)tsb; brq->bar_arg = batch->bat_arg; return 0;}intlstcon_statrpc_prep(lstcon_node_t *nd, lstcon_rpc_t **crpc){ srpc_stat_reqst_t *srq; int rc; rc = lstcon_rpc_prep(nd, SRPC_SERVICE_QUERY_STAT, 0, crpc); if (rc != 0) return rc; srq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.stat_reqst; srq->str_sid = console_session.ses_id; srq->str_type = 0; /* XXX remove it */ return 0;}lnet_process_id_t *lstcon_next_id(int idx, int nkiov, lnet_kiov_t *kiov){ lnet_process_id_t *pid; int i; i = idx / (CFS_PAGE_SIZE / sizeof(lnet_process_id_t)); LASSERT (i < nkiov); pid = (lnet_process_id_t *)cfs_page_address(kiov[i].kiov_page); return &pid[idx % (CFS_PAGE_SIZE / sizeof(lnet_process_id_t))];}intlstcon_dstnodes_prep(lstcon_group_t *grp, int idx, int dist, int span, int nkiov, lnet_kiov_t *kiov){ lnet_process_id_t *pid; lstcon_ndlink_t *ndl; lstcon_node_t *nd; int start; int end; int i = 0; LASSERT (dist >= 1); LASSERT (span >= 1); LASSERT (grp->grp_nnode >= 1); if (span > grp->grp_nnode) return -EINVAL; start = ((idx / dist) * span) % grp->grp_nnode; end = ((idx / dist) * span + span - 1) % grp->grp_nnode; list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) { nd = ndl->ndl_node; if (i < start) { i ++; continue; } if (i > (end >= start ? end: grp->grp_nnode)) break; pid = lstcon_next_id((i - start), nkiov, kiov); *pid = nd->nd_id; i++; } if (start <= end) /* done */ return 0; list_for_each_entry(ndl, &grp->grp_ndl_list, ndl_link) { if (i > grp->grp_nnode + end) break; nd = ndl->ndl_node; pid = lstcon_next_id((i - start), nkiov, kiov); *pid = nd->nd_id; i++; } return 0;}intlstcon_pingrpc_prep(lst_test_ping_param_t *param, srpc_test_reqst_t *req){ test_ping_req_t *prq = &req->tsr_u.ping; prq->png_size = param->png_size; prq->png_flags = param->png_flags; /* TODO dest */ return 0;}intlstcon_bulkrpc_prep(lst_test_bulk_param_t *param, srpc_test_reqst_t *req){ test_bulk_req_t *brq = &req->tsr_u.bulk; brq->blk_opc = param->blk_opc; brq->blk_npg = (param->blk_size + CFS_PAGE_SIZE - 1) / CFS_PAGE_SIZE; brq->blk_flags = param->blk_flags; return 0;}intlstcon_testrpc_prep(lstcon_node_t *nd, int transop, lstcon_test_t *test, lstcon_rpc_t **crpc){ lstcon_group_t *sgrp = test->tes_src_grp; lstcon_group_t *dgrp = test->tes_dst_grp; srpc_test_reqst_t *trq; srpc_bulk_t *bulk; int i; int n = 0; int rc = 0; if (transop == LST_TRANS_TSBCLIADD) n = sfw_id_pages(test->tes_span); rc = lstcon_rpc_prep(nd, SRPC_SERVICE_TEST, n, crpc); if (rc != 0) return rc; trq = &(*crpc)->crp_rpc->crpc_reqstmsg.msg_body.tes_reqst; if (transop == LST_TRANS_TSBSRVADD) { int ndist = (sgrp->grp_nnode + test->tes_dist - 1) / test->tes_dist; int nspan = (dgrp->grp_nnode + test->tes_span - 1) / test->tes_span; int nmax = (ndist + nspan - 1) / nspan; trq->tsr_ndest = 0; trq->tsr_loop = nmax * test->tes_dist * test->tes_concur; } else { bulk = &(*crpc)->crp_rpc->crpc_bulk; for (i = 0; i < n; i++) { bulk->bk_iovs[i].kiov_offset = 0; bulk->bk_iovs[i].kiov_len = CFS_PAGE_SIZE; bulk->bk_iovs[i].kiov_page = cfs_alloc_page(CFS_ALLOC_STD); if (bulk->bk_iovs[i].kiov_page != NULL) continue; lstcon_rpc_put(*crpc); return -ENOMEM; } bulk->bk_sink = 0; LASSERT (transop == LST_TRANS_TSBCLIADD); rc = lstcon_dstnodes_prep(test->tes_dst_grp, test->tes_cliidx++, test->tes_dist, test->tes_span, n, &bulk->bk_iovs[0]); if (rc != 0) { lstcon_rpc_put(*crpc); return rc; } trq->tsr_ndest = test->tes_span; trq->tsr_loop = test->tes_loop; } trq->tsr_sid = console_session.ses_id; trq->tsr_bid = test->tes_hdr.tsb_id; trq->tsr_concur = test->tes_concur; trq->tsr_is_client = (transop == LST_TRANS_TSBCLIADD) ? 1 : 0; trq->tsr_stop_onerr = test->tes_stop_onerr; switch (test->tes_type) { case LST_TEST_PING: trq->tsr_service = SRPC_SERVICE_PING; rc = lstcon_pingrpc_prep((lst_test_ping_param_t *)&test->tes_param[0], trq); break; case LST_TEST_BULK: trq->tsr_service = SRPC_SERVICE_BRW; rc = lstcon_bulkrpc_prep((lst_test_bulk_param_t *)&test->tes_param[0], trq); break; default: LBUG(); break; } return rc;}voidlstcon_rpc_stat_reply(int transop, srpc_msg_t *msg, lstcon_node_t *nd, lstcon_trans_stat_t *stat){ srpc_mksn_reply_t *mksn_rep; srpc_rmsn_reply_t *rmsn_rep; srpc_debug_reply_t *dbg_rep; srpc_batch_reply_t *bat_rep; srpc_test_reply_t *test_rep; srpc_stat_reply_t *stat_rep; int errno = 0; switch (transop) { case LST_TRANS_SESNEW: mksn_rep = &msg->msg_body.mksn_reply; if (mksn_rep->mksn_status == 0) { lstcon_sesop_stat_success(stat, 1); /* session timeout on remote node */ nd->nd_timeout = mksn_rep->mksn_timeout; return; }
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -