📄 rpc.c
字号:
        return 0;
}

/* Actively post a request buffer (LNET_MD_OP_PUT) to @peer.  The target
 * portal is chosen by service id: ids above SRPC_FRAMEWORK_SERVICE_MAX_ID
 * use the regular request portal, framework services use the framework
 * request portal.  Returns the result of srpc_post_active_rdma(). */
int
srpc_post_active_rqtbuf(lnet_process_id_t peer, int service, void *buf,
                        int len, lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        int portal;

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_active_rdma(portal, service, buf, len,
                                   LNET_MD_OP_PUT, peer,
                                   LNET_NID_ANY, mdh, ev);
        return rc;
}

/* Passively post a request buffer to receive an incoming PUT from any
 * peer (nid/pid wildcards).  Portal selection mirrors
 * srpc_post_active_rqtbuf().  Returns the result of
 * srpc_post_passive_rdma(). */
int
srpc_post_passive_rqtbuf(int service, void *buf, int len,
                         lnet_handle_md_t *mdh, srpc_event_t *ev)
{
        int rc;
        int portal;
        lnet_process_id_t any = {.nid = LNET_NID_ANY, .pid = LNET_PID_ANY};

        if (service > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                portal = SRPC_REQUEST_PORTAL;
        else
                portal = SRPC_FRAMEWORK_REQUEST_PORTAL;

        rc = srpc_post_passive_rdma(portal, service, buf, len,
                                    LNET_MD_OP_PUT, any, mdh, ev);
        return rc;
}

/* Post @buf as a passive request buffer for service @sv.
 *
 * Called (and returns) with sv->sv_lock held, but drops the lock around
 * the srpc_post_passive_rqtbuf() call and around LIBCFS_FREE() on the
 * failure path.  On success returns 0; on failure undoes the bookkeeping,
 * frees @buf (unless the service is shutting down), and returns the
 * posting error. */
int
srpc_service_post_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        srpc_msg_t *msg = &buf->buf_msg;
        int rc;

        LASSERT (!sv->sv_shuttingdown);

        buf->buf_mdh = LNET_INVALID_HANDLE;
        list_add(&buf->buf_list, &sv->sv_posted_msgq);
        sv->sv_nposted_msg++;
        spin_unlock(&sv->sv_lock);

        rc = srpc_post_passive_rqtbuf(sv->sv_id, msg, sizeof(*msg),
                                      &buf->buf_mdh, &sv->sv_ev);

        /* At this point, a RPC (new or delayed) may have arrived in
         * msg and its event handler has been called. So we must add
         * buf to sv_posted_msgq _before_ dropping sv_lock */

        spin_lock(&sv->sv_lock);

        if (rc == 0) {
                if (sv->sv_shuttingdown) {
                        spin_unlock(&sv->sv_lock);

                        /* srpc_shutdown_service might have tried to unlink me
                         * when my buf_mdh was still invalid */
                        LNetMDUnlink(buf->buf_mdh);

                        spin_lock(&sv->sv_lock);
                }
                return 0;
        }

        /* Posting failed: roll back the optimistic bookkeeping above. */
        sv->sv_nposted_msg--;
        if (sv->sv_shuttingdown)
                return rc; /* don't allow to change sv_posted_msgq on shutdown */

        list_del(&buf->buf_list);

        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
        return rc;
}

/* Allocate and post up to @nbuffer request buffers on service @sv.
 * Stops early on allocation or posting failure.  Returns the number of
 * buffers actually posted (callers compare it against @nbuffer). */
int
srpc_service_add_buffers (srpc_service_t *sv, int nbuffer)
{
        int                rc;
        int                posted;
        srpc_buffer_t     *buf;

        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        for (posted = 0; posted < nbuffer; posted++) {
                LIBCFS_ALLOC(buf, sizeof(*buf));
                if (buf == NULL)
                        break;

                spin_lock(&sv->sv_lock);
                rc = srpc_service_post_buffer(sv, buf);
                spin_unlock(&sv->sv_lock);

                if (rc != 0)
                        break;
        }

        return posted;
}

/* Schedule removal of @nbuffer posted buffers from service @sv.
 * Buffers are not removed here; sv_nprune is bumped and buffers are
 * freed lazily as they get recycled (see srpc_service_recycle_buffer). */
void
srpc_service_remove_buffers (srpc_service_t *sv, int nbuffer)
{
        LASSERTF (nbuffer > 0,
                  "nbuffer must be positive: %d\n", nbuffer);

        spin_lock(&sv->sv_lock);
        LASSERT (sv->sv_nprune >= 0);
        LASSERT (!sv->sv_shuttingdown);

        sv->sv_nprune += nbuffer;

        spin_unlock(&sv->sv_lock);
        return;
}

/* returns 1 if sv has finished, otherwise 0; it must have been shut down
 * first (sv_shuttingdown set by srpc_shutdown_service).  While posted
 * buffers are still linked or RPCs are still in flight it only reports
 * progress and returns 0; once quiescent it frees all remaining buffers
 * and free RPC descriptors and returns 1. */
int
srpc_finish_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        LASSERT (sv->sv_shuttingdown); /* srpc_shutdown_service called */

        if (sv->sv_nposted_msg != 0 || !list_empty(&sv->sv_active_rpcq)) {
                CDEBUG (D_NET,
                        "waiting for %d posted buffers to unlink and "
                        "in-flight RPCs to die.\n",
                        sv->sv_nposted_msg);

                if (!list_empty(&sv->sv_active_rpcq)) {
                        rpc = list_entry(sv->sv_active_rpcq.next,
                                         srpc_server_rpc_t, srpc_list);
                        CDEBUG (D_NETERROR,
                                "Active RPC on shutdown: sv %s, peer %s, "
                                "wi %s scheduled %d running %d, "
                                "ev fired %d type %d status %d lnet %d\n",
                                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                                swi_state2str(rpc->srpc_wi.wi_state),
                                rpc->srpc_wi.wi_scheduled,
                                rpc->srpc_wi.wi_running,
                                rpc->srpc_ev.ev_fired, rpc->srpc_ev.ev_type,
                                rpc->srpc_ev.ev_status, rpc->srpc_ev.ev_lnet);
                }
                spin_unlock(&sv->sv_lock);
                return 0;
        }

        spin_unlock(&sv->sv_lock); /* no lock needed from now on */

        /* Drain posted and blocked buffer queues. */
        for (;;) {
                struct list_head *q;

                if (!list_empty(&sv->sv_posted_msgq))
                        q = &sv->sv_posted_msgq;
                else if (!list_empty(&sv->sv_blocked_msgq))
                        q = &sv->sv_blocked_msgq;
                else
                        break;

                buf = list_entry(q->next, srpc_buffer_t, buf_list);
                list_del(&buf->buf_list);
                LIBCFS_FREE(buf, sizeof(*buf));
        }

        /* Free cached server-RPC descriptors. */
        while (!list_empty(&sv->sv_free_rpcq)) {
                rpc = list_entry(sv->sv_free_rpcq.next,
                                 srpc_server_rpc_t, srpc_list);
                list_del(&rpc->srpc_list);
                LIBCFS_FREE(rpc, sizeof(*rpc));
        }

        return 1;
}

/* called with sv->sv_lock held; may drop and reacquire it.
 * Re-posts @buf unless the service is shutting down or a prune has been
 * requested (sv_nprune > 0), in which cases @buf is freed instead. */
void
srpc_service_recycle_buffer (srpc_service_t *sv, srpc_buffer_t *buf)
{
        if (sv->sv_shuttingdown)
                goto free;

        if (sv->sv_nprune == 0) {
                if (srpc_service_post_buffer(sv, buf) != 0)
                        CWARN ("Failed to post %s buffer\n", sv->sv_name);
                return;
        }

        sv->sv_nprune--;
free:
        spin_unlock(&sv->sv_lock);
        LIBCFS_FREE(buf, sizeof(*buf));
        spin_lock(&sv->sv_lock);
}

/* Begin shutting down service @sv: mark it shutting down, wake every
 * in-flight RPC so it notices, then unlink all posted buffers so their
 * LNet events fire and srpc_finish_service can complete. */
void
srpc_shutdown_service (srpc_service_t *sv)
{
        srpc_server_rpc_t *rpc;
        srpc_buffer_t     *buf;

        spin_lock(&sv->sv_lock);

        CDEBUG (D_NET, "Shutting down service: id %d, name %s\n",
                sv->sv_id, sv->sv_name);

        sv->sv_shuttingdown = 1; /* i.e. no new active RPC */

        /* schedule in-flight RPCs to notice the shutdown */
        list_for_each_entry (rpc, &sv->sv_active_rpcq, srpc_list) {
                swi_schedule_workitem(&rpc->srpc_wi);
        }

        spin_unlock(&sv->sv_lock);

        /* OK to traverse sv_posted_msgq without lock, since no one
         * touches sv_posted_msgq now */
        list_for_each_entry (buf, &sv->sv_posted_msgq, buf_list)
                LNetMDUnlink(buf->buf_mdh);

        return;
}

/* Send the request message of client RPC @rpc to its destination.
 * On failure (only -ENOMEM is expected) marks the request event fired
 * since no LNet completion will arrive.  Returns 0 or the error. */
int
srpc_send_request (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_reqstev;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REQUEST_SENT;

        rc = srpc_post_active_rqtbuf(rpc->crpc_dest, rpc->crpc_service,
                                     &rpc->crpc_reqstmsg, sizeof(srpc_msg_t),
                                     &rpc->crpc_reqstmdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

/* Post the passive buffer that will receive the server's reply for
 * client RPC @rpc.  Generates a fresh reply id and stores it in the
 * request message so the server knows where to PUT the reply.
 * Returns 0 or -ENOMEM (only expected error). */
int
srpc_prepare_reply (srpc_client_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->crpc_replyev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.rpyid;
        int           rc;

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &rpc->crpc_replymsg, sizeof(srpc_msg_t),
                                    LNET_MD_OP_PUT, rpc->crpc_dest,
                                    &rpc->crpc_replymdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

/* Post the passive bulk descriptor for client RPC @rpc, if it has bulk
 * data (bk_niov > 0).  A sink bulk accepts the server's PUT; a source
 * bulk accepts the server's GET.  Generates a fresh bulk id and stores
 * it in the request message.  Returns 0 or -ENOMEM. */
int
srpc_prepare_bulk (srpc_client_rpc_t *rpc)
{
        srpc_bulk_t  *bk = &rpc->crpc_bulk;
        srpc_event_t *ev = &rpc->crpc_bulkev;
        __u64        *id = &rpc->crpc_reqstmsg.msg_body.reqst.bulkid;
        int           rc;
        int           opt;

        LASSERT (bk->bk_niov <= LNET_MAX_IOV);

        if (bk->bk_niov == 0)
                return 0; /* nothing to do */

        opt = bk->bk_sink ? LNET_MD_OP_PUT : LNET_MD_OP_GET;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_BULK_REQ_RCVD;

        *id = srpc_next_id();

        rc = srpc_post_passive_rdma(SRPC_RDMA_PORTAL, *id,
                                    &bk->bk_iovs[0], bk->bk_niov, opt,
                                    rpc->crpc_dest, &bk->bk_mdh, ev);
        if (rc != 0) {
                LASSERT (rc == -ENOMEM);
                ev->ev_fired = 1;  /* no more event expected */
        }
        return rc;
}

/* Server side: actively move bulk data for @rpc using the bulk id taken
 * from the client's request.  GET from the client when our bulk is a
 * sink, PUT to the client otherwise.  Returns the posting result; on
 * failure marks the event fired since no completion will arrive. */
int
srpc_do_bulk (srpc_server_rpc_t *rpc)
{
        srpc_event_t *ev = &rpc->srpc_ev;
        srpc_bulk_t  *bk = rpc->srpc_bulk;
        __u64         id = rpc->srpc_reqstbuf->buf_msg.msg_body.reqst.bulkid;
        int           rc;
        int           opt;

        LASSERT (bk != NULL);

        opt = bk->bk_sink ? LNET_MD_OP_GET : LNET_MD_OP_PUT;
#ifdef __KERNEL__
        opt |= LNET_MD_KIOV;
#else
        opt |= LNET_MD_IOVEC;
#endif

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = bk->bk_sink ? SRPC_BULK_GET_RPLD : SRPC_BULK_PUT_SENT;

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, id,
                                   &bk->bk_iovs[0], bk->bk_niov, opt,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &bk->bk_mdh, ev);
        if (rc != 0)
                ev->ev_fired = 1;  /* no more event expected */
        return rc;
}

/* called with srpc_service_t::sv_lock held */
inline void
srpc_schedule_server_rpc (srpc_server_rpc_t *rpc)
{
        srpc_service_t *sv = rpc->srpc_service;

        if (sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID)
                swi_schedule_workitem(&rpc->srpc_wi);
        else    /* framework RPCs are handled one by one */
                swi_schedule_serial_workitem(&rpc->srpc_wi);

        return;
}

/* only called from srpc_handle_rpc.
 * Completes server RPC @rpc with @status: runs the done callback,
 * recycles the request buffer, removes the RPC from sv_active_rpcq and
 * kills its workitem; then either reuses the descriptor immediately for
 * a blocked (buffered) request or returns it to sv_free_rpcq. */
void
srpc_server_rpc_done (srpc_server_rpc_t *rpc, int status)
{
        srpc_service_t *sv = rpc->srpc_service;
        srpc_buffer_t  *buffer;

        LASSERT (status != 0 || rpc->srpc_wi.wi_state == SWI_STATE_DONE);

        rpc->srpc_status = status;

        CDEBUG (status == 0 ? D_NET : D_NETERROR,
                "Server RPC done: service %s, peer %s, status %s:%d\n",
                sv->sv_name, libcfs_id2str(rpc->srpc_peer),
                swi_state2str(rpc->srpc_wi.wi_state), status);

        if (status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_dropped++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        if (rpc->srpc_done != NULL)
                (*rpc->srpc_done) (rpc);
        LASSERT (rpc->srpc_bulk == NULL);

        spin_lock(&sv->sv_lock);

        if (rpc->srpc_reqstbuf != NULL) {
                /* NB might drop sv_lock in srpc_service_recycle_buffer, but
                 * sv won't go away for sv_active_rpcq must not be empty */
                srpc_service_recycle_buffer(sv, rpc->srpc_reqstbuf);
                rpc->srpc_reqstbuf = NULL;
        }

        list_del(&rpc->srpc_list); /* from sv->sv_active_rpcq */

        /*
         * No one can schedule me now since:
         * - I'm not on sv_active_rpcq.
         * - all LNet events have been fired.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (rpc->srpc_ev.ev_fired);
        swi_kill_workitem(&rpc->srpc_wi);

        if (!sv->sv_shuttingdown && !list_empty(&sv->sv_blocked_msgq)) {
                /* Reuse this descriptor right away for a request that was
                 * waiting for a free RPC slot. */
                buffer = list_entry(sv->sv_blocked_msgq.next,
                                    srpc_buffer_t, buf_list);
                list_del(&buffer->buf_list);

                srpc_init_server_rpc(rpc, sv, buffer);
                list_add_tail(&rpc->srpc_list, &sv->sv_active_rpcq);
                srpc_schedule_server_rpc(rpc);
        } else {
                list_add(&rpc->srpc_list, &sv->sv_free_rpcq);
        }

        spin_unlock(&sv->sv_lock);
        return;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -