⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 qswlnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
}#if 0static char *hdr_type_string (lnet_hdr_t *hdr){        switch (hdr->type) {        case LNET_MSG_ACK:                return ("ACK");        case LNET_MSG_PUT:                return ("PUT");        case LNET_MSG_GET:                return ("GET");        case LNET_MSG_REPLY:                return ("REPLY");        default:                return ("<UNKNOWN>");        }}static voidkqswnal_cerror_hdr(lnet_hdr_t * hdr){        char *type_str = hdr_type_string (hdr);        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,               le32_to_cpu(hdr->payload_length));        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),               le32_to_cpu(hdr->src_pid));        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),               le32_to_cpu(hdr->dest_pid));        switch (le32_to_cpu(hdr->type)) {        case LNET_MSG_PUT:                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "                       "match bits "LPX64"\n",                       le32_to_cpu(hdr->msg.put.ptl_index),                       hdr->msg.put.ack_wmd.wh_interface_cookie,                       hdr->msg.put.ack_wmd.wh_object_cookie,                       le64_to_cpu(hdr->msg.put.match_bits));                CERROR("    offset %d, hdr data "LPX64"\n",                       le32_to_cpu(hdr->msg.put.offset),                       hdr->msg.put.hdr_data);                break;        case LNET_MSG_GET:                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "                       "match bits "LPX64"\n",                       le32_to_cpu(hdr->msg.get.ptl_index),                       hdr->msg.get.return_wmd.wh_interface_cookie,                       hdr->msg.get.return_wmd.wh_object_cookie,                       hdr->msg.get.match_bits);                CERROR("    Length %d, src offset %d\n",                       le32_to_cpu(hdr->msg.get.sink_length),                       le32_to_cpu(hdr->msg.get.src_offset));                break;        case LNET_MSG_ACK:                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",                       hdr->msg.ack.dst_wmd.wh_interface_cookie,                       hdr->msg.ack.dst_wmd.wh_object_cookie,                       le32_to_cpu(hdr->msg.ack.mlength));                break;        case LNET_MSG_REPLY:                CERROR("    dst md "LPX64"."LPX64"\n",                       hdr->msg.reply.dst_wmd.wh_interface_cookie,                       hdr->msg.reply.dst_wmd.wh_object_cookie);        }}                               /* end of print_hdr() */#endifintkqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,                    int nrfrag, EP_NMD *rfrag){        int  i;        if (nlfrag != nrfrag) {                CERROR("Can't cope with unequal # frags: %d local %d remote\n",                       nlfrag, nrfrag);                return (-EINVAL);        }                for (i = 0; i < nlfrag; i++)                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {                        CERROR("Can't cope with unequal frags %d(%d):"                               " %d local %d remote\n",                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);                        return (-EINVAL);                }                return (0);}kqswnal_remotemd_t *kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx){        /* Check that the RMD sent after the "raw" LNET header in a         * portals-compatible QSWLND message is OK */        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));        /* Note RDMA addresses are sent in native endian-ness in the "old"         * portals protocol so no swabbing... */        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {                /* msg too small to discover rmd size */                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));                return (NULL);        }        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {                /* rmd doesn't fit in the incoming message */                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",                        krx->krx_nob, rmd->kqrmd_nfrag,                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));                return (NULL);        }        return (rmd);}voidkqswnal_rdma_store_complete (EP_RXD *rxd) {        int           status = ep_rxd_status(rxd);        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];                CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);        LASSERT (krx->krx_rxd == rxd);        LASSERT (krx->krx_rpc_reply_needed);        krx->krx_rpc_reply_needed = 0;        kqswnal_rx_decref (krx);        /* free ktx & finalize() its lnet_msg_t */        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);}voidkqswnal_rdma_fetch_complete (EP_RXD *rxd) {        /* Completed fetching the PUT/REPLY data */        int           status = ep_rxd_status(rxd);        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];                CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);        LASSERT (krx->krx_rxd == rxd);        /* RPC completes with failure by default */        LASSERT (krx->krx_rpc_reply_needed);        LASSERT (krx->krx_rpc_reply.msg.status != 0);        if (status == EP_SUCCESS) {                krx->krx_rpc_reply.msg.status = 0;                status = 0;        } else {                /* Abandon RPC since get failed */                krx->krx_rpc_reply_needed = 0;                status = -ECONNABORTED;        }        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */        LASSERT (krx->krx_state == KRX_PARSE);        krx->krx_state = KRX_COMPLETING;        /* free ktx & finalize() its lnet_msg_t */        kqswnal_tx_done(ktx, status);}intkqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,              int type, kqswnal_remotemd_t *rmd,              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,              unsigned int offset, unsigned int len){        kqswnal_tx_t       *ktx;        int                 eprc;        int                 rc;        /* Not both mapped and paged payload */        LASSERT (iov == NULL || kiov == NULL);        /* RPC completes with failure by default */        LASSERT (krx->krx_rpc_reply_needed);        LASSERT (krx->krx_rpc_reply.msg.status != 0);        if (len == 0) {                /* data got truncated to nothing. */                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);                /* Let kqswnal_rx_done() complete the RPC with success */                krx->krx_rpc_reply.msg.status = 0;                return (0);        }                /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not           actually sending a portals message with it */        ktx = kqswnal_get_idle_tx();        if (ktx == NULL) {                CERROR ("Can't get txd for RDMA with %s\n",                        libcfs_nid2str(kqswnal_rx_nid(krx)));                return (-ENOMEM);        }        ktx->ktx_state   = type;        ktx->ktx_nid     = kqswnal_rx_nid(krx);        ktx->ktx_args[0] = krx;        ktx->ktx_args[1] = lntmsg;        LASSERT (atomic_read(&krx->krx_refcount) > 0);        /* Take an extra ref for the completion callback */        atomic_inc(&krx->krx_refcount);        /* Map on the rail the RPC prefers */        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,                                         ep_rxd_railmask(krx->krx_rxd));        /* Start mapping at offset 0 (we're not mapping any headers) */        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;                if (kiov != NULL)                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);        else                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);        if (rc != 0) {                CERROR ("Can't map local RDMA data: %d\n", rc);                goto out;        }        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);        if (rc != 0) {                CERROR ("Incompatible RDMA descriptors\n");                goto out;        }        switch (type) {        default:                LBUG();                        case KTX_RDMA_STORE:                krx->krx_rpc_reply.msg.status    = 0;                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;                krx->krx_rpc_reply.msg.u.get.len = len;#if KQSW_CKSUM                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?                            kqswnal_csum_kiov(~0, offset, len, niov, kiov) :                            kqswnal_csum_iov(~0, offset, len, niov, iov);                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {                        krx->krx_rpc_reply.msg.u.get.cksum++;                        *kqswnal_tunables.kqn_inject_csum_error = 0;                }#endif                eprc = ep_complete_rpc(krx->krx_rxd,                                        kqswnal_rdma_store_complete, ktx,                                        &krx->krx_rpc_reply.ep_statusblk,                                        ktx->ktx_frags, rmd->kqrmd_frag,                                        rmd->kqrmd_nfrag);                if (eprc != EP_SUCCESS) {                        CERROR("can't complete RPC: %d\n", eprc);                        /* don't re-attempt RPC completion */                        krx->krx_rpc_reply_needed = 0;                        rc = -ECONNABORTED;                }                break;                        case KTX_RDMA_FETCH:                eprc = ep_rpc_get (krx->krx_rxd,                                    kqswnal_rdma_fetch_complete, ktx,                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);                if (eprc != EP_SUCCESS) {                        CERROR("ep_rpc_get failed: %d\n", eprc);                        /* Don't attempt RPC completion:                          * EKC nuked it when the get failed */                        krx->krx_rpc_reply_needed = 0;                        rc = -ECONNABORTED;                }                break;        } out:        if (rc != 0) {                kqswnal_rx_decref(krx);                 /* drop callback's ref */                kqswnal_put_idle_tx (ktx);        }        atomic_dec(&kqswnal_data.kqn_pending_txs);        return (rc);}intkqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;        int               type = lntmsg->msg_type;        lnet_process_id_t target = lntmsg->msg_target;        int               target_is_router = lntmsg->msg_target_is_router;        int               routing = lntmsg->msg_routing;        unsigned int      payload_niov = lntmsg->msg_niov;        struct iovec     *payload_iov = lntmsg->msg_iov;        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;        unsigned int      payload_offset = lntmsg->msg_offset;        unsigned int      payload_nob = lntmsg->msg_len;        int               nob;        kqswnal_tx_t     *ktx;        int               rc;        /* NB 1. hdr is in network byte order */        /*    2. 'private' depends on the message type */                CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",               payload_nob, payload_niov, libcfs_id2str(target));        LASSERT (payload_nob == 0 || payload_niov > 0);        LASSERT (payload_niov <= LNET_MAX_IOV);        /* It must be OK to kmap() if required */        LASSERT (payload_kiov == NULL || !in_interrupt ());        /* payload is either all vaddrs or all pages */        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));        if (kqswnal_nid2elanid (target.nid) < 0) {                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));                return -EIO;        }        /* I may not block for a transmit descriptor if I might block the         * router, receiver, or an interrupt handler. */        ktx = kqswnal_get_idle_tx();

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -