
📄 ptllnd_peer.c

📁 Part of a classic distributed file system (Lustre)
💻 C
📖 Page 1 of 4
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: PJ Kirner <pjkirner@clusterfs.com>
 *           E Barton <eeb@bartonsoftware.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 *
 */

#include "ptllnd.h"
#include <libcfs/list.h>

static int
kptllnd_count_queue(struct list_head *q)
{
        struct list_head *e;
        int               n = 0;

        list_for_each(e, q) {
                n++;
        }

        return n;
}

int
kptllnd_get_peer_info(int index,
                      lnet_process_id_t *id,
                      int *state, int *sent_hello,
                      int *refcount, __u64 *incarnation,
                      __u64 *next_matchbits, __u64 *last_matchbits_seen,
                      int *nsendq, int *nactiveq,
                      int *credits, int *outstanding_credits)
{
        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;
        unsigned long     flags;
        struct list_head *ptmp;
        kptl_peer_t      *peer;
        int               i;
        int               rc = -ENOENT;

        read_lock_irqsave(g_lock, flags);

        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) {
                list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) {
                        peer = list_entry(ptmp, kptl_peer_t, peer_list);

                        if (index-- > 0)
                                continue;

                        *id          = peer->peer_id;
                        *state       = peer->peer_state;
                        *sent_hello  = peer->peer_sent_hello;
                        *refcount    = atomic_read(&peer->peer_refcount);
                        *incarnation = peer->peer_incarnation;

                        spin_lock(&peer->peer_lock);

                        *next_matchbits      = peer->peer_next_matchbits;
                        *last_matchbits_seen = peer->peer_last_matchbits_seen;
                        *credits             = peer->peer_credits;
                        *outstanding_credits = peer->peer_outstanding_credits;

                        *nsendq   = kptllnd_count_queue(&peer->peer_sendq);
                        *nactiveq = kptllnd_count_queue(&peer->peer_activeq);

                        spin_unlock(&peer->peer_lock);

                        rc = 0;
                        goto out;
                }
        }

 out:
        read_unlock_irqrestore(g_lock, flags);
        return rc;
}

void
kptllnd_peer_add_peertable_locked (kptl_peer_t *peer)
{
        LASSERT (!kptllnd_data.kptl_shutdown);
        LASSERT (kptllnd_data.kptl_n_active_peers <
                 kptllnd_data.kptl_expected_peers);

        LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO ||
                 peer->peer_state == PEER_STATE_ACTIVE);

        kptllnd_data.kptl_n_active_peers++;
        atomic_inc(&peer->peer_refcount);       /* +1 ref for the list */

        /* NB add to HEAD of peer list for MRU order!
         * (see kptllnd_cull_peertable) */
        list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));
}

void
kptllnd_cull_peertable_locked (lnet_process_id_t pid)
{
        /* I'm about to add a new peer with this portals ID to the peer table,
         * so (a) this peer should not exist already and (b) I want to leave at
         * most (max_procs_per_nid - 1) peers with this NID in the table. */
        struct list_head  *peers = kptllnd_nid2peerlist(pid.nid);
        int                cull_count = *kptllnd_tunables.kptl_max_procs_per_node;
        int                count;
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_peer_t       *peer;

        count = 0;
        list_for_each_safe (tmp, nxt, peers) {
                /* NB I rely on kptllnd_peer_add_peertable_locked to add peers
                 * in MRU order */
                peer = list_entry(tmp, kptl_peer_t, peer_list);

                if (peer->peer_id.nid != pid.nid)
                        continue;

                LASSERT (peer->peer_id.pid != pid.pid);

                count++;
                if (count < cull_count)         /* recent (don't cull) */
                        continue;

                CDEBUG(D_NET, "Cull %s(%s)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_ptlid2str(peer->peer_ptlid));

                kptllnd_peer_close_locked(peer, 0);
        }
}

kptl_peer_t *
kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid)
{
        unsigned long    flags;
        kptl_peer_t     *peer;

        LIBCFS_ALLOC(peer, sizeof (*peer));
        if (peer == NULL) {
                CERROR("Can't create peer %s (%s)\n",
                       libcfs_id2str(lpid),
                       kptllnd_ptlid2str(ppid));
                return NULL;
        }

        memset(peer, 0, sizeof(*peer));         /* zero flags etc */

        INIT_LIST_HEAD (&peer->peer_sendq);
        INIT_LIST_HEAD (&peer->peer_activeq);
        spin_lock_init (&peer->peer_lock);

        peer->peer_state = PEER_STATE_ALLOCATED;
        peer->peer_error = 0;
        peer->peer_last_alive = cfs_time_current();
        peer->peer_id = lpid;
        peer->peer_ptlid = ppid;
        peer->peer_credits = 1;                 /* enough for HELLO */
        peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS;
        peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1;
        peer->peer_sent_credits = 1;            /* HELLO credit is implicit */
        peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */

        atomic_set(&peer->peer_refcount, 1);    /* 1 ref for caller */

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        peer->peer_myincarnation = kptllnd_data.kptl_incarnation;

        /* Only increase # peers under lock, to guarantee we dont grow it
         * during shutdown */
        if (kptllnd_data.kptl_shutdown) {
                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,
                                        flags);
                LIBCFS_FREE(peer, sizeof(*peer));
                return NULL;
        }

        kptllnd_data.kptl_npeers++;
        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        return peer;
}

void
kptllnd_peer_destroy (kptl_peer_t *peer)
{
        unsigned long flags;

        CDEBUG(D_NET, "Peer=%p\n", peer);

        LASSERT (!in_interrupt());
        LASSERT (atomic_read(&peer->peer_refcount) == 0);
        LASSERT (peer->peer_state == PEER_STATE_ALLOCATED ||
                 peer->peer_state == PEER_STATE_ZOMBIE);
        LASSERT (list_empty(&peer->peer_sendq));
        LASSERT (list_empty(&peer->peer_activeq));

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (peer->peer_state == PEER_STATE_ZOMBIE)
                list_del(&peer->peer_list);

        kptllnd_data.kptl_npeers--;

        write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        LIBCFS_FREE (peer, sizeof (*peer));
}

void
kptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs)
{
        struct list_head  *tmp;
        struct list_head  *nxt;
        kptl_tx_t         *tx;

        list_for_each_safe (tmp, nxt, peerq) {
                tx = list_entry(tmp, kptl_tx_t, tx_list);

                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, txs);

                tx->tx_status = -EIO;
                tx->tx_active = 0;
        }
}

void
kptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs)
{
        unsigned long   flags;

        spin_lock_irqsave(&peer->peer_lock, flags);

        kptllnd_cancel_txlist(&peer->peer_sendq, txs);
        kptllnd_cancel_txlist(&peer->peer_activeq, txs);

        spin_unlock_irqrestore(&peer->peer_lock, flags);
}

void
kptllnd_peer_alive (kptl_peer_t *peer)
{
        /* This is racy, but everyone's only writing cfs_time_current() */
        peer->peer_last_alive = cfs_time_current();
        mb();
}

void
kptllnd_peer_notify (kptl_peer_t *peer)
{
        unsigned long flags;
        time_t        last_alive = 0;
        int           error = 0;

        spin_lock_irqsave(&peer->peer_lock, flags);

        if (peer->peer_error != 0) {
                error = peer->peer_error;
                peer->peer_error = 0;

                last_alive = cfs_time_current_sec() -
                             cfs_duration_sec(cfs_time_current() -
                                              peer->peer_last_alive);
        }

        spin_unlock_irqrestore(&peer->peer_lock, flags);

        if (error != 0)
                lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0,
                             last_alive);
}

void
kptllnd_handle_closing_peers ()
{
        unsigned long           flags;
        struct list_head        txs;
        kptl_peer_t            *peer;
        struct list_head       *tmp;
        struct list_head       *nxt;
        kptl_tx_t              *tx;
        int                     idle;

        /* Check with a read lock first to avoid blocking anyone */
        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);
        idle = list_empty(&kptllnd_data.kptl_closing_peers) &&
               list_empty(&kptllnd_data.kptl_zombie_peers);
        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);

        if (idle)
                return;

        INIT_LIST_HEAD(&txs);

        write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);

        /* Cancel txs on all zombie peers.  NB anyone dropping the last peer
         * ref removes it from this list, so I musn't drop the lock while
         * scanning it. */
        list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) {
                peer = list_entry (tmp, kptl_peer_t, peer_list);

                LASSERT (peer->peer_state == PEER_STATE_ZOMBIE);

                kptllnd_peer_cancel_txs(peer, &txs);
        }

        /* Notify LNET and cancel txs on closing (i.e. newly closed) peers.  NB
         * I'm the only one removing from this list, but peers can be added on
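
In the listing, kptllnd_get_peer_info() lets a caller walk the whole peer table one flat index at a time: every hash bucket is scanned, the index is decremented past entries that come earlier, and -ENOENT is returned once the index runs off the end. Below is a minimal user-space sketch of that enumeration pattern, with hypothetical names and no Lustre types or locking, only to illustrate the idea:

/* Sketch of the flat-index enumeration pattern (hypothetical names,
 * not Lustre code): walk every bucket, count down the index, return
 * -ENOENT when the index is past the last entry. */
#include <errno.h>
#include <stdio.h>

#define NBUCKETS   4
#define BUCKETLEN  3

static const int table[NBUCKETS][BUCKETLEN] = {
        { 10, 11, -1 },          /* -1 marks an empty slot */
        { -1, -1, -1 },
        { 20, 21, 22 },
        { 30, -1, -1 },
};

static int get_by_index(int index, int *value)
{
        int i, j;

        for (i = 0; i < NBUCKETS; i++) {
                for (j = 0; j < BUCKETLEN; j++) {
                        if (table[i][j] < 0)
                                continue;
                        if (index-- > 0)         /* not there yet */
                                continue;
                        *value = table[i][j];
                        return 0;
                }
        }
        return -ENOENT;                          /* index past the end */
}

int main(void)
{
        int v, idx;

        /* typical caller: step the index until -ENOENT, the way a
         * /proc-style reader would dump the peer table */
        for (idx = 0; get_by_index(idx, &v) == 0; idx++)
                printf("index %d -> %d\n", idx, v);
        return 0;
}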
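Similarly, kptllnd_peer_add_peertable_locked() inserts at the head of the per-NID bucket so each bucket stays in most-recently-added order, which is what lets kptllnd_cull_peertable_locked() simply count matching peers from the front and close everything from the max_procs_per_nid-th match onwards. A self-contained sketch of that head-insert + cull idea, using hypothetical demo_* names and plain C with no locking or refcounting, might look like:

/* Sketch of the MRU head-insert + cull pattern (hypothetical names,
 * not Lustre code). */
#include <stdio.h>
#include <stdlib.h>

struct demo_peer {
        unsigned long     nid;    /* node id shared by several processes */
        unsigned int      pid;    /* process id, unique per nid          */
        struct demo_peer *next;
};

/* push at head => the bucket stays in most-recently-added order */
static void demo_add_peer(struct demo_peer **bucket, struct demo_peer *p)
{
        p->next = *bucket;
        *bucket = p;
}

/* keep at most (max_per_nid - 1) existing peers with this nid,
 * making room for the one about to be added */
static void demo_cull_bucket(struct demo_peer **bucket, unsigned long nid,
                             int max_per_nid)
{
        struct demo_peer **pp = bucket;
        int count = 0;

        while (*pp != NULL) {
                struct demo_peer *p = *pp;

                if (p->nid != nid) {
                        pp = &p->next;
                        continue;
                }

                if (++count < max_per_nid) {      /* recent: keep it */
                        pp = &p->next;
                        continue;
                }

                *pp = p->next;                    /* cull the stale entry */
                printf("culled peer %lu:%u\n", p->nid, p->pid);
                free(p);
        }
}

int main(void)
{
        struct demo_peer *bucket = NULL;
        int i;

        /* add four processes on the same nid with a limit of 3:
         * the oldest one is culled before the fourth is added */
        for (i = 0; i < 4; i++) {
                struct demo_peer *p = calloc(1, sizeof(*p));
                p->nid = 42;
                p->pid = i;
                demo_cull_bucket(&bucket, p->nid, 3);
                demo_add_peer(&bucket, p);
        }
        return 0;
}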
