⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 gmlnd_comm.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
                        /* NOTE(review): this is the tail of a function whose
                         * opening lines fall outside this chunk — presumably
                         * gmnal_check_txqueues_locked() (called below in
                         * gmnal_version_reply).  The visible tail stages a
                         * large message into a large tx buffer (ltxb), queues
                         * the tx on the credit queue, then launches the next
                         * credit-bearing tx via GM.  Do not treat this as a
                         * complete definition. */
                        spin_unlock(&gmni->gmni_tx_lock);
                        /* Unlocking here allows sends to get re-ordered,
                         * but we want to allow other CPUs to progress... */

                        tx->tx_ltxb = ltxb;

                        /* marshall message in tx_ltxb...
                         * 1. Copy what was marshalled so far (in tx_buf) */
                        memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
                               GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);

                        /* 2. Copy the payload */
                        if (tx->tx_large_iskiov)
                                lnet_copy_kiov2kiov(
                                        gmni->gmni_large_pages,
                                        ltxb->txb_buf.nb_kiov,
                                        tx->tx_msgnob,
                                        tx->tx_large_niov,
                                        tx->tx_large_frags.kiov,
                                        tx->tx_large_offset,
                                        tx->tx_large_nob);
                        else
                                lnet_copy_iov2kiov(
                                        gmni->gmni_large_pages,
                                        ltxb->txb_buf.nb_kiov,
                                        tx->tx_msgnob,
                                        tx->tx_large_niov,
                                        tx->tx_large_frags.iov,
                                        tx->tx_large_offset,
                                        tx->tx_large_nob);

                        tx->tx_msgnob += tx->tx_large_nob;

                        /* re-take the tx lock dropped above before touching
                         * the shared queues again */
                        spin_lock(&gmni->gmni_tx_lock);
                }

                list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
        }

        /* Launch the next queued tx if a GM send credit is available */
        if (!list_empty(&gmni->gmni_cred_txq) &&
            gmni->gmni_tx_credits != 0) {

                tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);

                /* consume tx and 1 credit */
                list_del(&tx->tx_list);
                gmni->gmni_tx_credits--;

                spin_unlock(&gmni->gmni_tx_lock);
                /* Unlocking here allows sends to get re-ordered, but we want
                 * to allow other CPUs to progress... */

                LASSERT(!tx->tx_credit);
                tx->tx_credit = 1;
                tx->tx_launchtime = jiffies;

                /* Small messages go straight from tx_buf; large ones were
                 * marshalled into the attached large tx buffer above. */
                if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
                        LASSERT (tx->tx_ltxb == NULL);
                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
                        gmsize = gmni->gmni_small_gmsize;
                        pri = GMNAL_SMALL_PRIORITY;
                } else {
                        LASSERT (tx->tx_ltxb != NULL);
                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
                        gmsize = gmni->gmni_large_gmsize;
                        pri = GMNAL_LARGE_PRIORITY;
                }

                spin_lock(&gmni->gmni_gm_lock);

                /* hand the buffer to GM; gmnal_tx_callback runs on
                 * completion with 'tx' as its argument */
                gm_send_to_peer_with_callback(gmni->gmni_port,
                                              netaddr, gmsize,
                                              tx->tx_msgnob,
                                              pri,
                                              tx->tx_gmlid,
                                              gmnal_tx_callback,
                                              (void*)tx);

                spin_unlock(&gmni->gmni_gm_lock);
                spin_lock(&gmni->gmni_tx_lock);
        }
}

/* Re-post a receive buffer to GM so it can land another incoming message.
 * Buffer size and priority are chosen by whether 'rx' is a large or small
 * receive descriptor.  Takes/releases gmni_gm_lock around the GM call. */
void
gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
                                        gmni->gmni_small_gmsize;
        int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
                                        GMNAL_SMALL_PRIORITY;
        void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);

        CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);

        spin_lock(&gmni->gmni_gm_lock);
        gm_provide_receive_buffer_with_tag(gmni->gmni_port,
                                           buffer, gmsize, pri, 0);
        spin_unlock(&gmni->gmni_gm_lock);
}

/* Send a minimal magic+version stub back to the sender of 'rx', so a peer
 * speaking a newer gmlnd protocol can discover our protocol revision.
 * Best-effort: if no tx descriptor is available the reply is just dropped
 * (logged via CERROR). */
void
gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        /* Future protocol version compatibility support!
         * The next gmlnd-specific protocol rev will first send a message to
         * check version; I reply with a stub message containing my current
         * magic+version... */
        gmnal_msg_t *msg;
        gmnal_tx_t  *tx = gmnal_get_tx(gmni);

        if (tx == NULL) {
                CERROR("Can't allocate tx to send version info to %u\n",
                       rx->rx_recv_gmid);
                return;
        }

        LASSERT (tx->tx_lntmsg == NULL);        /* no finalize */

        tx->tx_nid = LNET_NID_ANY;
        tx->tx_gmlid = rx->rx_recv_gmid;

        msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
        msg->gmm_magic   = GMNAL_MSG_MAGIC;
        msg->gmm_version = GMNAL_MSG_VERSION;

        /* just send magic + version: truncate the message at the first
         * field after the version */
        tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
        tx->tx_large_nob = 0;

        spin_lock(&gmni->gmni_tx_lock);

        list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
        gmnal_check_txqueues_locked(gmni);

        spin_unlock(&gmni->gmni_tx_lock);
}

/* Receive thread: blocks in GM for receive events, looks up the matching rx
 * descriptor, unpacks the wire message and hands it to LNet via lnet_parse().
 * Several of these run concurrently (one per online CPU — see
 * gmnal_start_threads), serialised by gmni_rx_mutex except while a message
 * is being processed.  Exits when gmni_shutdown is set. */
int
gmnal_rx_thread(void *arg)
{
        gmnal_ni_t      *gmni = arg;
        gm_recv_event_t *rxevent = NULL;
        gm_recv_t       *recv = NULL;
        gmnal_rx_t      *rx;
        int              rc;

        cfs_daemonize("gmnal_rxd");

        down(&gmni->gmni_rx_mutex);

        while (!gmni->gmni_shutdown) {
                spin_lock(&gmni->gmni_gm_lock);
                rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
                spin_unlock(&gmni->gmni_gm_lock);

                /* Non-receive events (alarms etc.) go back to GM's default
                 * handler and we poll again */
                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                default:
                        gm_unknown(gmni->gmni_port, rxevent);
                        continue;

                case GM_FAST_RECV_EVENT:
                case GM_FAST_PEER_RECV_EVENT:
                case GM_PEER_RECV_EVENT:
                case GM_FAST_HIGH_RECV_EVENT:
                case GM_FAST_HIGH_PEER_RECV_EVENT:
                case GM_HIGH_PEER_RECV_EVENT:
                case GM_RECV_EVENT:
                case GM_HIGH_RECV_EVENT:
                        break;
                }

                recv = &rxevent->recv;

                /* map the GM buffer address back to our rx descriptor */
                rx = gm_hash_find(gmni->gmni_rx_hash,
                                  gm_ntohp(recv->buffer));
                LASSERT (rx != NULL);

                rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
                rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
                rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
                rx->rx_recv_type = gm_ntoh_u8(recv->type);

                /* "fast" events deliver the payload in the event itself;
                 * copy it into the posted buffer so downstream code sees a
                 * single layout */
                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                case GM_FAST_RECV_EVENT:
                case GM_FAST_PEER_RECV_EVENT:
                case GM_FAST_HIGH_RECV_EVENT:
                case GM_FAST_HIGH_PEER_RECV_EVENT:
                        LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
                        memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
                               gm_ntohp(recv->message), rx->rx_recv_nob);
                        break;
                }

                /* release the mutex so other rx threads can receive while
                 * this message is processed */
                up(&gmni->gmni_rx_mutex);

                CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx,
                        GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
                        gm_ntohp(recv->buffer), rx->rx_recv_nob);

                /* We're connectionless: simply drop packets with
                 * errors */
                rc = gmnal_unpack_msg(gmni, rx);
                if (rc == 0) {
                        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);

                        LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
                        rc =  lnet_parse(gmni->gmni_ni,
                                         &msg->gmm_u.immediate.gmim_hdr,
                                         msg->gmm_srcnid,
                                         rx, 0);
                } else if (rc > 0) {
                        /* positive rc: peer probing a future protocol
                         * version; answer with our magic+version */
                        gmnal_version_reply(gmni, rx);
                        rc = -EPROTO;           /* repost rx */
                }

                if (rc < 0)                     /* parse failure */
                        gmnal_post_rx(gmni, rx);

                down(&gmni->gmni_rx_mutex);
        }

        up(&gmni->gmni_rx_mutex);

        CDEBUG(D_NET, "exiting\n");
        atomic_dec(&gmni->gmni_nthreads);
        return 0;
}

/* Signal all rx threads to shut down and wait for them to exit.  Sets the
 * shutdown flag (with a memory barrier) and fires a GM alarm to wake the
 * thread blocked in gm_blocking_receive_no_spin(), then spins (yielding)
 * until gmni_nthreads drops to zero, warning at power-of-two intervals. */
void
gmnal_stop_threads(gmnal_ni_t *gmni)
{
        int count = 2;

        gmni->gmni_shutdown = 1;
        mb();           /* flag must be visible before waking the threads */

        /* wake rxthread owning gmni_rx_mutex with an alarm. */
        spin_lock(&gmni->gmni_gm_lock);
        gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
        spin_unlock(&gmni->gmni_gm_lock);

        while (atomic_read(&gmni->gmni_nthreads) != 0) {
                count++;
                /* warn progressively less often (when count is a power
                 * of 2) */
                if ((count & (count - 1)) == 0)
                        CWARN("Waiting for %d threads to stop\n",
                              atomic_read(&gmni->gmni_nthreads));

                gmnal_yield(1);
        }
}

/* Spawn one gmnal_rx_thread per online CPU.  On any spawn failure, stops
 * whatever threads already started and returns the (negative) error from
 * kernel_thread(); returns 0 on success. */
int
gmnal_start_threads(gmnal_ni_t *gmni)
{
        int     i;
        int     pid;

        LASSERT (!gmni->gmni_shutdown);
        LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);

        gm_initialize_alarm(&gmni->gmni_alarm);

        for (i = 0; i < num_online_cpus(); i++) {

                pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
                if (pid < 0) {
                        CERROR("rx thread failed to start: %d\n", pid);
                        gmnal_stop_threads(gmni);
                        return pid;
                }

                atomic_inc(&gmni->gmni_nthreads);
        }

        return 0;
}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -