📄 qswlnd_cb.c
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
#endif

void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        unsigned long  flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */
        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
}

kqswnal_tx_t *
kqswnal_get_idle_tx (void)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        if (kqswnal_data.kqn_shuttingdown ||
            list_empty (&kqswnal_data.kqn_idletxds)) {
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

                return NULL;
        }

        ktx = list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t, ktx_list);
        list_del (&ktx->ktx_list);

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
        ktx->ktx_launcher = current->pid;
        atomic_inc (&kqswnal_data.kqn_pending_txs);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx->ktx_nmappedpages == 0);
        return (ktx);
}

void
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
        lnet_msg_t   *lnetmsg0 = NULL;
        lnet_msg_t   *lnetmsg1 = NULL;
        int           status0  = 0;
        int           status1  = 0;
        kqswnal_rx_t *krx;

        LASSERT (!in_interrupt());

        if (ktx->ktx_status == -EHOSTDOWN)
                kqswnal_notify_peer_down(ktx);

        switch (ktx->ktx_state) {
        case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */
                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status0 == 0) {             /* RDMA succeeded */
                        kqswnal_msg_t *msg;
                        __u32          csum;

                        msg = (kqswnal_msg_t *)
                              page_address(krx->krx_kiov[0].kiov_page);

                        csum = (lnetmsg0->msg_kiov != NULL) ?
                               kqswnal_csum_kiov(krx->krx_cksum,
                                                 lnetmsg0->msg_offset,
                                                 lnetmsg0->msg_wanted,
                                                 lnetmsg0->msg_niov,
                                                 lnetmsg0->msg_kiov) :
                               kqswnal_csum_iov(krx->krx_cksum,
                                                lnetmsg0->msg_offset,
                                                lnetmsg0->msg_wanted,
                                                lnetmsg0->msg_niov,
                                                lnetmsg0->msg_iov);

                        /* Can only check csum if I got it all */
                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
                            csum != msg->kqm_cksum) {
                                ktx->ktx_status = -EIO;
                                krx->krx_rpc_reply.msg.status = -EIO;
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, msg->kqm_cksum,
                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
                        }
                }
#endif
                LASSERT (krx->krx_state == KRX_COMPLETING);
                kqswnal_rx_decref (krx);
                break;

        case KTX_RDMA_STORE:                    /* optimized GET handled */
        case KTX_PUTTING:                       /* optimized PUT sent */
        case KTX_SENDING:                       /* normal send */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
                break;

        case KTX_GETTING:                       /* optimized GET sent & payload received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = 0;
                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
                status1  = ktx->ktx_status;
#if KQSW_CKSUM
                if (status1 == 0) {             /* RDMA succeeded */
                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                        lnet_libmd_t *md = lnetmsg0->msg_md;
                        __u32         csum;

                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
                               kqswnal_csum_kiov(~0, 0, md->md_length,
                                                 md->md_niov,
                                                 md->md_iov.kiov) :
                               kqswnal_csum_iov(~0, 0, md->md_length,
                                                md->md_niov,
                                                md->md_iov.iov);

                        if (csum != ktx->ktx_cksum) {
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, ktx->ktx_cksum,
                                       libcfs_nid2str(ktx->ktx_nid));
                                status1 = -EIO;
                        }
                }
#endif
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);

        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
        if (lnetmsg1 != NULL)
                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
}

void
kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
{
        unsigned long flags;

        ktx->ktx_status = status;

        if (!in_interrupt()) {
                kqswnal_tx_done_in_thread_context(ktx);
                return;
        }

        /* Complete the send in thread context */
        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail(&ktx->ktx_schedlist, &kqswnal_data.kqn_donetxds);
        wake_up(&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
}

static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t        *ktx = (kqswnal_tx_t *)arg;
        kqswnal_rpc_reply_t *reply;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {
                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), status);

                status = -EHOSTDOWN;

        } else switch (ktx->ktx_state) {

        case KTX_GETTING:
        case KTX_PUTTING:
                /* RPC complete! */
                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
                if (reply->msg.magic == 0) {    /* "old" peer */
                        status = reply->msg.status;
                        break;
                }

                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
                                CERROR("%s unexpected rpc reply magic %08x\n",
                                       libcfs_nid2str(ktx->ktx_nid),
                                       reply->msg.magic);
                                status = -EPROTO;
                                break;
                        }

                        __swab32s(&reply->msg.status);
                        __swab32s(&reply->msg.version);

                        if (ktx->ktx_state == KTX_GETTING) {
                                __swab32s(&reply->msg.u.get.len);
                                __swab32s(&reply->msg.u.get.cksum);
                        }
                }

                status = reply->msg.status;
                if (status != 0) {
                        CERROR("%s RPC status %08x\n",
                               libcfs_nid2str(ktx->ktx_nid), status);
                        break;
                }

                if (ktx->ktx_state == KTX_GETTING) {
                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
                                               (lnet_msg_t *)ktx->ktx_args[2],
                                               reply->msg.u.get.len);
#if KQSW_CKSUM
                        ktx->ktx_cksum = reply->msg.u.get.cksum;
#endif
                }
                break;

        case KTX_SENDING:
                status = 0;
                break;

        default:
                LBUG();
                break;
        }

        kqswnal_tx_done(ktx, status);
}

int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = jiffies;

        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);

        LASSERT (dest >= 0);                    /* must be a peer */

        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);

        switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                if (the_lnet.ln_testprotocompat != 0 &&
                    the_lnet.ln_ptlcompat == 0) {
                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                        /* single-shot proto test:
                         * Future version queries will use an RPC, so I'll
                         * co-opt one of the existing ones */
                        LNET_LOCK();
                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
                                msg->kqm_version++;
                                the_lnet.ln_testprotocompat &= ~1;
                        }
                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
                                msg->kqm_magic = LNET_PROTO_MAGIC;
                                the_lnet.ln_testprotocompat &= ~2;
                        }
                        LNET_UNLOCK();
                }

                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                break;

        case KTX_SENDING:
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                break;

        default:
                LBUG();
                rc = -EINVAL;                   /* no compiler warning please */
                break;
        }

        switch (rc) {
        case EP_SUCCESS:                        /* success */
                return (0);

        case EP_ENOMEM:                         /* can't allocate ep txd => queue for later */
                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_schedlist,
                               &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default:                                /* fatal error */
                CDEBUG (D_NETERROR, "Tx to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), rc);

                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }