/*
 * ost_handler.c — Lustre OST (Object Storage Target) bulk I/O request
 * handlers (ost_brw_read / ost_brw_write).
 *
 * NOTE(review): this copy was captured from a code-viewer web page; the
 * original page header ("📄 ost_handler.c" / "字号:" i.e. "font size:")
 * was site chrome, not source text, and has been folded into this comment.
 */
struct l_wait_info lwi; struct lustre_handle lockh = { 0 }; int size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int niocount, npages, nob = 0, rc, i; int no_reply = 0; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK)) GOTO(out, rc = -EIO); OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4); /* Check if there is eviction in progress, and if so, wait for it to * finish */ if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) { lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq, !atomic_read(&exp->exp_obd->obd_evict_inprogress), &lwi); } if (exp->exp_failed) GOTO(out, rc = -ENOTCONN); body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), lustre_swab_ost_body); if (body == NULL) { CERROR("Missing/short ost_body\n"); GOTO(out, rc = -EFAULT); } ioo = lustre_swab_reqbuf(req, REQ_REC_OFF + 1, sizeof(*ioo), lustre_swab_obd_ioobj); if (ioo == NULL) { CERROR("Missing/short ioobj\n"); GOTO(out, rc = -EFAULT); } niocount = ioo->ioo_bufcnt; if (niocount > PTLRPC_MAX_BRW_PAGES) { DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)", niocount); GOTO(out, rc = -EFAULT); } remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, niocount * sizeof(*remote_nb), lustre_swab_niobuf_remote); if (remote_nb == NULL) { CERROR("Missing/short niobuf\n"); GOTO(out, rc = -EFAULT); } if (lustre_msg_swabbed(req->rq_reqmsg)) { /* swab remaining niobufs */ for (i = 1; i < niocount; i++) lustre_swab_niobuf_remote (&remote_nb[i]); } rc = lustre_pack_reply(req, 2, size, NULL); if (rc) GOTO(out, rc); /* * Per-thread array of struct niobuf_{local,remote}'s was allocated by * ost_thread_init(). 
*/ local_nb = ost_tls(req)->local; pp_rnb = ost_tls(req)->remote; /* FIXME all niobuf splitting should be done in obdfilter if needed */ /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */ npages = get_per_page_niobufs(ioo, 1, remote_nb, niocount, &pp_rnb); if (npages < 0) GOTO(out, rc = npages); LASSERT(npages <= OST_THREAD_POOL_SIZE); ost_nio_pages_get(req, local_nb, npages); desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE, OST_BULK_PORTAL); if (desc == NULL) GOTO(out, rc = -ENOMEM); rc = ost_brw_lock_get(LCK_PR, exp, ioo, pp_rnb, &lockh); if (rc != 0) GOTO(out_bulk, rc); /* * If getting the lock took more time than * client was willing to wait, drop it. b=11330 */ if (cfs_time_current_sec() > req->rq_deadline || OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) { no_reply = 1; CERROR("Dropping timed-out read from %s because locking" "object "LPX64" took %ld seconds (limit was %ld).\n", libcfs_id2str(req->rq_peer), ioo->ioo_id, cfs_time_current_sec() - req->rq_arrival_time.tv_sec, req->rq_deadline - req->rq_arrival_time.tv_sec); GOTO(out_lock, rc = -ETIMEDOUT); } rc = obd_preprw(OBD_BRW_READ, exp, &body->oa, 1, ioo, npages, pp_rnb, local_nb, oti); if (rc != 0) GOTO(out_lock, rc); ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW | LCK_PR); nob = 0; for (i = 0; i < npages; i++) { int page_rc = local_nb[i].rc; if (page_rc < 0) { /* error */ rc = page_rc; break; } LASSERTF(page_rc <= pp_rnb[i].len, "page_rc (%d) > " "pp_rnb[%d].len (%d)\n", page_rc, i, pp_rnb[i].len); nob += page_rc; if (page_rc != 0) { /* some data! 
*/ LASSERT (local_nb[i].page != NULL); ptlrpc_prep_bulk_page(desc, local_nb[i].page, pp_rnb[i].offset & ~CFS_PAGE_MASK, page_rc); } if (page_rc != pp_rnb[i].len) { /* short read */ /* All subsequent pages should be 0 */ while(++i < npages) LASSERT(local_nb[i].rc == 0); break; } } if (body->oa.o_valid & OBD_MD_FLCKSUM) { cksum_type_t cksum_type = OBD_CKSUM_CRC32; if (body->oa.o_valid & OBD_MD_FLFLAGS) cksum_type = cksum_type_unpack(body->oa.o_flags); body->oa.o_flags = cksum_type_pack(cksum_type); body->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS; body->oa.o_cksum = ost_checksum_bulk(desc, OST_READ, cksum_type); CDEBUG(D_PAGE,"checksum at read origin: %x\n",body->oa.o_cksum); } else { body->oa.o_valid = 0; } /* We're finishing using body->oa as an input variable */ /* Check if client was evicted while we were doing i/o before touching network */ if (rc == 0) { /* Check if there is eviction in progress, and if so, wait for * it to finish */ if (unlikely(atomic_read(&exp->exp_obd-> obd_evict_inprogress))) { lwi = LWI_INTR(NULL, NULL); rc = l_wait_event(exp->exp_obd-> obd_evict_inprogress_waitq, !atomic_read(&exp->exp_obd-> obd_evict_inprogress), &lwi); } if (exp->exp_failed) rc = -ENOTCONN; else rc = ptlrpc_start_bulk_transfer(desc); if (rc == 0) { time_t start = cfs_time_current_sec(); do { long timeoutl = req->rq_deadline - cfs_time_current_sec(); cfs_duration_t timeout = (timeoutl <= 0 || rc) ? 
CFS_TICK : cfs_time_seconds(timeoutl); lwi = LWI_TIMEOUT_INTERVAL(timeout, cfs_time_seconds(1), ost_bulk_timeout, desc); rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) || exp->exp_failed, &lwi); LASSERT(rc == 0 || rc == -ETIMEDOUT); /* Wait again if we changed deadline */ } while ((rc == -ETIMEDOUT) && (req->rq_deadline > cfs_time_current_sec())); if (rc == -ETIMEDOUT) { DEBUG_REQ(D_ERROR, req, "timeout on bulk PUT after %ld%+lds", req->rq_deadline - start, cfs_time_current_sec() - req->rq_deadline); ptlrpc_abort_bulk(desc); } else if (exp->exp_failed) { DEBUG_REQ(D_ERROR, req, "Eviction on bulk PUT"); rc = -ENOTCONN; ptlrpc_abort_bulk(desc); } else if (!desc->bd_success || desc->bd_nob_transferred != desc->bd_nob) { DEBUG_REQ(D_ERROR, req, "%s bulk PUT %d(%d)", desc->bd_success ? "truncated" : "network error on", desc->bd_nob_transferred, desc->bd_nob); /* XXX should this be a different errno? */ rc = -ETIMEDOUT; } } else { DEBUG_REQ(D_ERROR, req, "bulk PUT failed: rc %d", rc); } no_reply = rc != 0; } /* Must commit after prep above in all cases */ rc = obd_commitrw(OBD_BRW_READ, exp, &body->oa, 1, ioo, npages, local_nb, oti, rc); ost_nio_pages_put(req, local_nb, npages); if (rc == 0) { repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repbody)); memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa)); } out_lock: ost_brw_lock_put(LCK_PR, ioo, pp_rnb, &lockh); out_bulk: ptlrpc_free_bulk(desc); out: LASSERT(rc <= 0); if (rc == 0) { req->rq_status = nob; target_committed_to_req(req); ptlrpc_reply(req); } else if (!no_reply) { /* Only reply if there was no comms problem with bulk */ target_committed_to_req(req); req->rq_status = rc; ptlrpc_error(req); } else { /* reply out callback would free */ ptlrpc_req_drop_rs(req); CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - " "client will retry\n", exp->exp_obd->obd_name, exp->exp_client_uuid.uuid, exp->exp_connection->c_remote_uuid.uuid, libcfs_id2str(req->rq_peer)); } RETURN(rc);}static 
int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti){ struct ptlrpc_bulk_desc *desc; struct obd_export *exp = req->rq_export; struct niobuf_remote *remote_nb; struct niobuf_remote *pp_rnb; struct niobuf_local *local_nb; struct obd_ioobj *ioo; struct ost_body *body, *repbody; struct l_wait_info lwi; struct lustre_handle lockh = {0}; __u32 *rcs; int size[3] = { sizeof(struct ptlrpc_body), sizeof(*body) }; int objcount, niocount, npages; int rc, swab, i, j; obd_count client_cksum = 0, server_cksum = 0; cksum_type_t cksum_type = OBD_CKSUM_CRC32; int no_reply = 0; ENTRY; if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK)) GOTO(out, rc = -EIO); if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2)) GOTO(out, rc = -EFAULT); /* pause before transaction has been started */ OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4); /* Check if there is eviction in progress, and if so, wait for it to * finish */ if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) { lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq, !atomic_read(&exp->exp_obd->obd_evict_inprogress), &lwi); } if (exp->exp_failed) GOTO(out, rc = -ENOTCONN); swab = lustre_msg_swabbed(req->rq_reqmsg); body = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*body), lustre_swab_ost_body); if (body == NULL) { CERROR("Missing/short ost_body\n"); GOTO(out, rc = -EFAULT); } lustre_set_req_swabbed(req, REQ_REC_OFF + 1); objcount = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1) / sizeof(*ioo); if (objcount == 0) { CERROR("Missing/short ioobj\n"); GOTO(out, rc = -EFAULT); } if (objcount > 1) { CERROR("too many ioobjs (%d)\n", objcount); GOTO(out, rc = -EFAULT); } ioo = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, objcount * sizeof(*ioo)); LASSERT (ioo != NULL); for (niocount = i = 0; i < objcount; i++) { if (swab) lustre_swab_obd_ioobj(&ioo[i]); if (ioo[i].ioo_bufcnt == 0) { CERROR("ioo[%d] has zero bufcnt\n", i); 
GOTO(out, rc = -EFAULT); } niocount += ioo[i].ioo_bufcnt; } if (niocount > PTLRPC_MAX_BRW_PAGES) { DEBUG_REQ(D_ERROR, req, "bulk has too many pages (%d)", niocount); GOTO(out, rc = -EFAULT); } remote_nb = lustre_swab_reqbuf(req, REQ_REC_OFF + 2, niocount * sizeof(*remote_nb), lustre_swab_niobuf_remote); if (remote_nb == NULL) { CERROR("Missing/short niobuf\n"); GOTO(out, rc = -EFAULT);
/*
 * NOTE(review): the text that followed here was the hosting website's
 * keyboard-shortcut help panel (Chinese UI: "copy code Ctrl+C",
 * "search Ctrl+F", "fullscreen F11", "toggle theme", "font size", etc.),
 * accidentally captured along with the source. It was not part of the
 * file and has been removed. The preceding ost_brw_write() body is also
 * truncated at this point in the capture — recover the remainder from
 * the upstream Lustre ost_handler.c.
 */