echo_client.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,511 行 · 第 1/4 页

C
1,511
字号
};static int echo_client_async_page(struct obd_export *exp, int rw,                                   struct obdo *oa, struct lov_stripe_md *lsm,                                   obd_off offset, obd_size count,                                   obd_size batching){        obd_count npages, i;        struct echo_async_page *eap;        struct echo_async_state eas;        int rc = 0;        struct echo_async_page **aps = NULL;        ENTRY;#if 0        int                     verify;        int                     gfp_mask;        verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);        gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;#endif        LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);        if (count <= 0 ||            (count & (~CFS_PAGE_MASK)) != 0 ||            (lsm != NULL &&             lsm->lsm_object_id != oa->o_id))                return (-EINVAL);        /* XXX think again with misaligned I/O */        npages = batching >> CFS_PAGE_SHIFT;        memcpy(&eas.eas_oa, oa, sizeof(*oa));        eas.eas_next_offset = offset;        eas.eas_end_offset = offset + count;        spin_lock_init(&eas.eas_lock);        cfs_waitq_init(&eas.eas_waitq);        eas.eas_in_flight = 0;        eas.eas_rc = 0;        eas.eas_lsm = lsm;        CFS_INIT_LIST_HEAD(&eas.eas_avail);        OBD_ALLOC(aps, npages * sizeof aps[0]);        if (aps == NULL)                return (-ENOMEM);        /* prepare the group of pages that we're going to be keeping         * in flight */        for (i = 0; i < npages; i++) {                cfs_page_t *page;                OBD_PAGE_ALLOC(page, CFS_ALLOC_STD);                if (page == NULL)                        GOTO(out, rc = -ENOMEM);                OBD_ALLOC(eap, sizeof(*eap));                if (eap == NULL) {                        OBD_PAGE_FREE(page);                        GOTO(out, rc = -ENOMEM);                }                eap->eap_magic = EAP_MAGIC;                eap->eap_page = page;                eap->eap_eas = &eas;                list_add_tail(&eap->eap_item, &eas.eas_avail);                aps[i] = eap;        }        /* first we spin queueing io and being woken by its completion */        spin_lock(&eas.eas_lock);        for(;;) {                int rc;                /* sleep until we have a page to send */                spin_unlock(&eas.eas_lock);                rc = wait_event_interruptible(eas.eas_waitq,                                               eas_should_wake(&eas));                spin_lock(&eas.eas_lock);                if (rc && !eas.eas_rc)                        eas.eas_rc = rc;                if (eas.eas_rc)                        break;                if (list_empty(&eas.eas_avail))                        continue;                eap = list_entry(eas.eas_avail.next, struct echo_async_page,                                 eap_item);                list_del(&eap->eap_item);                spin_unlock(&eas.eas_lock);                /* unbind the eap from its old page offset */                if (eap->eap_cookie != NULL) {                        obd_teardown_async_page(exp, lsm, NULL,                                                 eap->eap_cookie);                        eap->eap_cookie = NULL;                }                eas.eas_next_offset += CFS_PAGE_SIZE;                eap->eap_off = eas.eas_next_offset;                rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page,                                         eap->eap_off, &ec_async_page_ops,                                         eap, &eap->eap_cookie, 1, NULL);                if (rc) {                        spin_lock(&eas.eas_lock);                        eas.eas_rc = rc;                        break;                }                if (oa->o_id != ECHO_PERSISTENT_OBJID &&                    (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&                    (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0)                        echo_client_page_debug_setup(lsm, eap->eap_page, rw,                                                      oa->o_id,                                                      eap->eap_off, CFS_PAGE_SIZE);                /* always asserts urgent, which isn't quite right */                rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie,                                        rw, 0, CFS_PAGE_SIZE, 0,                                        ASYNC_READY | ASYNC_URGENT |                                        ASYNC_COUNT_STABLE);                spin_lock(&eas.eas_lock);                if (rc && !eas.eas_rc) {                        eas.eas_rc = rc;                        break;                }                eas.eas_in_flight++;                if (eas.eas_next_offset == eas.eas_end_offset)                        break;        }         /* still hold the eas_lock here.. */        /* now we just spin waiting for all the rpcs to complete */        while(eas.eas_in_flight) {                spin_unlock(&eas.eas_lock);                wait_event_interruptible(eas.eas_waitq,                                          eas.eas_in_flight == 0);                spin_lock(&eas.eas_lock);        }        spin_unlock(&eas.eas_lock);out:        if (aps != NULL) {                for (i = 0; i < npages; ++ i) {                        cfs_page_t *page;                        eap = aps[i];                        page = eap->eap_page;                        if (eap->eap_cookie != NULL)                                obd_teardown_async_page(exp, lsm, NULL,                                                        eap->eap_cookie);                        OBD_FREE(eap, sizeof(*eap));                        OBD_PAGE_FREE(page);                }                OBD_FREE(aps, npages * sizeof aps[0]);        }        RETURN(rc);}static int echo_client_prep_commit(struct obd_export *exp, int rw,                                   struct obdo *oa, struct lov_stripe_md *lsm,                                   obd_off offset, obd_size count,                                     obd_size batch, struct obd_trans_info *oti){        struct obd_ioobj ioo;        struct niobuf_local *lnb;        struct niobuf_remote *rnb;        obd_off off;        obd_size npages, tot_pages;        int i, ret = 0;        ENTRY;        if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||            (lsm != NULL && lsm->lsm_object_id != oa->o_id))                RETURN(-EINVAL);        npages = batch >> CFS_PAGE_SHIFT;        tot_pages = count >> CFS_PAGE_SHIFT;        OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local));        OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote));        if (lnb == NULL || rnb == NULL)                GOTO(out, ret = -ENOMEM);        obdo_to_ioobj(oa, &ioo);        off = offset;        for(; tot_pages; tot_pages -= npages) {                if (tot_pages < npages)                        npages = tot_pages;                for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) {                        rnb[i].offset = off;                        rnb[i].len = CFS_PAGE_SIZE;                }                ioo.ioo_bufcnt = npages;                oti->oti_transno = 0;                ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti);                if (ret != 0)                        GOTO(out, ret);                for (i = 0; i < npages; i++) {                        cfs_page_t *page = lnb[i].page;                        /* read past eof? */                        if (page == NULL && lnb[i].rc == 0)                                continue;                        if (oa->o_id == ECHO_PERSISTENT_OBJID ||                            (oa->o_valid & OBD_MD_FLFLAGS) == 0 ||                            (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0)                                continue;                        if (rw == OBD_BRW_WRITE)                                echo_client_page_debug_setup(lsm, page, rw,                                                             oa->o_id,                                                             rnb[i].offset,                                                             rnb[i].len);                        else                                echo_client_page_debug_check(lsm, page,                                                             oa->o_id,                                                             rnb[i].offset,                                                             rnb[i].len);                }                ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret);                if (ret != 0)                        GOTO(out, ret);        }out:        if (lnb)                OBD_FREE(lnb, npages * sizeof(struct niobuf_local));        if (rnb)                OBD_FREE(rnb, npages * sizeof(struct niobuf_remote));        RETURN(ret);}int echo_client_brw_ioctl(int rw, struct obd_export *exp,                          struct obd_ioctl_data *data){        struct obd_device *obd = class_exp2obd(exp);        struct echo_client_obd *ec = &obd->u.echo_client;        struct obd_trans_info dummy_oti = { .oti_thread_id = -1 };        struct ec_object *eco;        int rc;        ENTRY;        rc = echo_get_object(&eco, obd, &data->ioc_obdo1);        if (rc)                RETURN(rc);        data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE;        data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP;        data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO;        switch((long)data->ioc_pbuf1) {        case 1:                if (data->ioc_pbuf2 == NULL) { // NULL user data pointer                        rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1,                                              eco->eco_lsm, data->ioc_offset,                                              data->ioc_count, &dummy_oti);                } else {#ifdef __KERNEL__                        rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1,                                              eco->eco_lsm, data->ioc_offset,                                              data->ioc_count, data->ioc_pbuf2,                                              &dummy_oti);#endif                }                break;        case 2:                rc = echo_client_async_page(ec->ec_exp, rw, &data->ioc_obdo1,                                           eco->eco_lsm, data->ioc_offset,                                           data->ioc_count, data->ioc_plen1);                break;        case 3:                rc = echo_client_prep_commit(ec->ec_exp, rw, &data->ioc_obdo1,                                            eco->eco_lsm, data->ioc_offset,                                            data->ioc_count, data->ioc_plen1,                                            &dummy_oti);                break;        default:                rc = -EINVAL;        }        echo_put_object(eco);        RETURN(rc);}static intecho_ldlm_callback (struct ldlm_lock *lock, struct ldlm_lock_desc *new,                    void *data, int flag){        struct ec_object       *eco = (struct ec_object *)data;        struct echo_client_obd *ec = &(eco->eco_device->u.echo_client);        struct lustre_handle    lockh;        struct list_head       *el;        int                     found = 0;        int                     rc;        ldlm_lock2handle (lock, &lockh);        /* #ifdef this out if we're not feeling paranoid */        spin_lock (&ec->ec_lock);        list_for_each (el, &ec->ec_objects) {                found = (eco == list_entry(el, struct ec_object,                                           eco_obj_chain));                if (found)                        break;        }        spin_unlock (&ec->ec_lock);        LASSERT (found);        switch (flag) {        case LDLM_CB_BLOCKING:                CDEBUG(D_INFO, "blocking callback on "LPX64", handle "LPX64"\n",                       eco->eco_id, lockh.cookie);                rc = ldlm_cli_cancel (&lockh);                if (rc != ELDLM_OK)                        CERROR ("ldlm_cli_cancel failed: %d\n", rc);                break;        case LDLM_CB_CANCELING:                CDEBUG(D_INFO, "cancel callback on "LPX64", handle "LPX64"\n",                       eco->eco_id, lockh.cookie);                break;        default:                LBUG ();        }        return (0);}static intecho_client_enqueue(struct obd_export *exp, struct obdo *oa,                    int mode, obd_off offset, obd_size nob){        struct obd_device      *obd = exp->exp_obd;        struct echo_client_obd *ec = &obd->u.echo_client;        struct lustre_handle   *ulh = &oa->o_handle;        struct ldlm_enqueue_info einfo = { 0 };        struct obd_info oinfo = { { { 0 } } };        struct ec_object       *eco;        struct ec_lock         *ecl;        int                     rc;        if (!(mode == LCK_PR || mode == LCK_PW))                return -EINVAL;        if ((offset & (~CFS_PAGE_MASK)) != 0 ||            (nob & (~CFS_PAGE_MASK)) != 0)                return -EINVAL;        rc = echo_get_object (&eco, obd, oa);        if (rc != 0)                return rc;        rc = -ENOMEM;        OBD_ALLOC (ecl, sizeof (*ecl));        if (ecl == NULL)                goto failed_0;        ecl->ecl_mode = mode;        ecl->ecl_object = eco;        ecl->ecl_policy.l_extent.start = offset;        ecl->ecl_policy.l_extent.end =                (nob == 0) ? ((obd_off) -1) : (offset + nob - 1);        einfo.ei_type = LDLM_EXTENT;

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?