⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmrecovery.c

📁 ocfs1.4.1 oracle分布式文件系统
💻 C
📖 第 1 页 / 共 5 页
字号:
static void dlm_end_recovery(struct dlm_ctxt *dlm){	spin_lock(&dlm->spinlock);	BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE));	dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE;	spin_unlock(&dlm->spinlock);	wake_up(&dlm->reco.event);}static int dlm_do_recovery(struct dlm_ctxt *dlm){	int status = 0;	int ret;	spin_lock(&dlm->spinlock);	/* check to see if the new master has died */	if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM &&	    test_bit(dlm->reco.new_master, dlm->recovery_map)) {		mlog(0, "new master %u died while recovering %u!\n",		     dlm->reco.new_master, dlm->reco.dead_node);		/* unset the new_master, leave dead_node */		dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM);	}	/* select a target to recover */	if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {		int bit;		bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0);		if (bit >= O2NM_MAX_NODES || bit < 0)			dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);		else			dlm_set_reco_dead_node(dlm, bit);	} else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) {		/* BUG? */		mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n",		     dlm->reco.dead_node);		dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM);	}	if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {		// mlog(0, "nothing to recover!  sleeping now!\n");		spin_unlock(&dlm->spinlock);		/* return to main thread loop and sleep. */		return 0;	}	mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n",	     dlm->name, dlm->dlm_reco_thread_task->pid,	     dlm->reco.dead_node);	spin_unlock(&dlm->spinlock);	/* take write barrier */	/* (stops the list reshuffling thread, proxy ast handling) */	dlm_begin_recovery(dlm);	if (dlm->reco.new_master == dlm->node_num)		goto master_here;	if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {		/* choose a new master, returns 0 if this node		 * is the master, -EEXIST if it's another node.		 * this does not return until a new master is chosen		 * or recovery completes entirely. */		ret = dlm_pick_recovery_master(dlm);		if (!ret) {			/* already notified everyone.  go. */			goto master_here;		}		mlog(0, "another node will master this recovery session.\n");	}	mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n",	     dlm->name, dlm->dlm_reco_thread_task->pid, dlm->reco.new_master,	     dlm->node_num, dlm->reco.dead_node);	/* it is safe to start everything back up here	 * because all of the dead node's lock resources	 * have been marked as in-recovery */	dlm_end_recovery(dlm);	/* sleep out in main dlm_recovery_thread loop. */	return 0;master_here:	mlog(ML_NOTICE, "(%d) Node %u is the Recovery Master for the Dead Node "	     "%u for Domain %s\n", task_pid_nr(dlm->dlm_reco_thread_task),	     dlm->node_num, dlm->reco.dead_node, dlm->name);	status = dlm_remaster_locks(dlm, dlm->reco.dead_node);	if (status < 0) {		/* we should never hit this anymore */		mlog(ML_ERROR, "error %d remastering locks for node %u, "		     "retrying.\n", status, dlm->reco.dead_node);		/* yield a bit to allow any final network messages		 * to get handled on remaining nodes */		msleep(100);	} else {		/* success!  see if any other nodes need recovery */		mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n",		     dlm->name, dlm->reco.dead_node, dlm->node_num);		dlm_reset_recovery(dlm);	}	dlm_end_recovery(dlm);	/* continue and look for another dead node */	return -EAGAIN;}static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node){	int status = 0;	struct dlm_reco_node_data *ndata;	int all_nodes_done;	int destroy = 0;	int pass = 0;	do {		/* we have become recovery master.  there is no escaping		 * this, so just keep trying until we get it. */		status = dlm_init_recovery_area(dlm, dead_node);		if (status < 0) {			mlog(ML_ERROR, "%s: failed to alloc recovery area, "			     "retrying\n", dlm->name);			msleep(1000);		}	} while (status != 0);	/* safe to access the node data list without a lock, since this	 * process is the only one to change the list */	list_for_each_entry(ndata, &dlm->reco.node_data, list) {		BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT);		ndata->state = DLM_RECO_NODE_DATA_REQUESTING;		mlog(0, "requesting lock info from node %u\n",		     ndata->node_num);		if (ndata->node_num == dlm->node_num) {			ndata->state = DLM_RECO_NODE_DATA_DONE;			continue;		}		do {			status = dlm_request_all_locks(dlm, ndata->node_num,						       dead_node);			if (status < 0) {				mlog_errno(status);				if (dlm_is_host_down(status)) {					/* node died, ignore it for recovery */					status = 0;					ndata->state = DLM_RECO_NODE_DATA_DEAD;					/* wait for the domain map to catch up					 * with the network state. */					wait_event_timeout(dlm->dlm_reco_thread_wq,							   dlm_is_node_dead(dlm,								ndata->node_num),							   msecs_to_jiffies(1000));					mlog(0, "waited 1 sec for %u, "					     "dead? %s\n", ndata->node_num,					     dlm_is_node_dead(dlm, ndata->node_num) ?					     "yes" : "no");				} else {					/* -ENOMEM on the other node */					mlog(0, "%s: node %u returned "					     "%d during recovery, retrying "					     "after a short wait\n",					     dlm->name, ndata->node_num,					     status);					msleep(100);				}			}		} while (status != 0);		spin_lock(&dlm_reco_state_lock);		switch (ndata->state) {			case DLM_RECO_NODE_DATA_INIT:			case DLM_RECO_NODE_DATA_FINALIZE_SENT:			case DLM_RECO_NODE_DATA_REQUESTED:				BUG();				break;			case DLM_RECO_NODE_DATA_DEAD:				mlog(0, "node %u died after requesting "				     "recovery info for node %u\n",				     ndata->node_num, dead_node);				/* fine.  don't need this node's info.				 * continue without it. */				break;			case DLM_RECO_NODE_DATA_REQUESTING:				ndata->state = DLM_RECO_NODE_DATA_REQUESTED;				mlog(0, "now receiving recovery data from "				     "node %u for dead node %u\n",				     ndata->node_num, dead_node);				break;			case DLM_RECO_NODE_DATA_RECEIVING:				mlog(0, "already receiving recovery data from "				     "node %u for dead node %u\n",				     ndata->node_num, dead_node);				break;			case DLM_RECO_NODE_DATA_DONE:				mlog(0, "already DONE receiving recovery data "				     "from node %u for dead node %u\n",				     ndata->node_num, dead_node);				break;		}		spin_unlock(&dlm_reco_state_lock);	}	mlog(0, "done requesting all lock info\n");	/* nodes should be sending reco data now	 * just need to wait */	while (1) {		/* check all the nodes now to see if we are		 * done, or if anyone died */		all_nodes_done = 1;		spin_lock(&dlm_reco_state_lock);		list_for_each_entry(ndata, &dlm->reco.node_data, list) {			mlog(0, "checking recovery state of node %u\n",			     ndata->node_num);			switch (ndata->state) {				case DLM_RECO_NODE_DATA_INIT:				case DLM_RECO_NODE_DATA_REQUESTING:					mlog(ML_ERROR, "bad ndata state for "					     "node %u: state=%d\n",					     ndata->node_num, ndata->state);					BUG();					break;				case DLM_RECO_NODE_DATA_DEAD:					mlog(0, "node %u died after "					     "requesting recovery info for "					     "node %u\n", ndata->node_num,					     dead_node);					break;				case DLM_RECO_NODE_DATA_RECEIVING:				case DLM_RECO_NODE_DATA_REQUESTED:					mlog(0, "%s: node %u still in state %s\n",					     dlm->name, ndata->node_num,					     ndata->state==DLM_RECO_NODE_DATA_RECEIVING ?					     "receiving" : "requested");					all_nodes_done = 0;					break;				case DLM_RECO_NODE_DATA_DONE:					mlog(0, "%s: node %u state is done\n",					     dlm->name, ndata->node_num);					break;				case DLM_RECO_NODE_DATA_FINALIZE_SENT:					mlog(0, "%s: node %u state is finalize\n",					     dlm->name, ndata->node_num);					break;			}		}		spin_unlock(&dlm_reco_state_lock);		mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass,		     all_nodes_done?"yes":"no");		if (all_nodes_done) {			int ret;			/* all nodes are now in DLM_RECO_NODE_DATA_DONE state	 		 * just send a finalize message to everyone and	 		 * clean up */			mlog(0, "all nodes are done! send finalize\n");			ret = dlm_send_finalize_reco_message(dlm);			if (ret < 0)				mlog_errno(ret);			spin_lock(&dlm->spinlock);			dlm_finish_local_lockres_recovery(dlm, dead_node,							  dlm->node_num);			spin_unlock(&dlm->spinlock);			mlog(0, "should be done with recovery!\n");			mlog(0, "finishing recovery of %s at %lu, "			     "dead=%u, this=%u, new=%u\n", dlm->name,			     jiffies, dlm->reco.dead_node,			     dlm->node_num, dlm->reco.new_master);			destroy = 1;			status = 0;			/* rescan everything marked dirty along the way */			dlm_kick_thread(dlm, NULL);			break;		}		/* wait to be signalled, with periodic timeout		 * to check for node death */		wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq,					 kthread_should_stop(),					 msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS));	}	if (destroy)		dlm_destroy_recovery_area(dlm, dead_node);	mlog_exit(status);	return status;}static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node){	int num=0;	struct dlm_reco_node_data *ndata;	spin_lock(&dlm->spinlock);	memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map));	/* nodes can only be removed (by dying) after dropping	 * this lock, and death will be trapped later, so this should do */	spin_unlock(&dlm->spinlock);	while (1) {		num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num);		if (num >= O2NM_MAX_NODES) {			break;		}		BUG_ON(num == dead_node);		ndata = kzalloc(sizeof(*ndata), GFP_NOFS);		if (!ndata) {			dlm_destroy_recovery_area(dlm, dead_node);			return -ENOMEM;		}		ndata->node_num = num;		ndata->state = DLM_RECO_NODE_DATA_INIT;		spin_lock(&dlm_reco_state_lock);		list_add_tail(&ndata->list, &dlm->reco.node_data);		spin_unlock(&dlm_reco_state_lock);		num++;	}	return 0;}static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm, u8 dead_node){	struct dlm_reco_node_data *ndata, *next;	LIST_HEAD(tmplist);	spin_lock(&dlm_reco_state_lock);	list_splice_init(&dlm->reco.node_data, &tmplist);	spin_unlock(&dlm_reco_state_lock);	list_for_each_entry_safe(ndata, next, &tmplist, list) {		list_del_init(&ndata->list);		kfree(ndata);	}}static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from,				 u8 dead_node){	struct dlm_lock_request lr;	enum dlm_status ret;	mlog(0, "\n");	mlog(0, "dlm_request_all_locks: dead node is %u, sending request "		  "to %u\n", dead_node, request_from);	memset(&lr, 0, sizeof(lr));	lr.node_idx = dlm->node_num;	lr.dead_node = dead_node;	// send message	ret = DLM_NOLOCKMGR;	ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key,				 &lr, sizeof(lr), request_from, NULL);	/* negative status is handled by caller */	if (ret < 0)		mlog_errno(ret);	// return from here, then	// sleep until all received or error	return ret;}int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data,				  void **ret_data){	struct dlm_ctxt *dlm = data;	struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf;	char *buf = NULL;	struct dlm_work_item *item = NULL;	if (!dlm_grab(dlm))		return -EINVAL;	if (lr->dead_node != dlm->reco.dead_node) {		mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "		     "dead_node is %u\n", dlm->name, lr->node_idx,		     lr->dead_node, dlm->reco.dead_node);		dlm_print_reco_node_status(dlm);		/* this is a hack */		dlm_put(dlm);		return -ENOMEM;	}	BUG_ON(lr->dead_node != dlm->reco.dead_node);	item = kzalloc(sizeof(*item), GFP_NOFS);	if (!item) {		dlm_put(dlm);		return -ENOMEM;	}	/* this will get freed by dlm_request_all_locks_worker */	buf = (char *) __get_free_page(GFP_NOFS);	if (!buf) {		kfree(item);		dlm_put(dlm);		return -ENOMEM;	}	/* queue up work for dlm_request_all_locks_worker */	dlm_grab(dlm);  /* get an extra ref for the work item */	dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf);	item->u.ral.reco_master = lr->node_idx;	item->u.ral.dead_node = lr->dead_node;	spin_lock(&dlm->work_lock);	list_add_tail(&item->list, &dlm->work_list);	spin_unlock(&dlm->work_lock);	queue_work(dlm->dlm_worker, &dlm->dispatched_work);	dlm_put(dlm);	return 0;}static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data){	struct dlm_migratable_lockres *mres;	struct dlm_lock_resource *res;	struct dlm_ctxt *dlm;	LIST_HEAD(resources);	int ret;	u8 dead_node, reco_master;	int skip_all_done = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -