⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmrecovery.c

📁 linux2.6.16版本
💻 C
📖 第 1 页 / 共 5 页
字号:
	status = dlm_init_recovery_area(dlm, dead_node);	if (status < 0)		goto leave;	/* safe to access the node data list without a lock, since this	 * process is the only one to change the list */	list_for_each(iter, &dlm->reco.node_data) {		ndata = list_entry (iter, struct dlm_reco_node_data, list);		BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);		ndata->state = DLM_RECO_NODE_DATA_REQUESTING;		mlog(0, "requesting lock info from node %u\n",		     ndata->node_num);		if (ndata->node_num == dlm->node_num) {			ndata->state = DLM_RECO_NODE_DATA_DONE;			continue;		}		status = dlm_request_all_locks(dlm, ndata->node_num, dead_node);		if (status < 0) {			mlog_errno(status);			if (dlm_is_host_down(status))				ndata->state = DLM_RECO_NODE_DATA_DEAD;			else {				destroy = 1;				goto leave;			}		}		switch (ndata->state) {			case DLM_RECO_NODE_DATA_INIT:			case DLM_RECO_NODE_DATA_FINALIZE_SENT:			case DLM_RECO_NODE_DATA_REQUESTED:				BUG();				break;			case DLM_RECO_NODE_DATA_DEAD:				mlog(0, "node %u died after requesting "				     "recovery info for node %u\n",				     ndata->node_num, dead_node);				// start all over				destroy = 1;				status = -EAGAIN;				goto leave;			case DLM_RECO_NODE_DATA_REQUESTING:				ndata->state = DLM_RECO_NODE_DATA_REQUESTED;				mlog(0, "now receiving recovery data from "				     "node %u for dead node %u\n",				     ndata->node_num, dead_node);				break;			case DLM_RECO_NODE_DATA_RECEIVING:				mlog(0, "already receiving recovery data from "				     "node %u for dead node %u\n",				     ndata->node_num, dead_node);				break;			case DLM_RECO_NODE_DATA_DONE:				mlog(0, "already DONE receiving recovery data "				     "from node %u for dead node %u\n",				     ndata->node_num, dead_node);				break;		}	}	mlog(0, "done requesting all lock info\n");	/* nodes should be sending reco data now	 * just need to wait */	while (1) {		/* check all the nodes now to see if we are		 * done, or if anyone died */		all_nodes_done = 1;		spin_lock(&dlm_reco_state_lock);		list_for_each(iter, &dlm->reco.node_data) {			ndata = list_entry (iter, struct dlm_reco_node_data, list);			mlog(0, "checking recovery state of node %u\n",			     ndata->node_num);			switch (ndata->state) {				case DLM_RECO_NODE_DATA_INIT:				case DLM_RECO_NODE_DATA_REQUESTING:					mlog(ML_ERROR, "bad ndata state for "					     "node %u: state=%d\n",					     ndata->node_num, ndata->state);					BUG();					break;				case DLM_RECO_NODE_DATA_DEAD:					mlog(ML_NOTICE, "node %u died after "					     "requesting recovery info for "					     "node %u\n", ndata->node_num,					     dead_node);					spin_unlock(&dlm_reco_state_lock);					// start all over					destroy = 1;					status = -EAGAIN;					/* instead of spinning like crazy here,					 * wait for the domain map to catch up					 * with the network state.  otherwise this					 * can be hit hundreds of times before					 * the node is really seen as dead. */					wait_event_timeout(dlm->dlm_reco_thread_wq,							   dlm_is_node_dead(dlm,								ndata->node_num),							   msecs_to_jiffies(1000));					mlog(0, "waited 1 sec for %u, "					     "dead? %s\n", ndata->node_num,					     dlm_is_node_dead(dlm, ndata->node_num) ?					     "yes" : "no");					goto leave;				case DLM_RECO_NODE_DATA_RECEIVING:				case DLM_RECO_NODE_DATA_REQUESTED:					all_nodes_done = 0;					break;				case DLM_RECO_NODE_DATA_DONE:					break;				case DLM_RECO_NODE_DATA_FINALIZE_SENT:					break;			}		}		spin_unlock(&dlm_reco_state_lock);		mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass,		     all_nodes_done?"yes":"no");		if (all_nodes_done) {			int ret;			/* all nodes are now in DLM_RECO_NODE_DATA_DONE state	 		 * just send a finalize message to everyone and	 		 * clean up */			mlog(0, "all nodes are done! send finalize\n");			ret = dlm_send_finalize_reco_message(dlm);			if (ret < 0)				mlog_errno(ret);			spin_lock(&dlm->spinlock);			dlm_finish_local_lockres_recovery(dlm, dead_node,							  dlm->node_num);			spin_unlock(&dlm->spinlock);			mlog(0, "should be done with recovery!\n");			mlog(0, "finishing recovery of %s at %lu, "			     "dead=%u, this=%u, new=%u\n", dlm->name,			     jiffies, dlm->reco.dead_node,			     dlm->node_num, dlm->reco.new_master);			destroy = 1;			status = ret;			/* rescan everything marked dirty along the way */			dlm_kick_thread(dlm, NULL);			break;		}		/* wait to be signalled, with periodic timeout		 * to check for node death */		wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,					 kthread_should_stop(),					 msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS));	}leave:	if (destroy)		dlm_destroy_recovery_area(dlm, dead_node);	mlog_exit(status);	return status;}static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node){	int num=0;	struct dlm_reco_node_data *ndata;	spin_lock(&dlm->spinlock);	memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map));	/* nodes can only be removed (by dying) after dropping	 * this lock, and death will be trapped later, so this should do */	spin_unlock(&dlm->spinlock);	while (1) {		num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num);		if (num >= O2NM_MAX_NODES) {			break;		}		BUG_ON(num == dead_node);		ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL);		if (!ndata) {			dlm_destroy_recovery_area(dlm, dead_node);			return -ENOMEM;		}		ndata->node_num = num;		ndata->state = DLM_RECO_NODE_DATA_INIT;		spin_lock(&dlm_reco_state_lock);		list_add_tail(&ndata->list, &dlm->reco.node_data);		spin_unlock(&dlm_reco_state_lock);		num++;	}	return 0;}static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm, u8 dead_node){	struct list_head *iter, *iter2;	struct dlm_reco_node_data *ndata;	LIST_HEAD(tmplist);	spin_lock(&dlm_reco_state_lock);	list_splice_init(&dlm->reco.node_data, &tmplist);	spin_unlock(&dlm_reco_state_lock);	list_for_each_safe(iter, iter2, &tmplist) {		ndata = list_entry (iter, struct dlm_reco_node_data, list);		list_del_init(&ndata->list);		kfree(ndata);	}}static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,				 u8 dead_node){	struct dlm_lock_request lr;	enum dlm_status ret;	mlog(0, "\n");	mlog(0, "dlm_request_all_locks: dead node is %u, sending request "		  "to %u\n", dead_node, request_from);	memset(&lr, 0, sizeof(lr));	lr.node_idx = dlm->node_num;	lr.dead_node = dead_node;	// send message	ret = DLM_NOLOCKMGR;	ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key,				 &lr, sizeof(lr), request_from, NULL);	/* negative status is handled by caller */	if (ret < 0)		mlog_errno(ret);	// return from here, then	// sleep until all received or error	return ret;}int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data){	struct dlm_ctxt *dlm = data;	struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;	char *buf = NULL;	struct dlm_work_item *item = NULL;	if (!dlm_grab(dlm))		return -EINVAL;	BUG_ON(lr->dead_node != dlm->reco.dead_node);	item = kcalloc(1, sizeof(*item), GFP_KERNEL);	if (!item) {		dlm_put(dlm);		return -ENOMEM;	}	/* this will get freed by dlm_request_all_locks_worker */	buf = (char *) __get_free_page(GFP_KERNEL);	if (!buf) {		kfree(item);		dlm_put(dlm);		return -ENOMEM;	}	/* queue up work for dlm_request_all_locks_worker */	dlm_grab(dlm);  /* get an extra ref for the work item */	dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf);	item->u.ral.reco_master = lr->node_idx;	item->u.ral.dead_node = lr->dead_node;	spin_lock(&dlm->work_lock);	list_add_tail(&item->list, &dlm->work_list);	spin_unlock(&dlm->work_lock);	schedule_work(&dlm->dispatched_work);	dlm_put(dlm);	return 0;}static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data){	struct dlm_migratable_lockres *mres;	struct dlm_lock_resource *res;	struct dlm_ctxt *dlm;	LIST_HEAD(resources);	struct list_head *iter;	int ret;	u8 dead_node, reco_master;	dlm = item->dlm;	dead_node = item->u.ral.dead_node;	reco_master = item->u.ral.reco_master;	mres = (struct dlm_migratable_lockres *)data;	if (dead_node != dlm->reco.dead_node ||	    reco_master != dlm->reco.new_master) {		/* show extra debug info if the recovery state is messed */		mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), "		     "request(dead=%u, master=%u)\n",		     dlm->name, dlm->reco.dead_node, dlm->reco.new_master,		     dead_node, reco_master);		mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "		     "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",		     dlm->name, mres->lockname_len, mres->lockname, mres->master,		     mres->num_locks, mres->total_locks, mres->flags,		     mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,		     mres->ml[0].type, mres->ml[0].convert_type,		     mres->ml[0].highest_blocked, mres->ml[0].node);		BUG();	}	BUG_ON(dead_node != dlm->reco.dead_node);	BUG_ON(reco_master != dlm->reco.new_master);	/* lock resources should have already been moved to the 	 * dlm->reco.resources list.  now move items from that list 	 * to a temp list if the dead owner matches.  note that the	 * whole cluster recovers only one node at a time, so we	 * can safely move UNKNOWN lock resources for each recovery	 * session. */	dlm_move_reco_locks_to_list(dlm, &resources, dead_node);	/* now we can begin blasting lockreses without the dlm lock */	list_for_each(iter, &resources) {		res = list_entry (iter, struct dlm_lock_resource, recovering);		ret = dlm_send_one_lockres(dlm, res, mres, reco_master,				   	DLM_MRES_RECOVERY);		if (ret < 0)			mlog_errno(ret);	}	/* move the resources back to the list */	spin_lock(&dlm->spinlock);	list_splice_init(&resources, &dlm->reco.resources);	spin_unlock(&dlm->spinlock);	ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);	if (ret < 0)		mlog_errno(ret);	free_page((unsigned long)data);}static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to){	int ret, tmpret;	struct dlm_reco_data_done done_msg;	memset(&done_msg, 0, sizeof(done_msg));	done_msg.node_idx = dlm->node_num;	done_msg.dead_node = dead_node;	mlog(0, "sending DATA DONE message to %u, "	     "my node=%u, dead node=%u\n", send_to, done_msg.node_idx,	     done_msg.dead_node);	ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,				 sizeof(done_msg), send_to, &tmpret);	/* negative status is ignored by the caller */	if (ret >= 0)		ret = tmpret;	return ret;}int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data){	struct dlm_ctxt *dlm = data;	struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;	struct list_head *iter;	struct dlm_reco_node_data *ndata = NULL;	int ret = -EINVAL;	if (!dlm_grab(dlm))		return -EINVAL;	mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "	     "node_idx=%u, this node=%u\n", done->dead_node,	     dlm->reco.dead_node, done->node_idx, dlm->node_num);	BUG_ON(done->dead_node != dlm->reco.dead_node);	spin_lock(&dlm_reco_state_lock);	list_for_each(iter, &dlm->reco.node_data) {		ndata = list_entry (iter, struct dlm_reco_node_data, list);		if (ndata->node_num != done->node_idx)			continue;		switch (ndata->state) {			/* should have moved beyond INIT but not to FINALIZE yet */			case DLM_RECO_NODE_DATA_INIT:			case DLM_RECO_NODE_DATA_DEAD:			case DLM_RECO_NODE_DATA_FINALIZE_SENT:				mlog(ML_ERROR, "bad ndata state for node %u:"				     " state=%d\n", ndata->node_num,				     ndata->state);				BUG();				break;			/* these states are possible at this point, anywhere along			 * the line of recovery */			case DLM_RECO_NODE_DATA_DONE:			case DLM_RECO_NODE_DATA_RECEIVING:			case DLM_RECO_NODE_DATA_REQUESTED:			case DLM_RECO_NODE_DATA_REQUESTING:				mlog(0, "node %u is DONE sending "					  "recovery data!\n",					  ndata->node_num);				ndata->state = DLM_RECO_NODE_DATA_DONE;				ret = 0;				break;		}	}	spin_unlock(&dlm_reco_state_lock);	/* wake the recovery thread, some node is done */	if (!ret)		dlm_kick_recovery_thread(dlm);	if (ret < 0)		mlog(ML_ERROR, "failed to find recovery node data for node "		     "%u\n", done->node_idx);	dlm_put(dlm);	mlog(0, "leaving reco data done handler, ret=%d\n", ret);	return ret;}static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,					struct list_head *list,				       	u8 dead_node){	struct dlm_lock_resource *res;	struct list_head *iter, *iter2;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -