📄 dlmlock.c
字号:
newlock->ml.pad1 = 0; newlock->ml.list = 0; newlock->ml.flags = 0; newlock->ast = NULL; newlock->bast = NULL; newlock->astdata = NULL; newlock->ml.cookie = cpu_to_be64(cookie); newlock->ast_pending = 0; newlock->bast_pending = 0; newlock->convert_pending = 0; newlock->lock_pending = 0; newlock->unlock_pending = 0; newlock->cancel_pending = 0; newlock->lksb_kernel_allocated = 0; kref_init(&newlock->lock_refs);}struct dlm_lock * dlm_new_lock(int type, u8 node, u64 cookie, struct dlm_lockstatus *lksb){ struct dlm_lock *lock; int kernel_allocated = 0; lock = kcalloc(1, sizeof(*lock), GFP_KERNEL); if (!lock) return NULL; if (!lksb) { /* zero memory only if kernel-allocated */ lksb = kcalloc(1, sizeof(*lksb), GFP_KERNEL); if (!lksb) { kfree(lock); return NULL; } kernel_allocated = 1; } dlm_init_lock(lock, type, node, cookie); if (kernel_allocated) lock->lksb_kernel_allocated = 1; lock->lksb = lksb; lksb->lockid = lock; return lock;}/* handler for lock creation net message * locking: * caller needs: none * taken: takes and drops res->spinlock * held on exit: none * returns: DLM_NORMAL, DLM_SYSERR, DLM_IVLOCKID, DLM_NOTQUEUED */int dlm_create_lock_handler(struct o2net_msg *msg, u32 len, void *data){ struct dlm_ctxt *dlm = data; struct dlm_create_lock *create = (struct dlm_create_lock *)msg->buf; struct dlm_lock_resource *res = NULL; struct dlm_lock *newlock = NULL; struct dlm_lockstatus *lksb = NULL; enum dlm_status status = DLM_NORMAL; char *name; unsigned int namelen; BUG_ON(!dlm); mlog_entry_void(); if (!dlm_grab(dlm)) return DLM_REJECTED; mlog_bug_on_msg(!dlm_domain_fully_joined(dlm), "Domain %s not fully joined!\n", dlm->name); name = create->name; namelen = create->namelen; status = DLM_IVBUFLEN; if (namelen > DLM_LOCKID_NAME_MAX) { dlm_error(status); goto leave; } status = DLM_SYSERR; newlock = dlm_new_lock(create->requested_type, create->node_idx, be64_to_cpu(create->cookie), NULL); if (!newlock) { dlm_error(status); goto leave; } lksb = newlock->lksb; if (be32_to_cpu(create->flags) & LKM_GET_LVB) { lksb->flags |= DLM_LKSB_GET_LVB; mlog(0, "set DLM_LKSB_GET_LVB flag\n"); } status = DLM_IVLOCKID; res = dlm_lookup_lockres(dlm, name, namelen); if (!res) { dlm_error(status); goto leave; } spin_lock(&res->spinlock); status = __dlm_lockres_state_to_status(res); spin_unlock(&res->spinlock); if (status != DLM_NORMAL) { mlog(0, "lockres recovering/migrating/in-progress\n"); goto leave; } dlm_lock_attach_lockres(newlock, res); status = dlmlock_master(dlm, res, newlock, be32_to_cpu(create->flags));leave: if (status != DLM_NORMAL) if (newlock) dlm_lock_put(newlock); if (res) dlm_lockres_put(res); dlm_put(dlm); return status;}/* fetch next node-local (u8 nodenum + u56 cookie) into u64 */static inline void dlm_get_next_cookie(u8 node_num, u64 *cookie){ u64 tmpnode = node_num; /* shift single byte of node num into top 8 bits */ tmpnode <<= 56; spin_lock(&dlm_cookie_lock); *cookie = (dlm_next_cookie | tmpnode); if (++dlm_next_cookie & 0xff00000000000000ull) { mlog(0, "This node's cookie will now wrap!\n"); dlm_next_cookie = 1; } spin_unlock(&dlm_cookie_lock);}enum dlm_status dlmlock(struct dlm_ctxt *dlm, int mode, struct dlm_lockstatus *lksb, int flags, const char *name, dlm_astlockfunc_t *ast, void *data, dlm_bastlockfunc_t *bast){ enum dlm_status status; struct dlm_lock_resource *res = NULL; struct dlm_lock *lock = NULL; int convert = 0, recovery = 0; /* yes this function is a mess. * TODO: clean this up. lots of common code in the * lock and convert paths, especially in the retry blocks */ if (!lksb) { dlm_error(DLM_BADARGS); return DLM_BADARGS; } status = DLM_BADPARAM; if (mode != LKM_EXMODE && mode != LKM_PRMODE && mode != LKM_NLMODE) { dlm_error(status); goto error; } if (flags & ~LKM_VALID_FLAGS) { dlm_error(status); goto error; } convert = (flags & LKM_CONVERT); recovery = (flags & LKM_RECOVERY); if (recovery && (!dlm_is_recovery_lock(name, strlen(name)) || convert) ) { dlm_error(status); goto error; } if (convert && (flags & LKM_LOCAL)) { mlog(ML_ERROR, "strange LOCAL convert request!\n"); goto error; } if (convert) { /* CONVERT request */ /* if converting, must pass in a valid dlm_lock */ lock = lksb->lockid; if (!lock) { mlog(ML_ERROR, "NULL lock pointer in convert " "request\n"); goto error; } res = lock->lockres; if (!res) { mlog(ML_ERROR, "NULL lockres pointer in convert " "request\n"); goto error; } dlm_lockres_get(res); /* XXX: for ocfs2 purposes, the ast/bast/astdata/lksb are * static after the original lock call. convert requests will * ensure that everything is the same, or return DLM_BADARGS. * this means that DLM_DENIED_NOASTS will never be returned. */ if (lock->lksb != lksb || lock->ast != ast || lock->bast != bast || lock->astdata != data) { status = DLM_BADARGS; mlog(ML_ERROR, "new args: lksb=%p, ast=%p, bast=%p, " "astdata=%p\n", lksb, ast, bast, data); mlog(ML_ERROR, "orig args: lksb=%p, ast=%p, bast=%p, " "astdata=%p\n", lock->lksb, lock->ast, lock->bast, lock->astdata); goto error; }retry_convert: dlm_wait_for_recovery(dlm); if (res->owner == dlm->node_num) status = dlmconvert_master(dlm, res, lock, flags, mode); else status = dlmconvert_remote(dlm, res, lock, flags, mode); if (status == DLM_RECOVERING || status == DLM_MIGRATING || status == DLM_FORWARD) { /* for now, see how this works without sleeping * and just retry right away. I suspect the reco * or migration will complete fast enough that * no waiting will be necessary */ mlog(0, "retrying convert with migration/recovery/" "in-progress\n"); msleep(100); goto retry_convert; } } else { u64 tmpcookie; /* LOCK request */ status = DLM_BADARGS; if (!name) { dlm_error(status); goto error; } status = DLM_IVBUFLEN; if (strlen(name) > DLM_LOCKID_NAME_MAX || strlen(name) < 1) { dlm_error(status); goto error; } dlm_get_next_cookie(dlm->node_num, &tmpcookie); lock = dlm_new_lock(mode, dlm->node_num, tmpcookie, lksb); if (!lock) { dlm_error(status); goto error; } if (!recovery) dlm_wait_for_recovery(dlm); /* find or create the lock resource */ res = dlm_get_lock_resource(dlm, name, flags); if (!res) { status = DLM_IVLOCKID; dlm_error(status); goto error; } mlog(0, "type=%d, flags = 0x%x\n", mode, flags); mlog(0, "creating lock: lock=%p res=%p\n", lock, res); dlm_lock_attach_lockres(lock, res); lock->ast = ast; lock->bast = bast; lock->astdata = data;retry_lock: if (flags & LKM_VALBLK) { mlog(0, "LKM_VALBLK passed by caller\n"); /* LVB requests for non PR, PW or EX locks are * ignored. */ if (mode < LKM_PRMODE) flags &= ~LKM_VALBLK; else { flags |= LKM_GET_LVB; lock->lksb->flags |= DLM_LKSB_GET_LVB; } } if (res->owner == dlm->node_num) status = dlmlock_master(dlm, res, lock, flags); else status = dlmlock_remote(dlm, res, lock, flags); if (status == DLM_RECOVERING || status == DLM_MIGRATING || status == DLM_FORWARD) { mlog(0, "retrying lock with migration/" "recovery/in progress\n"); msleep(100); /* no waiting for dlm_reco_thread */ if (recovery) { if (status == DLM_RECOVERING) { mlog(0, "%s: got RECOVERING " "for $REOCVERY lock, master " "was %u\n", dlm->name, res->owner); dlm_wait_for_node_death(dlm, res->owner, DLM_NODE_DEATH_WAIT_MAX); } } else { dlm_wait_for_recovery(dlm); } goto retry_lock; } if (status != DLM_NORMAL) { lock->lksb->flags &= ~DLM_LKSB_GET_LVB; if (status != DLM_NOTQUEUED) dlm_error(status); goto error; } }error: if (status != DLM_NORMAL) { if (lock && !convert) dlm_lock_put(lock); // this is kind of unnecessary lksb->status = status; } /* put lockres ref from the convert path * or from dlm_get_lock_resource */ if (res) dlm_lockres_put(res); return status;}EXPORT_SYMBOL_GPL(dlmlock);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -