ldlm_request.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,663 行 · 第 1/5 页
C
1,663 行
}

/*
 * Tear down a lock whose enqueue did not succeed.
 *
 * Sets LDLM_FL_LOCAL_ONLY on @lock (between lock_res_and_lock() and
 * unlock_res_and_lock()), then drops the @mode reference behind @lockh with
 * ldlm_lock_decref_and_cancel().  For LDLM_FLOCK resources the lock is
 * additionally destroyed with ldlm_lock_destroy().
 *
 * @ns is accepted but not used by this function.
 *
 * NOTE(review): @lock is dereferenced after ldlm_lock_decref_and_cancel();
 * this relies on the caller still holding its own reference.  The caller
 * visible in this file, ldlm_cli_enqueue_fini(), only puts its references
 * after this function returns.
 */
static void failed_lock_cleanup(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock,
                                struct lustre_handle *lockh, int mode)
{
        /* Set a flag to prevent us from sending a CANCEL (bug 407) */
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_LOCAL_ONLY;
        unlock_res_and_lock(lock);
        LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");

        ldlm_lock_decref_and_cancel(lockh, mode);

        /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
         * from llite/file.c/ll_file_flock(). */
        if (lock->l_resource->lr_type == LDLM_FLOCK) {
                ldlm_lock_destroy(lock);
        }
}

/*
 * Finish a client-side enqueue once the RPC result @rc is known.
 *
 * @exp          export whose obd namespace owns the lock
 * @req          request whose reply buffers are unpacked here
 * @type         lock type; compared against LDLM_FLOCK, LDLM_IBITS,
 *               LDLM_PLAIN and LDLM_EXTENT below
 * @with_policy  when non-zero, policy data from the reply may be copied
 *               into the lock on LDLM_FL_LOCK_CHANGED
 * @mode         mode passed to failed_lock_cleanup() on failure
 * @flags        in: tested for LDLM_FL_REPLAY; out: overwritten with the
 *               reply's lock_flags once the reply has been unpacked
 * @lvb          optional caller buffer that receives @lvb_len bytes of LVB
 * @lvb_len      LVB size in bytes; 0 disables all LVB handling
 * @lvb_swabber  swabber handed to lustre_swab_repbuf() for the LVB buffer
 * @lockh        handle of the lock being enqueued
 * @rc           result of the enqueue RPC
 *
 * Returns -ENOLCK when @lockh no longer resolves to a lock, -EPROTO when a
 * reply buffer cannot be unpacked, -ENOMEM when the resource change leaves
 * the lock without a resource, and otherwise the incoming @rc or the result
 * of ldlm_lock_enqueue() / the completion AST.
 *
 * On every path that reaches the cleanup label the lock is put twice: once
 * for the reference taken by ldlm_handle2lock() here and once for the
 * reference held by ldlm_cli_enqueue (per the original comment below).
 */
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                          ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                          int *flags, void *lvb, __u32 lvb_len,
                          void *lvb_swabber, struct lustre_handle *lockh,
                          int rc)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        int is_replay = *flags & LDLM_FL_REPLAY;
        struct ldlm_lock *lock;
        struct ldlm_reply *reply;
        /* 1: run failed_lock_cleanup() at the cleanup label if rc != 0 */
        int cleanup_phase = 1;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
        if (!lock) {
                LASSERT(type == LDLM_FLOCK);
                RETURN(-ENOLCK);
        }

        if (rc != ELDLM_OK) {
                LASSERT(!is_replay);
                LDLM_DEBUG(lock, "client-side enqueue END (%s)",
                           rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                if (rc == ELDLM_LOCK_ABORTED) {
                        /* Before we return, swab the reply */
                        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
                                                   sizeof(*reply),
                                                   lustre_swab_ldlm_reply);
                        /* A failed unpack only changes rc here; execution
                         * still falls through to the LVB handling below. */
                        if (reply == NULL) {
                                CERROR("Can't unpack ldlm_reply\n");
                                rc = -EPROTO;
                        }
                        if (lvb_len) {
                                void *tmplvb;
                                tmplvb = lustre_swab_repbuf(req,
                                                            DLM_REPLY_REC_OFF,
                                                            lvb_len,
                                                            lvb_swabber);
                                if (tmplvb == NULL)
                                        GOTO(cleanup, rc = -EPROTO);
                                if (lvb != NULL)
                                        memcpy(lvb, tmplvb, lvb_len);
                        }
                }
                GOTO(cleanup, rc);
        }

        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
                                   lustre_swab_ldlm_reply);
        if (reply == NULL) {
                CERROR("Can't unpack ldlm_reply\n");
                GOTO(cleanup, rc = -EPROTO);
        }

        /* lock enqueued on the server */
        cleanup_phase = 0;

        lock_res_and_lock(lock);
        lock->l_remote_handle = reply->lock_handle;
        *flags = reply->lock_flags;
        lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
        /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
         * to wait with no timeout as well */
        lock->l_flags |= reply->lock_flags & LDLM_FL_NO_TIMEOUT;
        unlock_res_and_lock(lock);

        CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
               lock, reply->lock_handle.cookie, *flags);

        /* If enqueue returned a blocked lock but the completion handler has
         * already run, then it fixed up the resource and we don't need to do it
         * again. */
        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
                int newmode = reply->lock_desc.l_req_mode;
                LASSERT(!is_replay);
                if (newmode && newmode != lock->l_req_mode) {
                        LDLM_DEBUG(lock, "server returned different mode %s",
                                   ldlm_lockname[newmode]);
                        lock->l_req_mode = newmode;
                }

                /* Only the first element of the resource name is compared. */
                if (reply->lock_desc.l_resource.lr_name.name[0] !=
                    lock->l_resource->lr_name.name[0]) {
                        CDEBUG(D_INFO, "remote intent success, locking %ld "
                               "instead of %ld\n",
                               (long)reply->lock_desc.l_resource.lr_name.name[0],
                               (long)lock->l_resource->lr_name.name[0]);

                        ldlm_lock_change_resource(ns, lock,
                                        reply->lock_desc.l_resource.lr_name);
                        if (lock->l_resource == NULL) {
                                LBUG();
                                GOTO(cleanup, rc = -ENOMEM);
                        }
                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
                }
                /* Take the policy data from the reply unless this is an
                 * IBITS lock on an export without OBD_CONNECT_IBITS. */
                if (with_policy)
                        if (!(type == LDLM_IBITS && !(exp->exp_connect_flags &
                                                    OBD_CONNECT_IBITS)))
                                lock->l_policy_data =
                                        reply->lock_desc.l_policy_data;
                /* NOTE(review): this is a separate statement, not part of
                 * the with_policy branch above, so the message is logged
                 * even when no policy data was copied. */
                if (type != LDLM_PLAIN)
                        LDLM_DEBUG(lock,"client-side enqueue, new policy data");
        }

        if ((*flags) & LDLM_FL_AST_SENT ||
            /* Cancel extent locks as soon as possible on a liblustre client,
             * because it cannot handle asynchronous ASTs robustly (see
             * bug 7311). */
            (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
        }

        /* If the lock has already been granted by a completion AST, don't
         * clobber the LVB with an older one. */
        if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
                void *tmplvb;
                tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
                                            lvb_swabber);
                if (tmplvb == NULL)
                        GOTO(cleanup, rc = -EPROTO);
                /* NOTE(review): assumes lock->l_lvb_data is non-NULL and
                 * holds at least lvb_len bytes -- confirm against the
                 * lock's creation path. */
                memcpy(lock->l_lvb_data, tmplvb, lvb_len);
        }

        if (!is_replay) {
                rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
                if (lock->l_completion_ast != NULL) {
                        int err = lock->l_completion_ast(lock, *flags, NULL);
                        /* Keep the enqueue error if there was one; otherwise
                         * report the completion AST's result. */
                        if (!rc)
                                rc = err;
                        if (rc && type != LDLM_FLOCK) /* bug 9425, bug 10250 */
                                cleanup_phase = 1;
                }
        }

        if (lvb_len && lvb != NULL) {
                /* Copy the LVB here, and not earlier, because the completion
                 * AST (if any) can override what we got in the reply */
                memcpy(lvb, lock->l_lvb_data, lvb_len);
        }

        LDLM_DEBUG(lock, "client-side enqueue END");
        EXIT;
cleanup:
        if (cleanup_phase == 1 && rc)
                failed_lock_cleanup(ns, lock, lockh, mode);
        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
        LDLM_LOCK_PUT(lock);
        LDLM_LOCK_PUT(lock);
        return rc;
}

/*
 * Compute how many lock handles can be carried by a request built from
 * @bufcount buffers whose sizes are given in @size.
 *
 * The byte budget is min(LDLM_MAXREQSIZE, PAGE_SIZE - 512) minus the message
 * size reported by lustre_msg_size(); it is converted to a handle count by
 * dividing by sizeof(struct lustre_handle), and LDLM_LOCKREQ_HANDLES - @off
 * is then added.
 *
 * PAGE_SIZE-512 is to allow TCP/IP and LNET headers to fit into
 * a single page on the send/receive side. XXX: 512 should be changed
 * to more adequate value.
 *
 * NOTE(review): presumably LDLM_LOCKREQ_HANDLES is the number of handle
 * slots already included in struct ldlm_request and @off the number of
 * those slots reserved by the caller -- confirm against the header.
 */
static inline int ldlm_req_handles_avail(struct obd_export *exp,
                                         int *size, int bufcount, int off)
{
        int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);

        avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                                 bufcount, size);
        avail /= sizeof(struct lustre_handle);
        avail += LDLM_LOCKREQ_HANDLES - off;

        return avail;
}

/*
 * Handle capacity of a request made of a ptlrpc_body buffer followed by an
 * ldlm_request buffer, with no handle slots reserved (off == 0).
 */
static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
{
        int size[2] = { sizeof(struct ptlrpc_body),
                        sizeof(struct ldlm_request) };
        return ldlm_req_handles_avail(exp, size, 2, 0);
}

/*
 * Cancel lru locks and pack them into the enqueue request. Pack there the given
 * @count locks in @cancels.
 *
 * @exp        export the request is prepared for
 * @version    passed through to ptlrpc_prep_req()
 * @opc        request opcode; also selects the buffer size through
 *             ldlm_request_bufsize()
 * @bufcount   number of entries in @size
 * @size       per-buffer sizes; size[@bufoff] is overwritten when the export
 *             supports cancel sets
 * @bufoff     index of the ldlm_request buffer (asserted < @bufcount)
 * @canceloff  number of handle slots to skip; written to dlm->lock_count
 * @cancels    list of locks to cancel, or NULL to use a local empty list
 * @count      number of locks already on @cancels
 *
 * Returns the request from ptlrpc_prep_req(), which may be NULL.
 *
 * When the export does not support cancel sets, or no request could be
 * prepared, the locks on @cancels are released with ldlm_lock_list_put()
 * instead of being packed.
 */
struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
                                         int opc, int bufcount, int *size,
                                         int bufoff, int canceloff,
                                         struct list_head *cancels, int count)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        int flags, avail, to_free, pack = 0;
        struct ldlm_request *dlm = NULL;
        struct ptlrpc_request *req;
        CFS_LIST_HEAD(head);
        ENTRY;

        if (cancels == NULL)
                cancels = &head;
        if (exp_connect_cancelset(exp)) {
                /* Estimate the amount of free space in the request. */
                LASSERT(bufoff < bufcount);

                avail = ldlm_req_handles_avail(exp, size, bufcount, canceloff);
                flags = ns_connect_lru_resize(ns) ?
                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                /* Parses as (!lru_resize && opc == LDLM_ENQUEUE) ? 1 : 0. */
                to_free = !ns_connect_lru_resize(ns) &&
                          opc == LDLM_ENQUEUE ? 1 : 0;

                /* Cancel lru locks here _only_ if the server supports
                 * EARLY_CANCEL. Otherwise we have to send extra CANCEL
                 * rpc, what will make us slower. */
                if (avail > count)
                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
                                                       avail - count, 0, flags);
                /* pack = min(count, avail): handles that go in this request */
                if (avail > count)
                        pack = count;
                else
                        pack = avail;
                size[bufoff] = ldlm_request_bufsize(pack, opc);
        }
        req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
                              opc, bufcount, size, NULL);
        if (exp_connect_cancelset(exp) && req) {
                if (canceloff) {
                        /* NOTE(review): the lustre_msg_buf() result is
                         * dereferenced without a NULL check -- confirm it
                         * cannot fail for a buffer sized just above. */
                        dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
                                             sizeof(*dlm));
                        /* Skip first lock handler in ldlm_request_pack(),
                         * this method will increment @lock_count according
                         * to the lock handle amount actually written to
                         * the buffer. */
                        dlm->lock_count = canceloff;
                }
                /* Pack into the request @pack lock handles. */
                ldlm_cli_cancel_list(cancels, pack, req, bufoff);
                /* Prepare and send separate cancel rpc for others. */
                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
        } else {
                ldlm_lock_list_put(cancels, l_bl_ast, count);
        }
        RETURN(req);
}

/*
 * Prepare an LDLM_ENQUEUE request: ldlm_prep_elc_req() with
 * LUSTRE_DLM_VERSION, the ldlm_request buffer at DLM_LOCKREQ_OFF and
 * LDLM_ENQUEUE_CANCEL_OFF handle slots skipped.
 */
struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                                             int bufcount, int *size,
                                             struct list_head *cancels,
                                             int count)
{
        return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
                                 bufcount, size, DLM_LOCKREQ_OFF,
                                 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
}

/* If a request has some specific initialisation it is passed in @reqp,
 * otherwise it is created in ldlm_cli_enqueue.
 *
 * Supports sync and async requests, pass @async flag accordingly. If a
 * request was created in ldlm_cli_enqueue and it is the async request,
 * pass it to the caller in @reqp.
 *
 * NOTE(review): only the opening part of this function (lock lookup or
 * creation) is visible in this excerpt; the definition continues beyond it. */
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                     struct ldlm_enqueue_info *einfo, struct ldlm_res_id res_id,
                     ldlm_policy_data_t *policy, int *flags,
                     void *lvb, __u32 lvb_len, void *lvb_swabber,
                     struct lustre_handle *lockh, int async)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_lock *lock;
        struct ldlm_request *body;
        struct ldlm_reply *reply;
        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF] = sizeof(*body),
                        [DLM_REPLY_REC_OFF] = lvb_len };
        int is_replay = *flags & LDLM_FL_REPLAY;
        int req_passed_in = 1, rc, err;
        struct ptlrpc_request *req;
        ENTRY;

        /* NOTE(review): exp was already dereferenced in the ns initializer
         * above, so this assertion runs after the first use of exp. */
        LASSERT(exp != NULL);

        /* If we're replaying this lock, just check some invariants.
         * If we're creating a new lock, get everything all setup nice. */
        if (is_replay) {
                lock = ldlm_handle2lock(lockh);
                LASSERT(lock != NULL);
                LDLM_DEBUG(lock, "client-side enqueue START");
                LASSERT(exp == lock->l_conn_export);
        } else {
                lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
                                        einfo->ei_mode, einfo->ei_cb_bl,
                                        einfo->ei_cb_cp, einfo->ei_cb_gl,
                                        einfo->ei_cbdata, lvb_len);
                if (lock == NULL)
                        RETURN(-ENOMEM);
                /* for the local lock, add the reference */
                ldlm_lock_addref_internal(lock, einfo->ei_mode);
                ldlm_lock2handle(lock, lockh);
                lock->l_lvb_swabber = lvb_swabber;
                if (policy != NULL) {
                        /* INODEBITS_INTEROP: If the server does not support
                         * inodebits, we will request a plain lock in the
                         * descriptor (ldlm_lock2desc() below) but use an
                         * inodebits lock internally with both bits set. */
                        if (einfo->ei_type == LDLM_IBITS &&
                            !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
                                lock->l_policy_data.l_inodebits.bits =
                                        MDS_INODELOCK_LOOKUP |
                                        MDS_INODELOCK_UPDATE;
                        else
                                lock->l_policy_data = *policy;
                }

                /* NOTE(review): policy is dereferenced here without the
                 * NULL check applied just above -- confirm that callers
                 * always pass a policy for LDLM_EXTENT locks. */
                if (einfo->ei_type == LDLM_EXTENT)
                        lock->l_req_extent = policy->l_extent;
                LDLM_DEBUG(lock, "client-side enqueue START");
        }
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?