📄 dlmmaster.c
字号:
mle->u.name.len, mle->u.name.name, mle->type); } else { mlog(0, "calling mle_release for %.*s, type %d\n", mle->u.res->lockname.len, mle->u.res->lockname.name, mle->type); } assert_spin_locked(&dlm->spinlock); assert_spin_locked(&dlm->master_lock); /* remove from list if not already */ if (!list_empty(&mle->list)) list_del_init(&mle->list); /* detach the mle from the domain node up/down events */ __dlm_mle_detach_hb_events(dlm, mle); /* NOTE: kfree under spinlock here. * if this is bad, we can move this to a freelist. */ kmem_cache_free(dlm_mle_cache, mle);}/* * LOCK RESOURCE FUNCTIONS */static void dlm_set_lockres_owner(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 owner){ assert_spin_locked(&res->spinlock); mlog_entry("%.*s, %u\n", res->lockname.len, res->lockname.name, owner); if (owner == dlm->node_num) atomic_inc(&dlm->local_resources); else if (owner == DLM_LOCK_RES_OWNER_UNKNOWN) atomic_inc(&dlm->unknown_resources); else atomic_inc(&dlm->remote_resources); res->owner = owner;}void dlm_change_lockres_owner(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, u8 owner){ assert_spin_locked(&res->spinlock); if (owner == res->owner) return; if (res->owner == dlm->node_num) atomic_dec(&dlm->local_resources); else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) atomic_dec(&dlm->unknown_resources); else atomic_dec(&dlm->remote_resources); dlm_set_lockres_owner(dlm, res, owner);}static void dlm_lockres_release(struct kref *kref){ struct dlm_lock_resource *res; res = container_of(kref, struct dlm_lock_resource, refs); /* This should not happen -- all lockres' have a name * associated with them at init time. */ BUG_ON(!res->lockname.name); mlog(0, "destroying lockres %.*s\n", res->lockname.len, res->lockname.name); /* By the time we're ready to blow this guy away, we shouldn't * be on any lists. */ BUG_ON(!hlist_unhashed(&res->hash_node)); BUG_ON(!list_empty(&res->granted)); BUG_ON(!list_empty(&res->converting)); BUG_ON(!list_empty(&res->blocked)); BUG_ON(!list_empty(&res->dirty)); BUG_ON(!list_empty(&res->recovering)); BUG_ON(!list_empty(&res->purge)); kfree(res->lockname.name); kfree(res);}void dlm_lockres_get(struct dlm_lock_resource *res){ kref_get(&res->refs);}void dlm_lockres_put(struct dlm_lock_resource *res){ kref_put(&res->refs, dlm_lockres_release);}static void dlm_init_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, const char *name, unsigned int namelen){ char *qname; /* If we memset here, we lose our reference to the kmalloc'd * res->lockname.name, so be sure to init every field * correctly! */ qname = (char *) res->lockname.name; memcpy(qname, name, namelen); res->lockname.len = namelen; res->lockname.hash = full_name_hash(name, namelen); init_waitqueue_head(&res->wq); spin_lock_init(&res->spinlock); INIT_HLIST_NODE(&res->hash_node); INIT_LIST_HEAD(&res->granted); INIT_LIST_HEAD(&res->converting); INIT_LIST_HEAD(&res->blocked); INIT_LIST_HEAD(&res->dirty); INIT_LIST_HEAD(&res->recovering); INIT_LIST_HEAD(&res->purge); atomic_set(&res->asts_reserved, 0); res->migration_pending = 0; kref_init(&res->refs); /* just for consistency */ spin_lock(&res->spinlock); dlm_set_lockres_owner(dlm, res, DLM_LOCK_RES_OWNER_UNKNOWN); spin_unlock(&res->spinlock); res->state = DLM_LOCK_RES_IN_PROGRESS; res->last_used = 0; memset(res->lvb, 0, DLM_LVB_LEN);}struct dlm_lock_resource *dlm_new_lockres(struct dlm_ctxt *dlm, const char *name, unsigned int namelen){ struct dlm_lock_resource *res; res = kmalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL); if (!res) return NULL; res->lockname.name = kmalloc(namelen, GFP_KERNEL); if (!res->lockname.name) { kfree(res); return NULL; } dlm_init_lockres(dlm, res, name, namelen); return res;}/* * lookup a lock resource by name. * may already exist in the hashtable. * lockid is null terminated * * if not, allocate enough for the lockres and for * the temporary structure used in doing the mastering. * * also, do a lookup in the dlm->master_list to see * if another node has begun mastering the same lock. * if so, there should be a block entry in there * for this name, and we should *not* attempt to master * the lock here. need to wait around for that node * to assert_master (or die). * */struct dlm_lock_resource * dlm_get_lock_resource(struct dlm_ctxt *dlm, const char *lockid, int flags){ struct dlm_lock_resource *tmpres=NULL, *res=NULL; struct dlm_master_list_entry *mle = NULL; struct dlm_master_list_entry *alloc_mle = NULL; int blocked = 0; int ret, nodenum; struct dlm_node_iter iter; unsigned int namelen; int tries = 0; BUG_ON(!lockid); namelen = strlen(lockid); mlog(0, "get lockres %s (len %d)\n", lockid, namelen);lookup: spin_lock(&dlm->spinlock); tmpres = __dlm_lookup_lockres(dlm, lockid, namelen); if (tmpres) { spin_unlock(&dlm->spinlock); mlog(0, "found in hash!\n"); if (res) dlm_lockres_put(res); res = tmpres; goto leave; } if (!res) { spin_unlock(&dlm->spinlock); mlog(0, "allocating a new resource\n"); /* nothing found and we need to allocate one. */ alloc_mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL); if (!alloc_mle) goto leave; res = dlm_new_lockres(dlm, lockid, namelen); if (!res) goto leave; goto lookup; } mlog(0, "no lockres found, allocated our own: %p\n", res); if (flags & LKM_LOCAL) { /* caller knows it's safe to assume it's not mastered elsewhere * DONE! return right away */ spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, dlm->node_num); __dlm_insert_lockres(dlm, res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); /* lockres still marked IN_PROGRESS */ goto wake_waiters; } /* check master list to see if another node has started mastering it */ spin_lock(&dlm->master_lock); /* if we found a block, wait for lock to be mastered by another node */ blocked = dlm_find_mle(dlm, &mle, (char *)lockid, namelen); if (blocked) { if (mle->type == DLM_MLE_MASTER) { mlog(ML_ERROR, "master entry for nonexistent lock!\n"); BUG(); } else if (mle->type == DLM_MLE_MIGRATION) { /* migration is in progress! */ /* the good news is that we now know the * "current" master (mle->master). */ spin_unlock(&dlm->master_lock); assert_spin_locked(&dlm->spinlock); /* set the lockres owner and hash it */ spin_lock(&res->spinlock); dlm_set_lockres_owner(dlm, res, mle->master); __dlm_insert_lockres(dlm, res); spin_unlock(&res->spinlock); spin_unlock(&dlm->spinlock); /* master is known, detach */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); mle = NULL; goto wake_waiters; } } else { /* go ahead and try to master lock on this node */ mle = alloc_mle; /* make sure this does not get freed below */ alloc_mle = NULL; dlm_init_mle(mle, DLM_MLE_MASTER, dlm, res, NULL, 0); set_bit(dlm->node_num, mle->maybe_map); list_add(&mle->list, &dlm->master_list); } /* at this point there is either a DLM_MLE_BLOCK or a * DLM_MLE_MASTER on the master list, so it's safe to add the * lockres to the hashtable. anyone who finds the lock will * still have to wait on the IN_PROGRESS. */ /* finally add the lockres to its hash bucket */ __dlm_insert_lockres(dlm, res); /* get an extra ref on the mle in case this is a BLOCK * if so, the creator of the BLOCK may try to put the last * ref at this time in the assert master handler, so we * need an extra one to keep from a bad ptr deref. */ dlm_get_mle(mle); spin_unlock(&dlm->master_lock); spin_unlock(&dlm->spinlock); /* must wait for lock to be mastered elsewhere */ if (blocked) goto wait;redo_request: ret = -EINVAL; dlm_node_iter_init(mle->vote_map, &iter); while ((nodenum = dlm_node_iter_next(&iter)) >= 0) { ret = dlm_do_master_request(mle, nodenum); if (ret < 0) mlog_errno(ret); if (mle->master != O2NM_MAX_NODES) { /* found a master ! */ break; } }wait: /* keep going until the response map includes all nodes */ ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked); if (ret < 0) { mlog(0, "%s:%.*s: node map changed, redo the " "master request now, blocked=%d\n", dlm->name, res->lockname.len, res->lockname.name, blocked); if (++tries > 20) { mlog(ML_ERROR, "%s:%.*s: spinning on " "dlm_wait_for_lock_mastery, blocked=%d\n", dlm->name, res->lockname.len, res->lockname.name, blocked); dlm_print_one_lock_resource(res); /* dlm_print_one_mle(mle); */ tries = 0; } goto redo_request; } mlog(0, "lockres mastered by %u\n", res->owner); /* make sure we never continue without this */ BUG_ON(res->owner == O2NM_MAX_NODES); /* master is known, detach if not already detached */ dlm_mle_detach_hb_events(dlm, mle); dlm_put_mle(mle); /* put the extra ref */ dlm_put_mle(mle);wake_waiters: spin_lock(&res->spinlock); res->state &= ~DLM_LOCK_RES_IN_PROGRESS; spin_unlock(&res->spinlock); wake_up(&res->wq);leave: /* need to free the unused mle */ if (alloc_mle) kmem_cache_free(dlm_mle_cache, alloc_mle); return res;}#define DLM_MASTERY_TIMEOUT_MS 5000static int dlm_wait_for_lock_mastery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res, struct dlm_master_list_entry *mle, int *blocked){ u8 m; int ret, bit; int map_changed, voting_done; int assert, sleep;recheck: ret = 0; assert = 0; /* check if another node has already become the owner */ spin_lock(&res->spinlock); if (res->owner != DLM_LOCK_RES_OWNER_UNKNOWN) { spin_unlock(&res->spinlock); goto leave; } spin_unlock(&res->spinlock); spin_lock(&mle->spinlock); m = mle->master; map_changed = (memcmp(mle->vote_map, mle->node_map, sizeof(mle->vote_map)) != 0); voting_done = (memcmp(mle->vote_map, mle->response_map, sizeof(mle->vote_map)) == 0); /* restart if we hit any errors */ if (map_changed) { int b; mlog(0, "%s: %.*s: node map changed, restarting\n", dlm->name, res->lockname.len, res->lockname.name); ret = dlm_restart_lock_mastery(dlm, res, mle, *blocked); b = (mle->type == DLM_MLE_BLOCK); if ((*blocked && !b) || (!*blocked && b)) { mlog(0, "%s:%.*s: status change: old=%d new=%d\n", dlm->name, res->lockname.len, res->lockname.name, *blocked, b); *blocked = b; } spin_unlock(&mle->spinlock); if (ret < 0) { mlog_errno(ret); goto leave; } mlog(0, "%s:%.*s: restart lock mastery succeeded, " "rechecking now\n", dlm->name, res->lockname.len, res->lockname.name); goto recheck; } if (m != O2NM_MAX_NODES) { /* another node has done an assert! * all done! */ sleep = 0; } else { sleep = 1; /* have all nodes responded? */ if (voting_done && !*blocked) { bit = find_next_bit(mle->maybe_map, O2NM_MAX_NODES, 0); if (dlm->node_num <= bit) { /* my node number is lowest. * now tell other nodes that I am * mastering this. */ mle->master = dlm->node_num; assert = 1; sleep = 0; } /* if voting is done, but we have not received * an assert master yet, we must sleep */ } } spin_unlock(&mle->spinlock); /* sleep if we haven't finished voting yet */ if (sleep) { unsigned long timeo = msecs_to_jiffies(DLM_MASTERY_TIMEOUT_MS); /* if (atomic_read(&mle->mle_refs.refcount) < 2) mlog(ML_ERROR, "mle (%p) refs=%d, name=%.*s\n", mle, atomic_read(&mle->mle_refs.refcount), res->lockname.len, res->lockname.name); */ atomic_set(&mle->woken, 0); (void)wait_event_timeout(mle->wq, (atomic_read(&mle->woken) == 1), timeo); if (res->owner == O2NM_MAX_NODES) { mlog(0, "waiting again\n"); goto recheck; } mlog(0, "done waiting, master is %u\n", res->owner); ret = 0; goto leave; } ret = 0; /* done */ if (assert) { m = dlm->node_num; mlog(0, "about to master %.*s here, this=%u\n", res->lockname.len, res->lockname.name, m); ret = dlm_do_assert_master(dlm, res->lockname.name, res->lockname.len, mle->vote_map, 0); if (ret) { /* This is a failure in the network path, * not in the response to the assert_master * (any nonzero response is a BUG on this node). * Most likely a socket just got disconnected * due to node death. */ mlog_errno(ret); } /* no longer need to restart lock mastery. * all living nodes have been contacted. */ ret = 0; } /* set the lockres owner */ spin_lock(&res->spinlock); dlm_change_lockres_owner(dlm, res, m); spin_unlock(&res->spinlock);leave: return ret;}struct dlm_bitmap_diff_iter{
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -