ldlm_lock.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,767 行 · 第 1/5 页

C
1,767
字号
                /* Completion-AST delivery: clear the "CP AST required" flag
                 * while the resource lock is held, then drop the lock before
                 * invoking the callback. */
                lock->l_flags &= ~LDLM_FL_CP_REQD;
                unlock_res_and_lock(lock);
                if (completion_callback != NULL) {
                        rc = completion_callback(lock, 0, (void *)&arg);
                        ast_count++;
                }
                LDLM_LOCK_PUT(lock);

                /* Send the request set if it exceeds the PARALLEL_AST_LIMIT,
                 * and create a new set for requests that remained in
                 * @rpc_list */
                if (unlikely(ast_count == PARALLEL_AST_LIMIT)) {
                        ldlm_send_and_maybe_create_set(&arg, 1);
                        ast_count = 0;
                }
        }

        if (ast_count > 0)
                ldlm_send_and_maybe_create_set(&arg, 0);
        else
                /* In case the number of ASTs is a multiple of
                 * PARALLEL_AST_LIMIT, or @rpc_list was initially empty,
                 * @arg.set must be destroyed here, otherwise we leak it. */
                ptlrpc_set_destroy(arg.set);

        RETURN(atomic_read(&arg.restart) ? -ERESTART : 0);
}

/* Iterator callback: reprocess one resource's queues.
 * @closure is unused; always continues the iteration. */
static int reprocess_one_queue(struct ldlm_resource *res, void *closure)
{
        ldlm_reprocess_all(res);
        return LDLM_ITER_CONTINUE;
}

/* Walk every resource in namespace @ns's hash table and reprocess its
 * queues.  The hash lock is dropped around each reprocess call (a
 * reference pins the resource meanwhile), so the table may change
 * concurrently while we walk it. */
void ldlm_reprocess_all_ns(struct ldlm_namespace *ns)
{
        struct list_head *tmp;
        int i, rc;
        ENTRY;

        spin_lock(&ns->ns_hash_lock);
        for (i = 0; i < RES_HASH_SIZE; i++) {
                tmp = ns->ns_hash[i].next;
                while (tmp != &(ns->ns_hash[i])) {
                        struct ldlm_resource *res =
                                list_entry(tmp, struct ldlm_resource, lr_hash);

                        /* Pin the resource so it survives while the hash
                         * lock is dropped for the reprocess call. */
                        ldlm_resource_getref(res);
                        spin_unlock(&ns->ns_hash_lock);

                        rc = reprocess_one_queue(res, NULL);

                        spin_lock(&ns->ns_hash_lock);
                        tmp = tmp->next;
                        ldlm_resource_putref_locked(res);

                        if (rc == LDLM_ITER_STOP)
                                GOTO(out, rc);
                }
        }
 out:
        spin_unlock(&ns->ns_hash_lock);
        EXIT;
}

/* Try to grant converting/waiting locks on @res and deliver the
 * resulting completion ASTs; retries the whole pass on -ERESTART. */
void ldlm_reprocess_all(struct ldlm_resource *res)
{
        CFS_LIST_HEAD(rpc_list);
        int rc;
        ENTRY;

        /* Local lock trees don't get reprocessed.
 */
        if (ns_is_client(res->lr_namespace)) {
                EXIT;
                return;
        }

 restart:
        lock_res(res);
        rc = ldlm_reprocess_queue(res, &res->lr_converting, &rpc_list);
        if (rc == LDLM_ITER_CONTINUE)
                ldlm_reprocess_queue(res, &res->lr_waiting, &rpc_list);
        unlock_res(res);

        /* Deliver the completion ASTs collected above; -ERESTART means
         * the whole pass must be retried from scratch. */
        rc = ldlm_run_cp_ast_work(&rpc_list);
        if (rc == -ERESTART) {
                LASSERT(list_empty(&rpc_list));
                goto restart;
        }
        EXIT;
}

/* Run the blocking AST of @lock in CANCELING mode, at most once
 * (guarded by LDLM_FL_CANCEL).  Called with the resource lock held;
 * the lock is released around the AST call and re-taken afterwards.
 * Always marks the lock LDLM_FL_BL_DONE on the way out. */
void ldlm_cancel_callback(struct ldlm_lock *lock)
{
        check_res_locked(lock->l_resource);
        if (!(lock->l_flags & LDLM_FL_CANCEL)) {
                lock->l_flags |= LDLM_FL_CANCEL;
                if (lock->l_blocking_ast) {
                        // l_check_no_ns_lock(ns);
                        unlock_res_and_lock(lock);
                        lock->l_blocking_ast(lock, NULL, lock->l_ast_data,
                                             LDLM_CB_CANCELING);
                        lock_res_and_lock(lock);
                } else {
                        LDLM_DEBUG(lock, "no blocking ast");
                }
        }
        lock->l_flags |= LDLM_FL_BL_DONE;
}

/* Unlink @req from the per-resource mode and policy skip lists,
 * repairing the neighbouring lock's head/tail links so the remaining
 * group stays consistent.  Only PLAIN and IBITS locks keep skip lists;
 * other types return immediately. */
void ldlm_unlink_lock_skiplist(struct ldlm_lock *req)
{
        struct ldlm_lock *lock;

        if (req->l_resource->lr_type != LDLM_PLAIN &&
            req->l_resource->lr_type != LDLM_IBITS)
                return;

        if (LDLM_SL_HEAD(&req->l_sl_mode)) {
                /* @req is at the head of a mode group: hand the group
                 * links over to the next lock on the resource list. */
                lock = list_entry(req->l_res_link.next, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_mode.next == &lock->l_sl_mode) {
                        lock->l_sl_mode.prev = NULL;
                } else {
                        lock->l_sl_mode.next = req->l_sl_mode.next;
                        lock->l_sl_mode.next->prev = &lock->l_sl_mode;
                }
                req->l_sl_mode.next = NULL;
        } else if (LDLM_SL_TAIL(&req->l_sl_mode)) {
                /* @req is the tail of a mode group: the previous lock
                 * on the resource list inherits the tail links. */
                lock =
                     list_entry(req->l_res_link.prev, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_mode.prev == &lock->l_sl_mode) {
                        lock->l_sl_mode.next = NULL;
                } else {
                        lock->l_sl_mode.prev = req->l_sl_mode.prev;
                        lock->l_sl_mode.prev->next = &lock->l_sl_mode;
                }
                req->l_sl_mode.prev = NULL;
        }

        /* Same link repair for the policy skip list. */
        if (LDLM_SL_HEAD(&req->l_sl_policy)) {
                lock = list_entry(req->l_res_link.next, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_policy.next == &lock->l_sl_policy) {
                        lock->l_sl_policy.prev = NULL;
                } else {
                        lock->l_sl_policy.next = req->l_sl_policy.next;
                        lock->l_sl_policy.next->prev = &lock->l_sl_policy;
                }
                req->l_sl_policy.next = NULL;
        } else if (LDLM_SL_TAIL(&req->l_sl_policy)) {
                lock = list_entry(req->l_res_link.prev, struct ldlm_lock,
                                  l_res_link);
                if (req->l_sl_policy.prev == &lock->l_sl_policy) {
                        lock->l_sl_policy.next = NULL;
                } else {
                        lock->l_sl_policy.prev = req->l_sl_policy.prev;
                        lock->l_sl_policy.prev->next = &lock->l_sl_policy;
                }
                req->l_sl_policy.prev = NULL;
        }
}

/* Cancel @lock: run the cancellation (blocking) callback, unlink the
 * lock from its resource, and destroy it.  The lock must have no
 * remaining readers or writers (LBUG otherwise). */
void ldlm_lock_cancel(struct ldlm_lock *lock)
{
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        ENTRY;

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns = res->lr_namespace;

        /* Please do not, no matter how tempting, remove this LBUG without
         * talking to me first.
         * -phik */
        if (lock->l_readers || lock->l_writers) {
                LDLM_ERROR(lock, "lock still has references");
                LBUG();
        }

        ldlm_del_waiting_lock(lock);

        /* Releases res lock */
        ldlm_cancel_callback(lock);

        /* Yes, second time, just in case it was added again while we were
         * running with no res lock in ldlm_cancel_callback */
        ldlm_del_waiting_lock(lock);

        ldlm_resource_unlink_lock(lock);
        ldlm_lock_destroy_nolock(lock);

        if (lock->l_granted_mode == lock->l_req_mode)
                ldlm_pool_del(&ns->ns_pool, lock);

        /* Make sure we will not be called again for the same lock, which is
         * possible unless we zero out lock->l_granted_mode */
        lock->l_granted_mode = 0;
        unlock_res_and_lock(lock);
        EXIT;
}

/* Attach opaque caller data to the lock named by handle @lockh.
 * Returns 0 on success, -EINVAL if the handle no longer resolves. */
int ldlm_lock_set_data(struct lustre_handle *lockh, void *data)
{
        struct ldlm_lock *lock = ldlm_handle2lock(lockh);
        ENTRY;
        if (lock == NULL)
                RETURN(-EINVAL);

        lock->l_ast_data = data;
        LDLM_LOCK_PUT(lock);
        RETURN(0);
}

/* Cancel every lock still held by export @exp and reprocess each
 * affected resource.  The export's led_lock is dropped around each
 * cancellation; resource and lock references keep the objects alive
 * in the meantime. */
void ldlm_cancel_locks_for_export(struct obd_export *exp)
{
        struct ldlm_lock *lock;
        struct ldlm_resource *res;

        spin_lock(&exp->exp_ldlm_data.led_lock);
        while (!list_empty(&exp->exp_ldlm_data.led_held_locks)) {
                lock = list_entry(exp->exp_ldlm_data.led_held_locks.next,
                                  struct ldlm_lock, l_export_chain);
                res = ldlm_resource_getref(lock->l_resource);
                LDLM_LOCK_GET(lock);
                spin_unlock(&exp->exp_ldlm_data.led_lock);

                LDLM_DEBUG(lock, "export %p", exp);
                ldlm_res_lvbo_update(res, NULL, 0, 1);

                ldlm_lock_cancel(lock);
                ldlm_reprocess_all(res);

                ldlm_resource_putref(res);
                LDLM_LOCK_PUT(lock);
                spin_lock(&exp->exp_ldlm_data.led_lock);
        }
        spin_unlock(&exp->exp_ldlm_data.led_lock);
}

/* Convert @lock to @new_mode.  Returns the lock's resource (or NULL if
 * the interval-node allocation fails, which the caller turns into
 * EDEADLOCK per the comment below).  For actual conversions the
 * LASSERTF restricts this path to PR -> PW; an already-matching mode
 * returns immediately with LDLM_FL_BLOCK_GRANTED set in @flags. */
struct ldlm_resource *ldlm_lock_convert(struct ldlm_lock *lock, int new_mode,
                                        __u32 *flags)
{
        CFS_LIST_HEAD(rpc_list);
        struct ldlm_resource *res;
        struct ldlm_namespace *ns;
        int granted = 0;
        int old_mode, rc;
        struct ldlm_lock *mark_lock = NULL;
        int join = LDLM_JOIN_NONE;
        ldlm_error_t err;
        struct ldlm_interval *node;
        ENTRY;

        if (new_mode == lock->l_granted_mode) { // No changes? Just return.
                *flags |= LDLM_FL_BLOCK_GRANTED;
                RETURN(lock->l_resource);
        }

        /* I can't check the type of lock here because the bitlock of lock
         * is not held here, so do the allocation blindly. -jay */
        OBD_SLAB_ALLOC(node, ldlm_interval_slab, CFS_ALLOC_IO, sizeof(*node));
        if (node == NULL)  /* Actually, this causes EDEADLOCK to be returned */
                RETURN(NULL);

        LASSERTF(new_mode == LCK_PW && lock->l_granted_mode == LCK_PR,
                 "new_mode %u, granted %u\n", new_mode, lock->l_granted_mode);

        lock_res_and_lock(lock);

        res = lock->l_resource;
        ns = res->lr_namespace;

        old_mode = lock->l_req_mode;
        lock->l_req_mode = new_mode;
        if (res->lr_type == LDLM_PLAIN || res->lr_type == LDLM_IBITS) {
                /* remember the lock position where the lock might be
                 * added back to the granted list later and also
                 * remember the join mode for skiplist fixing. */
                if (LDLM_SL_HEAD(&lock->l_sl_mode))
                        join = LDLM_MODE_JOIN_RIGHT;
                else if (LDLM_SL_TAIL(&lock->l_sl_mode))
                        join = LDLM_MODE_JOIN_LEFT;
                if (LDLM_SL_HEAD(&lock->l_sl_policy))
                        join |= LDLM_POLICY_JOIN_RIGHT;
                else if (LDLM_SL_TAIL(&lock->l_sl_policy))
                        join |= LDLM_POLICY_JOIN_LEFT;

                /* The asserts rule out joining left on one list while
                 * joining right on the other. */
                LASSERT(!((join & LDLM_MODE_JOIN_RIGHT) &&
                          (join & LDLM_POLICY_JOIN_LEFT)));
                LASSERT(!((join & LDLM_MODE_JOIN_LEFT) &&
                          (join & LDLM_POLICY_JOIN_RIGHT)));

                if ((join & LDLM_MODE_JOIN_LEFT) ||
                    (join & LDLM_POLICY_JOIN_LEFT))
                        mark_lock = list_entry(lock->l_res_link.prev,
                                               struct ldlm_lock, l_res_link);
                else if (lock->l_res_link.next != &res->lr_granted)
                        mark_lock = list_entry(lock->l_res_link.next,
                                               struct ldlm_lock, l_res_link);
        } else {
                ldlm_resource_unlink_lock(lock);
                if (res->lr_type == LDLM_EXTENT) {
                        /* FIXME: ugly code, I have to attach the lock to an
                         * interval node again since perhaps it will be granted
                         * soon */
                        CFS_INIT_LIST_HEAD(&node->li_group);
                        ldlm_interval_attach(node, lock);
                        node = NULL;
                }
        }

        /* If this is a local resource, put it on the appropriate list. */
        if (ns_is_client(res->lr_namespace)) {
                if (*flags & (LDLM_FL_BLOCK_CONV | LDLM_FL_BLOCK_GRANTED)) {
                        ldlm_resource_add_lock(res, &res->lr_converting, lock);
                } else {
                        /* This should never happen, because of the way the
                         * server handles conversions. */
                        LDLM_ERROR(lock, "Erroneous flags %d on local lock\n",
                                   *flags);
                        LBUG();

                        ldlm_grant_lock(lock, &rpc_list);
                        granted = 1;
                        /* FIXME: completion handling not with ns_lock held ! */
                        if (lock->l_completion_ast)
                                lock->l_completion_ast(lock, 0, NULL);
                }
        } else {
                int pflags = 0;
                ldlm_processing_policy policy;
                policy = ldlm_processing_policy_table[res->lr_type];
                rc = policy(lock, &pflags, 0, &err, &rpc_list);
                if (rc == LDLM_ITER_STOP) {
                        /* Conversion blocked: restore the old request mode
                         * and put the lock back where it was on the
                         * granted list (using @mark_lock / @join). */
                        lock->l_req_mode = old_mode;
                        if (res->lr_type == LDLM_EXTENT)
                                ldlm_extent_add_lock(res, lock);
                        else
                                ldlm_granted_list_add_lock(lock, mark_lock,
                                                           join);
                        res = NULL;
                } else {
                        *flags |= LDLM_FL_BLOCK_GRANTED;
                        granted = 1;
                }

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?