osc_create.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 443 行 · 第 1/2 页

C
443
字号
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (C) 2001-2003 Cluster File Systems, Inc.
 *   Author Peter Braam <braam@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software.  If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you.  See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   license text for more details.
 *
 *  For testing and management it is treated as an obd_device,
 *  although it does not export a full OBD method table (the
 *  requests are coming in over the wire, so object target modules
 *  do not have a full method table.)
 *
 */

#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_OSC

#ifdef __KERNEL__
# include <libcfs/libcfs.h>
#else /* __KERNEL__ */
# include <liblustre.h>
#endif

#ifdef  __CYGWIN__
# include <ctype.h>
#endif

# include <lustre_dlm.h>
#include <obd_class.h>
#include "osc_internal.h"

/*
 * Reply handler for the asynchronous OST_CREATE request; it is installed as
 * rq_interpret_reply by oscc_internal_create().
 *
 * The osc_creator is recovered from req->rq_async_args.pointer_arg[0].  If
 * the request carries a reply message, the ost_body is unpacked (with
 * byte-swabbing via lustre_swab_ost_body).  Under oscc_lock the
 * OSCC_FLAG_CREATING flag is cleared and the creator state is updated
 * according to \a rc.  Finally oscc_waitq is signalled.
 *
 * \param req   the create request being completed
 * \param data  not used by this function
 * \param rc    status of the request as handed in by the caller
 *
 * \retval rc as passed in, except that rc == 0 is turned into -EPROTO when a
 *         reply message exists but its body could not be unpacked.
 */
static int osc_interpret_create(struct ptlrpc_request *req, void *data, int rc)
{
        struct osc_creator *oscc;
        struct ost_body *body = NULL;
        ENTRY;

        if (req->rq_repmsg) {
                body = lustre_swab_repbuf(req, REPLY_REC_OFF, sizeof(*body),
                                          lustre_swab_ost_body);
                /* an unpackable body only overrides a "successful" rc; an
                 * error rc already set by the caller is preserved */
                if (body == NULL && rc == 0)
                        rc = -EPROTO;
        }

        oscc = req->rq_async_args.pointer_arg[0];
        LASSERT(oscc && (oscc->oscc_obd != LP_POISON));

        /* every arm of the switch below releases oscc_lock itself */
        spin_lock(&oscc->oscc_lock);
        oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
        switch (rc) {
        case 0: {
                if (body) {
                        /* distance from the last id we knew about to the id
                         * reported in the reply.
                         * NOTE(review): both ids are printed with LPU64
                         * elsewhere in this file, so they appear to be
                         * 64-bit; storing the difference in an int may
                         * truncate - confirm this is acceptable. */
                        int diff = body->oa.o_id - oscc->oscc_last_id;
                        /* fewer ids gained than grow_count: shrink the next
                         * request size (never below OST_MIN_PRECREATE);
                         * otherwise the pool is no longer considered low */
                        if (diff < oscc->oscc_grow_count)
                                oscc->oscc_grow_count =
                                        max(diff/3, OST_MIN_PRECREATE);
                        else
                                oscc->oscc_flags &= ~OSCC_FLAG_LOW;
                        oscc->oscc_last_id = body->oa.o_id;
                }
                spin_unlock(&oscc->oscc_lock);
                break;
        }
        case -EAGAIN:
                /* valid race delorphan vs create, or something after resend */
                spin_unlock(&oscc->oscc_lock);
                /* NOTE(review): "EGAIN" in the message below looks like a
                 * typo for EAGAIN; left untouched because it is a runtime
                 * string. */
                DEBUG_REQ(D_INODE, req, "Got EGAIN - resend \n");
                break;
        case -ENOSPC:
        case -EROFS: {
                /* both errors flag the creator as out of space, but only
                 * -ENOSPC (with a body) resets grow_count and takes the
                 * last id from the reply */
                oscc->oscc_flags |= OSCC_FLAG_NOSPC;
                if (body && rc == -ENOSPC) {
                        oscc->oscc_grow_count = OST_MIN_PRECREATE;
                        oscc->oscc_last_id = body->oa.o_id;
                }
                spin_unlock(&oscc->oscc_lock);
                DEBUG_REQ(D_INODE, req, "OST out of space, flagging");
                break;
        }
        case -EIO: {
                /* filter always set body->oa.o_id as the last_id
                 * of filter (see filter_handle_precreate for detail) */
                /* only ever move oscc_last_id forward on this path */
                if (body && body->oa.o_id > oscc->oscc_last_id)
                        oscc->oscc_last_id = body->oa.o_id;
                spin_unlock(&oscc->oscc_lock);
                break;
        }
        default: {
                /* any other rc: mark the creator as recovering, reset the
                 * request size, and fail the import (after dropping the
                 * lock) */
                oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
                oscc->oscc_grow_count = OST_MIN_PRECREATE;
                spin_unlock(&oscc->oscc_lock);
                DEBUG_REQ(D_ERROR, req,
                          "unknown rc %d from async create: failing oscc", rc);
                ptlrpc_fail_import(req->rq_import,
                                   lustre_msg_get_conn_cnt(req->rq_reqmsg));
        }
        }

        /* oscc_lock is no longer held here; the ids are read for debug
         * output only */
        CDEBUG(D_RPCTRACE, "prealloc through id "LPU64", next to use "LPU64"\n",
               oscc->oscc_last_id, oscc->oscc_next_id);

        cfs_waitq_signal(&oscc->oscc_waitq);
        RETURN(rc);
}

/*
 * Build and queue an asynchronous OST_CREATE request asking for objects up
 * to oscc_last_id + oscc_grow_count.
 *
 * Locking: must be entered with oscc->oscc_lock held (asserted below); the
 * lock is released on every return path.
 *
 * \param oscc  creator state for the target
 *
 * \retval 0        a request was handed to ptlrpcd_add_req(), or nothing was
 *                  sent because OSCC_FLAG_CREATING or OSCC_FLAG_RECOVERING
 *                  was already set
 * \retval -ENOMEM  ptlrpc_prep_req() returned NULL
 */
static int oscc_internal_create(struct osc_creator *oscc)
{
        struct ptlrpc_request *request;
        struct ost_body *body;
        int size[] = { sizeof(struct ptlrpc_body), sizeof(*body) };
        ENTRY;

        LASSERT_SPIN_LOCKED(&oscc->oscc_lock);

        /* when the remaining ids (last_id - next_id) are at most
         * grow_count / 4 + 1 and the creator is neither already flagged LOW
         * nor recovering, flag it LOW and double the request size */
        if (oscc->oscc_grow_count < OST_MAX_PRECREATE &&
            !(oscc->oscc_flags & (OSCC_FLAG_LOW | OSCC_FLAG_RECOVERING)) &&
            (__s64)(oscc->oscc_last_id - oscc->oscc_next_id) <=
                   (oscc->oscc_grow_count / 4 + 1)) {
                oscc->oscc_flags |= OSCC_FLAG_LOW;
                oscc->oscc_grow_count *= 2;
        }

        /* cap the request size at half of OST_MAX_PRECREATE */
        if (oscc->oscc_grow_count > OST_MAX_PRECREATE / 2)
                oscc->oscc_grow_count = OST_MAX_PRECREATE / 2;

        /* a create is already in flight, or the creator is recovering:
         * do not send another request */
        if (oscc->oscc_flags & OSCC_FLAG_CREATING ||
            oscc->oscc_flags & OSCC_FLAG_RECOVERING) {
                spin_unlock(&oscc->oscc_lock);
                RETURN(0);
        }

        /* claim the "create in flight" slot before dropping the lock.
         * NOTE(review): presumably the lock is dropped here because request
         * preparation may block - confirm. */
        oscc->oscc_flags |= OSCC_FLAG_CREATING;
        spin_unlock(&oscc->oscc_lock);

        request = ptlrpc_prep_req(oscc->oscc_obd->u.cli.cl_import,
                                  LUSTRE_OST_VERSION, OST_CREATE, 2,
                                  size, NULL);
        if (request == NULL) {
                /* undo the CREATING claim made above */
                spin_lock(&oscc->oscc_lock);
                oscc->oscc_flags &= ~OSCC_FLAG_CREATING;
                spin_unlock(&oscc->oscc_lock);
                RETURN(-ENOMEM);
        }

        request->rq_request_portal = OST_CREATE_PORTAL;
        ptlrpc_at_set_req_timeout(request);
        body = lustre_msg_buf(request->rq_reqmsg, REQ_REC_OFF, sizeof(*body));

        /* read last_id and grow_count together under the lock so the
         * requested target id is computed from a consistent pair */
        spin_lock(&oscc->oscc_lock);
        body->oa.o_id = oscc->oscc_last_id + oscc->oscc_grow_count;
        body->oa.o_valid |= OBD_MD_FLID;
        spin_unlock(&oscc->oscc_lock);
        /* oscc_last_id is read without the lock here; debug output only */
        CDEBUG(D_RPCTRACE, "prealloc through id "LPU64" (last seen "LPU64")\n",
               body->oa.o_id, oscc->oscc_last_id);

        ptlrpc_req_set_repsize(request, 2, size);

        /* osc_interpret_create() retrieves oscc from pointer_arg[0] */
        request->rq_async_args.pointer_arg[0] = oscc;
        request->rq_interpret_reply = osc_interpret_create;
        ptlrpcd_add_req(request);

        RETURN(0);
}

/*
 * Check whether at least \a count ids remain between oscc_next_id and
 * oscc_last_id; if not, start a create request via oscc_internal_create().
 *
 * Locking: takes oscc_lock.  It is released here when enough ids remain,
 * and by oscc_internal_create() otherwise.
 *
 * \retval nonzero  (last_id - next_id) >= count
 * \retval 0        not enough ids; a create request was attempted
 *
 * NOTE(review): the return value of oscc_internal_create() (which can be
 * -ENOMEM) is ignored here.
 */
static int oscc_has_objects(struct osc_creator *oscc, int count)
{
        int have_objs;
        spin_lock(&oscc->oscc_lock);
        have_objs = ((__s64)(oscc->oscc_last_id - oscc->oscc_next_id) >= count);

        if (!have_objs) {
                /* releases oscc_lock on all of its return paths */
                oscc_internal_create(oscc);
        } else {
                spin_unlock(&oscc->oscc_lock);
        }

        return have_objs;
}

/*
 * Report whether any of three conditions holds: enough ids remain
 * (oscc_has_objects(), which may also start a create request),
 * OSCC_FLAG_NOSPC is set, or the import is marked invalid.
 *
 * NOTE(review): presumably used as the condition of a wait on oscc_waitq;
 * the caller is not visible in this part of the file - confirm.
 *
 * \retval nonzero  at least one of the three conditions is true
 * \retval 0        none of them is true
 */
static int oscc_wait_for_objects(struct osc_creator *oscc, int count)
{
        int have_objs;
        int ost_full;
        int osc_invalid;

        have_objs = oscc_has_objects(oscc, count);

        /* the NOSPC flag is sampled under the lock, after the call above has
         * released it */
        spin_lock(&oscc->oscc_lock);
        ost_full = (oscc->oscc_flags & OSCC_FLAG_NOSPC);
        spin_unlock(&oscc->oscc_lock);

        /* imp_invalid is read without taking oscc_lock */
        osc_invalid = oscc->oscc_obd->u.cli.cl_import->imp_invalid;

        return have_objs || ost_full || osc_invalid;
}

static int oscc_precreate(struct osc_creator *oscc, int wait)
{
        struct l_wait_info lwi = { 0 };
        int rc = 0;
        ENTRY;

        if (oscc_has_objects(oscc, oscc->oscc_grow_count / 2))

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?