⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmmaster.c

📁 linux 内核源代码
💻 C
📖 第 1 页 / 共 5 页
字号:
				  "%u, but current owner is "
				  "%u! (%.*s)\n",
				  assert->node_idx, res->owner,
				  namelen, name);
				goto kill;
			}
		} else if (mle->type != DLM_MLE_MIGRATION) {
			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
				/* owner is just re-asserting */
				if (res->owner == assert->node_idx) {
					mlog(0, "owner %u re-asserting on "
					     "lock %.*s\n", assert->node_idx,
					     namelen, name);
					/* benign repeat assert: take the
					 * normal exit path below */
					goto ok;
				}
				mlog(ML_ERROR, "got assert_master from "
				     "node %u, but %u is the owner! "
				     "(%.*s)\n", assert->node_idx,
				     res->owner, namelen, name);
				goto kill;
			}
			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
				mlog(ML_ERROR, "got assert from %u, but lock "
				     "with no owner should be "
				     "in-progress! (%.*s)\n",
				     assert->node_idx,
				     namelen, name);
				goto kill;
			}
		} else /* mle->type == DLM_MLE_MIGRATION */ {
			/* should only be getting an assert from new master */
			if (assert->node_idx != mle->new_master) {
				mlog(ML_ERROR, "got assert from %u, but "
				     "new master is %u, and old master "
				     "was %u (%.*s)\n",
				     assert->node_idx, mle->new_master,
				     mle->master, namelen, name);
				goto kill;
			}
		}
ok:
		spin_unlock(&res->spinlock);
	}
	spin_unlock(&dlm->spinlock);

	// mlog(0, "woo!  got an assert_master from node %u!\n",
	// 	     assert->node_idx);
	/* An mle means some local caller was waiting on this mastery
	 * assertion: record the master, wake it, and release the mle
	 * references we hold. */
	if (mle) {
		int extra_ref = 0;
		int nn = -1;
		int rr, err = 0;

		spin_lock(&mle->spinlock);
		/* BLOCK and MIGRATION mles carry one extra ref from the
		 * original request message (see the balancing put below) */
		if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION)
			extra_ref = 1;
		else {
			/* MASTER mle: if any bits set in the response map
			 * then the calling node needs to re-assert to clear
			 * up nodes that this node contacted */
			while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES,
						    nn+1)) < O2NM_MAX_NODES) {
				if (nn != dlm->node_num && nn != assert->node_idx)
					master_request = 1;
			}
		}
		mle->master = assert->node_idx;
		atomic_set(&mle->woken, 1);
		wake_up(&mle->wq);
		spin_unlock(&mle->spinlock);

		if (res) {
			int wake = 0;
			spin_lock(&res->spinlock);
			if (mle->type == DLM_MLE_MIGRATION) {
				mlog(0, "finishing off migration of lockres %.*s, "
				     "from %u to %u\n",
				     res->lockname.len, res->lockname.name,
				     dlm->node_num, mle->new_master);
				res->state &= ~DLM_LOCK_RES_MIGRATING;
				wake = 1;
				dlm_change_lockres_owner(dlm, res, mle->new_master);
				BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
			} else {
				dlm_change_lockres_owner(dlm, res, mle->master);
			}
			spin_unlock(&res->spinlock);
			have_lockres_ref = 1;
			if (wake)
				wake_up(&res->wq);
		}

		/* master is known, detach if not already detached.
		 * ensures that only one assert_master call will happen
		 * on this mle. */
		spin_lock(&dlm->spinlock);
		spin_lock(&dlm->master_lock);

		/* sanity-check the refcount against the puts we are about
		 * to do; a too-low count indicates a ref-accounting bug */
		rr = atomic_read(&mle->mle_refs.refcount);
		if (mle->inuse > 0) {
			if (extra_ref && rr < 3)
				err = 1;
			else if (!extra_ref && rr < 2)
				err = 1;
		} else {
			if (extra_ref && rr < 2)
				err = 1;
			else if (!extra_ref && rr < 1)
				err = 1;
		}
		if (err) {
			mlog(ML_ERROR, "%s:%.*s: got assert master from %u "
			     "that will mess up this node, refs=%d, extra=%d, "
			     "inuse=%d\n", dlm->name, namelen, name,
			     assert->node_idx, rr, extra_ref, mle->inuse);
			dlm_print_one_mle(mle);
		}
		list_del_init(&mle->list);
		__dlm_mle_detach_hb_events(dlm, mle);
		__dlm_put_mle(mle);
		if (extra_ref) {
			/* the assert master message now balances the extra
			 * ref given by the master / migration request message.
			 * if this is the last put, it will be removed
			 * from the list. */
			__dlm_put_mle(mle);
		}
		spin_unlock(&dlm->master_lock);
		spin_unlock(&dlm->spinlock);
	} else if (res) {
		if (res->owner != assert->node_idx) {
			mlog(0, "assert_master from %u, but current "
			     "owner is %u (%.*s), no mle\n", assert->node_idx,
			     res->owner, namelen, name);
		}
	}

done:
	ret = 0;
	if (res) {
		/* mark the ref-setup as in progress; the post handler
		 * (dlm_assert_master_post_handler) clears this and drops
		 * the res ref passed via ret_data */
		spin_lock(&res->spinlock);
		res->state |= DLM_LOCK_RES_SETREF_INPROG;
		spin_unlock(&res->spinlock);
		*ret_data = (void *)res;
	}
	dlm_put(dlm);
	if (master_request) {
		mlog(0, "need to tell master to reassert\n");
		/* positive. negative would shoot down the node. */
		ret |= DLM_ASSERT_RESPONSE_REASSERT;
		if (!have_lockres_ref) {
			mlog(ML_ERROR, "strange, got assert from %u, MASTER "
			     "mle present here for %s:%.*s, but no lockres!\n",
			     assert->node_idx, dlm->name, namelen, name);
		}
	}
	if (have_lockres_ref) {
		/* let the master know we have a reference to the lockres */
		ret |= DLM_ASSERT_RESPONSE_MASTERY_REF;
		mlog(0, "%s:%.*s: got assert from %u, need a ref\n",
		     dlm->name, namelen, name, assert->node_idx);
	}
	return ret;

kill:
	/* kill the caller! */
	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
	     "and killing the other node now!  This node is OK and can continue.\n");
	__dlm_print_one_lock_resource(res);
	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
	*ret_data = (void *)res;
	dlm_put(dlm);
	return -EINVAL;
}

/*
 * Runs after the reply to the assert_master message has been sent.
 * Clears the DLM_LOCK_RES_SETREF_INPROG state that the handler set,
 * wakes anyone waiting on the lockres, and drops the reference the
 * handler passed along in ret_data.  (status and data are unused.)
 */
void dlm_assert_master_post_handler(int status, void *data, void *ret_data)
{
	struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data;

	if (ret_data) {
		spin_lock(&res->spinlock);
		res->state &= ~DLM_LOCK_RES_SETREF_INPROG;
		spin_unlock(&res->spinlock);
		wake_up(&res->wq);
		dlm_lockres_put(res);
	}
	return;
}

/*
 * Allocate and queue a work item that will run dlm_assert_master_worker()
 * on the dlm worker thread.  Takes an extra ref on the dlm for the work
 * item; the caller must already hold a ref on res (handed to the worker).
 * Returns 0 on success or -ENOMEM.
 */
int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res,
			       int ignore_higher, u8 request_from, u32 flags)
{
	struct dlm_work_item *item;
	item = kzalloc(sizeof(*item), GFP_NOFS);
	if (!item)
		return -ENOMEM;

	/* queue up work for dlm_assert_master_worker */
	dlm_grab(dlm);  /* get an extra ref for the work item */
	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
	item->u.am.lockres = res; /* already have a ref */
	/* can optionally ignore node numbers higher than this node */
	item->u.am.ignore_higher = ignore_higher;
	item->u.am.request_from = request_from;
	item->u.am.flags = flags;

	if (ignore_higher)
		mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len,
		     res->lockname.name);

	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);

	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
	return 0;
}

/*
 * Work-queue side of dlm_dispatch_assert_master(): snapshot the domain
 * map of live nodes (minus this node, and optionally minus the requestor
 * and all higher-numbered nodes), then broadcast the master assertion
 * via dlm_do_assert_master().  Drops the lockres ref handed over by the
 * dispatcher before returning.
 */
static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_ctxt *dlm = data;
	int ret = 0;
	struct dlm_lock_resource *res;
	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	int ignore_higher;
	int bit;
	u8 request_from;
	u32 flags;

	dlm = item->dlm;
	res = item->u.am.lockres;
	ignore_higher = item->u.am.ignore_higher;
	request_from = item->u.am.request_from;
	flags = item->u.am.flags;

	spin_lock(&dlm->spinlock);
	memcpy(nodemap, dlm->domain_map,
	       sizeof(nodemap));
	spin_unlock(&dlm->spinlock);

	/* never assert to ourselves */
	clear_bit(dlm->node_num, nodemap);
	if (ignore_higher) {
		/* if this is just to clear up mles for nodes below
		 * this node, do not send the message to the original
		 * caller or any node number higher than this */
		clear_bit(request_from, nodemap);
		bit = dlm->node_num;
		while (1) {
			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
					    bit+1);
			if (bit >= O2NM_MAX_NODES)
				break;
			clear_bit(bit, nodemap);
		}
	}

	/*
	 * If we're migrating this lock to someone else, we are no
	 * longer allowed to assert our own mastery.  OTOH, we need to
	 * prevent migration from starting while we're still asserting
	 * our dominance.  The reserved ast delays migration.
	 */
	spin_lock(&res->spinlock);
	if (res->state & DLM_LOCK_RES_MIGRATING) {
		mlog(0, "Someone asked us to assert mastery, but we're "
		     "in the middle of migration.  Skipping assert, "
		     "the new master will handle that.\n");
		spin_unlock(&res->spinlock);
		goto put;
	} else
		__dlm_lockres_reserve_ast(res);
	spin_unlock(&res->spinlock);

	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	mlog(0, "worker about to master %.*s here, this=%u\n",
	     res->lockname.len, res->lockname.name, dlm->node_num);
	ret = dlm_do_assert_master(dlm, res, nodemap, flags);
	if (ret < 0) {
		/* no need to restart, we are done */
		if (!dlm_is_host_down(ret))
			mlog_errno(ret);
	}

	/* Ok, we've asserted ourselves.  Let's let migration start. */
	dlm_lockres_release_ast(dlm, res);

put:
	dlm_lockres_put(res);

	mlog(0, "finished with dlm_assert_master_worker\n");
}

/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread.
 * We cannot wait for node recovery to complete to begin mastering this
 * lockres because this lockres is used to kick off recovery! ;-)
 * So, do a pre-check on all living nodes to see if any of those nodes
 * think that $RECOVERY is currently mastered by a dead node.
 * If so, we wait a short time to allow that node to get notified by its
 * own heartbeat stack, then check again.  All $RECOVERY lock resources
 * mastered by dead nodes are purged when the heartbeat callback is
 * fired, so we can know for sure that it is safe to continue once
 * the node returns a live node or no node.
 *
 * Returns 0 when no live node reports a dead master for $RECOVERY, or
 * -EAGAIN when some node still believes a recovering node masters it.  */
static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	struct dlm_node_iter iter;
	int nodenum;
	int ret = 0;
	u8 master = DLM_LOCK_RES_OWNER_UNKNOWN;

	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* do not send to self */
		if (nodenum == dlm->node_num)
			continue;
		ret = dlm_do_master_requery(dlm, res, nodenum, &master);
		if (ret < 0) {
			mlog_errno(ret);
			if (!dlm_is_host_down(ret))
				BUG();
			/* host is down, so answer for that node would be
			 * DLM_LOCK_RES_OWNER_UNKNOWN.  continue. */
			ret = 0;
		}

		if (master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			/* check to see if this master is in the recovery map */
			spin_lock(&dlm->spinlock);
			if (test_bit(master, dlm->recovery_map)) {
				mlog(ML_NOTICE, "%s: node %u has not seen "
				     "node %u go down yet, and thinks the "
				     "dead node is mastering the recovery "
				     "lock.  must wait.\n", dlm->name,
				     nodenum, master);
				ret = -EAGAIN;
			}
			spin_unlock(&dlm->spinlock);
			mlog(0, "%s: reco lock master is %u\n", dlm->name,
			     master);
			break;
		}
	}
	return ret;
}

/*
 * DLM_DEREF_LOCKRES_MSG
 */

/*
 * Tell the owner of @res that this node no longer holds a reference to
 * it.  A negative status returned by the owner means it did not think we
 * had a ref at all -- that is a fatal inconsistency (BUG).
 */
int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	struct dlm_deref_lockres deref;
	int ret = 0, r;
	const char *lockname;
	unsigned int namelen;

	lockname = res->lockname.name;
	namelen = res->lockname.len;
	BUG_ON(namelen > O2NM_MAX_NAME_LEN);

	mlog(0, "%s:%.*s: sending deref to %d\n",
	     dlm->name, namelen, lockname, res->owner);
	memset(&deref, 0, sizeof(deref));
	deref.node_idx = dlm->node_num;
	deref.namelen = namelen;
	memcpy(deref.name, lockname, namelen);

	ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key,
				 &deref, sizeof(deref), res->owner, &r);
	if (ret < 0)
		mlog_errno(ret);
	else if (r < 0) {
		/* BAD.  other node says I did not have a ref. */
		mlog(ML_ERROR,"while dropping ref on %s:%.*s "
		    "(master=%u) got %d.\n", dlm->name, namelen,
		    lockname, res->owner, r);
		dlm_print_one_lock_resource(res);
		BUG();
	}
	return ret;
}

/*
 * Handle an incoming DLM_DEREF_LOCKRES_MSG: clear the sender's bit in
 * the refmap of the named lockres.  If the lockres still has a mastery
 * reference being set up (DLM_LOCK_RES_SETREF_INPROG), defer the clear
 * to dlm_deref_lockres_worker via the dlm work queue instead of doing
 * it here.
 */
int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data,
			      void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	char *name;
	unsigned int namelen;
	int ret = -EINVAL;
	u8 node;
	unsigned int hash;
	struct dlm_work_item *item;
	int cleared = 0;
	int dispatch = 0;

	if (!dlm_grab(dlm))
		return 0;

	name = deref->name;
	namelen = deref->namelen;
	node = deref->node_idx;

	/* validate untrusted message fields before using them */
	if (namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length!");
		goto done;
	}
	if (deref->node_idx >= O2NM_MAX_NODES) {
		mlog(ML_ERROR, "Invalid node number: %u\n", node);
		goto done;
	}

	hash = dlm_lockid_hash(name, namelen);

	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres_full(dlm, name, namelen, hash);
	if (!res) {
		spin_unlock(&dlm->spinlock);
		mlog(ML_ERROR, "%s:%.*s: bad lockres name\n",
		     dlm->name, namelen, name);
		goto done;
	}
	spin_unlock(&dlm->spinlock);

	spin_lock(&res->spinlock);
	if (res->state & DLM_LOCK_RES_SETREF_INPROG)
		dispatch = 1;
	else {
		BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF);
		if (test_bit(node, res->refmap)) {
			dlm_lockres_clear_refmap_bit(node, res);
			cleared = 1;
		}
	}
	spin_unlock(&res->spinlock);

	if (!dispatch) {
		if (cleared)
			dlm_lockres_calc_usage(dlm, res);
		else {
			mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref "
			     "but it is already dropped!\n", dlm->name,
			     res->lockname.len, res->lockname.name, node);
			__dlm_print_one_lock_resource(res);
		}
		ret = 0;
		goto done;
	}

	item = kzalloc(sizeof(*item), GFP_NOFS);
	if (!item) {
		ret = -ENOMEM;
		mlog_errno(ret);
		goto done;
	}

	dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL);
	item->u.dl.deref_res = res;
	item->u.dl.deref_node = node;

	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);

	queue_work(dlm->dlm_worker, &dlm->dispatched_work);
	/* NOTE(review): source is truncated here (page boundary in the
	 * scraped listing); the statement below is incomplete. */
	ret

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -