osc_create.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 443 行 · 第 1/2 页

C
443
字号
                RETURN(0);        if (!wait)                RETURN(0);        /* no rc check -- a no-INTR, no-TIMEOUT wait can't fail */        l_wait_event(oscc->oscc_waitq, oscc_wait_for_objects(oscc, 1), &lwi);        if (!oscc_has_objects(oscc, 1) && (oscc->oscc_flags & OSCC_FLAG_NOSPC))                rc = -ENOSPC;        if (oscc->oscc_obd->u.cli.cl_import->imp_invalid)                rc = -EIO;        RETURN(rc);}int oscc_recovering(struct osc_creator *oscc){        int recov = 0;        spin_lock(&oscc->oscc_lock);        recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;        spin_unlock(&oscc->oscc_lock);        return recov;}/* decide if the OST has remaining object, return value :        0 : the OST has remaining object, and don't need to do precreate.        1 : the OST has no remaining object, and will send a RPC for precreate.        2 : the OST has no remaining object, and will not get any for            a potentially very long time     1000 : unusable */int osc_precreate(struct obd_export *exp){        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;        struct obd_import *imp = exp->exp_imp_reverse;        ENTRY;        LASSERT(oscc != NULL);        if (imp != NULL && imp->imp_deactive)                RETURN(1000);        if (oscc->oscc_last_id < oscc->oscc_next_id) {                spin_lock(&oscc->oscc_lock);                if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {                        spin_unlock(&oscc->oscc_lock);                        RETURN(1000);                }                if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) {                        spin_unlock(&oscc->oscc_lock);                        RETURN(1);                }                if (oscc->oscc_flags & OSCC_FLAG_RECOVERING) {                        spin_unlock(&oscc->oscc_lock);                        RETURN(2);                }                if (oscc->oscc_flags & OSCC_FLAG_CREATING) {                        spin_unlock(&oscc->oscc_lock);                        RETURN(1);                }                oscc_internal_create(oscc);                RETURN(1);        }        RETURN(0);}int osc_create(struct obd_export *exp, struct obdo *oa,               struct lov_stripe_md **ea, struct obd_trans_info *oti){        struct lov_stripe_md *lsm;        struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;        int try_again = 1, rc = 0;        ENTRY;        LASSERT(oa);        LASSERT(ea);        if ((oa->o_valid & OBD_MD_FLGROUP) && (oa->o_gr != 0))                RETURN(osc_real_create(exp, oa, ea, oti));        if ((oa->o_valid & OBD_MD_FLFLAGS) &&            oa->o_flags == OBD_FL_RECREATE_OBJS) {                RETURN(osc_real_create(exp, oa, ea, oti));        }        /* this is the special case where create removes orphans */        if ((oa->o_valid & OBD_MD_FLFLAGS) &&            oa->o_flags == OBD_FL_DELORPHAN) {                spin_lock(&oscc->oscc_lock);                if (oscc->oscc_flags & OSCC_FLAG_SYNC_IN_PROGRESS) {                        spin_unlock(&oscc->oscc_lock);                        RETURN(-EBUSY);                }                if (!(oscc->oscc_flags & OSCC_FLAG_RECOVERING)) {                        spin_unlock(&oscc->oscc_lock);                        RETURN(0);                }                oscc->oscc_flags |= OSCC_FLAG_SYNC_IN_PROGRESS;                spin_unlock(&oscc->oscc_lock);                CDEBUG(D_HA, "%s: oscc recovery started - delete to "LPU64"\n",                       oscc->oscc_obd->obd_name, oscc->oscc_next_id - 1);                /* delete from next_id on up */                oa->o_valid |= OBD_MD_FLID;                oa->o_id = oscc->oscc_next_id - 1;                rc = osc_real_create(exp, oa, ea, NULL);                spin_lock(&oscc->oscc_lock);                oscc->oscc_flags &= ~OSCC_FLAG_SYNC_IN_PROGRESS;                if (rc == 0 || rc == -ENOSPC) {                        if (rc == -ENOSPC)                                oscc->oscc_flags |= OSCC_FLAG_NOSPC;                        oscc->oscc_flags &= ~OSCC_FLAG_RECOVERING;                        oscc->oscc_last_id = oa->o_id;                        CDEBUG(D_HA, "%s: oscc recovery finished, last_id: "                               LPU64", rc: %d\n", oscc->oscc_obd->obd_name,                               oscc->oscc_last_id, rc);                        cfs_waitq_signal(&oscc->oscc_waitq);                } else {                        CDEBUG(D_ERROR, "%s: oscc recovery failed: %d\n",                               oscc->oscc_obd->obd_name, rc);                }                spin_unlock(&oscc->oscc_lock);                RETURN(rc);        }        lsm = *ea;        if (lsm == NULL) {                rc = obd_alloc_memmd(exp, &lsm);                if (rc < 0)                        RETURN(rc);        }        while (try_again) {                /* If orphans are being recovered, then we must wait until                   it is finished before we can continue with create. */                if (oscc_recovering(oscc)) {                        struct l_wait_info lwi;                        CDEBUG(D_HA,"%s: oscc recovery in progress, waiting\n",                               oscc->oscc_obd->obd_name);                        lwi = LWI_TIMEOUT(cfs_timeout_cap(cfs_time_seconds(                                obd_timeout / 4)), NULL, NULL);                        rc = l_wait_event(oscc->oscc_waitq,                                          !oscc_recovering(oscc), &lwi);                        LASSERT(rc == 0 || rc == -ETIMEDOUT);                        if (rc == -ETIMEDOUT) {                                CDEBUG(D_HA,"%s: timeout waiting on recovery\n",                                       oscc->oscc_obd->obd_name);                                RETURN(rc);                        }                        CDEBUG(D_HA, "%s: oscc recovery over, waking up\n",                               oscc->oscc_obd->obd_name);                }                spin_lock(&oscc->oscc_lock);                if (oscc->oscc_flags & OSCC_FLAG_EXITING) {                        spin_unlock(&oscc->oscc_lock);                        break;                }                if (oscc->oscc_last_id >= oscc->oscc_next_id) {                        memcpy(oa, &oscc->oscc_oa, sizeof(*oa));                        oa->o_id = oscc->oscc_next_id;                        lsm->lsm_object_id = oscc->oscc_next_id;                        *ea = lsm;                        oscc->oscc_next_id++;                        try_again = 0;                        CDEBUG(D_RPCTRACE, "%s: set oscc_next_id = "LPU64"\n",                               exp->exp_obd->obd_name, oscc->oscc_next_id);                } else if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {                        rc = -ENOSPC;                        spin_unlock(&oscc->oscc_lock);                        break;                }                spin_unlock(&oscc->oscc_lock);                rc = oscc_precreate(oscc, try_again);                if (rc)                        break;        }        if (rc == 0)                CDEBUG(D_INFO, "%s: returning objid "LPU64"\n",                       obd2cli_tgt(oscc->oscc_obd), lsm->lsm_object_id);        else if (*ea == NULL)                obd_free_memmd(exp, &lsm);        RETURN(rc);}void oscc_init(struct obd_device *obd){        struct osc_creator *oscc;        if (obd == NULL)                return;        oscc = &obd->u.cli.cl_oscc;        memset(oscc, 0, sizeof(*oscc));        CFS_INIT_LIST_HEAD(&oscc->oscc_list);        cfs_waitq_init(&oscc->oscc_waitq);        spin_lock_init(&oscc->oscc_lock);        oscc->oscc_obd = obd;        oscc->oscc_grow_count = OST_MIN_PRECREATE;        oscc->oscc_next_id = 2;        oscc->oscc_last_id = 1;        oscc->oscc_flags |= OSCC_FLAG_RECOVERING;        /* XXX the export handle should give the oscc the last object */        /* oed->oed_oscc.oscc_last_id = exph->....; */}

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?