dlmrecovery.c
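/*
 * dlmrecovery.c -- lock migration during recovery in the OCFS2 (o2cb)
 * distributed lock manager.  This excerpt picks up inside the recovery
 * worker and covers the path that ships a dead node's lock resource
 * state to the new recovery master: the DATA DONE handshake and the
 * helpers that pack locks into page-sized dlm_migratable_lockres
 * messages.
 */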
	mlog(0, "%s: recovery worker started, dead=%u, master=%u\n",
	     dlm->name, dead_node, reco_master);

	if (dead_node != dlm->reco.dead_node ||
	    reco_master != dlm->reco.new_master) {
		/* worker could have been created before the recovery master
		 * died.  if so, do not continue, but do not error. */
		if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) {
			mlog(ML_NOTICE, "%s: will not send recovery state, "
			     "recovery master %u died, thread=(dead=%u,mas=%u)"
			     " current=(dead=%u,mas=%u)\n", dlm->name,
			     reco_master, dead_node, reco_master,
			     dlm->reco.dead_node, dlm->reco.new_master);
		} else {
			mlog(ML_NOTICE, "%s: reco state invalid: reco(dead=%u, "
			     "master=%u), request(dead=%u, master=%u)\n",
			     dlm->name, dlm->reco.dead_node,
			     dlm->reco.new_master, dead_node, reco_master);
		}
		goto leave;
	}

	/* lock resources should have already been moved to the
	 * dlm->reco.resources list.  now move items from that list
	 * to a temp list if the dead owner matches.  note that the
	 * whole cluster recovers only one node at a time, so we
	 * can safely move UNKNOWN lock resources for each recovery
	 * session. */
	dlm_move_reco_locks_to_list(dlm, &resources, dead_node);

	/* now we can begin blasting lockreses without the dlm lock */

	/* any errors returned will be due to the new_master dying,
	 * the dlm_reco_thread should detect this */
	list_for_each_entry(res, &resources, recovering) {
		ret = dlm_send_one_lockres(dlm, res, mres, reco_master,
					   DLM_MRES_RECOVERY);
		if (ret < 0) {
			mlog(ML_ERROR, "%s: node %u went down while sending "
			     "recovery state for dead node %u, ret=%d\n",
			     dlm->name, reco_master, dead_node, ret);
			skip_all_done = 1;
			break;
		}
	}

	/* move the resources back to the list */
	spin_lock(&dlm->spinlock);
	list_splice_init(&resources, &dlm->reco.resources);
	spin_unlock(&dlm->spinlock);

	if (!skip_all_done) {
		ret = dlm_send_all_done_msg(dlm, dead_node, reco_master);
		if (ret < 0) {
			mlog(ML_ERROR, "%s: node %u went down while sending "
			     "recovery all-done for dead node %u, ret=%d\n",
			     dlm->name, reco_master, dead_node, ret);
		}
	}
leave:
	free_page((unsigned long)data);
}

static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node,
				 u8 send_to)
{
	int ret, tmpret;
	struct dlm_reco_data_done done_msg;

	memset(&done_msg, 0, sizeof(done_msg));
	done_msg.node_idx = dlm->node_num;
	done_msg.dead_node = dead_node;

	mlog(0, "sending DATA DONE message to %u, "
	     "my node=%u, dead node=%u\n", send_to, done_msg.node_idx,
	     done_msg.dead_node);

	ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg,
				 sizeof(done_msg), send_to, &tmpret);
	if (ret < 0) {
		if (!dlm_is_host_down(ret)) {
			mlog_errno(ret);
			mlog(ML_ERROR, "%s: unknown error sending data-done "
			     "to %u\n", dlm->name, send_to);
			BUG();
		}
	} else
		ret = tmpret;
	return ret;
}

int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data,
			       void **ret_data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf;
	struct dlm_reco_node_data *ndata = NULL;
	int ret = -EINVAL;

	if (!dlm_grab(dlm))
		return -EINVAL;

	mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, "
	     "node_idx=%u, this node=%u\n", done->dead_node,
	     dlm->reco.dead_node, done->node_idx, dlm->node_num);

	mlog_bug_on_msg((done->dead_node != dlm->reco.dead_node),
			"Got DATA DONE: dead_node=%u, reco.dead_node=%u, "
			"node_idx=%u, this node=%u\n", done->dead_node,
			dlm->reco.dead_node, done->node_idx, dlm->node_num);

	spin_lock(&dlm_reco_state_lock);
	list_for_each_entry(ndata, &dlm->reco.node_data, list) {
		if (ndata->node_num != done->node_idx)
			continue;

		switch (ndata->state) {
			/* should have moved beyond INIT but not to
			 * FINALIZE yet */
			case DLM_RECO_NODE_DATA_INIT:
			case DLM_RECO_NODE_DATA_DEAD:
			case DLM_RECO_NODE_DATA_FINALIZE_SENT:
				mlog(ML_ERROR, "bad ndata state for node %u:"
				     " state=%d\n", ndata->node_num,
				     ndata->state);
				BUG();
				break;
			/* these states are possible at this point, anywhere
			 * along the line of recovery */
			case DLM_RECO_NODE_DATA_DONE:
			case DLM_RECO_NODE_DATA_RECEIVING:
			case DLM_RECO_NODE_DATA_REQUESTED:
			case DLM_RECO_NODE_DATA_REQUESTING:
				mlog(0, "node %u is DONE sending "
				     "recovery data!\n",
				     ndata->node_num);

				ndata->state = DLM_RECO_NODE_DATA_DONE;
				ret = 0;
				break;
		}
	}
	spin_unlock(&dlm_reco_state_lock);

	/* wake the recovery thread, some node is done */
	if (!ret)
		dlm_kick_recovery_thread(dlm);

	if (ret < 0)
		mlog(ML_ERROR, "failed to find recovery node data for node "
		     "%u\n", done->node_idx);
	dlm_put(dlm);

	mlog(0, "leaving reco data done handler, ret=%d\n", ret);
	return ret;
}
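/*
 * In short, the functions above form a simple handshake: the worker
 * streams every affected lockres to the recovery master with
 * dlm_send_one_lockres(), then signals completion with
 * dlm_send_all_done_msg(); on the master, dlm_reco_data_done_handler()
 * flips that node's entry to DLM_RECO_NODE_DATA_DONE and kicks the
 * recovery thread so it can notice when all live nodes have reported in.
 */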
static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm,
					struct list_head *list,
					u8 dead_node)
{
	struct dlm_lock_resource *res, *next;
	struct dlm_lock *lock;

	spin_lock(&dlm->spinlock);
	list_for_each_entry_safe(res, next, &dlm->reco.resources, recovering) {
		/* always prune any $RECOVERY entries for dead nodes,
		 * otherwise hangs can occur during later recovery */
		if (dlm_is_recovery_lock(res->lockname.name,
					 res->lockname.len)) {
			spin_lock(&res->spinlock);
			list_for_each_entry(lock, &res->granted, list) {
				if (lock->ml.node == dead_node) {
					mlog(0, "AHA! there was "
					     "a $RECOVERY lock for dead "
					     "node %u (%s)!\n",
					     dead_node, dlm->name);
					list_del_init(&lock->list);
					dlm_lock_put(lock);
					break;
				}
			}
			spin_unlock(&res->spinlock);
			continue;
		}

		if (res->owner == dead_node) {
			mlog(0, "found lockres owned by dead node while "
			     "doing recovery for node %u. sending it.\n",
			     dead_node);
			list_move_tail(&res->recovering, list);
		} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
			mlog(0, "found UNKNOWN owner while doing recovery "
			     "for node %u. sending it.\n", dead_node);
			list_move_tail(&res->recovering, list);
		}
	}
	spin_unlock(&dlm->spinlock);
}

static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res)
{
	int total_locks = 0;
	struct list_head *iter, *queue = &res->granted;
	int i;

	for (i = 0; i < 3; i++) {
		list_for_each(iter, queue)
			total_locks++;
		queue++;
	}
	return total_locks;
}

static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
				    struct dlm_migratable_lockres *mres,
				    u8 send_to,
				    struct dlm_lock_resource *res,
				    int total_locks)
{
	u64 mig_cookie = be64_to_cpu(mres->mig_cookie);
	int mres_total_locks = be32_to_cpu(mres->total_locks);
	int sz, ret = 0, status = 0;
	u8 orig_flags = mres->flags,
	   orig_master = mres->master;

	BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS);
	if (!mres->num_locks)
		return 0;

	sz = sizeof(struct dlm_migratable_lockres) +
		(mres->num_locks * sizeof(struct dlm_migratable_lock));

	/* add an all-done flag if we reached the last lock */
	orig_flags = mres->flags;
	BUG_ON(total_locks > mres_total_locks);
	if (total_locks == mres_total_locks)
		mres->flags |= DLM_MRES_ALL_DONE;

	mlog(0, "%s:%.*s: sending mig lockres (%s) to %u\n",
	     dlm->name, res->lockname.len, res->lockname.name,
	     orig_flags & DLM_MRES_MIGRATION ? "migrate" : "recovery",
	     send_to);

	/* send it */
	ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
				 sz, send_to, &status);
	if (ret < 0) {
		/* XXX: negative status is not handled.
		 * this will end up killing this node. */
		mlog_errno(ret);
	} else {
		/* might get an -ENOMEM back here */
		ret = status;
		if (ret < 0) {
			mlog_errno(ret);

			if (ret == -EFAULT) {
				mlog(ML_ERROR, "node %u told me to kill "
				     "myself!\n", send_to);
				BUG();
			}
		}
	}

	/* zero and reinit the message buffer */
	dlm_init_migratable_lockres(mres, res->lockname.name,
				    res->lockname.len, mres_total_locks,
				    mig_cookie, orig_flags, orig_master);
	return ret;
}
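/*
 * The mres argument throughout is a single page-sized scratch buffer
 * that is zeroed and reused between sends.  Only the bytes actually
 * used go on the wire; as computed above,
 *
 *	sz = sizeof(struct dlm_migratable_lockres) +
 *	     (mres->num_locks * sizeof(struct dlm_migratable_lock));
 *
 * so a partially filled final message does not cost a full page.
 * DLM_MAX_MIGRATABLE_LOCKS is presumably chosen so that a full lock
 * array plus the header still fits in that one page; clear_page()
 * below relies on the buffer being page-aligned and page-sized.
 */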
static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
					const char *lockname, int namelen,
					int total_locks, u64 cookie,
					u8 flags, u8 master)
{
	/* mres here is one full page */
	clear_page(mres);
	mres->lockname_len = namelen;
	memcpy(mres->lockname, lockname, namelen);
	mres->num_locks = 0;
	mres->total_locks = cpu_to_be32(total_locks);
	mres->mig_cookie = cpu_to_be64(cookie);
	mres->flags = flags;
	mres->master = master;
}

/* returns 1 if this lock fills the network structure,
 * 0 otherwise */
static int dlm_add_lock_to_array(struct dlm_lock *lock,
				 struct dlm_migratable_lockres *mres,
				 int queue)
{
	struct dlm_migratable_lock *ml;
	int lock_num = mres->num_locks;

	ml = &(mres->ml[lock_num]);
	ml->cookie = lock->ml.cookie;
	ml->type = lock->ml.type;
	ml->convert_type = lock->ml.convert_type;
	ml->highest_blocked = lock->ml.highest_blocked;
	ml->list = queue;
	if (lock->lksb) {
		ml->flags = lock->lksb->flags;
		/* send our current lvb */
		if (ml->type == LKM_EXMODE || ml->type == LKM_PRMODE) {
			/* if it is already set, this had better be a PR
			 * and it has to match */
			if (!dlm_lvb_is_empty(mres->lvb) &&
			    (ml->type == LKM_EXMODE ||
			     memcmp(mres->lvb, lock->lksb->lvb,
				    DLM_LVB_LEN))) {
				mlog(ML_ERROR, "mismatched lvbs!\n");
				__dlm_print_one_lock_resource(lock->lockres);
				BUG();
			}
			memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
		}
	}
	ml->node = lock->ml.node;
	mres->num_locks++;
	/* we reached the max, send this network message */
	if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS)
		return 1;
	return 0;
}

static void dlm_add_dummy_lock(struct dlm_ctxt *dlm,
			       struct dlm_migratable_lockres *mres)
{
	struct dlm_lock dummy;

	memset(&dummy, 0, sizeof(dummy));
	dummy.ml.cookie = 0;
	dummy.ml.type = LKM_IVMODE;
	dummy.ml.convert_type = LKM_IVMODE;
	dummy.ml.highest_blocked = LKM_IVMODE;
	dummy.lksb = NULL;
	dummy.ml.node = dlm->node_num;
	dlm_add_lock_to_array(&dummy, mres, DLM_BLOCKED_LIST);
}

static inline int dlm_is_dummy_lock(struct dlm_ctxt *dlm,
				    struct dlm_migratable_lock *ml,
				    u8 *nodenum)
{
	if (unlikely(ml->cookie == 0 &&
	    ml->type == LKM_IVMODE &&
	    ml->convert_type == LKM_IVMODE &&
	    ml->highest_blocked == LKM_IVMODE &&
	    ml->list == DLM_BLOCKED_LIST)) {
		*nodenum = ml->node;
		return 1;
	}
	return 0;
}
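/*
 * Illustrative flow for the function below: locks are packed from the
 * granted, converting, and blocked queues in order.  Whenever
 * dlm_add_lock_to_array() reports a full array, the page is flushed
 * mid-stream with dlm_send_mig_lockres_msg(); the receiver stitches
 * the pieces back together via the shared mig_cookie, and the packet
 * carrying the final lock is tagged DLM_MRES_ALL_DONE.  A lockres with
 * zero locks still sends one dummy lock so the new owner learns that
 * this node holds a reference to the resource.  A hypothetical caller
 * (a sketch, not from this file) would look like:
 *
 *	mres = (struct dlm_migratable_lockres *)
 *			__get_free_page(GFP_NOFS);
 *	ret = dlm_send_one_lockres(dlm, res, mres, new_master,
 *				   DLM_MRES_MIGRATION);
 *	free_page((unsigned long)mres);
 */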
int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			 struct dlm_migratable_lockres *mres,
			 u8 send_to, u8 flags)
{
	struct list_head *queue;
	int total_locks, i;
	u64 mig_cookie = 0;
	struct dlm_lock *lock;
	int ret = 0;

	BUG_ON(!(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));

	mlog(0, "sending to %u\n", send_to);

	total_locks = dlm_num_locks_in_lockres(res);
	if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) {
		/* rare, but possible */
		mlog(0, "argh.  lockres has %d locks.  this will "
		     "require more than one network packet to "
		     "migrate\n", total_locks);
		mig_cookie = dlm_get_next_mig_cookie();
	}

	dlm_init_migratable_lockres(mres, res->lockname.name,
				    res->lockname.len, total_locks,
				    mig_cookie, flags, res->owner);

	total_locks = 0;
	for (i = DLM_GRANTED_LIST; i <= DLM_BLOCKED_LIST; i++) {
		queue = dlm_list_idx_to_ptr(res, i);
		list_for_each_entry(lock, queue, list) {
			/* add another lock. */
			total_locks++;
			if (!dlm_add_lock_to_array(lock, mres, i))
				continue;

			/* this filled the lock message,
			 * we must send it immediately. */
			ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
						       res, total_locks);
			if (ret < 0)
				goto error;
		}
	}
	if (total_locks == 0) {
		/* send a dummy lock to indicate a mastery reference only */
		mlog(0, "%s:%.*s: sending dummy lock to %u, %s\n",
		     dlm->name, res->lockname.len, res->lockname.name,
		     send_to, flags & DLM_MRES_RECOVERY ? "recovery" :
		     "migration");
		dlm_add_dummy_lock(dlm, mres);
	}
	/* flush any remaining locks */
	ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
	if (ret < 0)
		goto error;
	return ret;

error:
	mlog(ML_ERROR, "%s: dlm_send_mig_lockres_msg returned %d\n",
	     dlm->name, ret);
	if (!dlm_is_host_down(ret))
		BUG();
	mlog(0, "%s: node %u went down while sending %s "
	     "lockres %.*s\n", dlm->name, send_to,
	     flags & DLM_MRES_RECOVERY ? "recovery" : "migration",
	     res->lockname.len, res->lockname.name);
	return ret;
}

/*
 * this message will contain no more than one page worth of
 * recovery data, and it will work on only one lockres.
 * there may be many locks in this page, and we may need to wait
 * for additional packets to complete all the locks (rare, but
 * possible).
 */
/*
 * NOTE: the allocation error cases here are scary