📄 mxlnd_cb.c
字号:
sum = old_sum + kiov[i].kiov_len; if (i == 0) sum -= kiov[i].kiov_offset; if (!first_found && (sum > offset)) { first_kiov = i; first_kiov_offset = offset - old_sum; //if (i == 0) first_kiov_offset + kiov[i].kiov_offset; if (i == 0) first_kiov_offset = kiov[i].kiov_offset; first_found = 1; sum = kiov[i].kiov_len - first_kiov_offset; old_sum = 0; } if (sum >= nob) { last_kiov = i; last_kiov_length = kiov[i].kiov_len - (sum - nob); if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset; break; } old_sum = sum; } LASSERT(first_kiov >= 0 && last_kiov >= first_kiov); nseg = last_kiov - first_kiov + 1; LASSERT(nseg > 0); MXLND_ALLOC (seg, nseg * sizeof(*seg)); if (seg == NULL) { CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n"); return -1; } memset(seg, 0, niov * sizeof(*seg)); ctx->mxc_nseg = niov; sum = 0; for (i = 0; i < niov; i++) { seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page); seg[i].segment_length = kiov[first_kiov + i].kiov_len; if (i == 0) { seg[i].segment_ptr += (u64) first_kiov_offset; /* we have to add back the original kiov_offset */ seg[i].segment_length -= first_kiov_offset + kiov[first_kiov].kiov_offset; } if (i == (nseg - 1)) { seg[i].segment_length = last_kiov_length; } sum += seg[i].segment_length; } ctx->mxc_seg_list = seg; ctx->mxc_pin_type = MX_PIN_PHYSICAL;#ifdef MX_PIN_FULLPAGES ctx->mxc_pin_type |= MX_PIN_FULLPAGES;#endif LASSERT(nob == sum); return 0;}voidmxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie){ LASSERT(type == MXLND_MSG_PUT_ACK); mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid); tx->mxc_cookie = cookie; tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie; tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */ tx->mxc_match = mxlnd_create_match(tx, status); mxlnd_queue_tx(tx);}/** * mxlnd_send_data - get tx, map [k]iov, queue tx * @ni * @lntmsg * @peer * @msg_type * @cookie * * This setups the DATA send for PUT or GET. * * On success, it queues the tx, on failure it calls lnet_finalize() */voidmxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie){ int ret = 0; lnet_process_id_t target = lntmsg->msg_target; unsigned int niov = lntmsg->msg_niov; struct iovec *iov = lntmsg->msg_iov; lnet_kiov_t *kiov = lntmsg->msg_kiov; unsigned int offset = lntmsg->msg_offset; unsigned int nob = lntmsg->msg_len; struct kmx_ctx *tx = NULL; LASSERT(lntmsg != NULL); LASSERT(peer != NULL); LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA); LASSERT((cookie>>52) == 0); tx = mxlnd_get_idle_tx(); if (tx == NULL) { CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n", msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA", libcfs_nid2str(target.nid)); goto failed_0; } tx->mxc_nid = target.nid; mxlnd_conn_addref(peer->mxp_conn); tx->mxc_peer = peer; tx->mxc_conn = peer->mxp_conn; tx->mxc_msg_type = msg_type; tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT; tx->mxc_state = MXLND_CTX_PENDING; tx->mxc_lntmsg[0] = lntmsg; tx->mxc_cookie = cookie; tx->mxc_match = mxlnd_create_match(tx, 0); /* This setups up the mx_ksegment_t to send the DATA payload */ if (nob == 0) { /* do not setup the segments */ CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply " "to %s?\n", libcfs_nid2str(target.nid)); ret = 0; } else if (kiov == NULL) { ret = mxlnd_setup_iov(tx, niov, iov, offset, nob); } else { ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob); } if (ret != 0) { CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", libcfs_nid2str(target.nid)); tx->mxc_status.code = -EIO; goto failed_1; } mxlnd_queue_tx(tx); return;failed_1: mxlnd_conn_decref(peer->mxp_conn); mxlnd_put_idle_tx(tx); return;failed_0: CDEBUG(D_NETERROR, "no tx avail\n"); lnet_finalize(ni, lntmsg, -EIO); return;}/** * mxlnd_recv_data - map [k]iov, post rx * @ni * @lntmsg * @rx * @msg_type * @cookie * * This setups the DATA receive for PUT or GET. * * On success, it returns 0, on failure it returns -1 */intmxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie){ int ret = 0; lnet_process_id_t target = lntmsg->msg_target; unsigned int niov = lntmsg->msg_niov; struct iovec *iov = lntmsg->msg_iov; lnet_kiov_t *kiov = lntmsg->msg_kiov; unsigned int offset = lntmsg->msg_offset; unsigned int nob = lntmsg->msg_len; mx_return_t mxret = MX_SUCCESS; /* above assumes MXLND_MSG_PUT_DATA */ if (msg_type == MXLND_MSG_GET_DATA) { niov = lntmsg->msg_md->md_niov; iov = lntmsg->msg_md->md_iov.iov; kiov = lntmsg->msg_md->md_iov.kiov; offset = 0; nob = lntmsg->msg_md->md_length; } LASSERT(lntmsg != NULL); LASSERT(rx != NULL); LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA); LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */ rx->mxc_msg_type = msg_type; rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT; rx->mxc_state = MXLND_CTX_PENDING; rx->mxc_nid = target.nid; /* if posting a GET_DATA, we may not yet know the peer */ if (rx->mxc_peer != NULL) { rx->mxc_conn = rx->mxc_peer->mxp_conn; } rx->mxc_lntmsg[0] = lntmsg; rx->mxc_cookie = cookie; rx->mxc_match = mxlnd_create_match(rx, 0); /* This setups up the mx_ksegment_t to receive the DATA payload */ if (kiov == NULL) { ret = mxlnd_setup_iov(rx, niov, iov, offset, nob); } else { ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob); } if (msg_type == MXLND_MSG_GET_DATA) { rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg); if (rx->mxc_lntmsg[1] == NULL) { CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n", libcfs_nid2str(target.nid)); ret = -1; } } if (ret != 0) { CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n", msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA", libcfs_nid2str(target.nid)); return -1; } ret = mxlnd_q_pending_ctx(rx); if (ret == -1) { return -1; } CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie); mxret = mx_kirecv(kmxlnd_data.kmx_endpt, rx->mxc_seg_list, rx->mxc_nseg, rx->mxc_pin_type, rx->mxc_match, 0xF00FFFFFFFFFFFFFLL, (void *) rx, &rx->mxc_mxreq); if (mxret != MX_SUCCESS) { if (rx->mxc_conn != NULL) { mxlnd_deq_pending_ctx(rx); } CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n", (int) mxret, libcfs_nid2str(target.nid)); return -1; } return 0;}/** * mxlnd_send - the LND required send function * @ni * @private * @lntmsg * * This must not block. Since we may not have a peer struct for the receiver, * it will append send messages on a global tx list. We will then up the * tx_queued's semaphore to notify it of the new send. */intmxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){ int ret = 0; int type = lntmsg->msg_type; lnet_hdr_t *hdr = &lntmsg->msg_hdr; lnet_process_id_t target = lntmsg->msg_target; lnet_nid_t nid = target.nid; int target_is_router = lntmsg->msg_target_is_router; int routing = lntmsg->msg_routing; unsigned int payload_niov = lntmsg->msg_niov; struct iovec *payload_iov = lntmsg->msg_iov; lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; struct kmx_ctx *tx = NULL; struct kmx_msg *txmsg = NULL; struct kmx_ctx *rx = (struct kmx_ctx *) private; /* for REPLY */ struct kmx_ctx *rx_data = NULL; struct kmx_conn *conn = NULL; int nob = 0; __u32 length = 0; struct kmx_peer *peer = NULL; CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n", payload_nob, payload_niov, libcfs_id2str(target)); LASSERT (payload_nob == 0 || payload_niov > 0); LASSERT (payload_niov <= LNET_MAX_IOV); /* payload is either all vaddrs or all pages */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); /* private is used on LNET_GET_REPLY only, NULL for all other cases */ /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ * to a new peer, use the nid */ peer = mxlnd_find_peer_by_nid(nid); if (peer != NULL) { conn = peer->mxp_conn; if (conn) mxlnd_conn_addref(conn); } if (conn == NULL && peer != NULL) { CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n", peer, nid, payload_nob, ((type==LNET_MSG_PUT) ? "PUT" : ((type==LNET_MSG_GET) ? "GET" : "Other"))); } switch (type) { case LNET_MSG_ACK: LASSERT (payload_nob == 0); break; case LNET_MSG_REPLY: case LNET_MSG_PUT: /* Is the payload small enough not to need DATA? */ nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]); if (nob <= MXLND_EAGER_SIZE) break; /* send EAGER */ tx = mxlnd_get_idle_tx(); if (unlikely(tx == NULL)) { CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n", type == LNET_MSG_PUT ? "PUT" : "REPLY", libcfs_nid2str(nid)); if (conn) mxlnd_conn_decref(conn); return -ENOMEM; } /* the peer may be NULL */ tx->mxc_peer = peer; tx->mxc_conn = conn; /* may be NULL */ /* we added a conn ref above */ mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid); txmsg = tx->mxc_msg; txmsg->mxm_u.put_req.mxprm_hdr = *hdr; txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie; tx->mxc_match = mxlnd_create_match(tx, 0); /* we must post a receive _before_ sending the request. * we need to determine how much to receive, it will be either * a put_ack or a put_nak. The put_ack is larger, so use it. */ rx = mxlnd_get_idle_rx(); if (unlikely(rx == NULL)) { CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n", libcfs_nid2str(nid)); mxlnd_put_idle_tx(tx); if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */ return -ENOMEM; } rx->mxc_nid = nid; rx->mxc_peer = peer; /* conn may be NULL but unlikely since the first msg is always small */ if (conn) mxlnd_conn_addref(conn); /* for this rx */ rx->mxc_conn = conn; rx->mxc_msg_type = MXLND_MSG_PUT_ACK; rx->mxc_cookie = tx->mxc_cookie; rx->mxc_match = mxlnd_create_match(rx, 0); length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t); ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length); if (unlikely(ret != 0)) { CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n", libcfs_nid2str(nid)); rx->mxc_lntmsg[0] = NULL; mxlnd_put_idle_rx(rx); mxlnd_put_idle_tx(tx); if (conn) { mxlnd_conn_decref(conn); /* for the rx... */ mxlnd_conn_decref(conn); /* and for the tx */ } return -ENOMEM; } mxlnd_queue_tx(tx)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -