📄 mdc_request.c
字号:
} DEBUG_REQ(D_INODE, req, "storing generation %u for ino "LPU64, rec->cr_replayfid.generation, rec->cr_replayfid.id);}#ifdef CONFIG_FS_POSIX_ACLstaticint mdc_unpack_acl(struct obd_export *exp, struct ptlrpc_request *req, struct lustre_md *md, unsigned int offset){ struct mds_body *body = md->body; struct posix_acl *acl; void *buf; int rc; if (!body->aclsize) return 0; buf = lustre_msg_buf(req->rq_repmsg, offset, body->aclsize); if (!buf) { CERROR("aclsize %u, bufcount %u, bufsize %u\n", body->aclsize, lustre_msg_bufcount(req->rq_repmsg), (lustre_msg_bufcount(req->rq_repmsg) <= offset) ? -1 : lustre_msg_buflen(req->rq_repmsg, offset)); return -EPROTO; } acl = posix_acl_from_xattr(buf, body->aclsize); if (IS_ERR(acl)) { rc = PTR_ERR(acl); CERROR("convert xattr to acl: %d\n", rc); return rc; } rc = posix_acl_valid(acl); if (rc) { CERROR("validate acl: %d\n", rc); posix_acl_release(acl); return rc; } md->posix_acl = acl; return 0;}#else#define mdc_unpack_acl(exp, req, md, offset) 0#endifint mdc_req2lustre_md(struct ptlrpc_request *req, int offset, struct obd_export *exp, struct lustre_md *md){ int rc = 0; ENTRY; LASSERT(md); memset(md, 0, sizeof(*md)); md->body = lustre_msg_buf(req->rq_repmsg, offset, sizeof (*md->body)); LASSERT (md->body != NULL); LASSERT(lustre_rep_swabbed(req, offset)); offset++; if (md->body->valid & OBD_MD_FLEASIZE) { int lmmsize; struct lov_mds_md *lmm; if (!S_ISREG(md->body->mode)) { CERROR("OBD_MD_FLEASIZE set, should be a regular file, " "but is not\n"); GOTO(err_out, rc = -EPROTO); } if (md->body->eadatasize == 0) { CERROR ("OBD_MD_FLEASIZE set, but eadatasize 0\n"); GOTO(err_out, rc = -EPROTO); } lmmsize = md->body->eadatasize; lmm = lustre_msg_buf(req->rq_repmsg, offset, lmmsize); if (!lmm) { CERROR ("incorrect message: lmm == 0\n"); GOTO(err_out, rc = -EPROTO); } LASSERT(lustre_rep_swabbed(req, offset)); rc = obd_unpackmd(exp, &md->lsm, lmm, lmmsize); if (rc < 0) GOTO(err_out, rc); if (rc < sizeof(*md->lsm)) { CERROR ("lsm size too small: rc < sizeof (*md->lsm) " "(%d < %d)\n", rc, (int)sizeof(*md->lsm)); GOTO(err_out, rc = -EPROTO); } rc = 0; offset++; } if (md->body->valid & OBD_MD_FLDIREA) { if(!S_ISDIR(md->body->mode)) { CERROR("OBD_MD_FLDIREA set, should be a directory, but " "is not\n"); GOTO(err_out, rc = -EPROTO); } offset++; } /* for ACL, it's possible that FLACL is set but aclsize is zero. * only when aclsize != 0 there's an actual segment for ACL in * reply buffer. */ if ((md->body->valid & OBD_MD_FLACL) && md->body->aclsize) { rc = mdc_unpack_acl(exp, req, md, offset); if (rc) GOTO(err_out, rc); offset++; }out: RETURN(rc);err_out: if (md->lsm) obd_free_memmd(exp, &md->lsm); goto out;}void mdc_free_lustre_md(struct obd_export *exp, struct lustre_md *md){ if (md->lsm) obd_free_memmd(exp, &md->lsm);#ifdef CONFIG_FS_POSIX_ACL if (md->posix_acl) { posix_acl_release(md->posix_acl); md->posix_acl = NULL; }#endif}static void mdc_commit_open(struct ptlrpc_request *req){ struct mdc_open_data *mod = req->rq_cb_data; if (mod == NULL) return; if (mod->mod_close_req != NULL) mod->mod_close_req->rq_cb_data = NULL; if (mod->mod_och != NULL) mod->mod_och->och_mod = NULL; OBD_FREE(mod, sizeof(*mod)); req->rq_cb_data = NULL;}static void mdc_replay_open(struct ptlrpc_request *req){ struct mdc_open_data *mod = req->rq_cb_data; struct obd_client_handle *och; struct ptlrpc_request *close_req; struct lustre_handle old; struct mds_body *body; ENTRY; body = lustre_swab_repbuf(req, DLM_REPLY_REC_OFF, sizeof(*body), lustre_swab_mds_body); LASSERT (body != NULL); if (mod == NULL) { DEBUG_REQ(D_ERROR, req, "can't properly replay without open data"); EXIT; return; } och = mod->mod_och; if (och != NULL) { struct lustre_handle *file_fh; LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); file_fh = &och->och_fh; CDEBUG(D_RPCTRACE, "updating handle from "LPX64" to "LPX64"\n", file_fh->cookie, body->handle.cookie); memcpy(&old, file_fh, sizeof(old)); memcpy(file_fh, &body->handle, sizeof(*file_fh)); } close_req = mod->mod_close_req; if (close_req != NULL) { struct mds_body *close_body; LASSERT(lustre_msg_get_opc(close_req->rq_reqmsg) == MDS_CLOSE); close_body = lustre_msg_buf(close_req->rq_reqmsg, REQ_REC_OFF, sizeof(*close_body)); if (och != NULL) LASSERT(!memcmp(&old, &close_body->handle, sizeof old)); DEBUG_REQ(D_RPCTRACE, close_req, "updating close with new fh"); memcpy(&close_body->handle, &body->handle, sizeof(close_body->handle)); } EXIT;}void mdc_set_open_replay_data(struct obd_client_handle *och, struct ptlrpc_request *open_req){ struct mdc_open_data *mod; struct mds_rec_create *rec = lustre_msg_buf(open_req->rq_reqmsg, DLM_INTENT_REC_OFF, sizeof(*rec)); struct mds_body *body = lustre_msg_buf(open_req->rq_repmsg, DLM_REPLY_REC_OFF, sizeof(*body)); /* If request is not eligible for replay, just bail out */ if (!open_req->rq_replay) return; /* incoming message in my byte order (it's been swabbed) */ LASSERT(rec != NULL); LASSERT(lustre_rep_swabbed(open_req, DLM_REPLY_REC_OFF)); /* outgoing messages always in my byte order */ LASSERT(body != NULL); if (och) { OBD_ALLOC(mod, sizeof(*mod)); if (mod == NULL) { DEBUG_REQ(D_ERROR, open_req, "can't allocate mdc_open_data"); return; } spin_lock(&open_req->rq_lock); och->och_mod = mod; mod->mod_och = och; mod->mod_open_req = open_req; open_req->rq_cb_data = mod; open_req->rq_commit_cb = mdc_commit_open; spin_unlock(&open_req->rq_lock); } memcpy(&rec->cr_replayfid, &body->fid1, sizeof rec->cr_replayfid); open_req->rq_replay_cb = mdc_replay_open; if (body->fid1.id == 0) { DEBUG_REQ(D_ERROR, open_req, "saving replay request with " "id = 0 gen = %u", body->fid1.generation); LBUG(); } DEBUG_REQ(D_RPCTRACE, open_req, "set up replay data");}void mdc_clear_open_replay_data(struct obd_client_handle *och){ struct mdc_open_data *mod = och->och_mod; /* Don't free the structure now (it happens in mdc_commit_open, after * we're sure we won't need to fix up the close request in the future), * but make sure that replay doesn't poke at the och, which is about to * be freed. */ LASSERT(mod != LP_POISON); if (mod != NULL) mod->mod_och = NULL; och->och_mod = NULL;}static void mdc_commit_close(struct ptlrpc_request *req){ struct mdc_open_data *mod = req->rq_cb_data; struct ptlrpc_request *open_req; struct obd_import *imp = req->rq_import; DEBUG_REQ(D_RPCTRACE, req, "close req committed"); if (mod == NULL) return; mod->mod_close_req = NULL; req->rq_cb_data = NULL; req->rq_commit_cb = NULL; open_req = mod->mod_open_req; LASSERT(open_req != NULL); LASSERT(open_req != LP_POISON); LASSERT(open_req->rq_type != LI_POISON); DEBUG_REQ(D_RPCTRACE, open_req, "open req balanced"); LASSERT(open_req->rq_transno != 0); LASSERT(open_req->rq_import == imp); /* We no longer want to preserve this for transno-unconditional * replay. */ spin_lock(&open_req->rq_lock); open_req->rq_replay = 0; spin_unlock(&open_req->rq_lock);}int mdc_close(struct obd_export *exp, struct obdo *oa, struct obd_client_handle *och, struct ptlrpc_request **request){ struct obd_device *obd = class_exp2obd(exp); int reqsize[2] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body) }; int rc, repsize[4] = { sizeof(struct ptlrpc_body), sizeof(struct mds_body), obd->u.cli.cl_max_mds_easize, obd->u.cli.cl_max_mds_cookiesize }; struct ptlrpc_request *req; struct mdc_open_data *mod; ENTRY; req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION, MDS_CLOSE, 2, reqsize, NULL); if (req == NULL) GOTO(out, rc = -ENOMEM); /* To avoid a livelock (bug 7034), we need to send CLOSE RPCs to a * portal whose threads are not taking any DLM locks and are therefore * always progressing */ req->rq_request_portal = MDS_READPAGE_PORTAL; ptlrpc_at_set_req_timeout(req); /* Ensure that this close's handle is fixed up during replay. */ LASSERT(och != NULL); LASSERT(och->och_magic == OBD_CLIENT_HANDLE_MAGIC); mod = och->och_mod; if (likely(mod != NULL)) { if (mod->mod_open_req->rq_type == LI_POISON) { CERROR("LBUG POISONED open %p!\n", mod->mod_open_req); LBUG(); ptlrpc_req_finished(req); req = NULL; GOTO(out, rc = -EIO); } mod->mod_close_req = req; DEBUG_REQ(D_RPCTRACE, mod->mod_open_req, "matched open"); } else { CDEBUG(D_RPCTRACE, "couldn't find open req; expecting error\n"); } mdc_close_pack(req, REQ_REC_OFF, oa, oa->o_valid, och); ptlrpc_req_set_repsize(req, 4, repsize); req->rq_commit_cb = mdc_commit_close; LASSERT(req->rq_cb_data == NULL); req->rq_cb_data = mod; mdc_get_rpc_lock(obd->u.cli.cl_close_lock, NULL); rc = ptlrpc_queue_wait(req); mdc_put_rpc_lock(obd->u.cli.cl_close_lock, NULL); if (req->rq_repmsg == NULL) { CDEBUG(D_RPCTRACE, "request failed to send: %p, %d\n", req, req->rq_status); if (rc == 0) rc = req->rq_status ? req->rq_status : -EIO; } else if (rc == 0) { rc = lustre_msg_get_status(req->rq_repmsg); if (lustre_msg_get_type(req->rq_repmsg) == PTL_RPC_MSG_ERR) { DEBUG_REQ(D_ERROR, req, "type == PTL_RPC_MSG_ERR, err " "= %d", rc);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -