⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 dlmmaster.c

📁 linux2.6.16版本
💻 C
📖 第 1 页 / 共 5 页
字号:
		     mle->u.name.len, mle->u.name.name, mle->type);
	} else {
		mlog(0, "calling mle_release for %.*s, type %d\n",
		     mle->u.res->lockname.len,
		     mle->u.res->lockname.name, mle->type);
	}

	/* NOTE(review): the lines above are the tail of an mle release
	 * function whose beginning falls outside this chunk. */

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	/* remove from list if not already */
	if (!list_empty(&mle->list))
		list_del_init(&mle->list);

	/* detach the mle from the domain node up/down events */
	__dlm_mle_detach_hb_events(dlm, mle);

	/* NOTE: kfree under spinlock here.
	 * if this is bad, we can move this to a freelist. */
	kmem_cache_free(dlm_mle_cache, mle);
}

/*
 * LOCK RESOURCE FUNCTIONS
 */

/*
 * Record @owner as the owner of @res and bump the matching per-domain
 * statistics counter (local/unknown/remote).
 *
 * Caller must hold res->spinlock.  This does NOT decrement the counter
 * for any previous owner -- use dlm_change_lockres_owner() for that.
 */
static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  u8 owner)
{
	assert_spin_locked(&res->spinlock);

	mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);

	/* classify the new owner for the domain-wide stats counters */
	if (owner == dlm->node_num)
		atomic_inc(&dlm->local_resources);
	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_inc(&dlm->unknown_resources);
	else
		atomic_inc(&dlm->remote_resources);

	res->owner = owner;
}

/*
 * Change the owner of @res to @owner, first undoing the stats
 * contribution of the current owner.  No-op when the owner is
 * unchanged.
 *
 * Caller must hold res->spinlock.
 */
void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res, u8 owner)
{
	assert_spin_locked(&res->spinlock);

	if (owner == res->owner)
		return;

	/* decrement the counter that the current owner was counted under */
	if (res->owner == dlm->node_num)
		atomic_dec(&dlm->local_resources);
	else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_dec(&dlm->unknown_resources);
	else
		atomic_dec(&dlm->remote_resources);

	dlm_set_lockres_owner(dlm, res, owner);
}

/*
 * kref release callback for a lock resource; runs when the last
 * reference is dropped via dlm_lockres_put().  Sanity-checks that the
 * lockres is off every list, then frees the name buffer and the
 * structure itself.
 */
static void dlm_lockres_release(struct kref *kref)
{
	struct dlm_lock_resource *res;

	res = container_of(kref, struct dlm_lock_resource, refs);

	/* This should not happen -- all lockres' have a name
	 * associated with them at init time. */
	BUG_ON(!res->lockname.name);

	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
	     res->lockname.name);

	/* By the time we're ready to blow this guy away, we shouldn't
	 * be on any lists. */
	BUG_ON(!hlist_unhashed(&res->hash_node));
	BUG_ON(!list_empty(&res->granted));
	BUG_ON(!list_empty(&res->converting));
	BUG_ON(!list_empty(&res->blocked));
	BUG_ON(!list_empty(&res->dirty));
	BUG_ON(!list_empty(&res->recovering));
	BUG_ON(!list_empty(&res->purge));

	/* the name buffer was allocated separately in dlm_new_lockres() */
	kfree(res->lockname.name);

	kfree(res);
}

/* take an extra reference on @res */
void dlm_lockres_get(struct dlm_lock_resource *res)
{
	kref_get(&res->refs);
}

/* drop a reference; the last put frees @res via dlm_lockres_release() */
void dlm_lockres_put(struct dlm_lock_resource *res)
{
	kref_put(&res->refs, dlm_lockres_release);
}

/*
 * Initialize every field of a freshly allocated lockres: copy @namelen
 * bytes of @name into the preallocated lockname buffer, reset all
 * queues and list heads, and start life with one kref, owner UNKNOWN
 * and state IN_PROGRESS.  @name need not be NUL terminated; the name
 * is length-counted from here on.
 */
static void dlm_init_lockres(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res,
			     const char *name, unsigned int namelen)
{
	char *qname;

	/* If we memset here, we lose our reference to the kmalloc'd
	 * res->lockname.name, so be sure to init every field
	 * correctly! */
	qname = (char *) res->lockname.name;
	memcpy(qname, name, namelen);

	res->lockname.len = namelen;
	res->lockname.hash = full_name_hash(name, namelen);

	init_waitqueue_head(&res->wq);
	spin_lock_init(&res->spinlock);
	INIT_HLIST_NODE(&res->hash_node);
	INIT_LIST_HEAD(&res->granted);
	INIT_LIST_HEAD(&res->converting);
	INIT_LIST_HEAD(&res->blocked);
	INIT_LIST_HEAD(&res->dirty);
	INIT_LIST_HEAD(&res->recovering);
	INIT_LIST_HEAD(&res->purge);

	atomic_set(&res->asts_reserved, 0);
	res->migration_pending = 0;

	kref_init(&res->refs);

	/* just for consistency -- dlm_set_lockres_owner asserts the lock */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
	spin_unlock(&res->spinlock);

	res->state = DLM_LOCK_RES_IN_PROGRESS;

	res->last_used = 0;

	memset(res->lvb, 0, DLM_LVB_LEN);
}

/*
 * Allocate and initialize a new lock resource named @name (@namelen
 * bytes, no trailing NUL stored).  Returns NULL on allocation failure.
 * The caller owns the single initial reference.
 */
struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
				   const char *name,
				   unsigned int namelen)
{
	struct dlm_lock_resource *res;

	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;

	res->lockname.name = kmalloc(namelen, GFP_KERNEL);
	if (!res->lockname.name) {
		kfree(res);
		return NULL;
	}

	dlm_init_lockres(dlm, res, name, namelen);
	return res;
}

/*
 * lookup a lock resource by name.
 * may already exist in the hashtable.
 * lockid is null terminated
 *
 * if not, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here.   need to wait around for that node
 * to assert_master (or die).
 *
 * Returns a referenced lockres on success, or NULL if the initial
 * allocations fail.
 */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
					  const char *lockid,
					  int flags)
{
	struct dlm_lock_resource *tmpres=NULL, *res=NULL;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *alloc_mle = NULL;
	int blocked = 0;
	int ret, nodenum;
	struct dlm_node_iter iter;
	unsigned int namelen;
	int tries = 0;

	BUG_ON(!lockid);

	namelen = strlen(lockid);

	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
	spin_lock(&dlm->spinlock);
	tmpres = __dlm_lookup_lockres(dlm, lockid, namelen);
	if (tmpres) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "found in hash!\n");
		/* drop the lockres we allocated on a previous pass, if any */
		if (res)
			dlm_lockres_put(res);
		res = tmpres;
		goto leave;
	}

	if (!res) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "allocating a new resource\n");
		/* nothing found and we need to allocate one. */
		alloc_mle = (struct dlm_master_list_entry *)
			kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
		if (!alloc_mle)
			goto leave;
		res = dlm_new_lockres(dlm, lockid, namelen);
		if (!res)
			goto leave;
		/* re-check the hash: another thread may have raced us in */
		goto lookup;
	}

	/* second pass through lookup: nothing in the hash and we hold our
	 * own allocation.  dlm->spinlock is still held here. */
	mlog(0, "no lockres found, allocated our own: %p\n", res);

	if (flags & LKM_LOCAL) {
		/* caller knows it's safe to assume it's not mastered elsewhere
		 * DONE!  return right away */
		spin_lock(&res->spinlock);
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
		__dlm_insert_lockres(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
		/* lockres still marked IN_PROGRESS */
		goto wake_waiters;
	}

	/* check master list to see if another node has started mastering it */
	spin_lock(&dlm->master_lock);

	/* if we found a block, wait for lock to be mastered by another node */
	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
	if (blocked) {
		if (mle->type == DLM_MLE_MASTER) {
			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
			BUG();
		} else if (mle->type == DLM_MLE_MIGRATION) {
			/* migration is in progress! */
			/* the good news is that we now know the
			 * "current" master (mle->master). */
			spin_unlock(&dlm->master_lock);
			assert_spin_locked(&dlm->spinlock);
			/* set the lockres owner and hash it */
			spin_lock(&res->spinlock);
			dlm_set_lockres_owner(dlm, res, mle->master);
			__dlm_insert_lockres(dlm, res);
			spin_unlock(&res->spinlock);
			spin_unlock(&dlm->spinlock);

			/* master is known, detach */
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			mle = NULL;
			goto wake_waiters;
		}
	} else {
		/* go ahead and try to master lock on this node */
		mle = alloc_mle;
		/* make sure this does not get freed below */
		alloc_mle = NULL;
		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
		set_bit(dlm->node_num, mle->maybe_map);
		list_add(&mle->list, &dlm->master_list);
	}

	/* at this point there is either a DLM_MLE_BLOCK or a
	 * DLM_MLE_MASTER on the master list, so it's safe to add the
	 * lockres to the hashtable.  anyone who finds the lock will
	 * still have to wait on the IN_PROGRESS. */

	/* finally add the lockres to its hash bucket */
	__dlm_insert_lockres(dlm, res);

	/* get an extra ref on the mle in case this is a BLOCK
	 * if so, the creator of the BLOCK may try to put the last
	 * ref at this time in the assert master handler, so we
	 * need an extra one to keep from a bad ptr deref. */
	dlm_get_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	/* must wait for lock to be mastered elsewhere */
	if (blocked)
		goto wait;

redo_request:
	ret = -EINVAL;
	/* ask every node in the vote map who the master is; stop early
	 * if some node already asserted mastery */
	dlm_node_iter_init(mle->vote_map, &iter);
	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = dlm_do_master_request(mle, nodenum);
		if (ret < 0)
			mlog_errno(ret);
		if (mle->master != O2NM_MAX_NODES) {
			/* found a master ! */
			break;
		}
	}

wait:
	/* keep going until the response map includes all nodes */
	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
	if (ret < 0) {
		mlog(0, "%s:%.*s: node map changed, redo the "
		     "master request now, blocked=%d\n",
		     dlm->name, res->lockname.len,
		     res->lockname.name, blocked);
		/* complain loudly if we keep spinning, but retry forever */
		if (++tries > 20) {
			mlog(ML_ERROR, "%s:%.*s: spinning on "
			     "dlm_wait_for_lock_mastery, blocked=%d\n", 
			     dlm->name, res->lockname.len, 
			     res->lockname.name, blocked);
			dlm_print_one_lock_resource(res);
			/* dlm_print_one_mle(mle); */
			tries = 0;
		}
		goto redo_request;
	}

	mlog(0, "lockres mastered by %u\n", res->owner);
	/* make sure we never continue without this */
	BUG_ON(res->owner == O2NM_MAX_NODES);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	/* put the extra ref */
	dlm_put_mle(mle);

wake_waiters:
	spin_lock(&res->spinlock);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

leave:
	/* need to free the unused mle */
	if (alloc_mle)
		kmem_cache_free(dlm_mle_cache, alloc_mle);

	return res;
}


#define DLM_MASTERY_TIMEOUT_MS   5000

/*
 * Wait until mastery of @res is resolved: either another node asserts
 * master, or voting completes and this node -- holding the lowest set
 * bit in maybe_map -- asserts mastery itself.
 *
 * Returns 0 once res->owner is set, or a negative error from
 * dlm_restart_lock_mastery() when the node map changed underneath us;
 * the caller (dlm_get_lock_resource) redoes the master requests in
 * that case.  *blocked is updated if the mle switched to/from the
 * BLOCK type during a restart.
 */
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked)
{
	u8 m;
	int ret, bit;
	int map_changed, voting_done;
	int assert, sleep;

recheck:
	ret = 0;
	assert = 0;

	/* check if another node has already become the owner */
	spin_lock(&res->spinlock);
	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
		spin_unlock(&res->spinlock);
		goto leave;
	}
	spin_unlock(&res->spinlock);

	spin_lock(&mle->spinlock);
	m = mle->master;
	/* map_changed: a node joined/left since voting started;
	 * voting_done: every node in the vote map has responded */
	map_changed = (memcmp(mle->vote_map, mle->node_map,
			      sizeof(mle->vote_map)) != 0);
	voting_done = (memcmp(mle->vote_map, mle->response_map,
			     sizeof(mle->vote_map)) == 0);

	/* restart if we hit any errors */
	if (map_changed) {
		int b;
		mlog(0, "%s: %.*s: node map changed, restarting\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
		b = (mle->type == DLM_MLE_BLOCK);
		/* report back a blocked-status flip to the caller */
		if ((*blocked && !b) || (!*blocked && b)) {
			mlog(0, "%s:%.*s: status change: old=%d new=%d\n", 
			     dlm->name, res->lockname.len, res->lockname.name,
			     *blocked, b);
			*blocked = b;
		}
		spin_unlock(&mle->spinlock);
		if (ret < 0) {
			mlog_errno(ret);
			goto leave;
		}
		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
		     "rechecking now\n", dlm->name, res->lockname.len,
		     res->lockname.name);
		goto recheck;
	}

	if (m != O2NM_MAX_NODES) {
		/* another node has done an assert!
		 * all done! */
		sleep = 0;
	} else {
		sleep = 1;
		/* have all nodes responded? */
		if (voting_done && !*blocked) {
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (dlm->node_num <= bit) {
				/* my node number is lowest.
				 * now tell other nodes that I am
				 * mastering this. */
				mle->master = dlm->node_num;
				assert = 1;
				sleep = 0;
			}
			/* if voting is done, but we have not received
			 * an assert master yet, we must sleep */
		}
	}
	spin_unlock(&mle->spinlock);

	/* sleep if we haven't finished voting yet */
	if (sleep) {
		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

		/*
		if (atomic_read(&mle->mle_refs.refcount) < 2)
			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
			atomic_read(&mle->mle_refs.refcount),
			res->lockname.len, res->lockname.name);
		*/
		/* woken is set by the assert-master path; the timeout
		 * bounds the wait so we re-evaluate periodically */
		atomic_set(&mle->woken, 0);
		(void)wait_event_timeout(mle->wq,
					 (atomic_read(&mle->woken) == 1),
					 timeo);
		if (res->owner == O2NM_MAX_NODES) {
			mlog(0, "waiting again\n");
			goto recheck;
		}
		mlog(0, "done waiting, master is %u\n", res->owner);
		ret = 0;
		goto leave;
	}

	ret = 0;   /* done */
	if (assert) {
		m = dlm->node_num;
		mlog(0, "about to master %.*s here, this=%u\n",
		     res->lockname.len, res->lockname.name, m);
		ret = dlm_do_assert_master(dlm, res->lockname.name,
					   res->lockname.len, mle->vote_map, 0);
		if (ret) {
			/* This is a failure in the network path,
			 * not in the response to the assert_master
			 * (any nonzero response is a BUG on this node).
			 * Most likely a socket just got disconnected
			 * due to node death. */
			mlog_errno(ret);
		}
		/* no longer need to restart lock mastery.
		 * all living nodes have been contacted. */
		ret = 0;
	}

	/* set the lockres owner */
	spin_lock(&res->spinlock);
	dlm_change_lockres_owner(dlm, res, m);
	spin_unlock(&res->spinlock);

leave:
	return ret;
}

/* NOTE(review): presumably iterates over the bits that differ between
 * two node bitmaps -- the definition continues past this chunk. */
struct dlm_bitmap_diff_iter
{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -