⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ptllnd.c

📁 非常经典的一个分布式系统
💻 C
📖 第 1 页 / 共 3 页
字号:
        CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n",                kptllnd_ptlid2str(kptllnd_data.kptl_portals_id),               libcfs_nid2str(ni->ni_nid));        /* Initialized the incarnation - it must be for-all-time unique, even         * accounting for the fact that we increment it when we disconnect a         * peer that's using it */        do_gettimeofday(&tv);        kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) +                                        tv.tv_usec;        CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation);        /*         * Allocate and setup the peer hash table         */        rwlock_init(&kptllnd_data.kptl_peer_rw_lock);        init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq);        INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers);        INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers);        kptllnd_data.kptl_peer_hash_size =                *kptllnd_tunables.kptl_peer_hash_table_size;        LIBCFS_ALLOC(kptllnd_data.kptl_peers,                     (kptllnd_data.kptl_peer_hash_size *                       sizeof(struct list_head)));        if (kptllnd_data.kptl_peers == NULL) {                CERROR("Failed to allocate space for peer hash table size=%d\n",                        kptllnd_data.kptl_peer_hash_size);                rc = -ENOMEM;                goto failed;        }        for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)                INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]);        LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u));        if (kptllnd_data.kptl_nak_msg == NULL) {                CERROR("Can't allocate NAK msg\n");                rc = -ENOMEM;                goto failed;        }        memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u));        kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0);        kptllnd_data.kptl_nak_msg->ptlm_magic    = PTLLND_MSG_MAGIC;        kptllnd_data.kptl_nak_msg->ptlm_version  = PTLLND_MSG_VERSION;        kptllnd_data.kptl_nak_msg->ptlm_srcpid   = the_lnet.ln_pid;        kptllnd_data.kptl_nak_msg->ptlm_srcnid   = ni->ni_nid;        kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation;        kptllnd_data.kptl_nak_msg->ptlm_dstpid   = LNET_PID_ANY;        kptllnd_data.kptl_nak_msg->ptlm_dstnid   = LNET_NID_ANY;        kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool);        kptllnd_data.kptl_rx_cache =                 cfs_mem_cache_create("ptllnd_rx",                                     sizeof(kptl_rx_t) +                                      *kptllnd_tunables.kptl_max_msg_size,                                     0,    /* offset */                                     0);   /* flags */        if (kptllnd_data.kptl_rx_cache == NULL) {                CERROR("Can't create slab for RX descriptors\n");                rc = -ENOMEM;                goto failed;        }        /* lists/ptrs/locks initialised */        kptllnd_data.kptl_init = PTLLND_INIT_DATA;        /*****************************************************/        rc = kptllnd_setup_tx_descs();        if (rc != 0) {                CERROR("Can't pre-allocate %d TX descriptors: %d\n",                       *kptllnd_tunables.kptl_ntx, rc);                goto failed;        }                /* Start the scheduler threads for handling incoming requests.  No need         * to advance the state because this will be automatically cleaned up         * now that PTLNAT_INIT_DATA state has been entered */        CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED);        for (i = 0; i < PTLLND_N_SCHED; i++) {                rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i));                if (rc != 0) {                        CERROR("Can't spawn scheduler[%d]: %d\n", i, rc);                        goto failed;                }        }        rc = kptllnd_thread_start(kptllnd_watchdog, NULL);        if (rc != 0) {                CERROR("Can't spawn watchdog: %d\n", rc);                goto failed;        }        /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied)         * and we will still have enough buffers posted for all our peers */        spares = *kptllnd_tunables.kptl_rxb_nspare *                 ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/                  *kptllnd_tunables.kptl_max_msg_size);        /* reserve and post the buffers */        rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool,                                            kptllnd_data.kptl_expected_peers +                                            spares);        if (rc != 0) {                CERROR("Can't reserve RX Buffer pool: %d\n", rc);                goto failed;        }        /* flag everything initialised */        kptllnd_data.kptl_init = PTLLND_INIT_ALL;        /*****************************************************/        if (*kptllnd_tunables.kptl_checksum)                CWARN("Checksumming enabled\n");                CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n");        return 0; failed:        CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc);        kptllnd_shutdown(ni);        return rc;}voidkptllnd_shutdown (lnet_ni_t *ni){        int               i;        ptl_err_t         prc;        lnet_process_id_t process_id;        unsigned long     flags;        CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n",               atomic_read (&libcfs_kmemory));        LASSERT (ni == kptllnd_data.kptl_ni);        switch (kptllnd_data.kptl_init) {        default:                LBUG();        case PTLLND_INIT_ALL:        case PTLLND_INIT_DATA:                /* Stop receiving */                kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool);                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq));                LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq));                /* Hold peertable lock to interleave cleanly with peer birth/death */                write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);                LASSERT (kptllnd_data.kptl_shutdown == 0);                kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */                /* no new peers possible now */                write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,                                         flags);                /* nuke all existing peers */                process_id.nid = LNET_NID_ANY;                process_id.pid = LNET_PID_ANY;                kptllnd_peer_del(process_id);                read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags);                LASSERT (kptllnd_data.kptl_n_active_peers == 0);                i = 2;                while (kptllnd_data.kptl_npeers != 0) {                        i++;                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET,                               "Waiting for %d peers to terminate\n",                               kptllnd_data.kptl_npeers);                        read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock,                                                flags);                        cfs_pause(cfs_time_seconds(1));                        read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock,                                           flags);                }                LASSERT(list_empty(&kptllnd_data.kptl_closing_peers));                LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers));                LASSERT (kptllnd_data.kptl_peers != NULL);                for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++)                        LASSERT (list_empty (&kptllnd_data.kptl_peers[i]));                read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags);                CDEBUG(D_NET, "All peers deleted\n");                /* Shutdown phase 2: kill the daemons... */                kptllnd_data.kptl_shutdown = 2;                mb();                                i = 2;                while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) {                        /* Wake up all threads*/                        wake_up_all(&kptllnd_data.kptl_sched_waitq);                        wake_up_all(&kptllnd_data.kptl_watchdog_waitq);                        i++;                        CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? */                               "Waiting for %d threads to terminate\n",                               atomic_read(&kptllnd_data.kptl_nthreads));                        cfs_pause(cfs_time_seconds(1));                }                CDEBUG(D_NET, "All Threads stopped\n");                LASSERT(list_empty(&kptllnd_data.kptl_sched_txq));                kptllnd_cleanup_tx_descs();                /* Nothing here now, but libcfs might soon require                 * us to explicitly destroy wait queues and semaphores                 * that would be done here */                /* fall through */        case PTLLND_INIT_NOTHING:                CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n");                break;        }        if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) {                prc = PtlEQFree(kptllnd_data.kptl_eqh);                if (prc != PTL_OK)                        CERROR("Error %s(%d) freeing portals EQ\n",                               kptllnd_errtype2str(prc), prc);        }        if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) {                prc = PtlNIFini(kptllnd_data.kptl_nih);                if (prc != PTL_OK)                        CERROR("Error %s(%d) finalizing portals NI\n",                               kptllnd_errtype2str(prc), prc);        }                LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0);        LASSERT (list_empty(&kptllnd_data.kptl_idle_txs));        if (kptllnd_data.kptl_rx_cache != NULL)                cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache);        if (kptllnd_data.kptl_peers != NULL)                LIBCFS_FREE (kptllnd_data.kptl_peers,                             sizeof (struct list_head) *                             kptllnd_data.kptl_peer_hash_size);        if (kptllnd_data.kptl_nak_msg != NULL)                LIBCFS_FREE (kptllnd_data.kptl_nak_msg,                             offsetof(kptl_msg_t, ptlm_u));        memset(&kptllnd_data, 0, sizeof(kptllnd_data));        CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n",               atomic_read (&libcfs_kmemory));        PORTAL_MODULE_UNUSE;}int __initkptllnd_module_init (void){        int    rc;        kptllnd_assert_wire_constants();        rc = kptllnd_tunables_init();        if (rc != 0)                return rc;        kptllnd_init_ptltrace();        lnet_register_lnd(&kptllnd_lnd);        return 0;}void __exitkptllnd_module_fini (void){        lnet_unregister_lnd(&kptllnd_lnd);        kptllnd_tunables_fini();}MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");MODULE_DESCRIPTION("Kernel Portals LND v1.00");MODULE_LICENSE("GPL");module_init(kptllnd_module_init);module_exit(kptllnd_module_fini);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -