ldlm_request.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,663 行 · 第 1/5 页

C
1,663
字号
        /* NOTE(review): this chunk begins mid-function.  The head of the
         * client-side enqueue routine (where ns, lock, size[], req_passed_in,
         * async, lvb* etc. are set up) is on an earlier page and is not
         * visible here — do not infer its behavior from this tail alone. */
        /* lock not sent to server yet */
        if (reqp == NULL || *reqp == NULL) {
                req = ldlm_prep_enqueue_req(exp, 2, size, NULL, 0);
                if (req == NULL) {
                        failed_lock_cleanup(ns, lock, lockh, einfo->ei_mode);
                        LDLM_LOCK_PUT(lock);
                        RETURN(-ENOMEM);
                }
                req_passed_in = 0;
                if (reqp)
                        *reqp = req;
        } else {
                /* Caller supplied a pre-built request; verify its lock-request
                 * buffer is large enough to hold an ldlm_request body. */
                req = *reqp;
                LASSERTF(lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF) >=
                         sizeof(*body), "buflen[%d] = %d, not %d\n",
                         DLM_LOCKREQ_OFF,
                         lustre_msg_buflen(req->rq_reqmsg, DLM_LOCKREQ_OFF),
                         (int)sizeof(*body));
        }

        lock->l_conn_export = exp;
        lock->l_export = NULL;
        lock->l_blocking_ast = einfo->ei_cb_bl;

        /* Dump lock data into the request buffer */
        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        ldlm_lock2desc(lock, &body->lock_desc);
        body->lock_flags = *flags;
        body->lock_handle[0] = *lockh;

        /* Continue as normal. */
        if (!req_passed_in) {
                /* Reply has 2 mandatory buffers plus an optional LVB buffer
                 * when lvb_len > 0. */
                size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
                ptlrpc_req_set_repsize(req, 2 + (lvb_len > 0), size);
        }

        /*
         * Liblustre client doesn't get extent locks, except for O_APPEND case
         * where [0, OBD_OBJECT_EOF] lock is taken, or truncate, where
         * [i_size, OBD_OBJECT_EOF] lock is taken.
         */
        LASSERT(ergo(LIBLUSTRE_CLIENT, einfo->ei_type != LDLM_EXTENT ||
                     policy->l_extent.end == OBD_OBJECT_EOF));

        /* Async caller takes ownership of the prepared request (asserted
         * non-NULL reqp) and will send/finish it itself. */
        if (async) {
                LASSERT(reqp != NULL);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "sending request");
        rc = ptlrpc_queue_wait(req);
        err = ldlm_cli_enqueue_fini(exp, req, einfo->ei_type, policy ? 1 : 0,
                                    einfo->ei_mode, flags, lvb, lvb_len,
                                    lvb_swabber, lockh, rc);

        /* If ldlm_cli_enqueue_fini did not find the lock, we need to free
         * one reference that we took */
        if (err == -ENOLCK)
                LDLM_LOCK_PUT(lock);
        else
                rc = err;

        /* Only release the request here if we allocated it ourselves; a
         * caller-passed request stays owned by the caller. */
        if (!req_passed_in && req != NULL) {
                ptlrpc_req_finished(req);
                if (reqp)
                        *reqp = NULL;
        }

        RETURN(rc);
}

/*
 * Convert the mode of a lock that has no server connection
 * (l_conn_export == NULL, see the check in ldlm_cli_convert()) entirely
 * locally, then reprocess the resource so waiters can be granted.
 *
 * Consumes the caller's lock reference (LDLM_LOCK_PUT before returning).
 *
 * Returns 0 on success, or EDEADLOCK if ldlm_lock_convert() yields no
 * resource.  NOTE(review): EDEADLOCK is returned POSITIVE here, unlike the
 * negative -ENOMEM/-EINVAL style used elsewhere in this file — presumably
 * callers test for the positive value; confirm before "fixing".
 */
static int ldlm_cli_convert_local(struct ldlm_lock *lock, int new_mode,
                                  __u32 *flags)
{
        struct ldlm_resource *res;
        int rc;
        ENTRY;

        /* A client namespace should never reach this local path.
         * NOTE(review): the message says "cancel" although this is the
         * convert path — looks copy-pasted; verify intent. */
        if (ns_is_client(lock->l_resource->lr_namespace)) {
                CERROR("Trying to cancel local lock\n");
                LBUG();
        }
        LDLM_DEBUG(lock, "client-side local convert");

        res = ldlm_lock_convert(lock, new_mode, flags);
        if (res) {
                ldlm_reprocess_all(res);
                rc = 0;
        } else {
                rc = EDEADLOCK;
        }
        LDLM_DEBUG(lock, "client-side local convert handler END");
        LDLM_LOCK_PUT(lock);
        RETURN(rc);
}

/* FIXME: one of ldlm_cli_convert or the server side should reject attempted
 * conversion of locks which are on the waiting or converting queue */
/* Caller of this code is supposed to take care of lock readers/writers
   accounting */
/*
 * Convert the mode of a lock identified by @lockh via an LDLM_CONVERT RPC.
 * Falls back to ldlm_cli_convert_local() when the lock has no server
 * connection.  On a successful server-side convert, sleeps in the lock's
 * completion AST (LDLM_FL_WAIT_NOREPROC) until granted.
 *
 * Drops the handle2lock reference on all exit paths via the out: label.
 * Returns 0, a negative errno, req->rq_status, or positive EDEADLOCK
 * (see NOTE on ldlm_cli_convert_local above).
 */
int ldlm_cli_convert(struct lustre_handle *lockh, int new_mode, __u32 *flags)
{
        struct ldlm_request *body;
        struct ldlm_reply *reply;
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        struct ptlrpc_request *req;
        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        int rc;
        ENTRY;

        lock = ldlm_handle2lock(lockh);
        if (!lock) {
                LBUG();
                RETURN(-EINVAL);
        }
        *flags = 0;

        if (lock->l_conn_export == NULL)
                RETURN(ldlm_cli_convert_local(lock, new_mode, flags));

        LDLM_DEBUG(lock, "client-side convert");

        req = ptlrpc_prep_req(class_exp2cliimp(lock->l_conn_export),
                              LUSTRE_DLM_VERSION, LDLM_CONVERT, 2, size, NULL);
        if (!req)
                GOTO(out, rc = -ENOMEM);

        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));
        body->lock_handle[0] = lock->l_remote_handle;

        body->lock_desc.l_req_mode = new_mode;
        /* *flags was zeroed above, so the request carries no flags here. */
        body->lock_flags = *flags;

        size[DLM_LOCKREPLY_OFF] = sizeof(*reply);
        ptlrpc_req_set_repsize(req, 2, size);

        rc = ptlrpc_queue_wait(req);
        if (rc != ELDLM_OK)
                GOTO(out, rc);

        reply = lustre_swab_repbuf(req, DLM_LOCKREPLY_OFF, sizeof(*reply),
                                   lustre_swab_ldlm_reply);
        if (reply == NULL) {
                CERROR ("Can't unpack ldlm_reply\n");
                GOTO (out, rc = -EPROTO);
        }

        if (req->rq_status)
                GOTO(out, rc = req->rq_status);

        res = ldlm_lock_convert(lock, new_mode, &reply->lock_flags);
        if (res != NULL) {
                ldlm_reprocess_all(res);
                /* Go to sleep until the lock is granted. */
                /* FIXME: or cancelled. */
                if (lock->l_completion_ast) {
                        rc = lock->l_completion_ast(lock, LDLM_FL_WAIT_NOREPROC,
                                                    NULL);
                        if (rc)
                                GOTO(out, rc);
                }
        } else {
                rc = EDEADLOCK;
        }
        EXIT;
 out:
        /* NOTE(review): ptlrpc_req_finished(NULL) is reached if prep_req
         * failed — confirm it tolerates NULL (it appears to be the common
         * cleanup idiom in this file). */
        LDLM_LOCK_PUT(lock);
        ptlrpc_req_finished(req);
        return rc;
}

/* Cancel locks locally.
 * Returns:
 * LDLM_FL_LOCAL_ONLY if tere is no need in a CANCEL rpc to the server;
 * LDLM_FL_CANCELING otherwise;
 * LDLM_FL_BL_AST if there is a need in a separate CANCEL rpc. */
static int ldlm_cli_cancel_local(struct ldlm_lock *lock)
{
        int rc = LDLM_FL_LOCAL_ONLY;
        ENTRY;

        if (lock->l_conn_export) {
                int local_only;

                LDLM_DEBUG(lock, "client-side cancel");
                /* Set this flag to prevent others from getting new references*/
                lock_res_and_lock(lock);
                lock->l_flags |= LDLM_FL_CBPENDING;
                /* Snapshot the local-only flags and pick the result code
                 * while still holding the resource lock, so the flag reads
                 * are consistent. */
                local_only = (lock->l_flags &
                              (LDLM_FL_LOCAL_ONLY|LDLM_FL_CANCEL_ON_BLOCK));
                ldlm_cancel_callback(lock);
                rc = (lock->l_flags & LDLM_FL_BL_AST) ?
                        LDLM_FL_BL_AST : LDLM_FL_CANCELING;
                unlock_res_and_lock(lock);

                if (local_only) {
                        CDEBUG(D_DLMTRACE, "not sending request (at caller's "
                               "instruction)\n");
                        rc = LDLM_FL_LOCAL_ONLY;
                }
                ldlm_lock_cancel(lock);
        } else {
                /* No server connection: server-side (or never-enqueued)
                 * lock — cancel and reprocess the resource locally. */
                if (ns_is_client(lock->l_resource->lr_namespace)) {
                        LDLM_ERROR(lock, "Trying to cancel local lock");
                        LBUG();
                }
                LDLM_DEBUG(lock, "server-side local cancel");
                ldlm_lock_cancel(lock);
                ldlm_reprocess_all(lock->l_resource);
                LDLM_DEBUG(lock, "server-side local cancel handler END");
        }

        RETURN(rc);
}

/* Pack @count locks in @head into ldlm_request buffer at the offset @off,
   of the request @req.  Appends each lock's remote handle after any handles
   already packed (dlm->lock_count is advanced, not reset). */
static void ldlm_cancel_pack(struct ptlrpc_request *req, int off,
                             struct list_head *head, int count)
{
        struct ldlm_request *dlm;
        struct ldlm_lock *lock;
        int max, packed = 0;
        ENTRY;

        dlm = lustre_msg_buf(req->rq_reqmsg, off, sizeof(*dlm));
        LASSERT(dlm != NULL);

        /* Check the room in the request buffer: handles beyond the first
         * LDLM_LOCKREQ_HANDLES embedded ones live in the tail of the
         * buffer. */
        max = lustre_msg_buflen(req->rq_reqmsg, off) -
                sizeof(struct ldlm_request);
        max /= sizeof(struct lustre_handle);
        max += LDLM_LOCKREQ_HANDLES;
        LASSERT(max >= dlm->lock_count + count);

        /* XXX: it would be better to pack lock handles grouped by resource.
         * so that the server cancel would call filter_lvbo_update() less
         * frequently. */
        list_for_each_entry(lock, head, l_bl_ast) {
                if (!count--)
                        break;
                LASSERT(lock->l_conn_export);
                /* Pack the lock handle to the given request buffer. */
                LDLM_DEBUG(lock, "packing");
                dlm->lock_handle[dlm->lock_count++] = lock->l_remote_handle;
                packed++;
        }
        CDEBUG(D_DLMTRACE, "%d locks packed\n", packed);
        EXIT;
}

/* Prepare and send a batched cancel rpc, it will include count lock handles
 * of locks given in @head.
 * Returns the number of handles actually sent (possibly clamped to what one
 * request can carry), or a negative errno / cancel-RPC rc on failure. */
int ldlm_cli_cancel_req(struct obd_export *exp,
                        struct list_head *cancels, int count)
{
        struct ptlrpc_request *req = NULL;
        struct ldlm_request *body;
        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };
        struct obd_import *imp;
        int free, sent = 0;
        int rc = 0;
        ENTRY;

        LASSERT(exp != NULL);
        LASSERT(count > 0);

        /* Fault-injection hooks for testing cancel timing/races. */
        OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_PAUSE_CANCEL, obd_fail_val);

        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_RACE))
                RETURN(count);

        /* Clamp @count to the number of handles one request can hold. */
        free = ldlm_req_handles_avail(exp, size, 2, 0);
        if (count > free)
                count = free;

        size[DLM_LOCKREQ_OFF] = ldlm_request_bufsize(count, LDLM_CANCEL);
        while (1) {
                imp = class_exp2cliimp(exp);
                if (imp == NULL || imp->imp_invalid) {
                        CDEBUG(D_DLMTRACE,
                               "skipping cancel on invalid import %p\n", imp);
                        /* Report the locks as handled — nothing to send. */
                        RETURN(count);
                }

                req = ptlrpc_prep_req(imp, LUSTRE_DLM_VERSION, LDLM_CANCEL, 2,
                                      size, NULL);
                if (!req)
                        GOTO(out, rc = -ENOMEM);

                req->rq_no_resend = 1;
                req->rq_no_delay = 1;

                req->rq_request_portal = LDLM_CANCEL_REQUEST_PORTAL;
                req->rq_reply_portal = LDLM_CANCEL_REPLY_PORTAL;
                ptlrpc_at_set_req_timeout(req);

                /* NOTE(review): @body is fetched but never used afterwards —
                 * ldlm_cancel_pack() re-fetches the buffer itself. */
                body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF,
                                      sizeof(*body));
                ldlm_cancel_pack(req, DLM_LOCKREQ_OFF, cancels, count);

                ptlrpc_req_set_repsize(req, 1, NULL);
                rc = ptlrpc_queue_wait(req);
                /* NOTE(review): ptlrpc_queue_wait() returns errors as
                 * negative (cf. -ETIMEDOUT below), so this POSITIVE ESTALE
                 * comparison may never match — verify against the rest of
                 * the tree before changing. */
                if (rc == ESTALE) {
                        CDEBUG(D_DLMTRACE, "client/server (nid %s) "
                               "out of sync -- not fatal\n",
                               libcfs_nid2str(req->rq_import->
                                              imp_connection->c_peer.nid));
                        rc = 0;
                } else if (rc == -ETIMEDOUT && /* check there was no reconnect*/
                           req->rq_import_generation == imp->imp_generation) {
                        /* Same import generation: retry the cancel. */
                        ptlrpc_req_finished(req);
                        continue;
                } else if (rc != ELDLM_OK) {
                        CERROR("Got rc %d from cancel RPC: canceling "
                               "anyway\n", rc);
                        break;
                }
                sent = count;
                break;
        }

        /* The GOTO(out, -ENOMEM) path above bypasses this finish — safe,
         * since req is NULL exactly on that path. */
        ptlrpc_req_finished(req);
        EXIT;
out:
        return sent ? sent : rc;
}

static inline struct ldlm_pool *ldlm_imp2pl(struct obd_import *imp)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?