quota_context.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,161 行 · 第 1/3 页
C
1,161 行
OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit)); if (qunit == NULL) RETURN(NULL); INIT_LIST_HEAD(&qunit->lq_hash); init_waitqueue_head(&qunit->lq_waitq); atomic_set(&qunit->lq_refcnt, 1); qunit->lq_ctxt = qctxt; memcpy(&qunit->lq_data, qdata, sizeof(*qdata)); qunit->lq_opc = opc; qunit->lq_lock = SPIN_LOCK_UNLOCKED; QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); RETURN(qunit);}static inline void free_qunit(struct lustre_qunit *qunit){ OBD_SLAB_FREE(qunit, qunit_cachep, sizeof(*qunit));}static inline void qunit_get(struct lustre_qunit *qunit){ atomic_inc(&qunit->lq_refcnt);}static void qunit_put(struct lustre_qunit *qunit){ LASSERT(atomic_read(&qunit->lq_refcnt)); if (atomic_dec_and_test(&qunit->lq_refcnt)) free_qunit(qunit);}static voidinsert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit){ struct list_head *head; LASSERT(list_empty(&qunit->lq_hash)); head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data); list_add(&qunit->lq_hash, head); QUNIT_SET_STATE(qunit, QUNIT_IN_HASH);}static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit){ struct lustre_qunit_size *lqs = NULL; quota_search_lqs(&qunit->lq_data, NULL, qunit->lq_ctxt, &lqs); if (lqs) { spin_lock(&lqs->lqs_lock); if (qunit->lq_opc == QUOTA_DQACQ) quota_compute_lqs(&qunit->lq_data, lqs, 0, 1); if (qunit->lq_opc == QUOTA_DQREL) quota_compute_lqs(&qunit->lq_data, lqs, 0, 0); spin_unlock(&lqs->lqs_lock); /* this is for quota_search_lqs */ lqs_putref(lqs); /* this is for schedule_dqacq */ lqs_putref(lqs); }}static void remove_qunit_nolock(struct lustre_qunit *qunit){ LASSERT(!list_empty(&qunit->lq_hash)); LASSERT_SPIN_LOCKED(&qunit_hash_lock); list_del_init(&qunit->lq_hash); QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH);}#define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \ (limit = count) : (limit += count)/* FIXME check if this mds is the master of specified id */static intis_master(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, unsigned int id, int type){ return qctxt->lqc_handler ? 1 : 0;}static intschedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait);static int split_before_schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait){ int rc = 0; unsigned long factor; struct qunit_data tmp_qdata; ENTRY; LASSERT(qdata && qdata->qd_count); QDATA_DEBUG(qdata, "%s quota split.\n", QDATA_IS_BLK(qdata) ? "block" : "inode"); if (QDATA_IS_BLK(qdata)) factor = MAX_QUOTA_COUNT32 / qctxt->lqc_bunit_sz * qctxt->lqc_bunit_sz; else factor = MAX_QUOTA_COUNT32 / qctxt->lqc_iunit_sz * qctxt->lqc_iunit_sz; if (qctxt->lqc_import && should_translate_quota(qctxt->lqc_import) && qdata->qd_count > factor) { tmp_qdata = *qdata; tmp_qdata.qd_count = factor; qdata->qd_count -= tmp_qdata.qd_count; QDATA_DEBUG((&tmp_qdata), "be split.\n"); rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait); } else{ QDATA_DEBUG(qdata, "don't be split.\n"); rc = schedule_dqacq(obd, qctxt, qdata, opc, wait); } RETURN(rc);}static intdqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int rc, int opc){ struct lustre_qunit *qunit = NULL; struct super_block *sb = qctxt->lqc_sb; int err = 0; struct quota_adjust_qunit *oqaq = NULL; int rc1 = 0; ENTRY; LASSERT(qdata); QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n", obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel"); /* update local operational quota file */ if (rc == 0) { __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata)); struct obd_quotactl *qctl; __u64 *hardlimit; OBD_ALLOC_PTR(qctl); if (qctl == NULL) GOTO(out, err = -ENOMEM); /* acq/rel qunit for specified uid/gid is serialized, * so there is no race between get fs quota limit and * set fs quota limit */ qctl->qc_cmd = Q_GETQUOTA; qctl->qc_id = qdata->qd_id; qctl->qc_type = QDATA_IS_GRP(qdata); err = fsfilt_quotactl(obd, sb, qctl); if (err) { CERROR("error get quota fs limit! (rc:%d)\n", err); GOTO(out_mem, err); } if (QDATA_IS_BLK(qdata)) { qctl->qc_dqblk.dqb_valid = QIF_BLIMITS; hardlimit = &qctl->qc_dqblk.dqb_bhardlimit; } else { qctl->qc_dqblk.dqb_valid = QIF_ILIMITS; hardlimit = &qctl->qc_dqblk.dqb_ihardlimit; } CDEBUG(D_QUOTA, "hardlimt: "LPU64"\n", *hardlimit); switch (opc) { case QUOTA_DQACQ: INC_QLIMIT(*hardlimit, count); break; case QUOTA_DQREL: LASSERTF(count < *hardlimit, "count: "LPU64", hardlimit: "LPU64".\n", count, *hardlimit); *hardlimit -= count; break; default: LBUG(); } /* clear quota limit */ if (count == 0) *hardlimit = 0; qctl->qc_cmd = Q_SETQUOTA; err = fsfilt_quotactl(obd, sb, qctl); if (err) CERROR("error set quota fs limit! (rc:%d)\n", err); QDATA_DEBUG(qdata, "%s completion\n", opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");out_mem: OBD_FREE_PTR(qctl); } else if (rc == -EDQUOT) { QDATA_DEBUG(qdata, "acquire qunit got EDQUOT.\n"); } else if (rc == -EBUSY) { QDATA_DEBUG(qdata, "it's is recovering, got EBUSY.\n"); } else { CERROR("acquire qunit got error! (rc:%d)\n", rc); }out: /* remove the qunit from hash */ spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, qdata); /* this qunit has been removed by qctxt_cleanup() */ if (!qunit) { spin_unlock(&qunit_hash_lock); QDATA_DEBUG(qdata, "%s is discarded because qunit isn't found\n", opc == QUOTA_DQACQ ? "DQACQ" : "DQREL"); RETURN(err); } LASSERT(opc == qunit->lq_opc); /* remove this qunit from lq_hash so that new processes cannot be added * to qunit->lq_waiters */ remove_qunit_nolock(qunit); spin_unlock(&qunit_hash_lock); compute_lqs_after_removing_qunit(qunit); /* wake up all waiters */ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc); wake_up(&qunit->lq_waitq); qunit_put(qunit); if (rc < 0 && rc != -EDQUOT) RETURN(err); /* don't reschedule in such cases: * - acq/rel failure and qunit isn't changed, * but not for quota recovery. * - local dqacq/dqrel. * - local disk io failure. */ OBD_ALLOC_PTR(oqaq); if (!oqaq) RETURN(-ENOMEM); qdata_to_oqaq(qdata, oqaq); /* adjust the qunit size in slaves */ rc1 = quota_adjust_slave_lqs(oqaq, qctxt); OBD_FREE_PTR(oqaq); if (rc1 < 0) { CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1); RETURN(rc1); } if (err || (rc && rc != -EBUSY && rc1 == 0) || is_master(obd, qctxt, qdata->qd_id, QDATA_IS_GRP(qdata))) RETURN(err); /* reschedule another dqacq/dqrel if needed */ qdata->qd_count = 0; qdata->qd_flags &= LQUOTA_QUNIT_FLAGS; rc1 = check_cur_qunit(obd, qctxt, qdata); if (rc1 > 0) { int opc; opc = rc1 == 1 ? QUOTA_DQACQ : QUOTA_DQREL; rc1 = split_before_schedule_dqacq(obd, qctxt, qdata, opc, 0); QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1); } RETURN(err);}struct dqacq_async_args { struct lustre_quota_ctxt *aa_ctxt; struct lustre_qunit *aa_qunit;};static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc){ struct dqacq_async_args *aa = (struct dqacq_async_args *)data; struct lustre_quota_ctxt *qctxt = aa->aa_ctxt; struct lustre_qunit *qunit = aa->aa_qunit; struct obd_device *obd = req->rq_import->imp_obd; struct qunit_data *qdata = NULL; int rc1 = 0; ENTRY; LASSERT(req); LASSERT(req->rq_import); /* there are several forms of qunit(historic causes), so we need to * adjust qunit from slaves to the same form here */ OBD_ALLOC(qdata, sizeof(struct qunit_data)); if (!qdata) RETURN(-ENOMEM); if (rc == -EIO || rc == -EINTR || rc == -ENOTCONN ) /* if a quota req timeouts or is dropped, we should update quota * statistics which will be handled in dqacq_completion. And in * this situation we should get qdata from request instead of * reply */ rc1 = quota_get_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT); else rc1 = quota_get_qdata(req, qdata, QUOTA_REPLY, QUOTA_IMPORT); if (rc1 < 0) { DEBUG_REQ(D_ERROR, req, "error unpacking qunit_data\n"); GOTO(exit, rc = -EPROTO); } QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc); QDATA_DEBUG((&qunit->lq_data), "lq_data: \n"); if (qdata->qd_id != qunit->lq_data.qd_id || OBD_FAIL_CHECK_ONCE(OBD_FAIL_QUOTA_RET_QDATA)) { CDEBUG(D_ERROR, "the returned qd_id isn't expected!" "(qdata: %u, lq_data: %u)\n", qdata->qd_id, qunit->lq_data.qd_id); qdata->qd_id = qunit->lq_data.qd_id; rc = -EPROTO; } if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) { CDEBUG(D_ERROR, "the returned grp/usr isn't expected!" "(qdata: %u, lq_data: %u)\n", qdata->qd_flags, qunit->lq_data.qd_flags); if (QDATA_IS_GRP(&qunit->lq_data)) QDATA_SET_GRP(qdata); else QDATA_CLR_GRP(qdata); rc = -EPROTO; } if (qdata->qd_count > qunit->lq_data.qd_count) { CDEBUG(D_ERROR, "the returned qd_count isn't expected!" "(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count, qunit->lq_data.qd_count); rc = -EPROTO; } rc = dqacq_completion(obd, qctxt, qdata, rc, lustre_msg_get_opc(req->rq_reqmsg));exit: OBD_FREE(qdata, sizeof(struct qunit_data)); RETURN(rc);}static int got_qunit(struct lustre_qunit *qunit){ int rc; ENTRY; spin_lock(&qunit->lq_lock); switch (qunit->lq_state) { case QUNIT_IN_HASH: case QUNIT_RM_FROM_HASH: rc = 0; break; case QUNIT_FINISHED: rc = 1; break; default: rc = 0; CERROR("invalid qunit state %d\n", qunit->lq_state); } spin_unlock(&qunit->lq_lock); RETURN(rc);}static intschedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait){ struct lustre_qunit *qunit, *empty; struct l_wait_info lwi = { 0 }; struct ptlrpc_request *req; struct dqacq_async_args *aa; int size[2] = { sizeof(struct ptlrpc_body), 0 }; struct obd_import *imp = NULL; unsigned long factor; struct lustre_qunit_size *lqs = NULL; int rc = 0; ENTRY; if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) RETURN(-ENOMEM); spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, qdata); if (qunit) { if (wait) qunit_get(qunit); spin_unlock(&qunit_hash_lock); free_qunit(empty);
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?