ldlm_flock.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 580 行 · 第 1/2 页

C
580
字号
                        }                        if (added) {                                ldlm_flock_destroy(lock, mode, *flags);                        } else {                                new = lock;                                added = 1;                        }                        continue;                }                if (new->l_policy_data.l_flock.start >                    lock->l_policy_data.l_flock.end)                        continue;                if (new->l_policy_data.l_flock.end <                    lock->l_policy_data.l_flock.start)                        break;                ++overlaps;                if (new->l_policy_data.l_flock.start <=                    lock->l_policy_data.l_flock.start) {                        if (new->l_policy_data.l_flock.end <                            lock->l_policy_data.l_flock.end) {                                lock->l_policy_data.l_flock.start =                                        new->l_policy_data.l_flock.end + 1;                                break;                        }                        ldlm_flock_destroy(lock, lock->l_req_mode, *flags);                        continue;                }                if (new->l_policy_data.l_flock.end >=                    lock->l_policy_data.l_flock.end) {                        lock->l_policy_data.l_flock.end =                                new->l_policy_data.l_flock.start - 1;                        continue;                }                /* split the existing lock into two locks */                /* if this is an F_UNLCK operation then we could avoid                 * allocating a new lock and use the req lock passed in                 * with the request but this would complicate the reply                 * processing since updates to req get reflected in the                 * reply. The client side replays the lock request so                 * it must see the original lock data in the reply. */                /* XXX - if ldlm_lock_new() can sleep we should                 * release the ns_lock, allocate the new lock,                 * and restart processing this lock. */                if (!new2) {                        unlock_res_and_lock(req);                         new2 = ldlm_lock_create(ns, res->lr_name, LDLM_FLOCK,                                        lock->l_granted_mode, NULL, NULL, NULL,                                        NULL, 0);                        lock_res_and_lock(req);                        if (!new2) {                                ldlm_flock_destroy(req, lock->l_granted_mode, *flags);                                *err = -ENOLCK;                                RETURN(LDLM_ITER_STOP);                        }                        goto reprocess;                }                splitted = 1;                new2->l_granted_mode = lock->l_granted_mode;                new2->l_policy_data.l_flock.pid =                        new->l_policy_data.l_flock.pid;                new2->l_policy_data.l_flock.start =                        lock->l_policy_data.l_flock.start;                new2->l_policy_data.l_flock.end =                        new->l_policy_data.l_flock.start - 1;                lock->l_policy_data.l_flock.start =                        new->l_policy_data.l_flock.end + 1;                new2->l_conn_export = lock->l_conn_export;                if (lock->l_export != NULL) {                        new2->l_export = class_export_get(lock->l_export);                        spin_lock(&new2->l_export->exp_ldlm_data.led_lock);                        list_add(&new2->l_export_chain,                                 &new2->l_export->exp_ldlm_data.led_held_locks);                        spin_unlock(&new2->l_export->exp_ldlm_data.led_lock);                }                if (*flags == LDLM_FL_WAIT_NOREPROC) {                        ldlm_lock_addref_internal_nolock(new2, lock->l_granted_mode);                }                /* insert new2 at lock */                ldlm_resource_add_lock(res, ownlocks, new2);                LDLM_LOCK_PUT(new2);                break;        }        /* if new2 is created but never used, destroy it*/        if (splitted == 0 && new2 != NULL)                ldlm_lock_destroy_nolock(new2);        /* At this point we're granting the lock request. */        req->l_granted_mode = req->l_req_mode;        /* Add req to the granted queue before calling ldlm_reprocess_all(). */        if (!added) {                list_del_init(&req->l_res_link);                /* insert new lock before ownlocks in list. */                ldlm_resource_add_lock(res, ownlocks, req);        }        if (*flags != LDLM_FL_WAIT_NOREPROC) {                if (first_enq) {                        /* If this is an unlock, reprocess the waitq and                         * send completions ASTs for locks that can now be                          * granted. The only problem with doing this                         * reprocessing here is that the completion ASTs for                         * newly granted locks will be sent before the unlock                         * completion is sent. It shouldn't be an issue. Also                         * note that ldlm_process_flock_lock() will recurse,                         * but only once because first_enq will be false from                         * ldlm_reprocess_queue. */                        if ((mode == LCK_NL) && overlaps) {                                struct list_head rpc_list                                                    = CFS_LIST_HEAD_INIT(rpc_list);                                int rc;restart:                                ldlm_reprocess_queue(res, &res->lr_waiting,                                                     &rpc_list);                                unlock_res_and_lock(req);                                rc = ldlm_run_cp_ast_work(&rpc_list);                                lock_res_and_lock(req);                                if (rc == -ERESTART)                                        GOTO(restart, -ERESTART);                       }                } else {                        LASSERT(req->l_completion_ast);                        ldlm_add_ast_work_item(req, NULL, work_list);                }        }        /* In case we're reprocessing the requested lock we can't destroy         * it until after calling ldlm_ast_work_item() above so that lawi()         * can bump the reference count on req. Otherwise req could be freed         * before the completion AST can be sent.  */        if (added)                ldlm_flock_destroy(req, mode, *flags);        ldlm_resource_dump(D_OTHER, res);        RETURN(LDLM_ITER_CONTINUE);}struct ldlm_flock_wait_data {        struct ldlm_lock *fwd_lock;        int               fwd_generation;};static voidldlm_flock_interrupted_wait(void *data){        struct ldlm_lock *lock;        struct lustre_handle lockh;        int rc;        ENTRY;        lock = ((struct ldlm_flock_wait_data *)data)->fwd_lock;        /* take lock off the deadlock detection waitq. */        list_del_init(&lock->l_flock_waitq);        /* client side - set flag to prevent lock from being put on lru list */        lock->l_flags |= LDLM_FL_CBPENDING;        ldlm_lock_decref_internal(lock, lock->l_req_mode);        ldlm_lock2handle(lock, &lockh);        rc = ldlm_cli_cancel(&lockh);        if (rc != ELDLM_OK)                CERROR("ldlm_cli_cancel: %d\n", rc);        EXIT;}intldlm_flock_completion_ast(struct ldlm_lock *lock, int flags, void *data){        struct ldlm_namespace *ns;        cfs_flock_t *getlk = lock->l_ast_data;        struct ldlm_flock_wait_data fwd;        struct obd_device *obd;        struct obd_import *imp = NULL;        ldlm_error_t err;        int rc = 0;        struct l_wait_info lwi;        ENTRY;        CDEBUG(D_DLMTRACE, "flags: 0x%x data: %p getlk: %p\n",               flags, data, getlk);        LASSERT(flags != LDLM_FL_WAIT_NOREPROC);        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |                       LDLM_FL_BLOCK_CONV)))                goto  granted;        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "                   "sleeping");        fwd.fwd_lock = lock;        obd = class_exp2obd(lock->l_conn_export);        /* if this is a local lock, then there is no import */        if (obd != NULL)                imp = obd->u.cli.cl_import;        if (imp != NULL) {                spin_lock(&imp->imp_lock);                fwd.fwd_generation = imp->imp_generation;                spin_unlock(&imp->imp_lock);        }        lwi = LWI_TIMEOUT_INTR(0, NULL, ldlm_flock_interrupted_wait, &fwd);        /* Go to sleep until the lock is granted. */        rc = l_wait_event(lock->l_waitq,                          ((lock->l_req_mode == lock->l_granted_mode) ||                           lock->l_destroyed), &lwi);        LDLM_DEBUG(lock, "client-side enqueue waking up: rc = %d", rc);        RETURN(rc); granted:        LDLM_DEBUG(lock, "client-side enqueue granted");        ns = lock->l_resource->lr_namespace;        lock_res_and_lock(lock);        /* take lock off the deadlock detection waitq. */        list_del_init(&lock->l_flock_waitq);        /* ldlm_lock_enqueue() has already placed lock on the granted list. */        list_del_init(&lock->l_res_link);        if (flags & LDLM_FL_TEST_LOCK) {                /* fcntl(F_GETLK) request */                /* The old mode was saved in getlk->fl_type so that if the mode                 * in the lock changes we can decref the approprate refcount. */                ldlm_flock_destroy(lock, cfs_flock_type(getlk), LDLM_FL_WAIT_NOREPROC);                switch (lock->l_granted_mode) {                case LCK_PR:                        cfs_flock_set_type(getlk, F_RDLCK);                        break;                case LCK_PW:                        cfs_flock_set_type(getlk, F_WRLCK);                        break;                default:                        cfs_flock_set_type(getlk, F_UNLCK);                }                cfs_flock_set_pid(getlk, (pid_t)lock->l_policy_data.l_flock.pid);                cfs_flock_set_start(getlk, (loff_t)lock->l_policy_data.l_flock.start);                cfs_flock_set_end(getlk, (loff_t)lock->l_policy_data.l_flock.end);        } else {                int noreproc = LDLM_FL_WAIT_NOREPROC;                /* We need to reprocess the lock to do merges or splits                 * with existing locks owned by this process. */                ldlm_process_flock_lock(lock, &noreproc, 1, &err, NULL);                if (flags == 0)                        cfs_waitq_signal(&lock->l_waitq);        }        unlock_res_and_lock(lock);        RETURN(0);}EXPORT_SYMBOL(ldlm_flock_completion_ast);int ldlm_flock_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,                            void *data, int flag){        struct ldlm_namespace *ns;        ENTRY;        LASSERT(lock);        LASSERT(flag == LDLM_CB_CANCELING);        ns = lock->l_resource->lr_namespace;        /* take lock off the deadlock detection waitq. */        lock_res_and_lock(lock);        list_del_init(&lock->l_flock_waitq);        unlock_res_and_lock(lock);        RETURN(0);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?