📄 ost_handler.c
字号:
lustre_swab_ost_body); if (body == NULL) RETURN(-EFAULT); rc = lustre_pack_reply(req, 2, size, NULL); if (rc) RETURN(rc); repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repbody)); memcpy(&repbody->oa, &body->oa, sizeof(body->oa)); oinfo.oi_oa = &repbody->oa; req->rq_status = obd_setattr(exp, &oinfo, oti); RETURN(0);}static int ost_bulk_timeout(void *data){ ENTRY; /* We don't fail the connection here, because having the export * killed makes the (vital) call to commitrw very sad. */ RETURN(1);}static int get_per_page_niobufs(struct obd_ioobj *ioo, int nioo, struct niobuf_remote *rnb, int nrnb, struct niobuf_remote **pp_rnbp){ /* Copy a remote niobuf, splitting it into page-sized chunks * and setting ioo[i].ioo_bufcnt accordingly */ struct niobuf_remote *pp_rnb; int i; int j; int page; int rnbidx = 0; int npages = 0; /* * array of sufficient size already preallocated by caller */ LASSERT(pp_rnbp != NULL); LASSERT(*pp_rnbp != NULL); /* first count and check the number of pages required */ for (i = 0; i < nioo; i++) for (j = 0; j < ioo->ioo_bufcnt; j++, rnbidx++) { obd_off offset = rnb[rnbidx].offset; obd_off p0 = offset >> CFS_PAGE_SHIFT; obd_off pn = (offset + rnb[rnbidx].len - 1)>>CFS_PAGE_SHIFT; LASSERT(rnbidx < nrnb); npages += (pn + 1 - p0); if (rnb[rnbidx].len == 0) { CERROR("zero len BRW: obj %d objid "LPX64 " buf %u\n", i, ioo[i].ioo_id, j); return -EINVAL; } if (j > 0 && rnb[rnbidx].offset <= rnb[rnbidx-1].offset) { CERROR("unordered BRW: obj %d objid "LPX64 " buf %u offset "LPX64" <= "LPX64"\n", i, ioo[i].ioo_id, j, rnb[rnbidx].offset, rnb[rnbidx].offset); return -EINVAL; } } LASSERT(rnbidx == nrnb); if (npages == nrnb) { /* all niobufs are for single pages */ *pp_rnbp = rnb; return npages; } pp_rnb = *pp_rnbp; /* now do the actual split */ page = rnbidx = 0; for (i = 0; i < nioo; i++) { int obj_pages = 0; for (j = 0; j < ioo[i].ioo_bufcnt; j++, rnbidx++) { obd_off off = rnb[rnbidx].offset; int nob = rnb[rnbidx].len; LASSERT(rnbidx < nrnb); do { obd_off poff = off & ~CFS_PAGE_MASK; int pnob = (poff + nob > CFS_PAGE_SIZE) ? CFS_PAGE_SIZE - poff : nob; LASSERT(page < npages); pp_rnb[page].len = pnob; pp_rnb[page].offset = off; pp_rnb[page].flags = rnb[rnbidx].flags; CDEBUG(0, " obj %d id "LPX64 "page %d(%d) "LPX64" for %d, flg %x\n", i, ioo[i].ioo_id, obj_pages, page, pp_rnb[page].offset, pp_rnb[page].len, pp_rnb[page].flags); page++; obj_pages++; off += pnob; nob -= pnob; } while (nob > 0); LASSERT(nob == 0); } ioo[i].ioo_bufcnt = obj_pages; } LASSERT(page == npages); return npages;}static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc, cksum_type_t cksum_type){ __u32 cksum; int i; cksum = init_checksum(cksum_type); for (i = 0; i < desc->bd_iov_count; i++) { struct page *page = desc->bd_iov[i].kiov_page; int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK; char *ptr = kmap(page) + off; int len = desc->bd_iov[i].kiov_len; /* corrupt the data before we compute the checksum, to * simulate a client->OST data error */ if (i == 0 && opc == OST_WRITE && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_RECEIVE)) memcpy(ptr, "bad3", min(4, len)); cksum = compute_checksum(cksum, ptr, len, cksum_type); /* corrupt the data after we compute the checksum, to * simulate an OST->client data error */ if (i == 0 && opc == OST_READ && OBD_FAIL_CHECK_ONCE(OBD_FAIL_OST_CHECKSUM_SEND)) memcpy(ptr, "bad4", min(4, len)); kunmap(page); } return cksum;}/* * populate @nio by @nrpages pages from per-thread page pool */static void ost_nio_pages_get(struct ptlrpc_request *req, struct niobuf_local *nio, int nrpages){ int i; struct ost_thread_local_cache *tls; ENTRY; LASSERT(nrpages <= OST_THREAD_POOL_SIZE); LASSERT(req != NULL); LASSERT(req->rq_svc_thread != NULL); tls = ost_tls(req); LASSERT(tls != NULL); memset(nio, 0, nrpages * sizeof *nio); for (i = 0; i < nrpages; ++ i) { struct page *page; page = tls->page[i]; LASSERT(page != NULL); POISON_PAGE(page, 0xf1); nio[i].page = page; LL_CDEBUG_PAGE(D_INFO, page, "%d\n", i); } EXIT;}/* * Dual for ost_nio_pages_get(). Poison pages in pool for debugging */static void ost_nio_pages_put(struct ptlrpc_request *req, struct niobuf_local *nio, int nrpages){ int i; ENTRY; LASSERT(nrpages <= OST_THREAD_POOL_SIZE); for (i = 0; i < nrpages; ++ i) POISON_PAGE(nio[i].page, 0xf2); EXIT;}static int ost_brw_lock_get(int mode, struct obd_export *exp, struct obd_ioobj *obj, struct niobuf_remote *nb, struct lustre_handle *lh){ int flags = 0; int nrbufs = obj->ioo_bufcnt; struct ldlm_res_id res_id = { .name = { obj->ioo_id } }; ldlm_policy_data_t policy; int i; ENTRY; LASSERT(mode == LCK_PR || mode == LCK_PW); LASSERT(!lustre_handle_is_used(lh)); if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK)) RETURN(0); /* EXPENSIVE ASSERTION */ for (i = 1; i < nrbufs; i ++) LASSERT((nb[0].flags & OBD_BRW_SRVLOCK) == (nb[i].flags & OBD_BRW_SRVLOCK)); policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK; policy.l_extent.end = (nb[nrbufs - 1].offset + nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK; RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id, LDLM_EXTENT, &policy, mode, &flags, ldlm_blocking_ast, ldlm_completion_ast, ldlm_glimpse_ast, NULL, 0, NULL, lh));}static void ost_brw_lock_put(int mode, struct obd_ioobj *obj, struct niobuf_remote *niob, struct lustre_handle *lh){ ENTRY; LASSERT(mode == LCK_PR || mode == LCK_PW); LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) == lustre_handle_is_used(lh)); if (lustre_handle_is_used(lh)) ldlm_lock_decref(lh, mode); EXIT;}struct ost_prolong_data { struct obd_export *opd_exp; ldlm_policy_data_t opd_policy; ldlm_mode_t opd_mode;};static int ost_prolong_locks_iter(struct ldlm_lock *lock, void *data){ struct ost_prolong_data *opd = data; LASSERT(lock->l_resource->lr_type == LDLM_EXTENT); if (lock->l_req_mode != lock->l_granted_mode) { /* scan granted locks only */ return LDLM_ITER_STOP; } if (lock->l_export != opd->opd_exp) { /* prolong locks only for given client */ return LDLM_ITER_CONTINUE; } if (!(lock->l_granted_mode & opd->opd_mode)) { /* we aren't interesting in all type of locks */ return LDLM_ITER_CONTINUE; } if (lock->l_policy_data.l_extent.end < opd->opd_policy.l_extent.start || lock->l_policy_data.l_extent.start > opd->opd_policy.l_extent.end) { /* the request doesn't cross the lock, skip it */ return LDLM_ITER_CONTINUE; } if (!(lock->l_flags & LDLM_FL_AST_SENT)) { /* ignore locks not being cancelled */ return LDLM_ITER_CONTINUE; } /* OK. this is a possible lock the user holds doing I/O * let's refresh eviction timer for it */ ldlm_refresh_waiting_lock(lock); return LDLM_ITER_CONTINUE;}static void ost_prolong_locks(struct obd_export *exp, struct obd_ioobj *obj, struct niobuf_remote *nb, struct obdo *oa, ldlm_mode_t mode){ struct ldlm_res_id res_id = { .name = { obj->ioo_id } }; int nrbufs = obj->ioo_bufcnt; struct ost_prolong_data opd; ENTRY; opd.opd_mode = mode; opd.opd_exp = exp; opd.opd_policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK; opd.opd_policy.l_extent.end = (nb[nrbufs - 1].offset + nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK; CDEBUG(D_DLMTRACE,"refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n", res_id.name[0], res_id.name[1], opd.opd_policy.l_extent.start, opd.opd_policy.l_extent.end); if (oa->o_valid & OBD_MD_FLHANDLE) { struct ldlm_lock *lock; lock = ldlm_handle2lock(&oa->o_handle); if (lock != NULL) { ost_prolong_locks_iter(lock, &opd); LDLM_LOCK_PUT(lock); EXIT; return; } } ldlm_resource_iterate(exp->exp_obd->obd_namespace, &res_id, ost_prolong_locks_iter, &opd); EXIT;}static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti){ struct ptlrpc_bulk_desc *desc; struct obd_export *exp = req->rq_export; struct niobuf_remote *remote_nb; struct niobuf_remote *pp_rnb = NULL; struct niobuf_local *local_nb; struct obd_ioobj *ioo; struct ost_body *body, *repbody;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -