📄 lock.c
字号:
/*
 * validate_lock_args - check a request/convert against the lkb's current
 * state and, on success, copy the caller's arguments into the lkb.
 *
 * The extra checks apply only when DLM_LKF_CONVERT is set in args->flags:
 *   -EINVAL  the lkb has DLM_IFL_MSTCPY set, or DLM_LKF_QUECVT was given
 *            and __quecvt_compat_matrix has a zero entry for the
 *            (granted mode, requested mode) pair
 *   -EBUSY   lkb_status is not DLM_LKSTS_GRANTED, lkb_wait_type is
 *            non-zero, or is_overlap() is true
 *
 * Returns 0 after the lkb fields have been filled in from args.  No lkb
 * field is written on a failure path.  The ls argument is not referenced
 * in this function.
 */
static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		/* both modes are offset by one when indexing the matrix */
		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		/* the remaining convert checks fail with -EBUSY, not -EINVAL */
		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	/* exflags are overwritten here (validate_unlock_args ORs them in) */
	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	/*
	 * NOTE(review): args->lksb is dereferenced without a NULL check;
	 * presumably the caller has already validated it -- confirm.
	 */
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	return rv;
}

/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
   for success */

/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
   because there may be a lookup in progress and it's valid to do
   cancel/unlockf on it */

/*
 * validate_unlock_args - decide whether an unlock, cancel or force-unlock
 * described by args may proceed on lkb.
 *
 * Return values as produced by the code below:
 *   0        validation passed; args->flags has been OR-ed into
 *            lkb_exflags, lkb_sbflags cleared and lkb_astparam updated
 *   -EINVAL  the lkb has DLM_IFL_MSTCPY set; or, for CANCEL, CANCEL is
 *            already in lkb_exflags, is_overlap() is true, or
 *            lkb_wait_type is DLM_MSG_UNLOCK/DLM_MSG_CANCEL; or, for
 *            FORCEUNLOCK, FORCEUNLOCK is already in lkb_exflags,
 *            is_overlap_unlock() is true, or lkb_wait_type is
 *            DLM_MSG_UNLOCK
 *   -ENOENT  the lkb has DLM_IFL_ENDOFLIFE set
 *   -EBUSY   the lkb was taken off an rsb_lookup list and its ast queued;
 *            or an OVERLAP_CANCEL/OVERLAP_UNLOCK flag was set because the
 *            lkb has DLM_IFL_RESEND set or lkb_wait_type is
 *            DLM_MSG_LOOKUP/DLM_MSG_REQUEST; or a normal unlock found
 *            lkb_wait_type or lkb_wait_count non-zero
 *
 * Every non-zero return is logged with log_debug() at the out label.
 * Note that del_timeout() has already run on the CANCEL and FORCEUNLOCK
 * paths by the time the RESEND/wait_type checks can fail.
 */
static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
		if (!list_empty(&lkb->lkb_rsb_lookup)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			/* ast status depends on which of the two flags was given */
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
			rv = -EBUSY;
			goto out;
		}
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		/* both of these fail with the initial rv, -EINVAL */
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		/* no default case: any other wait_type continues to out_ok */
		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			/* rv is still -EINVAL here */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress?  in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		/* both of these fail with the initial rv, -EINVAL */
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		/* no default case: any other wait_type continues to out_ok */
		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			/* rv is still -EINVAL here */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	/* only failures are logged; rv is 0 when arriving via out_ok */
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}

/*
 * Four stage 4 varieties:
 * do_request(), do_convert(), do_unlock(), 
do_cancel() * These are called on the master node for the given lock and * from the central locking logic. */static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb){ int error = 0; if (can_be_granted(r, lkb, 1, NULL)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); goto out; } if (can_be_queued(lkb)) { error = -EINPROGRESS; add_lkb(r, lkb, DLM_LKSTS_WAITING); send_blocking_asts(r, lkb); add_timeout(lkb); goto out; } error = -EAGAIN; if (force_blocking_asts(lkb)) send_blocking_asts_all(r, lkb); queue_cast(r, lkb, -EAGAIN); out: return error;}static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb){ int error = 0; int deadlk = 0; /* changing an existing lock may allow others to be granted */ if (can_be_granted(r, lkb, 1, &deadlk)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); grant_pending_locks(r); goto out; } /* can_be_granted() detected that this lock would block in a conversion deadlock, so we leave it on the granted queue and return EDEADLK in the ast for the convert. */ if (deadlk) { /* it's left on the granted queue */ log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s", lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status, lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name); revert_lock(r, lkb); queue_cast(r, lkb, -EDEADLK); error = -EDEADLK; goto out; } /* is_demoted() means the can_be_granted() above set the grmode to NL, and left us on the granted queue. This auto-demotion (due to CONVDEADLK) might mean other locks, and/or this lock, are now grantable. We have to try to grant other converting locks before we try again to grant this one. 
*/ if (is_demoted(lkb)) { grant_pending_convert(r, DLM_LOCK_IV, NULL); if (_can_be_granted(r, lkb, 1)) { grant_lock(r, lkb); queue_cast(r, lkb, 0); grant_pending_locks(r); goto out; } /* else fall through and move to convert queue */ } if (can_be_queued(lkb)) { error = -EINPROGRESS; del_lkb(r, lkb); add_lkb(r, lkb, DLM_LKSTS_CONVERT); send_blocking_asts(r, lkb); add_timeout(lkb); goto out; } error = -EAGAIN; if (force_blocking_asts(lkb)) send_blocking_asts_all(r, lkb); queue_cast(r, lkb, -EAGAIN); out: return error;}static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb){ remove_lock(r, lkb); queue_cast(r, lkb, -DLM_EUNLOCK); grant_pending_locks(r); return -DLM_EUNLOCK;}/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */ static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb){ int error; error = revert_lock(r, lkb); if (error) { queue_cast(r, lkb, -DLM_ECANCEL); grant_pending_locks(r); return -DLM_ECANCEL; } return 0;}/* * Four stage 3 varieties: * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock() *//* add a new lkb to a possibly new rsb, called by requesting process */static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb){ int error; /* set_master: sets lkb nodeid from r */ error = set_master(r, lkb); if (error < 0) goto out; if (error) { error = 0; goto out; } if (is_remote(r)) /* receive_request() calls do_request() on remote node */ error = send_request(r, lkb); else error = do_request(r, lkb); out: return error;}/* change some property of an existing lkb, e.g. 
mode */static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb){ int error; if (is_remote(r)) /* receive_convert() calls do_convert() on remote node */ error = send_convert(r, lkb); else error = do_convert(r, lkb); return error;}/* remove an existing lkb from the granted queue */static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb){ int error; if (is_remote(r)) /* receive_unlock() calls do_unlock() on remote node */ error = send_unlock(r, lkb); else error = do_unlock(r, lkb); return error;}/* remove an existing lkb from the convert or wait queue */static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb){ int error; if (is_remote(r)) /* receive_cancel() calls do_cancel() on remote node */ error = send_cancel(r, lkb); else error = do_cancel(r, lkb); return error;}/* * Four stage 2 varieties: * request_lock(), convert_lock(), unlock_lock(), cancel_lock() */static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name, int len, struct dlm_args *args){ struct dlm_rsb *r; int error; error = validate_lock_args(ls, lkb, args); if (error) goto out; error = find_rsb(ls, name, len, R_CREATE, &r); if (error) goto out; lock_rsb(r); attach_lkb(r, lkb); lkb->lkb_lksb->sb_lkid = lkb->lkb_id; error = _request_lock(r, lkb); unlock_rsb(r); put_rsb(r); out: return error;}static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_args *args){ struct dlm_rsb *r; int error; r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); error = validate_lock_args(ls, lkb, args); if (error) goto out; error = _convert_lock(r, lkb); out: unlock_rsb(r); put_rsb(r); return error;}static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, struct dlm_args *args){ struct dlm_rsb *r; int error; r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); error = validate_unlock_args(lkb, args); if (error) goto out; error = _unlock_lock(r, lkb); out: unlock_rsb(r); put_rsb(r); return error;}static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, struct 
dlm_args *args){ struct dlm_rsb *r; int error; r = lkb->lkb_resource; hold_rsb(r); lock_rsb(r); error = validate_unlock_args(lkb, args); if (error) goto out; error = _cancel_lock(r, lkb); out: unlock_rsb(r); put_rsb(r); return error;}/* * Two stage 1 varieties: dlm_lock() and dlm_unlock() */int dlm_lock(dlm_lockspace_t *lockspace, int mode, struct dlm_lksb *lksb, uint32_t flags, void *name, unsigned int namelen, uint32_t parent_lkid, void (*ast) (void *astarg), void *astarg, void (*bast) (void *astarg, int mode)){ struct dlm_ls *ls; struct dlm_lkb *lkb; struct dlm_args args; int error, convert = flags & DLM_LKF_CONVERT; ls = dlm_find_lockspace_local(lockspace); if (!ls) return -EINVAL; dlm_lock_recovery(ls); if (convert) error = find_lkb(ls, lksb->sb_lkid, &lkb); else error = create_lkb(ls, &lkb); if (error) goto out; error = set_lock_args(mode, lksb, flags, namelen, 0, ast, astarg, bast, &args); if (error) goto out_put; if (convert) error = convert_lock(ls, lkb, &args); else error = request_lock(ls, lkb, n
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -