⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 mxlnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 2 页
字号:
                if (ret < 0) {                        CDEBUG(D_NETERROR, "kernel_read() returned %d - closing %s\n", ret, filename);                        filp_close(filp, current->files);                        MXLND_FREE(buf, allocd + 1);                        return -1;                }                if (ret < bufsize) bufsize = ret;                buf_off = 0;                while (buf_off < bufsize) {                        sep = strchr(buf + buf_off, '\n');                        if (sep != NULL) {                                /* we have a line */                                line = buf + buf_off;                                *sep = '\0';                                ret = mxlnd_parse_line(line);                                if (ret != 0 && strlen(line) != 0) {                                        CDEBUG(D_NETERROR, "Failed to parse \"%s\". Ignoring this host.\n", line);                                }                                buf_off += strlen(line) + 1;                        } else {                                /* last line or we need to read more */                                line = buf + buf_off;                                ret = mxlnd_parse_line(line);                                if (ret != 0) {                                        bufsize -= strlen(line) + 1;                                }                                buf_off += strlen(line) + 1;                        }                }                offset += bufsize;                bufsize = MXLND_BUFSIZE;        }        MXLND_FREE(buf, allocd + 1);        filp_close(filp, current->files);        mxlnd_print_hosts();        return 0;}/** * mxlnd_init_mx - open the endpoint, set out ID, register the EAGER callback * @ni - the network interface * * Returns 0 on success, -1 on failure */intmxlnd_init_mx(lnet_ni_t *ni){        int                     ret     = 0;        int                     found   = 0;        mx_return_t             mxret;        mx_endpoint_addr_t      addr;        u32                     board   = *kmxlnd_tunables.kmx_board;        u32                     ep_id   = *kmxlnd_tunables.kmx_ep_id;        u64                     nic_id  = 0LL;        struct kmx_host         *host   = NULL;        mxret = mx_init();        if (mxret != MX_SUCCESS) {                CERROR("mx_init() failed with %s (%d)\n", mx_strerror(mxret), mxret);                return -1;        }        ret = mxlnd_parse_hosts(*kmxlnd_tunables.kmx_hosts);        if (ret != 0) {                if (*kmxlnd_tunables.kmx_hosts != NULL) {                        CERROR("mxlnd_parse_hosts(%s) failed\n", *kmxlnd_tunables.kmx_hosts);                }                mx_finalize();                return -1;        }        list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {                if (strcmp(host->mxh_hostname, system_utsname.nodename) == 0) {                        /* override the defaults and module parameters with                          * the info from the hosts file */                        board = host->mxh_board;                        ep_id = host->mxh_ep_id;                        kmxlnd_data.kmx_localhost = host;                        CDEBUG(D_NET, "my hostname is %s board %d ep_id %d\n", kmxlnd_data.kmx_localhost->mxh_hostname, kmxlnd_data.kmx_localhost->mxh_board, kmxlnd_data.kmx_localhost->mxh_ep_id);                        found = 1;                        break;                }        }        if (found == 0) {                CERROR("no host entry found for localhost\n");                mx_finalize();                return -1;        }        mxret = mx_open_endpoint(board, ep_id, MXLND_MSG_MAGIC,                                  NULL, 0, &kmxlnd_data.kmx_endpt);        if (mxret != MX_SUCCESS) {                CERROR("mx_open_endpoint() failed with %d\n", mxret);                mx_finalize();                return -1;        }        mx_get_endpoint_addr(kmxlnd_data.kmx_endpt, &addr);        mx_decompose_endpoint_addr(addr, &nic_id, &ep_id);        LASSERT(host != NULL);        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), host->mxh_addr);        CDEBUG(D_NET, "My NID is 0x%llx\n", ni->ni_nid);        /* this will catch all unexpected receives. */        mxret = mx_register_unexp_handler(kmxlnd_data.kmx_endpt,                                          (mx_unexp_handler_t) mxlnd_unexpected_recv,                                          NULL);        if (mxret != MX_SUCCESS) {                CERROR("mx_register_unexp_callback() failed with %s\n",                          mx_strerror(mxret));                mx_close_endpoint(kmxlnd_data.kmx_endpt);                mx_finalize();                return -1;        }        mxret = mx_set_request_timeout(kmxlnd_data.kmx_endpt, NULL, MXLND_COMM_TIMEOUT/HZ*1000);        if (mxret != MX_SUCCESS) {                CERROR("mx_set_request_timeout() failed with %s\n",                         mx_strerror(mxret));                mx_close_endpoint(kmxlnd_data.kmx_endpt);                mx_finalize();                return -1;        }        return 0;}/** * mxlnd_thread_start - spawn a kernel thread with this function * @fn - function pointer * @arg - pointer to the parameter data * * Returns 0 on success and a negative value on failure */intmxlnd_thread_start(int (*fn)(void *arg), void *arg){        int     pid = 0;        int     i   = (int) ((long) arg);        atomic_inc(&kmxlnd_data.kmx_nthreads);        init_completion(&kmxlnd_data.kmx_completions[i]);        pid = kernel_thread (fn, arg, 0);        if (pid <= 0) {                CERROR("mx_thread_start() failed with %d\n", pid);                atomic_dec(&kmxlnd_data.kmx_nthreads);        }        return pid;}/** * mxlnd_thread_stop - decrement thread counter * * The thread returns 0 when it detects shutdown. * We are simply decrementing the thread counter. */voidmxlnd_thread_stop(long id){        int     i       = (int) id;        atomic_dec (&kmxlnd_data.kmx_nthreads);        complete(&kmxlnd_data.kmx_completions[i]);}/** * mxlnd_shutdown - stop IO, clean up state * @ni - LNET interface handle * * No calls to the LND should be made after calling this function. */voidmxlnd_shutdown (lnet_ni_t *ni){        int             i               = 0;        LASSERT (ni == kmxlnd_data.kmx_ni);        LASSERT (ni->ni_data == &kmxlnd_data);        CDEBUG(D_NET, "in shutdown()\n");        CDEBUG(D_MALLOC, "before MXLND cleanup: libcfs_kmemory %d "                         "kmx_mem_used %ld\n", atomic_read (&libcfs_kmemory),                          kmxlnd_data.kmx_mem_used);        switch (kmxlnd_data.kmx_init) {        case MXLND_INIT_ALL:                CDEBUG(D_NET, "setting shutdown = 1\n");                /* set shutdown and wakeup request_waitds */                kmxlnd_data.kmx_shutdown = 1;                mb();                mx_wakeup(kmxlnd_data.kmx_endpt);                up(&kmxlnd_data.kmx_tx_queue_sem);                mxlnd_sleep(2 * HZ);                /* fall through */        case MXLND_INIT_THREADS:                CDEBUG(D_NET, "waiting on threads\n");                /* wait for threads to complete */                for (i = 0; i < MXLND_NCOMPLETIONS; i++) {                        wait_for_completion(&kmxlnd_data.kmx_completions[i]);                }                LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0);                CDEBUG(D_NET, "freeing completions\n");                MXLND_FREE(kmxlnd_data.kmx_completions,                             MXLND_NCOMPLETIONS * sizeof(struct completion));                /* fall through */        case MXLND_INIT_MX:                CDEBUG(D_NET, "stopping mx\n");                /* wakeup waiters if they missed the above.                 * close endpoint to stop all traffic.                 * this will cancel and cleanup all requests, etc. */                mx_wakeup(kmxlnd_data.kmx_endpt);                mx_close_endpoint(kmxlnd_data.kmx_endpt);                mx_finalize();                CDEBUG(D_NET, "mxlnd_free_hosts();\n");                mxlnd_free_hosts();                /* fall through */        case MXLND_INIT_RXS:                CDEBUG(D_NET, "freeing rxs\n");                /* free all rxs and associated pages */                mxlnd_free_rxs();                /* fall through */        case MXLND_INIT_TXS:                CDEBUG(D_NET, "freeing txs\n");                /* free all txs and associated pages */                mxlnd_free_txs();                /* fall through */        case MXLND_INIT_DATA:                CDEBUG(D_NET, "freeing peers\n");                /* free peer list */                mxlnd_free_peers();                /* fall through */        case MXLND_INIT_NOTHING:                break;        }        CDEBUG(D_NET, "shutdown complete\n");        CDEBUG(D_MALLOC, "after MXLND cleanup: libcfs_kmemory %d "                         "kmx_mem_used %ld\n", atomic_read (&libcfs_kmemory),                          kmxlnd_data.kmx_mem_used);        kmxlnd_data.kmx_init = MXLND_INIT_NOTHING;        PORTAL_MODULE_UNUSE;        return;}/** * mxlnd_startup - initialize state, open an endpoint, start IO * @ni - LNET interface handle * * Initialize state, open an endpoint, start monitoring threads. * Should only be called once. */intmxlnd_startup (lnet_ni_t *ni){        int                     i       = 0;        int                     ret     = 0;        struct timeval          tv;        LASSERT (ni->ni_lnd == &the_kmxlnd);        if (kmxlnd_data.kmx_init != MXLND_INIT_NOTHING) {                CERROR("Only 1 instance supported\n");                return -EPERM;        }        CDEBUG(D_MALLOC, "before MXLND startup: libcfs_kmemory %d "                         "kmx_mem_used %ld\n", atomic_read (&libcfs_kmemory),                          kmxlnd_data.kmx_mem_used);        /* reserve 1/2 of tx for connect request messages */        ni->ni_maxtxcredits = *kmxlnd_tunables.kmx_ntx / 2;        ni->ni_peertxcredits = *kmxlnd_tunables.kmx_credits;        PORTAL_MODULE_USE;        memset (&kmxlnd_data, 0, sizeof (kmxlnd_data));        kmxlnd_data.kmx_ni = ni;        ni->ni_data = &kmxlnd_data;        do_gettimeofday(&tv);        kmxlnd_data.kmx_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;        CDEBUG(D_NET, "my incarnation is %lld\n", kmxlnd_data.kmx_incarnation);        spin_lock_init (&kmxlnd_data.kmx_global_lock);        INIT_LIST_HEAD (&kmxlnd_data.kmx_conn_req);        spin_lock_init (&kmxlnd_data.kmx_conn_lock);        sema_init(&kmxlnd_data.kmx_conn_sem, 0);        INIT_LIST_HEAD (&kmxlnd_data.kmx_hosts);        spin_lock_init (&kmxlnd_data.kmx_hosts_lock);        for (i = 0; i < MXLND_HASH_SIZE; i++) {                INIT_LIST_HEAD (&kmxlnd_data.kmx_peers[i]);        }        rwlock_init (&kmxlnd_data.kmx_peers_lock);        INIT_LIST_HEAD (&kmxlnd_data.kmx_txs);        INIT_LIST_HEAD (&kmxlnd_data.kmx_tx_idle);        spin_lock_init (&kmxlnd_data.kmx_tx_idle_lock);        kmxlnd_data.kmx_tx_next_cookie = 1;        INIT_LIST_HEAD (&kmxlnd_data.kmx_tx_queue);        spin_lock_init (&kmxlnd_data.kmx_tx_queue_lock);        sema_init(&kmxlnd_data.kmx_tx_queue_sem, 0);        INIT_LIST_HEAD (&kmxlnd_data.kmx_rxs);        spin_lock_init (&kmxlnd_data.kmx_rxs_lock);        INIT_LIST_HEAD (&kmxlnd_data.kmx_rx_idle);        spin_lock_init (&kmxlnd_data.kmx_rx_idle_lock);                kmxlnd_data.kmx_init = MXLND_INIT_DATA;        /*****************************************************/        ret = mxlnd_init_txs();        if (ret != 0) {                CERROR("Can't alloc tx descs: %d\n", ret);                goto failed;        }        kmxlnd_data.kmx_init = MXLND_INIT_TXS;        /*****************************************************/        ret = mxlnd_init_rxs();        if (ret != 0) {                CERROR("Can't alloc rx descs: %d\n", ret);                goto failed;        }        kmxlnd_data.kmx_init = MXLND_INIT_RXS;        /*****************************************************/        ret = mxlnd_init_mx(ni);        if (ret != 0) {                CERROR("Can't init mx\n");                goto failed;        }        kmxlnd_data.kmx_init = MXLND_INIT_MX;        /*****************************************************/        /* start threads */        MXLND_ALLOC (kmxlnd_data.kmx_completions,                      MXLND_NCOMPLETIONS * sizeof(struct completion));        if (kmxlnd_data.kmx_completions == NULL) {                CERROR("failed to alloc kmxlnd_data.kmx_completions");                goto failed;        }        memset(kmxlnd_data.kmx_completions, 0,                MXLND_NCOMPLETIONS * sizeof(struct completion));        {                int     i               = 0;                if (MXLND_N_SCHED > *kmxlnd_tunables.kmx_n_waitd) {                        *kmxlnd_tunables.kmx_n_waitd = MXLND_N_SCHED;                }                CDEBUG(D_NET, "using %d %s in mx_wait_any()\n",                        *kmxlnd_tunables.kmx_n_waitd,                         *kmxlnd_tunables.kmx_n_waitd == 1 ? "thread" : "threads");                for (i = 0; i < *kmxlnd_tunables.kmx_n_waitd; i++) {                        ret = mxlnd_thread_start(mxlnd_request_waitd, (void*)((long)i));                        if (ret < 0) {                                CERROR("Starting mxlnd_request_waitd[%d] failed with %d\n", i, ret);                                for (--i; i >= 0; i--) {                                        wait_for_completion(&kmxlnd_data.kmx_completions[i]);                                }                                LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0);                                MXLND_FREE(kmxlnd_data.kmx_completions,                                         MXLND_NCOMPLETIONS * sizeof(struct completion));                                goto failed;                        }                }                ret = mxlnd_thread_start(mxlnd_tx_queued, (void*)((long)i++));                if (ret < 0) {                        CERROR("Starting mxlnd_tx_queued failed with %d\n", ret);                        for (--i; i >= 0; i--) {                                wait_for_completion(&kmxlnd_data.kmx_completions[i]);                        }                        LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0);                        MXLND_FREE(kmxlnd_data.kmx_completions,                                 MXLND_NCOMPLETIONS * sizeof(struct completion));                        goto failed;                }                ret = mxlnd_thread_start(mxlnd_timeoutd, (void*)((long)i++));                if (ret < 0) {                        CERROR("Starting mxlnd_timeoutd failed with %d\n", ret);                        for (--i; i >= 0; i--) {                                wait_for_completion(&kmxlnd_data.kmx_completions[i]);                        }                        LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0);                        MXLND_FREE(kmxlnd_data.kmx_completions,                                 MXLND_NCOMPLETIONS * sizeof(struct completion));                        goto failed;                }        }        kmxlnd_data.kmx_init = MXLND_INIT_THREADS;        /*****************************************************/                kmxlnd_data.kmx_init = MXLND_INIT_ALL;        CDEBUG(D_MALLOC, "startup complete (kmx_mem_used %ld)\n", kmxlnd_data.kmx_mem_used);                return 0;failed:        CERROR("mxlnd_startup failed\n");        mxlnd_shutdown (ni);            return (-ENETDOWN);}static int mxlnd_init(void){        lnet_register_lnd(&the_kmxlnd);	return 0;}static void mxlnd_exit(void){        lnet_unregister_lnd(&the_kmxlnd);	return;}module_init(mxlnd_init);module_exit(mxlnd_exit);MODULE_LICENSE("GPL");MODULE_AUTHOR("Myricom, Inc. - help@myri.com");MODULE_DESCRIPTION("Kernel MyrinetExpress LND");MODULE_VERSION("0.5.0");

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -