⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ost_handler.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 5 页
字号:
        struct l_wait_info lwi;        struct lustre_handle lockh = { 0 };        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };        int niocount, npages, nob = 0, rc, i;        int no_reply = 0;        ENTRY;        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))                GOTO(out, rc = -EIO);        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);        /* Check if there is eviction in progress, and if so, wait for it to         * finish */        if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {                lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,                        !atomic_read(&exp->exp_obd->obd_evict_inprogress),                        &lwi);        }        if (exp->exp_failed)                GOTO(out, rc = -ENOTCONN);        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),                                  lustre_swab_ost_body);        if (body == NULL) {                CERROR("Missing/short ost_body\n");                GOTO(out, rc = -EFAULT);        }        ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo),                                 lustre_swab_obd_ioobj);        if (ioo == NULL) {                CERROR("Missing/short ioobj\n");                GOTO(out, rc = -EFAULT);        }        niocount = ioo->ioo_bufcnt;        if (niocount > PTLRPC_MAX_BRW_PAGES) {                DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",                          niocount);                GOTO(out, rc = -EFAULT);        }        remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,                                       niocount * sizeof(*remote_nb),                                       lustre_swab_niobuf_remote);        if (remote_nb == NULL) {                CERROR("Missing/short niobuf\n");                GOTO(out, rc = -EFAULT);        }        if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */                for (i = 1; i < niocount; i++)                        lustre_swab_niobuf_remote (&remote_nb[i]);        }        rc = lustre_pack_reply(req, 2, size, NULL);        if (rc)                GOTO(out, rc);        /*         * Per-thread array of struct niobuf_{local,remote}'s was allocated by         * ost_thread_init().         */        local_nb = ost_tls(req)->local;        pp_rnb   = ost_tls(req)->remote;        /* FIXME all niobuf splitting should be done in obdfilter if needed */        /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */        npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb);        if (npages < 0)                GOTO(out, rc = npages);        LASSERT(npages <= OST_THREAD_POOL_SIZE);        ost_nio_pages_get(req, local_nb, npages);        desc = ptlrpc_prep_bulk_exp(req, npages,                                     BULK_PUT_SOURCE, OST_BULK_PORTAL);        if (desc == NULL)                GOTO(out, rc = -ENOMEM);        rc = ost_brw_lock_get(LCK_PR, exp, ioo, pp_rnb, &lockh);        if (rc != 0)                GOTO(out_bulk, rc);        /*          * If getting the lock took more time than         * client was willing to wait, drop it. b=11330         */        if (cfs_time_current_sec() > req->rq_deadline ||             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {                no_reply = 1;                CERROR("Dropping timed-out read from %s because locking"                       "object "LPX64" took %ld seconds (limit was %ld).\n",                       libcfs_id2str(req->rq_peer), ioo->ioo_id,                       cfs_time_current_sec() - req->rq_arrival_time.tv_sec,                       req->rq_deadline - req->rq_arrival_time.tv_sec);                GOTO(out_lock, rc = -ETIMEDOUT);        }        rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1,                        ioo, npages, pp_rnb, local_nb, oti);        if (rc != 0)                GOTO(out_lock, rc);        ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW | LCK_PR);        nob = 0;        for (i = 0; i < npages; i++) {                int page_rc = local_nb[i].rc;                if (page_rc < 0) {              /* error */                        rc = page_rc;                        break;                }                LASSERTF(page_rc <= pp_rnb[i].len, "page_rc (%d) > "                         "pp_rnb[%d].len (%d)\n", page_rc, i, pp_rnb[i].len);                nob += page_rc;                if (page_rc != 0) {             /* some data! */                        LASSERT (local_nb[i].page != NULL);                        ptlrpc_prep_bulk_page(desc, local_nb[i].page,                                              pp_rnb[i].offset & ~CFS_PAGE_MASK,                                              page_rc);                }                if (page_rc != pp_rnb[i].len) { /* short read */                        /* All subsequent pages should be 0 */                        while(++i < npages)                                LASSERT(local_nb[i].rc == 0);                        break;                }        }        if (body->oa.o_valid & OBD_MD_FLCKSUM) {                cksum_type_t cksum_type = OBD_CKSUM_CRC32;                if (body->oa.o_valid & OBD_MD_FLFLAGS)                        cksum_type = cksum_type_unpack(body->oa.o_flags);                body->oa.o_flags = cksum_type_pack(cksum_type);                body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;                body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type);                CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum);        } else {                body->oa.o_valid = 0;        }        /* We're finishing using body->oa as an input variable */        /* Check if client was evicted while we were doing i/o before touching           network */        if (rc == 0) {                /* Check if there is eviction in progress, and if so, wait for                 * it to finish */                if (unlikely(atomic_read(&exp->exp_obd->                                                obd_evict_inprogress))) {                        lwi = LWI_INTR(NULL, NULL);                        rc = l_wait_event(exp->exp_obd->                                                obd_evict_inprogress_waitq,                                          !atomic_read(&exp->exp_obd->                                                        obd_evict_inprogress),                                          &lwi);                }                if (exp->exp_failed)                        rc = -ENOTCONN;                else                        rc = ptlrpc_start_bulk_transfer(desc);                if (rc == 0) {                        time_t start = cfs_time_current_sec();                        do {                                long timeoutl = req->rq_deadline -                                         cfs_time_current_sec();                                cfs_duration_t timeout = (timeoutl <= 0 || rc) ?                                         CFS_TICK : cfs_time_seconds(timeoutl);                                lwi = LWI_TIMEOUT_INTERVAL(timeout,                                                            cfs_time_seconds(1),                                                           ost_bulk_timeout,                                                            desc);                                rc = l_wait_event(desc->bd_waitq,                                                   !ptlrpc_bulk_active(desc) ||                                                  exp->exp_failed, &lwi);                                LASSERT(rc == 0 || rc == -ETIMEDOUT);                                /* Wait again if we changed deadline */                        } while ((rc == -ETIMEDOUT) &&                                  (req->rq_deadline > cfs_time_current_sec()));                        if (rc == -ETIMEDOUT) {                                DEBUG_REQ(D_ERROR, req,                                          "timeout on bulk PUT after %ld%+lds",                                          req->rq_deadline - start,                                          cfs_time_current_sec() -                                          req->rq_deadline);                                ptlrpc_abort_bulk(desc);                        } else if (exp->exp_failed) {                                DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT");                                rc = -ENOTCONN;                                ptlrpc_abort_bulk(desc);                        } else if (!desc->bd_success ||                                   desc->bd_nob_transferred != desc->bd_nob) {                                DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)",                                          desc->bd_success ?                                          "truncated" : "network error on",                                          desc->bd_nob_transferred,                                          desc->bd_nob);                                /* XXX should this be a different errno? */                                rc = -ETIMEDOUT;                        }                } else {                        DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc);                }                no_reply = rc != 0;        }        /* Must commit after prep above in all cases */        rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1,                          ioo, npages, local_nb, oti, rc);        ost_nio_pages_put(req, local_nb, npages);        if (rc == 0) {                repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,                                         sizeof(*repbody));                memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));        } out_lock:        ost_brw_lock_put(LCK_PR, ioo, pp_rnb, &lockh); out_bulk:        ptlrpc_free_bulk(desc); out:        LASSERT(rc <= 0);        if (rc == 0) {                req->rq_status = nob;                target_committed_to_req(req);                ptlrpc_reply(req);        } else if (!no_reply) {                /* Only reply if there was no comms problem with bulk */                target_committed_to_req(req);                req->rq_status = rc;                ptlrpc_error(req);        } else {                /* reply out callback would free */                ptlrpc_req_drop_rs(req);                CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "                      "client will retry\n",                      exp->exp_obd->obd_name,                      exp->exp_client_uuid.uuid,                      exp->exp_connection->c_remote_uuid.uuid,                      libcfs_id2str(req->rq_peer));        }        RETURN(rc);}static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti){        struct ptlrpc_bulk_desc *desc;        struct obd_export       *exp = req->rq_export;        struct niobuf_remote    *remote_nb;        struct niobuf_remote    *pp_rnb;        struct niobuf_local     *local_nb;        struct obd_ioobj        *ioo;        struct ost_body         *body, *repbody;        struct l_wait_info       lwi;        struct lustre_handle     lockh = {0};        __u32                   *rcs;        int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) };        int objcount, niocount, npages;        int rc, swab, i, j;        obd_count                client_cksum = 0, server_cksum = 0;        cksum_type_t             cksum_type = OBD_CKSUM_CRC32;        int                      no_reply = 0;         ENTRY;        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))                GOTO(out, rc = -EIO);        if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))                GOTO(out, rc = -EFAULT);        /* pause before transaction has been started */        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);        /* Check if there is eviction in progress, and if so, wait for it to         * finish */        if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {                lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,                        !atomic_read(&exp->exp_obd->obd_evict_inprogress),                        &lwi);        }        if (exp->exp_failed)                GOTO(out, rc = -ENOTCONN);        swab = lustre_msg_swabbed(req->rq_reqmsg);        body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body),                                  lustre_swab_ost_body);        if (body == NULL) {                CERROR("Missing/short ost_body\n");                GOTO(out, rc = -EFAULT);        }        lustre_set_req_swabbed(req, REQ_REC_OFF + 1);        objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) /                   sizeof(*ioo);        if (objcount == 0) {                CERROR("Missing/short ioobj\n");                GOTO(out, rc = -EFAULT);        }        if (objcount > 1) {                CERROR("too many ioobjs (%d)\n", objcount);                GOTO(out, rc = -EFAULT);        }        ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,                             objcount * sizeof(*ioo));        LASSERT (ioo != NULL);        for (niocount = i = 0; i < objcount; i++) {                if (swab)                        lustre_swab_obd_ioobj(&ioo[i]);                if (ioo[i].ioo_bufcnt == 0) {                        CERROR("ioo[%d] has zero bufcnt\n", i);                        GOTO(out, rc = -EFAULT);                }                niocount += ioo[i].ioo_bufcnt;        }        if (niocount > PTLRPC_MAX_BRW_PAGES) {                DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)",                          niocount);                GOTO(out, rc = -EFAULT);        }        remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2,                                       niocount * sizeof(*remote_nb),                                       lustre_swab_niobuf_remote);        if (remote_nb == NULL) {                CERROR("Missing/short niobuf\n");                GOTO(out, rc = -EFAULT);

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -