lov_request.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,640 行 · 第 1/4 页
C
1,640 行
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2002, 2003 Cluster File Systems, Inc. * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * You may have signed or agreed to another license before downloading * this software. If so, you are bound by the terms and conditions * of that agreement, and the following does not apply to you. See the * LICENSE file included with this distribution for more information. * * If you did not agree to a different license, then this copy of Lustre * is open source software; you can redistribute it and/or modify it * under the terms of version 2 of the GNU General Public License as * published by the Free Software Foundation. * * In either case, Lustre is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied warranty * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * license text for more details. */#ifndef EXPORT_SYMTAB# define EXPORT_SYMTAB#endif#define DEBUG_SUBSYSTEM S_LOV#ifdef __KERNEL__#include <libcfs/libcfs.h>#else#include <liblustre.h>#endif#include <obd_class.h>#include <obd_lov.h>#include <lustre/lustre_idl.h>#include "lov_internal.h"static void lov_init_set(struct lov_request_set *set){ set->set_count = 0; set->set_completes = 0; set->set_success = 0; set->set_cookies = 0; CFS_INIT_LIST_HEAD(&set->set_list); atomic_set(&set->set_refcount, 1);}static void lov_finish_set(struct lov_request_set *set){ struct list_head *pos, *n; ENTRY; LASSERT(set); list_for_each_safe(pos, n, &set->set_list) { struct lov_request *req = list_entry(pos, struct lov_request, rq_link); list_del_init(&req->rq_link); if (req->rq_oi.oi_oa) OBDO_FREE(req->rq_oi.oi_oa); if (req->rq_oi.oi_md) OBD_FREE(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_osfs) OBD_FREE(req->rq_oi.oi_osfs, sizeof(*req->rq_oi.oi_osfs)); OBD_FREE(req, sizeof(*req)); } if (set->set_pga) { int len = set->set_oabufs * sizeof(*set->set_pga); OBD_FREE(set->set_pga, len); } if (set->set_lockh) lov_llh_put(set->set_lockh); OBD_FREE(set, sizeof(*set)); EXIT;}void lov_update_set(struct lov_request_set *set, struct lov_request *req, int rc){ req->rq_complete = 1; req->rq_rc = rc; set->set_completes++; if (rc == 0) set->set_success++;}int lov_update_common_set(struct lov_request_set *set, struct lov_request *req, int rc){ struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; ENTRY; lov_update_set(set, req, rc); /* grace error on inactive ost */ if (rc && !(lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active)) rc = 0; /* FIXME in raid1 regime, should return 0 */ RETURN(rc);}void lov_set_add_req(struct lov_request *req, struct lov_request_set *set){ list_add_tail(&req->rq_link, &set->set_list); set->set_count++;}int lov_update_enqueue_set(struct lov_request *req, __u32 mode, int rc){ struct lov_request_set *set = req->rq_rqset; struct lustre_handle *lov_lockhp; struct lov_oinfo *loi; ENTRY; LASSERT(set != NULL); LASSERT(set->set_oi != NULL); lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; loi = set->set_oi->oi_md->lsm_oinfo[req->rq_stripe]; /* XXX LOV STACKING: OSC gets a copy, created in lov_prep_enqueue_set * and that copy can be arbitrarily out of date. * * The LOV API is due for a serious rewriting anyways, and this * can be addressed then. */ if (rc == ELDLM_OK) { struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp); __u64 tmp; LASSERT(lock != NULL); lov_stripe_lock(set->set_oi->oi_md); loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb; tmp = loi->loi_lvb.lvb_size; /* Extend KMS up to the end of this lock and no further * A lock on [x,y] means a KMS of up to y + 1 bytes! */ if (tmp > lock->l_policy_data.l_extent.end) tmp = lock->l_policy_data.l_extent.end + 1; if (tmp >= loi->loi_kms) { LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp); loi->loi_kms = tmp; loi->loi_kms_valid = 1; } else { LDLM_DEBUG(lock, "lock acquired, setting rss=" LPU64"; leaving kms="LPU64", end="LPU64, loi->loi_lvb.lvb_size, loi->loi_kms, lock->l_policy_data.l_extent.end); } lov_stripe_unlock(set->set_oi->oi_md); ldlm_lock_allow_match(lock); LDLM_LOCK_PUT(lock); } else if ((rc == ELDLM_LOCK_ABORTED) && (set->set_oi->oi_flags & LDLM_FL_HAS_INTENT)) { memset(lov_lockhp, 0, sizeof(*lov_lockhp)); lov_stripe_lock(set->set_oi->oi_md); loi->loi_lvb = req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb; lov_stripe_unlock(set->set_oi->oi_md); CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving" " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms); rc = ELDLM_OK; } else { struct obd_export *exp = set->set_exp; struct lov_obd *lov = &exp->exp_obd->u.lov; memset(lov_lockhp, 0, sizeof(*lov_lockhp)); if (lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active) { /* -EUSERS used by OST to report file contention */ if (rc != -EINTR && rc != -EUSERS) CERROR("enqueue objid "LPX64" subobj " LPX64" on OST idx %d: rc %d\n", set->set_oi->oi_md->lsm_object_id, loi->loi_id, loi->loi_ost_idx, rc); } else { rc = ELDLM_OK; } } lov_update_set(set, req, rc); RETURN(rc);}/* The callback for osc_enqueue that updates lov info for every OSC request. */static int cb_update_enqueue(struct obd_info *oinfo, int rc){ struct ldlm_enqueue_info *einfo; struct lov_request *lovreq; lovreq = container_of(oinfo, struct lov_request, rq_oi); einfo = lovreq->rq_rqset->set_ei; return lov_update_enqueue_set(lovreq, einfo->ei_mode, rc);}static int enqueue_done(struct lov_request_set *set, __u32 mode){ struct lov_request *req; struct lov_obd *lov = &set->set_exp->exp_obd->u.lov; int rc = 0; ENTRY; /* enqueue/match success, just return */ if (set->set_completes && set->set_completes == set->set_success) RETURN(0); /* cancel enqueued/matched locks */ list_for_each_entry(req, &set->set_list, rq_link) { struct lustre_handle *lov_lockhp; if (!req->rq_complete || req->rq_rc) continue; lov_lockhp = set->set_lockh->llh_handles + req->rq_stripe; LASSERT(lov_lockhp); if (!lustre_handle_is_used(lov_lockhp)) continue; rc = obd_cancel(lov->lov_tgts[req->rq_idx]->ltd_exp, req->rq_oi.oi_md, mode, lov_lockhp); if (rc && lov->lov_tgts[req->rq_idx] && lov->lov_tgts[req->rq_idx]->ltd_active) CERROR("cancelling obdjid "LPX64" on OST " "idx %d error: rc = %d\n", req->rq_oi.oi_md->lsm_object_id, req->rq_idx, rc); } if (set->set_lockh) lov_llh_put(set->set_lockh); RETURN(rc);}int lov_fini_enqueue_set(struct lov_request_set *set, __u32 mode, int rc, struct ptlrpc_request_set *rqset){ int ret = 0; ENTRY; if (set == NULL) RETURN(0); LASSERT(set->set_exp); /* Do enqueue_done only for sync requests and if any request * succeeded. */ if (!rqset) { if (rc) set->set_completes = 0; ret = enqueue_done(set, mode); } else if (set->set_lockh) lov_llh_put(set->set_lockh); if (atomic_dec_and_test(&set->set_refcount)) lov_finish_set(set); RETURN(rc ? rc : ret);}int lov_prep_enqueue_set(struct obd_export *exp, struct obd_info *oinfo, struct ldlm_enqueue_info *einfo, struct lov_request_set **reqset){ struct lov_obd *lov = &exp->exp_obd->u.lov; struct lov_request_set *set; int i, rc = 0; struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); if (set == NULL) RETURN(-ENOMEM); lov_init_set(set); set->set_exp = exp; set->set_oi = oinfo; set->set_ei = einfo; set->set_lockh = lov_llh_new(oinfo->oi_md); if (set->set_lockh == NULL) GOTO(out_set, rc = -ENOMEM); oinfo->oi_lockh->cookie = set->set_lockh->llh_handle.h_cookie; for (i = 0; i < oinfo->oi_md->lsm_stripe_count; i++) { struct lov_request *req; obd_off start, end; loi = oinfo->oi_md->lsm_oinfo[i]; if (!lov_stripe_intersects(oinfo->oi_md, i, oinfo->oi_policy.l_extent.start, oinfo->oi_policy.l_extent.end, &start, &end)) continue; if (!lov->lov_tgts[loi->loi_ost_idx] || !lov->lov_tgts[loi->loi_ost_idx]->ltd_active) { CDEBUG(D_HA, "lov idx %d inactive\n", loi->loi_ost_idx); continue; } OBD_ALLOC(req, sizeof(*req)); if (req == NULL) GOTO(out_set, rc = -ENOMEM); req->rq_buflen = sizeof(*req->rq_oi.oi_md) + sizeof(struct lov_oinfo *) + sizeof(struct lov_oinfo); OBD_ALLOC(req->rq_oi.oi_md, req->rq_buflen); if (req->rq_oi.oi_md == NULL) { OBD_FREE(req, sizeof(*req)); GOTO(out_set, rc = -ENOMEM); } req->rq_oi.oi_md->lsm_oinfo[0] = ((void *)req->rq_oi.oi_md) + sizeof(*req->rq_oi.oi_md) + sizeof(struct lov_oinfo *); req->rq_rqset = set; /* Set lov request specific parameters. */ req->rq_oi.oi_lockh = set->set_lockh->llh_handles + i; req->rq_oi.oi_cb_up = cb_update_enqueue; req->rq_oi.oi_flags = oinfo->oi_flags; LASSERT(req->rq_oi.oi_lockh); req->rq_oi.oi_policy.l_extent.gid = oinfo->oi_policy.l_extent.gid; req->rq_oi.oi_policy.l_extent.start = start; req->rq_oi.oi_policy.l_extent.end = end; req->rq_idx = loi->loi_ost_idx; req->rq_stripe = i; /* XXX LOV STACKING: submd should be from the subobj */ req->rq_oi.oi_md->lsm_object_id = loi->loi_id; req->rq_oi.oi_md->lsm_stripe_count = 0; req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms_valid = loi->loi_kms_valid; req->rq_oi.oi_md->lsm_oinfo[0]->loi_kms = loi->loi_kms; req->rq_oi.oi_md->lsm_oinfo[0]->loi_lvb = loi->loi_lvb; lov_set_add_req(req, set); } if (!set->set_count) GOTO(out_set, rc = -EIO); *reqset = set; RETURN(0);out_set: lov_fini_enqueue_set(set, einfo->ei_mode, rc, NULL); RETURN(rc);}int lov_update_match_set(struct lov_request_set *set, struct lov_request *req, int rc){ int ret = rc; ENTRY; if (rc > 0) ret = 0; else if (rc == 0) ret = 1; lov_update_set(set, req, ret); RETURN(rc);}int lov_fini_match_set(struct lov_request_set *set, __u32 mode, int flags){ int rc = 0; ENTRY; if (set == NULL) RETURN(0); LASSERT(set->set_exp); rc = enqueue_done(set, mode); if ((set->set_count == set->set_success) && (flags & LDLM_FL_TEST_LOCK)) lov_llh_put(set->set_lockh); if (atomic_dec_and_test(&set->set_refcount)) lov_finish_set(set); RETURN(rc);}int lov_prep_match_set(struct obd_export *exp, struct obd_info *oinfo, struct lov_stripe_md *lsm, ldlm_policy_data_t *policy, __u32 mode, struct lustre_handle *lockh, struct lov_request_set **reqset){ struct lov_obd *lov = &exp->exp_obd->u.lov; struct lov_request_set *set; int i, rc = 0; struct lov_oinfo *loi; ENTRY; OBD_ALLOC(set, sizeof(*set)); if (set == NULL) RETURN(-ENOMEM); lov_init_set(set); set->set_exp = exp; set->set_oi = oinfo; set->set_oi->oi_md = lsm; set->set_lockh = lov_llh_new(lsm); if (set->set_lockh == NULL)
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?