lov_request.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,640 行 · 第 1/4 页

C
1,640
字号
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2002, 2003 Cluster File Systems, Inc. * *   This file is part of the Lustre file system, http://www.lustre.org *   Lustre is a trademark of Cluster File Systems, Inc. * *   You may have signed or agreed to another license before downloading *   this software.  If so, you are bound by the terms and conditions *   of that agreement, and the following does not apply to you.  See the *   LICENSE file included with this distribution for more information. * *   If you did not agree to a different license, then this copy of Lustre *   is open source software; you can redistribute it and/or modify it *   under the terms of version 2 of the GNU General Public License as *   published by the Free Software Foundation. * *   In either case, Lustre is distributed in the hope that it will be *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the *   license text for more details. */#ifndef EXPORT_SYMTAB# define EXPORT_SYMTAB#endif#define DEBUG_SUBSYSTEM S_LOV#ifdef __KERNEL__#include <libcfs/libcfs.h>#else#include <liblustre.h>#endif#include <obd_class.h>#include <obd_lov.h>#include <lustre/lustre_idl.h>#include "lov_internal.h"static void lov_init_set(struct lov_request_set *set){        set->set_count = 0;        set->set_completes = 0;        set->set_success = 0;        set->set_cookies = 0;        CFS_INIT_LIST_HEAD(&set->set_list);        atomic_set(&set->set_refcount, 1);}static void lov_finish_set(struct lov_request_set *set){        struct list_head *pos, *n;        ENTRY;        LASSERT(set);        list_for_each_safe(pos, n, &set->set_list) {                struct lov_request *req = list_entry(pos, struct lov_request,                                                     rq_link);                list_del_init(&req->rq_link);                if (req->rq_oi.oi_oa)                        OBDO_FREE(req->rq_oi.oi_oa);                if (req->rq_oi.oi_md)                        OBD_FREE(req->rq_oi.oi_md, req->rq_buflen);                if (req->rq_oi.oi_osfs)                        OBD_FREE(req->rq_oi.oi_osfs,                                 sizeof(*req->rq_oi.oi_osfs));                OBD_FREE(req, sizeof(*req));        }        if (set->set_pga) {                int len = set->set_oabufs * sizeof(*set->set_pga);                OBD_FREE(set->set_pga, len);        }        if (set->set_lockh)                lov_llh_put(set->set_lockh);        OBD_FREE(set, sizeof(*set));        EXIT;}void lov_update_set(struct lov_request_set *set,                    struct lov_request *req, int rc){        req->rq_complete = 1;        req->rq_rc = rc;        set->set_completes++;        if (rc == 0)                set->set_success++;}int lov_update_common_set(struct lov_request_set *set,                          struct lov_request *req, int rc){        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;        ENTRY;        lov_update_set(set, req, rc);        /* grace error on inactive ost */        if (rc && !(lov->lov_tgts[req->rq_idx] &&                     lov->lov_tgts[req->rq_idx]->ltd_active))                rc = 0;        /* FIXME in raid1 regime, should return 0 */        RETURN(rc);}void lov_set_add_req(struct lov_request *req, struct lov_request_set *set){        list_add_tail(&req->rq_link, &set->set_list);        set->set_count++;}int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc){        struct lov_request_set *set = req->rq_rqset;        struct lustre_handle *lov_lockhp;        struct lov_oinfo *loi;        ENTRY;        LASSERT(set != NULL);        LASSERT(set->set_oi != NULL);        lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;        loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe];        /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set         * and that copy can be arbitrarily out of date.         *         * The LOV API is due for a serious rewriting anyways, and this         * can be addressed then. */        if (rc == ELDLM_OK) {                struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);                __u64 tmp;                LASSERT(lock != NULL);                lov_stripe_lock(set->set_oi->oi_md);                loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;                tmp = loi->loi_lvb.lvb_size;                /* Extend KMS up to the end of this lock and no further                 * A lock on [x,y] means a KMS of up to y + 1 bytes! */                if (tmp > lock->l_policy_data.l_extent.end)                        tmp = lock->l_policy_data.l_extent.end + 1;                if (tmp >= loi->loi_kms) {                        LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64                                   ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);                        loi->loi_kms = tmp;                        loi->loi_kms_valid = 1;                } else {                        LDLM_DEBUG(lock, "lock acquired, setting rss="                                   LPU64"; leaving kms="LPU64", end="LPU64,                                   loi->loi_lvb.lvb_size, loi->loi_kms,                                   lock->l_policy_data.l_extent.end);                }                lov_stripe_unlock(set->set_oi->oi_md);                ldlm_lock_allow_match(lock);                LDLM_LOCK_PUT(lock);        } else if ((rc == ELDLM_LOCK_ABORTED) &&                   (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) {                memset(lov_lockhp, 0, sizeof(*lov_lockhp));                lov_stripe_lock(set->set_oi->oi_md);                loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb;                lov_stripe_unlock(set->set_oi->oi_md);                CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"                       " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);                rc = ELDLM_OK;        } else {                struct obd_export *exp = set->set_exp;                struct lov_obd *lov = &exp->exp_obd->u.lov;                memset(lov_lockhp, 0, sizeof(*lov_lockhp));                if (lov->lov_tgts[req->rq_idx] &&                    lov->lov_tgts[req->rq_idx]->ltd_active) {                        /* -EUSERS used by OST to report file contention */                        if (rc != -EINTR && rc != -EUSERS)                                CERROR("enqueue objid "LPX64" subobj "                                       LPX64" on OST idx %d: rc %d\n",                                       set->set_oi->oi_md->lsm_object_id,                                       loi->loi_id, loi->loi_ost_idx, rc);                } else {                        rc = ELDLM_OK;                }        }        lov_update_set(set, req, rc);        RETURN(rc);}/* The callback for osc_enqueue that updates lov info for every OSC request. */static int cb_update_enqueue(struct obd_info *oinfo, int rc){        struct ldlm_enqueue_info *einfo;        struct lov_request *lovreq;        lovreq = container_of(oinfo, struct lov_request, rq_oi);        einfo = lovreq->rq_rqset->set_ei;        return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);}static int enqueue_done(struct lov_request_set *set, __u32 mode){        struct lov_request *req;        struct lov_obd *lov = &set->set_exp->exp_obd->u.lov;        int rc = 0;        ENTRY;        /* enqueue/match success, just return */        if (set->set_completes && set->set_completes == set->set_success)                RETURN(0);        /* cancel enqueued/matched locks */        list_for_each_entry(req, &set->set_list, rq_link) {                struct lustre_handle *lov_lockhp;                if (!req->rq_complete || req->rq_rc)                        continue;                lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe;                LASSERT(lov_lockhp);                if (!lustre_handle_is_used(lov_lockhp))                        continue;                rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp,                                req->rq_oi.oi_md, mode, lov_lockhp);                if (rc && lov->lov_tgts[req->rq_idx] &&                    lov->lov_tgts[req->rq_idx]->ltd_active)                        CERROR("cancelling obdjid "LPX64" on OST "                               "idx %d error: rc = %d\n",                               req->rq_oi.oi_md->lsm_object_id,                               req->rq_idx, rc);        }        if (set->set_lockh)                lov_llh_put(set->set_lockh);        RETURN(rc);}int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc,                         struct ptlrpc_request_set *rqset){        int ret = 0;        ENTRY;        if (set == NULL)                RETURN(0);        LASSERT(set->set_exp);        /* Do enqueue_done only for sync requests and if any request         * succeeded. */        if (!rqset) {                if (rc)                        set->set_completes = 0;                ret = enqueue_done(set, mode);        } else if (set->set_lockh)                lov_llh_put(set->set_lockh);        if (atomic_dec_and_test(&set->set_refcount))                lov_finish_set(set);        RETURN(rc ? rc : ret);}int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo,                         struct ldlm_enqueue_info *einfo,                         struct lov_request_set **reqset){        struct lov_obd *lov = &exp->exp_obd->u.lov;        struct lov_request_set *set;        int i, rc = 0;        struct lov_oinfo *loi;        ENTRY;        OBD_ALLOC(set, sizeof(*set));        if (set == NULL)                RETURN(-ENOMEM);        lov_init_set(set);        set->set_exp = exp;        set->set_oi = oinfo;        set->set_ei = einfo;        set->set_lockh = lov_llh_new(oinfo->oi_md);        if (set->set_lockh == NULL)                GOTO(out_set, rc = -ENOMEM);        oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie;        for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) {                struct lov_request *req;                obd_off start, end;                loi = oinfo->oi_md->lsm_oinfo[i];                if (!lov_stripe_intersects(oinfo->oi_md, i,                                           oinfo->oi_policy.l_extent.start,                                           oinfo->oi_policy.l_extent.end,                                           &start, &end))                        continue;                if (!lov->lov_tgts[loi->loi_ost_idx] ||                    !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) {                        CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx);                        continue;                }                OBD_ALLOC(req, sizeof(*req));                if (req == NULL)                        GOTO(out_set, rc = -ENOMEM);                req->rq_buflen = sizeof(*req->rq_oi.oi_md) +                        sizeof(struct lov_oinfo *) +                        sizeof(struct lov_oinfo);                OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen);                if (req->rq_oi.oi_md == NULL) {                        OBD_FREE(req, sizeof(*req));                        GOTO(out_set, rc = -ENOMEM);                }                req->rq_oi.oi_md->lsm_oinfo[0] =                        ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) +                        sizeof(struct lov_oinfo *);                req->rq_rqset = set;                /* Set lov request specific parameters. */                req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i;                req->rq_oi.oi_cb_up = cb_update_enqueue;                req->rq_oi.oi_flags = oinfo->oi_flags;                LASSERT(req->rq_oi.oi_lockh);                req->rq_oi.oi_policy.l_extent.gid =                        oinfo->oi_policy.l_extent.gid;                req->rq_oi.oi_policy.l_extent.start = start;                req->rq_oi.oi_policy.l_extent.end = end;                req->rq_idx = loi->loi_ost_idx;                req->rq_stripe = i;                /* XXX LOV STACKING: submd should be from the subobj */                req->rq_oi.oi_md->lsm_object_id = loi->loi_id;                req->rq_oi.oi_md->lsm_stripe_count = 0;                req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid =                        loi->loi_kms_valid;                req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms;                req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb;                lov_set_add_req(req, set);        }        if (!set->set_count)                GOTO(out_set, rc = -EIO);        *reqset = set;        RETURN(0);out_set:        lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL);        RETURN(rc);}int lov_update_match_set(struct lov_request_set *set, struct lov_request *req,                         int rc){        int ret = rc;        ENTRY;        if (rc > 0)                ret = 0;        else if (rc == 0)                ret = 1;        lov_update_set(set, req, ret);        RETURN(rc);}int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags){        int rc = 0;        ENTRY;        if (set == NULL)                RETURN(0);        LASSERT(set->set_exp);        rc = enqueue_done(set, mode);        if ((set->set_count == set->set_success) &&            (flags & LDLM_FL_TEST_LOCK))                lov_llh_put(set->set_lockh);        if (atomic_dec_and_test(&set->set_refcount))                lov_finish_set(set);        RETURN(rc);}int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo,                       struct lov_stripe_md *lsm, ldlm_policy_data_t *policy,                       __u32 mode, struct lustre_handle *lockh,                       struct lov_request_set **reqset){        struct lov_obd *lov = &exp->exp_obd->u.lov;        struct lov_request_set *set;        int i, rc = 0;        struct lov_oinfo *loi;        ENTRY;        OBD_ALLOC(set, sizeof(*set));        if (set == NULL)                RETURN(-ENOMEM);        lov_init_set(set);        set->set_exp = exp;        set->set_oi = oinfo;        set->set_oi->oi_md = lsm;        set->set_lockh = lov_llh_new(lsm);        if (set->set_lockh == NULL)

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?