
📄 o2iblnd.c

📁 From a classic distributed system: Lustre's LNET o2ib (OpenFabrics InfiniBand) network driver
💻 C
📖 Page 1 of 4
                                continue;

                        if (index-- > 0)
                                continue;

                        *nidp = peer->ibp_nid;
                        *count = atomic_read(&peer->ibp_refcount);

                        read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                               flags);
                        return 0;
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
        return -ENOENT;
}

void
kiblnd_del_peer_locked (kib_peer_t *peer)
{
        struct list_head *ctmp;
        struct list_head *cnxt;
        kib_conn_t       *conn;

        if (list_empty(&peer->ibp_conns)) {
                kiblnd_unlink_peer_locked(peer);
        } else {
                list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {
                        conn = list_entry(ctmp, kib_conn_t, ibc_list);

                        kiblnd_close_conn_locked(conn, 0);
                }
                /* NB closing peer's last conn unlinked it. */
        }
        /* NB peer now unlinked; might even be freed if the peer table had the
         * last ref on it. */
}

int
kiblnd_del_peer (lnet_ni_t *ni, lnet_nid_t nid)
{
        CFS_LIST_HEAD     (zombies);
        struct list_head  *ptmp;
        struct list_head  *pnxt;
        kib_peer_t        *peer;
        int                lo;
        int                hi;
        int                i;
        unsigned long      flags;
        int                rc = -ENOENT;

        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        if (nid != LNET_NID_ANY) {
                lo = hi = kiblnd_nid2peerlist(nid) - kiblnd_data.kib_peers;
        } else {
                lo = 0;
                hi = kiblnd_data.kib_peer_hash_size - 1;
        }

        for (i = lo; i <= hi; i++) {
                list_for_each_safe (ptmp, pnxt, &kiblnd_data.kib_peers[i]) {
                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))
                                continue;

                        if (!list_empty(&peer->ibp_tx_queue)) {
                                LASSERT (list_empty(&peer->ibp_conns));

                                list_splice_init(&peer->ibp_tx_queue, &zombies);
                        }

                        kiblnd_del_peer_locked(peer);
                        rc = 0;         /* matched something */
                }
        }

        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);

        kiblnd_txlist_done(ni, &zombies, -EIO);

        return rc;
}
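/* Return the index'th connection across all peers, with a reference
 * held on it; the caller must release that reference with
 * kiblnd_conn_decref().  Returns NULL when the index runs past the
 * last connection. */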
kib_conn_t *
kiblnd_get_conn_by_idx (lnet_ni_t *ni, int index)
{
        kib_peer_t        *peer;
        struct list_head  *ptmp;
        kib_conn_t        *conn;
        struct list_head  *ctmp;
        int                i;
        unsigned long      flags;

        read_lock_irqsave(&kiblnd_data.kib_global_lock, flags);

        for (i = 0; i < kiblnd_data.kib_peer_hash_size; i++) {
                list_for_each (ptmp, &kiblnd_data.kib_peers[i]) {
                        peer = list_entry(ptmp, kib_peer_t, ibp_list);
                        LASSERT (peer->ibp_connecting > 0 ||
                                 peer->ibp_accepting > 0 ||
                                 !list_empty(&peer->ibp_conns));

                        if (peer->ibp_ni != ni)
                                continue;

                        list_for_each (ctmp, &peer->ibp_conns) {
                                if (index-- > 0)
                                        continue;

                                conn = list_entry(ctmp, kib_conn_t, ibc_list);
                                kiblnd_conn_addref(conn);
                                read_unlock_irqrestore(&kiblnd_data.kib_global_lock,
                                                       flags);
                                return conn;
                        }
                }
        }

        read_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);
        return NULL;
}

void
kiblnd_debug_rx (kib_rx_t *rx)
{
        CDEBUG(D_CONSOLE, "      %p status %d msg_type %x cred %d\n",
               rx, rx->rx_status, rx->rx_msg->ibm_type,
               rx->rx_msg->ibm_credits);
}

void
kiblnd_debug_tx (kib_tx_t *tx)
{
        CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "
               "cookie "LPX64" msg %s%s type %x cred %d\n",
               tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,
               tx->tx_status, tx->tx_deadline, tx->tx_cookie,
               tx->tx_lntmsg[0] == NULL ? "-" : "!",
               tx->tx_lntmsg[1] == NULL ? "-" : "!",
               tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits);
}

void
kiblnd_debug_conn (kib_conn_t *conn)
{
        struct list_head *tmp;
        int               i;

        spin_lock(&conn->ibc_lock);

        CDEBUG(D_CONSOLE, "conn[%d] %p -> %s:\n",
               atomic_read(&conn->ibc_refcount), conn,
               libcfs_nid2str(conn->ibc_peer->ibp_nid));
        CDEBUG(D_CONSOLE, "   state %d nposted %d cred %d o_cred %d r_cred %d\n",
               conn->ibc_state, conn->ibc_nsends_posted, conn->ibc_credits,
               conn->ibc_outstanding_credits, conn->ibc_reserved_credits);
        CDEBUG(D_CONSOLE, "   comms_err %d\n", conn->ibc_comms_error);

        CDEBUG(D_CONSOLE, "   early_rxs:\n");
        list_for_each(tmp, &conn->ibc_early_rxs)
                kiblnd_debug_rx(list_entry(tmp, kib_rx_t, rx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_nocred)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");
        list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   tx_queue:\n");
        list_for_each(tmp, &conn->ibc_tx_queue)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   active_txs:\n");
        list_for_each(tmp, &conn->ibc_active_txs)
                kiblnd_debug_tx(list_entry(tmp, kib_tx_t, tx_list));

        CDEBUG(D_CONSOLE, "   rxs:\n");
        for (i = 0; i < IBLND_RX_MSGS; i++)
                kiblnd_debug_rx(&conn->ibc_rxs[i]);

        spin_unlock(&conn->ibc_lock);
}
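/* kiblnd_create_conn() below allocates the connection and its RX
 * descriptors, DMA-maps one message buffer per RX, creates the shared
 * completion queue and the RC queue pair, then posts every receive
 * before handing the connection back to the caller. */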
kib_conn_t *
kiblnd_create_conn (kib_peer_t *peer, struct rdma_cm_id *cmid, int state)
{
        /* CAVEAT EMPTOR:
         * If the new conn is created successfully it takes over the caller's
         * ref on 'peer'.  It also "owns" 'cmid' and destroys it when it itself
         * is destroyed.  On failure, the caller's ref on 'peer' remains and
         * she must dispose of 'cmid'.  (Actually I'd block forever if I tried
         * to destroy 'cmid' here since I'm called from the CM which still has
         * its ref on 'cmid'). */
        kib_conn_t             *conn;
        kib_net_t              *net = peer->ibp_ni->ni_data;
        int                     i;
        int                     page_offset;
        int                     ipage;
        int                     rc;
        struct ib_cq           *cq;
        struct ib_qp_init_attr *init_qp_attr;
        unsigned long           flags;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());

        LIBCFS_ALLOC(init_qp_attr, sizeof(*init_qp_attr));
        if (init_qp_attr == NULL) {
                CERROR("Can't allocate qp_attr for %s\n",
                       libcfs_nid2str(peer->ibp_nid));
                goto failed_0;
        }

        LIBCFS_ALLOC(conn, sizeof(*conn));
        if (conn == NULL) {
                CERROR("Can't allocate connection for %s\n",
                       libcfs_nid2str(peer->ibp_nid));
                goto failed_1;
        }

        memset(conn, 0, sizeof(*conn)); /* zero flags, NULL pointers etc... */

        conn->ibc_state = IBLND_CONN_INIT;
        conn->ibc_peer = peer;                  /* I take the caller's ref */
        cmid->context = conn;                   /* for future CM callbacks */
        conn->ibc_cmid = cmid;

        INIT_LIST_HEAD(&conn->ibc_early_rxs);
        INIT_LIST_HEAD(&conn->ibc_tx_queue);
        INIT_LIST_HEAD(&conn->ibc_tx_queue_rsrvd);
        INIT_LIST_HEAD(&conn->ibc_tx_queue_nocred);
        INIT_LIST_HEAD(&conn->ibc_active_txs);
        spin_lock_init(&conn->ibc_lock);

        LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));
        if (conn->ibc_connvars == NULL) {
                CERROR("Can't allocate in-progress connection state\n");
                goto failed_2;
        }
        memset(conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));

        LIBCFS_ALLOC(conn->ibc_rxs, IBLND_RX_MSGS * sizeof(kib_rx_t));
        if (conn->ibc_rxs == NULL) {
                CERROR("Cannot allocate RX buffers\n");
                goto failed_2;
        }
        memset(conn->ibc_rxs, 0, IBLND_RX_MSGS * sizeof(kib_rx_t));

        rc = kiblnd_alloc_pages(&conn->ibc_rx_pages, IBLND_RX_MSG_PAGES);
        if (rc != 0)
                goto failed_2;

        for (i = ipage = page_offset = 0; i < IBLND_RX_MSGS; i++) {
                struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];
                kib_rx_t    *rx = &conn->ibc_rxs[i];

                rx->rx_conn = conn;
                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) +
                                           page_offset);
                rx->rx_msgaddr = kiblnd_dma_map_single(cmid->device,
                                                       rx->rx_msg, IBLND_MSG_SIZE,
                                                       DMA_FROM_DEVICE);
                KIBLND_UNMAP_ADDR_SET(rx, rx_msgunmap, rx->rx_msgaddr);

                CDEBUG(D_NET,"rx %d: %p "LPX64"("LPX64")\n",
                       i, rx->rx_msg, rx->rx_msgaddr,
                       lnet_page2phys(page) + page_offset);

                page_offset += IBLND_MSG_SIZE;
                LASSERT (page_offset <= PAGE_SIZE);

                if (page_offset == PAGE_SIZE) {
                        page_offset = 0;
                        ipage++;
                        LASSERT (ipage <= IBLND_RX_MSG_PAGES);
                }
        }

        cq = ib_create_cq(cmid->device,
                          kiblnd_cq_completion, kiblnd_cq_event, conn,
                          IBLND_CQ_ENTRIES());
        if (!IS_ERR(cq)) {
                conn->ibc_cq = cq;
        } else {
                CERROR("Can't create CQ: %ld\n", PTR_ERR(cq));
                goto failed_2;
        }
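/* Size the QP below so every in-flight message fits: each concurrent
 * send may post one message work request plus one per RDMA fragment,
 * there is one receive work request per posted RX buffer, and both
 * directions share the CQ created above. */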
%ld\n", PTR_ERR(cq));                goto failed_2;        }        rc = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);        if (rc != 0) {                CERROR("Can't request completion notificiation: %d\n", rc);                goto failed_2;        }                memset(init_qp_attr, 0, sizeof(*init_qp_attr));        init_qp_attr->event_handler = kiblnd_qp_event;        init_qp_attr->qp_context = conn;        init_qp_attr->cap.max_send_wr = (*kiblnd_tunables.kib_concurrent_sends) *                                        (1 + IBLND_MAX_RDMA_FRAGS);        init_qp_attr->cap.max_recv_wr = IBLND_RX_MSGS;        init_qp_attr->cap.max_send_sge = 1;        init_qp_attr->cap.max_recv_sge = 1;        init_qp_attr->sq_sig_type = IB_SIGNAL_REQ_WR;        init_qp_attr->qp_type = IB_QPT_RC;        init_qp_attr->send_cq = cq;        init_qp_attr->recv_cq = cq;        rc = 0;        write_lock_irqsave(&kiblnd_data.kib_global_lock, flags);        switch (*kiblnd_tunables.kib_ib_mtu) {        default:                rc = *kiblnd_tunables.kib_ib_mtu;                /* fall through to... */        case 0: /* set tunable to the default                 * CAVEAT EMPTOR! this assumes the default is one of the MTUs                 * below, otherwise we'll WARN on the next QP create */                *kiblnd_tunables.kib_ib_mtu =                        ib_mtu_enum_to_int(cmid->route.path_rec->mtu);                break;        case 256:                cmid->route.path_rec->mtu = IB_MTU_256;                break;        case 512:                cmid->route.path_rec->mtu = IB_MTU_512;                break;        case 1024:                cmid->route.path_rec->mtu = IB_MTU_1024;                break;        case 2048:                cmid->route.path_rec->mtu = IB_MTU_2048;                break;        case 4096:                cmid->route.path_rec->mtu = IB_MTU_4096;                break;        }        write_unlock_irqrestore(&kiblnd_data.kib_global_lock, flags);        if (rc != 0)                CWARN("Invalid IB MTU value %d, using default value %d\n",                      rc, *kiblnd_tunables.kib_ib_mtu);                                        rc = rdma_create_qp(cmid, net->ibn_dev->ibd_pd, init_qp_attr);        if (rc != 0) {                CERROR("Can't create QP: %d\n", rc);                goto failed_2;        }        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));        /* 1 ref for caller and each rxmsg */        atomic_set(&conn->ibc_refcount, 1 + IBLND_RX_MSGS);        conn->ibc_nrx = IBLND_RX_MSGS;        /* post receives */        for (i = 0; i < IBLND_RX_MSGS; i++) {                rc = kiblnd_post_rx(&conn->ibc_rxs[i],                                    IBLND_POSTRX_NO_CREDIT);                if (rc != 0) {                        CERROR("Can't post rxmsg: %d\n", rc);                        /* Make posted receives complete */                        kiblnd_abort_receives(conn);                        /* correct # of posted buffers                          * NB locking needed now I'm racing with completion */                        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);                        conn->ibc_nrx -= IBLND_RX_MSGS - i;                        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock,                                               flags);                        /* Drop my own and unused rxbuffer refcounts */                        while (i++ <= IBLND_RX_MSGS)                                kiblnd_conn_decref(conn);                        return NULL;                } 
        /* Init successful! */
        LASSERT (state == IBLND_CONN_ACTIVE_CONNECT ||
                 state == IBLND_CONN_PASSIVE_WAIT);
        conn->ibc_state = state;

        /* 1 more conn */
        atomic_inc(&net->ibn_nconns);
        return conn;

 failed_2:
        kiblnd_destroy_conn(conn);
 failed_1:
        LIBCFS_FREE(init_qp_attr, sizeof(*init_qp_attr));
 failed_0:
        return NULL;
}

void
kiblnd_destroy_conn (kib_conn_t *conn)
{
        struct rdma_cm_id *cmid = conn->ibc_cmid;
        kib_peer_t        *peer = conn->ibc_peer;
        int                rc;
        int                i;

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&conn->ibc_refcount) == 0);
        LASSERT (list_empty(&conn->ibc_early_rxs));
        LASSERT (list_empty(&conn->ibc_tx_queue));
        LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));
        LASSERT (list_empty(&conn->ibc_tx_queue_nocred));
        LASSERT (list_empty(&conn->ibc_active_txs));
        LASSERT (conn->ibc_nsends_posted == 0);

        switch (conn->ibc_state) {
        default:
                /* conn must be completely disengaged from the network */
                LBUG();
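The peer-table functions on this page all follow one pattern: hash the NID to a bucket, scan that bucket under kiblnd_data.kib_global_lock, and (in kiblnd_get_conn_by_idx) take a reference on the object before dropping the lock so it cannot be freed out from under the caller. Below is a minimal user-space sketch of that pattern with hypothetical names (peer_add, peer_lookup, peer_decref); a pthreads rwlock and C11 atomics stand in for the kernel's rwlock_t and atomic_t. It is an illustration of the technique, not code from o2iblnd.c.

#include <inttypes.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>

#define PEER_HASH_SIZE 101                      /* kiblnd sizes this via a tunable */

typedef struct peer {
        struct peer    *p_next;                 /* bucket chain */
        uint64_t        p_nid;                  /* node id: the hash key */
        atomic_int      p_refcount;             /* like ibp_refcount: +1 per lookup */
} peer_t;

static peer_t          *peer_hash[PEER_HASH_SIZE];
static pthread_rwlock_t peer_table_lock = PTHREAD_RWLOCK_INITIALIZER;

static unsigned int nid2bucket(uint64_t nid)
{
        return (unsigned int)(nid % PEER_HASH_SIZE);
}

/* Add a new peer holding one table reference; write lock taken. */
static peer_t *peer_add(uint64_t nid)
{
        peer_t *p = calloc(1, sizeof(*p));

        if (p == NULL)
                return NULL;
        p->p_nid = nid;
        atomic_init(&p->p_refcount, 1);         /* the table's own ref */

        pthread_rwlock_wrlock(&peer_table_lock);
        p->p_next = peer_hash[nid2bucket(nid)];
        peer_hash[nid2bucket(nid)] = p;
        pthread_rwlock_unlock(&peer_table_lock);
        return p;
}

/* Look up a peer and return it with a reference held, or NULL: scan one
 * bucket under the read lock and bump the refcount before unlocking,
 * the same shape as kiblnd_get_conn_by_idx() above. */
static peer_t *peer_lookup(uint64_t nid)
{
        peer_t *p;

        pthread_rwlock_rdlock(&peer_table_lock);
        for (p = peer_hash[nid2bucket(nid)]; p != NULL; p = p->p_next) {
                if (p->p_nid != nid)
                        continue;
                atomic_fetch_add(&p->p_refcount, 1);
                pthread_rwlock_unlock(&peer_table_lock);
                return p;
        }
        pthread_rwlock_unlock(&peer_table_lock);
        return NULL;
}

/* Drop a reference; the last ref frees the peer.  Assumes the peer has
 * already been unlinked from its bucket, as kiblnd_unlink_peer_locked()
 * guarantees in the real driver. */
static void peer_decref(peer_t *p)
{
        if (atomic_fetch_sub(&p->p_refcount, 1) == 1)
                free(p);
}

int main(void)
{
        peer_t *p = peer_add(0x12345);
        peer_t *q = peer_lookup(0x12345);       /* refcount now 2 */

        printf("found nid %" PRIx64 " refs %d\n",
               q->p_nid, atomic_load(&q->p_refcount));
        peer_decref(q);                         /* drop the lookup ref */
        peer_decref(p);                         /* drop the table ref; frees
                                                 * (table is not used again) */
        return 0;
}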
