⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 o2iblnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2006 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "o2iblnd.h"

/*
 * Map an IBLND message type code to a printable name.
 *
 * Returns a pointer to a string literal (never NULL); unrecognised
 * type codes yield "???".
 */
char *
kiblnd_msgtype2str(int type)
{
        switch (type) {
        case IBLND_MSG_CONNREQ:
                return "CONNREQ";

        case IBLND_MSG_CONNACK:
                return "CONNACK";

        case IBLND_MSG_NOOP:
                return "NOOP";

        case IBLND_MSG_IMMEDIATE:
                return "IMMEDIATE";

        case IBLND_MSG_PUT_REQ:
                return "PUT_REQ";

        case IBLND_MSG_PUT_NAK:
                return "PUT_NAK";

        case IBLND_MSG_PUT_ACK:
                return "PUT_ACK";

        case IBLND_MSG_PUT_DONE:
                return "PUT_DONE";

        case IBLND_MSG_GET_REQ:
                return "GET_REQ";

        case IBLND_MSG_GET_DONE:
                return "GET_DONE";

        default:
                return "???";
        }
}

/*
 * Retire a completed tx descriptor.
 *
 * Releases the tx's mapping (FMR or scatter/gather, depending on
 * IBLND_MAP_ON_DEMAND), drops its connection reference if it has one,
 * resets its work-request count and status, puts it back on the net's
 * idle list and finally completes up to two attached LNet messages with
 * the status the tx carried on entry.
 *
 * The tx must not be queued, sending or waiting, and the caller must not
 * be in interrupt context (both are asserted).
 */
void
kiblnd_tx_done (lnet_ni_t *ni, kib_tx_t *tx)
{
        lnet_msg_t *lntmsg[2];
        kib_net_t  *net = ni->ni_data;
        int         rc;
        int         i;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (!tx->tx_queued);               /* mustn't be queued for sending */
        LASSERT (tx->tx_sending == 0);          /* mustn't be awaiting sent callback */
        LASSERT (!tx->tx_waiting);              /* mustn't be awaiting peer response */

#if IBLND_MAP_ON_DEMAND
        if (tx->tx_fmr != NULL) {
                rc = ib_fmr_pool_unmap(tx->tx_fmr);
                LASSERT (rc == 0);

                /* a failed tx also flushes the whole FMR pool */
                if (tx->tx_status != 0) {
                        rc = ib_flush_fmr_pool(net->ibn_fmrpool);
                        LASSERT (rc == 0);
                }

                tx->tx_fmr = NULL;
        }
#else
        if (tx->tx_nfrags != 0) {
                kiblnd_dma_unmap_sg(net->ibn_dev->ibd_cmid->device,
                                    tx->tx_frags, tx->tx_nfrags, tx->tx_dmadir);
                tx->tx_nfrags = 0;
        }
#endif

        /* tx may have up to 2 lnet msgs to finalise */
        lntmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
        rc = tx->tx_status;             /* saved: tx_status is reset below */

        if (tx->tx_conn != NULL) {
                LASSERT (ni == tx->tx_conn->ibc_peer->ibp_ni);

                kiblnd_conn_decref(tx->tx_conn);
                tx->tx_conn = NULL;
        }

        tx->tx_nwrq = 0;
        tx->tx_status = 0;

        spin_lock(&net->ibn_tx_lock);

        list_add(&tx->tx_list, &net->ibn_idle_txs);

        spin_unlock(&net->ibn_tx_lock);

        /* delay finalize until my descs have been freed */
        for (i = 0; i < 2; i++) {
                if (lntmsg[i] == NULL)
                        continue;

                lnet_finalize(ni, lntmsg[i], rc);
        }
}

/*
 * Complete every tx on 'txlist' with 'status'.
 *
 * Each tx is unlinked from the list, has tx_waiting cleared and
 * tx_status set to 'status', and is then handed to kiblnd_tx_done().
 * The list is empty on return.
 */
void
kiblnd_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int status)
{
        kib_tx_t *tx;

        while (!list_empty (txlist)) {
                tx = list_entry (txlist->next, kib_tx_t, tx_list);

                list_del (&tx->tx_list);
                /* complete now */
                tx->tx_waiting = 0;
                tx->tx_status = status;
                kiblnd_tx_done(ni, tx);
        }
}

/*
 * Take a tx descriptor off the net's idle list.
 *
 * Returns the first idle tx with a freshly assigned completion cookie,
 * or NULL if the idle list is empty.  The returned tx is asserted to be
 * in its reset state (no work requests, no conn, no LNet messages, no
 * mapping).
 */
kib_tx_t *
kiblnd_get_idle_tx (lnet_ni_t *ni)
{
        kib_net_t     *net = ni->ni_data;
        kib_tx_t      *tx;

        LASSERT (net != NULL);

        spin_lock(&net->ibn_tx_lock);

        if (list_empty(&net->ibn_idle_txs)) {
                spin_unlock(&net->ibn_tx_lock);
                return NULL;
        }

        tx = list_entry(net->ibn_idle_txs.next, kib_tx_t, tx_list);
        list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed,
         * but we've got a lock right now and we're unlikely to
         * wrap... */
        tx->tx_cookie = kiblnd_data.kib_next_tx_cookie++;

        spin_unlock(&net->ibn_tx_lock);

        LASSERT (tx->tx_nwrq == 0);
        LASSERT (!tx->tx_queued);
        LASSERT (tx->tx_sending == 0);
        LASSERT (!tx->tx_waiting);
        LASSERT (tx->tx_status == 0);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);
#if IBLND_MAP_ON_DEMAND
        LASSERT (tx->tx_fmr == NULL);
#else
        LASSERT (tx->tx_nfrags == 0);
#endif

        return tx;
}

/*
 * Stop using 'rx': decrement its connection's ibc_nrx count (under
 * kib_sched_lock with interrupts disabled) and drop one reference on
 * the connection.
 */
void
kiblnd_drop_rx (kib_rx_t *rx)
{
        kib_conn_t         *conn = rx->rx_conn;
        unsigned long       flags;

        spin_lock_irqsave(&kiblnd_data.kib_sched_lock, flags);
        LASSERT (conn->ibc_nrx > 0);
        conn->ibc_nrx--;
        spin_unlock_irqrestore(&kiblnd_data.kib_sched_lock, flags);

        kiblnd_conn_decref(conn);
}

/*
 * Post the receive buffer 'rx' on its connection's QP.
 *
 * 'credit' must be one of IBLND_POSTRX_NO_CREDIT,
 * IBLND_POSTRX_PEER_CREDIT or IBLND_POSTRX_RSRVD_CREDIT and selects
 * which credit counter (if any) is incremented after a successful post.
 *
 * Returns:
 *   0   if the connection state is past IBLND_CONN_ESTABLISHED (the rx
 *       is dropped instead of posted), or on a successful post;
 *   rc  from ib_post_recv() unchanged when the connection is not yet
 *       established (initial post), with no further handling here;
 *   rc  (non-zero) when posting fails on an established connection, in
 *       which case the connection is closed and the rx dropped.
 */
int
kiblnd_post_rx (kib_rx_t *rx, int credit)
{
        kib_conn_t         *conn = rx->rx_conn;
        kib_net_t          *net = conn->ibc_peer->ibp_ni->ni_data;
        struct ib_recv_wr  *bad_wrq;
        int                 rc;

        LASSERT (net != NULL);
        LASSERT (!in_interrupt());
        LASSERT (credit == IBLND_POSTRX_NO_CREDIT ||
                 credit == IBLND_POSTRX_PEER_CREDIT ||
                 credit == IBLND_POSTRX_RSRVD_CREDIT);

        rx->rx_sge.length = IBLND_MSG_SIZE;
        rx->rx_sge.lkey = net->ibn_dev->ibd_mr->lkey;
        rx->rx_sge.addr = rx->rx_msgaddr;

        rx->rx_wrq.next = NULL;
        rx->rx_wrq.sg_list = &rx->rx_sge;
        rx->rx_wrq.num_sge = 1;
        rx->rx_wrq.wr_id = kiblnd_ptr2wreqid(rx, IBLND_WID_RX);

        LASSERT (conn->ibc_state >= IBLND_CONN_INIT);
        LASSERT (rx->rx_nob >= 0);              /* not posted */

        if (conn->ibc_state > IBLND_CONN_ESTABLISHED) {
                kiblnd_drop_rx(rx);             /* No more posts for this rx */
                return 0;
        }

        rx->rx_nob = -1;                        /* flag posted */

        rc = ib_post_recv(conn->ibc_cmid->qp, &rx->rx_wrq, &bad_wrq);

        if (conn->ibc_state < IBLND_CONN_ESTABLISHED) /* Initial post */
                return rc;

        if (rc != 0) {
                CERROR("Can't post rx for %s: %d\n",
                       libcfs_nid2str(conn->ibc_peer->ibp_nid), rc);
                kiblnd_close_conn(conn, rc);
                kiblnd_drop_rx(rx);             /* No more posts for this rx */
                return rc;
        }

        if (credit == IBLND_POSTRX_NO_CREDIT)
                return 0;

        spin_lock(&conn->ibc_lock);
        if (credit == IBLND_POSTRX_PEER_CREDIT)
                conn->ibc_outstanding_credits++;
        else
                conn->ibc_reserved_credits++;
        spin_unlock(&conn->ibc_lock);

        kiblnd_check_sends(conn);
        return 0;
}

/*
 * Search conn->ibc_active_txs for the tx with completion cookie
 * 'cookie' that is waiting and whose message type is 'txtype'.
 *
 * Returns the matching tx, or NULL if none is found.  A tx whose cookie
 * matches but which is not waiting or has a different type is logged
 * with CWARN and the scan continues.
 *
 * The only caller visible in this file, kiblnd_handle_completion(),
 * calls this with conn->ibc_lock held.
 */
kib_tx_t *
kiblnd_find_waiting_tx_locked(kib_conn_t *conn, int txtype, __u64 cookie)
{
        struct list_head   *tmp;

        list_for_each(tmp, &conn->ibc_active_txs) {
                kib_tx_t *tx = list_entry(tmp, kib_tx_t, tx_list);

                LASSERT (!tx->tx_queued);
                LASSERT (tx->tx_sending != 0 || tx->tx_waiting);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_waiting &&
                    tx->tx_msg->ibm_type == txtype)
                        return tx;

                CWARN("Bad completion: %swaiting, type %x (wanted %x)\n",
                      tx->tx_waiting ? "" : "NOT ",
                      tx->tx_msg->ibm_type, txtype);
        }
        return NULL;
}

/*
 * Handle a completion for the tx identified by 'txtype' and 'cookie'.
 *
 * If no matching waiting tx exists the connection is closed with
 * -EPROTO.  Otherwise a negative 'status' is recorded in the tx (only
 * if the tx has not already failed); for a successful
 * IBLND_MSG_GET_REQ, 'status' is passed to lnet_set_reply_msg_len() for
 * tx_lntmsg[1].  The tx stops waiting and, if it is neither queued nor
 * sending, is removed from the active list and finished via
 * kiblnd_tx_done().
 */
void
kiblnd_handle_completion(kib_conn_t *conn, int txtype, int status, __u64 cookie)
{
        kib_tx_t    *tx;
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        int          idle;

        spin_lock(&conn->ibc_lock);

        tx = kiblnd_find_waiting_tx_locked(conn, txtype, cookie);
        if (tx == NULL) {
                spin_unlock(&conn->ibc_lock);

                CWARN("Unmatched completion type %x cookie "LPX64" from %s\n",
                      txtype, cookie, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                kiblnd_close_conn(conn, -EPROTO);
                return;
        }

        if (tx->tx_status == 0) {               /* success so far */
                if (status < 0) {               /* failed? */
                        tx->tx_status = status;
                } else if (txtype == IBLND_MSG_GET_REQ) {
                        lnet_set_reply_msg_len(ni, tx->tx_lntmsg[1], status);
                }
        }

        tx->tx_waiting = 0;

        idle = !tx->tx_queued && (tx->tx_sending == 0);
        if (idle)
                list_del(&tx->tx_list);

        spin_unlock(&conn->ibc_lock);

        /* finish the tx only after ibc_lock has been released */
        if (idle)
                kiblnd_tx_done(ni, tx);
}

/*
 * Queue a completion message of the given 'type' on 'conn', carrying
 * 'status' and 'cookie'.
 *
 * If no idle tx is available the failure is logged with CERROR and the
 * completion is not sent; nothing is reported to the caller.
 */
void
kiblnd_send_completion (kib_conn_t *conn, int type, int status, __u64 cookie)
{
        lnet_ni_t   *ni = conn->ibc_peer->ibp_ni;
        kib_tx_t    *tx = kiblnd_get_idle_tx(ni);

        if (tx == NULL) {
                CERROR("Can't get tx for completion %x for %s\n",
                       type, libcfs_nid2str(conn->ibc_peer->ibp_nid));
                return;
        }

        tx->tx_msg->ibm_u.completion.ibcm_status = status;
        tx->tx_msg->ibm_u.completion.ibcm_cookie = cookie;
        kiblnd_init_tx_msg(ni, tx, type, sizeof(kib_completion_msg_t));

        kiblnd_queue_tx(tx, conn);
}

void
kiblnd_handle_rx (kib_rx_t *rx)
{
kib_msg_t    *msg = rx->rx_msg;        kib_conn_t   *conn = rx->rx_conn;        lnet_ni_t    *ni = conn->ibc_peer->ibp_ni;        int           credits = msg->ibm_credits;        kib_tx_t     *tx;        int           rc = 0;        int           rc2;        int           post_credit;        LASSERT (conn->ibc_state >= IBLND_CONN_ESTABLISHED);        CDEBUG (D_NET, "Received %x[%d] from %s\n",                msg->ibm_type, credits, libcfs_nid2str(conn->ibc_peer->ibp_nid));        if (credits != 0) {                /* Have I received credits that will let me send? */                spin_lock(&conn->ibc_lock);                if (conn->ibc_credits + credits > IBLND_MSG_QUEUE_SIZE) {                        rc2 = conn->ibc_credits;                        spin_unlock(&conn->ibc_lock);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -