📄 mds_reint.c
字号:
l_dput(dchild); cleanup_phase = 1; /* parent lock only */ *dchildp = dchild = vchild; if (dchild->d_inode) { int flags = LDLM_FL_ATOMIC_CB; child_res_id->name[0] = dchild->d_inode->i_ino; child_res_id->name[1] = dchild->d_inode->i_generation; /* Make sure that we don't try to re-enqueue a lock on the * same resource if it happens that the source is renamed to * the target by another thread (bug 9974, thanks racer :-) */ if (!res_gt(child_res_id, parent_res_id, NULL, NULL) || !res_gt(child_res_id, maxres, NULL, NULL)) { CDEBUG(D_DLMTRACE, "relock "LPU64"<("LPU64"|"LPU64")\n", child_res_id->name[0], parent_res_id->name[0], maxres->name[0]); GOTO(cleanup, rc = 1); } rc = ldlm_cli_enqueue_local(obd->obd_namespace, child_res_id, LDLM_IBITS, child_policy, child_mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, child_lockh); if (rc != ELDLM_OK) GOTO(cleanup, rc = -EIO); } else { memset(child_res_id, 0, sizeof(*child_res_id)); } EXIT;cleanup: if (rc) { switch(cleanup_phase) { case 2: if (child_res_id->name[0] != 0) ldlm_lock_decref(child_lockh, child_mode); case 1: ldlm_lock_decref(parent_lockh, parent_mode); } } return rc;}#define INODE_CTIME_AGE (10)#define INODE_CTIME_OLD(inode) (LTIME_S(inode->i_ctime) + \ INODE_CTIME_AGE < CURRENT_SECONDS)int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, struct ll_fid *fid, struct lustre_handle *parent_lockh, struct dentry **dparentp, int parent_mode, __u64 parent_lockpart, char *name, int namelen, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, __u64 child_lockpart){ struct ldlm_res_id child_res_id = { .name = {0} }; struct ldlm_res_id parent_res_id = { .name = {0} }; ldlm_policy_data_t parent_policy = {.l_inodebits = { parent_lockpart }}; ldlm_policy_data_t child_policy = {.l_inodebits = { child_lockpart }}; struct inode *inode; int rc = 0, cleanup_phase = 0; ENTRY; /* Step 1: Lookup parent */ *dparentp = mds_fid2dentry(mds, fid, NULL); if (IS_ERR(*dparentp)) { rc = PTR_ERR(*dparentp); *dparentp = NULL; RETURN(rc); } CDEBUG(D_INODE, "parent ino %lu, name %s\n", (*dparentp)->d_inode->i_ino, name); parent_res_id.name[0] = (*dparentp)->d_inode->i_ino; parent_res_id.name[1] = (*dparentp)->d_inode->i_generation; cleanup_phase = 1; /* parent dentry */ /* Step 2: Lookup child (without DLM lock, to get resource name) */ *dchildp = ll_lookup_one_len(name, *dparentp, namelen - 1); if (IS_ERR(*dchildp)) { rc = PTR_ERR(*dchildp); CDEBUG(D_INODE, "child lookup error %d\n", rc); GOTO(cleanup, rc); } cleanup_phase = 2; /* child dentry */ inode = (*dchildp)->d_inode; if (inode != NULL) { if (is_bad_inode(inode)) { CERROR("bad inode returned %lu/%u\n", inode->i_ino, inode->i_generation); GOTO(cleanup, rc = -ENOENT); } inode = igrab(inode); } if (inode == NULL) goto retry_locks; child_res_id.name[0] = inode->i_ino; child_res_id.name[1] = inode->i_generation; /* If we want a LCK_CR for a directory, and this directory has not been changed for some time, we return not only a LOOKUP lock, but also an UPDATE lock to have negative dentry starts working for this dir. Also we apply same logic to non-directories. If the file is rarely changed - we return both locks and this might save us RPC on later STAT. */ if ((child_mode & (LCK_CR|LCK_PR|LCK_CW)) && INODE_CTIME_OLD(inode)) child_policy.l_inodebits.bits |= MDS_INODELOCK_UPDATE; iput(inode);retry_locks: cleanup_phase = 2; /* child dentry */ /* Step 3: Lock parent and child in resource order. If child doesn't * exist, we still have to lock the parent and re-lookup. */ rc = enqueue_ordered_locks(obd,&parent_res_id,parent_lockh,parent_mode, &parent_policy, &child_res_id, child_lockh, child_mode, &child_policy); if (rc) GOTO(cleanup, rc); if (!(*dchildp)->d_inode) cleanup_phase = 3; /* parent lock */ else cleanup_phase = 4; /* child lock */ /* Step 4: Re-lookup child to verify it hasn't changed since locking */ rc = mds_verify_child(obd, &parent_res_id, parent_lockh, *dparentp, parent_mode, &child_res_id, child_lockh, dchildp, child_mode,&child_policy, name, namelen, &parent_res_id); if (rc > 0) goto retry_locks; if (rc < 0) { cleanup_phase = 2; GOTO(cleanup, rc); }cleanup: if (rc) { switch (cleanup_phase) { case 4: ldlm_lock_decref(child_lockh, child_mode); case 3: ldlm_lock_decref(parent_lockh, parent_mode); case 2: l_dput(*dchildp); case 1: l_dput(*dparentp); default: ; } } return rc;}void mds_reconstruct_generic(struct ptlrpc_request *req){ struct mds_export_data *med = &req->rq_export->exp_mds_data; mds_req_from_mcd(req, med->med_mcd);}/* If we are unlinking an open file/dir (i.e. creating an orphan) then * we instead link the inode into the PENDING directory until it is * finally released. We can't simply call mds_reint_rename() or some * part thereof, because we don't have the inode to check for link * count/open status until after it is locked. * * For lock ordering, caller must get child->i_mutex first, then * pending->i_mutex before starting journal transaction. * * returns 1 on success * returns 0 if we lost a race and didn't make a new link * returns negative on error */static int mds_orphan_add_link(struct mds_update_record *rec, struct obd_device *obd, struct dentry *dentry){ struct mds_obd *mds = &obd->u.mds; struct inode *pending_dir = mds->mds_pending_dir->d_inode; struct inode *inode = dentry->d_inode; struct dentry *pending_child; char fidname[LL_FID_NAMELEN]; int fidlen = 0, rc, mode; ENTRY; LASSERT(inode != NULL); LASSERT(!mds_inode_is_orphan(inode));#ifndef HAVE_I_ALLOC_SEM LASSERT(TRYLOCK_INODE_MUTEX(inode) == 0);#endif LASSERT(TRYLOCK_INODE_MUTEX(pending_dir) == 0); fidlen = ll_fid2str(fidname, inode->i_ino, inode->i_generation); CDEBUG(D_INODE, "pending destroy of %dx open %d linked %s %s = %s\n", mds_orphan_open_count(inode), inode->i_nlink, S_ISDIR(inode->i_mode) ? "dir" : S_ISREG(inode->i_mode) ? "file" : "other",rec->ur_name,fidname); if (mds_orphan_open_count(inode) == 0 || inode->i_nlink != 0) RETURN(0); pending_child = lookup_one_len(fidname, mds->mds_pending_dir, fidlen); if (IS_ERR(pending_child)) RETURN(PTR_ERR(pending_child)); if (pending_child->d_inode != NULL) { CERROR("re-destroying orphan file %s?\n", rec->ur_name); LASSERT(pending_child->d_inode == inode); GOTO(out_dput, rc = 0); } /* link() is semanticaly-wrong for S_IFDIR, so we set S_IFREG * for linking and return real mode back then -bzzz */ mode = inode->i_mode; inode->i_mode = S_IFREG; rc = vfs_link(dentry, pending_dir, pending_child); if (rc) CERROR("error linking orphan %s to PENDING: rc = %d\n", rec->ur_name, rc); else mds_inode_set_orphan(inode); /* return mode and correct i_nlink if inode is directory */ inode->i_mode = mode; LASSERTF(inode->i_nlink == 1, "%s nlink == %d\n", S_ISDIR(mode) ? "dir" : S_ISREG(mode) ? "file" : "other", inode->i_nlink); if (S_ISDIR(mode)) { inode->i_nlink++; pending_dir->i_nlink++; mark_inode_dirty(inode); mark_inode_dirty(pending_dir); } GOTO(out_dput, rc = 1);out_dput: l_dput(pending_child); RETURN(rc);}int mds_get_cookie_size(struct obd_device *obd, struct lov_mds_md *lmm){ int count = le32_to_cpu(lmm->lmm_stripe_count); int real_csize = count * sizeof(struct llog_cookie); return real_csize;}void mds_shrink_reply(struct obd_device *obd, struct ptlrpc_request *req, struct mds_body *body, int md_off){ int cookie_size = 0, md_size = 0; if (body && body->valid & OBD_MD_FLEASIZE) { md_size = body->eadatasize; } if (body && body->valid & OBD_MD_FLCOOKIE) { LASSERT(body->valid & OBD_MD_FLEASIZE); cookie_size = mds_get_cookie_size(obd, lustre_msg_buf( req->rq_repmsg, md_off, 0)); } CDEBUG(D_INFO, "Shrink to md_size %d cookie_size %d \n", md_size, cookie_size); lustre_shrink_reply(req, md_off, md_size, 1); lustre_shrink_reply(req, md_off + (md_size > 0), cookie_size, 0); }static int mds_reint_unlink(struct mds_update_record *rec, int offset, struct ptlrpc_request *req, struct lustre_handle *lh){ struct dentry *dparent = NULL, *dchild; struct mds_obd *mds = mds_req2mds(req); struct obd_device *obd = req->rq_export->exp_obd; struct mds_body *body = NULL; struct inode *child_inode = NULL; struct lustre_handle parent_lockh, child_lockh, child_reuse_lockh; void *handle = NULL; int rc = 0, cleanup_phase = 0; unsigned int qcids[MAXQUOTAS] = { 0, 0 }; unsigned int qpids[MAXQUOTAS] = { 0, 0 }; ENTRY; LASSERT(offset == REQ_REC_OFF); /* || offset == DLM_INTENT_REC_OFF); */ offset = REPLY_REC_OFF; DEBUG_REQ(D_INODE, req, "parent ino "LPU64"/%u, child %s", rec->ur_fid1->id, rec->ur_fid1->generation, rec->ur_name); MDS_CHECK_RESENT(req, mds_reconstruct_generic(req)); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) GOTO(cleanup, rc = -ENOENT); if (rec->ur_dlm) ldlm_request_cancel(req, rec->ur_dlm, 0); rc = mds_get_parent_child_locked(obd, mds, rec->ur_fid1, &parent_lockh, &dparent, LCK_EX, MDS_INODELOCK_UPDATE, rec->ur_name, rec->ur_namelen, &child_lockh, &dchild, LCK_EX, MDS_INODELOCK_FULL);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -