📄 rpc.c
}

/* handles an incoming RPC */
int
srpc_handle_rpc (swi_workitem_t *wi)
{
        srpc_server_rpc_t *rpc = wi->wi_data;
        srpc_service_t    *sv = rpc->srpc_service;
        srpc_event_t      *ev = &rpc->srpc_ev;
        int                rc = 0;

        LASSERT (wi == &rpc->srpc_wi);

        spin_lock(&sv->sv_lock);

        if (sv->sv_shuttingdown) {
                spin_unlock(&sv->sv_lock);

                if (rpc->srpc_bulk != NULL)
                        LNetMDUnlink(rpc->srpc_bulk->bk_mdh);
                LNetMDUnlink(rpc->srpc_replymdh);

                if (ev->ev_fired) { /* no more event, OK to finish */
                        srpc_server_rpc_done(rpc, -ESHUTDOWN);
                        return 1;
                }
                return 0;
        }

        spin_unlock(&sv->sv_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN: {
                srpc_msg_t           *msg;
                srpc_generic_reply_t *reply;

                msg = &rpc->srpc_reqstbuf->buf_msg;
                reply = &rpc->srpc_replymsg.msg_body.reply;

                if (msg->msg_version != SRPC_MSG_VERSION &&
                    msg->msg_version != __swab32(SRPC_MSG_VERSION)) {
                        CWARN ("Version mismatch: %u, %u expected, from %s\n",
                               msg->msg_version, SRPC_MSG_VERSION,
                               libcfs_id2str(rpc->srpc_peer));
                        reply->status = EPROTO;
                } else {
                        reply->status = 0;
                        rc = (*sv->sv_handler) (rpc);
                        LASSERT (reply->status == 0 || !rpc->srpc_bulk);
                }

                if (rc != 0) {
                        srpc_server_rpc_done(rpc, rc);
                        return 1;
                }

                wi->wi_state = SWI_STATE_BULK_STARTED;

                if (rpc->srpc_bulk != NULL) {
                        rc = srpc_do_bulk(rpc);
                        if (rc == 0)
                                return 0; /* wait for bulk */

                        LASSERT (ev->ev_fired);
                        ev->ev_status = rc;
                }
        }
        case SWI_STATE_BULK_STARTED:
                LASSERT (rpc->srpc_bulk == NULL || ev->ev_fired);

                if (rpc->srpc_bulk != NULL) {
                        rc = ev->ev_status;

                        if (sv->sv_bulk_ready != NULL)
                                rc = (*sv->sv_bulk_ready) (rpc, rc);

                        if (rc != 0) {
                                srpc_server_rpc_done(rpc, rc);
                                return 1;
                        }
                }

                wi->wi_state = SWI_STATE_REPLY_SUBMITTED;
                rc = srpc_send_reply(rpc);
                if (rc == 0)
                        return 0; /* wait for reply */
                srpc_server_rpc_done(rpc, rc);
                return 1;

        case SWI_STATE_REPLY_SUBMITTED:
                LASSERT (ev->ev_fired);

                wi->wi_state = SWI_STATE_DONE;
                srpc_server_rpc_done(rpc, ev->ev_status);
                return 1;
        }

        return 0;
}

void
srpc_client_rpc_expired (void *data)
{
        srpc_client_rpc_t *rpc = data;

        CWARN ("Client RPC expired: service %d, peer %s, timeout %d.\n",
               rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
               rpc->crpc_timeout);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_timeout = 0;
        srpc_abort_rpc(rpc, -ETIMEDOUT);

        spin_unlock(&rpc->crpc_lock);

        spin_lock(&srpc_data.rpc_glock);
        srpc_data.rpc_counters.rpcs_expired++;
        spin_unlock(&srpc_data.rpc_glock);
        return;
}

inline void
srpc_add_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        stt_timer_t *timer = &rpc->crpc_timer;

        if (rpc->crpc_timeout == 0) return;

        CFS_INIT_LIST_HEAD(&timer->stt_list);
        timer->stt_data    = rpc;
        timer->stt_func    = srpc_client_rpc_expired;
        timer->stt_expires = cfs_time_add(rpc->crpc_timeout,
                                          cfs_time_current_sec());
        stt_add_timer(timer);
        return;
}
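The two helpers above arm the per-RPC expiry timer; srpc_client_rpc_expired() later aborts the RPC if it has not completed in time. Below is a minimal usage sketch, not part of rpc.c: my_post_rpc() and the 10-second timeout are hypothetical, and the sketch only illustrates the intended order (set crpc_timeout, arm the timer, then schedule the work item).

static void
my_post_rpc (srpc_client_rpc_t *rpc)
{
        /* hypothetical example only, not part of rpc.c */
        rpc->crpc_timeout = 10;   /* seconds; 0 would leave the timer unarmed */

        /* arm the expiry timer; on timeout srpc_client_rpc_expired() runs
         * and calls srpc_abort_rpc(rpc, -ETIMEDOUT) */
        srpc_add_client_rpc_timer(rpc);

        /* hand the RPC's work item to the scheduler; srpc_send_rpc() will
         * be invoked to drive the client state machine */
        swi_schedule_workitem(&rpc->crpc_wi);
}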
/*
 * Called with rpc->crpc_lock held.
 *
 * Upon exit the RPC expiry timer is not queued and the handler is not
 * running on any CPU.
 */
void
srpc_del_client_rpc_timer (srpc_client_rpc_t *rpc)
{
        /* timer not planted or already exploded */
        if (rpc->crpc_timeout == 0) return;

        /* timer successfully defused */
        if (stt_del_timer(&rpc->crpc_timer)) return;

#ifdef __KERNEL__
        /* timer detonated, wait for it to explode */
        while (rpc->crpc_timeout != 0) {
                spin_unlock(&rpc->crpc_lock);

                cfs_schedule();

                spin_lock(&rpc->crpc_lock);
        }
#else
        LBUG(); /* impossible in single-threaded runtime */
#endif
        return;
}

void
srpc_check_sends (srpc_peer_t *peer, int credits)
{
        struct list_head  *q;
        srpc_client_rpc_t *rpc;

        LASSERT (credits >= 0);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        spin_lock(&peer->stp_lock);
        peer->stp_credits += credits;

        while (peer->stp_credits) {
                if (!list_empty(&peer->stp_ctl_rpcq))
                        q = &peer->stp_ctl_rpcq;
                else if (!list_empty(&peer->stp_rpcq))
                        q = &peer->stp_rpcq;
                else
                        break;

                peer->stp_credits--;

                rpc = list_entry(q->next, srpc_client_rpc_t, crpc_privl);
                list_del_init(&rpc->crpc_privl);
                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */

                swi_schedule_workitem(&rpc->crpc_wi);
        }

        spin_unlock(&peer->stp_lock);
        return;
}

void
srpc_client_rpc_done (srpc_client_rpc_t *rpc, int status)
{
        swi_workitem_t *wi = &rpc->crpc_wi;
        srpc_peer_t    *peer = rpc->crpc_peer;

        LASSERT (status != 0 || wi->wi_state == SWI_STATE_DONE);

        spin_lock(&rpc->crpc_lock);

        rpc->crpc_closed = 1;
        if (rpc->crpc_status == 0)
                rpc->crpc_status = status;

        srpc_del_client_rpc_timer(rpc);

        CDEBUG ((status == 0) ? D_NET : D_NETERROR,
                "Client RPC done: service %d, peer %s, status %s:%d:%d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(wi->wi_state), rpc->crpc_aborted, status);

        /*
         * No one can schedule me now since:
         * - RPC timer has been defused.
         * - all LNet events have been fired.
         * - crpc_closed has been set, preventing srpc_abort_rpc from
         *   scheduling me.
         * Cancel pending schedules and prevent future schedule attempts:
         */
        LASSERT (!srpc_event_pending(rpc));
        swi_kill_workitem(wi);

        spin_unlock(&rpc->crpc_lock);

        (*rpc->crpc_done) (rpc);

        if (peer != NULL)
                srpc_check_sends(peer, 1);
        return;
}

/* sends an outgoing RPC */
int
srpc_send_rpc (swi_workitem_t *wi)
{
        int                rc = 0;
        srpc_client_rpc_t *rpc = wi->wi_data;
        srpc_msg_t        *reply = &rpc->crpc_replymsg;
        int                do_bulk = rpc->crpc_bulk.bk_niov > 0;

        LASSERT (rpc != NULL);
        LASSERT (wi == &rpc->crpc_wi);

        spin_lock(&rpc->crpc_lock);

        if (rpc->crpc_aborted) {
                spin_unlock(&rpc->crpc_lock);
                goto abort;
        }

        spin_unlock(&rpc->crpc_lock);

        switch (wi->wi_state) {
        default:
                LBUG ();
        case SWI_STATE_NEWBORN:
                LASSERT (!srpc_event_pending(rpc));

                rc = srpc_prepare_reply(rpc);
                if (rc != 0) {
                        srpc_client_rpc_done(rpc, rc);
                        return 1;
                }

                rc = srpc_prepare_bulk(rpc);
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SUBMITTED;
                rc = srpc_send_request(rpc);
                break;

        case SWI_STATE_REQUEST_SUBMITTED:
                /* CAVEAT EMPTOR: rqtev, rpyev, and bulkev may come in any
                 * order; however, they're processed in a strict order:
                 * rqt, rpy, and bulk. */
                if (!rpc->crpc_reqstev.ev_fired) break;

                rc = rpc->crpc_reqstev.ev_status;
                if (rc != 0) break;

                wi->wi_state = SWI_STATE_REQUEST_SENT;
                /* perhaps more events, fall thru */
        case SWI_STATE_REQUEST_SENT: {
                srpc_msg_type_t type = srpc_service2reply(rpc->crpc_service);

                if (!rpc->crpc_replyev.ev_fired) break;

                rc = rpc->crpc_replyev.ev_status;
                if (rc != 0) break;

                if ((reply->msg_type != type &&
                     reply->msg_type != __swab32(type)) ||
                    (reply->msg_magic != SRPC_MSG_MAGIC &&
                     reply->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CWARN ("Bad message from %s: type %u (%d expected),"
                               " magic %u (%d expected).\n",
                               libcfs_id2str(rpc->crpc_dest),
                               reply->msg_type, type,
                               reply->msg_magic, SRPC_MSG_MAGIC);
                        rc = -EBADMSG;
                        break;
                }

                if (do_bulk && reply->msg_body.reply.status != 0) {
                        CWARN ("Remote error %d at %s, unlink bulk buffer in "
                               "case peer didn't initiate bulk transfer\n",
                               reply->msg_body.reply.status,
                               libcfs_id2str(rpc->crpc_dest));
                        LNetMDUnlink(rpc->crpc_bulk.bk_mdh);
                }

                wi->wi_state = SWI_STATE_REPLY_RECEIVED;
        }
        case SWI_STATE_REPLY_RECEIVED:
                if (do_bulk && !rpc->crpc_bulkev.ev_fired) break;

                rc = do_bulk ? rpc->crpc_bulkev.ev_status : 0;

                /* Bulk buffer was unlinked due to remote error. Clear error
                 * since reply buffer still contains valid data.
                 * NB rpc->crpc_done shouldn't look into bulk data in case of
                 * remote error. */
                if (do_bulk && rpc->crpc_bulkev.ev_lnet == LNET_EVENT_UNLINK &&
                    rpc->crpc_status == 0 && reply->msg_body.reply.status != 0)
                        rc = 0;

                wi->wi_state = SWI_STATE_DONE;
                srpc_client_rpc_done(rpc, rc);
                return 1;
        }

        if (rc != 0) {
                spin_lock(&rpc->crpc_lock);
                srpc_abort_rpc(rpc, rc);
                spin_unlock(&rpc->crpc_lock);
        }

abort:
        if (rpc->crpc_aborted) {
                LNetMDUnlink(rpc->crpc_reqstmdh);
                LNetMDUnlink(rpc->crpc_replymdh);
                LNetMDUnlink(rpc->crpc_bulk.bk_mdh);

                if (!srpc_event_pending(rpc)) {
                        srpc_client_rpc_done(rpc, -EINTR);
                        return 1;
                }
        }
        return 0;
}

srpc_client_rpc_t *
srpc_create_client_rpc (lnet_process_id_t peer, int service,
                        int nbulkiov, int bulklen,
                        void (*rpc_done)(srpc_client_rpc_t *),
                        void (*rpc_fini)(srpc_client_rpc_t *), void *priv)
{
        srpc_client_rpc_t *rpc;

        LIBCFS_ALLOC(rpc, offsetof(srpc_client_rpc_t,
                                   crpc_bulk.bk_iovs[nbulkiov]));
        if (rpc == NULL)
                return NULL;

        srpc_init_client_rpc(rpc, peer, service, nbulkiov,
                             bulklen, rpc_done, rpc_fini, priv);
        return rpc;
}

/* called with rpc->crpc_lock held */
static inline void
srpc_queue_rpc (srpc_peer_t *peer, srpc_client_rpc_t *rpc)
{
        int service = rpc->crpc_service;

        LASSERT (peer->stp_nid == rpc->crpc_dest.nid);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);

        rpc->crpc_peer = peer;

        spin_lock(&peer->stp_lock);

        /* Framework RPCs that alter session state shall take precedence
         * over test RPCs and framework query RPCs */
        if (service <= SRPC_FRAMEWORK_SERVICE_MAX_ID &&
            service != SRPC_SERVICE_DEBUG &&
            service != SRPC_SERVICE_QUERY_STAT)
                list_add_tail(&rpc->crpc_privl, &peer->stp_ctl_rpcq);
        else
                list_add_tail(&rpc->crpc_privl, &peer->stp_rpcq);

        srpc_client_rpc_addref(rpc); /* ++ref for peer->*rpcq */
        spin_unlock(&peer->stp_lock);
        return;
}

/* called with rpc->crpc_lock held */
void
srpc_abort_rpc (srpc_client_rpc_t *rpc, int why)
{
        srpc_peer_t *peer = rpc->crpc_peer;

        LASSERT (why != 0);

        if (rpc->crpc_aborted || /* already aborted */
            rpc->crpc_closed)    /* callback imminent */
                return;

        CDEBUG (D_NET,
                "Aborting RPC: service %d, peer %s, state %s, why %d\n",
                rpc->crpc_service, libcfs_id2str(rpc->crpc_dest),
                swi_state2str(rpc->crpc_wi.wi_state), why);
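For context, callers obtain a client RPC from srpc_create_client_rpc() and are notified through the rpc_done callback once srpc_client_rpc_done() has run. A minimal sketch follows, not part of rpc.c; my_rpc_done(), my_new_rpc() and MY_SERVICE_ID are hypothetical names, and error handling is elided.

static void
my_rpc_done (srpc_client_rpc_t *rpc)
{
        /* hypothetical example only, not part of rpc.c.
         * Invoked from srpc_client_rpc_done(); crpc_status carries the final
         * outcome, e.g. 0, -ETIMEDOUT from the expiry timer, or -EINTR on
         * abort. */
        CDEBUG (D_NET, "RPC to %s finished: %d\n",
                libcfs_id2str(rpc->crpc_dest), rpc->crpc_status);
}

static srpc_client_rpc_t *
my_new_rpc (lnet_process_id_t peer)
{
        /* MY_SERVICE_ID is hypothetical; no bulk I/O (nbulkiov = 0) */
        return srpc_create_client_rpc(peer, MY_SERVICE_ID, 0, 0,
                                      my_rpc_done, NULL, NULL);
}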