osc_request.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,753 行 · 第 1/5 页

C
1,753
字号
        }        RETURN(0);}int osc_real_create(struct obd_export *exp, struct obdo *oa,                    struct lov_stripe_md **ea, struct obd_trans_info *oti){        struct ptlrpc_request *req;        struct ost_body *body;        struct lov_stripe_md *lsm;        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };        ENTRY;        LASSERT(oa);        LASSERT(ea);        lsm = *ea;        if (!lsm) {                rc = obd_alloc_memmd(exp, &lsm);                if (rc < 0)                        RETURN(rc);        }        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,                              OST_CREATE, 2, size, NULL);        if (!req)                GOTO(out, rc = -ENOMEM);        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));        memcpy(&body->oa, oa, sizeof(body->oa));        ptlrpc_req_set_repsize(req, 2, size);        if ((oa->o_valid & OBD_MD_FLFLAGS) &&            oa->o_flags == OBD_FL_DELORPHAN) {                DEBUG_REQ(D_HA, req,                          "delorphan from OST integration");                /* Don't resend the delorphan req */                req->rq_no_resend = req->rq_no_delay = 1;        }        rc = ptlrpc_queue_wait(req);        if (rc)                GOTO(out_req, rc);        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),                                  lustre_swab_ost_body);        if (body == NULL) {                CERROR ("can't unpack ost_body\n");                GOTO (out_req, rc = -EPROTO);        }        memcpy(oa, &body->oa, sizeof(*oa));        /* This should really be sent by the OST */        oa->o_blksize = PTLRPC_MAX_BRW_SIZE;        oa->o_valid |= OBD_MD_FLBLKSZ;        /* XXX LOV STACKING: the lsm that is passed to us from LOV does not         * have valid lsm_oinfo data structs, so don't go touching that.         * This needs to be fixed in a big way.         */        lsm->lsm_object_id = oa->o_id;        *ea = lsm;        if (oti != NULL) {                oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);                if (oa->o_valid & OBD_MD_FLCOOKIE) {                        if (!oti->oti_logcookies)                                oti_alloc_cookies(oti, 1);                        *oti->oti_logcookies = oa->o_lcookie;                }        }        CDEBUG(D_HA, "transno: "LPD64"\n",               lustre_msg_get_transno(req->rq_repmsg));out_req:        ptlrpc_req_finished(req);out:        if (rc && !*ea)                obd_free_memmd(exp, &lsm);        RETURN(rc);}static int osc_punch_interpret(struct ptlrpc_request *req,                               struct osc_async_args *aa, int rc){        struct ost_body *body;        ENTRY;        if (rc != 0)                GOTO(out, rc);        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof (*body),                                  lustre_swab_ost_body);        if (body == NULL) {                CERROR ("can't unpack ost_body\n");                GOTO(out, rc = -EPROTO);        }        memcpy(aa->aa_oi->oi_oa, &body->oa, sizeof(*aa->aa_oi->oi_oa));out:        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);        RETURN(rc);}static int osc_punch(struct obd_export *exp, struct obd_info *oinfo,                     struct obd_trans_info *oti,                     struct ptlrpc_request_set *rqset){        struct ptlrpc_request *req;        struct osc_async_args *aa;        struct ost_body *body;        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };        ENTRY;        if (!oinfo->oi_oa) {                CERROR("oa NULL\n");                RETURN(-EINVAL);        }        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,                              OST_PUNCH, 2, size, NULL);        if (!req)                RETURN(-ENOMEM);        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */        ptlrpc_at_set_req_timeout(req);        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));        memcpy(&body->oa, oinfo->oi_oa, sizeof(*oinfo->oi_oa));        /* overload the size and blocks fields in the oa with start/end */        body->oa.o_size = oinfo->oi_policy.l_extent.start;        body->oa.o_blocks = oinfo->oi_policy.l_extent.end;        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);        ptlrpc_req_set_repsize(req, 2, size);        req->rq_interpret_reply = osc_punch_interpret;        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));        aa = (struct osc_async_args *)&req->rq_async_args;        aa->aa_oi = oinfo;        ptlrpc_set_add_req(rqset, req);        RETURN(0);}static int osc_sync(struct obd_export *exp, struct obdo *oa,                    struct lov_stripe_md *md, obd_size start, obd_size end){        struct ptlrpc_request *req;        struct ost_body *body;        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };        ENTRY;        if (!oa) {                CERROR("oa NULL\n");                RETURN(-EINVAL);        }        req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_OST_VERSION,                              OST_SYNC, 2, size, NULL);        if (!req)                RETURN(-ENOMEM);        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));        memcpy(&body->oa, oa, sizeof(*oa));        /* overload the size and blocks fields in the oa with start/end */        body->oa.o_size = start;        body->oa.o_blocks = end;        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);        ptlrpc_req_set_repsize(req, 2, size);        rc = ptlrpc_queue_wait(req);        if (rc)                GOTO(out, rc);        body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),                                  lustre_swab_ost_body);        if (body == NULL) {                CERROR ("can't unpack ost_body\n");                GOTO (out, rc = -EPROTO);        }        memcpy(oa, &body->oa, sizeof(*oa));        EXIT; out:        ptlrpc_req_finished(req);        return rc;}/* Find and cancel locally locks matched by @mode in the resource found by * @objid. Found locks are added into @cancel list. Returns the amount of * locks added to @cancels list. */static int osc_resource_get_unused(struct obd_export *exp, __u64 objid,                                   struct list_head *cancels, ldlm_mode_t mode,                                   int lock_flags){        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;        struct ldlm_res_id res_id = { .name = { objid } };        struct ldlm_resource *res = ldlm_resource_get(ns, NULL, res_id, 0, 0);        int count;        ENTRY;        if (res == NULL)                RETURN(0);        count = ldlm_cancel_resource_local(res, cancels, NULL, mode,                                           lock_flags, 0, NULL);        ldlm_resource_putref(res);        RETURN(count);}static int osc_destroy_interpret(struct ptlrpc_request *req, void *data,                                 int rc){        struct client_obd *cli = &req->rq_import->imp_obd->u.cli;        atomic_dec(&cli->cl_destroy_in_flight);        cfs_waitq_signal(&cli->cl_destroy_waitq);        return 0;}static int osc_can_send_destroy(struct client_obd *cli){        if (atomic_inc_return(&cli->cl_destroy_in_flight) <=            cli->cl_max_rpcs_in_flight) {                /* The destroy request can be sent */                return 1;        }        if (atomic_dec_return(&cli->cl_destroy_in_flight) <            cli->cl_max_rpcs_in_flight) {                /*                 * The counter has been modified between the two atomic                 * operations.                 */                cfs_waitq_signal(&cli->cl_destroy_waitq);        }        return 0;}/* Destroy requests can be async always on the client, and we don't even really * care about the return code since the client cannot do anything at all about * a destroy failure. * When the MDS is unlinking a filename, it saves the file objects into a * recovery llog, and these object records are cancelled when the OST reports * they were destroyed and sync'd to disk (i.e. transaction committed). * If the client dies, or the OST is down when the object should be destroyed, * the records are not cancelled, and when the OST reconnects to the MDS next, * it will retrieve the llog unlink logs and then sends the log cancellation * cookies to the MDS after committing destroy transactions. */static int osc_destroy(struct obd_export *exp, struct obdo *oa,                       struct lov_stripe_md *ea, struct obd_trans_info *oti,                       struct obd_export *md_export){        CFS_LIST_HEAD(cancels);        struct ptlrpc_request *req;        struct ost_body *body;        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body),                        sizeof(struct ldlm_request) };        int count, bufcount = 2;        struct client_obd *cli = &exp->exp_obd->u.cli;        ENTRY;        if (!oa) {                CERROR("oa NULL\n");                RETURN(-EINVAL);        }        LASSERT(oa->o_id != 0);        count = osc_resource_get_unused(exp, oa->o_id, &cancels, LCK_PW,                                        LDLM_FL_DISCARD_DATA);        if (exp_connect_cancelset(exp))                bufcount = 3;        req = ldlm_prep_elc_req(exp, LUSTRE_OST_VERSION, OST_DESTROY, bufcount,                                size, REQ_REC_OFF + 1, 0, &cancels, count);        if (!req)                RETURN(-ENOMEM);        req->rq_request_portal = OST_IO_PORTAL;         /* bug 7198 */        req->rq_interpret_reply = osc_destroy_interpret;        ptlrpc_at_set_req_timeout(req);        body = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(*body));        if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE) {                oa->o_lcookie = *oti->oti_logcookies;        }        memcpy(&body->oa, oa, sizeof(*oa));        ptlrpc_req_set_repsize(req, 2, size);        if (!osc_can_send_destroy(cli)) {                struct l_wait_info lwi = { 0 };                /*                 * Wait until the number of on-going destroy RPCs drops                 * under max_rpc_in_flight                 */                l_wait_event_exclusive(cli->cl_destroy_waitq,                                       osc_can_send_destroy(cli), &lwi);        }        /* Do not wait for response */        ptlrpcd_add_req(req);        RETURN(0);}static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,                                long writing_bytes){        obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;        LASSERT(!(oa->o_valid & bits));        oa->o_valid |= bits;        client_obd_list_lock(&cli->cl_loi_list_lock);        oa->o_dirty = cli->cl_dirty;        if (cli->cl_dirty > cli->cl_dirty_max) {                CERROR("dirty %lu > dirty_max %lu\n",                       cli->cl_dirty, cli->cl_dirty_max);                oa->o_undirty = 0;        } else if (atomic_read(&obd_dirty_pages) > obd_max_dirty_pages) {                CERROR("dirty %d > system dirty_max %d\n",                       atomic_read(&obd_dirty_pages), obd_max_dirty_pages);                oa->o_undirty = 0;        } else if (cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff) {                CERROR("dirty %lu - dirty_max %lu too big???\n",                       cli->cl_dirty, cli->cl_dirty_max);                oa->o_undirty = 0;        } else {                long max_in_flight = (cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT)*                                (cli->cl_max_rpcs_in_flight + 1);                oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);        }        oa->o_grant = cli->cl_avail_grant;        oa->o_dropped = cli->cl_lost_grant;        cli->cl_lost_grant = 0;        client_obd_list_unlock(&cli->cl_loi_list_lock);        CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",               oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);}/* caller must hold loi_list_lock */static void osc_consume_write_grant(struct client_obd *cli,struct brw_page *pga)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?