📄 dlmmaster.c
 */

static int dlm_do_assert_master(struct dlm_ctxt *dlm, const char *lockname,
				unsigned int namelen, void *nodemap,
				u32 flags)
{
	struct dlm_assert_master assert;
	int to, tmpret;
	struct dlm_node_iter iter;
	int ret = 0;

	BUG_ON(namelen > O2NM_MAX_NAME_LEN);

	/* note that if this nodemap is empty, it returns 0 */
	dlm_node_iter_init(nodemap, &iter);
	while ((to = dlm_node_iter_next(&iter)) >= 0) {
		int r = 0;
		mlog(0, "sending assert master to %d (%.*s)\n", to,
		     namelen, lockname);
		memset(&assert, 0, sizeof(assert));
		assert.node_idx = dlm->node_num;
		assert.namelen = namelen;
		memcpy(assert.name, lockname, namelen);
		assert.flags = cpu_to_be32(flags);

		tmpret = o2net_send_message(DLM_ASSERT_MASTER_MSG, dlm->key,
					    &assert, sizeof(assert), to, &r);
		if (tmpret < 0) {
			mlog(ML_ERROR, "assert_master returned %d!\n", tmpret);
			if (!dlm_is_host_down(tmpret)) {
				mlog(ML_ERROR, "unhandled error!\n");
				BUG();
			}
			/* a node died.  finish out the rest of the nodes. */
			mlog(ML_ERROR, "link to %d went down!\n", to);
			/* any nonzero status return will do */
			ret = tmpret;
		} else if (r < 0) {
			/* ok, something horribly messed.  kill thyself. */
			mlog(ML_ERROR, "during assert master of %.*s to %u, "
			     "got %d.\n", namelen, lockname, to, r);
			dlm_dump_lock_resources(dlm);
			BUG();
		}
	}

	return ret;
}

/*
 * locks that can be taken here:
 * dlm->spinlock
 * res->spinlock
 * mle->spinlock
 * dlm->master_list
 *
 * if possible, TRIM THIS DOWN!!!
 */
int dlm_assert_master_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_assert_master *assert = (struct dlm_assert_master *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	char *name;
	unsigned int namelen;
	u32 flags;

	if (!dlm_grab(dlm))
		return 0;

	name = assert->name;
	namelen = assert->namelen;
	flags = be32_to_cpu(assert->flags);

	if (namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length!");
		goto done;
	}

	spin_lock(&dlm->spinlock);

	if (flags)
		mlog(0, "assert_master with flags: %u\n", flags);

	/* find the MLE */
	spin_lock(&dlm->master_lock);
	if (!dlm_find_mle(dlm, &mle, name, namelen)) {
		/* not an error, could be master just re-asserting */
		mlog(0, "just got an assert_master from %u, but no "
		     "MLE for it! (%.*s)\n", assert->node_idx,
		     namelen, name);
	} else {
		int bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0);
		if (bit >= O2NM_MAX_NODES) {
			/* not necessarily an error, though less likely.
			 * could be master just re-asserting. */
			mlog(ML_ERROR, "no bits set in the maybe_map, but %u "
			     "is asserting! (%.*s)\n", assert->node_idx,
			     namelen, name);
		} else if (bit != assert->node_idx) {
			if (flags & DLM_ASSERT_MASTER_MLE_CLEANUP) {
				mlog(0, "master %u was found, %u should "
				     "back off\n", assert->node_idx, bit);
			} else {
				/* with the fix for bug 569, a higher node
				 * number winning the mastery will respond
				 * YES to mastery requests, but this node
				 * had no way of knowing.  let it pass. */
				mlog(ML_ERROR, "%u is the lowest node, "
				     "%u is asserting. (%.*s) %u must "
				     "have begun after %u won.\n", bit,
				     assert->node_idx, namelen, name, bit,
				     assert->node_idx);
			}
		}
	}
	spin_unlock(&dlm->master_lock);

	/* ok everything checks out with the MLE
	 * now check to see if there is a lockres */
	res = __dlm_lookup_lockres(dlm, name, namelen);
	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			mlog(ML_ERROR, "%u asserting but %.*s is "
			     "RECOVERING!\n", assert->node_idx, namelen, name);
			goto kill;
		}
		if (!mle) {
			if (res->owner != assert->node_idx) {
				mlog(ML_ERROR, "assert_master from "
				     "%u, but current owner is "
				     "%u! (%.*s)\n",
				     assert->node_idx, res->owner,
				     namelen, name);
				goto kill;
			}
		} else if (mle->type != DLM_MLE_MIGRATION) {
			if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) {
				/* owner is just re-asserting */
				if (res->owner == assert->node_idx) {
					mlog(0, "owner %u re-asserting on "
					     "lock %.*s\n", assert->node_idx,
					     namelen, name);
					goto ok;
				}
				mlog(ML_ERROR, "got assert_master from "
				     "node %u, but %u is the owner! "
				     "(%.*s)\n", assert->node_idx,
				     res->owner, namelen, name);
				goto kill;
			}
			if (!(res->state & DLM_LOCK_RES_IN_PROGRESS)) {
				mlog(ML_ERROR, "got assert from %u, but lock "
				     "with no owner should be "
				     "in-progress! (%.*s)\n",
				     assert->node_idx, namelen, name);
				goto kill;
			}
		} else /* mle->type == DLM_MLE_MIGRATION */ {
			/* should only be getting an assert from new master */
			if (assert->node_idx != mle->new_master) {
				mlog(ML_ERROR, "got assert from %u, but "
				     "new master is %u, and old master "
				     "was %u (%.*s)\n",
				     assert->node_idx, mle->new_master,
				     mle->master, namelen, name);
				goto kill;
			}
		}
ok:
		spin_unlock(&res->spinlock);
	}
	spin_unlock(&dlm->spinlock);

	// mlog(0, "woo! got an assert_master from node %u!\n",
	//	assert->node_idx);
	if (mle) {
		int extra_ref;

		spin_lock(&mle->spinlock);
		extra_ref = !!(mle->type == DLM_MLE_BLOCK ||
			       mle->type == DLM_MLE_MIGRATION);
		mle->master = assert->node_idx;
		atomic_set(&mle->woken, 1);
		wake_up(&mle->wq);
		spin_unlock(&mle->spinlock);

		if (mle->type == DLM_MLE_MIGRATION && res) {
			mlog(0, "finishing off migration of lockres %.*s, "
			     "from %u to %u\n",
			     res->lockname.len, res->lockname.name,
			     dlm->node_num, mle->new_master);
			spin_lock(&res->spinlock);
			res->state &= ~DLM_LOCK_RES_MIGRATING;
			dlm_change_lockres_owner(dlm, res, mle->new_master);
			BUG_ON(res->state & DLM_LOCK_RES_DIRTY);
			spin_unlock(&res->spinlock);
		}
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, mle);
		dlm_put_mle(mle);

		if (extra_ref) {
			/* the assert master message now balances the extra
			 * ref given by the master / migration request message.
			 * if this is the last put, it will be removed
			 * from the list. */
			dlm_put_mle(mle);
		}
	}

done:
	if (res)
		dlm_lockres_put(res);
	dlm_put(dlm);
	return 0;

kill:
	/* kill the caller! */
	spin_unlock(&res->spinlock);
	spin_unlock(&dlm->spinlock);
	dlm_lockres_put(res);
	mlog(ML_ERROR, "Bad message received from another node. Dumping state "
	     "and killing the other node now! This node is OK and can continue.\n");
	dlm_dump_lock_resources(dlm);
	dlm_put(dlm);
	return -EINVAL;
}

int dlm_dispatch_assert_master(struct dlm_ctxt *dlm,
			       struct dlm_lock_resource *res,
			       int ignore_higher, u8 request_from, u32 flags)
{
	struct dlm_work_item *item;

	item = kcalloc(1, sizeof(*item), GFP_KERNEL);
	if (!item)
		return -ENOMEM;

	/* queue up work for dlm_assert_master_worker */
	dlm_grab(dlm);	/* get an extra ref for the work item */
	dlm_init_work_item(dlm, item, dlm_assert_master_worker, NULL);
	item->u.am.lockres = res;	/* already have a ref */
	/* can optionally ignore node numbers higher than this node */
	item->u.am.ignore_higher = ignore_higher;
	item->u.am.request_from = request_from;
	item->u.am.flags = flags;

	spin_lock(&dlm->work_lock);
	list_add_tail(&item->list, &dlm->work_list);
	spin_unlock(&dlm->work_lock);

	schedule_work(&dlm->dispatched_work);
	return 0;
}

static void dlm_assert_master_worker(struct dlm_work_item *item, void *data)
{
	struct dlm_ctxt *dlm = data;
	int ret = 0;
	struct dlm_lock_resource *res;
	unsigned long nodemap[BITS_TO_LONGS(O2NM_MAX_NODES)];
	int ignore_higher;
	int bit;
	u8 request_from;
	u32 flags;

	dlm = item->dlm;
	res = item->u.am.lockres;
	ignore_higher = item->u.am.ignore_higher;
	request_from = item->u.am.request_from;
	flags = item->u.am.flags;

	spin_lock(&dlm->spinlock);
	memcpy(nodemap, dlm->domain_map, sizeof(nodemap));
	spin_unlock(&dlm->spinlock);

	clear_bit(dlm->node_num, nodemap);
	if (ignore_higher) {
		/* if is this just to clear up mles for nodes below
		 * this node, do not send the message to the original
		 * caller or any node number higher than this */
		clear_bit(request_from, nodemap);
		bit = dlm->node_num;
		while (1) {
			bit = find_next_bit(nodemap, O2NM_MAX_NODES, bit+1);
			if (bit >= O2NM_MAX_NODES)
				break;
			clear_bit(bit, nodemap);
		}
	}

	/* this call now finishes out the nodemap
	 * even if one or more nodes die */
	mlog(0, "worker about to master %.*s here, this=%u\n",
	     res->lockname.len, res->lockname.name, dlm->node_num);
	ret = dlm_do_assert_master(dlm, res->lockname.name,
				   res->lockname.len,
				   nodemap, flags);
	if (ret < 0) {
		/* no need to restart, we are done */
		mlog_errno(ret);
	}

	dlm_lockres_put(res);

	mlog(0, "finished with dlm_assert_master_worker\n");
}

/*
 * DLM_MIGRATE_LOCKRES
 */

int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			u8 target)
{
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *oldmle = NULL;
	struct dlm_migratable_lockres *mres = NULL;
	int ret = -EINVAL;
	const char *name;
	unsigned int namelen;
	int mle_added = 0;
	struct list_head *queue, *iter;
	int i;
	struct dlm_lock *lock;
	int empty = 1;

	if (!dlm_grab(dlm))
		return -EINVAL;

	name = res->lockname.name;
	namelen = res->lockname.len;

	mlog(0, "migrating %.*s to %u\n", namelen, name, target);

	/*
	 * ensure this lockres is a proper candidate for migration
	 */
	spin_lock(&res->spinlock);
	if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
		mlog(0, "cannot migrate lockres with unknown owner!\n");
		spin_unlock(&res->spinlock);
		goto leave;
	}
	if (res->owner != dlm->node_num) {
		mlog(0, "cannot migrate lockres this node doesn't own!\n");
		spin_unlock(&res->spinlock);
		goto leave;
	}
	mlog(0, "checking queues...\n");
	queue = &res->granted;
	for (i=0; i<3; i++) {
		list_for_each(iter, queue) {
			lock = list_entry(iter, struct dlm_lock, list);
			empty = 0;
			if (lock->ml.node == dlm->node_num) {
				mlog(0, "found a lock owned by this node "
				     "still on the %s queue! will not "
				     "migrate this lockres\n",
				     i==0 ? "granted" :
				     (i==1 ? "converting" : "blocked"));
				spin_unlock(&res->spinlock);
				ret = -ENOTEMPTY;
				goto leave;
			}
		}
		queue++;
	}
	mlog(0, "all locks on this lockres are nonlocal. continuing\n");
	spin_unlock(&res->spinlock);

	/* no work to do */
	if (empty) {
		mlog(0, "no locks were found on this lockres! done!\n");
		ret = 0;
		goto leave;
	}

	/*
	 * preallocate up front
	 * if this fails, abort
	 */

	ret = -ENOMEM;
	mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
	if (!mres) {
		mlog_errno(ret);
		goto leave;
	}

	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
								GFP_KERNEL);
	if (!mle) {
		mlog_errno(ret);
		goto leave;
	}
	ret = 0;

	/*
	 * find a node to migrate the lockres to
	 */

	mlog(0, "picking a migration node\n");
	spin_lock(&dlm->spinlock);
	/* pick a new node */
	if (!test_bit(target, dlm->domain_map) ||
	    target >= O2NM_MAX_NODES) {
		target = dlm_pick_migration_target(dlm, res);
	}
	mlog(0, "node %u chosen for migration\n", target);

	if (target >= O2NM_MAX_NODES ||
	    !test_bit(target, dlm->domain_map)) {
		/* target chosen is not alive */
		ret = -EINVAL;
	}

	if (ret) {
		spin_unlock(&dlm->spinlock);
		goto fail;
	}

	mlog(0, "continuing with target = %u\n", target);

	/*
	 * clear any existing master requests and
	 * add the migration mle to the list
	 */
	spin_lock(&dlm->master_lock);
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
				    namelen, target, dlm->node_num);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (ret == -EEXIST) {
		mlog(0, "another process is already migrating it\n");
		goto fail;
	}
	mle_added = 1;

	/*
	 * set the MIGRATING flag and flush asts
	 * if we fail after this we need to re-dirty the lockres
	 */
	if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
		mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
		     "the target went down.\n", res->lockname.len,
		     res->lockname.name, target);
		spin_lock(&res->spinlock);
		res->state &= ~DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
		ret = -EINVAL;
	}

fail:
	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	if (ret < 0) {
		if (mle_added) {
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
		} else if (mle) {
			kmem_cache_free(dlm_mle_cache, mle);
		}
		goto leave;
	}

	/*
	 * at this point, we have a migration target, an mle
	 * in the master list, and the MIGRATING flag set on
	 * the lockres
	 */

	/* get an extra reference on the mle.
	 * otherwise the assert_master from the new
	 * master will destroy this.
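Note on the assert-master fan-out above: dlm_assert_master_worker() builds a bitmap of target nodes (copying the domain map, clearing its own bit, then optionally the requester's bit and every higher-numbered bit), and dlm_do_assert_master() walks the set bits and sends one message per node, remembering a failed link but continuing so the remaining nodes still see the assert. The standalone C sketch below mirrors that shape with a single 32-bit word; all names here (MAX_NODES, send_assert, build_nodemap, assert_to_nodemap) are illustrative assumptions and are not part of dlmmaster.c or the o2net API.

/* Illustrative sketch only -- assumed names, not part of dlmmaster.c. */
#include <stdint.h>
#include <stdio.h>

#define MAX_NODES 32			/* stands in for O2NM_MAX_NODES */

/* hypothetical transport; returns <0 if the link to 'node' is down */
static int send_assert(int node)
{
	printf("sending assert master to node %d\n", node);
	return 0;
}

/* mirror of the worker's nodemap setup: drop self, the requester,
 * and (optionally) every node numbered higher than this one */
static uint32_t build_nodemap(uint32_t domain_map, int this_node,
			      int request_from, int ignore_higher)
{
	uint32_t map = domain_map;

	map &= ~(1u << this_node);
	if (ignore_higher) {
		map &= ~(1u << request_from);
		for (int bit = this_node + 1; bit < MAX_NODES; bit++)
			map &= ~(1u << bit);
	}
	return map;
}

/* mirror of dlm_do_assert_master()'s loop: keep going past a dead
 * link, but remember that something failed */
static int assert_to_nodemap(uint32_t nodemap)
{
	int ret = 0;

	for (int node = 0; node < MAX_NODES; node++) {
		if (!(nodemap & (1u << node)))
			continue;
		int tmpret = send_assert(node);
		if (tmpret < 0) {
			fprintf(stderr, "link to %d went down\n", node);
			ret = tmpret;	/* any nonzero status will do */
		}
	}
	return ret;
}

int main(void)
{
	/* nodes 0-4 in the domain; we are node 2, node 4 asked us */
	uint32_t map = build_nodemap(0x1f, 2, 4, 1);
	return assert_to_nodemap(map) ? 1 : 0;
}

In the kernel the map is a multi-word unsigned long array manipulated with clear_bit()/find_next_bit() and walked through struct dlm_node_iter rather than a single integer, but the control flow is the same.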