
dlmrecovery.c — Linux kernel source code (C), page 1 of 5
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)));
				__dlm_print_one_lock_resource(res);
				BUG();
			}
			BUG_ON(lock->ml.node != ml->node);

			if (tmpq != queue) {
				mlog(0, "lock was on %u instead of %u for %.*s\n",
				     j, ml->list, res->lockname.len, res->lockname.name);
				spin_unlock(&res->spinlock);
				continue;
			}

			/* see NOTE above about why we do not update
			 * to match the master here */

			/* move the lock to its proper place */
			/* do not alter lock refcount.  switching lists. */
			list_move_tail(&lock->list, queue);
			spin_unlock(&res->spinlock);
			added++;

			mlog(0, "just reordered a local lock!\n");
			continue;
		}

		/* lock is for another node. */
		newlock = dlm_new_lock(ml->type, ml->node,
				       be64_to_cpu(ml->cookie), NULL);
		if (!newlock) {
			ret = -ENOMEM;
			goto leave;
		}
		lksb = newlock->lksb;
		dlm_lock_attach_lockres(newlock, res);

		if (ml->convert_type != LKM_IVMODE) {
			BUG_ON(queue != &res->converting);
			newlock->ml.convert_type = ml->convert_type;
		}
		lksb->flags |= (ml->flags &
				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));

		if (ml->type == LKM_NLMODE)
			goto skip_lvb;

		if (!dlm_lvb_is_empty(mres->lvb)) {
			if (lksb->flags & DLM_LKSB_PUT_LVB) {
				/* other node was trying to update
				 * lvb when node died.  recreate the
				 * lksb with the updated lvb. */
				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
				/* the lock resource lvb update must happen
				 * NOW, before the spinlock is dropped.
				 * we no longer wait for the AST to update
				 * the lvb. */
				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
			} else {
				/* otherwise, the node is sending its
				 * most recent valid lvb info */
				BUG_ON(ml->type != LKM_EXMODE &&
				       ml->type != LKM_PRMODE);
				if (!dlm_lvb_is_empty(res->lvb) &&
				    (ml->type == LKM_EXMODE ||
				     memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
					int i;
					mlog(ML_ERROR, "%s:%.*s: received bad "
					     "lvb! type=%d\n", dlm->name,
					     res->lockname.len,
					     res->lockname.name, ml->type);
					printk("lockres lvb=[");
					for (i=0; i<DLM_LVB_LEN; i++)
						printk("%02x", res->lvb[i]);
					printk("]\nmigrated lvb=[");
					for (i=0; i<DLM_LVB_LEN; i++)
						printk("%02x", mres->lvb[i]);
					printk("]\n");
					dlm_print_one_lock_resource(res);
					BUG();
				}
				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
			}
		}
skip_lvb:

		/* NOTE:
		 * wrt lock queue ordering and recovery:
		 *    1. order of locks on granted queue is
		 *       meaningless.
		 *    2. order of locks on converting queue is
		 *       LOST with the node death.  sorry charlie.
		 *    3. order of locks on the blocked queue is
		 *       also LOST.
		 * order of locks does not affect integrity, it
		 * just means that a lock request may get pushed
		 * back in line as a result of the node death.
		 * also note that for a given node the lock order
		 * for its secondary queue locks is preserved
		 * relative to each other, but clearly *not*
		 * preserved relative to locks from other nodes.
		 */
		bad = 0;
		spin_lock(&res->spinlock);
		list_for_each_entry(lock, queue, list) {
			if (lock->ml.cookie == ml->cookie) {
				__be64 c = lock->ml.cookie;
				mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
				     "exists on this lockres!\n", dlm->name,
				     res->lockname.len, res->lockname.name,
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)));

				mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
				     "node=%u, cookie=%u:%llu, queue=%d\n",
				     ml->type, ml->convert_type, ml->node,
				     dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
				     ml->list);

				__dlm_print_one_lock_resource(res);
				bad = 1;
				break;
			}
		}
		if (!bad) {
			dlm_lock_get(newlock);
			list_add_tail(&newlock->list, queue);
			mlog(0, "%s:%.*s: added lock for node %u, "
			     "setting refmap bit\n", dlm->name,
			     res->lockname.len, res->lockname.name, ml->node);
			dlm_lockres_set_refmap_bit(ml->node, res);
			added++;
		}
		spin_unlock(&res->spinlock);
	}
	mlog(0, "done running all the locks\n");

leave:
	/* balance the ref taken when the work was queued */
	spin_lock(&res->spinlock);
	dlm_lockres_drop_inflight_ref(dlm, res);
	spin_unlock(&res->spinlock);

	if (ret < 0) {
		mlog_errno(ret);
		if (newlock)
			dlm_lock_put(newlock);
	}

	mlog_exit(ret);
	return ret;
}

void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	int i;
	struct list_head *queue;
	struct dlm_lock *lock, *next;

	res->state |= DLM_LOCK_RES_RECOVERING;
	if (!list_empty(&res->recovering)) {
		mlog(0,
		     "Recovering res %s:%.*s, is already on recovery list!\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		list_del_init(&res->recovering);
	}
	/* We need to hold a reference while on the recovery list */
	dlm_lockres_get(res);
	list_add_tail(&res->recovering, &dlm->reco.resources);

	/* find any pending locks and put them back on proper list */
	for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_entry_safe(lock, next, queue, list) {
			dlm_lock_get(lock);
			if (lock->convert_pending) {
				/* move converting lock back to granted */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with convert pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_revert_pending_convert(res, lock);
				lock->convert_pending = 0;
			} else if (lock->lock_pending) {
				/* remove pending lock requests completely */
				BUG_ON(i != DLM_BLOCKED_LIST);
				mlog(0, "node died with lock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				/* lock will be floating until ref in
				 * dlmlock_remote is freed after the network
				 * call returns.  ok for it to not be on any
				 * list since no ast can be called
				 * (the master is dead). */
				dlm_revert_pending_lock(res, lock);
				lock->lock_pending = 0;
			} else if (lock->unlock_pending) {
				/* if an unlock was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master.  note that the dlm_unlock
				 * call is still responsible for calling
				 * the unlockast.  that will happen after
				 * the network call times out.  for now,
				 * just move lists to prepare the new
				 * recovery master.  */
				BUG_ON(i != DLM_GRANTED_LIST);
				mlog(0, "node died with unlock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_unlock(res, lock);
				lock->unlock_pending = 0;
			} else if (lock->cancel_pending) {
				/* if a cancel was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with cancel pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_cancel(res, lock);
				lock->cancel_pending = 0;
			}
			dlm_lock_put(lock);
		}
	}
}

/* removes all recovered locks from the recovery list.
 * sets the res->owner to the new master.
 * unsets the RECOVERY flag and wakes waiters. */
static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
					      u8 dead_node, u8 new_master)
{
	int i;
	struct hlist_node *hash_iter;
	struct hlist_head *bucket;
	struct dlm_lock_resource *res, *next;

	mlog_entry_void();

	assert_spin_locked(&dlm->spinlock);

	list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
		if (res->owner == dead_node) {
			list_del_init(&res->recovering);
			spin_lock(&res->spinlock);
			/* new_master has our reference from
			 * the lock state sent during recovery */
			dlm_change_lockres_owner(dlm, res, new_master);
			res->state &= ~DLM_LOCK_RES_RECOVERING;
			if (__dlm_lockres_has_locks(res))
				__dlm_dirty_lockres(dlm, res);
			spin_unlock(&res->spinlock);
			wake_up(&res->wq);
			dlm_lockres_put(res);
		}
	}

	/* this will become unnecessary eventually, but
	 * for now we need to run the whole hash, clear
	 * the RECOVERING state and set the owner
	 * if necessary */
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
		bucket = dlm_lockres_hash(dlm, i);
		hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
			if (res->state & DLM_LOCK_RES_RECOVERING) {
				if (res->owner == dead_node) {
					mlog(0, "(this=%u) res %.*s owner=%u "
					     "was not on recovering list, but "
					     "clearing state anyway\n",
					     dlm->node_num, res->lockname.len,
					     res->lockname.name, new_master);
				} else if (res->owner == dlm->node_num) {
					mlog(0, "(this=%u) res %.*s owner=%u "
					     "was not on recovering list, "
					     "owner is THIS node, clearing\n",
					     dlm->node_num, res->lockname.len,
					     res->lockname.name, new_master);
				} else
					continue;

				if (!list_empty(&res->recovering)) {
					mlog(0, "%s:%.*s: lockres was "
					     "marked RECOVERING, owner=%u\n",
					     dlm->name, res->lockname.len,
					     res->lockname.name, res->owner);
					list_del_init(&res->recovering);
					dlm_lockres_put(res);
				}
				spin_lock(&res->spinlock);
				/* new_master has our reference from
				 * the lock state sent during recovery */
				dlm_change_lockres_owner(dlm, res, new_master);
				res->state &= ~DLM_LOCK_RES_RECOVERING;
				if (__dlm_lockres_has_locks(res))
					__dlm_dirty_lockres(dlm, res);
				spin_unlock(&res->spinlock);
				wake_up(&res->wq);
			}
		}
	}
}

static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
{
	if (local) {
		if (lock->ml.type != LKM_EXMODE &&
		    lock->ml.type != LKM_PRMODE)
			return 1;
	} else if (lock->ml.type == LKM_EXMODE)
		return 1;
	return 0;
}

static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res, u8 dead_node)
{
	struct list_head *queue;
	struct dlm_lock *lock;
	int blank_lvb = 0, local = 0;
	int i;
	u8 search_node;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (res->owner == dlm->node_num)
		/* if this node owned the lockres, and if the dead node
		 * had an EX when he died, blank out the lvb */
		search_node = dead_node;
	else {
		/* if this is a secondary lockres, and we had no EX or PR
		 * locks granted, we can no longer trust the lvb */
		search_node = dlm->node_num;
		local = 1;  /* check local state for valid lvb */
	}

	for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_entry(lock, queue, list) {
			if (lock->ml.node == search_node) {
				if (dlm_lvb_needs_invalidation(lock, local)) {
					/* zero the lksb lvb and lockres lvb */
					blank_lvb = 1;
					memset(lock->lksb->lvb, 0, DLM_LVB_LEN);
				}
			}
		}
	}

	if (blank_lvb) {
		mlog(0, "clearing %.*s lvb, dead node %u had EX\n",
		     res->lockname.len, res->lockname.name, dead_node);
		memset(res->lvb, 0, DLM_LVB_LEN);
	}
}

static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res, u8 dead_node)
{
	struct dlm_lock *lock, *next;
	unsigned int freed = 0;

	/* this node is the lockres master:
	 * 1) remove any stale locks for the dead node
	 * 2) if the dead node had an EX when he died, blank out the lvb
	 */
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* TODO: check pending_asts, pending_basts here */
	list_for_each_entry_safe(lock, next, &res->granted, list) {
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
			freed++;
		}
	}
	list_for_each_entry_safe(lock, next, &res->converting, list) {
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
			freed++;
		}
	}
	list_for_each_entry_safe(lock, next, &res->blocked, list) {
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
			freed++;
		}
	}

	if (freed) {
		mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
		     "dropping ref from lockres\n", dlm->name,
		     res->lockname.len, res->lockname.name, freed, dead_node);
		BUG_ON(!test_bit(dead_node, res->refmap));
		dlm_lockres_clear_refmap_bit(dead_node, res);
	} else if (test_bit(dead_node, res->refmap)) {
		mlog(0, "%s:%.*s: dead node %u had a ref, but had "
		     "no locks and had not purged before dying\n", dlm->name,
		     res->lockname.len, res->lockname.name, dead_node);
		dlm_lockres_clear_refmap_bit(dead_node, res);
	}

	/* do not kick thread yet */
	__dlm_dirty_lockres(dlm, res);
}

/* if this node is the recovery master, and there are no
 * locks for a given lockres owned by this node that are in
 * either PR or EX mode, zero out the lvb before requesting.
 *
 */
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct hlist_node *iter;
	struct dlm_lock_resource *res;
	int i;
	struct hlist_head *bucket;
	struct dlm_lock *lock;

	/* purge any stale mles */
	dlm_clean_master_list(dlm, dead_node);

	/*
	 * now clean up all lock resources.  there are two rules:
	 *
	 * 1) if the dead node was the master, move the lockre
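A pattern that recurs throughout this page is walking a lock queue with list_for_each_entry_safe() and unlinking entries with list_del_init() while the walk is still in progress, as dlm_free_dead_locks() does for each of the granted, converting and blocked queues. Below is a minimal userspace sketch of that idea; it is not kernel code, the list primitives are simplified stand-ins for <linux/list.h>, and struct fake_lock and the dead_node value are purely illustrative.

/* build: cc -o safe_iter_demo safe_iter_demo.c */
#include <stdio.h>
#include <stddef.h>

/* simplified stand-in for the kernel's circular doubly linked list */
struct list_head {
	struct list_head *next, *prev;
};

static void INIT_LIST_HEAD(struct list_head *h)
{
	h->next = h;
	h->prev = h;
}

static void list_add_tail(struct list_head *entry, struct list_head *head)
{
	entry->prev = head->prev;
	entry->next = head;
	head->prev->next = entry;
	head->prev = entry;
}

static void list_del_init(struct list_head *entry)
{
	entry->prev->next = entry->next;
	entry->next->prev = entry->prev;
	INIT_LIST_HEAD(entry);
}

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

/* "safe" variant: cache the next entry in 'n' before the loop body may
 * unlink 'pos', so removal does not break the traversal */
#define list_for_each_entry_safe(pos, n, head, member)				\
	for (pos = container_of((head)->next, __typeof__(*pos), member),	\
	     n = container_of(pos->member.next, __typeof__(*pos), member);	\
	     &pos->member != (head);						\
	     pos = n, n = container_of(n->member.next, __typeof__(*n), member))

/* hypothetical, stripped-down stand-in for struct dlm_lock */
struct fake_lock {
	unsigned int node;		/* node that owns this lock */
	struct list_head list;		/* linkage on the granted queue */
};

int main(void)
{
	struct list_head granted;
	struct fake_lock locks[4] = { { 1 }, { 7 }, { 7 }, { 3 } };
	struct fake_lock *lock, *next;
	unsigned int dead_node = 7;	/* illustrative dead node number */
	unsigned int freed = 0;
	int i;

	INIT_LIST_HEAD(&granted);
	for (i = 0; i < 4; i++)
		list_add_tail(&locks[i].list, &granted);

	/* same shape as the loops in dlm_free_dead_locks(): drop every
	 * lock that belonged to the dead node while iterating */
	list_for_each_entry_safe(lock, next, &granted, list) {
		if (lock->node == dead_node) {
			list_del_init(&lock->list);	/* safe: 'next' already cached */
			freed++;
		}
	}

	printf("freed %u locks for dead node %u\n", freed, dead_node);
	list_for_each_entry_safe(lock, next, &granted, list)
		printf("remaining lock from node %u\n", lock->node);

	return 0;
}

The non-safe list_for_each_entry() used elsewhere on this page (for example in the duplicate-cookie check) is fine precisely because those loops never remove the entry they are visiting.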
