
dlmmaster.c (Linux 2.6.16, C)
Page 1 of 5
        int curnode;
        unsigned long *orig_bm;
        unsigned long *cur_bm;
        unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
        NODE_DOWN = -1,
        NODE_NO_CHANGE = 0,
        NODE_UP
};

static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
                                      unsigned long *orig_bm,
                                      unsigned long *cur_bm)
{
        unsigned long p1, p2;
        int i;

        iter->curnode = -1;
        iter->orig_bm = orig_bm;
        iter->cur_bm = cur_bm;

        for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
                p1 = *(iter->orig_bm + i);
                p2 = *(iter->cur_bm + i);
                iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
        }
}

static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
                                     enum dlm_node_state_change *state)
{
        int bit;

        if (iter->curnode >= O2NM_MAX_NODES)
                return -ENOENT;

        bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
                            iter->curnode+1);
        if (bit >= O2NM_MAX_NODES) {
                iter->curnode = O2NM_MAX_NODES;
                return -ENOENT;
        }

        /* if it was there in the original then this node died */
        if (test_bit(bit, iter->orig_bm))
                *state = NODE_DOWN;
        else
                *state = NODE_UP;

        iter->curnode = bit;
        return bit;
}

static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res,
                                    struct dlm_master_list_entry *mle,
                                    int blocked)
{
        struct dlm_bitmap_diff_iter bdi;
        enum dlm_node_state_change sc;
        int node;
        int ret = 0;

        mlog(0, "something happened such that the "
             "master process may need to be restarted!\n");

        assert_spin_locked(&mle->spinlock);

        dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
        node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        while (node >= 0) {
                if (sc == NODE_UP) {
                        /* a node came up.  clear any old vote from
                         * the response map and set it in the vote map
                         * then restart the mastery. */
                        mlog(ML_NOTICE, "node %d up while restarting\n", node);

                        /* redo the master request, but only for the new node */
                        mlog(0, "sending request to new node\n");
                        clear_bit(node, mle->response_map);
                        set_bit(node, mle->vote_map);
                } else {
                        mlog(ML_ERROR, "node down! %d\n", node);

                        /* if the node wasn't involved in mastery skip it,
                         * but clear it out from the maps so that it will
                         * not affect mastery of this lockres */
                        clear_bit(node, mle->response_map);
                        clear_bit(node, mle->vote_map);
                        if (!test_bit(node, mle->maybe_map))
                                goto next;

                        /* if we're already blocked on lock mastery, and the
                         * dead node wasn't the expected master, or there is
                         * another node in the maybe_map, keep waiting */
                        if (blocked) {
                                int lowest = find_next_bit(mle->maybe_map,
                                                           O2NM_MAX_NODES, 0);

                                /* act like it was never there */
                                clear_bit(node, mle->maybe_map);

                                if (node != lowest)
                                        goto next;

                                mlog(ML_ERROR, "expected master %u died while "
                                     "this node was blocked waiting on it!\n",
                                     node);
                                lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES,
                                                       lowest+1);
                                if (lowest < O2NM_MAX_NODES) {
                                        mlog(0, "still blocked. waiting "
                                             "on %u now\n", lowest);
                                        goto next;
                                }

                                /* mle is an MLE_BLOCK, but there is now
                                 * nothing left to block on.  we need to return
                                 * all the way back out and try again with
                                 * an MLE_MASTER. dlm_do_local_recovery_cleanup
                                 * has already run, so the mle refcount is ok */
                                mlog(0, "no longer blocking. we can "
                                     "try to master this here\n");
                                mle->type = DLM_MLE_MASTER;
                                memset(mle->maybe_map, 0,
                                       sizeof(mle->maybe_map));
                                memset(mle->response_map, 0,
                                       sizeof(mle->maybe_map));
                                memcpy(mle->vote_map, mle->node_map,
                                       sizeof(mle->node_map));
                                mle->u.res = res;
                                set_bit(dlm->node_num, mle->maybe_map);

                                ret = -EAGAIN;
                                goto next;
                        }
                        clear_bit(node, mle->maybe_map);
                        if (node > dlm->node_num)
                                goto next;

                        mlog(0, "dead node in map!\n");
                        /* yuck. go back and re-contact all nodes
                         * in the vote_map, removing this node. */
                        memset(mle->response_map, 0,
                               sizeof(mle->response_map));
                }
                ret = -EAGAIN;
next:
                node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        }
        return ret;
}


/*
 * DLM_MASTER_REQUEST_MSG
 *
 * returns: 0 on success,
 *          -errno on a network error
 *
 * on error, the caller should assume the target node is "dead"
 *
 */
static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to)
{
        struct dlm_ctxt *dlm = mle->dlm;
        struct dlm_master_request request;
        int ret, response=0, resend;

        memset(&request, 0, sizeof(request));
        request.node_idx = dlm->node_num;

        BUG_ON(mle->type == DLM_MLE_MIGRATION);

        if (mle->type != DLM_MLE_MASTER) {
                request.namelen = mle->u.name.len;
                memcpy(request.name, mle->u.name.name, request.namelen);
        } else {
                request.namelen = mle->u.res->lockname.len;
                memcpy(request.name, mle->u.res->lockname.name,
                        request.namelen);
        }

again:
        ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
                                 sizeof(request), to, &response);
        if (ret < 0) {
                if (ret == -ESRCH) {
                        /* should never happen */
                        mlog(ML_ERROR, "TCP stack not ready!\n");
                        BUG();
                } else if (ret == -EINVAL) {
                        mlog(ML_ERROR, "bad args passed to o2net!\n");
                        BUG();
                } else if (ret == -ENOMEM) {
                        mlog(ML_ERROR, "out of memory while trying to send "
                             "network message!  retrying\n");
                        /* this is totally crude */
                        msleep(50);
                        goto again;
                } else if (!dlm_is_host_down(ret)) {
                        /* not a network error. bad. */
                        mlog_errno(ret);
                        mlog(ML_ERROR, "unhandled error!");
                        BUG();
                }
                /* all other errors should be network errors,
                 * and likely indicate node death */
                mlog(ML_ERROR, "link to %d went down!\n", to);
                goto out;
        }

        ret = 0;
        resend = 0;
        spin_lock(&mle->spinlock);
        switch (response) {
                case DLM_MASTER_RESP_YES:
                        set_bit(to, mle->response_map);
                        mlog(0, "node %u is the master, response=YES\n", to);
                        mle->master = to;
                        break;
                case DLM_MASTER_RESP_NO:
                        mlog(0, "node %u not master, response=NO\n", to);
                        set_bit(to, mle->response_map);
                        break;
                case DLM_MASTER_RESP_MAYBE:
                        mlog(0, "node %u not master, response=MAYBE\n", to);
                        set_bit(to, mle->response_map);
                        set_bit(to, mle->maybe_map);
                        break;
                case DLM_MASTER_RESP_ERROR:
                        mlog(0, "node %u hit an error, resending\n", to);
                        resend = 1;
                        response = 0;
                        break;
                default:
                        mlog(ML_ERROR, "bad response! %u\n", response);
                        BUG();
        }
        spin_unlock(&mle->spinlock);
        if (resend) {
                /* this is also totally crude */
                msleep(50);
                goto again;
        }

out:
        return ret;
}

/*
 * locks that can be taken here:
 * dlm->spinlock
 * res->spinlock
 * mle->spinlock
 * dlm->master_list
 *
 * if possible, TRIM THIS DOWN!!!
 */
int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
        u8 response = DLM_MASTER_RESP_MAYBE;
        struct dlm_ctxt *dlm = data;
        struct dlm_lock_resource *res;
        struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;
        struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;
        char *name;
        unsigned int namelen;
        int found, ret;
        int set_maybe;

        if (!dlm_grab(dlm))
                return DLM_MASTER_RESP_NO;

        if (!dlm_domain_fully_joined(dlm)) {
                response = DLM_MASTER_RESP_NO;
                goto send_response;
        }

        name = request->name;
        namelen = request->namelen;

        if (namelen > DLM_LOCKID_NAME_MAX) {
                response = DLM_IVBUFLEN;
                goto send_response;
        }

way_up_top:
        spin_lock(&dlm->spinlock);
        res = __dlm_lookup_lockres(dlm, name, namelen);
        if (res) {
                spin_unlock(&dlm->spinlock);

                /* take care of the easy cases up front */
                spin_lock(&res->spinlock);
                if (res->state & DLM_LOCK_RES_RECOVERING) {
                        spin_unlock(&res->spinlock);
                        mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "
                             "being recovered\n");
                        response = DLM_MASTER_RESP_ERROR;
                        if (mle)
                                kmem_cache_free(dlm_mle_cache, mle);
                        goto send_response;
                }

                if (res->owner == dlm->node_num) {
                        u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP;
                        spin_unlock(&res->spinlock);
                        // mlog(0, "this node is the master\n");
                        response = DLM_MASTER_RESP_YES;
                        if (mle)
                                kmem_cache_free(dlm_mle_cache, mle);

                        /* this node is the owner.
                         * there is some extra work that needs to
                         * happen now.  the requesting node has
                         * caused all nodes up to this one to
                         * create mles.  this node now needs to
                         * go back and clean those up. */
                        mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
                             dlm->node_num, res->lockname.len, res->lockname.name);
                        ret = dlm_dispatch_assert_master(dlm, res, 1,
                                                         request->node_idx,
                                                         flags);
                        if (ret < 0) {
                                mlog(ML_ERROR, "failed to dispatch assert "
                                     "master work\n");
                                response = DLM_MASTER_RESP_ERROR;
                        }
                        goto send_response;
                } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
                        spin_unlock(&res->spinlock);
                        // mlog(0, "node %u is the master\n", res->owner);
                        response = DLM_MASTER_RESP_NO;
                        if (mle)
                                kmem_cache_free(dlm_mle_cache, mle);
                        goto send_response;
                }

                /* ok, there is no owner.  either this node is
                 * being blocked, or it is actively trying to
                 * master this lock. */
                if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
                        mlog(ML_ERROR, "lock with no owner should be "
                             "in-progress!\n");
                        BUG();
                }

                // mlog(0, "lockres is in progress...\n");
                spin_lock(&dlm->master_lock);
                found = dlm_find_mle(dlm, &tmpmle, name, namelen);
                if (!found) {
                        mlog(ML_ERROR, "no mle found for this lock!\n");
                        BUG();
                }
                set_maybe = 1;
                spin_lock(&tmpmle->spinlock);
                if (tmpmle->type == DLM_MLE_BLOCK) {
                        // mlog(0, "this node is waiting for "
                        // "lockres to be mastered\n");
                        response = DLM_MASTER_RESP_NO;
                } else if (tmpmle->type == DLM_MLE_MIGRATION) {
                        mlog(0, "node %u is master, but trying to migrate to "
                             "node %u.\n", tmpmle->master, tmpmle->new_master);
                        if (tmpmle->master == dlm->node_num) {
                                response = DLM_MASTER_RESP_YES;
                                mlog(ML_ERROR, "no owner on lockres, but this "
                                     "node is trying to migrate it to %u?!\n",
                                     tmpmle->new_master);
                                BUG();
                        } else {
                                /* the real master can respond on its own */
                                response = DLM_MASTER_RESP_NO;
                        }
                } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {
                        set_maybe = 0;
                        if (tmpmle->master == dlm->node_num)
                                response = DLM_MASTER_RESP_YES;
                        else
                                response = DLM_MASTER_RESP_NO;
                } else {
                        // mlog(0, "this node is attempting to "
                        // "master lockres\n");
                        response = DLM_MASTER_RESP_MAYBE;
                }
                if (set_maybe)
                        set_bit(request->node_idx, tmpmle->maybe_map);
                spin_unlock(&tmpmle->spinlock);

                spin_unlock(&dlm->master_lock);
                spin_unlock(&res->spinlock);

                /* keep the mle attached to heartbeat events */
                dlm_put_mle(tmpmle);
                if (mle)
                        kmem_cache_free(dlm_mle_cache, mle);
                goto send_response;
        }

        /*
         * lockres doesn't exist on this node
         * if there is an MLE_BLOCK, return NO
         * if there is an MLE_MASTER, return MAYBE
         * otherwise, add an MLE_BLOCK, return NO
         */
        spin_lock(&dlm->master_lock);
        found = dlm_find_mle(dlm, &tmpmle, name, namelen);
        if (!found) {
                /* this lockid has never been seen on this node yet */
                // mlog(0, "no mle found\n");
                if (!mle) {
                        spin_unlock(&dlm->master_lock);
                        spin_unlock(&dlm->spinlock);
                        mle = (struct dlm_master_list_entry *)
                                kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
                        if (!mle) {
                                // bad bad bad... this sucks.
                                response = DLM_MASTER_RESP_ERROR;
                                goto send_response;
                        }
                        spin_lock(&dlm->spinlock);
                        dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL,
                                     name, namelen);
                        spin_unlock(&dlm->spinlock);
                        goto way_up_top;
                }

                // mlog(0, "this is second time thru, already allocated, "
                // "add the block.\n");
                set_bit(request->node_idx, mle->maybe_map);
                list_add(&mle->list, &dlm->master_list);
                response = DLM_MASTER_RESP_NO;
        } else {
                // mlog(0, "mle was found\n");
                set_maybe = 1;
                spin_lock(&tmpmle->spinlock);
                if (tmpmle->type == DLM_MLE_BLOCK)
                        response = DLM_MASTER_RESP_NO;
                else if (tmpmle->type == DLM_MLE_MIGRATION) {
                        mlog(0, "migration mle was found (%u->%u)\n",
                             tmpmle->master, tmpmle->new_master);
                        if (tmpmle->master == dlm->node_num) {
                                mlog(ML_ERROR, "no lockres, but migration mle "
                                     "says that this node is master!\n");
                                BUG();
                        }
                        /* real master can respond on its own */
                        response = DLM_MASTER_RESP_NO;
                } else {
                        if (tmpmle->master == dlm->node_num) {
                                response = DLM_MASTER_RESP_YES;
                                set_maybe = 0;
                        } else
                                response = DLM_MASTER_RESP_MAYBE;
                }
                if (set_maybe)
                        set_bit(request->node_idx, tmpmle->maybe_map);
                spin_unlock(&tmpmle->spinlock);
        }
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

        if (found) {
                /* keep the mle attached to heartbeat events */
                dlm_put_mle(tmpmle);
        }
send_response:
        dlm_put(dlm);
        return response;
}

/*
 * DLM_ASSERT_MASTER_MSG
 */

/*
 * NOTE: this can be used for debugging
 * can periodically run all locks owned by this node
 * and re-assert across the cluster...
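The bitmap-diff iterator at the top of this page (dlm_bitmap_diff_iter_init / dlm_bitmap_diff_iter_next) drives the restart logic in dlm_restart_lock_mastery: XOR-ing the old vote map against the current node map yields exactly the bits whose state changed, and a changed bit that was set in the old map means that node went down, otherwise it came up. The short user-space sketch below only illustrates that idea; it is not part of dlmmaster.c and, for simplicity, assumes the whole node map fits in one 64-bit word, whereas the kernel code walks BITS_TO_LONGS(O2NM_MAX_NODES) longs with find_next_bit().

/* Illustrative sketch only -- the node count and sample maps are made up. */
#include <stdio.h>

#define EX_MAX_NODES 64   /* assumption: one 64-bit word holds the node map */

/* Walk every changed bit of orig vs. cur and report UP/DOWN, the same
 * classification dlm_bitmap_diff_iter_next() makes with NODE_UP/NODE_DOWN. */
static void ex_walk_diff(unsigned long long orig, unsigned long long cur)
{
        unsigned long long diff = orig ^ cur;   /* symmetric difference */
        int bit;

        for (bit = 0; bit < EX_MAX_NODES; bit++) {
                if (!(diff & (1ULL << bit)))
                        continue;               /* no state change for this node */
                if (orig & (1ULL << bit))
                        printf("node %d: DOWN\n", bit);  /* was voting, now gone */
                else
                        printf("node %d: UP\n", bit);    /* newly arrived node */
        }
}

int main(void)
{
        /* nodes 0,1,3 were in the old vote map; nodes 0,2,3 are alive now,
         * so this prints "node 1: DOWN" and "node 2: UP" */
        ex_walk_diff(0x0bULL, 0x0dULL);
        return 0;
}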
