📄 handler.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * lustre/mds/handler.c * Lustre Metadata Server (mds) request handler * * Copyright (c) 2001-2005 Cluster File Systems, Inc. * Author: Peter Braam <braam@clusterfs.com> * Author: Andreas Dilger <adilger@clusterfs.com> * Author: Phil Schwan <phil@clusterfs.com> * Author: Mike Shaver <shaver@clusterfs.com> * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * You may have signed or agreed to another license before downloading * this software. If so, you are bound by the terms and conditions * of that agreement, and the following does not apply to you. See the * LICENSE file included with this distribution for more information. * * If you did not agree to a different license, then this copy of Lustre * is open source software; you can redistribute it and/or modify it * under the terms of version 2 of the GNU General Public License as * published by the Free Software Foundation. * * In either case, Lustre is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * license text for more details. */#ifndef EXPORT_SYMTAB# define EXPORT_SYMTAB#endif#define DEBUG_SUBSYSTEM S_MDS#include <lustre_mds.h>#include <linux/module.h>#include <linux/init.h>#include <linux/random.h>#include <linux/fs.h>#include <linux/jbd.h>#if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))# include <linux/smp_lock.h># include <linux/buffer_head.h># include <linux/workqueue.h># include <linux/mount.h>#else# include <linux/locks.h>#endif#include <obd_class.h>#include <lustre_dlm.h>#include <obd_lov.h>#include <lustre_fsfilt.h>#include <lprocfs_status.h>#include <lustre_commit_confd.h>#include <lustre_quota.h>#include <lustre_disk.h>#include <lustre_param.h>#include "mds_internal.h"int mds_num_threads;CFS_MODULE_PARM(mds_num_threads, "i", int, 0444, "number of MDS service threads to start");static int mds_intent_policy(struct ldlm_namespace *ns, struct ldlm_lock **lockp, void *req_cookie, ldlm_mode_t mode, int flags, void *data);static int mds_postsetup(struct obd_device *obd);static int mds_cleanup(struct obd_device *obd);/* Assumes caller has already pushed into the kernel filesystem context */static int mds_sendpage(struct ptlrpc_request *req, struct file *file, loff_t offset, int count){ struct ptlrpc_bulk_desc *desc; struct l_wait_info lwi; struct page **pages; int timeout; int rc = 0, npages, i, tmpcount, tmpsize = 0; ENTRY; LASSERT((offset & ~CFS_PAGE_MASK) == 0); /* I'm dubious about this */ npages = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT; OBD_ALLOC(pages, sizeof(*pages) * npages); if (!pages) GOTO(out, rc = -ENOMEM); desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE, MDS_BULK_PORTAL); if (desc == NULL) GOTO(out_free, rc = -ENOMEM); for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount; OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD); if (pages[i] == NULL) GOTO(cleanup_buf, rc = -ENOMEM); ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize); } for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) { tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount; CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n", tmpsize, offset, file->f_dentry->d_inode->i_ino, i_size_read(file->f_dentry->d_inode)); rc = fsfilt_readpage(req->rq_export->exp_obd, file, kmap(pages[i]), tmpsize, &offset); kunmap(pages[i]); if (rc != tmpsize) GOTO(cleanup_buf, rc = -EIO); } LASSERT(desc->bd_nob == count); rc = ptlrpc_start_bulk_transfer(desc); if (rc) GOTO(cleanup_buf, rc); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) { CERROR("obd_fail_loc=%x, fail operation rc=%d\n", OBD_FAIL_MDS_SENDPAGE, rc); GOTO(abort_bulk, rc); } timeout = (int)req->rq_deadline - (int)cfs_time_current_sec(); if (timeout < 0) { CERROR("Req deadline already passed %lu (now: %lu)\n", req->rq_deadline, cfs_time_current_sec()); } lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL); rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi); LASSERT (rc == 0 || rc == -ETIMEDOUT); if (rc == 0) { if (desc->bd_success && desc->bd_nob_transferred == count) GOTO(cleanup_buf, rc); rc = -ETIMEDOUT; /* XXX should this be a different errno? */ } DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n", (rc == -ETIMEDOUT) ? "timeout" : "network error", desc->bd_nob_transferred, count, req->rq_export->exp_client_uuid.uuid, req->rq_export->exp_connection->c_remote_uuid.uuid); class_fail_export(req->rq_export); EXIT; abort_bulk: ptlrpc_abort_bulk (desc); cleanup_buf: for (i = 0; i < npages; i++) if (pages[i]) OBD_PAGE_FREE(pages[i]); ptlrpc_free_bulk(desc); out_free: OBD_FREE(pages, sizeof(*pages) * npages); out: return rc;}/* only valid locked dentries or errors should be returned */struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid, struct vfsmount **mnt, int lock_mode, struct lustre_handle *lockh, char *name, int namelen, __u64 lockpart){ struct mds_obd *mds = &obd->u.mds; struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de; struct ldlm_res_id res_id = { .name = {0} }; int flags = LDLM_FL_ATOMIC_CB, rc; ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; ENTRY; if (IS_ERR(de)) RETURN(de); res_id.name[0] = de->d_inode->i_ino; res_id.name[1] = de->d_inode->i_generation; rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id, LDLM_IBITS, &policy, lock_mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, 0, NULL, lockh); if (rc != ELDLM_OK) { l_dput(de); retval = ERR_PTR(-EIO); /* XXX translate ldlm code */ } RETURN(retval);}/* Look up an entry by inode number. *//* this function ONLY returns valid dget'd dentries with an initialized inode or errors */struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid, struct vfsmount **mnt){ char fid_name[32]; unsigned long ino = fid->id; __u32 generation = fid->generation; struct inode *inode; struct dentry *result; if (ino == 0) RETURN(ERR_PTR(-ESTALE)); snprintf(fid_name, sizeof(fid_name), "0x%lx", ino); CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n", ino, generation, mds->mds_obt.obt_sb); /* under ext3 this is neither supposed to return bad inodes nor NULL inodes. */ result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name)); if (IS_ERR(result)) RETURN(result); inode = result->d_inode; if (!inode) RETURN(ERR_PTR(-ENOENT)); if (inode->i_nlink == 0) { if (inode->i_mode == 0 && LTIME_S(inode->i_ctime) == 0 ) { struct obd_device *obd = container_of(mds, struct obd_device, u.mds); LCONSOLE_WARN("Found inode with zero nlink, mode and " "ctime -- this may indicate disk" "corruption (device %s, inode %lu, link:" " %lu, count: %d)\n", obd->obd_name, inode->i_ino, (unsigned long)inode->i_nlink, atomic_read(&inode->i_count)); } dput(result); RETURN(ERR_PTR(-ENOENT)); } if (generation && inode->i_generation != generation) { /* we didn't find the right inode.. */ CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, " "count: %d, generation %u/%u\n", inode->i_ino, (unsigned long)inode->i_nlink, atomic_read(&inode->i_count), inode->i_generation, generation); dput(result); RETURN(ERR_PTR(-ENOENT)); } if (mnt) { *mnt = mds->mds_vfsmnt; mntget(*mnt); } RETURN(result);}static int mds_connect_internal(struct obd_export *exp, struct obd_connect_data *data){ struct obd_device *obd = exp->exp_obd; if (data != NULL) { data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED; data->ocd_ibits_known &= MDS_INODELOCK_FULL; /* If no known bits (which should not happen, probably, as everybody should support LOOKUP and UPDATE bits at least) revert to compat mode with plain locks. */ if (!data->ocd_ibits_known && data->ocd_connect_flags & OBD_CONNECT_IBITS) data->ocd_connect_flags &= ~OBD_CONNECT_IBITS; if (!obd->u.mds.mds_fl_acl) data->ocd_connect_flags &= ~OBD_CONNECT_ACL; if (!obd->u.mds.mds_fl_user_xattr) data->ocd_connect_flags &= ~OBD_CONNECT_XATTR; exp->exp_connect_flags = data->ocd_connect_flags; data->ocd_version = LUSTRE_VERSION_CODE; exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known; } if (obd->u.mds.mds_fl_acl && ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) { CWARN("%s: MDS requires ACL support but client does not\n", obd->obd_name); return -EBADE; } return 0;}static int mds_reconnect(struct obd_export *exp, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data){ int rc; ENTRY; if (exp == NULL || obd == NULL || cluuid == NULL) RETURN(-EINVAL); rc = mds_connect_internal(exp, data); RETURN(rc);}/* Establish a connection to the MDS. * * This will set up an export structure for the client to hold state data * about that client, like open files, the last operation number it did * on the server, etc. */static int mds_connect(struct lustre_handle *conn, struct obd_device *obd, struct obd_uuid *cluuid, struct obd_connect_data *data, void *localdata){ struct obd_export *exp; struct mds_export_data *med; struct mds_client_data *mcd = NULL; lnet_nid_t *client_nid = (lnet_nid_t *)localdata; int rc, abort_recovery; ENTRY; if (!conn || !obd || !cluuid) RETURN(-EINVAL); /* Check for aborted recovery. */ spin_lock_bh(&obd->obd_processing_task_lock); abort_recovery = obd->obd_abort_recovery; spin_unlock_bh(&obd->obd_processing_task_lock); if (abort_recovery) target_abort_recovery(obd); /* XXX There is a small race between checking the list and adding a * new connection for the same UUID, but the real threat (list * corruption when multiple different clients connect) is solved. * * There is a second race between adding the export to the list, * and filling in the client data below. Hence skipping the case * of NULL mcd above. We should already be controlling multiple * connects at the client, and we can't hold the spinlock over * memory allocations without risk of deadlocking. */ rc = class_connect(conn, obd, cluuid);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -