⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmmaster.c

📁 linux 内核源代码
💻 C
📖 第 1 页 / 共 5 页
字号:
				mlog(0, "%s: waiting 500ms for heartbeat state "				    "change\n", dlm->name);				msleep(500);			}			continue;		} 		dlm_kick_recovery_thread(dlm);		msleep(1000);		dlm_wait_for_recovery(dlm);		spin_lock(&dlm->spinlock);		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);		if (bit < O2NM_MAX_NODES) {			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "			     "recover before lock mastery can begin\n",			     dlm->name, namelen, (char *)lockid, bit);			wait_on_recovery = 1;		} else			wait_on_recovery = 0;		spin_unlock(&dlm->spinlock);		if (wait_on_recovery)			dlm_wait_for_node_recovery(dlm, bit, 10000);	}	/* must wait for lock to be mastered elsewhere */	if (blocked)		goto wait;	ret = -EINVAL;	dlm_node_iter_init(mle->vote_map, &iter);	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {		ret = dlm_do_master_request(res, mle, nodenum);		if (ret < 0)			mlog_errno(ret);		if (mle->master != O2NM_MAX_NODES) {			/* found a master ! */			if (mle->master <= nodenum)				break;			/* if our master request has not reached the master			 * yet, keep going until it does.  this is how the			 * master will know that asserts are needed back to			 * the lower nodes. */			mlog(0, "%s:%.*s: requests only up to %u but master "			     "is %u, keep going\n", dlm->name, namelen,			     lockid, nodenum, mle->master);		}	}wait:	/* keep going until the response map includes all nodes */	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);	if (ret < 0) {		wait_on_recovery = 1;		mlog(0, "%s:%.*s: node map changed, redo the "		     "master request now, blocked=%d\n",		     dlm->name, res->lockname.len,		     res->lockname.name, blocked);		if (++tries > 20) {			mlog(ML_ERROR, "%s:%.*s: spinning on "			     "dlm_wait_for_lock_mastery, blocked=%d\n", 			     dlm->name, res->lockname.len, 			     res->lockname.name, blocked);			dlm_print_one_lock_resource(res);			dlm_print_one_mle(mle);			tries = 0;		}		goto redo_request;	}	mlog(0, "lockres mastered by %u\n", res->owner);	/* make sure we never continue without this */	BUG_ON(res->owner == O2NM_MAX_NODES);	/* master is known, detach if not already detached */	dlm_mle_detach_hb_events(dlm, mle);	dlm_put_mle(mle);	/* put the extra ref */	dlm_put_mle_inuse(mle);wake_waiters:	spin_lock(&res->spinlock);	if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)		dlm_lockres_drop_inflight_ref(dlm, res);	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;	spin_unlock(&res->spinlock);	wake_up(&res->wq);leave:	/* need to free the unused mle */	if (alloc_mle)		kmem_cache_free(dlm_mle_cache, alloc_mle);	return res;}#define DLM_MASTERY_TIMEOUT_MS   5000static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,				     struct dlm_lock_resource *res,				     struct dlm_master_list_entry *mle,				     int *blocked){	u8 m;	int ret, bit;	int map_changed, voting_done;	int assert, sleep;recheck:	ret = 0;	assert = 0;	/* check if another node has already become the owner */	spin_lock(&res->spinlock);	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,		     res->lockname.len, res->lockname.name, res->owner);		spin_unlock(&res->spinlock);		/* this will cause the master to re-assert across		 * the whole cluster, freeing up mles */		if (res->owner != dlm->node_num) {			ret = dlm_do_master_request(res, mle, res->owner);			if (ret < 0) {				/* give recovery a chance to run */				mlog(ML_ERROR, "link to %u went down?: %d\n", res->owner, ret);				msleep(500);				goto recheck;			}		}		ret = 0;		goto leave;	}	spin_unlock(&res->spinlock);	spin_lock(&mle->spinlock);	m = mle->master;	map_changed = (memcmp(mle->vote_map, mle->node_map,			      sizeof(mle->vote_map)) != 0);	voting_done = (memcmp(mle->vote_map, mle->response_map,			     sizeof(mle->vote_map)) == 0);	/* restart if we hit any errors */	if (map_changed) {		int b;		mlog(0, "%s: %.*s: node map changed, restarting\n",		     dlm->name, res->lockname.len, res->lockname.name);		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);		b = (mle->type == DLM_MLE_BLOCK);		if ((*blocked && !b) || (!*blocked && b)) {			mlog(0, "%s:%.*s: status change: old=%d new=%d\n", 			     dlm->name, res->lockname.len, res->lockname.name,			     *blocked, b);			*blocked = b;		}		spin_unlock(&mle->spinlock);		if (ret < 0) {			mlog_errno(ret);			goto leave;		}		mlog(0, "%s:%.*s: restart lock mastery succeeded, "		     "rechecking now\n", dlm->name, res->lockname.len,		     res->lockname.name);		goto recheck;	} else {		if (!voting_done) {			mlog(0, "map not changed and voting not done "			     "for %s:%.*s\n", dlm->name, res->lockname.len,			     res->lockname.name);		}	}	if (m != O2NM_MAX_NODES) {		/* another node has done an assert!		 * all done! */		sleep = 0;	} else {		sleep = 1;		/* have all nodes responded? */		if (voting_done && !*blocked) {			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);			if (dlm->node_num <= bit) {				/* my node number is lowest.			 	 * now tell other nodes that I am				 * mastering this. */				mle->master = dlm->node_num;				/* ref was grabbed in get_lock_resource				 * will be dropped in dlmlock_master */				assert = 1;				sleep = 0;			}			/* if voting is done, but we have not received			 * an assert master yet, we must sleep */		}	}	spin_unlock(&mle->spinlock);	/* sleep if we haven't finished voting yet */	if (sleep) {		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);		/*		if (atomic_read(&mle->mle_refs.refcount) < 2)			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,			atomic_read(&mle->mle_refs.refcount),			res->lockname.len, res->lockname.name);		*/		atomic_set(&mle->woken, 0);		(void)wait_event_timeout(mle->wq,					 (atomic_read(&mle->woken) == 1),					 timeo);		if (res->owner == O2NM_MAX_NODES) {			mlog(0, "%s:%.*s: waiting again\n", dlm->name,			     res->lockname.len, res->lockname.name);			goto recheck;		}		mlog(0, "done waiting, master is %u\n", res->owner);		ret = 0;		goto leave;	}	ret = 0;   /* done */	if (assert) {		m = dlm->node_num;		mlog(0, "about to master %.*s here, this=%u\n",		     res->lockname.len, res->lockname.name, m);		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);		if (ret) {			/* This is a failure in the network path,			 * not in the response to the assert_master			 * (any nonzero response is a BUG on this node).			 * Most likely a socket just got disconnected			 * due to node death. */			mlog_errno(ret);		}		/* no longer need to restart lock mastery.		 * all living nodes have been contacted. */		ret = 0;	}	/* set the lockres owner */	spin_lock(&res->spinlock);	/* mastery reference obtained either during	 * assert_master_handler or in get_lock_resource */	dlm_change_lockres_owner(dlm, res, m);	spin_unlock(&res->spinlock);leave:	return ret;}struct dlm_bitmap_diff_iter{	int curnode;	unsigned long *orig_bm;	unsigned long *cur_bm;	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];};enum dlm_node_state_change{	NODE_DOWN = -1,	NODE_NO_CHANGE = 0,	NODE_UP};static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,				      unsigned long *orig_bm,				      unsigned long *cur_bm){	unsigned long p1, p2;	int i;	iter->curnode = -1;	iter->orig_bm = orig_bm;	iter->cur_bm = cur_bm;	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {       		p1 = *(iter->orig_bm + i);	       	p2 = *(iter->cur_bm + i);		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);	}}static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,				     enum dlm_node_state_change *state){	int bit;	if (iter->curnode >= O2NM_MAX_NODES)		return -ENOENT;	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,			    iter->curnode+1);	if (bit >= O2NM_MAX_NODES) {		iter->curnode = O2NM_MAX_NODES;		return -ENOENT;	}	/* if it was there in the original then this node died */	if (test_bit(bit, iter->orig_bm))		*state = NODE_DOWN;	else		*state = NODE_UP;	iter->curnode = bit;	return bit;}static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,				    struct dlm_lock_resource *res,				    struct dlm_master_list_entry *mle,				    int blocked){	struct dlm_bitmap_diff_iter bdi;	enum dlm_node_state_change sc;	int node;	int ret = 0;	mlog(0, "something happened such that the "	     "master process may need to be restarted!\n");	assert_spin_locked(&mle->spinlock);	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);	node = dlm_bitmap_diff_iter_next(&bdi, &sc);	while (node >= 0) {		if (sc == NODE_UP) {			/* a node came up.  clear any old vote from			 * the response map and set it in the vote map			 * then restart the mastery. */			mlog(ML_NOTICE, "node %d up while restarting\n", node);			/* redo the master request, but only for the new node */			mlog(0, "sending request to new node\n");			clear_bit(node, mle->response_map);			set_bit(node, mle->vote_map);		} else {			mlog(ML_ERROR, "node down! %d\n", node);			if (blocked) {				int lowest = find_next_bit(mle->maybe_map,						       O2NM_MAX_NODES, 0);				/* act like it was never there */				clear_bit(node, mle->maybe_map);			       	if (node == lowest) {					mlog(0, "expected master %u died"					    " while this node was blocked "					    "waiting on it!\n", node);					lowest = find_next_bit(mle->maybe_map,						       	O2NM_MAX_NODES,						       	lowest+1);					if (lowest < O2NM_MAX_NODES) {						mlog(0, "%s:%.*s:still "						     "blocked. waiting on %u "						     "now\n", dlm->name,						     res->lockname.len,						     res->lockname.name,						     lowest);					} else {						/* mle is an MLE_BLOCK, but						 * there is now nothing left to						 * block on.  we need to return						 * all the way back out and try						 * again with an MLE_MASTER.						 * dlm_do_local_recovery_cleanup						 * has already run, so the mle						 * refcount is ok */						mlog(0, "%s:%.*s: no "						     "longer blocking. try to "						     "master this here\n",						     dlm->name,						     res->lockname.len,						     res->lockname.name);						mle->type = DLM_MLE_MASTER;						mle->u.res = res;					}				}			}			/* now blank out everything, as if we had never			 * contacted anyone */			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));			memset(mle->response_map, 0, sizeof(mle->response_map));			/* reset the vote_map to the current node_map */			memcpy(mle->vote_map, mle->node_map,			       sizeof(mle->node_map));			/* put myself into the maybe map */			if (mle->type != DLM_MLE_BLOCK)				set_bit(dlm->node_num, mle->maybe_map);		}		ret = -EAGAIN;		node = dlm_bitmap_diff_iter_next(&bdi, &sc);	}	return ret;}/* * DLM_MASTER_REQUEST_MSG * * returns: 0 on success, *          -errno on a network error * * on error, the caller should assume the target node is "dead" * */static int dlm_do_master_request(struct dlm_lock_resource *res,				 struct dlm_master_list_entry *mle, int to){	struct dlm_ctxt *dlm = mle->dlm;	struct dlm_master_request request;	int ret, response=0, resend;	memset(&request, 0, sizeof(request));	request.node_idx = dlm->node_num;	BUG_ON(mle->type == DLM_MLE_MIGRATION);	if (mle->type != DLM_MLE_MASTER) {		request.namelen = mle->u.name.len;		memcpy(request.name, mle->u.name.name, request.namelen);	} else {		request.namelen = mle->u.res->lockname.len;		memcpy(request.name, mle->u.res->lockname.name,			request.namelen);	}again:	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,				 sizeof(request), to, &response);	if (ret < 0)  {		if (ret == -ESRCH) {			/* should never happen */			mlog(ML_ERROR, "TCP stack not ready!\n");			BUG();		} else if (ret == -EINVAL) {			mlog(ML_ERROR, "bad args passed to o2net!\n");			BUG();		} else if (ret == -ENOMEM) {			mlog(ML_ERROR, "out of memory while trying to send "			     "network message!  retrying\n");			/* this is totally crude */			msleep(50);			goto again;		} else if (!dlm_is_host_down(ret)) {			/* not a network error. bad. */			mlog_errno(ret);			mlog(ML_ERROR, "unhandled error!");			BUG();		}		/* all other errors should be network errors,		 * and likely indicate node death */		mlog(ML_ERROR, "link to %d went down!\n", to);		goto out;	}	ret = 0;	resend = 0;	spin_lock(&mle->spinlock);	switch (response) {		case DLM_MASTER_RESP_YES:			set_bit(to, mle->response_map);			mlog(0, "node %u is the master, response=YES\n", to);			mlog(0, "%s:%.*s: master node %u now knows I have a "			     "reference\n", dlm->name, res->lockname.len,			     res->lockname.name, to);			mle->master = to;			break;		case DLM_MASTER_RESP_NO:

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -