/* qswlnd_cb.c */
/* NOTE(review): this '}' closes a function that begins before this chunk. */
}

#if 0
/* Debug-only: map an LNET message type to a printable name.
 * Compiled out (#if 0) together with kqswnal_cerror_hdr below. */
static char *
hdr_type_string (lnet_hdr_t *hdr)
{
        switch (hdr->type) {
        case LNET_MSG_ACK:
                return ("ACK");
        case LNET_MSG_PUT:
                return ("PUT");
        case LNET_MSG_GET:
                return ("GET");
        case LNET_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}

/* Debug-only: dump a wire-format (little-endian) LNET header via CERROR,
 * including the per-type union fields for PUT/GET/ACK/REPLY. */
static void
kqswnal_cerror_hdr(lnet_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR(" From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR(" To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));

        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_PUT:
                CERROR(" Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR(" offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case LNET_MSG_GET:
                /* NOTE(review): match_bits is printed unswabbed here, unlike
                 * the PUT case above — looks inconsistent; confirm upstream. */
                CERROR(" Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR(" Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                break;

        case LNET_MSG_ACK:
                CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                break;

        case LNET_MSG_REPLY:
                CERROR(" dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }
}                               /* end of print_hdr() */
#endif

/* Check that the local and remote RDMA fragment lists pair up exactly:
 * same fragment count and, fragment by fragment, identical lengths.
 * Returns 0 if compatible, -EINVAL (with a CERROR) on any mismatch. */
int
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag, int nrfrag, EP_NMD *rfrag)
{
        int i;

        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       nlfrag, nrfrag);
                return (-EINVAL);
        }

        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
                        return (-EINVAL);
                }

        return (0);
}

/* Locate and bounds-check the remote memory descriptor (RMD) that follows
 * the raw lnet_hdr_t in the first receive page of 'krx'.  Returns a pointer
 * to the RMD, or NULL (with a CERROR) if the received byte count krx_nob is
 * too small to contain the fixed part of the RMD, or the RMD plus all
 * kqrmd_nfrag fragment descriptors it claims to carry. */
kqswnal_remotemd_t *
kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
{
        /* Check that the RMD sent after the "raw" LNET header in a
         * portals-compatible QSWLND message is OK */
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));

        /* Note RDMA addresses are sent in native endian-ness in the "old"
         * portals protocol so no swabbing... */

        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (NULL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (NULL);
        }

        return (rmd);
}

/* EKC completion callback for a KTX_RDMA_STORE started in kqswnal_rdma()
 * below.  Clears krx_rpc_reply_needed, drops the extra krx reference taken
 * when the RDMA was set up, and frees the ktx, finalizing its lnet_msg_t
 * with 0 on EP_SUCCESS or -ECONNABORTED otherwise. */
void
kqswnal_rdma_store_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
}

/* EKC completion callback for a KTX_RDMA_FETCH started in kqswnal_rdma()
 * below.  On EP_SUCCESS, marks the pending RPC reply successful; on failure
 * it abandons the RPC (clears krx_rpc_reply_needed) and completes the ktx
 * with -ECONNABORTED.  Also advances the krx state machine from KRX_PARSE
 * to KRX_COMPLETING; the extra krx reference is dropped later (see the
 * kqswnal_tx_done_in_thread_context() note in the code). */
void
kqswnal_rdma_fetch_complete (EP_RXD *rxd)
{
        /* Completed fetching the PUT/REPLY data */
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (status == EP_SUCCESS) {
                krx->krx_rpc_reply.msg.status = 0;
                status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        }

        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, status);
}

/* Set up and start an RDMA transfer between the local payload described by
 * (niov, iov|kiov, offset, len) and the peer's buffers described by 'rmd'.
 * 'type' is KTX_RDMA_STORE (push data to the peer via ep_complete_rpc) or
 * KTX_RDMA_FETCH (pull data from the peer via ep_rpc_get); completion is
 * reported asynchronously through the two callbacks above.
 *
 * A ktx is borrowed purely to map the local buffers (no message is sent
 * with it), and an extra krx reference is taken for the completion
 * callback.  Returns 0 if the transfer was started (or len == 0, in which
 * case lntmsg is finalized immediately and the RPC completes with success),
 * or a -ve errno on mapping/compatibility/EKC failure — in which case the
 * callback ref is dropped and the ktx returned here. */
int
kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
              int type, kqswnal_remotemd_t *rmd,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int len)
{
        kqswnal_tx_t *ktx;
        int           eprc;
        int           rc;

        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (len == 0) {
                /* data got truncated to nothing. */
                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply.msg.status = 0;
                return (0);
        }

        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
         * actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with %s\n",
                        libcfs_nid2str(kqswnal_rx_nid(krx)));
                return (-ENOMEM);
        }

        ktx->ktx_state   = type;
        ktx->ktx_nid     = kqswnal_rx_nid(krx);
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = lntmsg;

        LASSERT (atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        atomic_inc(&krx->krx_refcount);

        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));

        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;

        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

        if (rc != 0) {
                CERROR ("Can't map local RDMA data: %d\n", rc);
                goto out;
        }

        /* Local and remote fragment lists must match exactly */
        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (rc != 0) {
                CERROR ("Incompatible RDMA descriptors\n");
                goto out;
        }

        switch (type) {
        default:
                LBUG();

        case KTX_RDMA_STORE:
                /* Prepare the successful-GET reply that rides back with the
                 * RPC status block */
                krx->krx_rpc_reply.msg.status    = 0;
                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
                krx->krx_rpc_reply.msg.u.get.len = len;
#if KQSW_CKSUM
                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
                             kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
                             kqswnal_csum_iov(~0, offset, len, niov, iov);
                /* Fault injection: deliberately corrupt the checksum once
                 * when the tunable is set to 4 */
                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
                        krx->krx_rpc_reply.msg.u.get.cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &krx->krx_rpc_reply.ep_statusblk,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                                       rmd->kqrmd_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;

        case KTX_RDMA_FETCH:
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags,
                                   ktx->ktx_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        }

 out:
        if (rc != 0) {
                /* Undo the setup: drop callback's ref and return the ktx */
                kqswnal_rx_decref(krx);
                kqswnal_put_idle_tx (ktx);
        }

        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc);
}

/* LND send entry point: transmit 'lntmsg' to its target over the Elan
 * network.  Rejects targets whose NID does not map to an Elan ID in this
 * cluster.  NOTE(review): this function continues beyond the end of this
 * chunk — only the argument unpacking, sanity checks and the start of tx
 * descriptor allocation are visible here. */
int
kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        int               nob;
        kqswnal_tx_t     *ktx;
        int               rc;

        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (kqswnal_nid2elanid (target.nid) < 0) {
                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
                return -EIO;
        }

        /* I may not block for a transmit descriptor if I might block the
         * router, receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx();
        /* NOTE(review): the text that originally followed here (a Chinese
         * "keyboard shortcuts" help panel: Ctrl+C copy, Ctrl+F search,
         * F11 full screen, etc.) was UI chrome from the web page this file
         * was scraped from, not part of the source code.  The remainder of
         * kqswnal_send() is missing from this chunk. */