📄 qswlnd_cb.c
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for %s\n",
                        type, libcfs_nid2str(target.nid));
                return (-ENOMEM);
        }

        ktx->ktx_state   = KTX_SENDING;
        ktx->ktx_nid     = target.nid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = lntmsg;
        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */

        /* The first frag will be the pre-mapped buffer. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;

        if ((!target_is_router &&               /* target.nid is final dest */
             !routing &&                        /* I'm the source */
             type == LNET_MSG_GET &&            /* optimize GET? */
             *kqswnal_tunables.kqn_optimized_gets != 0 &&
             lntmsg->msg_md->md_length >= *kqswnal_tunables.kqn_optimized_gets) ||
            ((type == LNET_MSG_PUT ||           /* optimize PUT? */
              type == LNET_MSG_REPLY) &&        /* optimize REPLY? */
             *kqswnal_tunables.kqn_optimized_puts != 0 &&
             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
                lnet_libmd_t       *md = lntmsg->msg_md;
                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                lnet_hdr_t         *mhdr;
                kqswnal_remotemd_t *rmd;

                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the buffer frags. */

                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET
                         * header + rdma descriptor */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        rmd  = (kqswnal_remotemd_t *)(mhdr + 1);
                } else {
                        /* Send an RDMA message */
                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type = QSWLND_MSG_RDMA;

                        mhdr = &msg->kqm_u.rdma.kqrm_hdr;
                        rmd  = &msg->kqm_u.rdma.kqrm_rmd;
                }

                *mhdr = *hdr;
                nob = (((char *)rmd) - ktx->ktx_buffer);

                if (type == LNET_MSG_GET) {
                        if ((md->md_options & LNET_MD_KIOV) != 0)
                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
                                                          md->md_niov, md->md_iov.kiov);
                        else
                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
                                                         md->md_niov, md->md_iov.iov);
                        ktx->ktx_state = KTX_GETTING;
                } else {
                        if (payload_kiov != NULL)
                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
                                                         payload_niov, payload_kiov);
                        else
                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
                                                        payload_niov, payload_iov);
                        ktx->ktx_state = KTX_PUTTING;
                }

                if (rc != 0)
                        goto out;

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
                nob += offsetof(kqswnal_remotemd_t,
                                kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
#if KQSW_CKSUM
                LASSERT (the_lnet.ln_ptlcompat != 2);
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
#endif
                if (type == LNET_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lnet_create_reply_msg (
                                kqswnal_data.kqn_ni, lntmsg);
                        if (ktx->ktx_args[2] == NULL)
                                goto out;

                        /* NB finalizing the REPLY message is my
                         * responsibility now, whatever happens. */
#if KQSW_CKSUM
                        if (*kqswnal_tunables.kqn_inject_csum_error == 3) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }

                } else if (payload_kiov != NULL) {
                        /* must checksum payload after header so receiver can
                         * compute partial header cksum before swab.  Sadly
                         * this causes 2 rounds of kmap */
                        msg->kqm_cksum = kqswnal_csum_kiov(msg->kqm_cksum, 0,
                                                           payload_nob, payload_niov,
                                                           payload_kiov);
                        if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                } else {
                        msg->kqm_cksum = kqswnal_csum_iov(msg->kqm_cksum, 0,
                                                          payload_nob, payload_niov,
                                                          payload_iov);
                        if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
#endif
                }

        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
                lnet_hdr_t    *mhdr;
                char          *payload;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* small message: single frag copied into the pre-mapped buffer */
                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET header
                         * + payload */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        payload = (char *)(mhdr + 1);
                } else {
                        /* Send an IMMEDIATE message */
                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                        mhdr = &msg->kqm_u.immediate.kqim_hdr;
                        payload = msg->kqm_u.immediate.kqim_payload;
                }

                *mhdr = *hdr;
                nob = (payload - ktx->ktx_buffer) + payload_nob;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                            payload_niov, payload_kiov,
                                            payload_offset, payload_nob);
                else
                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                           payload_niov, payload_iov,
                                           payload_offset, payload_nob);
#if KQSW_CKSUM
                LASSERT (the_lnet.ln_ptlcompat != 2);
                msg->kqm_nob   = nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
        } else {
                lnet_hdr_t    *mhdr;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* large message: multiple frags: first is hdr in pre-mapped buffer */
                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET header
                         * + payload */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        nob = sizeof(lnet_hdr_t);
                } else {
                        /* Send an IMMEDIATE message */
                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                        mhdr = &msg->kqm_u.immediate.kqim_hdr;
                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.immediate.kqim_payload);
                }

                *mhdr = *hdr;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0)
                        goto out;

#if KQSW_CKSUM
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                msg->kqm_cksum = (payload_kiov != NULL) ?
                                 kqswnal_csum_kiov(msg->kqm_cksum,
                                                   payload_offset, payload_nob,
                                                   payload_niov, payload_kiov) :
                                 kqswnal_csum_iov(msg->kqm_cksum,
                                                  payload_offset, payload_nob,
                                                  payload_niov, payload_iov);

                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                nob += payload_nob;
        }

        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);

 out:
        CDEBUG(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
               routing ? (rc == 0 ? "Routed" : "Failed to route") :
                         (rc == 0 ? "Sent" : "Failed to send"),
               nob, libcfs_nid2str(target.nid),
               target_is_router ? "(router)" : "", rc);

        if (rc != 0) {
                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
                int         state = ktx->ktx_state;

                kqswnal_put_idle_tx (ktx);

                if (state == KTX_GETTING && repmsg != NULL) {
                        /* We committed to reply, but there was a problem
                         * launching the GET.  We can't avoid delivering a
                         * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY
                         * failed. */
                        rc = 0;
                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
                }
        }

        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? 0 : -EIO);
}

void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        LASSERT (atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);

        krx->krx_state = KRX_POSTED;

        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler,
                                   krx, &krx->krx_elanbuffer, 0);
        }
}

void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}

void
kqswnal_rx_done (kqswnal_rx_t *krx)
{
        int           rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;

                LASSERT (!in_interrupt());

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     &krx->krx_rpc_reply.ep_statusblk,
                                     NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        }

        kqswnal_requeue_rx(krx);
}

void
kqswnal_parse (kqswnal_rx_t *krx)
{
        lnet_ni_t      *ni = kqswnal_data.kqn_ni;
        kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
        lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
        int             swab;
        int             n;
        int             i;
        int             nob;
        int             rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 1);