⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gmlnd_comm.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2003 Los Alamos National Laboratory (LANL) * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved. * *   This file is part of Lustre, http://www.lustre.org/ * *   Lustre is free software; you can redistribute it and/or *   modify it under the terms of version 2 of the GNU General Public *   License as published by the Free Software Foundation. * *   Lustre is distributed in the hope that it will be useful, *   but WITHOUT ANY WARRANTY; without even the implied warranty of *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *   GNU General Public License for more details. * *   You should have received a copy of the GNU General Public License *   along with Lustre; if not, write to the Free Software *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. *//* *	This file contains all gmnal send and receive functions */#include "gmlnd.h"voidgmnal_notify_peer_down(gmnal_tx_t *tx){        struct timeval     now;        time_t             then;        do_gettimeofday (&now);        then = now.tv_sec - (jiffies - tx->tx_launchtime)/HZ;        lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);}voidgmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,               lnet_nid_t dstnid, int type){        /* CAVEAT EMPTOR! this only sets the common message fields. */        msg->gmm_magic    = GMNAL_MSG_MAGIC;        msg->gmm_version  = GMNAL_MSG_VERSION;        msg->gmm_type     = type;        msg->gmm_srcnid   = lnet_ptlcompat_srcnid(gmni->gmni_ni->ni_nid,                                                  dstnid);        msg->gmm_dstnid   = dstnid;}intgmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx){        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);        const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);        int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :                                                gmni->gmni_small_msgsize;        int          flip;        /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */        /* GM may not overflow our buffer */        LASSERT (rx->rx_recv_nob <= buffnob);        /* 6 bytes are enough to have received magic + version */        if (rx->rx_recv_nob < 6) {                CERROR("Short message from gmid %u: %d\n",                        rx->rx_recv_gmid, rx->rx_recv_nob);                return -EPROTO;        }        if (msg->gmm_magic == GMNAL_MSG_MAGIC) {                flip = 0;        } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {                flip = 1;        } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||                   msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {                return EPROTO;        } else {                CERROR("Bad magic from gmid %u: %08x\n",                        rx->rx_recv_gmid, msg->gmm_magic);                return -EPROTO;        }        if (msg->gmm_version !=             (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {                return EPROTO;        }        if (rx->rx_recv_nob < hdr_size) {                CERROR("Short message from %u: %d\n",                       rx->rx_recv_gmid, rx->rx_recv_nob);                return -EPROTO;        }        if (flip) {                /* leave magic unflipped as a clue to peer endianness */                __swab16s(&msg->gmm_version);                __swab16s(&msg->gmm_type);                __swab64s(&msg->gmm_srcnid);                __swab64s(&msg->gmm_dstnid);        }                if (msg->gmm_srcnid == LNET_NID_ANY) {                CERROR("Bad src nid from %u: %s\n",                        rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));                return -EPROTO;        }        if (!lnet_ptlcompat_matchnid(gmni->gmni_ni->ni_nid,                                      msg->gmm_dstnid)) {                CERROR("Bad dst nid from %u: %s\n",                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));                return -EPROTO;        }                switch (msg->gmm_type) {        default:                CERROR("Unknown message type from %u: %x\n",                        rx->rx_recv_gmid, msg->gmm_type);                return -EPROTO;                        case GMNAL_MSG_IMMEDIATE:                if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {                        CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n",                                rx->rx_recv_gmid, rx->rx_recv_nob,                                offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));                        return -EPROTO;                }                break;        }        return 0;}gmnal_tx_t *gmnal_get_tx(gmnal_ni_t *gmni){	gmnal_tx_t	 *tx = NULL;        spin_lock(&gmni->gmni_tx_lock);        if (gmni->gmni_shutdown ||            list_empty(&gmni->gmni_idle_txs)) {                spin_unlock(&gmni->gmni_tx_lock);                return NULL;        }                tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);        list_del(&tx->tx_list);        spin_unlock(&gmni->gmni_tx_lock);        LASSERT (tx->tx_lntmsg == NULL);        LASSERT (tx->tx_ltxb == NULL);        LASSERT (!tx->tx_credit);                return tx;}voidgmnal_tx_done(gmnal_tx_t *tx, int rc){	gmnal_ni_t *gmni = tx->tx_gmni;        int         wake_sched = 0;        lnet_msg_t *lnetmsg = tx->tx_lntmsg;                tx->tx_lntmsg = NULL;        spin_lock(&gmni->gmni_tx_lock);                if (tx->tx_ltxb != NULL) {                wake_sched = 1;                list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);                tx->tx_ltxb = NULL;        }                if (tx->tx_credit) {                wake_sched = 1;                gmni->gmni_tx_credits++;                tx->tx_credit = 0;        }                list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);        if (wake_sched)                gmnal_check_txqueues_locked(gmni);        spin_unlock(&gmni->gmni_tx_lock);        /* Delay finalize until tx is free */        if (lnetmsg != NULL)                lnet_finalize(gmni->gmni_ni, lnetmsg, rc);}void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,                           gm_status_t status){	gmnal_tx_t	*tx = (gmnal_tx_t*)context;        LASSERT(!in_interrupt());                 CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n",                tx, status, gmnal_gmstatus2str(status),               libcfs_nid2str(tx->tx_nid));        gmnal_tx_done(tx, -EIO);}void gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status){	gmnal_tx_t	*tx = (gmnal_tx_t*)context;	gmnal_ni_t	*gmni = tx->tx_gmni;        LASSERT(!in_interrupt());	switch(status) {        case GM_SUCCESS:                gmnal_tx_done(tx, 0);                return;        case GM_SEND_DROPPED:                CDEBUG(D_NETERROR, "Dropped tx %p to %s\n",                        tx, libcfs_nid2str(tx->tx_nid));                /* Another tx failed and called gm_drop_sends() which made this                 * one complete immediately */                gmnal_tx_done(tx, -EIO);                return;                                default:                /* Some error; NB don't complete tx yet; we need its credit for                 * gm_drop_sends() */                CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",                       tx, status, gmnal_gmstatus2str(status),                        libcfs_nid2str(tx->tx_nid));                gmnal_notify_peer_down(tx);                spin_lock(&gmni->gmni_gm_lock);                gm_drop_sends(gmni->gmni_port,                               tx->tx_ltxb != NULL ?                              GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,                              tx->tx_gmlid, *gmnal_tunables.gm_port,                               gmnal_drop_sends_callback, tx);                spin_unlock(&gmni->gmni_gm_lock);		return;	}        /* not reached */        LBUG();}voidgmnal_check_txqueues_locked (gmnal_ni_t *gmni){        gmnal_tx_t    *tx;        gmnal_txbuf_t *ltxb;        int            gmsize;        int            pri;        void          *netaddr;                tx = list_empty(&gmni->gmni_buf_txq) ? NULL :             list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);        if (tx != NULL &&            (tx->tx_large_nob == 0 ||              !list_empty(&gmni->gmni_idle_ltxbs))) {                /* consume tx */                list_del(&tx->tx_list);                                LASSERT (tx->tx_ltxb == NULL);                if (tx->tx_large_nob != 0) {                        ltxb = list_entry(gmni->gmni_idle_ltxbs.next,                                          gmnal_txbuf_t, txb_list);                        /* consume large buffer */                        list_del(&ltxb->txb_list);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -