ptllnd_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *   Author: Eric Barton <eeb@bartonsoftware.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   This file is confidential source code owned by Cluster File Systems.
 *   No viewing, modification, compilation, redistribution, or any other
 *   form of use is permitted except through a signed license agreement.
 *
 *   If you have not signed such an agreement, then you have no rights to
 *   this file.  Please destroy it immediately and contact CFS.
 *
 */

#include "ptllnd.h"

void
ptllnd_set_tx_deadline(ptllnd_tx_t *tx)
{
        ptllnd_peer_t *peer = tx->tx_peer;
        lnet_ni_t     *ni = peer->plp_ni;
        ptllnd_ni_t   *plni = ni->ni_data;

        tx->tx_deadline = cfs_time_current_sec() + plni->plni_timeout;
}

void
ptllnd_post_tx(ptllnd_tx_t *tx)
{
        ptllnd_peer_t *peer = tx->tx_peer;

        ptllnd_set_tx_deadline(tx);
        list_add_tail(&tx->tx_list, &peer->plp_txq);
        ptllnd_check_sends(peer);
}

char *
ptllnd_ptlid2str(ptl_process_id_t id)
{
        static char strs[8][32];
        static int  idx = 0;

        char *str = strs[idx++];

        if (idx >= sizeof(strs)/sizeof(strs[0]))
                idx = 0;

        snprintf(str, sizeof(strs[0]), FMT_PTLID, id.pid, id.nid);
        return str;
}

void
ptllnd_destroy_peer(ptllnd_peer_t *peer)
{
        lnet_ni_t   *ni = peer->plp_ni;
        ptllnd_ni_t *plni = ni->ni_data;
        int          nmsg = peer->plp_lazy_credits + plni->plni_peer_credits;

        ptllnd_size_buffers(ni, -nmsg);

        LASSERT (peer->plp_closing);
        LASSERT (plni->plni_npeers > 0);
        LASSERT (list_empty(&peer->plp_txq));
        LASSERT (list_empty(&peer->plp_activeq));
        plni->plni_npeers--;
        LIBCFS_FREE(peer, sizeof(*peer));
}

void
ptllnd_abort_txs(ptllnd_ni_t *plni, struct list_head *q)
{
        while (!list_empty(q)) {
                ptllnd_tx_t *tx = list_entry(q->next, ptllnd_tx_t, tx_list);

                tx->tx_status = -ESHUTDOWN;
                list_del(&tx->tx_list);
                list_add_tail(&tx->tx_list, &plni->plni_zombie_txs);
        }
}

void
ptllnd_close_peer(ptllnd_peer_t *peer, int error)
{
        lnet_ni_t   *ni = peer->plp_ni;
        ptllnd_ni_t *plni = ni->ni_data;

        if (peer->plp_closing)
                return;

        peer->plp_closing = 1;

        if (!list_empty(&peer->plp_txq) ||
            !list_empty(&peer->plp_activeq) ||
            error != 0) {
                CWARN("Closing %s\n", libcfs_id2str(peer->plp_id));
                if (plni->plni_debug)
                        ptllnd_dump_debug(ni, peer->plp_id);
        }

        ptllnd_abort_txs(plni, &peer->plp_txq);
        ptllnd_abort_txs(plni, &peer->plp_activeq);

        list_del(&peer->plp_list);
        ptllnd_peer_decref(peer);
}

ptllnd_peer_t *
ptllnd_find_peer(lnet_ni_t *ni, lnet_process_id_t id, int create)
{
        ptllnd_ni_t      *plni = ni->ni_data;
        unsigned int      hash = LNET_NIDADDR(id.nid) % plni->plni_peer_hash_size;
        struct list_head *tmp;
        ptllnd_peer_t    *plp;
        ptllnd_tx_t      *tx;
        int               rc;

        LASSERT (LNET_NIDNET(id.nid) == LNET_NIDNET(ni->ni_nid));

        list_for_each(tmp, &plni->plni_peer_hash[hash]) {
                plp = list_entry(tmp, ptllnd_peer_t, plp_list);

                if (plp->plp_id.nid == id.nid &&
                    plp->plp_id.pid == id.pid) {
                        ptllnd_peer_addref(plp);
                        return plp;
                }
        }

        if (!create)
                return NULL;

        /* New peer: check first for enough posted buffers */
        plni->plni_npeers++;
        rc = ptllnd_size_buffers(ni, plni->plni_peer_credits);
        if (rc != 0) {
                plni->plni_npeers--;
                return NULL;
        }

        LIBCFS_ALLOC(plp, sizeof(*plp));
        if (plp == NULL) {
                CERROR("Can't allocate new peer %s\n", libcfs_id2str(id));
                plni->plni_npeers--;
                ptllnd_size_buffers(ni, -plni->plni_peer_credits);
                return NULL;
        }

        plp->plp_ni = ni;
        plp->plp_id = id;
        plp->plp_ptlid.nid = LNET_NIDADDR(id.nid);
        plp->plp_ptlid.pid = plni->plni_ptllnd_pid;
        plp->plp_credits = 1;        /* add more later when she gives me credits */
        plp->plp_max_msg_size = plni->plni_max_msg_size; /* until I hear from her */
        plp->plp_sent_credits = 1;   /* Implicit credit for HELLO */
        plp->plp_outstanding_credits = plni->plni_peer_credits - 1;
        plp->plp_lazy_credits = 0;
        plp->plp_extra_lazy_credits = 0;
        plp->plp_match = 0;
        plp->plp_stamp = 0;
        plp->plp_recvd_hello = 0;
        plp->plp_closing = 0;
        plp->plp_refcount = 1;
        CFS_INIT_LIST_HEAD(&plp->plp_list);
        CFS_INIT_LIST_HEAD(&plp->plp_txq);
        CFS_INIT_LIST_HEAD(&plp->plp_activeq);

        ptllnd_peer_addref(plp);
        list_add_tail(&plp->plp_list, &plni->plni_peer_hash[hash]);

        tx = ptllnd_new_tx(plp, PTLLND_MSG_TYPE_HELLO, 0);
        if (tx == NULL) {
                CERROR("Can't send HELLO to %s\n", libcfs_id2str(id));
                ptllnd_close_peer(plp, -ENOMEM);
                ptllnd_peer_decref(plp);
                return NULL;
        }

        tx->tx_msg.ptlm_u.hello.kptlhm_matchbits = PTL_RESERVED_MATCHBITS;
        tx->tx_msg.ptlm_u.hello.kptlhm_max_msg_size = plni->plni_max_msg_size;

        PTLLND_HISTORY("%s[%d/%d+%d(%d)]: post hello %p", libcfs_id2str(id),
                       tx->tx_peer->plp_credits,
                       tx->tx_peer->plp_outstanding_credits,
                       tx->tx_peer->plp_sent_credits,
                       plni->plni_peer_credits +
                       tx->tx_peer->plp_lazy_credits, tx);
        ptllnd_post_tx(tx);

        return plp;
}

int
ptllnd_count_q(struct list_head *q)
{
        struct list_head *e;
        int               n = 0;

        list_for_each(e, q) {
                n++;
        }

        return n;
}

const char *
ptllnd_tx_typestr(int type)
{
        switch (type) {
        case PTLLND_RDMA_WRITE:
                return "rdma_write";

        case PTLLND_RDMA_READ:
                return "rdma_read";

        case PTLLND_MSG_TYPE_PUT:
                return "put_req";

        case PTLLND_MSG_TYPE_GET:
                return "get_req";

        case PTLLND_MSG_TYPE_IMMEDIATE:
                return "immediate";

        case PTLLND_MSG_TYPE_NOOP:
                return "noop";

        case PTLLND_MSG_TYPE_HELLO:
                return "hello";

        default:
                return "<unknown>";
        }
}

void
ptllnd_debug_tx(ptllnd_tx_t *tx)
{
        CDEBUG(D_WARNING, "%s %s b %ld.%06ld/%ld.%06ld"
               " r %ld.%06ld/%ld.%06ld status %d\n",
               ptllnd_tx_typestr(tx->tx_type),
               libcfs_id2str(tx->tx_peer->plp_id),
               tx->tx_bulk_posted.tv_sec, tx->tx_bulk_posted.tv_usec,
               tx->tx_bulk_done.tv_sec, tx->tx_bulk_done.tv_usec,
               tx->tx_req_posted.tv_sec, tx->tx_req_posted.tv_usec,
               tx->tx_req_done.tv_sec, tx->tx_req_done.tv_usec,
               tx->tx_status);
}

void
ptllnd_debug_peer(lnet_ni_t *ni, lnet_process_id_t id)
{
        ptllnd_peer_t    *plp = ptllnd_find_peer(ni, id, 0);
        struct list_head *tmp;
        ptllnd_ni_t      *plni = ni->ni_data;
        ptllnd_tx_t      *tx;

        if (plp == NULL) {
                CDEBUG(D_WARNING, "No peer %s\n", libcfs_id2str(id));
                return;
        }

        CDEBUG(D_WARNING, "%s %s%s [%d] "LPU64".%06d m "LPU64" q %d/%d c %d/%d+%d(%d)\n",
               libcfs_id2str(id),
               plp->plp_recvd_hello ? "H" : "_",
               plp->plp_closing     ? "C" : "_",
               plp->plp_refcount,
               plp->plp_stamp / 1000000, (int)(plp->plp_stamp % 1000000),
               plp->plp_match,
               ptllnd_count_q(&plp->plp_txq),
               ptllnd_count_q(&plp->plp_activeq),
               plp->plp_credits, plp->plp_outstanding_credits,
               plp->plp_sent_credits,
               plni->plni_peer_credits + plp->plp_lazy_credits);

        CDEBUG(D_WARNING, "txq:\n");
        list_for_each (tmp, &plp->plp_txq) {
                tx = list_entry(tmp, ptllnd_tx_t, tx_list);

                ptllnd_debug_tx(tx);
        }

        CDEBUG(D_WARNING, "activeq:\n");
        list_for_each (tmp, &plp->plp_activeq) {
                tx = list_entry(tmp, ptllnd_tx_t, tx_list);

                ptllnd_debug_tx(tx);
        }

        CDEBUG(D_WARNING, "zombies:\n");
        list_for_each (tmp, &plni->plni_zombie_txs) {
                tx = list_entry(tmp, ptllnd_tx_t, tx_list);

                if (tx->tx_peer->plp_id.nid == id.nid &&
                    tx->tx_peer->plp_id.pid == id.pid)
                        ptllnd_debug_tx(tx);
        }

        CDEBUG(D_WARNING, "history:\n");
        list_for_each (tmp, &plni->plni_tx_history) {
                tx = list_entry(tmp, ptllnd_tx_t, tx_list);

                if (tx->tx_peer->plp_id.nid == id.nid &&
                    tx->tx_peer->plp_id.pid == id.pid)
                        ptllnd_debug_tx(tx);
        }

        ptllnd_peer_decref(plp);
}

void
ptllnd_dump_debug(lnet_ni_t *ni, lnet_process_id_t id)
{
        ptllnd_debug_peer(ni, id);
        ptllnd_dump_history();
}

void
ptllnd_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive)
{
        lnet_process_id_t  id;
        ptllnd_peer_t     *peer;
        time_t             start = cfs_time_current_sec();
        ptllnd_ni_t       *plni = ni->ni_data;
        int                w = plni->plni_long_wait;

        /* This is only actually used to connect to routers at startup! */
        LASSERT(alive);

        id.nid = nid;
        id.pid = LUSTRE_SRV_LNET_PID;

        peer = ptllnd_find_peer(ni, id, 1);
        if (peer == NULL)
                return;

        /* wait for the peer to reply */
        while (!peer->plp_recvd_hello) {
                if (w > 0 && cfs_time_current_sec() > start + w/1000) {
                        CWARN("Waited %ds to connect to %s\n",
                              (int)(cfs_time_current_sec() - start),
                              libcfs_id2str(id));
                        w *= 2;
                }

                ptllnd_wait(ni, w);
        }

        ptllnd_peer_decref(peer);
}

int
ptllnd_setasync(lnet_ni_t *ni, lnet_process_id_t id, int nasync)
{
        ptllnd_peer_t *peer = ptllnd_find_peer(ni, id, nasync > 0);
        int            rc;

        if (peer == NULL)
                return -ENOMEM;

        LASSERT (peer->plp_lazy_credits >= 0);
        LASSERT (peer->plp_extra_lazy_credits >= 0);

        /* If nasync < 0, we're being told we can reduce the total message
         * headroom.  We can't do this right now because our peer might already
         * have credits for the extra buffers, so we just account the extra
         * headroom in case we need it later and only destroy buffers when the