echo_client.c
来自「lustre 1.6.5 source code」· C语言 代码 · 共 1,511 行 · 第 1/4 页
C
1,511 行
};static int echo_client_async_page(struct obd_export *exp, int rw, struct obdo *oa, struct lov_stripe_md *lsm, obd_off offset, obd_size count, obd_size batching){ obd_count npages, i; struct echo_async_page *eap; struct echo_async_state eas; int rc = 0; struct echo_async_page **aps = NULL; ENTRY;#if 0 int verify; int gfp_mask; verify = ((oa->o_id) != ECHO_PERSISTENT_OBJID && (oa->o_valid & OBD_MD_FLFLAGS) != 0 && (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0); gfp_mask = ((oa->o_id & 2) == 0) ? GFP_KERNEL : GFP_HIGHUSER;#endif LASSERT(rw == OBD_BRW_WRITE || rw == OBD_BRW_READ); if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 || (lsm != NULL && lsm->lsm_object_id != oa->o_id)) return (-EINVAL); /* XXX think again with misaligned I/O */ npages = batching >> CFS_PAGE_SHIFT; memcpy(&eas.eas_oa, oa, sizeof(*oa)); eas.eas_next_offset = offset; eas.eas_end_offset = offset + count; spin_lock_init(&eas.eas_lock); cfs_waitq_init(&eas.eas_waitq); eas.eas_in_flight = 0; eas.eas_rc = 0; eas.eas_lsm = lsm; CFS_INIT_LIST_HEAD(&eas.eas_avail); OBD_ALLOC(aps, npages * sizeof aps[0]); if (aps == NULL) return (-ENOMEM); /* prepare the group of pages that we're going to be keeping * in flight */ for (i = 0; i < npages; i++) { cfs_page_t *page; OBD_PAGE_ALLOC(page, CFS_ALLOC_STD); if (page == NULL) GOTO(out, rc = -ENOMEM); OBD_ALLOC(eap, sizeof(*eap)); if (eap == NULL) { OBD_PAGE_FREE(page); GOTO(out, rc = -ENOMEM); } eap->eap_magic = EAP_MAGIC; eap->eap_page = page; eap->eap_eas = &eas; list_add_tail(&eap->eap_item, &eas.eas_avail); aps[i] = eap; } /* first we spin queueing io and being woken by its completion */ spin_lock(&eas.eas_lock); for(;;) { int rc; /* sleep until we have a page to send */ spin_unlock(&eas.eas_lock); rc = wait_event_interruptible(eas.eas_waitq, eas_should_wake(&eas)); spin_lock(&eas.eas_lock); if (rc && !eas.eas_rc) eas.eas_rc = rc; if (eas.eas_rc) break; if (list_empty(&eas.eas_avail)) continue; eap = list_entry(eas.eas_avail.next, struct echo_async_page, eap_item); list_del(&eap->eap_item); spin_unlock(&eas.eas_lock); /* unbind the eap from its old page offset */ if (eap->eap_cookie != NULL) { obd_teardown_async_page(exp, lsm, NULL, eap->eap_cookie); eap->eap_cookie = NULL; } eas.eas_next_offset += CFS_PAGE_SIZE; eap->eap_off = eas.eas_next_offset; rc = obd_prep_async_page(exp, lsm, NULL, eap->eap_page, eap->eap_off, &ec_async_page_ops, eap, &eap->eap_cookie, 1, NULL); if (rc) { spin_lock(&eas.eas_lock); eas.eas_rc = rc; break; } if (oa->o_id != ECHO_PERSISTENT_OBJID && (oa->o_valid & OBD_MD_FLFLAGS) != 0 && (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0) echo_client_page_debug_setup(lsm, eap->eap_page, rw, oa->o_id, eap->eap_off, CFS_PAGE_SIZE); /* always asserts urgent, which isn't quite right */ rc = obd_queue_async_io(exp, lsm, NULL, eap->eap_cookie, rw, 0, CFS_PAGE_SIZE, 0, ASYNC_READY | ASYNC_URGENT | ASYNC_COUNT_STABLE); spin_lock(&eas.eas_lock); if (rc && !eas.eas_rc) { eas.eas_rc = rc; break; } eas.eas_in_flight++; if (eas.eas_next_offset == eas.eas_end_offset) break; } /* still hold the eas_lock here.. */ /* now we just spin waiting for all the rpcs to complete */ while(eas.eas_in_flight) { spin_unlock(&eas.eas_lock); wait_event_interruptible(eas.eas_waitq, eas.eas_in_flight == 0); spin_lock(&eas.eas_lock); } spin_unlock(&eas.eas_lock);out: if (aps != NULL) { for (i = 0; i < npages; ++ i) { cfs_page_t *page; eap = aps[i]; page = eap->eap_page; if (eap->eap_cookie != NULL) obd_teardown_async_page(exp, lsm, NULL, eap->eap_cookie); OBD_FREE(eap, sizeof(*eap)); OBD_PAGE_FREE(page); } OBD_FREE(aps, npages * sizeof aps[0]); } RETURN(rc);}static int echo_client_prep_commit(struct obd_export *exp, int rw, struct obdo *oa, struct lov_stripe_md *lsm, obd_off offset, obd_size count, obd_size batch, struct obd_trans_info *oti){ struct obd_ioobj ioo; struct niobuf_local *lnb; struct niobuf_remote *rnb; obd_off off; obd_size npages, tot_pages; int i, ret = 0; ENTRY; if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 || (lsm != NULL && lsm->lsm_object_id != oa->o_id)) RETURN(-EINVAL); npages = batch >> CFS_PAGE_SHIFT; tot_pages = count >> CFS_PAGE_SHIFT; OBD_ALLOC(lnb, npages * sizeof(struct niobuf_local)); OBD_ALLOC(rnb, npages * sizeof(struct niobuf_remote)); if (lnb == NULL || rnb == NULL) GOTO(out, ret = -ENOMEM); obdo_to_ioobj(oa, &ioo); off = offset; for(; tot_pages; tot_pages -= npages) { if (tot_pages < npages) npages = tot_pages; for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) { rnb[i].offset = off; rnb[i].len = CFS_PAGE_SIZE; } ioo.ioo_bufcnt = npages; oti->oti_transno = 0; ret = obd_preprw(rw, exp, oa, 1, &ioo, npages, rnb, lnb, oti); if (ret != 0) GOTO(out, ret); for (i = 0; i < npages; i++) { cfs_page_t *page = lnb[i].page; /* read past eof? */ if (page == NULL && lnb[i].rc == 0) continue; if (oa->o_id == ECHO_PERSISTENT_OBJID || (oa->o_valid & OBD_MD_FLFLAGS) == 0 || (oa->o_flags & OBD_FL_DEBUG_CHECK) == 0) continue; if (rw == OBD_BRW_WRITE) echo_client_page_debug_setup(lsm, page, rw, oa->o_id, rnb[i].offset, rnb[i].len); else echo_client_page_debug_check(lsm, page, oa->o_id, rnb[i].offset, rnb[i].len); } ret = obd_commitrw(rw, exp, oa, 1, &ioo, npages, lnb, oti, ret); if (ret != 0) GOTO(out, ret); }out: if (lnb) OBD_FREE(lnb, npages * sizeof(struct niobuf_local)); if (rnb) OBD_FREE(rnb, npages * sizeof(struct niobuf_remote)); RETURN(ret);}int echo_client_brw_ioctl(int rw, struct obd_export *exp, struct obd_ioctl_data *data){ struct obd_device *obd = class_exp2obd(exp); struct echo_client_obd *ec = &obd->u.echo_client; struct obd_trans_info dummy_oti = { .oti_thread_id = -1 }; struct ec_object *eco; int rc; ENTRY; rc = echo_get_object(&eco, obd, &data->ioc_obdo1); if (rc) RETURN(rc); data->ioc_obdo1.o_valid &= ~OBD_MD_FLHANDLE; data->ioc_obdo1.o_valid |= OBD_MD_FLGROUP; data->ioc_obdo1.o_gr = FILTER_GROUP_ECHO; switch((long)data->ioc_pbuf1) { case 1: if (data->ioc_pbuf2 == NULL) { // NULL user data pointer rc = echo_client_kbrw(obd, rw, &data->ioc_obdo1, eco->eco_lsm, data->ioc_offset, data->ioc_count, &dummy_oti); } else {#ifdef __KERNEL__ rc = echo_client_ubrw(obd, rw, &data->ioc_obdo1, eco->eco_lsm, data->ioc_offset, data->ioc_count, data->ioc_pbuf2, &dummy_oti);#endif } break; case 2: rc = echo_client_async_page(ec->ec_exp, rw, &data->ioc_obdo1, eco->eco_lsm, data->ioc_offset, data->ioc_count, data->ioc_plen1); break; case 3: rc = echo_client_prep_commit(ec->ec_exp, rw, &data->ioc_obdo1, eco->eco_lsm, data->ioc_offset, data->ioc_count, data->ioc_plen1, &dummy_oti); break; default: rc = -EINVAL; } echo_put_object(eco); RETURN(rc);}static intecho_ldlm_callback (struct ldlm_lock *lock, struct ldlm_lock_desc *new, void *data, int flag){ struct ec_object *eco = (struct ec_object *)data; struct echo_client_obd *ec = &(eco->eco_device->u.echo_client); struct lustre_handle lockh; struct list_head *el; int found = 0; int rc; ldlm_lock2handle (lock, &lockh); /* #ifdef this out if we're not feeling paranoid */ spin_lock (&ec->ec_lock); list_for_each (el, &ec->ec_objects) { found = (eco == list_entry(el, struct ec_object, eco_obj_chain)); if (found) break; } spin_unlock (&ec->ec_lock); LASSERT (found); switch (flag) { case LDLM_CB_BLOCKING: CDEBUG(D_INFO, "blocking callback on "LPX64", handle "LPX64"\n", eco->eco_id, lockh.cookie); rc = ldlm_cli_cancel (&lockh); if (rc != ELDLM_OK) CERROR ("ldlm_cli_cancel failed: %d\n", rc); break; case LDLM_CB_CANCELING: CDEBUG(D_INFO, "cancel callback on "LPX64", handle "LPX64"\n", eco->eco_id, lockh.cookie); break; default: LBUG (); } return (0);}static intecho_client_enqueue(struct obd_export *exp, struct obdo *oa, int mode, obd_off offset, obd_size nob){ struct obd_device *obd = exp->exp_obd; struct echo_client_obd *ec = &obd->u.echo_client; struct lustre_handle *ulh = &oa->o_handle; struct ldlm_enqueue_info einfo = { 0 }; struct obd_info oinfo = { { { 0 } } }; struct ec_object *eco; struct ec_lock *ecl; int rc; if (!(mode == LCK_PR || mode == LCK_PW)) return -EINVAL; if ((offset & (~CFS_PAGE_MASK)) != 0 || (nob & (~CFS_PAGE_MASK)) != 0) return -EINVAL; rc = echo_get_object (&eco, obd, oa); if (rc != 0) return rc; rc = -ENOMEM; OBD_ALLOC (ecl, sizeof (*ecl)); if (ecl == NULL) goto failed_0; ecl->ecl_mode = mode; ecl->ecl_object = eco; ecl->ecl_policy.l_extent.start = offset; ecl->ecl_policy.l_extent.end = (nob == 0) ? ((obd_off) -1) : (offset + nob - 1); einfo.ei_type = LDLM_EXTENT;
⌨️ 快捷键说明
复制代码Ctrl + C
搜索代码Ctrl + F
全屏模式F11
增大字号Ctrl + =
减小字号Ctrl + -
显示快捷键?