⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rw.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 2 页
字号:
                offset = (pos & ~CFS_PAGE_MASK);                index = pos >> CFS_PAGE_SHIFT;                bytes = CFS_PAGE_SIZE - offset;                if (bytes > count)                        bytes = count;                /* prevent read beyond file range */                if (/* local_lock && */                    cmd == OBD_BRW_READ && pos + bytes >= st->st_size) {                        if (pos >= st->st_size)                                break;                        bytes = st->st_size - pos;                }                /* prepare page for this index */                page->index = index;                page->addr = buf - offset;                page->_offset = offset;                page->_count = bytes;                page++;                npages++;                count -= bytes;                pos += bytes;                buf += bytes;                group->lig_rwcount += bytes;                ret_bytes += bytes;        } while (count);        group->lig_npages += npages;        for (i = 0, page = pages; i < npages;             i++, page++, llap++, llap_cookie += llap_cookie_size){                llap->llap_magic = LLAP_MAGIC;                llap->llap_cookie = llap_cookie;                rc = obd_prep_async_page(exp, lsm, NULL, page,                                         (obd_off)page->index << CFS_PAGE_SHIFT,                                         &llu_async_page_ops,                                         llap, &llap->llap_cookie,                                         1 /* no cache in liblustre at all */,                                         NULL);                if (rc) {                        LASSERT(rc < 0);                        llap->llap_cookie = NULL;                        RETURN(rc);                }                CDEBUG(D_CACHE, "llap %p page %p group %p obj off "LPU64"\n",                       llap, page, llap->llap_cookie,                       (obd_off)pages->index << CFS_PAGE_SHIFT);                page->private = (unsigned long)llap;                llap->llap_page = page;                llap->llap_inode = group->lig_inode;                rc = obd_queue_group_io(exp, lsm, NULL, group->lig_oig,                                        llap->llap_cookie, cmd,                                        page->_offset, page->_count,                                        group->lig_params->lrp_brw_flags,                                        ASYNC_READY | ASYNC_URGENT |                                        ASYNC_COUNT_STABLE | ASYNC_GROUP_SYNC);                if (!local_lock && cmd == OBD_BRW_READ) {                        /*                         * In OST-side locking case short reads cannot be                         * detected properly.                         *                         * The root of the problem is that                         *                         * kms = lov_merge_size(lsm, 1);                         * if (end >= kms)                         *         glimpse_size(inode);                         * else                         *         st->st_size = kms;                         *                         * logic in the read code (both llite and liblustre)                         * only works correctly when client holds DLM lock on                         * [start, end]. Without DLM lock KMS can be                         * completely out of date, and client can either make                         * spurious short-read (missing concurrent write), or                         * return stale data (missing concurrent                         * truncate). For llite client this is fatal, because                         * incorrect data are cached and can be later sent                         * back to the server (vide bug 5047). This is hard to                         * fix by handling short-reads on the server, as there                         * is no easy way to communicate file size (or amount                         * of bytes read/written) back to the client,                         * _especially_ because OSC pages can be sliced and                         * dices into multiple RPCs arbitrary. Fortunately,                         * liblustre doesn't cache data and the worst case is                         * that we get race with concurrent write or truncate.                         */                }                if (rc) {                        LASSERT(rc < 0);                        RETURN(rc);                }                llap->llap_queued = 1;        }        RETURN(ret_bytes);}staticstruct llu_io_group * get_io_group(struct inode *inode, int maxpages,                                   struct lustre_rw_params *params){        struct llu_io_group *group;        int rc;        if (!llap_cookie_size)                llap_cookie_size = obd_prep_async_page(llu_i2obdexp(inode),                                                       NULL, NULL, NULL, 0,                                                       NULL, NULL, NULL, 0,                                                       NULL);        OBD_ALLOC(group, LLU_IO_GROUP_SIZE(maxpages));        if (!group)                return ERR_PTR(-ENOMEM);        I_REF(inode);        group->lig_inode = inode;        group->lig_maxpages = maxpages;        group->lig_params = params;        group->lig_llaps = (struct ll_async_page *)(group + 1);        group->lig_pages = (struct page *)(&group->lig_llaps[maxpages]);        group->lig_llap_cookies = (void *)(&group->lig_pages[maxpages]);        rc = oig_init(&group->lig_oig);        if (rc) {                OBD_FREE(group, LLU_IO_GROUP_SIZE(maxpages));                return ERR_PTR(rc);        }        return group;}static int max_io_pages(ssize_t len, int iovlen){        return (((len + CFS_PAGE_SIZE -1) >> CFS_PAGE_SHIFT) + 2 + iovlen - 1);}staticvoid put_io_group(struct llu_io_group *group){        struct lov_stripe_md *lsm = llu_i2info(group->lig_inode)->lli_smd;        struct obd_export *exp = llu_i2obdexp(group->lig_inode);        struct ll_async_page *llap = group->lig_llaps;        int i;        for (i = 0; i < group->lig_npages; i++, llap++) {                if (llap->llap_cookie)                        obd_teardown_async_page(exp, lsm, NULL,                                                llap->llap_cookie);        }        I_RELE(group->lig_inode);        oig_release(group->lig_oig);        OBD_FREE(group, LLU_IO_GROUP_SIZE(group->lig_maxpages));}staticssize_t llu_file_prwv(const struct iovec *iovec, int iovlen,                        _SYSIO_OFF_T pos, ssize_t len,                        void *private){        struct llu_io_session *session = (struct llu_io_session *) private;        struct inode *inode = session->lis_inode;        struct llu_inode_info *lli = llu_i2info(inode);        struct intnl_stat *st = llu_i2stat(inode);        struct ll_file_data *fd = lli->lli_file_data;        struct lustre_handle lockh = {0};        struct lov_stripe_md *lsm = lli->lli_smd;        struct obd_export *exp = NULL;        struct llu_io_group *iogroup;        struct lustre_rw_params p;        struct ost_lvb lvb;        __u64 kms;        int err, is_read, iovidx, ret;        int local_lock;        ssize_t ret_len = len;        ENTRY;        /* in a large iov read/write we'll be repeatedly called.         * so give a chance to answer cancel ast here         */        liblustre_wait_event(0);        exp = llu_i2obdexp(inode);        if (exp == NULL)                RETURN(-EINVAL);        if (len == 0 || iovlen == 0)                RETURN(0);        if (pos + len > lli->lli_maxbytes)                RETURN(-ERANGE);        lustre_build_lock_params(session->lis_cmd, lli->lli_open_flags,                                 lli->lli_sbi->ll_lco.lco_flags,                                 pos, len, &p);        iogroup = get_io_group(inode, max_io_pages(len, iovlen), &p);        if (IS_ERR(iogroup))                RETURN(PTR_ERR(iogroup));        local_lock = p.lrp_lock_mode != LCK_NL;        err = llu_extent_lock(fd, inode, lsm, p.lrp_lock_mode, &p.lrp_policy,                              &lockh, p.lrp_ast_flags);        if (err != ELDLM_OK)                GOTO(err_put, err);        is_read = (session->lis_cmd == OBD_BRW_READ);        if (is_read) {                /*                 * If OST-side locking is used, KMS can be completely out of                 * date, and, hence, cannot be used for short-read                 * detection. Rely in OST to handle short reads in that case.                 */                inode_init_lvb(inode, &lvb);                obd_merge_lvb(exp, lsm, &lvb, 1);                kms = lvb.lvb_size;                /* extent.end is last byte of the range */                if (p.lrp_policy.l_extent.end >= kms) {                        /* A glimpse is necessary to determine whether                         * we return a short read or some zeroes at                         * the end of the buffer                         *                         * In the case of OST-side locking KMS can be                         * completely out of date and short-reads maybe                         * mishandled. See llu_queue_pio() for more detailed                         * comment.                         */                        if ((err = llu_glimpse_size(inode))) {                                GOTO(err_unlock, err);                        }                } else {                        st->st_size = kms;                }        } else if (lli->lli_open_flags & O_APPEND) {                pos = st->st_size;        }        for (iovidx = 0; iovidx < iovlen; iovidx++) {                char *buf = (char *) iovec[iovidx].iov_base;                size_t count = iovec[iovidx].iov_len;                if (!count)                        continue;                if (len < count)                        count = len;                if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count)) {                        GOTO(err_unlock, err = -EFAULT);                }                if (is_read) {                        if (/* local_lock && */ pos >= st->st_size)                                break;                } else {                        if (pos >= lli->lli_maxbytes) {                                GOTO(err_unlock, err = -EFBIG);                        }                        if (pos + count >= lli->lli_maxbytes)                                count = lli->lli_maxbytes - pos;                }                ret = llu_queue_pio(session->lis_cmd, iogroup, buf, count, pos);                if (ret < 0) {                        GOTO(err_unlock, err = ret);                } else {                        pos += ret;                        if (!is_read) {                                LASSERT(ret == count);                                obd_adjust_kms(exp, lsm, pos, 0);                                /* file size grow immediately */                                if (pos > st->st_size)                                        st->st_size = pos;                        }                        len -= ret;                        if (!len)                                break;                }        }        LASSERT(len == 0 || is_read); /* libsysio should guarantee this */        err = obd_trigger_group_io(exp, lsm, NULL, iogroup->lig_oig);        if (err)                GOTO(err_unlock, err);        err = oig_wait(iogroup->lig_oig);        if (err) {                CERROR("%s error: %s\n", is_read ? "read" : "write", strerror(-err));                GOTO(err_unlock, err);        }        ret = llu_extent_unlock(fd, inode, lsm, p.lrp_lock_mode, &lockh);        if (ret)                CERROR("extent unlock error %d\n", ret);        session->lis_groups[session->lis_ngroups++] = iogroup;        RETURN(ret_len);err_unlock:        llu_extent_unlock(fd, inode, lsm, p.lrp_lock_mode, &lockh);err_put:        put_io_group(iogroup);        RETURN((ssize_t)err);}staticstruct llu_io_session *get_io_session(struct inode *ino, int ngroups, int cmd){        struct llu_io_session *session;        OBD_ALLOC(session, LLU_IO_SESSION_SIZE(ngroups));        if (!session)                return NULL;        I_REF(ino);        session->lis_inode = ino;        session->lis_max_groups = ngroups;        session->lis_cmd = cmd;        return session;}static void put_io_session(struct llu_io_session *session){        int i;        for (i = 0; i < session->lis_ngroups; i++) {                if (session->lis_groups[i]) {                        put_io_group(session->lis_groups[i]);                        session->lis_groups[i] = NULL;                }        }        I_RELE(session->lis_inode);        OBD_FREE(session, LLU_IO_SESSION_SIZE(session->lis_max_groups));}static int llu_file_rwx(struct inode *ino,                        struct ioctx *ioctx,                        int read){        struct llu_io_session *session;        ssize_t cc;        int cmd = read ? OBD_BRW_READ : OBD_BRW_WRITE;        ENTRY;        LASSERT(ioctx->ioctx_xtvlen >= 0);        LASSERT(ioctx->ioctx_iovlen >= 0);        liblustre_wait_event(0);        if (!ioctx->ioctx_xtvlen)                RETURN(0);        /* XXX consider other types later */        if (S_ISDIR(llu_i2stat(ino)->st_mode))                RETURN(-EISDIR);        if (!S_ISREG(llu_i2stat(ino)->st_mode))                RETURN(-EOPNOTSUPP);        session = get_io_session(ino, ioctx->ioctx_xtvlen * 2, cmd);        if (!session)                RETURN(-ENOMEM);        cc = _sysio_enumerate_extents(ioctx->ioctx_xtv, ioctx->ioctx_xtvlen,                                      ioctx->ioctx_iov, ioctx->ioctx_iovlen,                                      llu_file_prwv, session);        if (cc >= 0) {                LASSERT(!ioctx->ioctx_cc);                ioctx->ioctx_private = session;                cc = 0;        } else {                put_io_session(session);        }        liblustre_wait_event(0);        RETURN(cc);}int llu_iop_read(struct inode *ino,                 struct ioctx *ioctx){        /* BUG: 5972 */        struct intnl_stat *st = llu_i2stat(ino);        st->st_atime = CURRENT_TIME;        return llu_file_rwx(ino, ioctx, 1);}int llu_iop_write(struct inode *ino,                  struct ioctx *ioctx){        struct intnl_stat *st = llu_i2stat(ino);        st->st_mtime = st->st_ctime = CURRENT_TIME;        return llu_file_rwx(ino, ioctx, 0);}int llu_iop_iodone(struct ioctx *ioctx){        struct llu_io_session *session;        struct llu_io_group *group;        int i, err = 0, rc = 0;        ENTRY;        liblustre_wait_event(0);        session = (struct llu_io_session *) ioctx->ioctx_private;        LASSERT(session);        LASSERT(!IS_ERR(session));        for (i = 0; i < session->lis_ngroups; i++) {                group = session->lis_groups[i];                if (group) {                        if (!rc) {                                err = oig_wait(group->lig_oig);                                if (err)                                        rc = err;                        }                        if (!rc)                                ioctx->ioctx_cc += group->lig_rwcount;                        put_io_group(group);                        session->lis_groups[i] = NULL;                }        }        if (rc) {                LASSERT(rc < 0);                ioctx->ioctx_cc = -1;                ioctx->ioctx_errno = -rc;        }        put_io_session(session);        ioctx->ioctx_private = NULL;        liblustre_wait_event(0);        RETURN(1);}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -