
📄 dlmrecovery.c

📁 Linux 2.6.16
💻 C
📖 Page 1 of 5
	struct dlm_lock *lock;

	spin_lock(&dlm->spinlock);
	list_for_each_safe(iter, iter2, &dlm->reco.resources) {
		res = list_entry (iter, struct dlm_lock_resource, recovering);
		/* always prune any $RECOVERY entries for dead nodes,
		 * otherwise hangs can occur during later recovery */
		if (dlm_is_recovery_lock(res->lockname.name,
					 res->lockname.len)) {
			spin_lock(&res->spinlock);
			list_for_each_entry(lock, &res->granted, list) {
				if (lock->ml.node == dead_node) {
					mlog(0, "AHA! there was "
					     "a $RECOVERY lock for dead "
					     "node %u (%s)!\n",
					     dead_node, dlm->name);
					list_del_init(&lock->list);
					dlm_lock_put(lock);
					break;
				}
			}
			spin_unlock(&res->spinlock);
			continue;
		}

		if (res->owner == dead_node) {
			mlog(0, "found lockres owned by dead node while "
				  "doing recovery for node %u. sending it.\n",
				  dead_node);
			list_del_init(&res->recovering);
			list_add_tail(&res->recovering, list);
		} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "found UNKNOWN owner while doing recovery "
				  "for node %u. sending it.\n", dead_node);
			list_del_init(&res->recovering);
			list_add_tail(&res->recovering, list);
		}
	}
	spin_unlock(&dlm->spinlock);
}

static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res)
{
	int total_locks = 0;
	struct list_head *iter, *queue = &res->granted;
	int i;

	for (i=0; i<3; i++) {
		list_for_each(iter, queue)
			total_locks++;
		queue++;
	}
	return total_locks;
}
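
/*
 * pack-and-send helper: transmit the current dlm_migratable_lockres
 * page to send_to, then zero and reinit the buffer so the caller can
 * keep packing locks into it.  sets DLM_MRES_ALL_DONE on the final
 * message for this lockres.
 */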
static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
				    struct dlm_migratable_lockres *mres,
				    u8 send_to,
				    struct dlm_lock_resource *res,
				    int total_locks)
{
	u64 mig_cookie = be64_to_cpu(mres->mig_cookie);
	int mres_total_locks = be32_to_cpu(mres->total_locks);
	int sz, ret = 0, status = 0;
	u8 orig_flags = mres->flags,
	   orig_master = mres->master;

	BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS);
	if (!mres->num_locks)
		return 0;

	sz = sizeof(struct dlm_migratable_lockres) +
		(mres->num_locks * sizeof(struct dlm_migratable_lock));

	/* add an all-done flag if we reached the last lock */
	orig_flags = mres->flags;
	BUG_ON(total_locks > mres_total_locks);
	if (total_locks == mres_total_locks)
		mres->flags |= DLM_MRES_ALL_DONE;

	/* send it */
	ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
				 sz, send_to, &status);
	if (ret < 0) {
		/* XXX: negative status is not handled.
		 * this will end up killing this node. */
		mlog_errno(ret);
	} else {
		/* might get an -ENOMEM back here */
		ret = status;
		if (ret < 0) {
			mlog_errno(ret);

			if (ret == -EFAULT) {
				mlog(ML_ERROR, "node %u told me to kill "
				     "myself!\n", send_to);
				BUG();
			}
		}
	}

	/* zero and reinit the message buffer */
	dlm_init_migratable_lockres(mres, res->lockname.name,
				    res->lockname.len, mres_total_locks,
				    mig_cookie, orig_flags, orig_master);
	return ret;
}

static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
					const char *lockname, int namelen,
					int total_locks, u64 cookie,
					u8 flags, u8 master)
{
	/* mres here is one full page */
	memset(mres, 0, PAGE_SIZE);
	mres->lockname_len = namelen;
	memcpy(mres->lockname, lockname, namelen);
	mres->num_locks = 0;
	mres->total_locks = cpu_to_be32(total_locks);
	mres->mig_cookie = cpu_to_be64(cookie);
	mres->flags = flags;
	mres->master = master;
}

/* returns 1 if this lock fills the network structure,
 * 0 otherwise */
static int dlm_add_lock_to_array(struct dlm_lock *lock,
				 struct dlm_migratable_lockres *mres, int queue)
{
	struct dlm_migratable_lock *ml;
	int lock_num = mres->num_locks;

	ml = &(mres->ml[lock_num]);
	ml->cookie = lock->ml.cookie;
	ml->type = lock->ml.type;
	ml->convert_type = lock->ml.convert_type;
	ml->highest_blocked = lock->ml.highest_blocked;
	ml->list = queue;
	if (lock->lksb) {
		ml->flags = lock->lksb->flags;
		/* send our current lvb */
		if (ml->type == LKM_EXMODE ||
		    ml->type == LKM_PRMODE) {
			/* if it is already set, this had better be a PR
			 * and it has to match */
			if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
			    memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
				mlog(ML_ERROR, "mismatched lvbs!\n");
				__dlm_print_one_lock_resource(lock->lockres);
				BUG();
			}
			memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
		}
	}
	ml->node = lock->ml.node;
	mres->num_locks++;

	/* we reached the max, send this network message */
	if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS)
		return 1;
	return 0;
}
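
/*
 * walk all three lock queues (granted, converting, blocked) of one
 * lockres, packing each lock into the page-sized mres buffer and
 * sending a full page whenever it fills; a final partially-filled
 * page is flushed at the end.
 */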
int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			 struct dlm_migratable_lockres *mres,
			 u8 send_to, u8 flags)
{
	struct list_head *queue, *iter;
	int total_locks, i;
	u64 mig_cookie = 0;
	struct dlm_lock *lock;
	int ret = 0;

	BUG_ON(!(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));

	mlog(0, "sending to %u\n", send_to);

	total_locks = dlm_num_locks_in_lockres(res);
	if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) {
		/* rare, but possible */
		mlog(0, "argh.  lockres has %d locks.  this will "
			  "require more than one network packet to "
			  "migrate\n", total_locks);
		mig_cookie = dlm_get_next_mig_cookie();
	}

	dlm_init_migratable_lockres(mres, res->lockname.name,
				    res->lockname.len, total_locks,
				    mig_cookie, flags, res->owner);

	total_locks = 0;
	for (i=DLM_GRANTED_LIST; i<=DLM_BLOCKED_LIST; i++) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each(iter, queue) {
			lock = list_entry (iter, struct dlm_lock, list);

			/* add another lock. */
			total_locks++;
			if (!dlm_add_lock_to_array(lock, mres, i))
				continue;

			/* this filled the lock message,
			 * we must send it immediately. */
			ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
						       res, total_locks);
			if (ret < 0) {
				// TODO
				mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
				     "returned %d, TODO\n", ret);
				BUG();
			}
		}
	}
	/* flush any remaining locks */
	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
	if (ret < 0) {
		// TODO
		mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, "
		     "TODO\n", ret);
		BUG();
	}
	return ret;
}


/*
 * this message will contain no more than one page worth of
 * recovery data, and it will work on only one lockres.
 * there may be many locks in this page, and we may need to wait
 * for additional packets to complete all the locks (rare, but
 * possible).
 */
/*
 * NOTE: the allocation error cases here are scary
 * we really cannot afford to fail an alloc in recovery
 * do we spin?  returning an error only delays the problem really
 */
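/*
 * network handler for DLM_MIG_LOCKRES_MSG: find or allocate the named
 * lockres, mark it RECOVERING or MIGRATING, then hand the message off
 * to dlm_mig_lockres_worker to install the individual locks.
 */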
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_migratable_lockres *mres =
		(struct dlm_migratable_lockres *)msg->buf;
	int ret = 0;
	u8 real_master;
	char *buf = NULL;
	struct dlm_work_item *item = NULL;
	struct dlm_lock_resource *res = NULL;

	if (!dlm_grab(dlm))
		return -EINVAL;

	BUG_ON(!(mres->flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));

	real_master = mres->master;
	if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
		/* cannot migrate a lockres with no master */
		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
	}

	mlog(0, "%s message received from node %u\n",
		  (mres->flags & DLM_MRES_RECOVERY) ?
		  "recovery" : "migration", mres->master);

	if (mres->flags & DLM_MRES_ALL_DONE)
		mlog(0, "all done flag.  all lockres data received!\n");

	ret = -ENOMEM;
	buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL);
	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
	if (!buf || !item)
		goto leave;

	/* lookup the lock to see if we have a secondary queue for this
	 * already...  just add the locks in and this will have its owner
	 * and RECOVERY flag changed when it completes. */
	res = dlm_lookup_lockres(dlm, mres->lockname, mres->lockname_len);
	if (res) {
	 	/* this will get a ref on res */
		/* mark it as recovering/migrating and hash it */
		spin_lock(&res->spinlock);
		if (mres->flags & DLM_MRES_RECOVERY) {
			res->state |= DLM_LOCK_RES_RECOVERING;
		} else {
			if (res->state & DLM_LOCK_RES_MIGRATING) {
				/* this is at least the second
				 * lockres message */
				mlog(0, "lock %.*s is already migrating\n",
					  mres->lockname_len,
					  mres->lockname);
			} else if (res->state & DLM_LOCK_RES_RECOVERING) {
				/* caller should BUG */
				mlog(ML_ERROR, "node is attempting to migrate "
				     "lock %.*s, but marked as recovering!\n",
				     mres->lockname_len, mres->lockname);
				ret = -EFAULT;
				spin_unlock(&res->spinlock);
				goto leave;
			}
			res->state |= DLM_LOCK_RES_MIGRATING;
		}
		spin_unlock(&res->spinlock);
	} else {
		/* need to allocate, just like if it was
		 * mastered here normally  */
		res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len);
		if (!res)
			goto leave;

		/* to match the ref that we would have gotten if
		 * dlm_lookup_lockres had succeeded */
		dlm_lockres_get(res);

		/* mark it as recovering/migrating and hash it */
		if (mres->flags & DLM_MRES_RECOVERY)
			res->state |= DLM_LOCK_RES_RECOVERING;
		else
			res->state |= DLM_LOCK_RES_MIGRATING;

		spin_lock(&dlm->spinlock);
		__dlm_insert_lockres(dlm, res);
		spin_unlock(&dlm->spinlock);

		/* now that the new lockres is inserted,
		 * make it usable by other processes */
		spin_lock(&res->spinlock);
		res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
		spin_unlock(&res->spinlock);

		/* add an extra ref for just-allocated lockres
		 * otherwise the lockres will be purged immediately */
		dlm_lockres_get(res);
	}

	/* at this point we have allocated everything we need,
	 * and we have a hashed lockres with an extra ref and
	 * the proper res->state flags. */
	ret = 0;
	if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
		/* migration cannot have an unknown master */
		BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
		mlog(0, "recovery has passed me a lockres with an "
			  "unknown owner.. will need to requery: "
			  "%.*s\n", mres->lockname_len, mres->lockname);
	} else {
		spin_lock(&res->spinlock);
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
		spin_unlock(&res->spinlock);
	}

	/* queue up work for dlm_mig_lockres_worker */
	dlm_grab(dlm);  /* get an extra ref for the work item */
	memcpy(buf, msg->buf, be16_to_cpu(msg->data_len));  /* copy the whole message */
	dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf);
	item->u.ml.lockres = res; /* already have a ref */
	item->u.ml.real_master = real_master;
	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);
	schedule_work(&dlm->dispatched_work);

leave:
	dlm_put(dlm);
	if (ret < 0) {
		if (buf)
			kfree(buf);
		if (item)
			kfree(item);
	}

	mlog_exit(ret);
	return ret;
}
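
/*
 * deferred-work half of dlm_mig_lockres_handler: requery the master if
 * it was unknown, install the migrated locks with
 * dlm_process_recovery_data(), and finish the migration once the
 * DLM_MRES_ALL_DONE message has arrived.
 */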
static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_migratable_lockres *mres;
	int ret = 0;
	struct dlm_lock_resource *res;
	u8 real_master;

	dlm = item->dlm;
	mres = (struct dlm_migratable_lockres *)data;

	res = item->u.ml.lockres;
	real_master = item->u.ml.real_master;

	if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
		/* this case is super-rare. only occurs if
		 * node death happens during migration. */
again:
		ret = dlm_lockres_master_requery(dlm, res, &real_master);
		if (ret < 0) {
			mlog(0, "dlm_lockres_master_requery ret=%d\n",
				  ret);
			goto again;
		}
		if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "lockres %.*s not claimed.  "
				   "this node will take it.\n",
				   res->lockname.len, res->lockname.name);
		} else {
			mlog(0, "master needs to respond to sender "
				  "that node %u still owns %.*s\n",
				  real_master, res->lockname.len,
				  res->lockname.name);
			/* cannot touch this lockres */
			goto leave;
		}
	}

	ret = dlm_process_recovery_data(dlm, res, mres);
	if (ret < 0)
		mlog(0, "dlm_process_recovery_data returned  %d\n", ret);
	else
		mlog(0, "dlm_process_recovery_data succeeded\n");

	if ((mres->flags & (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) ==
	                   (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) {
		ret = dlm_finish_migration(dlm, res, mres->master);
		if (ret < 0)
			mlog_errno(ret);
	}

leave:
	kfree(data);
	mlog_exit(ret);
}


static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 *real_master)
{
	struct dlm_node_iter iter;
