ldlm_lockd.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,721 行 · 第 1/5 页

C
1,721
字号
 * Called with namespace lock held. */int __ldlm_del_waiting_lock(struct ldlm_lock *lock){        struct list_head *list_next;        if (list_empty(&lock->l_pending_chain))                return 0;        list_next = lock->l_pending_chain.next;        if (lock->l_pending_chain.prev == &waiting_locks_list) {                /* Removing the head of the list, adjust timer. */                if (list_next == &waiting_locks_list) {                        /* No more, just cancel. */                        cfs_timer_disarm(&waiting_locks_timer);                } else {                        struct ldlm_lock *next;                        next = list_entry(list_next, struct ldlm_lock,                                          l_pending_chain);                        cfs_timer_arm(&waiting_locks_timer,                                      round_timeout(next->l_callback_timeout));                }        }        list_del_init(&lock->l_pending_chain);        return 1;}int ldlm_del_waiting_lock(struct ldlm_lock *lock){        int ret;        if (lock->l_export == NULL) {                /* We don't have a "waiting locks list" on clients. */                LDLM_DEBUG(lock, "client lock: no-op");                return 0;        }        spin_lock_bh(&waiting_locks_spinlock);        ret = __ldlm_del_waiting_lock(lock);        spin_unlock_bh(&waiting_locks_spinlock);        LDLM_DEBUG(lock, "%s", ret == 0 ? "wasn't waiting" : "removed");        return ret;}/* * Prolong the lock * * Called with namespace lock held. */int ldlm_refresh_waiting_lock(struct ldlm_lock *lock){        if (lock->l_export == NULL) {                /* We don't have a "waiting locks list" on clients. */                LDLM_DEBUG(lock, "client lock: no-op");                return 0;        }        spin_lock_bh(&waiting_locks_spinlock);        if (list_empty(&lock->l_pending_chain)) {                spin_unlock_bh(&waiting_locks_spinlock);                LDLM_DEBUG(lock, "wasn't waiting");                return 0;        }        __ldlm_del_waiting_lock(lock);        __ldlm_add_waiting_lock(lock);        spin_unlock_bh(&waiting_locks_spinlock);        LDLM_DEBUG(lock, "refreshed");        return 1;}#else /* !__KERNEL__ */static int ldlm_add_waiting_lock(struct ldlm_lock *lock){        LASSERT(!(lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK));        RETURN(1);}int ldlm_del_waiting_lock(struct ldlm_lock *lock){        RETURN(0);}int ldlm_refresh_waiting_lock(struct ldlm_lock *lock){        RETURN(0);}#endif /* __KERNEL__ */static void ldlm_failed_ast(struct ldlm_lock *lock, int rc,                            const char *ast_type){        struct ptlrpc_connection *conn = lock->l_export->exp_connection;        char                     *str = libcfs_nid2str(conn->c_peer.nid);        LCONSOLE_ERROR_MSG(0x138, "%s: A client on nid %s was evicted due "                             "to a lock %s callback to %s timed out: rc %d\n",                             lock->l_export->exp_obd->obd_name, str,                             ast_type, obd_export_nid2str(lock->l_export), rc);        if (obd_dump_on_timeout)                libcfs_debug_dumplog();        class_fail_export(lock->l_export);}static int ldlm_handle_ast_error(struct ldlm_lock *lock,                                 struct ptlrpc_request *req, int rc,                                 const char *ast_type){        lnet_process_id_t peer = req->rq_import->imp_connection->c_peer;        if (rc == -ETIMEDOUT || rc == -EINTR || rc == -ENOTCONN) {                LASSERT(lock->l_export);                if (lock->l_export->exp_libclient) {                        LDLM_DEBUG(lock, "%s AST to liblustre client (nid %s)"                                   " timeout, just cancelling lock", ast_type,                                   libcfs_nid2str(peer.nid));                        ldlm_lock_cancel(lock);                        rc = -ERESTART;                } else if (lock->l_flags & LDLM_FL_CANCEL) {                        LDLM_DEBUG(lock, "%s AST timeout from nid %s, but "                                   "cancel was received (AST reply lost?)",                                   ast_type, libcfs_nid2str(peer.nid));                        ldlm_lock_cancel(lock);                        rc = -ERESTART;                } else {                        ldlm_del_waiting_lock(lock);                        ldlm_failed_ast(lock, rc, ast_type);                }        } else if (rc) {                if (rc == -EINVAL)                        LDLM_DEBUG(lock, "client (nid %s) returned %d"                                   " from %s AST - normal race",                                   libcfs_nid2str(peer.nid),                                   lustre_msg_get_status(req->rq_repmsg),                                   ast_type);                else                        LDLM_ERROR(lock, "client (nid %s) returned %d "                                   "from %s AST", libcfs_nid2str(peer.nid),                                   (req->rq_repmsg != NULL) ?                                   lustre_msg_get_status(req->rq_repmsg) : 0,                                   ast_type);                ldlm_lock_cancel(lock);                /* Server-side AST functions are called from ldlm_reprocess_all,                 * which needs to be told to please restart its reprocessing. */                rc = -ERESTART;        }        return rc;}static int ldlm_cb_interpret(struct ptlrpc_request *req, void *data, int rc){        struct ldlm_cb_set_arg *arg;        struct ldlm_lock *lock;        ENTRY;        LASSERT(data != NULL);        arg = req->rq_async_args.pointer_arg[0];        lock = req->rq_async_args.pointer_arg[1];        LASSERT(lock != NULL);        if (rc != 0) {                /* If client canceled the lock but the cancel has not                 * been recieved yet, we need to update lvbo to have the                 * proper attributes cached. */                if (rc == -EINVAL && arg->type == LDLM_BL_CALLBACK)                        ldlm_res_lvbo_update(lock->l_resource, NULL,                                             0, 1);                rc = ldlm_handle_ast_error(lock, req, rc,                                           arg->type == LDLM_BL_CALLBACK                                           ? "blocking" : "completion");        }        LDLM_LOCK_PUT(lock);        if (rc == -ERESTART)                atomic_set(&arg->restart, 1);        RETURN(0);}static inline int ldlm_bl_and_cp_ast_fini(struct ptlrpc_request *req,                                          struct ldlm_cb_set_arg *arg,                                          struct ldlm_lock *lock,                                          int instant_cancel){        int rc = 0;        ENTRY;        if (unlikely(instant_cancel)) {                rc = ptl_send_rpc(req, 1);                ptlrpc_req_finished(req);                if (rc == 0)                        /* If we cancelled the lock, we need to restart                         * ldlm_reprocess_queue */                        atomic_set(&arg->restart, 1);        } else {                LDLM_LOCK_GET(lock);                ptlrpc_set_add_req(arg->set, req);        }        RETURN(rc);}/* * ->l_blocking_ast() method for server-side locks. This is invoked when newly * enqueued server lock conflicts with given one. * * Sends blocking ast rpc to the client owning that lock; arms timeout timer * to wait for client response. */int ldlm_server_blocking_ast(struct ldlm_lock *lock,                             struct ldlm_lock_desc *desc,                             void *data, int flag){        struct ldlm_cb_set_arg *arg = data;        struct ldlm_request *body;        struct ptlrpc_request *req;        int size[] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),                       [DLM_LOCKREQ_OFF]     = sizeof(*body) };        int instant_cancel = 0, rc;        ENTRY;        if (flag == LDLM_CB_CANCELING) {                /* Don't need to do anything here. */                RETURN(0);        }        LASSERT(lock);        LASSERT(data != NULL);        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,                              LUSTRE_DLM_VERSION, LDLM_BL_CALLBACK, 2, size,                              NULL);        if (req == NULL)                RETURN(-ENOMEM);        req->rq_async_args.pointer_arg[0] = arg;        req->rq_async_args.pointer_arg[1] = lock;        req->rq_interpret_reply = ldlm_cb_interpret;        req->rq_no_resend = 1;        lock_res(lock->l_resource);        if (lock->l_granted_mode != lock->l_req_mode) {                /* this blocking AST will be communicated as part of the                 * completion AST instead */                unlock_res(lock->l_resource);                ptlrpc_req_finished(req);                LDLM_DEBUG(lock, "lock not granted, not sending blocking AST");                RETURN(0);        }        if (lock->l_destroyed) {                /* What's the point? */                unlock_res(lock->l_resource);                ptlrpc_req_finished(req);                RETURN(0);        }#if 0        if (CURRENT_SECONDS - lock->l_export->exp_last_request_time > 30){                unlock_res(lock->l_resource);                ptlrpc_req_finished(req);                ldlm_failed_ast(lock, -ETIMEDOUT, "Not-attempted blocking");                RETURN(-ETIMEDOUT);        }#endif        if (lock->l_flags & LDLM_FL_CANCEL_ON_BLOCK)                instant_cancel = 1;        body = lustre_msg_buf(req->rq_reqmsg, DLM_LOCKREQ_OFF, sizeof(*body));        body->lock_handle[0] = lock->l_remote_handle;        body->lock_desc = *desc;        body->lock_flags |= (lock->l_flags & LDLM_AST_FLAGS);        LDLM_DEBUG(lock, "server preparing blocking AST");        ptlrpc_req_set_repsize(req, 1, NULL);        if (instant_cancel) {                unlock_res(lock->l_resource);                ldlm_lock_cancel(lock);        } else {                LASSERT(lock->l_granted_mode == lock->l_req_mode);                ldlm_add_waiting_lock(lock);                unlock_res(lock->l_resource);        }        req->rq_send_state = LUSTRE_IMP_FULL;        /* ptlrpc_prep_req already set timeout */        if (AT_OFF)                req->rq_timeout = ldlm_get_rq_timeout();        if (lock->l_export && lock->l_export->exp_ldlm_stats)                lprocfs_counter_incr(lock->l_export->exp_ldlm_stats,                                     LDLM_BL_CALLBACK - LDLM_FIRST_OPC);        rc = ldlm_bl_and_cp_ast_fini(req, arg, lock, instant_cancel);        RETURN(rc);}int ldlm_server_completion_ast(struct ldlm_lock *lock, int flags, void *data){        struct ldlm_cb_set_arg *arg = data;        struct ldlm_request *body;        struct ptlrpc_request *req;        struct timeval granted_time;        long total_enqueue_wait;        int size[3] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),                        [DLM_LOCKREQ_OFF]     = sizeof(*body) };        int rc, buffers = 2, instant_cancel = 0;        ENTRY;        LASSERT(lock != NULL);        LASSERT(data != NULL);        do_gettimeofday(&granted_time);        total_enqueue_wait = cfs_timeval_sub(&granted_time,                                             &lock->l_enqueued_time, NULL);        if (total_enqueue_wait / ONE_MILLION > obd_timeout)                /* non-fatal with AT - change to LDLM_DEBUG? */                LDLM_ERROR(lock, "enqueue wait took %luus from %lu",                           total_enqueue_wait, lock->l_enqueued_time.tv_sec);        lock_res_and_lock(lock);        if (lock->l_resource->lr_lvb_len) {                size[DLM_REQ_REC_OFF] = lock->l_resource->lr_lvb_len;                buffers = 3;        }        unlock_res_and_lock(lock);        req = ptlrpc_prep_req(lock->l_export->exp_imp_reverse,                              LUSTRE_DLM_VERSION, LDLM_CP_CALLBACK, buffers,                              size, NULL);        if (req == NULL)                RETURN(-ENOMEM);

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?