
📄 dlmrecovery.c

📁 Linux 2.6.16
💻 C
📖 Page 1 of 5
	int nodenum;
	int ret = 0;

	*real_master = DLM_LOCK_RES_OWNER_UNKNOWN;

	/* we only reach here if one of the two nodes in a
	 * migration died while the migration was in progress.
	 * at this point we need to requery the master.  we
	 * know that the new_master got as far as creating
	 * an mle on at least one node, but we do not know
	 * if any nodes had actually cleared the mle and set
	 * the master to the new_master.  the old master
	 * is supposed to set the owner to UNKNOWN in the
	 * event of a new_master death, so the only possible
	 * responses that we can get from nodes here are
	 * that the master is new_master, or that the master
	 * is UNKNOWN.
	 * if all nodes come back with UNKNOWN then we know
	 * the lock needs remastering here.
	 * if any node comes back with a valid master, check
	 * to see if that master is the one that we are
	 * recovering.  if so, then the new_master died and
	 * we need to remaster this lock.  if not, then the
	 * new_master survived and that node will respond to
	 * other nodes about the owner.
	 * if there is an owner, this node needs to dump this
	 * lockres and alert the sender that this lockres
	 * was rejected. */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* do not send to self */
		if (nodenum == dlm->node_num)
			continue;
		ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
		if (ret < 0) {
			mlog_errno(ret);
			BUG();
			/* TODO: need to figure a way to restart this */
		}
		if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "lock master is %u\n", *real_master);
			break;
		}
	}
	return ret;
}


static int dlm_do_master_requery(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 u8 nodenum, u8 *real_master)
{
	int ret = -EINVAL;
	struct dlm_master_requery req;
	int status = DLM_LOCK_RES_OWNER_UNKNOWN;

	memset(&req, 0, sizeof(req));
	req.node_idx = dlm->node_num;
	req.namelen = res->lockname.len;
	memcpy(req.name, res->lockname.name, res->lockname.len);

	ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key,
				 &req, sizeof(req), nodenum, &status);
	/* XXX: negative status not handled properly here. */
	if (ret < 0)
		mlog_errno(ret);
	else {
		BUG_ON(status < 0);
		BUG_ON(status > DLM_LOCK_RES_OWNER_UNKNOWN);
		*real_master = (u8) (status & 0xff);
		mlog(0, "node %u responded to master requery with %u\n",
			  nodenum, *real_master);
		ret = 0;
	}
	return ret;
}


/* this function cannot error, so unless the sending
 * or receiving of the message failed, the owner can
 * be trusted */
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	int master = DLM_LOCK_RES_OWNER_UNKNOWN;
	u32 flags = DLM_ASSERT_MASTER_REQUERY;

	if (!dlm_grab(dlm)) {
		/* since the domain has gone away on this
		 * node, the proper response is UNKNOWN */
		return master;
	}

	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, req->name, req->namelen);
	if (res) {
		spin_lock(&res->spinlock);
		master = res->owner;
		if (master == dlm->node_num) {
			int ret = dlm_dispatch_assert_master(dlm, res,
							     0, 0, flags);
			if (ret < 0) {
				mlog_errno(-ENOMEM);
				/* retry!? */
				BUG();
			}
		}
		spin_unlock(&res->spinlock);
	}
	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);
	return master;
}


static inline struct list_head *
dlm_list_num_to_pointer(struct dlm_lock_resource *res, int list_num)
{
	struct list_head *ret;

	BUG_ON(list_num < 0);
	BUG_ON(list_num > 2);
	ret = &(res->granted);
	ret += list_num;
	return ret;
}
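dlm_list_num_to_pointer() above works only because the granted, converting and blocked list heads are declared back-to-back in struct dlm_lock_resource, so adding the list number to &res->granted lands on the matching queue head. A minimal standalone sketch of the same indexing trick, using a made-up toy_lockres instead of the real kernel structure:

#include <assert.h>

/* toy stand-ins: only the consecutive layout of the three queue heads matters */
struct toy_list_head { struct toy_list_head *next, *prev; };

struct toy_lockres {
	struct toy_list_head granted;		/* list number 0 */
	struct toy_list_head converting;	/* list number 1 */
	struct toy_list_head blocked;		/* list number 2 */
};

/* same idea as dlm_list_num_to_pointer(): the members are adjacent and of
 * the same type, so pointer arithmetic on &res->granted selects the n-th
 * queue head */
static struct toy_list_head *toy_list_num_to_pointer(struct toy_lockres *res, int n)
{
	assert(n >= 0 && n <= 2);
	return &res->granted + n;
}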
/* TODO: do ast flush business
 * TODO: do MIGRATING and RECOVERING spinning
 */

/*
* NOTE about in-flight requests during migration:
*
* Before attempting the migrate, the master has marked the lockres as
* MIGRATING and then flushed all of its pending ASTS.  So any in-flight
* requests either got queued before the MIGRATING flag got set, in which
* case the lock data will reflect the change and a return message is on
* the way, or the request failed to get in before MIGRATING got set.  In
* this case, the caller will be told to spin and wait for the MIGRATING
* flag to be dropped, then recheck the master.
* This holds true for the convert, cancel and unlock cases, and since lvb
* updates are tied to these same messages, it applies to lvb updates as
* well.  For the lock case, there is no way a lock can be on the master
* queue and not be on the secondary queue since the lock is always added
* locally first.  This means that the new target node will never be sent
* a lock that he doesn't already have on the list.
* In total, this means that the local lock is correct and should not be
* updated to match the one sent by the master.  Any messages sent back
* from the master before the MIGRATING flag will bring the lock properly
* up-to-date, and the change will be ordered properly for the waiter.
* We will *not* attempt to modify the lock underneath the waiter.
*/
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_migratable_lockres *mres)
{
	struct dlm_migratable_lock *ml;
	struct list_head *queue;
	struct dlm_lock *newlock = NULL;
	struct dlm_lockstatus *lksb = NULL;
	int ret = 0;
	int i;
	struct list_head *iter;
	struct dlm_lock *lock = NULL;

	mlog(0, "running %d locks for this lockres\n", mres->num_locks);
	for (i=0; i<mres->num_locks; i++) {
		ml = &(mres->ml[i]);
		BUG_ON(ml->highest_blocked != LKM_IVMODE);
		newlock = NULL;
		lksb = NULL;

		queue = dlm_list_num_to_pointer(res, ml->list);

		/* if the lock is for the local node it needs to
		 * be moved to the proper location within the queue.
		 * do not allocate a new lock structure. */
		if (ml->node == dlm->node_num) {
			/* MIGRATION ONLY! */
			BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));

			spin_lock(&res->spinlock);
			list_for_each(iter, queue) {
				lock = list_entry (iter, struct dlm_lock, list);
				if (lock->ml.cookie != ml->cookie)
					lock = NULL;
				else
					break;
			}

			/* lock is always created locally first, and
			 * destroyed locally last.  it must be on the list */
			if (!lock) {
				mlog(ML_ERROR, "could not find local lock "
					       "with cookie %"MLFu64"!\n",
				     ml->cookie);
				BUG();
			}
			BUG_ON(lock->ml.node != ml->node);

			/* see NOTE above about why we do not update
			 * to match the master here */

			/* move the lock to its proper place */
			/* do not alter lock refcount.  switching lists. */
			list_del_init(&lock->list);
			list_add_tail(&lock->list, queue);
			spin_unlock(&res->spinlock);

			mlog(0, "just reordered a local lock!\n");
			continue;
		}

		/* lock is for another node. */
		newlock = dlm_new_lock(ml->type, ml->node,
				       be64_to_cpu(ml->cookie), NULL);
		if (!newlock) {
			ret = -ENOMEM;
			goto leave;
		}
		lksb = newlock->lksb;
		dlm_lock_attach_lockres(newlock, res);

		if (ml->convert_type != LKM_IVMODE) {
			BUG_ON(queue != &res->converting);
			newlock->ml.convert_type = ml->convert_type;
		}
		lksb->flags |= (ml->flags &
				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));

		if (mres->lvb[0]) {
			if (lksb->flags & DLM_LKSB_PUT_LVB) {
				/* other node was trying to update
				 * lvb when node died.  recreate the
				 * lksb with the updated lvb. */
				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
			} else {
				/* otherwise, the node is sending its
				 * most recent valid lvb info */
				BUG_ON(ml->type != LKM_EXMODE &&
				       ml->type != LKM_PRMODE);
				if (res->lvb[0] && (ml->type == LKM_EXMODE ||
				    memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
					mlog(ML_ERROR, "received bad lvb!\n");
					__dlm_print_one_lock_resource(res);
					BUG();
				}
				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
			}
		}

		/* NOTE:
		 * wrt lock queue ordering and recovery:
		 *    1. order of locks on granted queue is
		 *       meaningless.
		 *    2. order of locks on converting queue is
		 *       LOST with the node death.  sorry charlie.
		 *    3. order of locks on the blocked queue is
		 *       also LOST.
		 * order of locks does not affect integrity, it
		 * just means that a lock request may get pushed
		 * back in line as a result of the node death.
		 * also note that for a given node the lock order
		 * for its secondary queue locks is preserved
		 * relative to each other, but clearly *not*
		 * preserved relative to locks from other nodes.
		 */
		spin_lock(&res->spinlock);
		dlm_lock_get(newlock);
		list_add_tail(&newlock->list, queue);
		spin_unlock(&res->spinlock);
	}
	mlog(0, "done running all the locks\n");

leave:
	if (ret < 0) {
		mlog_errno(ret);
		if (newlock)
			dlm_lock_put(newlock);
	}
	mlog_exit(ret);
	return ret;
}
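The lvb handling in dlm_process_recovery_data() reduces to one rule: if the dead node still had an lvb write in flight (DLM_LKSB_PUT_LVB), the shipped value is adopted into the new lock's lksb; otherwise the sender is shipping the resource's most recent valid lvb, any copy this node already holds must match it, and an EX lock arriving while a local copy exists is always treated as a bad lvb. A condensed sketch of that decision, using hypothetical toy types rather than the kernel structures:

#include <string.h>

#define TOY_LVB_LEN 64

struct toy_mres { char lvb[TOY_LVB_LEN]; };	/* value shipped by the recovery message */
struct toy_lksb { int put_lvb; char lvb[TOY_LVB_LEN]; };
struct toy_res  { char lvb[TOY_LVB_LEN]; };

/* mirrors the mres->lvb[0] branch above; returns -1 for the "received bad
 * lvb" case that the real code treats as a BUG() */
static int toy_apply_recovered_lvb(struct toy_res *res, struct toy_lksb *lksb,
				   const struct toy_mres *mres, int is_exmode)
{
	if (!mres->lvb[0])
		return 0;			/* nothing shipped */

	if (lksb->put_lvb) {
		/* the dead node died mid lvb update: recreate the lksb's lvb */
		memcpy(lksb->lvb, mres->lvb, TOY_LVB_LEN);
		return 0;
	}

	/* sender ships its current valid lvb; a pre-existing local copy must
	 * match, and must not exist at all for an EX lock */
	if (res->lvb[0] &&
	    (is_exmode || memcmp(res->lvb, mres->lvb, TOY_LVB_LEN)))
		return -1;

	memcpy(res->lvb, mres->lvb, TOY_LVB_LEN);
	return 0;
}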
void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	int i;
	struct list_head *queue, *iter, *iter2;
	struct dlm_lock *lock;

	res->state |= DLM_LOCK_RES_RECOVERING;
	if (!list_empty(&res->recovering))
		list_del_init(&res->recovering);
	list_add_tail(&res->recovering, &dlm->reco.resources);

	/* find any pending locks and put them back on proper list */
	for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_safe(iter, iter2, queue) {
			lock = list_entry (iter, struct dlm_lock, list);
			dlm_lock_get(lock);
			if (lock->convert_pending) {
				/* move converting lock back to granted */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with convert pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_revert_pending_convert(res, lock);
				lock->convert_pending = 0;
			} else if (lock->lock_pending) {
				/* remove pending lock requests completely */
				BUG_ON(i != DLM_BLOCKED_LIST);
				mlog(0, "node died with lock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				/* lock will be floating until ref in
				 * dlmlock_remote is freed after the network
				 * call returns.  ok for it to not be on any
				 * list since no ast can be called
				 * (the master is dead). */
				dlm_revert_pending_lock(res, lock);
				lock->lock_pending = 0;
			} else if (lock->unlock_pending) {
				/* if an unlock was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master.  note that the dlm_unlock
				 * call is still responsible for calling
				 * the unlockast.  that will happen after
				 * the network call times out.  for now,
				 * just move lists to prepare the new
				 * recovery master.  */
				BUG_ON(i != DLM_GRANTED_LIST);
				mlog(0, "node died with unlock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_unlock(res, lock);
				lock->unlock_pending = 0;
			} else if (lock->cancel_pending) {
				/* if a cancel was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with cancel pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_cancel(res, lock);
				lock->cancel_pending = 0;
			}
			dlm_lock_put(lock);
		}
	}
}
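The *_pending handling in dlm_move_lockres_to_recovery_list() amounts to a small table: which queue a lock carrying each pending flag must be found on, and whether the in-flight operation is reverted or treated as already committed before the queues are handed to the recovery master. A hypothetical summary, assuming the granted=0 / converting=1 / blocked=2 numbering used by dlm_list_num_to_pointer() earlier (names here are illustrative, not kernel API):

/* illustrative only, not part of dlmrecovery.c */
struct toy_pending_rule {
	const char *flag;	/* which *_pending bit is set */
	int expected_list;	/* 0 granted, 1 converting, 2 blocked */
	const char *action;
};

static const struct toy_pending_rule toy_pending_rules[] = {
	{ "convert_pending", 1, "revert: move the lock back to the granted list" },
	{ "lock_pending",    2, "revert: drop the never-granted request entirely" },
	{ "unlock_pending",  0, "commit: behave as if the unlock already finished" },
	{ "cancel_pending",  1, "commit: behave as if the cancel already finished" },
};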
/* removes all recovered locks from the recovery list.
 * sets the res->owner to the new master.
 * unsets the RECOVERY flag and wakes waiters. */
static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
					      u8 dead_node, u8 new_master)
{
	int i;
	struct list_head *iter, *iter2;
	struct hlist_node *hash_iter;
	struct hlist_head *bucket;
	struct dlm_lock_resource *res;

	mlog_entry_void();

	assert_spin_locked(&dlm->spinlock);

	list_for_each_safe(iter, iter2, &dlm->reco.resources) {
		res = list_entry (iter, struct dlm_lock_resource, recovering);
		if (res->owner == dead_node) {
			list_del_init(&res->recovering);
			spin_lock(&res->spinlock);
			dlm_change_lockres_owner(dlm, res, new_master);
			res->state &= ~DLM_LOCK_RES_RECOVERING;
			__dlm_dirty_lockres(dlm, res);
			spin_unlock(&res->spinlock);
			wake_up(&res->wq);
		}
	}

	/* this will become unnecessary eventually, but
	 * for now we need to run the whole hash, clear
	 * the RECOVERING state and set the owner
	 * if necessary */
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
		bucket = &(dlm->lockres_hash[i]);
		hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
			if (res->state & DLM_LOCK_RES_RECOVERING) {
				if (res->owner == dead_node) {
					mlog(0, "(this=%u) res %.*s owner=%u "
					     "was not on recovering list, but "
					     "clearing state anyway\n",
					     dlm->node_num, res->lockname.len,
					     res->lockname.name, new_master);
				} else if (res->owner == dlm->node_num) {
					mlog(0, "(this=%u) res %.*s owner=%u "
					     "was not on recovering list, "
					     "owner is THIS node, clearing\n",
					     dlm->node_num, res->lockname.len,
					     res->lockname.name, new_master);
				} else
					continue;

				spin_lock(&res->spinlock);
				dlm_change_lockres_owner(dlm, res, new_master);
				res->state &= ~DLM_LOCK_RES_RECOVERING;
				__dlm_dirty_lockres(dlm, res);
				spin_unlock(&res->spinlock);
				wake_up(&res->wq);
			}
		}
	}
}

static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
{
	if (local) {
		if (lock->ml.type != LKM_EXMODE &&
		    lock->ml.type != LKM_PRMODE)
			return 1;
	} else if (lock->ml.type == LKM_EXMODE)
		return 1;
	return 0;
}

static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
