📄 dlmunlock.c
字号:
mlog(0, "master was in-progress. retry\n");
			ret = DLM_FORWARD;
		} else
			ret = status;
		lksb->status = status;
	} else {
		mlog_errno(tmpret);
		if (dlm_is_host_down(tmpret)) {
			/* NOTE: this seems strange, but it is what we want.
			 * when the master goes down during a cancel or
			 * unlock, the recovery code completes the operation
			 * as if the master had not died, then passes the
			 * updated state to the recovery master.  this thread
			 * just needs to finish out the operation and call
			 * the unlockast. */
			ret = DLM_NORMAL;
		} else {
			/* something bad.  this will BUG in ocfs2 */
			ret = dlm_err_to_dlm_status(tmpret);
		}
		lksb->status = ret;
	}

	return ret;
}

/*
 * Handle an unlock/cancel request carried in an o2net message: validate the
 * request flags, look up the lock resource and the lock named by
 * (cookie, node_idx), and run dlmunlock_master() on it.
 *
 * msg:  message whose buf is interpreted as a struct dlm_unlock_lock
 * len:  not referenced in the body
 * data: the struct dlm_ctxt for the domain
 *
 * locking:
 *   caller needs: none
 *   taken: takes and drops res->spinlock
 *   held on exit: none
 * references:
 *   a dlm reference is taken with dlm_grab() and dropped with dlm_put();
 *   the lockres returned by dlm_lookup_lockres() is dropped with
 *   dlm_lockres_put(); the matched lock is pinned with dlm_lock_get() and
 *   released with dlm_lock_put().  The three early validation returns
 *   happen before any reference is taken.
 * returns:
 *   DLM_BADARGS    - LKM_GET_LVB set, or LKM_PUT_LVB combined with LKM_CANCEL
 *   DLM_IVBUFLEN   - unlock->namelen exceeds DLM_LOCKID_NAME_MAX
 *   DLM_REJECTED   - dlm_grab() failed
 *   DLM_FORWARD    - lockres not found, or this node is not res->owner
 *   DLM_RECOVERING - lockres has DLM_LOCK_RES_RECOVERING set
 *   DLM_MIGRATING  - lockres has DLM_LOCK_RES_MIGRATING set
 *   DLM_IVLOCKID   - no lock with a matching cookie/node on any queue
 *   otherwise lksb->status as it stands after dlmunlock_master() ran
 */
int dlm_unlock_lock_handler(struct o2net_msg *msg, u32 len, void *data)
{
	struct dlm_ctxt *dlm = data;
	struct dlm_unlock_lock *unlock = (struct dlm_unlock_lock *)msg->buf;
	struct dlm_lock_resource *res = NULL;
	struct list_head *iter;
	struct dlm_lock *lock = NULL;
	enum dlm_status status = DLM_NORMAL;
	int found = 0, i;
	struct dlm_lockstatus *lksb = NULL;
	int ignore;
	u32 flags;
	struct list_head *queue;

	/* flags arrive big-endian in the message */
	flags = be32_to_cpu(unlock->flags);

	if (flags & LKM_GET_LVB) {
		mlog(ML_ERROR, "bad args! GET_LVB specified on unlock!\n");
		return DLM_BADARGS;
	}

	if ((flags & (LKM_PUT_LVB|LKM_CANCEL)) == (LKM_PUT_LVB|LKM_CANCEL)) {
		mlog(ML_ERROR, "bad args! cannot modify lvb on a CANCEL "
		     "request!\n");
		return DLM_BADARGS;
	}

	if (unlock->namelen > DLM_LOCKID_NAME_MAX) {
		mlog(ML_ERROR, "Invalid name length in unlock handler!\n");
		return DLM_IVBUFLEN;
	}

	/* from here on every exit must go through "leave" to dlm_put() */
	if (!dlm_grab(dlm))
		return DLM_REJECTED;

	mlog_bug_on_msg(!dlm_domain_fully_joined(dlm),
			"Domain %s not fully joined!\n", dlm->name);

	mlog(0, "lvb: %s\n", flags & LKM_PUT_LVB ? "put lvb" : "none");

	res = dlm_lookup_lockres(dlm, unlock->name, unlock->namelen);
	if (!res) {
		/* We assume here that a no lock resource simply means
		 * it was migrated away and destroyed before the other
		 * node could detect it. */
		mlog(0, "returning DLM_FORWARD -- res no longer exists\n");
		status = DLM_FORWARD;
		/* found is still 0 here, so not_found also logs ML_ERROR */
		goto not_found;
	}

	queue=&res->granted;
	found = 0;
	spin_lock(&res->spinlock);
	/* each of the three refusals below drops the spinlock before
	 * jumping to leave */
	if (res->state & DLM_LOCK_RES_RECOVERING) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_RECOVERING\n");
		status = DLM_RECOVERING;
		goto leave;
	}

	if (res->state & DLM_LOCK_RES_MIGRATING) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_MIGRATING\n");
		status = DLM_MIGRATING;
		goto leave;
	}

	if (res->owner != dlm->node_num) {
		spin_unlock(&res->spinlock);
		mlog(0, "returning DLM_FORWARD -- not master\n");
		status = DLM_FORWARD;
		goto leave;
	}

	/* Walk three consecutive list heads starting at res->granted.
	 * NOTE(review): queue++ presumably relies on granted, converting and
	 * blocked being adjacent struct list_head members of
	 * struct dlm_lock_resource -- confirm against the struct definition. */
	for (i=0; i<3; i++) {
		list_for_each(iter, queue) {
			lock = list_entry(iter, struct dlm_lock, list);
			/* cookie and node are compared as stored, with no
			 * byte-order conversion on either side */
			if (lock->ml.cookie == unlock->cookie &&
			    lock->ml.node == unlock->node_idx) {
				/* pin the lock before the spinlock is dropped */
				dlm_lock_get(lock);
				found = 1;
				break;
			}
		}
		if (found)
			break;
		/* scan granted -> converting -> blocked queues */
		queue++;
	}
	spin_unlock(&res->spinlock);
	if (!found) {
		status = DLM_IVLOCKID;
		goto not_found;
	}

	/* lock was found on queue */
	lksb = lock->lksb;
	/* unlockast only called on originating node */
	if (flags & LKM_PUT_LVB) {
		lksb->flags |= DLM_LKSB_PUT_LVB;
		memcpy(&lksb->lvb[0], &unlock->lvb[0], DLM_LVB_LEN);
	}

	/* if this is in-progress, propagate the DLM_FORWARD
	 * all the way back out */
	status = dlmunlock_master(dlm, res, lock, lksb, flags, &ignore);
	if (status == DLM_FORWARD)
		mlog(0, "lockres is in progress\n");

	/* DLM_LKSB_PUT_LVB is only set for the duration of the call above */
	if (flags & LKM_PUT_LVB)
		lksb->flags &= ~DLM_LKSB_PUT_LVB;

	dlm_lockres_calc_usage(dlm, res);
	dlm_kick_thread(dlm, res);

not_found:
	if (!found)
		mlog(ML_ERROR, "failed to find lock to unlock! "
		     "cookie=%"MLFu64"\n", unlock->cookie);
	else {
		/* send the lksb->status back to the other node */
		/* NOTE(review): this overwrites the value returned by
		 * dlmunlock_master(), including DLM_FORWARD, unless
		 * dlmunlock_master() also stored it in lksb->status --
		 * confirm against dlmunlock_master(). */
		status = lksb->status;
		dlm_lock_put(lock);
	}

leave:
	if (res)
		dlm_lockres_put(res);

	dlm_put(dlm);

	return status;
}

/*
 * Decide what a CANCEL request should do, based on which of the lockres
 * queues currently holds the lock.  Writes the outcome to lksb->status and
 * the DLM_UNLOCK_* action mask to *actions.  dlm is not referenced in the
 * body.
 *
 *   on blocked:    lksb->status DLM_NORMAL, call ast + remove lock
 *   on converting: lksb->status DLM_NORMAL, call ast + remove + regrant +
 *                  clear convert type
 *   on granted:    lksb->status DLM_CANCELGRANT, call ast only; the
 *                  function itself still returns DLM_NORMAL
 *   on no list:    lksb->status and return value DLM_IVLOCKID, no actions
 */
static enum dlm_status dlm_get_cancel_actions(struct dlm_ctxt *dlm,
					      struct dlm_lock_resource *res,
					      struct dlm_lock *lock,
					      struct dlm_lockstatus *lksb,
					      int *actions)
{
	enum dlm_status status;

	if (dlm_lock_on_list(&res->blocked, lock)) {
		/* cancel this outright */
		lksb->status = DLM_NORMAL;
		status = DLM_NORMAL;
		*actions = (DLM_UNLOCK_CALL_AST |
			    DLM_UNLOCK_REMOVE_LOCK);
	} else if (dlm_lock_on_list(&res->converting, lock)) {
		/* cancel the request, put back on granted */
		lksb->status = DLM_NORMAL;
		status = DLM_NORMAL;
		*actions = (DLM_UNLOCK_CALL_AST |
			    DLM_UNLOCK_REMOVE_LOCK |
			    DLM_UNLOCK_REGRANT_LOCK |
			    DLM_UNLOCK_CLEAR_CONVERT_TYPE);
	} else if (dlm_lock_on_list(&res->granted, lock)) {
		/* too late, already granted.  DLM_CANCELGRANT */
		lksb->status = DLM_CANCELGRANT;
		status = DLM_NORMAL;
		*actions = DLM_UNLOCK_CALL_AST;
	} else {
		mlog(ML_ERROR, "lock to cancel is not on any list!\n");
		lksb->status = DLM_IVLOCKID;
		status = DLM_IVLOCKID;
		*actions = 0;
	}
	return status;
}

/*
 * Decide what a plain unlock request should do.  Only a lock that is on
 * res->granted can be unlocked: it gets DLM_NORMAL and the
 * free + call ast + remove actions.  Anything else gets DLM_DENIED (also
 * reported through dlm_error()) and no actions.  The result is written to
 * lksb->status as well as returned.  dlm is not referenced in the body.
 */
static enum dlm_status dlm_get_unlock_actions(struct dlm_ctxt *dlm,
					      struct dlm_lock_resource *res,
					      struct dlm_lock *lock,
					      struct dlm_lockstatus *lksb,
					      int *actions)
{
	enum dlm_status status;

	/* unlock request */
	if (!dlm_lock_on_list(&res->granted, lock)) {
		lksb->status = DLM_DENIED;
		status = DLM_DENIED;
		dlm_error(status);
		*actions = 0;
	} else {
		/* unlock granted lock */
		lksb->status = DLM_NORMAL;
		status = DLM_NORMAL;
		*actions = (DLM_UNLOCK_FREE_LOCK |
			    DLM_UNLOCK_CALL_AST |
			    DLM_UNLOCK_REMOVE_LOCK);
	}
	return status;
}

/* there seems to be no point in doing this async
 * since (even for the remote case) there is really
 * no work to queue up... so just do it and fire the
 * unlockast by hand when done...
*/
/*
 * Unlock (or, with LKM_CANCEL, cancel) the lock attached to lksb.
 *
 * dlm:       domain context
 * lksb:      lock status block; lksb->lockid must point at a lock that has
 *            a lockres
 * flags:     any combination of LKM_CANCEL, LKM_VALBLK and LKM_INVVALBLK;
 *            LKM_VALBLK is dropped when LKM_CANCEL is also given
 * unlockast: callback invoked as (*unlockast)(data, lksb->status) when the
 *            master/remote helper asks for it
 * data:      opaque argument handed to unlockast
 *
 * A reference is held on both the lock and its lockres for the duration of
 * the call.  The request is retried every 50ms for as long as the helper
 * reports DLM_RECOVERING, DLM_MIGRATING or DLM_FORWARD.
 *
 * returns: DLM_BADARGS when lksb is NULL, DLM_BADPARAM for unknown flag bits
 *          or an lksb without lock/lockres, otherwise the status of the
 *          last dlmunlock_master()/dlmunlock_remote() call.
 */
enum dlm_status dlmunlock(struct dlm_ctxt *dlm, struct dlm_lockstatus *lksb,
			  int flags, dlm_astunlockfunc_t *unlockast, void *data)
{
	const int valblk_and_cancel = LKM_VALBLK | LKM_CANCEL;
	struct dlm_lock *lock = NULL;
	struct dlm_lock_resource *res;
	enum dlm_status status;
	int fire_ast, mastered_here;

	mlog_entry_void();

	if (!lksb) {
		dlm_error(DLM_BADARGS);
		return DLM_BADARGS;
	}

	if (flags & ~(LKM_CANCEL | LKM_VALBLK | LKM_INVVALBLK)) {
		dlm_error(DLM_BADPARAM);
		return DLM_BADPARAM;
	}

	if ((flags & valblk_and_cancel) == valblk_and_cancel) {
		mlog(0, "VALBLK given with CANCEL: ignoring VALBLK\n");
		flags &= ~LKM_VALBLK;
	}

	if (!lksb->lockid || !lksb->lockid->lockres) {
		dlm_error(DLM_BADPARAM);
		return DLM_BADPARAM;
	}

	/* pin the lock, then its resource, until the end of the call */
	lock = lksb->lockid;
	BUG_ON(!lock);
	dlm_lock_get(lock);

	res = lock->lockres;
	BUG_ON(!res);
	dlm_lockres_get(res);

	for (;;) {
		/* everything is redone on each pass because the owner
		 * may have changed since the previous attempt */
		fire_ast = 0;
		mlog(0, "lock=%p res=%p\n", lock, res);

		spin_lock(&res->spinlock);
		mastered_here = (res->owner == dlm->node_num);
		spin_unlock(&res->spinlock);

		if (!mastered_here) {
			status = dlmunlock_remote(dlm, res, lock, lksb, flags,
						  &fire_ast);
			mlog(0, "done calling dlmunlock_remote: returned %d, "
			     "call_ast is %d\n", status, fire_ast);
		} else {
			status = dlmunlock_master(dlm, res, lock, lksb, flags,
						  &fire_ast);
			mlog(0, "done calling dlmunlock_master: returned %d, "
			     "call_ast is %d\n", status, fire_ast);
		}

		if (status != DLM_RECOVERING &&
		    status != DLM_MIGRATING &&
		    status != DLM_FORWARD)
			break;

		/* Back off briefly so recovery / migration can finish on
		 * this resource.  There is no known wait queue to sleep on
		 * since the work may be happening on another node; queueing
		 * requests on the other end might be the proper solution. */
		/* do we want to yield(); ?? */
		msleep(50);
		mlog(0, "retrying unlock due to pending recovery/"
		     "migration/in-progress\n");
	}

	if (fire_ast) {
		mlog(0, "calling unlockast(%p, %d)\n", data, lksb->status);
		if (mastered_here) {
			/* One last bast may still be pending; flush it
			 * before the unlockast runs.  Not a concern when
			 * mastered remotely, since the lock has already been
			 * removed from the lockres queues and cannot be
			 * found. */
			dlm_kick_thread(dlm, NULL);
			wait_event(dlm->ast_wq,
				   dlm_lock_basts_flushed(dlm, lock));
		}
		(*unlockast)(data, lksb->status);
	}

	if (status != DLM_NORMAL) {
		dlm_error(status);
	} else {
		mlog(0, "kicking the thread\n");
		dlm_kick_thread(dlm, res);
	}

	dlm_lockres_calc_usage(dlm, res);
	dlm_lockres_put(res);
	dlm_lock_put(lock);

	mlog(0, "returning status=%d!\n", status);
	return status;
}
EXPORT_SYMBOL_GPL(dlmunlock);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -