📄 dlmmaster.c
字号:
"%u, but current owner is " "%u! (%.*s)\n", assert->node_idx, res->owner, namelen, name); goto kill; } } else if (mle->type != DLM_MLE_MIGRATION) { if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { /* owner is just re-asserting */ if (res->owner == assert->node_idx) { mlog(0, "owner %u re-asserting on " "lock %.*s\n", assert->node_idx, namelen, name); goto ok; } mlog(ML_ERROR, "got assert_master from " "node %u, but %u is the owner! " "(%.*s)\n", assert->node_idx, res->owner, namelen, name); goto kill; } if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) { mlog(ML_ERROR, "got assert from %u, but lock " "with no owner should be " "in-progress! (%.*s)\n", assert->node_idx, namelen, name); goto kill; } } else /* mle->type == DLM_MLE_MIGRATION */ { /* should only be getting an assert from new master */ if (assert->node_idx != mle->new_master) { mlog(ML_ERROR, "got assert from %u, but " "new master is %u, and old master " "was %u (%.*s)\n", assert->node_idx, mle->new_master, mle->master, namelen, name); goto kill; } }ok: spin_unlock(&res->spinlock); } spin_unlock(&dlm->spinlock); // mlog(0, "woo! got an assert_master from node %u!\n", // assert->node_idx); if (mle) { int extra_ref = 0; int nn = -1; int rr, err = 0; spin_lock(&mle->spinlock); if (mle->type == DLM_MLE_BLOCK || mle->type == DLM_MLE_MIGRATION) extra_ref = 1; else { /* MASTER mle: if any bits set in the response map * then the calling node needs to re-assert to clear * up nodes that this node contacted */ while ((nn = find_next_bit (mle->response_map, O2NM_MAX_NODES, nn+1)) < O2NM_MAX_NODES) { if (nn != dlm->node_num && nn != assert->node_idx) master_request = 1; } } mle->master = assert->node_idx; atomic_set(&mle->woken, 1); wake_up(&mle->wq); spin_unlock(&mle->spinlock); if (res) { int wake = 0; spin_lock(&res->spinlock); if (mle->type == DLM_MLE_MIGRATION) { mlog(0, "finishing off migration of lockres %.*s, " "from %u to %u\n", res->lockname.len, res->lockname.name, dlm->node_num, mle->new_master); res->state &= ~DLM_LOCK_RES_MIGRATING; wake = 1; dlm_change_lockres_owner(dlm, res, mle->new_master); BUG_ON(res->state & DLM_LOCK_RES_DIRTY); } else { dlm_change_lockres_owner(dlm, res, mle->master); } spin_unlock(&res->spinlock); have_lockres_ref = 1; if (wake) wake_up(&res->wq); } /* master is known, detach if not already detached. * ensures that only one assert_master call will happen * on this mle. */ spin_lock(&dlm->spinlock); spin_lock(&dlm->master_lock); rr = atomic_read(&mle->mle_refs.refcount); if (mle->inuse > 0) { if (extra_ref && rr < 3) err = 1; else if (!extra_ref && rr < 2) err = 1; } else { if (extra_ref && rr < 2) err = 1; else if (!extra_ref && rr < 1) err = 1; } if (err) { mlog(ML_ERROR, "%s:%.*s: got assert master from %u " "that will mess up this node, refs=%d, extra=%d, " "inuse=%d\n", dlm->name, namelen, name, assert->node_idx, rr, extra_ref, mle->inuse); dlm_print_one_mle(mle); } list_del_init(&mle->list); __dlm_mle_detach_hb_events(dlm, mle); __dlm_put_mle(mle); if (extra_ref) { /* the assert master message now balances the extra * ref given by the master / migration request message. * if this is the last put, it will be removed * from the list. */ __dlm_put_mle(mle); } spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); } else if (res) { if (res->owner != assert->node_idx) { mlog(0, "assert_master from %u, but current " "owner is %u (%.*s), no mle\n", assert->node_idx, res->owner, namelen, name); } }done: ret = 0; if (res) { spin_lock(&res->spinlock); res->state |= DLM_LOCK_RES_SETREF_INPROG; spin_unlock(&res->spinlock); *ret_data = (void *)res; } dlm_put(dlm); if (master_request) { mlog(0, "need to tell master to reassert\n"); /* positive. negative would shoot down the node. */ ret |= DLM_ASSERT_RESPONSE_REASSERT; if (!have_lockres_ref) { mlog(ML_ERROR, "strange, got assert from %u, MASTER " "mle present here for %s:%.*s, but no lockres!\n", assert->node_idx, dlm->name, namelen, name); } } if (have_lockres_ref) { /* let the master know we have a reference to the lockres */ ret |= DLM_ASSERT_RESPONSE_MASTERY_REF; mlog(0, "%s:%.*s: got assert from %u, need a ref\n", dlm->name, namelen, name, assert->node_idx); } return ret;kill: /* kill the caller! */ mlog(ML_ERROR, "Bad message received from another node. Dumping state " "and killing the other node now! This node is OK and can continue.\n"); __dlm_print_one_lock_resource(res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); *ret_data = (void *)res; dlm_put(dlm); return -EINVAL;}void dlm_assert_master_post_handler(int status, void *data, void *ret_data){ struct dlm_lock_resource *res = (struct dlm_lock_resource *)ret_data; if (ret_data) { spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_SETREF_INPROG; spin_unlock(&res->spinlock); wake_up(&res->wq); dlm_lockres_put(res); } return;}int dlm_dispatch_assert_master(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, int ignore_higher, u8 request_from, u32 flags){ struct dlm_work_item *item; item = kzalloc(sizeof(*item), GFP_NOFS); if (!item) return -ENOMEM; /* queue up work for dlm_assert_master_worker */ dlm_grab(dlm); /* get an extra ref for the work item */ dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL); item->u.am.lockres = res; /* already have a ref */ /* can optionally ignore node numbers higher than this node */ item->u.am.ignore_higher = ignore_higher; item->u.am.request_from = request_from; item->u.am.flags = flags; if (ignore_higher) mlog(0, "IGNORE HIGHER: %.*s\n", res->lockname.len, res->lockname.name); spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); queue_work(dlm->dlm_worker, &dlm->dispatched_work); return 0;}static void dlm_assert_master_worker(struct dlm_work_item *item, void *data){ struct dlm_ctxt *dlm = data; int ret = 0; struct dlm_lock_resource *res; unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)]; int ignore_higher; int bit; u8 request_from; u32 flags; dlm = item->dlm; res = item->u.am.lockres; ignore_higher = item->u.am.ignore_higher; request_from = item->u.am.request_from; flags = item->u.am.flags; spin_lock(&dlm->spinlock); memcpy(nodemap, dlm->domain_map, sizeof(nodemap)); spin_unlock(&dlm->spinlock); clear_bit(dlm->node_num, nodemap); if (ignore_higher) { /* if is this just to clear up mles for nodes below * this node, do not send the message to the original * caller or any node number higher than this */ clear_bit(request_from, nodemap); bit = dlm->node_num; while (1) { bit = find_next_bit(nodemap, O2NM_MAX_NODES, bit+1); if (bit >= O2NM_MAX_NODES) break; clear_bit(bit, nodemap); } } /* * If we're migrating this lock to someone else, we are no * longer allowed to assert out own mastery. OTOH, we need to * prevent migration from starting while we're still asserting * our dominance. The reserved ast delays migration. */ spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_MIGRATING) { mlog(0, "Someone asked us to assert mastery, but we're " "in the middle of migration. Skipping assert, " "the new master will handle that.\n"); spin_unlock(&res->spinlock); goto put; } else __dlm_lockres_reserve_ast(res); spin_unlock(&res->spinlock); /* this call now finishes out the nodemap * even if one or more nodes die */ mlog(0, "worker about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, dlm->node_num); ret = dlm_do_assert_master(dlm, res, nodemap, flags); if (ret < 0) { /* no need to restart, we are done */ if (!dlm_is_host_down(ret)) mlog_errno(ret); } /* Ok, we've asserted ourselves. Let's let migration start. */ dlm_lockres_release_ast(dlm, res);put: dlm_lockres_put(res); mlog(0, "finished with dlm_assert_master_worker\n");}/* SPECIAL CASE for the $RECOVERY lock used by the recovery thread. * We cannot wait for node recovery to complete to begin mastering this * lockres because this lockres is used to kick off recovery! ;-) * So, do a pre-check on all living nodes to see if any of those nodes * think that $RECOVERY is currently mastered by a dead node. If so, * we wait a short time to allow that node to get notified by its own * heartbeat stack, then check again. All $RECOVERY lock resources * mastered by dead nodes are purged when the hearbeat callback is * fired, so we can know for sure that it is safe to continue once * the node returns a live node or no node. */static int dlm_pre_master_reco_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res){ struct dlm_node_iter iter; int nodenum; int ret = 0; u8 master = DLM_LOCK_RES_OWNER_UNKNOWN; spin_lock(&dlm->spinlock); dlm_node_iter_init(dlm->domain_map, &iter); spin_unlock(&dlm->spinlock); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { /* do not send to self */ if (nodenum == dlm->node_num) continue; ret = dlm_do_master_requery(dlm, res, nodenum, &master); if (ret < 0) { mlog_errno(ret); if (!dlm_is_host_down(ret)) BUG(); /* host is down, so answer for that node would be * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */ ret = 0; } if (master != DLM_LOCK_RES_OWNER_UNKNOWN) { /* check to see if this master is in the recovery map */ spin_lock(&dlm->spinlock); if (test_bit(master, dlm->recovery_map)) { mlog(ML_NOTICE, "%s: node %u has not seen " "node %u go down yet, and thinks the " "dead node is mastering the recovery " "lock. must wait.\n", dlm->name, nodenum, master); ret = -EAGAIN; } spin_unlock(&dlm->spinlock); mlog(0, "%s: reco lock master is %u\n", dlm->name, master); break; } } return ret;}/* * DLM_DEREF_LOCKRES_MSG */int dlm_drop_lockres_ref(struct dlm_ctxt *dlm, struct dlm_lock_resource *res){ struct dlm_deref_lockres deref; int ret = 0, r; const char *lockname; unsigned int namelen; lockname = res->lockname.name; namelen = res->lockname.len; BUG_ON(namelen > O2NM_MAX_NAME_LEN); mlog(0, "%s:%.*s: sending deref to %d\n", dlm->name, namelen, lockname, res->owner); memset(&deref, 0, sizeof(deref)); deref.node_idx = dlm->node_num; deref.namelen = namelen; memcpy(deref.name, lockname, namelen); ret = o2net_send_message(DLM_DEREF_LOCKRES_MSG, dlm->key, &deref, sizeof(deref), res->owner, &r); if (ret < 0) mlog_errno(ret); else if (r < 0) { /* BAD. other node says I did not have a ref. */ mlog(ML_ERROR,"while dropping ref on %s:%.*s " "(master=%u) got %d.\n", dlm->name, namelen, lockname, res->owner, r); dlm_print_one_lock_resource(res); BUG(); } return ret;}int dlm_deref_lockres_handler(struct o2net_msg *msg, u32 len, void *data, void **ret_data){ struct dlm_ctxt *dlm = data; struct dlm_deref_lockres *deref = (struct dlm_deref_lockres *)msg->buf; struct dlm_lock_resource *res = NULL; char *name; unsigned int namelen; int ret = -EINVAL; u8 node; unsigned int hash; struct dlm_work_item *item; int cleared = 0; int dispatch = 0; if (!dlm_grab(dlm)) return 0; name = deref->name; namelen = deref->namelen; node = deref->node_idx; if (namelen > DLM_LOCKID_NAME_MAX) { mlog(ML_ERROR, "Invalid name length!"); goto done; } if (deref->node_idx >= O2NM_MAX_NODES) { mlog(ML_ERROR, "Invalid node number: %u\n", node); goto done; } hash = dlm_lockid_hash(name, namelen); spin_lock(&dlm->spinlock); res = __dlm_lookup_lockres_full(dlm, name, namelen, hash); if (!res) { spin_unlock(&dlm->spinlock); mlog(ML_ERROR, "%s:%.*s: bad lockres name\n", dlm->name, namelen, name); goto done; } spin_unlock(&dlm->spinlock); spin_lock(&res->spinlock); if (res->state & DLM_LOCK_RES_SETREF_INPROG) dispatch = 1; else { BUG_ON(res->state & DLM_LOCK_RES_DROPPING_REF); if (test_bit(node, res->refmap)) { dlm_lockres_clear_refmap_bit(node, res); cleared = 1; } } spin_unlock(&res->spinlock); if (!dispatch) { if (cleared) dlm_lockres_calc_usage(dlm, res); else { mlog(ML_ERROR, "%s:%.*s: node %u trying to drop ref " "but it is already dropped!\n", dlm->name, res->lockname.len, res->lockname.name, node); __dlm_print_one_lock_resource(res); } ret = 0; goto done; } item = kzalloc(sizeof(*item), GFP_NOFS); if (!item) { ret = -ENOMEM; mlog_errno(ret); goto done; } dlm_init_work_item(dlm, item, dlm_deref_lockres_worker, NULL); item->u.dl.deref_res = res; item->u.dl.deref_node = node; spin_lock(&dlm->work_lock); list_add_tail(&item->list, &dlm->work_list); spin_unlock(&dlm->work_lock); queue_work(dlm->dlm_worker, &dlm->dispatched_work); ret
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -