📄 ptllnd_peer.c
字号:
if (msg->ptlm_credits <= 1) { CERROR("Need more than 1+%d credits from %s\n", msg->ptlm_credits, libcfs_id2str(lpid)); return NULL; } write_lock_irqsave(g_lock, flags); peer = kptllnd_id2peer_locked(lpid); if (peer != NULL) { if (peer->peer_state == PEER_STATE_WAITING_HELLO) { /* Completing HELLO handshake */ LASSERT(peer->peer_incarnation == 0); if (msg->ptlm_dststamp != 0 && msg->ptlm_dststamp != peer->peer_myincarnation) { write_unlock_irqrestore(g_lock, flags); CERROR("Ignoring HELLO from %s: unexpected " "dststamp "LPX64" ("LPX64" wanted)\n", libcfs_id2str(lpid), msg->ptlm_dststamp, peer->peer_myincarnation); kptllnd_peer_decref(peer); return NULL; } /* Concurrent initiation or response to my HELLO */ peer->peer_state = PEER_STATE_ACTIVE; peer->peer_incarnation = msg->ptlm_srcstamp; peer->peer_next_matchbits = safe_matchbits; peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size; write_unlock_irqrestore(g_lock, flags); return peer; } if (msg->ptlm_dststamp != 0 && msg->ptlm_dststamp <= peer->peer_myincarnation) { write_unlock_irqrestore(g_lock, flags); CERROR("Ignoring stale HELLO from %s: " "dststamp "LPX64" (current "LPX64")\n", libcfs_id2str(lpid), msg->ptlm_dststamp, peer->peer_myincarnation); kptllnd_peer_decref(peer); return NULL; } /* Brand new connection attempt: remove old incarnation */ kptllnd_peer_close_locked(peer, 0); } kptllnd_cull_peertable_locked(lpid); write_unlock_irqrestore(g_lock, flags); if (peer != NULL) { CDEBUG(D_NET, "Peer %s (%s) reconnecting:" " stamp "LPX64"("LPX64")\n", libcfs_id2str(lpid), kptllnd_ptlid2str(initiator), msg->ptlm_srcstamp, peer->peer_incarnation); kptllnd_peer_decref(peer); } hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE); if (hello_tx == NULL) { CERROR("Unable to allocate HELLO message for %s\n", libcfs_id2str(lpid)); return NULL; } kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO, sizeof(kptl_hello_msg_t)); new_peer = kptllnd_peer_allocate(lpid, initiator); if (new_peer == NULL) { kptllnd_tx_decref(hello_tx); return NULL; } rc = kptllnd_peer_reserve_buffers(); if (rc != 0) { kptllnd_peer_decref(new_peer); kptllnd_tx_decref(hello_tx); CERROR("Failed to reserve buffers for %s\n", libcfs_id2str(lpid)); return NULL; } write_lock_irqsave(g_lock, flags); again: if (kptllnd_data.kptl_shutdown) { write_unlock_irqrestore(g_lock, flags); CERROR ("Shutdown started, refusing connection from %s\n", libcfs_id2str(lpid)); kptllnd_peer_unreserve_buffers(); kptllnd_peer_decref(new_peer); kptllnd_tx_decref(hello_tx); return NULL; } peer = kptllnd_id2peer_locked(lpid); if (peer != NULL) { if (peer->peer_state == PEER_STATE_WAITING_HELLO) { /* An outgoing message instantiated 'peer' for me */ LASSERT(peer->peer_incarnation == 0); peer->peer_state = PEER_STATE_ACTIVE; peer->peer_incarnation = msg->ptlm_srcstamp; peer->peer_next_matchbits = safe_matchbits; peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size; write_unlock_irqrestore(g_lock, flags); CWARN("Outgoing instantiated peer %s\n", libcfs_id2str(lpid)); } else { LASSERT (peer->peer_state == PEER_STATE_ACTIVE); write_unlock_irqrestore(g_lock, flags); /* WOW! Somehow this peer completed the HELLO * handshake while I slept. I guess I could have slept * while it rebooted and sent a new HELLO, so I'll fail * this one... */ CWARN("Wow! peer %s\n", libcfs_id2str(lpid)); kptllnd_peer_decref(peer); peer = NULL; } kptllnd_peer_unreserve_buffers(); kptllnd_peer_decref(new_peer); kptllnd_tx_decref(hello_tx); return peer; } if (kptllnd_data.kptl_n_active_peers == kptllnd_data.kptl_expected_peers) { /* peer table full */ write_unlock_irqrestore(g_lock, flags); kptllnd_peertable_overflow_msg("Connection from ", lpid); rc = kptllnd_reserve_buffers(1); /* HELLO headroom */ if (rc != 0) { CERROR("Refusing connection from %s\n", libcfs_id2str(lpid)); kptllnd_peer_unreserve_buffers(); kptllnd_peer_decref(new_peer); kptllnd_tx_decref(hello_tx); return NULL; } write_lock_irqsave(g_lock, flags); kptllnd_data.kptl_expected_peers++; goto again; } last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid); hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen; hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size = *kptllnd_tunables.kptl_max_msg_size; new_peer->peer_state = PEER_STATE_ACTIVE; new_peer->peer_incarnation = msg->ptlm_srcstamp; new_peer->peer_next_matchbits = safe_matchbits; new_peer->peer_last_matchbits_seen = last_matchbits_seen; new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size; kptllnd_peer_add_peertable_locked(new_peer); write_unlock_irqrestore(g_lock, flags); /* NB someone else could get in now and post a message before I post * the HELLO, but post_tx/check_sends take care of that! */ CDEBUG(D_NETTRACE, "%s: post response hello %p\n", libcfs_id2str(new_peer->peer_id), hello_tx); kptllnd_post_tx(new_peer, hello_tx, 0); kptllnd_peer_check_sends(new_peer); return new_peer;}voidkptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag){ kptllnd_post_tx(peer, tx, nfrag); kptllnd_peer_check_sends(peer);}intkptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target){ rwlock_t *g_lock = &kptllnd_data.kptl_peer_rw_lock; ptl_process_id_t ptl_id; kptl_peer_t *new_peer; kptl_tx_t *hello_tx; unsigned long flags; int rc; __u64 last_matchbits_seen; /* I expect to find the peer, so I only take a read lock... */ read_lock_irqsave(g_lock, flags); *peerp = kptllnd_id2peer_locked(target); read_unlock_irqrestore(g_lock, flags); if (*peerp != NULL) return 0; if ((target.pid & LNET_PID_USERFLAG) != 0) { CWARN("Refusing to create a new connection to %s " "(non-kernel peer)\n", libcfs_id2str(target)); return -EHOSTUNREACH; } /* The new peer is a kernel ptllnd, and kernel ptllnds all have * the same portals PID */ ptl_id.nid = kptllnd_lnet2ptlnid(target.nid); ptl_id.pid = kptllnd_data.kptl_portals_id.pid; hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE); if (hello_tx == NULL) { CERROR("Unable to allocate connect message for %s\n", libcfs_id2str(target)); return -ENOMEM; } kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO, sizeof(kptl_hello_msg_t)); new_peer = kptllnd_peer_allocate(target, ptl_id); if (new_peer == NULL) { rc = -ENOMEM; goto unwind_0; } rc = kptllnd_peer_reserve_buffers(); if (rc != 0) goto unwind_1; write_lock_irqsave(g_lock, flags); again: if (kptllnd_data.kptl_shutdown) { write_unlock_irqrestore(g_lock, flags); rc = -ESHUTDOWN; goto unwind_2; } *peerp = kptllnd_id2peer_locked(target); if (*peerp != NULL) { write_unlock_irqrestore(g_lock, flags); goto unwind_2; } kptllnd_cull_peertable_locked(target); if (kptllnd_data.kptl_n_active_peers == kptllnd_data.kptl_expected_peers) { /* peer table full */ write_unlock_irqrestore(g_lock, flags); kptllnd_peertable_overflow_msg("Connection to ", target); rc = kptllnd_reserve_buffers(1); /* HELLO headroom */ if (rc != 0) { CERROR("Can't create connection to %s\n", libcfs_id2str(target)); rc = -ENOMEM; goto unwind_2; } write_lock_irqsave(g_lock, flags); kptllnd_data.kptl_expected_peers++; goto again; } last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target); hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen; hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size = *kptllnd_tunables.kptl_max_msg_size; new_peer->peer_state = PEER_STATE_WAITING_HELLO; new_peer->peer_last_matchbits_seen = last_matchbits_seen; kptllnd_peer_add_peertable_locked(new_peer); write_unlock_irqrestore(g_lock, flags); /* NB someone else could get in now and post a message before I post * the HELLO, but post_tx/check_sends take care of that! */ CDEBUG(D_NETTRACE, "%s: post initial hello %p\n", libcfs_id2str(new_peer->peer_id), hello_tx); kptllnd_post_tx(new_peer, hello_tx, 0); kptllnd_peer_check_sends(new_peer); *peerp = new_peer; return 0; unwind_2: kptllnd_peer_unreserve_buffers(); unwind_1: kptllnd_peer_decref(new_peer); unwind_0: kptllnd_tx_decref(hello_tx); return rc;}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -