📄 dlmrecovery.c
字号:
spin_lock(&dlm->spinlock); BUG_ON(!(dlm->reco.state & DLM_RECO_STATE_ACTIVE)); dlm->reco.state &= ~DLM_RECO_STATE_ACTIVE; spin_unlock(&dlm->spinlock); wake_up(&dlm->reco.event);}static int dlm_do_recovery(struct dlm_ctxt *dlm){ int status = 0; int ret; spin_lock(&dlm->spinlock); /* check to see if the new master has died */ if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM && test_bit(dlm->reco.new_master, dlm->recovery_map)) { mlog(0, "new master %u died while recovering %u!\n", dlm->reco.new_master, dlm->reco.dead_node); /* unset the new_master, leave dead_node */ dlm_set_reco_master(dlm, O2NM_INVALID_NODE_NUM); } /* select a target to recover */ if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { int bit; bit = find_next_bit (dlm->recovery_map, O2NM_MAX_NODES+1, 0); if (bit >= O2NM_MAX_NODES || bit < 0) dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); else dlm_set_reco_dead_node(dlm, bit); } else if (!test_bit(dlm->reco.dead_node, dlm->recovery_map)) { /* BUG? */ mlog(ML_ERROR, "dead_node %u no longer in recovery map!\n", dlm->reco.dead_node); dlm_set_reco_dead_node(dlm, O2NM_INVALID_NODE_NUM); } if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) { // mlog(0, "nothing to recover! sleeping now!\n"); spin_unlock(&dlm->spinlock); /* return to main thread loop and sleep. */ return 0; } mlog(0, "%s(%d):recovery thread found node %u in the recovery map!\n", dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), dlm->reco.dead_node); spin_unlock(&dlm->spinlock); /* take write barrier */ /* (stops the list reshuffling thread, proxy ast handling) */ dlm_begin_recovery(dlm); if (dlm->reco.new_master == dlm->node_num) goto master_here; if (dlm->reco.new_master == O2NM_INVALID_NODE_NUM) { /* choose a new master, returns 0 if this node * is the master, -EEXIST if it's another node. * this does not return until a new master is chosen * or recovery completes entirely. */ ret = dlm_pick_recovery_master(dlm); if (!ret) { /* already notified everyone. go. */ goto master_here; } mlog(0, "another node will master this recovery session.\n"); } mlog(0, "dlm=%s (%d), new_master=%u, this node=%u, dead_node=%u\n", dlm->name, task_pid_nr(dlm->dlm_reco_thread_task), dlm->reco.new_master, dlm->node_num, dlm->reco.dead_node); /* it is safe to start everything back up here * because all of the dead node's lock resources * have been marked as in-recovery */ dlm_end_recovery(dlm); /* sleep out in main dlm_recovery_thread loop. */ return 0;master_here: mlog(0, "(%d) mastering recovery of %s:%u here(this=%u)!\n", task_pid_nr(dlm->dlm_reco_thread_task), dlm->name, dlm->reco.dead_node, dlm->node_num); status = dlm_remaster_locks(dlm, dlm->reco.dead_node); if (status < 0) { /* we should never hit this anymore */ mlog(ML_ERROR, "error %d remastering locks for node %u, " "retrying.\n", status, dlm->reco.dead_node); /* yield a bit to allow any final network messages * to get handled on remaining nodes */ msleep(100); } else { /* success! see if any other nodes need recovery */ mlog(0, "DONE mastering recovery of %s:%u here(this=%u)!\n", dlm->name, dlm->reco.dead_node, dlm->node_num); dlm_reset_recovery(dlm); } dlm_end_recovery(dlm); /* continue and look for another dead node */ return -EAGAIN;}static int dlm_remaster_locks(struct dlm_ctxt *dlm, u8 dead_node){ int status = 0; struct dlm_reco_node_data *ndata; int all_nodes_done; int destroy = 0; int pass = 0; do { /* we have become recovery master. there is no escaping * this, so just keep trying until we get it. */ status = dlm_init_recovery_area(dlm, dead_node); if (status < 0) { mlog(ML_ERROR, "%s: failed to alloc recovery area, " "retrying\n", dlm->name); msleep(1000); } } while (status != 0); /* safe to access the node data list without a lock, since this * process is the only one to change the list */ list_for_each_entry(ndata, &dlm->reco.node_data, list) { BUG_ON(ndata->state != DLM_RECO_NODE_DATA_INIT); ndata->state = DLM_RECO_NODE_DATA_REQUESTING; mlog(0, "requesting lock info from node %u\n", ndata->node_num); if (ndata->node_num == dlm->node_num) { ndata->state = DLM_RECO_NODE_DATA_DONE; continue; } do { status = dlm_request_all_locks(dlm, ndata->node_num, dead_node); if (status < 0) { mlog_errno(status); if (dlm_is_host_down(status)) { /* node died, ignore it for recovery */ status = 0; ndata->state = DLM_RECO_NODE_DATA_DEAD; /* wait for the domain map to catch up * with the network state. */ wait_event_timeout(dlm->dlm_reco_thread_wq, dlm_is_node_dead(dlm, ndata->node_num), msecs_to_jiffies(1000)); mlog(0, "waited 1 sec for %u, " "dead? %s\n", ndata->node_num, dlm_is_node_dead(dlm, ndata->node_num) ? "yes" : "no"); } else { /* -ENOMEM on the other node */ mlog(0, "%s: node %u returned " "%d during recovery, retrying " "after a short wait\n", dlm->name, ndata->node_num, status); msleep(100); } } } while (status != 0); spin_lock(&dlm_reco_state_lock); switch (ndata->state) { case DLM_RECO_NODE_DATA_INIT: case DLM_RECO_NODE_DATA_FINALIZE_SENT: case DLM_RECO_NODE_DATA_REQUESTED: BUG(); break; case DLM_RECO_NODE_DATA_DEAD: mlog(0, "node %u died after requesting " "recovery info for node %u\n", ndata->node_num, dead_node); /* fine. don't need this node's info. * continue without it. */ break; case DLM_RECO_NODE_DATA_REQUESTING: ndata->state = DLM_RECO_NODE_DATA_REQUESTED; mlog(0, "now receiving recovery data from " "node %u for dead node %u\n", ndata->node_num, dead_node); break; case DLM_RECO_NODE_DATA_RECEIVING: mlog(0, "already receiving recovery data from " "node %u for dead node %u\n", ndata->node_num, dead_node); break; case DLM_RECO_NODE_DATA_DONE: mlog(0, "already DONE receiving recovery data " "from node %u for dead node %u\n", ndata->node_num, dead_node); break; } spin_unlock(&dlm_reco_state_lock); } mlog(0, "done requesting all lock info\n"); /* nodes should be sending reco data now * just need to wait */ while (1) { /* check all the nodes now to see if we are * done, or if anyone died */ all_nodes_done = 1; spin_lock(&dlm_reco_state_lock); list_for_each_entry(ndata, &dlm->reco.node_data, list) { mlog(0, "checking recovery state of node %u\n", ndata->node_num); switch (ndata->state) { case DLM_RECO_NODE_DATA_INIT: case DLM_RECO_NODE_DATA_REQUESTING: mlog(ML_ERROR, "bad ndata state for " "node %u: state=%d\n", ndata->node_num, ndata->state); BUG(); break; case DLM_RECO_NODE_DATA_DEAD: mlog(0, "node %u died after " "requesting recovery info for " "node %u\n", ndata->node_num, dead_node); break; case DLM_RECO_NODE_DATA_RECEIVING: case DLM_RECO_NODE_DATA_REQUESTED: mlog(0, "%s: node %u still in state %s\n", dlm->name, ndata->node_num, ndata->state==DLM_RECO_NODE_DATA_RECEIVING ? "receiving" : "requested"); all_nodes_done = 0; break; case DLM_RECO_NODE_DATA_DONE: mlog(0, "%s: node %u state is done\n", dlm->name, ndata->node_num); break; case DLM_RECO_NODE_DATA_FINALIZE_SENT: mlog(0, "%s: node %u state is finalize\n", dlm->name, ndata->node_num); break; } } spin_unlock(&dlm_reco_state_lock); mlog(0, "pass #%d, all_nodes_done?: %s\n", ++pass, all_nodes_done?"yes":"no"); if (all_nodes_done) { int ret; /* all nodes are now in DLM_RECO_NODE_DATA_DONE state * just send a finalize message to everyone and * clean up */ mlog(0, "all nodes are done! send finalize\n"); ret = dlm_send_finalize_reco_message(dlm); if (ret < 0) mlog_errno(ret); spin_lock(&dlm->spinlock); dlm_finish_local_lockres_recovery(dlm, dead_node, dlm->node_num); spin_unlock(&dlm->spinlock); mlog(0, "should be done with recovery!\n"); mlog(0, "finishing recovery of %s at %lu, " "dead=%u, this=%u, new=%u\n", dlm->name, jiffies, dlm->reco.dead_node, dlm->node_num, dlm->reco.new_master); destroy = 1; status = 0; /* rescan everything marked dirty along the way */ dlm_kick_thread(dlm, NULL); break; } /* wait to be signalled, with periodic timeout * to check for node death */ wait_event_interruptible_timeout(dlm->dlm_reco_thread_wq, kthread_should_stop(), msecs_to_jiffies(DLM_RECO_THREAD_TIMEOUT_MS)); } if (destroy) dlm_destroy_recovery_area(dlm, dead_node); mlog_exit(status); return status;}static int dlm_init_recovery_area(struct dlm_ctxt *dlm, u8 dead_node){ int num=0; struct dlm_reco_node_data *ndata; spin_lock(&dlm->spinlock); memcpy(dlm->reco.node_map, dlm->domain_map, sizeof(dlm->domain_map)); /* nodes can only be removed (by dying) after dropping * this lock, and death will be trapped later, so this should do */ spin_unlock(&dlm->spinlock); while (1) { num = find_next_bit (dlm->reco.node_map, O2NM_MAX_NODES, num); if (num >= O2NM_MAX_NODES) { break; } BUG_ON(num == dead_node); ndata = kzalloc(sizeof(*ndata), GFP_NOFS); if (!ndata) { dlm_destroy_recovery_area(dlm, dead_node); return -ENOMEM; } ndata->node_num = num; ndata->state = DLM_RECO_NODE_DATA_INIT; spin_lock(&dlm_reco_state_lock); list_add_tail(&ndata->list, &dlm->reco.node_data); spin_unlock(&dlm_reco_state_lock); num++; } return 0;}static void dlm_destroy_recovery_area(struct dlm_ctxt *dlm, u8 dead_node){ struct dlm_reco_node_data *ndata, *next; LIST_HEAD(tmplist); spin_lock(&dlm_reco_state_lock); list_splice_init(&dlm->reco.node_data, &tmplist); spin_unlock(&dlm_reco_state_lock); list_for_each_entry_safe(ndata, next, &tmplist, list) { list_del_init(&ndata->list); kfree(ndata); }}static int dlm_request_all_locks(struct dlm_ctxt *dlm, u8 request_from, u8 dead_node){ struct dlm_lock_request lr; enum dlm_status ret; mlog(0, "\n"); mlog(0, "dlm_request_all_locks: dead node is %u, sending request " "to %u\n", dead_node, request_from); memset(&lr, 0, sizeof(lr)); lr.node_idx = dlm->node_num; lr.dead_node = dead_node; // send message ret = DLM_NOLOCKMGR; ret = o2net_send_message(DLM_LOCK_REQUEST_MSG, dlm->key, &lr, sizeof(lr), request_from, NULL); /* negative status is handled by caller */ if (ret < 0) mlog_errno(ret); // return from here, then // sleep until all received or error return ret;}int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data){ struct dlm_ctxt *dlm = data; struct dlm_lock_request *lr = (struct dlm_lock_request *)msg->buf; char *buf = NULL; struct dlm_work_item *item = NULL; if (!dlm_grab(dlm)) return -EINVAL; if (lr->dead_node != dlm->reco.dead_node) { mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local " "dead_node is %u\n", dlm->name, lr->node_idx, lr->dead_node, dlm->reco.dead_node); dlm_print_reco_node_status(dlm); /* this is a hack */ dlm_put(dlm); return -ENOMEM; } BUG_ON(lr->dead_node != dlm->reco.dead_node); item = kzalloc(sizeof(*item), GFP_NOFS); if (!item) { dlm_put(dlm); return -ENOMEM; } /* this will get freed by dlm_request_all_locks_worker */ buf = (char *) __get_free_page(GFP_NOFS); if (!buf) { kfree(item); dlm_put(dlm); return -ENOMEM; } /* queue up work for dlm_request_all_locks_worker */ dlm_grab(dlm); /* get an extra ref for the work item */ dlm_init_work_item(dlm, item, dlm_request_all_locks_worker, buf); item->u.ral.reco_master = lr->node_idx; item->u.ral.dead_node = lr->dead_node; spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); queue_work(dlm->dlm_worker, &dlm->dispatched_work); dlm_put(dlm); return 0;}static void dlm_request_all_locks_worker(struct dlm_work_item *item, void *data){ struct dlm_migratable_lockres *mres; struct dlm_lock_resource *res; struct dlm_ctxt *dlm; LIST_HEAD(resources); int ret; u8 dead_node, reco_master; int skip_all_done = 0; dlm = item->dlm; dead_node = item->u.ral.dead_node; reco_master = item->u.ral.reco_master; mres = (struct dlm_migratable_lockres *)data;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -