
📄 openiblnd_cb.c

📁 From a classic distributed system project
💻 C
📖 Page 1 of 5
        ibmsg = tx->tx_msg;
        ibmsg->ibm_u.rdma.ibrm_hdr = lntmsg->msg_hdr;
        ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_key = tx->tx_md.md_rkey;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_addr = tx->tx_md.md_addr;
        ibmsg->ibm_u.rdma.ibrm_desc.rd_nob = nob;

        kibnal_init_tx_msg (tx, type, sizeof (kib_rdma_msg_t));

        CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
               LPX64", nob %d\n",
               tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
               tx->tx_md.md_addr, nob);

        /* lntmsg gets finalized when tx completes. */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, nid);
        return (0);

 failed:
        tx->tx_status = rc;
        kibnal_tx_done (tx);
        return (-EIO);
}

void
kibnal_start_active_rdma (int type, int status,
                          kib_rx_t *rx, lnet_msg_t *lntmsg,
                          unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        kib_msg_t    *rxmsg = rx->rx_msg;
        kib_msg_t    *txmsg;
        kib_tx_t     *tx;
        int           access;
        int           rdma_op;
        int           rc;

        CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
               type, status, niov, offset, nob);

        /* Called by scheduler */
        LASSERT (!in_interrupt ());

        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        /* No data if we're completing with failure */
        LASSERT (status == 0 || nob == 0);

        LASSERT (type == IBNAL_MSG_GET_DONE ||
                 type == IBNAL_MSG_PUT_DONE);

        if (type == IBNAL_MSG_GET_DONE) {
                access   = 0;
                rdma_op  = IB_OP_RDMA_WRITE;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
        } else {
                access   = IB_ACCESS_LOCAL_WRITE;
                rdma_op  = IB_OP_RDMA_READ;
                LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
        }

        tx = kibnal_get_idle_tx ();
        if (tx == NULL) {
                CERROR ("tx descs exhausted on RDMA from %s"
                        " completing locally with failure\n",
                        libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid));
                lnet_finalize (kibnal_data.kib_ni, lntmsg, -ENOMEM);
                return;
        }
        LASSERT (tx->tx_nsp == 0);

        if (nob != 0) {
                /* We actually need to transfer some data (the transfer
                 * size could get truncated to zero when the incoming
                 * message is matched) */
                if (kiov != NULL)
                        rc = kibnal_map_kiov (tx, access,
                                              niov, kiov, offset, nob);
                else
                        rc = kibnal_map_iov (tx, access,
                                             niov, iov, offset, nob);

                if (rc != 0) {
                        CERROR ("Can't map RDMA -> %s: %d\n",
                                libcfs_nid2str(rx->rx_conn->ibc_peer->ibp_nid),
                                rc);
                        /* We'll skip the RDMA and complete with failure. */
                        status = rc;
                        nob = 0;
                } else {
                        tx->tx_gl[0] = (struct ib_gather_scatter) {
                                .address = tx->tx_md.md_addr,
                                .length  = nob,
                                .key     = tx->tx_md.md_lkey,
                        };

                        tx->tx_sp[0] = (struct ib_send_param) {
                                .work_request_id      = kibnal_ptr2wreqid(tx, 0),
                                .op                   = rdma_op,
                                .gather_list          = &tx->tx_gl[0],
                                .num_gather_entries   = 1,
                                .remote_address       = rxmsg->ibm_u.rdma.ibrm_desc.rd_addr,
                                .rkey                 = rxmsg->ibm_u.rdma.ibrm_desc.rd_key,
                                .device_specific      = NULL,
                                .solicited_event      = 0,
                                .signaled             = 1,
                                .immediate_data_valid = 0,
                                .fence                = 0,
                                .inline_data          = 0,
                        };

                        tx->tx_nsp = 1;
                }
        }

        txmsg = tx->tx_msg;
        txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
        txmsg->ibm_u.completion.ibcm_status = status;

        kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));

        if (status == 0 && nob != 0) {
                LASSERT (tx->tx_nsp > 1);
                /* RDMA: lntmsg gets finalized when the tx completes.  This
                 * is after the completion message has been sent, which in
                 * turn is after the RDMA has finished. */
                tx->tx_lntmsg[0] = lntmsg;
        } else {
                LASSERT (tx->tx_nsp == 1);
                /* No RDMA: local completion happens now! */
                CDEBUG(D_NET, "No data: immediate completion\n");
                lnet_finalize (kibnal_data.kib_ni, lntmsg,
                               status == 0 ? 0 : -EIO);
        }

        kibnal_queue_tx(tx, rx->rx_conn);
}

int
kibnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        kib_msg_t        *ibmsg;
        kib_tx_t         *tx;
        int               nob;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* Thread context if we're sending payload */
        LASSERT (!in_interrupt() || payload_niov == 0);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        switch (type) {
        default:
                LBUG();
                return (-EIO);

        case LNET_MSG_ACK:
                LASSERT (payload_nob == 0);
                break;

        case LNET_MSG_GET:
                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                /* is the REPLY message too small for RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[lntmsg->msg_md->md_length]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg,
                                                         lntmsg->msg_md->md_niov,
                                                         lntmsg->msg_md->md_iov.iov, NULL,
                                                         lntmsg->msg_md->md_length);

                return kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA, lntmsg,
                                                 lntmsg->msg_md->md_niov,
                                                 NULL, lntmsg->msg_md->md_iov.kiov,
                                                 lntmsg->msg_md->md_length);

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* Is the payload small enough not to need RDMA? */
                nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
                if (nob <= IBNAL_MSG_SIZE)
                        break;                  /* send IMMEDIATE */

                return kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA, lntmsg,
                                                 payload_niov,
                                                 payload_iov, payload_kiov,
                                                 payload_nob);
        }

        /* Send IMMEDIATE */

        tx = kibnal_get_idle_tx();
        if (tx == NULL) {
                CERROR ("Can't send %d to %s: tx descs exhausted%s\n",
                        type, libcfs_nid2str(target.nid),
                        in_interrupt() ? " (intr)" : "");
                return (-ENOMEM);
        }

        ibmsg = tx->tx_msg;
        ibmsg->ibm_u.immediate.ibim_hdr = *hdr;

        if (payload_kiov != NULL)
                lnet_copy_kiov2flat(IBNAL_MSG_SIZE, ibmsg,
                                    offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                    payload_niov, payload_kiov,
                                    payload_offset, payload_nob);
        else
                lnet_copy_iov2flat(IBNAL_MSG_SIZE, ibmsg,
                                   offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                   payload_niov, payload_iov,
                                   payload_offset, payload_nob);

        kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
                            offsetof(kib_immediate_msg_t,
                                     ibim_payload[payload_nob]));

        /* lntmsg gets finalized when tx completes */
        tx->tx_lntmsg[0] = lntmsg;

        kibnal_launch_tx(tx, target.nid);
        return (0);
}

int
kibnal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kib_rx_t    *rx = private;
        kib_conn_t  *conn = rx->rx_conn;

        if (conn->ibc_version == IBNAL_MSG_VERSION_RDMAREPLYNOTRSRVD) {
                /* Can't block if RDMA completions need normal credits */
                LCONSOLE_ERROR_MSG(0x12a,
                               "Dropping message from %s: no buffers free. "
                               "%s is running an old version of LNET that may "
                               "deadlock if messages wait for buffers)\n",
                               libcfs_nid2str(conn->ibc_peer->ibp_nid),
                               libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return -EDEADLK;
        }

        *new_private = private;
        return 0;
}

int
kibnal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kib_rx_t    *rx = private;
        kib_msg_t   *rxmsg = rx->rx_msg;
        int          msg_nob;
        int          rc = 0;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch (rxmsg->ibm_type) {
        default:
                LBUG();

        case IBNAL_MSG_IMMEDIATE:
                msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
                if (msg_nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_nid2str(rxmsg->ibm_u.immediate.ibim_hdr.src_nid),
                                msg_nob, rx->rx_nob);
                        rc = -EPROTO;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(
                                niov, kiov, offset,
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);
                else
                        lnet_copy_flat2iov(
                                niov, iov, offset,
                                IBNAL_MSG_SIZE, rxmsg,
                                offsetof(kib_msg_t, ibm_u.immediate.ibim_payload),
                                mlen);

                lnet_finalize (ni, lntmsg, 0);
                break;

        case IBNAL_MSG_GET_RDMA:
                if (lntmsg != NULL) {
                        /* GET matched: RDMA lntmsg's payload */
                        kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
                                                 rx, lntmsg,
                                                 lntmsg->msg_niov,
                                                 lntmsg->msg_iov,
                                                 lntmsg->msg_kiov,
                                                 lntmsg->msg_offset,
                                                 lntmsg->msg_len);
                } else {
                        /* GET didn't match anything */
                        kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -ENODATA,
                                                  rx, NULL, 0, NULL, NULL, 0, 0);
                }
                break;

        case IBNAL_MSG_PUT_RDMA:
                kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0, rx, lntmsg,
                                          niov, iov, kiov, offset, mlen);
                break;
        }

        kibnal_post_rx(rx, 1, 0);
        return rc;
}

int
kibnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&kibnal_data.kib_nthreads);
        return (0);
}

void
kibnal_thread_fini (void)
{
        atomic_dec (&kibnal_data.kib_nthreads);
}

void
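The listing ends (page 1 of 5) just after the thread helpers. As a brief illustrative sketch, not part of openiblnd_cb.c, the following shows how the kibnal_thread_start()/kibnal_thread_fini() pair above might be used; my_scheduler_entry and my_spawn_threads are hypothetical names introduced here only for illustration.

/* Illustrative sketch only -- not from openiblnd_cb.c.
 * my_scheduler_entry is a hypothetical thread entry point; the real file
 * passes its own service functions to kibnal_thread_start(). */
static int
my_scheduler_entry (void *arg)
{
        long  id = (long)arg;

        CDEBUG(D_NET, "thread %ld running\n", id);
        /* ... service work until asked to shut down ... */

        kibnal_thread_fini();                   /* drop kib_nthreads count */
        return (0);
}

static int
my_spawn_threads (int nthreads)
{
        long  i;
        int   rc;

        for (i = 0; i < nthreads; i++) {
                /* kibnal_thread_start() wraps kernel_thread() and returns
                 * a negative value if the thread could not be created */
                rc = kibnal_thread_start (my_scheduler_entry, (void *)i);
                if (rc != 0) {
                        CERROR("Can't spawn thread %ld: %d\n", i, rc);
                        return (rc);
                }
        }
        return (0);
}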
