⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmmaster.c

📁 LINUX 2.6.17.4的源码
💻 C
📖 第 1 页 / 共 5 页
字号:
	}	if (m != O2NM_MAX_NODES) {		/* another node has done an assert!		 * all done! */		sleep = 0;	} else {		sleep = 1;		/* have all nodes responded? */		if (voting_done && !*blocked) {			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);			if (dlm->node_num <= bit) {				/* my node number is lowest.			 	 * now tell other nodes that I am				 * mastering this. */				mle->master = dlm->node_num;				assert = 1;				sleep = 0;			}			/* if voting is done, but we have not received			 * an assert master yet, we must sleep */		}	}	spin_unlock(&mle->spinlock);	/* sleep if we haven't finished voting yet */	if (sleep) {		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);		/*		if (atomic_read(&mle->mle_refs.refcount) < 2)			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,			atomic_read(&mle->mle_refs.refcount),			res->lockname.len, res->lockname.name);		*/		atomic_set(&mle->woken, 0);		(void)wait_event_timeout(mle->wq,					 (atomic_read(&mle->woken) == 1),					 timeo);		if (res->owner == O2NM_MAX_NODES) {			mlog(0, "waiting again\n");			goto recheck;		}		mlog(0, "done waiting, master is %u\n", res->owner);		ret = 0;		goto leave;	}	ret = 0;   /* done */	if (assert) {		m = dlm->node_num;		mlog(0, "about to master %.*s here, this=%u\n",		     res->lockname.len, res->lockname.name, m);		ret = dlm_do_assert_master(dlm, res->lockname.name,					   res->lockname.len, mle->vote_map, 0);		if (ret) {			/* This is a failure in the network path,			 * not in the response to the assert_master			 * (any nonzero response is a BUG on this node).			 * Most likely a socket just got disconnected			 * due to node death. */			mlog_errno(ret);		}		/* no longer need to restart lock mastery.		 * all living nodes have been contacted. */		ret = 0;	}	/* set the lockres owner */	spin_lock(&res->spinlock);	dlm_change_lockres_owner(dlm, res, m);	spin_unlock(&res->spinlock);leave:	return ret;}struct dlm_bitmap_diff_iter{	int curnode;	unsigned long *orig_bm;	unsigned long *cur_bm;	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];};enum dlm_node_state_change{	NODE_DOWN = -1,	NODE_NO_CHANGE = 0,	NODE_UP};static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,				      unsigned long *orig_bm,				      unsigned long *cur_bm){	unsigned long p1, p2;	int i;	iter->curnode = -1;	iter->orig_bm = orig_bm;	iter->cur_bm = cur_bm;	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {       		p1 = *(iter->orig_bm + i);	       	p2 = *(iter->cur_bm + i);		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);	}}static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,				     enum dlm_node_state_change *state){	int bit;	if (iter->curnode >= O2NM_MAX_NODES)		return -ENOENT;	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,			    iter->curnode+1);	if (bit >= O2NM_MAX_NODES) {		iter->curnode = O2NM_MAX_NODES;		return -ENOENT;	}	/* if it was there in the original then this node died */	if (test_bit(bit, iter->orig_bm))		*state = NODE_DOWN;	else		*state = NODE_UP;	iter->curnode = bit;	return bit;}static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,				    struct dlm_lock_resource *res,				    struct dlm_master_list_entry *mle,				    int blocked){	struct dlm_bitmap_diff_iter bdi;	enum dlm_node_state_change sc;	int node;	int ret = 0;	mlog(0, "something happened such that the "	     "master process may need to be restarted!\n");	assert_spin_locked(&mle->spinlock);	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);	node = dlm_bitmap_diff_iter_next(&bdi, &sc);	while (node >= 0) {		if (sc == NODE_UP) {			/* a node came up.  clear any old vote from			 * the response map and set it in the vote map			 * then restart the mastery. */			mlog(ML_NOTICE, "node %d up while restarting\n", node);			/* redo the master request, but only for the new node */			mlog(0, "sending request to new node\n");			clear_bit(node, mle->response_map);			set_bit(node, mle->vote_map);		} else {			mlog(ML_ERROR, "node down! %d\n", node);			/* if the node wasn't involved in mastery skip it,			 * but clear it out from the maps so that it will			 * not affect mastery of this lockres */			clear_bit(node, mle->response_map);			clear_bit(node, mle->vote_map);			if (!test_bit(node, mle->maybe_map))				goto next;			/* if we're already blocked on lock mastery, and the			 * dead node wasn't the expected master, or there is			 * another node in the maybe_map, keep waiting */			if (blocked) {				int lowest = find_next_bit(mle->maybe_map,						       O2NM_MAX_NODES, 0);				/* act like it was never there */				clear_bit(node, mle->maybe_map);			       	if (node != lowest)					goto next;				mlog(ML_ERROR, "expected master %u died while "				     "this node was blocked waiting on it!\n",				     node);				lowest = find_next_bit(mle->maybe_map,						       O2NM_MAX_NODES,						       lowest+1);				if (lowest < O2NM_MAX_NODES) {					mlog(0, "still blocked. waiting "					     "on %u now\n", lowest);					goto next;				}				/* mle is an MLE_BLOCK, but there is now				 * nothing left to block on.  we need to return				 * all the way back out and try again with				 * an MLE_MASTER. dlm_do_local_recovery_cleanup				 * has already run, so the mle refcount is ok */				mlog(0, "no longer blocking. we can "				     "try to master this here\n");				mle->type = DLM_MLE_MASTER;				memset(mle->maybe_map, 0,				       sizeof(mle->maybe_map));				memset(mle->response_map, 0,				       sizeof(mle->maybe_map));				memcpy(mle->vote_map, mle->node_map,				       sizeof(mle->node_map));				mle->u.res = res;				set_bit(dlm->node_num, mle->maybe_map);				ret = -EAGAIN;				goto next;			}			clear_bit(node, mle->maybe_map);			if (node > dlm->node_num)				goto next;			mlog(0, "dead node in map!\n");			/* yuck. go back and re-contact all nodes			 * in the vote_map, removing this node. */			memset(mle->response_map, 0,			       sizeof(mle->response_map));		}		ret = -EAGAIN;next:		node = dlm_bitmap_diff_iter_next(&bdi, &sc);	}	return ret;}/* * DLM_MASTER_REQUEST_MSG * * returns: 0 on success, *          -errno on a network error * * on error, the caller should assume the target node is "dead" * */static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to){	struct dlm_ctxt *dlm = mle->dlm;	struct dlm_master_request request;	int ret, response=0, resend;	memset(&request, 0, sizeof(request));	request.node_idx = dlm->node_num;	BUG_ON(mle->type == DLM_MLE_MIGRATION);	if (mle->type != DLM_MLE_MASTER) {		request.namelen = mle->u.name.len;		memcpy(request.name, mle->u.name.name, request.namelen);	} else {		request.namelen = mle->u.res->lockname.len;		memcpy(request.name, mle->u.res->lockname.name,			request.namelen);	}again:	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,				 sizeof(request), to, &response);	if (ret < 0)  {		if (ret == -ESRCH) {			/* should never happen */			mlog(ML_ERROR, "TCP stack not ready!\n");			BUG();		} else if (ret == -EINVAL) {			mlog(ML_ERROR, "bad args passed to o2net!\n");			BUG();		} else if (ret == -ENOMEM) {			mlog(ML_ERROR, "out of memory while trying to send "			     "network message!  retrying\n");			/* this is totally crude */			msleep(50);			goto again;		} else if (!dlm_is_host_down(ret)) {			/* not a network error. bad. */			mlog_errno(ret);			mlog(ML_ERROR, "unhandled error!");			BUG();		}		/* all other errors should be network errors,		 * and likely indicate node death */		mlog(ML_ERROR, "link to %d went down!\n", to);		goto out;	}	ret = 0;	resend = 0;	spin_lock(&mle->spinlock);	switch (response) {		case DLM_MASTER_RESP_YES:			set_bit(to, mle->response_map);			mlog(0, "node %u is the master, response=YES\n", to);			mle->master = to;			break;		case DLM_MASTER_RESP_NO:			mlog(0, "node %u not master, response=NO\n", to);			set_bit(to, mle->response_map);			break;		case DLM_MASTER_RESP_MAYBE:			mlog(0, "node %u not master, response=MAYBE\n", to);			set_bit(to, mle->response_map);			set_bit(to, mle->maybe_map);			break;		case DLM_MASTER_RESP_ERROR:			mlog(0, "node %u hit an error, resending\n", to);			resend = 1;			response = 0;			break;		default:			mlog(ML_ERROR, "bad response! %u\n", response);			BUG();	}	spin_unlock(&mle->spinlock);	if (resend) {		/* this is also totally crude */		msleep(50);		goto again;	}out:	return ret;}/* * locks that can be taken here: * dlm->spinlock * res->spinlock * mle->spinlock * dlm->master_list * * if possible, TRIM THIS DOWN!!! */int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data){	u8 response = DLM_MASTER_RESP_MAYBE;	struct dlm_ctxt *dlm = data;	struct dlm_lock_resource *res = NULL;	struct dlm_master_request *request = (struct dlm_master_request *) msg->buf;	struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL;	char *name;	unsigned int namelen;	int found, ret;	int set_maybe;	int dispatch_assert = 0;	if (!dlm_grab(dlm))		return DLM_MASTER_RESP_NO;	if (!dlm_domain_fully_joined(dlm)) {		response = DLM_MASTER_RESP_NO;		goto send_response;	}	name = request->name;	namelen = request->namelen;	if (namelen > DLM_LOCKID_NAME_MAX) {		response = DLM_IVBUFLEN;		goto send_response;	}way_up_top:	spin_lock(&dlm->spinlock);	res = __dlm_lookup_lockres(dlm, name, namelen);	if (res) {		spin_unlock(&dlm->spinlock);		/* take care of the easy cases up front */		spin_lock(&res->spinlock);		if (res->state & DLM_LOCK_RES_RECOVERING) {			spin_unlock(&res->spinlock);			mlog(0, "returning DLM_MASTER_RESP_ERROR since res is "			     "being recovered\n");			response = DLM_MASTER_RESP_ERROR;			if (mle)				kmem_cache_free(dlm_mle_cache, mle);			goto send_response;		}		if (res->owner == dlm->node_num) {			spin_unlock(&res->spinlock);			// mlog(0, "this node is the master\n");			response = DLM_MASTER_RESP_YES;			if (mle)				kmem_cache_free(dlm_mle_cache, mle);			/* this node is the owner.			 * there is some extra work that needs to			 * happen now.  the requesting node has			 * caused all nodes up to this one to			 * create mles.  this node now needs to			 * go back and clean those up. */			dispatch_assert = 1;			goto send_response;		} else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {			spin_unlock(&res->spinlock);			// mlog(0, "node %u is the master\n", res->owner);			response = DLM_MASTER_RESP_NO;			if (mle)				kmem_cache_free(dlm_mle_cache, mle);			goto send_response;		}		/* ok, there is no owner.  either this node is		 * being blocked, or it is actively trying to		 * master this lock. */		if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {			mlog(ML_ERROR, "lock with no owner should be "			     "in-progress!\n");			BUG();		}		// mlog(0, "lockres is in progress...\n");		spin_lock(&dlm->master_lock);		found = dlm_find_mle(dlm, &tmpmle, name, namelen);		if (!found) {			mlog(ML_ERROR, "no mle found for this lock!\n");			BUG();		}		set_maybe = 1;		spin_lock(&tmpmle->spinlock);		if (tmpmle->type == DLM_MLE_BLOCK) {			// mlog(0, "this node is waiting for "			// "lockres to be mastered\n");			response = DLM_MASTER_RESP_NO;		} else if (tmpmle->type == DLM_MLE_MIGRATION) {			mlog(0, "node %u is master, but trying to migrate to "			     "node %u.\n", tmpmle->master, tmpmle->new_master);			if (tmpmle->master == dlm->node_num) {				response = DLM_MASTER_RESP_YES;				mlog(ML_ERROR, "no owner on lockres, but this "				     "node is trying to migrate it to %u?!\n",				     tmpmle->new_master);				BUG();			} else {				/* the real master can respond on its own */				response = DLM_MASTER_RESP_NO;			}		} else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) {			set_maybe = 0;			if (tmpmle->master == dlm->node_num) {				response = DLM_MASTER_RESP_YES;				/* this node will be the owner.				 * go back and clean the mles on any				 * other nodes */				dispatch_assert = 1;			} else				response = DLM_MASTER_RESP_NO;		} else {			// mlog(0, "this node is attempting to "			// "master lockres\n");			response = DLM_MASTER_RESP_MAYBE;		}		if (set_maybe)			set_bit(request->node_idx, tmpmle->maybe_map);		spin_unlock(&tmpmle->spinlock);		spin_unlock(&dlm->master_lock);		spin_unlock(&res->spinlock);		/* keep the mle attached to heartbeat events */		dlm_put_mle(tmpmle);		if (mle)			kmem_cache_free(dlm_mle_cache, mle);		goto send_response;	}	/*

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -