⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 viblnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                        conn->ibc_nsends_posted--;                        tx->tx_status = rc;                        tx->tx_waiting = 0;                        tx->tx_sending--;                                                done = (tx->tx_sending == 0);                        if (done)                                list_del (&tx->tx_list);                                                spin_unlock(&conn->ibc_lock);                                                if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)                                CERROR ("Error %d posting transmit to %s\n",                                         vvrc, libcfs_nid2str(conn->ibc_peer->ibp_nid));                        else                                CDEBUG (D_NET, "Error %d posting transmit to %s\n",                                        rc, libcfs_nid2str(conn->ibc_peer->ibp_nid));                        kibnal_close_conn (conn, rc);                        if (done)                                kibnal_tx_done (tx);                        return;                }        }        spin_unlock(&conn->ibc_lock);}voidkibnal_tx_complete (kib_tx_t *tx, vv_comp_status_t vvrc){        kib_conn_t   *conn = tx->tx_conn;        int           failed = (vvrc != vv_comp_status_success);        int           idle;        CDEBUG(D_NET, "tx %p conn %p sending %d nwrq %d vvrc %d\n",                tx, conn, tx->tx_sending, tx->tx_nwrq, vvrc);        LASSERT (tx->tx_sending > 0);        if (failed &&            tx->tx_status == 0 &&            conn->ibc_state == IBNAL_CONN_ESTABLISHED)                CDEBUG(D_NETERROR, "tx -> %s type %x cookie "LPX64                       "sending %d waiting %d: failed %d\n",                        libcfs_nid2str(conn->ibc_peer->ibp_nid),                       tx->tx_msg->ibm_type, tx->tx_cookie,                       tx->tx_sending, tx->tx_waiting, vvrc);        spin_lock(&conn->ibc_lock);        /* I could be racing with rdma completion.  Whoever makes 'tx' idle         * gets to free it, which also drops its ref on 'conn'. */        tx->tx_sending--;        conn->ibc_nsends_posted--;        if (failed) {                tx->tx_waiting = 0;                tx->tx_status = -EIO;        }                idle = (tx->tx_sending == 0) &&         /* This is the final callback */               !tx->tx_waiting &&               /* Not waiting for peer */               !tx->tx_queued;                  /* Not re-queued (PUT_DONE) */        if (idle)                list_del(&tx->tx_list);        kibnal_conn_addref(conn);               /* 1 ref for me.... */        spin_unlock(&conn->ibc_lock);        if (idle)                kibnal_tx_done (tx);        if (failed) {                kibnal_close_conn (conn, -EIO);        } else {                kibnal_peer_alive(conn->ibc_peer);                kibnal_check_sends(conn);        }        kibnal_conn_decref(conn);               /* ...until here */}voidkibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob){        vv_scatgat_t *gl = &tx->tx_gl[tx->tx_nwrq];        vv_wr_t      *wrq = &tx->tx_wrq[tx->tx_nwrq];        int           nob = offsetof (kib_msg_t, ibm_u) + body_nob;        __u64         addr = (__u64)((unsigned long)((tx)->tx_msg));        LASSERT (tx->tx_nwrq >= 0 &&                  tx->tx_nwrq < (1 + IBNAL_MAX_RDMA_FRAGS));        LASSERT (nob <= IBNAL_MSG_SIZE);        kibnal_init_msg(tx->tx_msg, type, body_nob);        *gl = (vv_scatgat_t) {                .v_address = KIBNAL_ADDR2SG(addr),                .l_key     = tx->tx_lkey,                .length    = nob,        };        memset(wrq, 0, sizeof(*wrq));        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_TX);        wrq->wr_type = vv_wr_send;        wrq->scatgat_list = gl;        wrq->num_of_data_segments = 1;        wrq->completion_notification = 1;        wrq->type.send.solicited_event = 1;        wrq->type.send.immidiate_data_indicator = 0;        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;                tx->tx_nwrq++;}intkibnal_init_rdma (kib_tx_t *tx, int type, int nob,                  kib_rdma_desc_t *dstrd, __u64 dstcookie){        kib_msg_t       *ibmsg = tx->tx_msg;        kib_rdma_desc_t *srcrd = tx->tx_rd;        vv_scatgat_t    *gl;        vv_wr_t         *wrq;        int              rc;#if IBNAL_USE_FMR        LASSERT (tx->tx_nwrq == 0);        gl = &tx->tx_gl[0];        gl->length    = nob;        gl->v_address = KIBNAL_ADDR2SG(srcrd->rd_addr);        gl->l_key     = srcrd->rd_key;        wrq = &tx->tx_wrq[0];        wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);        wrq->completion_notification = 0;        wrq->scatgat_list = gl;        wrq->num_of_data_segments = 1;        wrq->wr_type = vv_wr_rdma_write;        wrq->type.send.solicited_event = 0;        wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;        wrq->type.send.send_qp_type.rc_type.r_addr = dstrd->rd_addr;        wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;        tx->tx_nwrq = 1;        rc = nob;#else        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */        int              resid = nob;        kib_rdma_frag_t *srcfrag;        int              srcidx;        kib_rdma_frag_t *dstfrag;        int              dstidx;        int              wrknob;        /* Called by scheduler */        LASSERT (!in_interrupt());        LASSERT (type == IBNAL_MSG_GET_DONE ||                 type == IBNAL_MSG_PUT_DONE);        srcidx = dstidx = 0;        srcfrag = &srcrd->rd_frags[0];        dstfrag = &dstrd->rd_frags[0];        rc = resid;        while (resid > 0) {                if (srcidx >= srcrd->rd_nfrag) {                        CERROR("Src buffer exhausted: %d frags\n", srcidx);                        rc = -EPROTO;                        break;                }                                if (dstidx == dstrd->rd_nfrag) {                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);                        rc = -EPROTO;                        break;                }                if (tx->tx_nwrq == IBNAL_MAX_RDMA_FRAGS) {                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",                               srcidx, srcrd->rd_nfrag,                               dstidx, dstrd->rd_nfrag);                        rc = -EMSGSIZE;                        break;                }                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);                gl = &tx->tx_gl[tx->tx_nwrq];                gl->v_address = KIBNAL_ADDR2SG(kibnal_rf_addr(srcfrag));                gl->length    = wrknob;                gl->l_key     = srcrd->rd_key;                wrq = &tx->tx_wrq[tx->tx_nwrq];                wrq->wr_id = kibnal_ptr2wreqid(tx, IBNAL_WID_RDMA);                wrq->completion_notification = 0;                wrq->scatgat_list = gl;                wrq->num_of_data_segments = 1;                wrq->wr_type = vv_wr_rdma_write;                wrq->type.send.solicited_event = 0;                wrq->type.send.send_qp_type.rc_type.fance_indicator = 0;                wrq->type.send.send_qp_type.rc_type.r_addr = kibnal_rf_addr(dstfrag);                wrq->type.send.send_qp_type.rc_type.r_r_key = dstrd->rd_key;                resid -= wrknob;                if (wrknob < srcfrag->rf_nob) {                        kibnal_rf_set(srcfrag,                                       kibnal_rf_addr(srcfrag) + wrknob,                                       srcfrag->rf_nob - wrknob);                } else {                        srcfrag++;                        srcidx++;                }                                if (wrknob < dstfrag->rf_nob) {                        kibnal_rf_set(dstfrag,                                      kibnal_rf_addr(dstfrag) + wrknob,                                      dstfrag->rf_nob - wrknob);                } else {                        dstfrag++;                        dstidx++;                }                                tx->tx_nwrq++;        }        if (rc < 0)                             /* no RDMA if completing with failure */                tx->tx_nwrq = 0;#endif                ibmsg->ibm_u.completion.ibcm_status = rc;        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));        return rc;}voidkibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn){        spin_lock(&conn->ibc_lock);        kibnal_queue_tx_locked (tx, conn);        spin_unlock(&conn->ibc_lock);                kibnal_check_sends(conn);}voidkibnal_schedule_peer_arp (kib_peer_t *peer){        unsigned long flags;        LASSERT (peer->ibp_connecting != 0);        LASSERT (peer->ibp_arp_count > 0);        kibnal_peer_addref(peer); /* extra ref for connd */        spin_lock_irqsave(&kibnal_data.kib_connd_lock, flags);        list_add_tail (&peer->ibp_connd_list, &kibnal_data.kib_connd_peers);        wake_up (&kibnal_data.kib_connd_waitq);        spin_unlock_irqrestore(&kibnal_data.kib_connd_lock, flags);}voidkibnal_launch_tx (kib_tx_t *tx, lnet_nid_t nid){        kib_peer_t      *peer;        kib_conn_t      *conn;        unsigned long    flags;        rwlock_t        *g_lock = &kibnal_data.kib_global_lock;        int              retry;        int              rc;        /* If I get here, I've committed to send, so I complete the tx with         * failure on any problems */                LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */        for (retry = 0; ; retry = 1) {                read_lock_irqsave(g_lock, flags);                        peer = kibnal_find_peer_locked (nid);                if (peer != NULL) {                        conn = kibnal_find_conn_locked (peer);                        if (conn != NULL) {                                kibnal_conn_addref(conn); /* 1 ref for me... */                                read_unlock_irqrestore(g_lock, flags);                                kibnal_queue_tx (tx, conn);                                kibnal_conn_decref(conn); /* ...to here */                                return;                        }                }                                /* Making one or more connections; I'll need a write lock... */                read_unlock(g_lock);                write_lock(g_lock);                peer = kibnal_find_peer_locked (nid);                if (peer != NULL)                        break;                write_unlock_irqrestore(g_lock, flags);                if (retry) {                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));                        tx->tx_status = -EHOSTUNREACH;                        tx->tx_waiting = 0;                        kibnal_tx_done (tx);                        return;                }                rc = kibnal_add_persistent_peer(nid, LNET_NIDADDR(nid));                if (rc != 0) {                        CERROR("Can't add peer %s: %d\n",                               libcfs_nid2str(nid), rc);                                                tx->tx_status = -EHOSTUNREACH;                        tx->tx_waiting = 0;                        kibnal_tx_done (tx);                        return;                }        }        conn = kibnal_find_conn_locked (peer);        if (conn != NULL) {                /* Connection exists; queue message on it */                kibnal_conn_addref(conn);       /* 1 ref for me... */                write_unlock_irqrestore(g_lock, flags);                                kibnal_queue_tx (tx, conn);                kibnal_conn_decref(conn);       /* ...until here */                return;        }        if (peer->ibp_connecting == 0 &&            peer->ibp_accepting == 0) {                if (!(peer->ibp_reconnect_interval == 0 || /* first attempt */                      time_after_eq(jiffies, peer->ibp_reconnect_time))) {                        write_unlock_irqrestore(g_lock, flags);                        tx->tx_status = -EHOSTUNREACH;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -