⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mxlnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
                sum = old_sum + kiov[i].kiov_len;                if (i == 0) sum -= kiov[i].kiov_offset;                if (!first_found && (sum > offset)) {                        first_kiov = i;                        first_kiov_offset = offset - old_sum;                        //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;                        if (i == 0) first_kiov_offset = kiov[i].kiov_offset;                        first_found = 1;                        sum = kiov[i].kiov_len - first_kiov_offset;                        old_sum = 0;                }                if (sum >= nob) {                        last_kiov = i;                        last_kiov_length = kiov[i].kiov_len - (sum - nob);                        if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;                        break;                }                old_sum = sum;        }        LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);        nseg = last_kiov - first_kiov + 1;        LASSERT(nseg > 0);                MXLND_ALLOC (seg, nseg * sizeof(*seg));        if (seg == NULL) {                CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");                return -1;        }        memset(seg, 0, niov * sizeof(*seg));        ctx->mxc_nseg = niov;        sum = 0;        for (i = 0; i < niov; i++) {                seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);                seg[i].segment_length = kiov[first_kiov + i].kiov_len;                if (i == 0) {                        seg[i].segment_ptr += (u64) first_kiov_offset;                        /* we have to add back the original kiov_offset */                        seg[i].segment_length -= first_kiov_offset +                                                 kiov[first_kiov].kiov_offset;                }                if (i == (nseg - 1)) {                        seg[i].segment_length = last_kiov_length;                }                sum += seg[i].segment_length;        }        ctx->mxc_seg_list = seg;        ctx->mxc_pin_type = MX_PIN_PHYSICAL;#ifdef MX_PIN_FULLPAGES        ctx->mxc_pin_type |= MX_PIN_FULLPAGES;#endif        LASSERT(nob == sum);        return 0;}voidmxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie){        LASSERT(type == MXLND_MSG_PUT_ACK);        mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);        tx->mxc_cookie = cookie;        tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;        tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */        tx->mxc_match = mxlnd_create_match(tx, status);        mxlnd_queue_tx(tx);}/** * mxlnd_send_data - get tx, map [k]iov, queue tx * @ni * @lntmsg * @peer * @msg_type * @cookie * * This setups the DATA send for PUT or GET. * * On success, it queues the tx, on failure it calls lnet_finalize() */voidmxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie){        int                     ret             = 0;        lnet_process_id_t       target          = lntmsg->msg_target;        unsigned int            niov            = lntmsg->msg_niov;        struct iovec           *iov             = lntmsg->msg_iov;        lnet_kiov_t            *kiov            = lntmsg->msg_kiov;        unsigned int            offset          = lntmsg->msg_offset;        unsigned int            nob             = lntmsg->msg_len;        struct kmx_ctx         *tx              = NULL;        LASSERT(lntmsg != NULL);        LASSERT(peer != NULL);        LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);        LASSERT((cookie>>52) == 0);        tx = mxlnd_get_idle_tx();        if (tx == NULL) {                CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",                        libcfs_nid2str(target.nid));                goto failed_0;        }        tx->mxc_nid = target.nid;        mxlnd_conn_addref(peer->mxp_conn);        tx->mxc_peer = peer;        tx->mxc_conn = peer->mxp_conn;        tx->mxc_msg_type = msg_type;        tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;        tx->mxc_state = MXLND_CTX_PENDING;        tx->mxc_lntmsg[0] = lntmsg;        tx->mxc_cookie = cookie;        tx->mxc_match = mxlnd_create_match(tx, 0);        /* This setups up the mx_ksegment_t to send the DATA payload  */        if (nob == 0) {                /* do not setup the segments */                CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "                                   "to %s?\n", libcfs_nid2str(target.nid));                ret = 0;        } else if (kiov == NULL) {                ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);        } else {                ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);        }        if (ret != 0) {                CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",                                    libcfs_nid2str(target.nid));                tx->mxc_status.code = -EIO;                goto failed_1;        }        mxlnd_queue_tx(tx);        return;failed_1:        mxlnd_conn_decref(peer->mxp_conn);        mxlnd_put_idle_tx(tx);        return;failed_0:        CDEBUG(D_NETERROR, "no tx avail\n");        lnet_finalize(ni, lntmsg, -EIO);        return;}/** * mxlnd_recv_data - map [k]iov, post rx * @ni * @lntmsg * @rx * @msg_type * @cookie * * This setups the DATA receive for PUT or GET. * * On success, it returns 0, on failure it returns -1 */intmxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie){        int                     ret             = 0;        lnet_process_id_t       target          = lntmsg->msg_target;        unsigned int            niov            = lntmsg->msg_niov;        struct iovec           *iov             = lntmsg->msg_iov;        lnet_kiov_t            *kiov            = lntmsg->msg_kiov;        unsigned int            offset          = lntmsg->msg_offset;        unsigned int            nob             = lntmsg->msg_len;        mx_return_t             mxret           = MX_SUCCESS;        /* above assumes MXLND_MSG_PUT_DATA */        if (msg_type == MXLND_MSG_GET_DATA) {                niov = lntmsg->msg_md->md_niov;                iov = lntmsg->msg_md->md_iov.iov;                kiov = lntmsg->msg_md->md_iov.kiov;                offset = 0;                nob = lntmsg->msg_md->md_length;        }        LASSERT(lntmsg != NULL);        LASSERT(rx != NULL);        LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);        LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */        rx->mxc_msg_type = msg_type;        rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;        rx->mxc_state = MXLND_CTX_PENDING;        rx->mxc_nid = target.nid;        /* if posting a GET_DATA, we may not yet know the peer */        if (rx->mxc_peer != NULL) {                rx->mxc_conn = rx->mxc_peer->mxp_conn;        }        rx->mxc_lntmsg[0] = lntmsg;        rx->mxc_cookie = cookie;        rx->mxc_match = mxlnd_create_match(rx, 0);        /* This setups up the mx_ksegment_t to receive the DATA payload  */        if (kiov == NULL) {                ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);        } else {                ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);        }        if (msg_type == MXLND_MSG_GET_DATA) {                rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);                if (rx->mxc_lntmsg[1] == NULL) {                        CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",                                           libcfs_nid2str(target.nid));                        ret = -1;                }        }        if (ret != 0) {                CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",                       msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",                       libcfs_nid2str(target.nid));                return -1;        }        ret = mxlnd_q_pending_ctx(rx);        if (ret == -1) {                return -1;        }        CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);        mxret = mx_kirecv(kmxlnd_data.kmx_endpt,                           rx->mxc_seg_list, rx->mxc_nseg,                          rx->mxc_pin_type, rx->mxc_match,                          0xF00FFFFFFFFFFFFFLL, (void *) rx,                           &rx->mxc_mxreq);        if (mxret != MX_SUCCESS) {                if (rx->mxc_conn != NULL) {                        mxlnd_deq_pending_ctx(rx);                }                CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",                                    (int) mxret, libcfs_nid2str(target.nid));                return -1;        }        return 0;}/** * mxlnd_send - the LND required send function * @ni * @private * @lntmsg * * This must not block. Since we may not have a peer struct for the receiver, * it will append send messages on a global tx list. We will then up the * tx_queued's semaphore to notify it of the new send.  */intmxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){        int                     ret             = 0;        int                     type            = lntmsg->msg_type;        lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;        lnet_process_id_t       target          = lntmsg->msg_target;        lnet_nid_t              nid             = target.nid;        int                     target_is_router = lntmsg->msg_target_is_router;        int                     routing         = lntmsg->msg_routing;        unsigned int            payload_niov    = lntmsg->msg_niov;        struct iovec           *payload_iov     = lntmsg->msg_iov;        lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;        unsigned int            payload_offset  = lntmsg->msg_offset;        unsigned int            payload_nob     = lntmsg->msg_len;        struct kmx_ctx         *tx              = NULL;        struct kmx_msg         *txmsg           = NULL;        struct kmx_ctx         *rx              = (struct kmx_ctx *) private; /* for REPLY */        struct kmx_ctx         *rx_data         = NULL;        struct kmx_conn        *conn            = NULL;        int                     nob             = 0;        __u32                   length          = 0;        struct kmx_peer         *peer           = NULL;        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",                       payload_nob, payload_niov, libcfs_id2str(target));        LASSERT (payload_nob == 0 || payload_niov > 0);        LASSERT (payload_niov <= LNET_MAX_IOV);        /* payload is either all vaddrs or all pages */        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));        /* private is used on LNET_GET_REPLY only, NULL for all other cases */        /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ         * to a new peer, use the nid */        peer = mxlnd_find_peer_by_nid(nid);        if (peer != NULL) {                conn = peer->mxp_conn;                if (conn) mxlnd_conn_addref(conn);        }        if (conn == NULL && peer != NULL) {                CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",                        peer, nid, payload_nob, ((type==LNET_MSG_PUT) ? "PUT" :                        ((type==LNET_MSG_GET) ? "GET" : "Other")));        }        switch (type) {        case LNET_MSG_ACK:                LASSERT (payload_nob == 0);                break;        case LNET_MSG_REPLY:        case LNET_MSG_PUT:                /* Is the payload small enough not to need DATA? */                nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);                if (nob <= MXLND_EAGER_SIZE)                        break;                  /* send EAGER */                tx = mxlnd_get_idle_tx();                if (unlikely(tx == NULL)) {                        CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",                               type == LNET_MSG_PUT ? "PUT" : "REPLY",                               libcfs_nid2str(nid));                        if (conn) mxlnd_conn_decref(conn);                        return -ENOMEM;                }                /* the peer may be NULL */                tx->mxc_peer = peer;                tx->mxc_conn = conn; /* may be NULL */                /* we added a conn ref above */                mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);                txmsg = tx->mxc_msg;                txmsg->mxm_u.put_req.mxprm_hdr = *hdr;                txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;                tx->mxc_match = mxlnd_create_match(tx, 0);                /* we must post a receive _before_ sending the request.                 * we need to determine how much to receive, it will be either                 * a put_ack or a put_nak. The put_ack is larger, so use it. */                rx = mxlnd_get_idle_rx();                if (unlikely(rx == NULL)) {                        CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",                                           libcfs_nid2str(nid));                        mxlnd_put_idle_tx(tx);                        if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */                        return -ENOMEM;                }                rx->mxc_nid = nid;                rx->mxc_peer = peer;                /* conn may be NULL but unlikely since the first msg is always small */                if (conn) mxlnd_conn_addref(conn); /* for this rx */                rx->mxc_conn = conn;                rx->mxc_msg_type = MXLND_MSG_PUT_ACK;                rx->mxc_cookie = tx->mxc_cookie;                rx->mxc_match = mxlnd_create_match(rx, 0);                length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);                ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);                if (unlikely(ret != 0)) {                        CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",                                           libcfs_nid2str(nid));                        rx->mxc_lntmsg[0] = NULL;                        mxlnd_put_idle_rx(rx);                        mxlnd_put_idle_tx(tx);                        if (conn) {                                mxlnd_conn_decref(conn); /* for the rx... */                                mxlnd_conn_decref(conn); /* and for the tx */                        }                        return -ENOMEM;                }                mxlnd_queue_tx(tx)

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -