⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 qswlnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        /* If ln_ptlcompat is set, peers may send me an "old" unencapsulated         * lnet hdr */        LASSERT (offsetof(kqswnal_msg_t, kqm_u) <= sizeof(lnet_hdr_t));                if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {                CERROR("Short message %d received from %s\n",                       krx->krx_nob, libcfs_nid2str(fromnid));                goto done;        }        swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);        if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {#if KQSW_CKSUM                __u32 csum0;                __u32 csum1;                /* csum byte array before swab */                csum1 = msg->kqm_cksum;                msg->kqm_cksum = 0;                csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,                                          krx->krx_npages, krx->krx_kiov);                msg->kqm_cksum = csum1;#endif                if (swab) {                        __swab16s(&msg->kqm_version);                        __swab16s(&msg->kqm_type);#if KQSW_CKSUM                        __swab32s(&msg->kqm_cksum);                        __swab32s(&msg->kqm_nob);#endif                }                if (msg->kqm_version != QSWLND_PROTO_VERSION) {                        /* Future protocol version compatibility support!                         * The next qswlnd-specific protocol rev will first                         * send an RPC to check version.                         * 1.4.6 and 1.4.7.early reply with a status                         * block containing its current version.                         * Later versions send a failure (-ve) status +                         * magic/version */                        if (!krx->krx_rpc_reply_needed) {                                CERROR("Unexpected version %d from %s\n",                                       msg->kqm_version, libcfs_nid2str(fromnid));                                goto done;                        }                        LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);                        goto done;                }                switch (msg->kqm_type) {                default:                        CERROR("Bad request type %x from %s\n",                               msg->kqm_type, libcfs_nid2str(fromnid));                        goto done;                case QSWLND_MSG_IMMEDIATE:                        if (krx->krx_rpc_reply_needed) {                                /* Should have been a simple message */                                CERROR("IMMEDIATE sent as RPC from %s\n",                                       libcfs_nid2str(fromnid));                                goto done;                        }                        nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);                        if (krx->krx_nob < nob) {                                CERROR("Short IMMEDIATE %d(%d) from %s\n",                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));                                goto done;                        }#if KQSW_CKSUM                        if (csum0 != msg->kqm_cksum) {                                CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",                                       csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));                                CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);                                goto done;                        }#endif                        rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,                                        fromnid, krx, 0);                        if (rc < 0)                                goto done;                        return;                case QSWLND_MSG_RDMA:                        if (!krx->krx_rpc_reply_needed) {                                /* Should have been a simple message */                                CERROR("RDMA sent as simple message from %s\n",                                       libcfs_nid2str(fromnid));                                goto done;                        }                        nob = offsetof(kqswnal_msg_t,                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);                        if (krx->krx_nob < nob) {                                CERROR("Short RDMA message %d(%d) from %s\n",                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));                                goto done;                        }                        if (swab)                                __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);                        n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;                        nob = offsetof(kqswnal_msg_t,                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);                        if (krx->krx_nob < nob) {                                CERROR("short RDMA message %d(%d) from %s\n",                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));                                goto done;                        }                        if (swab) {                                for (i = 0; i < n; i++) {                                        EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];                                        __swab32s(&nmd->nmd_addr);                                        __swab32s(&nmd->nmd_len);                                        __swab32s(&nmd->nmd_attr);                                }                        }#if KQSW_CKSUM                        krx->krx_cksum = csum0; /* stash checksum so far */#endif                        rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,                                        fromnid, krx, 1);                        if (rc < 0)                                goto done;                        return;                }                /* Not Reached */        }        if (msg->kqm_magic == LNET_PROTO_MAGIC ||            msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {                /* Future protocol version compatibility support!                 * When LNET unifies protocols over all LNDs, the first thing a                 * peer will send will be a version query RPC.                   * 1.4.6 and 1.4.7.early reply with a status block containing                 * LNET_PROTO_QSW_MAGIC..                 * Later versions send a failure (-ve) status +                 * magic/version */                if (!krx->krx_rpc_reply_needed) {                        CERROR("Unexpected magic %08x from %s\n",                               msg->kqm_magic, libcfs_nid2str(fromnid));                        goto done;                }                LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);                goto done;        }        if (the_lnet.ln_ptlcompat != 0) {                /* Portals compatibility (strong or weak)                 * This could be an unencapsulated LNET header.  If it's big                 * enough, let LNET's parser sort it out */                if (krx->krx_nob < sizeof(lnet_hdr_t)) {                        CERROR("Short portals-compatible message from %s\n",                               libcfs_nid2str(fromnid));                        goto done;                }                krx->krx_raw_lnet_hdr = 1;                rc = lnet_parse(ni, (lnet_hdr_t *)msg,                                fromnid, krx, krx->krx_rpc_reply_needed);                if (rc < 0)                        goto done;                return;        }        CERROR("Unrecognised magic %08x from %s\n",               msg->kqm_magic, libcfs_nid2str(fromnid)); done:        kqswnal_rx_decref(krx);}/* Receive Interrupt Handler: posts to schedulers */void kqswnal_rxhandler(EP_RXD *rxd){        unsigned long flags;        int           nob    = ep_rxd_len (rxd);        int           status = ep_rxd_status (rxd);        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",               rxd, krx, nob, status);        LASSERT (krx != NULL);        LASSERT (krx->krx_state == KRX_POSTED);                krx->krx_state = KRX_PARSE;        krx->krx_rxd = rxd;        krx->krx_nob = nob;        krx->krx_raw_lnet_hdr = 0;        /* RPC reply iff rpc request received without error */        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&                                    (status == EP_SUCCESS ||                                     status == EP_MSG_TOO_BIG);        /* Default to failure if an RPC reply is requested but not handled */        krx->krx_rpc_reply.msg.status = -EPROTO;        atomic_set (&krx->krx_refcount, 1);        if (status != EP_SUCCESS) {                /* receives complete with failure when receiver is removed */                if (status == EP_SHUTDOWN)                        LASSERT (kqswnal_data.kqn_shuttingdown);                else                        CERROR("receive status failed with status %d nob %d\n",                               ep_rxd_status(rxd), nob);                kqswnal_rx_decref(krx);                return;        }        if (!in_interrupt()) {                kqswnal_parse(krx);                return;        }        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);        wake_up (&kqswnal_data.kqn_sched_waitq);        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);}intkqswnal_recv (lnet_ni_t     *ni,              void          *private,              lnet_msg_t    *lntmsg,              int            delayed,              unsigned int   niov,              struct iovec  *iov,              lnet_kiov_t   *kiov,              unsigned int   offset,              unsigned int   mlen,              unsigned int   rlen){        kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;        lnet_nid_t          fromnid;        kqswnal_msg_t      *msg;        lnet_hdr_t         *hdr;        kqswnal_remotemd_t *rmd;        int                 msg_offset;        int                 rc;        LASSERT (!in_interrupt ());             /* OK to map */        /* Either all pages or all vaddrs */        LASSERT (!(kiov != NULL && iov != NULL));        fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));        msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);        if (krx->krx_rpc_reply_needed) {                /* optimized (rdma) request sent as RPC */                if (krx->krx_raw_lnet_hdr) {                        LASSERT (the_lnet.ln_ptlcompat != 0);                        hdr = (lnet_hdr_t *)msg;                        rmd = kqswnal_get_portalscompat_rmd(krx);                        if (rmd == NULL)                                return (-EPROTO);                } else {                        LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);                        hdr = &msg->kqm_u.rdma.kqrm_hdr;                        rmd = &msg->kqm_u.rdma.kqrm_rmd;                }                /* NB header is still in wire byte order */                switch (le32_to_cpu(hdr->type)) {                        case LNET_MSG_PUT:                        case LNET_MSG_REPLY:                                /* This is an optimized PUT/REPLY */                                rc = kqswnal_rdma(krx, lntmsg,                                                   KTX_RDMA_FETCH, rmd,                                                  niov, iov, kiov, offset, mlen);                                break;                        case LNET_MSG_GET:#if KQSW_CKSUM                                if (krx->krx_cksum != msg->kqm_cksum) {                                        CERROR("Bad GET checksum %08x(%08x) from %s\n",                                               krx->krx_cksum, msg->kqm_cksum,                                               libcfs_nid2str(fromnid));                                        rc = -EIO;                                        break;                                }#endif                                                                if (lntmsg == NULL) {                                        /* No buffer match: my decref will                                         * complete the RPC with failure */                                        rc = 0;                                } else {                                        /* Matched something! */                                        rc = kqswnal_rdma(krx, lntmsg,                                                          KTX_RDMA_STORE, rmd,                                                          lntmsg->msg_niov,                                                          lntmsg->msg_iov,                                                          lntmsg->msg_kiov,                                                          lntmsg->msg_offset,                                                          lntmsg->msg_len);                                }                                break;                        default:                                CERROR("Bad RPC type %d\n",                                       le32_to_cpu(hdr->type));                                rc = -EPROTO;                                break;                }                kqswnal_rx_decref(krx);                return rc;        }        if (krx->krx_raw_lnet_hdr) {                LASSERT (the_lnet.ln_ptlcompat != 0);                msg_offset = sizeof(lnet_hdr_t);        } else {                LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);                msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);        }                if (krx->krx_nob < msg_offset + rlen) {                CERROR("Bad message size from %s: have %d, need %d + %d\n",                       libcfs_nid2str(fromnid), krx->krx_nob,                       msg_offset, rlen);                kqswnal_rx_decref(krx);                ret

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -