echo_client.c

来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,511 行 · 第 1/4 页

C
1,511
字号
        CDEBUG(D_INFO, "put %p: "LPX64"=%u#%u@%d refs %d del %d\n",               eco, eco->eco_id,               eco->eco_lsm->lsm_stripe_size,               eco->eco_lsm->lsm_stripe_count,               eco->eco_lsm->lsm_oinfo[0]->loi_ost_idx,               eco->eco_refcount, eco->eco_deleted);        if (eco->eco_refcount != 0 || !eco->eco_deleted) {                spin_unlock (&ec->ec_lock);                return;        }        spin_unlock (&ec->ec_lock);        /* NB leave obj in the object list.  We must prevent anyone from         * attempting to enqueue on this object number until we can be         * sure there will be no more lock callbacks.         */        obd_cancel_unused(ec->ec_exp, eco->eco_lsm, 0, NULL);        /* now we can let it go */        spin_lock (&ec->ec_lock);        list_del (&eco->eco_obj_chain);        spin_unlock (&ec->ec_lock);        LASSERT (eco->eco_refcount == 0);        echo_free_object (eco);}static voidecho_get_stripe_off_id (struct lov_stripe_md *lsm, obd_off *offp, obd_id *idp){        unsigned long stripe_count;        unsigned long stripe_size;        unsigned long width;        unsigned long woffset;        int           stripe_index;        obd_off       offset;        if (lsm->lsm_stripe_count <= 1)                return;        offset       = *offp;        stripe_size  = lsm->lsm_stripe_size;        stripe_count = lsm->lsm_stripe_count;        /* width = # bytes in all stripes */        width = stripe_size * stripe_count;        /* woffset = offset within a width; offset = whole number of widths */        woffset = do_div (offset, width);        stripe_index = woffset / stripe_size;        *idp = lsm->lsm_oinfo[stripe_index]->loi_id;        *offp = offset * stripe_size + woffset % stripe_size;}static void echo_client_page_debug_setup(struct lov_stripe_md *lsm,                              cfs_page_t *page, int rw, obd_id id,                              obd_off offset, obd_off count){        char    *addr;        obd_off  stripe_off;        obd_id   stripe_id;        int      delta;        /* no partial pages on the client */        LASSERT(count == CFS_PAGE_SIZE);        addr = cfs_kmap(page);        for (delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {                if (rw == OBD_BRW_WRITE) {                        stripe_off = offset + delta;                        stripe_id = id;                        echo_get_stripe_off_id(lsm, &stripe_off, &stripe_id);                } else {                        stripe_off = 0xdeadbeef00c0ffeeULL;                        stripe_id = 0xdeadbeef00c0ffeeULL;                }                block_debug_setup(addr + delta, OBD_ECHO_BLOCK_SIZE,                                   stripe_off, stripe_id);        }        cfs_kunmap(page);}static intecho_client_page_debug_check(struct lov_stripe_md *lsm,                              cfs_page_t *page, obd_id id,                              obd_off offset, obd_off count){        obd_off stripe_off;        obd_id  stripe_id;        char   *addr;        int     delta;        int     rc;        int     rc2;        /* no partial pages on the client */        LASSERT(count == CFS_PAGE_SIZE);        addr = cfs_kmap(page);        for (rc = delta = 0; delta < CFS_PAGE_SIZE; delta += OBD_ECHO_BLOCK_SIZE) {                stripe_off = offset + delta;                stripe_id = id;                echo_get_stripe_off_id (lsm, &stripe_off, &stripe_id);                rc2 = block_debug_check("test_brw",                                         addr + delta, OBD_ECHO_BLOCK_SIZE,                                         stripe_off, stripe_id);                if (rc2 != 0) {                        CERROR ("Error in echo object "LPX64"\n", id);                        rc = rc2;                }        }        cfs_kunmap(page);        return rc;}static int echo_client_kbrw(struct obd_device *obd, int rw, struct obdo *oa,                            struct lov_stripe_md *lsm, obd_off offset,                            obd_size count, struct obd_trans_info *oti){        struct echo_client_obd *ec = &obd->u.echo_client;        struct obd_info         oinfo = { { { 0 } } };        obd_count               npages;        struct brw_page        *pga;        struct brw_page        *pgp;        obd_off                 off;        int                     i;        int                     rc;        int                     verify;        int                     gfp_mask;        verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID &&                  (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&                  (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);        gfp_mask = ((oa->o_id & 2) == 0) ? CFS_ALLOC_STD : CFS_ALLOC_HIGHUSER;        LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);        LASSERT(lsm != NULL);        LASSERT(lsm->lsm_object_id == oa->o_id);        if (count <= 0 ||            (count & (~CFS_PAGE_MASK)) != 0)                return (-EINVAL);        /* XXX think again with misaligned I/O */        npages = count >> CFS_PAGE_SHIFT;        OBD_ALLOC(pga, npages * sizeof(*pga));        if (pga == NULL)                return (-ENOMEM);        for (i = 0, pgp = pga, off = offset;             i < npages;             i++, pgp++, off += CFS_PAGE_SIZE) {                LASSERT (pgp->pg == NULL);      /* for cleanup */                rc = -ENOMEM;                OBD_PAGE_ALLOC(pgp->pg, gfp_mask);                if (pgp->pg == NULL)                        goto out;                pgp->count = CFS_PAGE_SIZE;                pgp->off = off;                pgp->flag = 0;                if (verify)                        echo_client_page_debug_setup(lsm, pgp->pg, rw,                                                     oa->o_id, off, pgp->count);        }        oinfo.oi_oa = oa;        oinfo.oi_md = lsm;        rc = obd_brw(rw, ec->ec_exp, &oinfo, npages, pga, oti); out:        if (rc != 0 || rw != OBD_BRW_READ)                verify = 0;        for (i = 0, pgp = pga; i < npages; i++, pgp++) {                if (pgp->pg == NULL)                        continue;                if (verify) {                        int vrc;                        vrc = echo_client_page_debug_check(lsm, pgp->pg, oa->o_id,                                                           pgp->off, pgp->count);                        if (vrc != 0 && rc == 0)                                rc = vrc;                }                OBD_PAGE_FREE(pgp->pg);        }        OBD_FREE(pga, npages * sizeof(*pga));        return (rc);}#ifdef __KERNEL__#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))#include <linux/iobuf.h>static int echo_client_ubrw(struct obd_device *obd, int rw,                            struct obdo *oa, struct lov_stripe_md *lsm,                            obd_off offset, obd_size count, char *buffer,                            struct obd_trans_info *oti){        struct echo_client_obd *ec = &obd->u.echo_client;        struct obd_info         oinfo = { { { 0 } } };        obd_count               npages;        struct brw_page        *pga;        struct brw_page        *pgp;        obd_off                 off;        struct kiobuf          *kiobuf;        int                     i;        int                     rc;        LASSERT (rw == OBD_BRW_WRITE || rw == OBD_BRW_READ);        /* NB: for now, only whole pages, page aligned */        if (count <= 0 ||            ((long)buffer & (~CFS_PAGE_MASK)) != 0 ||            (count & (~CFS_PAGE_MASK)) != 0 ||            (lsm != NULL && lsm->lsm_object_id != oa->o_id))                return (-EINVAL);        /* XXX think again with misaligned I/O */        npages = count >> CFS_PAGE_SHIFT;        OBD_ALLOC(pga, npages * sizeof(*pga));        if (pga == NULL)                return (-ENOMEM);        rc = alloc_kiovec (1, &kiobuf);        if (rc != 0)                goto out_1;        rc = map_user_kiobuf ((rw == OBD_BRW_READ) ? READ : WRITE,                              kiobuf, (unsigned long)buffer, count);        if (rc != 0)                goto out_2;        LASSERT (kiobuf->offset == 0);        LASSERT (kiobuf->nr_pages == npages);        for (i = 0, off = offset, pgp = pga;             i < npages;             i++, off += CFS_PAGE_SIZE, pgp++) {                pgp->off = off;                pgp->pg = kiobuf->maplist[i];                pgp->count = CFS_PAGE_SIZE;                pgp->flag = 0;        }        oinfo.oi_oa = oa;        oinfo.oi_md = lsm;        rc = obd_brw(rw, ec->ec_exp, &oinfo, npages, pga, oti);        //        if (rw == OBD_BRW_READ)        //                mark_dirty_kiobuf (kiobuf, count);        unmap_kiobuf (kiobuf); out_2:        free_kiovec (1, &kiobuf); out_1:        OBD_FREE(pga, npages * sizeof(*pga));        return (rc);}#elsestatic int echo_client_ubrw(struct obd_device *obd, int rw,                            struct obdo *oa, struct lov_stripe_md *lsm,                            obd_off offset, obd_size count, char *buffer,                            struct obd_trans_info *oti){        /* echo_client_ubrw() needs to be ported on 2.6 yet */        LBUG();        return 0;}#endif#endifstruct echo_async_state;#define EAP_MAGIC 79277927struct echo_async_page {        int                     eap_magic;        cfs_page_t             *eap_page;        void                    *eap_cookie;        obd_off                 eap_off;        struct echo_async_state *eap_eas;        struct list_head        eap_item;};#define EAP_FROM_COOKIE(c)                                                      \        (LASSERT(((struct echo_async_page *)(c))->eap_magic == EAP_MAGIC),      \         (struct echo_async_page *)(c))struct echo_async_state {        spinlock_t              eas_lock;        obd_off                 eas_next_offset;        obd_off                 eas_end_offset;        int                     eas_in_flight;        int                     eas_rc;        cfs_waitq_t             eas_waitq;        struct list_head        eas_avail;        struct obdo             eas_oa;        struct lov_stripe_md    *eas_lsm;};static int eas_should_wake(struct echo_async_state *eas){        int rc = 0;        spin_lock(&eas->eas_lock);        if (eas->eas_rc == 0 && !list_empty(&eas->eas_avail))            rc = 1;        spin_unlock(&eas->eas_lock);        return rc;};static int ec_ap_make_ready(void *data, int cmd){        /* our pages are issued ready */        LBUG();        return 0;}static int ec_ap_refresh_count(void *data, int cmd){        /* our pages are issued with a stable count */        LBUG();        return CFS_PAGE_SIZE;}static void ec_ap_fill_obdo(void *data, int cmd, struct obdo *oa){        struct echo_async_page *eap = EAP_FROM_COOKIE(data);        memcpy(oa, &eap->eap_eas->eas_oa, sizeof(*oa));}static int ec_ap_completion(void *data, int cmd, struct obdo *oa, int rc){        struct echo_async_page *eap = EAP_FROM_COOKIE(data);        struct echo_async_state *eas;        eas = eap->eap_eas;        if (cmd == OBD_BRW_READ &&            eas->eas_oa.o_id != ECHO_PERSISTENT_OBJID &&            (eas->eas_oa.o_valid & OBD_MD_FLFLAGS) != 0 &&            (eas->eas_oa.o_flags & OBD_FL_DEBUG_CHECK) != 0)                echo_client_page_debug_check(eas->eas_lsm, eap->eap_page,                                              eas->eas_oa.o_id, eap->eap_off,                                             CFS_PAGE_SIZE);        spin_lock(&eas->eas_lock);        if (rc && !eas->eas_rc)                eas->eas_rc = rc;        eas->eas_in_flight--;        list_add(&eap->eap_item, &eas->eas_avail);        cfs_waitq_signal(&eas->eas_waitq);        spin_unlock(&eas->eas_lock);        return 0;}static struct obd_async_page_ops ec_async_page_ops = {        .ap_make_ready =        ec_ap_make_ready,        .ap_refresh_count =     ec_ap_refresh_count,        .ap_fill_obdo =         ec_ap_fill_obdo,        .ap_completion =        ec_ap_completion,

⌨️ 快捷键说明

复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?