⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        CDEBUG(D_NET, "%s [%d]+%d,%d -> %s%s\n",                lnet_msgtyp2str(msg->msg_type),               msg->msg_niov, msg->msg_offset, msg->msg_len,               libcfs_nid2str(msg->msg_target.nid),               msg->msg_target_is_router ? "(rtr)" : "");        if ((msg->msg_target.pid & LNET_PID_USERFLAG) != 0) {                CERROR("Can't send to non-kernel peer %s\n",                       libcfs_id2str(msg->msg_target));                return -EHOSTUNREACH;        }                plp = ptllnd_find_peer(ni, msg->msg_target, 1);        if (plp == NULL)                return -ENOMEM;        switch (msg->msg_type) {        default:                LBUG();        case LNET_MSG_ACK:                LASSERT (msg->msg_len == 0);                break;                          /* send IMMEDIATE */        case LNET_MSG_GET:                if (msg->msg_target_is_router)                        break;                  /* send IMMEDIATE */                nob = msg->msg_md->md_length;                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);                if (nob <= plni->plni_max_msg_size)                        break;                LASSERT ((msg->msg_md->md_options & LNET_MD_KIOV) == 0);                rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_GET, msg,                                         msg->msg_md->md_niov,                                         msg->msg_md->md_iov.iov,                                         0, msg->msg_md->md_length);                ptllnd_peer_decref(plp);                return rc;        case LNET_MSG_REPLY:        case LNET_MSG_PUT:                nob = msg->msg_len;                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]);                if (nob <= plp->plp_max_msg_size)                        break;                  /* send IMMEDIATE */                rc = ptllnd_passive_rdma(plp, PTLLND_MSG_TYPE_PUT, msg,                                         msg->msg_niov, msg->msg_iov,                                         msg->msg_offset, msg->msg_len);                ptllnd_peer_decref(plp);                return rc;        }        /* send IMMEDIATE         * NB copy the payload so we don't have to do a fragmented send */        tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_IMMEDIATE, msg->msg_len);        if (tx == NULL) {                CERROR("Can't allocate tx for lnet type %d to %s\n",                       msg->msg_type, libcfs_id2str(msg->msg_target));                ptllnd_peer_decref(plp);                return -ENOMEM;        }        lnet_copy_iov2flat(tx->tx_msgsize, &tx->tx_msg,                           offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),                           msg->msg_niov, msg->msg_iov, msg->msg_offset,                           msg->msg_len);        tx->tx_msg.ptlm_u.immediate.kptlim_hdr = msg->msg_hdr;        tx->tx_lnetmsg = msg;        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post immediate %s p %d %p",                       libcfs_id2str(msg->msg_target),                       plp->plp_credits, plp->plp_outstanding_credits,                       plp->plp_sent_credits,                       plni->plni_peer_credits + plp->plp_lazy_credits,                       lnet_msgtyp2str(msg->msg_type),                       (le32_to_cpu(msg->msg_type) == LNET_MSG_PUT) ?                        le32_to_cpu(msg->msg_hdr.msg.put.ptl_index) :                       (le32_to_cpu(msg->msg_type) == LNET_MSG_GET) ?                        le32_to_cpu(msg->msg_hdr.msg.get.ptl_index) : -1,                       tx);        ptllnd_post_tx(tx);        ptllnd_peer_decref(plp);        return 0;}voidptllnd_rx_done(ptllnd_rx_t *rx){        ptllnd_peer_t *plp = rx->rx_peer;        lnet_ni_t     *ni = plp->plp_ni;        ptllnd_ni_t   *plni = ni->ni_data;        plp->plp_outstanding_credits++;        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: rx=%p done\n",                       libcfs_id2str(plp->plp_id),                       plp->plp_credits, plp->plp_outstanding_credits,                        plp->plp_sent_credits,                       plni->plni_peer_credits + plp->plp_lazy_credits, rx);        ptllnd_check_sends(rx->rx_peer);        LASSERT (plni->plni_nrxs > 0);        plni->plni_nrxs--;}intptllnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,                  void **new_privatep){        /* Shouldn't get here; recvs only block for router buffers */        LBUG();        return 0;}intptllnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg,            int delayed, unsigned int niov,            struct iovec *iov, lnet_kiov_t *kiov,            unsigned int offset, unsigned int mlen, unsigned int rlen){        ptllnd_rx_t    *rx = private;        int             rc = 0;        int             nob;        LASSERT (kiov == NULL);        LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */        switch (rx->rx_msg->ptlm_type) {        default:                LBUG();        case PTLLND_MSG_TYPE_IMMEDIATE:                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[mlen]);                if (nob > rx->rx_nob) {                        CERROR("Immediate message from %s too big: %d(%d)\n",                               libcfs_id2str(rx->rx_peer->plp_id),                               nob, rx->rx_nob);                        rc = -EPROTO;                        break;                }                lnet_copy_flat2iov(niov, iov, offset,                                   rx->rx_nob, rx->rx_msg,                                   offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload),                                   mlen);                lnet_finalize(ni, msg, 0);                break;        case PTLLND_MSG_TYPE_PUT:                rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_READ, msg,                                        rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,                                        niov, iov, offset, mlen);                break;        case PTLLND_MSG_TYPE_GET:                if (msg != NULL)                        rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, msg,                                                rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,                                                msg->msg_niov, msg->msg_iov,                                                msg->msg_offset, msg->msg_len);                else                        rc = ptllnd_active_rdma(rx->rx_peer, PTLLND_RDMA_WRITE, NULL,                                                rx->rx_msg->ptlm_u.rdma.kptlrm_matchbits,                                                0, NULL, 0, 0);                break;        }        ptllnd_rx_done(rx);        return rc;}voidptllnd_parse_request(lnet_ni_t *ni, ptl_process_id_t initiator,                     kptl_msg_t *msg, unsigned int nob){        ptllnd_ni_t      *plni = ni->ni_data;        const int         basenob = offsetof(kptl_msg_t, ptlm_u);        lnet_process_id_t srcid;        ptllnd_rx_t       rx;        int               flip;        __u16             msg_version;        __u32             msg_cksum;        ptllnd_peer_t    *plp;        int               rc;        if (nob < 6) {                CERROR("Very short receive from %s\n",                       ptllnd_ptlid2str(initiator));                return;        }        /* I can at least read MAGIC/VERSION */        flip = msg->ptlm_magic == __swab32(PTLLND_MSG_MAGIC);        if (!flip && msg->ptlm_magic != PTLLND_MSG_MAGIC) {                CERROR("Bad protocol magic %08x from %s\n",                        msg->ptlm_magic, ptllnd_ptlid2str(initiator));                return;        }        msg_version = flip ? __swab16(msg->ptlm_version) : msg->ptlm_version;        if (msg_version != PTLLND_MSG_VERSION) {                CERROR("Bad protocol version %04x from %s: %04x expected\n",                        (__u32)msg_version, ptllnd_ptlid2str(initiator), PTLLND_MSG_VERSION);                if (plni->plni_abort_on_protocol_mismatch)                        abort();                return;        }        if (nob < basenob) {                CERROR("Short receive from %s: got %d, wanted at least %d\n",                       ptllnd_ptlid2str(initiator), nob, basenob);                return;        }        /* checksum must be computed with         * 1) ptlm_cksum zero and         * 2) BEFORE anything gets modified/flipped         */        msg_cksum = flip ? __swab32(msg->ptlm_cksum) : msg->ptlm_cksum;        msg->ptlm_cksum = 0;        if (msg_cksum != 0 &&            msg_cksum != ptllnd_cksum(msg, offsetof(kptl_msg_t, ptlm_u))) {                CERROR("Bad checksum from %s\n", ptllnd_ptlid2str(initiator));                return;        }        msg->ptlm_version = msg_version;        msg->ptlm_cksum = msg_cksum;                if (flip) {                /* NB stamps are opaque cookies */                __swab32s(&msg->ptlm_nob);                __swab64s(&msg->ptlm_srcnid);                __swab64s(&msg->ptlm_dstnid);                __swab32s(&msg->ptlm_srcpid);                __swab32s(&msg->ptlm_dstpid);        }                srcid.nid = msg->ptlm_srcnid;        srcid.pid = msg->ptlm_srcpid;        if (LNET_NIDNET(msg->ptlm_srcnid) != LNET_NIDNET(ni->ni_nid)) {                CERROR("Bad source id %s from %s\n",                       libcfs_id2str(srcid),                       ptllnd_ptlid2str(initiator));                return;        }        if (msg->ptlm_type == PTLLND_MSG_TYPE_NAK) {                CERROR("NAK from %s (%s)\n",                        libcfs_id2str(srcid),                       ptllnd_ptlid2str(initiator));                if (plni->plni_dump_on_nak)                        ptllnd_dump_debug(ni, srcid);                                if (plni->plni_abort_on_nak)                        abort();                                return;        }                if (msg->ptlm_dstnid != ni->ni_nid ||            msg->ptlm_dstpid != the_lnet.ln_pid) {                CERROR("Bad dstid %s (%s expected) from %s\n",                       libcfs_id2str((lnet_process_id_t) {                               .nid = msg->ptlm_dstnid,                               .pid = msg->ptlm_dstpid}),                       libcfs_id2str((lnet_process_id_t) {                               .nid = ni->ni_nid,                               .pid = the_lnet.ln_pid}),                       libcfs_id2str(srcid));                return;        }        if (msg->ptlm_dststamp != plni->plni_stamp) {                CERROR("Bad dststamp "LPX64"("LPX64" expected) from %s\n",                       msg->ptlm_dststamp, plni->plni_stamp,                       libcfs_id2str(srcid));                return;        }        PTLLND_HISTORY("RX %s: %s %d %p", libcfs_id2str(srcid),                        ptllnd_msgtype2str(msg->ptlm_type),                       msg->ptlm_credits, &rx);        switch (msg->ptlm_type) {        case PTLLND_MSG_TYPE_PUT:        case PTLLND_MSG_TYPE_GET:                if (nob < basenob + sizeof(kptl_rdma_msg_t)) {                        CERROR("Short rdma request from %s(%s)\n",                               libcfs_id2str(srcid),                               ptllnd_ptlid2str(initiator));                        return;                }                if (flip)                        __swab64s(&msg->ptlm_u.rdma.kptlrm_matchbits);                break;        case PTLLND_MSG_TYPE_IMMEDIATE:                if (nob < offsetof(kptl_msg_t,                                   ptlm_u.immediate.kptlim_payload)) {                        CERROR("Short immediate from %s(%s)\n",                               libcfs_id2str(srcid),                               ptllnd_ptlid2str(initiator));                        return;                }                break;        case PTLLND_MSG_TYPE_HELLO:                if (nob < basenob + sizeof(kptl_hello_msg_t)) {                        CERROR("Short hello from %s(%s)\n",                               libcfs_id2str(srcid),                               ptllnd_ptlid2str(initiator));                        return;                }                if(flip){                        __swab64s(&msg->ptlm_u.hello.kptlhm_matchbits);                        __swab32s(&msg->ptlm_u.hello.kptlhm_max_msg_size);                }                break;                        case PTLLND_MSG_TYPE_NOOP:                break;        default:                CERROR("Bad message type %d from %s(%s)\n", msg->ptlm_type,                       libcfs_id2str(srcid),                       ptllnd_ptlid2str(initiator));                return;        }        plp = ptllnd_find_peer(ni, srcid, 0);        if (plp == NULL) {                CERROR("Can't find peer %s\n", libcfs_id2str(srcid));                return;        }        if (msg->ptlm_type == PTLLND_MSG_TYPE_HELLO) {                if (plp->plp_recvd_hello) {                        CERROR("Unexpected HELLO from %s\n",                               libcfs_id2str(srcid));                        ptllnd_peer_decref(plp);                        return;                }                plp->plp_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;                plp->plp_match = msg->ptlm_u.hello.kptlhm_matchbits;                plp->plp_stamp = msg->ptlm_srcstamp;                plp->plp_recvd_hello = 1;        } else if (!plp->plp_recvd_hello) {                CERROR("Bad message type %d (HELLO expected) from %s\n",                       msg->ptlm_type, libcfs_id2str(srcid));                ptllnd_peer_decref(plp);                return;        } else if (msg->ptlm_srcstamp != plp->plp_stamp) {                CERROR("Bad srcstamp "LPX64"("LPX64" expected) from %s\n",                       msg->ptlm_srcstamp, plp->plp_stamp,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -