📄 dlmmaster.c
字号:
} if (m != O2NM_MAX_NODES) { /* another node has done an assert! * all done! */ sleep = 0; } else { sleep = 1; /* have all nodes responded? */ if (voting_done && !*blocked) { bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); if (dlm->node_num <= bit) { /* my node number is lowest. * now tell other nodes that I am * mastering this. */ mle->master = dlm->node_num; assert = 1; sleep = 0; } /* if voting is done, but we have not received * an assert master yet, we must sleep */ } } spin_unlock(&mle->spinlock); /* sleep if we haven't finished voting yet */ if (sleep) { unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS); /* if (atomic_read(&mle->mle_refs.refcount) < 2) mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle, atomic_read(&mle->mle_refs.refcount), res->lockname.len, res->lockname.name); */ atomic_set(&mle->woken, 0); (void)wait_event_timeout(mle->wq, (atomic_read(&mle->woken) == 1), timeo); if (res->owner == O2NM_MAX_NODES) { mlog(0, "waiting again\n"); goto recheck; } mlog(0, "done waiting, master is %u\n", res->owner); ret = 0; goto leave; } ret = 0; /* done */ if (assert) { m = dlm->node_num; mlog(0, "about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, m); ret = dlm_do_assert_master(dlm, res->lockname.name, res->lockname.len, mle->vote_map, 0); if (ret) { /* This is a failure in the network path, * not in the response to the assert_master * (any nonzero response is a BUG on this node). * Most likely a socket just got disconnected * due to node death. */ mlog_errno(ret); } /* no longer need to restart lock mastery. * all living nodes have been contacted. */ ret = 0; } /* set the lockres owner */ spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, m); spin_unlock(&res->spinlock);leave: return ret;}struct dlm_bitmap_diff_iter{ int curnode; unsigned long *orig_bm; unsigned long *cur_bm; unsigned long diff_bm[BITS_TO_LONGS(O2NM_MAX_NODES)];};enum dlm_node_state_change{ NODE_DOWN = -1, NODE_NO_CHANGE = 0, NODE_UP};static void dlm_bitmap_diff_iter_init(struct dlm_bitmap_diff_iter *iter, unsigned long *orig_bm, unsigned long *cur_bm){ unsigned long p1, p2; int i; iter->curnode = -1; iter->orig_bm = orig_bm; iter->cur_bm = cur_bm; for (i = 0; i < BITS_TO_LONGS(O2NM_MAX_NODES); i++) { p1 = *(iter->orig_bm + i); p2 = *(iter->cur_bm + i); iter->diff_bm[i] = (p1 & ~p2) | (p2 & ~p1); }}static int dlm_bitmap_diff_iter_next(struct dlm_bitmap_diff_iter *iter, enum dlm_node_state_change *state){ int bit; if (iter->curnode >= O2NM_MAX_NODES) return -ENOENT; bit = find_next_bit(iter->diff_bm, O2NM_MAX_NODES, iter->curnode+1); if (bit >= O2NM_MAX_NODES) { iter->curnode = O2NM_MAX_NODES; return -ENOENT; } /* if it was there in the original then this node died */ if (test_bit(bit, iter->orig_bm)) *state = NODE_DOWN; else *state = NODE_UP; iter->curnode = bit; return bit;}static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_master_list_entry *mle, int blocked){ struct dlm_bitmap_diff_iter bdi; enum dlm_node_state_change sc; int node; int ret = 0; mlog(0, "something happened such that the " "master process may need to be restarted!\n"); assert_spin_locked(&mle->spinlock); dlm_bitmap_diff_iter_init(&bdi, mle->vote_map, mle->node_map); node = dlm_bitmap_diff_iter_next(&bdi, &sc); while (node >= 0) { if (sc == NODE_UP) { /* a node came up. clear any old vote from * the response map and set it in the vote map * then restart the mastery. */ mlog(ML_NOTICE, "node %d up while restarting\n", node); /* redo the master request, but only for the new node */ mlog(0, "sending request to new node\n"); clear_bit(node, mle->response_map); set_bit(node, mle->vote_map); } else { mlog(ML_ERROR, "node down! %d\n", node); /* if the node wasn't involved in mastery skip it, * but clear it out from the maps so that it will * not affect mastery of this lockres */ clear_bit(node, mle->response_map); clear_bit(node, mle->vote_map); if (!test_bit(node, mle->maybe_map)) goto next; /* if we're already blocked on lock mastery, and the * dead node wasn't the expected master, or there is * another node in the maybe_map, keep waiting */ if (blocked) { int lowest = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); /* act like it was never there */ clear_bit(node, mle->maybe_map); if (node != lowest) goto next; mlog(ML_ERROR, "expected master %u died while " "this node was blocked waiting on it!\n", node); lowest = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, lowest+1); if (lowest < O2NM_MAX_NODES) { mlog(0, "still blocked. waiting " "on %u now\n", lowest); goto next; } /* mle is an MLE_BLOCK, but there is now * nothing left to block on. we need to return * all the way back out and try again with * an MLE_MASTER. dlm_do_local_recovery_cleanup * has already run, so the mle refcount is ok */ mlog(0, "no longer blocking. we can " "try to master this here\n"); mle->type = DLM_MLE_MASTER; memset(mle->maybe_map, 0, sizeof(mle->maybe_map)); memset(mle->response_map, 0, sizeof(mle->maybe_map)); memcpy(mle->vote_map, mle->node_map, sizeof(mle->node_map)); mle->u.res = res; set_bit(dlm->node_num, mle->maybe_map); ret = -EAGAIN; goto next; } clear_bit(node, mle->maybe_map); if (node > dlm->node_num) goto next; mlog(0, "dead node in map!\n"); /* yuck. go back and re-contact all nodes * in the vote_map, removing this node. */ memset(mle->response_map, 0, sizeof(mle->response_map)); } ret = -EAGAIN;next: node = dlm_bitmap_diff_iter_next(&bdi, &sc); } return ret;}/* * DLM_MASTER_REQUEST_MSG * * returns: 0 on success, * -errno on a network error * * on error, the caller should assume the target node is "dead" * */static int dlm_do_master_request(struct dlm_master_list_entry *mle, int to){ struct dlm_ctxt *dlm = mle->dlm; struct dlm_master_request request; int ret, response=0, resend; memset(&request, 0, sizeof(request)); request.node_idx = dlm->node_num; BUG_ON(mle->type == DLM_MLE_MIGRATION); if (mle->type != DLM_MLE_MASTER) { request.namelen = mle->u.name.len; memcpy(request.name, mle->u.name.name, request.namelen); } else { request.namelen = mle->u.res->lockname.len; memcpy(request.name, mle->u.res->lockname.name, request.namelen); }again: ret = o2net_send_message(DLM_MASTER_REQUEST_MSG, dlm->key, &request, sizeof(request), to, &response); if (ret < 0) { if (ret == -ESRCH) { /* should never happen */ mlog(ML_ERROR, "TCP stack not ready!\n"); BUG(); } else if (ret == -EINVAL) { mlog(ML_ERROR, "bad args passed to o2net!\n"); BUG(); } else if (ret == -ENOMEM) { mlog(ML_ERROR, "out of memory while trying to send " "network message! retrying\n"); /* this is totally crude */ msleep(50); goto again; } else if (!dlm_is_host_down(ret)) { /* not a network error. bad. */ mlog_errno(ret); mlog(ML_ERROR, "unhandled error!"); BUG(); } /* all other errors should be network errors, * and likely indicate node death */ mlog(ML_ERROR, "link to %d went down!\n", to); goto out; } ret = 0; resend = 0; spin_lock(&mle->spinlock); switch (response) { case DLM_MASTER_RESP_YES: set_bit(to, mle->response_map); mlog(0, "node %u is the master, response=YES\n", to); mle->master = to; break; case DLM_MASTER_RESP_NO: mlog(0, "node %u not master, response=NO\n", to); set_bit(to, mle->response_map); break; case DLM_MASTER_RESP_MAYBE: mlog(0, "node %u not master, response=MAYBE\n", to); set_bit(to, mle->response_map); set_bit(to, mle->maybe_map); break; case DLM_MASTER_RESP_ERROR: mlog(0, "node %u hit an error, resending\n", to); resend = 1; response = 0; break; default: mlog(ML_ERROR, "bad response! %u\n", response); BUG(); } spin_unlock(&mle->spinlock); if (resend) { /* this is also totally crude */ msleep(50); goto again; }out: return ret;}/* * locks that can be taken here: * dlm->spinlock * res->spinlock * mle->spinlock * dlm->master_list * * if possible, TRIM THIS DOWN!!! */int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data){ u8 response = DLM_MASTER_RESP_MAYBE; struct dlm_ctxt *dlm = data; struct dlm_lock_resource *res = NULL; struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; char *name; unsigned int namelen; int found, ret; int set_maybe; int dispatch_assert = 0; if (!dlm_grab(dlm)) return DLM_MASTER_RESP_NO; if (!dlm_domain_fully_joined(dlm)) { response = DLM_MASTER_RESP_NO; goto send_response; } name = request->name; namelen = request->namelen; if (namelen > DLM_LOCKID_NAME_MAX) { response = DLM_IVBUFLEN; goto send_response; }way_up_top: spin_lock(&dlm->spinlock); res = __dlm_lookup_lockres(dlm, name, namelen); if (res) { spin_unlock(&dlm->spinlock); /* take care of the easy cases up front */ spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { spin_unlock(&res->spinlock); mlog(0, "returning DLM_MASTER_RESP_ERROR since res is " "being recovered\n"); response = DLM_MASTER_RESP_ERROR; if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } if (res->owner == dlm->node_num) { spin_unlock(&res->spinlock); // mlog(0, "this node is the master\n"); response = DLM_MASTER_RESP_YES; if (mle) kmem_cache_free(dlm_mle_cache, mle); /* this node is the owner. * there is some extra work that needs to * happen now. the requesting node has * caused all nodes up to this one to * create mles. this node now needs to * go back and clean those up. */ dispatch_assert = 1; goto send_response; } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { spin_unlock(&res->spinlock); // mlog(0, "node %u is the master\n", res->owner); response = DLM_MASTER_RESP_NO; if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } /* ok, there is no owner. either this node is * being blocked, or it is actively trying to * master this lock. */ if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) { mlog(ML_ERROR, "lock with no owner should be " "in-progress!\n"); BUG(); } // mlog(0, "lockres is in progress...\n"); spin_lock(&dlm->master_lock); found = dlm_find_mle(dlm, &tmpmle, name, namelen); if (!found) { mlog(ML_ERROR, "no mle found for this lock!\n"); BUG(); } set_maybe = 1; spin_lock(&tmpmle->spinlock); if (tmpmle->type == DLM_MLE_BLOCK) { // mlog(0, "this node is waiting for " // "lockres to be mastered\n"); response = DLM_MASTER_RESP_NO; } else if (tmpmle->type == DLM_MLE_MIGRATION) { mlog(0, "node %u is master, but trying to migrate to " "node %u.\n", tmpmle->master, tmpmle->new_master); if (tmpmle->master == dlm->node_num) { response = DLM_MASTER_RESP_YES; mlog(ML_ERROR, "no owner on lockres, but this " "node is trying to migrate it to %u?!\n", tmpmle->new_master); BUG(); } else { /* the real master can respond on its own */ response = DLM_MASTER_RESP_NO; } } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) { set_maybe = 0; if (tmpmle->master == dlm->node_num) { response = DLM_MASTER_RESP_YES; /* this node will be the owner. * go back and clean the mles on any * other nodes */ dispatch_assert = 1; } else response = DLM_MASTER_RESP_NO; } else { // mlog(0, "this node is attempting to " // "master lockres\n"); response = DLM_MASTER_RESP_MAYBE; } if (set_maybe) set_bit(request->node_idx, tmpmle->maybe_map); spin_unlock(&tmpmle->spinlock); spin_unlock(&dlm->master_lock); spin_unlock(&res->spinlock); /* keep the mle attached to heartbeat events */ dlm_put_mle(tmpmle); if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } /*
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -