dlmmaster.c
 * also, make sure that all callers of dlm_get_mle
 * take both dlm->spinlock and dlm->master_lock */
	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	dlm_get_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	/* notify new node and send all lock state */
	/* call send_one_lockres with migration flag.
	 * this serves as notice to the target node that a
	 * migration is starting. */
	ret = dlm_send_one_lockres(dlm, res, mres, target,
				   DLM_MRES_MIGRATION);

	if (ret < 0) {
		mlog(0, "migration to node %u failed with %d\n",
		     target, ret);
		/* migration failed, detach and clean up mle */
		dlm_mle_detach_hb_events(dlm, mle);
		dlm_put_mle(mle);
		dlm_put_mle(mle);
		goto leave;
	}

	/* at this point, the target sends a message to all nodes
	 * (using dlm_do_migrate_request).  this node is skipped since
	 * we had to put an mle in the list to begin the process.  this
	 * node now waits for target to do an assert master.  this node
	 * will be the last one notified, ensuring that the migration
	 * is complete everywhere.  if the target dies while this is
	 * going on, some nodes could potentially see the target as the
	 * master, so it is important that my recovery finds the migration
	 * mle and sets the master to UNKNOWN. */

	/* wait for new node to assert master */
	while (1) {
		ret = wait_event_interruptible_timeout(mle->wq,
					(atomic_read(&mle->woken) == 1),
					msecs_to_jiffies(5000));

		if (ret >= 0) {
			if (atomic_read(&mle->woken) == 1 ||
			    res->owner == target)
				break;

			mlog(0, "timed out during migration\n");
			/* avoid hang during shutdown when migrating lockres
			 * to a node which also goes down */
			if (dlm_is_node_dead(dlm, target)) {
				mlog(0, "%s:%.*s: expected migration target %u "
				     "is no longer up.  restarting.\n",
				     dlm->name, res->lockname.len,
				     res->lockname.name, target);
				ret = -ERESTARTSYS;
			}
		}
		if (ret == -ERESTARTSYS) {
			/* migration failed, detach and clean up mle */
			dlm_mle_detach_hb_events(dlm, mle);
			dlm_put_mle(mle);
			dlm_put_mle(mle);
			goto leave;
		}
		/* TODO: if node died: stop, clean up, return error */
	}

	/* all done, set the owner, clear the flag */
	spin_lock(&res->spinlock);
	dlm_set_lockres_owner(dlm, res, target);
	res->state &= ~DLM_LOCK_RES_MIGRATING;
	dlm_remove_nonlocal_locks(dlm, res);
	spin_unlock(&res->spinlock);
	wake_up(&res->wq);

	/* master is known, detach if not already detached */
	dlm_mle_detach_hb_events(dlm, mle);
	dlm_put_mle(mle);
	ret = 0;

	dlm_lockres_calc_usage(dlm, res);

leave:
	/* re-dirty the lockres if we failed */
	if (ret < 0)
		dlm_kick_thread(dlm, res);

	/* TODO: cleanup */
	if (mres)
		free_page((unsigned long)mres);

	dlm_put(dlm);

	mlog(0, "returning %d\n", ret);
	return ret;
}
EXPORT_SYMBOL_GPL(dlm_migrate_lockres);

int dlm_lock_basts_flushed(struct dlm_ctxt *dlm, struct dlm_lock *lock)
{
	int ret;
	spin_lock(&dlm->ast_lock);
	spin_lock(&lock->spinlock);
	ret = (list_empty(&lock->bast_list) && !lock->bast_pending);
	spin_unlock(&lock->spinlock);
	spin_unlock(&dlm->ast_lock);
	return ret;
}

static int dlm_migration_can_proceed(struct dlm_ctxt *dlm,
				     struct dlm_lock_resource *res,
				     u8 mig_target)
{
	int can_proceed;
	spin_lock(&res->spinlock);
	can_proceed = !!(res->state & DLM_LOCK_RES_MIGRATING);
	spin_unlock(&res->spinlock);

	/* target has died, so make the caller break out of the
	 * wait_event, but caller must recheck the domain_map */
	spin_lock(&dlm->spinlock);
	if (!test_bit(mig_target, dlm->domain_map))
		can_proceed = 1;
	spin_unlock(&dlm->spinlock);
	return can_proceed;
}

int dlm_lockres_is_dirty(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
	int ret;
	spin_lock(&res->spinlock);
	ret = !!(res->state & DLM_LOCK_RES_DIRTY);
	spin_unlock(&res->spinlock);
	return ret;
}
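/* Editor's sketch, not part of the original file: the comment at the top of
 * this hunk states the rule that every caller of dlm_get_mle() must hold both
 * dlm->spinlock and dlm->master_lock, with dlm->spinlock always taken first.
 * A minimal helper capturing that ordering could look like this; the name
 * dlm_get_mle_locked_example is hypothetical. */
static inline void dlm_get_mle_locked_example(struct dlm_ctxt *dlm,
					      struct dlm_master_list_entry *mle)
{
	/* dlm->spinlock nests outside dlm->master_lock, never the reverse */
	spin_lock(&dlm->spinlock);
	spin_lock(&dlm->master_lock);
	dlm_get_mle(mle);
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);
}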
static int dlm_mark_lockres_migrating(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res,
				      u8 target)
{
	int ret = 0;

	mlog(0, "dlm_mark_lockres_migrating: %.*s, from %u to %u\n",
	     res->lockname.len, res->lockname.name, dlm->node_num, target);

	/* need to set MIGRATING flag on lockres.  this is done by
	 * ensuring that all asts have been flushed for this lockres. */
	spin_lock(&res->spinlock);
	BUG_ON(res->migration_pending);
	res->migration_pending = 1;
	/* strategy is to reserve an extra ast then release
	 * it below, letting the release do all of the work */
	__dlm_lockres_reserve_ast(res);
	spin_unlock(&res->spinlock);

	/* now flush all the pending asts.. hang out for a bit */
	dlm_kick_thread(dlm, res);
	wait_event(dlm->ast_wq, !dlm_lockres_is_dirty(dlm, res));
	dlm_lockres_release_ast(dlm, res);

	mlog(0, "about to wait on migration_wq, dirty=%s\n",
	     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
	/* if the extra ref we just put was the final one, this
	 * will pass thru immediately.  otherwise, we need to wait
	 * for the last ast to finish. */
again:
	ret = wait_event_interruptible_timeout(dlm->migration_wq,
		   dlm_migration_can_proceed(dlm, res, target),
		   msecs_to_jiffies(1000));
	if (ret < 0) {
		mlog(0, "woken again: migrating? %s, dead? %s\n",
		     res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
		     test_bit(target, dlm->domain_map) ? "no":"yes");
	} else {
		mlog(0, "all is well: migrating? %s, dead? %s\n",
		     res->state & DLM_LOCK_RES_MIGRATING ? "yes":"no",
		     test_bit(target, dlm->domain_map) ? "no":"yes");
	}
	if (!dlm_migration_can_proceed(dlm, res, target)) {
		mlog(0, "trying again...\n");
		goto again;
	}

	/* did the target go down or die? */
	spin_lock(&dlm->spinlock);
	if (!test_bit(target, dlm->domain_map)) {
		mlog(ML_ERROR, "aha. migration target %u just went down\n",
		     target);
		ret = -EHOSTDOWN;
	}
	spin_unlock(&dlm->spinlock);

	/*
	 * at this point:
	 *
	 *   o the DLM_LOCK_RES_MIGRATING flag is set
	 *   o there are no pending asts on this lockres
	 *   o all processes trying to reserve an ast on this
	 *     lockres must wait for the MIGRATING flag to clear
	 */
	return ret;
}

/* last step in the migration process.
 * original master calls this to free all of the dlm_lock
 * structures that used to be for other nodes. */
static void dlm_remove_nonlocal_locks(struct dlm_ctxt *dlm,
				      struct dlm_lock_resource *res)
{
	struct list_head *iter, *iter2;
	struct list_head *queue = &res->granted;
	int i;
	struct dlm_lock *lock;

	assert_spin_locked(&res->spinlock);

	BUG_ON(res->owner == dlm->node_num);

	for (i=0; i<3; i++) {
		list_for_each_safe(iter, iter2, queue) {
			lock = list_entry(iter, struct dlm_lock, list);
			if (lock->ml.node != dlm->node_num) {
				mlog(0, "putting lock for node %u\n",
				     lock->ml.node);
				/* be extra careful */
				BUG_ON(!list_empty(&lock->ast_list));
				BUG_ON(!list_empty(&lock->bast_list));
				BUG_ON(lock->ast_pending);
				BUG_ON(lock->bast_pending);
				list_del_init(&lock->list);
				dlm_lock_put(lock);
			}
		}
		queue++;
	}
}
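/* Editor's sketch, not part of the original file: the queue++ idiom in
 * dlm_remove_nonlocal_locks() (and in dlm_pick_migration_target() below)
 * steps one list_head pointer across the three lock queues, which relies on
 * ->granted, ->converting and ->blocked being laid out consecutively in
 * struct dlm_lock_resource.  A layout-independent version of the same walk
 * could look like this; the helper name is hypothetical. */
static void dlm_walk_lock_queues_example(struct dlm_lock_resource *res)
{
	struct list_head *queues[] = { &res->granted, &res->converting,
				       &res->blocked };
	struct list_head *iter;
	struct dlm_lock *lock;
	int i;

	assert_spin_locked(&res->spinlock);
	for (i = 0; i < 3; i++) {
		list_for_each(iter, queues[i]) {
			lock = list_entry(iter, struct dlm_lock, list);
			/* per-lock work goes here, e.g. checking
			 * lock->ml.node as the functions above do */
		}
	}
}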
/* for now this is not too intelligent.  we will
 * need stats to make this do the right thing.
 * this just finds the first lock on one of the
 * queues and uses that node as the target. */
static u8 dlm_pick_migration_target(struct dlm_ctxt *dlm,
				    struct dlm_lock_resource *res)
{
	int i;
	struct list_head *queue = &res->granted;
	struct list_head *iter;
	struct dlm_lock *lock;
	int nodenum;

	assert_spin_locked(&dlm->spinlock);

	spin_lock(&res->spinlock);
	for (i=0; i<3; i++) {
		list_for_each(iter, queue) {
			/* up to the caller to make sure this node
			 * is alive */
			lock = list_entry(iter, struct dlm_lock, list);
			if (lock->ml.node != dlm->node_num) {
				spin_unlock(&res->spinlock);
				return lock->ml.node;
			}
		}
		queue++;
	}
	spin_unlock(&res->spinlock);
	mlog(0, "have not found a suitable target yet! checking domain map\n");

	/* ok now we're getting desperate.  pick anyone alive. */
	nodenum = -1;
	while (1) {
		nodenum = find_next_bit(dlm->domain_map,
					O2NM_MAX_NODES, nodenum+1);
		mlog(0, "found %d in domain map\n", nodenum);
		if (nodenum >= O2NM_MAX_NODES)
			break;
		if (nodenum != dlm->node_num) {
			mlog(0, "picking %d\n", nodenum);
			return nodenum;
		}
	}

	mlog(0, "giving up.  no master to migrate to\n");
	return DLM_LOCK_RES_OWNER_UNKNOWN;
}

/* this is called by the new master once all lockres
 * data has been received */
static int dlm_do_migrate_request(struct dlm_ctxt *dlm,
				  struct dlm_lock_resource *res,
				  u8 master, u8 new_master,
				  struct dlm_node_iter *iter)
{
	struct dlm_migrate_request migrate;
	int ret, status = 0;
	int nodenum;

	memset(&migrate, 0, sizeof(migrate));
	migrate.namelen = res->lockname.len;
	memcpy(migrate.name, res->lockname.name, migrate.namelen);
	migrate.new_master = new_master;
	migrate.master = master;

	ret = 0;

	/* send message to all nodes, except the master and myself */
	while ((nodenum = dlm_node_iter_next(iter)) >= 0) {
		if (nodenum == master ||
		    nodenum == new_master)
			continue;

		ret = o2net_send_message(DLM_MIGRATE_REQUEST_MSG, dlm->key,
					 &migrate, sizeof(migrate), nodenum,
					 &status);
		if (ret < 0)
			mlog_errno(ret);
		else if (status < 0) {
			mlog(0, "migrate request (node %u) returned %d!\n",
			     nodenum, status);
			ret = status;
		}
	}

	if (ret < 0)
		mlog_errno(ret);

	mlog(0, "returning ret=%d\n", ret);
	return ret;
}
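/* Editor's sketch, not part of the original file: a plausible caller of
 * dlm_do_migrate_request() seeds the node iterator from the current domain
 * map under dlm->spinlock, then lets the function fan the request out to
 * every node except the old and new master.  This assumes the
 * dlm_node_iter_init() helper from dlmcommon.h; the wrapper name is
 * hypothetical. */
static int dlm_send_migrate_requests_example(struct dlm_ctxt *dlm,
					     struct dlm_lock_resource *res,
					     u8 master, u8 new_master)
{
	struct dlm_node_iter iter;

	/* snapshot the set of live nodes; the map may change afterward,
	 * which is why the target death is rechecked elsewhere */
	spin_lock(&dlm->spinlock);
	dlm_node_iter_init(dlm->domain_map, &iter);
	spin_unlock(&dlm->spinlock);

	return dlm_do_migrate_request(dlm, res, master, new_master, &iter);
}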
/* if there is an existing mle for this lockres, we now know who the master is.
 * (the one who sent us *this* message) we can clear it up right away.
 * since the process that put the mle on the list still has a reference to it,
 * we can unhash it now, set the master and wake the process.  as a result,
 * we will have no mle in the list to start with.  now we can add an mle for
 * the migration and this should be the only one found for those scanning the
 * list. */
int dlm_migrate_request_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_lock_resource *res = NULL;
	struct dlm_migrate_request *migrate = (struct dlm_migrate_request *) msg->buf;
	struct dlm_master_list_entry *mle = NULL, *oldmle = NULL;
	const char *name;
	unsigned int namelen;
	int ret = 0;

	if (!dlm_grab(dlm))
		return -EINVAL;

	name = migrate->name;
	namelen = migrate->namelen;

	/* preallocate.. if this fails, abort */
	mle = (struct dlm_master_list_entry *) kmem_cache_alloc(dlm_mle_cache,
								GFP_KERNEL);

	if (!mle) {
		ret = -ENOMEM;
		goto leave;
	}

	/* check for pre-existing lock */
	spin_lock(&dlm->spinlock);
	res = __dlm_lookup_lockres(dlm, name, namelen);
	spin_lock(&dlm->master_lock);

	if (res) {
		spin_lock(&res->spinlock);
		if (res->state & DLM_LOCK_RES_RECOVERING) {
			/* if all is working ok, this can only mean that we got
			 * a migrate request from a node that we now see as
			 * dead.  what can we do here?  drop it to the floor? */
			spin_unlock(&res->spinlock);
			mlog(ML_ERROR, "Got a migrate request, but the "
			     "lockres is marked as recovering!");
			kmem_cache_free(dlm_mle_cache, mle);
			ret = -EINVAL; /* need a better solution */
			goto unlock;
		}
		res->state |= DLM_LOCK_RES_MIGRATING;
		spin_unlock(&res->spinlock);
	}

	/* ignore status.  only nonzero status would BUG. */
	ret = dlm_add_migration_mle(dlm, res, mle, &oldmle,
				    name, namelen,
				    migrate->new_master,
				    migrate->master);

unlock:
	spin_unlock(&dlm->master_lock);
	spin_unlock(&dlm->spinlock);

	if (oldmle) {
		/* master is known, detach if not already detached */
		dlm_mle_detach_hb_events(dlm, oldmle);
		dlm_put_mle(oldmle);
	}

	if (res)
		dlm_lockres_put(res);
leave:
	dlm_put(dlm);
	return ret;
}

/* must be holding dlm->spinlock and dlm->master_lock
 * when adding a migration mle, we can clear any other mles
 * in the master list because we know with certainty that
 * the master is "master".  so we remove any old mle from
 * the list after setting its master field, and then add
 * the new migration mle.  this way we can hold with the rule
 * of having only one mle for a given lock name at all times. */
static int dlm_add_migration_mle(struct dlm_ctxt *dlm,
				 struct dlm_lock_resource *res,
				 struct dlm_master_list_entry *mle,
				 struct dlm_master_list_entry **oldmle,
				 const char *name, unsigned int namelen,
				 u8 new_master, u8 master)
{
	int found;
	int ret = 0;

	*oldmle = NULL;
	mlog_entry_void();

	assert_spin_locked(&dlm->spinlock);
	assert_spin_locked(&dlm->master_lock);

	/* caller is responsible for any ref taken here on oldmle */
	found = dlm_find_mle(dlm, oldmle, (char *)name, namelen);
	if (found) {
		struct dlm_master_list_entry *tmp = *oldmle;
		spin_lock(&tmp->spinlock);
		if (tmp->type == DLM_MLE_MIGRATION) {
			if (master == dlm->node_num) {
				/* ah another process raced me to it */
				mlog(0, "tried to migrate %.*s, but some "
				     "process beat me to it\n",
				     namelen, name);
				ret = -EEXIST;
			} else {
				/* bad.  2 NODES are trying to migrate! */
				mlog(ML_ERROR, "migration error  mle: "
				     "master=%u new_master=%u // request: "
				     "master=%u new_master=%u // "
				     "lockres=%.*s\n",
				     tmp->master, tmp->new_master,
				     master, new_master, namelen, name);
				BUG();
			}
		} else {
			/* this is essentially what assert_master does */
			tmp->master = master;
			atomic_set(&tmp->woken, 1);
			wake_up(&tmp->wq);
			/* remove it from the list so that only one
			 * mle will be found */
			list_del_init(&tmp->list);
		}
		spin_unlock(&tmp->spinlock);
	}

	/* now add a migration mle to the tail of the list */
	dlm_init_mle(mle, DLM_MLE_MIGRATION, dlm, res, name, namelen);
	mle->new_master = new_master;
	mle->master = master;
	/* do this for consistency with other mle types */
	set_bit(new_master, mle->maybe_map);
	list_add(&mle->list, &dlm->master_list);

	return ret;
}

void dlm_clean_master_list(struct dlm_ctxt *dlm, u8 dead_node)
{
	struct list_head *iter,