⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmrecovery.c

📁 linux2.6.16版本
💻 C
📖 第 1 页 / 共 5 页
字号:
			       struct dlm_lock_resource *res, u8 dead_node)
{
	/* NOTE(review): this is the tail of dlm_revalidate_lvb(); its
	 * signature line lies before the visible portion of this chunk.
	 * It blanks out stale lock value blocks (lvbs) after a node dies:
	 * if this node masters the lockres it searches for locks held by
	 * the dead node, otherwise it checks its own local locks. */
	struct list_head *iter, *queue;
	struct dlm_lock *lock;
	int blank_lvb = 0, local = 0;
	int i;
	u8 search_node;

	/* caller must hold both the domain and lockres spinlocks */
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (res->owner == dlm->node_num)
		/* if this node owned the lockres, and if the dead node
		 * had an EX when he died, blank out the lvb */
		search_node = dead_node;
	else {
		/* if this is a secondary lockres, and we had no EX or PR
		 * locks granted, we can no longer trust the lvb */
		search_node = dlm->node_num;
		local = 1;  /* check local state for valid lvb */
	}

	/* walk the granted and converting queues only; blocked locks
	 * never acquired the lvb so need no invalidation */
	for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each(iter, queue) {
			lock = list_entry (iter, struct dlm_lock, list);
			if (lock->ml.node == search_node) {
				if (dlm_lvb_needs_invalidation(lock, local)) {
					/* zero the lksb lvb and lockres lvb */
					blank_lvb = 1;
					memset(lock->lksb->lvb, 0, DLM_LVB_LEN);
				}
			}
		}
	}

	if (blank_lvb) {
		mlog(0, "clearing %.*s lvb, dead node %u had EX\n",
		     res->lockname.len, res->lockname.name, dead_node);
		memset(res->lvb, 0, DLM_LVB_LEN);
	}
}

/* Drop every lock the dead node held on a lockres mastered by this
 * node, then mark the lockres dirty so the dlm thread re-evaluates it
 * later.  Called with dlm->spinlock and res->spinlock held. */
static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res, u8 dead_node)
{
	struct list_head *iter, *tmpiter;
	struct dlm_lock *lock;

	/* this node is the lockres master:
	 * 1) remove any stale locks for the dead node
	 * 2) if the dead node had an EX when he died, blank out the lvb
	 */
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* TODO: check pending_asts, pending_basts here */
	/* _safe iteration: entries are unlinked while walking each queue */
	list_for_each_safe(iter, tmpiter, &res->granted) {
		lock = list_entry (iter, struct dlm_lock, list);
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
		}
	}
	list_for_each_safe(iter, tmpiter, &res->converting) {
		lock = list_entry (iter, struct dlm_lock, list);
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
		}
	}
	list_for_each_safe(iter, tmpiter, &res->blocked) {
		lock = list_entry (iter, struct dlm_lock, list);
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
		}
	}

	/* do not kick thread yet */
	__dlm_dirty_lockres(dlm, res);
}

/* if this node is the recovery master, and there are no
 * locks for a given lockres owned by this node that are in
 * either PR or EX mode, zero out the lvb before requesting.
 *
 */
/* Walk the entire lockres hash and clean up after @dead_node:
 * prune stale $RECOVERY entries, revalidate lvbs, move lockreses
 * mastered by the dead node to the recovering list, and free the
 * dead node's locks on lockreses mastered here.
 * Called with dlm->spinlock held (per the lock assertions in the
 * helpers invoked under res->spinlock below). */
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct hlist_node *iter;
	struct dlm_lock_resource *res;
	int i;
	struct hlist_head *bucket;
	struct dlm_lock *lock;

	/* purge any stale mles */
	dlm_clean_master_list(dlm, dead_node);

	/*
	 * now clean up all lock resources.  there are two rules:
	 *
	 * 1) if the dead node was the master, move the lockres
	 *    to the recovering list.  set the RECOVERING flag.
	 *    this lockres needs to be cleaned up before it can
	 *    be used further.
	 *
	 * 2) if this node was the master, remove all locks from
	 *    each of the lockres queues that were owned by the
	 *    dead node.  once recovery finishes, the dlm thread
	 *    can be kicked again to see if any ASTs or BASTs
	 *    need to be fired as a result.
	 */
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
		bucket = &(dlm->lockres_hash[i]);
		hlist_for_each_entry(res, iter, bucket, hash_node) {
 			/* always prune any $RECOVERY entries for dead nodes,
 			 * otherwise hangs can occur during later recovery */
			if (dlm_is_recovery_lock(res->lockname.name,
						 res->lockname.len)) {
				spin_lock(&res->spinlock);
				list_for_each_entry(lock, &res->granted, list) {
					if (lock->ml.node == dead_node) {
						mlog(0, "AHA! there was "
						     "a $RECOVERY lock for dead "
						     "node %u (%s)!\n",
						     dead_node, dlm->name);
						list_del_init(&lock->list);
						dlm_lock_put(lock);
						break;
					}
				}
				spin_unlock(&res->spinlock);
				continue;
			}
			spin_lock(&res->spinlock);
			/* zero the lvb if necessary */
			dlm_revalidate_lvb(dlm, res, dead_node);
			if (res->owner == dead_node)
				dlm_move_lockres_to_recovery_list(dlm, res);
			else if (res->owner == dlm->node_num) {
				dlm_free_dead_locks(dlm, res, dead_node);
				__dlm_lockres_calc_usage(dlm, res);
			}
			spin_unlock(&res->spinlock);
		}
	}
}

/* Core node-death handler: update the live/domain/recovery bitmaps,
 * run local cleanup, and notify heartbeat listeners.  The double-
 * underscore prefix follows the file's convention: caller must hold
 * dlm->spinlock (asserted below). */
static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
{
	assert_spin_locked(&dlm->spinlock);

	/* check to see if the node is already considered dead */
	if (!test_bit(idx, dlm->live_nodes_map)) {
		mlog(0, "for domain %s, node %d is already dead. "
		     "another node likely did recovery already.\n",
		     dlm->name, idx);
		return;
	}

	/* check to see if we do not care about this node */
	if (!test_bit(idx, dlm->domain_map)) {
		/* This also catches the case that we get a node down
		 * but haven't joined the domain yet. */
		mlog(0, "node %u already removed from domain!\n", idx);
		return;
	}

	clear_bit(idx, dlm->live_nodes_map);

	/* Clean up join state on node death. */
	if (dlm->joining_node == idx) {
		mlog(0, "Clearing join state for node %u\n", idx);
		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
	}

	/* make sure local cleanup occurs before the heartbeat events */
	if (!test_bit(idx, dlm->recovery_map))
		dlm_do_local_recovery_cleanup(dlm, idx);

	/* notify anything attached to the heartbeat events */
	dlm_hb_event_notify_attached(dlm, idx, 0);

	mlog(0, "node %u being removed from domain map!\n", idx);
	clear_bit(idx, dlm->domain_map);

	/* wake up migration waiters if a node goes down.
	 * perhaps later we can genericize this for other waiters. */
	wake_up(&dlm->migration_wq);

	if (test_bit(idx, dlm->recovery_map))
		mlog(0, "domain %s, node %u already added "
		     "to recovery map!\n", dlm->name, idx);
	else
		set_bit(idx, dlm->recovery_map);
}

/* Heartbeat callback: a node went down.  Takes the domain spinlock
 * and defers to __dlm_hb_node_down; dlm_grab/dlm_put pin the domain
 * for the duration of the call. */
void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data)
{
	struct dlm_ctxt *dlm = data;

	if (!dlm_grab(dlm))
		return;

	spin_lock(&dlm->spinlock);
	__dlm_hb_node_down(dlm, idx);
	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);
}

/* Heartbeat callback: a node came up.  Only records the node in the
 * live map; mastery bookkeeping is deliberately deferred (see the
 * in-body comment). */
void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data)
{
	struct dlm_ctxt *dlm = data;

	if (!dlm_grab(dlm))
		return;

	spin_lock(&dlm->spinlock);
	set_bit(idx, dlm->live_nodes_map);
	/* do NOT notify mle attached to the heartbeat events.
	 * new nodes are not interesting in mastery until joined. */
	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);
}

/* AST for the $RECOVERY lock: log-only, the real work happens in
 * dlm_pick_recovery_master after dlmlock() returns. */
static void dlm_reco_ast(void *astdata)
{
	struct dlm_ctxt *dlm = astdata;
	mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n",
	     dlm->node_num, dlm->name);
}

/* BAST for the $RECOVERY lock: log-only. */
static void dlm_reco_bast(void *astdata, int blocked_type)
{
	struct dlm_ctxt *dlm = astdata;
	mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n",
	     dlm->node_num, dlm->name);
}

/* Unlock AST for the $RECOVERY lock: log-only. */
static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
{
	mlog(0, "unlockast for recovery lock fired!\n");
}

/*
 * dlm_pick_recovery_master will continually attempt to use
 * dlmlock() on the special "$RECOVERY" lockres with the
 * LKM_NOQUEUE flag to get an EX.  every thread that enters
 * this function on each node racing to become the recovery
 * master will not stop attempting this until either:
 * a) this node gets the EX (and becomes the recovery master),
 * or b) dlm->reco.new_master gets set to some nodenum
 * != O2NM_INVALID_NODE_NUM (another node will do the reco).
 * so each time a recovery master is needed, the entire cluster
 * will sync at this point.  if the new master dies, that will
 * be detected in dlm_do_recovery */
/* Returns 0 if this node became the recovery master, -EEXIST if
 * another node is (or finished) doing the recovery, -EINVAL if the
 * dead node was already recovered while we held the EX. */
static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
{
	enum dlm_status ret;
	struct dlm_lockstatus lksb;
	int status = -EINVAL;

	mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
	     dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
again:
	memset(&lksb, 0, sizeof(lksb));

	/* LKM_NOQUEUE: fail fast with DLM_NOTQUEUED instead of blocking
	 * if another node already holds the EX */
	ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
		      DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);

	mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
	     dlm->name, ret, lksb.status);

	if (ret == DLM_NORMAL) {
		mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
		     dlm->name, dlm->node_num);

		/* got the EX lock.  check to see if another node
		 * just became the reco master */
		if (dlm_reco_master_ready(dlm)) {
			mlog(0, "%s: got reco EX lock, but %u will "
			     "do the recovery\n", dlm->name,
			     dlm->reco.new_master);
			status = -EEXIST;
		} else {
			status = 0;

			/* see if recovery was already finished elsewhere */
			spin_lock(&dlm->spinlock);
			if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
				status = -EINVAL;
				mlog(0, "%s: got reco EX lock, but "
				     "node got recovered already\n", dlm->name);
				if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
					mlog(ML_ERROR, "%s: new master is %u "
					     "but no dead node!\n",
					     dlm->name, dlm->reco.new_master);
					BUG();
				}
			}
			spin_unlock(&dlm->spinlock);
		}

		/* if this node has actually become the recovery master,
		 * set the master and send the messages to begin recovery */
		if (!status) {
			mlog(0, "%s: dead=%u, this=%u, sending "
			     "begin_reco now\n", dlm->name,
			     dlm->reco.dead_node, dlm->node_num);
			status = dlm_send_begin_reco_message(dlm,
				      dlm->reco.dead_node);
			/* this always succeeds */
			BUG_ON(status);

			/* set the new_master to this node */
			spin_lock(&dlm->spinlock);
			dlm->reco.new_master = dlm->node_num;
			spin_unlock(&dlm->spinlock);
		}

		/* recovery lock is a special case.  ast will not get fired,
		 * so just go ahead and unlock it. */
		ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
		if (ret == DLM_DENIED) {
			mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n");
			ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
		}
		if (ret != DLM_NORMAL) {
			/* this would really suck. this could only happen
			 * if there was a network error during the unlock
			 * because of node death.  this means the unlock
			 * is actually "done" and the lock structure is
			 * even freed.  we can continue, but only
			 * because this specific lock name is special. */
			mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
		}
	} else if (ret == DLM_NOTQUEUED) {
		mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
		     dlm->name, dlm->node_num);
		/* another node is master. wait on
		 * reco.new_master != O2NM_INVALID_NODE_NUM
		 * for at most one second */
		wait_event_timeout(dlm->dlm_reco_thread_wq,
					 dlm_reco_master_ready(dlm),
					 msecs_to_jiffies(1000));
		if (!dlm_reco_master_ready(dlm)) {
			mlog(0, "%s: reco master taking awhile\n",
			     dlm->name);
			/* keep retrying until someone becomes reco master */
			goto again;
		}
		/* another node has informed this one that it is reco master */
		mlog(0, "%s: reco master %u is ready to recover %u\n",
		     dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
		status = -EEXIST;
	} else {
		struct dlm_lock_resource *res;

		/* dlmlock returned something other than NOTQUEUED or NORMAL */
		mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), "
		     "lksb.status=%s\n", dlm->name, dlm_errname(ret),
		     dlm_errname(lksb.status));
		res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
					 DLM_RECOVERY_LOCK_NAME_LEN);
		if (res) {
			dlm_print_one_lock_resource(res);
			dlm_lockres_put(res);
		} else {
			mlog(ML_ERROR, "recovery lock not found\n");
		}
		/* unexpected dlmlock failure on the recovery lock is fatal */
		BUG();
	}

	return status;
}

/* Broadcast a DLM_BEGIN_RECO_MSG for @dead_node to every other node
 * in the domain map (skipping the dead node and ourselves), retrying
 * on transient network errors.
 * NOTE(review): this function is TRUNCATED at the end of the visible
 * chunk (page 1 of 5); the remainder lies on a later page. */
static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct dlm_begin_reco br;
	int ret = 0;
	struct dlm_node_iter iter;
	int nodenum;
	int status;

	mlog_entry("%u\n", dead_node);

	mlog(0, "dead node is %u\n", dead_node);

	/* snapshot the domain map under the spinlock, then iterate
	 * over the copy without holding the lock */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	clear_bit(dead_node, iter.node_map);

	memset(&br, 0, sizeof(br));
	br.node_idx = dlm->node_num;
	br.dead_node = dead_node;

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = 0;
		if (nodenum == dead_node) {
			mlog(0, "not sending begin reco to dead node "
				  "%u\n", dead_node);
			continue;
		}
		if (nodenum == dlm->node_num) {
			mlog(0, "not sending begin reco to self\n");
			continue;
		}
retry:
		ret = -EINVAL;
		mlog(0, "attempting to send begin reco msg to %d\n",
			  nodenum);
		ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key,
					 &br, sizeof(br), nodenum, &status);
		/* negative status is handled ok by caller here */
		if (ret >= 0)
			ret = status;
		if (dlm_is_host_down(ret)) {
			/* node is down.  not involved in recovery
			 * so just keep going */
			mlog(0, "%s: node %u was down when sending "
			     "begin reco msg (%d)\n", dlm->name, nodenum, ret);
			ret = 0;
		}
		if (ret < 0) {
			struct dlm_lock_resource *res;
			/* this is now a serious problem, possibly ENOMEM
			 * in the network stack.  must retry */
			mlog_errno(ret);
			mlog(ML_ERROR, "begin reco of dlm %s to node %u "
			    " returned %d\n", dlm->name, nodenum, ret);
			res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
						 DLM_RECOVERY_LOCK_NAME_LEN);
			if (res) {
				dlm_print_one_lock_reso
				/* NOTE(review): source cut off mid-identifier
				 * here ("dlm_print_one_lock_reso..."); the rest
				 * of this function is on the next page. */

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -