📄 dlmmaster.c
字号:
/* NOTE(review): this span begins mid-way through the enclosing function
 * (presumably dlm_get_lock_resource — its signature is above this view)
 * and ends mid-switch in dlm_do_master_request.  Only comments have been
 * added; all code tokens are unchanged from the original. */
				/* recovery lock still contended: back off briefly
				 * before retrying the while loop above */
				mlog(0, "%s: waiting 500ms for heartbeat state "
				    "change\n", dlm->name);
				msleep(500);
			}
			continue;
		}

		dlm_kick_recovery_thread(dlm);
		msleep(1000);
		dlm_wait_for_recovery(dlm);

		/* mastery cannot begin while any node is still in the
		 * recovery map; find one (if any) and wait on it */
		spin_lock(&dlm->spinlock);
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
			     "recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		} else
			wait_on_recovery = 0;
		spin_unlock(&dlm->spinlock);

		if (wait_on_recovery)
			dlm_wait_for_node_recovery(dlm, bit, 10000);
	}

	/* must wait for lock to be mastered elsewhere */
	if (blocked)
		goto wait;

	/* send a master request to every node in the vote map; any
	 * failed send is logged but does not abort the sweep */
	ret = -EINVAL;
	dlm_node_iter_init(mle->vote_map, &iter);
	while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
		ret = dlm_do_master_request(res, mle, nodenum);
		if (ret < 0)
			mlog_errno(ret);
		if (mle->master != O2NM_MAX_NODES) {
			/* found a master ! */
			if (mle->master <= nodenum)
				break;
			/* if our master request has not reached the master
			 * yet, keep going until it does.  this is how the
			 * master will know that asserts are needed back to
			 * the lower nodes. */
			mlog(0, "%s:%.*s: requests only up to %u but master "
			     "is %u, keep going\n", dlm->name, namelen,
			     lockid, nodenum, mle->master);
		}
	}

wait:
	/* keep going until the response map includes all nodes */
	ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
	if (ret < 0) {
		/* the node map changed underneath us; force another pass
		 * through the recovery wait and redo the master requests */
		wait_on_recovery = 1;
		mlog(0, "%s:%.*s: node map changed, redo the "
		     "master request now, blocked=%d\n",
		     dlm->name, res->lockname.len,
		     res->lockname.name, blocked);
		if (++tries > 20) {
			/* diagnostic only: dump state if we appear to be
			 * spinning, then reset the counter and keep trying */
			mlog(ML_ERROR, "%s:%.*s: spinning on "
			     "dlm_wait_for_lock_mastery, blocked=%d\n",
			     dlm->name, res->lockname.len,
			     res->lockname.name, blocked);
			dlm_print_one_lock_resource(res);
			dlm_print_one_mle(mle);
			tries = 0;
		}
		goto redo_request;
	}

	mlog(0, "lockres mastered by %u\n", res->owner);
	/* make sure we never continue without this */
	BUG_ON(res->owner == O2NM_MAX_NODES);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	/* put the extra ref */
	dlm_put_mle_inuse(mle);

wake_waiters:
	spin_lock(&res->spinlock);
	if (res->owner != dlm->node_num && drop_inflight_if_nonlocal)
		dlm_lockres_drop_inflight_ref(dlm, res);
	res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

leave:
	/* need to free the unused mle */
	if (alloc_mle)
		kmem_cache_free(dlm_mle_cache, alloc_mle);

	return res;
}


#define DLM_MASTERY_TIMEOUT_MS   5000

/*
 * Wait for the mastery of @res to be resolved, retrying until either
 * another node asserts itself as master or this node wins the vote and
 * asserts mastery itself.
 *
 * @blocked is in/out: it tracks whether this node is blocked waiting on
 * another (lower-numbered) node to master the resource, and is updated
 * here if the mle type flips during a restart.
 *
 * Returns 0 once an owner has been established (res->owner set), or a
 * negative error from dlm_restart_lock_mastery() when the node map
 * changed and the caller must redo its master requests.
 */
static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     struct dlm_master_list_entry *mle,
				     int *blocked)
{
	u8 m;
	int ret, bit;
	int map_changed, voting_done;
	int assert, sleep;

recheck:
	ret = 0;
	assert = 0;

	/* check if another node has already become the owner */
	spin_lock(&res->spinlock);
	if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "%s:%.*s: owner is suddenly %u\n", dlm->name,
		     res->lockname.len, res->lockname.name, res->owner);
		spin_unlock(&res->spinlock);
		/* this will cause the master to re-assert across
		 * the whole cluster, freeing up mles */
		if (res->owner != dlm->node_num) {
			ret = dlm_do_master_request(res, mle, res->owner);
			if (ret < 0) {
				/* give recovery a chance to run */
				mlog(ML_ERROR, "link to %u went down?: %d\n",
				     res->owner, ret);
				msleep(500);
				goto recheck;
			}
		}
		ret = 0;
		goto leave;
	}
	spin_unlock(&res->spinlock);

	spin_lock(&mle->spinlock);
	m = mle->master;
	/* vote_map != node_map means membership changed since we started;
	 * vote_map == response_map means everyone we asked has answered */
	map_changed = (memcmp(mle->vote_map, mle->node_map,
			      sizeof(mle->vote_map)) != 0);
	voting_done = (memcmp(mle->vote_map, mle->response_map,
			      sizeof(mle->vote_map)) == 0);

	/* restart if we hit any errors */
	if (map_changed) {
		int b;
		mlog(0, "%s: %.*s: node map changed, restarting\n",
		     dlm->name, res->lockname.len, res->lockname.name);
		ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked);
		b = (mle->type == DLM_MLE_BLOCK);
		if ((*blocked && !b) || (!*blocked && b)) {
			mlog(0, "%s:%.*s: status change: old=%d new=%d\n",
			     dlm->name, res->lockname.len, res->lockname.name,
			     *blocked, b);
			*blocked = b;
		}
		spin_unlock(&mle->spinlock);
		if (ret < 0) {
			mlog_errno(ret);
			goto leave;
		}
		mlog(0, "%s:%.*s: restart lock mastery succeeded, "
		     "rechecking now\n", dlm->name, res->lockname.len,
		     res->lockname.name);
		goto recheck;
	} else {
		if (!voting_done) {
			mlog(0, "map not changed and voting not done "
			     "for %s:%.*s\n", dlm->name, res->lockname.len,
			     res->lockname.name);
		}
	}

	if (m != O2NM_MAX_NODES) {
		/* another node has done an assert!
		 * all done! */
		sleep = 0;
	} else {
		sleep = 1;
		/* have all nodes responded? */
		if (voting_done && !*blocked) {
			/* the lowest-numbered node in maybe_map wins
			 * mastery; check whether that is us */
			bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
			if (dlm->node_num <= bit) {
				/* my node number is lowest.
				 * now tell other nodes that I am
				 * mastering this. */
				mle->master = dlm->node_num;
				/* ref was grabbed in get_lock_resource
				 * will be dropped in dlmlock_master */
				assert = 1;
				sleep = 0;
			}
			/* if voting is done, but we have not received
			 * an assert master yet, we must sleep */
		}
	}
	spin_unlock(&mle->spinlock);

	/* sleep if we haven't finished voting yet */
	if (sleep) {
		unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS);

		/*
		if (atomic_read(&mle->mle_refs.refcount) < 2)
			mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle,
			     atomic_read(&mle->mle_refs.refcount),
			     res->lockname.len, res->lockname.name);
		*/
		/* sleep until an assert-master handler wakes us (woken==1)
		 * or the mastery timeout expires, then re-evaluate */
		atomic_set(&mle->woken, 0);
		(void)wait_event_timeout(mle->wq,
					 (atomic_read(&mle->woken) == 1),
					 timeo);
		if (res->owner == O2NM_MAX_NODES) {
			mlog(0, "%s:%.*s: waiting again\n", dlm->name,
			     res->lockname.len, res->lockname.name);
			goto recheck;
		}
		mlog(0, "done waiting, master is %u\n", res->owner);
		ret = 0;
		goto leave;
	}

	ret = 0;	/* done */
	if (assert) {
		m = dlm->node_num;
		mlog(0, "about to master %.*s here, this=%u\n",
		     res->lockname.len, res->lockname.name, m);
		ret = dlm_do_assert_master(dlm, res, mle->vote_map, 0);
		if (ret) {
			/* This is a failure in the network path,
			 * not in the response to the assert_master
			 * (any nonzero response is a BUG on this node).
			 * Most likely a socket just got disconnected
			 * due to node death. */
			mlog_errno(ret);
		}
		/* no longer need to restart lock mastery.
		 * all living nodes have been contacted. */
		ret = 0;
	}

	/* set the lockres owner */
	spin_lock(&res->spinlock);
	/* mastery reference obtained either during
	 * assert_master_handler or in get_lock_resource */
	dlm_change_lockres_owner(dlm, res, m);
	spin_unlock(&res->spinlock);

leave:
	return ret;
}

/* iterator over the positions where two node bitmaps differ;
 * diff_bm holds the XOR of orig_bm and cur_bm */
struct dlm_bitmap_diff_iter
{
	int curnode;
	unsigned long *orig_bm;
	unsigned long *cur_bm;
	unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];
};

enum dlm_node_state_change
{
	NODE_DOWN = -1,
	NODE_NO_CHANGE = 0,
	NODE_UP
};

/* prime @iter to walk the bits that differ between @orig_bm and
 * @cur_bm; neither input bitmap is modified */
static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter,
				      unsigned long *orig_bm,
				      unsigned long *cur_bm)
{
	unsigned long p1, p2;
	int i;

	iter->curnode = -1;
	iter->orig_bm = orig_bm;
	iter->cur_bm = cur_bm;

	for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) {
		p1 = *(iter->orig_bm + i);
		p2 = *(iter->cur_bm + i);
		/* symmetric difference: bits set in exactly one map */
		iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1);
	}
}

/* return the next differing node number and set @state to NODE_DOWN
 * (bit was set in the original map) or NODE_UP (newly set); returns
 * -ENOENT when the diff is exhausted */
static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter,
				     enum dlm_node_state_change *state)
{
	int bit;

	if (iter->curnode >= O2NM_MAX_NODES)
		return -ENOENT;

	bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES,
			    iter->curnode+1);
	if (bit >= O2NM_MAX_NODES) {
		iter->curnode = O2NM_MAX_NODES;
		return -ENOENT;
	}

	/* if it was there in the original then this node died */
	if (test_bit(bit, iter->orig_bm))
		*state = NODE_DOWN;
	else
		*state = NODE_UP;

	iter->curnode = bit;
	return bit;
}

/*
 * The cluster node map changed while a mastery vote was in flight.
 * Walk the diff between the mle's vote_map and the current node_map and
 * patch up the mle state for each node that came up or went down.
 *
 * Caller must hold mle->spinlock.  Returns -EAGAIN if anything changed
 * (the caller must redo its master requests), 0 if the diff was empty.
 */
static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res,
				    struct dlm_master_list_entry *mle,
				    int blocked)
{
	struct dlm_bitmap_diff_iter bdi;
	enum dlm_node_state_change sc;
	int node;
	int ret = 0;

	mlog(0, "something happened such that the "
	     "master process may need to be restarted!\n");

	assert_spin_locked(&mle->spinlock);

	dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map);
	node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	while (node >= 0) {
		if (sc == NODE_UP) {
			/* a node came up.  clear any old vote from
			 * the response map and set it in the vote map
			 * then restart the mastery. */
			mlog(ML_NOTICE, "node %d up while restarting\n", node);

			/* redo the master request, but only for the new node */
			mlog(0, "sending request to new node\n");
			clear_bit(node, mle->response_map);
			set_bit(node, mle->vote_map);
		} else {
			mlog(ML_ERROR, "node down! %d\n", node);
			if (blocked) {
				int lowest = find_next_bit(mle->maybe_map,
						       O2NM_MAX_NODES, 0);

				/* act like it was never there */
				clear_bit(node, mle->maybe_map);

				if (node == lowest) {
					/* the node we were blocked on is
					 * gone; find the next candidate or
					 * fall back to mastering locally */
					mlog(0, "expected master %u died"
					    " while this node was blocked "
					    "waiting on it!\n", node);
					lowest = find_next_bit(mle->maybe_map,
						       	O2NM_MAX_NODES,
						       	lowest+1);
					if (lowest < O2NM_MAX_NODES) {
						mlog(0, "%s:%.*s:still "
						     "blocked. waiting on %u "
						     "now\n", dlm->name,
						     res->lockname.len,
						     res->lockname.name,
						     lowest);
					} else {
						/* mle is an MLE_BLOCK, but
						 * there is now nothing left to
						 * block on.  we need to return
						 * all the way back out and try
						 * again with an MLE_MASTER.
						 * dlm_do_local_recovery_cleanup
						 * has already run, so the mle
						 * refcount is ok */
						mlog(0, "%s:%.*s: no "
						     "longer blocking. try to "
						     "master this here\n",
						     dlm->name,
						     res->lockname.len,
						     res->lockname.name);
						mle->type = DLM_MLE_MASTER;
						mle->u.res = res;
					}
				}
			}

			/* now blank out everything, as if we had never
			 * contacted anyone */
			memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
			memset(mle->response_map, 0, sizeof(mle->response_map));
			/* reset the vote_map to the current node_map */
			memcpy(mle->vote_map, mle->node_map,
			       sizeof(mle->node_map));
			/* put myself into the maybe map */
			if (mle->type != DLM_MLE_BLOCK)
				set_bit(dlm->node_num, mle->maybe_map);
		}
		ret = -EAGAIN;
		node = dlm_bitmap_diff_iter_next(&bdi, &sc);
	}
	return ret;
}


/*
 * DLM_MASTER_REQUEST_MSG
 *
 * returns: 0 on success,
 *          -errno on a network error
 *
 * on error, the caller should assume the target node is "dead"
 *
 */
static int dlm_do_master_request(struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle, int to)
{
	struct dlm_ctxt *dlm = mle->dlm;
	struct dlm_master_request request;
	int ret, response=0, resend;
	/* NOTE(review): 'resend' is assigned below but its consumer is
	 * beyond this view — presumably used after the switch; confirm */

	memset(&request, 0, sizeof(request));
	request.node_idx = dlm->node_num;

	BUG_ON(mle->type == DLM_MLE_MIGRATION);

	/* the resource name lives in a different union member depending
	 * on the mle type */
	if (mle->type != DLM_MLE_MASTER) {
		request.namelen = mle->u.name.len;
		memcpy(request.name, mle->u.name.name, request.namelen);
	} else {
		request.namelen = mle->u.res->lockname.len;
		memcpy(request.name, mle->u.res->lockname.name,
		       request.namelen);
	}

again:
	ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request,
				 sizeof(request), to, &response);
	if (ret < 0)  {
		if (ret == -ESRCH) {
			/* should never happen */
			mlog(ML_ERROR, "TCP stack not ready!\n");
			BUG();
		} else if (ret == -EINVAL) {
			mlog(ML_ERROR, "bad args passed to o2net!\n");
			BUG();
		} else if (ret == -ENOMEM) {
			mlog(ML_ERROR, "out of memory while trying to send "
			     "network message! retrying\n");
			/* this is totally crude */
			msleep(50);
			goto again;
		} else if (!dlm_is_host_down(ret)) {
			/* not a network error. bad. */
			mlog_errno(ret);
			mlog(ML_ERROR, "unhandled error!");
			BUG();
		}
		/* all other errors should be network errors,
		 * and likely indicate node death */
		mlog(ML_ERROR, "link to %d went down!\n", to);
		goto out;
	}

	ret = 0;
	resend = 0;
	spin_lock(&mle->spinlock);
	switch (response) {
		case DLM_MASTER_RESP_YES:
			set_bit(to, mle->response_map);
			mlog(0, "node %u is the master, response=YES\n", to);
			mlog(0, "%s:%.*s: master node %u now knows I have a "
			     "reference\n", dlm->name, res->lockname.len,
			     res->lockname.name, to);
			mle->master = to;
			break;
		case DLM_MASTER_RESP_NO:
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -