ptllnd.c
CDEBUG(D_NET, "ptl id=%s, lnet id=%s\n", kptllnd_ptlid2str(kptllnd_data.kptl_portals_id), libcfs_nid2str(ni->ni_nid)); /* Initialized the incarnation - it must be for-all-time unique, even * accounting for the fact that we increment it when we disconnect a * peer that's using it */ do_gettimeofday(&tv); kptllnd_data.kptl_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec; CDEBUG(D_NET, "Incarnation="LPX64"\n", kptllnd_data.kptl_incarnation); /* * Allocate and setup the peer hash table */ rwlock_init(&kptllnd_data.kptl_peer_rw_lock); init_waitqueue_head(&kptllnd_data.kptl_watchdog_waitq); INIT_LIST_HEAD(&kptllnd_data.kptl_closing_peers); INIT_LIST_HEAD(&kptllnd_data.kptl_zombie_peers); kptllnd_data.kptl_peer_hash_size = *kptllnd_tunables.kptl_peer_hash_table_size; LIBCFS_ALLOC(kptllnd_data.kptl_peers, (kptllnd_data.kptl_peer_hash_size * sizeof(struct list_head))); if (kptllnd_data.kptl_peers == NULL) { CERROR("Failed to allocate space for peer hash table size=%d\n", kptllnd_data.kptl_peer_hash_size); rc = -ENOMEM; goto failed; } for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) INIT_LIST_HEAD(&kptllnd_data.kptl_peers[i]); LIBCFS_ALLOC(kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u)); if (kptllnd_data.kptl_nak_msg == NULL) { CERROR("Can't allocate NAK msg\n"); rc = -ENOMEM; goto failed; } memset(kptllnd_data.kptl_nak_msg, 0, offsetof(kptl_msg_t, ptlm_u)); kptllnd_init_msg(kptllnd_data.kptl_nak_msg, PTLLND_MSG_TYPE_NAK, 0); kptllnd_data.kptl_nak_msg->ptlm_magic = PTLLND_MSG_MAGIC; kptllnd_data.kptl_nak_msg->ptlm_version = PTLLND_MSG_VERSION; kptllnd_data.kptl_nak_msg->ptlm_srcpid = the_lnet.ln_pid; kptllnd_data.kptl_nak_msg->ptlm_srcnid = ni->ni_nid; kptllnd_data.kptl_nak_msg->ptlm_srcstamp = kptllnd_data.kptl_incarnation; kptllnd_data.kptl_nak_msg->ptlm_dstpid = LNET_PID_ANY; kptllnd_data.kptl_nak_msg->ptlm_dstnid = LNET_NID_ANY; kptllnd_rx_buffer_pool_init(&kptllnd_data.kptl_rx_buffer_pool); kptllnd_data.kptl_rx_cache = cfs_mem_cache_create("ptllnd_rx", sizeof(kptl_rx_t) + *kptllnd_tunables.kptl_max_msg_size, 0, /* offset */ 0); /* flags */ if (kptllnd_data.kptl_rx_cache == NULL) { CERROR("Can't create slab for RX descriptors\n"); rc = -ENOMEM; goto failed; } /* lists/ptrs/locks initialised */ kptllnd_data.kptl_init = PTLLND_INIT_DATA; /*****************************************************/ rc = kptllnd_setup_tx_descs(); if (rc != 0) { CERROR("Can't pre-allocate %d TX descriptors: %d\n", *kptllnd_tunables.kptl_ntx, rc); goto failed; } /* Start the scheduler threads for handling incoming requests. 
No need * to advance the state because this will be automatically cleaned up * now that PTLNAT_INIT_DATA state has been entered */ CDEBUG(D_NET, "starting %d scheduler threads\n", PTLLND_N_SCHED); for (i = 0; i < PTLLND_N_SCHED; i++) { rc = kptllnd_thread_start(kptllnd_scheduler, (void *)((long)i)); if (rc != 0) { CERROR("Can't spawn scheduler[%d]: %d\n", i, rc); goto failed; } } rc = kptllnd_thread_start(kptllnd_watchdog, NULL); if (rc != 0) { CERROR("Can't spawn watchdog: %d\n", rc); goto failed; } /* Ensure that 'rxb_nspare' buffers can be off the net (being emptied) * and we will still have enough buffers posted for all our peers */ spares = *kptllnd_tunables.kptl_rxb_nspare * ((*kptllnd_tunables.kptl_rxb_npages * PAGE_SIZE)/ *kptllnd_tunables.kptl_max_msg_size); /* reserve and post the buffers */ rc = kptllnd_rx_buffer_pool_reserve(&kptllnd_data.kptl_rx_buffer_pool, kptllnd_data.kptl_expected_peers + spares); if (rc != 0) { CERROR("Can't reserve RX Buffer pool: %d\n", rc); goto failed; } /* flag everything initialised */ kptllnd_data.kptl_init = PTLLND_INIT_ALL; /*****************************************************/ if (*kptllnd_tunables.kptl_checksum) CWARN("Checksumming enabled\n"); CDEBUG(D_NET, "<<< kptllnd_startup SUCCESS\n"); return 0; failed: CDEBUG(D_NET, "kptllnd_startup failed rc=%d\n", rc); kptllnd_shutdown(ni); return rc;}voidkptllnd_shutdown (lnet_ni_t *ni){ int i; ptl_err_t prc; lnet_process_id_t process_id; unsigned long flags; CDEBUG(D_MALLOC, "before LND cleanup: kmem %d\n", atomic_read (&libcfs_kmemory)); LASSERT (ni == kptllnd_data.kptl_ni); switch (kptllnd_data.kptl_init) { default: LBUG(); case PTLLND_INIT_ALL: case PTLLND_INIT_DATA: /* Stop receiving */ kptllnd_rx_buffer_pool_fini(&kptllnd_data.kptl_rx_buffer_pool); LASSERT (list_empty(&kptllnd_data.kptl_sched_rxq)); LASSERT (list_empty(&kptllnd_data.kptl_sched_rxbq)); /* Hold peertable lock to interleave cleanly with peer birth/death */ write_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); LASSERT (kptllnd_data.kptl_shutdown == 0); kptllnd_data.kptl_shutdown = 1; /* phase 1 == destroy peers */ /* no new peers possible now */ write_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); /* nuke all existing peers */ process_id.nid = LNET_NID_ANY; process_id.pid = LNET_PID_ANY; kptllnd_peer_del(process_id); read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); LASSERT (kptllnd_data.kptl_n_active_peers == 0); i = 2; while (kptllnd_data.kptl_npeers != 0) { i++; CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, "Waiting for %d peers to terminate\n", kptllnd_data.kptl_npeers); read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); cfs_pause(cfs_time_seconds(1)); read_lock_irqsave(&kptllnd_data.kptl_peer_rw_lock, flags); } LASSERT(list_empty(&kptllnd_data.kptl_closing_peers)); LASSERT(list_empty(&kptllnd_data.kptl_zombie_peers)); LASSERT (kptllnd_data.kptl_peers != NULL); for (i = 0; i < kptllnd_data.kptl_peer_hash_size; i++) LASSERT (list_empty (&kptllnd_data.kptl_peers[i])); read_unlock_irqrestore(&kptllnd_data.kptl_peer_rw_lock, flags); CDEBUG(D_NET, "All peers deleted\n"); /* Shutdown phase 2: kill the daemons... */ kptllnd_data.kptl_shutdown = 2; mb(); i = 2; while (atomic_read (&kptllnd_data.kptl_nthreads) != 0) { /* Wake up all threads*/ wake_up_all(&kptllnd_data.kptl_sched_waitq); wake_up_all(&kptllnd_data.kptl_watchdog_waitq); i++; CDEBUG(((i & (-i)) == i) ? D_WARNING : D_NET, /* power of 2? 
*/ "Waiting for %d threads to terminate\n", atomic_read(&kptllnd_data.kptl_nthreads)); cfs_pause(cfs_time_seconds(1)); } CDEBUG(D_NET, "All Threads stopped\n"); LASSERT(list_empty(&kptllnd_data.kptl_sched_txq)); kptllnd_cleanup_tx_descs(); /* Nothing here now, but libcfs might soon require * us to explicitly destroy wait queues and semaphores * that would be done here */ /* fall through */ case PTLLND_INIT_NOTHING: CDEBUG(D_NET, "PTLLND_INIT_NOTHING\n"); break; } if (!PtlHandleIsEqual(kptllnd_data.kptl_eqh, PTL_INVALID_HANDLE)) { prc = PtlEQFree(kptllnd_data.kptl_eqh); if (prc != PTL_OK) CERROR("Error %s(%d) freeing portals EQ\n", kptllnd_errtype2str(prc), prc); } if (!PtlHandleIsEqual(kptllnd_data.kptl_nih, PTL_INVALID_HANDLE)) { prc = PtlNIFini(kptllnd_data.kptl_nih); if (prc != PTL_OK) CERROR("Error %s(%d) finalizing portals NI\n", kptllnd_errtype2str(prc), prc); } LASSERT (atomic_read(&kptllnd_data.kptl_ntx) == 0); LASSERT (list_empty(&kptllnd_data.kptl_idle_txs)); if (kptllnd_data.kptl_rx_cache != NULL) cfs_mem_cache_destroy(kptllnd_data.kptl_rx_cache); if (kptllnd_data.kptl_peers != NULL) LIBCFS_FREE (kptllnd_data.kptl_peers, sizeof (struct list_head) * kptllnd_data.kptl_peer_hash_size); if (kptllnd_data.kptl_nak_msg != NULL) LIBCFS_FREE (kptllnd_data.kptl_nak_msg, offsetof(kptl_msg_t, ptlm_u)); memset(&kptllnd_data, 0, sizeof(kptllnd_data)); CDEBUG(D_MALLOC, "after LND cleanup: kmem %d\n", atomic_read (&libcfs_kmemory)); PORTAL_MODULE_UNUSE;}int __initkptllnd_module_init (void){ int rc; kptllnd_assert_wire_constants(); rc = kptllnd_tunables_init(); if (rc != 0) return rc; kptllnd_init_ptltrace(); lnet_register_lnd(&kptllnd_lnd); return 0;}void __exitkptllnd_module_fini (void){ lnet_unregister_lnd(&kptllnd_lnd); kptllnd_tunables_fini();}MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");MODULE_DESCRIPTION("Kernel Portals LND v1.00");MODULE_LICENSE("GPL");module_init(kptllnd_module_init);module_exit(kptllnd_module_fini);