⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmrecovery.c

📁 linux 内核源代码
💻 C
📖 第 1 页 / 共 5 页
字号:
	mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",	     dlm->name, dead_node, reco_master);	if (dead_node != dlm->reco.dead_node ||	    reco_master != dlm->reco.new_master) {		/* worker could have been created before the recovery master		 * died.  if so, do not continue, but do not error. */		if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {			mlog(ML_NOTICE, "%s: will not send recovery state, "			     "recovery master %u died, thread=(dead=%u,mas=%u)"			     " current=(dead=%u,mas=%u)\n", dlm->name,			     reco_master, dead_node, reco_master,			     dlm->reco.dead_node, dlm->reco.new_master);		} else {			mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "			     "master=%u), request(dead=%u, master=%u)\n",			     dlm->name, dlm->reco.dead_node,			     dlm->reco.new_master, dead_node, reco_master);		}		goto leave;	}	/* lock resources should have already been moved to the 	 * dlm->reco.resources list.  now move items from that list 	 * to a temp list if the dead owner matches.  note that the	 * whole cluster recovers only one node at a time, so we	 * can safely move UNKNOWN lock resources for each recovery	 * session. */	dlm_move_reco_locks_to_list(dlm, &resources, dead_node);	/* now we can begin blasting lockreses without the dlm lock */	/* any errors returned will be due to the new_master dying,	 * the dlm_reco_thread should detect this */	list_for_each_entry(res, &resources, recovering) {		ret = dlm_send_one_lockres(dlm, res, mres, reco_master,				   	DLM_MRES_RECOVERY);		if (ret < 0) {			mlog(ML_ERROR, "%s: node %u went down while sending "			     "recovery state for dead node %u, ret=%d\n", dlm->name,			     reco_master, dead_node, ret);			skip_all_done = 1;			break;		}	}	/* move the resources back to the list */	spin_lock(&dlm->spinlock);	list_splice_init(&resources, &dlm->reco.resources);	spin_unlock(&dlm->spinlock);	if (!skip_all_done) {		ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);		if (ret < 0) {			mlog(ML_ERROR, "%s: node %u went down while sending "			     "recovery all-done for dead node %u, ret=%d\n",			     dlm->name, reco_master, dead_node, ret);		}	}leave:	free_page((unsigned long)data);}static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to){	int ret, tmpret;	struct dlm_reco_data_done done_msg;	memset(&done_msg, 0, sizeof(done_msg));	done_msg.node_idx = dlm->node_num;	done_msg.dead_node = dead_node;	mlog(0, "sending DATA DONE message to %u, "	     "my node=%u, dead node=%u\n", send_to, done_msg.node_idx,	     done_msg.dead_node);	ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,				 sizeof(done_msg), send_to, &tmpret);	if (ret < 0) {		if (!dlm_is_host_down(ret)) {			mlog_errno(ret);			mlog(ML_ERROR, "%s: unknown error sending data-done "			     "to %u\n", dlm->name, send_to);			BUG();		}	} else		ret = tmpret;	return ret;}int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,			       void **ret_data){	struct dlm_ctxt *dlm = data;	struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;	struct dlm_reco_node_data *ndata = NULL;	int ret = -EINVAL;	if (!dlm_grab(dlm))		return -EINVAL;	mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "	     "node_idx=%u, this node=%u\n", done->dead_node,	     dlm->reco.dead_node, done->node_idx, dlm->node_num);	mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),			"Got DATA DONE: dead_node=%u, reco.dead_node=%u, "			"node_idx=%u, this node=%u\n", done->dead_node,			dlm->reco.dead_node, done->node_idx, dlm->node_num);	spin_lock(&dlm_reco_state_lock);	list_for_each_entry(ndata, &dlm->reco.node_data, list) {		if (ndata->node_num != done->node_idx)			continue;		switch (ndata->state) {			/* should have moved beyond INIT but not to FINALIZE yet */			case DLM_RECO_NODE_DATA_INIT:			case DLM_RECO_NODE_DATA_DEAD:			case DLM_RECO_NODE_DATA_FINALIZE_SENT:				mlog(ML_ERROR, "bad ndata state for node %u:"				     " state=%d\n", ndata->node_num,				     ndata->state);				BUG();				break;			/* these states are possible at this point, anywhere along			 * the line of recovery */			case DLM_RECO_NODE_DATA_DONE:			case DLM_RECO_NODE_DATA_RECEIVING:			case DLM_RECO_NODE_DATA_REQUESTED:			case DLM_RECO_NODE_DATA_REQUESTING:				mlog(0, "node %u is DONE sending "					  "recovery data!\n",					  ndata->node_num);				ndata->state = DLM_RECO_NODE_DATA_DONE;				ret = 0;				break;		}	}	spin_unlock(&dlm_reco_state_lock);	/* wake the recovery thread, some node is done */	if (!ret)		dlm_kick_recovery_thread(dlm);	if (ret < 0)		mlog(ML_ERROR, "failed to find recovery node data for node "		     "%u\n", done->node_idx);	dlm_put(dlm);	mlog(0, "leaving reco data done handler, ret=%d\n", ret);	return ret;}static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,					struct list_head *list,				       	u8 dead_node){	struct dlm_lock_resource *res, *next;	struct dlm_lock *lock;	spin_lock(&dlm->spinlock);	list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {		/* always prune any $RECOVERY entries for dead nodes,		 * otherwise hangs can occur during later recovery */		if (dlm_is_recovery_lock(res->lockname.name,					 res->lockname.len)) {			spin_lock(&res->spinlock);			list_for_each_entry(lock, &res->granted, list) {				if (lock->ml.node == dead_node) {					mlog(0, "AHA! there was "					     "a $RECOVERY lock for dead "					     "node %u (%s)!\n", 					     dead_node, dlm->name);					list_del_init(&lock->list);					dlm_lock_put(lock);					break;				}			}			spin_unlock(&res->spinlock);			continue;		}		if (res->owner == dead_node) {			mlog(0, "found lockres owned by dead node while "				  "doing recovery for node %u. sending it.\n",				  dead_node);			list_move_tail(&res->recovering, list);		} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {			mlog(0, "found UNKNOWN owner while doing recovery "				  "for node %u. sending it.\n", dead_node);			list_move_tail(&res->recovering, list);		}	}	spin_unlock(&dlm->spinlock);}static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res){	int total_locks = 0;	struct list_head *iter, *queue = &res->granted;	int i;	for (i=0; i<3; i++) {		list_for_each(iter, queue)			total_locks++;		queue++;	}	return total_locks;}static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,				      struct dlm_migratable_lockres *mres,				      u8 send_to,				      struct dlm_lock_resource *res,				      int total_locks){	u64 mig_cookie = be64_to_cpu(mres->mig_cookie);	int mres_total_locks = be32_to_cpu(mres->total_locks);	int sz, ret = 0, status = 0;	u8 orig_flags = mres->flags,	   orig_master = mres->master;	BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS);	if (!mres->num_locks)		return 0;	sz = sizeof(struct dlm_migratable_lockres) +		(mres->num_locks * sizeof(struct dlm_migratable_lock));	/* add an all-done flag if we reached the last lock */	orig_flags = mres->flags;	BUG_ON(total_locks > mres_total_locks);	if (total_locks == mres_total_locks)		mres->flags |= DLM_MRES_ALL_DONE;	mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",	     dlm->name, res->lockname.len, res->lockname.name,	     orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",	     send_to);	/* send it */	ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,				 sz, send_to, &status);	if (ret < 0) {		/* XXX: negative status is not handled.		 * this will end up killing this node. */		mlog_errno(ret);	} else {		/* might get an -ENOMEM back here */		ret = status;		if (ret < 0) {			mlog_errno(ret);			if (ret == -EFAULT) {				mlog(ML_ERROR, "node %u told me to kill "				     "myself!\n", send_to);				BUG();			}		}	}	/* zero and reinit the message buffer */	dlm_init_migratable_lockres(mres, res->lockname.name,				    res->lockname.len, mres_total_locks,				    mig_cookie, orig_flags, orig_master);	return ret;}static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,					const char *lockname, int namelen,					int total_locks, u64 cookie,					u8 flags, u8 master){	/* mres here is one full page */	clear_page(mres);	mres->lockname_len = namelen;	memcpy(mres->lockname, lockname, namelen);	mres->num_locks = 0;	mres->total_locks = cpu_to_be32(total_locks);	mres->mig_cookie = cpu_to_be64(cookie);	mres->flags = flags;	mres->master = master;}/* returns 1 if this lock fills the network structure, * 0 otherwise */static int dlm_add_lock_to_array(struct dlm_lock *lock,				 struct dlm_migratable_lockres *mres, int queue){	struct dlm_migratable_lock *ml;	int lock_num = mres->num_locks;	ml = &(mres->ml[lock_num]);	ml->cookie = lock->ml.cookie;	ml->type = lock->ml.type;	ml->convert_type = lock->ml.convert_type;	ml->highest_blocked = lock->ml.highest_blocked;	ml->list = queue;	if (lock->lksb) {		ml->flags = lock->lksb->flags;		/* send our current lvb */		if (ml->type == LKM_EXMODE ||		    ml->type == LKM_PRMODE) {			/* if it is already set, this had better be a PR			 * and it has to match */			if (!dlm_lvb_is_empty(mres->lvb) &&			    (ml->type == LKM_EXMODE ||			     memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {				mlog(ML_ERROR, "mismatched lvbs!\n");				__dlm_print_one_lock_resource(lock->lockres);				BUG();			}			memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);		}	}	ml->node = lock->ml.node;	mres->num_locks++;	/* we reached the max, send this network message */	if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS)		return 1;	return 0;}static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,			       struct dlm_migratable_lockres *mres){	struct dlm_lock dummy;	memset(&dummy, 0, sizeof(dummy));	dummy.ml.cookie = 0;	dummy.ml.type = LKM_IVMODE;	dummy.ml.convert_type = LKM_IVMODE;	dummy.ml.highest_blocked = LKM_IVMODE;	dummy.lksb = NULL;	dummy.ml.node = dlm->node_num;	dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);}static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,				    struct dlm_migratable_lock *ml,				    u8 *nodenum){	if (unlikely(ml->cookie == 0 &&	    ml->type == LKM_IVMODE &&	    ml->convert_type == LKM_IVMODE &&	    ml->highest_blocked == LKM_IVMODE &&	    ml->list == DLM_BLOCKED_LIST)) {		*nodenum = ml->node;		return 1;	}	return 0;}int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,			 struct dlm_migratable_lockres *mres,			 u8 send_to, u8 flags){	struct list_head *queue;	int total_locks, i;	u64 mig_cookie = 0;	struct dlm_lock *lock;	int ret = 0;	BUG_ON(!(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));	mlog(0, "sending to %u\n", send_to);	total_locks = dlm_num_locks_in_lockres(res);	if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) {		/* rare, but possible */		mlog(0, "argh.  lockres has %d locks.  this will "			  "require more than one network packet to "			  "migrate\n", total_locks);		mig_cookie = dlm_get_next_mig_cookie();	}	dlm_init_migratable_lockres(mres, res->lockname.name,				    res->lockname.len, total_locks,				    mig_cookie, flags, res->owner);	total_locks = 0;	for (i=DLM_GRANTED_LIST; i<=DLM_BLOCKED_LIST; i++) {		queue = dlm_list_idx_to_ptr(res, i);		list_for_each_entry(lock, queue, list) {			/* add another lock. */			total_locks++;			if (!dlm_add_lock_to_array(lock, mres, i))				continue;			/* this filled the lock message,			 * we must send it immediately. */			ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,						       res, total_locks);			if (ret < 0)				goto error;		}	}	if (total_locks == 0) {		/* send a dummy lock to indicate a mastery reference only */		mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",		     dlm->name, res->lockname.len, res->lockname.name,		     send_to, flags & DLM_MRES_RECOVERY ? "recovery" :		     "migration");		dlm_add_dummy_lock(dlm, mres);	}	/* flush any remaining locks */	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);	if (ret < 0)		goto error;	return ret;error:	mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",	     dlm->name, ret);	if (!dlm_is_host_down(ret))		BUG();	mlog(0, "%s: node %u went down while sending %s "	     "lockres %.*s\n", dlm->name, send_to,	     flags & DLM_MRES_RECOVERY ?  "recovery" : "migration",	     res->lockname.len, res->lockname.name);	return ret;}/* * this message will contain no more than one page worth of * recovery data, and it will work on only one lockres. * there may be many locks in this page, and we may need to wait * for additional packets to complete all the locks (rare, but * possible). *//* * NOTE: the allocation error cases here are scary

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -