📄 mds_reint.c
字号:
if (rec_pending) lquota_pending_commit(mds_quota_interface_ref, obd, current->fsuid, gid, 1); if (rc && created) { /* Destroy the file we just created. This should not need * extra journal credits, as we have already modified all of * the blocks needed in order to create the file in the first * place. */ switch (type) { case S_IFDIR: err = vfs_rmdir(dir, dchild); if (err) CERROR("rmdir in error path: %d\n", err); break; default: err = vfs_unlink(dir, dchild); if (err) CERROR("unlink in error path: %d\n", err); break; } } else if (created) { /* The inode we were allocated may have just been freed * by an unlink operation. We take this lock to * synchronize against the matching reply-ack-lock taken * in unlink, to avoid replay problems if this reply * makes it out to the client but the unlink's does not. * See bug 2029 for more detail.*/ mds_lock_new_child(obd, dchild->d_inode, NULL); /* save uid/gid of create inode and parent */ qpids[USRQUOTA] = dir->i_uid; qpids[GRPQUOTA] = dir->i_gid; } else { rc = err; } switch (cleanup_phase) { case 2: /* child dentry */ l_dput(dchild); case 1: /* locked parent dentry */ if (rc) { ldlm_lock_decref(&lockh, LCK_EX); } else { ptlrpc_save_lock (req, &lockh, LCK_EX); } l_dput(dparent); case 0: break; default: CERROR("invalid cleanup_phase %d\n", cleanup_phase); LBUG(); } req->rq_status = rc; /* trigger dqacq on the owner of child and parent */ lquota_adjust(mds_quota_interface_ref, obd, qcids, qpids, rc, FSFILT_OP_CREATE); return 0;}int res_gt(struct ldlm_res_id *res1, struct ldlm_res_id *res2, ldlm_policy_data_t *p1, ldlm_policy_data_t *p2){ int i; for (i = 0; i < RES_NAME_SIZE; i++) { /* return 1 here, because enqueue_ordered will skip resources * of all zeroes if they're sorted to the end of the list. */ if (res1->name[i] == 0 && res2->name[i] != 0) return 1; if (res2->name[i] == 0 && res1->name[i] != 0) return 0; if (res1->name[i] > res2->name[i]) return 1; if (res1->name[i] < res2->name[i]) return 0; } if (!p1 || !p2) return 0; if (memcmp(p1, p2, sizeof(*p1)) < 0) return 1; return 0;}/* This function doesn't use ldlm_match_or_enqueue because we're always called * with EX or PW locks, and the MDS is no longer allowed to match write locks, * because they take the place of local semaphores. * * One or two locks are taken in numerical order. A res_id->name[0] of 0 means * no lock is taken for that res_id. Must be at least one non-zero res_id. */int enqueue_ordered_locks(struct obd_device *obd, struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, ldlm_policy_data_t *p1_policy, struct ldlm_res_id *p2_res_id, struct lustre_handle *p2_lockh, int p2_lock_mode, ldlm_policy_data_t *p2_policy){ struct ldlm_res_id *res_id[2] = { p1_res_id, p2_res_id }; struct lustre_handle *handles[2] = { p1_lockh, p2_lockh }; int lock_modes[2] = { p1_lock_mode, p2_lock_mode }; ldlm_policy_data_t *policies[2] = {p1_policy, p2_policy}; int rc, flags; ENTRY; LASSERT(p1_res_id != NULL && p2_res_id != NULL); CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0]); if (res_gt(p1_res_id, p2_res_id, p1_policy, p2_policy)) { handles[1] = p1_lockh; handles[0] = p2_lockh; res_id[1] = p1_res_id; res_id[0] = p2_res_id; lock_modes[1] = p1_lock_mode; lock_modes[0] = p2_lock_mode; policies[1] = p1_policy; policies[0] = p2_policy; } CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0]); flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id[0], LDLM_IBITS, policies[0], lock_modes[0], &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, handles[0]); if (rc != ELDLM_OK) RETURN(-EIO); ldlm_lock_dump_handle(D_OTHER, handles[0]); if (memcmp(res_id[0], res_id[1], sizeof(*res_id[0])) == 0 && (policies[0]->l_inodebits.bits & policies[1]->l_inodebits.bits)) { memcpy(handles[1], handles[0], sizeof(*(handles[1]))); ldlm_lock_addref(handles[1], lock_modes[1]); } else if (res_id[1]->name[0] != 0) { flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB; rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id[1], LDLM_IBITS, policies[1], lock_modes[1], &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, handles[1]); if (rc != ELDLM_OK) { ldlm_lock_decref(handles[0], lock_modes[0]); RETURN(-EIO); } ldlm_lock_dump_handle(D_OTHER, handles[1]); } RETURN(0);}static inline int res_eq(struct ldlm_res_id *res1, struct ldlm_res_id *res2){ return !memcmp(res1, res2, sizeof(*res1));}static inline voidtry_to_aggregate_locks(struct ldlm_res_id *res1, ldlm_policy_data_t *p1, struct ldlm_res_id *res2, ldlm_policy_data_t *p2){ if (!res_eq(res1, res2)) return; /* XXX: any additional inodebits (to current LOOKUP and UPDATE) * should be taken with great care here */ p1->l_inodebits.bits |= p2->l_inodebits.bits;}int enqueue_4ordered_locks(struct obd_device *obd,struct ldlm_res_id *p1_res_id, struct lustre_handle *p1_lockh, int p1_lock_mode, ldlm_policy_data_t *p1_policy, struct ldlm_res_id *p2_res_id, struct lustre_handle *p2_lockh, int p2_lock_mode, ldlm_policy_data_t *p2_policy, struct ldlm_res_id *c1_res_id, struct lustre_handle *c1_lockh, int c1_lock_mode, ldlm_policy_data_t *c1_policy, struct ldlm_res_id *c2_res_id, struct lustre_handle *c2_lockh, int c2_lock_mode, ldlm_policy_data_t *c2_policy){ struct ldlm_res_id *res_id[5] = { p1_res_id, p2_res_id, c1_res_id, c2_res_id }; struct lustre_handle *dlm_handles[5] = { p1_lockh, p2_lockh, c1_lockh, c2_lockh }; int lock_modes[5] = { p1_lock_mode, p2_lock_mode, c1_lock_mode, c2_lock_mode }; ldlm_policy_data_t *policies[5] = {p1_policy, p2_policy, c1_policy, c2_policy}; int rc, i, j, sorted, flags; ENTRY; CDEBUG(D_DLMTRACE, "locks before: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0], res_id[3]->name[0]); /* simple insertion sort - we have at most 4 elements */ for (i = 1; i < 4; i++) { j = i - 1; dlm_handles[4] = dlm_handles[i]; res_id[4] = res_id[i]; lock_modes[4] = lock_modes[i]; policies[4] = policies[i]; sorted = 0; do { if (res_gt(res_id[j], res_id[4], policies[j], policies[4])) { dlm_handles[j + 1] = dlm_handles[j]; res_id[j + 1] = res_id[j]; lock_modes[j + 1] = lock_modes[j]; policies[j + 1] = policies[j]; j--; } else { sorted = 1; } } while (j >= 0 && !sorted); dlm_handles[j + 1] = dlm_handles[4]; res_id[j + 1] = res_id[4]; lock_modes[j + 1] = lock_modes[4]; policies[j + 1] = policies[4]; } CDEBUG(D_DLMTRACE, "lock order: "LPU64"/"LPU64"/"LPU64"/"LPU64"\n", res_id[0]->name[0], res_id[1]->name[0], res_id[2]->name[0], res_id[3]->name[0]); /* XXX we could send ASTs on all these locks first before blocking? */ for (i = 0; i < 4; i++) { flags = LDLM_FL_ATOMIC_CB; if (res_id[i]->name[0] == 0) break; if (i && res_eq(res_id[i], res_id[i-1])) { memcpy(dlm_handles[i], dlm_handles[i-1], sizeof(*(dlm_handles[i]))); ldlm_lock_addref(dlm_handles[i], lock_modes[i]); } else { /* we need to enqueue locks with different inodebits * at once, because otherwise concurrent thread can * hit the windown between these two locks and we'll * get to deadlock. see bug 10360. note also, that it * is impossible to have >2 equal res. */ if (i < 3) try_to_aggregate_locks(res_id[i], policies[i], res_id[i+1], policies[i+1]); rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id[i], LDLM_IBITS, policies[i], lock_modes[i], &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, dlm_handles[i]); if (rc != ELDLM_OK) GOTO(out_err, rc = -EIO); ldlm_lock_dump_handle(D_OTHER, dlm_handles[i]); } } RETURN(0);out_err: while (i-- > 0) ldlm_lock_decref(dlm_handles[i], lock_modes[i]); return rc;}/* In the unlikely case that the child changed while we were waiting * on the lock, we need to drop the lock on the old child and either: * - if the child has a lower resource name, then we have to also * drop the parent lock and regain the locks in the right order * - in the rename case, if the child has a lower resource name than one of * the other parent/child resources (maxres) we also need to reget the locks * - if the child has a higher resource name (this is the common case) * we can just get the lock on the new child (still in lock order) * * Returns 0 if the child did not change or if it changed but could be locked. * Returns 1 if the child changed and we need to re-lock (no locks held). * Returns -ve error with a valid dchild (no locks held). */static int mds_verify_child(struct obd_device *obd, struct ldlm_res_id *parent_res_id, struct lustre_handle *parent_lockh, struct dentry *dparent, int parent_mode, struct ldlm_res_id *child_res_id, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, ldlm_policy_data_t *child_policy, const char *name, int namelen, struct ldlm_res_id *maxres){ struct dentry *vchild, *dchild = *dchildp; int rc = 0, cleanup_phase = 2; /* parent, child locks */ ENTRY; vchild = ll_lookup_one_len(name, dparent, namelen - 1); if (IS_ERR(vchild)) GOTO(cleanup, rc = PTR_ERR(vchild)); if (likely((vchild->d_inode == NULL && child_res_id->name[0] == 0) || (vchild->d_inode != NULL && child_res_id->name[0] == vchild->d_inode->i_ino && child_res_id->name[1] == vchild->d_inode->i_generation))) { if (dchild != NULL) l_dput(dchild); *dchildp = vchild; RETURN(0); } CDEBUG(D_DLMTRACE, "child inode changed: %p != %p (%lu != "LPU64")\n", vchild->d_inode, dchild ? dchild->d_inode : 0, vchild->d_inode ? vchild->d_inode->i_ino : 0, child_res_id->name[0]); if (child_res_id->name[0] != 0) ldlm_lock_decref(child_lockh, child_mode); if (dchild)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -