⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_cb.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 5 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved. *   Author: Eric Barton <eeb@bartonsoftware.com> * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * *   This file is confidential source code owned by Cluster File Systems. *   No viewing, modification, compilation, redistribution, or any other *   form of use is permitted except through a signed license agreement. * *   If you have not signed such an agreement, then you have no rights to *   this file.  Please destroy it immediately and contact CFS. * */#include "ptllnd.h"voidptllnd_set_tx_deadline(ptllnd_tx_t *tx){        ptllnd_peer_t  *peer = tx->tx_peer;        lnet_ni_t      *ni = peer->plp_ni;        ptllnd_ni_t    *plni = ni->ni_data;        tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;}voidptllnd_post_tx(ptllnd_tx_t *tx){        ptllnd_peer_t  *peer = tx->tx_peer;        ptllnd_set_tx_deadline(tx);        list_add_tail(&tx->tx_list, &peer->plp_txq);        ptllnd_check_sends(peer);}char *ptllnd_ptlid2str(ptl_process_id_t id){        static char strs[8][32];        static int  idx = 0;        char   *str = strs[idx++];                if (idx >= sizeof(strs)/sizeof(strs[0]))                idx = 0;        snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);        return str;}voidptllnd_destroy_peer(ptllnd_peer_t *peer){        lnet_ni_t         *ni = peer->plp_ni;        ptllnd_ni_t       *plni = ni->ni_data;        int                nmsg = peer->plp_lazy_credits +                                  plni->plni_peer_credits;        ptllnd_size_buffers(ni, -nmsg);        LASSERT (peer->plp_closing);        LASSERT (plni->plni_npeers > 0);        LASSERT (list_empty(&peer->plp_txq));        LASSERT (list_empty(&peer->plp_activeq));        plni->plni_npeers--;        LIBCFS_FREE(peer, sizeof(*peer));}voidptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q){        while (!list_empty(q)) {                ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);                tx->tx_status = -ESHUTDOWN;                list_del(&tx->tx_list);                list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);        }}voidptllnd_close_peer(ptllnd_peer_t *peer, int error){        lnet_ni_t   *ni = peer->plp_ni;        ptllnd_ni_t *plni = ni->ni_data;        if (peer->plp_closing)                return;        peer->plp_closing = 1;        if (!list_empty(&peer->plp_txq) ||            !list_empty(&peer->plp_activeq) ||            error != 0) {                CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));                if (plni->plni_debug)                        ptllnd_dump_debug(ni, peer->plp_id);        }                ptllnd_abort_txs(plni, &peer->plp_txq);        ptllnd_abort_txs(plni, &peer->plp_activeq);        list_del(&peer->plp_list);        ptllnd_peer_decref(peer);}ptllnd_peer_t *ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create){        ptllnd_ni_t       *plni = ni->ni_data;        unsigned int       hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;        struct list_head  *tmp;        ptllnd_peer_t     *plp;        ptllnd_tx_t       *tx;        int                rc;        LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));        list_for_each(tmp, &plni->plni_peer_hash[hash]) {                plp = list_entry(tmp, ptllnd_peer_t, plp_list);                if (plp->plp_id.nid == id.nid &&                    plp->plp_id.pid == id.pid) {                        ptllnd_peer_addref(plp);                        return plp;                }        }        if (!create)                return NULL;        /* New peer: check first for enough posted buffers */        plni->plni_npeers++;        rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);        if (rc != 0) {                plni->plni_npeers--;                return NULL;        }        LIBCFS_ALLOC(plp, sizeof(*plp));        if (plp == NULL) {                CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));                plni->plni_npeers--;                ptllnd_size_buffers(ni, -plni->plni_peer_credits);                return NULL;        }        plp->plp_ni = ni;        plp->plp_id = id;        plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);        plp->plp_ptlid.pid = plni->plni_ptllnd_pid;        plp->plp_credits = 1; /* add more later when she gives me credits */        plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */        plp->plp_sent_credits = 1;              /* Implicit credit for HELLO */        plp->plp_outstanding_credits = plni->plni_peer_credits - 1;        plp->plp_lazy_credits = 0;        plp->plp_extra_lazy_credits = 0;        plp->plp_match = 0;        plp->plp_stamp = 0;        plp->plp_recvd_hello = 0;        plp->plp_closing = 0;        plp->plp_refcount = 1;        CFS_INIT_LIST_HEAD(&plp->plp_list);        CFS_INIT_LIST_HEAD(&plp->plp_txq);        CFS_INIT_LIST_HEAD(&plp->plp_activeq);        ptllnd_peer_addref(plp);        list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);        tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);        if (tx == NULL) {                CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));                ptllnd_close_peer(plp, -ENOMEM);                ptllnd_peer_decref(plp);                return NULL;        }        tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;        tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),                       tx->tx_peer->plp_credits,                       tx->tx_peer->plp_outstanding_credits,                       tx->tx_peer->plp_sent_credits,                       plni->plni_peer_credits +                        tx->tx_peer->plp_lazy_credits, tx);        ptllnd_post_tx(tx);        return plp;}intptllnd_count_q(struct list_head *q){        struct list_head *e;        int               n = 0;                list_for_each(e, q) {                n++;        }                return n;}const char *ptllnd_tx_typestr(int type) {        switch (type) {        case PTLLND_RDMA_WRITE:                return "rdma_write";                        case PTLLND_RDMA_READ:                return "rdma_read";        case PTLLND_MSG_TYPE_PUT:                return "put_req";                        case PTLLND_MSG_TYPE_GET:                return "get_req";        case PTLLND_MSG_TYPE_IMMEDIATE:                return "immediate";        case PTLLND_MSG_TYPE_NOOP:                return "noop";        case PTLLND_MSG_TYPE_HELLO:                return "hello";        default:                return "<unknown>";        }}voidptllnd_debug_tx(ptllnd_tx_t *tx) {        CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"               " r %ld.%06ld/%ld.%06ld status %d\n",               ptllnd_tx_typestr(tx->tx_type),               libcfs_id2str(tx->tx_peer->plp_id),               tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,                tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,               tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,               tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,               tx->tx_status);}voidptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id){        ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);        struct list_head *tmp;        ptllnd_ni_t      *plni = ni->ni_data;        ptllnd_tx_t      *tx;                if (plp == NULL) {                CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));                return;        }                CDEBUG(D_WARNING, "%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d c %d/%d+%d(%d)\n",               libcfs_id2str(id),                plp->plp_recvd_hello ? "H" : "_",               plp->plp_closing     ? "C" : "_",               plp->plp_refcount,               plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),               plp->plp_match,               ptllnd_count_q(&plp->plp_txq),               ptllnd_count_q(&plp->plp_activeq),               plp->plp_credits, plp->plp_outstanding_credits, plp->plp_sent_credits,               plni->plni_peer_credits + plp->plp_lazy_credits);        CDEBUG(D_WARNING, "txq:\n");        list_for_each (tmp, &plp->plp_txq) {                tx = list_entry(tmp, ptllnd_tx_t, tx_list);                                ptllnd_debug_tx(tx);        }        CDEBUG(D_WARNING, "activeq:\n");        list_for_each (tmp, &plp->plp_activeq) {                tx = list_entry(tmp, ptllnd_tx_t, tx_list);                                ptllnd_debug_tx(tx);        }        CDEBUG(D_WARNING, "zombies:\n");        list_for_each (tmp, &plni->plni_zombie_txs) {                tx = list_entry(tmp, ptllnd_tx_t, tx_list);                                if (tx->tx_peer->plp_id.nid == id.nid &&                    tx->tx_peer->plp_id.pid == id.pid)                        ptllnd_debug_tx(tx);        }                CDEBUG(D_WARNING, "history:\n");        list_for_each (tmp, &plni->plni_tx_history) {                tx = list_entry(tmp, ptllnd_tx_t, tx_list);                                if (tx->tx_peer->plp_id.nid == id.nid &&                    tx->tx_peer->plp_id.pid == id.pid)                        ptllnd_debug_tx(tx);        }                ptllnd_peer_decref(plp);}voidptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id){        ptllnd_debug_peer(ni, id);        ptllnd_dump_history();}voidptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive){        lnet_process_id_t  id;        ptllnd_peer_t     *peer;        time_t             start = cfs_time_current_sec();        ptllnd_ni_t       *plni = ni->ni_data;        int                w = plni->plni_long_wait;        /* This is only actually used to connect to routers at startup! */        LASSERT(alive);        id.nid = nid;        id.pid = LUSTRE_SRV_LNET_PID;                peer = ptllnd_find_peer(ni, id, 1);        if (peer == NULL)                return;        /* wait for the peer to reply */        while (!peer->plp_recvd_hello) {                if (w > 0 && cfs_time_current_sec() > start + w/1000) {                        CWARN("Waited %ds to connect to %s\n",                              (int)(cfs_time_current_sec() - start),                              libcfs_id2str(id));                        w *= 2;                }                                ptllnd_wait(ni, w);        }                ptllnd_peer_decref(peer);}intptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync){        ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);        int            rc;                if (peer == NULL)                return -ENOMEM;        LASSERT (peer->plp_lazy_credits >= 0);        LASSERT (peer->plp_extra_lazy_credits >= 0);        /* If nasync < 0, we're being told we can reduce the total message         * headroom.  We can't do this right now because our peer might already         * have credits for the extra buffers, so we just account the extra         * headroom in case we need it later and only destroy buffers when the

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -