ldlm_request.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,663 行 · 第 1/5 页

C
1,663
字号
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 */

#define DEBUG_SUBSYSTEM S_LDLM
#ifndef __KERNEL__
#include <signal.h>
#include <liblustre.h>
#endif

#include <lustre_dlm.h>
#include <obd_class.h>
#include <obd.h>

#include "ldlm_internal.h"

/* Lower bound applied to the value returned by ldlm_get_enq_timeout() when
 * AT_OFF is false.  Exposed as a module parameter.
 * NOTE(review): presumably expressed in seconds, like obd_timeout - confirm. */
int ldlm_enqueue_min = OBD_TIMEOUT_DEFAULT;
CFS_MODULE_PARM(ldlm_enqueue_min, "i", int, 0644,
                "lock enqueue timeout minimum");

/* Interrupt callback handed to LWI_INTR()/LWI_TIMEOUT_INTR() by
 * ldlm_completion_ast().  It does nothing; \a data is ignored. */
static void interrupted_completion_wait(void *data)
{
}

/* Context passed to the wait callbacks set up in ldlm_completion_ast(). */
struct lock_wait_data {
        /* lock the waiter is sleeping on */
        struct ldlm_lock *lwd_lock;
        /* copy of imp_conn_cnt taken just before going to sleep; only filled
         * in when the lock has an import */
        __u32             lwd_conn_cnt;
};

/* Wrapper around a single lock handle.  No user of this structure is visible
 * in this part of the file. */
struct ldlm_async_args {
        struct lustre_handle lock_handle;
};

/*
 * Timeout callback for the completion wait (installed through
 * LWI_TIMEOUT_INTR() in ldlm_completion_ast()).
 *
 * \a data is a struct lock_wait_data.
 *
 * If the lock has no connection export, only diagnostics are produced: an
 * error message, plus a throttled namespace dump.  Otherwise the import of
 * the export's obd is failed via ptlrpc_fail_import() using the connection
 * count recorded in the wait data.
 *
 * Always returns 0.
 * NOTE(review): the meaning of a 0 return to l_wait_event() (the first
 * message suggests "keep sleeping") is not visible here - confirm.
 */
int ldlm_expired_completion_wait(void *data)
{
        struct lock_wait_data *lwd = data;
        struct ldlm_lock *lock = lwd->lwd_lock;
        struct obd_import *imp;
        struct obd_device *obd;

        ENTRY;
        if (lock->l_conn_export == NULL) {
                /* function-static state shared by all callers; used to
                 * throttle the dumps below */
                static cfs_time_t next_dump = 0, last_dump = 0;

                LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago); "
                           "not entering recovery in server code, just going "
                           "back to sleep", lock->l_enqueued_time.tv_sec,
                           cfs_time_current_sec() -
                           lock->l_enqueued_time.tv_sec);
                if (cfs_time_after(cfs_time_current(), next_dump)) {
                        last_dump = next_dump;
                        /* NOTE(review): presumably "300 seconds from now" -
                         * confirm cfs_time_shift() units */
                        next_dump = cfs_time_shift(300);
                        ldlm_namespace_dump(D_DLMTRACE,
                                            lock->l_resource->lr_namespace);
                        /* last_dump holds the previous next_dump, which is
                         * still 0 the first time this branch runs, so the
                         * debug log is dumped on the first occurrence */
                        if (last_dump == 0)
                                libcfs_debug_dumplog();
                }
                RETURN(0);
        }

        obd = lock->l_conn_export->exp_obd;
        imp = obd->u.cli.cl_import;
        ptlrpc_fail_import(imp, lwd->lwd_conn_cnt);
        /* NOTE(review): this message uses CURRENT_SECONDS while the one above
         * uses cfs_time_current_sec() - presumably equivalent, confirm */
        LDLM_ERROR(lock, "lock timed out (enqueued at %lu, %lus ago), entering "
                   "recovery for %s@%s", lock->l_enqueued_time.tv_sec,
                   CURRENT_SECONDS - lock->l_enqueued_time.tv_sec,
                   obd2cli_tgt(obd), imp->imp_connection->c_remote_uuid.uuid);

        RETURN(0);
}

/* We use the same basis for both server side and client side functions
   from a single node. */
/*
 * Compute the enqueue timeout for \a lock.
 *
 * Returns obd_timeout / 2 when AT_OFF is true; otherwise 150% of the
 * namespace's ns_at_estimate value, but never less than ldlm_enqueue_min.
 * Note that at_get() is called before AT_OFF is tested; its result is simply
 * unused on the AT_OFF path.
 */
int ldlm_get_enq_timeout(struct ldlm_lock *lock)
{
        int timeout = at_get(&lock->l_resource->lr_namespace->ns_at_estimate);
        if (AT_OFF)
                return obd_timeout / 2;
        /* Since these are non-updating timeouts, we should be conservative.
           It would be nice to have some kind of "early reply" mechanism for
           lock callbacks too... */
        timeout = timeout + (timeout >> 1); /* 150% */
        return max(timeout, ldlm_enqueue_min);
}

/*
 * Wake-up condition for the completion wait.
 *
 * Returns 1 if the granted mode equals the requested mode and
 * LDLM_FL_CP_REQD is clear, or if LDLM_FL_FAILED is set; 0 otherwise.
 * The fields are sampled between lock_res_and_lock() and
 * unlock_res_and_lock().
 */
static int is_granted_or_cancelled(struct ldlm_lock *lock)
{
        int ret = 0;

        lock_res_and_lock(lock);
        if (((lock->l_req_mode == lock->l_granted_mode) &&
             !(lock->l_flags & LDLM_FL_CP_REQD)) ||
            (lock->l_flags & LDLM_FL_FAILED))
                ret = 1;
        unlock_res_and_lock(lock);

        return ret;
}

/*
 * Completion AST: block the caller until \a lock is granted or has failed.
 *
 * \param lock   lock being enqueued
 * \param flags  enqueue flags; LDLM_FL_WAIT_NOREPROC skips the reprocess
 *               step, and if none of the LDLM_FL_BLOCK_* bits is set the
 *               waiters on l_waitq are signalled and no sleep happens
 * \param data   unused in this function
 *
 * \retval 0     waiters were signalled, or the wait condition became true
 * \retval -EIO  lock was destroyed or has LDLM_FL_FAILED set after the wait
 * \retval other non-zero result of l_wait_event()
 */
int ldlm_completion_ast(struct ldlm_lock *lock, int flags, void *data)
{
        /* XXX ALLOCATE - 160 bytes */
        struct lock_wait_data lwd;
        struct obd_device *obd;
        struct obd_import *imp = NULL;
        struct l_wait_info lwi;
        __u32 timeout;
        int rc = 0;
        ENTRY;

        /* exact comparison, not a bit test: only taken when this is the sole
         * flag passed in */
        if (flags == LDLM_FL_WAIT_NOREPROC) {
                LDLM_DEBUG(lock, "client-side enqueue waiting on pending lock");
                goto noreproc;
        }

        /* nothing is blocking: just wake whoever sleeps on the lock */
        if (!(flags & (LDLM_FL_BLOCK_WAIT | LDLM_FL_BLOCK_GRANTED |
                       LDLM_FL_BLOCK_CONV))) {
                cfs_waitq_signal(&lock->l_waitq);
                RETURN(0);
        }

        LDLM_DEBUG(lock, "client-side enqueue returned a blocked lock, "
                   "sleeping");
        ldlm_lock_dump(D_OTHER, lock, 0);
        ldlm_reprocess_all(lock->l_resource);

noreproc:

        obd = class_exp2obd(lock->l_conn_export);

        /* if this is a local lock, then there is no import */
        if (obd != NULL) {
                imp = obd->u.cli.cl_import;
        }

        /* Wait a long time for enqueue - server may have to callback a
           lock from another client.  Server will evict the other client if it
           doesn't respond reasonably, and then give us the lock. */
        timeout = ldlm_get_enq_timeout(lock) * 2;

        lwd.lwd_lock = lock;

        if (lock->l_flags & LDLM_FL_NO_TIMEOUT) {
                LDLM_DEBUG(lock, "waiting indefinitely because of NO_TIMEOUT");
                lwi = LWI_INTR(interrupted_completion_wait, &lwd);
        } else {
                lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(timeout),
                                       ldlm_expired_completion_wait,
                                       interrupted_completion_wait, &lwd);
        }

        /* Record the connection count so that the timeout callback can pass
         * it to ptlrpc_fail_import().  When imp is NULL lwd_conn_cnt stays
         * uninitialized; the callback reads it only if l_conn_export is
         * non-NULL.
         * NOTE(review): this relies on class_exp2obd() returning NULL only
         * for a NULL export - confirm. */
        if (imp != NULL) {
                spin_lock(&imp->imp_lock);
                lwd.lwd_conn_cnt = imp->imp_conn_cnt;
                spin_unlock(&imp->imp_lock);
        }

        /* Go to sleep until the lock is granted or cancelled. */
        rc = l_wait_event(lock->l_waitq, is_granted_or_cancelled(lock), &lwi);

        /* checked before rc, so a destroyed/failed lock always yields -EIO
         * regardless of what l_wait_event() returned */
        if (lock->l_destroyed || lock->l_flags & LDLM_FL_FAILED) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: destroyed");
                RETURN(-EIO);
        }

        if (rc) {
                LDLM_DEBUG(lock, "client-side enqueue waking up: failed (%d)",
                           rc);
                RETURN(rc);
        }

        LDLM_DEBUG(lock, "client-side enqueue waking up: granted after %lds",
                   cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);

        /* Update our time estimate */
        at_add(&lock->l_resource->lr_namespace->ns_at_estimate,
               cfs_time_current_sec() - lock->l_enqueued_time.tv_sec);

        RETURN(0);
}

/*
 * ->l_blocking_ast() callback for LDLM locks acquired by server-side OBDs.
 *
 * For LDLM_CB_CANCELING nothing is done.  Otherwise the lock is marked
 * LDLM_FL_CBPENDING and, if it has neither readers nor writers, cancelled
 * right away through ldlm_cli_cancel().  \a desc and \a data are not used.
 *
 * Always returns 0; a negative result from ldlm_cli_cancel() is only logged.
 */
int ldlm_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
                      void *data, int flag)
{
        int do_ast;
        ENTRY;

        if (flag == LDLM_CB_CANCELING) {
                /* Don't need to do anything here. */
                RETURN(0);
        }

        lock_res_and_lock(lock);
        /* Get this: if ldlm_blocking_ast is racing with intent_policy, such
         * that ldlm_blocking_ast is called just before intent_policy method
         * takes the ns_lock, then by the time we get the lock, we might not
         * be the correct blocking function anymore.  So check, and return
         * early, if so. */
        if (lock->l_blocking_ast != ldlm_blocking_ast) {
                unlock_res_and_lock(lock);
                RETURN(0);
        }

        lock->l_flags |= LDLM_FL_CBPENDING;
        /* decide under the lock, act after dropping it */
        do_ast = (!lock->l_readers && !lock->l_writers);
        unlock_res_and_lock(lock);

        if (do_ast) {
                struct lustre_handle lockh;
                int rc;

                LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc < 0)
                        CERROR("ldlm_cli_cancel: %d\n", rc);
        } else {
                LDLM_DEBUG(lock, "Lock still has references, will be "
                           "cancelled later");
        }
        RETURN(0);
}

/*
 * ->l_glimpse_ast() for DLM extent locks acquired on the server-side. See
 * comment in filter_intent_policy() on why you may need this.
 *
 * Neither argument is used; the function unconditionally returns
 * -ELDLM_NO_LOCK_DATA.
 */
int ldlm_glimpse_ast(struct ldlm_lock *lock, void *reqp)
{
        /*
         * Returning -ELDLM_NO_LOCK_DATA actually works, but the reason for
         * that is rather subtle: with OST-side locking, it may so happen that
         * _all_ extent locks are held by the OST. If client wants to obtain
         * current file size it calls ll{,u}_glimpse_size(), and (as locks are
         * on the server), dummy glimpse callback fires and does
         * nothing. Client still receives correct file size due to the
         * following fragment in filter_intent_policy():
         *
         * rc = l->l_glimpse_ast(l, NULL); // this will update the LVB
         * if (rc != 0 && res->lr_namespace->ns_lvbo &&
         *     res->lr_namespace->ns_lvbo->lvbo_update) {
         *         res->lr_namespace->ns_lvbo->lvbo_update(res, NULL, 0, 1);
         * }
         *
         * that is, after glimpse_ast() fails, filter_lvbo_update() runs, and
         * returns correct file size to the client.
         */
        return -ELDLM_NO_LOCK_DATA;
}

/*
 * Create and enqueue a lock flagged LDLM_FL_LOCAL in namespace \a ns.
 *
 * The visible code asserts that LDLM_FL_REPLAY is not set in \a *flags and
 * LBUG()s if \a ns is a client namespace.  On the paths shown, \a lockh
 * receives the handle of the new lock, \a *policy (when non-NULL) is
 * overwritten with the lock's policy data after a successful enqueue, and
 * \a *res_id is overwritten when LDLM_FL_LOCK_CHANGED is set in \a *flags.
 *
 * Returns -ENOMEM if the lock cannot be created, otherwise the result of
 * ldlm_lock_enqueue().
 */
int ldlm_cli_enqueue_local(struct ldlm_namespace *ns,
                           struct ldlm_res_id *res_id,
                           ldlm_type_t type, ldlm_policy_data_t *policy,
                           ldlm_mode_t mode, int *flags,
                           ldlm_blocking_callback blocking,
                           ldlm_completion_callback completion,
                           ldlm_glimpse_callback glimpse,
                           void *data, __u32 lvb_len, void *lvb_swabber,
                           struct lustre_handle *lockh)
{
        struct ldlm_lock *lock;
        int err;
        ENTRY;

        LASSERT(!(*flags & LDLM_FL_REPLAY));
        if (ns_is_client(ns)) {
                CERROR("Trying to enqueue local lock in a shadow namespace\n");
                LBUG();
        }

        lock = ldlm_lock_create(ns, *res_id, type, mode, blocking,
                                completion, glimpse, data, lvb_len);
        if (!lock)
                GOTO(out_nolock, err = -ENOMEM);
        LDLM_DEBUG(lock, "client-side local enqueue handler, new lock created");

        ldlm_lock_addref_internal(lock, mode);
        ldlm_lock2handle(lock, lockh);
        lock_res_and_lock(lock);
        lock->l_flags |= LDLM_FL_LOCAL;
        if (*flags & LDLM_FL_ATOMIC_CB)
                lock->l_flags |= LDLM_FL_ATOMIC_CB;
        lock->l_lvb_swabber = lvb_swabber;
        unlock_res_and_lock(lock);
        if (policy != NULL)
                lock->l_policy_data = *policy;
        /* NOTE(review): policy is dereferenced here without the NULL check
         * used just above; an LDLM_EXTENT enqueue with policy == NULL would
         * fault.  Presumably callers always pass a policy for extent locks -
         * confirm, or make the requirement explicit. */
        if (type == LDLM_EXTENT)
                lock->l_req_extent = policy->l_extent;

        err = ldlm_lock_enqueue(ns, &lock, policy, flags);
        if (err != ELDLM_OK)
                GOTO(out, err);

        /* hand the (possibly updated) policy and resource name back */
        if (policy != NULL)
                *policy = lock->l_policy_data;
        if ((*flags) & LDLM_FL_LOCK_CHANGED)
                *res_id = lock->l_resource->lr_name;

        LDLM_DEBUG_NOLOCK("client-side local enqueue handler END (lock %p)",
                          lock);

        /* the completion AST's return value is not checked */
        if (lock->l_completion_ast)
                lock->l_completion_ast(lock, *flags, NULL);

        LDLM_DEBUG(lock, "client-side local enqueue END");
        EXIT;
 out:
        /* reached on both the success and the enqueue-failure path */
        LDLM_LOCK_PUT(lock);
 out_nolock:
        return err;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?