
📄 dlmmaster.c

📁 Linux kernel source code
💻 C
📖 Page 1 of 5
			mlog(0, "node %u not master, response=NO\n", to);			set_bit(to, mle->response_map);			break;		case DLM_MASTER_RESP_MAYBE:			mlog(0, "node %u not master, response=MAYBE\n", to);			set_bit(to, mle->response_map);			set_bit(to, mle->maybe_map);			break;		case DLM_MASTER_RESP_ERROR:			mlog(0, "node %u hit an error, resending\n", to);			resend = 1;			response = 0;			break;		default:			mlog(ML_ERROR, "bad response! %u\n", response);			BUG();	}	spin_unlock(&mle->spinlock);	if (resend) {		/* this is also totally crude */		msleep(50);		goto again;	}out:	return ret;}/* * locks that can be taken here: * dlm->spinlock * res->spinlock * mle->spinlock * dlm->master_list * * if possible, TRIM THIS DOWN!!! */int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data,			       void **ret_data){	u8 response = DLM_MASTER_RESP_MAYBE;	struct dlm_ctxt *dlm = data;	struct dlm_lock_resource *res = NULL;	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;	char *name;	unsigned int namelen, hash;	int found, ret;	int set_maybe;	int dispatch_assert = 0;	if (!dlm_grab(dlm))		return DLM_MASTER_RESP_NO;	if (!dlm_domain_fully_joined(dlm)) {		response = DLM_MASTER_RESP_NO;		goto send_response;	}	name = request->name;	namelen = request->namelen;	hash = dlm_lockid_hash(name, namelen);	if (namelen > DLM_LOCKID_NAME_MAX) {		response = DLM_IVBUFLEN;		goto send_response;	}way_up_top:	spin_lock(&dlm->spinlock);	res = __dlm_lookup_lockres(dlm, name, namelen, hash);	if (res) {		spin_unlock(&dlm->spinlock);		/* take care of the easy cases up front */		spin_lock(&res->spinlock);		if (res->state & (DLM_LOCK_RES_RECOVERING|				  DLM_LOCK_RES_MIGRATING)) {			spin_unlock(&res->spinlock);			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "			     "being recovered/migrated\n");			response = DLM_MASTER_RESP_ERROR;			if (mle)				kmem_cache_free(dlm_mle_cache, mle);			goto send_response;		}		if (res->owner == dlm->node_num) {			mlog(0, "%s:%.*s: setting bit %u in refmap\n",			     dlm->name, namelen, name, request->node_idx);			dlm_lockres_set_refmap_bit(request->node_idx, res);			spin_unlock(&res->spinlock);			response = DLM_MASTER_RESP_YES;			if (mle)				kmem_cache_free(dlm_mle_cache, mle);			/* this node is the owner.			 * there is some extra work that needs to			 * happen now.  the requesting node has			 * caused all nodes up to this one to			 * create mles.  this node now needs to			 * go back and clean those up. */			dispatch_assert = 1;			goto send_response;		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {			spin_unlock(&res->spinlock);			// mlog(0, "node %u is the master\n", res->owner);			response = DLM_MASTER_RESP_NO;			if (mle)				kmem_cache_free(dlm_mle_cache, mle);			goto send_response;		}		/* ok, there is no owner.  either this node is		 * being blocked, or it is actively trying to		 * master this lock. 
*/		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {			mlog(ML_ERROR, "lock with no owner should be "			     "in-progress!\n");			BUG();		}		// mlog(0, "lockres is in progress...\n");		spin_lock(&dlm->master_lock);		found = dlm_find_mle(dlm, &tmpmle, name, namelen);		if (!found) {			mlog(ML_ERROR, "no mle found for this lock!\n");			BUG();		}		set_maybe = 1;		spin_lock(&tmpmle->spinlock);		if (tmpmle->type == DLM_MLE_BLOCK) {			// mlog(0, "this node is waiting for "			// "lockres to be mastered\n");			response = DLM_MASTER_RESP_NO;		} else if (tmpmle->type == DLM_MLE_MIGRATION) {			mlog(0, "node %u is master, but trying to migrate to "			     "node %u.\n", tmpmle->master, tmpmle->new_master);			if (tmpmle->master == dlm->node_num) {				mlog(ML_ERROR, "no owner on lockres, but this "				     "node is trying to migrate it to %u?!\n",				     tmpmle->new_master);				BUG();			} else {				/* the real master can respond on its own */				response = DLM_MASTER_RESP_NO;			}		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {			set_maybe = 0;			if (tmpmle->master == dlm->node_num) {				response = DLM_MASTER_RESP_YES;				/* this node will be the owner.				 * go back and clean the mles on any				 * other nodes */				dispatch_assert = 1;				dlm_lockres_set_refmap_bit(request->node_idx, res);				mlog(0, "%s:%.*s: setting bit %u in refmap\n",				     dlm->name, namelen, name,				     request->node_idx);			} else				response = DLM_MASTER_RESP_NO;		} else {			// mlog(0, "this node is attempting to "			// "master lockres\n");			response = DLM_MASTER_RESP_MAYBE;		}		if (set_maybe)			set_bit(request->node_idx, tmpmle->maybe_map);		spin_unlock(&tmpmle->spinlock);		spin_unlock(&dlm->master_lock);		spin_unlock(&res->spinlock);		/* keep the mle attached to heartbeat events */		dlm_put_mle(tmpmle);		if (mle)			kmem_cache_free(dlm_mle_cache, mle);		goto send_response;	}	/*	 * lockres doesn't exist on this node	 * if there is an MLE_BLOCK, return NO	 * if there is an MLE_MASTER, return MAYBE	 * otherwise, add an MLE_BLOCK, return NO	 */	spin_lock(&dlm->master_lock);	found = dlm_find_mle(dlm, &tmpmle, name, namelen);	if (!found) {		/* this lockid has never been seen on this node yet */		// mlog(0, "no mle found\n");		if (!mle) {			spin_unlock(&dlm->master_lock);			spin_unlock(&dlm->spinlock);			mle = (struct dlm_master_list_entry *)				kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);			if (!mle) {				response = DLM_MASTER_RESP_ERROR;				mlog_errno(-ENOMEM);				goto send_response;			}			goto way_up_top;		}		// mlog(0, "this is second time thru, already allocated, "		// "add the block.\n");		dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen);		set_bit(request->node_idx, mle->maybe_map);		list_add(&mle->list, &dlm->master_list);		response = DLM_MASTER_RESP_NO;	} else {		// mlog(0, "mle was found\n");		set_maybe = 1;		spin_lock(&tmpmle->spinlock);		if (tmpmle->master == dlm->node_num) {			mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n");			BUG();		}		if (tmpmle->type == DLM_MLE_BLOCK)			response = DLM_MASTER_RESP_NO;		else if (tmpmle->type == DLM_MLE_MIGRATION) {			mlog(0, "migration mle was found (%u->%u)\n",			     tmpmle->master, tmpmle->new_master);			/* real master can respond on its own */			response = DLM_MASTER_RESP_NO;		} else			response = DLM_MASTER_RESP_MAYBE;		if (set_maybe)			set_bit(request->node_idx, tmpmle->maybe_map);		spin_unlock(&tmpmle->spinlock);	}	spin_unlock(&dlm->master_lock);	spin_unlock(&dlm->spinlock);	if (found) {		/* keep the mle attached to heartbeat events */		
dlm_put_mle(tmpmle);	}send_response:	if (dispatch_assert) {		if (response != DLM_MASTER_RESP_YES)			mlog(ML_ERROR, "invalid response %d\n", response);		if (!res) {			mlog(ML_ERROR, "bad lockres while trying to assert!\n");			BUG();		}		mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",			     dlm->node_num, res->lockname.len, res->lockname.name);		ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, 						 DLM_ASSERT_MASTER_MLE_CLEANUP);		if (ret < 0) {			mlog(ML_ERROR, "failed to dispatch assert master work\n");			response = DLM_MASTER_RESP_ERROR;		}	}	dlm_put(dlm);	return response;}/* * DLM_ASSERT_MASTER_MSG *//* * NOTE: this can be used for debugging * can periodically run all locks owned by this node * and re-assert across the cluster... */int dlm_do_assert_master(struct dlm_ctxt *dlm,			 struct dlm_lock_resource *res,			 void *nodemap, u32 flags){	struct dlm_assert_master assert;	int to, tmpret;	struct dlm_node_iter iter;	int ret = 0;	int reassert;	const char *lockname = res->lockname.name;	unsigned int namelen = res->lockname.len;	BUG_ON(namelen > O2NM_MAX_NAME_LEN);	spin_lock(&res->spinlock);	res->state |= DLM_LOCK_RES_SETREF_INPROG;	spin_unlock(&res->spinlock);again:	reassert = 0;	/* note that if this nodemap is empty, it returns 0 */	dlm_node_iter_init(nodemap, &iter);	while ((to = dlm_node_iter_next(&iter)) >= 0) {		int r = 0;		struct dlm_master_list_entry *mle = NULL;		mlog(0, "sending assert master to %d (%.*s)\n", to,		     namelen, lockname);		memset(&assert, 0, sizeof(assert));		assert.node_idx = dlm->node_num;		assert.namelen = namelen;		memcpy(assert.name, lockname, namelen);		assert.flags = cpu_to_be32(flags);		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,					    &assert, sizeof(assert), to, &r);		if (tmpret < 0) {			mlog(0, "assert_master returned %d!\n", tmpret);			if (!dlm_is_host_down(tmpret)) {				mlog(ML_ERROR, "unhandled error=%d!\n", tmpret);				BUG();			}			/* a node died.  finish out the rest of the nodes. */			mlog(0, "link to %d went down!\n", to);			/* any nonzero status return will do */			ret = tmpret;			r = 0;		} else if (r < 0) {			/* ok, something horribly messed.  kill thyself. */			mlog(ML_ERROR,"during assert master of %.*s to %u, "			     "got %d.\n", namelen, lockname, to, r);			spin_lock(&dlm->spinlock);			spin_lock(&dlm->master_lock);			if (dlm_find_mle(dlm, &mle, (char *)lockname,					 namelen)) {				dlm_print_one_mle(mle);				__dlm_put_mle(mle);			}			spin_unlock(&dlm->master_lock);			spin_unlock(&dlm->spinlock);			BUG();		}		if (r & DLM_ASSERT_RESPONSE_REASSERT &&		    !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) {				mlog(ML_ERROR, "%.*s: very strange, "				     "master MLE but no lockres on %u\n",				     namelen, lockname, to);		}		if (r & DLM_ASSERT_RESPONSE_REASSERT) {			mlog(0, "%.*s: node %u create mles on other "			     "nodes and requests a re-assert\n", 			     namelen, lockname, to);			reassert = 1;		}		if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) {			mlog(0, "%.*s: node %u has a reference to this "			     "lockres, set the bit in the refmap\n",			     namelen, lockname, to);			spin_lock(&res->spinlock);			dlm_lockres_set_refmap_bit(to, res);			spin_unlock(&res->spinlock);		}	}	if (reassert)		goto again;	spin_lock(&res->spinlock);	res->state &= ~DLM_LOCK_RES_SETREF_INPROG;	spin_unlock(&res->spinlock);	wake_up(&res->wq);	return ret;}/* * locks that can be taken here: * dlm->spinlock * res->spinlock * mle->spinlock * dlm->master_list * * if possible, TRIM THIS DOWN!!! 
*/int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data,			      void **ret_data){	struct dlm_ctxt *dlm = data;	struct dlm_master_list_entry *mle = NULL;	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;	struct dlm_lock_resource *res = NULL;	char *name;	unsigned int namelen, hash;	u32 flags;	int master_request = 0, have_lockres_ref = 0;	int ret = 0;	if (!dlm_grab(dlm))		return 0;	name = assert->name;	namelen = assert->namelen;	hash = dlm_lockid_hash(name, namelen);	flags = be32_to_cpu(assert->flags);	if (namelen > DLM_LOCKID_NAME_MAX) {		mlog(ML_ERROR, "Invalid name length!");		goto done;	}	spin_lock(&dlm->spinlock);	if (flags)		mlog(0, "assert_master with flags: %u\n", flags);	/* find the MLE */	spin_lock(&dlm->master_lock);	if (!dlm_find_mle(dlm, &mle, name, namelen)) {		/* not an error, could be master just re-asserting */		mlog(0, "just got an assert_master from %u, but no "		     "MLE for it! (%.*s)\n", assert->node_idx,		     namelen, name);	} else {		int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0);		if (bit >= O2NM_MAX_NODES) {			/* not necessarily an error, though less likely.			 * could be master just re-asserting. */			mlog(0, "no bits set in the maybe_map, but %u "			     "is asserting! (%.*s)\n", assert->node_idx,			     namelen, name);		} else if (bit != assert->node_idx) {			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {				mlog(0, "master %u was found, %u should "				     "back off\n", assert->node_idx, bit);			} else {				/* with the fix for bug 569, a higher node				 * number winning the mastery will respond				 * YES to mastery requests, but this node				 * had no way of knowing.  let it pass. */				mlog(0, "%u is the lowest node, "				     "%u is asserting. (%.*s)  %u must "				     "have begun after %u won.\n", bit,				     assert->node_idx, namelen, name, bit,				     assert->node_idx);			}		}		if (mle->type == DLM_MLE_MIGRATION) {			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {				mlog(0, "%s:%.*s: got cleanup assert"				     " from %u for migration\n",				     dlm->name, namelen, name,				     assert->node_idx);			} else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) {				mlog(0, "%s:%.*s: got unrelated assert"				     " from %u for migration, ignoring\n",				     dlm->name, namelen, name,				     assert->node_idx);				__dlm_put_mle(mle);				spin_unlock(&dlm->master_lock);				spin_unlock(&dlm->spinlock);				goto done;			}			}	}	spin_unlock(&dlm->master_lock);	/* ok everything checks out with the MLE	 * now check to see if there is a lockres */	res = __dlm_lookup_lockres(dlm, name, namelen, hash);	if (res) {		spin_lock(&res->spinlock);		if (res->state & DLM_LOCK_RES_RECOVERING)  {			mlog(ML_ERROR, "%u asserting but %.*s is "			     "RECOVERING!\n", assert->node_idx, namelen, name);			goto kill;		}		if (!mle) {			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN &&			    res->owner != assert->node_idx) {				mlog(ML_ERROR, "assert_master from "
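In the paths shown above, dlm_master_request_handler nests its locks in a fixed order: dlm->spinlock (dropped once the lockres has been looked up), then res->spinlock, then dlm->master_lock, and finally the mle's spinlock, released in reverse. The fragment below is only a standalone, illustrative sketch of that nesting discipline, using userspace pthread spinlocks and invented lock names; it is not taken from dlmmaster.c.

/*
 * Illustration only -- NOT part of dlmmaster.c.  Mimics the lock
 * nesting used by dlm_master_request_handler with userspace pthread
 * spinlocks and made-up names.  Build with: cc demo.c -pthread
 */
#include <pthread.h>
#include <stdio.h>

static pthread_spinlock_t domain_lock;   /* stands in for dlm->spinlock    */
static pthread_spinlock_t lockres_lock;  /* stands in for res->spinlock    */
static pthread_spinlock_t master_lock;   /* stands in for dlm->master_lock */
static pthread_spinlock_t mle_lock;      /* stands in for mle->spinlock    */

static void handle_master_request(void)
{
	pthread_spin_lock(&domain_lock);    /* outermost: look up the lockres   */
	pthread_spin_unlock(&domain_lock);  /* dropped once the lockres is found */

	pthread_spin_lock(&lockres_lock);   /* per-lockres state                */
	pthread_spin_lock(&master_lock);    /* protects the master list         */
	pthread_spin_lock(&mle_lock);       /* innermost: a single mle          */

	printf("all locks held in nesting order; inspect/modify state here\n");

	pthread_spin_unlock(&mle_lock);     /* release strictly in reverse      */
	pthread_spin_unlock(&master_lock);
	pthread_spin_unlock(&lockres_lock);
}

int main(void)
{
	pthread_spin_init(&domain_lock, PTHREAD_PROCESS_PRIVATE);
	pthread_spin_init(&lockres_lock, PTHREAD_PROCESS_PRIVATE);
	pthread_spin_init(&master_lock, PTHREAD_PROCESS_PRIVATE);
	pthread_spin_init(&mle_lock, PTHREAD_PROCESS_PRIVATE);
	handle_master_request();
	return 0;
}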
