ldlm_lockd.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,721 行 · 第 1/5 页

C
1,721
字号
        /* Don't move a pending lock onto the export if it has already         * been evicted.  Cancel it now instead. (bug 5683) */        if (req->rq_export->exp_failed ||            OBD_FAIL_CHECK_ONCE(OBD_FAIL_LDLM_ENQUEUE_OLD_EXPORT)) {                LDLM_ERROR(lock, "lock on destroyed export %p", req->rq_export);                rc = -ENOTCONN;        } else if (lock->l_flags & LDLM_FL_AST_SENT) {                dlm_rep->lock_flags |= LDLM_FL_AST_SENT;                if (lock->l_granted_mode == lock->l_req_mode) {                        /* Only cancel lock if it was granted, because it                         * would be destroyed immediatelly and would never                         * be granted in the future, causing timeouts on client.                         * Not granted lock will be cancelled immediatelly after                         * sending completion AST.                         */                        if (dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK) {                                unlock_res_and_lock(lock);                                ldlm_lock_cancel(lock);                                lock_res_and_lock(lock);                        } else                                ldlm_add_waiting_lock(lock);                }        }        /* Make sure we never ever grant usual metadata locks to liblustre           clients */        if ((dlm_req->lock_desc.l_resource.lr_type == LDLM_PLAIN ||            dlm_req->lock_desc.l_resource.lr_type == LDLM_IBITS) &&             req->rq_export->exp_libclient) {                if (!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK) ||                    !(dlm_rep->lock_flags & LDLM_FL_CANCEL_ON_BLOCK)) {                        CERROR("Granting sync lock to libclient. "                               "req fl %d, rep fl %d, lock fl %d\n",                               dlm_req->lock_flags, dlm_rep->lock_flags,                               lock->l_flags);                        LDLM_ERROR(lock, "sync lock");                        if (dlm_req->lock_flags & LDLM_FL_HAS_INTENT) {                                struct ldlm_intent *it;                                it = lustre_msg_buf(req->rq_reqmsg,                                                    DLM_INTENT_IT_OFF,                                                    sizeof(*it));                                if (it != NULL) {                                        CERROR("This is intent %s ("LPU64")\n",                                               ldlm_it2str(it->opc), it->opc);                                }                        }                }        }        unlock_res_and_lock(lock);        EXIT; out:        req->rq_status = rc ?: err;  /* return either error - bug 11190 */        if (!req->rq_packed_final) {                err = lustre_pack_reply(req, 1, NULL, NULL);                if (rc == 0)                        rc = err;        }        /* The LOCK_CHANGED code in ldlm_lock_enqueue depends on this         * ldlm_reprocess_all.  If this moves, revisit that code. -phil */        if (lock) {                LDLM_DEBUG(lock, "server-side enqueue handler, sending reply"                           "(err=%d, rc=%d)", err, rc);                if (rc == 0) {                        lock_res_and_lock(lock);                        size[DLM_REPLY_REC_OFF] = lock->l_resource->lr_lvb_len;                        if (size[DLM_REPLY_REC_OFF] > 0) {                                void *lvb = lustre_msg_buf(req->rq_repmsg,                                                       DLM_REPLY_REC_OFF,                                                       size[DLM_REPLY_REC_OFF]);                                LASSERTF(lvb != NULL, "req %p, lock %p\n",                                         req, lock);                                memcpy(lvb, lock->l_resource->lr_lvb_data,                                       size[DLM_REPLY_REC_OFF]);                        }                        unlock_res_and_lock(lock);                } else {                        lock_res_and_lock(lock);                        ldlm_resource_unlink_lock(lock);                        ldlm_lock_destroy_nolock(lock);                        unlock_res_and_lock(lock);                }                if (!err && dlm_req->lock_desc.l_resource.lr_type != LDLM_FLOCK)                        ldlm_reprocess_all(lock->l_resource);                LDLM_LOCK_PUT(lock);        }        LDLM_DEBUG_NOLOCK("server-side enqueue handler END (lock %p, rc %d)",                          lock, rc);        return rc;}int ldlm_handle_convert(struct ptlrpc_request *req){        struct ldlm_request *dlm_req;        struct ldlm_reply *dlm_rep;        struct ldlm_lock *lock;        int rc;        int size[2] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),                        [DLM_LOCKREPLY_OFF]   = sizeof(*dlm_rep) };        ENTRY;        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),                                     lustre_swab_ldlm_request);        if (dlm_req == NULL) {                CERROR ("Can't unpack dlm_req\n");                RETURN (-EFAULT);        }        if (req->rq_export && req->rq_export->exp_ldlm_stats)                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,                                     LDLM_CONVERT - LDLM_FIRST_OPC);        rc = lustre_pack_reply(req, 2, size, NULL);        if (rc)                RETURN(rc);        dlm_rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,                                 sizeof(*dlm_rep));        dlm_rep->lock_flags = dlm_req->lock_flags;        lock = ldlm_handle2lock(&dlm_req->lock_handle[0]);        if (!lock) {                req->rq_status = EINVAL;        } else {                void *res = NULL;                LDLM_DEBUG(lock, "server-side convert handler START");                do_gettimeofday(&lock->l_enqueued_time);                res = ldlm_lock_convert(lock, dlm_req->lock_desc.l_req_mode,                                        &dlm_rep->lock_flags);                if (res) {                        if (ldlm_del_waiting_lock(lock))                                LDLM_DEBUG(lock, "converted waiting lock");                        req->rq_status = 0;                } else {                        req->rq_status = EDEADLOCK;                }        }        if (lock) {                if (!req->rq_status)                        ldlm_reprocess_all(lock->l_resource);                LDLM_DEBUG(lock, "server-side convert handler END");                LDLM_LOCK_PUT(lock);        } else                LDLM_DEBUG_NOLOCK("server-side convert handler END");        RETURN(0);}/* Cancel all the locks whos handles are packed into ldlm_request */int ldlm_request_cancel(struct ptlrpc_request *req,                        struct ldlm_request *dlm_req, int first){        struct ldlm_resource *res, *pres = NULL;        struct ldlm_lock *lock;        int i, count, done = 0;        ENTRY;        count = dlm_req->lock_count ? dlm_req->lock_count : 1;        if (first >= count)                RETURN(0);        /* There is no lock on the server at the replay time,         * skip lock cancelling to make replay tests to pass. */        if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY)                RETURN(0);        LDLM_DEBUG_NOLOCK("server-side cancel handler START: %d locks",                          count - first);        for (i = first; i < count; i++) {                lock = ldlm_handle2lock(&dlm_req->lock_handle[i]);                if (!lock) {                        LDLM_DEBUG_NOLOCK("server-side cancel handler stale "                                          "lock (cookie "LPU64")",                                          dlm_req->lock_handle[i].cookie);                        continue;                }                done++;                res = lock->l_resource;                if (res != pres) {                        if (pres != NULL) {                                ldlm_reprocess_all(pres);                                ldlm_resource_putref(pres);                        }                        if (res != NULL) {                                ldlm_resource_getref(res);                                ldlm_res_lvbo_update(res, NULL, 0, 1);                        }                        pres = res;                }                ldlm_lock_cancel(lock);                LDLM_LOCK_PUT(lock);        }        if (pres != NULL) {                ldlm_reprocess_all(pres);                ldlm_resource_putref(pres);        }        LDLM_DEBUG_NOLOCK("server-side cancel handler END");        RETURN(done);}int ldlm_handle_cancel(struct ptlrpc_request *req){        struct ldlm_request *dlm_req;        int rc;        ENTRY;        dlm_req = lustre_swab_reqbuf(req, DLM_LOCKREQ_OFF, sizeof(*dlm_req),                                     lustre_swab_ldlm_request);        if (dlm_req == NULL) {                CERROR("bad request buffer for cancel\n");                RETURN(-EFAULT);        }        if (req->rq_export && req->rq_export->exp_ldlm_stats)                lprocfs_counter_incr(req->rq_export->exp_ldlm_stats,                                     LDLM_CANCEL - LDLM_FIRST_OPC);        rc = lustre_pack_reply(req, 1, NULL, NULL);        if (rc)                RETURN(rc);        if (!ldlm_request_cancel(req, dlm_req, 0))                req->rq_status = ESTALE;        if (ptlrpc_reply(req) != 0)                LBUG();        RETURN(0);}void ldlm_handle_bl_callback(struct ldlm_namespace *ns,                             struct ldlm_lock_desc *ld, struct ldlm_lock *lock){        int do_ast;        ENTRY;        LDLM_DEBUG(lock, "client blocking AST callback handler START");        lock_res_and_lock(lock);        lock->l_flags |= LDLM_FL_CBPENDING;        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)                lock->l_flags |= LDLM_FL_CANCEL;        do_ast = (!lock->l_readers && !lock->l_writers);        unlock_res_and_lock(lock);        if (do_ast) {                LDLM_DEBUG(lock, "already unused, calling "                           "callback (%p)", lock->l_blocking_ast);                if (lock->l_blocking_ast != NULL)                        lock->l_blocking_ast(lock, ld, lock->l_ast_data,                                             LDLM_CB_BLOCKING);        } else {                LDLM_DEBUG(lock, "Lock still has references, will be"                           " cancelled later");        }        LDLM_DEBUG(lock, "client blocking callback handler END");        LDLM_LOCK_PUT(lock);        EXIT;}static void ldlm_handle_cp_callback(struct ptlrpc_request *req,                                    struct ldlm_namespace *ns,                                    struct ldlm_request *dlm_req,                                    struct ldlm_lock *lock){        CFS_LIST_HEAD(ast_list);        ENTRY;        LDLM_DEBUG(lock, "client completion callback handler START");        if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_BL_CB_RACE)) {                int to = cfs_time_seconds(1);                while (to > 0) {                        to = schedule_timeout(to);                        if (lock->l_granted_mode == lock->l_req_mode ||                            lock->l_destroyed)                                break;                }        }        lock_res_and_lock(lock);        if (lock->l_destroyed ||            lock->l_granted_mode == lock->l_req_mode) {                /* bug 11300: the lock has already been granted */                unlock_res_and_lock(lock);                LDLM_DEBUG(lock, "Double grant race happened");                LDLM_LOCK_PUT(lock);                EXIT;                return;        }        /* If we receive the completion AST before the actual enqueue returned,         * then we might need to switch lock modes, resources, or extents. */        if (dlm_req->lock_desc.l_granted_mode != lock->l_req_mode) {                lock->l_req_mode = dlm_req->lock_desc.l_granted_mode;                LDLM_DEBUG(lock, "completion AST, new lock mode");        }        if (lock->l_resource->lr_type != LDLM_PLAIN) {                lock->l_policy_data = dlm_req->lock_desc.l_policy_data;                LDLM_DEBUG(lock, "completion AST, new policy data");        }        ldlm_resource_unlink_lock(lock);        if (memcmp(&dlm_req->lock_desc.l_resource.lr_name,                   &lock->l_resource->lr_name,                   sizeof(lock->l_resource->lr_name)) != 0) {                unlock_res_and_lock(lock);                ldlm_lock_change_resource(ns, lock,                                         dlm_req->lock_desc.l_resource.lr_name);                LDLM_DEBUG(lock, "completion AST, new resource");                CERROR("change resource!\n");                lock_res_and_lock(lock);        }        if (dlm_req->lock_flags & LDLM_FL_AST_SENT) {                /* BL_AST locks are not needed in lru.                 * let ldlm_cancel_lru() be fast. */                ldlm_lock_remove_from_lru(lock);                lock->l_flags |= LDLM_FL_CBPENDING | LDLM_FL_BL_AST;                LDLM_DEBUG(lock, "completion AST includes blocking AST");        }        if (lock->l_lvb_len) {                void *lvb;                lvb = lustre_swab_reqbuf(req, DLM_REQ_REC_OFF, lock->l_lvb_len,                                         lock->l_lvb_swabber);                if (lvb == NULL) {                        LDLM_ERROR(lock, "completion AST did not contain "                                   "expected LVB!");

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?