⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmmaster.c

📁 linux2.6.16版本
💻 C
📖 第 1 页 / 共 5 页
字号:
	 * also, make sure that all callers of dlm_get_mle	 * take both dlm->spinlock and dlm->master_lock */	spin_lock(&dlm->spinlock);	spin_lock(&dlm->master_lock);	dlm_get_mle(mle);	spin_unlock(&dlm->master_lock);	spin_unlock(&dlm->spinlock);	/* notify new node and send all lock state */	/* call send_one_lockres with migration flag.	 * this serves as notice to the target node that a	 * migration is starting. */	ret = dlm_send_one_lockres(dlm, res, mres, target,				   DLM_MRES_MIGRATION);	if (ret < 0) {		mlog(0, "migration to node %u failed with %d\n",		     target, ret);		/* migration failed, detach and clean up mle */		dlm_mle_detach_hb_events(dlm, mle);		dlm_put_mle(mle);		dlm_put_mle(mle);		goto leave;	}	/* at this point, the target sends a message to all nodes,	 * (using dlm_do_migrate_request).  this node is skipped since	 * we had to put an mle in the list to begin the process.  this	 * node now waits for target to do an assert master.  this node	 * will be the last one notified, ensuring that the migration	 * is complete everywhere.  if the target dies while this is	 * going on, some nodes could potentially see the target as the	 * master, so it is important that my recovery finds the migration	 * mle and sets the master to UNKNONWN. */	/* wait for new node to assert master */	while (1) {		ret = wait_event_interruptible_timeout(mle->wq,					(atomic_read(&mle->woken) == 1),					msecs_to_jiffies(5000));		if (ret >= 0) {		       	if (atomic_read(&mle->woken) == 1 ||			    res->owner == target)				break;			mlog(0, "timed out during migration\n");			/* avoid hang during shutdown when migrating lockres 			 * to a node which also goes down */			if (dlm_is_node_dead(dlm, target)) {				mlog(0, "%s:%.*s: expected migration target %u "				     "is no longer up.  
restarting.\n",				     dlm->name, res->lockname.len,				     res->lockname.name, target);				ret = -ERESTARTSYS;			}		}		if (ret == -ERESTARTSYS) {			/* migration failed, detach and clean up mle */			dlm_mle_detach_hb_events(dlm, mle);			dlm_put_mle(mle);			dlm_put_mle(mle);			goto leave;		}		/* TODO: if node died: stop, clean up, return error */	}	/* all done, set the owner, clear the flag */	spin_lock(&res->spinlock);	dlm_set_lockres_owner(dlm, res, target);	res->state &= ~DLM_LOCK_RES_MIGRATING;	dlm_remove_nonlocal_locks(dlm, res);	spin_unlock(&res->spinlock);	wake_up(&res->wq);	/* master is known, detach if not already detached */	dlm_mle_detach_hb_events(dlm, mle);	dlm_put_mle(mle);	ret = 0;	dlm_lockres_calc_usage(dlm, res);leave:	/* re-dirty the lockres if we failed */	if (ret < 0)		dlm_kick_thread(dlm, res);	/* TODO: cleanup */	if (mres)		free_page((unsigned long)mres);	dlm_put(dlm);	mlog(0, "returning %d\n", ret);	return ret;}EXPORT_SYMBOL_GPL(dlm_migrate_lockres);int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock){	int ret;	spin_lock(&dlm->ast_lock);	spin_lock(&lock->spinlock);	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);	spin_unlock(&lock->spinlock);	spin_unlock(&dlm->ast_lock);	return ret;}static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,				     struct dlm_lock_resource *res,				     u8 mig_target){	int can_proceed;	spin_lock(&res->spinlock);	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);	spin_unlock(&res->spinlock);	/* target has died, so make the caller break out of the 	 * wait_event, but caller must recheck the domain_map */	spin_lock(&dlm->spinlock);	if (!test_bit(mig_target, dlm->domain_map))		can_proceed = 1;	spin_unlock(&dlm->spinlock);	return can_proceed;}int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res){	int ret;	spin_lock(&res->spinlock);	ret = !!(res->state & DLM_LOCK_RES_DIRTY);	spin_unlock(&res->spinlock);	return ret;}static int 
dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,				       struct dlm_lock_resource *res,				       u8 target){	int ret = 0;	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",	       res->lockname.len, res->lockname.name, dlm->node_num,	       target);	/* need to set MIGRATING flag on lockres.  this is done by	 * ensuring that all asts have been flushed for this lockres. */	spin_lock(&res->spinlock);	BUG_ON(res->migration_pending);	res->migration_pending = 1;	/* strategy is to reserve an extra ast then release	 * it below, letting the release do all of the work */	__dlm_lockres_reserve_ast(res);	spin_unlock(&res->spinlock);	/* now flush all the pending asts.. hang out for a bit */	dlm_kick_thread(dlm, res);	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));	dlm_lockres_release_ast(dlm, res);	mlog(0, "about to wait on migration_wq, dirty=%s\n",	       res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");	/* if the extra ref we just put was the final one, this	 * will pass thru immediately.  otherwise, we need to wait	 * for the last ast to finish. */again:	ret = wait_event_interruptible_timeout(dlm->migration_wq,		   dlm_migration_can_proceed(dlm, res, target),		   msecs_to_jiffies(1000));	if (ret < 0) {		mlog(0, "woken again: migrating? %s, dead? %s\n",		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",		       test_bit(target, dlm->domain_map) ? "no":"yes");	} else {		mlog(0, "all is well: migrating? %s, dead? %s\n",		       res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",		       test_bit(target, dlm->domain_map) ? "no":"yes");	}	if (!dlm_migration_can_proceed(dlm, res, target)) {		mlog(0, "trying again...\n");		goto again;	}	/* did the target go down or die? */	spin_lock(&dlm->spinlock);	if (!test_bit(target, dlm->domain_map)) {		mlog(ML_ERROR, "aha. 
migration target %u just went down\n",		     target);		ret = -EHOSTDOWN;	}	spin_unlock(&dlm->spinlock);	/*	 * at this point:	 *	 *   o the DLM_LOCK_RES_MIGRATING flag is set	 *   o there are no pending asts on this lockres	 *   o all processes trying to reserve an ast on this	 *     lockres must wait for the MIGRATING flag to clear	 */	return ret;}/* last step in the migration process. * original master calls this to free all of the dlm_lock * structures that used to be for other nodes. */static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,				      struct dlm_lock_resource *res){	struct list_head *iter, *iter2;	struct list_head *queue = &res->granted;	int i;	struct dlm_lock *lock;	assert_spin_locked(&res->spinlock);	BUG_ON(res->owner == dlm->node_num);	for (i=0; i<3; i++) {		list_for_each_safe(iter, iter2, queue) {			lock = list_entry (iter, struct dlm_lock, list);			if (lock->ml.node != dlm->node_num) {				mlog(0, "putting lock for node %u\n",				     lock->ml.node);				/* be extra careful */				BUG_ON(!list_empty(&lock->ast_list));				BUG_ON(!list_empty(&lock->bast_list));				BUG_ON(lock->ast_pending);				BUG_ON(lock->bast_pending);				list_del_init(&lock->list);				dlm_lock_put(lock);			}		}		queue++;	}}/* for now this is not too intelligent.  we will * need stats to make this do the right thing. * this just finds the first lock on one of the * queues and uses that node as the target. 
*/static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,				    struct dlm_lock_resource *res){	int i;	struct list_head *queue = &res->granted;	struct list_head *iter;	struct dlm_lock *lock;	int nodenum;	assert_spin_locked(&dlm->spinlock);	spin_lock(&res->spinlock);	for (i=0; i<3; i++) {		list_for_each(iter, queue) {			/* up to the caller to make sure this node			 * is alive */			lock = list_entry (iter, struct dlm_lock, list);			if (lock->ml.node != dlm->node_num) {				spin_unlock(&res->spinlock);				return lock->ml.node;			}		}		queue++;	}	spin_unlock(&res->spinlock);	mlog(0, "have not found a suitable target yet! checking domain map\n");	/* ok now we're getting desperate.  pick anyone alive. */	nodenum = -1;	while (1) {		nodenum = find_next_bit(dlm->domain_map,					O2NM_MAX_NODES, nodenum+1);		mlog(0, "found %d in domain map\n", nodenum);		if (nodenum >= O2NM_MAX_NODES)			break;		if (nodenum != dlm->node_num) {			mlog(0, "picking %d\n", nodenum);			return nodenum;		}	}	mlog(0, "giving up.  
no master to migrate to\n");	return DLM_LOCK_RES_OWNER_UNKNOWN;}/* this is called by the new master once all lockres * data has been received */static int dlm_do_migrate_request(struct dlm_ctxt *dlm,				  struct dlm_lock_resource *res,				  u8 master, u8 new_master,				  struct dlm_node_iter *iter){	struct dlm_migrate_request migrate;	int ret, status = 0;	int nodenum;	memset(&migrate, 0, sizeof(migrate));	migrate.namelen = res->lockname.len;	memcpy(migrate.name, res->lockname.name, migrate.namelen);	migrate.new_master = new_master;	migrate.master = master;	ret = 0;	/* send message to all nodes, except the master and myself */	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {		if (nodenum == master ||		    nodenum == new_master)			continue;		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,					 &migrate, sizeof(migrate), nodenum,					 &status);		if (ret < 0)			mlog_errno(ret);		else if (status < 0) {			mlog(0, "migrate request (node %u) returned %d!\n",			     nodenum, status);			ret = status;		}	}	if (ret < 0)		mlog_errno(ret);	mlog(0, "returning ret=%d\n", ret);	return ret;}/* if there is an existing mle for this lockres, we now know who the master is. * (the one who sent us *this* message) we can clear it up right away. * since the process that put the mle on the list still has a reference to it, * we can unhash it now, set the master and wake the process.  as a result, * we will have no mle in the list to start with.  now we can add an mle for * the migration and this should be the only one found for those scanning the * list.  
*/
/*
 * o2net handler for a migrate request.  @data is the struct dlm_ctxt,
 * msg->buf holds a struct dlm_migrate_request.
 *
 * returns -EINVAL if the domain cannot be grabbed or the lockres is
 * marked as recovering, -ENOMEM if no mle could be allocated, otherwise
 * the status of dlm_add_migration_mle().
 */
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_lock_resource *res = NULL;
	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
	const char *name;
	unsigned int namelen;
	int ret = 0;

	if (!dlm_grab(dlm))
		return -EINVAL;

	name = migrate->name;
	namelen = migrate->namelen;

	/* preallocate.. if this fails, abort */
	mle = kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);

	if (!mle) {
		ret = -ENOMEM;
		goto leave;
	}

	/* check for pre-existing lock */
	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, name, namelen);
	spin_lock(&dlm->master_lock);

	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			/* if all is working ok, this can only mean that we got
			 * a migrate request from a node that we now see as
			 * dead.  what can we do here?  drop it to the floor? */
			spin_unlock(&res->spinlock);
			mlog(ML_ERROR, "Got a migrate request, but the "
			     "lockres is marked as recovering!\n");
			/* the preallocated mle was never handed to anyone */
			kmem_cache_free(dlm_mle_cache, mle);
			ret = -EINVAL; /* need a better solution */
			goto unlock;
		}
		res->state |= DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
	}

	/* ignore status.  only nonzero status would BUG. */
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
				    name, namelen,
				    migrate->new_master,
				    migrate->master);

unlock:
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	if (res)
		dlm_lockres_put(res);
leave:
	dlm_put(dlm);
	return ret;
}

/* must be holding dlm->spinlock and dlm->master_lock
 * when adding a migration mle, we can clear any other mles
 * in the master list because we know with certainty that
 * the master is "master".
so we remove any old mle from * the list after setting it's master field, and then add * the new migration mle.  this way we can hold with the rule * of having only one mle for a given lock name at all times. */static int dlm_add_migration_mle(struct dlm_ctxt *dlm,				 struct dlm_lock_resource *res,				 struct dlm_master_list_entry *mle,				 struct dlm_master_list_entry **oldmle,				 const char *name, unsigned int namelen,				 u8 new_master, u8 master){	int found;	int ret = 0;	*oldmle = NULL;	mlog_entry_void();	assert_spin_locked(&dlm->spinlock);	assert_spin_locked(&dlm->master_lock);	/* caller is responsible for any ref taken here on oldmle */	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);	if (found) {		struct dlm_master_list_entry *tmp = *oldmle;		spin_lock(&tmp->spinlock);		if (tmp->type == DLM_MLE_MIGRATION) {			if (master == dlm->node_num) {				/* ah another process raced me to it */				mlog(0, "tried to migrate %.*s, but some "				     "process beat me to it\n",				     namelen, name);				ret = -EEXIST;			} else {				/* bad.  2 NODES are trying to migrate! 
*/				mlog(ML_ERROR, "migration error  mle: "				     "master=%u new_master=%u // request: "				     "master=%u new_master=%u // "				     "lockres=%.*s\n",				     tmp->master, tmp->new_master,				     master, new_master,				     namelen, name);				BUG();			}		} else {			/* this is essentially what assert_master does */			tmp->master = master;			atomic_set(&tmp->woken, 1);			wake_up(&tmp->wq);			/* remove it from the list so that only one			 * mle will be found */			list_del_init(&tmp->list);		}		spin_unlock(&tmp->spinlock);	}	/* now add a migration mle to the tail of the list */	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);	mle->new_master = new_master;	mle->master = master;	/* do this for consistency with other mle types */	set_bit(new_master, mle->maybe_map);	list_add(&mle->list, &dlm->master_list);	return ret;}void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node){	struct list_head *iter,

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -