
dlmmaster.c

Linux 2.6.16, C source, page 1 of 5
 */
static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
				unsigned int namelen, void *nodemap,
				u32 flags)
{
	struct dlm_assert_master assert;
	int to, tmpret;
	struct dlm_node_iter iter;
	int ret = 0;

	BUG_ON(namelen > O2NM_MAX_NAME_LEN);

	/* note that if this nodemap is empty, it returns 0 */
	dlm_node_iter_init(nodemap, &iter);
	while ((to = dlm_node_iter_next(&iter)) >= 0) {
		int r = 0;
		mlog(0, "sending assert master to %d (%.*s)\n", to,
		     namelen, lockname);
		memset(&assert, 0, sizeof(assert));
		assert.node_idx = dlm->node_num;
		assert.namelen = namelen;
		memcpy(assert.name, lockname, namelen);
		assert.flags = cpu_to_be32(flags);

		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
					    &assert, sizeof(assert), to, &r);
		if (tmpret < 0) {
			mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
			if (!dlm_is_host_down(tmpret)) {
				mlog(ML_ERROR, "unhandled error!\n");
				BUG();
			}
			/* a node died.  finish out the rest of the nodes. */
			mlog(ML_ERROR, "link to %d went down!\n", to);
			/* any nonzero status return will do */
			ret = tmpret;
		} else if (r < 0) {
			/* ok, something horribly messed.  kill thyself. */
			mlog(ML_ERROR, "during assert master of %.*s to %u, "
			     "got %d.\n", namelen, lockname, to, r);
			dlm_dump_lock_resources(dlm);
			BUG();
		}
	}

	return ret;
}

/*
 * locks that can be taken here:
 * dlm->spinlock
 * res->spinlock
 * mle->spinlock
 * dlm->master_list
 *
 * if possible, TRIM THIS DOWN!!!
 */
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	char *name;
	unsigned int namelen;
	u32 flags;

	if (!dlm_grab(dlm))
		return 0;

	name = assert->name;
	namelen = assert->namelen;
	flags = be32_to_cpu(assert->flags);

	if (namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length!");
		goto done;
	}

	spin_lock(&dlm->spinlock);

	if (flags)
		mlog(0, "assert_master with flags: %u\n", flags);

	/* find the MLE */
	spin_lock(&dlm->master_lock);
	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
		/* not an error, could be master just re-asserting */
		mlog(0, "just got an assert_master from %u, but no "
		     "MLE for it! (%.*s)\n", assert->node_idx,
		     namelen, name);
	} else {
		int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
		if (bit >= O2NM_MAX_NODES) {
			/* not necessarily an error, though less likely.
			 * could be master just re-asserting. */
			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
			     "is asserting! (%.*s)\n", assert->node_idx,
			     namelen, name);
		} else if (bit != assert->node_idx) {
			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
				mlog(0, "master %u was found, %u should "
				     "back off\n", assert->node_idx, bit);
			} else {
				/* with the fix for bug 569, a higher node
				 * number winning the mastery will respond
				 * YES to mastery requests, but this node
				 * had no way of knowing.  let it pass. */
				mlog(ML_ERROR, "%u is the lowest node, "
				     "%u is asserting! (%.*s)  %u must "
				     "have begun after %u won.\n", bit,
				     assert->node_idx, namelen, name, bit,
				     assert->node_idx);
			}
		}
	}
	spin_unlock(&dlm->master_lock);

	/* ok everything checks out with the MLE
	 * now check to see if there is a lockres */
	res = __dlm_lookup_lockres(dlm, name, namelen);
	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			mlog(ML_ERROR, "%u asserting but %.*s is "
			     "RECOVERING!\n", assert->node_idx, namelen, name);
			goto kill;
		}
		if (!mle) {
			if (res->owner != assert->node_idx) {
				mlog(ML_ERROR, "assert_master from "
				     "%u, but current owner is "
				     "%u! (%.*s)\n",
				     assert->node_idx, res->owner,
				     namelen, name);
				goto kill;
			}
		} else if (mle->type != DLM_MLE_MIGRATION) {
			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
				/* owner is just re-asserting */
				if (res->owner == assert->node_idx) {
					mlog(0, "owner %u re-asserting on "
					     "lock %.*s\n", assert->node_idx,
					     namelen, name);
					goto ok;
				}
				mlog(ML_ERROR, "got assert_master from "
				     "node %u, but %u is the owner! "
				     "(%.*s)\n", assert->node_idx,
				     res->owner, namelen, name);
				goto kill;
			}
			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
				mlog(ML_ERROR, "got assert from %u, but lock "
				     "with no owner should be "
				     "in-progress! (%.*s)\n",
				     assert->node_idx,
				     namelen, name);
				goto kill;
			}
		} else /* mle->type == DLM_MLE_MIGRATION */ {
			/* should only be getting an assert from new master */
			if (assert->node_idx != mle->new_master) {
				mlog(ML_ERROR, "got assert from %u, but "
				     "new master is %u, and old master "
				     "was %u (%.*s)\n",
				     assert->node_idx, mle->new_master,
				     mle->master, namelen, name);
				goto kill;
			}
		}
ok:
		spin_unlock(&res->spinlock);
	}
	spin_unlock(&dlm->spinlock);

	// mlog(0, "woo!  got an assert_master from node %u!\n",
	// 	     assert->node_idx);
	if (mle) {
		int extra_ref;

		spin_lock(&mle->spinlock);
		extra_ref = !!(mle->type == DLM_MLE_BLOCK
			       || mle->type == DLM_MLE_MIGRATION);
		mle->master = assert->node_idx;
		atomic_set(&mle->woken, 1);
		wake_up(&mle->wq);
		spin_unlock(&mle->spinlock);

		if (mle->type == DLM_MLE_MIGRATION && res) {
			mlog(0, "finishing off migration of lockres %.*s, "
			     "from %u to %u\n",
			     res->lockname.len, res->lockname.name,
			     dlm->node_num, mle->new_master);
			spin_lock(&res->spinlock);
			res->state &= ~DLM_LOCK_RES_MIGRATING;
			dlm_change_lockres_owner(dlm, res, mle->new_master);
			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
			spin_unlock(&res->spinlock);
		}
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, mle);
		dlm_put_mle(mle);

		if (extra_ref) {
			/* the assert master message now balances the extra
			 * ref given by the master / migration request message.
			 * if this is the last put, it will be removed
			 * from the list. */
			dlm_put_mle(mle);
		}
	}

done:
	if (res)
		dlm_lockres_put(res);
	dlm_put(dlm);
	return 0;

kill:
	/* kill the caller! */
	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
	dlm_lockres_put(res);
	mlog(ML_ERROR, "Bad message received from another node.  Dumping state "
	     "and killing the other node now!  This node is OK and can "
	     "continue.\n");
	dlm_dump_lock_resources(dlm);
	dlm_put(dlm);
	return -EINVAL;
}

int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res,
			       int ignore_higher, u8 request_from, u32 flags)
{
	struct dlm_work_item *item;
	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
	if (!item)
		return -ENOMEM;

	/* queue up work for dlm_assert_master_worker */
	dlm_grab(dlm);  /* get an extra ref for the work item */
	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
	item->u.am.lockres = res; /* already have a ref */
	/* can optionally ignore node numbers higher than this node */
	item->u.am.ignore_higher = ignore_higher;
	item->u.am.request_from = request_from;
	item->u.am.flags = flags;

	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);

	schedule_work(&dlm->dispatched_work);
	return 0;
}

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_ctxt *dlm = data;
	int ret = 0;
	struct dlm_lock_resource *res;
	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	int ignore_higher;
	int bit;
	u8 request_from;
	u32 flags;

	dlm = item->dlm;
	res = item->u.am.lockres;
	ignore_higher = item->u.am.ignore_higher;
	request_from = item->u.am.request_from;
	flags = item->u.am.flags;

	spin_lock(&dlm->spinlock);
	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
	spin_unlock(&dlm->spinlock);

	clear_bit(dlm->node_num, nodemap);
	if (ignore_higher) {
		/* if this is just to clear up mles for nodes below
		 * this node, do not send the message to the original
		 * caller or any node number higher than this */
		clear_bit(request_from, nodemap);
		bit = dlm->node_num;
		while (1) {
			bit = find_next_bit(nodemap, O2NM_MAX_NODES,
					    bit+1);
			if (bit >= O2NM_MAX_NODES)
				break;
			clear_bit(bit, nodemap);
		}
	}

	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	mlog(0, "worker about to master %.*s here, this=%u\n",
	     res->lockname.len, res->lockname.name, dlm->node_num);
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len,
				   nodemap, flags);
	if (ret < 0) {
		/* no need to restart, we are done */
		mlog_errno(ret);
	}

	dlm_lockres_put(res);

	mlog(0, "finished with dlm_assert_master_worker\n");
}

/*
 * DLM_MIGRATE_LOCKRES
 */
int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			u8 target)
{
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *oldmle = NULL;
	struct dlm_migratable_lockres *mres = NULL;
	int ret = -EINVAL;
	const char *name;
	unsigned int namelen;
	int mle_added = 0;
	struct list_head *queue, *iter;
	int i;
	struct dlm_lock *lock;
	int empty = 1;

	if (!dlm_grab(dlm))
		return -EINVAL;

	name = res->lockname.name;
	namelen = res->lockname.len;

	mlog(0, "migrating %.*s to %u\n", namelen, name, target);

	/*
	 * ensure this lockres is a proper candidate for migration
	 */
	spin_lock(&res->spinlock);
	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "cannot migrate lockres with unknown owner!\n");
		spin_unlock(&res->spinlock);
		goto leave;
	}
	if (res->owner != dlm->node_num) {
		mlog(0, "cannot migrate lockres this node doesn't own!\n");
		spin_unlock(&res->spinlock);
		goto leave;
	}
	mlog(0, "checking queues...\n");
	queue = &res->granted;
	for (i=0; i<3; i++) {
		list_for_each(iter, queue) {
			lock = list_entry(iter, struct dlm_lock, list);
			empty = 0;
			if (lock->ml.node == dlm->node_num) {
				mlog(0, "found a lock owned by this node "
				     "still on the %s queue!  will not "
				     "migrate this lockres\n",
				     i==0 ? "granted" :
				     (i==1 ? "converting" : "blocked"));
				spin_unlock(&res->spinlock);
				ret = -ENOTEMPTY;
				goto leave;
			}
		}
		queue++;
	}
	mlog(0, "all locks on this lockres are nonlocal.  continuing\n");
	spin_unlock(&res->spinlock);

	/* no work to do */
	if (empty) {
		mlog(0, "no locks were found on this lockres! done!\n");
		ret = 0;
		goto leave;
	}

	/*
	 * preallocate up front
	 * if this fails, abort
	 */
	ret = -ENOMEM;
	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
	if (!mres) {
		mlog_errno(ret);
		goto leave;
	}

	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
								GFP_KERNEL);
	if (!mle) {
		mlog_errno(ret);
		goto leave;
	}
	ret = 0;

	/*
	 * find a node to migrate the lockres to
	 */
	mlog(0, "picking a migration node\n");
	spin_lock(&dlm->spinlock);
	/* pick a new node */
	if (!test_bit(target, dlm->domain_map) ||
	    target >= O2NM_MAX_NODES) {
		target = dlm_pick_migration_target(dlm, res);
	}
	mlog(0, "node %u chosen for migration\n", target);

	if (target >= O2NM_MAX_NODES ||
	    !test_bit(target, dlm->domain_map)) {
		/* target chosen is not alive */
		ret = -EINVAL;
	}

	if (ret) {
		spin_unlock(&dlm->spinlock);
		goto fail;
	}

	mlog(0, "continuing with target = %u\n", target);

	/*
	 * clear any existing master requests and
	 * add the migration mle to the list
	 */
	spin_lock(&dlm->master_lock);
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
				    namelen, target, dlm->node_num);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (ret == -EEXIST) {
		mlog(0, "another process is already migrating it\n");
		goto fail;
	}
	mle_added = 1;

	/*
	 * set the MIGRATING flag and flush asts
	 * if we fail after this we need to re-dirty the lockres
	 */
	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
		     "the target went down.\n", res->lockname.len,
		     res->lockname.name, target);
		spin_lock(&res->spinlock);
		res->state &= ~DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
		ret = -EINVAL;
	}

fail:
	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	if (ret < 0) {
		if (mle_added) {
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
		} else if (mle) {
			kmem_cache_free(dlm_mle_cache, mle);
		}
		goto leave;
	}

	/*
	 * at this point, we have a migration target, an mle
	 * in the master list, and the MIGRATING flag set on
	 * the lockres
	 */

	/* get an extra reference on the mle.
	 * otherwise the assert_master from the new
	 * master will destroy this.
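
The listing continues on the next page. A few sketches of the patterns used above, in plain user-space C, follow.

The send loop in dlm_do_assert_master is a self-contained pattern: walk every set bit in a snapshot of the live-node bitmap, send one message per node, and keep going when a single link is down so the assert still reaches the survivors. A minimal sketch of that shape, with a hypothetical send_to_node() standing in for o2net_send_message() and MAX_NODES for O2NM_MAX_NODES:

#include <stdio.h>

#define MAX_NODES 32	/* stand-in for O2NM_MAX_NODES */

/* Hypothetical transport call; returns < 0 on a dead link. */
static int send_to_node(int node, const char *name, int namelen)
{
	printf("assert master of %.*s -> node %d\n", namelen, name, node);
	return 0;
}

/* Walk every set bit in the node bitmap, as dlm_node_iter_next() does,
 * and finish out the remaining nodes even if one destination is down. */
static int broadcast_assert(unsigned long nodemap, const char *name,
			    int namelen)
{
	int to, ret = 0;

	for (to = 0; to < MAX_NODES; to++) {
		if (!(nodemap & (1UL << to)))
			continue;
		if (send_to_node(to, name, namelen) < 0)
			ret = -1;	/* any nonzero status will do */
	}
	return ret;
}

int main(void)
{
	unsigned long nodemap = (1UL << 1) | (1UL << 4);	/* nodes 1 and 4 */

	return broadcast_assert(nodemap, "somelock", 8) < 0;
}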
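
The bulk of dlm_assert_master_handler is validation: an assert is only believable if it agrees with the MLE and the lockres state. A condensed restatement of its lockres checks as a pure decision function; every name here is invented for illustration, with 0 meaning accept and -1 meaning the kill path:

/* Condenses the res checks in dlm_assert_master_handler.
 * All of the first six parameters are 0/1 state flags or node numbers. */
int check_assert(int recovering, int have_mle, int migrating_mle,
		 int owner, int owner_known, int in_progress,
		 int asserter, int new_master)
{
	if (recovering)
		return -1;			/* never during recovery */
	if (!have_mle)
		return owner == asserter ? 0 : -1;
	if (migrating_mle)
		return asserter == new_master ? 0 : -1;
	if (owner_known)
		return owner == asserter ? 0 : -1;	/* re-assert only */
	return in_progress ? 0 : -1;	/* unowned must be in-progress */
}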
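
dlm_dispatch_assert_master shows the usual deferral shape: allocate and fill the work item before taking the lock, append it under work_lock, then poke the worker. A reduced sketch of that hand-off with hypothetical types and a plain linked list; there is no real workqueue behind it:

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical work item, loosely mirroring struct dlm_work_item. */
struct work_item {
	struct work_item *next;
	void (*func)(struct work_item *);
	unsigned int flags;
};

static struct work_item *work_head, *work_tail;	/* guarded by a work lock */

/* Allocation happens before the (notional) lock is taken, so the
 * critical section itself cannot fail, as in the kernel function. */
static int dispatch(void (*func)(struct work_item *), unsigned int flags)
{
	struct work_item *item = calloc(1, sizeof(*item));

	if (!item)
		return -1;		/* -ENOMEM in the kernel version */
	item->func = func;
	item->flags = flags;

	/* lock(work_lock) */
	if (work_tail)
		work_tail->next = item;
	else
		work_head = item;
	work_tail = item;
	/* unlock(work_lock), then kick the worker thread */
	return 0;
}

/* What the dispatched worker side would do with the queue. */
static void run_all(void)
{
	while (work_head) {
		struct work_item *item = work_head;

		work_head = item->next;
		if (!work_head)
			work_tail = NULL;
		item->func(item);
		free(item);
	}
}

static void say_hello(struct work_item *item)
{
	printf("worker ran, flags=%u\n", item->flags);
}

int main(void)
{
	dispatch(say_hello, 7);
	run_all();
	return 0;
}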
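
dlm_assert_master_worker trims its copy of the domain map before broadcasting: it always clears its own bit, and with ignore_higher set it also clears the requesting node and every node number above its own, so only strictly lower-numbered nodes are messaged. A sketch of that trimming on a single-word bitmap; the kernel does the same thing with clear_bit() and find_next_bit() on a multi-word map:

#include <stdio.h>

#define MAX_NODES 32

/* Clear self, the original requestor, and all higher node numbers,
 * mirroring the ignore_higher branch of dlm_assert_master_worker. */
static unsigned long trim_nodemap(unsigned long map, int self,
				  int request_from, int ignore_higher)
{
	int bit;

	map &= ~(1UL << self);
	if (ignore_higher) {
		map &= ~(1UL << request_from);
		for (bit = self + 1; bit < MAX_NODES; bit++)
			map &= ~(1UL << bit);
	}
	return map;
}

int main(void)
{
	/* domain map with nodes 0..5 alive; this node is 3, requestor is 1 */
	unsigned long map = 0x3f;

	printf("trimmed map: 0x%lx\n", trim_nodemap(map, 3, 1, 1));
	/* prints 0x5: only nodes 0 and 2 remain (below 3, excluding 1) */
	return 0;
}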
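
Finally, the pre-flight scan in dlm_migrate_lockres walks the granted, converting, and blocked queues with a single pointer and queue++, which only works because the three list heads are adjacent fields of the lockres. A small sketch of that layout trick; list_head and the field names are reproduced here purely for illustration:

#include <stdio.h>

struct list_head {
	struct list_head *next, *prev;
};

static void list_init(struct list_head *h)
{
	h->next = h->prev = h;
}

/* The three queues are consecutive fields, so a list_head pointer that
 * starts at &granted and is incremented visits converting and then
 * blocked in turn: the layout the kernel's queue++ walk relies on. */
struct lock_resource {
	struct list_head granted;
	struct list_head converting;
	struct list_head blocked;
};

int main(void)
{
	struct lock_resource res;
	struct list_head *queue = &res.granted;
	int i;

	list_init(&res.granted);
	list_init(&res.converting);
	list_init(&res.blocked);

	for (i = 0; i < 3; i++, queue++)
		printf("%s queue %s empty\n",
		       i == 0 ? "granted" :
		       (i == 1 ? "converting" : "blocked"),
		       queue->next == queue ? "is" : "is not");
	return 0;
}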
