📄 dlmglue.c
        /* launch vote thread */
        osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote-%d",
                                     osb->osb_id);
        if (IS_ERR(osb->vote_task)) {
                status = PTR_ERR(osb->vote_task);
                osb->vote_task = NULL;
                mlog_errno(status);
                goto bail;
        }

        /* used by the dlm code to make message headers unique, each
         * node in this domain must agree on this. */
        dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

        /* for now, uuid == domain */
        dlm = dlm_register_domain(osb->uuid_str, dlm_key);
        if (IS_ERR(dlm)) {
                status = PTR_ERR(dlm);
                mlog_errno(status);
                goto bail;
        }

        dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

local:
        ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
        ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

        osb->dlm = dlm;

        status = 0;
bail:
        if (status < 0) {
                ocfs2_dlm_shutdown_debug(osb);
                if (osb->vote_task)
                        kthread_stop(osb->vote_task);
        }

        mlog_exit(status);
        return status;
}

void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
        mlog_entry_void();

        dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

        ocfs2_drop_osb_locks(osb);

        if (osb->vote_task) {
                kthread_stop(osb->vote_task);
                osb->vote_task = NULL;
        }

        ocfs2_lock_res_free(&osb->osb_super_lockres);
        ocfs2_lock_res_free(&osb->osb_rename_lockres);

        dlm_unregister_domain(osb->dlm);
        osb->dlm = NULL;

        ocfs2_dlm_shutdown_debug(osb);

        mlog_exit_void();
}

static void ocfs2_unlock_ast_func(void *opaque, enum dlm_status status)
{
        struct ocfs2_lock_res *lockres = opaque;

        mlog_entry_void();

        mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
             lockres->l_unlock_action);

        spin_lock(&lockres->l_lock);

        /* We tried to cancel a convert request, but it was already
         * granted. All we want to do here is clear our unlock
         * state. The wake_up call done at the bottom is redundant
         * (__ocfs2_cancel_convert doesn't sleep on this) but doesn't
         * hurt anything anyway */
        if (status == DLM_CANCELGRANT &&
            lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
                mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

                /* We don't clear the busy flag in this case as it
                 * should have been cleared by the ast which the dlm
                 * has called. */
                goto complete_unlock;
        }

        if (status != DLM_NORMAL) {
                mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
                     "unlock_action %d\n", status, lockres->l_name,
                     lockres->l_unlock_action);
                spin_unlock(&lockres->l_lock);
                return;
        }

        switch(lockres->l_unlock_action) {
        case OCFS2_UNLOCK_CANCEL_CONVERT:
                mlog(0, "Cancel convert success for %s\n", lockres->l_name);
                lockres->l_action = OCFS2_AST_INVALID;
                break;
        case OCFS2_UNLOCK_DROP_LOCK:
                lockres->l_level = LKM_IVMODE;
                break;
        default:
                BUG();
        }
        lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

complete_unlock:
        lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
        spin_unlock(&lockres->l_lock);

        wake_up(&lockres->l_event);

        mlog_exit_void();
}

/* BEWARE: called with lockres lock, and always drops it. Caller
 * should not be calling us with a busy lock... */
static int __ocfs2_drop_lock(struct ocfs2_super *osb,
                             struct ocfs2_lock_res *lockres)
{
        int ret = 0;
        enum dlm_status status;

        if (lockres->l_flags & OCFS2_LOCK_BUSY)
                mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
                     lockres->l_name);
        if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
                mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);

        if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
                spin_unlock(&lockres->l_lock);
                goto bail;
        }

        lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);

        /* make sure we never get here while waiting for an ast to
         * fire. */
        BUG_ON(lockres->l_action != OCFS2_AST_INVALID);

        /* is this necessary? */
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
        lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
        spin_unlock(&lockres->l_lock);

        mlog(0, "lock %s\n", lockres->l_name);

        status = dlmunlock(osb->dlm, &lockres->l_lksb, LKM_VALBLK,
                           lockres->l_ops->unlock_ast, lockres);
        if (status != DLM_NORMAL) {
                ocfs2_log_dlm_error("dlmunlock", status, lockres);
                mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
                dlm_print_one_lock(lockres->l_lksb.lockid);
                BUG();
        }
        mlog(0, "lock %s, successful return from dlmunlock\n",
             lockres->l_name);

        ocfs2_wait_on_busy_lock(lockres);
bail:
        mlog_exit(ret);
        return ret;
}

typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

struct drop_lock_cb {
        ocfs2_pre_drop_cb_t     *drop_func;
        void                    *drop_data;
};

static int ocfs2_drop_lock(struct ocfs2_super *osb,
                           struct ocfs2_lock_res *lockres,
                           struct drop_lock_cb *dcb)
{
        /* We didn't get anywhere near actually using this lockres. */
        if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
                return 0;

        spin_lock(&lockres->l_lock);

        mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
                        "lockres %s, flags 0x%lx\n",
                        lockres->l_name, lockres->l_flags);

        while (lockres->l_flags & OCFS2_LOCK_BUSY) {
                mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
                     "%u, unlock_action = %u\n", lockres->l_name,
                     lockres->l_flags, lockres->l_action,
                     lockres->l_unlock_action);

                spin_unlock(&lockres->l_lock);

                /* XXX: Today we just wait on any busy
                 * locks... Perhaps we need to cancel converts in the
                 * future? */
                ocfs2_wait_on_busy_lock(lockres);

                spin_lock(&lockres->l_lock);
        }

        if (dcb)
                dcb->drop_func(lockres, dcb->drop_data);

        /* This will drop the spinlock for us. Dur de dur, at least we
         * keep the ugliness in one place :) */
        return __ocfs2_drop_lock(osb, lockres);
}

/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the vote thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
        int status;
        struct ocfs2_status_completion sc;
        struct ocfs2_lockres_flag_callback fcb;

        ocfs2_init_completion_fcb(&fcb, &sc);

        spin_lock(&lockres->l_lock);
        lockres->l_flags |= OCFS2_LOCK_FREEING;
        while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
                lockres_add_flag_callback(lockres, &fcb, OCFS2_LOCK_QUEUED, 0);
                spin_unlock(&lockres->l_lock);

                mlog(0, "Waiting on lockres %s\n", lockres->l_name);

                status = ocfs2_wait_for_status_completion(&sc);
                if (status)
                        mlog_errno(status);

                spin_lock(&lockres->l_lock);
        }
        spin_unlock(&lockres->l_lock);
}

static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
        int status;

        mlog_entry_void();

        ocfs2_mark_lockres_freeing(&osb->osb_super_lockres);
        status = ocfs2_drop_lock(osb, &osb->osb_super_lockres, NULL);
        if (status < 0)
                mlog_errno(status);

        ocfs2_mark_lockres_freeing(&osb->osb_rename_lockres);
        status = ocfs2_drop_lock(osb, &osb->osb_rename_lockres, NULL);
        if (status < 0)
                mlog_errno(status);

        mlog_exit(status);
}

static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
{
        struct inode *inode = data;

        /* the metadata lock requires a bit more work as we have an
         * LVB to worry about. */
        if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
            lockres->l_level == LKM_EXMODE &&
            !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
                __ocfs2_stuff_meta_lvb(inode);
}

int ocfs2_drop_inode_locks(struct inode *inode)
{
        int status, err;
        struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };

        mlog_entry_void();

        /* No need to call ocfs2_mark_lockres_freeing here -
         * ocfs2_clear_inode has done it for us. */
        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
                              &OCFS2_I(inode)->ip_data_lockres, NULL);
        if (err < 0)
                mlog_errno(err);

        status = err;

        err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
                              &OCFS2_I(inode)->ip_meta_lockres, &meta_dcb);
        if (err < 0)
                mlog_errno(err);
        if (err < 0 && !status)
                status = err;

        mlog_exit(status);
        return status;
}

/* called with the spinlock held, and WILL drop it. */
static int __ocfs2_downconvert_lock(struct ocfs2_super *osb,
                                    struct ocfs2_lock_res *lockres,
                                    int new_level,
                                    int lvb)
{
        int ret, flags = LKM_CONVERT;
        enum dlm_status status;

        mlog_entry_void();

        BUG_ON(lockres->l_blocking <= LKM_NLMODE);

        if (lockres->l_level <= new_level) {
                mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
                     lockres->l_level, new_level);
                BUG();
        }

        mlog(0, "lock %s, new_level = %d, l_blocking = %d, lvb = %d\n",
             lockres->l_name, new_level, lockres->l_blocking, lvb);

        lockres->l_action = OCFS2_AST_DOWNCONVERT;
        lockres->l_requested = new_level;
        lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
        spin_unlock(&lockres->l_lock);

        if (lvb)
                flags |= LKM_VALBLK;

        status = dlmlock(osb->dlm,
                         new_level,
                         &lockres->l_lksb,
                         flags,
                         lockres->l_name,
                         lockres->l_ops->ast,
                         lockres,
                         lockres->l_ops->bast);
        if (status != DLM_NORMAL) {
                ocfs2_log_dlm_error("dlmlock", status, lockres);
                ret = -EINVAL;
                ocfs2_recover_from_dlm_error(lockres, 1);
                goto bail;
        }

        ret = 0;
bail:
        mlog_exit(ret);
        return ret;
}

/* called with the spinlock held, and WILL drop it. */
static int __ocfs2_cancel_convert(struct ocfs2_super *osb,
                                  struct ocfs2_lock_res *lockres)
{
        int ret;
        enum dlm_status status;

        mlog_entry_void();

        mlog(0, "lock %s\n", lockres->l_name);

        /* were we in a convert when we got the bast fire? */
        BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
               lockres->l_action != OCFS2_AST_DOWNCONVERT);
        /* set things up for the unlockast to know to just
         * clear out the ast_action and unset busy, etc. */
        lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

        mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
                        "lock %s, invalid flags: 0x%lx\n",
                        lockres->l_name, lockres->l_flags);

        spin_unlock(&lockres->l_lock);

        ret = 0;
        status = dlmunlock(osb->dlm,
                           &lockres->l_lksb,
                           LKM_CANCEL,
                           lockres->l_ops->unlock_ast,
                           lockres);
        if (status != DLM_NORMAL) {
                ocfs2_log_dlm_error("dlmunlock", status, lockres);
                ret = -EINVAL;
                ocfs2_recover_from_dlm_error(lockres, 0);
        }

        mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);

        mlog_exit(ret);
        return ret;
}

static int ocfs2_cancel_convert(struct ocfs2_super *osb,
                                struct ocfs2_lock_res *lockres)
{
        assert_spin_locked(&lockres->l_lock);

        if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
                /* If we're already trying to cancel a lock conversion
                 * then just drop the spinlock and allow the caller to
                 * requeue this lock. */
                spin_unlock(&lockres->l_lock);
                mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
                return 0;
        }

        /* this will drop the spinlock for us. */
        return __ocfs2_cancel_convert(osb, lockres);
}

static inline int ocfs2_can_downconvert_meta_lock(struct inode *inode,
                                                  struct ocfs2_lock_res *lockres,
                                                  int new_level)
{
        int ret;

        mlog_entry_void();

        BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);

        if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
                ret = 0;
                mlog(0, "lockres %s currently being refreshed -- backing "
                     "off!\n", lockres->l_name);
        } else if (new_level == LKM_PRMODE)
                ret = !lockres->l_ex_holders &&
                        ocfs2_inode_fully_checkpointed(inode);
        else /* Must be NLMODE we're converting to. */
                ret = !lockres->l_ro_holders && !lockres->l_ex_holders &&
                        ocfs2_inode_fully_checkpointed(inode);

        mlog_exit(ret);
        return ret;
}

static int ocfs2_do_unblock_meta(struct inode *inode,
                                 int *requeue)
{
        int new_level;
        int set_lvb = 0;
        int ret = 0;
        struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);

        mlog_entry_void();

        spin_lock(&lockres->l_lock);

        BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

        mlog(0, "l_level=%d, l_blocking=%d\n", lockres->l_level,
             lockres->l_blocking);

        BUG_ON(lockres->l_level != LKM_EXMODE &&
               lockres->l_level != LKM_PRMODE);

        if (lockres->l_flags & OCFS2_LOCK_BUSY) {
                *requeue = 1;
                ret = ocfs2_cancel_convert(osb, lockres);
                if (ret < 0)
                        mlog_errno(ret);
                goto leave;
        }

        new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

        mlog(0, "l_level=%d, l_blocking=%d, new_level=%d\n",
             lockres->l_level, lockres->l_blocking, new_level);

        if (ocfs2_can_downconvert_meta_lock(inode, lockres, new_level)) {
                if (lockres->l_level == LKM_EXMODE)
                        set_lvb = 1;

                /* If the lock hasn't been refreshed yet (rare), then
                 * our memory inode values are old and we skip
                 * stuffing the lvb. There's no need to actually clear
                 * out the lvb here as its value is still valid. */
                if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
                        if (set_lvb)
                                __ocfs2_stuff_meta_lvb(inode);
                } else
                        mlog(0, "lockres %s: downconverting stale lock!\n",
                             lockres->l_name);

                mlog(0, "calling __ocfs2_downconvert_lock with "
                     "l