⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 openiblnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
intkibnal_get_peer_info (int index, lnet_nid_t *nidp, __u32 *ipp, int *portp,                      int *persistencep){        kib_peer_t        *peer;        struct list_head  *ptmp;        unsigned long      flags;        int                i;        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);        for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {                list_for_each (ptmp, &kibnal_data.kib_peers[i]) {                                                peer = list_entry (ptmp, kib_peer_t, ibp_list);                        LASSERT (peer->ibp_persistence != 0 ||                                 peer->ibp_connecting != 0 ||                                 peer->ibp_accepting != 0 ||                                 !list_empty (&peer->ibp_conns));                        if (index-- > 0)                                continue;                        *nidp = peer->ibp_nid;                        *ipp = peer->ibp_ip;                        *portp = peer->ibp_port;                        *persistencep = peer->ibp_persistence;                                                read_unlock_irqrestore(&kibnal_data.kib_global_lock,                                               flags);                        return (0);                }        }        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);        return (-ENOENT);}intkibnal_add_persistent_peer (lnet_nid_t nid, __u32 ip, int port){        unsigned long      flags;        kib_peer_t        *peer;        kib_peer_t        *peer2;        int                rc;                if (nid == LNET_NID_ANY)                return (-EINVAL);        rc = kibnal_create_peer (&peer, nid);        if (rc != 0)                return rc;        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);        /* I'm always called with a reference on kibnal_data.kib_ni         * so shutdown can't have started */        LASSERT (kibnal_data.kib_nonewpeers == 0);        peer2 = kibnal_find_peer_locked (nid);        if (peer2 != NULL) {                kibnal_peer_decref(peer);                peer = peer2;        } else {                /* peer table takes existing ref on peer */                list_add_tail (&peer->ibp_list,                               kibnal_nid2peerlist (nid));        }        peer->ibp_ip = ip;        peer->ibp_port = port;        peer->ibp_persistence++;                write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);        return (0);}voidkibnal_del_peer_locked (kib_peer_t *peer){        struct list_head *ctmp;        struct list_head *cnxt;        kib_conn_t       *conn;        peer->ibp_persistence = 0;        if (list_empty(&peer->ibp_conns)) {                kibnal_unlink_peer_locked(peer);        } else {                list_for_each_safe (ctmp, cnxt, &peer->ibp_conns) {                        conn = list_entry(ctmp, kib_conn_t, ibc_list);                        kibnal_close_conn_locked (conn, 0);                }                /* NB peer is no longer persistent; closing its last conn                 * unlinked it. */        }        /* NB peer now unlinked; might even be freed if the peer table had the         * last ref on it. */}intkibnal_del_peer (lnet_nid_t nid){        unsigned long      flags;        CFS_LIST_HEAD     (zombies);        struct list_head  *ptmp;        struct list_head  *pnxt;        kib_peer_t        *peer;        int                lo;        int                hi;        int                i;        int                rc = -ENOENT;        write_lock_irqsave (&kibnal_data.kib_global_lock, flags);        if (nid != LNET_NID_ANY)                lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;        else {                lo = 0;                hi = kibnal_data.kib_peer_hash_size - 1;        }        for (i = lo; i <= hi; i++) {                list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {                        peer = list_entry (ptmp, kib_peer_t, ibp_list);                        LASSERT (peer->ibp_persistence != 0 ||                                 peer->ibp_connecting != 0 ||                                 peer->ibp_accepting != 0 ||                                 !list_empty (&peer->ibp_conns));                        if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))                                continue;                        if (!list_empty(&peer->ibp_tx_queue)) {                                LASSERT (list_empty(&peer->ibp_conns));                                list_splice_init(&peer->ibp_tx_queue, &zombies);                        }                        kibnal_del_peer_locked (peer);                        rc = 0;         /* matched something */                }        }        write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);        kibnal_txlist_done(&zombies, -EIO);        return (rc);}kib_conn_t *kibnal_get_conn_by_idx (int index){        kib_peer_t        *peer;        struct list_head  *ptmp;        kib_conn_t        *conn;        struct list_head  *ctmp;        unsigned long      flags;        int                i;        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);        for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {                list_for_each (ptmp, &kibnal_data.kib_peers[i]) {                        peer = list_entry (ptmp, kib_peer_t, ibp_list);                        LASSERT (peer->ibp_persistence > 0 ||                                 peer->ibp_connecting != 0 ||                                 peer->ibp_accepting != 0 ||                                 !list_empty (&peer->ibp_conns));                        list_for_each (ctmp, &peer->ibp_conns) {                                if (index-- > 0)                                        continue;                                conn = list_entry (ctmp, kib_conn_t, ibc_list);                                kibnal_conn_addref(conn);                                read_unlock_irqrestore(&kibnal_data.kib_global_lock,                                                       flags);                                return (conn);                        }                }        }        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);        return (NULL);}kib_conn_t *kibnal_create_conn (void){        kib_conn_t  *conn;        int          i;        __u64        vaddr = 0;        __u64        vaddr_base;        int          page_offset;        int          ipage;        int          rc;        union {                struct ib_qp_create_param  qp_create;                struct ib_qp_attribute     qp_attr;        } params;                LIBCFS_ALLOC (conn, sizeof (*conn));        if (conn == NULL) {                CERROR ("Can't allocate connection\n");                return (NULL);        }        /* zero flags, NULL pointers etc... */        memset (conn, 0, sizeof (*conn));        INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);        INIT_LIST_HEAD (&conn->ibc_tx_queue);        INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);        INIT_LIST_HEAD (&conn->ibc_active_txs);        spin_lock_init (&conn->ibc_lock);                atomic_inc (&kibnal_data.kib_nconns);        /* well not really, but I call destroy() on failure, which decrements */        LIBCFS_ALLOC (conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));        if (conn->ibc_rxs == NULL)                goto failed;        memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));        rc = kibnal_alloc_pages(&conn->ibc_rx_pages,                                IBNAL_RX_MSG_PAGES,                                IB_ACCESS_LOCAL_WRITE);        if (rc != 0)                goto failed;        vaddr_base = vaddr = conn->ibc_rx_pages->ibp_vaddr;        for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {                struct page *page = conn->ibc_rx_pages->ibp_pages[ipage];                kib_rx_t   *rx = &conn->ibc_rxs[i];                rx->rx_conn = conn;                rx->rx_vaddr = vaddr;                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) + page_offset);                                vaddr += IBNAL_MSG_SIZE;                LASSERT (vaddr <= vaddr_base + IBNAL_RX_MSG_BYTES);                                page_offset += IBNAL_MSG_SIZE;                LASSERT (page_offset <= PAGE_SIZE);                if (page_offset == PAGE_SIZE) {                        page_offset = 0;                        ipage++;                        LASSERT (ipage <= IBNAL_RX_MSG_PAGES);                }        }        /* We can post up to IBLND_MSG_QUEUE_SIZE immediate/req messages and         * the same # of ack/nak/rdma+done messages */        params.qp_create = (struct ib_qp_create_param) {                .limit = {                        .max_outstanding_send_request    = 3 * IBNAL_MSG_QUEUE_SIZE,                        .max_outstanding_receive_request = IBNAL_RX_MSGS,                        .max_send_gather_element         = 1,                        .max_receive_scatter_element     = 1,                },                .pd              = kibnal_data.kib_pd,                .send_queue      = kibnal_data.kib_cq,                .receive_queue   = kibnal_data.kib_cq,                .send_policy     = IB_WQ_SIGNAL_SELECTABLE,                .receive_policy  = IB_WQ_SIGNAL_SELECTABLE,                .rd_domain       = 0,                .transport       = IB_TRANSPORT_RC,                .device_specific = NULL,        };                rc = ib_qp_create (&params.qp_create, &conn->ibc_qp, &conn->ibc_qpn);        if (rc != 0) {                CERROR ("Failed to create queue pair: %d\n", rc);                goto failed;        }                /* Mark QP created */        conn->ibc_state = IBNAL_CONN_INIT_QP;        params.qp_attr = (struct ib_qp_attribute) {                .state             = IB_QP_STATE_INIT,                .port              = kibnal_data.kib_port,                .enable_rdma_read  = 1,                .enable_rdma_write = 1,                .valid_fields      = (IB_QP_ATTRIBUTE_STATE |                                      IB_QP_ATTRIBUTE_PORT |                                      IB_QP_ATTRIBUTE_PKEY_INDEX |                                      IB_QP_ATTRIBUTE_RDMA_ATOMIC_ENABLE),        };        rc = ib_qp_modify(conn->ibc_qp, &params.qp_attr);        if (rc != 0) {                CERROR ("Failed to modify queue pair: %d\n", rc);                goto failed;        }        /* 1 ref for caller */        atomic_set (&conn->ibc_refcount, 1);        return (conn);         failed:        kibnal_destroy_conn (conn);        return (NULL);}voidkibnal_destroy_conn (kib_conn_t *conn){        int    rc;                CDEBUG (D_NET, "connection %p\n", conn);        LASSERT (atomic_read (&conn->ibc_refcount) == 0);        LASSERT (list_empty(&conn->ibc_tx_queue));        LASSERT (list_empty(&conn->ibc_tx_queue_rsrvd));        LASSERT (list_empty(&conn->ibc_tx_queue_nocred));        LASSERT (list_empty(&conn->ibc_active_txs));        LASSERT (conn->ibc_nsends_posted == 0);        LASSERT (conn->ibc_connreq == NULL);        switch (conn->ibc_state) {        case IBNAL_CONN_ZOMBIE:                /* called after connection sequence initiated */        case IBNAL_CONN_INIT_QP:                rc = ib_qp_destroy(conn->ibc_qp);                if (rc != 0)                        CERROR("Can't destroy QP: %d\n", rc);                /* fall through */                        case IBNAL_CONN_INIT_NOTHING:                break;        default:                LASSERT (0);        }        if (conn->ibc_rx_pages != NULL)                 kibnal_free_pages(conn->ibc_rx_pages);                if (conn->ibc_rxs != NULL)                LIBCFS_FREE(conn->ibc_rxs,                             IBNAL_RX_MSGS * sizeof(kib_rx_t));        if (conn->ibc_peer != NULL)                kibnal_peer_decref(conn->ibc_peer);        LIBCFS_FREE(conn, sizeof (*conn));        atomic_dec(&kibnal_data.kib_nconns);                if (atomic_read (&kibnal_data.kib_nconns) == 0 &&            kibnal_data.kib_shutdown) {                /* I just nuked the last connection on shutdown; wake up                 * everyone so they can exit. */                wake_up_all(&kibnal_data.kib_sched_waitq);                wake_up_all(&kibnal_data.kib_reaper_waitq);        }}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -