📄 qswlnd_cb.c
字号:
/* If ln_ptlcompat is set, peers may send me an "old" unencapsulated * lnet hdr */ LASSERT (offsetof(kqswnal_msg_t, kqm_u) <= sizeof(lnet_hdr_t)); if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) { CERROR("Short message %d received from %s\n", krx->krx_nob, libcfs_nid2str(fromnid)); goto done; } swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC); if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {#if KQSW_CKSUM __u32 csum0; __u32 csum1; /* csum byte array before swab */ csum1 = msg->kqm_cksum; msg->kqm_cksum = 0; csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob, krx->krx_npages, krx->krx_kiov); msg->kqm_cksum = csum1;#endif if (swab) { __swab16s(&msg->kqm_version); __swab16s(&msg->kqm_type);#if KQSW_CKSUM __swab32s(&msg->kqm_cksum); __swab32s(&msg->kqm_nob);#endif } if (msg->kqm_version != QSWLND_PROTO_VERSION) { /* Future protocol version compatibility support! * The next qswlnd-specific protocol rev will first * send an RPC to check version. * 1.4.6 and 1.4.7.early reply with a status * block containing its current version. * Later versions send a failure (-ve) status + * magic/version */ if (!krx->krx_rpc_reply_needed) { CERROR("Unexpected version %d from %s\n", msg->kqm_version, libcfs_nid2str(fromnid)); goto done; } LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO); goto done; } switch (msg->kqm_type) { default: CERROR("Bad request type %x from %s\n", msg->kqm_type, libcfs_nid2str(fromnid)); goto done; case QSWLND_MSG_IMMEDIATE: if (krx->krx_rpc_reply_needed) { /* Should have been a simple message */ CERROR("IMMEDIATE sent as RPC from %s\n", libcfs_nid2str(fromnid)); goto done; } nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload); if (krx->krx_nob < nob) { CERROR("Short IMMEDIATE %d(%d) from %s\n", krx->krx_nob, nob, libcfs_nid2str(fromnid)); goto done; }#if KQSW_CKSUM if (csum0 != msg->kqm_cksum) { CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n", csum0, msg->kqm_cksum, libcfs_nid2str(fromnid)); CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob); goto done; }#endif rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr, fromnid, krx, 0); if (rc < 0) goto done; return; case QSWLND_MSG_RDMA: if (!krx->krx_rpc_reply_needed) { /* Should have been a simple message */ CERROR("RDMA sent as simple message from %s\n", libcfs_nid2str(fromnid)); goto done; } nob = offsetof(kqswnal_msg_t, kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]); if (krx->krx_nob < nob) { CERROR("Short RDMA message %d(%d) from %s\n", krx->krx_nob, nob, libcfs_nid2str(fromnid)); goto done; } if (swab) __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag); n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag; nob = offsetof(kqswnal_msg_t, kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]); if (krx->krx_nob < nob) { CERROR("short RDMA message %d(%d) from %s\n", krx->krx_nob, nob, libcfs_nid2str(fromnid)); goto done; } if (swab) { for (i = 0; i < n; i++) { EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i]; __swab32s(&nmd->nmd_addr); __swab32s(&nmd->nmd_len); __swab32s(&nmd->nmd_attr); } }#if KQSW_CKSUM krx->krx_cksum = csum0; /* stash checksum so far */#endif rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr, fromnid, krx, 1); if (rc < 0) goto done; return; } /* Not Reached */ } if (msg->kqm_magic == LNET_PROTO_MAGIC || msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) { /* Future protocol version compatibility support! * When LNET unifies protocols over all LNDs, the first thing a * peer will send will be a version query RPC. * 1.4.6 and 1.4.7.early reply with a status block containing * LNET_PROTO_QSW_MAGIC.. * Later versions send a failure (-ve) status + * magic/version */ if (!krx->krx_rpc_reply_needed) { CERROR("Unexpected magic %08x from %s\n", msg->kqm_magic, libcfs_nid2str(fromnid)); goto done; } LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO); goto done; } if (the_lnet.ln_ptlcompat != 0) { /* Portals compatibility (strong or weak) * This could be an unencapsulated LNET header. If it's big * enough, let LNET's parser sort it out */ if (krx->krx_nob < sizeof(lnet_hdr_t)) { CERROR("Short portals-compatible message from %s\n", libcfs_nid2str(fromnid)); goto done; } krx->krx_raw_lnet_hdr = 1; rc = lnet_parse(ni, (lnet_hdr_t *)msg, fromnid, krx, krx->krx_rpc_reply_needed); if (rc < 0) goto done; return; } CERROR("Unrecognised magic %08x from %s\n", msg->kqm_magic, libcfs_nid2str(fromnid)); done: kqswnal_rx_decref(krx);}/* Receive Interrupt Handler: posts to schedulers */void kqswnal_rxhandler(EP_RXD *rxd){ unsigned long flags; int nob = ep_rxd_len (rxd); int status = ep_rxd_status (rxd); kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg (rxd); CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n", rxd, krx, nob, status); LASSERT (krx != NULL); LASSERT (krx->krx_state == KRX_POSTED); krx->krx_state = KRX_PARSE; krx->krx_rxd = rxd; krx->krx_nob = nob; krx->krx_raw_lnet_hdr = 0; /* RPC reply iff rpc request received without error */ krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) && (status == EP_SUCCESS || status == EP_MSG_TOO_BIG); /* Default to failure if an RPC reply is requested but not handled */ krx->krx_rpc_reply.msg.status = -EPROTO; atomic_set (&krx->krx_refcount, 1); if (status != EP_SUCCESS) { /* receives complete with failure when receiver is removed */ if (status == EP_SHUTDOWN) LASSERT (kqswnal_data.kqn_shuttingdown); else CERROR("receive status failed with status %d nob %d\n", ep_rxd_status(rxd), nob); kqswnal_rx_decref(krx); return; } if (!in_interrupt()) { kqswnal_parse(krx); return; } spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds); wake_up (&kqswnal_data.kqn_sched_waitq); spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);}intkqswnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int mlen, unsigned int rlen){ kqswnal_rx_t *krx = (kqswnal_rx_t *)private; lnet_nid_t fromnid; kqswnal_msg_t *msg; lnet_hdr_t *hdr; kqswnal_remotemd_t *rmd; int msg_offset; int rc; LASSERT (!in_interrupt ()); /* OK to map */ /* Either all pages or all vaddrs */ LASSERT (!(kiov != NULL && iov != NULL)); fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd)); msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page); if (krx->krx_rpc_reply_needed) { /* optimized (rdma) request sent as RPC */ if (krx->krx_raw_lnet_hdr) { LASSERT (the_lnet.ln_ptlcompat != 0); hdr = (lnet_hdr_t *)msg; rmd = kqswnal_get_portalscompat_rmd(krx); if (rmd == NULL) return (-EPROTO); } else { LASSERT (msg->kqm_type == QSWLND_MSG_RDMA); hdr = &msg->kqm_u.rdma.kqrm_hdr; rmd = &msg->kqm_u.rdma.kqrm_rmd; } /* NB header is still in wire byte order */ switch (le32_to_cpu(hdr->type)) { case LNET_MSG_PUT: case LNET_MSG_REPLY: /* This is an optimized PUT/REPLY */ rc = kqswnal_rdma(krx, lntmsg, KTX_RDMA_FETCH, rmd, niov, iov, kiov, offset, mlen); break; case LNET_MSG_GET:#if KQSW_CKSUM if (krx->krx_cksum != msg->kqm_cksum) { CERROR("Bad GET checksum %08x(%08x) from %s\n", krx->krx_cksum, msg->kqm_cksum, libcfs_nid2str(fromnid)); rc = -EIO; break; }#endif if (lntmsg == NULL) { /* No buffer match: my decref will * complete the RPC with failure */ rc = 0; } else { /* Matched something! */ rc = kqswnal_rdma(krx, lntmsg, KTX_RDMA_STORE, rmd, lntmsg->msg_niov, lntmsg->msg_iov, lntmsg->msg_kiov, lntmsg->msg_offset, lntmsg->msg_len); } break; default: CERROR("Bad RPC type %d\n", le32_to_cpu(hdr->type)); rc = -EPROTO; break; } kqswnal_rx_decref(krx); return rc; } if (krx->krx_raw_lnet_hdr) { LASSERT (the_lnet.ln_ptlcompat != 0); msg_offset = sizeof(lnet_hdr_t); } else { LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE); msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload); } if (krx->krx_nob < msg_offset + rlen) { CERROR("Bad message size from %s: have %d, need %d + %d\n", libcfs_nid2str(fromnid), krx->krx_nob, msg_offset, rlen); kqswnal_rx_decref(krx); ret
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -