lov_request.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,640 行 · 第 1/4 页
C
1,640 行
GOTO(out_set, rc = -ENOMEM); lockh->cookie = set->set_lockh->llh_handle.h_cookie; for (i = 0; i < lsm->lsm_stripe_count; i++){ struct lov_request *req; obd_off start, end; loi = lsm->lsm_oinfo[i]; if (!lov_stripe_intersects(lsm, i, policy->l_extent.start, policy->l_extent.end, &start, &end)) continue; /* FIXME raid1 should grace this error */ if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); GOTO(out_set, rc = -EIO); } OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md); OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); } req->rq_oi.oi_policy.l_extent.start = start; req->rq_oi.oi_policy.l_extent.end = end; req->rq_oi.oi_policy.l_extent.gid = policy->l_extent.gid; req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; req->rq_oi.oi_md->lsm_stripe_count = 0; lov_set_add_req(req, set); } if (!set->set_count) GOTO(out_set, rc = -EIO); *reqset = set; RETURN(rc);out_set: lov_fini_match_set(set, mode, 0); RETURN(rc);}int lov_fini_cancel_set(struct lov_request_set *set){ int rc = 0; ENTRY; if (set == NULL) RETURN(0); LASSERT(set->set_exp); if (set->set_lockh) lov_llh_put(set->set_lockh); if (atomic_dec_and_test(&set->set_refcount)) lov_finish_set(set); RETURN(rc);}int lov_prep_cancel_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_stripe_md *lsm, __u32 mode, struct lustre_handle *lockh, struct lov_request_set **reqset){ struct lov_request_set *set; int i, rc = 0; struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); if (set == NULL) RETURN(-ENOMEM); lov_init_set(set); set->set_exp = exp; set->set_oi = oinfo; set->set_oi->oi_md = lsm; set->set_lockh = lov_handle2llh(lockh); if (set->set_lockh == NULL) { CERROR("LOV: invalid lov lock handle %p\n", lockh); GOTO(out_set, rc = -EINVAL); } lockh->cookie = set->set_lockh->llh_handle.h_cookie; for (i = 0; i < lsm->lsm_stripe_count; i++){ struct lov_request *req; struct lustre_handle *lov_lockhp; loi = lsm->lsm_oinfo[i]; lov_lockhp = set->set_lockh->llh_handles + i; if (!lustre_handle_is_used(lov_lockhp)) { CDEBUG(D_RPCTRACE,"lov idx %d subobj "LPX64" no lock\n", loi->loi_ost_idx, loi->loi_id); continue; } OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md); OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); } req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; req->rq_oi.oi_md->lsm_stripe_count = 0; lov_set_add_req(req, set); } if (!set->set_count) GOTO(out_set, rc = -EIO); *reqset = set; RETURN(rc);out_set: lov_fini_cancel_set(set); RETURN(rc);}static int create_done(struct obd_export *exp, struct lov_request_set *set, struct lov_stripe_md **lsmp){ struct lov_obd *lov = &exp->exp_obd->u.lov; struct obd_trans_info *oti = set->set_oti; struct obdo *src_oa = set->set_oi->oi_oa; struct lov_request *req; struct obdo *ret_oa = NULL; int attrset = 0, rc = 0; ENTRY; LASSERT(set->set_completes); /* try alloc objects on other osts if osc_create fails for * exceptions: RPC failure, ENOSPC, etc */ if (set->set_count != set->set_success) { list_for_each_entry (req, &set->set_list, rq_link) { if (req->rq_rc == 0) continue; set->set_completes--; req->rq_complete = 0; rc = qos_remedy_create(set, req); lov_update_create_set(set, req, rc); if (rc) break; } } /* no successful creates */ if (set->set_success == 0) GOTO(cleanup, rc); /* If there was an explicit stripe set, fail. Otherwise, we * got some objects and that's not bad. */ if (set->set_count != set->set_success) { if (*lsmp) GOTO(cleanup, rc); set->set_count = set->set_success; qos_shrink_lsm(set); } OBDO_ALLOC(ret_oa); if (ret_oa == NULL) GOTO(cleanup, rc = -ENOMEM); list_for_each_entry(req, &set->set_list, rq_link) { if (!req->rq_complete || req->rq_rc) continue; lov_merge_attrs(ret_oa, req->rq_oi.oi_oa, req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md, req->rq_stripe, &attrset); } if (src_oa->o_valid & OBD_MD_FLSIZE && ret_oa->o_size != src_oa->o_size) { CERROR("original size "LPU64" isn't new object size "LPU64"\n", src_oa->o_size, ret_oa->o_size); LBUG(); } ret_oa->o_id = src_oa->o_id; memcpy(src_oa, ret_oa, sizeof(*src_oa)); OBDO_FREE(ret_oa); *lsmp = set->set_oi->oi_md; GOTO(done, rc = 0);cleanup: list_for_each_entry(req, &set->set_list, rq_link) { struct obd_export *sub_exp; int err = 0; if (!req->rq_complete || req->rq_rc) continue; sub_exp = lov->lov_tgts[req->rq_idx]->ltd_exp; err = obd_destroy(sub_exp, req->rq_oi.oi_oa, NULL, oti, NULL); if (err) CERROR("Failed to uncreate objid "LPX64" subobj " LPX64" on OST idx %d: rc = %d\n", src_oa->o_id, req->rq_oi.oi_oa->o_id, req->rq_idx, rc); } if (*lsmp == NULL) obd_free_memmd(exp, &set->set_oi->oi_md);done: if (oti && set->set_cookies) { oti->oti_logcookies = set->set_cookies; if (!set->set_cookie_sent) { oti_free_cookies(oti); src_oa->o_valid &= ~OBD_MD_FLCOOKIE; } else { src_oa->o_valid |= OBD_MD_FLCOOKIE; } } RETURN(rc);}int lov_fini_create_set(struct lov_request_set *set,struct lov_stripe_md **lsmp){ int rc = 0; ENTRY; if (set == NULL) RETURN(0); LASSERT(set->set_exp); if (set->set_completes) rc = create_done(set->set_exp, set, lsmp); if (atomic_dec_and_test(&set->set_refcount)) lov_finish_set(set); RETURN(rc);}int lov_update_create_set(struct lov_request_set *set, struct lov_request *req, int rc){ struct obd_trans_info *oti = set->set_oti; struct lov_stripe_md *lsm = set->set_oi->oi_md; struct lov_oinfo *loi; struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; ENTRY; req->rq_stripe = set->set_success; loi = lsm->lsm_oinfo[req->rq_stripe]; if (rc && lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active) { CERROR("error creating fid "LPX64" sub-object" " on OST idx %d/%d: rc = %d\n", set->set_oi->oi_oa->o_id, req->rq_idx, lsm->lsm_stripe_count, rc); if (rc > 0) { CERROR("obd_create returned invalid err %d\n", rc); rc = -EIO; } } lov_update_set(set, req, rc); if (rc) RETURN(rc); loi->loi_id = req->rq_oi.oi_oa->o_id; loi->loi_ost_idx = req->rq_idx; CDEBUG(D_INODE, "objid "LPX64" has subobj "LPX64"/"LPU64" at idx %d\n", lsm->lsm_object_id, loi->loi_id, loi->loi_id, req->rq_idx); loi_init(loi); if (oti && set->set_cookies) ++oti->oti_logcookies; if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLCOOKIE) set->set_cookie_sent++; RETURN(0);}int lov_prep_create_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_stripe_md **lsmp, struct obdo *src_oa, struct obd_trans_info *oti, struct lov_request_set **reqset){ struct lov_request_set *set; int rc = 0; ENTRY; OBD_ALLOC(set, sizeof(*set)); if (set == NULL) RETURN(-ENOMEM); lov_init_set(set); set->set_exp = exp; set->set_oi = oinfo; set->set_oi->oi_md = *lsmp; set->set_oi->oi_oa = src_oa; set->set_oti = oti; rc = qos_prep_create(exp, set); if (rc) lov_fini_create_set(set, lsmp); else *reqset = set; RETURN(rc);}static int common_attr_done(struct lov_request_set *set){ struct list_head *pos; struct lov_request *req; struct obdo *tmp_oa; int rc = 0, attrset = 0; ENTRY; LASSERT(set->set_oi != NULL); if (set->set_oi->oi_oa == NULL) RETURN(0); if (!set->set_success) RETURN(-EIO); OBDO_ALLOC(tmp_oa); if (tmp_oa == NULL) GOTO(out, rc = -ENOMEM); list_for_each (pos, &set->set_list) { req = list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; if (req->rq_oi.oi_oa->o_valid == 0) /* inactive stripe */ continue; lov_merge_attrs(tmp_oa, req->rq_oi.oi_oa, req->rq_oi.oi_oa->o_valid, set->set_oi->oi_md, req->rq_stripe, &attrset); } if (!attrset) { CERROR("No stripes had valid attrs\n"); rc = -EIO; } tmp_oa->o_id = set->set_oi->oi_oa->o_id; memcpy(set->set_oi->oi_oa, tmp_oa, sizeof(*set->set_oi->oi_oa));out: if (tmp_oa) OBDO_FREE(tmp_oa); RETURN(rc);}static int brw_done(struct lov_request_set *set){ struct lov_stripe_md *lsm = set->set_oi->oi_md; struct lov_oinfo *loi = NULL; struct list_head *pos; struct lov_request *req; ENTRY; list_for_each (pos, &set->set_list) { req = list_entry(pos, struct lov_request, rq_link); if (!req->rq_complete || req->rq_rc) continue; loi = lsm->lsm_oinfo[req->rq_stripe]; if (req->rq_oi.oi_oa->o_valid & OBD_MD_FLBLOCKS) loi->loi_lvb.lvb_blocks = req->rq_oi.oi_oa->o_blocks; } RETURN(0);}int lov_fini_brw_set(struct lov_request_set *set){ int rc = 0; ENTRY; if (set == NULL) RETURN(0); LASSERT(set->set_exp); if (set->set_completes) { rc = brw_done(set); /* FIXME update qos data here */ } if (atomic_dec_and_test(&set->set_refcount)) lov_finish_set(set); RETURN(rc);}int lov_prep_brw_set(struct obd_export *exp, struct obd_info *oinfo, obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *oti, struct lov_request_set **reqset)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?