filter_io.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 863 行 · 第 1/3 页

C
863
字号
                /* We still set up for ungranted pages so that granted pages                 * can be written to disk as they were promised, and portals                 * needs to keep the pages all aligned properly. */                lnb->dentry = dentry;                lnb->offset = rnb->offset;                lnb->len    = rnb->len;                lnb->flags  = rnb->flags;                /*                 * ost_brw_write()->ost_nio_pages_get() already initialized                 * lnb->page to point to the page from the per-thread page                 * pool (bug 5137), initialize page.                 */                LASSERT(lnb->page != NULL);                if (lnb->len != CFS_PAGE_SIZE) {                        memset(kmap(lnb->page) + lnb->len,                               0, CFS_PAGE_SIZE - lnb->len);                        kunmap(lnb->page);                }                lnb->page->index = lnb->offset >> CFS_PAGE_SHIFT;                cleanup_phase = 4;                /* If the filter writes a partial page, then has the file                 * extended, the client will read in the whole page.  the                 * filter has to be careful to zero the rest of the partial                 * page on disk.  we do it by hand for partial extending                 * writes, send_bio() is responsible for zeroing pages when                 * asked to read unmapped blocks -- brw_kiovec() does this. */                if (lnb->len != CFS_PAGE_SIZE) {                        __s64 maxidx;                        maxidx = ((i_size_read(dentry->d_inode) +                                   CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT) - 1;                        if (maxidx >= lnb->page->index) {                                LL_CDEBUG_PAGE(D_PAGE, lnb->page, "write %u @ "                                               LPU64" flg %x before EOF %llu\n",                                               lnb->len, lnb->offset,lnb->flags,                                               i_size_read(dentry->d_inode));                                filter_iobuf_add_page(exp->exp_obd, iobuf,                                                      dentry->d_inode,                                                      lnb->page);                        } else {                                long off;                                char *p = kmap(lnb->page);                                off = lnb->offset & ~CFS_PAGE_MASK;                                if (off)                                        memset(p, 0, off);                                off = (lnb->offset + lnb->len) & ~CFS_PAGE_MASK;                                if (off)                                        memset(p + off, 0, CFS_PAGE_SIZE - off);                                kunmap(lnb->page);                        }                }                if (lnb->rc == 0)                        tot_bytes += lnb->len;        }        rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp,                              NULL, NULL, NULL);        fsfilt_check_slow(exp->exp_obd, now, "start_page_write");        lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES,                            tot_bytes);        if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats)                lprocfs_counter_add(exp->exp_nid_stats->nid_stats,                                    LPROC_FILTER_WRITE_BYTES, tot_bytes);        EXIT;cleanup:        switch(cleanup_phase) {        case 4:        case 3:                filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti);        case 2:                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);                if (rc)                        f_dput(dentry);                break;        case 1:                filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti);        case 0:                spin_lock(&exp->exp_obd->obd_osfs_lock);                if (oa)                        filter_grant_incoming(exp, oa);                spin_unlock(&exp->exp_obd->obd_osfs_lock);                pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL);                break;        default:;        }        return rc;}int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa,                  int objcount, struct obd_ioobj *obj, int niocount,                  struct niobuf_remote *nb, struct niobuf_local *res,                  struct obd_trans_info *oti){        if (cmd == OBD_BRW_WRITE)                return filter_preprw_write(cmd, exp, oa, objcount, obj,                                           niocount, nb, res, oti);        if (cmd == OBD_BRW_READ)                return filter_preprw_read(cmd, exp, oa, objcount, obj,                                          niocount, nb, res, oti);        LBUG();        return -EPROTO;}void filter_release_read_page(struct filter_obd *filter, struct inode *inode,                              struct page *page){        int drop = 0;        if (inode != NULL &&            (i_size_read(inode) > filter->fo_readcache_max_filesize))                drop = 1;        /* drop from cache like truncate_list_pages() */        if (drop && !TryLockPage(page)) {                if (page->mapping)                        ll_truncate_complete_page(page);                unlock_page(page);        }        page_cache_release(page);}static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa,                                int objcount, struct obd_ioobj *obj,                                int niocount, struct niobuf_local *res,                                struct obd_trans_info *oti, int rc){        struct inode *inode = NULL;        struct ldlm_res_id res_id = { .name = { obj->ioo_id } };        struct ldlm_resource *resource = NULL;        struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;        ENTRY;        /* If oa != NULL then filter_preprw_read updated the inode atime         * and we should update the lvb so that other glimpses will also         * get the updated value. bug 5972 */        if (oa && ns && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) {                resource = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0);                if (resource != NULL) {                        ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1);                        ldlm_resource_putref(resource);                }        }        if (res->dentry != NULL)                inode = res->dentry->d_inode;        filter_free_dio_pages(objcount, obj, niocount, res);        if (res->dentry != NULL)                f_dput(res->dentry);        RETURN(rc);}void flip_into_page_cache(struct inode *inode, struct page *new_page){        struct page *old_page;        int rc;        do {                /* the dlm is protecting us from read/write concurrency, so we                 * expect this find_lock_page to return quickly.  even if we                 * race with another writer it won't be doing much work with                 * the page locked.  we do this 'cause t_c_p expects a                 * locked page, and it wants to grab the pagecache lock                 * as well. */                old_page = find_lock_page(inode->i_mapping, new_page->index);                if (old_page) {                        ll_truncate_complete_page(old_page);                        unlock_page(old_page);                        page_cache_release(old_page);                }#if 0 /* this should be a /proc tunable someday */                /* racing o_directs (no locking ioctl) could race adding                 * their pages, so we repeat the page invalidation unless                 * we successfully added our new page */                rc = add_to_page_cache_unique(new_page, inode->i_mapping,                                              new_page->index,                                              page_hash(inode->i_mapping,                                                        new_page->index));                if (rc == 0) {                        /* add_to_page_cache clears uptodate|dirty and locks                         * the page */                        SetPageUptodate(new_page);                        unlock_page(new_page);                }#else                rc = 0;#endif        } while (rc != 0);}void filter_grant_commit(struct obd_export *exp, int niocount,                         struct niobuf_local *res){        struct filter_obd *filter = &exp->exp_obd->u.filter;        struct niobuf_local *lnb = res;        unsigned long pending = 0;        int i;        spin_lock(&exp->exp_obd->obd_osfs_lock);        for (i = 0, lnb = res; i < niocount; i++, lnb++)                pending += lnb->lnb_grant_used;        LASSERTF(exp->exp_filter_data.fed_pending >= pending,                 "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n",                 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,                 exp->exp_filter_data.fed_pending, pending);        exp->exp_filter_data.fed_pending -= pending;        LASSERTF(filter->fo_tot_granted >= pending,                 "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n",                 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,                 exp->exp_obd->u.filter.fo_tot_granted, pending);        filter->fo_tot_granted -= pending;        LASSERTF(filter->fo_tot_pending >= pending,                 "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n",                 exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp,                 filter->fo_tot_pending, pending);        filter->fo_tot_pending -= pending;        spin_unlock(&exp->exp_obd->obd_osfs_lock);}int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,                    int objcount, struct obd_ioobj *obj, int niocount,                    struct niobuf_local *res, struct obd_trans_info *oti,                    int rc){        if (cmd == OBD_BRW_WRITE)                return filter_commitrw_write(exp, oa, objcount, obj, niocount,                                             res, oti, rc);        if (cmd == OBD_BRW_READ)                return filter_commitrw_read(exp, oa, objcount, obj, niocount,                                            res, oti, rc);        LBUG();        return -EPROTO;}int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,               obd_count oa_bufs, struct brw_page *pga,               struct obd_trans_info *oti){        struct obd_ioobj ioo;        struct niobuf_local *lnb;        struct niobuf_remote *rnb;        obd_count i;        int ret = 0;        ENTRY;        OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local));        OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote));        if (lnb == NULL || rnb == NULL)                GOTO(out, ret = -ENOMEM);        for (i = 0; i < oa_bufs; i++) {                lnb[i].page = pga[i].pg;                rnb[i].offset = pga[i].off;                rnb[i].len = pga[i].count;        }        obdo_to_ioobj(oinfo->oi_oa, &ioo);        ioo.ioo_bufcnt = oa_bufs;        ret = filter_preprw(cmd, exp, oinfo->oi_oa, 1, &ioo,                            oa_bufs, rnb, lnb, oti);        if (ret != 0)                GOTO(out, ret);        ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo,                              oa_bufs, lnb, oti, ret);out:        if (lnb)                OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local));        if (rnb)                OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote));        RETURN(ret);}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?