osc_request.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,753 行 · 第 1/5 页
C
1,753 行
} RETURN(0);}int osc_real_create(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md **ea, struct obd_trans_info *oti){ struct ptlrpc_request *req; struct ost_body *body; struct lov_stripe_md *lsm; int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; LASSERT(oa); LASSERT(ea); lsm = *ea; if (!lsm) { rc = obd_alloc_memmd(exp, &lsm); if (rc < 0) RETURN(rc); } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, OST_CREATE, 2, size, NULL); if (!req) GOTO(out, rc = -ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); memcpy(&body->oa, oa, sizeof(body->oa)); ptlrpc_req_set_repsize(req, 2, size); if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) { DEBUG_REQ(D_HA, req, "delorphan from OST integration"); /* Don't resend the delorphan req */ req->rq_no_resend = req->rq_no_delay = 1; } rc = ptlrpc_queue_wait(req); if (rc) GOTO(out_req, rc); body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), lustre_swab_ost_body); if (body == NULL) { CERROR ("can't unpack ost_body\n"); GOTO (out_req, rc = -EPROTO); } memcpy(oa, &body->oa, sizeof(*oa)); /* This should really be sent by the OST */ oa->o_blksize = PTLRPC_MAX_BRW_SIZE; oa->o_valid |= OBD_MD_FLBLKSZ; /* XXX LOV STACKING: the lsm that is passed to us from LOV does not * have valid lsm_oinfo data structs, so don't go touching that. * This needs to be fixed in a big way. */ lsm->lsm_object_id = oa->o_id; *ea = lsm; if (oti != NULL) { oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg); if (oa->o_valid & OBD_MD_FLCOOKIE) { if (!oti->oti_logcookies) oti_alloc_cookies(oti, 1); *oti->oti_logcookies = oa->o_lcookie; } } CDEBUG(D_HA, "transno: "LPD64"\n", lustre_msg_get_transno(req->rq_repmsg));out_req: ptlrpc_req_finished(req);out: if (rc && !*ea) obd_free_memmd(exp, &lsm); RETURN(rc);}static int osc_punch_interpret(struct ptlrpc_request *req, struct osc_async_args *aa, int rc){ struct ost_body *body; ENTRY; if (rc != 0) GOTO(out, rc); body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body), lustre_swab_ost_body); if (body == NULL) { CERROR ("can't unpack ost_body\n"); GOTO(out, rc = -EPROTO); } memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));out: rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc); RETURN(rc);}static int osc_punch(struct obd_export *exp, struct obd_info *oinfo, struct obd_trans_info *oti, struct ptlrpc_request_set *rqset){ struct ptlrpc_request *req; struct osc_async_args *aa; struct ost_body *body; int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; if (!oinfo->oi_oa) { CERROR("oa NULL\n"); RETURN(-EINVAL); } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, OST_PUNCH, 2, size, NULL); if (!req) RETURN(-ENOMEM); req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ ptlrpc_at_set_req_timeout(req); body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa)); /* overload the size and blocks fields in the oa with start/end */ body->oa.o_size = oinfo->oi_policy.l_extent.start; body->oa.o_blocks = oinfo->oi_policy.l_extent.end; body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); ptlrpc_req_set_repsize(req, 2, size); req->rq_interpret_reply = osc_punch_interpret; CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); aa = (struct osc_async_args *)&req->rq_async_args; aa->aa_oi = oinfo; ptlrpc_set_add_req(rqset, req); RETURN(0);}static int osc_sync(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *md, obd_size start, obd_size end){ struct ptlrpc_request *req; struct ost_body *body; int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; ENTRY; if (!oa) { CERROR("oa NULL\n"); RETURN(-EINVAL); } req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION, OST_SYNC, 2, size, NULL); if (!req) RETURN(-ENOMEM); body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); memcpy(&body->oa, oa, sizeof(*oa)); /* overload the size and blocks fields in the oa with start/end */ body->oa.o_size = start; body->oa.o_blocks = end; body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS); ptlrpc_req_set_repsize(req, 2, size); rc = ptlrpc_queue_wait(req); if (rc) GOTO(out, rc); body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body), lustre_swab_ost_body); if (body == NULL) { CERROR ("can't unpack ost_body\n"); GOTO (out, rc = -EPROTO); } memcpy(oa, &body->oa, sizeof(*oa)); EXIT; out: ptlrpc_req_finished(req); return rc;}/* Find and cancel locally locks matched by @mode in the resource found by * @objid. Found locks are added into @cancel list. Returns the amount of * locks added to @cancels list. */static int osc_resource_get_unused(struct obd_export *exp, __u64 objid, struct list_head *cancels, ldlm_mode_t mode, int lock_flags){ struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; struct ldlm_res_id res_id = { .name = { objid } }; struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0); int count; ENTRY; if (res == NULL) RETURN(0); count = ldlm_cancel_resource_local(res, cancels, NULL, mode, lock_flags, 0, NULL); ldlm_resource_putref(res); RETURN(count);}static int osc_destroy_interpret(struct ptlrpc_request *req, void *data, int rc){ struct client_obd *cli = &req->rq_import->imp_obd->u.cli; atomic_dec(&cli->cl_destroy_in_flight); cfs_waitq_signal(&cli->cl_destroy_waitq); return 0;}static int osc_can_send_destroy(struct client_obd *cli){ if (atomic_inc_return(&cli->cl_destroy_in_flight) <= cli->cl_max_rpcs_in_flight) { /* The destroy request can be sent */ return 1; } if (atomic_dec_return(&cli->cl_destroy_in_flight) < cli->cl_max_rpcs_in_flight) { /* * The counter has been modified between the two atomic * operations. */ cfs_waitq_signal(&cli->cl_destroy_waitq); } return 0;}/* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. * When the MDS is unlinking a filename, it saves the file objects into a * recovery llog, and these object records are cancelled when the OST reports * they were destroyed and sync'd to disk (i.e. transaction committed). * If the client dies, or the OST is down when the object should be destroyed, * the records are not cancelled, and when the OST reconnects to the MDS next, * it will retrieve the llog unlink logs and then sends the log cancellation * cookies to the MDS after committing destroy transactions. */static int osc_destroy(struct obd_export *exp, struct obdo *oa, struct lov_stripe_md *ea, struct obd_trans_info *oti, struct obd_export *md_export){ CFS_LIST_HEAD(cancels); struct ptlrpc_request *req; struct ost_body *body; int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body), sizeof(struct ldlm_request) }; int count, bufcount = 2; struct client_obd *cli = &exp->exp_obd->u.cli; ENTRY; if (!oa) { CERROR("oa NULL\n"); RETURN(-EINVAL); } LASSERT(oa->o_id != 0); count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW, LDLM_FL_DISCARD_DATA); if (exp_connect_cancelset(exp)) bufcount = 3; req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount, size, REQ_REC_OFF + 1, 0, &cancels, count); if (!req) RETURN(-ENOMEM); req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */ req->rq_interpret_reply = osc_destroy_interpret; ptlrpc_at_set_req_timeout(req); body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body)); if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) { oa->o_lcookie = *oti->oti_logcookies; } memcpy(&body->oa, oa, sizeof(*oa)); ptlrpc_req_set_repsize(req, 2, size); if (!osc_can_send_destroy(cli)) { struct l_wait_info lwi = { 0 }; /* * Wait until the number of on-going destroy RPCs drops * under max_rpc_in_flight */ l_wait_event_exclusive(cli->cl_destroy_waitq, osc_can_send_destroy(cli), &lwi); } /* Do not wait for response */ ptlrpcd_add_req(req); RETURN(0);}static void osc_announce_cached(struct client_obd *cli, struct obdo *oa, long writing_bytes){ obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT; LASSERT(!(oa->o_valid & bits)); oa->o_valid |= bits; client_obd_list_lock(&cli->cl_loi_list_lock); oa->o_dirty = cli->cl_dirty; if (cli->cl_dirty > cli->cl_dirty_max) { CERROR("dirty %lu > dirty_max %lu\n", cli->cl_dirty, cli->cl_dirty_max); oa->o_undirty = 0; } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) { CERROR("dirty %d > system dirty_max %d\n", atomic_read(&obd_dirty_pages), obd_max_dirty_pages); oa->o_undirty = 0; } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) { CERROR("dirty %lu - dirty_max %lu too big???\n", cli->cl_dirty, cli->cl_dirty_max); oa->o_undirty = 0; } else { long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)* (cli->cl_max_rpcs_in_flight + 1); oa->o_undirty = max(cli->cl_dirty_max, max_in_flight); } oa->o_grant = cli->cl_avail_grant; oa->o_dropped = cli->cl_lost_grant; cli->cl_lost_grant = 0; client_obd_list_unlock(&cli->cl_loi_list_lock); CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n", oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);}/* caller must hold loi_list_lock */static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?