⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd_peer.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 4 页
字号:
        if (msg->ptlm_credits <= 1) {                CERROR("Need more than 1+%d credits from %s\n",                       msg->ptlm_credits, libcfs_id2str(lpid));                return NULL;        }                write_lock_irqsave(g_lock, flags);        peer = kptllnd_id2peer_locked(lpid);        if (peer != NULL) {                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {                        /* Completing HELLO handshake */                        LASSERT(peer->peer_incarnation == 0);                        if (msg->ptlm_dststamp != 0 &&                            msg->ptlm_dststamp != peer->peer_myincarnation) {                                write_unlock_irqrestore(g_lock, flags);                                CERROR("Ignoring HELLO from %s: unexpected "                                       "dststamp "LPX64" ("LPX64" wanted)\n",                                       libcfs_id2str(lpid),                                       msg->ptlm_dststamp,                                       peer->peer_myincarnation);                                kptllnd_peer_decref(peer);                                return NULL;                        }                                                /* Concurrent initiation or response to my HELLO */                        peer->peer_state = PEER_STATE_ACTIVE;                        peer->peer_incarnation = msg->ptlm_srcstamp;                        peer->peer_next_matchbits = safe_matchbits;                        peer->peer_max_msg_size =                                msg->ptlm_u.hello.kptlhm_max_msg_size;                                                write_unlock_irqrestore(g_lock, flags);                        return peer;                }                if (msg->ptlm_dststamp != 0 &&                    msg->ptlm_dststamp <= peer->peer_myincarnation) {                        write_unlock_irqrestore(g_lock, flags);                        CERROR("Ignoring stale HELLO from %s: "                               "dststamp "LPX64" (current "LPX64")\n",                               libcfs_id2str(lpid),                               msg->ptlm_dststamp,                               peer->peer_myincarnation);                        kptllnd_peer_decref(peer);                        return NULL;                }                /* Brand new connection attempt: remove old incarnation */                kptllnd_peer_close_locked(peer, 0);        }        kptllnd_cull_peertable_locked(lpid);        write_unlock_irqrestore(g_lock, flags);        if (peer != NULL) {                CDEBUG(D_NET, "Peer %s (%s) reconnecting:"                       " stamp "LPX64"("LPX64")\n",                       libcfs_id2str(lpid), kptllnd_ptlid2str(initiator),                       msg->ptlm_srcstamp, peer->peer_incarnation);                kptllnd_peer_decref(peer);        }        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);        if (hello_tx == NULL) {                CERROR("Unable to allocate HELLO message for %s\n",                       libcfs_id2str(lpid));                return NULL;        }        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,                         sizeof(kptl_hello_msg_t));        new_peer = kptllnd_peer_allocate(lpid, initiator);        if (new_peer == NULL) {                kptllnd_tx_decref(hello_tx);                return NULL;        }        rc = kptllnd_peer_reserve_buffers();        if (rc != 0) {                kptllnd_peer_decref(new_peer);                kptllnd_tx_decref(hello_tx);                CERROR("Failed to reserve buffers for %s\n",                       libcfs_id2str(lpid));                return NULL;        }        write_lock_irqsave(g_lock, flags); again:        if (kptllnd_data.kptl_shutdown) {                write_unlock_irqrestore(g_lock, flags);                CERROR ("Shutdown started, refusing connection from %s\n",                        libcfs_id2str(lpid));                kptllnd_peer_unreserve_buffers();                kptllnd_peer_decref(new_peer);                kptllnd_tx_decref(hello_tx);                return NULL;        }        peer = kptllnd_id2peer_locked(lpid);        if (peer != NULL) {                if (peer->peer_state == PEER_STATE_WAITING_HELLO) {                        /* An outgoing message instantiated 'peer' for me */                        LASSERT(peer->peer_incarnation == 0);                        peer->peer_state = PEER_STATE_ACTIVE;                        peer->peer_incarnation = msg->ptlm_srcstamp;                        peer->peer_next_matchbits = safe_matchbits;                        peer->peer_max_msg_size =                                msg->ptlm_u.hello.kptlhm_max_msg_size;                        write_unlock_irqrestore(g_lock, flags);                        CWARN("Outgoing instantiated peer %s\n",                              libcfs_id2str(lpid));		} else {			LASSERT (peer->peer_state == PEER_STATE_ACTIVE);                        write_unlock_irqrestore(g_lock, flags);			/* WOW!  Somehow this peer completed the HELLO			 * handshake while I slept.  I guess I could have slept			 * while it rebooted and sent a new HELLO, so I'll fail			 * this one... */                        CWARN("Wow! peer %s\n", libcfs_id2str(lpid));			kptllnd_peer_decref(peer);			peer = NULL;		}                kptllnd_peer_unreserve_buffers();                kptllnd_peer_decref(new_peer);                kptllnd_tx_decref(hello_tx);                return peer;        }        if (kptllnd_data.kptl_n_active_peers ==            kptllnd_data.kptl_expected_peers) {                /* peer table full */                write_unlock_irqrestore(g_lock, flags);                kptllnd_peertable_overflow_msg("Connection from ", lpid);                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */                if (rc != 0) {                        CERROR("Refusing connection from %s\n",                               libcfs_id2str(lpid));                        kptllnd_peer_unreserve_buffers();                        kptllnd_peer_decref(new_peer);                        kptllnd_tx_decref(hello_tx);                        return NULL;                }                                write_lock_irqsave(g_lock, flags);                kptllnd_data.kptl_expected_peers++;                goto again;        }        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(lpid);        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =                *kptllnd_tunables.kptl_max_msg_size;        new_peer->peer_state = PEER_STATE_ACTIVE;        new_peer->peer_incarnation = msg->ptlm_srcstamp;        new_peer->peer_next_matchbits = safe_matchbits;        new_peer->peer_last_matchbits_seen = last_matchbits_seen;        new_peer->peer_max_msg_size = msg->ptlm_u.hello.kptlhm_max_msg_size;        kptllnd_peer_add_peertable_locked(new_peer);        write_unlock_irqrestore(g_lock, flags);	/* NB someone else could get in now and post a message before I post	 * the HELLO, but post_tx/check_sends take care of that! */        CDEBUG(D_NETTRACE, "%s: post response hello %p\n",               libcfs_id2str(new_peer->peer_id), hello_tx);        kptllnd_post_tx(new_peer, hello_tx, 0);        kptllnd_peer_check_sends(new_peer);        return new_peer;}voidkptllnd_tx_launch(kptl_peer_t *peer, kptl_tx_t *tx, int nfrag){        kptllnd_post_tx(peer, tx, nfrag);        kptllnd_peer_check_sends(peer);}intkptllnd_find_target(kptl_peer_t **peerp, lnet_process_id_t target){        rwlock_t         *g_lock = &kptllnd_data.kptl_peer_rw_lock;        ptl_process_id_t  ptl_id;        kptl_peer_t      *new_peer;        kptl_tx_t        *hello_tx;        unsigned long     flags;        int               rc;        __u64             last_matchbits_seen;        /* I expect to find the peer, so I only take a read lock... */        read_lock_irqsave(g_lock, flags);        *peerp = kptllnd_id2peer_locked(target);        read_unlock_irqrestore(g_lock, flags);        if (*peerp != NULL)                return 0;                if ((target.pid & LNET_PID_USERFLAG) != 0) {                CWARN("Refusing to create a new connection to %s "                      "(non-kernel peer)\n", libcfs_id2str(target));                return -EHOSTUNREACH;        }        /* The new peer is a kernel ptllnd, and kernel ptllnds all have         * the same portals PID */        ptl_id.nid = kptllnd_lnet2ptlnid(target.nid);        ptl_id.pid = kptllnd_data.kptl_portals_id.pid;        hello_tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);        if (hello_tx == NULL) {                CERROR("Unable to allocate connect message for %s\n",                       libcfs_id2str(target));                return -ENOMEM;        }        kptllnd_init_msg(hello_tx->tx_msg, PTLLND_MSG_TYPE_HELLO,                         sizeof(kptl_hello_msg_t));        new_peer = kptllnd_peer_allocate(target, ptl_id);        if (new_peer == NULL) {                rc = -ENOMEM;                goto unwind_0;        }        rc = kptllnd_peer_reserve_buffers();        if (rc != 0)                goto unwind_1;        write_lock_irqsave(g_lock, flags); again:        if (kptllnd_data.kptl_shutdown) {                write_unlock_irqrestore(g_lock, flags);                rc = -ESHUTDOWN;                goto unwind_2;        }        *peerp = kptllnd_id2peer_locked(target);        if (*peerp != NULL) {                write_unlock_irqrestore(g_lock, flags);                goto unwind_2;        }        kptllnd_cull_peertable_locked(target);        if (kptllnd_data.kptl_n_active_peers ==            kptllnd_data.kptl_expected_peers) {                /* peer table full */                write_unlock_irqrestore(g_lock, flags);                kptllnd_peertable_overflow_msg("Connection to ", target);                rc = kptllnd_reserve_buffers(1); /* HELLO headroom */                if (rc != 0) {                        CERROR("Can't create connection to %s\n",                               libcfs_id2str(target));                        rc = -ENOMEM;                        goto unwind_2;                }                write_lock_irqsave(g_lock, flags);                kptllnd_data.kptl_expected_peers++;                goto again;        }        last_matchbits_seen = kptllnd_get_last_seen_matchbits_locked(target);        hello_tx->tx_msg->ptlm_u.hello.kptlhm_matchbits = last_matchbits_seen;        hello_tx->tx_msg->ptlm_u.hello.kptlhm_max_msg_size =                *kptllnd_tunables.kptl_max_msg_size;                        new_peer->peer_state = PEER_STATE_WAITING_HELLO;        new_peer->peer_last_matchbits_seen = last_matchbits_seen;                kptllnd_peer_add_peertable_locked(new_peer);        write_unlock_irqrestore(g_lock, flags);	/* NB someone else could get in now and post a message before I post	 * the HELLO, but post_tx/check_sends take care of that! */        CDEBUG(D_NETTRACE, "%s: post initial hello %p\n",               libcfs_id2str(new_peer->peer_id), hello_tx);        kptllnd_post_tx(new_peer, hello_tx, 0);        kptllnd_peer_check_sends(new_peer);               *peerp = new_peer;        return 0;         unwind_2:        kptllnd_peer_unreserve_buffers(); unwind_1:        kptllnd_peer_decref(new_peer); unwind_0:        kptllnd_tx_decref(hello_tx);        return rc;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -