⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
         * peer closes.         *         * Note that the following condition handles this case, where it         * actually increases the extra lazy credit counter. */        if (nasync <= peer->plp_extra_lazy_credits) {                peer->plp_extra_lazy_credits -= nasync;                return 0;        }        LASSERT (nasync > 0);        nasync -= peer->plp_extra_lazy_credits;        peer->plp_extra_lazy_credits = 0;                rc = ptllnd_size_buffers(ni, nasync);        if (rc == 0) {                peer->plp_lazy_credits += nasync;                peer->plp_outstanding_credits += nasync;        }        return rc;}__u32ptllnd_cksum (void *ptr, int nob){        char  *c  = ptr;        __u32  sum = 0;        while (nob-- > 0)                sum = ((sum << 1) | (sum >> 31)) + *c++;        /* ensure I don't return 0 (== no checksum) */        return (sum == 0) ? 1 : sum;}ptllnd_tx_t *ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob){        lnet_ni_t   *ni = peer->plp_ni;        ptllnd_ni_t *plni = ni->ni_data;        ptllnd_tx_t *tx;        int          msgsize;        CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob);        switch (type) {        default:                LBUG();        case PTLLND_RDMA_WRITE:        case PTLLND_RDMA_READ:                LASSERT (payload_nob == 0);                msgsize = 0;                break;        case PTLLND_MSG_TYPE_PUT:        case PTLLND_MSG_TYPE_GET:                LASSERT (payload_nob == 0);                msgsize = offsetof(kptl_msg_t, ptlm_u) +                           sizeof(kptl_rdma_msg_t);                break;        case PTLLND_MSG_TYPE_IMMEDIATE:                msgsize = offsetof(kptl_msg_t,                                   ptlm_u.immediate.kptlim_payload[payload_nob]);                break;        case PTLLND_MSG_TYPE_NOOP:                LASSERT (payload_nob == 0);                msgsize = offsetof(kptl_msg_t, ptlm_u);                break;        case PTLLND_MSG_TYPE_HELLO:                LASSERT (payload_nob == 0);                msgsize = offsetof(kptl_msg_t, ptlm_u) +                          sizeof(kptl_hello_msg_t);                break;        }        msgsize = (msgsize + 7) & ~7;        LASSERT (msgsize <= peer->plp_max_msg_size);        LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize);        if (tx == NULL) {                CERROR("Can't allocate msg type %d for %s\n",                       type, libcfs_id2str(peer->plp_id));                return NULL;        }        CFS_INIT_LIST_HEAD(&tx->tx_list);        tx->tx_peer = peer;        tx->tx_type = type;        tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL;        tx->tx_niov = 0;        tx->tx_iov = NULL;        tx->tx_reqmdh = PTL_INVALID_HANDLE;        tx->tx_bulkmdh = PTL_INVALID_HANDLE;        tx->tx_msgsize = msgsize;        tx->tx_completing = 0;        tx->tx_status = 0;        memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted));        memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done));        memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted));        memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done));        if (msgsize != 0) {                tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC;                tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION;                tx->tx_msg.ptlm_type = type;                tx->tx_msg.ptlm_credits = 0;                tx->tx_msg.ptlm_nob = msgsize;                tx->tx_msg.ptlm_cksum = 0;                tx->tx_msg.ptlm_srcnid = ni->ni_nid;                tx->tx_msg.ptlm_srcstamp = plni->plni_stamp;                tx->tx_msg.ptlm_dstnid = peer->plp_id.nid;                tx->tx_msg.ptlm_dststamp = peer->plp_stamp;                tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid;                tx->tx_msg.ptlm_dstpid = peer->plp_id.pid;        }        ptllnd_peer_addref(peer);        plni->plni_ntxs++;        CDEBUG(D_NET, "tx=%p\n",tx);        return tx;}voidptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh){        ptllnd_peer_t   *peer = tx->tx_peer;        lnet_ni_t       *ni = peer->plp_ni;        int              rc;        time_t           start = cfs_time_current_sec();        ptllnd_ni_t     *plni = ni->ni_data;        int              w = plni->plni_long_wait;        while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) {                rc = PtlMDUnlink(*mdh);#ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS                if (rc == PTL_OK) /* unlink successful => no unlinked event */                        return;                LASSERT (rc == PTL_MD_IN_USE);#endif                if (w > 0 && cfs_time_current_sec() > start + w/1000) {                        CWARN("Waited %ds to abort tx to %s\n",                              (int)(cfs_time_current_sec() - start),                              libcfs_id2str(peer->plp_id));                        w *= 2;                }                /* Wait for ptllnd_tx_event() to invalidate */                ptllnd_wait(ni, w);        }}voidptllnd_cull_tx_history(ptllnd_ni_t *plni){        int max = plni->plni_max_tx_history;        while (plni->plni_ntx_history > max) {                ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next,                                              ptllnd_tx_t, tx_list);                list_del(&tx->tx_list);                ptllnd_peer_decref(tx->tx_peer);                LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize);                LASSERT (plni->plni_ntxs > 0);                plni->plni_ntxs--;                plni->plni_ntx_history--;        }}voidptllnd_tx_done(ptllnd_tx_t *tx){        ptllnd_peer_t   *peer = tx->tx_peer;        lnet_ni_t       *ni = peer->plp_ni;        ptllnd_ni_t     *plni = ni->ni_data;        /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get         * events for this tx until it's unlinked.  So I set tx_completing to         * flag the tx is getting handled */        if (tx->tx_completing)                return;        tx->tx_completing = 1;        if (!list_empty(&tx->tx_list))                list_del_init(&tx->tx_list);        if (tx->tx_status != 0) {                if (plni->plni_debug) {                        CERROR("Completing tx for %s with error %d\n",                               libcfs_id2str(peer->plp_id), tx->tx_status);                        ptllnd_debug_tx(tx);                }                ptllnd_close_peer(peer, tx->tx_status);        }                ptllnd_abort_tx(tx, &tx->tx_reqmdh);        ptllnd_abort_tx(tx, &tx->tx_bulkmdh);        if (tx->tx_niov > 0) {                LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov));                tx->tx_niov = 0;        }        if (tx->tx_lnetreplymsg != NULL) {                LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET);                LASSERT (tx->tx_lnetmsg != NULL);                /* Simulate GET success always  */                lnet_finalize(ni, tx->tx_lnetmsg, 0);                CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg);                lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status);        } else if (tx->tx_lnetmsg != NULL) {                lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status);        }        plni->plni_ntx_history++;        list_add_tail(&tx->tx_list, &plni->plni_tx_history);                ptllnd_cull_tx_history(plni);}intptllnd_set_txiov(ptllnd_tx_t *tx,                 unsigned int niov, struct iovec *iov,                 unsigned int offset, unsigned int len){        ptl_md_iovec_t *piov;        int             npiov;        if (len == 0) {                tx->tx_niov = 0;                return 0;        }        /*         * Remove iovec's at the beginning that         * are skipped because of the offset.         * Adjust the offset accordingly         */        for (;;) {                LASSERT (niov > 0);                if (offset < iov->iov_len)                        break;                offset -= iov->iov_len;                niov--;                iov++;        }        for (;;) {                int temp_offset = offset;                int resid = len;                LIBCFS_ALLOC(piov, niov * sizeof(*piov));                if (piov == NULL)                        return -ENOMEM;                for (npiov = 0;; npiov++) {                        LASSERT (npiov < niov);                        LASSERT (iov->iov_len >= temp_offset);                        piov[npiov].iov_base = iov[npiov].iov_base + temp_offset;                        piov[npiov].iov_len = iov[npiov].iov_len - temp_offset;                                                if (piov[npiov].iov_len >= resid) {                                piov[npiov].iov_len = resid;                                npiov++;                                break;                        }                        resid -= piov[npiov].iov_len;                        temp_offset = 0;                }                if (npiov == niov) {                        tx->tx_niov = niov;                        tx->tx_iov = piov;                        return 0;                }                /* Dang! The piov I allocated was too big and it's a drag to                 * have to maintain separate 'allocated' and 'used' sizes, so                 * I'll just do it again; NB this doesn't happen normally... */                LIBCFS_FREE(piov, niov * sizeof(*piov));                niov = npiov;        }}voidptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx){        unsigned int    niov = tx->tx_niov;        ptl_md_iovec_t *iov = tx->tx_iov;        LASSERT ((md->options & PTL_MD_IOVEC) == 0);        if (niov == 0) {                md->start = NULL;                md->length = 0;        } else if (niov == 1) {                md->start = iov[0].iov_base;                md->length = iov[0].iov_len;        } else {                md->start = iov;                md->length = niov;                md->options |= PTL_MD_IOVEC;        }}intptllnd_post_buffer(ptllnd_buffer_t *buf){        lnet_ni_t        *ni = buf->plb_ni;        ptllnd_ni_t      *plni = ni->ni_data;        ptl_process_id_t  anyid = {                .nid       = PTL_NID_ANY,                .pid       = PTL_PID_ANY};        ptl_md_t          md = {                .start     = buf->plb_buffer,                .length    = plni->plni_buffer_size,                .threshold = PTL_MD_THRESH_INF,                .max_size  = plni->plni_max_msg_size,                .options   = (PTLLND_MD_OPTIONS |                              PTL_MD_OP_PUT | PTL_MD_MAX_SIZE |                               PTL_MD_LOCAL_ALIGN8),                .user_ptr  = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF),                .eq_handle = plni->plni_eqh};        ptl_handle_me_t meh;        int             rc;        LASSERT (!buf->plb_posted);        rc = PtlMEAttach(plni->plni_nih, plni->plni_portal,                         anyid, LNET_MSG_MATCHBITS, 0,                         PTL_UNLINK, PTL_INS_AFTER, &meh);        if (rc != PTL_OK) {                CERROR("PtlMEAttach failed: %s(%d)\n",                       ptllnd_errtype2str(rc), rc);                return -ENOMEM;        }        buf->plb_posted = 1;        plni->plni_nposted_buffers++;        rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md);        if (rc == PTL_OK)                return 0;        CERROR("PtlMDAttach failed: %s(%d)\n",               ptllnd_errtype2str(rc), rc);        buf->plb_posted = 0;        plni->plni_nposted_buffers--;        rc = PtlMEUnlink(meh);        LASSERT (rc == PTL_OK);        return -ENOMEM;}void

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -