dlmrecovery.c
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)));
				__dlm_print_one_lock_resource(res);
				BUG();
			}
			BUG_ON(lock->ml.node != ml->node);

			if (tmpq != queue) {
				mlog(0, "lock was on %u instead of %u for %.*s\n",
				     j, ml->list, res->lockname.len,
				     res->lockname.name);
				spin_unlock(&res->spinlock);
				continue;
			}

			/* see NOTE above about why we do not update
			 * to match the master here */

			/* move the lock to its proper place */
			/* do not alter lock refcount. switching lists. */
			list_move_tail(&lock->list, queue);
			spin_unlock(&res->spinlock);
			added++;

			mlog(0, "just reordered a local lock!\n");
			continue;
		}

		/* lock is for another node. */
		newlock = dlm_new_lock(ml->type, ml->node,
				       be64_to_cpu(ml->cookie), NULL);
		if (!newlock) {
			ret = -ENOMEM;
			goto leave;
		}
		lksb = newlock->lksb;
		dlm_lock_attach_lockres(newlock, res);

		if (ml->convert_type != LKM_IVMODE) {
			BUG_ON(queue != &res->converting);
			newlock->ml.convert_type = ml->convert_type;
		}
		lksb->flags |= (ml->flags &
				(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB));

		if (ml->type == LKM_NLMODE)
			goto skip_lvb;

		if (!dlm_lvb_is_empty(mres->lvb)) {
			if (lksb->flags & DLM_LKSB_PUT_LVB) {
				/* other node was trying to update
				 * lvb when node died. recreate the
				 * lksb with the updated lvb. */
				memcpy(lksb->lvb, mres->lvb, DLM_LVB_LEN);
				/* the lock resource lvb update must happen
				 * NOW, before the spinlock is dropped.
				 * we no longer wait for the AST to update
				 * the lvb. */
				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
			} else {
				/* otherwise, the node is sending its
				 * most recent valid lvb info */
				BUG_ON(ml->type != LKM_EXMODE &&
				       ml->type != LKM_PRMODE);
				if (!dlm_lvb_is_empty(res->lvb) &&
				    (ml->type == LKM_EXMODE ||
				     memcmp(res->lvb, mres->lvb, DLM_LVB_LEN))) {
					int i;
					mlog(ML_ERROR, "%s:%.*s: received bad "
					     "lvb! type=%d\n", dlm->name,
					     res->lockname.len,
					     res->lockname.name, ml->type);
					printk("lockres lvb=[");
					for (i=0; i<DLM_LVB_LEN; i++)
						printk("%02x", res->lvb[i]);
					printk("]\nmigrated lvb=[");
					for (i=0; i<DLM_LVB_LEN; i++)
						printk("%02x", mres->lvb[i]);
					printk("]\n");
					dlm_print_one_lock_resource(res);
					BUG();
				}
				memcpy(res->lvb, mres->lvb, DLM_LVB_LEN);
			}
		}
skip_lvb:

		/* NOTE:
		 * wrt lock queue ordering and recovery:
		 *    1. order of locks on granted queue is
		 *       meaningless.
		 *    2. order of locks on converting queue is
		 *       LOST with the node death. sorry charlie.
		 *    3. order of locks on the blocked queue is
		 *       also LOST.
		 * order of locks does not affect integrity, it
		 * just means that a lock request may get pushed
		 * back in line as a result of the node death.
		 * also note that for a given node the lock order
		 * for its secondary queue locks is preserved
		 * relative to each other, but clearly *not*
		 * preserved relative to locks from other nodes.
		 */
		bad = 0;
		spin_lock(&res->spinlock);
		list_for_each_entry(lock, queue, list) {
			if (lock->ml.cookie == ml->cookie) {
				__be64 c = lock->ml.cookie;
				mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
				     "exists on this lockres!\n", dlm->name,
				     res->lockname.len, res->lockname.name,
				     dlm_get_lock_cookie_node(be64_to_cpu(c)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(c)));

				mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
				     "node=%u, cookie=%u:%llu, queue=%d\n",
				     ml->type, ml->convert_type, ml->node,
				     dlm_get_lock_cookie_node(be64_to_cpu(ml->cookie)),
				     dlm_get_lock_cookie_seq(be64_to_cpu(ml->cookie)),
				     ml->list);

				__dlm_print_one_lock_resource(res);
				bad = 1;
				break;
			}
		}
		if (!bad) {
			dlm_lock_get(newlock);
			list_add_tail(&newlock->list, queue);
			mlog(0, "%s:%.*s: added lock for node %u, "
			     "setting refmap bit\n", dlm->name,
			     res->lockname.len, res->lockname.name, ml->node);
			dlm_lockres_set_refmap_bit(ml->node, res);
			added++;
		}
		spin_unlock(&res->spinlock);
	}
	mlog(0, "done running all the locks\n");

leave:
	/* balance the ref taken when the work was queued */
	spin_lock(&res->spinlock);
	dlm_lockres_drop_inflight_ref(dlm, res);
	spin_unlock(&res->spinlock);

	if (ret < 0) {
		mlog_errno(ret);
		if (newlock)
			dlm_lock_put(newlock);
	}

	mlog_exit(ret);
	return ret;
}

void dlm_move_lockres_to_recovery_list(struct dlm_ctxt *dlm,
				       struct dlm_lock_resource *res)
{
	int i;
	struct list_head *queue;
	struct dlm_lock *lock, *next;

	res->state |= DLM_LOCK_RES_RECOVERING;
	if (!list_empty(&res->recovering)) {
		mlog(0,
		     "Recovering res %s:%.*s, is already on recovery list!\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		list_del_init(&res->recovering);
	}
	/* We need to hold a reference while on the recovery list */
	dlm_lockres_get(res);
	list_add_tail(&res->recovering, &dlm->reco.resources);

	/* find any pending locks and put them back on proper list */
	for (i=DLM_BLOCKED_LIST; i>=DLM_GRANTED_LIST; i--) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_entry_safe(lock, next, queue, list) {
			dlm_lock_get(lock);
			if (lock->convert_pending) {
				/* move converting lock back to granted */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with convert pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_revert_pending_convert(res, lock);
				lock->convert_pending = 0;
			} else if (lock->lock_pending) {
				/* remove pending lock requests completely */
				BUG_ON(i != DLM_BLOCKED_LIST);
				mlog(0, "node died with lock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				/* lock will be floating until ref in
				 * dlmlock_remote is freed after the network
				 * call returns. ok for it to not be on any
				 * list since no ast can be called
				 * (the master is dead). */
				dlm_revert_pending_lock(res, lock);
				lock->lock_pending = 0;
			} else if (lock->unlock_pending) {
				/* if an unlock was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master. note that the dlm_unlock
				 * call is still responsible for calling
				 * the unlockast. that will happen after
				 * the network call times out. for now,
				 * just move lists to prepare the new
				 * recovery master. */
				BUG_ON(i != DLM_GRANTED_LIST);
				mlog(0, "node died with unlock pending "
				     "on %.*s. remove from blocked list and skip.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_unlock(res, lock);
				lock->unlock_pending = 0;
			} else if (lock->cancel_pending) {
				/* if a cancel was in progress, treat as
				 * if this had completed successfully
				 * before sending this lock state to the
				 * new master */
				BUG_ON(i != DLM_CONVERTING_LIST);
				mlog(0, "node died with cancel pending "
				     "on %.*s. move back to granted list.\n",
				     res->lockname.len, res->lockname.name);
				dlm_commit_pending_cancel(res, lock);
				lock->cancel_pending = 0;
			}
			dlm_lock_put(lock);
		}
	}
}

/* removes all recovered locks from the recovery list.
 * sets the res->owner to the new master.
 * unsets the RECOVERY flag and wakes waiters. */
static void dlm_finish_local_lockres_recovery(struct dlm_ctxt *dlm,
					      u8 dead_node, u8 new_master)
{
	int i;
	struct hlist_node *hash_iter;
	struct hlist_head *bucket;
	struct dlm_lock_resource *res, *next;

	mlog_entry_void();

	assert_spin_locked(&dlm->spinlock);

	list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
		if (res->owner == dead_node) {
			list_del_init(&res->recovering);
			spin_lock(&res->spinlock);
			/* new_master has our reference from
			 * the lock state sent during recovery */
			dlm_change_lockres_owner(dlm, res, new_master);
			res->state &= ~DLM_LOCK_RES_RECOVERING;
			if (__dlm_lockres_has_locks(res))
				__dlm_dirty_lockres(dlm, res);
			spin_unlock(&res->spinlock);
			wake_up(&res->wq);
			dlm_lockres_put(res);
		}
	}

	/* this will become unnecessary eventually, but
	 * for now we need to run the whole hash, clear
	 * the RECOVERING state and set the owner
	 * if necessary */
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
		bucket = dlm_lockres_hash(dlm, i);
		hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
			if (res->state & DLM_LOCK_RES_RECOVERING) {
				if (res->owner == dead_node) {
					mlog(0, "(this=%u) res %.*s owner=%u "
					     "was not on recovering list, but "
					     "clearing state anyway\n",
					     dlm->node_num, res->lockname.len,
					     res->lockname.name, new_master);
				} else if (res->owner == dlm->node_num) {
					mlog(0, "(this=%u) res %.*s owner=%u "
					     "was not on recovering list, "
					     "owner is THIS node, clearing\n",
					     dlm->node_num, res->lockname.len,
					     res->lockname.name, new_master);
				} else
					continue;

				if (!list_empty(&res->recovering)) {
					mlog(0, "%s:%.*s: lockres was "
					     "marked RECOVERING, owner=%u\n",
					     dlm->name, res->lockname.len,
					     res->lockname.name, res->owner);
					list_del_init(&res->recovering);
					dlm_lockres_put(res);
				}
				spin_lock(&res->spinlock);
				/* new_master has our reference from
				 * the lock state sent during recovery */
				dlm_change_lockres_owner(dlm, res, new_master);
				res->state &= ~DLM_LOCK_RES_RECOVERING;
				if (__dlm_lockres_has_locks(res))
					__dlm_dirty_lockres(dlm, res);
				spin_unlock(&res->spinlock);
				wake_up(&res->wq);
			}
		}
	}
}

static inline int dlm_lvb_needs_invalidation(struct dlm_lock *lock, int local)
{
	if (local) {
		if (lock->ml.type != LKM_EXMODE &&
		    lock->ml.type != LKM_PRMODE)
			return 1;
	} else if (lock->ml.type == LKM_EXMODE)
		return 1;
	return 0;
}

static void dlm_revalidate_lvb(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res, u8 dead_node)
{
	struct list_head *queue;
	struct dlm_lock *lock;
	int blank_lvb = 0, local = 0;
	int i;
	u8 search_node;

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (res->owner == dlm->node_num)
		/* if this node owned the lockres, and if the dead node
		 * had an EX when he died, blank out the lvb */
		search_node = dead_node;
	else {
		/* if this is a secondary lockres, and we had no EX or PR
		 * locks granted, we can no longer trust the lvb */
		search_node = dlm->node_num;
		local = 1;  /* check local state for valid lvb */
	}

	for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_entry(lock, queue, list) {
			if (lock->ml.node == search_node) {
				if (dlm_lvb_needs_invalidation(lock, local)) {
					/* zero the lksb lvb and lockres lvb */
					blank_lvb = 1;
					memset(lock->lksb->lvb, 0, DLM_LVB_LEN);
				}
			}
		}
	}

	if (blank_lvb) {
		mlog(0, "clearing %.*s lvb, dead node %u had EX\n",
		     res->lockname.len, res->lockname.name, dead_node);
		memset(res->lvb, 0, DLM_LVB_LEN);
	}
}

static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res, u8 dead_node)
{
	struct dlm_lock *lock, *next;
	unsigned int freed = 0;

	/* this node is the lockres master:
	 * 1) remove any stale locks for the dead node
	 * 2) if the dead node had an EX when he died, blank out the lvb */
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* TODO: check pending_asts, pending_basts here */
	list_for_each_entry_safe(lock, next, &res->granted, list) {
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
			freed++;
		}
	}
	list_for_each_entry_safe(lock, next, &res->converting, list) {
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
			freed++;
		}
	}
	list_for_each_entry_safe(lock, next, &res->blocked, list) {
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
			freed++;
		}
	}

	if (freed) {
		mlog(0, "%s:%.*s: freed %u locks for dead node %u, "
		     "dropping ref from lockres\n", dlm->name,
		     res->lockname.len, res->lockname.name, freed, dead_node);
		BUG_ON(!test_bit(dead_node, res->refmap));
		dlm_lockres_clear_refmap_bit(dead_node, res);
	} else if (test_bit(dead_node, res->refmap)) {
		mlog(0, "%s:%.*s: dead node %u had a ref, but had "
		     "no locks and had not purged before dying\n", dlm->name,
		     res->lockname.len, res->lockname.name, dead_node);
		dlm_lockres_clear_refmap_bit(dead_node, res);
	}

	/* do not kick thread yet */
	__dlm_dirty_lockres(dlm, res);
}

/* if this node is the recovery master, and there are no
 * locks for a given lockres owned by this node that are in
 * either PR or EX mode, zero out the lvb before requesting.
 *
 */
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct hlist_node *iter;
	struct dlm_lock_resource *res;
	int i;
	struct hlist_head *bucket;
	struct dlm_lock *lock;

	/* purge any stale mles */
	dlm_clean_master_list(dlm, dead_node);

	/*
	 * now clean up all lock resources. there are two rules:
	 *
	 * 1) if the dead node was the master, move the lockre
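
The queue-cleanup paths above (dlm_free_dead_locks and the pending-lock pass in
dlm_move_lockres_to_recovery_list) repeatedly use the remove-while-iterating
idiom, list_for_each_entry_safe, which caches the next element before the
current one may be unlinked. Below is a minimal userspace sketch of that idiom,
not code from dlmrecovery.c: the struct and variable names (fake_lock,
dead_node, granted) are hypothetical, and the list helpers are cut-down
re-implementations of the kernel's <linux/list.h> primitives.

/*
 * Standalone sketch of the safe-removal iteration used when purging a dead
 * node's locks from a queue. Hypothetical types; illustration only.
 */
#include <stddef.h>
#include <stdio.h>

struct list_head {
	struct list_head *next, *prev;
};

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

static void INIT_LIST_HEAD(struct list_head *head)
{
	head->next = head->prev = head;
}

static void list_add_tail(struct list_head *entry, struct list_head *head)
{
	entry->prev = head->prev;
	entry->next = head;
	head->prev->next = entry;
	head->prev = entry;
}

static void list_del_init(struct list_head *entry)
{
	entry->prev->next = entry->next;
	entry->next->prev = entry->prev;
	INIT_LIST_HEAD(entry);
}

struct fake_lock {
	int node;			/* node that owns this lock */
	struct list_head list;		/* linkage on a lockres queue */
};

int main(void)
{
	struct fake_lock locks[4] = { { 1 }, { 2 }, { 1 }, { 3 } };
	struct list_head granted, *pos, *tmp;
	int dead_node = 1, freed = 0, i;

	INIT_LIST_HEAD(&granted);
	for (i = 0; i < 4; i++)
		list_add_tail(&locks[i].list, &granted);

	/* "safe" traversal: remember ->next before possibly unlinking pos,
	 * so removal does not break the walk (cf. list_for_each_entry_safe) */
	for (pos = granted.next, tmp = pos->next; pos != &granted;
	     pos = tmp, tmp = pos->next) {
		struct fake_lock *lock = container_of(pos, struct fake_lock, list);

		if (lock->node == dead_node) {
			list_del_init(&lock->list);
			freed++;
		}
	}

	printf("freed %d locks for dead node %d\n", freed, dead_node);
	return 0;
}

A plain forward walk would dereference the unlinked element's stale ->next
after list_del_init; caching the successor first is what makes removing
entries in place safe, which is exactly what the dead-node purge relies on.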