filter.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,698 行 · 第 1/5 页
C
1,698 行
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  linux/fs/obdfilter/filter.c
 *
 *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
 *   Author: Peter Braam <braam@clusterfs.com>
 *   Author: Andreas Dilger <adilger@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

/*
 * Invariant: Get O/R i_mutex for lookup, if needed, before any journal ops
 *            (which need to get journal_lock, may block if journal full).
 *
 * Invariant: Call filter_start_transno() before any journal ops to avoid the
 *            same deadlock problem.  We can (and want) to get rid of the
 *            transno sem in favour of the dir/inode i_mutex to avoid single
 *            threaded operation on the OST.
 */

#define DEBUG_SUBSYSTEM S_FILTER

#ifndef AUTOCONF_INCLUDED
#include <linux/config.h>
#endif
#include <linux/module.h>
#include <linux/fs.h>
#include <linux/dcache.h>
#include <linux/init.h>
#include <linux/version.h>
#include <linux/sched.h>
#if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
# include <linux/mount.h>
# include <linux/buffer_head.h>
#endif

#include <obd_class.h>
#include <obd_lov.h>
#include <lustre_dlm.h>
#include <lustre_fsfilt.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_commit_confd.h>
#include <libcfs/list.h>

#include <lustre_disk.h>
#include <lustre_quota.h>
#include <linux/slab.h>
#include <lustre_param.h>

#include "filter_internal.h"

static struct lvfs_callback_ops filter_lvfs_ops;
cfs_mem_cache_t *ll_fmd_cachep;

/*
 * Journal commit callback used by filter_finish_transno().
 *
 * Forwards the device, transaction number and error code to
 * obd_transno_commit_cb().  cb_data is accepted to match the callback
 * signature but is not used.
 */
static void filter_commit_cb(struct obd_device *obd, __u64 transno,
                             void *cb_data, int error)
{
        obd_transno_commit_cb(obd, transno, error);
}

/*
 * Record the transaction number of a completed operation in the client's
 * slot of the last_rcvd file.
 *
 * Assumes caller has already pushed us into the kernel context.
 *
 * exp        - export of the client the operation was performed for
 * oti        - transaction info; oti_transno == 0 requests a fresh transno,
 *              a non-zero value is reused as-is (replayed request)
 * rc         - result of the operation so far; a non-zero value is returned
 *              unchanged and nothing is written
 * force_sync - non-zero to write the record synchronously and invoke the
 *              commit callback directly instead of registering it with the
 *              journal
 *
 * Returns rc if it was non-zero on entry, 0 when the device is not
 * replayable or oti is NULL, otherwise the result of writing the record
 * (-EINVAL if the client has no valid last_rcvd offset).
 */
int filter_finish_transno(struct obd_export *exp, struct obd_trans_info *oti,
                          int rc, int force_sync)
{
        struct filter_obd *filter = &exp->exp_obd->u.filter;
        struct filter_export_data *fed = &exp->exp_filter_data;
        struct filter_client_data *fcd = fed->fed_fcd;
        __u64 last_rcvd;
        loff_t off;
        int err, log_pri = D_RPCTRACE;

        /* Propagate error code. */
        if (rc)
                RETURN(rc);

        /* Nothing to record without recovery support or transaction info. */
        if (!exp->exp_obd->obd_replayable || oti == NULL)
                RETURN(rc);

        /* we don't allocate new transnos for replayed requests */
        if (oti->oti_transno == 0) {
                /* New request: take the next transno under fo_translock and
                 * hand it back to the caller through oti. */
                spin_lock(&filter->fo_translock);
                last_rcvd = le64_to_cpu(filter->fo_fsd->lsd_last_transno) + 1;
                filter->fo_fsd->lsd_last_transno = cpu_to_le64(last_rcvd);
                spin_unlock(&filter->fo_translock);
                oti->oti_transno = last_rcvd;
        } else {
                /* Transno supplied by the caller: keep it, and only advance
                 * the server-wide last transno if this one is larger. */
                spin_lock(&filter->fo_translock);
                last_rcvd = oti->oti_transno;
                if (last_rcvd > le64_to_cpu(filter->fo_fsd->lsd_last_transno))
                        filter->fo_fsd->lsd_last_transno =
                                cpu_to_le64(last_rcvd);
                spin_unlock(&filter->fo_translock);
        }
        fcd->fcd_last_rcvd = cpu_to_le64(last_rcvd);

        /* could get xid from oti, if it's ever needed */
        fcd->fcd_last_xid = 0;

        off = fed->fed_lr_off;
        if (off <= 0) {
                /* The client was never assigned a slot in last_rcvd. */
                CERROR("%s: client idx %d is %lld\n", exp->exp_obd->obd_name,
                       fed->fed_lr_idx, fed->fed_lr_off);
                err = -EINVAL;
        } else {
                /* The result of registering the journal callback becomes the
                 * sync flag: a non-zero result makes the write synchronous
                 * and the callback is then invoked by hand below. */
                if (!force_sync)
                        force_sync = fsfilt_add_journal_cb(exp->exp_obd,
                                                           last_rcvd,
                                                           oti->oti_handle,
                                                           filter_commit_cb,
                                                           NULL);

                err = fsfilt_write_record(exp->exp_obd, filter->fo_rcvd_filp,
                                          fcd, sizeof(*fcd), &off,
                                          force_sync | exp->exp_need_sync);
                if (force_sync)
                        filter_commit_cb(exp->exp_obd, last_rcvd, NULL, err);
        }
        if (err) {
                /* Raise the log level and report the write failure. */
                log_pri = D_ERROR;
                if (rc == 0)
                        rc = err;
        }

        CDEBUG(log_pri, "wrote trans "LPU64" for client %s at #%d: err = %d\n",
               last_rcvd, fcd->fcd_uuid, fed->fed_lr_idx, err);

        RETURN(rc);
}

/*
 * Drop a reference on a dentry, logging the dentry name and the reference
 * count it will have afterwards.  Asserts that the dentry currently holds
 * at least one reference.
 */
void f_dput(struct dentry *dentry)
{
        /* Can't go inside filter_ddelete because it can block */
        CDEBUG(D_INODE, "putting %s: %p, count = %d\n",
               dentry->d_name.name, dentry, atomic_read(&dentry->d_count) - 1);
        LASSERT(atomic_read(&dentry->d_count) > 0);

        dput(dentry);
}

/*
 * Initialise the spinlock of each of the BRW_LAST histograms in brw_stats.
 * Only the locks are touched; histogram contents are left as they are.
 */
static void init_brw_stats(struct brw_stats *brw_stats)
{
        int i;
        for (i = 0; i < BRW_LAST; i++)
                spin_lock_init(&brw_stats->hist[i].oh_lock);
}

/*
 * Allocate a stats block for an obd device and set up its counters.
 *
 * The block is sized to hold one entry per pointer-sized slot of the obd
 * type's operations table plus LPROC_FILTER_LAST - 1 further entries, and
 * the read_bytes/write_bytes counters are initialised in it.
 *
 * obd   - device whose operations table determines the number of counters
 * stats - out: receives the newly allocated stats block
 *
 * Returns 0 on success, -ENOMEM if the allocation fails (*stats is NULL in
 * that case).
 */
static int lprocfs_init_rw_stats(struct obd_device *obd,
                                 struct lprocfs_stats **stats)
{
        int num_stats;

        num_stats = (sizeof(*obd->obd_type->typ_ops) / sizeof(void *)) +
                                                        LPROC_FILTER_LAST - 1;
        *stats = lprocfs_alloc_stats(num_stats, 0);
        if (*stats == NULL)
                return -ENOMEM;

        lprocfs_init_ops_stats(LPROC_FILTER_LAST, *stats);
        lprocfs_counter_init(*stats, LPROC_FILTER_READ_BYTES,
                             LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
        lprocfs_counter_init(*stats, LPROC_FILTER_WRITE_BYTES,
                             LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
        return(0);
}

/* brw_stats are 2128, ops are 3916, ldlm are 204, so 6248 bytes per client,
   plus the procfs overhead :( */
/*
 * Set up statistics for an export.
 *
 * Always initialises the export's own brw_stats locks.  For any export
 * other than the device's self-export it then calls lprocfs_exp_setup();
 * when that reports a new NID (and client_nid is non-zero) it allocates the
 * per-NID brw_stats, creates the "brw_stats" proc entry and registers the
 * per-NID "stats" block.
 *
 * Returns 0 on success, -ENOMEM on allocation or proc-entry failure, or the
 * error returned by one of the lprocfs helpers.
 *
 * NOTE(review): the error returns after OBD_ALLOC() do not release
 * nid_brw_stats, the proc entry or nid_stats here; presumably they are torn
 * down together with the nid_stat elsewhere - verify.
 */
static int filter_export_stats_init(struct obd_device *obd,
                                    struct obd_export *exp,
                                    lnet_nid_t client_nid)
{
        struct filter_export_data *fed = &exp->exp_filter_data;
        struct proc_dir_entry *brw_entry;
        int rc, newnid = 0;
        ENTRY;

        init_brw_stats(&fed->fed_brw_stats);

        if (obd_uuid_equals(&exp->exp_client_uuid, &obd->obd_uuid))
                /* Self-export gets no proc entry */
                RETURN(0);

        rc = lprocfs_exp_setup(exp, client_nid, &newnid);
        if (rc)
                RETURN(rc);

        /* Per-NID entries are only created the first time a NID is seen. */
        if (client_nid && newnid) {
                struct nid_stat *tmp = exp->exp_nid_stats;
                LASSERT(tmp != NULL);

                OBD_ALLOC(tmp->nid_brw_stats, sizeof(struct brw_stats));
                if (tmp->nid_brw_stats == NULL)
                        RETURN(-ENOMEM);

                init_brw_stats(tmp->nid_brw_stats);

                brw_entry = create_proc_entry("brw_stats", 0644,
                                              exp->exp_nid_stats->nid_proc);
                if (brw_entry == NULL)
                        RETURN(-ENOMEM);

                brw_entry->proc_fops = &filter_per_nid_stats_fops;
                brw_entry->data = exp->exp_nid_stats;

                rc = lprocfs_init_rw_stats(obd, &exp->exp_nid_stats->nid_stats);
                if (rc)
                        RETURN(rc);

                rc = lprocfs_register_stats(tmp->nid_proc, "stats",
                                            tmp->nid_stats);
                if (rc)
                        RETURN(rc);
        }

        RETURN(0);
}

/* Add client data to the FILTER.  We use a bitmap to locate a free space
 * in the last_rcvd file if cl_idx is -1 (i.e. a new client).
 * Otherwise, we have just read the data from the last_rcvd file and
 * we know its offset.
 *
 * obd        - filter device owning the last_rcvd file and slot bitmap
 * exp        - export whose fed_fcd holds the client data
 * cl_idx     - slot index, or -1 to allocate a free slot for a new client
 * client_nid - not referenced in this function
 *
 * On success fed_lr_idx/fed_lr_off of the export describe the slot.  For a
 * new client the client data is additionally written to the slot inside a
 * transaction on the last_rcvd inode.
 *
 * Returns 0 on success (including the self-export, for which nothing is
 * done), -EOVERFLOW when all LR_MAX_CLIENTS slots are taken, or the error
 * from starting the transaction / writing the record.
 *
 * NOTE(review): when the write for a new client fails the bit set in the
 * bitmap is not cleared here, and the return value of fsfilt_commit() is
 * ignored - confirm the caller's cleanup covers this.
 */
static int filter_client_add(struct obd_device *obd, struct obd_export *exp,
                             int cl_idx, lnet_nid_t client_nid)
{
        struct filter_obd *filter = &obd->u.filter;
        struct filter_export_data *fed = &exp->exp_filter_data;
        unsigned long *bitmap = filter->fo_last_rcvd_slots;
        int new_client = (cl_idx == -1);
        ENTRY;

        LASSERT(bitmap != NULL);
        LASSERTF(cl_idx > -2, "%d\n", cl_idx);

        /* Self-export */
        if (strcmp(fed->fed_fcd->fcd_uuid, obd->obd_uuid.uuid) == 0)
                RETURN(0);

        /* the bitmap operations can handle cl_idx > sizeof(long) * 8, so
         * there's no need for extra complication here */
        if (new_client) {
                cl_idx = find_first_zero_bit(bitmap, LR_MAX_CLIENTS);
        repeat:
                if (cl_idx >= LR_MAX_CLIENTS) {
                        CERROR("no room for %u clients - fix LR_MAX_CLIENTS\n",
                               cl_idx);
                        RETURN(-EOVERFLOW);
                }
                /* The bit was found clear but may have been set since;
                 * if so, move on to the next clear bit and try again. */
                if (test_and_set_bit(cl_idx, bitmap)) {
                        cl_idx = find_next_zero_bit(bitmap, LR_MAX_CLIENTS,
                                                    cl_idx);
                        goto repeat;
                }
        } else {
                /* A slot index read back from last_rcvd must not already be
                 * claimed in the bitmap. */
                if (test_and_set_bit(cl_idx, bitmap)) {
                        CERROR("FILTER client %d: bit already set in bitmap!\n",
                               cl_idx);
                        LBUG();
                }
        }

        /* Slot offset = start of the client area + index * per-client size,
         * both taken from the on-disk server data. */
        fed->fed_lr_idx = cl_idx;
        fed->fed_lr_off = le32_to_cpu(filter->fo_fsd->lsd_client_start) +
                cl_idx * le16_to_cpu(filter->fo_fsd->lsd_client_size);
        LASSERTF(fed->fed_lr_off > 0, "fed_lr_off = %llu\n", fed->fed_lr_off);

        CDEBUG(D_INFO, "client at index %d (%llu) with UUID '%s' added\n",
               fed->fed_lr_idx, fed->fed_lr_off, fed->fed_fcd->fcd_uuid);

        if (new_client) {
                struct lvfs_run_ctxt saved;
                loff_t off = fed->fed_lr_off;
                void *handle;
                int rc;

                CDEBUG(D_INFO, "writing client fcd at idx %u (%llu) (len %u)\n",
                       fed->fed_lr_idx,off,(unsigned int)sizeof(*fed->fed_fcd));

                push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
                /* Transaction needed to fix bug 1403 */
                handle = fsfilt_start(obd,
                                      filter->fo_rcvd_filp->f_dentry->d_inode,
                                      FSFILT_OP_SETATTR, NULL);
                if (IS_ERR(handle)) {
                        rc = PTR_ERR(handle);
                        CERROR("unable to start transaction: rc %d\n", rc);
                } else {
                        rc = fsfilt_add_journal_cb(obd, 0, handle,
                                                   target_client_add_cb, exp);
                        /* Callback registered: flag the export as needing
                         * sync writes (under exp_lock). */
                        if (rc == 0) {
                                spin_lock(&exp->exp_lock);
                                exp->exp_need_sync = 1;
                                spin_unlock(&exp->exp_lock);
                        }
                        rc = fsfilt_write_record(obd, filter->fo_rcvd_filp,
                                                 fed->fed_fcd,
                                                 sizeof(*fed->fed_fcd),
                                                 &off, rc /* sync if no cb */);
                        fsfilt_commit(obd,
                                      filter->fo_rcvd_filp->f_dentry->d_inode,
                                      handle, 0);
                }
                pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);

                if (rc) {
                        CERROR("error writing %s client idx %u: rc %d\n",
                               LAST_RCVD, fed->fed_lr_idx, rc);
                        RETURN(rc);
                }
        }
        RETURN(0);
}

static int filter_client_free(struct obd_export *exp)
{
        struct filter_export_data *fed = &exp->exp_filter_data;
        struct filter_obd *filter = &exp->exp_obd->u.filter;
        struct obd_device *obd = exp->exp_obd;
        struct filter_client_data zero_fcd;
        struct lvfs_run_ctxt saved;
        int rc;
        loff_t off;
        ENTRY;

        if (fed->fed_fcd == NULL)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?