filter_io.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 863 行 · 第 1/3 页
C
863 行
/* We still set up for ungranted pages so that granted pages * can be written to disk as they were promised, and portals * needs to keep the pages all aligned properly. */ lnb->dentry = dentry; lnb->offset = rnb->offset; lnb->len = rnb->len; lnb->flags = rnb->flags; /* * ost_brw_write()->ost_nio_pages_get() already initialized * lnb->page to point to the page from the per-thread page * pool (bug 5137), initialize page. */ LASSERT(lnb->page != NULL); if (lnb->len != CFS_PAGE_SIZE) { memset(kmap(lnb->page) + lnb->len, 0, CFS_PAGE_SIZE - lnb->len); kunmap(lnb->page); } lnb->page->index = lnb->offset >> CFS_PAGE_SHIFT; cleanup_phase = 4; /* If the filter writes a partial page, then has the file * extended, the client will read in the whole page. the * filter has to be careful to zero the rest of the partial * page on disk. we do it by hand for partial extending * writes, send_bio() is responsible for zeroing pages when * asked to read unmapped blocks -- brw_kiovec() does this. */ if (lnb->len != CFS_PAGE_SIZE) { __s64 maxidx; maxidx = ((i_size_read(dentry->d_inode) + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT) - 1; if (maxidx >= lnb->page->index) { LL_CDEBUG_PAGE(D_PAGE, lnb->page, "write %u @ " LPU64" flg %x before EOF %llu\n", lnb->len, lnb->offset,lnb->flags, i_size_read(dentry->d_inode)); filter_iobuf_add_page(exp->exp_obd, iobuf, dentry->d_inode, lnb->page); } else { long off; char *p = kmap(lnb->page); off = lnb->offset & ~CFS_PAGE_MASK; if (off) memset(p, 0, off); off = (lnb->offset + lnb->len) & ~CFS_PAGE_MASK; if (off) memset(p + off, 0, CFS_PAGE_SIZE - off); kunmap(lnb->page); } } if (lnb->rc == 0) tot_bytes += lnb->len; } rc = filter_direct_io(OBD_BRW_READ, dentry, iobuf, exp, NULL, NULL, NULL); fsfilt_check_slow(exp->exp_obd, now, "start_page_write"); lprocfs_counter_add(exp->exp_obd->obd_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes); if (exp->exp_nid_stats && exp->exp_nid_stats->nid_stats) lprocfs_counter_add(exp->exp_nid_stats->nid_stats, LPROC_FILTER_WRITE_BYTES, tot_bytes); EXIT;cleanup: switch(cleanup_phase) { case 4: case 3: filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti); case 2: pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); if (rc) f_dput(dentry); break; case 1: filter_iobuf_put(&exp->exp_obd->u.filter, iobuf, oti); case 0: spin_lock(&exp->exp_obd->obd_osfs_lock); if (oa) filter_grant_incoming(exp, oa); spin_unlock(&exp->exp_obd->obd_osfs_lock); pop_ctxt(&saved, &exp->exp_obd->obd_lvfs_ctxt, NULL); break; default:; } return rc;}int filter_preprw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_remote *nb, struct niobuf_local *res, struct obd_trans_info *oti){ if (cmd == OBD_BRW_WRITE) return filter_preprw_write(cmd, exp, oa, objcount, obj, niocount, nb, res, oti); if (cmd == OBD_BRW_READ) return filter_preprw_read(cmd, exp, oa, objcount, obj, niocount, nb, res, oti); LBUG(); return -EPROTO;}void filter_release_read_page(struct filter_obd *filter, struct inode *inode, struct page *page){ int drop = 0; if (inode != NULL && (i_size_read(inode) > filter->fo_readcache_max_filesize)) drop = 1; /* drop from cache like truncate_list_pages() */ if (drop && !TryLockPage(page)) { if (page->mapping) ll_truncate_complete_page(page); unlock_page(page); } page_cache_release(page);}static int filter_commitrw_read(struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti, int rc){ struct inode *inode = NULL; struct ldlm_res_id res_id = { .name = { obj->ioo_id } }; struct ldlm_resource *resource = NULL; struct ldlm_namespace *ns = exp->exp_obd->obd_namespace; ENTRY; /* If oa != NULL then filter_preprw_read updated the inode atime * and we should update the lvb so that other glimpses will also * get the updated value. bug 5972 */ if (oa && ns && ns->ns_lvbo && ns->ns_lvbo->lvbo_update) { resource = ldlm_resource_get(ns, NULL, res_id, LDLM_EXTENT, 0); if (resource != NULL) { ns->ns_lvbo->lvbo_update(resource, NULL, 0, 1); ldlm_resource_putref(resource); } } if (res->dentry != NULL) inode = res->dentry->d_inode; filter_free_dio_pages(objcount, obj, niocount, res); if (res->dentry != NULL) f_dput(res->dentry); RETURN(rc);}void flip_into_page_cache(struct inode *inode, struct page *new_page){ struct page *old_page; int rc; do { /* the dlm is protecting us from read/write concurrency, so we * expect this find_lock_page to return quickly. even if we * race with another writer it won't be doing much work with * the page locked. we do this 'cause t_c_p expects a * locked page, and it wants to grab the pagecache lock * as well. */ old_page = find_lock_page(inode->i_mapping, new_page->index); if (old_page) { ll_truncate_complete_page(old_page); unlock_page(old_page); page_cache_release(old_page); }#if 0 /* this should be a /proc tunable someday */ /* racing o_directs (no locking ioctl) could race adding * their pages, so we repeat the page invalidation unless * we successfully added our new page */ rc = add_to_page_cache_unique(new_page, inode->i_mapping, new_page->index, page_hash(inode->i_mapping, new_page->index)); if (rc == 0) { /* add_to_page_cache clears uptodate|dirty and locks * the page */ SetPageUptodate(new_page); unlock_page(new_page); }#else rc = 0;#endif } while (rc != 0);}void filter_grant_commit(struct obd_export *exp, int niocount, struct niobuf_local *res){ struct filter_obd *filter = &exp->exp_obd->u.filter; struct niobuf_local *lnb = res; unsigned long pending = 0; int i; spin_lock(&exp->exp_obd->obd_osfs_lock); for (i = 0, lnb = res; i < niocount; i++, lnb++) pending += lnb->lnb_grant_used; LASSERTF(exp->exp_filter_data.fed_pending >= pending, "%s: cli %s/%p fed_pending: %lu grant_used: %lu\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, exp->exp_filter_data.fed_pending, pending); exp->exp_filter_data.fed_pending -= pending; LASSERTF(filter->fo_tot_granted >= pending, "%s: cli %s/%p tot_granted: "LPU64" grant_used: %lu\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, exp->exp_obd->u.filter.fo_tot_granted, pending); filter->fo_tot_granted -= pending; LASSERTF(filter->fo_tot_pending >= pending, "%s: cli %s/%p tot_pending: "LPU64" grant_used: %lu\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp, filter->fo_tot_pending, pending); filter->fo_tot_pending -= pending; spin_unlock(&exp->exp_obd->obd_osfs_lock);}int filter_commitrw(int cmd, struct obd_export *exp, struct obdo *oa, int objcount, struct obd_ioobj *obj, int niocount, struct niobuf_local *res, struct obd_trans_info *oti, int rc){ if (cmd == OBD_BRW_WRITE) return filter_commitrw_write(exp, oa, objcount, obj, niocount, res, oti, rc); if (cmd == OBD_BRW_READ) return filter_commitrw_read(exp, oa, objcount, obj, niocount, res, oti, rc); LBUG(); return -EPROTO;}int filter_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo, obd_count oa_bufs, struct brw_page *pga, struct obd_trans_info *oti){ struct obd_ioobj ioo; struct niobuf_local *lnb; struct niobuf_remote *rnb; obd_count i; int ret = 0; ENTRY; OBD_ALLOC(lnb, oa_bufs * sizeof(struct niobuf_local)); OBD_ALLOC(rnb, oa_bufs * sizeof(struct niobuf_remote)); if (lnb == NULL || rnb == NULL) GOTO(out, ret = -ENOMEM); for (i = 0; i < oa_bufs; i++) { lnb[i].page = pga[i].pg; rnb[i].offset = pga[i].off; rnb[i].len = pga[i].count; } obdo_to_ioobj(oinfo->oi_oa, &ioo); ioo.ioo_bufcnt = oa_bufs; ret = filter_preprw(cmd, exp, oinfo->oi_oa, 1, &ioo, oa_bufs, rnb, lnb, oti); if (ret != 0) GOTO(out, ret); ret = filter_commitrw(cmd, exp, oinfo->oi_oa, 1, &ioo, oa_bufs, lnb, oti, ret);out: if (lnb) OBD_FREE(lnb, oa_bufs * sizeof(struct niobuf_local)); if (rnb) OBD_FREE(rnb, oa_bufs * sizeof(struct niobuf_remote)); RETURN(ret);}
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?