⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
ptllnd_check_sends(ptllnd_peer_t *peer){        lnet_ni_t      *ni = peer->plp_ni;        ptllnd_ni_t    *plni = ni->ni_data;        ptllnd_tx_t    *tx;        ptl_md_t        md;        ptl_handle_md_t mdh;        int             rc;        CDEBUG(D_NET, "%s: [%d/%d+%d(%d)\n",               libcfs_id2str(peer->plp_id), peer->plp_credits,               peer->plp_outstanding_credits, peer->plp_sent_credits,               plni->plni_peer_credits + peer->plp_lazy_credits);        if (list_empty(&peer->plp_txq) &&            peer->plp_outstanding_credits >= PTLLND_CREDIT_HIGHWATER(plni) &&            peer->plp_credits != 0) {                tx = ptllnd_new_tx(peer, PTLLND_MSG_TYPE_NOOP, 0);                CDEBUG(D_NET, "NOOP tx=%p\n",tx);                if (tx == NULL) {                        CERROR("Can't return credits to %s\n",                               libcfs_id2str(peer->plp_id));                } else {                        ptllnd_set_tx_deadline(tx);                        list_add_tail(&tx->tx_list, &peer->plp_txq);                }        }        while (!list_empty(&peer->plp_txq)) {                tx = list_entry(peer->plp_txq.next, ptllnd_tx_t, tx_list);                LASSERT (tx->tx_msgsize > 0);                LASSERT (peer->plp_outstanding_credits >= 0);                LASSERT (peer->plp_sent_credits >= 0);                LASSERT (peer->plp_outstanding_credits + peer->plp_sent_credits                         <= plni->plni_peer_credits + peer->plp_lazy_credits);                LASSERT (peer->plp_credits >= 0);                if (peer->plp_credits == 0) {   /* no credits */                        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: no creds for %p",                                       libcfs_id2str(peer->plp_id),                                       peer->plp_credits,                                       peer->plp_outstanding_credits,                                       peer->plp_sent_credits,                                       plni->plni_peer_credits +                                       peer->plp_lazy_credits, tx);                        break;                }                                if (peer->plp_credits == 1 &&   /* last credit reserved for */                    peer->plp_outstanding_credits == 0) { /* returning credits */                        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: too few creds for %p",                                       libcfs_id2str(peer->plp_id),                                       peer->plp_credits,                                       peer->plp_outstanding_credits,                                       peer->plp_sent_credits,                                       plni->plni_peer_credits +                                       peer->plp_lazy_credits, tx);                        break;                }                                list_del(&tx->tx_list);                list_add_tail(&tx->tx_list, &peer->plp_activeq);                CDEBUG(D_NET, "Sending at TX=%p type=%s (%d)\n",tx,                        ptllnd_msgtype2str(tx->tx_type),tx->tx_type);                if (tx->tx_type == PTLLND_MSG_TYPE_NOOP &&                    (!list_empty(&peer->plp_txq) ||                     peer->plp_outstanding_credits <                     PTLLND_CREDIT_HIGHWATER(plni))) {                        /* redundant NOOP */                        ptllnd_tx_done(tx);                        continue;                }                /* Set stamp at the last minute; on a new peer, I don't know it                 * until I receive the HELLO back */                tx->tx_msg.ptlm_dststamp = peer->plp_stamp;                /*                 * Return all the credits we have                 */                tx->tx_msg.ptlm_credits = peer->plp_outstanding_credits;                peer->plp_sent_credits += peer->plp_outstanding_credits;                peer->plp_outstanding_credits = 0;                /*                 * One less credit                 */                peer->plp_credits--;                if (plni->plni_checksum)                        tx->tx_msg.ptlm_cksum =                                 ptllnd_cksum(&tx->tx_msg,                                             offsetof(kptl_msg_t, ptlm_u));                md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);                md.eq_handle = plni->plni_eqh;                md.threshold = 1;                md.options = PTLLND_MD_OPTIONS;                md.start = &tx->tx_msg;                md.length = tx->tx_msgsize;                rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);                if (rc != PTL_OK) {                        CERROR("PtlMDBind for %s failed: %s(%d)\n",                               libcfs_id2str(peer->plp_id),                               ptllnd_errtype2str(rc), rc);                        tx->tx_status = -EIO;                        ptllnd_tx_done(tx);                        break;                }                LASSERT (tx->tx_type != PTLLND_RDMA_WRITE &&                         tx->tx_type != PTLLND_RDMA_READ);                                tx->tx_reqmdh = mdh;                gettimeofday(&tx->tx_req_posted, NULL);                PTLLND_HISTORY("%s[%d/%d+%d(%d)]: %s %p c %d",                               libcfs_id2str(peer->plp_id),                               peer->plp_credits,                               peer->plp_outstanding_credits,                               peer->plp_sent_credits,                               plni->plni_peer_credits +                               peer->plp_lazy_credits,                               ptllnd_msgtype2str(tx->tx_type), tx,                               tx->tx_msg.ptlm_credits);                rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,                            plni->plni_portal, 0, LNET_MSG_MATCHBITS, 0, 0);                if (rc != PTL_OK) {                        CERROR("PtlPut for %s failed: %s(%d)\n",                               libcfs_id2str(peer->plp_id),                               ptllnd_errtype2str(rc), rc);                        tx->tx_status = -EIO;                        ptllnd_tx_done(tx);                        break;                }        }}intptllnd_passive_rdma(ptllnd_peer_t *peer, int type, lnet_msg_t *msg,                    unsigned int niov, struct iovec *iov,                    unsigned int offset, unsigned int len){        lnet_ni_t      *ni = peer->plp_ni;        ptllnd_ni_t    *plni = ni->ni_data;        ptllnd_tx_t    *tx = ptllnd_new_tx(peer, type, 0);        __u64           matchbits;        ptl_md_t        md;        ptl_handle_md_t mdh;        ptl_handle_me_t meh;        int             rc;        int             rc2;        time_t          start;        int             w;        CDEBUG(D_NET, "niov=%d offset=%d len=%d\n",niov,offset,len);        LASSERT (type == PTLLND_MSG_TYPE_GET ||                 type == PTLLND_MSG_TYPE_PUT);        if (tx == NULL) {                CERROR("Can't allocate %s tx for %s\n",                       type == PTLLND_MSG_TYPE_GET ? "GET" : "PUT/REPLY",                       libcfs_id2str(peer->plp_id));                return -ENOMEM;        }        rc = ptllnd_set_txiov(tx, niov, iov, offset, len);        if (rc != 0) {                CERROR ("Can't allocate iov %d for %s\n",                        niov, libcfs_id2str(peer->plp_id));                rc = -ENOMEM;                goto failed;        }        md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);        md.eq_handle = plni->plni_eqh;        md.threshold = 1;        md.max_size = 0;        md.options = PTLLND_MD_OPTIONS;        if(type == PTLLND_MSG_TYPE_GET)                md.options |= PTL_MD_OP_PUT | PTL_MD_ACK_DISABLE;        else                md.options |= PTL_MD_OP_GET;        ptllnd_set_md_buffer(&md, tx);        start = cfs_time_current_sec();        w = plni->plni_long_wait;        while (!peer->plp_recvd_hello) {        /* wait to validate plp_match */                if (peer->plp_closing) {                        rc = -EIO;                        goto failed;                }                if (w > 0 && cfs_time_current_sec() > start + w/1000) {                        CWARN("Waited %ds to connect to %s\n",                              (int)(cfs_time_current_sec() - start),                              libcfs_id2str(peer->plp_id));                        w *= 2;                }                ptllnd_wait(ni, w);        }        if (peer->plp_match < PTL_RESERVED_MATCHBITS)                peer->plp_match = PTL_RESERVED_MATCHBITS;        matchbits = peer->plp_match++;        rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, peer->plp_ptlid,                         matchbits, 0, PTL_UNLINK, PTL_INS_BEFORE, &meh);        if (rc != PTL_OK) {                CERROR("PtlMEAttach for %s failed: %s(%d)\n",                       libcfs_id2str(peer->plp_id),                       ptllnd_errtype2str(rc), rc);                rc = -EIO;                goto failed;        }        gettimeofday(&tx->tx_bulk_posted, NULL);        rc = PtlMDAttach(meh, md, LNET_UNLINK, &mdh);        if (rc != PTL_OK) {                CERROR("PtlMDAttach for %s failed: %s(%d)\n",                       libcfs_id2str(peer->plp_id),                       ptllnd_errtype2str(rc), rc);                rc2 = PtlMEUnlink(meh);                LASSERT (rc2 == PTL_OK);                rc = -EIO;                goto failed;        }        tx->tx_bulkmdh = mdh;        /*         * We need to set the stamp here because it         * we could have received a HELLO above that set         * peer->plp_stamp         */        tx->tx_msg.ptlm_dststamp = peer->plp_stamp;        tx->tx_msg.ptlm_u.rdma.kptlrm_hdr = msg->msg_hdr;        tx->tx_msg.ptlm_u.rdma.kptlrm_matchbits = matchbits;        if (type == PTLLND_MSG_TYPE_GET) {                tx->tx_lnetreplymsg = lnet_create_reply_msg(ni, msg);                if (tx->tx_lnetreplymsg == NULL) {                        CERROR("Can't create reply for GET to %s\n",                               libcfs_id2str(msg->msg_target));                        rc = -ENOMEM;                        goto failed;                }        }        tx->tx_lnetmsg = msg;        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post passive %s p %d %p",                       libcfs_id2str(msg->msg_target),                       peer->plp_credits, peer->plp_outstanding_credits,                       peer->plp_sent_credits,                       plni->plni_peer_credits + peer->plp_lazy_credits,                       lnet_msgtyp2str(msg->msg_type),                       (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :                       (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,                       tx);        ptllnd_post_tx(tx);        return 0; failed:        ptllnd_tx_done(tx);        return rc;}intptllnd_active_rdma(ptllnd_peer_t *peer, int type,                   lnet_msg_t *msg, __u64 matchbits,                   unsigned int niov, struct iovec *iov,                   unsigned int offset, unsigned int len){        lnet_ni_t       *ni = peer->plp_ni;        ptllnd_ni_t     *plni = ni->ni_data;        ptllnd_tx_t     *tx = ptllnd_new_tx(peer, type, 0);        ptl_md_t         md;        ptl_handle_md_t  mdh;        int              rc;        LASSERT (type == PTLLND_RDMA_READ ||                 type == PTLLND_RDMA_WRITE);        if (tx == NULL) {                CERROR("Can't allocate tx for RDMA %s with %s\n",                       (type == PTLLND_RDMA_WRITE) ? "write" : "read",                       libcfs_id2str(peer->plp_id));                ptllnd_close_peer(peer, -ENOMEM);                return -ENOMEM;        }        rc = ptllnd_set_txiov(tx, niov, iov, offset, len);        if (rc != 0) {                CERROR ("Can't allocate iov %d for %s\n",                        niov, libcfs_id2str(peer->plp_id));                rc = -ENOMEM;                goto failed;        }        md.user_ptr = ptllnd_obj2eventarg(tx, PTLLND_EVENTARG_TYPE_TX);        md.eq_handle = plni->plni_eqh;        md.max_size = 0;        md.options = PTLLND_MD_OPTIONS;        md.threshold = (type == PTLLND_RDMA_READ) ? 2 : 1;        ptllnd_set_md_buffer(&md, tx);        rc = PtlMDBind(plni->plni_nih, md, LNET_UNLINK, &mdh);        if (rc != PTL_OK) {                CERROR("PtlMDBind for %s failed: %s(%d)\n",                       libcfs_id2str(peer->plp_id),                       ptllnd_errtype2str(rc), rc);                rc = -EIO;                goto failed;        }        tx->tx_bulkmdh = mdh;        tx->tx_lnetmsg = msg;        ptllnd_set_tx_deadline(tx);        list_add_tail(&tx->tx_list, &peer->plp_activeq);        gettimeofday(&tx->tx_bulk_posted, NULL);        if (type == PTLLND_RDMA_READ)                rc = PtlGet(mdh, peer->plp_ptlid,                            plni->plni_portal, 0, matchbits, 0);        else                rc = PtlPut(mdh, PTL_NOACK_REQ, peer->plp_ptlid,                            plni->plni_portal, 0, matchbits, 0,                             (msg == NULL) ? PTLLND_RDMA_FAIL : PTLLND_RDMA_OK);        if (rc == PTL_OK)                return 0;        CERROR("Can't initiate RDMA with %s: %s(%d)\n",               libcfs_id2str(peer->plp_id),               ptllnd_errtype2str(rc), rc);        tx->tx_lnetmsg = NULL; failed:        tx->tx_status = rc;        ptllnd_tx_done(tx);    /* this will close peer */        return rc;}intptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *msg){        ptllnd_ni_t    *plni = ni->ni_data;        ptllnd_peer_t  *plp;        ptllnd_tx_t    *tx;        int             nob;        int             rc;        LASSERT (!msg->msg_routing);        LASSERT (msg->msg_kiov == NULL);        LASSERT (msg->msg_niov <= PTL_MD_MAX_IOV); /* !!! */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -