📄 ost_handler.c
        }

        if (swab) {                     /* swab the remaining niobufs */
                for (i = 1; i < niocount; i++)
                        lustre_swab_niobuf_remote(&remote_nb[i]);
        }

        size[REPLY_REC_OFF + 1] = niocount * sizeof(*rcs);
        rc = lustre_pack_reply(req, 3, size, NULL);
        if (rc != 0)
                GOTO(out, rc);
        OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, obd_fail_val);
        rcs = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF + 1,
                             niocount * sizeof(*rcs));

        /*
         * Per-thread array of struct niobuf_{local,remote}'s was allocated by
         * ost_thread_init().
         */
        local_nb = ost_tls(req)->local;
        pp_rnb   = ost_tls(req)->remote;

        /* FIXME all niobuf splitting should be done in obdfilter if needed */
        /* CAVEAT EMPTOR this sets ioo->ioo_bufcnt to # pages */
        npages = get_per_page_niobufs(ioo, objcount, remote_nb, niocount,
                                      &pp_rnb);
        if (npages < 0)
                GOTO(out, rc = npages);

        LASSERT(npages <= OST_THREAD_POOL_SIZE);

        ost_nio_pages_get(req, local_nb, npages);

        desc = ptlrpc_prep_bulk_exp(req, npages, BULK_GET_SINK,
                                    OST_BULK_PORTAL);
        if (desc == NULL)
                GOTO(out, rc = -ENOMEM);

        rc = ost_brw_lock_get(LCK_PW, exp, ioo, pp_rnb, &lockh);
        if (rc != 0)
                GOTO(out_bulk, rc);

        /*
         * If getting the lock took more time than
         * client was willing to wait, drop it. b=11330
         */
        if (cfs_time_current_sec() > req->rq_deadline ||
            OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
                no_reply = 1;
                CERROR("Dropping timed-out write from %s because locking "
                       "object "LPX64" took %ld seconds (limit was %ld).\n",
                       libcfs_id2str(req->rq_peer), ioo->ioo_id,
                       cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
                       req->rq_deadline - req->rq_arrival_time.tv_sec);
                GOTO(out_lock, rc = -ETIMEDOUT);
        }

        ost_prolong_locks(exp, ioo, pp_rnb, &body->oa, LCK_PW);

        /* obd_preprw clobbers oa->valid, so save what we need */
        if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                client_cksum = body->oa.o_cksum;
                if (body->oa.o_valid & OBD_MD_FLFLAGS)
                        cksum_type = cksum_type_unpack(body->oa.o_flags);
        }

        /* Because we already sync grant info with client when reconnect,
         * grant info will be cleared for resent req, then fed_grant and
         * total_grant will not be modified in following preprw_write */
        if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
                DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
                body->oa.o_valid &= ~OBD_MD_FLGRANT;
        }

        rc = obd_preprw(OBD_BRW_WRITE, exp, &body->oa, objcount,
                        ioo, npages, pp_rnb, local_nb, oti);
        if (rc != 0)
                GOTO(out_lock, rc);

        /* NB Having prepped, we must commit... */

        for (i = 0; i < npages; i++)
                ptlrpc_prep_bulk_page(desc, local_nb[i].page,
                                      pp_rnb[i].offset & ~CFS_PAGE_MASK,
                                      pp_rnb[i].len);

        /* Check if client was evicted while we were doing i/o before touching
         * network */
        if (desc->bd_export->exp_failed)
                rc = -ENOTCONN;
        else
                rc = ptlrpc_start_bulk_transfer(desc);
        if (rc == 0) {
                time_t start = cfs_time_current_sec();
                do {
                        long timeoutl = req->rq_deadline -
                                        cfs_time_current_sec();
                        cfs_duration_t timeout = (timeoutl <= 0 || rc) ?
                                                 CFS_TICK :
                                                 cfs_time_seconds(timeoutl);
                        lwi = LWI_TIMEOUT_INTERVAL(timeout,
                                                   cfs_time_seconds(1),
                                                   ost_bulk_timeout, desc);
                        rc = l_wait_event(desc->bd_waitq,
                                          !ptlrpc_bulk_active(desc) ||
                                          desc->bd_export->exp_failed, &lwi);
                        LASSERT(rc == 0 || rc == -ETIMEDOUT);
                        /* Wait again if we changed deadline */
                } while ((rc == -ETIMEDOUT) &&
                         (req->rq_deadline > cfs_time_current_sec()));

                if (rc == -ETIMEDOUT) {
                        DEBUG_REQ(D_ERROR, req,
                                  "timeout on bulk GET after %ld%+lds",
                                  req->rq_deadline - start,
                                  cfs_time_current_sec() - req->rq_deadline);
                        ptlrpc_abort_bulk(desc);
                } else if (desc->bd_export->exp_failed) {
                        DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
                        rc = -ENOTCONN;
                        ptlrpc_abort_bulk(desc);
                } else if (!desc->bd_success ||
                           desc->bd_nob_transferred != desc->bd_nob) {
                        DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
                                  desc->bd_success ?
                                  "truncated" : "network error on",
                                  desc->bd_nob_transferred, desc->bd_nob);
                        /* XXX should this be a different errno? */
                        rc = -ETIMEDOUT;
                }
        } else {
                DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
        }
        no_reply = rc != 0;

        repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
                                 sizeof(*repbody));
        memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));

        if (client_cksum != 0 && rc == 0) {
                static int cksum_counter;

                repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
                repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
                repbody->oa.o_flags |= cksum_type_pack(cksum_type);
                server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
                repbody->oa.o_cksum = server_cksum;
                cksum_counter++;
                if (unlikely(client_cksum != server_cksum)) {
                        CERROR("client csum %x, server csum %x\n",
                               client_cksum, server_cksum);
                        cksum_counter = 0;
                } else if ((cksum_counter & (-cksum_counter)) ==
                           cksum_counter) {
                        CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
                               cksum_counter, libcfs_id2str(req->rq_peer),
                               server_cksum);
                }
        }

        /* Check if there is eviction in progress, and if so, wait for
         * it to finish */
        if (unlikely(atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
                lwi = LWI_INTR(NULL, NULL);
                rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
                                  !atomic_read(&exp->exp_obd->obd_evict_inprogress),
                                  &lwi);
        }
        if (rc == 0 && exp->exp_failed)
                rc = -ENOTCONN;

        /* Must commit after prep above in all cases */
        rc = obd_commitrw(OBD_BRW_WRITE, exp, &repbody->oa, objcount, ioo,
                          npages, local_nb, oti, rc);

        if (unlikely(client_cksum != server_cksum && rc == 0)) {
                int new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
                char *msg;
                char *via;
                char *router;

                if (new_cksum == server_cksum)
                        msg = "changed in transit before arrival at OST";
                else if (new_cksum == client_cksum)
                        msg = "initial checksum before message complete";
                else
                        msg = "changed in transit AND after initial checksum";

                if (req->rq_peer.nid == desc->bd_sender) {
                        via = router = "";
                } else {
                        via = " via ";
                        router = libcfs_nid2str(desc->bd_sender);
                }

                LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from %s"
                                   "%s%s inum "LPU64"/"LPU64" object "LPU64"/"
                                   LPU64" extent ["LPU64"-"LPU64"]\n",
                                   exp->exp_obd->obd_name, msg,
                                   libcfs_id2str(req->rq_peer),
                                   via, router,
                                   body->oa.o_valid & OBD_MD_FLFID ?
                                   body->oa.o_fid : (__u64)0,
                                   body->oa.o_valid & OBD_MD_FLFID ?
                                   body->oa.o_generation : (__u64)0,
                                   body->oa.o_id,
                                   body->oa.o_valid & OBD_MD_FLGROUP ?
                                   body->oa.o_gr : (__u64)0,
                                   pp_rnb[0].offset,
                                   pp_rnb[npages-1].offset +
                                   pp_rnb[npages-1].len - 1);
                CERROR("client csum %x, original server csum %x, "
                       "server csum now %x\n",
                       client_cksum, server_cksum, new_cksum);
        }

        ost_nio_pages_put(req, local_nb, npages);

        if (rc == 0) {
                /* set per-requested niobuf return codes */
                for (i = j = 0; i < niocount; i++) {
                        int nob = remote_nb[i].len;

                        rcs[i] = 0;
                        do {
                                LASSERT(j < npages);
                                if (local_nb[j].rc < 0)
                                        rcs[i] = local_nb[j].rc;
                                nob -= pp_rnb[j].len;
                                j++;
                        } while (nob > 0);
                        LASSERT(nob == 0);
                }
                LASSERT(j == npages);
        }

 out_lock:
        ost_brw_lock_put(LCK_PW, ioo, pp_rnb, &lockh);
 out_bulk:
        ptlrpc_free_bulk(desc);
 out:
        if (rc == 0) {
                oti_to_request(oti, req);
                target_committed_to_req(req);
                rc = ptlrpc_reply(req);
        } else if (!no_reply) {
                /* Only reply if there was no comms problem with bulk */
                target_committed_to_req(req);
                req->rq_status = rc;
                ptlrpc_error(req);
        } else {
                /* reply out callback would free */
                ptlrpc_req_drop_rs(req);
                CWARN("%s: ignoring bulk IO comm error with %s@%s id %s - "
                      "client will retry\n",
                      exp->exp_obd->obd_name,
                      exp->exp_client_uuid.uuid,
                      exp->exp_connection->c_remote_uuid.uuid,
                      libcfs_id2str(req->rq_peer));
        }
        RETURN(rc);
}

static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
{
        char *key, *val = NULL;
        int keylen, vallen, rc = 0;
        ENTRY;

        key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
        if (key == NULL) {
                DEBUG_REQ(D_HA, req, "no set_info key");
                RETURN(-EFAULT);
        }
        keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);

        rc = lustre_pack_reply(req, 1, NULL, NULL);
        if (rc)
                RETURN(rc);

        vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
        if (vallen)
                val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);

        if (KEY_IS("evict_by_nid")) {
                if (val && vallen)
                        obd_export_evict_by_nid(exp->exp_obd, val);
                GOTO(out, rc = 0);
        }

        rc = obd_set_info_async(exp, keylen, key, vallen, val, NULL);
out:
        lustre_msg_set_status(req->rq_repmsg, 0);
        RETURN(rc);
}

static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
{
        void *key, *reply;
        int keylen, rc = 0;
        int size[2] = { sizeof(struct ptlrpc_body), 0 };
        ENTRY;

        key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
        if (key == NULL) {
                DEBUG_REQ(D_HA, req, "no get_info key");
                RETURN(-EFAULT);
        }
        keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);

        /* call once to get the size to allocate the reply buffer */
        rc = obd_get_info(exp, keylen, key, &size[1], NULL);
        if (rc)
                RETURN(rc);

        rc = lustre_pack_reply(req, 2, size, NULL);
        if (rc)
                RETURN(rc);

        reply = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*reply));
        /* call again to fill in the reply buffer */
        rc = obd_get_info(exp, keylen, key, size, reply);
        lustre_msg_set_status(req->rq_repmsg, 0);
        RETURN(rc);
}

static int ost_handle_quotactl(struct ptlrpc_request *req)
{
        struct obd_quotactl *oqctl, *repoqc;
        int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
        ENTRY;

        oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
                                   lustre_swab_obd_quotactl);
        if (oqctl == NULL)
                GOTO(out, rc = -EPROTO);

        rc = lustre_pack_reply(req, 2, size, NULL);
        if (rc)
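
A note on the rate-limited "Checksum OK" message in the write path above: the test (cksum_counter & (-cksum_counter)) == cksum_counter holds exactly when the counter is zero or a power of two, because n & -n isolates the lowest set bit of n. The server therefore logs the 1st, 2nd, 4th, 8th, ... consecutive good checksum and stays quiet in between, resetting the counter on any mismatch. A minimal standalone sketch of the idiom (plain C, no Lustre headers; the function name is illustrative):

#include <stdio.h>

/* n & -n isolates the lowest set bit of n, so (n & -n) == n is true
 * exactly when n has at most one bit set, i.e. n is 0 or a power of
 * two.  The handler uses this to log ever more rarely while the
 * connection stays healthy. */
static int should_log(unsigned int n)
{
        return (n & -n) == n;
}

int main(void)
{
        unsigned int count;

        for (count = 1; count <= 20; count++)
                if (should_log(count))
                        printf("would log checksum-OK at count %u\n", count);
        /* prints counts 1, 2, 4, 8, 16 */
        return 0;
}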
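
The "set per-requested niobuf return codes" loop above folds per-page results back into per-request results: each remote niobuf may have been split into several per-page chunks by get_per_page_niobufs(), and a niobuf's return code ends up as the last negative per-page rc among its chunks. A standalone sketch of that folding with hypothetical example data (the array contents below are invented for illustration):

#include <assert.h>
#include <stdio.h>

int main(void)
{
        /* Hypothetical: 2 client niobufs split into 3 per-page chunks;
         * the per-page lengths of each niobuf sum to its total length. */
        int remote_len[2] = { 8192, 4096 };       /* per-niobuf bytes */
        int page_len[3]   = { 4096, 4096, 4096 }; /* per-page bytes   */
        int page_rc[3]    = { 0, -28, 0 };        /* -28 == -ENOSPC   */
        int rcs[2];
        int i, j = 0;

        for (i = 0; i < 2; i++) {
                int nob = remote_len[i];

                rcs[i] = 0;
                do {
                        assert(j < 3);
                        if (page_rc[j] < 0)
                                rcs[i] = page_rc[j];  /* last error wins */
                        nob -= page_len[j];
                        j++;
                } while (nob > 0);
                assert(nob == 0);       /* page chunks tile the niobuf */
                printf("niobuf %d rc = %d\n", i, rcs[i]);
        }
        assert(j == 3);                 /* every page was consumed */
        return 0;
}

Running this prints rc = -28 for the first niobuf (its second page failed) and rc = 0 for the second.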
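
ost_get_info() above uses a common two-call sizing pattern: the first obd_get_info() call passes a NULL buffer so the backend only reports the required reply size, the reply is packed to that size, and a second call fills the buffer in. A hedged sketch of the same pattern in plain C (query_info and its signature are invented stand-ins, not Lustre API):

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Invented stand-in for obd_get_info(): with buf == NULL it only
 * reports the required size; otherwise it fills buf with *len bytes. */
static int query_info(const char *key, int *len, void *buf)
{
        const char *payload = "example payload";

        (void)key;                /* a real backend would dispatch on key */
        if (buf == NULL) {
                *len = (int)strlen(payload) + 1;   /* size-only probe */
                return 0;
        }
        memcpy(buf, payload, (size_t)*len);  /* fill caller's buffer */
        return 0;
}

int main(void)
{
        int len = 0;
        char *reply;

        /* call once to get the size to allocate the reply buffer... */
        if (query_info("example_key", &len, NULL) != 0)
                return 1;

        reply = malloc((size_t)len);
        if (reply == NULL)
                return 1;

        /* ...then call again to fill in the reply buffer */
        if (query_info("example_key", &len, reply) != 0) {
                free(reply);
                return 1;
        }
        printf("got %d bytes: %s\n", len, reply);
        free(reply);
        return 0;
}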