lockspace.c
        dlm_astd_stop();
 fail:
        return error;
}

static void threads_stop(void)
{
        dlm_scand_stop();
        dlm_lowcomms_stop();
        dlm_astd_stop();
}

static int new_lockspace(char *name, int namelen, void **lockspace,
                         uint32_t flags, int lvblen)
{
        struct dlm_ls *ls;
        int i, size, error = -ENOMEM;
        int do_unreg = 0;

        if (namelen > DLM_LOCKSPACE_LEN)
                return -EINVAL;

        if (!lvblen || (lvblen % 8))
                return -EINVAL;

        if (!try_module_get(THIS_MODULE))
                return -EINVAL;

        ls = dlm_find_lockspace_name(name, namelen);
        if (ls) {
                *lockspace = ls;
                module_put(THIS_MODULE);
                return -EEXIST;
        }

        ls = kzalloc(sizeof(struct dlm_ls) + namelen, GFP_KERNEL);
        if (!ls)
                goto out;
        memcpy(ls->ls_name, name, namelen);
        ls->ls_namelen = namelen;
        ls->ls_lvblen = lvblen;
        ls->ls_count = 0;
        ls->ls_flags = 0;

        if (flags & DLM_LSFL_TIMEWARN)
                set_bit(LSFL_TIMEWARN, &ls->ls_flags);

        if (flags & DLM_LSFL_FS)
                ls->ls_allocation = GFP_NOFS;
        else
                ls->ls_allocation = GFP_KERNEL;

        /* ls_exflags are forced to match among nodes, and we don't
           need to require all nodes to have TIMEWARN or FS set */
        ls->ls_exflags = (flags & ~(DLM_LSFL_TIMEWARN | DLM_LSFL_FS));

        size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;

        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].list);
                INIT_LIST_HEAD(&ls->ls_rsbtbl[i].toss);
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }

        size = dlm_config.ci_lkbtbl_size;
        ls->ls_lkbtbl_size = size;

        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
        if (!ls->ls_lkbtbl)
                goto out_rsbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_lkbtbl[i].list);
                rwlock_init(&ls->ls_lkbtbl[i].lock);
                ls->ls_lkbtbl[i].counter = 1;
        }

        size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;

        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
        if (!ls->ls_dirtbl)
                goto out_lkbfree;
        for (i = 0; i < size; i++) {
                INIT_LIST_HEAD(&ls->ls_dirtbl[i].list);
                rwlock_init(&ls->ls_dirtbl[i].lock);
        }

        INIT_LIST_HEAD(&ls->ls_waiters);
        mutex_init(&ls->ls_waiters_mutex);
        INIT_LIST_HEAD(&ls->ls_orphans);
        mutex_init(&ls->ls_orphans_mutex);
        INIT_LIST_HEAD(&ls->ls_timeout);
        mutex_init(&ls->ls_timeout_mutex);

        INIT_LIST_HEAD(&ls->ls_nodes);
        INIT_LIST_HEAD(&ls->ls_nodes_gone);
        ls->ls_num_nodes = 0;
        ls->ls_low_nodeid = 0;
        ls->ls_total_weight = 0;
        ls->ls_node_array = NULL;

        memset(&ls->ls_stub_rsb, 0, sizeof(struct dlm_rsb));
        ls->ls_stub_rsb.res_ls = ls;

        ls->ls_debug_rsb_dentry = NULL;
        ls->ls_debug_waiters_dentry = NULL;

        init_waitqueue_head(&ls->ls_uevent_wait);
        ls->ls_uevent_result = 0;
        init_completion(&ls->ls_members_done);
        ls->ls_members_result = -1;

        ls->ls_recoverd_task = NULL;
        mutex_init(&ls->ls_recoverd_active);
        spin_lock_init(&ls->ls_recover_lock);
        spin_lock_init(&ls->ls_rcom_spin);
        get_random_bytes(&ls->ls_rcom_seq, sizeof(uint64_t));
        ls->ls_recover_status = 0;
        ls->ls_recover_seq = 0;
        ls->ls_recover_args = NULL;
        init_rwsem(&ls->ls_in_recovery);
        init_rwsem(&ls->ls_recv_active);
        INIT_LIST_HEAD(&ls->ls_requestqueue);
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);

        ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;

        INIT_LIST_HEAD(&ls->ls_recover_list);
        spin_lock_init(&ls->ls_recover_list_lock);
        ls->ls_recover_list_count = 0;
        ls->ls_local_handle = ls;
        init_waitqueue_head(&ls->ls_wait_general);
        INIT_LIST_HEAD(&ls->ls_root_list);
        init_rwsem(&ls->ls_root_sem);

        down_write(&ls->ls_in_recovery);

        spin_lock(&lslist_lock);
        list_add(&ls->ls_list, &lslist);
        spin_unlock(&lslist_lock);

        /* needs to find ls in lslist */
        error = dlm_recoverd_start(ls);
        if (error) {
                log_error(ls, "can't start dlm_recoverd %d", error);
                goto out_delist;
        }

        error = kobject_setup(ls);
        if (error)
                goto out_stop;

        error = kobject_register(&ls->ls_kobj);
        if (error)
                goto out_stop;

        /* let kobject handle freeing of ls if there's an error */
        do_unreg = 1;

        /* This uevent triggers dlm_controld in userspace to add us to the
           group of nodes that are members of this lockspace (managed by the
           cluster infrastructure.)  Once it's done that, it tells us who the
           current lockspace members are (via configfs) and then tells the
           lockspace to start running (via sysfs) in dlm_ls_start(). */

        error = do_uevent(ls, 1);
        if (error)
                goto out_stop;

        wait_for_completion(&ls->ls_members_done);
        error = ls->ls_members_result;
        if (error)
                goto out_members;

        dlm_create_debug_file(ls);

        log_debug(ls, "join complete");

        *lockspace = ls;
        return 0;

 out_members:
        do_uevent(ls, 0);
        dlm_clear_members(ls);
        kfree(ls->ls_node_array);
 out_stop:
        dlm_recoverd_stop(ls);
 out_delist:
        spin_lock(&lslist_lock);
        list_del(&ls->ls_list);
        spin_unlock(&lslist_lock);
        kfree(ls->ls_recover_buf);
 out_dirfree:
        kfree(ls->ls_dirtbl);
 out_lkbfree:
        kfree(ls->ls_lkbtbl);
 out_rsbfree:
        kfree(ls->ls_rsbtbl);
 out_lsfree:
        if (do_unreg)
                kobject_unregister(&ls->ls_kobj);
        else
                kfree(ls);
 out:
        module_put(THIS_MODULE);
        return error;
}

int dlm_new_lockspace(char *name, int namelen, void **lockspace,
                      uint32_t flags, int lvblen)
{
        int error = 0;

        mutex_lock(&ls_lock);
        if (!ls_count)
                error = threads_start();
        if (error)
                goto out;

        error = new_lockspace(name, namelen, lockspace, flags, lvblen);
        if (!error)
                ls_count++;
        else if (!ls_count)
                threads_stop();
 out:
        mutex_unlock(&ls_lock);
        return error;
}

/* Return 1 if the lockspace still has active remote locks,
 *        2 if the lockspace still has active local locks.
 */
static int lockspace_busy(struct dlm_ls *ls)
{
        int i, lkb_found = 0;
        struct dlm_lkb *lkb;

        /* NOTE: We check the lockidtbl here rather than the resource table.
           This is because there may be LKBs queued as ASTs that have been
           unlinked from their RSBs and are pending deletion once the AST has
           been delivered */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                read_lock(&ls->ls_lkbtbl[i].lock);
                if (!list_empty(&ls->ls_lkbtbl[i].list)) {
                        lkb_found = 1;
                        list_for_each_entry(lkb, &ls->ls_lkbtbl[i].list,
                                            lkb_idtbl_list) {
                                if (!lkb->lkb_nodeid) {
                                        read_unlock(&ls->ls_lkbtbl[i].lock);
                                        return 2;
                                }
                        }
                }
                read_unlock(&ls->ls_lkbtbl[i].lock);
        }
        return lkb_found;
}

static int release_lockspace(struct dlm_ls *ls, int force)
{
        struct dlm_lkb *lkb;
        struct dlm_rsb *rsb;
        struct list_head *head;
        int i;
        int busy = lockspace_busy(ls);

        if (busy > force)
                return -EBUSY;

        if (force < 3)
                do_uevent(ls, 0);

        dlm_recoverd_stop(ls);

        remove_lockspace(ls);

        dlm_delete_debug_file(ls);

        dlm_astd_suspend();

        kfree(ls->ls_recover_buf);

        /*
         * Free direntry structs.
         */

        dlm_dir_clear(ls);
        kfree(ls->ls_dirtbl);

        /*
         * Free all lkb's on lkbtbl[] lists.
         */

        for (i = 0; i < ls->ls_lkbtbl_size; i++) {
                head = &ls->ls_lkbtbl[i].list;
                while (!list_empty(head)) {
                        lkb = list_entry(head->next, struct dlm_lkb,
                                         lkb_idtbl_list);

                        list_del(&lkb->lkb_idtbl_list);

                        dlm_del_ast(lkb);

                        if (lkb->lkb_lvbptr && lkb->lkb_flags & DLM_IFL_MSTCPY)
                                free_lvb(lkb->lkb_lvbptr);

                        free_lkb(lkb);
                }
        }
        dlm_astd_resume();

        kfree(ls->ls_lkbtbl);

        /*
         * Free all rsb's on rsbtbl[] lists
         */

        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
                head = &ls->ls_rsbtbl[i].list;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);

                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }

                head = &ls->ls_rsbtbl[i].toss;
                while (!list_empty(head)) {
                        rsb = list_entry(head->next, struct dlm_rsb,
                                         res_hashchain);
                        list_del(&rsb->res_hashchain);
                        free_rsb(rsb);
                }
        }

        kfree(ls->ls_rsbtbl);

        /*
         * Free structures on any other lists
         */

        dlm_purge_requestqueue(ls);
        kfree(ls->ls_recover_args);
        dlm_clear_free_entries(ls);
        dlm_clear_members(ls);
        dlm_clear_members_gone(ls);
        kfree(ls->ls_node_array);
        kobject_unregister(&ls->ls_kobj);
        /* The ls structure will be freed when the kobject is done with it */

        mutex_lock(&ls_lock);
        ls_count--;
        if (!ls_count)
                threads_stop();
        mutex_unlock(&ls_lock);

        module_put(THIS_MODULE);
        return 0;
}

/*
 * Called when a system has released all its locks and is not going to use the
 * lockspace any longer.  We free everything we're managing for this lockspace.
 * Remaining nodes will go through the recovery process as if we'd died.  The
 * lockspace must continue to function as usual, participating in recoveries,
 * until this returns.
 *
 * Force has 4 possible values:
 * 0 - don't destroy lockspace if it has any LKBs
 * 1 - destroy lockspace if it has remote LKBs but not if it has local LKBs
 * 2 - destroy lockspace regardless of LKBs
 * 3 - destroy lockspace as part of a forced shutdown
 */

int dlm_release_lockspace(void *lockspace, int force)
{
        struct dlm_ls *ls;

        ls = dlm_find_lockspace_local(lockspace);
        if (!ls)
                return -EINVAL;
        dlm_put_lockspace(ls);
        return release_lockspace(ls, force);
}
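
For context, a minimal sketch of how an in-kernel client of this API might join and later release a lockspace. The module and the names example_ls/example_init/example_exit, the lockspace name "example_ls", and the lvblen value of 32 are illustrative assumptions, not taken from this file; the only constraints new_lockspace() above imposes are that namelen not exceed DLM_LOCKSPACE_LEN and that lvblen be a non-zero multiple of 8.

/*
 * Hypothetical usage sketch; not part of lockspace.c.
 */
#include <linux/module.h>
#include <linux/string.h>
#include <linux/dlm.h>

static void *example_ls;
static char example_ls_name[] = "example_ls";

static int __init example_init(void)
{
        int error;

        /* namelen must not exceed DLM_LOCKSPACE_LEN and lvblen must be a
           non-zero multiple of 8 (see the checks in new_lockspace());
           flags could also include DLM_LSFL_TIMEWARN or DLM_LSFL_FS. */
        error = dlm_new_lockspace(example_ls_name, strlen(example_ls_name),
                                  &example_ls, 0, 32);

        /* -EEXIST means the lockspace already existed; new_lockspace()
           still returned a usable handle in example_ls. */
        if (error && error != -EEXIST)
                return error;
        return 0;
}

static void __exit example_exit(void)
{
        /* force = 0: refuse (-EBUSY) if the lockspace still holds any LKBs;
           see the force values documented above dlm_release_lockspace(). */
        dlm_release_lockspace(example_ls, 0);
}

module_init(example_init);
module_exit(example_exit);
MODULE_LICENSE("GPL");

Note that, per the -EEXIST path in new_lockspace(), joining a lockspace that already exists still hands back a valid handle, which is why the sketch treats that return value as non-fatal.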