lov_request.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,640 行 · 第 1/4 页

C
1,640
字号
                GOTO(out_set, rc = -ENOMEM);        lockh->cookie = set->set_lockh->llh_handle.h_cookie;        for (i = 0; i < lsm->lsm_stripe_count; i++){                struct lov_request *req;                obd_off start, end;                loi = lsm->lsm_oinfo[i];                if (!lov_stripe_intersects(lsm, i, policy->l_extent.start,                                           policy->l_extent.end, &start, &end))                        continue;                /* FIXME raid1 should grace this error */                if (!lov->lov_tgts[loi->loi_ost_idx] ||                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);                        GOTO(out_set, rc = -EIO);                }                OBD_ALLOC(req, sizeof(*req));                if (req == NULL)                        GOTO(out_set, rc = -ENOMEM);                req->rq_buflen = sizeof(*req->rq_oi.oi_md);                OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);                if (req->rq_oi.oi_md == NULL) {                        OBD_FREE(req, sizeof(*req));                        GOTO(out_set, rc = -ENOMEM);                }                req->rq_oi.oi_policy.l_extent.start = start;                req->rq_oi.oi_policy.l_extent.end = end;                req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid;                req->rq_idx = loi->loi_ost_idx;                req->rq_stripe = i;                /* XXX LOV STACKING: submd should be from the subobj */                req->rq_oi.oi_md->lsm_object_id = loi->loi_id;                req->rq_oi.oi_md->lsm_stripe_count = 0;                lov_set_add_req(req, set);        }        if (!set->set_count)                GOTO(out_set, rc = -EIO);        *reqset = set;        RETURN(rc);out_set:        lov_fini_match_set(set, mode, 0);        RETURN(rc);}int lov_fini_cancel_set(struct lov_request_set *set){        int rc = 0;        ENTRY;        if (set == NULL)                RETURN(0);        LASSERT(set->set_exp);        if (set->set_lockh)                lov_llh_put(set->set_lockh);        if (atomic_dec_and_test(&set->set_refcount))                lov_finish_set(set);        RETURN(rc);}int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo,                        struct lov_stripe_md *lsm, __u32 mode,                        struct lustre_handle *lockh,                        struct lov_request_set **reqset){        struct lov_request_set *set;        int i, rc = 0;        struct lov_oinfo *loi;        ENTRY;        OBD_ALLOC(set, sizeof(*set));        if (set == NULL)                RETURN(-ENOMEM);        lov_init_set(set);        set->set_exp = exp;        set->set_oi = oinfo;        set->set_oi->oi_md = lsm;        set->set_lockh = lov_handle2llh(lockh);        if (set->set_lockh == NULL) {                CERROR("LOV: invalid lov lock handle %p\n", lockh);                GOTO(out_set, rc = -EINVAL);        }        lockh->cookie = set->set_lockh->llh_handle.h_cookie;        for (i = 0; i < lsm->lsm_stripe_count; i++){                struct lov_request *req;                struct lustre_handle *lov_lockhp;                loi = lsm->lsm_oinfo[i];                lov_lockhp = set->set_lockh->llh_handles + i;                if (!lustre_handle_is_used(lov_lockhp)) {                        CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n",                               loi->loi_ost_idx, loi->loi_id);                        continue;                }                OBD_ALLOC(req, sizeof(*req));                if (req == NULL)                        GOTO(out_set, rc = -ENOMEM);                req->rq_buflen = sizeof(*req->rq_oi.oi_md);                OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);                if (req->rq_oi.oi_md == NULL) {                        OBD_FREE(req, sizeof(*req));                        GOTO(out_set, rc = -ENOMEM);                }                req->rq_idx = loi->loi_ost_idx;                req->rq_stripe = i;                /* XXX LOV STACKING: submd should be from the subobj */                req->rq_oi.oi_md->lsm_object_id = loi->loi_id;                req->rq_oi.oi_md->lsm_stripe_count = 0;                lov_set_add_req(req, set);        }        if (!set->set_count)                GOTO(out_set, rc = -EIO);        *reqset = set;        RETURN(rc);out_set:        lov_fini_cancel_set(set);        RETURN(rc);}static int create_done(struct obd_export *exp, struct lov_request_set *set,                       struct lov_stripe_md **lsmp){        struct lov_obd *lov = &exp->exp_obd->u.lov;        struct obd_trans_info *oti = set->set_oti;        struct obdo *src_oa = set->set_oi->oi_oa;        struct lov_request *req;        struct obdo *ret_oa = NULL;        int attrset = 0, rc = 0;        ENTRY;        LASSERT(set->set_completes);        /* try alloc objects on other osts if osc_create fails for         * exceptions: RPC failure, ENOSPC, etc */        if (set->set_count != set->set_success) {                list_for_each_entry (req, &set->set_list, rq_link) {                        if (req->rq_rc == 0)                                continue;                        set->set_completes--;                        req->rq_complete = 0;                        rc = qos_remedy_create(set, req);                        lov_update_create_set(set, req, rc);                        if (rc)                                break;                }        }        /* no successful creates */        if (set->set_success == 0)                GOTO(cleanup, rc);        /* If there was an explicit stripe set, fail.  Otherwise, we         * got some objects and that's not bad. */        if (set->set_count != set->set_success) {                if (*lsmp)                        GOTO(cleanup, rc);                set->set_count = set->set_success;                qos_shrink_lsm(set);        }        OBDO_ALLOC(ret_oa);        if (ret_oa == NULL)                GOTO(cleanup, rc = -ENOMEM);        list_for_each_entry(req, &set->set_list, rq_link) {                if (!req->rq_complete || req->rq_rc)                        continue;                lov_merge_attrs(ret_oa, req->rq_oi.oi_oa,                                req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md,                                req->rq_stripe, &attrset);        }        if (src_oa->o_valid & OBD_MD_FLSIZE &&            ret_oa->o_size != src_oa->o_size) {                CERROR("original size "LPU64" isn't new object size "LPU64"\n",                       src_oa->o_size, ret_oa->o_size);                LBUG();        }        ret_oa->o_id = src_oa->o_id;        memcpy(src_oa, ret_oa, sizeof(*src_oa));        OBDO_FREE(ret_oa);        *lsmp = set->set_oi->oi_md;        GOTO(done, rc = 0);cleanup:        list_for_each_entry(req, &set->set_list, rq_link) {                struct obd_export *sub_exp;                int err = 0;                if (!req->rq_complete || req->rq_rc)                        continue;                sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp;                err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL);                if (err)                        CERROR("Failed to uncreate objid "LPX64" subobj "                               LPX64" on OST idx %d: rc = %d\n",                               src_oa->o_id, req->rq_oi.oi_oa->o_id,                               req->rq_idx, rc);        }        if (*lsmp == NULL)                obd_free_memmd(exp, &set->set_oi->oi_md);done:        if (oti && set->set_cookies) {                oti->oti_logcookies = set->set_cookies;                if (!set->set_cookie_sent) {                        oti_free_cookies(oti);                        src_oa->o_valid &= ~OBD_MD_FLCOOKIE;                } else {                        src_oa->o_valid |= OBD_MD_FLCOOKIE;                }        }        RETURN(rc);}int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp){        int rc = 0;        ENTRY;        if (set == NULL)                RETURN(0);        LASSERT(set->set_exp);        if (set->set_completes)                rc = create_done(set->set_exp, set, lsmp);        if (atomic_dec_and_test(&set->set_refcount))                lov_finish_set(set);        RETURN(rc);}int lov_update_create_set(struct lov_request_set *set,                          struct lov_request *req, int rc){        struct obd_trans_info *oti = set->set_oti;        struct lov_stripe_md *lsm = set->set_oi->oi_md;        struct lov_oinfo *loi;        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;        ENTRY;        req->rq_stripe = set->set_success;        loi = lsm->lsm_oinfo[req->rq_stripe];        if (rc && lov->lov_tgts[req->rq_idx] &&            lov->lov_tgts[req->rq_idx]->ltd_active) {                CERROR("error creating fid "LPX64" sub-object"                       " on OST idx %d/%d: rc = %d\n",                       set->set_oi->oi_oa->o_id, req->rq_idx,                       lsm->lsm_stripe_count, rc);                if (rc > 0) {                        CERROR("obd_create returned invalid err %d\n", rc);                        rc = -EIO;                }        }        lov_update_set(set, req, rc);        if (rc)                RETURN(rc);        loi->loi_id = req->rq_oi.oi_oa->o_id;        loi->loi_ost_idx = req->rq_idx;        CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n",               lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx);        loi_init(loi);        if (oti && set->set_cookies)                ++oti->oti_logcookies;        if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE)                set->set_cookie_sent++;        RETURN(0);}int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo,                        struct lov_stripe_md **lsmp, struct obdo *src_oa,                        struct obd_trans_info *oti,                        struct lov_request_set **reqset){        struct lov_request_set *set;        int rc = 0;        ENTRY;        OBD_ALLOC(set, sizeof(*set));        if (set == NULL)                RETURN(-ENOMEM);        lov_init_set(set);        set->set_exp = exp;        set->set_oi = oinfo;        set->set_oi->oi_md = *lsmp;        set->set_oi->oi_oa = src_oa;        set->set_oti = oti;        rc = qos_prep_create(exp, set);        if (rc)                lov_fini_create_set(set, lsmp);        else                *reqset = set;        RETURN(rc);}static int common_attr_done(struct lov_request_set *set){        struct list_head *pos;        struct lov_request *req;        struct obdo *tmp_oa;        int rc = 0, attrset = 0;        ENTRY;        LASSERT(set->set_oi != NULL);        if (set->set_oi->oi_oa == NULL)                RETURN(0);        if (!set->set_success)                RETURN(-EIO);        OBDO_ALLOC(tmp_oa);        if (tmp_oa == NULL)                GOTO(out, rc = -ENOMEM);        list_for_each (pos, &set->set_list) {                req = list_entry(pos, struct lov_request, rq_link);                if (!req->rq_complete || req->rq_rc)                        continue;                if (req->rq_oi.oi_oa->o_valid == 0)   /* inactive stripe */                        continue;                lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa,                                req->rq_oi.oi_oa->o_valid,                                set->set_oi->oi_md, req->rq_stripe, &attrset);        }        if (!attrset) {                CERROR("No stripes had valid attrs\n");                rc = -EIO;        }        tmp_oa->o_id = set->set_oi->oi_oa->o_id;        memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));out:        if (tmp_oa)                OBDO_FREE(tmp_oa);        RETURN(rc);}static int brw_done(struct lov_request_set *set){        struct lov_stripe_md *lsm = set->set_oi->oi_md;        struct lov_oinfo     *loi = NULL;        struct list_head *pos;        struct lov_request *req;        ENTRY;        list_for_each (pos, &set->set_list) {                req = list_entry(pos, struct lov_request, rq_link);                if (!req->rq_complete || req->rq_rc)                        continue;                loi = lsm->lsm_oinfo[req->rq_stripe];                if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS)                        loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks;        }        RETURN(0);}int lov_fini_brw_set(struct lov_request_set *set){        int rc = 0;        ENTRY;        if (set == NULL)                RETURN(0);        LASSERT(set->set_exp);        if (set->set_completes) {                rc = brw_done(set);                /* FIXME update qos data here */        }        if (atomic_dec_and_test(&set->set_refcount))                lov_finish_set(set);        RETURN(rc);}int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo,                     obd_count oa_bufs, struct brw_page *pga,                     struct obd_trans_info *oti,                     struct lov_request_set **reqset)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?