⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ralnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
        case RANAL_MSG_PUT_REQ:                rc = kranal_map_buffer(tx);                LASSERT (rc != -EAGAIN);                if (rc != 0)                        break;                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);                expect_reply = 1;                break;        case RANAL_MSG_PUT_ACK:                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);                expect_reply = 1;                break;        case RANAL_MSG_GET_REQ:                rc = kranal_map_buffer(tx);                LASSERT (rc != -EAGAIN);                if (rc != 0)                        break;                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =                        (__u64)((unsigned long)tx->tx_buffer);                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);                expect_reply = 1;                break;        }        if (rc == -EAGAIN) {                /* I need credits to send this.  Replace tx at the head of the                 * fmaq and I'll get rescheduled when credits appear */                CDEBUG(D_NET, "EAGAIN on %p\n", conn);                spin_lock_irqsave(&conn->rac_lock, flags);                list_add(&tx->tx_list, &conn->rac_fmaq);                spin_unlock_irqrestore(&conn->rac_lock, flags);                return;        }        if (!expect_reply || rc != 0) {                kranal_tx_done(tx, rc);        } else {                /* LASSERT(current) above ensures this doesn't race with reply                 * processing */                spin_lock_irqsave(&conn->rac_lock, flags);                list_add_tail(&tx->tx_list, &conn->rac_replyq);                tx->tx_qtime = jiffies;                spin_unlock_irqrestore(&conn->rac_lock, flags);        }        if (more_to_do) {                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);                kranal_schedule_conn(conn);        }}static inline voidkranal_swab_rdma_desc (kra_rdma_desc_t *d){        __swab64s(&d->rard_key.Key);        __swab16s(&d->rard_key.Cookie);        __swab16s(&d->rard_key.MdHandle);        __swab32s(&d->rard_key.Flags);        __swab64s(&d->rard_addr.AddressBits);        __swab32s(&d->rard_nob);}kra_tx_t *kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie){        struct list_head *ttmp;        kra_tx_t         *tx;        unsigned long     flags;        spin_lock_irqsave(&conn->rac_lock, flags);        list_for_each(ttmp, &conn->rac_replyq) {                tx = list_entry(ttmp, kra_tx_t, tx_list);                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",                       tx, tx->tx_msg.ram_type, tx->tx_cookie);                if (tx->tx_cookie != cookie)                        continue;                if (tx->tx_msg.ram_type != type) {                        spin_unlock_irqrestore(&conn->rac_lock, flags);                        CWARN("Unexpected type %x (%x expected) "                              "matched reply from %s\n",                              tx->tx_msg.ram_type, type,                              libcfs_nid2str(conn->rac_peer->rap_nid));                        return NULL;                }                list_del(&tx->tx_list);                spin_unlock_irqrestore(&conn->rac_lock, flags);                return tx;        }        spin_unlock_irqrestore(&conn->rac_lock, flags);        CWARN("Unmatched reply %02x/"LPX64" from %s\n",              type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));        return NULL;}voidkranal_check_fma_rx (kra_conn_t *conn){        unsigned long flags;        __u32         seq;        kra_tx_t     *tx;        kra_msg_t    *msg;        void         *prefix;        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);        kra_peer_t   *peer = conn->rac_peer;        int           rc = 0;        int           repost = 1;        if (rrc == RAP_NOT_DONE)                return;        CDEBUG(D_NET, "RX on %p\n", conn);        LASSERT (rrc == RAP_SUCCESS);        conn->rac_last_rx = jiffies;        seq = conn->rac_rx_seq++;        msg = (kra_msg_t *)prefix;        /* stash message for portals callbacks they'll NULL         * rac_rxmsg if they consume it */        LASSERT (conn->rac_rxmsg == NULL);        conn->rac_rxmsg = msg;        if (msg->ram_magic != RANAL_MSG_MAGIC) {                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {                        CERROR("Unexpected magic %08x from %s\n",                               msg->ram_magic, libcfs_nid2str(peer->rap_nid));                        rc = -EPROTO;                        goto out;                }                __swab32s(&msg->ram_magic);                __swab16s(&msg->ram_version);                __swab16s(&msg->ram_type);                __swab64s(&msg->ram_srcnid);                __swab64s(&msg->ram_connstamp);                __swab32s(&msg->ram_seq);                /* NB message type checked below; NOT here... */                switch (msg->ram_type) {                case RANAL_MSG_PUT_ACK:                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);                        break;                case RANAL_MSG_GET_REQ:                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);                        break;                default:                        break;                }        }        if (msg->ram_version != RANAL_MSG_VERSION) {                CERROR("Unexpected protocol version %d from %s\n",                       msg->ram_version, libcfs_nid2str(peer->rap_nid));                rc = -EPROTO;                goto out;        }        if (msg->ram_srcnid != peer->rap_nid) {                CERROR("Unexpected peer %s from %s\n",                       libcfs_nid2str(msg->ram_srcnid),                        libcfs_nid2str(peer->rap_nid));                rc = -EPROTO;                goto out;        }        if (msg->ram_connstamp != conn->rac_peer_connstamp) {                CERROR("Unexpected connstamp "LPX64"("LPX64                       " expected) from %s\n",                       msg->ram_connstamp, conn->rac_peer_connstamp,                       libcfs_nid2str(peer->rap_nid));                rc = -EPROTO;                goto out;        }        if (msg->ram_seq != seq) {                CERROR("Unexpected sequence number %d(%d expected) from %s\n",                       msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));                rc = -EPROTO;                goto out;        }        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {                /* This message signals RDMA completion... */                rrc = RapkFmaSyncWait(conn->rac_rihandle);                if (rrc != RAP_SUCCESS) {                        CERROR("RapkFmaSyncWait failed: %d\n", rrc);                        rc = -ENETDOWN;                        goto out;                }        }        if (conn->rac_close_recvd) {                CERROR("Unexpected message %d after CLOSE from %s\n",                       msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));                rc = -EPROTO;                goto out;        }        if (msg->ram_type == RANAL_MSG_CLOSE) {                CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));                conn->rac_close_recvd = 1;                write_lock_irqsave(&kranal_data.kra_global_lock, flags);                if (conn->rac_state == RANAL_CONN_ESTABLISHED)                        kranal_close_conn_locked(conn, 0);                else if (conn->rac_state == RANAL_CONN_CLOSING &&                         conn->rac_close_sent)                        kranal_terminate_conn_locked(conn);                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);                goto out;        }        if (conn->rac_state != RANAL_CONN_ESTABLISHED)                goto out;        switch (msg->ram_type) {        case RANAL_MSG_NOOP:                /* Nothing to do; just a keepalive */                CDEBUG(D_NET, "RX NOOP on %p\n", conn);                break;        case RANAL_MSG_IMMEDIATE:                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,                                 msg->ram_srcnid, conn, 0);                repost = rc < 0;                break;        case RANAL_MSG_PUT_REQ:                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,                                 msg->ram_srcnid, conn, 1);                repost = rc < 0;                break;        case RANAL_MSG_PUT_NAK:                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,                                        msg->ram_u.completion.racm_cookie);                if (tx == NULL)                        break;                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);                kranal_tx_done(tx, -ENOENT);    /* no match */                break;        case RANAL_MSG_PUT_ACK:                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,                                        msg->ram_u.putack.rapam_src_cookie);                if (tx == NULL)                        break;                kranal_rdma(tx, RANAL_MSG_PUT_DONE,                            &msg->ram_u.putack.rapam_desc,                            msg->ram_u.putack.rapam_desc.rard_nob,                            msg->ram_u.putack.rapam_dst_cookie);                break;        case RANAL_MSG_PUT_DONE:                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,                                        msg->ram_u.completion.racm_cookie);                if (tx == NULL)                        break;                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);                kranal_tx_done(tx, 0);                break;        case RANAL_MSG_GET_REQ:                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,                                 msg->ram_srcnid, conn, 1);                repost = rc < 0;                break;        case RANAL_MSG_GET_NAK:                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,                                        msg->ram_u.completion.racm_cookie);                if (tx == NULL)                        break;                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);                kranal_tx_done(tx, -ENOENT);    /* no match */                break;        case RANAL_MSG_GET_DONE:                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,                                        msg->ram_u.completion.racm_cookie);                if (tx == NULL)                        break;                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);#if 0                /* completion message should send rdma length if we ever allow                 * GET truncation */                lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);#endif                kranal_tx_done(tx, 0);                break;        } out:        if (rc < 0)                             /* protocol/comms error */                kranal_close_conn (conn, rc);        if (repost && conn->rac_rxmsg != NULL)                kranal_consume_rxmsg(conn, NULL, 0);        /* check again later */        kranal_schedule_conn(conn);}voidkranal_complete_closed_conn (kra_conn_t *conn){        kra_tx_t   *tx;        int         nfma;        int         nreplies;        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);        LASSERT (list_empty(&conn->rac_list));        LASSERT (list_empty(&conn->rac_hashlist));        for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);                list_del(&tx->tx_list);                kranal_tx_done(tx, -ECONNABORTED);        }        LASSERT (list_empty(&conn->rac_rdmaq));        for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);                list_del(&tx->tx_list);                kranal_tx_done(tx, -ECONNABORTED);        }        CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",               conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);}intkranal_process_new_conn (kra_conn_t *conn){        RAP_RETURN   rrc;                rrc = RapkC

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -