⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 qswlnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        if (ktx == NULL) {                CERROR ("Can't get txd for msg type %d for %s\n",                        type, libcfs_nid2str(target.nid));                return (-ENOMEM);        }        ktx->ktx_state   = KTX_SENDING;        ktx->ktx_nid     = target.nid;        ktx->ktx_args[0] = private;        ktx->ktx_args[1] = lntmsg;        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */        /* The first frag will be the pre-mapped buffer. */        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;        if ((!target_is_router &&               /* target.nid is final dest */             !routing &&                        /* I'm the source */             type == LNET_MSG_GET &&            /* optimize GET? */             *kqswnal_tunables.kqn_optimized_gets != 0 &&             lntmsg->msg_md->md_length >=              *kqswnal_tunables.kqn_optimized_gets) ||            ((type == LNET_MSG_PUT ||            /* optimize PUT? */              type == LNET_MSG_REPLY) &&         /* optimize REPLY? */             *kqswnal_tunables.kqn_optimized_puts != 0 &&             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {                lnet_libmd_t       *md = lntmsg->msg_md;                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;                lnet_hdr_t         *mhdr;                kqswnal_remotemd_t *rmd;                /* Optimised path: I send over the Elan vaddrs of the local                 * buffers, and my peer DMAs directly to/from them.                 *                 * First I set up ktx as if it was going to send this                 * payload, (it needs to map it anyway).  This fills                 * ktx_frags[1] and onward with the network addresses                 * of the buffer frags. */                if (the_lnet.ln_ptlcompat == 2) {                        /* Strong portals compatibility: send "raw" LNET                         * header + rdma descriptor */                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;                        rmd  = (kqswnal_remotemd_t *)(mhdr + 1);                } else {                        /* Send an RDMA message */                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;                        msg->kqm_version = QSWLND_PROTO_VERSION;                        msg->kqm_type = QSWLND_MSG_RDMA;                        mhdr = &msg->kqm_u.rdma.kqrm_hdr;                        rmd  = &msg->kqm_u.rdma.kqrm_rmd;                }                *mhdr = *hdr;                nob = (((char *)rmd) - ktx->ktx_buffer);                if (type == LNET_MSG_GET) {                        if ((md->md_options & LNET_MD_KIOV) != 0)                                 rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,                                                          md->md_niov, md->md_iov.kiov);                        else                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,                                                         md->md_niov, md->md_iov.iov);                        ktx->ktx_state = KTX_GETTING;                } else {                        if (payload_kiov != NULL)                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,                                                         payload_niov, payload_kiov);                        else                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,                                                        payload_niov, payload_iov);                        ktx->ktx_state = KTX_PUTTING;                }                if (rc != 0)                        goto out;                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;                nob += offsetof(kqswnal_remotemd_t,                                kqrmd_frag[rmd->kqrmd_nfrag]);                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],                       rmd->kqrmd_nfrag * sizeof(EP_NMD));                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);#if KQSW_CKSUM                LASSERT (the_lnet.ln_ptlcompat != 2);                msg->kqm_nob   = nob + payload_nob;                msg->kqm_cksum = 0;                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);#endif                if (type == LNET_MSG_GET) {                        /* Allocate reply message now while I'm in thread context */                        ktx->ktx_args[2] = lnet_create_reply_msg (                                kqswnal_data.kqn_ni, lntmsg);                        if (ktx->ktx_args[2] == NULL)                                goto out;                        /* NB finalizing the REPLY message is my                         * responsibility now, whatever happens. */#if KQSW_CKSUM                        if (*kqswnal_tunables.kqn_inject_csum_error ==  3) {                                msg->kqm_cksum++;                                *kqswnal_tunables.kqn_inject_csum_error = 0;                        }                } else if (payload_kiov != NULL) {                        /* must checksum payload after header so receiver can                         * compute partial header cksum before swab.  Sadly                         * this causes 2 rounds of kmap */                        msg->kqm_cksum =                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,                                                  payload_niov, payload_kiov);                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {                                msg->kqm_cksum++;                                *kqswnal_tunables.kqn_inject_csum_error = 0;                        }                } else {                        msg->kqm_cksum =                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,                                                 payload_niov, payload_iov);                        if (*kqswnal_tunables.kqn_inject_csum_error ==  2) {                                msg->kqm_cksum++;                                *kqswnal_tunables.kqn_inject_csum_error = 0;                        }#endif                }                        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {                lnet_hdr_t    *mhdr;                char          *payload;                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;                /* small message: single frag copied into the pre-mapped buffer */                if (the_lnet.ln_ptlcompat == 2) {                        /* Strong portals compatibility: send "raw" LNET header                         * + payload */                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;                        payload = (char *)(mhdr + 1);                } else {                        /* Send an IMMEDIATE message */                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;                        msg->kqm_version = QSWLND_PROTO_VERSION;                        msg->kqm_type = QSWLND_MSG_IMMEDIATE;                        mhdr = &msg->kqm_u.immediate.kqim_hdr;                        payload = msg->kqm_u.immediate.kqim_payload;                }                *mhdr = *hdr;                nob = (payload - ktx->ktx_buffer) + payload_nob;                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);                if (payload_kiov != NULL)                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,                                            payload_niov, payload_kiov,                                             payload_offset, payload_nob);                else                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,                                           payload_niov, payload_iov,                                            payload_offset, payload_nob);#if KQSW_CKSUM                LASSERT (the_lnet.ln_ptlcompat != 2);                msg->kqm_nob   = nob;                msg->kqm_cksum = 0;                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {                        msg->kqm_cksum++;                        *kqswnal_tunables.kqn_inject_csum_error = 0;                }#endif        } else {                lnet_hdr_t    *mhdr;                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;                /* large message: multiple frags: first is hdr in pre-mapped buffer */                if (the_lnet.ln_ptlcompat == 2) {                        /* Strong portals compatibility: send "raw" LNET header                         * + payload */                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;                        nob = sizeof(lnet_hdr_t);                } else {                        /* Send an IMMEDIATE message */                        msg->kqm_magic = LNET_PROTO_QSW_MAGIC;                        msg->kqm_version = QSWLND_PROTO_VERSION;                        msg->kqm_type = QSWLND_MSG_IMMEDIATE;                        mhdr = &msg->kqm_u.immediate.kqim_hdr;                        nob = offsetof(kqswnal_msg_t,                                       kqm_u.immediate.kqim_payload);                }                *mhdr = *hdr;                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);                if (payload_kiov != NULL)                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,                                                   payload_niov, payload_kiov);                else                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,                                                 payload_niov, payload_iov);                if (rc != 0)                        goto out;#if KQSW_CKSUM                msg->kqm_nob   = nob + payload_nob;                msg->kqm_cksum = 0;                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);                msg->kqm_cksum = (payload_kiov != NULL) ?                                 kqswnal_csum_kiov(msg->kqm_cksum,                                                   payload_offset, payload_nob,                                                   payload_niov, payload_kiov) :                                 kqswnal_csum_iov(msg->kqm_cksum,                                                  payload_offset, payload_nob,                                                  payload_niov, payload_iov);                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {                        msg->kqm_cksum++;                        *kqswnal_tunables.kqn_inject_csum_error = 0;                }#endif                nob += payload_nob;        }                ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;        rc = kqswnal_launch (ktx); out:        CDEBUG(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",               routing ? (rc == 0 ? "Routed" : "Failed to route") :                         (rc == 0 ? "Sent" : "Failed to send"),               nob, libcfs_nid2str(target.nid),               target_is_router ? "(router)" : "", rc);        if (rc != 0) {                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];                int         state = ktx->ktx_state;                                kqswnal_put_idle_tx (ktx);                if (state == KTX_GETTING && repmsg != NULL) {                        /* We committed to reply, but there was a problem                         * launching the GET.  We can't avoid delivering a                         * REPLY event since we committed above, so we                         * pretend the GET succeeded but the REPLY                         * failed. */                        rc = 0;                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);                }                        }                atomic_dec(&kqswnal_data.kqn_pending_txs);        return (rc == 0 ? 0 : -EIO);}voidkqswnal_requeue_rx (kqswnal_rx_t *krx){        LASSERT (atomic_read(&krx->krx_refcount) == 0);        LASSERT (!krx->krx_rpc_reply_needed);        krx->krx_state = KRX_POSTED;        if (kqswnal_data.kqn_shuttingdown) {                /* free EKC rxd on shutdown */                ep_complete_receive(krx->krx_rxd);        } else {                /* repost receive */                ep_requeue_receive(krx->krx_rxd,                                    kqswnal_rxhandler, krx,                                   &krx->krx_elanbuffer, 0);        }}voidkqswnal_rpc_complete (EP_RXD *rxd){        int           status = ep_rxd_status(rxd);        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);                CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,               "rxd %p, krx %p, status %d\n", rxd, krx, status);        LASSERT (krx->krx_rxd == rxd);        LASSERT (krx->krx_rpc_reply_needed);                krx->krx_rpc_reply_needed = 0;        kqswnal_requeue_rx (krx);}voidkqswnal_rx_done (kqswnal_rx_t *krx) {        int           rc;        LASSERT (atomic_read(&krx->krx_refcount) == 0);        if (krx->krx_rpc_reply_needed) {                /* We've not completed the peer's RPC yet... */                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;                LASSERT (!in_interrupt());                rc = ep_complete_rpc(krx->krx_rxd,                                      kqswnal_rpc_complete, krx,                                     &krx->krx_rpc_reply.ep_statusblk,                                      NULL, NULL, 0);                if (rc == EP_SUCCESS)                        return;                CERROR("can't complete RPC: %d\n", rc);                krx->krx_rpc_reply_needed = 0;        }        kqswnal_requeue_rx(krx);}        voidkqswnal_parse (kqswnal_rx_t *krx){        lnet_ni_t      *ni = kqswnal_data.kqn_ni;        kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);        lnet_nid_t      fromnid = kqswnal_rx_nid(krx);        int             swab;        int             n;        int             i;        int             nob;        int             rc;        LASSERT (atomic_read(&krx->krx_refcount) == 1);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -