⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 o2iblnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        /* NOTE(review): this excerpt opens mid-function — the lines below are
         * the tail of a mapping routine (kiblnd_map_tx, presumably) whose head
         * lies before this chunk.  It maps the accumulated tx_pages through the
         * net's FMR pool and records the resulting key in the RDMA descriptor. */
        rd->rd_addr = 0;

        fmr = ib_fmr_pool_map_phys(net->ibn_fmrpool, tx->tx_pages,
                                   npages, rd->rd_addr);

        if (IS_ERR(fmr)) {
                CERROR ("Can't map %d pages: %ld\n", npages, PTR_ERR(fmr));
                return PTR_ERR(fmr);
        }

        /* If rd is not tx_rd, it's going to get sent to a peer, who will need
         * the rkey */
        rd->rd_key = (rd != tx->tx_rd) ? fmr->fmr->rkey : fmr->fmr->lkey;
        rd->rd_nob = nob;

        tx->tx_fmr = fmr;
        return 0;
}

/* Set up the RDMA descriptor 'rd' for a payload described by a virtual-address
 * iovec.  Walks the single iov fragment covering [offset, offset+nob), resolves
 * each virtual page to a physical address into tx->tx_pages, then hands off to
 * kiblnd_map_tx for the actual FMR mapping.  Returns 0 or -ve errno.
 * NB: refuses payloads spanning more than one iov fragment (-EMSGSIZE). */
int
kiblnd_setup_rd_iov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                     unsigned int niov, struct iovec *iov, int offset, int nob)
{
        int           resid;
        int           fragnob;
        struct page  *page;
        int           npages;
        unsigned long page_offset;
        unsigned long vaddr;

        LASSERT (nob > 0);
        LASSERT (niov > 0);

        /* Skip whole fragments before 'offset'. */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        /* The entire payload must fit in the one remaining fragment. */
        if (nob > iov->iov_len - offset) {
                CERROR ("Can't map multiple vaddr fragments\n");
                return (-EMSGSIZE);
        }

        vaddr = ((unsigned long)iov->iov_base) + offset;

        page_offset = vaddr & (PAGE_SIZE - 1);
        resid = nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);

                page = kiblnd_kvaddr_to_page(vaddr);
                if (page == NULL) {
                        CERROR("Can't find page for %lu\n", vaddr);
                        return -EFAULT;
                }

                tx->tx_pages[npages++] = lnet_page2phys(page);

                /* Advance to the next page boundary; the final fragment may
                 * overshoot resid, which simply terminates the loop. */
                fragnob = PAGE_SIZE - (vaddr & (PAGE_SIZE - 1));
                vaddr += fragnob;
                resid -= fragnob;
        } while (resid > 0);

        return kiblnd_map_tx(ni, tx, rd, npages, page_offset, nob);
}

/* Set up the RDMA descriptor 'rd' for a payload described by kernel page
 * fragments (lnet_kiov_t).  The pages must form one contiguous I/O region:
 * every interior fragment must start at offset 0 and end at PAGE_SIZE,
 * otherwise -EINVAL.  Physical addresses are gathered into tx->tx_pages and
 * passed to kiblnd_map_tx.  Returns 0 or -ve errno. */
int
kiblnd_setup_rd_kiov (lnet_ni_t *ni, kib_tx_t *tx, kib_rdma_desc_t *rd,
                      int nkiov, lnet_kiov_t *kiov, int offset, int nob)
{
        int            resid;
        int            npages;
        unsigned long  page_offset;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (nkiov <= LNET_MAX_IOV);

        /* Skip whole fragments before 'offset'. */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        page_offset = kiov->kiov_offset + offset;

        /* NOTE(review): resid counts from the start of the first mapped
         * fragment (offset + nob), so the whole first fragment's length is
         * consumed by the loop below — confirm against kiov_len accounting. */
        resid = offset + nob;
        npages = 0;

        do {
                LASSERT (npages < LNET_MAX_IOV);
                LASSERT (nkiov > 0);

                if ((npages > 0 && kiov->kiov_offset != 0) ||
                    (resid > kiov->kiov_len &&
                     (kiov->kiov_offset + kiov->kiov_len) != PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR ("Can't make payload contiguous in I/O VM:"
                                "page %d, offset %d, len %d \n",
                                npages, kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                tx->tx_pages[npages++] = lnet_page2phys(kiov->kiov_page);
                resid -= kiov->kiov_len;
                kiov++;
                nkiov--;
        } while (resid > 0);

        return kiblnd_map_tx(ni, tx, rd, npages, page_offset, nob);
}
#endif

/* Drain this connection's transmit queues, posting work requests to the QP
 * subject to the peer's flow-control credits.  Moves reserved-credit txs onto
 * the main queue, injects a NOOP when credits need returning (or a keepalive
 * is due), and posts queued txs until credits or QP send slots run out.
 * Takes and releases conn->ibc_lock; drops it temporarily around tx
 * allocation and tx_done, and permanently before closing the conn on a post
 * failure. */
void
kiblnd_check_sends (kib_conn_t *conn)
{
        kib_tx_t          *tx;
        lnet_ni_t         *ni = conn->ibc_peer->ibp_ni;
        int                rc;
        int                consume_cred = 0;
        struct ib_send_wr *bad_wrq;
        int                done;

        /* Don't send anything until after the connection is established */
        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) {
                CDEBUG(D_NET, "%s too soon\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        spin_lock(&conn->ibc_lock);

        LASSERT (conn->ibc_nsends_posted <=
                 *kiblnd_tunables.kib_concurrent_sends);
        LASSERT (conn->ibc_reserved_credits >= 0);

        /* Promote txs waiting on reserved credits to the normal send queue. */
        while (conn->ibc_reserved_credits > 0 &&
               !list_empty(&conn->ibc_tx_queue_rsrvd)) {
                tx = list_entry(conn->ibc_tx_queue_rsrvd.next,
                                kib_tx_t, tx_list);
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &conn->ibc_tx_queue);
                conn->ibc_reserved_credits--;
        }

        /* Nothing queued but credits have piled up (or a keepalive is due):
         * queue a NOOP to return credits to the peer. */
        if (list_empty(&conn->ibc_tx_queue) &&
            list_empty(&conn->ibc_tx_queue_nocred) &&
            (conn->ibc_outstanding_credits >= IBLND_CREDIT_HIGHWATER ||
             kiblnd_send_keepalive(conn))) {
                /* kiblnd_get_idle_tx may block/allocate; drop the lock. */
                spin_unlock(&conn->ibc_lock);

                tx = kiblnd_get_idle_tx(ni);
                if (tx != NULL)
                        kiblnd_init_tx_msg(ni, tx, IBLND_MSG_NOOP, 0);

                spin_lock(&conn->ibc_lock);

                if (tx != NULL)
                        kiblnd_queue_tx_locked(tx, conn);
        }

        for (;;) {
                /* No-credit txs (e.g. credit returns) take priority. */
                if (!list_empty (&conn->ibc_tx_queue_nocred)) {
                        tx = list_entry (conn->ibc_tx_queue_nocred.next,
                                          kib_tx_t, tx_list);
                        consume_cred = 0;
                } else if (!list_empty (&conn->ibc_tx_queue)) {
                        tx = list_entry (conn->ibc_tx_queue.next,
                                         kib_tx_t, tx_list);
                        consume_cred = 1;
                } else {
                        /* nothing to send right now */
                        break;
                }

                LASSERT (tx->tx_queued);
                /* We rely on this for QP sizing */
                LASSERT (tx->tx_nwrq > 0 &&
                         tx->tx_nwrq <= 1 + IBLND_MAX_RDMA_FRAGS);

                LASSERT (conn->ibc_outstanding_credits >= 0);
                LASSERT (conn->ibc_outstanding_credits <= IBLND_MSG_QUEUE_SIZE);
                LASSERT (conn->ibc_credits >= 0);
                LASSERT (conn->ibc_credits <= IBLND_MSG_QUEUE_SIZE);

                /* QP send queue full? */
                if (conn->ibc_nsends_posted ==
                    *kiblnd_tunables.kib_concurrent_sends) {
                        /* tx completions outstanding... */
                        CDEBUG(D_NET, "%s: posted enough\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        break;
                }

                if (consume_cred) {
                        if (conn->ibc_credits == 0) {   /* no credits */
                                CDEBUG(D_NET, "%s: no credits\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }

                        if (conn->ibc_credits == 1 &&   /* last credit reserved for */
                            conn->ibc_outstanding_credits == 0) { /* giving back credits */
                                CDEBUG(D_NET, "%s: not using last credit\n",
                                       libcfs_nid2str(conn->ibc_peer->ibp_nid));
                                break;
                        }
                }

                list_del (&tx->tx_list);
                tx->tx_queued = 0;

                /* NB don't drop ibc_lock before bumping tx_sending */

                /* Drop a NOOP that no longer serves a purpose: real traffic is
                 * queued, or the credit return it carried is no longer urgent. */
                if (tx->tx_msg->ibm_type == IBLND_MSG_NOOP &&
                    (!list_empty(&conn->ibc_tx_queue) ||
                     !list_empty(&conn->ibc_tx_queue_nocred) ||
                     (conn->ibc_outstanding_credits < IBLND_CREDIT_HIGHWATER &&
                      !kiblnd_send_keepalive(conn)))) {
                        /* redundant NOOP */
                        spin_unlock(&conn->ibc_lock);
                        kiblnd_tx_done(ni, tx);
                        spin_lock(&conn->ibc_lock);
                        CDEBUG(D_NET, "%s: redundant noop\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        continue;
                }

                /* Piggy-back all accumulated credit returns on this message. */
                kiblnd_pack_msg(ni, tx->tx_msg, conn->ibc_outstanding_credits,
                                conn->ibc_peer->ibp_nid, conn->ibc_incarnation);

                conn->ibc_outstanding_credits = 0;
                conn->ibc_nsends_posted++;
                if (consume_cred)
                        conn->ibc_credits--;

                /* CAVEAT EMPTOR!  This tx could be the PUT_DONE of an RDMA
                 * PUT.  If so, it was first queued here as a PUT_REQ, sent and
                 * stashed on ibc_active_txs, matched by an incoming PUT_ACK,
                 * and then re-queued here.  It's (just) possible that
                 * tx_sending is non-zero if we've not done the tx_complete() from
                 * the first send; hence the ++ rather than = below. */
                tx->tx_sending++;

                list_add (&tx->tx_list, &conn->ibc_active_txs);
#if 0
                /* Disabled sanity dump of the work-request chain. */
                {
                        int i;

                        for (i = 0; i < tx->tx_nwrq - 1; i++) {
                                LASSERT (tx->tx_wrq[i].opcode == IB_WR_RDMA_WRITE);
                                LASSERT (tx->tx_wrq[i].next == &tx->tx_wrq[i+1]);
                                LASSERT (tx->tx_wrq[i].sg_list == &tx->tx_sge[i]);

                                CDEBUG(D_WARNING, "WORK[%d]: RDMA "LPX64
                                       " for %d k %x -> "LPX64" k %x\n", i,
                                       tx->tx_wrq[i].sg_list->addr,
                                       tx->tx_wrq[i].sg_list->length,
                                       tx->tx_wrq[i].sg_list->lkey,
                                       tx->tx_wrq[i].wr.rdma.remote_addr,
                                       tx->tx_wrq[i].wr.rdma.rkey);
                        }

                        LASSERT (tx->tx_wrq[i].opcode == IB_WR_SEND);
                        LASSERT (tx->tx_wrq[i].next == NULL);
                        LASSERT (tx->tx_wrq[i].sg_list == &tx->tx_sge[i]);

                        CDEBUG(D_WARNING, "WORK[%d]: SEND "LPX64" for %d k %x\n", i,
                               tx->tx_wrq[i].sg_list->addr,
                               tx->tx_wrq[i].sg_list->length,
                               tx->tx_wrq[i].sg_list->lkey);
                }
#endif
                /* I'm still holding ibc_lock! */
                if (conn->ibc_state != IBLND_CONN_ESTABLISHED)
                        rc = -ECONNABORTED;
                else
                        rc = ib_post_send(conn->ibc_cmid->qp, tx->tx_wrq, &bad_wrq);

                conn->ibc_last_send = jiffies;

                if (rc != 0) {
                        /* Post failed: roll back all the accounting above. */
                        /* NB credits are transferred in the actual
                         * message, which can only be the last work item */
                        conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
                        if (consume_cred)
                                conn->ibc_credits++;
                        conn->ibc_nsends_posted--;

                        tx->tx_status = rc;
                        tx->tx_waiting = 0;
                        tx->tx_sending--;

                        done = (tx->tx_sending == 0);
                        if (done)
                                list_del (&tx->tx_list);

                        spin_unlock(&conn->ibc_lock);

                        if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                                CERROR("Error %d posting transmit to %s\n",
                                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                        else
                                CDEBUG(D_NET, "Error %d posting transmit to %s\n",
                                       rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));

                        kiblnd_close_conn(conn, rc);

                        if (done)
                                kiblnd_tx_done(ni, tx);
                        return;
                }
        }

        spin_unlock(&conn->ibc_lock);
}

/* Send-completion callback for 'tx' with IB work-completion 'status'.
 * On failure closes the connection; on success marks the peer alive.  Updates
 * tx_sending under ibc_lock and retires the tx when it becomes idle.
 * NOTE(review): this function is truncated at the end of this excerpt — the
 * post-'idle' cleanup (and the lock release) continues beyond what is shown. */
void
kiblnd_tx_complete (kib_tx_t *tx, int status)
{
        int           failed = (status != IB_WC_SUCCESS);
        kib_conn_t   *conn = tx->tx_conn;
        int           idle;

        LASSERT (tx->tx_sending > 0);

        if (failed) {
                if (conn->ibc_state == IBLND_CONN_ESTABLISHED)
                        CDEBUG(D_NETERROR, "Tx -> %s cookie "LPX64
                               "sending %d waiting %d: failed %d\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               tx->tx_cookie, tx->tx_sending, tx->tx_waiting,
                               status);
                kiblnd_close_conn(conn, -EIO);
        } else {
                kiblnd_peer_alive(conn->ibc_peer);
        }

        spin_lock(&conn->ibc_lock);

        /* I could be racing with rdma completion.  Whoever makes 'tx' idle
         * gets to free it, which also drops its ref on 'conn'. */

        tx->tx_sending--;
        conn->ibc_nsends_posted--;

        if (failed) {
                tx->tx_waiting = 0;             /* don't wait for peer */
                tx->tx_status = -EIO;
        }

        idle = (tx->tx_sending == 0) &&         /* This is the final callback */
               !tx->tx_waiting &&               /* Not waiting for peer */
               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */
        if (idle)
                list_del(&tx->tx_list);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -