⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 viblnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        struct list_head  *ptmp;        struct list_head  *pnxt;        kib_peer_t        *peer;        int                lo;        int                hi;        int                i;        unsigned long      flags;        int                rc = -ENOENT;        write_lock_irqsave(&kibnal_data.kib_global_lock, flags);        if (nid != LNET_NID_ANY)                lo = hi = kibnal_nid2peerlist(nid) - kibnal_data.kib_peers;        else {                lo = 0;                hi = kibnal_data.kib_peer_hash_size - 1;        }        for (i = lo; i <= hi; i++) {                list_for_each_safe (ptmp, pnxt, &kibnal_data.kib_peers[i]) {                        peer = list_entry (ptmp, kib_peer_t, ibp_list);                        LASSERT (peer->ibp_persistence != 0 ||                                 peer->ibp_connecting != 0 ||                                 peer->ibp_accepting != 0 ||                                 !list_empty (&peer->ibp_conns));                        if (!(nid == LNET_NID_ANY || peer->ibp_nid == nid))                                continue;                        if (!list_empty(&peer->ibp_tx_queue)) {                                LASSERT (list_empty(&peer->ibp_conns));                                list_splice_init(&peer->ibp_tx_queue, &zombies);                        }                        kibnal_del_peer_locked (peer);                        rc = 0;         /* matched something */                }        }        write_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);        kibnal_txlist_done(&zombies, -EIO);        return (rc);}kib_conn_t *kibnal_get_conn_by_idx (int index){        kib_peer_t        *peer;        struct list_head  *ptmp;        kib_conn_t        *conn;        struct list_head  *ctmp;        int                i;        unsigned long      flags;        read_lock_irqsave(&kibnal_data.kib_global_lock, flags);        for (i = 0; i < kibnal_data.kib_peer_hash_size; i++) {                list_for_each (ptmp, &kibnal_data.kib_peers[i]) {                        peer = list_entry (ptmp, kib_peer_t, ibp_list);                        LASSERT (peer->ibp_persistence > 0 ||                                 peer->ibp_connecting != 0 ||                                 peer->ibp_accepting != 0 ||                                 !list_empty (&peer->ibp_conns));                        list_for_each (ctmp, &peer->ibp_conns) {                                if (index-- > 0)                                        continue;                                conn = list_entry (ctmp, kib_conn_t, ibc_list);                                kibnal_conn_addref(conn);                                read_unlock_irqrestore(&kibnal_data.kib_global_lock,                                                       flags);                                return (conn);                        }                }        }        read_unlock_irqrestore(&kibnal_data.kib_global_lock, flags);        return (NULL);}voidkibnal_debug_rx (kib_rx_t *rx){        CDEBUG(D_CONSOLE, "      %p nob %d msg_type %x "               "cred %d seq "LPD64"\n",               rx, rx->rx_nob, rx->rx_msg->ibm_type,               rx->rx_msg->ibm_credits, rx->rx_msg->ibm_seq);}voidkibnal_debug_tx (kib_tx_t *tx){        CDEBUG(D_CONSOLE, "      %p snd %d q %d w %d rc %d dl %lx "               "cookie "LPX64" msg %s%s type %x cred %d seq "LPD64"\n",               tx, tx->tx_sending, tx->tx_queued, tx->tx_waiting,               tx->tx_status, tx->tx_deadline, tx->tx_cookie,               tx->tx_lntmsg[0] == NULL ? "-" : "!",               tx->tx_lntmsg[1] == NULL ? "-" : "!",               tx->tx_msg->ibm_type, tx->tx_msg->ibm_credits,               tx->tx_msg->ibm_seq);}voidkibnal_debug_conn (kib_conn_t *conn){        struct list_head *tmp;        int               i;                spin_lock(&conn->ibc_lock);                CDEBUG(D_CONSOLE, "conn[%d] %p -> %s: \n",                atomic_read(&conn->ibc_refcount), conn,                libcfs_nid2str(conn->ibc_peer->ibp_nid));        CDEBUG(D_CONSOLE, "   txseq "LPD64" rxseq "LPD64" state %d \n",               conn->ibc_txseq, conn->ibc_rxseq, conn->ibc_state);        CDEBUG(D_CONSOLE, "   nposted %d cred %d o_cred %d r_cred %d\n",               conn->ibc_nsends_posted, conn->ibc_credits,                conn->ibc_outstanding_credits, conn->ibc_reserved_credits);        CDEBUG(D_CONSOLE, "   disc %d comms_err %d\n",               conn->ibc_disconnect, conn->ibc_comms_error);        CDEBUG(D_CONSOLE, "   early_rxs:\n");        list_for_each(tmp, &conn->ibc_early_rxs)                kibnal_debug_rx(list_entry(tmp, kib_rx_t, rx_list));                CDEBUG(D_CONSOLE, "   tx_queue_nocred:\n");        list_for_each(tmp, &conn->ibc_tx_queue_nocred)                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));        CDEBUG(D_CONSOLE, "   tx_queue_rsrvd:\n");        list_for_each(tmp, &conn->ibc_tx_queue_rsrvd)                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));        CDEBUG(D_CONSOLE, "   tx_queue:\n");        list_for_each(tmp, &conn->ibc_tx_queue)                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));                CDEBUG(D_CONSOLE, "   active_txs:\n");        list_for_each(tmp, &conn->ibc_active_txs)                kibnal_debug_tx(list_entry(tmp, kib_tx_t, tx_list));                CDEBUG(D_CONSOLE, "   rxs:\n");        for (i = 0; i < IBNAL_RX_MSGS; i++)                kibnal_debug_rx(&conn->ibc_rxs[i]);        spin_unlock(&conn->ibc_lock);}intkibnal_set_qp_state (kib_conn_t *conn, vv_qp_state_t new_state){        static vv_qp_attr_t attr;                kib_connvars_t   *cv = conn->ibc_connvars;        vv_return_t       vvrc;                /* Only called by connd => static OK */        LASSERT (!in_interrupt());        LASSERT (current == kibnal_data.kib_connd);        memset(&attr, 0, sizeof(attr));                switch (new_state) {        default:                LBUG();                        case vv_qp_state_init: {                struct vv_qp_modify_init_st *init = &attr.modify.params.init;                init->p_key_indx     = cv->cv_pkey_index;                init->phy_port_num   = cv->cv_port;                init->q_key          = IBNAL_QKEY; /* XXX but VV_QP_AT_Q_KEY not set! */                init->access_control = vv_acc_r_mem_read |                                       vv_acc_r_mem_write; /* XXX vv_acc_l_mem_write ? */                attr.modify.vv_qp_attr_mask = VV_QP_AT_P_KEY_IX |                                               VV_QP_AT_PHY_PORT_NUM |                                              VV_QP_AT_ACCESS_CON_F;                break;        }        case vv_qp_state_rtr: {                struct vv_qp_modify_rtr_st *rtr = &attr.modify.params.rtr;                vv_add_vec_t               *av  = &rtr->remote_add_vec;                av->dlid                      = cv->cv_path.dlid;                av->grh_flag                  = (!IBNAL_LOCAL_SUB);                av->max_static_rate           = IBNAL_R_2_STATIC_RATE(cv->cv_path.rate);                av->service_level             = cv->cv_path.sl;                av->source_path_bit           = IBNAL_SOURCE_PATH_BIT;                av->pmtu                      = cv->cv_path.mtu;                av->rnr_retry_count           = cv->cv_rnr_count;                av->global_dest.traffic_class = cv->cv_path.traffic_class;                av->global_dest.hope_limit    = cv->cv_path.hop_limut;                av->global_dest.flow_lable    = cv->cv_path.flow_label;                av->global_dest.s_gid_index   = cv->cv_sgid_index;                // XXX other av fields zero?                rtr->destanation_qp            = cv->cv_remote_qpn;                rtr->receive_psn               = cv->cv_rxpsn;                rtr->responder_rdma_r_atom_num = IBNAL_OUS_DST_RD;                rtr->opt_min_rnr_nak_timer     = *kibnal_tunables.kib_rnr_nak_timer;                // XXX sdp sets VV_QP_AT_OP_F but no actual optional options                attr.modify.vv_qp_attr_mask = VV_QP_AT_ADD_VEC |                                               VV_QP_AT_DEST_QP |                                              VV_QP_AT_R_PSN |                                               VV_QP_AT_MIN_RNR_NAK_T |                                              VV_QP_AT_RESP_RDMA_ATOM_OUT_NUM |                                              VV_QP_AT_OP_F;                break;        }        case vv_qp_state_rts: {                struct vv_qp_modify_rts_st *rts = &attr.modify.params.rts;                rts->send_psn                 = cv->cv_txpsn;                rts->local_ack_timeout        = *kibnal_tunables.kib_local_ack_timeout;                rts->retry_num                = *kibnal_tunables.kib_retry_cnt;                rts->rnr_num                  = *kibnal_tunables.kib_rnr_cnt;                rts->dest_out_rdma_r_atom_num = IBNAL_OUS_DST_RD;                                attr.modify.vv_qp_attr_mask = VV_QP_AT_S_PSN |                                              VV_QP_AT_L_ACK_T |                                              VV_QP_AT_RETRY_NUM |                                              VV_QP_AT_RNR_NUM |                                              VV_QP_AT_DEST_RDMA_ATOM_OUT_NUM;                break;        }        case vv_qp_state_error:        case vv_qp_state_reset:                attr.modify.vv_qp_attr_mask = 0;                break;        }                        attr.modify.qp_modify_into_state = new_state;        attr.modify.vv_qp_attr_mask |= VV_QP_AT_STATE;                vvrc = vv_qp_modify(kibnal_data.kib_hca, conn->ibc_qp, &attr, NULL);        if (vvrc != vv_return_ok) {                CERROR("Can't modify qp -> %s state to %d: %d\n",                        libcfs_nid2str(conn->ibc_peer->ibp_nid),                       new_state, vvrc);                return -EIO;        }                return 0;}kib_conn_t *kibnal_create_conn (cm_cep_handle_t cep){        kib_conn_t   *conn;        int           i;        int           page_offset;        int           ipage;        vv_return_t   vvrc;        int           rc;        static vv_qp_attr_t  reqattr;        static vv_qp_attr_t  rspattr;        /* Only the connd creates conns => single threaded */        LASSERT(!in_interrupt());        LASSERT(current == kibnal_data.kib_connd);                LIBCFS_ALLOC(conn, sizeof (*conn));        if (conn == NULL) {                CERROR ("Can't allocate connection\n");                return (NULL);        }        /* zero flags, NULL pointers etc... */        memset (conn, 0, sizeof (*conn));        conn->ibc_version = IBNAL_MSG_VERSION;  /* Use latest version at first */        INIT_LIST_HEAD (&conn->ibc_early_rxs);        INIT_LIST_HEAD (&conn->ibc_tx_queue_nocred);        INIT_LIST_HEAD (&conn->ibc_tx_queue);        INIT_LIST_HEAD (&conn->ibc_tx_queue_rsrvd);        INIT_LIST_HEAD (&conn->ibc_active_txs);        spin_lock_init (&conn->ibc_lock);                atomic_inc (&kibnal_data.kib_nconns);        /* well not really, but I call destroy() on failure, which decrements */        conn->ibc_cep = cep;        LIBCFS_ALLOC(conn->ibc_connvars, sizeof(*conn->ibc_connvars));        if (conn->ibc_connvars == NULL) {                CERROR("Can't allocate in-progress connection state\n");                goto failed;        }        memset (conn->ibc_connvars, 0, sizeof(*conn->ibc_connvars));        /* Random seed for QP sequence number */        get_random_bytes(&conn->ibc_connvars->cv_rxpsn,                         sizeof(conn->ibc_connvars->cv_rxpsn));        LIBCFS_ALLOC(conn->ibc_rxs, IBNAL_RX_MSGS * sizeof (kib_rx_t));        if (conn->ibc_rxs == NULL) {                CERROR("Cannot allocate RX buffers\n");                goto failed;        }        memset (conn->ibc_rxs, 0, IBNAL_RX_MSGS * sizeof(kib_rx_t));        rc = kibnal_alloc_pages(&conn->ibc_rx_pages, IBNAL_RX_MSG_PAGES, 1);        if (rc != 0)                goto failed;        for (i = ipage = page_offset = 0; i < IBNAL_RX_MSGS; i++) {                struct page    *page = conn->ibc_rx_pages->ibp_pages[ipage];                kib_rx_t       *rx = &conn->ibc_rxs[i];                vv_mem_reg_h_t  mem_h;                vv_r_key_t      r_key;                rx->rx_conn = conn;                rx->rx_msg = (kib_msg_t *)(((char *)page_address(page)) +                              page_offset);                vvrc = vv_get_gen_mr_attrib(kibnal_data.kib_hca,                                            rx->rx_msg,                                            IBNAL_MSG_SIZE,                                            &mem_h,                                            &rx->rx_lkey,                                            &r_key);                LASSERT (vvrc == vv_return_ok);                CDEBUG(D_NET, "Rx[%d] %p->%p[%x]\n", i, rx,                        rx->rx_msg, rx->rx_lkey);                page_offset += IBNAL_MSG_SIZE;                LASSERT (page_offset <= PAGE_SIZE);                if (page_offset == PAGE_SIZE) {                        page_offset = 0;                        ipage++;                        LASSERT (ipage <= IBNAL_RX_MSG_PAGES);                }        }        memset(&reqattr, 0, sizeof(reqattr));        reqattr.create.qp_type                    = vv_qp_type_r_conn;        reqattr.create.cq_send_h                  = kibnal_data.kib_cq;        reqattr.create.cq_receive_h               = kibnal_data.kib_cq;        reqattr.create.send_max_outstand_wr       = (1 + IBNAL_MAX_RDMA_FRAGS) *                                                     (*kibnal_tunables.kib_concurrent_sends);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -