ldlm_lock.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,767 行 · 第 1/5 页

C
1,767
字号
                                                          LDLM_FL_WAIT_NOREPROC,
                                                          NULL);
                                /* NOTE(review): this chunk starts mid-way
                                 * through ldlm_lock_match(); the function's
                                 * opening and earlier matching logic are on a
                                 * previous page of the extraction. */
                                if (err) {
                                        /* completion AST failed: drop the
                                         * reference we took on the matched
                                         * lock and report "not matched" */
                                        if (flags & LDLM_FL_TEST_LOCK)
                                                LDLM_LOCK_PUT(lock);
                                        else
                                                ldlm_lock_decref_internal(lock, mode);
                                        rc = 0;
                                        goto out2;
                                }
                        }

                        /* wait (interruptible, obd_timeout bound) until the
                         * lock's LVB is marked ready before returning it */
                        lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(obd_timeout), NULL,
                                               LWI_ON_SIGNAL_NOOP, NULL);

                        /* XXX FIXME see comment on CAN_MATCH in lustre_dlm.h */
                        l_wait_event(lock->l_waitq,
                                     (lock->l_flags & LDLM_FL_LVB_READY), &lwi);
                }
        }
 out2:
        if (rc) {
                /* for PLAIN/IBITS locks log the resource name words, for
                 * extent locks log the [start, end] range */
                LDLM_DEBUG(lock, "matched ("LPU64" "LPU64")",
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[2] : policy->l_extent.start,
                           (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                res_id->name[3] : policy->l_extent.end);
        } else if (!(flags & LDLM_FL_TEST_LOCK)) {/*less verbose for test-only*/
                LDLM_DEBUG_NOLOCK("not matched ns %p type %u mode %u res "
                                  LPU64"/"LPU64" ("LPU64" "LPU64")", ns,
                                  type, mode, res_id->name[0], res_id->name[1],
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[2] : policy->l_extent.start,
                                  (type == LDLM_PLAIN || type == LDLM_IBITS) ?
                                        res_id->name[3] : policy->l_extent.end);
        }

        /* old_lock was an extra reference taken earlier in this function
         * (not visible in this chunk); release it now */
        if (old_lock)
                LDLM_LOCK_PUT(old_lock);

        /* TEST_LOCK callers only probe for a match; drop the reference the
         * match took so the caller never owns the lock */
        if (flags & LDLM_FL_TEST_LOCK && rc)
                LDLM_LOCK_PUT(lock);

        return rc ? mode : 0;
}

/* Allocate and initialize a new lock on resource @res_id in namespace @ns.
 *
 * The caller supplies the requested @mode, the blocking/completion/glimpse
 * AST callbacks, opaque @data stored in l_ast_data, and @lvb_len bytes of
 * LVB buffer to allocate (0 for none).
 *
 * Returns a referenced lock, or NULL on allocation failure.  On the error
 * path any partially allocated LVB buffer and interval-tree node are freed
 * and the lock itself is returned to its slab. */
struct ldlm_lock *ldlm_lock_create(struct ldlm_namespace *ns,
                                   struct ldlm_res_id res_id, ldlm_type_t type,
                                   ldlm_mode_t mode,
                                   ldlm_blocking_callback blocking,
                                   ldlm_completion_callback completion,
                                   ldlm_glimpse_callback glimpse,
                                   void *data, __u32 lvb_len)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;
        ENTRY;

        /* find-or-create (last arg 1) the resource this lock lives on */
        res = ldlm_resource_get(ns, NULL, res_id, type, 1);
        if (res == NULL)
                RETURN(NULL);

        lock = ldlm_lock_new(res);
        /* ldlm_lock_new holds its own reference on res (presumably — the
         * resource is released here in both success and failure cases);
         * drop ours */
        ldlm_resource_putref(res);

        if (lock == NULL)
                RETURN(NULL);

        lock->l_req_mode = mode;
        lock->l_ast_data = data;
        lock->l_blocking_ast = blocking;
        lock->l_completion_ast = completion;
        lock->l_glimpse_ast = glimpse;
        lock->l_pid = cfs_curproc_pid();
        lock->l_tree_node = NULL;
        /* if this is the extent lock, allocate the interval tree node */
        if (type == LDLM_EXTENT) {
                if (ldlm_interval_alloc(lock) == NULL)
                        GOTO(out, 0);
        }

        if (lvb_len) {
                lock->l_lvb_len = lvb_len;
                OBD_ALLOC(lock->l_lvb_data, lvb_len);
                if (lock->l_lvb_data == NULL)
                        GOTO(out, 0);
        }

        RETURN(lock);

out:
        /* error path: undo in reverse order of allocation */
        if (lock->l_lvb_data)
                OBD_FREE(lock->l_lvb_data, lvb_len);
        ldlm_interval_free(ldlm_interval_detach(lock));
        OBD_SLAB_FREE(lock, ldlm_lock_slab, sizeof(*lock));
        return NULL;
}

/* Enqueue (request) lock *@lockp in namespace @ns.
 *
 * @cookie is passed through to the namespace intent policy; @flags is both
 * input (LDLM_FL_* enqueue flags from the caller/server) and output
 * (e.g. LDLM_FL_LOCK_CHANGED when the intent policy replaced the lock).
 *
 * Returns ELDLM_OK on success or an ldlm_error_t / -ENOMEM on failure.
 * On ELDLM_LOCK_REPLACED from the intent policy, *lockp may point at a
 * different, already-granted lock and the original is destroyed. */
ldlm_error_t ldlm_lock_enqueue(struct ldlm_namespace *ns,
                               struct ldlm_lock **lockp,
                               void *cookie, int *flags)
{
        struct ldlm_lock *lock = *lockp;
        struct ldlm_resource *res = lock->l_resource;
        int local = ns_is_client(res->lr_namespace);
        ldlm_processing_policy policy;
        ldlm_error_t rc = ELDLM_OK;
        struct ldlm_interval *node = NULL;
        ENTRY;

        do_gettimeofday(&lock->l_enqueued_time);
        /* policies are not executed on the client or during replay */
        if ((*flags & (LDLM_FL_HAS_INTENT|LDLM_FL_REPLAY)) == LDLM_FL_HAS_INTENT
            && !local && ns->ns_policy) {
                rc = ns->ns_policy(ns, lockp, cookie, lock->l_req_mode, *flags,
                                   NULL);
                if (rc == ELDLM_LOCK_REPLACED) {
                        /* The lock that was returned has already been granted,
                         * and placed into lockp.  If it's not the same as the
                         * one we passed in, then destroy the old one and our
                         * work here is done. */
                        if (lock != *lockp) {
                                ldlm_lock_destroy(lock);
                                LDLM_LOCK_PUT(lock);
                        }
                        *flags |= LDLM_FL_LOCK_CHANGED;
                        RETURN(0);
                } else if (rc != ELDLM_OK ||
                           (rc == ELDLM_OK && (*flags & LDLM_FL_INTENT_ONLY))) {
                        /* policy failed, or intent-only request: the lock
                         * itself is not wanted — destroy it */
                        ldlm_lock_destroy(lock);
                        RETURN(rc);
                }
        }

        /* For a replaying lock, it might be already in granted list. So
         * unlinking the lock will cause the interval node to be freed, we
         * have to allocate the interval node early otherwise we can't regrant
         * this lock in the future. - jay */
        if (!local && (*flags & LDLM_FL_REPLAY) && res->lr_type == LDLM_EXTENT)
                OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO,
                               sizeof(*node));

        lock_res_and_lock(lock);
        if (local && lock->l_req_mode == lock->l_granted_mode) {
                /* The server returned a blocked lock, but it was granted
                 * before we got a chance to actually enqueue it.  We don't
                 * need to do anything else. */
                *flags &= ~(LDLM_FL_BLOCK_GRANTED |
                            LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_WAIT);
                GOTO(out, ELDLM_OK);
        }

        ldlm_resource_unlink_lock(lock);
        if (res->lr_type == LDLM_EXTENT && lock->l_tree_node == NULL) {
                if (node == NULL) {
                        /* pre-allocation above failed (or was skipped) and
                         * the lock has no interval node: cannot proceed */
                        ldlm_lock_destroy_nolock(lock);
                        GOTO(out, rc = -ENOMEM);
                }

                CFS_INIT_LIST_HEAD(&node->li_group);
                ldlm_interval_attach(node, lock);
                node = NULL; /* ownership transferred to the lock */
        }

        /* Some flags from the enqueue want to make it into the AST, via the
         * lock's l_flags. */
        lock->l_flags |= *flags & LDLM_AST_DISCARD_DATA;

        /* This distinction between local lock trees is very important; a client
         * namespace only has information about locks taken by that client, and
         * thus doesn't have enough information to decide for itself if it can
         * be granted (below).  In this case, we do exactly what the server
         * tells us to do, as dictated by the 'flags'.
         *
         * We do exactly the same thing during recovery, when the server is
         * more or less trusting the clients not to lie.
         *
         * FIXME (bug 268): Detect obvious lies by checking compatibility in
         * granted/converting queues. */
        if (local) {
                if (*flags & LDLM_FL_BLOCK_CONV)
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                else if (*flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED))
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                else
                        ldlm_grant_lock(lock, NULL);
                GOTO(out, ELDLM_OK);
        } else if (*flags & LDLM_FL_REPLAY) {
                /* during replay, place the lock in whichever queue the
                 * flags say it was in before the failure */
                if (*flags & LDLM_FL_BLOCK_CONV) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_WAIT) {
                        ldlm_resource_add_lock(res, &res->lr_waiting, lock);
                        GOTO(out, ELDLM_OK);
                } else if (*flags & LDLM_FL_BLOCK_GRANTED) {
                        ldlm_grant_lock(lock, NULL);
                        GOTO(out, ELDLM_OK);
                }
                /* If no flags, fall through to normal enqueue path. */
        }

        /* server-side normal enqueue: let the per-type policy decide
         * grant/block (first_enq = 1) */
        policy = ldlm_processing_policy_table[res->lr_type];
        policy(lock, flags, 1, &rc, NULL);
        GOTO(out, rc);
out:
        unlock_res_and_lock(lock);
        /* free the pre-allocated interval node if it was never attached */
        if (node)
                OBD_SLAB_FREE(node, ldlm_interval_slab, sizeof(*node));
        return rc;
}

/* Must be called with namespace taken: queue is waiting or converting.
 *
 * Re-run the resource's processing policy over every lock on @queue
 * (first_enq = 0), collecting completion work into @work_list.  Stops early
 * if the policy returns anything other than LDLM_ITER_CONTINUE, and returns
 * that last policy result. */
int ldlm_reprocess_queue(struct ldlm_resource *res, struct list_head *queue,
                         struct list_head *work_list)
{
        struct list_head *tmp, *pos;
        ldlm_processing_policy policy;
        int flags;
        int rc = LDLM_ITER_CONTINUE;
        ldlm_error_t err;
        ENTRY;

        check_res_locked(res);

        policy = ldlm_processing_policy_table[res->lr_type];
        LASSERT(policy);

        /* _safe variant: the policy may unlink the lock from the queue */
        list_for_each_safe(tmp, pos, queue) {
                struct ldlm_lock *pending;
                pending = list_entry(tmp, struct ldlm_lock, l_res_link);

                CDEBUG(D_INFO, "Reprocessing lock %p\n", pending);

                flags = 0;
                rc = policy(pending, &flags, 0, &err, work_list);
                if (rc != LDLM_ITER_CONTINUE)
                        break;
        }

        RETURN(rc);
}

/* Helper function for pair ldlm_run_{bl,cp}_ast_work().
 *
 * Send an existing rpc set specified by @arg->set and then
 * destroy it.  Create a new one if the @do_create flag is set. */
static void
ldlm_send_and_maybe_create_set(struct ldlm_cb_set_arg *arg, int do_create)
{
        int rc;

        rc = ptlrpc_set_wait(arg->set);
        if (arg->type == LDLM_BL_CALLBACK)
                OBD_FAIL_TIMEOUT(OBD_FAIL_LDLM_GLIMPSE, 2);
        ptlrpc_set_destroy(arg->set);

        if (do_create)
                arg->set = ptlrpc_prep_set();
}

/* Fire the blocking AST for every lock on @rpc_list, batching the resulting
 * RPCs into sets of at most PARALLEL_AST_LIMIT before sending.
 *
 * Consumes the list (each lock is removed from l_bl_ast) and drops the list's
 * reference on each lock and on each l_blocking_lock.
 *
 * Returns -ERESTART if any callback flagged arg.restart, 0 otherwise. */
int ldlm_run_bl_ast_work(struct list_head *rpc_list)
{
        struct ldlm_cb_set_arg arg;
        struct list_head *tmp, *pos;
        struct ldlm_lock_desc d;
        int ast_count;
        int rc = 0;
        ENTRY;

        arg.set = ptlrpc_prep_set();
        atomic_set(&arg.restart, 0);
        arg.type = LDLM_BL_CALLBACK;

        ast_count = 0;
        list_for_each_safe(tmp, pos, rpc_list) {
                struct ldlm_lock *lock =
                        list_entry(tmp, struct ldlm_lock, l_bl_ast);

                /* nobody should touch l_bl_ast */
                lock_res_and_lock(lock);
                list_del_init(&lock->l_bl_ast);

                LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
                LASSERT(lock->l_bl_ast_run == 0);
                LASSERT(lock->l_blocking_lock);
                lock->l_bl_ast_run++;
                unlock_res_and_lock(lock);

                /* describe the conflicting lock for the AST payload, then
                 * release our reference on it before calling out */
                ldlm_lock2desc(lock->l_blocking_lock, &d);

                LDLM_LOCK_PUT(lock->l_blocking_lock);
                lock->l_blocking_lock = NULL;
                rc = lock->l_blocking_ast(lock, &d, (void *)&arg,
                                          LDLM_CB_BLOCKING);
                LDLM_LOCK_PUT(lock);
                ast_count++;

                /* Send the request set if it reaches PARALLEL_AST_LIMIT,
                 * and create a new set for requests that remained in
                 * @rpc_list */
                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
                        ldlm_send_and_maybe_create_set(&arg, 1);
                        ast_count = 0;
                }
        }

        if (ast_count > 0)
                ldlm_send_and_maybe_create_set(&arg, 0);
        else
                /* In case the number of ASTs is a multiple of
                 * PARALLEL_AST_LIMIT, or @rpc_list was initially empty,
                 * @arg.set must be destroyed here, otherwise we leak it. */
                ptlrpc_set_destroy(arg.set);

        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
}

/* Fire the completion AST for every lock on @rpc_list (batched like
 * ldlm_run_bl_ast_work() above).
 *
 * NOTE(review): this function is truncated at the end of this extraction
 * page; its loop body and return are on the next page. */
int ldlm_run_cp_ast_work(struct list_head *rpc_list)
{
        struct ldlm_cb_set_arg arg;
        struct list_head *tmp, *pos;
        int ast_count;
        int rc = 0;
        ENTRY;

        arg.set = ptlrpc_prep_set();
        atomic_set(&arg.restart, 0);
        arg.type = LDLM_CP_CALLBACK;

        /* It's possible to receive a completion AST before we've set
         * the l_completion_ast pointer: either because the AST arrived
         * before the reply, or simply because there's a small race
         * window between receiving the reply and finishing the local
         * enqueue. (bug 842)
         *
         * This can't happen with the blocking_ast, however, because we
         * will never call the local blocking_ast until we drop our
         * reader/writer reference, which we won't do until we get the
         * reply and finish enqueueing. */

        ast_count = 0;
        list_for_each_safe(tmp, pos, rpc_list) {
                struct ldlm_lock *lock =
                        list_entry(tmp, struct ldlm_lock, l_cp_ast);
                ldlm_completion_callback completion_callback;

                /* nobody should touch l_cp_ast */
                lock_res_and_lock(lock);
                list_del_init(&lock->l_cp_ast);

                LASSERT(lock->l_flags & LDLM_FL_CP_REQD);
                /* save l_completion_ast since it can be changed by
                 * mds_intent_policy(), see bug 14225 */
                completion_callback = lock->l_completion_ast;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?