⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 qswlnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                offset = 0;                /* iov must not run out before end of data */                LASSERT (nob == 0 || niov > 0);        } while (nob > 0);        return csum;}#endifvoidkqswnal_put_idle_tx (kqswnal_tx_t *ktx){        unsigned long     flags;        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */        ktx->ktx_state = KTX_IDLE;        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);        list_del (&ktx->ktx_list);              /* take off active list */        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);}kqswnal_tx_t *kqswnal_get_idle_tx (void){        unsigned long  flags;        kqswnal_tx_t  *ktx;        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);        if (kqswnal_data.kqn_shuttingdown ||            list_empty (&kqswnal_data.kqn_idletxds)) {                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);                return NULL;        }        ktx = list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t, ktx_list);        list_del (&ktx->ktx_list);        list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);        ktx->ktx_launcher = current->pid;        atomic_inc(&kqswnal_data.kqn_pending_txs);        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */        LASSERT (ktx->ktx_nmappedpages == 0);        return (ktx);}voidkqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx){        lnet_msg_t    *lnetmsg0 = NULL;        lnet_msg_t    *lnetmsg1 = NULL;        int            status0  = 0;        int            status1  = 0;        kqswnal_rx_t  *krx;                LASSERT (!in_interrupt());                if (ktx->ktx_status == -EHOSTDOWN)                kqswnal_notify_peer_down(ktx);        switch (ktx->ktx_state) {        case KTX_RDMA_FETCH:                    /* optimized PUT/REPLY handled */                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];                status0  = ktx->ktx_status;#if KQSW_CKSUM                if (status0 == 0) {             /* RDMA succeeded */                        kqswnal_msg_t *msg;                        __u32          csum;                        msg = (kqswnal_msg_t *)                              page_address(krx->krx_kiov[0].kiov_page);                        csum = (lnetmsg0->msg_kiov != NULL) ?                               kqswnal_csum_kiov(krx->krx_cksum,                                                 lnetmsg0->msg_offset,                                                 lnetmsg0->msg_wanted,                                                 lnetmsg0->msg_niov,                                                 lnetmsg0->msg_kiov) :                               kqswnal_csum_iov(krx->krx_cksum,                                                lnetmsg0->msg_offset,                                                lnetmsg0->msg_wanted,                                                lnetmsg0->msg_niov,                                                lnetmsg0->msg_iov);                        /* Can only check csum if I got it all */                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&                            csum != msg->kqm_cksum) {                                ktx->ktx_status = -EIO;                                krx->krx_rpc_reply.msg.status = -EIO;                                CERROR("RDMA checksum failed %u(%u) from %s\n",                                       csum, msg->kqm_cksum,                                       libcfs_nid2str(kqswnal_rx_nid(krx)));                        }                }#endif                       LASSERT (krx->krx_state == KRX_COMPLETING);                kqswnal_rx_decref (krx);                break;        case KTX_RDMA_STORE:       /* optimized GET handled */        case KTX_PUTTING:          /* optimized PUT sent */        case KTX_SENDING:          /* normal send */                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];                status0  = ktx->ktx_status;                break;        case KTX_GETTING:          /* optimized GET sent & payload received */                /* Complete the GET with success since we can't avoid                 * delivering a REPLY event; we committed to it when we                 * launched the GET */                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];                status0  = 0;                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];                status1  = ktx->ktx_status;#if KQSW_CKSUM                if (status1 == 0) {             /* RDMA succeeded */                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];                        lnet_libmd_t *md = lnetmsg0->msg_md;                        __u32         csum;                                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?                                kqswnal_csum_kiov(~0, 0,                                                 md->md_length,                                                 md->md_niov,                                                  md->md_iov.kiov) :                               kqswnal_csum_iov(~0, 0,                                                md->md_length,                                                md->md_niov,                                                md->md_iov.iov);                        if (csum != ktx->ktx_cksum) {                                CERROR("RDMA checksum failed %u(%u) from %s\n",                                       csum, ktx->ktx_cksum,                                       libcfs_nid2str(ktx->ktx_nid));                                status1 = -EIO;                        }                }#endif                                break;        default:                LASSERT (0);        }        kqswnal_put_idle_tx (ktx);        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);        if (lnetmsg1 != NULL)                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);}voidkqswnal_tx_done (kqswnal_tx_t *ktx, int status){        unsigned long      flags;        ktx->ktx_status = status;        if (!in_interrupt()) {                kqswnal_tx_done_in_thread_context(ktx);                return;        }        /* Complete the send in thread context */        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);                list_add_tail(&ktx->ktx_schedlist,                       &kqswnal_data.kqn_donetxds);        wake_up(&kqswnal_data.kqn_sched_waitq);                spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);}static voidkqswnal_txhandler(EP_TXD *txd, void *arg, int status){        kqswnal_tx_t         *ktx = (kqswnal_tx_t *)arg;        kqswnal_rpc_reply_t  *reply;        LASSERT (txd != NULL);        LASSERT (ktx != NULL);        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);        if (status != EP_SUCCESS) {                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n",                         libcfs_nid2str(ktx->ktx_nid), status);                status = -EHOSTDOWN;        } else switch (ktx->ktx_state) {        case KTX_GETTING:        case KTX_PUTTING:                /* RPC complete! */                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);                if (reply->msg.magic == 0) {    /* "old" peer */                        status = reply->msg.status;                        break;                }                                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {                                CERROR("%s unexpected rpc reply magic %08x\n",                                       libcfs_nid2str(ktx->ktx_nid),                                       reply->msg.magic);                                status = -EPROTO;                                break;                        }                        __swab32s(&reply->msg.status);                        __swab32s(&reply->msg.version);                                                if (ktx->ktx_state == KTX_GETTING) {                                __swab32s(&reply->msg.u.get.len);                                __swab32s(&reply->msg.u.get.cksum);                        }                }                                        status = reply->msg.status;                if (status != 0) {                        CERROR("%s RPC status %08x\n",                               libcfs_nid2str(ktx->ktx_nid), status);                        break;                }                if (ktx->ktx_state == KTX_GETTING) {                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,                                               (lnet_msg_t *)ktx->ktx_args[2],                                               reply->msg.u.get.len);#if KQSW_CKSUM                        ktx->ktx_cksum = reply->msg.u.get.cksum;#endif                }                break;                        case KTX_SENDING:                status = 0;                break;                        default:                LBUG();                break;        }        kqswnal_tx_done(ktx, status);}intkqswnal_launch (kqswnal_tx_t *ktx){        /* Don't block for transmit descriptor if we're in interrupt context */        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);        unsigned long flags;        int   rc;        ktx->ktx_launchtime = jiffies;        if (kqswnal_data.kqn_shuttingdown)                return (-ESHUTDOWN);        LASSERT (dest >= 0);                    /* must be a peer */        if (ktx->ktx_nmappedpages != 0)                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);        switch (ktx->ktx_state) {        case KTX_GETTING:        case KTX_PUTTING:                if (the_lnet.ln_testprotocompat != 0 &&                    the_lnet.ln_ptlcompat == 0) {                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;                        /* single-shot proto test:                         * Future version queries will use an RPC, so I'll                         * co-opt one of the existing ones */                        LNET_LOCK();                        if ((the_lnet.ln_testprotocompat & 1) != 0) {                                msg->kqm_version++;                                the_lnet.ln_testprotocompat &= ~1;                        }                        if ((the_lnet.ln_testprotocompat & 2) != 0) {                                msg->kqm_magic = LNET_PROTO_MAGIC;                                the_lnet.ln_testprotocompat &= ~2;                        }                        LNET_UNLOCK();                }                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.                 * The other frags are the payload, awaiting RDMA */                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,                                     ktx->ktx_port, attr,                                     kqswnal_txhandler, ktx,                                     NULL, ktx->ktx_frags, 1);                break;        case KTX_SENDING:                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,                                         ktx->ktx_port, attr,                                         kqswnal_txhandler, ktx,                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);                break;                        default:                LBUG();                rc = -EINVAL;                   /* no compiler warning please */                break;        }        switch (rc) {        case EP_SUCCESS: /* success */                return (0);        case EP_ENOMEM: /* can't allocate ep txd => queue for later */                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);                list_add_tail (&ktx->ktx_schedlist, &kqswnal_data.kqn_delayedtxds);                wake_up (&kqswnal_data.kqn_sched_waitq);                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);                return (0);        default: /* fatal error */                CDEBUG (D_NETERROR, "Tx to %s failed: %d\n", libcfs_nid2str(ktx->ktx_nid), rc);                kqswnal_notify_peer_down(ktx);                return (-EHOSTUNREACH);        }

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -