⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 handler.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 5 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * *  lustre/mds/handler.c *  Lustre Metadata Server (mds) request handler * *  Copyright (c) 2001-2005 Cluster File Systems, Inc. *   Author: Peter Braam <braam@clusterfs.com> *   Author: Andreas Dilger <adilger@clusterfs.com> *   Author: Phil Schwan <phil@clusterfs.com> *   Author: Mike Shaver <shaver@clusterfs.com> * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * *   You may have signed or agreed to another license before downloading *   this software.  If so, you are bound by the terms and conditions *   of that agreement, and the following does not apply to you.  See the *   LICENSE file included with this distribution for more information. * *   If you did not agree to a different license, then this copy of Lustre *   is open source software; you can redistribute it and/or modify it *   under the terms of version 2 of the GNU General Public License as *   published by the Free Software Foundation. * *   In either case, Lustre is distributed in the hope that it will be *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *   license text for more details. */#ifndef EXPORT_SYMTAB# define EXPORT_SYMTAB#endif#define DEBUG_SUBSYSTEM S_MDS#include <lustre_mds.h>#include <linux/module.h>#include <linux/init.h>#include <linux/random.h>#include <linux/fs.h>#include <linux/jbd.h>#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))# include <linux/smp_lock.h># include <linux/buffer_head.h># include <linux/workqueue.h># include <linux/mount.h>#else# include <linux/locks.h>#endif#include <obd_class.h>#include <lustre_dlm.h>#include <obd_lov.h>#include <lustre_fsfilt.h>#include <lprocfs_status.h>#include <lustre_commit_confd.h>#include <lustre_quota.h>#include <lustre_disk.h>#include <lustre_param.h>#include "mds_internal.h"int mds_num_threads;CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,                "number of MDS service threads to start");static int mds_intent_policy(struct ldlm_namespace *ns,                             struct ldlm_lock **lockp, void *req_cookie,                             ldlm_mode_t mode, int flags, void *data);static int mds_postsetup(struct obd_device *obd);static int mds_cleanup(struct obd_device *obd);/* Assumes caller has already pushed into the kernel filesystem context */static int mds_sendpage(struct ptlrpc_request *req, struct file *file,                        loff_t offset, int count){        struct ptlrpc_bulk_desc *desc;        struct l_wait_info lwi;        struct page **pages;        int timeout;        int rc = 0, npages, i, tmpcount, tmpsize = 0;        ENTRY;        LASSERT((offset & ~CFS_PAGE_MASK) == 0); /* I'm dubious about this */        npages = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;        OBD_ALLOC(pages, sizeof(*pages) * npages);        if (!pages)                GOTO(out, rc = -ENOMEM);        desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,                                    MDS_BULK_PORTAL);        if (desc == NULL)                GOTO(out_free, rc = -ENOMEM);        for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {                tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;                OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD);                if (pages[i] == NULL)                        GOTO(cleanup_buf, rc = -ENOMEM);                ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);        }        for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {                tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;                CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",                       tmpsize, offset, file->f_dentry->d_inode->i_ino,                       i_size_read(file->f_dentry->d_inode));                rc = fsfilt_readpage(req->rq_export->exp_obd, file,                                     kmap(pages[i]), tmpsize, &offset);                kunmap(pages[i]);                if (rc != tmpsize)                        GOTO(cleanup_buf, rc = -EIO);        }        LASSERT(desc->bd_nob == count);        rc = ptlrpc_start_bulk_transfer(desc);        if (rc)                GOTO(cleanup_buf, rc);        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {                CERROR("obd_fail_loc=%x, fail operation rc=%d\n",                       OBD_FAIL_MDS_SENDPAGE, rc);                GOTO(abort_bulk, rc);        }        timeout = (int)req->rq_deadline - (int)cfs_time_current_sec();        if (timeout < 0) {                CERROR("Req deadline already passed %lu (now: %lu)\n",                       req->rq_deadline, cfs_time_current_sec());        }        lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);        rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);        LASSERT (rc == 0 || rc == -ETIMEDOUT);        if (rc == 0) {                if (desc->bd_success &&                    desc->bd_nob_transferred == count)                        GOTO(cleanup_buf, rc);                rc = -ETIMEDOUT; /* XXX should this be a different errno? */        }        DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",                  (rc == -ETIMEDOUT) ? "timeout" : "network error",                  desc->bd_nob_transferred, count,                  req->rq_export->exp_client_uuid.uuid,                  req->rq_export->exp_connection->c_remote_uuid.uuid);        class_fail_export(req->rq_export);        EXIT; abort_bulk:        ptlrpc_abort_bulk (desc); cleanup_buf:        for (i = 0; i < npages; i++)                if (pages[i])                        OBD_PAGE_FREE(pages[i]);        ptlrpc_free_bulk(desc); out_free:        OBD_FREE(pages, sizeof(*pages) * npages); out:        return rc;}/* only valid locked dentries or errors should be returned */struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,                                     struct vfsmount **mnt, int lock_mode,                                     struct lustre_handle *lockh,                                     char *name, int namelen, __u64 lockpart){        struct mds_obd *mds = &obd->u.mds;        struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;        struct ldlm_res_id res_id = { .name = {0} };        int flags = LDLM_FL_ATOMIC_CB, rc;        ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };         ENTRY;        if (IS_ERR(de))                RETURN(de);        res_id.name[0] = de->d_inode->i_ino;        res_id.name[1] = de->d_inode->i_generation;        rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,                                     LDLM_IBITS, &policy, lock_mode, &flags,                                     ldlm_blocking_ast, ldlm_completion_ast,                                    NULL, NULL, 0, NULL, lockh);        if (rc != ELDLM_OK) {                l_dput(de);                retval = ERR_PTR(-EIO); /* XXX translate ldlm code */        }        RETURN(retval);}/* Look up an entry by inode number. *//* this function ONLY returns valid dget'd dentries with an initialized inode   or errors */struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,                              struct vfsmount **mnt){        char fid_name[32];        unsigned long ino = fid->id;        __u32 generation = fid->generation;        struct inode *inode;        struct dentry *result;        if (ino == 0)                RETURN(ERR_PTR(-ESTALE));        snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);        CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",               ino, generation, mds->mds_obt.obt_sb);        /* under ext3 this is neither supposed to return bad inodes           nor NULL inodes. */        result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));        if (IS_ERR(result))                RETURN(result);        inode = result->d_inode;        if (!inode)                RETURN(ERR_PTR(-ENOENT));       if (inode->i_nlink == 0) {                if (inode->i_mode == 0 &&                    LTIME_S(inode->i_ctime) == 0 ) {                        struct obd_device *obd = container_of(mds, struct                                                              obd_device, u.mds);                        LCONSOLE_WARN("Found inode with zero nlink, mode and "                                      "ctime -- this may indicate disk"                                      "corruption (device %s, inode %lu, link:"                                      " %lu, count: %d)\n", obd->obd_name, inode->i_ino,                                      (unsigned long)inode->i_nlink,                                      atomic_read(&inode->i_count));                }                dput(result);                RETURN(ERR_PTR(-ENOENT));        }        if (generation && inode->i_generation != generation) {                /* we didn't find the right inode.. */                CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "                       "count: %d, generation %u/%u\n", inode->i_ino,                       (unsigned long)inode->i_nlink,                       atomic_read(&inode->i_count), inode->i_generation,                       generation);                dput(result);                RETURN(ERR_PTR(-ENOENT));        }        if (mnt) {                *mnt = mds->mds_vfsmnt;                mntget(*mnt);        }        RETURN(result);}static int mds_connect_internal(struct obd_export *exp,                                 struct obd_connect_data *data){        struct obd_device *obd = exp->exp_obd;        if (data != NULL) {                data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;                data->ocd_ibits_known &= MDS_INODELOCK_FULL;                /* If no known bits (which should not happen, probably,                   as everybody should support LOOKUP and UPDATE bits at least)                   revert to compat mode with plain locks. */                if (!data->ocd_ibits_known &&                    data->ocd_connect_flags & OBD_CONNECT_IBITS)                        data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;                if (!obd->u.mds.mds_fl_acl)                        data->ocd_connect_flags &= ~OBD_CONNECT_ACL;                if (!obd->u.mds.mds_fl_user_xattr)                        data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;                exp->exp_connect_flags = data->ocd_connect_flags;                data->ocd_version = LUSTRE_VERSION_CODE;                exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;        }        if (obd->u.mds.mds_fl_acl &&            ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {                CWARN("%s: MDS requires ACL support but client does not\n",                      obd->obd_name);                return -EBADE;        }        return 0;}static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,                         struct obd_uuid *cluuid,                         struct obd_connect_data *data){        int rc;        ENTRY;        if (exp == NULL || obd == NULL || cluuid == NULL)                RETURN(-EINVAL);        rc = mds_connect_internal(exp, data);        RETURN(rc);}/* Establish a connection to the MDS. * * This will set up an export structure for the client to hold state data * about that client, like open files, the last operation number it did * on the server, etc. */static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,                       struct obd_uuid *cluuid, struct obd_connect_data *data,                       void *localdata){        struct obd_export *exp;        struct mds_export_data *med;        struct mds_client_data *mcd = NULL;        lnet_nid_t *client_nid = (lnet_nid_t *)localdata;        int rc, abort_recovery;        ENTRY;        if (!conn || !obd || !cluuid)                RETURN(-EINVAL);        /* Check for aborted recovery. */        spin_lock_bh(&obd->obd_processing_task_lock);        abort_recovery = obd->obd_abort_recovery;        spin_unlock_bh(&obd->obd_processing_task_lock);        if (abort_recovery)                target_abort_recovery(obd);        /* XXX There is a small race between checking the list and adding a         * new connection for the same UUID, but the real threat (list         * corruption when multiple different clients connect) is solved.         *         * There is a second race between adding the export to the list,         * and filling in the client data below.  Hence skipping the case         * of NULL mcd above.  We should already be controlling multiple         * connects at the client, and we can't hold the spinlock over         * memory allocations without risk of deadlocking.         */        rc = class_connect(conn, obd, cluuid);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -