📄 dlmglue.c
字号:
mlog_entry_void();

	/*
	 * NOTE(review): this is the tail of a function whose signature and
	 * local declarations lie above this chunk (the error string below
	 * names it ocfs2_inode_ast_func).  It is documented only as far as
	 * the visible statements show.
	 */
	inode = ocfs2_lock_res_inode(lockres);

	mlog(0, "AST fired for inode %"MLFu64", l_action = %u, type = %s\n",
	     OCFS2_I(inode)->ip_blkno, lockres->l_action,
	     (lockres->l_type == OCFS2_LOCK_TYPE_META) ? "Meta" : "Data");

	BUG_ON(!ocfs2_is_inode_lock(lockres));

	spin_lock(&lockres->l_lock);

	lksb = &(lockres->l_lksb);
	if (lksb->status != DLM_NORMAL) {
		/* Log and return early; l_action and the lock flags are
		 * not modified on this path. */
		mlog(ML_ERROR, "ocfs2_inode_ast_func: lksb status value of %u "
		     "on inode %"MLFu64"\n", lksb->status,
		     OCFS2_I(inode)->ip_blkno);
		spin_unlock(&lockres->l_lock);
		mlog_exit_void();
		return;
	}

	/* Dispatch on the action that was recorded in l_action. */
	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		/* Inode attach additionally drops the LOCAL flag. */
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		/* Any other action value is fatal; dump the lockres state
		 * before BUG(). */
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* data locking ignores refresh flag for now. */
	if (lockres->l_type == OCFS2_LOCK_TYPE_DATA)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;
	spin_unlock(&lockres->l_lock);
	/* Wake waiters on l_event only after l_lock has been released. */
	wake_up(&lockres->l_event);

	mlog_exit_void();
}

/*
 * Record a blocking request of the given level against lockres.
 *
 * The caller must hold lockres->l_lock (asserted below).  The BLOCKED flag
 * is always set.  When level is higher than the currently recorded
 * l_blocking, l_blocking is raised to level.
 *
 * Returns 1 if the highest lock level compatible with the new request is
 * lower than the one compatible with the previous l_blocking, 0 otherwise.
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;

	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled * one that goes low enough to satisfy the level we're * blocking. 
this also catches the case where we get * duplicate BASTs */ if (ocfs2_highest_compat_lock_level(level) < ocfs2_highest_compat_lock_level(lockres->l_blocking)) needs_downconvert = 1; lockres->l_blocking = level; } mlog_exit(needs_downconvert); return needs_downconvert;}static void ocfs2_generic_bast_func(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int level){ int needs_downconvert; mlog_entry_void(); BUG_ON(level <= LKM_NLMODE); spin_lock(&lockres->l_lock); needs_downconvert = ocfs2_generic_handle_bast(lockres, level); if (needs_downconvert) ocfs2_schedule_blocked_lock(osb, lockres); spin_unlock(&lockres->l_lock); ocfs2_kick_vote_thread(osb); wake_up(&lockres->l_event); mlog_exit_void();}static void ocfs2_inode_bast_func(void *opaque, int level){ struct ocfs2_lock_res *lockres = opaque; struct inode *inode; struct ocfs2_super *osb; mlog_entry_void(); BUG_ON(!ocfs2_is_inode_lock(lockres)); inode = ocfs2_lock_res_inode(lockres); osb = OCFS2_SB(inode->i_sb); mlog(0, "BAST fired for inode %"MLFu64", blocking = %d, level = %d " "type = %s\n", OCFS2_I(inode)->ip_blkno, level, lockres->l_level, (lockres->l_type == OCFS2_LOCK_TYPE_META) ? 
"Meta" : "Data"); ocfs2_generic_bast_func(osb, lockres, level); mlog_exit_void();}static void ocfs2_generic_ast_func(struct ocfs2_lock_res *lockres, int ignore_refresh){ struct dlm_lockstatus *lksb = &lockres->l_lksb; spin_lock(&lockres->l_lock); if (lksb->status != DLM_NORMAL) { mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n", lockres->l_name, lksb->status); spin_unlock(&lockres->l_lock); return; } switch(lockres->l_action) { case OCFS2_AST_ATTACH: ocfs2_generic_handle_attach_action(lockres); break; case OCFS2_AST_CONVERT: ocfs2_generic_handle_convert_action(lockres); break; case OCFS2_AST_DOWNCONVERT: ocfs2_generic_handle_downconvert_action(lockres); break; default: BUG(); } if (ignore_refresh) lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH); /* set it to something invalid so if we get called again we * can catch it. */ lockres->l_action = OCFS2_AST_INVALID; spin_unlock(&lockres->l_lock); wake_up(&lockres->l_event);}static void ocfs2_super_ast_func(void *opaque){ struct ocfs2_lock_res *lockres = opaque; mlog_entry_void(); mlog(0, "Superblock AST fired\n"); BUG_ON(!ocfs2_is_super_lock(lockres)); ocfs2_generic_ast_func(lockres, 0); mlog_exit_void();}static void ocfs2_super_bast_func(void *opaque, int level){ struct ocfs2_lock_res *lockres = opaque; struct ocfs2_super *osb; mlog_entry_void(); mlog(0, "Superblock BAST fired\n"); BUG_ON(!ocfs2_is_super_lock(lockres)); osb = ocfs2_lock_res_super(lockres); ocfs2_generic_bast_func(osb, lockres, level); mlog_exit_void();}static void ocfs2_rename_ast_func(void *opaque){ struct ocfs2_lock_res *lockres = opaque; mlog_entry_void(); mlog(0, "Rename AST fired\n"); BUG_ON(!ocfs2_is_rename_lock(lockres)); ocfs2_generic_ast_func(lockres, 1); mlog_exit_void();}static void ocfs2_rename_bast_func(void *opaque, int level){ struct ocfs2_lock_res *lockres = opaque; struct ocfs2_super *osb; mlog_entry_void(); mlog(0, "Rename BAST fired\n"); BUG_ON(!ocfs2_is_rename_lock(lockres)); osb = ocfs2_lock_res_super(lockres); 
ocfs2_generic_bast_func(osb, lockres, level); mlog_exit_void();}static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres, int convert){ mlog_entry_void(); spin_lock(&lockres->l_lock); lockres_clear_flags(lockres, OCFS2_LOCK_BUSY); if (convert) lockres->l_action = OCFS2_AST_INVALID; else lockres->l_unlock_action = OCFS2_UNLOCK_INVALID; spin_unlock(&lockres->l_lock); wake_up(&lockres->l_event); mlog_exit_void();}/* Note: If we detect another process working on the lock (i.e., * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller * to do the right thing in that case. */static int ocfs2_lock_create(struct ocfs2_super *osb, struct ocfs2_lock_res *lockres, int level, int flags){ int ret = 0; enum dlm_status status; mlog_entry_void(); mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level, flags); spin_lock(&lockres->l_lock); if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) || (lockres->l_flags & OCFS2_LOCK_BUSY)) { spin_unlock(&lockres->l_lock); goto bail; } lockres->l_action = OCFS2_AST_ATTACH; lockres->l_requested = level; lockres_or_flags(lockres, OCFS2_LOCK_BUSY); spin_unlock(&lockres->l_lock); status = dlmlock(osb->dlm, level, &lockres->l_lksb, flags, lockres->l_name, lockres->l_ops->ast, lockres, lockres->l_ops->bast); if (status != DLM_NORMAL) { ocfs2_log_dlm_error("dlmlock", status, lockres); ret = -EINVAL; ocfs2_recover_from_dlm_error(lockres, 1); } mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);bail: mlog_exit(ret); return ret;}static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres, int flag){ int ret; spin_lock(&lockres->l_lock); ret = lockres->l_flags & flag; spin_unlock(&lockres->l_lock); return ret;}static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres){ wait_event(lockres->l_event, !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));}static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres){ wait_event(lockres->l_event, 
!ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}

/*
 * Queue fcb on the lockres flag-callback list and record the flag mask
 * and goal it is registered with.
 *
 * The caller must hold lockres->l_lock (asserted).  fcb must not already
 * be on a list and must have a callback set; both are enforced with
 * BUG_ON.
 */
static void lockres_add_flag_callback(struct ocfs2_lock_res *lockres,
				      struct ocfs2_lockres_flag_callback *fcb,
				      unsigned long mask,
				      unsigned long goal)
{
	BUG_ON(!list_empty(&fcb->fc_lockres_item));
	BUG_ON(fcb->fc_cb == NULL);

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&fcb->fc_lockres_item, &lockres->l_flag_cb_list);
	fcb->fc_flag_mask = mask;
	fcb->fc_flag_goal = goal;
}

/* predict what lock level we'll be dropping down to on behalf
 * of another node, and return true if the currently wanted
 * level will be compatible with it.
 *
 * Must only be called while OCFS2_LOCK_BLOCKED is set (BUG_ON otherwise). */
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted)
{
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

	return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
}

/* these are generic and could be used elsewhere */
struct ocfs2_status_completion {
	int			sc_status;	/* rc stored by ocfs2_status_completion_cb() */
	struct completion	sc_complete;	/* completed once sc_status has been stored */
};

/*
 * Lock callback that stores rc into the ocfs2_status_completion passed
 * (cast to unsigned long) as data, then completes it.
 */
static void ocfs2_status_completion_cb(int rc, unsigned long data)
{
	struct ocfs2_status_completion *sc;

	sc = (struct ocfs2_status_completion *)data;
	sc->sc_status = rc;
	complete(&sc->sc_complete);
}

/*
 * Block until sc has been completed, re-initialise it for reuse and
 * return the stored status.
 */
static int ocfs2_wait_for_status_completion(struct ocfs2_status_completion *sc)
{
	wait_for_completion(&sc->sc_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(sc->sc_complete);
	return sc->sc_status;
}

/*
 * Initialise a flag callback with its function and argument.
 * fc_free_once_called is set exactly when the fcb is NOT stack allocated.
 */
static void ocfs2_init_fcb(struct ocfs2_lockres_flag_callback *fcb,
			   ocfs2_lock_callback cb,
			   unsigned long cb_data,
			   int stack_allocated)
{
	fcb->fc_cb = cb;
	fcb->fc_data = cb_data;
	fcb->fc_free_once_called = !stack_allocated;
	INIT_LIST_HEAD(&fcb->fc_lockres_item);
}

/* Init a stack allocated FCB and an ocfs2_status_completion together. 
*/
static void ocfs2_init_completion_fcb(struct ocfs2_lockres_flag_callback *fcb,
				      struct ocfs2_status_completion *sc)
{
	init_completion(&sc->sc_complete);
	/* Wire the fcb to complete sc; the trailing 1 marks it as stack
	 * allocated. */
	ocfs2_init_fcb(fcb, ocfs2_status_completion_cb, (unsigned long) sc, 1);
}

/*
 * NOTE(review): only the beginning of this function is visible in this
 * chunk; its remaining body, the "unlock"/"out" labels and its return
 * value are not, so only the visible statements are documented here.
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      ocfs2_lock_callback cb,
			      unsigned long cb_data)
{
	struct ocfs2_lockres_flag_callback sync_fcb, *fcb;
	struct ocfs2_status_completion sc;
	enum dlm_status status;
	int ret;
	/* Signals are honoured unless the NOINTR mount option is set. */
	int catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int sync = 1;

	mlog_entry_void();

	if (cb != NULL) {
		fcb = kmalloc(sizeof(*fcb), GFP_NOFS);
		if (fcb == NULL) {
			ret = -ENOMEM;
			goto out;
		}

		ocfs2_init_fcb(fcb, cb, cb_data, 0);

		/* A callback passed in means we'll assume async
		 * behavior - no waiting on dlm operations will be
		 * done here and the allocated fcb will call the
		 * callback when done. */
		sync = 0;
	} else {
		/* No callback passed which means the caller wants
		 * synchronous behavior - we avoid kmalloc and use a
		 * stack allocated fcb for this. The status completion
		 * helpers defined above come in handy here. */
		fcb = &sync_fcb;
		ocfs2_init_completion_fcb(fcb, &sc);
	}

again:
	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock(&lockres->l_lock);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_flag_callback(lockres, fcb, OCFS2_LOCK_BUSY, 0);
		ret = -EIOCBRETRY;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. 
*/ spin_unlock(&lockres->l_lock); ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0); if (ret < 0) { mlog_errno(ret); goto out; } goto again; } if (lockres->l_flags & OCFS2_LOCK_BLOCKED && !ocfs2_may_continue_on_blocked_lock(lockres, level)) { /* is the lock is currently blocked on behalf of * another node */ lockres_add_flag_callback(lockres, fcb, OCFS2_LOCK_BLOCKED, 0); ret = -EIOCBRETRY; goto unlock; } if (level > lockres->l_level) { if (lockres->l_action != OCFS2_AST_INVALID) mlog(ML_ERROR, "lockres %s has action %u pending\n", lockres->l_name, lockres->l_action); lockres->l_action = OCFS2_AST_CONVERT; lockres->l_requested = level; lockres_or_flags(lockres, OCFS2_LOCK_BUSY); spin_unlock(&lockres->l_lock); BUG_ON(level == LKM_IVMODE); BUG_ON(level == LKM_NLMODE); mlog(0, "lock %s, convert from %d to level = %d\n", lockres->l_name, lockres->l_level, level); /* call dlm_lock to upgrade lock now */ status = dlmlock(osb->dlm, level, &lockres->l_lksb, lkm_flags|LKM_CONVERT|LKM_VALBLK, lockres->l_name, lockres->l_ops->ast, lockres, lockres->l_ops->bast); if (status != DLM_NORMAL) { if ((lkm_flags & LKM_NOQUEUE) && (status == DLM_NOTQUEUED)) ret = -EAGAIN; else {
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -