📄 ptllnd_peer.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved. * Author: PJ Kirner <pjkirner@clusterfs.com> * E Barton <eeb@bartonsoftware.com> * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * This file is confidential source code owned by Cluster File Systems. * No viewing, modification, compilation, redistribution, or any other * form of use is permitted except through a signed license agreement. * * If you have not signed such an agreement, then you have no rights to * this file. Please destroy it immediately and contact CFS. * */#include "ptllnd.h"#include <libcfs/list.h>static intkptllnd_count_queue(struct list_head *q){ struct list_head *e; int n = 0; list_for_each(e, q) { n++; } return n;}intkptllnd_get_peer_info(int index, lnet_process_id_t *id, int *state, int *sent_hello, int *refcount, __u64 *incarnation, __u64 *next_matchbits, __u64 *last_matchbits_seen, int *nsendq, int *nactiveq, int *credits, int *outstanding_credits) { rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock; unsigned long flags; struct list_head *ptmp; kptl_peer_t *peer; int i; int rc = -ENOENT; read_lock_irqsave(g_lock, flags); for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) { list_for_each (ptmp, &kptllnd_data.kptl_peers[i]) { peer = list_entry(ptmp, kptl_peer_t, peer_list); if (index-- > 0) continue; *id = peer->peer_id; *state = peer->peer_state; *sent_hello = peer->peer_sent_hello; *refcount = atomic_read(&peer->peer_refcount); *incarnation = peer->peer_incarnation; spin_lock(&peer->peer_lock); *next_matchbits = peer->peer_next_matchbits; *last_matchbits_seen = peer->peer_last_matchbits_seen; *credits = peer->peer_credits; *outstanding_credits = peer->peer_outstanding_credits; *nsendq = kptllnd_count_queue(&peer->peer_sendq); *nactiveq = kptllnd_count_queue(&peer->peer_activeq); spin_unlock(&peer->peer_lock); rc = 0; goto out; } } out: read_unlock_irqrestore(g_lock, flags); return rc;}voidkptllnd_peer_add_peertable_locked (kptl_peer_t *peer){ LASSERT (!kptllnd_data.kptl_shutdown); LASSERT (kptllnd_data.kptl_n_active_peers < kptllnd_data.kptl_expected_peers); LASSERT (peer->peer_state == PEER_STATE_WAITING_HELLO || peer->peer_state == PEER_STATE_ACTIVE); kptllnd_data.kptl_n_active_peers++; atomic_inc(&peer->peer_refcount); /* +1 ref for the list */ /* NB add to HEAD of peer list for MRU order! * (see kptllnd_cull_peertable) */ list_add(&peer->peer_list, kptllnd_nid2peerlist(peer->peer_id.nid));}voidkptllnd_cull_peertable_locked (lnet_process_id_t pid){ /* I'm about to add a new peer with this portals ID to the peer table, * so (a) this peer should not exist already and (b) I want to leave at * most (max_procs_per_nid - 1) peers with this NID in the table. */ struct list_head *peers = kptllnd_nid2peerlist(pid.nid); int cull_count = *kptllnd_tunables.kptl_max_procs_per_node; int count; struct list_head *tmp; struct list_head *nxt; kptl_peer_t *peer; count = 0; list_for_each_safe (tmp, nxt, peers) { /* NB I rely on kptllnd_peer_add_peertable_locked to add peers * in MRU order */ peer = list_entry(tmp, kptl_peer_t, peer_list); if (peer->peer_id.nid != pid.nid) continue; LASSERT (peer->peer_id.pid != pid.pid); count++; if (count < cull_count) /* recent (don't cull) */ continue; CDEBUG(D_NET, "Cull %s(%s)\n", libcfs_id2str(peer->peer_id), kptllnd_ptlid2str(peer->peer_ptlid)); kptllnd_peer_close_locked(peer, 0); }}kptl_peer_t *kptllnd_peer_allocate (lnet_process_id_t lpid, ptl_process_id_t ppid){ unsigned long flags; kptl_peer_t *peer; LIBCFS_ALLOC(peer, sizeof (*peer)); if (peer == NULL) { CERROR("Can't create peer %s (%s)\n", libcfs_id2str(lpid), kptllnd_ptlid2str(ppid)); return NULL; } memset(peer, 0, sizeof(*peer)); /* zero flags etc */ INIT_LIST_HEAD (&peer->peer_sendq); INIT_LIST_HEAD (&peer->peer_activeq); spin_lock_init (&peer->peer_lock); peer->peer_state = PEER_STATE_ALLOCATED; peer->peer_error = 0; peer->peer_last_alive = cfs_time_current(); peer->peer_id = lpid; peer->peer_ptlid = ppid; peer->peer_credits = 1; /* enough for HELLO */ peer->peer_next_matchbits = PTL_RESERVED_MATCHBITS; peer->peer_outstanding_credits = *kptllnd_tunables.kptl_peercredits - 1; peer->peer_sent_credits = 1; /* HELLO credit is implicit */ peer->peer_max_msg_size = PTLLND_MIN_BUFFER_SIZE; /* until we know better */ atomic_set(&peer->peer_refcount, 1); /* 1 ref for caller */ write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); peer->peer_myincarnation = kptllnd_data.kptl_incarnation; /* Only increase # peers under lock, to guarantee we dont grow it * during shutdown */ if (kptllnd_data.kptl_shutdown) { write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); LIBCFS_FREE(peer, sizeof(*peer)); return NULL; } kptllnd_data.kptl_npeers++; write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); return peer;}voidkptllnd_peer_destroy (kptl_peer_t *peer){ unsigned long flags; CDEBUG(D_NET, "Peer=%p\n", peer); LASSERT (!in_interrupt()); LASSERT (atomic_read(&peer->peer_refcount) == 0); LASSERT (peer->peer_state == PEER_STATE_ALLOCATED || peer->peer_state == PEER_STATE_ZOMBIE); LASSERT (list_empty(&peer->peer_sendq)); LASSERT (list_empty(&peer->peer_activeq)); write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); if (peer->peer_state == PEER_STATE_ZOMBIE) list_del(&peer->peer_list); kptllnd_data.kptl_npeers--; write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); LIBCFS_FREE (peer, sizeof (*peer));}voidkptllnd_cancel_txlist (struct list_head *peerq, struct list_head *txs){ struct list_head *tmp; struct list_head *nxt; kptl_tx_t *tx; list_for_each_safe (tmp, nxt, peerq) { tx = list_entry(tmp, kptl_tx_t, tx_list); list_del(&tx->tx_list); list_add_tail(&tx->tx_list, txs); tx->tx_status = -EIO; tx->tx_active = 0; }}voidkptllnd_peer_cancel_txs(kptl_peer_t *peer, struct list_head *txs){ unsigned long flags; spin_lock_irqsave(&peer->peer_lock, flags); kptllnd_cancel_txlist(&peer->peer_sendq, txs); kptllnd_cancel_txlist(&peer->peer_activeq, txs); spin_unlock_irqrestore(&peer->peer_lock, flags);}voidkptllnd_peer_alive (kptl_peer_t *peer){ /* This is racy, but everyone's only writing cfs_time_current() */ peer->peer_last_alive = cfs_time_current(); mb();}voidkptllnd_peer_notify (kptl_peer_t *peer){ unsigned long flags; time_t last_alive = 0; int error = 0; spin_lock_irqsave(&peer->peer_lock, flags); if (peer->peer_error != 0) { error = peer->peer_error; peer->peer_error = 0; last_alive = cfs_time_current_sec() - cfs_duration_sec(cfs_time_current() - peer->peer_last_alive); } spin_unlock_irqrestore(&peer->peer_lock, flags); if (error != 0) lnet_notify (kptllnd_data.kptl_ni, peer->peer_id.nid, 0, last_alive);}voidkptllnd_handle_closing_peers (){ unsigned long flags; struct list_head txs; kptl_peer_t *peer; struct list_head *tmp; struct list_head *nxt; kptl_tx_t *tx; int idle; /* Check with a read lock first to avoid blocking anyone */ read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); idle = list_empty(&kptllnd_data.kptl_closing_peers) && list_empty(&kptllnd_data.kptl_zombie_peers); read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); if (idle) return; INIT_LIST_HEAD(&txs); write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); /* Cancel txs on all zombie peers. NB anyone dropping the last peer * ref removes it from this list, so I musn't drop the lock while * scanning it. */ list_for_each (tmp, &kptllnd_data.kptl_zombie_peers) { peer = list_entry (tmp, kptl_peer_t, peer_list); LASSERT (peer->peer_state == PEER_STATE_ZOMBIE); kptllnd_peer_cancel_txs(peer, &txs); } /* Notify LNET and cancel txs on closing (i.e. newly closed) peers. NB * I'm the only one removing from this list, but peers can be added on
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -