/*
 * ldlm_request.c -- Lustre 1.6.5 DLM client-side request code.
 * (Excerpt: page 1 of 5 of a 1,663-line source file. The surrounding
 * text was web code-viewer chrome from the site this was copied from.)
 */
}

/* Tear down a lock whose client-side enqueue failed: mark it
 * LDLM_FL_LOCAL_ONLY so that no CANCEL RPC is sent to the server (bug 407),
 * then drop the caller's reference and cancel the lock locally.
 * NOTE(review): @ns is unused in this body.  For LDLM_FLOCK locks the lock
 * is additionally destroyed outright -- flagged in-source as a hack to avoid
 * calling ldlm_lock_destroy() from ll_file_flock(). */
static void failed_lock_cleanup(struct ldlm_namespace *ns,
                                struct ldlm_lock *lock,
                                struct lustre_handle *lockh, int mode)
{
        /* Set a flag to prevent us from sending a CANCEL (bug 407) */
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_LOCAL_ONLY;
        unlock_res_and_lock(lock);
        LDLM_DEBUG(lock, "setting FL_LOCAL_ONLY");

        ldlm_lock_decref_and_cancel(lockh, mode);

        /* XXX - HACK because we shouldn't call ldlm_lock_destroy()
         *       from llite/file.c/ll_file_flock(). */
        if (lock->l_resource->lr_type == LDLM_FLOCK) {
                ldlm_lock_destroy(lock);
        }
}

/* Finish a client-side enqueue once the reply (or replay status) @rc is
 * known:
 *  - on failure, unpack whatever the server sent (reply + optional LVB for
 *    the ABORTED case) and clean up the speculative local lock;
 *  - on success, copy the remote handle, flags, possibly-changed mode,
 *    resource name and policy data from the reply into the local lock,
 *    unpack the LVB, and (unless this is a replay) grant the lock locally
 *    via ldlm_lock_enqueue() followed by the completion AST.
 * Returns 0 or a negative errno.  Consumes two lock references: the one
 * taken here via ldlm_handle2lock() and the one held by ldlm_cli_enqueue(). */
int ldlm_cli_enqueue_fini(struct obd_export *exp, struct ptlrpc_request *req,
                          ldlm_type_t type, __u8 with_policy, ldlm_mode_t mode,
                          int *flags, void *lvb, __u32 lvb_len,
                          void *lvb_swabber, struct lustre_handle *lockh,int rc)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        int is_replay = *flags & LDLM_FL_REPLAY;
        struct ldlm_lock *lock;
        struct ldlm_reply *reply;
        /* 1 while the lock is not yet known to the server: the error path
         * must then undo the local lock via failed_lock_cleanup(). */
        int cleanup_phase = 1;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        /* ldlm_cli_enqueue is holding a reference on this lock. */
        if (!lock) {
                LASSERT(type == LDLM_FLOCK);
                RETURN(-ENOLCK);
        }

        if (rc != ELDLM_OK) {
                LASSERT(!is_replay);
                LDLM_DEBUG(lock, "client-side enqueue END (%s)",
                           rc == ELDLM_LOCK_ABORTED ? "ABORTED" : "FAILED");
                if (rc == ELDLM_LOCK_ABORTED) {
                        /* Before we return, swab the reply */
                        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF,
                                                   sizeof(*reply),
                                                   lustre_swab_ldlm_reply);
                        if (reply == NULL) {
                                CERROR("Can't unpack ldlm_reply\n");
                                rc = -EPROTO;
                        }
                        /* An aborted intent enqueue may still carry an LVB
                         * the caller wants (copied only if @lvb != NULL). */
                        if (lvb_len) {
                                void *tmplvb;
                                tmplvb = lustre_swab_repbuf(req,
                                                            DLM_REPLY_REC_OFF,
                                                            lvb_len,
                                                            lvb_swabber);
                                if (tmplvb == NULL)
                                        GOTO(cleanup, rc = -EPROTO);
                                if (lvb != NULL)
                                        memcpy(lvb, tmplvb, lvb_len);
                        }
                }
                GOTO(cleanup, rc);
        }

        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
                                   lustre_swab_ldlm_reply);
        if (reply == NULL) {
                CERROR("Can't unpack ldlm_reply\n");
                GOTO(cleanup, rc = -EPROTO);
        }

        /* lock enqueued on the server */
        cleanup_phase = 0;

        lock_res_and_lock(lock);
        lock->l_remote_handle = reply->lock_handle;
        *flags = reply->lock_flags;
        lock->l_flags |= reply->lock_flags & LDLM_INHERIT_FLAGS;
        /* move NO_TIMEOUT flag to the lock to force ldlm_lock_match()
         * to wait with no timeout as well */
        lock->l_flags |= reply->lock_flags & LDLM_FL_NO_TIMEOUT;
        unlock_res_and_lock(lock);

        CDEBUG(D_INFO, "local: %p, remote cookie: "LPX64", flags: 0x%x\n",
               lock, reply->lock_handle.cookie, *flags);

        /* If enqueue returned a blocked lock but the completion handler has
         * already run, then it fixed up the resource and we don't need to do it
         * again. */
        if ((*flags) & LDLM_FL_LOCK_CHANGED) {
                int newmode = reply->lock_desc.l_req_mode;
                LASSERT(!is_replay);
                if (newmode && newmode != lock->l_req_mode) {
                        LDLM_DEBUG(lock, "server returned different mode %s",
                                   ldlm_lockname[newmode]);
                        lock->l_req_mode = newmode;
                }

                /* NOTE(review): only name[0] is compared to decide whether
                 * the server switched us to a different resource -- TODO
                 * confirm that is sufficient for all resource-name types. */
                if (reply->lock_desc.l_resource.lr_name.name[0] !=
                    lock->l_resource->lr_name.name[0]) {
                        CDEBUG(D_INFO, "remote intent success, locking %ld "
                               "instead of %ld\n",
                              (long)reply->lock_desc.l_resource.lr_name.name[0],
                               (long)lock->l_resource->lr_name.name[0]);

                        ldlm_lock_change_resource(ns, lock,
                                           reply->lock_desc.l_resource.lr_name);
                        if (lock->l_resource == NULL) {
                                LBUG();
                                GOTO(cleanup, rc = -ENOMEM);
                        }
                        LDLM_DEBUG(lock, "client-side enqueue, new resource");
                }
                /* Skip the policy copy for the inodebits-interop case, where
                 * the wire descriptor deliberately differs from the local
                 * inodebits lock (see INODEBITS_INTEROP in ldlm_cli_enqueue). */
                if (with_policy)
                        if (!(type == LDLM_IBITS && !(exp->exp_connect_flags &
                                                    OBD_CONNECT_IBITS)))
                                lock->l_policy_data =
                                                 reply->lock_desc.l_policy_data;
                if (type != LDLM_PLAIN)
                        LDLM_DEBUG(lock,"client-side enqueue, new policy data");
        }

        if ((*flags) & LDLM_FL_AST_SENT ||
            /* Cancel extent locks as soon as possible on a liblustre client,
             * because it cannot handle asynchronous ASTs robustly (see
             * bug 7311). */
            (LIBLUSTRE_CLIENT && type == LDLM_EXTENT)) {
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;
                unlock_res_and_lock(lock);
                LDLM_DEBUG(lock, "enqueue reply includes blocking AST");
        }

        /* If the lock has already been granted by a completion AST, don't
         * clobber the LVB with an older one. */
        if (lvb_len && (lock->l_req_mode != lock->l_granted_mode)) {
                void *tmplvb;
                tmplvb = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, lvb_len,
                                            lvb_swabber);
                if (tmplvb == NULL)
                        GOTO(cleanup, rc = -EPROTO);
                memcpy(lock->l_lvb_data, tmplvb, lvb_len);
        }

        if (!is_replay) {
                rc = ldlm_lock_enqueue(ns, &lock, NULL, flags);
                if (lock->l_completion_ast != NULL) {
                        int err = lock->l_completion_ast(lock, *flags, NULL);
                        if (!rc)
                                rc = err;
                        /* A failed completion AST means the lock must be
                         * cleaned up after all (except flocks). */
                        if (rc && type != LDLM_FLOCK) /* bug 9425, bug 10250 */
                                cleanup_phase = 1;
                }
        }

        if (lvb_len && lvb != NULL) {
                /* Copy the LVB here, and not earlier, because the completion
                 * AST (if any) can override what we got in the reply */
                memcpy(lvb, lock->l_lvb_data, lvb_len);
        }

        LDLM_DEBUG(lock, "client-side enqueue END");
        EXIT;
cleanup:
        if (cleanup_phase == 1 && rc)
                failed_lock_cleanup(ns, lock, lockh, mode);
        /* Put lock 2 times, the second reference is held by ldlm_cli_enqueue */
        LDLM_LOCK_PUT(lock);
        LDLM_LOCK_PUT(lock);
        return rc;
}

/* PAGE_SIZE-512 is to allow TCP/IP and LNET
 headers to fit into
 * a single page on the send/receive side. XXX: 512 should be changed
 * to more adequate value. */

/* Return how many extra lock handles still fit into a request whose total
 * size is capped at min(LDLM_MAXREQSIZE, PAGE_SIZE - 512), after packing
 * @bufcount buffers of the given @size; @off handles are already part of
 * the base ldlm_request and are credited back. */
static inline int ldlm_req_handles_avail(struct obd_export *exp,
                                         int *size, int bufcount, int off)
{
        int avail = min_t(int, LDLM_MAXREQSIZE, PAGE_SIZE - 512);

        avail -= lustre_msg_size(class_exp2cliimp(exp)->imp_msg_magic,
                                 bufcount, size);
        avail /= sizeof(struct lustre_handle);
        avail += LDLM_LOCKREQ_HANDLES - off;

        return avail;
}

/* Handle capacity of a plain cancel request (ptlrpc_body + ldlm_request). */
static inline int ldlm_cancel_handles_avail(struct obd_export *exp)
{
        int size[2] = { sizeof(struct ptlrpc_body),
                        sizeof(struct ldlm_request) };
        return ldlm_req_handles_avail(exp, size, 2, 0);
}

/* Cancel lru locks and pack them into the enqueue request. Pack there the given
 * @count locks in @cancels. */
struct ptlrpc_request *ldlm_prep_elc_req(struct obd_export *exp, int version,
                                         int opc, int bufcount, int *size,
                                         int bufoff, int canceloff,
                                         struct list_head *cancels, int count)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        int flags, avail, to_free, pack = 0;
        struct ldlm_request *dlm = NULL;
        struct ptlrpc_request *req;
        CFS_LIST_HEAD(head);
        ENTRY;

        /* Callers may pass NULL to mean "no pre-selected cancels". */
        if (cancels == NULL)
                cancels = &head;
        if (exp_connect_cancelset(exp)) {
                /* Estimate the amount of free space in the request. */
                LASSERT(bufoff < bufcount);
                avail = ldlm_req_handles_avail(exp, size, bufcount, canceloff);
                flags = ns_connect_lru_resize(ns) ?
                        LDLM_CANCEL_LRUR : LDLM_CANCEL_AGED;
                to_free = !ns_connect_lru_resize(ns) &&
                          opc == LDLM_ENQUEUE ? 1 : 0;

                /* Cancel lru locks here _only_ if the server supports
                 * EARLY_CANCEL. Otherwise we have to send extra CANCEL
                 * rpc, what will make us slower. */
                if (avail > count)
                        count += ldlm_cancel_lru_local(ns, cancels, to_free,
                                                       avail - count, 0, flags);
                if (avail > count)
                        pack = count;
                else
                        pack = avail;
                size[bufoff] = ldlm_request_bufsize(pack, opc);
        }

        req = ptlrpc_prep_req(class_exp2cliimp(exp), version,
                              opc, bufcount, size, NULL);

        if (exp_connect_cancelset(exp) && req) {
                if (canceloff) {
                        dlm = lustre_msg_buf(req->rq_reqmsg, bufoff,
                                             sizeof(*dlm));
                        /* Skip first lock handler in ldlm_request_pack(),
                         * this method will increment @lock_count according
                         * to the lock handle amount actually written to
                         * the buffer. */
                        dlm->lock_count = canceloff;
                }
                /* Pack into the request @pack lock handles. */
                ldlm_cli_cancel_list(cancels, pack, req, bufoff);
                /* Prepare and send separate cancel rpc for others.
 */
                ldlm_cli_cancel_list(cancels, count - pack, NULL, 0);
        } else {
                /* No early cancel support: just release the references the
                 * cancel candidates hold. */
                ldlm_lock_list_put(cancels, l_bl_ast, count);
        }
        RETURN(req);
}

/* Convenience wrapper: prepare an LDLM_ENQUEUE request with early-cancel
 * handles packed at the standard DLM offsets. */
struct ptlrpc_request *ldlm_prep_enqueue_req(struct obd_export *exp,
                                             int bufcount, int *size,
                                             struct list_head *cancels,
                                             int count)
{
        return ldlm_prep_elc_req(exp, LUSTRE_DLM_VERSION, LDLM_ENQUEUE,
                                 bufcount, size, DLM_LOCKREQ_OFF,
                                 LDLM_ENQUEUE_CANCEL_OFF, cancels, count);
}

/* If a request has some specific initialisation it is passed in @reqp,
 * otherwise it is created in ldlm_cli_enqueue.
 *
 * Supports sync and async requests, pass @async flag accordingly. If a
 * request was created in ldlm_cli_enqueue and it is the async request,
 * pass it to the caller in @reqp. */
int ldlm_cli_enqueue(struct obd_export *exp, struct ptlrpc_request **reqp,
                     struct ldlm_enqueue_info *einfo, struct ldlm_res_id res_id,
                     ldlm_policy_data_t *policy, int *flags,
                     void *lvb, __u32 lvb_len, void *lvb_swabber,
                     struct lustre_handle *lockh, int async)
{
        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
        struct ldlm_lock *lock;
        struct ldlm_request *body;
        struct ldlm_reply *reply;
        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(*body),
                        [DLM_REPLY_REC_OFF]   = lvb_len };
        int is_replay = *flags & LDLM_FL_REPLAY;
        int req_passed_in = 1, rc, err;
        struct ptlrpc_request *req;
        ENTRY;

        LASSERT(exp != NULL);

        /* If we're replaying this lock, just check some invariants.
         * If we're creating a new lock, get everything all setup nice. */
        if (is_replay) {
                lock = ldlm_handle2lock(lockh);
                LASSERT(lock != NULL);
                LDLM_DEBUG(lock, "client-side enqueue START");
                LASSERT(exp == lock->l_conn_export);
        } else {
                lock = ldlm_lock_create(ns, res_id, einfo->ei_type,
                                        einfo->ei_mode, einfo->ei_cb_bl,
                                        einfo->ei_cb_cp, einfo->ei_cb_gl,
                                        einfo->ei_cbdata, lvb_len);
                if (lock == NULL)
                        RETURN(-ENOMEM);
                /* for the local lock, add the reference */
                ldlm_lock_addref_internal(lock, einfo->ei_mode);
                ldlm_lock2handle(lock, lockh);
                lock->l_lvb_swabber = lvb_swabber;
                if (policy != NULL) {
                        /* INODEBITS_INTEROP: If the server does not support
                         * inodebits, we will request a plain lock in the
                         * descriptor (ldlm_lock2desc() below) but use an
                         * inodebits lock internally with both bits set.
                         */
                        if (einfo->ei_type == LDLM_IBITS &&
                            !(exp->exp_connect_flags & OBD_CONNECT_IBITS))
                                lock->l_policy_data.l_inodebits.bits =
                                        MDS_INODELOCK_LOOKUP |
                                        MDS_INODELOCK_UPDATE;
                        else
                                lock->l_policy_data = *policy;
                }

                /* NOTE(review): assumes @policy != NULL whenever ei_type is
                 * LDLM_EXTENT -- a NULL policy here would oops. */
                if (einfo->ei_type == LDLM_EXTENT)
                        lock->l_req_extent = policy->l_extent;
                LDLM_DEBUG(lock, "client-side enqueue START");
        }

/* End of excerpt (page 1 of 5): ldlm_cli_enqueue() continues past this point. */