⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mxlnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        return (tx->mxc_msg_type == MXLND_MSG_EAGER ||                tx->mxc_msg_type == MXLND_MSG_GET_REQ ||                tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||                tx->mxc_msg_type == MXLND_MSG_NOOP);}/** * mxlnd_init_msg - set type and number of bytes * @msg - msg pointer * @type - of message * @body_nob - bytes in msg body */static inline voidmxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob){        msg->mxm_type = type;        msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;}static inline voidmxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid){        int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;        struct kmx_msg  *msg    = NULL;                LASSERT (tx != NULL);        LASSERT (nob <= MXLND_EAGER_SIZE);        tx->mxc_nid = nid;        /* tx->mxc_peer should have already been set if we know it */        tx->mxc_msg_type = type;        tx->mxc_nseg = 1;        /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */        tx->mxc_seg.segment_length = nob;        tx->mxc_pin_type = MX_PIN_PHYSICAL;        //tx->mxc_state = MXLND_CTX_PENDING;        msg = tx->mxc_msg;        msg->mxm_type = type;        msg->mxm_nob  = nob;        return;}static inline __u32 mxlnd_cksum (void *ptr, int nob){        char  *c  = ptr;        __u32  sum = 0;        while (nob-- > 0)                sum = ((sum << 1) | (sum >> 31)) + *c++;        /* ensure I don't return 0 (== no checksum) */        return (sum == 0) ? 1 : sum;}/** * mxlnd_pack_msg - complete msg info * @tx - msg to send */static inline voidmxlnd_pack_msg(struct kmx_ctx *tx){        struct kmx_msg  *msg    = tx->mxc_msg;        /* type and nob should already be set in init_msg() */        msg->mxm_magic    = MXLND_MSG_MAGIC;        msg->mxm_version  = MXLND_MSG_VERSION;        /*   mxm_type */        /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to         * return credits as well */        if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&            tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {                spin_lock(&tx->mxc_conn->mxk_lock);                msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;                tx->mxc_conn->mxk_outstanding = 0;                spin_unlock(&tx->mxc_conn->mxk_lock);        } else {                msg->mxm_credits  = 0;        }        /*   mxm_nob */        msg->mxm_cksum    = 0;                msg->mxm_srcnid   = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid);        msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;        msg->mxm_dstnid   = tx->mxc_nid;        /* if it is a new peer, the dststamp will be 0 */        msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;                 msg->mxm_seq      = tx->mxc_cookie;        if (*kmxlnd_tunables.kmx_cksum) {                msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);        }       }intmxlnd_unpack_msg(kmx_msg_t *msg, int nob){        const int hdr_size      = offsetof(kmx_msg_t, mxm_u);        __u32     msg_cksum     = 0;        int       flip          = 0;        int       msg_nob       = 0;        /* 6 bytes are enough to have received magic + version */        if (nob < 6) {                CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);                return -EPROTO;        }        if (msg->mxm_magic == MXLND_MSG_MAGIC) {                flip = 0;        } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {                flip = 1;        } else {                CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);                return -EPROTO;        }        if (msg->mxm_version !=            (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {                CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);                return -EPROTO;        }        if (nob < hdr_size) {                CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);                return -EPROTO;        }        msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;        if (msg_nob > nob) {                CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);                return -EPROTO;        }        /* checksum must be computed with mxm_cksum zero and BEFORE anything         * gets flipped */        msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;        msg->mxm_cksum = 0;        if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {                CDEBUG(D_NETERROR, "Bad checksum\n");                return -EPROTO;        }        msg->mxm_cksum = msg_cksum;        if (flip) {                /* leave magic unflipped as a clue to peer endianness */                __swab16s(&msg->mxm_version);                CLASSERT (sizeof(msg->mxm_type) == 1);                CLASSERT (sizeof(msg->mxm_credits) == 1);                msg->mxm_nob = msg_nob;                __swab64s(&msg->mxm_srcnid);                __swab64s(&msg->mxm_srcstamp);                __swab64s(&msg->mxm_dstnid);                __swab64s(&msg->mxm_dststamp);                __swab64s(&msg->mxm_seq);        }        if (msg->mxm_srcnid == LNET_NID_ANY) {                CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));                return -EPROTO;        }        switch (msg->mxm_type) {        default:                CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);                return -EPROTO;        case MXLND_MSG_NOOP:                break;        case MXLND_MSG_EAGER:                if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {                        CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,                               (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));                        return -EPROTO;                }                break;        case MXLND_MSG_PUT_REQ:                if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {                        CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,                               (int)(hdr_size + sizeof(msg->mxm_u.put_req)));                        return -EPROTO;                }                if (flip)                        __swab64s(&msg->mxm_u.put_req.mxprm_cookie);                break;        case MXLND_MSG_PUT_ACK:                if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {                        CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,                               (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));                        return -EPROTO;                }                if (flip) {                        __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);                        __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);                }                break;        case MXLND_MSG_GET_REQ:                if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {                        CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,                               (int)(hdr_size + sizeof(msg->mxm_u.get_req)));                        return -EPROTO;                }                if (flip) {                        __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);                }                break;        case MXLND_MSG_CONN_REQ:        case MXLND_MSG_CONN_ACK:                if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {                        CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,                               (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));                        return -EPROTO;                }                if (flip) {                        __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);                        __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);                }                break;        }        return 0;}/** * mxlnd_recv_msg * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL. * @rx * @msg_type * @cookie * @length - length of incoming message * @pending - add to kmx_pending (0 is NO and 1 is YES) * * The caller gets the rx and sets nid, peer and conn if known. * * Returns 0 on success and -1 on failure */intmxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length){        int             ret     = 0;        mx_return_t     mxret   = MX_SUCCESS;        __u64           mask    = 0xF00FFFFFFFFFFFFFLL;        rx->mxc_msg_type = msg_type;        rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */        rx->mxc_cookie = cookie;        /* rx->mxc_match may already be set */        /* rx->mxc_seg.segment_ptr is already set */        rx->mxc_seg.segment_length = length;        rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;        ret = mxlnd_q_pending_ctx(rx);        if (ret == -1) {                /* FIXME the conn is disconnected, now what? */                return -1;        }        mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,                          cookie, mask, (void *) rx, &rx->mxc_mxreq);        if (mxret != MX_SUCCESS) {                mxlnd_deq_pending_ctx(rx);                CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",                                    mx_strerror(mxret), (int) mxret);                return -1;        }        return 0;}/** * mxlnd_unexpected_recv - this is the callback function that will handle  *                         unexpected receives * @context - NULL, ignore * @source - the peer's mx_endpoint_addr_t * @match_value - the msg's bit, should be MXLND_MASK_EAGER * @length - length of incoming message * @data_if_available - ignore * * If it is an eager-sized msg, we will call recv_msg() with the actual * length. If it is a large message, we will call recv_msg() with a * length of 0 bytes to drop it because we should never have a large, * unexpected message. * * NOTE - The MX library blocks until this function completes. Make it as fast as * possible. DO NOT allocate memory which can block! * * If we cannot get a rx or the conn is closed, drop the message on the floor * (i.e. recv 0 bytes and ignore). */mx_unexp_handler_action_tmxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,                 __u64 match_value, __u32 length, void *data_if_available){        int             ret             = 0;        struct kmx_ctx  *rx             = NULL;        mx_ksegment_t   seg;        u8              msg_type        = 0;        u8              error           = 0;        u64             cookie          = 0LL;        if (context != NULL) {                CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");        }#if MXLND_DEBUG        CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);#endif        rx = mxlnd_get_idle_rx();        if (rx != NULL) {                mxlnd_parse_match(match_value, &msg_type, &error, &cookie);                if (length <= MXLND_EAGER_SIZE) {                        ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);                } else {                        CDEBUG(D_NETERROR, "unexpected large receive with "                                           "match_value=0x%llx length=%d\n",                                            match_value, length);                        ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);                }                if (ret == 0) {                        struct kmx_conn *conn   = NULL;                        mx_get_endpoint_addr_context(source, (void **) &conn);                        if (conn != NULL) {                                mxlnd_conn_addref(conn);                                rx->mxc_conn = conn;                                rx->mxc_peer = conn->mxk_peer;                                if (conn->mxk_peer != NULL) {                                        rx->mxc_nid = conn->mxk_peer->mxp_nid;                                } else {                                        CDEBUG(D_NETERROR, "conn is 0x%p and peer "                                                           "is NULL\n", conn);                                }                        }                } else {                        CDEBUG(D_NETERROR, "could not post receive\n");                        mxlnd_put_idle_rx(rx);                }        }        if (rx == NULL || ret != 0) {                if (rx == NULL) {                        CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");                } else {                        /* ret != 0 */                        CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");                }                seg.segment_ptr = 0LL;                seg.segment_length = 0;                mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,                          match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);        }        return MX_RECV_CONTINUE;}intmxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count){        int                      i      = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -