dlmmaster.c
                                                nodenum, master);
                        ret = -EAGAIN;
                }
                spin_unlock(&dlm->spinlock);
                mlog(0, "%s: reco lock master is %u\n", dlm->name, master);
                break;
        }
    }
    return ret;
}

/*
 * DLM_MIGRATE_LOCKRES
 */
int dlm_migrate_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                        u8 target)
{
        struct dlm_master_list_entry *mle = NULL;
        struct dlm_master_list_entry *oldmle = NULL;
        struct dlm_migratable_lockres *mres = NULL;
        int ret = -EINVAL;
        const char *name;
        unsigned int namelen;
        int mle_added = 0;
        struct list_head *queue, *iter;
        int i;
        struct dlm_lock *lock;
        int empty = 1;

        if (!dlm_grab(dlm))
                return -EINVAL;

        name = res->lockname.name;
        namelen = res->lockname.len;

        mlog(0, "migrating %.*s to %u\n", namelen, name, target);

        /*
         * ensure this lockres is a proper candidate for migration
         */
        spin_lock(&res->spinlock);
        if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
                mlog(0, "cannot migrate lockres with unknown owner!\n");
                spin_unlock(&res->spinlock);
                goto leave;
        }
        if (res->owner != dlm->node_num) {
                mlog(0, "cannot migrate lockres this node doesn't own!\n");
                spin_unlock(&res->spinlock);
                goto leave;
        }
        mlog(0, "checking queues...\n");
        queue = &res->granted;
        for (i = 0; i < 3; i++) {
                list_for_each(iter, queue) {
                        lock = list_entry(iter, struct dlm_lock, list);
                        empty = 0;
                        if (lock->ml.node == dlm->node_num) {
                                mlog(0, "found a lock owned by this node "
                                     "still on the %s queue! will not "
                                     "migrate this lockres\n",
                                     i == 0 ? "granted" :
                                     (i == 1 ? "converting" : "blocked"));
                                spin_unlock(&res->spinlock);
                                ret = -ENOTEMPTY;
                                goto leave;
                        }
                }
                queue++;
        }
        mlog(0, "all locks on this lockres are nonlocal. continuing\n");
        spin_unlock(&res->spinlock);

        /* no work to do */
        if (empty) {
                mlog(0, "no locks were found on this lockres! done!\n");
                ret = 0;
                goto leave;
        }

        /*
         * preallocate up front
         * if this fails, abort
         */
        ret = -ENOMEM;
        mres = (struct dlm_migratable_lockres *) __get_free_page(GFP_KERNEL);
        if (!mres) {
                mlog_errno(ret);
                goto leave;
        }

        mle = (struct dlm_master_list_entry *)
                kmem_cache_alloc(dlm_mle_cache, GFP_KERNEL);
        if (!mle) {
                mlog_errno(ret);
                goto leave;
        }
        ret = 0;

        /*
         * find a node to migrate the lockres to
         */
        mlog(0, "picking a migration node\n");
        spin_lock(&dlm->spinlock);
        /* pick a new node */
        if (!test_bit(target, dlm->domain_map) ||
            target >= O2NM_MAX_NODES) {
                target = dlm_pick_migration_target(dlm, res);
        }
        mlog(0, "node %u chosen for migration\n", target);

        if (target >= O2NM_MAX_NODES ||
            !test_bit(target, dlm->domain_map)) {
                /* target chosen is not alive */
                ret = -EINVAL;
        }

        if (ret) {
                spin_unlock(&dlm->spinlock);
                goto fail;
        }

        mlog(0, "continuing with target = %u\n", target);

        /*
         * clear any existing master requests and
         * add the migration mle to the list
         */
        spin_lock(&dlm->master_lock);
        ret = dlm_add_migration_mle(dlm, res, mle, &oldmle, name,
                                    namelen, target, dlm->node_num);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

        if (ret == -EEXIST) {
                mlog(0, "another process is already migrating it\n");
                goto fail;
        }
        mle_added = 1;

        /*
         * set the MIGRATING flag and flush asts
         * if we fail after this we need to re-dirty the lockres
         */
        if (dlm_mark_lockres_migrating(dlm, res, target) < 0) {
                mlog(ML_ERROR, "tried to migrate %.*s to %u, but "
                     "the target went down.\n", res->lockname.len,
                     res->lockname.name, target);
                spin_lock(&res->spinlock);
                res->state &= ~DLM_LOCK_RES_MIGRATING;
                spin_unlock(&res->spinlock);
                ret = -EINVAL;
        }

fail:
        if (oldmle) {
                /* master is known, detach if not already detached */
                dlm_mle_detach_hb_events(dlm, oldmle);
                dlm_put_mle(oldmle);
        }

        if (ret < 0) {
                if (mle_added) {
                        dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
                } else if (mle) {
                        kmem_cache_free(dlm_mle_cache, mle);
                }
                goto leave;
        }

        /*
         * at this point, we have a migration target, an mle
         * in the master list, and the MIGRATING flag set on
         * the lockres
         */

        /* get an extra reference on the mle.
         * otherwise the assert_master from the new
         * master will destroy this.
         * also, make sure that all callers of dlm_get_mle
         * take both dlm->spinlock and dlm->master_lock */
        spin_lock(&dlm->spinlock);
        spin_lock(&dlm->master_lock);
        dlm_get_mle(mle);
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);

        /* notify new node and send all lock state */
        /* call send_one_lockres with migration flag.
         * this serves as notice to the target node that a
         * migration is starting. */
        ret = dlm_send_one_lockres(dlm, res, mres, target,
                                   DLM_MRES_MIGRATION);

        if (ret < 0) {
                mlog(0, "migration to node %u failed with %d\n",
                     target, ret);
                /* migration failed, detach and clean up mle */
                dlm_mle_detach_hb_events(dlm, mle);
                dlm_put_mle(mle);
                dlm_put_mle(mle);
                goto leave;
        }

        /* at this point, the target sends a message to all nodes,
         * (using dlm_do_migrate_request).  this node is skipped since
         * we had to put an mle in the list to begin the process.  this
         * node now waits for target to do an assert master.  this node
         * will be the last one notified, ensuring that the migration
         * is complete everywhere.  if the target dies while this is
         * going on, some nodes could potentially see the target as the
         * master, so it is important that my recovery finds the migration
         * mle and sets the master to UNKNOWN. */

        /* wait for new node to assert master */
        while (1) {
                ret = wait_event_interruptible_timeout(mle->wq,
                                        (atomic_read(&mle->woken) == 1),
                                        msecs_to_jiffies(5000));

                if (ret >= 0) {
                        if (atomic_read(&mle->woken) == 1 ||
                            res->owner == target)
                                break;

                        mlog(0, "timed out during migration\n");
                        /* avoid hang during shutdown when migrating lockres
                         * to a node which also goes down */
                        if (dlm_is_node_dead(dlm, target)) {
                                mlog(0, "%s:%.*s: expected migration target %u "
                                     "is no longer up.  restarting.\n",
                                     dlm->name, res->lockname.len,
                                     res->lockname.name, target);
                                ret = -ERESTARTSYS;
                        }
                }
                if (ret == -ERESTARTSYS) {
                        /* migration failed, detach and clean up mle */
                        dlm_mle_detach_hb_events(dlm, mle);
                        dlm_put_mle(mle);
                        dlm_put_mle(mle);
                        goto leave;
                }
                /* TODO: if node died: stop, clean up, return error */
        }

        /* all done, set the owner, clear the flag */
        spin_lock(&res->spinlock);
        dlm_set_lockres_owner(dlm, res, target);
        res->state &= ~DLM_LOCK_RES_MIGRATING;
        dlm_remove_nonlocal_locks(dlm, res);
        spin_unlock(&res->spinlock);
        wake_up(&res->wq);

        /* master is known, detach if not already detached */
        dlm_mle_detach_hb_events(dlm, mle);
        dlm_put_mle(mle);
        ret = 0;

        dlm_lockres_calc_usage(dlm, res);

leave:
        /* re-dirty the lockres if we failed */
        if (ret < 0)
                dlm_kick_thread(dlm, res);

        /* TODO: cleanup */
        if (mres)
                free_page((unsigned long)mres);

        dlm_put(dlm);

        mlog(0, "returning %d\n", ret);
        return ret;
}
EXPORT_SYMBOL_GPL(dlm_migrate_lockres);
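/* Illustration (not part of dlmmaster.c): the wait loop above blocks in
 * fixed timeout slices on mle->wq and, after every timeout, rechecks two
 * exit conditions -- has the new master asserted mastery (mle->woken),
 * and has the target died (dlm_is_node_dead)?  The self-contained
 * userspace sketch below mirrors that shape with POSIX threads; every
 * name in it (new_master, woken, target_dead, waitq) is invented for the
 * example and is not OCFS2 API.  Build with: gcc -pthread. */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <time.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t waitq = PTHREAD_COND_INITIALIZER;
static bool woken;              /* analogue of atomic_read(&mle->woken) */
static bool target_dead;        /* analogue of dlm_is_node_dead() */

static void *new_master(void *unused)
{
        sleep(2);               /* simulate receiving the lock state */
        pthread_mutex_lock(&lock);
        woken = true;           /* the "assert master" arrives */
        pthread_cond_signal(&waitq);
        pthread_mutex_unlock(&lock);
        return NULL;
}

int main(void)
{
        pthread_t target;
        int status = 0;

        pthread_create(&target, NULL, new_master, NULL);

        pthread_mutex_lock(&lock);
        while (!woken) {
                struct timespec deadline;

                /* 1s slices here; the kernel loop uses 5000ms */
                clock_gettime(CLOCK_REALTIME, &deadline);
                deadline.tv_sec += 1;

                int rc = pthread_cond_timedwait(&waitq, &lock, &deadline);
                if (rc == 0)
                        continue;       /* signalled: recheck woken */
                if (rc != ETIMEDOUT)
                        break;          /* unexpected error: give up */
                if (target_dead) {      /* timed out AND target died */
                        fprintf(stderr, "migration target died, aborting\n");
                        status = 1;
                        break;
                }
                printf("timed out during migration, waiting again\n");
        }
        pthread_mutex_unlock(&lock);

        if (woken)
                printf("new master asserted, migration complete\n");
        pthread_join(target, NULL);
        return status;
}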
int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
{
        int ret;

        spin_lock(&dlm->ast_lock);
        spin_lock(&lock->spinlock);
        ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
        spin_unlock(&lock->spinlock);
        spin_unlock(&dlm->ast_lock);
        return ret;
}

static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
                                     struct dlm_lock_resource *res,
                                     u8 mig_target)
{
        int can_proceed;

        spin_lock(&res->spinlock);
        can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
        spin_unlock(&res->spinlock);

        /* target has died, so make the caller break out of the
         * wait_event, but caller must recheck the domain_map */
        spin_lock(&dlm->spinlock);
        if (!test_bit(mig_target, dlm->domain_map))
                can_proceed = 1;
        spin_unlock(&dlm->spinlock);
        return can_proceed;
}

int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
        int ret;

        spin_lock(&res->spinlock);
        ret = !!(res->state & DLM_LOCK_RES_DIRTY);
        spin_unlock(&res->spinlock);
        return ret;
}


static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res,
                                      u8 target)
{
        int ret = 0;

        mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
             res->lockname.len, res->lockname.name, dlm->node_num, target);
        /* need to set MIGRATING flag on lockres.  this is done by
         * ensuring that all asts have been flushed for this lockres. */
        spin_lock(&res->spinlock);
        BUG_ON(res->migration_pending);
        res->migration_pending = 1;
        /* strategy is to reserve an extra ast then release
         * it below, letting the release do all of the work */
        __dlm_lockres_reserve_ast(res);
        spin_unlock(&res->spinlock);

        /* now flush all the pending asts.. hang out for a bit */
        dlm_kick_thread(dlm, res);
        wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
        dlm_lockres_release_ast(dlm, res);

        mlog(0, "about to wait on migration_wq, dirty=%s\n",
             res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
        /* if the extra ref we just put was the final one, this
         * will pass thru immediately.  otherwise, we need to wait
         * for the last ast to finish. */
again:
        ret = wait_event_interruptible_timeout(dlm->migration_wq,
                   dlm_migration_can_proceed(dlm, res, target),
                   msecs_to_jiffies(1000));
        if (ret < 0) {
                mlog(0, "woken again: migrating? %s, dead? %s\n",
                     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
                     test_bit(target, dlm->domain_map) ? "no" : "yes");
        } else {
                mlog(0, "all is well: migrating? %s, dead? %s\n",
                     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
                     test_bit(target, dlm->domain_map) ? "no" : "yes");
        }
        if (!dlm_migration_can_proceed(dlm, res, target)) {
                mlog(0, "trying again...\n");
                goto again;
        }

        /* did the target go down or die? */
        spin_lock(&dlm->spinlock);
        if (!test_bit(target, dlm->domain_map)) {
                mlog(ML_ERROR, "aha. migration target %u just went down\n",
                     target);
                ret = -EHOSTDOWN;
        }
        spin_unlock(&dlm->spinlock);

        /*
         * at this point:
         *
         *   o the DLM_LOCK_RES_MIGRATING flag is set
         *   o there are no pending asts on this lockres
         *   o all processes trying to reserve an ast on this
         *     lockres must wait for the MIGRATING flag to clear
         */
        return ret;
}
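/* Illustration (not part of dlmmaster.c): dlm_mark_lockres_migrating()
 * above quiesces pending asts by taking one extra reservation, flushing,
 * then dropping it, so that whichever release brings the count to zero
 * does the flag-setting and wakeup.  This standalone userspace sketch
 * mirrors that reserve/release trick with a mutex-protected counter; all
 * names below (ast_refs, migrating, reserve_ast, release_ast, worker)
 * are invented for the example.  Build with: gcc -pthread. */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t migration_wq = PTHREAD_COND_INITIALIZER;
static int ast_refs;            /* count of outstanding ast reservations */
static bool migrating;          /* analogue of DLM_LOCK_RES_MIGRATING */

static void reserve_ast(void)   /* __dlm_lockres_reserve_ast() analogue */
{
        pthread_mutex_lock(&lock);
        ast_refs++;
        pthread_mutex_unlock(&lock);
}

static void release_ast(void)   /* dlm_lockres_release_ast() analogue */
{
        pthread_mutex_lock(&lock);
        if (--ast_refs == 0) {  /* the final release flips the flag */
                migrating = true;
                pthread_cond_broadcast(&migration_wq);
        }
        pthread_mutex_unlock(&lock);
}

static void *worker(void *unused)       /* a pending ast being delivered */
{
        sleep(1);               /* simulate the ast flush taking a while */
        release_ast();
        return NULL;
}

int main(void)
{
        pthread_t t;

        reserve_ast();          /* one ast is still outstanding... */
        pthread_create(&t, NULL, worker, NULL);

        reserve_ast();          /* ...so take the extra ref and drop it: */
        release_ast();          /* count stays at 1, flag stays clear */

        pthread_mutex_lock(&lock);
        while (!migrating)      /* wait for the last ast to finish */
                pthread_cond_wait(&migration_wq, &lock);
        pthread_mutex_unlock(&lock);

        printf("all asts flushed, MIGRATING set\n");
        pthread_join(t, NULL);
        return 0;
}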
/* last step in the migration process.
 * original master calls this to free all of the dlm_lock
 * structures that used to be for other nodes. */
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res)
{
        struct list_head *iter, *iter2;
        struct list_head *queue = &res->granted;
        int i;
        struct dlm_lock *lock;

        assert_spin_locked(&res->spinlock);
        BUG_ON(res->owner == dlm->node_num);

        for (i = 0; i < 3; i++) {
                list_for_each_safe(iter, iter2, queue) {
                        lock = list_entry(iter, struct dlm_lock, list);
                        if (lock->ml.node != dlm->node_num) {
                                mlog(0, "putting lock for node %u\n",
                                     lock->ml.node);
                                /* be extra careful */
                                BUG_ON(!list_empty(&lock->ast_list));
                                BUG_ON(!list_empty(&lock->bast_list));
                                BUG_ON(lock->ast_pending);
                                BUG_ON(lock->bast_pending);
                                list_del_init(&lock->list);
                                dlm_lock_put(lock);
                        }
                }
                queue++;
        }
}

/* for now this is not too intelligent.  we will
 * need stats to make this do the right thing.
 * this just finds the first lock on one of the
 * queues and uses that node as the target. */
static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
                                    struct dlm_lock_resource *res)
{
        int i;
        struct list_head *queue = &res->granted;
        struct list_head *iter;
        struct dlm_lock *lock;
        int nodenum;

        assert_spin_locked(&dlm->spinlock);

        spin_lock(&res->spinlock);
        for (i = 0; i < 3; i++) {
                list_for_each(iter, queue) {
                        /* up to the caller to make sure this node
                         * is alive */
                        lock = list_entry(iter, struct dlm_lock, list);
                        if (lock->ml.node != dlm->node_num) {
                                spin_unlock(&res->spinlock);
                                return lock->ml.node;
                        }
                }
                queue++;
        }
        spin_unlock(&res->spinlock);
        mlog(0, "have not found a suitable target yet! checking domain map\n");

        /* ok now we're getting desperate.  pick anyone alive. */
        nodenum = -1;
        while (1) {
                nodenum = find_next_bit(dlm->domain_map,
                                        O2NM_MAX_NODES, nodenum + 1);
                mlog(0, "found %d in domain map\n", nodenum);
                if (nodenum >= O2NM_MAX_NODES)
                        break;
                if (nodenum != dlm->node_num) {
                        mlog(0, "picking %d\n", nodenum);
                        return nodenum;
                }
        }

        mlog(0, "giving up.  no master to migrate to\n");
        return DLM_LOCK_RES_OWNER_UNKNOWN;
}

/* this is called by the new master once all lockres
 * data has been received */
static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *res,