📄 dlmrecovery.c
struct dlm_lock *lock;

    spin_lock(&dlm->spinlock);
    list_for_each_safe(iter, iter2, &dlm->reco.resources) {
        res = list_entry(iter, struct dlm_lock_resource, recovering);
        /* always prune any $RECOVERY entries for dead nodes,
         * otherwise hangs can occur during later recovery */
        if (dlm_is_recovery_lock(res->lockname.name,
                                 res->lockname.len)) {
            spin_lock(&res->spinlock);
            list_for_each_entry(lock, &res->granted, list) {
                if (lock->ml.node == dead_node) {
                    mlog(0, "AHA! there was "
                         "a $RECOVERY lock for dead "
                         "node %u (%s)!\n",
                         dead_node, dlm->name);
                    list_del_init(&lock->list);
                    dlm_lock_put(lock);
                    break;
                }
            }
            spin_unlock(&res->spinlock);
            continue;
        }

        if (res->owner == dead_node) {
            mlog(0, "found lockres owned by dead node while "
                 "doing recovery for node %u. sending it.\n",
                 dead_node);
            list_del_init(&res->recovering);
            list_add_tail(&res->recovering, list);
        } else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
            mlog(0, "found UNKNOWN owner while doing recovery "
                 "for node %u. sending it.\n", dead_node);
            list_del_init(&res->recovering);
            list_add_tail(&res->recovering, list);
        }
    }
    spin_unlock(&dlm->spinlock);
}

static inline int dlm_num_locks_in_lockres(struct dlm_lock_resource *res)
{
    int total_locks = 0;
    struct list_head *iter, *queue = &res->granted;
    int i;

    for (i = 0; i < 3; i++) {
        list_for_each(iter, queue)
            total_locks++;
        queue++;
    }
    return total_locks;
}

static int dlm_send_mig_lockres_msg(struct dlm_ctxt *dlm,
                                    struct dlm_migratable_lockres *mres,
                                    u8 send_to,
                                    struct dlm_lock_resource *res,
                                    int total_locks)
{
    u64 mig_cookie = be64_to_cpu(mres->mig_cookie);
    int mres_total_locks = be32_to_cpu(mres->total_locks);
    int sz, ret = 0, status = 0;
    u8 orig_flags = mres->flags,
       orig_master = mres->master;

    BUG_ON(mres->num_locks > DLM_MAX_MIGRATABLE_LOCKS);

    if (!mres->num_locks)
        return 0;

    sz = sizeof(struct dlm_migratable_lockres) +
         (mres->num_locks * sizeof(struct dlm_migratable_lock));

    /* add an all-done flag if we reached the last lock */
    orig_flags = mres->flags;
    BUG_ON(total_locks > mres_total_locks);
    if (total_locks == mres_total_locks)
        mres->flags |= DLM_MRES_ALL_DONE;

    /* send it */
    ret = o2net_send_message(DLM_MIG_LOCKRES_MSG, dlm->key, mres,
                             sz, send_to, &status);
    if (ret < 0) {
        /* XXX: negative status is not handled.
         * this will end up killing this node.
         */
        mlog_errno(ret);
    } else {
        /* might get an -ENOMEM back here */
        ret = status;
        if (ret < 0) {
            mlog_errno(ret);

            if (ret == -EFAULT) {
                mlog(ML_ERROR, "node %u told me to kill "
                     "myself!\n", send_to);
                BUG();
            }
        }
    }

    /* zero and reinit the message buffer */
    dlm_init_migratable_lockres(mres, res->lockname.name,
                                res->lockname.len, mres_total_locks,
                                mig_cookie, orig_flags, orig_master);
    return ret;
}

static void dlm_init_migratable_lockres(struct dlm_migratable_lockres *mres,
                                        const char *lockname, int namelen,
                                        int total_locks, u64 cookie,
                                        u8 flags, u8 master)
{
    /* mres here is one full page */
    memset(mres, 0, PAGE_SIZE);
    mres->lockname_len = namelen;
    memcpy(mres->lockname, lockname, namelen);
    mres->num_locks = 0;
    mres->total_locks = cpu_to_be32(total_locks);
    mres->mig_cookie = cpu_to_be64(cookie);
    mres->flags = flags;
    mres->master = master;
}

/* returns 1 if this lock fills the network structure,
 * 0 otherwise */
static int dlm_add_lock_to_array(struct dlm_lock *lock,
                                 struct dlm_migratable_lockres *mres,
                                 int queue)
{
    struct dlm_migratable_lock *ml;
    int lock_num = mres->num_locks;

    ml = &(mres->ml[lock_num]);
    ml->cookie = lock->ml.cookie;
    ml->type = lock->ml.type;
    ml->convert_type = lock->ml.convert_type;
    ml->highest_blocked = lock->ml.highest_blocked;
    ml->list = queue;
    if (lock->lksb) {
        ml->flags = lock->lksb->flags;
        /* send our current lvb */
        if (ml->type == LKM_EXMODE || ml->type == LKM_PRMODE) {
            /* if it is already set, this had better be a PR
             * and it has to match */
            if (mres->lvb[0] && (ml->type == LKM_EXMODE ||
                memcmp(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN))) {
                mlog(ML_ERROR, "mismatched lvbs!\n");
                __dlm_print_one_lock_resource(lock->lockres);
                BUG();
            }
            memcpy(mres->lvb, lock->lksb->lvb, DLM_LVB_LEN);
        }
    }
    ml->node = lock->ml.node;
    mres->num_locks++;
    /* we reached the max, send this network message */
    if (mres->num_locks == DLM_MAX_MIGRATABLE_LOCKS)
        return 1;
    return 0;
}

int dlm_send_one_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
                         struct dlm_migratable_lockres *mres,
                         u8 send_to, u8 flags)
{
    struct list_head *queue, *iter;
    int total_locks, i;
    u64 mig_cookie = 0;
    struct dlm_lock *lock;
    int ret = 0;

    BUG_ON(!(flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));

    mlog(0, "sending to %u\n", send_to);

    total_locks = dlm_num_locks_in_lockres(res);
    if (total_locks > DLM_MAX_MIGRATABLE_LOCKS) {
        /* rare, but possible */
        mlog(0, "argh. lockres has %d locks. this will "
             "require more than one network packet to "
             "migrate\n", total_locks);
        mig_cookie = dlm_get_next_mig_cookie();
    }

    dlm_init_migratable_lockres(mres, res->lockname.name,
                                res->lockname.len, total_locks,
                                mig_cookie, flags, res->owner);

    total_locks = 0;
    for (i = DLM_GRANTED_LIST; i <= DLM_BLOCKED_LIST; i++) {
        queue = dlm_list_idx_to_ptr(res, i);
        list_for_each(iter, queue) {
            lock = list_entry(iter, struct dlm_lock, list);

            /* add another lock. */
            total_locks++;
            if (!dlm_add_lock_to_array(lock, mres, i))
                continue;

            /* this filled the lock message,
             * we must send it immediately. */
            ret = dlm_send_mig_lockres_msg(dlm, mres, send_to,
                                           res, total_locks);
            if (ret < 0) {
                // TODO
                mlog(ML_ERROR, "dlm_send_mig_lockres_msg "
                     "returned %d, TODO\n", ret);
                BUG();
            }
        }
    }
    /* flush any remaining locks */
    ret = dlm_send_mig_lockres_msg(dlm, mres, send_to, res, total_locks);
    if (ret < 0) {
        // TODO
        mlog(ML_ERROR, "dlm_send_mig_lockres_msg returned %d, "
             "TODO\n", ret);
        BUG();
    }
    return ret;
}

/*
 * this message will contain no more than one page worth of
 * recovery data, and it will work on only one lockres.
 * there may be many locks in this page, and we may need to wait
 * for additional packets to complete all the locks (rare, but
 * possible).
 */
/*
 * NOTE: the allocation error cases here are scary
 * we really cannot afford to fail an alloc in recovery
 * do we spin? returning an error only delays the problem really
 */
int dlm_mig_lockres_handler(struct o2net_msg *msg, u32 len, void *data)
{
    struct dlm_ctxt *dlm = data;
    struct dlm_migratable_lockres *mres =
        (struct dlm_migratable_lockres *)msg->buf;
    int ret = 0;
    u8 real_master;
    char *buf = NULL;
    struct dlm_work_item *item = NULL;
    struct dlm_lock_resource *res = NULL;

    if (!dlm_grab(dlm))
        return -EINVAL;

    BUG_ON(!(mres->flags & (DLM_MRES_RECOVERY|DLM_MRES_MIGRATION)));

    real_master = mres->master;
    if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
        /* cannot migrate a lockres with no master */
        BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
    }

    mlog(0, "%s message received from node %u\n",
         (mres->flags & DLM_MRES_RECOVERY) ? "recovery" : "migration",
         mres->master);
    if (mres->flags & DLM_MRES_ALL_DONE)
        mlog(0, "all done flag. all lockres data received!\n");

    ret = -ENOMEM;
    buf = kmalloc(be16_to_cpu(msg->data_len), GFP_KERNEL);
    item = kcalloc(1, sizeof(*item), GFP_KERNEL);
    if (!buf || !item)
        goto leave;

    /* lookup the lock to see if we have a secondary queue for this
     * already... just add the locks in and this will have its owner
     * and RECOVERY flag changed when it completes. */
    res = dlm_lookup_lockres(dlm, mres->lockname, mres->lockname_len);
    if (res) {
        /* this will get a ref on res */
        /* mark it as recovering/migrating and hash it */
        spin_lock(&res->spinlock);
        if (mres->flags & DLM_MRES_RECOVERY) {
            res->state |= DLM_LOCK_RES_RECOVERING;
        } else {
            if (res->state & DLM_LOCK_RES_MIGRATING) {
                /* this is at least the second
                 * lockres message */
                mlog(0, "lock %.*s is already migrating\n",
                     mres->lockname_len, mres->lockname);
            } else if (res->state & DLM_LOCK_RES_RECOVERING) {
                /* caller should BUG */
                mlog(ML_ERROR, "node is attempting to migrate "
                     "lock %.*s, but marked as recovering!\n",
                     mres->lockname_len, mres->lockname);
                ret = -EFAULT;
                spin_unlock(&res->spinlock);
                goto leave;
            }
            res->state |= DLM_LOCK_RES_MIGRATING;
        }
        spin_unlock(&res->spinlock);
    } else {
        /* need to allocate, just like if it was
         * mastered here normally */
        res = dlm_new_lockres(dlm, mres->lockname, mres->lockname_len);
        if (!res)
            goto leave;

        /* to match the ref that we would have gotten if
         * dlm_lookup_lockres had succeeded */
        dlm_lockres_get(res);

        /* mark it as recovering/migrating and hash it */
        if (mres->flags & DLM_MRES_RECOVERY)
            res->state |= DLM_LOCK_RES_RECOVERING;
        else
            res->state |= DLM_LOCK_RES_MIGRATING;

        spin_lock(&dlm->spinlock);
        __dlm_insert_lockres(dlm, res);
        spin_unlock(&dlm->spinlock);

        /* now that the new lockres is inserted,
         * make it usable by other processes */
        spin_lock(&res->spinlock);
        res->state &= ~DLM_LOCK_RES_IN_PROGRESS;
        spin_unlock(&res->spinlock);

        /* add an extra ref for just-allocated lockres
         * otherwise the lockres will be purged immediately */
        dlm_lockres_get(res);
    }

    /* at this point we have allocated everything we need,
     * and we have a hashed lockres with an extra ref and
     * the proper res->state flags. */
    ret = 0;
    if (mres->master == DLM_LOCK_RES_OWNER_UNKNOWN) {
        /* migration cannot have an unknown master */
        BUG_ON(!(mres->flags & DLM_MRES_RECOVERY));
        mlog(0, "recovery has passed me a lockres with an "
             "unknown owner.. will need to requery: %.*s\n",
             mres->lockname_len, mres->lockname);
    } else {
        spin_lock(&res->spinlock);
        dlm_change_lockres_owner(dlm, res, dlm->node_num);
        spin_unlock(&res->spinlock);
    }

    /* queue up work for dlm_mig_lockres_worker */
    dlm_grab(dlm);  /* get an extra ref for the work item */
    memcpy(buf, msg->buf, be16_to_cpu(msg->data_len));  /* copy the whole message */
    dlm_init_work_item(dlm, item, dlm_mig_lockres_worker, buf);
    item->u.ml.lockres = res;  /* already have a ref */
    item->u.ml.real_master = real_master;
    spin_lock(&dlm->work_lock);
    list_add_tail(&item->list, &dlm->work_list);
    spin_unlock(&dlm->work_lock);
    schedule_work(&dlm->dispatched_work);

leave:
    dlm_put(dlm);
    if (ret < 0) {
        if (buf)
            kfree(buf);
        if (item)
            kfree(item);
    }

    mlog_exit(ret);
    return ret;
}

static void dlm_mig_lockres_worker(struct dlm_work_item *item, void *data)
{
    struct dlm_ctxt *dlm = data;
    struct dlm_migratable_lockres *mres;
    int ret = 0;
    struct dlm_lock_resource *res;
    u8 real_master;

    dlm = item->dlm;
    mres = (struct dlm_migratable_lockres *)data;

    res = item->u.ml.lockres;
    real_master = item->u.ml.real_master;

    if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
        /* this case is super-rare. only occurs if
         * node death happens during migration. */
again:
        ret = dlm_lockres_master_requery(dlm, res, &real_master);
        if (ret < 0) {
            mlog(0, "dlm_lockres_master_requery ret=%d\n", ret);
            goto again;
        }
        if (real_master == DLM_LOCK_RES_OWNER_UNKNOWN) {
            mlog(0, "lockres %.*s not claimed. "
                 "this node will take it.\n",
                 res->lockname.len, res->lockname.name);
        } else {
            mlog(0, "master needs to respond to sender "
                 "that node %u still owns %.*s\n",
                 real_master, res->lockname.len,
                 res->lockname.name);
            /* cannot touch this lockres */
            goto leave;
        }
    }

    ret = dlm_process_recovery_data(dlm, res, mres);
    if (ret < 0)
        mlog(0, "dlm_process_recovery_data returned %d\n", ret);
    else
        mlog(0, "dlm_process_recovery_data succeeded\n");

    if ((mres->flags & (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) ==
        (DLM_MRES_MIGRATION|DLM_MRES_ALL_DONE)) {
        ret = dlm_finish_migration(dlm, res, mres->master);
        if (ret < 0)
            mlog_errno(ret);
    }

leave:
    kfree(data);
    mlog_exit(ret);
}

static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
                                      struct dlm_lock_resource *res,
                                      u8 *real_master)
{
    struct dlm_node_iter iter;
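The send loop in dlm_send_one_lockres() above is a fill-and-flush batching pattern: locks are appended to a fixed-capacity, page-sized message; the message is sent as soon as it fills; and one trailing send flushes the remainder, with DLM_MRES_ALL_DONE set once the running count reaches the expected total. The following standalone sketch shows only that pattern; every name in it (toy_mres, toy_add, toy_flush, BATCH_MAX, FLAG_ALL_DONE) is an illustrative stand-in, not part of the OCFS2 DLM API.

#include <stdio.h>
#include <string.h>

/* Illustrative capacity; the real DLM_MAX_MIGRATABLE_LOCKS is derived
 * from how many lock entries fit in one page-sized message. */
#define BATCH_MAX     4
#define FLAG_ALL_DONE 0x01

/* toy_mres stands in for struct dlm_migratable_lockres: a fixed-size
 * message that is filled, sent, and reinitialized for the next batch. */
struct toy_mres {
    int num_locks;          /* entries currently in the array */
    int total_locks;        /* total expected across all messages */
    int flags;
    int locks[BATCH_MAX];
};

/* stands in for dlm_init_migratable_lockres(): zero and re-arm the buffer */
static void toy_init(struct toy_mres *m, int total)
{
    memset(m, 0, sizeof(*m));
    m->total_locks = total;
}

/* stands in for dlm_send_mig_lockres_msg(): send the current batch and
 * set the all-done flag when the running count reaches the total */
static int toy_flush(struct toy_mres *m, int sent_so_far)
{
    if (!m->num_locks)
        return 0;
    if (sent_so_far == m->total_locks)
        m->flags |= FLAG_ALL_DONE;
    printf("send %d locks%s\n", m->num_locks,
           (m->flags & FLAG_ALL_DONE) ? " (all done)" : "");
    toy_init(m, m->total_locks);   /* ready for the next batch */
    return 0;
}

/* stands in for dlm_add_lock_to_array(): returns 1 when the array fills */
static int toy_add(struct toy_mres *m, int lock)
{
    m->locks[m->num_locks++] = lock;
    return m->num_locks == BATCH_MAX;
}

int main(void)
{
    struct toy_mres m;
    int i, sent = 0, nlocks = 10;

    toy_init(&m, nlocks);
    for (i = 0; i < nlocks; i++) {
        sent++;
        if (toy_add(&m, i))
            toy_flush(&m, sent);   /* array full: send immediately */
    }
    toy_flush(&m, sent);           /* flush any remaining locks */
    return 0;
}

Run against 10 locks with a capacity of 4, the sketch sends batches of 4, 4, then 2 with the all-done flag on the final message, mirroring the per-queue iteration and trailing flush in dlm_send_one_lockres().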
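A note on the page-sized buffer: dlm_init_migratable_lockres() zeroes PAGE_SIZE bytes because the migratable lockres message is laid out as a fixed header (master, flags, counts, lockname, lvb) followed by a flexible array of per-lock entries, and DLM_MAX_MIGRATABLE_LOCKS is chosen so that header plus array fits in a single page. The arithmetic below uses assumed sizes for illustration only; the authoritative sizes live in dlmcommon.h, and the in-tree constant sits below this upper bound to leave headroom.

#include <stdio.h>

/* Assumed sizes, for illustration only (not the values from
 * dlmcommon.h): a 4 KiB page, a fixed message header, and one
 * fixed-size array entry per migrated lock. */
#define ASSUMED_PAGE_SIZE    4096
#define ASSUMED_HEADER_SIZE   112
#define ASSUMED_ENTRY_SIZE     16

int main(void)
{
    /* upper bound on how many per-lock entries fit after the header */
    int max_locks = (ASSUMED_PAGE_SIZE - ASSUMED_HEADER_SIZE) /
                    ASSUMED_ENTRY_SIZE;
    printf("at most %d locks per one-page message\n", max_locks);
    return 0;
}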