📄 dlmrecovery.c
字号:
status = dlm_init_recovery_area(dlm, dead_node); if (status < 0) goto leave; /* safe to access the node data list without a lock, since this * process is the only one to change the list */ list_for_each(iter, &dlm->reco.node_data) { ndata = list_entry (iter, struct dlm_reco_node_data, list); BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); ndata->state = DLM_RECO_NODE_DATA_REQUESTING; mlog(0, "requesting lock info from node %u\n", ndata->node_num); if (ndata->node_num == dlm->node_num) { ndata->state = DLM_RECO_NODE_DATA_DONE; continue; } status = dlm_request_all_locks(dlm, ndata->node_num, dead_node); if (status < 0) { mlog_errno(status); if (dlm_is_host_down(status)) ndata->state = DLM_RECO_NODE_DATA_DEAD; else { destroy = 1; goto leave; } } switch (ndata->state) { case DLM_RECO_NODE_DATA_INIT: case DLM_RECO_NODE_DATA_FINALIZE_SENT: case DLM_RECO_NODE_DATA_REQUESTED: BUG(); break; case DLM_RECO_NODE_DATA_DEAD: mlog(0, "node %u died after requesting " "recovery info for node %u\n", ndata->node_num, dead_node); // start all over destroy = 1; status = -EAGAIN; goto leave; case DLM_RECO_NODE_DATA_REQUESTING: ndata->state = DLM_RECO_NODE_DATA_REQUESTED; mlog(0, "now receiving recovery data from " "node %u for dead node %u\n", ndata->node_num, dead_node); break; case DLM_RECO_NODE_DATA_RECEIVING: mlog(0, "already receiving recovery data from " "node %u for dead node %u\n", ndata->node_num, dead_node); break; case DLM_RECO_NODE_DATA_DONE: mlog(0, "already DONE receiving recovery data " "from node %u for dead node %u\n", ndata->node_num, dead_node); break; } } mlog(0, "done requesting all lock info\n"); /* nodes should be sending reco data now * just need to wait */ while (1) { /* check all the nodes now to see if we are * done, or if anyone died */ all_nodes_done = 1; spin_lock(&dlm_reco_state_lock); list_for_each(iter, &dlm->reco.node_data) { ndata = list_entry (iter, struct dlm_reco_node_data, list); mlog(0, "checking recovery state of node %u\n", ndata->node_num); switch (ndata->state) { case DLM_RECO_NODE_DATA_INIT: case DLM_RECO_NODE_DATA_REQUESTING: mlog(ML_ERROR, "bad ndata state for " "node %u: state=%d\n", ndata->node_num, ndata->state); BUG(); break; case DLM_RECO_NODE_DATA_DEAD: mlog(ML_NOTICE, "node %u died after " "requesting recovery info for " "node %u\n", ndata->node_num, dead_node); spin_unlock(&dlm_reco_state_lock); // start all over destroy = 1; status = -EAGAIN; /* instead of spinning like crazy here, * wait for the domain map to catch up * with the network state. otherwise this * can be hit hundreds of times before * the node is really seen as dead. */ wait_event_timeout(dlm->dlm_reco_thread_wq, dlm_is_node_dead(dlm, ndata->node_num), msecs_to_jiffies(1000)); mlog(0, "waited 1 sec for %u, " "dead? %s\n", ndata->node_num, dlm_is_node_dead(dlm, ndata->node_num) ? "yes" : "no"); goto leave; case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: all_nodes_done = 0; break; case DLM_RECO_NODE_DATA_DONE: break; case DLM_RECO_NODE_DATA_FINALIZE_SENT: break; } } spin_unlock(&dlm_reco_state_lock); mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass, all_nodes_done?"yes":"no"); if (all_nodes_done) { int ret; /* all nodes are now in DLM_RECO_NODE_DATA_DONE state * just send a finalize message to everyone and * clean up */ mlog(0, "all nodes are done! send finalize\n"); ret = dlm_send_finalize_reco_message(dlm); if (ret < 0) mlog_errno(ret); spin_lock(&dlm->spinlock); dlm_finish_local_lockres_recovery(dlm, dead_node, dlm->node_num); spin_unlock(&dlm->spinlock); mlog(0, "should be done with recovery!\n"); mlog(0, "finishing recovery of %s at %lu, " "dead=%u, this=%u, new=%u\n", dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num, dlm->reco.new_master); destroy = 1; status = ret; /* rescan everything marked dirty along the way */ dlm_kick_thread(dlm, NULL); break; } /* wait to be signalled, with periodic timeout * to check for node death */ wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, kthread_should_stop(), msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS)); }leave: if (destroy) dlm_destroy_recovery_area(dlm, dead_node); mlog_exit(status); return status;}static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node){ int num=0; struct dlm_reco_node_data *ndata; spin_lock(&dlm->spinlock); memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map)); /* nodes can only be removed (by dying) after dropping * this lock, and death will be trapped later, so this should do */ spin_unlock(&dlm->spinlock); while (1) { num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num); if (num >= O2NM_MAX_NODES) { break; } BUG_ON(num == dead_node); ndata = kcalloc(1, sizeof(*ndata), GFP_KERNEL); if (!ndata) { dlm_destroy_recovery_area(dlm, dead_node); return -ENOMEM; } ndata->node_num = num; ndata->state = DLM_RECO_NODE_DATA_INIT; spin_lock(&dlm_reco_state_lock); list_add_tail(&ndata->list, &dlm->reco.node_data); spin_unlock(&dlm_reco_state_lock); num++; } return 0;}static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm, u8 dead_node){ struct list_head *iter, *iter2; struct dlm_reco_node_data *ndata; LIST_HEAD(tmplist); spin_lock(&dlm_reco_state_lock); list_splice_init(&dlm->reco.node_data, &tmplist); spin_unlock(&dlm_reco_state_lock); list_for_each_safe(iter, iter2, &tmplist) { ndata = list_entry (iter, struct dlm_reco_node_data, list); list_del_init(&ndata->list); kfree(ndata); }}static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, u8 dead_node){ struct dlm_lock_request lr; enum dlm_status ret; mlog(0, "\n"); mlog(0, "dlm_request_all_locks: dead node is %u, sending request " "to %u\n", dead_node, request_from); memset(&lr, 0, sizeof(lr)); lr.node_idx = dlm->node_num; lr.dead_node = dead_node; // send message ret = DLM_NOLOCKMGR; ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, &lr, sizeof(lr), request_from, NULL); /* negative status is handled by caller */ if (ret < 0) mlog_errno(ret); // return from here, then // sleep until all received or error return ret;}int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data){ struct dlm_ctxt *dlm = data; struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf; char *buf = NULL; struct dlm_work_item *item = NULL; if (!dlm_grab(dlm)) return -EINVAL; BUG_ON(lr->dead_node != dlm->reco.dead_node); item = kcalloc(1, sizeof(*item), GFP_KERNEL); if (!item) { dlm_put(dlm); return -ENOMEM; } /* this will get freed by dlm_request_all_locks_worker */ buf = (char *) __get_free_page(GFP_KERNEL); if (!buf) { kfree(item); dlm_put(dlm); return -ENOMEM; } /* queue up work for dlm_request_all_locks_worker */ dlm_grab(dlm); /* get an extra ref for the work item */ dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf); item->u.ral.reco_master = lr->node_idx; item->u.ral.dead_node = lr->dead_node; spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); schedule_work(&dlm->dispatched_work); dlm_put(dlm); return 0;}static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data){ struct dlm_migratable_lockres *mres; struct dlm_lock_resource *res; struct dlm_ctxt *dlm; LIST_HEAD(resources); struct list_head *iter; int ret; u8 dead_node, reco_master; dlm = item->dlm; dead_node = item->u.ral.dead_node; reco_master = item->u.ral.reco_master; mres = (struct dlm_migratable_lockres *)data; if (dead_node != dlm->reco.dead_node || reco_master != dlm->reco.new_master) { /* show extra debug info if the recovery state is messed */ mlog(ML_ERROR, "%s: bad reco state: reco(dead=%u, master=%u), " "request(dead=%u, master=%u)\n", dlm->name, dlm->reco.dead_node, dlm->reco.new_master, dead_node, reco_master); mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u " "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n", dlm->name, mres->lockname_len, mres->lockname, mres->master, mres->num_locks, mres->total_locks, mres->flags, mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags, mres->ml[0].type, mres->ml[0].convert_type, mres->ml[0].highest_blocked, mres->ml[0].node); BUG(); } BUG_ON(dead_node != dlm->reco.dead_node); BUG_ON(reco_master != dlm->reco.new_master); /* lock resources should have already been moved to the * dlm->reco.resources list. now move items from that list * to a temp list if the dead owner matches. note that the * whole cluster recovers only one node at a time, so we * can safely move UNKNOWN lock resources for each recovery * session. */ dlm_move_reco_locks_to_list(dlm, &resources, dead_node); /* now we can begin blasting lockreses without the dlm lock */ list_for_each(iter, &resources) { res = list_entry (iter, struct dlm_lock_resource, recovering); ret = dlm_send_one_lockres(dlm, res, mres, reco_master, DLM_MRES_RECOVERY); if (ret < 0) mlog_errno(ret); } /* move the resources back to the list */ spin_lock(&dlm->spinlock); list_splice_init(&resources, &dlm->reco.resources); spin_unlock(&dlm->spinlock); ret = dlm_send_all_done_msg(dlm, dead_node, reco_master); if (ret < 0) mlog_errno(ret); free_page((unsigned long)data);}static int dlm_send_all_done_msg(struct dlm_ctxt *dlm, u8 dead_node, u8 send_to){ int ret, tmpret; struct dlm_reco_data_done done_msg; memset(&done_msg, 0, sizeof(done_msg)); done_msg.node_idx = dlm->node_num; done_msg.dead_node = dead_node; mlog(0, "sending DATA DONE message to %u, " "my node=%u, dead node=%u\n", send_to, done_msg.node_idx, done_msg.dead_node); ret = o2net_send_message(DLM_RECO_DATA_DONE_MSG, dlm->key, &done_msg, sizeof(done_msg), send_to, &tmpret); /* negative status is ignored by the caller */ if (ret >= 0) ret = tmpret; return ret;}int dlm_reco_data_done_handler(struct o2net_msg *msg, u32 len, void *data){ struct dlm_ctxt *dlm = data; struct dlm_reco_data_done *done = (struct dlm_reco_data_done *)msg->buf; struct list_head *iter; struct dlm_reco_node_data *ndata = NULL; int ret = -EINVAL; if (!dlm_grab(dlm)) return -EINVAL; mlog(0, "got DATA DONE: dead_node=%u, reco.dead_node=%u, " "node_idx=%u, this node=%u\n", done->dead_node, dlm->reco.dead_node, done->node_idx, dlm->node_num); BUG_ON(done->dead_node != dlm->reco.dead_node); spin_lock(&dlm_reco_state_lock); list_for_each(iter, &dlm->reco.node_data) { ndata = list_entry (iter, struct dlm_reco_node_data, list); if (ndata->node_num != done->node_idx) continue; switch (ndata->state) { /* should have moved beyond INIT but not to FINALIZE yet */ case DLM_RECO_NODE_DATA_INIT: case DLM_RECO_NODE_DATA_DEAD: case DLM_RECO_NODE_DATA_FINALIZE_SENT: mlog(ML_ERROR, "bad ndata state for node %u:" " state=%d\n", ndata->node_num, ndata->state); BUG(); break; /* these states are possible at this point, anywhere along * the line of recovery */ case DLM_RECO_NODE_DATA_DONE: case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: case DLM_RECO_NODE_DATA_REQUESTING: mlog(0, "node %u is DONE sending " "recovery data!\n", ndata->node_num); ndata->state = DLM_RECO_NODE_DATA_DONE; ret = 0; break; } } spin_unlock(&dlm_reco_state_lock); /* wake the recovery thread, some node is done */ if (!ret) dlm_kick_recovery_thread(dlm); if (ret < 0) mlog(ML_ERROR, "failed to find recovery node data for node " "%u\n", done->node_idx); dlm_put(dlm); mlog(0, "leaving reco data done handler, ret=%d\n", ret); return ret;}static void dlm_move_reco_locks_to_list(struct dlm_ctxt *dlm, struct list_head *list, u8 dead_node){ struct dlm_lock_resource *res; struct list_head *iter, *iter2;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -