📄 rpc.c
字号:
/* Mark the RPC aborted, record the reason, dequeue it from its peer (if it
 * is still privately queued and has not consumed a send credit), and wake
 * the RPC's workitem so the abort completes in scheduler context. */
rpc->crpc_aborted = 1;
rpc->crpc_status  = why;

if (peer != NULL) {
        spin_lock(&peer->stp_lock);

        if (!list_empty(&rpc->crpc_privl)) { /* still queued */
                list_del_init(&rpc->crpc_privl);
                srpc_client_rpc_decref(rpc);  /* --ref for peer->*rpcq */
                rpc->crpc_peer = NULL;        /* no credit taken */
        }

        spin_unlock(&peer->stp_lock);
}

swi_schedule_workitem(&rpc->crpc_wi);
return;
}

/* Post a client RPC: arm its timer, look up (or create) the peer for the
 * destination NID and queue the RPC on it, then kick the peer's send path.
 * NB crpc_lock is dropped around srpc_check_sends() and re-taken before
 * returning, so the caller must not rely on state protected by crpc_lock
 * being unchanged across this call.
 * called with rpc->crpc_lock held */
void
srpc_post_rpc (srpc_client_rpc_t *rpc)
{
        srpc_peer_t *peer;

        LASSERT (!rpc->crpc_aborted);
        LASSERT (rpc->crpc_peer == NULL);
        LASSERT (srpc_data.rpc_state == SRPC_STATE_RUNNING);
        /* bulk length must be page-aligned */
        LASSERT ((rpc->crpc_bulk.bk_len & ~CFS_PAGE_MASK) == 0);

        CDEBUG (D_NET, "Posting RPC: peer %s, service %d, timeout %d\n",
                libcfs_id2str(rpc->crpc_dest), rpc->crpc_service,
                rpc->crpc_timeout);

        srpc_add_client_rpc_timer(rpc);

        peer = srpc_nid2peer(rpc->crpc_dest.nid);
        if (peer == NULL) {
                /* peer allocation failed; abort takes care of cleanup */
                srpc_abort_rpc(rpc, -ENOMEM);
                return;
        }

        srpc_queue_rpc(peer, rpc);

        /* drop crpc_lock while poking the peer's send queue to respect
         * lock ordering (stp_lock must not nest inside crpc_lock here) */
        spin_unlock(&rpc->crpc_lock);
        srpc_check_sends(peer, 0);
        spin_lock(&rpc->crpc_lock);
        return;
}

/* Send the reply for a server-side RPC back to the client via an active
 * RDMA PUT to the reply match-bits the client supplied in its request.
 * Returns 0 on success or the error from srpc_post_active_rdma(). */
int
srpc_send_reply (srpc_server_rpc_t *rpc)
{
        srpc_event_t   *ev = &rpc->srpc_ev;
        srpc_msg_t     *msg = &rpc->srpc_replymsg;
        srpc_buffer_t  *buffer = rpc->srpc_reqstbuf;
        srpc_service_t *sv = rpc->srpc_service;
        __u64           rpyid;
        int             rc;

        LASSERT (buffer != NULL);
        rpyid = buffer->buf_msg.msg_body.reqst.rpyid;

        spin_lock(&sv->sv_lock);

        if (!sv->sv_shuttingdown &&
            sv->sv_id > SRPC_FRAMEWORK_SERVICE_MAX_ID) {
                /* Repost buffer before replying since test client
                 * might send me another RPC once it gets the reply */
                if (srpc_service_post_buffer(sv, buffer) != 0)
                        CWARN ("Failed to repost %s buffer\n", sv->sv_name);
                rpc->srpc_reqstbuf = NULL;
        }

        spin_unlock(&sv->sv_lock);

        ev->ev_fired = 0;
        ev->ev_data  = rpc;
        ev->ev_type  = SRPC_REPLY_SENT;

        msg->msg_magic   = SRPC_MSG_MAGIC;
        msg->msg_version = SRPC_MSG_VERSION;
        msg->msg_type    = srpc_service2reply(sv->sv_id);

        rc = srpc_post_active_rdma(SRPC_RDMA_PORTAL, rpyid, msg,
                                   sizeof(*msg), LNET_MD_OP_PUT,
                                   rpc->srpc_peer, rpc->srpc_self,
                                   &rpc->srpc_replymdh, ev);
        if (rc != 0)
                ev->ev_fired = 1; /* no more event expected */
        return rc;
}

/* Central LNet event callback: demultiplexes on the srpc_event_t embedded
 * in the MD's user pointer and routes the event to the owning client RPC,
 * server RPC, or service buffer.  Updates global RPC counters under
 * rpc_glock.  NB several cases fall through deliberately (see below).
 * when in kernel always called with LNET_LOCK() held, and in thread context */
void
srpc_lnet_ev_handler (lnet_event_t *ev)
{
        srpc_event_t      *rpcev = ev->md.user_ptr;
        srpc_client_rpc_t *crpc;
        srpc_server_rpc_t *srpc;
        srpc_buffer_t     *buffer;
        srpc_service_t    *sv;
        srpc_msg_t        *msg;
        srpc_msg_type_t    type;

        LASSERT (!in_interrupt());

        if (ev->status != 0) {
                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.errors++;
                spin_unlock(&srpc_data.rpc_glock);
        }

        rpcev->ev_lnet = ev->type;

        switch (rpcev->ev_type) {
        default:
                LBUG ();
        case SRPC_REQUEST_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);
                        srpc_data.rpc_counters.rpcs_sent++;
                        spin_unlock(&srpc_data.rpc_glock);
                }
                /* fallthrough: all client-side events wake the RPC below */
        case SRPC_REPLY_RCVD:
        case SRPC_BULK_REQ_RCVD:
                crpc = rpcev->ev_data;

                /* the event must belong to one of this RPC's three stages */
                LASSERT (rpcev == &crpc->crpc_reqstev ||
                         rpcev == &crpc->crpc_replyev ||
                         rpcev == &crpc->crpc_bulkev);

                spin_lock(&crpc->crpc_lock);

                LASSERT (rpcev->ev_fired == 0);
                rpcev->ev_fired  = 1;
                /* an UNLINK event means the MD was torn down before
                 * completion; surface it as -EINTR */
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                   -EINTR : ev->status;
                swi_schedule_workitem(&crpc->crpc_wi);

                spin_unlock(&crpc->crpc_lock);
                break;

        case SRPC_REQUEST_RCVD:
                /* incoming request landed in one of the service's posted
                 * buffers */
                sv = rpcev->ev_data;

                LASSERT (rpcev == &sv->sv_ev);

                spin_lock(&sv->sv_lock);

                LASSERT (ev->unlinked);
                LASSERT (ev->type == LNET_EVENT_PUT ||
                         ev->type == LNET_EVENT_UNLINK);
                LASSERT (ev->type != LNET_EVENT_UNLINK ||
                         sv->sv_shuttingdown);

                buffer = container_of(ev->md.start, srpc_buffer_t, buf_msg);
                buffer->buf_peer = ev->initiator;
                buffer->buf_self = ev->target.nid;

                sv->sv_nposted_msg--;
                LASSERT (sv->sv_nposted_msg >= 0);

                if (sv->sv_shuttingdown) {
                        /* Leave buffer on sv->sv_posted_msgq since
                         * srpc_finish_service needs to traverse it. */
                        spin_unlock(&sv->sv_lock);
                        break;
                }

                list_del(&buffer->buf_list); /* from sv->sv_posted_msgq */
                msg = &buffer->buf_msg;
                type = srpc_service2request(sv->sv_id);

                /* validate the message; accept either byte order for
                 * type/magic (peer may need swabbing) */
                if (ev->status != 0 || ev->mlength != sizeof(*msg) ||
                    (msg->msg_type != type &&
                     msg->msg_type != __swab32(type)) ||
                    (msg->msg_magic != SRPC_MSG_MAGIC &&
                     msg->msg_magic != __swab32(SRPC_MSG_MAGIC))) {
                        CERROR ("Dropping RPC (%s) from %s: "
                                "status %d mlength %d type %u magic %u.\n",
                                sv->sv_name, libcfs_id2str(ev->initiator),
                                ev->status, ev->mlength,
                                msg->msg_type, msg->msg_magic);

                        /* NB might drop sv_lock in srpc_service_recycle_buffer,
                         * sv_nposted_msg++ as an implicit reference to prevent
                         * sv from disappearing under me */
                        sv->sv_nposted_msg++;
                        srpc_service_recycle_buffer(sv, buffer);
                        sv->sv_nposted_msg--;
                        spin_unlock(&sv->sv_lock);

                        if (ev->status == 0) { /* status!=0 counted already */
                                spin_lock(&srpc_data.rpc_glock);
                                srpc_data.rpc_counters.errors++;
                                spin_unlock(&srpc_data.rpc_glock);
                        }
                        break;
                }

                if (!list_empty(&sv->sv_free_rpcq)) {
                        /* a free server-RPC slot is available: start
                         * handling the request immediately */
                        srpc = list_entry(sv->sv_free_rpcq.next,
                                          srpc_server_rpc_t, srpc_list);
                        list_del(&srpc->srpc_list);

                        srpc_init_server_rpc(srpc, sv, buffer);
                        list_add_tail(&srpc->srpc_list, &sv->sv_active_rpcq);
                        srpc_schedule_server_rpc(srpc);
                } else {
                        /* no free slot: park the buffer until one frees up */
                        list_add_tail(&buffer->buf_list, &sv->sv_blocked_msgq);
                }

                spin_unlock(&sv->sv_lock);

                spin_lock(&srpc_data.rpc_glock);
                srpc_data.rpc_counters.rpcs_rcvd++;
                spin_unlock(&srpc_data.rpc_glock);
                break;

        case SRPC_BULK_GET_RPLD:
                LASSERT (ev->type == LNET_EVENT_SEND ||
                         ev->type == LNET_EVENT_REPLY ||
                         ev->type == LNET_EVENT_UNLINK);

                if (ev->type == LNET_EVENT_SEND &&
                    ev->status == 0 && !ev->unlinked)
                        break; /* wait for the final LNET_EVENT_REPLY */
                /* fallthrough: account bulk bytes, then finish like a
                 * completed server-side event */
        case SRPC_BULK_PUT_SENT:
                if (ev->status == 0 && ev->type != LNET_EVENT_UNLINK) {
                        spin_lock(&srpc_data.rpc_glock);

                        if (rpcev->ev_type == SRPC_BULK_GET_RPLD)
                                srpc_data.rpc_counters.bulk_get += ev->mlength;
                        else
                                srpc_data.rpc_counters.bulk_put += ev->mlength;

                        spin_unlock(&srpc_data.rpc_glock);
                }
                /* fallthrough: wake the server RPC state machine */
        case SRPC_REPLY_SENT:
                srpc = rpcev->ev_data;
                sv = srpc->srpc_service;

                LASSERT (rpcev == &srpc->srpc_ev);

                spin_lock(&sv->sv_lock);
                rpcev->ev_fired  = 1;
                rpcev->ev_status = (ev->type == LNET_EVENT_UNLINK) ?
                                   -EINTR : ev->status;
                srpc_schedule_server_rpc(srpc);
                spin_unlock(&sv->sv_lock);
                break;
        }

        return;
}

#ifndef __KERNEL__

/* Userspace only: poll the selftest EQ for up to 'timeout' seconds and
 * dispatch at most one event to srpc_lnet_ev_handler().
 * Returns 1 if an event was handled, 0 on timeout; aborts the process on
 * EQ overflow since dropped events cannot be recovered. */
int
srpc_check_event (int timeout)
{
        lnet_event_t ev;
        int          rc;
        int          i;

        rc = LNetEQPoll(&srpc_data.rpc_lnet_eq, 1,
                        timeout * 1000, &ev, &i);
        if (rc == 0)
                return 0;

        LASSERT (rc == -EOVERFLOW || rc == 1);

        /* We can't afford to miss any events... */
        if (rc == -EOVERFLOW) {
                CERROR ("Dropped an event!!!\n");
                abort();
        }

        srpc_lnet_ev_handler(&ev);
        return 1;
}

#endif

/* Bring up the selftest RPC layer: peer hash, LNet NI, event queue, lazy
 * portal, workitem scheduler and timer thread, in that order.  On any
 * failure after NI init, srpc_shutdown() unwinds whatever rpc_state says
 * was completed.  Returns 0 on success or a negative errno. */
int
srpc_startup (void)
{
        int i;
        int rc;

        memset(&srpc_data, 0, sizeof(struct smoketest_rpc));
        spin_lock_init(&srpc_data.rpc_glock);

        /* 1 second pause to avoid timestamp reuse */
        cfs_pause(cfs_time_seconds(1));
        /* seed match bits from current seconds so restarts don't collide */
        srpc_data.rpc_matchbits = ((__u64) cfs_time_current_sec()) << 48;

        srpc_data.rpc_state = SRPC_STATE_NONE;

        LIBCFS_ALLOC(srpc_data.rpc_peers,
                     sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        if (srpc_data.rpc_peers == NULL) {
                CERROR ("Failed to alloc peer hash.\n");
                return -ENOMEM;
        }

        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++)
                CFS_INIT_LIST_HEAD(&srpc_data.rpc_peers[i]);

#ifdef __KERNEL__
        rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
#else
        /* userspace: servers use the well-known PID, clients a per-process
         * one flagged as userspace */
        if (the_lnet.ln_server_mode_flag)
                rc = LNetNIInit(LUSTRE_SRV_LNET_PID);
        else
                rc = LNetNIInit(getpid() | LNET_PID_USERFLAG);
#endif
        if (rc < 0) {
                CERROR ("LNetNIInit() has failed: %d\n", rc);
                LIBCFS_FREE(srpc_data.rpc_peers,
                            sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
                return rc;
        }

        srpc_data.rpc_state = SRPC_STATE_NI_INIT;

        srpc_data.rpc_lnet_eq = LNET_EQ_NONE;
#ifdef __KERNEL__
        /* kernel: events delivered via callback */
        rc = LNetEQAlloc(16, srpc_lnet_ev_handler, &srpc_data.rpc_lnet_eq);
#else
        /* userspace: large EQ, drained by polling in srpc_check_event() */
        rc = LNetEQAlloc(10240, LNET_EQ_HANDLER_NONE, &srpc_data.rpc_lnet_eq);
#endif
        if (rc != 0) {
                CERROR("LNetEQAlloc() has failed: %d\n", rc);
                goto bail;
        }

        rc = LNetSetLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
        LASSERT (rc == 0);

        srpc_data.rpc_state = SRPC_STATE_EQ_INIT;

        rc = swi_startup();
        if (rc != 0)
                goto bail;

        srpc_data.rpc_state = SRPC_STATE_WI_INIT;

        rc = stt_startup();

bail:
        if (rc != 0)
                srpc_shutdown();
        else
                srpc_data.rpc_state = SRPC_STATE_RUNNING;

        return rc;
}

/* Tear down the selftest RPC layer.  The switch falls through deliberately
 * so each startup stage recorded in rpc_state unwinds everything below it.
 * Finally drains and frees the peer hash (peers live there until shutdown). */
void
srpc_shutdown (void)
{
        int i;
        int rc;
        int state;

        state = srpc_data.rpc_state;
        srpc_data.rpc_state = SRPC_STATE_STOPPING;

        switch (state) {
        default:
                LBUG ();
        case SRPC_STATE_RUNNING:
                /* all services must have been removed before shutdown */
                spin_lock(&srpc_data.rpc_glock);

                for (i = 0; i <= SRPC_SERVICE_MAX_ID; i++) {
                        srpc_service_t *sv = srpc_data.rpc_services[i];

                        LASSERTF (sv == NULL,
                                  "service not empty: id %d, name %s\n",
                                  i, sv->sv_name);
                }

                spin_unlock(&srpc_data.rpc_glock);

                stt_shutdown();
                /* fallthrough */

        case SRPC_STATE_WI_INIT:
                swi_shutdown();
                /* fallthrough */

        case SRPC_STATE_EQ_INIT:
                rc = LNetClearLazyPortal(SRPC_FRAMEWORK_REQUEST_PORTAL);
                LASSERT (rc == 0);
                rc = LNetEQFree(srpc_data.rpc_lnet_eq);
                LASSERT (rc == 0); /* the EQ should have no user by now */
                /* fallthrough */

        case SRPC_STATE_NI_INIT:
                LNetNIFini();
                break;
        }

        /* srpc_peer_t's are kept in hash until shutdown */
        for (i = 0; i < SRPC_PEER_HASH_SIZE; i++) {
                srpc_peer_t *peer;

                while (!list_empty(&srpc_data.rpc_peers[i])) {
                        peer = list_entry(srpc_data.rpc_peers[i].next,
                                          srpc_peer_t, stp_list);
                        list_del(&peer->stp_list);

                        /* peers must be idle with all credits returned */
                        LASSERT (list_empty(&peer->stp_rpcq));
                        LASSERT (list_empty(&peer->stp_ctl_rpcq));
                        LASSERT (peer->stp_credits == SRPC_PEER_CREDITS);

                        LIBCFS_FREE(peer, sizeof(srpc_peer_t));
                }
        }

        LIBCFS_FREE(srpc_data.rpc_peers,
                    sizeof(struct list_head) * SRPC_PEER_HASH_SIZE);
        return;
}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -