⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 rw.c

📁 lustre 1.6.5 source code
💻 C
📖 第 1 页 / 共 2 页
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Lustre Light block IO
 *
 *  Copyright (c) 2002-2004 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LLITE

#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/queue.h>
#include <fcntl.h>
#include <sys/uio.h>

#include <sysio.h>
#ifdef HAVE_XTIO_H
#include <xtio.h>
#endif
#include <fs.h>
#include <mount.h>
#include <inode.h>
#ifdef HAVE_FILE_H
#include <file.h>
#endif

#undef LIST_HEAD
#include "llite_lib.h"

/*
 * One group of pages queued for I/O against a single inode.
 *
 * lig_llaps, lig_pages and lig_llap_cookies are per-page arrays:
 * llu_queue_pio() below takes the next free slot of each one at index
 * lig_npages (the cookie array is stepped in units of llap_cookie_size
 * bytes).
 * NOTE(review): LLU_IO_GROUP_SIZE() suggests the three arrays live in the
 * same allocation as the struct itself -- confirm against the allocator,
 * which is not visible in this part of the file.
 */
struct llu_io_group
{
        struct obd_io_group    *lig_oig;
        struct inode           *lig_inode;
        struct lustre_rw_params *lig_params;
        int                     lig_maxpages;
        int                     lig_npages;
        __u64                   lig_rwcount;
        struct ll_async_page   *lig_llaps;
        struct page            *lig_pages;
        void                   *lig_llap_cookies;
};

/*
 * Number of bytes for a struct llu_io_group plus, for each of (x) pages,
 * one struct ll_async_page, one struct page and one cookie of
 * llap_cookie_size bytes.
 */
#define LLU_IO_GROUP_SIZE(x) \
        (sizeof(struct llu_io_group) + \
         (sizeof(struct ll_async_page) + \
          sizeof(struct page) + \
          llap_cookie_size) * (x))

/*
 * An I/O session on one inode: a header followed by a trailing array of
 * llu_io_group pointers (declared as a zero-length array, a GNU extension).
 */
struct llu_io_session
{
        struct inode           *lis_inode;
        int                     lis_cmd;
        int                     lis_max_groups;
        int                     lis_ngroups;
        struct llu_io_group    *lis_groups[0];
};

/*
 * Number of bytes for a session header plus room for (x) groups.
 * NOTE(review): this reserves two pointer-sized slots per group although
 * lis_groups[] holds a single pointer per entry; the reason for the factor
 * of 2 is not visible here -- confirm before changing.
 */
#define LLU_IO_SESSION_SIZE(x)  \
        (sizeof(struct llu_io_session) + (x) * 2 * sizeof(void *))

/* Function type for a positional, iovec-based file I/O routine. */
typedef ssize_t llu_file_piov_t(const struct iovec *iovec, int iovlen,
                                _SYSIO_OFF_T pos, ssize_t len,
                                void *private);

/*
 * Size in bytes of one async-page cookie; used to size and to index
 * llu_io_group::lig_llap_cookies.  It is defined here without an
 * initializer; where it is assigned is not visible in this part of the file.
 */
size_t llap_cookie_size;

/*
 * Return the stripe index that @lock corresponds to within @inode's
 * stripe metadata.
 *
 * For a single-stripe file this is 0 without any lookup.  Otherwise the
 * index is obtained from obd_get_info() using a "lock_to_stripe" key that
 * carries the lock and the stripe metadata.  A failing obd_get_info() is
 * treated as fatal (LBUG), and the returned index is asserted to be below
 * lsm_stripe_count.
 *
 * The function dereferences lli->lli_smd unconditionally; the callers in
 * this file check lli_smd for NULL before calling.
 */
static int llu_lock_to_stripe_offset(struct inode *inode, struct ldlm_lock *lock)
{
        struct llu_inode_info *lli = llu_i2info(inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = llu_i2obdexp(inode);
        struct {
                char name[16];
                struct ldlm_lock *lock;
                struct lov_stripe_md *lsm;
        } key = { .name = "lock_to_stripe", .lock = lock, .lsm = lsm };
        __u32 stripe, vallen = sizeof(stripe);
        int rc;
        ENTRY;

        if (lsm->lsm_stripe_count == 1)
                RETURN(0);

        /* get our offset in the lov */
        rc = obd_get_info(exp, sizeof(key), &key, &vallen, &stripe);
        if (rc != 0) {
                CERROR("obd_get_info: rc = %d\n", rc);
                LBUG();
        }
        LASSERT(stripe < lsm->lsm_stripe_count);
        RETURN(stripe);
}

/*
 * Callback for an extent lock, dispatched on @flag.
 *
 * LDLM_CB_BLOCKING:  cancel the lock via ldlm_cli_cancel(); a failure is
 *                    logged but not propagated.
 * LDLM_CB_CANCELING: if the lock was granted and its inode and stripe
 *                    metadata can be found, recompute the known minimum
 *                    size (kms) of the stripe covered by the lock with
 *                    ldlm_extent_shift_kms() and store it in loi_kms.
 * Any other flag is a fatal error (LBUG).
 *
 * @new is not used.  @data is only sanity-checked.
 * Every path visible here returns 0.
 */
int llu_extent_lock_cancel_cb(struct ldlm_lock *lock,
                                    struct ldlm_lock_desc *new, void *data,
                                    int flag)
{
        struct lustre_handle lockh = { 0 };
        int rc;
        ENTRY;

        /* a non-NULL value below 0x1000 is treated as a corrupted pointer */
        if ((unsigned long)data > 0 && (unsigned long)data < 0x1000) {
                LDLM_ERROR(lock, "cancelling lock with bad data %p", data);
                LBUG();
        }

        switch (flag) {
        case LDLM_CB_BLOCKING:
                ldlm_lock2handle(lock, &lockh);
                rc = ldlm_cli_cancel(&lockh);
                if (rc != ELDLM_OK)
                        CERROR("ldlm_cli_cancel failed: %d\n", rc);
                break;
        case LDLM_CB_CANCELING: {
                struct inode *inode;
                struct llu_inode_info *lli;
                struct lov_stripe_md *lsm;
                __u32 stripe;
                __u64 kms;

                /* This lock wasn't granted, don't try to evict pages */
                if (lock->l_req_mode != lock->l_granted_mode)
                        RETURN(0);

                inode = llu_inode_from_lock(lock);
                if (!inode)
                        RETURN(0);
                lli= llu_i2info(inode);
                if (!lli)
                        goto iput;
                if (!lli->lli_smd)
                        goto iput;
                lsm = lli->lli_smd;

                stripe = llu_lock_to_stripe_offset(inode, lock);
                /* the new kms is computed with the resource and lock held */
                lock_res_and_lock(lock);
                kms = ldlm_extent_shift_kms(lock,
                                            lsm->lsm_oinfo[stripe]->loi_kms);
                unlock_res_and_lock(lock);
                if (lsm->lsm_oinfo[stripe]->loi_kms != kms)
                        LDLM_DEBUG(lock, "updating kms from "LPU64" to "LPU64,
                                   lsm->lsm_oinfo[stripe]->loi_kms, kms);
                lsm->lsm_oinfo[stripe]->loi_kms = kms;
iput:
                /* presumably drops the inode reference obtained from
                 * llu_inode_from_lock() -- confirm against I_RELE */
                I_RELE(inode);
                break;
        }
        default:
                LBUG();
        }

        RETURN(0);
}

/*
 * Glimpse callback: answer the request @reqp with the locally known
 * minimum size (loi_kms) of the stripe that @lock covers.
 *
 * On success the reply carries two buffers (a ptlrpc_body and an ost_lvb)
 * and lvb_size is set from the stripe's loi_kms.  If the lock has no inode,
 * no inode info or no stripe metadata, rc is -ELDLM_NO_LOCK_DATA and a
 * single-buffer reply is packed instead.  In every case rc is stored in
 * req->rq_status and returned.
 *
 * NOTE(review): the pointer returned by lustre_msg_buf() is dereferenced
 * without a NULL check -- confirm it cannot fail after a successful
 * lustre_pack_reply().
 */
static int llu_glimpse_callback(struct ldlm_lock *lock, void *reqp)
{
        struct ptlrpc_request *req = reqp;
        struct inode *inode = llu_inode_from_lock(lock);
        struct llu_inode_info *lli;
        struct ost_lvb *lvb;
        int size[2] = { sizeof(struct ptlrpc_body), sizeof(*lvb) };
        int rc, stripe = 0;
        ENTRY;

        if (inode == NULL)
                GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
        lli = llu_i2info(inode);
        if (lli == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);
        if (lli->lli_smd == NULL)
                GOTO(iput, rc = -ELDLM_NO_LOCK_DATA);

        /* First, find out which stripe index this lock corresponds to. */
        if (lli->lli_smd->lsm_stripe_count > 1)
                stripe = llu_lock_to_stripe_offset(inode, lock);

        rc = lustre_pack_reply(req, 2, size, NULL);
        if (rc)
                GOTO(iput, rc);

        lvb = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*lvb));
        lvb->lvb_size = lli->lli_smd->lsm_oinfo[stripe]->loi_kms;

        LDLM_DEBUG(lock, "i_size: %llu -> stripe number %u -> kms "LPU64,
                   (long long)llu_i2stat(inode)->st_size, stripe,lvb->lvb_size);
 iput:
        I_RELE(inode);
 out:
        /* These errors are normal races, so we don't want to fill the console
         * with messages by calling ptlrpc_error() */
        if (rc == -ELDLM_NO_LOCK_DATA)
                lustre_pack_reply(req, 1, NULL, NULL);

        req->rq_status = rc;
        return rc;
}

/*
 * Refresh @inode's cached stat fields (size, blocks, mtime, atime, ctime)
 * by enqueueing a PR extent lock with LDLM_FL_HAS_INTENT whose extent ends
 * at OBD_OBJECT_EOF (the start is left at the zero from the initializer)
 * and then merging the resulting lvb.
 *
 * Returns 0 when the inode has no stripe metadata, a negative value when
 * the enqueue fails (a positive rc is mapped to -EIO), otherwise the
 * return value of obd_merge_lvb().
 *
 * NB: lov_merge_size will prefer locally cached writes if they extend the
 * file (because it prefers KMS over RSS when larger)
 */
int llu_glimpse_size(struct inode *inode)
{
        struct llu_inode_info *lli = llu_i2info(inode);
        struct intnl_stat *st = llu_i2stat(inode);
        struct llu_sb_info *sbi = llu_i2sbi(inode);
        struct lustre_handle lockh = { 0 };
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        struct ost_lvb lvb;
        int rc;
        ENTRY;

        CDEBUG(D_DLMTRACE, "Glimpsing inode %llu\n", (long long)st->st_ino);

        if (!lli->lli_smd) {
                CDEBUG(D_DLMTRACE, "No objects for inode %llu\n",
                        (long long)st->st_ino);
                RETURN(0);
        }

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = LCK_PR;
        einfo.ei_cb_bl = osc_extent_blocking_cb;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = llu_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy.l_extent.end = OBD_OBJECT_EOF;
        oinfo.oi_lockh = &lockh;
        oinfo.oi_md = lli->lli_smd;
        oinfo.oi_flags = LDLM_FL_HAS_INTENT;

        rc = obd_enqueue_rqset(sbi->ll_osc_exp, &oinfo, &einfo);
        if (rc) {
                /* NOTE(review): the message says "returning -EIO", but a
                 * negative rc is returned unchanged; only a positive rc is
                 * mapped to -EIO. */
                CERROR("obd_enqueue returned rc %d, returning -EIO\n", rc);
                RETURN(rc > 0 ? -EIO : rc);
        }

        inode_init_lvb(inode, &lvb);
        rc = obd_merge_lvb(sbi->ll_osc_exp, lli->lli_smd, &lvb, 0);
        st->st_size = lvb.lvb_size;
        st->st_blocks = lvb.lvb_blocks;
        /* handle st_blocks overflow gracefully */
        /* if the stored value came out smaller than the source, saturate */
        if (st->st_blocks < lvb.lvb_blocks)
                st->st_blocks = ~0UL;
        st->st_mtime = lvb.lvb_mtime;
        st->st_atime = lvb.lvb_atime;
        st->st_ctime = lvb.lvb_ctime;

        CDEBUG(D_DLMTRACE, "glimpse: size: "LPU64", blocks: "LPU64"\n",
               (__u64)st->st_size, (__u64)st->st_blocks);

        RETURN(rc);
}

/*
 * Enqueue an extent lock of @mode on @inode covering *@policy, returning
 * the lock handle in @lockh.
 *
 * @lockh must not already be in use (asserted).  Nothing is done and 0 is
 * returned when @fd has LL_FILE_IGNORE_LOCK set, when the superblock has
 * LL_SBI_NOLCK set, or when @mode is LCK_NL.  Otherwise *@policy is
 * overwritten with the policy left in oinfo by obd_enqueue(), and a
 * positive rc is mapped to -EIO.
 *
 * After the enqueue the lvb is merged, and st_size is updated from it only
 * when the resulting extent is [0, OBD_OBJECT_EOF].  The timestamps are
 * updated only when rc == 0.
 *
 * NOTE(review): the lvb merge and the possible st_size update are performed
 * even when the enqueue failed (rc != 0) -- confirm that is intended.
 */
int llu_extent_lock(struct ll_file_data *fd, struct inode *inode,
                    struct lov_stripe_md *lsm, int mode,
                    ldlm_policy_data_t *policy, struct lustre_handle *lockh,
                    int ast_flags)
{
        struct llu_sb_info *sbi = llu_i2sbi(inode);
        struct intnl_stat *st = llu_i2stat(inode);
        struct ldlm_enqueue_info einfo = { 0 };
        struct obd_info oinfo = { { { 0 } } };
        struct ost_lvb lvb;
        int rc;
        ENTRY;

        LASSERT(!lustre_handle_is_used(lockh));
        CLASSERT(ELDLM_OK == 0);

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK) || mode == LCK_NL)
                RETURN(0);

        CDEBUG(D_DLMTRACE, "Locking inode %llu, start "LPU64" end "LPU64"\n",
               (long long)st->st_ino, policy->l_extent.start,
               policy->l_extent.end);

        einfo.ei_type = LDLM_EXTENT;
        einfo.ei_mode = mode;
        einfo.ei_cb_bl = osc_extent_blocking_cb;
        einfo.ei_cb_cp = ldlm_completion_ast;
        einfo.ei_cb_gl = llu_glimpse_callback;
        einfo.ei_cbdata = inode;

        oinfo.oi_policy = *policy;
        oinfo.oi_lockh = lockh;
        oinfo.oi_md = lsm;
        oinfo.oi_flags = ast_flags;

        rc = obd_enqueue(sbi->ll_osc_exp, &oinfo, &einfo, NULL);
        /* hand the policy left in oinfo back to the caller */
        *policy = oinfo.oi_policy;
        if (rc > 0)
                rc = -EIO;

        inode_init_lvb(inode, &lvb);
        obd_merge_lvb(sbi->ll_osc_exp, lsm, &lvb, 1);
        if (policy->l_extent.start == 0 &&
            policy->l_extent.end == OBD_OBJECT_EOF)
                st->st_size = lvb.lvb_size;

        if (rc == 0) {
                st->st_mtime = lvb.lvb_mtime;
                st->st_atime = lvb.lvb_atime;
                st->st_ctime = lvb.lvb_ctime;
        }

        RETURN(rc);
}

/*
 * Cancel the extent lock @lockh of @mode on @inode via obd_cancel().
 *
 * Uses the same skip conditions as llu_extent_lock(): nothing is done and
 * 0 is returned when @fd has LL_FILE_IGNORE_LOCK set, when the superblock
 * has LL_SBI_NOLCK set, or when @mode is LCK_NL.  Otherwise returns the
 * result of obd_cancel().
 */
int llu_extent_unlock(struct ll_file_data *fd, struct inode *inode,
                struct lov_stripe_md *lsm, int mode,
                struct lustre_handle *lockh)
{
        struct llu_sb_info *sbi = llu_i2sbi(inode);
        int rc;
        ENTRY;

        CLASSERT(ELDLM_OK == 0);

        /* XXX phil: can we do this?  won't it screw the file size up? */
        if ((fd && (fd->fd_flags & LL_FILE_IGNORE_LOCK)) ||
            (sbi->ll_flags & LL_SBI_NOLCK) || mode == LCK_NL)
                RETURN(0);

        rc = obd_cancel(sbi->ll_osc_exp, lsm, mode, lockh);

        RETURN(rc);
}

/* presumably the value stored in ll_async_page::llap_magic; it is not
 * referenced elsewhere in the visible part of this file -- confirm */
#define LLAP_MAGIC 12346789

/*
 * Per-page bookkeeping for asynchronous I/O.  The callbacks below recover
 * this struct from their opaque data argument with LLAP_FROM_COOKIE().
 */
struct ll_async_page {
        int             llap_magic;
        void           *llap_cookie;
        int             llap_queued;    /* cleared by llu_ap_completion() */
        struct page    *llap_page;
        struct inode   *llap_inode;
};

/*
 * Async-page callback: fill @oa for the page identified by @data.
 *
 * Sets o_id from the inode's stripe metadata and o_valid to OBD_MD_FLID,
 * then copies inode attributes with obdo_from_inode().  Type and atime are
 * always requested; for writes (OBD_BRW_WRITE set in @cmd) mtime, ctime,
 * uid, gid, fid and generation are requested as well.
 */
static void llu_ap_fill_obdo(void *data, int cmd, struct obdo *oa)
{
        struct ll_async_page *llap;
        struct inode *inode;
        struct lov_stripe_md *lsm;
        obd_flag valid_flags;
        ENTRY;

        llap = LLAP_FROM_COOKIE(data);
        inode = llap->llap_inode;
        lsm = llu_i2info(inode)->lli_smd;

        oa->o_id = lsm->lsm_object_id;
        oa->o_valid = OBD_MD_FLID;
        valid_flags = OBD_MD_FLTYPE | OBD_MD_FLATIME;
        if (cmd & OBD_BRW_WRITE)
                valid_flags |= OBD_MD_FLMTIME | OBD_MD_FLCTIME |
                        OBD_MD_FLUID | OBD_MD_FLGID |
                        OBD_MD_FLFID | OBD_MD_FLGENER;

        obdo_from_inode(oa, inode, valid_flags);
        EXIT;
}

/*
 * Async-page callback: refresh the attributes selected by @valid in @oa
 * from the inode of the page identified by @data.  @cmd is not used.
 */
static void llu_ap_update_obdo(void *data, int cmd, struct obdo *oa,
                               obd_valid valid)
{
        struct ll_async_page *llap;
        ENTRY;

        llap = LLAP_FROM_COOKIE(data);
        obdo_from_inode(oa, llap->llap_inode, valid);

        EXIT;
}

/*
 * called for each page in a completed rpc.
 *
 * Marks the page as no longer queued.  A non-zero @rc is logged for writes
 * only.  @oa is not used.  Always returns 0.
 */
static int llu_ap_completion(void *data, int cmd, struct obdo *oa, int rc)
{
        struct ll_async_page *llap;
        struct page *page;
        ENTRY;

        llap = LLAP_FROM_COOKIE(data);
        llap->llap_queued = 0;
        page = llap->llap_page;

        if (rc != 0) {
                if (cmd & OBD_BRW_WRITE)
                        CERROR("writeback error on page %p index %ld: %d\n",
                               page, page->index, rc);
        }
        RETURN(0);
}

/*
 * Async-page operations table built from the callbacks above; the
 * make_ready and refresh_count hooks are left NULL.
 */
static struct obd_async_page_ops llu_async_page_ops = {
        .ap_make_ready =        NULL,
        .ap_refresh_count =     NULL,
        .ap_fill_obdo =         llu_ap_fill_obdo,
        .ap_update_obdo =       llu_ap_update_obdo,
        .ap_completion =        llu_ap_completion,
};

/*
 * NOTE(review): only the opening of this function is present in this part
 * of the file; the body continues past the end of this excerpt, so nothing
 * beyond the visible statements is described here.
 *
 * The visible part takes the next free page, async-page and cookie slots of
 * @group (all at index lig_npages), returns -EINVAL when the inode has no
 * obd export, and records in local_lock whether the group's lock mode is
 * something other than LCK_NL.
 */
static int llu_queue_pio(int cmd, struct llu_io_group *group,
                         char *buf, size_t count, loff_t pos)
{
        struct llu_inode_info *lli = llu_i2info(group->lig_inode);
        struct intnl_stat *st = llu_i2stat(group->lig_inode);
        struct lov_stripe_md *lsm = lli->lli_smd;
        struct obd_export *exp = llu_i2obdexp(group->lig_inode);
        struct page *pages = &group->lig_pages[group->lig_npages],*page = pages;
        struct ll_async_page *llap = &group->lig_llaps[group->lig_npages];
        void *llap_cookie = group->lig_llap_cookies +
                llap_cookie_size * group->lig_npages;
        int i, rc, npages = 0, ret_bytes = 0;
        int local_lock;
        ENTRY;

        if (!exp)
                RETURN(-EINVAL);

        local_lock = group->lig_params->lrp_lock_mode != LCK_NL;
        /* prepare the pages array */
        do {
                unsigned long index, offset, bytes;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -