⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 o2iblnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        kiblnd_conn_addref(conn);               /* 1 ref for me.... */        spin_unlock(&conn->ibc_lock);        if (idle)                kiblnd_tx_done(conn->ibc_peer->ibp_ni, tx);        kiblnd_check_sends(conn);        kiblnd_conn_decref(conn);               /* ...until here */}voidkiblnd_init_tx_msg (lnet_ni_t *ni, kib_tx_t *tx, int type, int body_nob){        kib_net_t         *net = ni->ni_data;        struct ib_sge     *sge = &tx->tx_sge[tx->tx_nwrq];        struct ib_send_wr *wrq = &tx->tx_wrq[tx->tx_nwrq];        int                nob = offsetof (kib_msg_t, ibm_u) + body_nob;        LASSERT (net != NULL);        LASSERT (tx->tx_nwrq >= 0);        LASSERT (tx->tx_nwrq < IBLND_MAX_RDMA_FRAGS + 1);        LASSERT (nob <= IBLND_MSG_SIZE);        kiblnd_init_msg(tx->tx_msg, type, body_nob);        sge->addr = tx->tx_msgaddr;        sge->lkey = net->ibn_dev->ibd_mr->lkey;        sge->length = nob;        memset(wrq, 0, sizeof(*wrq));        wrq->next       = NULL;        wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_TX);        wrq->sg_list    = sge;        wrq->num_sge    = 1;        wrq->opcode     = IB_WR_SEND;        wrq->send_flags = IB_SEND_SIGNALED;        tx->tx_nwrq++;}intkiblnd_init_rdma (lnet_ni_t *ni, kib_tx_t *tx, int type,                  int nob, kib_rdma_desc_t *dstrd, __u64 dstcookie){        kib_msg_t         *ibmsg = tx->tx_msg;        kib_rdma_desc_t   *srcrd = tx->tx_rd;        struct ib_sge     *sge = &tx->tx_sge[0];        struct ib_send_wr *wrq = &tx->tx_wrq[0];        int                rc = nob;#if IBLND_MAP_ON_DEMAND        LASSERT (!in_interrupt());        LASSERT (tx->tx_nwrq == 0);        LASSERT (type == IBLND_MSG_GET_DONE ||                 type == IBLND_MSG_PUT_DONE);        sge->addr = srcrd->rd_addr;        sge->lkey = srcrd->rd_key;        sge->length = nob;        wrq = &tx->tx_wrq[0];        wrq->next       = &tx->tx_wrq[1];        wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);        wrq->sg_list    = sge;        wrq->num_sge    = 1;        wrq->opcode     = IB_WR_RDMA_WRITE;        wrq->send_flags = 0;        wrq->wr.rdma.remote_addr = dstrd->rd_addr;        wrq->wr.rdma.rkey        = dstrd->rd_key;        tx->tx_nwrq = 1;#else        /* CAVEAT EMPTOR: this 'consumes' the frags in 'dstrd' */        int              resid = nob;        kib_rdma_frag_t *srcfrag;        int              srcidx;        kib_rdma_frag_t *dstfrag;        int              dstidx;        int              wrknob;        LASSERT (!in_interrupt());        LASSERT (tx->tx_nwrq == 0);        LASSERT (type == IBLND_MSG_GET_DONE ||                 type == IBLND_MSG_PUT_DONE);        srcidx = dstidx = 0;        srcfrag = &srcrd->rd_frags[0];        dstfrag = &dstrd->rd_frags[0];        while (resid > 0) {                if (srcidx >= srcrd->rd_nfrags) {                        CERROR("Src buffer exhausted: %d frags\n", srcidx);                        rc = -EPROTO;                        break;                }                                if (dstidx == dstrd->rd_nfrags) {                        CERROR("Dst buffer exhausted: %d frags\n", dstidx);                        rc = -EPROTO;                        break;                }                if (tx->tx_nwrq == IBLND_MAX_RDMA_FRAGS) {                        CERROR("RDMA too fragmented: %d/%d src %d/%d dst frags\n",                               srcidx, srcrd->rd_nfrags,                               dstidx, dstrd->rd_nfrags);                        rc = -EMSGSIZE;                        break;                }                wrknob = MIN(MIN(srcfrag->rf_nob, dstfrag->rf_nob), resid);                sge = &tx->tx_sge[tx->tx_nwrq];                sge->addr   = srcfrag->rf_addr;                sge->length = wrknob;                sge->lkey   = srcrd->rd_key;                wrq = &tx->tx_wrq[tx->tx_nwrq];                wrq->next       = wrq + 1;                wrq->wr_id      = kiblnd_ptr2wreqid(tx, IBLND_WID_RDMA);                wrq->sg_list    = sge;                wrq->num_sge    = 1;                wrq->opcode     = IB_WR_RDMA_WRITE;                wrq->send_flags = 0;                wrq->wr.rdma.remote_addr = dstfrag->rf_addr;                wrq->wr.rdma.rkey        = dstrd->rd_key;                wrq++;                sge++;                resid -= wrknob;                if (wrknob < srcfrag->rf_nob) {                        srcfrag->rf_nob  -= wrknob;                        srcfrag->rf_addr += wrknob;                } else {                        srcfrag++;                        srcidx++;                }                                if (wrknob < dstfrag->rf_nob) {                        dstfrag->rf_nob  -= wrknob;                        dstfrag->rf_addr += wrknob;                } else {                        dstfrag++;                        dstidx++;                }                                tx->tx_nwrq++;        }        if (rc < 0)                             /* no RDMA if completing with failure */                tx->tx_nwrq = 0;#endif        ibmsg->ibm_u.completion.ibcm_status = rc;        ibmsg->ibm_u.completion.ibcm_cookie = dstcookie;        kiblnd_init_tx_msg(ni, tx, type, sizeof (kib_completion_msg_t));        return rc;}voidkiblnd_queue_tx_locked (kib_tx_t *tx, kib_conn_t *conn){        struct list_head   *q;        LASSERT (tx->tx_nwrq > 0);              /* work items set up */        LASSERT (!tx->tx_queued);               /* not queued for sending already */        tx->tx_queued = 1;        tx->tx_deadline = jiffies + (*kiblnd_tunables.kib_timeout * HZ);        if (tx->tx_conn == NULL) {                kiblnd_conn_addref(conn);                tx->tx_conn = conn;                LASSERT (tx->tx_msg->ibm_type != IBLND_MSG_PUT_DONE);        } else {                /* PUT_DONE first attached to conn as a PUT_REQ */                LASSERT (tx->tx_conn == conn);                LASSERT (tx->tx_msg->ibm_type == IBLND_MSG_PUT_DONE);        }        switch (tx->tx_msg->ibm_type) {        default:                LBUG();        case IBLND_MSG_PUT_REQ:        case IBLND_MSG_GET_REQ:                q = &conn->ibc_tx_queue_rsrvd;                break;        case IBLND_MSG_PUT_NAK:        case IBLND_MSG_PUT_ACK:        case IBLND_MSG_PUT_DONE:        case IBLND_MSG_GET_DONE:                q = &conn->ibc_tx_queue_nocred;                break;        case IBLND_MSG_NOOP:        case IBLND_MSG_IMMEDIATE:                q = &conn->ibc_tx_queue;                break;        }        list_add_tail(&tx->tx_list, q);}voidkiblnd_queue_tx (kib_tx_t *tx, kib_conn_t *conn){        spin_lock(&conn->ibc_lock);        kiblnd_queue_tx_locked(tx, conn);        spin_unlock(&conn->ibc_lock);        kiblnd_check_sends(conn);}voidkiblnd_connect_peer (kib_peer_t *peer){        struct rdma_cm_id *cmid;        struct sockaddr_in sockaddr;        int                rc;        LASSERT (peer->ibp_connecting > 0);        cmid = rdma_create_id(kiblnd_cm_callback, peer, RDMA_PS_TCP);        if (IS_ERR(cmid)) {                CERROR("Can't create CMID for %s: %ld\n",                       libcfs_nid2str(peer->ibp_nid), PTR_ERR(cmid));                rc = PTR_ERR(cmid);                goto failed;        }        memset(&sockaddr, 0, sizeof(sockaddr));        sockaddr.sin_family = AF_INET;        sockaddr.sin_port = htons(*kiblnd_tunables.kib_service);        sockaddr.sin_addr.s_addr = htonl(LNET_NIDADDR(peer->ibp_nid));        kiblnd_peer_addref(peer);               /* cmid's ref */        rc = rdma_resolve_addr(cmid, NULL, (struct sockaddr *)&sockaddr,                               *kiblnd_tunables.kib_timeout * 1000);        if (rc == 0)                return;        /* Can't initiate address resolution:  */        CERROR("Can't resolve addr for %s: %d\n",               libcfs_nid2str(peer->ibp_nid), rc);        kiblnd_peer_decref(peer);               /* cmid's ref */        rdma_destroy_id(cmid); failed:        kiblnd_peer_connect_failed(peer, 1, rc);}voidkiblnd_launch_tx (lnet_ni_t *ni, kib_tx_t *tx, lnet_nid_t nid){        kib_peer_t        *peer;        kib_peer_t        *peer2;        kib_conn_t        *conn;        rwlock_t          *g_lock = &kiblnd_data.kib_global_lock;        unsigned long      flags;        int                rc;        /* If I get here, I've committed to send, so I complete the tx with         * failure on any problems */        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */        LASSERT (tx->tx_nwrq > 0);              /* work items have been set up */        /* First time, just use a read lock since I expect to find my peer         * connected */        read_lock_irqsave(g_lock, flags);        peer = kiblnd_find_peer_locked(nid);        if (peer != NULL && !list_empty(&peer->ibp_conns)) {                /* Found a peer with an established connection */                conn = kiblnd_get_conn_locked(peer);                kiblnd_conn_addref(conn); /* 1 ref for me... */                read_unlock_irqrestore(g_lock, flags);                kiblnd_queue_tx(tx, conn);                kiblnd_conn_decref(conn); /* ...to here */                return;        }        read_unlock(g_lock);        /* Re-try with a write lock */        write_lock(g_lock);        peer = kiblnd_find_peer_locked(nid);        if (peer != NULL) {                if (list_empty(&peer->ibp_conns)) {                        /* found a peer, but it's still connecting... */                        LASSERT (peer->ibp_connecting != 0 ||                                 peer->ibp_accepting != 0);                        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);                        write_unlock_irqrestore(g_lock, flags);                } else {                        conn = kiblnd_get_conn_locked(peer);                        kiblnd_conn_addref(conn); /* 1 ref for me... */                                                write_unlock_irqrestore(g_lock, flags);                                                kiblnd_queue_tx(tx, conn);                        kiblnd_conn_decref(conn); /* ...to here */                }                return;        }        write_unlock_irqrestore(g_lock, flags);        /* Allocate a peer ready to add to the peer table and retry */        rc = kiblnd_create_peer(ni, &peer, nid);        if (rc != 0) {                CERROR("Can't create peer %s\n", libcfs_nid2str(nid));                tx->tx_status = -EHOSTUNREACH;                tx->tx_waiting = 0;                kiblnd_tx_done(ni, tx);                return;        }        write_lock_irqsave(g_lock, flags);        peer2 = kiblnd_find_peer_locked(nid);        if (peer2 != NULL) {                if (list_empty(&peer2->ibp_conns)) {                        /* found a peer, but it's still connecting... */                        LASSERT (peer2->ibp_connecting != 0 ||                                 peer2->ibp_accepting != 0);                        list_add_tail (&tx->tx_list, &peer2->ibp_tx_queue);                        write_unlock_irqrestore(g_lock, flags);                } else {                        conn = kiblnd_get_conn_locked(peer2);                        kiblnd_conn_addref(conn); /* 1 ref for me... */                        write_unlock_irqrestore(g_lock, flags);                                                kiblnd_queue_tx(tx, conn);                        kiblnd_conn_decref(conn); /* ...to here */                }                kiblnd_peer_decref(peer);                return;        }        /* Brand new peer */        LASSERT (peer->ibp_connecting == 0);        peer->ibp_connecting = 1;        /* always called with a ref on ni, which prevents ni being shutdown */        LASSERT (((kib_net_t *)ni->ni_data)->ibn_shutdown == 0);        list_add_tail(&tx->tx_list, &peer->ibp_tx_queue);        kiblnd_peer_addref(peer);        list_add_tail(&peer->ibp_list, kiblnd_nid2peerlist(nid));

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -