quota_master.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,592 行 · 第 1/4 页
C
1,592 行
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * lustre/quota/quota_master.c * Lustre Quota Master request handler * * Copyright (c) 2001-2005 Cluster File Systems, Inc. * Author: Niu YaWei <niu@clusterfs.com> * * This file is part of Lustre, http://www.lustre.org. * * No redistribution or use is permitted outside of Cluster File Systems, Inc. * */#ifndef EXPORT_SYMTAB# define EXPORT_SYMTAB#endif#define DEBUG_SUBSYSTEM S_MDS#include <linux/version.h>#include <linux/fs.h>#include <asm/unistd.h>#include <linux/slab.h>#include <linux/quotaops.h>#include <linux/module.h>#include <linux/init.h>#include <linux/quota.h>#include <obd_class.h>#include <lustre_quota.h>#include <lustre_fsfilt.h>#include <lustre_mds.h>#include "quota_internal.h"/* lock ordering: mds->mds_qonoff_sem > dquot->dq_sem */static struct list_head lustre_dquot_hash[NR_DQHASH];static spinlock_t dquot_hash_lock = SPIN_LOCK_UNLOCKED;cfs_mem_cache_t *lustre_dquot_cachep;int lustre_dquot_init(void){ int i; ENTRY; LASSERT(lustre_dquot_cachep == NULL); lustre_dquot_cachep = cfs_mem_cache_create("lustre_dquot_cache", sizeof(struct lustre_dquot), 0, 0); if (!lustre_dquot_cachep) return (-ENOMEM); for (i = 0; i < NR_DQHASH; i++) { INIT_LIST_HEAD(lustre_dquot_hash + i); } RETURN(0);}void lustre_dquot_exit(void){ int i; ENTRY; /* FIXME cleanup work ?? */ for (i = 0; i < NR_DQHASH; i++) { LASSERT(list_empty(lustre_dquot_hash + i)); } if (lustre_dquot_cachep) { int rc; rc = cfs_mem_cache_destroy(lustre_dquot_cachep); LASSERTF(rc == 0,"couldn't destroy lustre_dquot_cachep slab\n"); lustre_dquot_cachep = NULL; } EXIT;}static inline intdquot_hashfn(struct lustre_quota_info *info, unsigned int id, int type) __attribute__((__const__));static inline intdquot_hashfn(struct lustre_quota_info *info, unsigned int id, int type){ unsigned long tmp = ((unsigned long)info >> L1_CACHE_SHIFT) ^ id; tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH; return tmp;}/* caller must hold dquot_hash_lock */static struct lustre_dquot *find_dquot(int hashent, struct lustre_quota_info *lqi, qid_t id, int type){ struct lustre_dquot *dquot; ENTRY; LASSERT_SPIN_LOCKED(&dquot_hash_lock); list_for_each_entry(dquot, &lustre_dquot_hash[hashent], dq_hash) { if (dquot->dq_info == lqi && dquot->dq_id == id && dquot->dq_type == type) RETURN(dquot); } RETURN(NULL);}static struct lustre_dquot *alloc_dquot(struct lustre_quota_info *lqi, qid_t id, int type){ struct lustre_dquot *dquot = NULL; ENTRY; OBD_SLAB_ALLOC(dquot, lustre_dquot_cachep, CFS_ALLOC_IO, sizeof(*dquot)); if (dquot == NULL) RETURN(NULL); INIT_LIST_HEAD(&dquot->dq_hash); init_mutex_locked(&dquot->dq_sem); dquot->dq_refcnt = 1; dquot->dq_info = lqi; dquot->dq_id = id; dquot->dq_type = type; dquot->dq_status = DQ_STATUS_AVAIL; RETURN(dquot);}static void free_dquot(struct lustre_dquot *dquot){ OBD_SLAB_FREE(dquot, lustre_dquot_cachep, sizeof(*dquot));}static void insert_dquot_nolock(struct lustre_dquot *dquot){ struct list_head *head = lustre_dquot_hash + dquot_hashfn(dquot->dq_info, dquot->dq_id, dquot->dq_type); LASSERT(list_empty(&dquot->dq_hash)); list_add(&dquot->dq_hash, head);}static void remove_dquot_nolock(struct lustre_dquot *dquot){ LASSERT(!list_empty(&dquot->dq_hash)); list_del_init(&dquot->dq_hash);}static void lustre_dqput(struct lustre_dquot *dquot){ ENTRY; spin_lock(&dquot_hash_lock); LASSERT(dquot->dq_refcnt); dquot->dq_refcnt--; if (!dquot->dq_refcnt) { remove_dquot_nolock(dquot); free_dquot(dquot); } spin_unlock(&dquot_hash_lock); EXIT;}static struct lustre_dquot *lustre_dqget(struct obd_device *obd, struct lustre_quota_info *lqi, qid_t id, int type){ unsigned int hashent = dquot_hashfn(lqi, id, type); struct lustre_dquot *dquot, *empty; ENTRY; if ((empty = alloc_dquot(lqi, id, type)) == NULL) RETURN(ERR_PTR(-ENOMEM)); spin_lock(&dquot_hash_lock); if ((dquot = find_dquot(hashent, lqi, id, type)) != NULL) { dquot->dq_refcnt++; spin_unlock(&dquot_hash_lock); free_dquot(empty); } else { int rc; dquot = empty; insert_dquot_nolock(dquot); spin_unlock(&dquot_hash_lock); rc = fsfilt_dquot(obd, dquot, QFILE_RD_DQUOT); up(&dquot->dq_sem); if (rc) { CERROR("can't read dquot from admin quotafile! " "(rc:%d)\n", rc); lustre_dqput(dquot); RETURN(ERR_PTR(rc)); } } LASSERT(dquot); RETURN(dquot);}static void init_oqaq(struct quota_adjust_qunit *oqaq, struct lustre_quota_ctxt *qctxt, qid_t id, int type){ struct lustre_qunit_size *lqs = NULL; oqaq->qaq_id = id; oqaq->qaq_flags = type; quota_search_lqs(NULL, oqaq, qctxt, &lqs); if (lqs) { spin_lock(&lqs->lqs_lock); oqaq->qaq_bunit_sz = lqs->lqs_bunit_sz; oqaq->qaq_iunit_sz = lqs->lqs_iunit_sz; oqaq->qaq_flags = lqs->lqs_flags; spin_unlock(&lqs->lqs_lock); lqs_putref(lqs); } else { CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n"); oqaq->qaq_bunit_sz = qctxt->lqc_bunit_sz; oqaq->qaq_iunit_sz = qctxt->lqc_iunit_sz; }}int dqacq_adjust_qunit_sz(struct obd_device *obd, qid_t id, int type, __u32 is_blk){ struct mds_obd *mds = &obd->u.mds; struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; struct obd_device *lov_mds_obd = class_exp2obd(mds->mds_osc_exp); struct lov_obd *lov = &lov_mds_obd->u.lov; __u32 ost_num = lov->desc.ld_tgt_count, mdt_num = 1; struct quota_adjust_qunit *oqaq = NULL; unsigned int uid = 0, gid = 0; struct lustre_quota_info *info = &mds->mds_quota_info; struct lustre_dquot *dquot = NULL; int adjust_res = 0; int rc = 0; ENTRY; LASSERT(mds); dquot = lustre_dqget(obd, info, id, type); if (IS_ERR(dquot)) RETURN(PTR_ERR(dquot)); OBD_ALLOC_PTR(oqaq); if (!oqaq) GOTO(out, rc = -ENOMEM); down(&dquot->dq_sem); init_oqaq(oqaq, qctxt, id, type); rc = dquot_create_oqaq(qctxt, dquot, ost_num, mdt_num, is_blk ? LQUOTA_FLAGS_ADJBLK : LQUOTA_FLAGS_ADJINO, oqaq); if (rc < 0) { CDEBUG(D_ERROR, "create oqaq failed! (rc:%d)\n", rc); GOTO(out_sem, rc); } QAQ_DEBUG(oqaq, "show oqaq.\n") if (!QAQ_IS_ADJBLK(oqaq) && !QAQ_IS_ADJINO(oqaq)) GOTO(out_sem, rc); /* adjust the mds slave qunit size */ adjust_res = quota_adjust_slave_lqs(oqaq, qctxt); if (adjust_res <= 0) { if (adjust_res < 0) { rc = adjust_res; CDEBUG(D_ERROR, "adjust mds slave's qunit size failed! \ (rc:%d)\n", rc); } else { CDEBUG(D_QUOTA, "qunit doesn't need to be adjusted.\n"); } GOTO(out_sem, rc); } if (type) gid = dquot->dq_id; else uid = dquot->dq_id; up(&dquot->dq_sem); rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0); if (rc == -EDQUOT || rc == -EBUSY) { CDEBUG(D_QUOTA, "rc: %d.\n", rc); rc = 0; } if (rc) { CDEBUG(D_ERROR, "mds fail to adjust file quota! \ (rc:%d)\n", rc); GOTO(out, rc); } /* only when block qunit is reduced, boardcast to osts */ if ((adjust_res & LQS_BLK_DECREASE) && QAQ_IS_ADJBLK(oqaq)) rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq);out: lustre_dqput(dquot); if (oqaq) OBD_FREE_PTR(oqaq); RETURN(rc);out_sem: up(&dquot->dq_sem); goto out;}int dqacq_handler(struct obd_device *obd, struct qunit_data *qdata, int opc){ struct mds_obd *mds = &obd->u.mds; struct lustre_quota_ctxt *qctxt = &mds->mds_obt.obt_qctxt; struct lustre_quota_info *info = &mds->mds_quota_info; struct lustre_dquot *dquot = NULL; __u64 *usage = NULL; __u64 hlimit = 0, slimit = 0; time_t *time = NULL; unsigned int grace = 0; struct lustre_qunit_size *lqs = NULL; int rc = 0; ENTRY; OBD_FAIL_RETURN(OBD_FAIL_OBD_DQACQ, -EIO); dquot = lustre_dqget(obd, info, qdata->qd_id, QDATA_IS_GRP(qdata)); if (IS_ERR(dquot)) RETURN(PTR_ERR(dquot)); DQUOT_DEBUG(dquot, "get dquot in dqacq_handler\n"); QINFO_DEBUG(dquot->dq_info, "get dquot in dqadq_handler\n"); down(&mds->mds_qonoff_sem); down(&dquot->dq_sem); if (dquot->dq_status & DQ_STATUS_RECOVERY) { DQUOT_DEBUG(dquot, "this dquot is under recovering.\n"); GOTO(out, rc = -EBUSY); } if (QDATA_IS_BLK(qdata)) { grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_bgrace; usage = &dquot->dq_dqb.dqb_curspace; hlimit = dquot->dq_dqb.dqb_bhardlimit; slimit = dquot->dq_dqb.dqb_bsoftlimit; time = &dquot->dq_dqb.dqb_btime; } else { grace = info->qi_info[QDATA_IS_GRP(qdata)].dqi_igrace; usage = (__u64 *) & dquot->dq_dqb.dqb_curinodes; hlimit = dquot->dq_dqb.dqb_ihardlimit; slimit = dquot->dq_dqb.dqb_isoftlimit; time = &dquot->dq_dqb.dqb_itime; } /* if the quota limit in admin quotafile is zero, we just inform * slave to clear quota limit with zero qd_count */ if (hlimit == 0 && slimit == 0) { qdata->qd_count = 0; GOTO(out, rc); } switch (opc) { case QUOTA_DQACQ: if (hlimit && QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > hlimit) { if (QDATA_IS_CHANGE_QS(qdata) && QUSG(*usage, QDATA_IS_BLK(qdata)) < hlimit) qdata->qd_count = (hlimit - QUSG(*usage, QDATA_IS_BLK(qdata))) * QUOTABLOCK_SIZE; else GOTO(out, rc = -EDQUOT); } if (slimit && QUSG(*usage + qdata->qd_count, QDATA_IS_BLK(qdata)) > slimit) { if (*time && cfs_time_current_sec() >= *time) GOTO(out, rc = -EDQUOT); else if (!*time) *time = cfs_time_current_sec() + grace; } *usage += qdata->qd_count; break; case QUOTA_DQREL: /* The usage in administrative file might be incorrect before * recovery done */ if (*usage - qdata->qd_count < 0) *usage = 0; else *usage -= qdata->qd_count; /* (usage <= soft limit) but not (usage < soft limit) */ if (!slimit || QUSG(*usage, QDATA_IS_BLK(qdata)) <= slimit) *time = 0; break; default:
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?