📄 mds_open.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (c) 2003 Cluster File Systems, Inc. * Author: Peter Braam <braam@clusterfs.com> * Author: Andreas Dilger <adilger@clusterfs.com> * Author: Phil Schwan <phil@clusterfs.com> * Author: Mike Shaver <shaver@clusterfs.com> * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * You may have signed or agreed to another license before downloading * this software. If so, you are bound by the terms and conditions * of that agreement, and the following does not apply to you. See the * LICENSE file included with this distribution for more information. * * If you did not agree to a different license, then this copy of Lustre * is open source software; you can redistribute it and/or modify it * under the terms of version 2 of the GNU General Public License as * published by the Free Software Foundation. * * In either case, Lustre is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * license text for more details. 
*/#ifndef EXPORT_SYMTAB# define EXPORT_SYMTAB#endif#define DEBUG_SUBSYSTEM S_MDS#include <linux/version.h>#include <linux/module.h>#include <linux/init.h>#include <linux/version.h>#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))# include <linux/buffer_head.h># include <linux/workqueue.h>#else# include <linux/locks.h>#endif#include <obd_class.h>#include <obd_lov.h>#include <lustre_fsfilt.h>#include <lprocfs_status.h>#include "mds_internal.h"/* Exported function from this file are: * * mds_open - called by the intent handler * mds_close - an rpc handling function * mds_pin - an rpc handling function - which will go away * mds_mfd_close - for force closing files when a client dies *//* * MDS file data handling: file data holds a handle for a file opened * by a client. */static void mds_mfd_addref(void *mfdp){ struct mds_file_data *mfd = mfdp; atomic_inc(&mfd->mfd_refcount); CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd, atomic_read(&mfd->mfd_refcount));}/* Create a new mds_file_data struct. * One reference for handle+med_open_head list and dropped by mds_mfd_unlink(), * one reference for the caller of this function. */struct mds_file_data *mds_mfd_new(void){ struct mds_file_data *mfd; OBD_ALLOC(mfd, sizeof *mfd); if (mfd == NULL) { CERROR("mds: out of memory\n"); return NULL; } atomic_set(&mfd->mfd_refcount, 2); INIT_LIST_HEAD(&mfd->mfd_handle.h_link); INIT_LIST_HEAD(&mfd->mfd_list); class_handle_hash(&mfd->mfd_handle, mds_mfd_addref); return mfd;}/* Get a new reference on the mfd pointed to by handle, if handle is still * valid. Caller must drop reference with mds_mfd_put(). */static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle){ ENTRY; LASSERT(handle != NULL); RETURN(class_handle2object(handle->cookie));}/* Drop mfd reference, freeing struct if this is the last one. 
*/static void mds_mfd_put(struct mds_file_data *mfd){ CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd, atomic_read(&mfd->mfd_refcount) - 1); LASSERT(atomic_read(&mfd->mfd_refcount) > 0 && atomic_read(&mfd->mfd_refcount) < 0x5a5a); if (atomic_dec_and_test(&mfd->mfd_refcount)) { OBD_FREE_RCU(mfd, sizeof *mfd, &mfd->mfd_handle); }}/* Remove the mfd handle so that it cannot be found by open/close again. * Caller must hold med_open_lock for mfd_list manipulation. */void mds_mfd_unlink(struct mds_file_data *mfd, int decref){ class_handle_unhash(&mfd->mfd_handle); list_del_init(&mfd->mfd_list); if (decref) mds_mfd_put(mfd);}/* Caller must hold mds->mds_epoch_sem */static int mds_alloc_filterdata(struct inode *inode){ LASSERT(inode->i_filterdata == NULL); OBD_ALLOC(inode->i_filterdata, sizeof(struct mds_filter_data)); if (inode->i_filterdata == NULL) return -ENOMEM; LASSERT(igrab(inode) == inode); return 0;}/* Caller must hold mds->mds_epoch_sem */static void mds_free_filterdata(struct inode *inode){ LASSERT(inode->i_filterdata != NULL); OBD_FREE(inode->i_filterdata, sizeof(struct mds_filter_data)); inode->i_filterdata = NULL; iput(inode);}/* Write access to a file: executors cause a negative count, * writers a positive count. The semaphore is needed to perform * a check for the sign and then increment or decrement atomically. * * This code is closely tied to the allocation of the d_fsdata and the * MDS epoch, so we use the same semaphore for the whole lot. * * We could use a different semaphore for each file, if it ever shows * up in a profile, which it won't. 
*
 * epoch argument is nonzero during recovery
 *
 * Returns 0 with inode->i_writecount incremented, -ETXTBSY if the count is
 * negative (see above: executors), or -ENOMEM if the per-inode filter data
 * could not be allocated.
 */
static int mds_get_write_access(struct mds_obd *mds, struct inode *inode,
                                __u64 epoch)
{
        int rc = 0;
        ENTRY;

        down(&mds->mds_epoch_sem);

        if (atomic_read(&inode->i_writecount) < 0) {
                rc = -ETXTBSY;
                goto out;
        }

        /* An epoch is already running for this inode: just join it. */
        if (MDS_FILTERDATA(inode) && MDS_FILTERDATA(inode)->io_epoch != 0) {
                CDEBUG(D_INODE, "continuing MDS epoch "LPU64" for ino %lu/%u\n",
                       MDS_FILTERDATA(inode)->io_epoch, inode->i_ino,
                       inode->i_generation);
                goto out;
        }

        if (inode->i_filterdata == NULL) {
                rc = mds_alloc_filterdata(inode);
                if (rc != 0)
                        goto out;
        }

        /* Start a new epoch: adopt the supplied epoch if it is ahead of the
         * MDS-wide counter, otherwise advance the counter by one. */
        if (epoch > mds->mds_io_epoch)
                mds->mds_io_epoch = epoch;
        else
                mds->mds_io_epoch++;
        MDS_FILTERDATA(inode)->io_epoch = mds->mds_io_epoch;
        CDEBUG(D_INODE, "starting MDS epoch "LPU64" for ino %lu/%u\n",
               mds->mds_io_epoch, inode->i_ino, inode->i_generation);

out:
        /* Single exit: the write count is only bumped on success, and the
         * semaphore is released exactly once on every path. */
        if (rc == 0)
                atomic_inc(&inode->i_writecount);
        up(&mds->mds_epoch_sem);
        RETURN(rc);
}

/* Returns EAGAIN if the client needs to get size and/or cookies and close
 * again -- which is never true if the file is about to be unlinked.  Otherwise
 * returns the number of remaining writers.
 *
 * NOTE(review): the EAGAIN check in the function body is currently disabled
 * with #if 0, so only the remaining-writer count is returned today.
*/static int mds_put_write_access(struct mds_obd *mds, struct inode *inode, struct mds_body *body, int unlinking){ int rc = 0; ENTRY; down(&mds->mds_epoch_sem); atomic_dec(&inode->i_writecount); rc = atomic_read(&inode->i_writecount); if (rc > 0) GOTO(out, rc);#if 0 if (!unlinking && !(body->valid & OBD_MD_FLSIZE)) GOTO(out, rc = EAGAIN);#endif mds_free_filterdata(inode); out: up(&mds->mds_epoch_sem); return rc;}static int mds_deny_write_access(struct mds_obd *mds, struct inode *inode){ ENTRY; down(&mds->mds_epoch_sem); if (atomic_read(&inode->i_writecount) > 0) { up(&mds->mds_epoch_sem); RETURN(-ETXTBSY); } atomic_dec(&inode->i_writecount); up(&mds->mds_epoch_sem); RETURN(0);}static void mds_allow_write_access(struct inode *inode){ ENTRY; atomic_inc(&inode->i_writecount);}int mds_query_write_access(struct inode *inode){ ENTRY; RETURN(atomic_read(&inode->i_writecount));}/* This replaces the VFS dentry_open, it manages mfd and writecount */static struct mds_file_data *mds_dentry_open(struct dentry *dentry, struct vfsmount *mnt, int flags, struct ptlrpc_request *req){ struct mds_export_data *med = &req->rq_export->exp_mds_data; struct mds_obd *mds = mds_req2mds(req); struct mds_file_data *mfd; struct mds_body *body; int error; ENTRY; mfd = mds_mfd_new(); if (mfd == NULL) { CERROR("mds: out of memory\n"); GOTO(cleanup_dentry, error = -ENOMEM); } body = lustre_msg_buf(req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body)); if (flags & FMODE_WRITE) { /* FIXME: in recovery, need to pass old epoch here */ error = mds_get_write_access(mds, dentry->d_inode, 0); if (error) GOTO(cleanup_mfd, error); body->io_epoch = MDS_FILTERDATA(dentry->d_inode)->io_epoch; } else if (flags & MDS_FMODE_EXEC) { error = mds_deny_write_access(mds, dentry->d_inode); if (error) GOTO(cleanup_mfd, error); } dget(dentry); /* Mark the file as open to handle open-unlink. 
*/ MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode); mds_orphan_open_inc(dentry->d_inode); MDS_UP_WRITE_ORPHAN_SEM(dentry->d_inode); mfd->mfd_mode = flags; mfd->mfd_dentry = dentry; mfd->mfd_xid = req->rq_xid; spin_lock(&med->med_open_lock); list_add(&mfd->mfd_list, &med->med_open_head); spin_unlock(&med->med_open_lock); body->handle.cookie = mfd->mfd_handle.h_cookie; RETURN(mfd);cleanup_mfd: mds_mfd_put(mfd); mds_mfd_unlink(mfd, 1);cleanup_dentry: return ERR_PTR(error);}/* Must be called with i_mutex held */static int mds_create_objects(struct ptlrpc_request *req, int offset, struct mds_update_record *rec, struct mds_obd *mds, struct obd_device *obd, struct dentry *dchild, void **handle, struct lov_mds_md **objid)
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -