⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 ost_handler.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 5 页
字号:
                                  lustre_swab_ost_body);        if (body == NULL)                RETURN(-EFAULT);        rc = lustre_pack_reply(req, 2, size, NULL);        if (rc)                RETURN(rc);        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,                                 sizeof(*repbody));        memcpy(&repbody->oa, &body->oa, sizeof(body->oa));        oinfo.oi_oa = &repbody->oa;        req->rq_status = obd_setattr(exp, &oinfo, oti);        RETURN(0);}static int ost_bulk_timeout(void *data){        ENTRY;        /* We don't fail the connection here, because having the export         * killed makes the (vital) call to commitrw very sad.         */        RETURN(1);}static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo,                                struct niobuf_remote *rnb, int nrnb,                                struct niobuf_remote **pp_rnbp){        /* Copy a remote niobuf, splitting it into page-sized chunks         * and setting ioo[i].ioo_bufcnt accordingly */        struct niobuf_remote *pp_rnb;        int   i;        int   j;        int   page;        int   rnbidx = 0;        int   npages = 0;        /*         * array of sufficient size already preallocated by caller         */        LASSERT(pp_rnbp != NULL);        LASSERT(*pp_rnbp != NULL);        /* first count and check the number of pages required */        for (i = 0; i < nioo; i++)                for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) {                        obd_off offset = rnb[rnbidx].offset;                        obd_off p0 = offset >> CFS_PAGE_SHIFT;                        obd_off pn = (offset + rnb[rnbidx].len - 1)>>CFS_PAGE_SHIFT;                        LASSERT(rnbidx < nrnb);                        npages += (pn + 1 - p0);                        if (rnb[rnbidx].len == 0) {                                CERROR("zero len BRW: obj %d objid "LPX64                                       " buf %u\n", i, ioo[i].ioo_id, j);                                return -EINVAL;                        }                        if (j > 0 &&                            rnb[rnbidx].offset <= rnb[rnbidx-1].offset) {                                CERROR("unordered BRW: obj %d objid "LPX64                                       " buf %u offset "LPX64" <= "LPX64"\n",                                       i, ioo[i].ioo_id, j, rnb[rnbidx].offset,                                       rnb[rnbidx].offset);                                return -EINVAL;                        }                }        LASSERT(rnbidx == nrnb);        if (npages == nrnb) {       /* all niobufs are for single pages */                *pp_rnbp = rnb;                return npages;        }        pp_rnb = *pp_rnbp;        /* now do the actual split */        page = rnbidx = 0;        for (i = 0; i < nioo; i++) {                int  obj_pages = 0;                for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) {                        obd_off off = rnb[rnbidx].offset;                        int     nob = rnb[rnbidx].len;                        LASSERT(rnbidx < nrnb);                        do {                                obd_off  poff = off & ~CFS_PAGE_MASK;                                int      pnob = (poff + nob > CFS_PAGE_SIZE) ?                                                CFS_PAGE_SIZE - poff : nob;                                LASSERT(page < npages);                                pp_rnb[page].len = pnob;                                pp_rnb[page].offset = off;                                pp_rnb[page].flags = rnb[rnbidx].flags;                                CDEBUG(0, "   obj %d id "LPX64                                       "page %d(%d) "LPX64" for %d, flg %x\n",                                       i, ioo[i].ioo_id, obj_pages, page,                                       pp_rnb[page].offset, pp_rnb[page].len,                                       pp_rnb[page].flags);                                page++;                                obj_pages++;                                off += pnob;                                nob -= pnob;                        } while (nob > 0);                        LASSERT(nob == 0);                }                ioo[i].ioo_bufcnt = obj_pages;        }        LASSERT(page == npages);        return npages;}static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,                               cksum_type_t cksum_type){        __u32 cksum;        int i;        cksum = init_checksum(cksum_type);        for (i = 0; i < desc->bd_iov_count; i++) {                struct page *page = desc->bd_iov[i].kiov_page;                int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;                char *ptr = kmap(page) + off;                int len = desc->bd_iov[i].kiov_len;                /* corrupt the data before we compute the checksum, to                 * simulate a client->OST data error */                if (i == 0 && opc == OST_WRITE &&                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_RECEIVE))                        memcpy(ptr, "bad3", min(4, len));                cksum = compute_checksum(cksum, ptr, len, cksum_type);                /* corrupt the data after we compute the checksum, to                 * simulate an OST->client data error */                if (i == 0 && opc == OST_READ &&                    OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_SEND))                        memcpy(ptr, "bad4", min(4, len));                kunmap(page);        }        return cksum;}/* * populate @nio by @nrpages pages from per-thread page pool */static void ost_nio_pages_get(struct ptlrpc_request *req,                              struct niobuf_local *nio, int nrpages){        int i;        struct ost_thread_local_cache *tls;        ENTRY;        LASSERT(nrpages <= OST_THREAD_POOL_SIZE);        LASSERT(req != NULL);        LASSERT(req->rq_svc_thread != NULL);        tls = ost_tls(req);        LASSERT(tls != NULL);        memset(nio, 0, nrpages * sizeof *nio);        for (i = 0; i < nrpages; ++ i) {                struct page *page;                page = tls->page[i];                LASSERT(page != NULL);                POISON_PAGE(page, 0xf1);                nio[i].page = page;                LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i);        }        EXIT;}/* * Dual for ost_nio_pages_get(). Poison pages in pool for debugging */static void ost_nio_pages_put(struct ptlrpc_request *req,                              struct niobuf_local *nio, int nrpages){        int i;        ENTRY;        LASSERT(nrpages <= OST_THREAD_POOL_SIZE);        for (i = 0; i < nrpages; ++ i)                POISON_PAGE(nio[i].page, 0xf2);        EXIT;}static int ost_brw_lock_get(int mode, struct obd_export *exp,                            struct obd_ioobj *obj, struct niobuf_remote *nb,                            struct lustre_handle *lh){        int flags                 = 0;        int nrbufs                = obj->ioo_bufcnt;        struct ldlm_res_id res_id = { .name = { obj->ioo_id } };        ldlm_policy_data_t policy;        int i;        ENTRY;        LASSERT(mode == LCK_PR || mode == LCK_PW);        LASSERT(!lustre_handle_is_used(lh));        if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))                RETURN(0);        /* EXPENSIVE ASSERTION */        for (i = 1; i < nrbufs; i ++)                LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) ==                        (nb[i].flags & OBD_BRW_SRVLOCK));        policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;        policy.l_extent.end   = (nb[nrbufs - 1].offset +                                 nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;        RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,                                       LDLM_EXTENT, &policy, mode, &flags,                                      ldlm_blocking_ast, ldlm_completion_ast,                                      ldlm_glimpse_ast, NULL, 0, NULL, lh));}static void ost_brw_lock_put(int mode,                             struct obd_ioobj *obj, struct niobuf_remote *niob,                             struct lustre_handle *lh){        ENTRY;        LASSERT(mode == LCK_PR || mode == LCK_PW);        LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==                lustre_handle_is_used(lh));        if (lustre_handle_is_used(lh))                ldlm_lock_decref(lh, mode);        EXIT;}struct ost_prolong_data {        struct obd_export *opd_exp;        ldlm_policy_data_t opd_policy;        ldlm_mode_t opd_mode;};static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data){        struct ost_prolong_data *opd = data;        LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);        if (lock->l_req_mode != lock->l_granted_mode) {                /* scan granted locks only */                return LDLM_ITER_STOP;        }        if (lock->l_export != opd->opd_exp) {                /* prolong locks only for given client */                return LDLM_ITER_CONTINUE;        }        if (!(lock->l_granted_mode & opd->opd_mode)) {                /* we aren't interesting in all type of locks */                return LDLM_ITER_CONTINUE;        }        if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start ||            lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) {                /* the request doesn't cross the lock, skip it */                return LDLM_ITER_CONTINUE;        }        if (!(lock->l_flags & LDLM_FL_AST_SENT)) {                /* ignore locks not being cancelled */                return LDLM_ITER_CONTINUE;        }        /* OK. this is a possible lock the user holds doing I/O         * let's refresh eviction timer for it */        ldlm_refresh_waiting_lock(lock);        return LDLM_ITER_CONTINUE;}static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj,                              struct niobuf_remote *nb, struct obdo *oa,                              ldlm_mode_t mode){        struct ldlm_res_id res_id = { .name = { obj->ioo_id } };        int nrbufs = obj->ioo_bufcnt;        struct ost_prolong_data opd;        ENTRY;        opd.opd_mode = mode;        opd.opd_exp = exp;        opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;        opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset +                                       nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;        CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",               res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start,               opd.opd_policy.l_extent.end);        if (oa->o_valid & OBD_MD_FLHANDLE) {                struct ldlm_lock *lock;                lock = ldlm_handle2lock(&oa->o_handle);                if (lock != NULL) {                        ost_prolong_locks_iter(lock, &opd);                        LDLM_LOCK_PUT(lock);                        EXIT;                        return;                }        }        ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id,                              ost_prolong_locks_iter, &opd);        EXIT;}static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti){        struct ptlrpc_bulk_desc *desc;        struct obd_export       *exp = req->rq_export;        struct niobuf_remote *remote_nb;        struct niobuf_remote *pp_rnb = NULL;        struct niobuf_local *local_nb;        struct obd_ioobj *ioo;        struct ost_body *body, *repbody;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -