📄 dlmmaster.c
字号:
mlog(0, "node %u not master, response=NO\n", to); set_bit(to, mle->response_map); break; case DLM_MASTER_RESP_MAYBE: mlog(0, "node %u not master, response=MAYBE\n", to); set_bit(to, mle->response_map); set_bit(to, mle->maybe_map); break; case DLM_MASTER_RESP_ERROR: mlog(0, "node %u hit an error, resending\n", to); resend = 1; response = 0; break; default: mlog(ML_ERROR, "bad response! %u\n", response); BUG(); } spin_unlock(&mle->spinlock); if (resend) { /* this is also totally crude */ msleep(50); goto again; }out: return ret;}/* * locks that can be taken here: * dlm->spinlock * res->spinlock * mle->spinlock * dlm->master_list * * if possible, TRIM THIS DOWN!!! */int dlm_master_request_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data){ u8 response = DLM_MASTER_RESP_MAYBE; struct dlm_ctxt *dlm = data; struct dlm_lock_resource *res = NULL; struct dlm_master_request *request = (struct dlm_master_request *) msg->buf; struct dlm_master_list_entry *mle = NULL, *tmpmle = NULL; char *name; unsigned int namelen, hash; int found, ret; int set_maybe; int dispatch_assert = 0; if (!dlm_grab(dlm)) return DLM_MASTER_RESP_NO; if (!dlm_domain_fully_joined(dlm)) { response = DLM_MASTER_RESP_NO; goto send_response; } name = request->name; namelen = request->namelen; hash = dlm_lockid_hash(name, namelen); if (namelen > DLM_LOCKID_NAME_MAX) { response = DLM_IVBUFLEN; goto send_response; }way_up_top: spin_lock(&dlm->spinlock); res = __dlm_lookup_lockres(dlm, name, namelen, hash); if (res) { spin_unlock(&dlm->spinlock); /* take care of the easy cases up front */ spin_lock(&res->spinlock); if (res->state & (DLM_LOCK_RES_RECOVERING| DLM_LOCK_RES_MIGRATING)) { spin_unlock(&res->spinlock); mlog(0, "returning DLM_MASTER_RESP_ERROR since res is " "being recovered/migrated\n"); response = DLM_MASTER_RESP_ERROR; if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } if (res->owner == dlm->node_num) { mlog(0, "%s:%.*s: setting bit %u in refmap\n", dlm->name, namelen, name, request->node_idx); dlm_lockres_set_refmap_bit(request->node_idx, res); spin_unlock(&res->spinlock); response = DLM_MASTER_RESP_YES; if (mle) kmem_cache_free(dlm_mle_cache, mle); /* this node is the owner. * there is some extra work that needs to * happen now. the requesting node has * caused all nodes up to this one to * create mles. this node now needs to * go back and clean those up. */ dispatch_assert = 1; goto send_response; } else if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { spin_unlock(&res->spinlock); // mlog(0, "node %u is the master\n", res->owner); response = DLM_MASTER_RESP_NO; if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } /* ok, there is no owner. either this node is * being blocked, or it is actively trying to * master this lock. */ if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) { mlog(ML_ERROR, "lock with no owner should be " "in-progress!\n"); BUG(); } // mlog(0, "lockres is in progress...\n"); spin_lock(&dlm->master_lock); found = dlm_find_mle(dlm, &tmpmle, name, namelen); if (!found) { mlog(ML_ERROR, "no mle found for this lock!\n"); BUG(); } set_maybe = 1; spin_lock(&tmpmle->spinlock); if (tmpmle->type == DLM_MLE_BLOCK) { // mlog(0, "this node is waiting for " // "lockres to be mastered\n"); response = DLM_MASTER_RESP_NO; } else if (tmpmle->type == DLM_MLE_MIGRATION) { mlog(0, "node %u is master, but trying to migrate to " "node %u.\n", tmpmle->master, tmpmle->new_master); if (tmpmle->master == dlm->node_num) { mlog(ML_ERROR, "no owner on lockres, but this " "node is trying to migrate it to %u?!\n", tmpmle->new_master); BUG(); } else { /* the real master can respond on its own */ response = DLM_MASTER_RESP_NO; } } else if (tmpmle->master != DLM_LOCK_RES_OWNER_UNKNOWN) { set_maybe = 0; if (tmpmle->master == dlm->node_num) { response = DLM_MASTER_RESP_YES; /* this node will be the owner. * go back and clean the mles on any * other nodes */ dispatch_assert = 1; dlm_lockres_set_refmap_bit(request->node_idx, res); mlog(0, "%s:%.*s: setting bit %u in refmap\n", dlm->name, namelen, name, request->node_idx); } else response = DLM_MASTER_RESP_NO; } else { // mlog(0, "this node is attempting to " // "master lockres\n"); response = DLM_MASTER_RESP_MAYBE; } if (set_maybe) set_bit(request->node_idx, tmpmle->maybe_map); spin_unlock(&tmpmle->spinlock); spin_unlock(&dlm->master_lock); spin_unlock(&res->spinlock); /* keep the mle attached to heartbeat events */ dlm_put_mle(tmpmle); if (mle) kmem_cache_free(dlm_mle_cache, mle); goto send_response; } /* * lockres doesn't exist on this node * if there is an MLE_BLOCK, return NO * if there is an MLE_MASTER, return MAYBE * otherwise, add an MLE_BLOCK, return NO */ spin_lock(&dlm->master_lock); found = dlm_find_mle(dlm, &tmpmle, name, namelen); if (!found) { /* this lockid has never been seen on this node yet */ // mlog(0, "no mle found\n"); if (!mle) { spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, GFP_NOFS); if (!mle) { response = DLM_MASTER_RESP_ERROR; mlog_errno(-ENOMEM); goto send_response; } goto way_up_top; } // mlog(0, "this is second time thru, already allocated, " // "add the block.\n"); dlm_init_mle(mle, DLM_MLE_BLOCK, dlm, NULL, name, namelen); set_bit(request->node_idx, mle->maybe_map); list_add(&mle->list, &dlm->master_list); response = DLM_MASTER_RESP_NO; } else { // mlog(0, "mle was found\n"); set_maybe = 1; spin_lock(&tmpmle->spinlock); if (tmpmle->master == dlm->node_num) { mlog(ML_ERROR, "no lockres, but an mle with this node as master!\n"); BUG(); } if (tmpmle->type == DLM_MLE_BLOCK) response = DLM_MASTER_RESP_NO; else if (tmpmle->type == DLM_MLE_MIGRATION) { mlog(0, "migration mle was found (%u->%u)\n", tmpmle->master, tmpmle->new_master); /* real master can respond on its own */ response = DLM_MASTER_RESP_NO; } else response = DLM_MASTER_RESP_MAYBE; if (set_maybe) set_bit(request->node_idx, tmpmle->maybe_map); spin_unlock(&tmpmle->spinlock); } spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); if (found) { /* keep the mle attached to heartbeat events */ dlm_put_mle(tmpmle); }send_response: if (dispatch_assert) { if (response != DLM_MASTER_RESP_YES) mlog(ML_ERROR, "invalid response %d\n", response); if (!res) { mlog(ML_ERROR, "bad lockres while trying to assert!\n"); BUG(); } mlog(0, "%u is the owner of %.*s, cleaning everyone else\n", dlm->node_num, res->lockname.len, res->lockname.name); ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx, DLM_ASSERT_MASTER_MLE_CLEANUP); if (ret < 0) { mlog(ML_ERROR, "failed to dispatch assert master work\n"); response = DLM_MASTER_RESP_ERROR; } } dlm_put(dlm); return response;}/* * DLM_ASSERT_MASTER_MSG *//* * NOTE: this can be used for debugging * can periodically run all locks owned by this node * and re-assert across the cluster... */int dlm_do_assert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, void *nodemap, u32 flags){ struct dlm_assert_master assert; int to, tmpret; struct dlm_node_iter iter; int ret = 0; int reassert; const char *lockname = res->lockname.name; unsigned int namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); spin_lock(&res->spinlock); res->state |= DLM_LOCK_RES_SETREF_INPROG; spin_unlock(&res->spinlock);again: reassert = 0; /* note that if this nodemap is empty, it returns 0 */ dlm_node_iter_init(nodemap, &iter); while ((to = dlm_node_iter_next(&iter)) >= 0) { int r = 0; struct dlm_master_list_entry *mle = NULL; mlog(0, "sending assert master to %d (%.*s)\n", to, namelen, lockname); memset(&assert, 0, sizeof(assert)); assert.node_idx = dlm->node_num; assert.namelen = namelen; memcpy(assert.name, lockname, namelen); assert.flags = cpu_to_be32(flags); tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key, &assert, sizeof(assert), to, &r); if (tmpret < 0) { mlog(0, "assert_master returned %d!\n", tmpret); if (!dlm_is_host_down(tmpret)) { mlog(ML_ERROR, "unhandled error=%d!\n", tmpret); BUG(); } /* a node died. finish out the rest of the nodes. */ mlog(0, "link to %d went down!\n", to); /* any nonzero status return will do */ ret = tmpret; r = 0; } else if (r < 0) { /* ok, something horribly messed. kill thyself. */ mlog(ML_ERROR,"during assert master of %.*s to %u, " "got %d.\n", namelen, lockname, to, r); spin_lock(&dlm->spinlock); spin_lock(&dlm->master_lock); if (dlm_find_mle(dlm, &mle, (char *)lockname, namelen)) { dlm_print_one_mle(mle); __dlm_put_mle(mle); } spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); BUG(); } if (r & DLM_ASSERT_RESPONSE_REASSERT && !(r & DLM_ASSERT_RESPONSE_MASTERY_REF)) { mlog(ML_ERROR, "%.*s: very strange, " "master MLE but no lockres on %u\n", namelen, lockname, to); } if (r & DLM_ASSERT_RESPONSE_REASSERT) { mlog(0, "%.*s: node %u create mles on other " "nodes and requests a re-assert\n", namelen, lockname, to); reassert = 1; } if (r & DLM_ASSERT_RESPONSE_MASTERY_REF) { mlog(0, "%.*s: node %u has a reference to this " "lockres, set the bit in the refmap\n", namelen, lockname, to); spin_lock(&res->spinlock); dlm_lockres_set_refmap_bit(to, res); spin_unlock(&res->spinlock); } } if (reassert) goto again; spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_SETREF_INPROG; spin_unlock(&res->spinlock); wake_up(&res->wq); return ret;}/* * locks that can be taken here: * dlm->spinlock * res->spinlock * mle->spinlock * dlm->master_list * * if possible, TRIM THIS DOWN!!! */int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data){ struct dlm_ctxt *dlm = data; struct dlm_master_list_entry *mle = NULL; struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf; struct dlm_lock_resource *res = NULL; char *name; unsigned int namelen, hash; u32 flags; int master_request = 0, have_lockres_ref = 0; int ret = 0; if (!dlm_grab(dlm)) return 0; name = assert->name; namelen = assert->namelen; hash = dlm_lockid_hash(name, namelen); flags = be32_to_cpu(assert->flags); if (namelen > DLM_LOCKID_NAME_MAX) { mlog(ML_ERROR, "Invalid name length!"); goto done; } spin_lock(&dlm->spinlock); if (flags) mlog(0, "assert_master with flags: %u\n", flags); /* find the MLE */ spin_lock(&dlm->master_lock); if (!dlm_find_mle(dlm, &mle, name, namelen)) { /* not an error, could be master just re-asserting */ mlog(0, "just got an assert_master from %u, but no " "MLE for it! (%.*s)\n", assert->node_idx, namelen, name); } else { int bit = find_next_bit (mle->maybe_map, O2NM_MAX_NODES, 0); if (bit >= O2NM_MAX_NODES) { /* not necessarily an error, though less likely. * could be master just re-asserting. */ mlog(0, "no bits set in the maybe_map, but %u " "is asserting! (%.*s)\n", assert->node_idx, namelen, name); } else if (bit != assert->node_idx) { if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) { mlog(0, "master %u was found, %u should " "back off\n", assert->node_idx, bit); } else { /* with the fix for bug 569, a higher node * number winning the mastery will respond * YES to mastery requests, but this node * had no way of knowing. let it pass. */ mlog(0, "%u is the lowest node, " "%u is asserting. (%.*s) %u must " "have begun after %u won.\n", bit, assert->node_idx, namelen, name, bit, assert->node_idx); } } if (mle->type == DLM_MLE_MIGRATION) { if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) { mlog(0, "%s:%.*s: got cleanup assert" " from %u for migration\n", dlm->name, namelen, name, assert->node_idx); } else if (!(flags & DLM_ASSERT_MASTER_FINISH_MIGRATION)) { mlog(0, "%s:%.*s: got unrelated assert" " from %u for migration, ignoring\n", dlm->name, namelen, name, assert->node_idx); __dlm_put_mle(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); goto done; } } } spin_unlock(&dlm->master_lock); /* ok everything checks out with the MLE * now check to see if there is a lockres */ res = __dlm_lookup_lockres(dlm, name, namelen, hash); if (res) { spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_RECOVERING) { mlog(ML_ERROR, "%u asserting but %.*s is " "RECOVERING!\n", assert->node_idx, namelen, name); goto kill; } if (!mle) { if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN && res->owner != assert->node_idx) { mlog(ML_ERROR, "assert_master from "
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -