
📄 dlmmaster.c

📁 Source code of Linux 2.6.17.4
💻 C
📖 Page 1 of 5
	 * lockres doesn't exist on this node
	 * if there is an MLE_BLOCK, return NO
	 * if there is an MLE_MASTER, return MAYBE
	 * otherwise, add an MLE_BLOCK, return NO
	 */
	spin_lock(&dlm->master_lock);
	found = dlm_find_mle(dlm, &tmpmle, name, namelen);
	if (!found) {
		/* this lockid has never been seen on this node yet */
		// mlog(0, "no mle found\n");
		if (!mle) {
			spin_unlock(&dlm->master_lock);
			spin_unlock(&dlm->spinlock);
			mle = (struct dlm_master_list_entry *)
				kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
			if (!mle) {
				response = DLM_MASTER_RESP_ERROR;
				mlog_errno(-ENOMEM);
				goto send_response;
			}
			spin_lock(&dlm->spinlock);
			dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
					 name, namelen);
			spin_unlock(&dlm->spinlock);
			goto way_up_top;
		}

		// mlog(0, "this is second time thru, already allocated, "
		// "add the block.\n");
		set_bit(request->node_idx, mle->maybe_map);
		list_add(&mle->list, &dlm->master_list);
		response = DLM_MASTER_RESP_NO;
	} else {
		// mlog(0, "mle was found\n");
		set_maybe = 1;
		spin_lock(&tmpmle->spinlock);
		if (tmpmle->master == dlm->node_num) {
			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");
			BUG();
		}
		if (tmpmle->type == DLM_MLE_BLOCK)
			response = DLM_MASTER_RESP_NO;
		else if (tmpmle->type == DLM_MLE_MIGRATION) {
			mlog(0, "migration mle was found (%u->%u)\n",
			     tmpmle->master, tmpmle->new_master);
			/* real master can respond on its own */
			response = DLM_MASTER_RESP_NO;
		} else
			response = DLM_MASTER_RESP_MAYBE;
		if (set_maybe)
			set_bit(request->node_idx, tmpmle->maybe_map);
		spin_unlock(&tmpmle->spinlock);
	}
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (found) {
		/* keep the mle attached to heartbeat events */
		dlm_put_mle(tmpmle);
	}
send_response:
	if (dispatch_assert) {
		if (response != DLM_MASTER_RESP_YES)
			mlog(ML_ERROR, "invalid response %d\n", response);
		if (!res) {
			mlog(ML_ERROR, "bad lockres while trying to assert!\n");
			BUG();
		}
		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
			     dlm->node_num, res->lockname.len, res->lockname.name);
		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
						 DLM_ASSERT_MASTER_MLE_CLEANUP);
		if (ret < 0) {
			mlog(ML_ERROR, "failed to dispatch assert master work\n");
			response = DLM_MASTER_RESP_ERROR;
		}
	}

	dlm_put(dlm);
	return response;
}

/*
 * DLM_ASSERT_MASTER_MSG
 */

/*
 * NOTE: this can be used for debugging
 * can periodically run all locks owned by this node
 * and re-assert across the cluster...
 */
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
				unsigned int namelen, void *nodemap,
				u32 flags)
{
	struct dlm_assert_master assert;
	int to, tmpret;
	struct dlm_node_iter iter;
	int ret = 0;
	int reassert;

	BUG_ON(namelen > O2NM_MAX_NAME_LEN);
again:
	reassert = 0;

	/* note that if this nodemap is empty, it returns 0 */
	dlm_node_iter_init(nodemap, &iter);
	while ((to = dlm_node_iter_next(&iter)) >= 0) {
		int r = 0;
		mlog(0, "sending assert master to %d (%.*s)\n", to,
		     namelen, lockname);
		memset(&assert, 0, sizeof(assert));
		assert.node_idx = dlm->node_num;
		assert.namelen = namelen;
		memcpy(assert.name, lockname, namelen);
		assert.flags = cpu_to_be32(flags);

		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
					    &assert, sizeof(assert), to, &r);
		if (tmpret < 0) {
			mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
			if (!dlm_is_host_down(tmpret)) {
				mlog(ML_ERROR, "unhandled error!\n");
				BUG();
			}
			/* a node died.  finish out the rest of the nodes. */
			mlog(ML_ERROR, "link to %d went down!\n", to);
			/* any nonzero status return will do */
			ret = tmpret;
		} else if (r < 0) {
			/* ok, something horribly messed.  kill thyself. */
			mlog(ML_ERROR,"during assert master of %.*s to %u, "
			     "got %d.\n", namelen, lockname, to, r);
			dlm_dump_lock_resources(dlm);
			BUG();
		} else if (r == EAGAIN) {
			mlog(0, "%.*s: node %u create mles on other "
			     "nodes and requests a re-assert\n",
			     namelen, lockname, to);
			reassert = 1;
		}
	}

	if (reassert)
		goto again;

	return ret;
}

/*
 * locks that can be taken here:
 * dlm->spinlock
 * res->spinlock
 * mle->spinlock
 * dlm->master_list
 *
 * if possible, TRIM THIS DOWN!!!
 */
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	char *name;
	unsigned int namelen;
	u32 flags;
	int master_request = 0;
	int ret = 0;

	if (!dlm_grab(dlm))
		return 0;

	name = assert->name;
	namelen = assert->namelen;
	flags = be32_to_cpu(assert->flags);

	if (namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length!");
		goto done;
	}

	spin_lock(&dlm->spinlock);

	if (flags)
		mlog(0, "assert_master with flags: %u\n", flags);

	/* find the MLE */
	spin_lock(&dlm->master_lock);
	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
		/* not an error, could be master just re-asserting */
		mlog(0, "just got an assert_master from %u, but no "
		     "MLE for it! (%.*s)\n", assert->node_idx,
		     namelen, name);
	} else {
		int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);
		if (bit >= O2NM_MAX_NODES) {
			/* not necessarily an error, though less likely.
			 * could be master just re-asserting. */
			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
			     "is asserting! (%.*s)\n", assert->node_idx,
			     namelen, name);
		} else if (bit != assert->node_idx) {
			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
				mlog(0, "master %u was found, %u should "
				     "back off\n", assert->node_idx, bit);
			} else {
				/* with the fix for bug 569, a higher node
				 * number winning the mastery will respond
				 * YES to mastery requests, but this node
				 * had no way of knowing.  let it pass. */
				mlog(ML_ERROR, "%u is the lowest node, "
				     "%u is asserting. (%.*s)  %u must "
				     "have begun after %u won.\n", bit,
				     assert->node_idx, namelen, name, bit,
				     assert->node_idx);
			}
		}
	}
	spin_unlock(&dlm->master_lock);

	/* ok everything checks out with the MLE
	 * now check to see if there is a lockres */
	res = __dlm_lookup_lockres(dlm, name, namelen);
	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING)  {
			mlog(ML_ERROR, "%u asserting but %.*s is "
			     "RECOVERING!\n", assert->node_idx, namelen, name);
			goto kill;
		}
		if (!mle) {
			if (res->owner != assert->node_idx) {
				mlog(ML_ERROR, "assert_master from "
					  "%u, but current owner is "
					  "%u! (%.*s)\n",
				       assert->node_idx, res->owner,
				       namelen, name);
				goto kill;
			}
		} else if (mle->type != DLM_MLE_MIGRATION) {
			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
				/* owner is just re-asserting */
				if (res->owner == assert->node_idx) {
					mlog(0, "owner %u re-asserting on "
					     "lock %.*s\n", assert->node_idx,
					     namelen, name);
					goto ok;
				}
				mlog(ML_ERROR, "got assert_master from "
				     "node %u, but %u is the owner! "
				     "(%.*s)\n", assert->node_idx,
				     res->owner, namelen, name);
				goto kill;
			}
			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
				mlog(ML_ERROR, "got assert from %u, but lock "
				     "with no owner should be "
				     "in-progress! (%.*s)\n",
				     assert->node_idx,
				     namelen, name);
				goto kill;
			}
		} else /* mle->type == DLM_MLE_MIGRATION */ {
			/* should only be getting an assert from new master */
			if (assert->node_idx != mle->new_master) {
				mlog(ML_ERROR, "got assert from %u, but "
				     "new master is %u, and old master "
				     "was %u (%.*s)\n",
				     assert->node_idx, mle->new_master,
				     mle->master, namelen, name);
				goto kill;
			}
		}
ok:
		spin_unlock(&res->spinlock);
	}
	spin_unlock(&dlm->spinlock);

	// mlog(0, "woo!  got an assert_master from node %u!\n",
	// 	     assert->node_idx);
	if (mle) {
		int extra_ref = 0;
		int nn = -1;

		spin_lock(&mle->spinlock);
		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
			extra_ref = 1;
		else {
			/* MASTER mle: if any bits set in the response map
			 * then the calling node needs to re-assert to clear
			 * up nodes that this node contacted */
			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
						    nn+1)) < O2NM_MAX_NODES) {
				if (nn != dlm->node_num && nn != assert->node_idx)
					master_request = 1;
			}
		}
		mle->master = assert->node_idx;
		atomic_set(&mle->woken, 1);
		wake_up(&mle->wq);
		spin_unlock(&mle->spinlock);

		if (mle->type == DLM_MLE_MIGRATION && res) {
			mlog(0, "finishing off migration of lockres %.*s, "
			     "from %u to %u\n",
			       res->lockname.len, res->lockname.name,
			       dlm->node_num, mle->new_master);
			spin_lock(&res->spinlock);
			res->state &= ~DLM_LOCK_RES_MIGRATING;
			dlm_change_lockres_owner(dlm, res, mle->new_master);
			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
			spin_unlock(&res->spinlock);
		}
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, mle);
		dlm_put_mle(mle);

		if (extra_ref) {
			/* the assert master message now balances the extra
			 * ref given by the master / migration request message.
			 * if this is the last put, it will be removed
			 * from the list. */
			dlm_put_mle(mle);
		}
	}

done:
	ret = 0;
	if (res)
		dlm_lockres_put(res);
	dlm_put(dlm);
	if (master_request) {
		mlog(0, "need to tell master to reassert\n");
		ret = EAGAIN;  // positive. negative would shoot down the node.
	}
	return ret;

kill:
	/* kill the caller! */
	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
	dlm_lockres_put(res);
	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
	     "and killing the other node now!  This node is OK and can continue.\n");
	dlm_dump_lock_resources(dlm);
	dlm_put(dlm);
	return -EINVAL;
}

int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res,
			       int ignore_higher, u8 request_from, u32 flags)
{
	struct dlm_work_item *item;
	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
	if (!item)
		return -ENOMEM;

	/* queue up work for dlm_assert_master_worker */
	dlm_grab(dlm);  /* get an extra ref for the work item */
	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
	item->u.am.lockres = res; /* already have a ref */
	/* can optionally ignore node numbers higher than this node */
	item->u.am.ignore_higher = ignore_higher;
	item->u.am.request_from = request_from;
	item->u.am.flags = flags;

	if (ignore_higher)
		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
		     res->lockname.name);

	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);

	schedule_work(&dlm->dispatched_work);
	return 0;
}

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_ctxt *dlm = data;
	int ret = 0;
	struct dlm_lock_resource *res;
	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	int ignore_higher;
	int bit;
	u8 request_from;
	u32 flags;

	dlm = item->dlm;
	res = item->u.am.lockres;
	ignore_higher = item->u.am.ignore_higher;
	request_from = item->u.am.request_from;
	flags = item->u.am.flags;

	spin_lock(&dlm->spinlock);
	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
	spin_unlock(&dlm->spinlock);

	clear_bit(dlm->node_num, nodemap);
	if (ignore_higher) {
		/* if is this just to clear up mles for nodes below
		 * this node, do not send the message to the original
		 * caller or any node number higher than this */
		clear_bit(request_from, nodemap);
		bit = dlm->node_num;
		while (1) {
			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
					    bit+1);
			if (bit >= O2NM_MAX_NODES)
				break;
			clear_bit(bit, nodemap);
		}
	}

	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	mlog(0, "worker about to master %.*s here, this=%u\n",
		     res->lockname.len, res->lockname.name, dlm->node_num);
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len,
				   nodemap, flags);
	if (ret < 0) {
		/* no need to restart, we are done */
		mlog_errno(ret);
	}

	dlm_lockres_put(res);

	mlog(0, "finished with dlm_assert_master_worker\n");
}

/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
 * We cannot wait for node recovery to complete to begin mastering this
 * lockres because this lockres is used to kick off recovery! ;-)
 * So, do a pre-check on all living nodes to see if any of those nodes
 * think that $RECOVERY is currently mastered by a dead node.  If so,
 * we wait a short time to allow that node to get notified by its own
 * heartbeat stack, then check again.  All $RECOVERY lock resources
 * mastered by dead nodes are purged when the hearbeat callback is
 * fired, so we can know for sure that it is safe to continue once
 * the node returns a live node or no node.  */
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	struct dlm_node_iter iter;
	int nodenum;
	int ret = 0;
	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;

	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* do not send to self */
		if (nodenum == dlm->node_num)
			continue;
		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
		if (ret < 0) {
			mlog_errno(ret);
			if (!dlm_is_host_down(ret))
				BUG();
			/* host is down, so answer for that node would be
			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
		}

		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			/* check to see if this master is in the recovery map */
			spin_lock(&dlm->spinlock);
			if (test_bit(master, dlm->recovery_map)) {
				mlog(ML_NOTICE, "%s: node %u has not seen "
				     "node %u go down yet, and thinks the "
				     "dead node is mastering the recovery "
				     "lock.  must wait.\n", dlm->name,
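
The re-assert protocol in dlm_do_assert_master above is the part most easily lost in the dense kernel code: the node walks every peer set in a nodemap, sends an assert, and if any peer answers EAGAIN it restarts the whole pass. The following is a minimal userspace sketch of that loop shape only; it is not taken from dlmmaster.c, and send_assert(), MAX_NODES, and the plain-unsigned-long bitmap are simplified stand-ins for the kernel's o2net and dlm_node_iter machinery.

/* Illustrative sketch (hypothetical, not part of dlmmaster.c):
 * walk set bits in a node bitmap, "send" an assert to each node,
 * and redo the full pass if any node requested a re-assert. */
#include <errno.h>
#include <stdio.h>

#define MAX_NODES 32

/* stand-in for the real message send: pretend node 3 asks for one re-assert */
static int send_assert(int to)
{
	static int asked;
	if (to == 3 && !asked) {
		asked = 1;
		return EAGAIN;	/* positive status: "please assert again" */
	}
	return 0;		/* delivered, no re-assert needed */
}

int main(void)
{
	unsigned long nodemap = (1UL << 1) | (1UL << 3) | (1UL << 5);
	int to, reassert;

again:
	reassert = 0;
	for (to = 0; to < MAX_NODES; to++) {
		if (!(nodemap & (1UL << to)))
			continue;	/* node not in the map */
		if (send_assert(to) == EAGAIN) {
			printf("node %d wants a re-assert\n", to);
			reassert = 1;
		}
	}
	if (reassert)
		goto again;	/* mirrors the kernel's "goto again" */

	printf("all asserts delivered\n");
	return 0;
}

In the kernel version the positive EAGAIN return is deliberate: dlm_assert_master_handler returns it from the message handler so the master re-asserts, whereas a negative return would be treated as a fatal error for the sending node.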
