📄 dlmmaster.c
字号:
int curnode; unsigned long *orig_bm; unsigned long *cur_bm; unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];};enum dlm_node_state_change{ NODE_DOWN = -1, NODE_NO_CHANGE = 0, NODE_UP};static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter, unsigned long *orig_bm, unsigned long *cur_bm){ unsigned long p1, p2; int i; iter->curnode = -1; iter->orig_bm = orig_bm; iter->cur_bm = cur_bm; for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) { p1 = *(iter->orig_bm + i); p2 = *(iter->cur_bm + i); iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1); }}static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter, enum dlm_node_state_change *state){ int bit; if (iter->curnode >= O2NM_MAX_NODES) return -ENOENT; bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES, iter->curnode+1); if (bit >= O2NM_MAX_NODES) { iter->curnode = O2NM_MAX_NODES; return -ENOENT; } /* if it was there in the original then this node died */ if (test_bit(bit, iter->orig_bm)) *state = NODE_DOWN; else *state = NODE_UP; iter->curnode = bit; return bit;}static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_master_list_entry *mle, int blocked){ struct dlm_bitmap_diff_iter bdi; enum dlm_node_state_change sc; int node; int ret = 0; mlog(0, "something happened such that the " "master process may need to be restarted!\n"); assert_spin_locked(&mle->spinlock); dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map); node = dlm_bitmap_diff_iter_next(&bdi, &sc); while (node >= 0) { if (sc == NODE_UP) { /* a node came up. clear any old vote from * the response map and set it in the vote map * then restart the mastery. */ mlog(ML_NOTICE, "node %d up while restarting\n", node); /* redo the master request, but only for the new node */ mlog(0, "sending request to new node\n"); clear_bit(node, mle->response_map); set_bit(node, mle->vote_map); } else { mlog(ML_ERROR, "node down! %d\n", node); /* if the node wasn't involved in mastery skip it, * but clear it out from the maps so that it will * not affect mastery of this lockres */ clear_bit(node, mle->response_map); clear_bit(node, mle->vote_map); if (!test_bit(node, mle->maybe_map)) goto next; /* if we're already blocked on lock mastery, and the * dead node wasn't the expected master, or there is * another node in the maybe_map, keep waiting */ if (blocked) { int lowest = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); /* act like it was never there */ clear_bit(node, mle->maybe_map); if (node != lowest) goto next; mlog(ML_ERROR, "expected master %u died while " "this node was blocked waiting on it!\n", node); lowest = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, lowest+1); if (lowest < O2NM_MAX_NODES) { mlog(0, "still blocked. waiting " "on %u now\n", lowest); goto next; } /* mle is an MLE_BLOCK, but there is now * nothing left to block on. we need to return * all the way back out and try again with * an MLE_MASTER. dlm_do_local_recovery_cleanup * has already run, so the mle refcount is ok */ mlog(0, "no longer blocking. we can " "try to master this here\n"); mle->type = DLM_MLE_MASTER; memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); memset(mle->response_map, 0, sizeof(mle->maybe_map)); memcpy(mle->vote_map, mle->node_map, sizeof(mle->node_map)); mle->u.res = res; set_bit(dlm->node_num, mle->maybe_map); ret = -EAGAIN; goto next; } clear_bit(node, mle->maybe_map); if (node > dlm->node_num) goto next; mlog(0, "dead node in map!\n"); /* yuck. go back and re-contact all nodes * in the vote_map, removing this node. */ memset(mle->response_map, 0, sizeof(mle->response_map)); } ret = -EAGAIN;next: node = dlm_bitmap_diff_iter_next(&bdi, &sc); } return ret;}/* * DLM_MASTER_REQUEST_MSG * * returns: 0 on success, * -errno on a network error * * on error, the caller should assume the target node is "dead" * */static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to){ struct dlm_ctxt *dlm = mle->dlm; struct dlm_master_request request; int ret, response=0, resend; memset(&request, 0, sizeof(request)); request.node_idx = dlm->node_num; BUG_ON(mle->type == DLM_MLE_MIGRATION); if (mle->type != DLM_MLE_MASTER) { request.namelen = mle->u.name.len; memcpy(request.name, mle->u.name.name, request.namelen); } else { request.namelen = mle->u.res->lockname.len; memcpy(request.name, mle->u.res->lockname.name, request.namelen); }again: ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, sizeof(request), to, &response); if (ret < 0) { if (ret == -ESRCH) { /* should never happen */ mlog(ML_ERROR, "TCP stack not ready!\n"); BUG(); } else if (ret == -EINVAL) { mlog(ML_ERROR, "bad args passed to o2net!\n"); BUG(); } else if (ret == -ENOMEM) { mlog(ML_ERROR, "out of memory while trying to send " "network message! retrying\n"); /* this is totally crude */ msleep(50); goto again; } else if (!dlm_is_host_down(ret)) { /* not a network error. bad. */ mlog_errno(ret); mlog(ML_ERROR, "unhandled error!"); BUG(); } /* all other errors should be network errors, * and likely indicate node death */ mlog(ML_ERROR, "link to %d went down!\n", to); goto out; } ret = 0; resend = 0; spin_lock(&mle->spinlock); switch (response) { case DLM_MASTER_RESP_YES: set_bit(to, mle->response_map); mlog(0, "node %u is the master, response=YES\n", to); mle->master = to; break; case DLM_MASTER_RESP_NO: mlog(0, "node %u not master, response=NO\n", to); set_bit(to, mle->response_map); break; case DLM_MASTER_RESP_MAYBE: mlog(0, "node %u not master, response=MAYBE\n", to); set_bit(to, mle->response_map); set_bit(to, mle->maybe_map); break; case DLM_MASTER_RESP_ERROR: mlog(0, "node %u hit an error, resending\n", to); resend = 1; response = 0; break; default: mlog(ML_ERROR, "bad response! %u\n", response); BUG(); } spin_unlock(&mle->spinlock); if (resend) { /* this is also totally crude */ msleep(50); goto again; }out: return ret;}/* * locks that can be taken here: * dlm->spinlock * res->spinlock * mle->spinlock * dlm->master_list * * if possible, TRIM THIS DOWN!!! */int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data){ u8 response = DLM_MASTER_RESP_MAYBE; struct dlm_ctxt *dlm = data; struct dlm_lock_resource *res; struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; char *name; unsigned int namelen; int found, ret; int set_maybe; if (!dlm_grab(dlm)) return DLM_MASTER_RESP_NO; if (!dlm_domain_fully_joined(dlm)) { response = DLM_MASTER_RESP_NO; goto send_response; } name = request->name; namelen = request->namelen; if (namelen > DLM_LOCKID_NAME_MAX) { response = DLM_IVBUFLEN; goto send_response; }way_up_top: spin_lock(&dlm->spinlock); res = __dlm_lookup_lockres(dlm, name, namelen); if (res) { spin_unlock(&dlm->spinlock); /* take care of the easy cases up front */ spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { spin_unlock(&res->spinlock); mlog(0, "returning DLM_MASTER_RESP_ERROR since res is " "being recovered\n"); response = DLM_MASTER_RESP_ERROR; if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } if (res->owner == dlm->node_num) { u32 flags = DLM_ASSERT_MASTER_MLE_CLEANUP; spin_unlock(&res->spinlock); // mlog(0, "this node is the master\n"); response = DLM_MASTER_RESP_YES; if (mle) kmem_cache_free(dlm_mle_cache, mle); /* this node is the owner. * there is some extra work that needs to * happen now. the requesting node has * caused all nodes up to this one to * create mles. this node now needs to * go back and clean those up. */ mlog(0, "%u is the owner of %.*s, cleaning everyone else\n", dlm->node_num, res->lockname.len, res->lockname.name); ret = dlm_dispatch_assert_master(dlm, res, 1, request->node_idx, flags); if (ret < 0) { mlog(ML_ERROR, "failed to dispatch assert " "master work\n"); response = DLM_MASTER_RESP_ERROR; } goto send_response; } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { spin_unlock(&res->spinlock); // mlog(0, "node %u is the master\n", res->owner); response = DLM_MASTER_RESP_NO; if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } /* ok, there is no owner. either this node is * being blocked, or it is actively trying to * master this lock. */ if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) { mlog(ML_ERROR, "lock with no owner should be " "in-progress!\n"); BUG(); } // mlog(0, "lockres is in progress...\n"); spin_lock(&dlm->master_lock); found = dlm_find_mle(dlm, &tmpmle, name, namelen); if (!found) { mlog(ML_ERROR, "no mle found for this lock!\n"); BUG(); } set_maybe = 1; spin_lock(&tmpmle->spinlock); if (tmpmle->type == DLM_MLE_BLOCK) { // mlog(0, "this node is waiting for " // "lockres to be mastered\n"); response = DLM_MASTER_RESP_NO; } else if (tmpmle->type == DLM_MLE_MIGRATION) { mlog(0, "node %u is master, but trying to migrate to " "node %u.\n", tmpmle->master, tmpmle->new_master); if (tmpmle->master == dlm->node_num) { response = DLM_MASTER_RESP_YES; mlog(ML_ERROR, "no owner on lockres, but this " "node is trying to migrate it to %u?!\n", tmpmle->new_master); BUG(); } else { /* the real master can respond on its own */ response = DLM_MASTER_RESP_NO; } } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) { set_maybe = 0; if (tmpmle->master == dlm->node_num) response = DLM_MASTER_RESP_YES; else response = DLM_MASTER_RESP_NO; } else { // mlog(0, "this node is attempting to " // "master lockres\n"); response = DLM_MASTER_RESP_MAYBE; } if (set_maybe) set_bit(request->node_idx, tmpmle->maybe_map); spin_unlock(&tmpmle->spinlock); spin_unlock(&dlm->master_lock); spin_unlock(&res->spinlock); /* keep the mle attached to heartbeat events */ dlm_put_mle(tmpmle); if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } /* * lockres doesn't exist on this node * if there is an MLE_BLOCK, return NO * if there is an MLE_MASTER, return MAYBE * otherwise, add an MLE_BLOCK, return NO */ spin_lock(&dlm->master_lock); found = dlm_find_mle(dlm, &tmpmle, name, namelen); if (!found) { /* this lockid has never been seen on this node yet */ // mlog(0, "no mle found\n"); if (!mle) { spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); if (!mle) { // bad bad bad... this sucks. response = DLM_MASTER_RESP_ERROR; goto send_response; } spin_lock(&dlm->spinlock); dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); spin_unlock(&dlm->spinlock); goto way_up_top; } // mlog(0, "this is second time thru, already allocated, " // "add the block.\n"); set_bit(request->node_idx, mle->maybe_map); list_add(&mle->list, &dlm->master_list); response = DLM_MASTER_RESP_NO; } else { // mlog(0, "mle was found\n"); set_maybe = 1; spin_lock(&tmpmle->spinlock); if (tmpmle->type == DLM_MLE_BLOCK) response = DLM_MASTER_RESP_NO; else if (tmpmle->type == DLM_MLE_MIGRATION) { mlog(0, "migration mle was found (%u->%u)\n", tmpmle->master, tmpmle->new_master); if (tmpmle->master == dlm->node_num) { mlog(ML_ERROR, "no lockres, but migration mle " "says that this node is master!\n"); BUG(); } /* real master can respond on its own */ response = DLM_MASTER_RESP_NO; } else { if (tmpmle->master == dlm->node_num) { response = DLM_MASTER_RESP_YES; set_maybe = 0; } else response = DLM_MASTER_RESP_MAYBE; } if (set_maybe) set_bit(request->node_idx, tmpmle->maybe_map); spin_unlock(&tmpmle->spinlock); } spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); if (found) { /* keep the mle attached to heartbeat events */ dlm_put_mle(tmpmle); }send_response: dlm_put(dlm); return response;}/* * DLM_ASSERT_MASTER_MSG *//* * NOTE: this can be used for debugging * can periodically run all locks owned by this node * and re-assert across the cluster...
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -