📄 dlmmaster.c
字号:
* lockres doesn't exist on this node * if there is an MLE_BLOCK, return NO * if there is an MLE_MASTER, return MAYBE * otherwise, add an MLE_BLOCK, return NO */ spin_lock(&dlm->master_lock); found = dlm_find_mle(dlm, &tmpmle, name, namelen); if (!found) { /* this lockid has never been seen on this node yet */ // mlog(0, "no mle found\n"); if (!mle) { spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); if (!mle) { response = DLM_MASTER_RESP_ERROR; mlog_errno(-ENOMEM); goto send_response; } spin_lock(&dlm->spinlock); dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); spin_unlock(&dlm->spinlock); goto way_up_top; } // mlog(0, "this is second time thru, already allocated, " // "add the block.\n"); set_bit(request->node_idx, mle->maybe_map); list_add(&mle->list, &dlm->master_list); response = DLM_MASTER_RESP_NO; } else { // mlog(0, "mle was found\n"); set_maybe = 1; spin_lock(&tmpmle->spinlock); if (tmpmle->master == dlm->node_num) { mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n"); BUG(); } if (tmpmle->type == DLM_MLE_BLOCK) response = DLM_MASTER_RESP_NO; else if (tmpmle->type == DLM_MLE_MIGRATION) { mlog(0, "migration mle was found (%u->%u)\n", tmpmle->master, tmpmle->new_master); /* real master can respond on its own */ response = DLM_MASTER_RESP_NO; } else response = DLM_MASTER_RESP_MAYBE; if (set_maybe) set_bit(request->node_idx, tmpmle->maybe_map); spin_unlock(&tmpmle->spinlock); } spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); if (found) { /* keep the mle attached to heartbeat events */ dlm_put_mle(tmpmle); }send_response: if (dispatch_assert) { if (response != DLM_MASTER_RESP_YES) mlog(ML_ERROR, "invalid response %d\n", response); if (!res) { mlog(ML_ERROR, "bad lockres while trying to assert!\n"); BUG(); } mlog(0, "%u is the owner of %.*s, cleaning everyone else\n", dlm->node_num, res->lockname.len, res->lockname.name); ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, DLM_ASSERT_MASTER_MLE_CLEANUP); if (ret < 0) { mlog(ML_ERROR, "failed to dispatch assert master work\n"); response = DLM_MASTER_RESP_ERROR; } } dlm_put(dlm); return response;}/* * DLM_ASSERT_MASTER_MSG *//* * NOTE: this can be used for debugging * can periodically run all locks owned by this node * and re-assert across the cluster... */static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname, unsigned int namelen, void *nodemap, u32 flags){ struct dlm_assert_master assert; int to, tmpret; struct dlm_node_iter iter; int ret = 0; int reassert; BUG_ON(namelen > O2NM_MAX_NAME_LEN);again: reassert = 0; /* note that if this nodemap is empty, it returns 0 */ dlm_node_iter_init(nodemap, &iter); while ((to = dlm_node_iter_next(&iter)) >= 0) { int r = 0; mlog(0, "sending assert master to %d (%.*s)\n", to, namelen, lockname); memset(&assert, 0, sizeof(assert)); assert.node_idx = dlm->node_num; assert.namelen = namelen; memcpy(assert.name, lockname, namelen); assert.flags = cpu_to_be32(flags); tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, &assert, sizeof(assert), to, &r); if (tmpret < 0) { mlog(ML_ERROR, "assert_master returned %d!\n", tmpret); if (!dlm_is_host_down(tmpret)) { mlog(ML_ERROR, "unhandled error!\n"); BUG(); } /* a node died. finish out the rest of the nodes. */ mlog(ML_ERROR, "link to %d went down!\n", to); /* any nonzero status return will do */ ret = tmpret; } else if (r < 0) { /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " "got %d.\n", namelen, lockname, to, r); dlm_dump_lock_resources(dlm); BUG(); } else if (r == EAGAIN) { mlog(0, "%.*s: node %u create mles on other " "nodes and requests a re-assert\n", namelen, lockname, to); reassert = 1; } } if (reassert) goto again; return ret;}/* * locks that can be taken here: * dlm->spinlock * res->spinlock * mle->spinlock * dlm->master_list * * if possible, TRIM THIS DOWN!!! */int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data){ struct dlm_ctxt *dlm = data; struct dlm_master_list_entry *mle = NULL; struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; struct dlm_lock_resource *res = NULL; char *name; unsigned int namelen; u32 flags; int master_request = 0; int ret = 0; if (!dlm_grab(dlm)) return 0; name = assert->name; namelen = assert->namelen; flags = be32_to_cpu(assert->flags); if (namelen > DLM_LOCKID_NAME_MAX) { mlog(ML_ERROR, "Invalid name length!"); goto done; } spin_lock(&dlm->spinlock); if (flags) mlog(0, "assert_master with flags: %u\n", flags); /* find the MLE */ spin_lock(&dlm->master_lock); if (!dlm_find_mle(dlm, &mle, name, namelen)) { /* not an error, could be master just re-asserting */ mlog(0, "just got an assert_master from %u, but no " "MLE for it! (%.*s)\n", assert->node_idx, namelen, name); } else { int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0); if (bit >= O2NM_MAX_NODES) { /* not necessarily an error, though less likely. * could be master just re-asserting. */ mlog(ML_ERROR, "no bits set in the maybe_map, but %u " "is asserting! (%.*s)\n", assert->node_idx, namelen, name); } else if (bit != assert->node_idx) { if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) { mlog(0, "master %u was found, %u should " "back off\n", assert->node_idx, bit); } else { /* with the fix for bug 569, a higher node * number winning the mastery will respond * YES to mastery requests, but this node * had no way of knowing. let it pass. */ mlog(ML_ERROR, "%u is the lowest node, " "%u is asserting. (%.*s) %u must " "have begun after %u won.\n", bit, assert->node_idx, namelen, name, bit, assert->node_idx); } } } spin_unlock(&dlm->master_lock); /* ok everything checks out with the MLE * now check to see if there is a lockres */ res = __dlm_lookup_lockres(dlm, name, namelen); if (res) { spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { mlog(ML_ERROR, "%u asserting but %.*s is " "RECOVERING!\n", assert->node_idx, namelen, name); goto kill; } if (!mle) { if (res->owner != assert->node_idx) { mlog(ML_ERROR, "assert_master from " "%u, but current owner is " "%u! (%.*s)\n", assert->node_idx, res->owner, namelen, name); goto kill; } } else if (mle->type != DLM_MLE_MIGRATION) { if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { /* owner is just re-asserting */ if (res->owner == assert->node_idx) { mlog(0, "owner %u re-asserting on " "lock %.*s\n", assert->node_idx, namelen, name); goto ok; } mlog(ML_ERROR, "got assert_master from " "node %u, but %u is the owner! " "(%.*s)\n", assert->node_idx, res->owner, namelen, name); goto kill; } if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) { mlog(ML_ERROR, "got assert from %u, but lock " "with no owner should be " "in-progress! (%.*s)\n", assert->node_idx, namelen, name); goto kill; } } else /* mle->type == DLM_MLE_MIGRATION */ { /* should only be getting an assert from new master */ if (assert->node_idx != mle->new_master) { mlog(ML_ERROR, "got assert from %u, but " "new master is %u, and old master " "was %u (%.*s)\n", assert->node_idx, mle->new_master, mle->master, namelen, name); goto kill; } }ok: spin_unlock(&res->spinlock); } spin_unlock(&dlm->spinlock); // mlog(0, "woo! got an assert_master from node %u!\n", // assert->node_idx); if (mle) { int extra_ref = 0; int nn = -1; spin_lock(&mle->spinlock); if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) extra_ref = 1; else { /* MASTER mle: if any bits set in the response map * then the calling node needs to re-assert to clear * up nodes that this node contacted */ while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, nn+1)) < O2NM_MAX_NODES) { if (nn != dlm->node_num && nn != assert->node_idx) master_request = 1; } } mle->master = assert->node_idx; atomic_set(&mle->woken, 1); wake_up(&mle->wq); spin_unlock(&mle->spinlock); if (mle->type == DLM_MLE_MIGRATION && res) { mlog(0, "finishing off migration of lockres %.*s, " "from %u to %u\n", res->lockname.len, res->lockname.name, dlm->node_num, mle->new_master); spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_MIGRATING; dlm_change_lockres_owner(dlm, res, mle->new_master); BUG_ON(res->state & DLM_LOCK_RES_DIRTY); spin_unlock(&res->spinlock); } /* master is known, detach if not already detached */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); if (extra_ref) { /* the assert master message now balances the extra * ref given by the master / migration request message. * if this is the last put, it will be removed * from the list. */ dlm_put_mle(mle); } }done: ret = 0; if (res) dlm_lockres_put(res); dlm_put(dlm); if (master_request) { mlog(0, "need to tell master to reassert\n"); ret = EAGAIN; // positive. negative would shoot down the node. } return ret;kill: /* kill the caller! */ spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); dlm_lockres_put(res); mlog(ML_ERROR, "Bad message received from another node. Dumping state " "and killing the other node now! This node is OK and can continue.\n"); dlm_dump_lock_resources(dlm); dlm_put(dlm); return -EINVAL;}int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, int ignore_higher, u8 request_from, u32 flags){ struct dlm_work_item *item; item = kcalloc(1, sizeof(*item), GFP_KERNEL); if (!item) return -ENOMEM; /* queue up work for dlm_assert_master_worker */ dlm_grab(dlm); /* get an extra ref for the work item */ dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL); item->u.am.lockres = res; /* already have a ref */ /* can optionally ignore node numbers higher than this node */ item->u.am.ignore_higher = ignore_higher; item->u.am.request_from = request_from; item->u.am.flags = flags; if (ignore_higher) mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, res->lockname.name); spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); schedule_work(&dlm->dispatched_work); return 0;}static void dlm_assert_master_worker(struct dlm_work_item *item, void *data){ struct dlm_ctxt *dlm = data; int ret = 0; struct dlm_lock_resource *res; unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)]; int ignore_higher; int bit; u8 request_from; u32 flags; dlm = item->dlm; res = item->u.am.lockres; ignore_higher = item->u.am.ignore_higher; request_from = item->u.am.request_from; flags = item->u.am.flags; spin_lock(&dlm->spinlock); memcpy(nodemap, dlm->domain_map, sizeof(nodemap)); spin_unlock(&dlm->spinlock); clear_bit(dlm->node_num, nodemap); if (ignore_higher) { /* if is this just to clear up mles for nodes below * this node, do not send the message to the original * caller or any node number higher than this */ clear_bit(request_from, nodemap); bit = dlm->node_num; while (1) { bit = find_next_bit(nodemap, O2NM_MAX_NODES, bit+1); if (bit >= O2NM_MAX_NODES) break; clear_bit(bit, nodemap); } } /* this call now finishes out the nodemap * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, dlm->node_num); ret = dlm_do_assert_master(dlm, res->lockname.name, res->lockname.len, nodemap, flags); if (ret < 0) { /* no need to restart, we are done */ mlog_errno(ret); } dlm_lockres_put(res); mlog(0, "finished with dlm_assert_master_worker\n");}/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread. * We cannot wait for node recovery to complete to begin mastering this * lockres because this lockres is used to kick off recovery! ;-) * So, do a pre-check on all living nodes to see if any of those nodes * think that $RECOVERY is currently mastered by a dead node. If so, * we wait a short time to allow that node to get notified by its own * heartbeat stack, then check again. All $RECOVERY lock resources * mastered by dead nodes are purged when the hearbeat callback is * fired, so we can know for sure that it is safe to continue once * the node returns a live node or no node. */static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res){ struct dlm_node_iter iter; int nodenum; int ret = 0; u8 master = DLM_LOCK_RES_OWNER_UNKNOWN; spin_lock(&dlm->spinlock); dlm_node_iter_init(dlm->domain_map, &iter); spin_unlock(&dlm->spinlock); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { /* do not send to self */ if (nodenum == dlm->node_num) continue; ret = dlm_do_master_requery(dlm, res, nodenum, &master); if (ret < 0) { mlog_errno(ret); if (!dlm_is_host_down(ret)) BUG(); /* host is down, so answer for that node would be * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ } if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { /* check to see if this master is in the recovery map */ spin_lock(&dlm->spinlock); if (test_bit(master, dlm->recovery_map)) { mlog(ML_NOTICE, "%s: node %u has not seen " "node %u go down yet, and thinks the " "dead node is mastering the recovery " "lock. must wait.\n", dlm->name,
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -