dlmmaster.c
	}
}

static void dlm_mle_node_down(struct dlm_ctxt *dlm,
			      struct dlm_master_list_entry *mle,
			      struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (!test_bit(idx, mle->node_map))
		mlog(0, "node %u already removed from nodemap!\n", idx);
	else
		clear_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}

static void dlm_mle_node_up(struct dlm_ctxt *dlm,
			    struct dlm_master_list_entry *mle,
			    struct o2nm_node *node, int idx)
{
	spin_lock(&mle->spinlock);

	if (test_bit(idx, mle->node_map))
		mlog(0, "node %u already in node map!\n", idx);
	else
		set_bit(idx, mle->node_map);

	spin_unlock(&mle->spinlock);
}

int dlm_init_mle_cache(void)
{
	dlm_mle_cache = kmem_cache_create("dlm_mle_cache",
					  sizeof(struct dlm_master_list_entry),
					  0, SLAB_HWCACHE_ALIGN,
					  NULL);
	if (dlm_mle_cache == NULL)
		return -ENOMEM;
	return 0;
}

void dlm_destroy_mle_cache(void)
{
	if (dlm_mle_cache)
		kmem_cache_destroy(dlm_mle_cache);
}

static void dlm_mle_release(struct kref *kref)
{
	struct dlm_master_list_entry *mle;
	struct dlm_ctxt *dlm;

	mlog_entry_void();

	mle = container_of(kref, struct dlm_master_list_entry, mle_refs);
	dlm = mle->dlm;

	if (mle->type != DLM_MLE_MASTER) {
		mlog(0, "calling mle_release for %.*s, type %d\n",
		     mle->u.name.len, mle->u.name.name, mle->type);
	} else {
		mlog(0, "calling mle_release for %.*s, type %d\n",
		     mle->u.res->lockname.len,
		     mle->u.res->lockname.name, mle->type);
	}
	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	/* remove from list if not already */
	if (!list_empty(&mle->list))
		list_del_init(&mle->list);

	/* detach the mle from the domain node up/down events */
	__dlm_mle_detach_hb_events(dlm, mle);

	/* NOTE: kfree under spinlock here.
	 * if this is bad, we can move this to a freelist. */
	kmem_cache_free(dlm_mle_cache, mle);
}

/*
 * LOCK RESOURCE FUNCTIONS
 */

static void dlm_set_lockres_owner(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  u8 owner)
{
	assert_spin_locked(&res->spinlock);

	mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner);

	if (owner == dlm->node_num)
		atomic_inc(&dlm->local_resources);
	else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_inc(&dlm->unknown_resources);
	else
		atomic_inc(&dlm->remote_resources);

	res->owner = owner;
}

void dlm_change_lockres_owner(struct dlm_ctxt *dlm,
			      struct dlm_lock_resource *res, u8 owner)
{
	assert_spin_locked(&res->spinlock);

	if (owner == res->owner)
		return;

	if (res->owner == dlm->node_num)
		atomic_dec(&dlm->local_resources);
	else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN)
		atomic_dec(&dlm->unknown_resources);
	else
		atomic_dec(&dlm->remote_resources);

	dlm_set_lockres_owner(dlm, res, owner);
}
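/*
 * Usage sketch (illustrative; not part of the original file): the owner
 * bookkeeping above only stays consistent if every owner change goes
 * through dlm_change_lockres_owner() with res->spinlock held, e.g.:
 *
 *	spin_lock(&res->spinlock);
 *	dlm_change_lockres_owner(dlm, res, dlm->node_num);
 *	spin_unlock(&res->spinlock);
 *
 * The change path decrements the counter for the old owner class
 * (local/unknown/remote) before dlm_set_lockres_owner() increments
 * the counter for the new one.
 */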
static void dlm_lockres_release(struct kref *kref)
{
	struct dlm_lock_resource *res;

	res = container_of(kref, struct dlm_lock_resource, refs);

	/* This should not happen -- all lockres' have a name
	 * associated with them at init time. */
	BUG_ON(!res->lockname.name);

	mlog(0, "destroying lockres %.*s\n", res->lockname.len,
	     res->lockname.name);

	if (!hlist_unhashed(&res->hash_node) ||
	    !list_empty(&res->granted) ||
	    !list_empty(&res->converting) ||
	    !list_empty(&res->blocked) ||
	    !list_empty(&res->dirty) ||
	    !list_empty(&res->recovering) ||
	    !list_empty(&res->purge)) {
		mlog(ML_ERROR,
		     "Going to BUG for resource %.*s."
		     " We're on a list! [%c%c%c%c%c%c%c]\n",
		     res->lockname.len, res->lockname.name,
		     !hlist_unhashed(&res->hash_node) ? 'H' : ' ',
		     !list_empty(&res->granted) ? 'G' : ' ',
		     !list_empty(&res->converting) ? 'C' : ' ',
		     !list_empty(&res->blocked) ? 'B' : ' ',
		     !list_empty(&res->dirty) ? 'D' : ' ',
		     !list_empty(&res->recovering) ? 'R' : ' ',
		     !list_empty(&res->purge) ? 'P' : ' ');
		dlm_print_one_lock_resource(res);
	}

	/* By the time we're ready to blow this guy away, we shouldn't
	 * be on any lists. */
	BUG_ON(!hlist_unhashed(&res->hash_node));
	BUG_ON(!list_empty(&res->granted));
	BUG_ON(!list_empty(&res->converting));
	BUG_ON(!list_empty(&res->blocked));
	BUG_ON(!list_empty(&res->dirty));
	BUG_ON(!list_empty(&res->recovering));
	BUG_ON(!list_empty(&res->purge));

	kfree(res->lockname.name);

	kfree(res);
}

void dlm_lockres_put(struct dlm_lock_resource *res)
{
	kref_put(&res->refs, dlm_lockres_release);
}

static void dlm_init_lockres(struct dlm_ctxt *dlm,
			     struct dlm_lock_resource *res,
			     const char *name, unsigned int namelen)
{
	char *qname;

	/* If we memset here, we lose our reference to the kmalloc'd
	 * res->lockname.name, so be sure to init every field
	 * correctly! */

	qname = (char *) res->lockname.name;
	memcpy(qname, name, namelen);

	res->lockname.len = namelen;
	res->lockname.hash = dlm_lockid_hash(name, namelen);

	init_waitqueue_head(&res->wq);
	spin_lock_init(&res->spinlock);
	INIT_HLIST_NODE(&res->hash_node);
	INIT_LIST_HEAD(&res->granted);
	INIT_LIST_HEAD(&res->converting);
	INIT_LIST_HEAD(&res->blocked);
	INIT_LIST_HEAD(&res->dirty);
	INIT_LIST_HEAD(&res->recovering);
	INIT_LIST_HEAD(&res->purge);
	atomic_set(&res->asts_reserved, 0);
	res->migration_pending = 0;
	res->inflight_locks = 0;

	kref_init(&res->refs);

	/* just for consistency */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN);
	spin_unlock(&res->spinlock);

	res->state = DLM_LOCK_RES_IN_PROGRESS;

	res->last_used = 0;

	memset(res->lvb, 0, DLM_LVB_LEN);
	memset(res->refmap, 0, sizeof(res->refmap));
}

struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm,
					  const char *name,
					  unsigned int namelen)
{
	struct dlm_lock_resource *res;

	res = kmalloc(sizeof(struct dlm_lock_resource), GFP_NOFS);
	if (!res)
		return NULL;

	res->lockname.name = kmalloc(namelen, GFP_NOFS);
	if (!res->lockname.name) {
		kfree(res);
		return NULL;
	}

	dlm_init_lockres(dlm, res, name, namelen);
	return res;
}

void __dlm_lockres_grab_inflight_ref(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     int new_lockres,
				     const char *file, int line)
{
	if (!new_lockres)
		assert_spin_locked(&res->spinlock);

	if (!test_bit(dlm->node_num, res->refmap)) {
		BUG_ON(res->inflight_locks != 0);
		dlm_lockres_set_refmap_bit(dlm->node_num, res);
	}
	res->inflight_locks++;
	mlog(0, "%s:%.*s: inflight++: now %u\n",
	     dlm->name, res->lockname.len, res->lockname.name,
	     res->inflight_locks);
}

void __dlm_lockres_drop_inflight_ref(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     const char *file, int line)
{
	assert_spin_locked(&res->spinlock);

	BUG_ON(res->inflight_locks == 0);
	res->inflight_locks--;
	mlog(0, "%s:%.*s: inflight--: now %u\n",
	     dlm->name, res->lockname.len, res->lockname.name,
	     res->inflight_locks);
	if (res->inflight_locks == 0)
		dlm_lockres_clear_refmap_bit(dlm->node_num, res);
	wake_up(&res->wq);
}
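/*
 * Lifecycle sketch (illustrative; not part of the original file).
 * dlm_new_lockres() returns a resource with its kref initialized to
 * one, and the final dlm_lockres_put() invokes dlm_lockres_release():
 *
 *	struct dlm_lock_resource *res;
 *
 *	res = dlm_new_lockres(dlm, name, namelen);
 *	if (!res)
 *		return -ENOMEM;
 *	...
 *	dlm_lockres_put(res);	(the last put frees name and struct)
 *
 * The inflight pair above is similarly balanced under res->spinlock:
 * each __dlm_lockres_grab_inflight_ref() must be matched by a
 * __dlm_lockres_drop_inflight_ref(); the first grab sets this node's
 * refmap bit, and the last drop clears it and wakes res->wq.
 */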
/*
 * lookup a lock resource by name.
 * may already exist in the hashtable.
 * lockid is null terminated
 *
 * if not, allocate enough for the lockres and for
 * the temporary structure used in doing the mastering.
 *
 * also, do a lookup in the dlm->master_list to see
 * if another node has begun mastering the same lock.
 * if so, there should be a block entry in there
 * for this name, and we should *not* attempt to master
 * the lock here. need to wait around for that node
 * to assert_master (or die).
 */
struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm,
						 const char *lockid,
						 int namelen,
						 int flags)
{
	struct dlm_lock_resource *tmpres = NULL, *res = NULL;
	struct dlm_master_list_entry *mle = NULL;
	struct dlm_master_list_entry *alloc_mle = NULL;
	int blocked = 0;
	int ret, nodenum;
	struct dlm_node_iter iter;
	unsigned int hash;
	int tries = 0;
	int bit, wait_on_recovery = 0;
	int drop_inflight_if_nonlocal = 0;

	BUG_ON(!lockid);

	hash = dlm_lockid_hash(lockid, namelen);

	mlog(0, "get lockres %s (len %d)\n", lockid, namelen);

lookup:
	spin_lock(&dlm->spinlock);
	tmpres = __dlm_lookup_lockres_full(dlm, lockid, namelen, hash);
	if (tmpres) {
		int dropping_ref = 0;

		spin_lock(&tmpres->spinlock);
		if (tmpres->owner == dlm->node_num) {
			BUG_ON(tmpres->state & DLM_LOCK_RES_DROPPING_REF);
			dlm_lockres_grab_inflight_ref(dlm, tmpres);
		} else if (tmpres->state & DLM_LOCK_RES_DROPPING_REF)
			dropping_ref = 1;
		spin_unlock(&tmpres->spinlock);
		spin_unlock(&dlm->spinlock);

		/* wait until done messaging the master, drop our ref to allow
		 * the lockres to be purged, start over. */
		if (dropping_ref) {
			spin_lock(&tmpres->spinlock);
			__dlm_wait_on_lockres_flags(tmpres,
						    DLM_LOCK_RES_DROPPING_REF);
			spin_unlock(&tmpres->spinlock);
			dlm_lockres_put(tmpres);
			tmpres = NULL;
			goto lookup;
		}

		mlog(0, "found in hash!\n");
		if (res)
			dlm_lockres_put(res);
		res = tmpres;
		goto leave;
	}

	if (!res) {
		spin_unlock(&dlm->spinlock);
		mlog(0, "allocating a new resource\n");
		/* nothing found and we need to allocate one. */
		alloc_mle = (struct dlm_master_list_entry *)
			kmem_cache_alloc(dlm_mle_cache, GFP_NOFS);
		if (!alloc_mle)
			goto leave;
		res = dlm_new_lockres(dlm, lockid, namelen);
		if (!res)
			goto leave;
		goto lookup;
	}

	mlog(0, "no lockres found, allocated our own: %p\n", res);

	if (flags & LKM_LOCAL) {
		/* caller knows it's safe to assume it's not mastered elsewhere
		 * DONE! return right away */
		spin_lock(&res->spinlock);
		dlm_change_lockres_owner(dlm, res, dlm->node_num);
		__dlm_insert_lockres(dlm, res);
		dlm_lockres_grab_inflight_ref(dlm, res);
		spin_unlock(&res->spinlock);
		spin_unlock(&dlm->spinlock);
		/* lockres still marked IN_PROGRESS */
		goto wake_waiters;
	}

	/* check master list to see if another node has started mastering it */
	spin_lock(&dlm->master_lock);

	/* if we found a block, wait for lock to be mastered by another node */
	blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen);
	if (blocked) {
		int mig;
		if (mle->type == DLM_MLE_MASTER) {
			mlog(ML_ERROR, "master entry for nonexistent lock!\n");
			BUG();
		}
		mig = (mle->type == DLM_MLE_MIGRATION);
		/* if there is a migration in progress, let the migration
		 * finish before continuing. we can wait for the absence
		 * of the MIGRATION mle: either the migrate finished or
		 * one of the nodes died and the mle was cleaned up.
		 * if there is a BLOCK here, but it already has a master
		 * set, we are too late. the master does not have a ref
		 * for us in the refmap. detach the mle and drop it.
		 * either way, go back to the top and start over. */
		if (mig || mle->master != O2NM_MAX_NODES) {
			BUG_ON(mig && mle->master == dlm->node_num);
			/* we arrived too late. the master does not
			 * have a ref for us. retry. */
			mlog(0, "%s:%.*s: late on %s\n",
			     dlm->name, namelen, lockid,
			     mig ? "MIGRATION" : "BLOCK");
			spin_unlock(&dlm->master_lock);
			spin_unlock(&dlm->spinlock);

			/* master is known, detach */
			if (!mig)
				dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			mle = NULL;
			/* this is lame, but we can't wait on either
			 * the mle or lockres waitqueue here */
			if (mig)
				msleep(100);
			goto lookup;
		}
	} else {
		/* go ahead and try to master lock on this node */
		mle = alloc_mle;
		/* make sure this does not get freed below */
		alloc_mle = NULL;
		dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0);
		set_bit(dlm->node_num, mle->maybe_map);
		list_add(&mle->list, &dlm->master_list);

		/* still holding the dlm spinlock, check the recovery map
		 * to see if there are any nodes that still need to be
		 * considered. these will not appear in the mle nodemap
		 * but they might own this lockres. wait on them. */
		bit = find_next_bit(dlm->recovery_map, O2NM_MAX_NODES, 0);
		if (bit < O2NM_MAX_NODES) {
			mlog(ML_NOTICE, "%s:%.*s: at least one node (%d) to "
			     "recover before lock mastery can begin\n",
			     dlm->name, namelen, (char *)lockid, bit);
			wait_on_recovery = 1;
		}
	}

	/* at this point there is either a DLM_MLE_BLOCK or a
	 * DLM_MLE_MASTER on the master list, so it's safe to add the
	 * lockres to the hashtable. anyone who finds the lock will
	 * still have to wait on the IN_PROGRESS. */

	/* finally add the lockres to its hash bucket */
	__dlm_insert_lockres(dlm, res);
	/* since this lockres is new it does not require the spinlock */
	dlm_lockres_grab_inflight_ref_new(dlm, res);

	/* if this node does not become the master make sure to drop
	 * this inflight reference below */
	drop_inflight_if_nonlocal = 1;

	/* get an extra ref on the mle in case this is a BLOCK
	 * if so, the creator of the BLOCK may try to put the last
	 * ref at this time in the assert master handler, so we
	 * need an extra one to keep from a bad ptr deref. */
	dlm_get_mle_inuse(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

redo_request:
	while (wait_on_recovery) {
		/* any cluster changes that occurred after dropping the
		 * dlm spinlock would be detectable by a change on the mle,
		 * so we only need to clear out the recovery map once. */
		if (dlm_is_recovery_lock(lockid, namelen)) {
			mlog(ML_NOTICE, "%s: recovery map is not empty, but "
			     "must master $RECOVERY lock now\n", dlm->name);
			if (!dlm_pre_master_reco_lockres(dlm, res))
				wait_on_recovery = 0;
			else {
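/*
 * Caller-side sketch (illustrative; not part of the original file).
 * dlm_get_lock_resource() either finds the lockres in the hash or
 * allocates one and begins mastering it; it returns NULL only on
 * allocation failure, and the caller owns a reference on success:
 *
 *	res = dlm_get_lock_resource(dlm, lockid, namelen, flags);
 *	if (!res)
 *		return -ENOMEM;
 *	...
 *	dlm_lockres_put(res);
 */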