lov_obd.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,866 行 · 第 1/5 页

C
1,866
字号
}/* the LOV expects oa->o_id to be set to the LOV object id */static int lov_create(struct obd_export *exp, struct obdo *src_oa,                      struct lov_stripe_md **ea, struct obd_trans_info *oti){        struct lov_obd *lov;        struct obd_info oinfo;        struct lov_request_set *set = NULL;        struct lov_request *req;        struct obd_statfs osfs;        __u64 maxage;        int rc = 0;        ENTRY;        LASSERT(ea != NULL);        if (exp == NULL)                RETURN(-EINVAL);        if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&            src_oa->o_flags == OBD_FL_DELORPHAN) {                rc = lov_clear_orphans(exp, src_oa, ea, oti);                RETURN(rc);        }        lov = &exp->exp_obd->u.lov;        if (!lov->desc.ld_active_tgt_count)                RETURN(-EIO);        /* Recreate a specific object id at the given OST index */        if ((src_oa->o_valid & OBD_MD_FLFLAGS) &&            (src_oa->o_flags & OBD_FL_RECREATE_OBJS)) {                 rc = lov_recreate(exp, src_oa, ea, oti);                 RETURN(rc);        }        maxage = cfs_time_shift_64(-lov->desc.ld_qos_maxage);        obd_statfs_rqset(exp->exp_obd, &osfs, maxage, OBD_STATFS_NODELAY);        rc = lov_prep_create_set(exp, &oinfo, ea, src_oa, oti, &set);        if (rc)                RETURN(rc);        list_for_each_entry(req, &set->set_list, rq_link) {                /* XXX: LOV STACKING: use real "obj_mdp" sub-data */                rc = obd_create(lov->lov_tgts[req->rq_idx]->ltd_exp,                                req->rq_oi.oi_oa, &req->rq_oi.oi_md, oti);                lov_update_create_set(set, req, rc);        }        rc = lov_fini_create_set(set, ea);        RETURN(rc);}#define ASSERT_LSM_MAGIC(lsmp)                                                  \do {                                                                            \        LASSERT((lsmp) != NULL);                                                \        LASSERTF(((lsmp)->lsm_magic == LOV_MAGIC ||                             \                 (lsmp)->lsm_magic == LOV_MAGIC_JOIN), "%p->lsm_magic=%x\n",    \                 (lsmp), (lsmp)->lsm_magic);                                    \} while (0)static int lov_destroy(struct obd_export *exp, struct obdo *oa,                       struct lov_stripe_md *lsm, struct obd_trans_info *oti,                       struct obd_export *md_exp){        struct lov_request_set *set;        struct obd_info oinfo;        struct lov_request *req;        struct list_head *pos;        struct lov_obd *lov;        int rc = 0, err;        ENTRY;        ASSERT_LSM_MAGIC(lsm);        if (!exp || !exp->exp_obd)                RETURN(-ENODEV);        if (oa->o_valid & OBD_MD_FLCOOKIE) {                LASSERT(oti);                LASSERT(oti->oti_logcookies);        }        lov = &exp->exp_obd->u.lov;        rc = lov_prep_destroy_set(exp, &oinfo, oa, lsm, oti, &set);        if (rc)                RETURN(rc);        list_for_each (pos, &set->set_list) {                int err;                req = list_entry(pos, struct lov_request, rq_link);                if (oa->o_valid & OBD_MD_FLCOOKIE)                        oti->oti_logcookies = set->set_cookies + req->rq_stripe;                err = obd_destroy(lov->lov_tgts[req->rq_idx]->ltd_exp,                                  req->rq_oi.oi_oa, NULL, oti, NULL);                err = lov_update_common_set(set, req, err);                if (err) {                        CERROR("error: destroying objid "LPX64" subobj "                               LPX64" on OST idx %d: rc = %d\n",                               oa->o_id, req->rq_oi.oi_oa->o_id,                               req->rq_idx, err);                        if (!rc)                                rc = err;                }        }        if (rc == 0) {                LASSERT(lsm_op_find(lsm->lsm_magic) != NULL);                rc = lsm_op_find(lsm->lsm_magic)->lsm_destroy(lsm, oa, md_exp);        }        err = lov_fini_destroy_set(set);        RETURN(rc ? rc : err);}static int lov_getattr(struct obd_export *exp, struct obd_info *oinfo){        struct lov_request_set *set;        struct lov_request *req;        struct list_head *pos;        struct lov_obd *lov;        int err = 0, rc = 0;        ENTRY;        LASSERT(oinfo);        ASSERT_LSM_MAGIC(oinfo->oi_md);        if (!exp || !exp->exp_obd)                RETURN(-ENODEV);        lov = &exp->exp_obd->u.lov;        rc = lov_prep_getattr_set(exp, oinfo, &set);        if (rc)                RETURN(rc);        list_for_each (pos, &set->set_list) {                req = list_entry(pos, struct lov_request, rq_link);                CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,                        req->rq_oi.oi_oa->o_id, req->rq_idx);                rc = obd_getattr(lov->lov_tgts[req->rq_idx]->ltd_exp,                                 &req->rq_oi);                err = lov_update_common_set(set, req, rc);                if (err) {                        CERROR("error: getattr objid "LPX64" subobj "                               LPX64" on OST idx %d: rc = %d\n",                               oinfo->oi_oa->o_id, req->rq_oi.oi_oa->o_id,                               req->rq_idx, err);                        break;                }        }        rc = lov_fini_getattr_set(set);        if (err)                rc = err;        RETURN(rc);}static int lov_getattr_interpret(struct ptlrpc_request_set *rqset,                                  void *data, int rc){        struct lov_request_set *lovset = (struct lov_request_set *)data;        int err;        ENTRY;        /* don't do attribute merge if this aysnc op failed */        if (rc)                lovset->set_completes = 0;        err = lov_fini_getattr_set(lovset);        RETURN(rc ? rc : err);}static int lov_getattr_async(struct obd_export *exp, struct obd_info *oinfo,                              struct ptlrpc_request_set *rqset){        struct lov_request_set *lovset;        struct lov_obd *lov;        struct list_head *pos;        struct lov_request *req;        int rc = 0, err;        ENTRY;        LASSERT(oinfo);        ASSERT_LSM_MAGIC(oinfo->oi_md);        if (!exp || !exp->exp_obd)                RETURN(-ENODEV);        lov = &exp->exp_obd->u.lov;        rc = lov_prep_getattr_set(exp, oinfo, &lovset);        if (rc)                RETURN(rc);        CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",               oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,                oinfo->oi_md->lsm_stripe_size);        list_for_each (pos, &lovset->set_list) {                req = list_entry(pos, struct lov_request, rq_link);                CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,                        req->rq_oi.oi_oa->o_id, req->rq_idx);                rc = obd_getattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,                                       &req->rq_oi, rqset);                if (rc) {                        CERROR("error: getattr objid "LPX64" subobj "                               LPX64" on OST idx %d: rc = %d\n",                               oinfo->oi_oa->o_id, req->rq_oi.oi_oa->o_id,                               req->rq_idx, rc);                        GOTO(out, rc);                }        }        if (!list_empty(&rqset->set_requests)) {                LASSERT(rc == 0);                LASSERT (rqset->set_interpret == NULL);                rqset->set_interpret = lov_getattr_interpret;                rqset->set_arg = (void *)lovset;                RETURN(rc);        }out:        if (rc)                lovset->set_completes = 0;        err = lov_fini_getattr_set(lovset);        RETURN(rc ? rc : err);}static int lov_setattr(struct obd_export *exp, struct obd_info *oinfo,                       struct obd_trans_info *oti){        struct lov_request_set *set;        struct lov_obd *lov;        struct list_head *pos;        struct lov_request *req;        int err = 0, rc = 0;        ENTRY;        LASSERT(oinfo);        ASSERT_LSM_MAGIC(oinfo->oi_md);        if (!exp || !exp->exp_obd)                RETURN(-ENODEV);        /* for now, we only expect the following updates here */        LASSERT(!(oinfo->oi_oa->o_valid & ~(OBD_MD_FLID | OBD_MD_FLTYPE |                                             OBD_MD_FLMODE | OBD_MD_FLATIME |                                             OBD_MD_FLMTIME | OBD_MD_FLCTIME |                                            OBD_MD_FLFLAGS | OBD_MD_FLSIZE |                                             OBD_MD_FLGROUP | OBD_MD_FLUID |                                             OBD_MD_FLGID | OBD_MD_FLFID |                                             OBD_MD_FLGENER)));        lov = &exp->exp_obd->u.lov;        rc = lov_prep_setattr_set(exp, oinfo, oti, &set);        if (rc)                RETURN(rc);        list_for_each (pos, &set->set_list) {                req = list_entry(pos, struct lov_request, rq_link);                rc = obd_setattr(lov->lov_tgts[req->rq_idx]->ltd_exp,                                  &req->rq_oi, NULL);                err = lov_update_setattr_set(set, req, rc);                if (err) {                        CERROR("error: setattr objid "LPX64" subobj "                               LPX64" on OST idx %d: rc = %d\n",                               set->set_oi->oi_oa->o_id,                                req->rq_oi.oi_oa->o_id, req->rq_idx, err);                        if (!rc)                                rc = err;                }        }        err = lov_fini_setattr_set(set);        if (!rc)                rc = err;        RETURN(rc);}static int lov_setattr_interpret(struct ptlrpc_request_set *rqset,                                 void *data, int rc){        struct lov_request_set *lovset = (struct lov_request_set *)data;        int err;        ENTRY;        if (rc)                lovset->set_completes = 0;        err = lov_fini_setattr_set(lovset);        RETURN(rc ? rc : err);}/* If @oti is given, the request goes from MDS and responses from OSTs are not   needed. Otherwise, a client is waiting for responses. */static int lov_setattr_async(struct obd_export *exp, struct obd_info *oinfo,                             struct obd_trans_info *oti,                             struct ptlrpc_request_set *rqset){        struct lov_request_set *set;        struct lov_request *req;        struct list_head *pos;        struct lov_obd *lov;        int rc = 0;        ENTRY;        LASSERT(oinfo);        ASSERT_LSM_MAGIC(oinfo->oi_md);        if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE) {                LASSERT(oti);                LASSERT(oti->oti_logcookies);        }        if (!exp || !exp->exp_obd)                RETURN(-ENODEV);        lov = &exp->exp_obd->u.lov;        rc = lov_prep_setattr_set(exp, oinfo, oti, &set);        if (rc)                RETURN(rc);        CDEBUG(D_INFO, "objid "LPX64": %ux%u byte stripes\n",               oinfo->oi_md->lsm_object_id, oinfo->oi_md->lsm_stripe_count,               oinfo->oi_md->lsm_stripe_size);        list_for_each (pos, &set->set_list) {                req = list_entry(pos, struct lov_request, rq_link);                if (oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)                        oti->oti_logcookies = set->set_cookies + req->rq_stripe;                CDEBUG(D_INFO, "objid "LPX64"[%d] has subobj "LPX64" at idx "                       "%u\n", oinfo->oi_oa->o_id, req->rq_stripe,                       req->rq_oi.oi_oa->o_id, req->rq_idx);                rc = obd_setattr_async(lov->lov_tgts[req->rq_idx]->ltd_exp,                                       &req->rq_oi, oti, rqset);                if (rc) {                        CERROR("error: setattr objid "LPX64" subobj "                               LPX64" on OST idx %d: rc = %d\n",                               set->set_oi->oi_oa->o_id,                               req->rq_oi.oi_oa->o_id,                               req->rq_idx, rc);                        break;                }        }        /* If we are not waiting for responses on async requests, return. */        if (rc || !rqset || list_empty(&rqset->set_requests)) {                int err;                if (rc)                        set->set_completes = 0;                err = lov_fini_setattr_set(set);                RETURN(rc ? rc : err);        }        LASSERT(rqset->set_interpret == NULL);        rqset->set_interpret = lov_setattr_interpret;        rqset->set_arg = (void *)set;        RETURN(0);}static int lov_punch_interpret(struct ptlrpc_request_set *rqset,                               void *data, int rc){        struct lov_request_set *lovset = (struct lov_request_set *)data;        int err;        ENTRY;        if (rc)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?