⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 iiblnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                        write_unlock_irqrestore(g_lock, flags);                        tx->tx_status = -EHOSTUNREACH;                        tx->tx_waiting = 0;                        kibnal_tx_done (tx);                        return;                }                kibnal_schedule_active_connect_locked(peer, IBNAL_MSG_VERSION);        }                /* A connection is being established; queue the message... */        list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);        write_unlock_irqrestore(g_lock, flags);}voidkibnal_txlist_done (struct list_head *txlist, int status){        kib_tx_t *tx;        while (!list_empty (txlist)) {                tx = list_entry (txlist->next, kib_tx_t, tx_list);                list_del (&tx->tx_list);                /* complete now */                tx->tx_waiting = 0;                tx->tx_status = status;                kibnal_tx_done (tx);        }}intkibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;         int               type = lntmsg->msg_type;         lnet_process_id_t target = lntmsg->msg_target;        int               target_is_router = lntmsg->msg_target_is_router;        int               routing = lntmsg->msg_routing;        unsigned int      payload_niov = lntmsg->msg_niov;         struct iovec     *payload_iov = lntmsg->msg_iov;         lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;        unsigned int      payload_offset = lntmsg->msg_offset;        unsigned int      payload_nob = lntmsg->msg_len;        kib_msg_t        *ibmsg;        kib_tx_t         *tx;        int               nob;        int               rc;        /* NB 'private' is different depending on what we're sending.... */        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",               payload_nob, payload_niov, libcfs_id2str(target));        LASSERT (payload_nob == 0 || payload_niov > 0);        LASSERT (payload_niov <= LNET_MAX_IOV);        /* Thread context */        LASSERT (!in_interrupt());        /* payload is either all vaddrs or all pages */        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));        switch (type) {        default:                LBUG();                return (-EIO);                        case LNET_MSG_ACK:                LASSERT (payload_nob == 0);                break;        case LNET_MSG_GET:                if (routing || target_is_router)                        break;                  /* send IMMEDIATE */                                /* is the REPLY message too small for RDMA? */                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);                if (nob <= IBNAL_MSG_SIZE)                        break;                  /* send IMMEDIATE */                tx = kibnal_get_idle_tx();                if (tx == NULL) {                        CERROR("Can allocate txd for GET to %s: \n",                               libcfs_nid2str(target.nid));                        return -ENOMEM;                }                                ibmsg = tx->tx_msg;                ibmsg->ibm_u.get.ibgm_hdr = *hdr;                ibmsg->ibm_u.get.ibgm_cookie = tx->tx_cookie;                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)                        rc = kibnal_setup_rd_iov(tx, &ibmsg->ibm_u.get.ibgm_rd,                                                 0,                                                 lntmsg->msg_md->md_niov,                                                 lntmsg->msg_md->md_iov.iov,                                                 0, lntmsg->msg_md->md_length);                else                        rc = kibnal_setup_rd_kiov(tx, &ibmsg->ibm_u.get.ibgm_rd,                                                  0,                                                  lntmsg->msg_md->md_niov,                                                  lntmsg->msg_md->md_iov.kiov,                                                  0, lntmsg->msg_md->md_length);                if (rc != 0) {                        CERROR("Can't setup GET sink for %s: %d\n",                               libcfs_nid2str(target.nid), rc);                        kibnal_tx_done(tx);                        return -EIO;                }#if IBNAL_USE_FMR                nob = sizeof(kib_get_msg_t);#else                {                        int n = ibmsg->ibm_u.get.ibgm_rd.rd_nfrag;                                                nob = offsetof(kib_get_msg_t, ibgm_rd.rd_frags[n]);                }#endif                kibnal_init_tx_msg(tx, IBNAL_MSG_GET_REQ, nob);                tx->tx_lntmsg[1] = lnet_create_reply_msg(kibnal_data.kib_ni,                                                         lntmsg);                if (tx->tx_lntmsg[1] == NULL) {                        CERROR("Can't create reply for GET -> %s\n",                               libcfs_nid2str(target.nid));                        kibnal_tx_done(tx);                        return -EIO;                }                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg[0,1] on completion */                tx->tx_waiting = 1;             /* waiting for GET_DONE */                kibnal_launch_tx(tx, target.nid);                return 0;        case LNET_MSG_REPLY:         case LNET_MSG_PUT:                /* Is the payload small enough not to need RDMA? */                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);                if (nob <= IBNAL_MSG_SIZE)                        break;                  /* send IMMEDIATE */                tx = kibnal_get_idle_tx();                if (tx == NULL) {                        CERROR("Can't allocate %s txd for %s\n",                               type == LNET_MSG_PUT ? "PUT" : "REPLY",                               libcfs_nid2str(target.nid));                        return -ENOMEM;                }                if (payload_kiov == NULL)                        rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1,                                                 payload_niov, payload_iov,                                                 payload_offset, payload_nob);                else                        rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1,                                                  payload_niov, payload_kiov,                                                  payload_offset, payload_nob);                if (rc != 0) {                        CERROR("Can't setup PUT src for %s: %d\n",                               libcfs_nid2str(target.nid), rc);                        kibnal_tx_done(tx);                        return -EIO;                }                ibmsg = tx->tx_msg;                ibmsg->ibm_u.putreq.ibprm_hdr = *hdr;                ibmsg->ibm_u.putreq.ibprm_cookie = tx->tx_cookie;                kibnal_init_tx_msg(tx, IBNAL_MSG_PUT_REQ, sizeof(kib_putreq_msg_t));                tx->tx_lntmsg[0] = lntmsg;      /* finalise lntmsg on completion */                tx->tx_waiting = 1;             /* waiting for PUT_{ACK,NAK} */                kibnal_launch_tx(tx, target.nid);                return 0;        }        /* send IMMEDIATE */        LASSERT (offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob])                 <= IBNAL_MSG_SIZE);        tx = kibnal_get_idle_tx();        if (tx == NULL) {                CERROR ("Can't send %d to %s: tx descs exhausted\n",                        type, libcfs_nid2str(target.nid));                return -ENOMEM;        }        ibmsg = tx->tx_msg;        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;        if (payload_kiov != NULL)                lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),                                    payload_niov, payload_kiov,                                    payload_offset, payload_nob);        else                lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),                                   payload_niov, payload_iov,                                   payload_offset, payload_nob);        nob = offsetof(kib_immediate_msg_t, ibim_payload[payload_nob]);        kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE, nob);        tx->tx_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */        kibnal_launch_tx(tx, target.nid);        return 0;}voidkibnal_reply(lnet_ni_t *ni, kib_rx_t *rx, lnet_msg_t *lntmsg){        lnet_process_id_t target = lntmsg->msg_target;        unsigned int      niov = lntmsg->msg_niov;         struct iovec     *iov = lntmsg->msg_iov;         lnet_kiov_t      *kiov = lntmsg->msg_kiov;        unsigned int      offset = lntmsg->msg_offset;        unsigned int      nob = lntmsg->msg_len;        kib_tx_t         *tx;        int               rc;                tx = kibnal_get_idle_tx();        if (tx == NULL) {                CERROR("Can't get tx for REPLY to %s\n",                       libcfs_nid2str(target.nid));                goto failed_0;        }        if (nob == 0)                rc = 0;        else if (kiov == NULL)                rc = kibnal_setup_rd_iov(tx, tx->tx_rd, 1,                                          niov, iov, offset, nob);        else                rc = kibnal_setup_rd_kiov(tx, tx->tx_rd, 1,                                           niov, kiov, offset, nob);        if (rc != 0) {                CERROR("Can't setup GET src for %s: %d\n",                       libcfs_nid2str(target.nid), rc);                goto failed_1;        }                rc = kibnal_init_rdma(tx, IBNAL_MSG_GET_DONE, nob,                              &rx->rx_msg->ibm_u.get.ibgm_rd,                              rx->rx_msg->ibm_u.get.ibgm_cookie);        if (rc < 0) {                CERROR("Can't setup rdma for GET from %s: %d\n",                        libcfs_nid2str(target.nid), rc);                goto failed_1;        }                if (rc == 0) {                /* No RDMA: local completion may happen now! */                lnet_finalize(ni, lntmsg, 0);        } else {                /* RDMA: lnet_finalize(lntmsg) when it                 * completes */                tx->tx_lntmsg[0] = lntmsg;        }                kibnal_queue_tx(tx, rx->rx_conn);        return;         failed_1:        kibnal_tx_done(tx); failed_0:        lnet_finalize(ni, lntmsg, -EIO);}intkibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,                   void **new_private){        kib_rx_t    *rx = private;        kib_conn_t  *conn = rx->rx_conn;        if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {                /* Can't block if RDMA completions need normal credits */                LCONSOLE_ERROR_MSG(0x12d,  "Dropping message from %s: no "                                   "buffers free. %s is running an old version"                                   " of LNET that may deadlock if messages "                                   "wait for buffers)\n",                                   libcfs_nid2str(conn->ibc_peer->ibp_nid),                                   libcfs_nid2str(conn->ibc_peer->ibp_nid));                return -EDEADLK;        }                *new_private = private;        return 0;}intkibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,             unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,             unsigned int offset, unsigned int mlen, unsigned int rlen){        kib_rx_t    *rx = private;        kib_msg_t   *rxmsg = rx->rx_msg;        kib_conn_t  *conn = rx->rx_conn;        kib_tx_t    *tx;        kib_msg_t   *txmsg;        int          nob;        int          post_cred = 1;        int          rc = 0;                LASSERT (mlen <= rlen);        LASSERT (!in_interrupt());        /* Either all pages or all vaddrs */        LASSERT (!(kiov != NULL && iov != NULL));        switch (rxmsg->ibm_type) {        default:                LBUG();                        case IBNAL_MSG_IMMEDIATE:                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);                if (nob > rx->rx_nob) {                        CERROR ("Immediate message from %s too big: %d(%d)\n",                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),                                nob, rx->rx_nob);                        rc = -EPROTO;                        break;                }                if (kiov != NULL)                        lnet_copy_flat2kiov(niov, kiov, offset,                                            IBNAL_MSG_SIZE, rxmsg,                                            offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),                                            mlen);                else                        lnet_copy_flat2iov(niov, iov, offset,                                           IBNAL_MSG_SIZE, rxmsg,                                           offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),                                           mlen);                lnet_finalize (ni, lntmsg, 0);                break;        case IBNAL_MSG_PUT_REQ:                if (mlen == 0) {                        lnet_finalize(ni, lntmsg, 0);                        kibnal_send_completion(rx->rx_conn, IBNAL_MSG_PUT_NAK, 0,                                               rxmsg->ibm_u.putreq.ibprm_cookie);                        break;                }                                tx = kibnal_get_idle_tx();                if (tx == NULL) {                        CERROR("Can't allocate tx for %s\n",                               libcfs_nid2str(conn->ibc_peer->ibp_nid));                        /* Not replying will break the connection */                        rc = -ENOMEM;                        break;  

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -