dlmrecovery.c
	int nodenum;
	int ret = 0;

	*real_master = DLM_LOCK_RES_OWNER_UNKNOWN;

	/* we only reach here if one of the two nodes in a
	 * migration died while the migration was in progress.
	 * at this point we need to requery the master.  we
	 * know that the new_master got as far as creating
	 * an mle on at least one node, but we do not know
	 * if any nodes had actually cleared the mle and set
	 * the master to the new_master.  the old master
	 * is supposed to set the owner to UNKNOWN in the
	 * event of a new_master death, so the only possible
	 * responses that we can get from nodes here are
	 * that the master is new_master, or that the master
	 * is UNKNOWN.
	 * if all nodes come back with UNKNOWN then we know
	 * the lock needs remastering here.
	 * if any node comes back with a valid master, check
	 * to see if that master is the one that we are
	 * recovering.  if so, then the new_master died and
	 * we need to remaster this lock.  if not, then the
	 * new_master survived and that node will respond to
	 * other nodes about the owner.
	 * if there is an owner, this node needs to dump this
	 * lockres and alert the sender that this lockres
	 * was rejected. */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		/* do not send to self */
		if (nodenum == dlm->node_num)
			continue;
		ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
		if (ret < 0) {
			mlog_errno(ret);
			BUG();
			/* TODO: need to figure a way to restart this */
		}
		if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "lock master is %u\n", *real_master);
			break;
		}
	}
	return ret;
}

static int dlm_do_master_requery(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 u8 nodenum, u8 *real_master)
{
	int ret = -EINVAL;
	struct dlm_master_requery req;
	int status = DLM_LOCK_RES_OWNER_UNKNOWN;

	memset(&req, 0, sizeof(req));
	req.node_idx = dlm->node_num;
	req.namelen = res->lockname.len;
	memcpy(req.name, res->lockname.name, res->lockname.len);

	ret = o2net_send_message(DLM_MASTER_REQUERY_MSG, dlm->key,
				 &req, sizeof(req), nodenum, &status);
	/* XXX: negative status not handled properly here. */
	if (ret < 0)
		mlog_errno(ret);
	else {
		BUG_ON(status < 0);
		BUG_ON(status > DLM_LOCK_RES_OWNER_UNKNOWN);
		*real_master = (u8) (status & 0xff);
		mlog(0, "node %u responded to master requery with %u\n",
		     nodenum, *real_master);
		ret = 0;
	}
	return ret;
}
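/*
 * A minimal illustrative sketch, not part of dlmrecovery.c: per the comment
 * at the top of the requery loop above, this is roughly how the caller
 * interprets the answer it gets back.  The helper name and the dead_node
 * parameter are assumptions for illustration only, not symbols from this
 * file.
 */
static int dlm_requery_needs_remaster(u8 real_master, u8 dead_node)
{
	/* every node answered UNKNOWN: the lock must be remastered here */
	if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN)
		return 1;
	/* a node named the dead new_master as owner: remaster here too */
	if (real_master == dead_node)
		return 1;
	/* a surviving node owns it: that node will answer other nodes */
	return 0;
}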
/* this function cannot error, so unless the sending
 * or receiving of the message failed, the owner can
 * be trusted */
int dlm_master_requery_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	int master = DLM_LOCK_RES_OWNER_UNKNOWN;
	u32 flags = DLM_ASSERT_MASTER_REQUERY;

	if (!dlm_grab(dlm)) {
		/* since the domain has gone away on this
		 * node, the proper response is UNKNOWN */
		return master;
	}

	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, req->name, req->namelen);
	if (res) {
		spin_lock(&res->spinlock);
		master = res->owner;
		if (master == dlm->node_num) {
			int ret = dlm_dispatch_assert_master(dlm, res,
							     0, 0, flags);
			if (ret < 0) {
				mlog_errno(-ENOMEM);
				/* retry!? */
				BUG();
			}
		}
		spin_unlock(&res->spinlock);
	}
	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);
	return master;
}

static inline struct list_head *
dlm_list_num_to_pointer(struct dlm_lock_resource *res, int list_num)
{
	struct list_head *ret;
	BUG_ON(list_num < 0);
	BUG_ON(list_num > 2);
	ret = &(res->granted);
	ret += list_num;
	return ret;
}
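/*
 * Illustrative note (struct shown in rough outline, other fields omitted):
 * the pointer arithmetic in dlm_list_num_to_pointer() above only works
 * because the three lock queues sit back to back inside
 * struct dlm_lock_resource, roughly:
 *
 *	struct dlm_lock_resource {
 *		...
 *		struct list_head granted;	// list_num 0
 *		struct list_head converting;	// list_num 1
 *		struct list_head blocked;	// list_num 2
 *		...
 *	};
 *
 * so &res->granted + list_num lands on the queue head for that list number.
 */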
/* TODO: do ast flush business
 * TODO: do MIGRATING and RECOVERING spinning
 */

/*
 * NOTE about in-flight requests during migration:
 *
 * Before attempting the migrate, the master has marked the lockres as
 * MIGRATING and then flushed all of its pending ASTS.  So any in-flight
 * requests either got queued before the MIGRATING flag got set, in which
 * case the lock data will reflect the change and a return message is on
 * the way, or the request failed to get in before MIGRATING got set.  In
 * this case, the caller will be told to spin and wait for the MIGRATING
 * flag to be dropped, then recheck the master.
 * This holds true for the convert, cancel and unlock cases, and since lvb
 * updates are tied to these same messages, it applies to lvb updates as
 * well.  For the lock case, there is no way a lock can be on the master
 * queue and not be on the secondary queue since the lock is always added
 * locally first.  This means that the new target node will never be sent
 * a lock that he doesn't already have on the list.
 * In total, this means that the local lock is correct and should not be
 * updated to match the one sent by the master.  Any messages sent back
 * from the master before the MIGRATING flag will bring the lock properly
 * up-to-date, and the change will be ordered properly for the waiter.
 * We will *not* attempt to modify the lock underneath the waiter.
 */

static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_migratable_lockres *mres)
{
	struct dlm_migratable_lock *ml;
	struct list_head *queue;
	struct dlm_lock *newlock = NULL;
	struct dlm_lockstatus *lksb = NULL;
	int ret = 0;
	int i;
	struct list_head *iter;
	struct dlm_lock *lock = NULL;

	mlog(0, "running %d locks for this lockres\n", mres->num_locks);
	for (i=0; i<mres->num_locks; i++) {
		ml = &(mres->ml[i]);
		BUG_ON(ml->highest_blocked != LKM_IVMODE);
		newlock = NULL;
		lksb = NULL;

		queue = dlm_list_num_to_pointer(res, ml->list);

		/* if the lock is for the local node it needs to
		 * be moved to the proper location within the queue.
		 * do not allocate a new lock structure. */
		if (ml->node == dlm->node_num) {
			/* MIGRATION ONLY! */
			BUG_ON(!(mres->flags & DLM_MRES_MIGRATION));

			spin_lock(&res->spinlock);
			list_for_each(iter, queue) {
				lock = list_entry (iter, struct dlm_lock, list);
				if (lock->ml.cookie != ml->cookie)
					lock = NULL;
				else
					break;
			}

			/* lock is always created locally first, and
			 * destroyed locally last.  it must be on the list */
			if (!lock) {
				mlog(ML_ERROR, "could not find local lock "
					       "with cookie %"MLFu64"!\n",
				     ml->cookie);
				BUG();
			}
			BUG_ON(lock->ml.node != ml->node);

			/* see NOTE above about why we do not update
			 * to match the master here */

			/* move the lock to its proper place */
			/* do not alter lock refcount.  switching lists. */
			list_del_init(&lock->list);
			list_add_tail(&lock->list, queue);
			spin_unlock(&res->spinlock);

			mlog(0, "just reordered a local lock!\n");
			continue;
		}

		/* lock is for another node. */
		newlock = dlm_new_lock(ml->type, ml->node,
				       be64_to_cpu(ml->cookie), NULL);
		if (!newlock) {
			ret = -ENOMEM;
			goto leave;
		}
		lksb = newlock->lksb;
		dlm_lock_attach_lockres(newlock, res);

		if (ml->convert_type != LKM_IVMODE) {
			BUG_ON(queue != &res->converting);
			newlock->ml.convert_type = ml->convert_type;
		}
		lksb->flags |= (ml->flags &
				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));

		if (mres->lvb[0]) {
			if (lksb->flags & DLM_LKSB_PUT_LVB) {
				/* other node was trying to update
				 * lvb when node died.  recreate the
				 * lksb with the updated lvb. */
				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
			} else {
				/* otherwise, the node is sending its
				 * most recent valid lvb info */
				BUG_ON(ml->type != LKM_EXMODE &&
				       ml->type != LKM_PRMODE);
				if (res->lvb[0] && (ml->type == LKM_EXMODE ||
				    memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
					mlog(ML_ERROR, "received bad lvb!\n");
					__dlm_print_one_lock_resource(res);
					BUG();
				}
				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
			}
		}

		/* NOTE:
		 * wrt lock queue ordering and recovery:
		 *    1. order of locks on granted queue is
		 *       meaningless.
		 *    2. order of locks on converting queue is
		 *       LOST with the node death.  sorry charlie.
		 *    3. order of locks on the blocked queue is
		 *       also LOST.
		 * order of locks does not affect integrity, it
		 * just means that a lock request may get pushed
		 * back in line as a result of the node death.
		 * also note that for a given node the lock order
		 * for its secondary queue locks is preserved
		 * relative to each other, but clearly *not*
		 * preserved relative to locks from other nodes.
		 */

		spin_lock(&res->spinlock);
		dlm_lock_get(newlock);
		list_add_tail(&newlock->list, queue);
		spin_unlock(&res->spinlock);
	}
	mlog(0, "done running all the locks\n");

leave:
	if (ret < 0) {
		mlog_errno(ret);
		if (newlock)
			dlm_lock_put(newlock);
	}

	mlog_exit(ret);
	return ret;
}
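/* overview of the pending-lock handling below (derived from the code and
 * log messages that follow): when the master dies, any operation caught in
 * flight is resolved before the lockres state is handed to the new recovery
 * master, so that state never contains a half-done request:
 *	convert_pending	-> reverted, the lock moves back to the granted list
 *	lock_pending	-> reverted, the request is removed from the blocked list
 *	unlock_pending	-> committed, as if the unlock had already completed
 *	cancel_pending	-> committed, the convert is dropped and the lock
 *			   moves back to the granted list
 */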
void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	int i;
	struct list_head *queue, *iter, *iter2;
	struct dlm_lock *lock;

	res->state |= DLM_LOCK_RES_RECOVERING;
	if (!list_empty(&res->recovering))
		list_del_init(&res->recovering);
	list_add_tail(&res->recovering, &dlm->reco.resources);

	/* find any pending locks and put them back on proper list */
	for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_safe(iter, iter2, queue) {
			lock = list_entry (iter, struct dlm_lock, list);

			dlm_lock_get(lock);
			if (lock->convert_pending) {
				/* move converting lock back to granted */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with convert pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_revert_pending_convert(res, lock);
				lock->convert_pending = 0;
			} else if (lock->lock_pending) {
				/* remove pending lock requests completely */
				BUG_ON(i != DLM_BLOCKED_LIST);
				mlog(0, "node died with lock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				/* lock will be floating until ref in
				 * dlmlock_remote is freed after the network
				 * call returns.  ok for it to not be on any
				 * list since no ast can be called
				 * (the master is dead). */
				dlm_revert_pending_lock(res, lock);
				lock->lock_pending = 0;
			} else if (lock->unlock_pending) {
				/* if an unlock was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master.  note that the dlm_unlock
				 * call is still responsible for calling
				 * the unlockast.  that will happen after
				 * the network call times out.  for now,
				 * just move lists to prepare the new
				 * recovery master. */
				BUG_ON(i != DLM_GRANTED_LIST);
				mlog(0, "node died with unlock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_unlock(res, lock);
				lock->unlock_pending = 0;
			} else if (lock->cancel_pending) {
				/* if a cancel was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with cancel pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_cancel(res, lock);
				lock->cancel_pending = 0;
			}
			dlm_lock_put(lock);
		}
	}
}

/* removes all recovered locks from the recovery list.
 * sets the res->owner to the new master.
 * unsets the RECOVERY flag and wakes waiters. */
static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
					      u8 dead_node, u8 new_master)
{
	int i;
	struct list_head *iter, *iter2;
	struct hlist_node *hash_iter;
	struct hlist_head *bucket;
	struct dlm_lock_resource *res;

	mlog_entry_void();

	assert_spin_locked(&dlm->spinlock);

	list_for_each_safe(iter, iter2, &dlm->reco.resources) {
		res = list_entry (iter, struct dlm_lock_resource, recovering);
		if (res->owner == dead_node) {
			list_del_init(&res->recovering);
			spin_lock(&res->spinlock);
			dlm_change_lockres_owner(dlm, res, new_master);
			res->state &= ~DLM_LOCK_RES_RECOVERING;
			__dlm_dirty_lockres(dlm, res);
			spin_unlock(&res->spinlock);
			wake_up(&res->wq);
		}
	}

	/* this will become unnecessary eventually, but
	 * for now we need to run the whole hash, clear
	 * the RECOVERING state and set the owner
	 * if necessary */
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
		bucket = &(dlm->lockres_hash[i]);
		hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
			if (res->state & DLM_LOCK_RES_RECOVERING) {
				if (res->owner == dead_node) {
					mlog(0, "(this=%u) res %.*s owner=%u "
					     "was not on recovering list, but "
					     "clearing state anyway\n",
					     dlm->node_num, res->lockname.len,
					     res->lockname.name, new_master);
				} else if (res->owner == dlm->node_num) {
					mlog(0, "(this=%u) res %.*s owner=%u "
					     "was not on recovering list, "
					     "owner is THIS node, clearing\n",
					     dlm->node_num, res->lockname.len,
					     res->lockname.name, new_master);
				} else
					continue;

				spin_lock(&res->spinlock);
				dlm_change_lockres_owner(dlm, res, new_master);
				res->state &= ~DLM_LOCK_RES_RECOVERING;
				__dlm_dirty_lockres(dlm, res);
				spin_unlock(&res->spinlock);
				wake_up(&res->wq);
			}
		}
	}
}

static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
{
	if (local) {
		if (lock->ml.type != LKM_EXMODE &&
		    lock->ml.type != LKM_PRMODE)
			return 1;
	} else if (lock->ml.type == LKM_EXMODE)
		return 1;
	return 0;
}

static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,