📄 dlmrecovery.c
字号:
struct dlm_lock_resource *res, u8 dead_node)
{
	/*
	 * NOTE(review): the start of this definition (return type, name and
	 * first parameter) lies above the visible part of the file, so only
	 * the body is shown here.  Judging by the call made from
	 * dlm_do_local_recovery_cleanup() this is presumably
	 * dlm_revalidate_lvb() -- confirm against the full file.
	 *
	 * What the visible body does: picks a node number to search for,
	 * walks the granted and converting queues of the lockres, and for
	 * every lock owned by that node which dlm_lvb_needs_invalidation()
	 * flags, zeroes the lock's lksb lvb.  If at least one lock was
	 * flagged, the lockres lvb is zeroed as well.
	 */
	struct list_head *iter, *queue;
	struct dlm_lock *lock;
	int blank_lvb = 0, local = 0;
	int i;
	u8 search_node;

	/* both the domain lock and the lockres lock must already be held */
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	if (res->owner == dlm->node_num)
		/* if this node owned the lockres, and if the dead node
		 * had an EX when he died, blank out the lvb */
		search_node = dead_node;
	else {
		/* if this is a secondary lockres, and we had no EX or PR
		 * locks granted, we can no longer trust the lvb */
		search_node = dlm->node_num;
		local = 1;  /* check local state for valid lvb */
	}

	/* scan every queue index from DLM_GRANTED_LIST up to and including
	 * DLM_CONVERTING_LIST */
	for (i=DLM_GRANTED_LIST; i<=DLM_CONVERTING_LIST; i++) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each(iter, queue) {
			lock = list_entry (iter, struct dlm_lock, list);
			if (lock->ml.node == search_node) {
				if (dlm_lvb_needs_invalidation(lock, local)) {
					/* zero the lksb lvb and lockres lvb */
					blank_lvb = 1;
					memset(lock->lksb->lvb, 0, DLM_LVB_LEN);
				}
			}
		}
	}

	/* the lockres lvb is cleared once, after the scan, if any lock
	 * above required invalidation */
	if (blank_lvb) {
		mlog(0, "clearing %.*s lvb, dead node %u had EX\n",
		     res->lockname.len, res->lockname.name, dead_node);
		memset(res->lvb, 0, DLM_LVB_LEN);
	}
}

/*
 * dlm_free_dead_locks() - drop a dead node's locks from one lock resource.
 * @dlm:       domain the lock resource belongs to
 * @res:       lock resource to clean
 * @dead_node: node number whose locks are removed
 *
 * Walks the granted, converting and blocked queues of @res.  Every lock
 * whose ml.node equals @dead_node is unlinked from its queue and released
 * with dlm_lock_put().  Afterwards the lockres is passed to
 * __dlm_dirty_lockres().
 *
 * Both dlm->spinlock and res->spinlock must be held by the caller (asserted
 * below).
 */
static void dlm_free_dead_locks(struct dlm_ctxt *dlm,
				struct dlm_lock_resource *res, u8 dead_node)
{
	struct list_head *iter, *tmpiter;
	struct dlm_lock *lock;

	/* this node is the lockres master:
	 * 1) remove any stale locks for the dead node
	 * 2) if the dead node had an EX when he died, blank out the lvb */
	/* NOTE(review): step 2 is not performed in this function; the only
	 * visible caller runs dlm_revalidate_lvb() on the lockres just before
	 * calling here. */
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&res->spinlock);

	/* TODO: check pending_asts, pending_basts here */
	/* the _safe iterator is used because entries are unlinked while
	 * walking each queue */
	list_for_each_safe(iter, tmpiter, &res->granted) {
		lock = list_entry (iter, struct dlm_lock, list);
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
		}
	}
	list_for_each_safe(iter, tmpiter, &res->converting) {
		lock = list_entry (iter, struct dlm_lock, list);
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
		}
	}
	list_for_each_safe(iter, tmpiter, &res->blocked) {
		lock = list_entry (iter, struct dlm_lock, list);
		if (lock->ml.node == dead_node) {
			list_del_init(&lock->list);
			dlm_lock_put(lock);
		}
	}

	/* do not kick thread yet */
	__dlm_dirty_lockres(dlm, res);
}

/* if this node is the recovery master, and there are no
 * locks for a given lockres owned by this node that are in
 * either PR or EX mode, zero out the lvb before requesting.
 *
 */
/*
 * dlm_do_local_recovery_cleanup() - local cleanup after a node death.
 * @dlm:       domain being cleaned
 * @dead_node: node number that died
 *
 * First calls dlm_clean_master_list() for @dead_node, then visits every
 * lock resource in every bucket of dlm->lockres_hash:
 *   - for a recovery lockres (dlm_is_recovery_lock()), the first granted
 *     lock belonging to @dead_node is removed and released;
 *   - for any other lockres, dlm_revalidate_lvb() is called, then the
 *     lockres is either moved to the recovery list (owner == @dead_node)
 *     or has the dead node's locks freed (owner == this node).
 * Each lockres is handled under its own res->spinlock.
 *
 * NOTE(review): the helpers called here assert that dlm->spinlock is held;
 * the one visible caller, __dlm_hb_node_down(), asserts it too.
 */
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct hlist_node *iter;
	struct dlm_lock_resource *res;
	int i;
	struct hlist_head *bucket;
	struct dlm_lock *lock;


	/* purge any stale mles */
	dlm_clean_master_list(dlm, dead_node);

	/*
	 * now clean up all lock resources.  there are two rules:
	 *
	 * 1) if the dead node was the master, move the lockres
	 *    to the recovering list.  set the RECOVERING flag.
	 *    this lockres needs to be cleaned up before it can
	 *    be used further.
	 *
	 * 2) if this node was the master, remove all locks from
	 *    each of the lockres queues that were owned by the
	 *    dead node.  once recovery finishes, the dlm thread
	 *    can be kicked again to see if any ASTs or BASTs
	 *    need to be fired as a result.
	 */
	for (i = 0; i < DLM_HASH_BUCKETS; i++) {
		bucket = &(dlm->lockres_hash[i]);
		hlist_for_each_entry(res, iter, bucket, hash_node) {
			/* always prune any $RECOVERY entries for dead nodes,
			 * otherwise hangs can occur during later recovery */
			if (dlm_is_recovery_lock(res->lockname.name,
						 res->lockname.len)) {
				spin_lock(&res->spinlock);
				list_for_each_entry(lock, &res->granted, list) {
					if (lock->ml.node == dead_node) {
						mlog(0, "AHA! there was "
						     "a $RECOVERY lock for dead "
						     "node %u (%s)!\n",
						     dead_node, dlm->name);
						list_del_init(&lock->list);
						dlm_lock_put(lock);
						/* only the first match is
						 * removed; leave the loop
						 * right after unlinking it */
						break;
					}
				}
				spin_unlock(&res->spinlock);
				/* nothing else is done for the recovery
				 * lockres */
				continue;
			}
			spin_lock(&res->spinlock);
			/* zero the lvb if necessary */
			dlm_revalidate_lvb(dlm, res, dead_node);
			if (res->owner == dead_node)
				dlm_move_lockres_to_recovery_list(dlm, res);
			else if (res->owner == dlm->node_num) {
				dlm_free_dead_locks(dlm, res, dead_node);
				__dlm_lockres_calc_usage(dlm, res);
			}
			/* a lockres owned by any other node is left with only
			 * the lvb revalidation above */
			spin_unlock(&res->spinlock);
		}
	}

}

/*
 * __dlm_hb_node_down() - process a "node down" event for one domain.
 * @dlm: domain receiving the event
 * @idx: node number that went down
 *
 * Must be called with dlm->spinlock held (asserted below).
 *
 * Returns early, after logging, if @idx is not set in live_nodes_map or not
 * set in domain_map.  Otherwise, in this order: clears @idx from
 * live_nodes_map, resets the joining node if it was @idx, runs
 * dlm_do_local_recovery_cleanup() unless @idx is already in recovery_map,
 * notifies heartbeat listeners, clears @idx from domain_map, wakes
 * migration_wq, and finally sets @idx in recovery_map if not already set.
 *
 * NOTE(review): @idx is an int but several messages print it with %u.
 */
static void __dlm_hb_node_down(struct dlm_ctxt *dlm, int idx)
{
	assert_spin_locked(&dlm->spinlock);

	/* check to see if the node is already considered dead */
	if (!test_bit(idx, dlm->live_nodes_map)) {
		mlog(0, "for domain %s, node %d is already dead. "
		     "another node likely did recovery already.\n",
		     dlm->name, idx);
		return;
	}

	/* check to see if we do not care about this node */
	if (!test_bit(idx, dlm->domain_map)) {
		/* This also catches the case that we get a node down
		 * but haven't joined the domain yet. */
		mlog(0, "node %u already removed from domain!\n", idx);
		return;
	}

	clear_bit(idx, dlm->live_nodes_map);

	/* Clean up join state on node death. */
	if (dlm->joining_node == idx) {
		mlog(0, "Clearing join state for node %u\n", idx);
		__dlm_set_joining_node(dlm, DLM_LOCK_RES_OWNER_UNKNOWN);
	}

	/* make sure local cleanup occurs before the heartbeat events */
	if (!test_bit(idx, dlm->recovery_map))
		dlm_do_local_recovery_cleanup(dlm, idx);

	/* notify anything attached to the heartbeat events */
	dlm_hb_event_notify_attached(dlm, idx, 0);

	mlog(0, "node %u being removed from domain map!\n", idx);
	clear_bit(idx, dlm->domain_map);
	/* wake up migration waiters if a node goes down.
	 * perhaps later we can genericize this for other waiters. */
	wake_up(&dlm->migration_wq);

	if (test_bit(idx, dlm->recovery_map))
		mlog(0, "domain %s, node %u already added "
		     "to recovery map!\n", dlm->name, idx);
	else
		set_bit(idx, dlm->recovery_map);
}

/*
 * dlm_hb_node_down_cb() - "node down" callback.
 * @node: unused in this function
 * @idx:  node number that went down
 * @data: opaque pointer, used as the struct dlm_ctxt of the domain
 *
 * Does nothing if dlm_grab() fails.  Otherwise runs __dlm_hb_node_down()
 * under dlm->spinlock and then releases the domain with dlm_put().
 */
void dlm_hb_node_down_cb(struct o2nm_node *node, int idx, void *data)
{
	struct dlm_ctxt *dlm = data;

	if (!dlm_grab(dlm))
		return;

	spin_lock(&dlm->spinlock);
	__dlm_hb_node_down(dlm, idx);
	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);
}

/*
 * dlm_hb_node_up_cb() - "node up" callback.
 * @node: unused in this function
 * @idx:  node number that came up
 * @data: opaque pointer, used as the struct dlm_ctxt of the domain
 *
 * Does nothing if dlm_grab() fails.  Otherwise sets @idx in
 * dlm->live_nodes_map under dlm->spinlock and releases the domain with
 * dlm_put().
 */
void dlm_hb_node_up_cb(struct o2nm_node *node, int idx, void *data)
{
	struct dlm_ctxt *dlm = data;

	if (!dlm_grab(dlm))
		return;

	spin_lock(&dlm->spinlock);

	set_bit(idx, dlm->live_nodes_map);
	/* do NOT notify mle attached to the heartbeat events.
	 * new nodes are not interesting in mastery until joined. */

	spin_unlock(&dlm->spinlock);

	dlm_put(dlm);
}

/*
 * dlm_reco_ast() - ast handler passed to dlmlock() for the recovery lock.
 * @astdata: the struct dlm_ctxt that requested the lock
 *
 * Only logs that it was called.
 */
static void dlm_reco_ast(void *astdata)
{
	struct dlm_ctxt *dlm = astdata;
	mlog(0, "ast for recovery lock fired!, this=%u, dlm=%s\n",
	     dlm->node_num, dlm->name);
}

/*
 * dlm_reco_bast() - bast handler passed to dlmlock() for the recovery lock.
 * @astdata:      the struct dlm_ctxt that requested the lock
 * @blocked_type: unused in this function
 *
 * Only logs that it was called.
 */
static void dlm_reco_bast(void *astdata, int blocked_type)
{
	struct dlm_ctxt *dlm = astdata;
	mlog(0, "bast for recovery lock fired!, this=%u, dlm=%s\n",
	     dlm->node_num, dlm->name);
}

/*
 * dlm_reco_unlock_ast() - handler passed to dlmunlock() for the recovery
 * lock.
 * @astdata: unused in this function
 * @st:      unused in this function
 *
 * Only logs that it was called.
 */
static void dlm_reco_unlock_ast(void *astdata, enum dlm_status st)
{
	mlog(0, "unlockast for recovery lock fired!\n");
}

/*
 * dlm_pick_recovery_master will continually attempt to use
 * dlmlock() on the special "$RECOVERY" lockres with the
 * LKM_NOQUEUE flag to get an EX.  every thread that enters
 * this function on each node racing to become the recovery
 * master will not stop attempting this until either:
 * a) this node gets the EX (and becomes the recovery master),
 * or b) dlm->reco.new_master gets set to some nodenum
 * != O2NM_INVALID_NODE_NUM (another node will do the reco).
 * so each time a recovery master is needed, the entire cluster
 * will sync at this point.  if the new master dies, that will
 * be detected in dlm_do_recovery
 *
 * Return values, as set in the body below:
 *   0        this node got the EX lock, sent the begin-reco messages and
 *            recorded itself in dlm->reco.new_master
 *   -EEXIST  another node is (or is about to become) the recovery master
 *   -EINVAL  the EX lock was obtained but dlm->reco.dead_node was already
 *            O2NM_INVALID_NODE_NUM
 * Any dlmlock() result other than DLM_NORMAL or DLM_NOTQUEUED ends in BUG().
 */
static int dlm_pick_recovery_master(struct dlm_ctxt *dlm)
{
	enum dlm_status ret;
	struct dlm_lockstatus lksb;
	int status = -EINVAL;

	mlog(0, "starting recovery of %s at %lu, dead=%u, this=%u\n",
	     dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num);
again:
	/* start every attempt with a clean lksb */
	memset(&lksb, 0, sizeof(lksb));

	ret = dlmlock(dlm, LKM_EXMODE, &lksb, LKM_NOQUEUE|LKM_RECOVERY,
		      DLM_RECOVERY_LOCK_NAME, dlm_reco_ast, dlm, dlm_reco_bast);

	mlog(0, "%s: dlmlock($RECOVERY) returned %d, lksb=%d\n",
	     dlm->name, ret, lksb.status);

	if (ret == DLM_NORMAL) {
		mlog(0, "dlm=%s dlmlock says I got it (this=%u)\n",
		     dlm->name, dlm->node_num);

		/* got the EX lock.  check to see if another node
		 * just became the reco master */
		if (dlm_reco_master_ready(dlm)) {
			mlog(0, "%s: got reco EX lock, but %u will "
			     "do the recovery\n", dlm->name,
			     dlm->reco.new_master);
			status = -EEXIST;
		} else {
			status = 0;

			/* see if recovery was already finished elsewhere */
			spin_lock(&dlm->spinlock);
			if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
				status = -EINVAL;
				mlog(0, "%s: got reco EX lock, but "
				     "node got recovered already\n", dlm->name);
				/* a new master without a dead node is treated
				 * as a fatal inconsistency */
				if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
					mlog(ML_ERROR, "%s: new master is %u "
					     "but no dead node!\n",
					     dlm->name, dlm->reco.new_master);
					BUG();
				}
			}
			spin_unlock(&dlm->spinlock);
		}

		/* if this node has actually become the recovery master,
		 * set the master and send the messages to begin recovery */
		if (!status) {
			mlog(0, "%s: dead=%u, this=%u, sending "
			     "begin_reco now\n", dlm->name,
			     dlm->reco.dead_node, dlm->node_num);
			status = dlm_send_begin_reco_message(dlm,
				      dlm->reco.dead_node);
			/* this always succeeds */
			BUG_ON(status);

			/* set the new_master to this node */
			spin_lock(&dlm->spinlock);
			dlm->reco.new_master = dlm->node_num;
			spin_unlock(&dlm->spinlock);
		}

		/* recovery lock is a special case.  ast will not get fired,
		 * so just go ahead and unlock it. */
		ret = dlmunlock(dlm, &lksb, 0, dlm_reco_unlock_ast, dlm);
		if (ret == DLM_DENIED) {
			mlog(0, "got DLM_DENIED, trying LKM_CANCEL\n");
			ret = dlmunlock(dlm, &lksb, LKM_CANCEL, dlm_reco_unlock_ast, dlm);
		}
		if (ret != DLM_NORMAL) {
			/* this would really suck. this could only happen
			 * if there was a network error during the unlock
			 * because of node death.  this means the unlock
			 * is actually "done" and the lock structure is
			 * even freed.  we can continue, but only
			 * because this specific lock name is special. */
			mlog(ML_ERROR, "dlmunlock returned %d\n", ret);
		}
	} else if (ret == DLM_NOTQUEUED) {
		mlog(0, "dlm=%s dlmlock says another node got it (this=%u)\n",
		     dlm->name, dlm->node_num);
		/* another node is master. wait on
		 * reco.new_master != O2NM_INVALID_NODE_NUM
		 * for at most one second */
		wait_event_timeout(dlm->dlm_reco_thread_wq,
					 dlm_reco_master_ready(dlm),
					 msecs_to_jiffies(1000));
		/* the condition is re-checked after the wait; if it still
		 * does not hold, the whole lock attempt is repeated */
		if (!dlm_reco_master_ready(dlm)) {
			mlog(0, "%s: reco master taking awhile\n",
			     dlm->name);
			goto again;
		}
		/* another node has informed this one that it is reco master */
		mlog(0, "%s: reco master %u is ready to recover %u\n",
		     dlm->name, dlm->reco.new_master, dlm->reco.dead_node);
		status = -EEXIST;
	} else {
		struct dlm_lock_resource *res;

		/* dlmlock returned something other than NOTQUEUED or NORMAL */
		mlog(ML_ERROR, "%s: got %s from dlmlock($RECOVERY), "
		     "lksb.status=%s\n", dlm->name, dlm_errname(ret),
		     dlm_errname(lksb.status));
		/* dump the recovery lockres, if it can be found, before
		 * giving up */
		res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
					 DLM_RECOVERY_LOCK_NAME_LEN);
		if (res) {
			dlm_print_one_lock_resource(res);
			dlm_lockres_put(res);
		} else {
			mlog(ML_ERROR, "recovery lock not found\n");
		}
		BUG();
	}

	return status;
}

/*
 * dlm_send_begin_reco_message() - send DLM_BEGIN_RECO_MSG to the other
 * nodes in the domain.
 * @dlm:       domain being recovered
 * @dead_node: node number that died
 *
 * Takes a copy of dlm->domain_map under dlm->spinlock, removes @dead_node
 * from the copy, and sends a begin-reco message naming this node and
 * @dead_node to every remaining node except this one.
 *
 * NOTE(review): the visible part of the file ends in the middle of this
 * function (inside the error path of the send loop), so the retry handling
 * and the return value cannot be documented from here.
 */
static int dlm_send_begin_reco_message(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct dlm_begin_reco br;
	int ret = 0;
	struct dlm_node_iter iter;
	int nodenum;
	int status;

	mlog_entry("%u\n", dead_node);

	mlog(0, "dead node is %u\n", dead_node);

	/* only the snapshot of the domain map is taken under the lock */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	clear_bit(dead_node, iter.node_map);

	memset(&br, 0, sizeof(br));
	br.node_idx = dlm->node_num;
	br.dead_node = dead_node;

	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = 0;
		if (nodenum == dead_node) {
			mlog(0, "not sending begin reco to dead node "
				  "%u\n", dead_node);
			continue;
		}
		if (nodenum == dlm->node_num) {
			mlog(0, "not sending begin reco to self\n");
			continue;
		}
retry:
		/* NOTE(review): this value is overwritten by the
		 * o2net_send_message() call two statements below */
		ret = -EINVAL;
		mlog(0, "attempting to send begin reco msg to %d\n",
			  nodenum);
		ret = o2net_send_message(DLM_BEGIN_RECO_MSG, dlm->key,
					 &br, sizeof(br), nodenum, &status);
		/* negative status is handled ok by caller here */
		if (ret >= 0)
			ret = status;
		if (dlm_is_host_down(ret)) {
			/* node is down.  not involved in recovery
			 * so just keep going */
			mlog(0, "%s: node %u was down when sending "
			     "begin reco msg (%d)\n", dlm->name, nodenum, ret);
			ret = 0;
		}
		if (ret < 0) {
			struct dlm_lock_resource *res;
			/* this is now a serious problem, possibly ENOMEM
			 * in the network stack.  must retry */
			mlog_errno(ret);
			mlog(ML_ERROR, "begin reco of dlm %s to node %u "
			    " returned %d\n", dlm->name, nodenum, ret);
			res = dlm_lookup_lockres(dlm, DLM_RECOVERY_LOCK_NAME,
						 DLM_RECOVERY_LOCK_NAME_LEN);
			if (res) {
				dlm_print_one_lock_reso
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -