📄 ptllnd_cb.c
字号:
* peer closes. * * Note that the following condition handles this case, where it * actually increases the extra lazy credit counter. */ if (nasync <= peer->plp_extra_lazy_credits) { peer->plp_extra_lazy_credits -= nasync; return 0; } LASSERT (nasync > 0); nasync -= peer->plp_extra_lazy_credits; peer->plp_extra_lazy_credits = 0; rc = ptllnd_size_buffers(ni, nasync); if (rc == 0) { peer->plp_lazy_credits += nasync; peer->plp_outstanding_credits += nasync; } return rc;}__u32ptllnd_cksum (void *ptr, int nob){ char *c = ptr; __u32 sum = 0; while (nob-- > 0) sum = ((sum << 1) | (sum >> 31)) + *c++; /* ensure I don't return 0 (== no checksum) */ return (sum == 0) ? 1 : sum;}ptllnd_tx_t *ptllnd_new_tx(ptllnd_peer_t *peer, int type, int payload_nob){ lnet_ni_t *ni = peer->plp_ni; ptllnd_ni_t *plni = ni->ni_data; ptllnd_tx_t *tx; int msgsize; CDEBUG(D_NET, "peer=%p type=%d payload=%d\n", peer, type, payload_nob); switch (type) { default: LBUG(); case PTLLND_RDMA_WRITE: case PTLLND_RDMA_READ: LASSERT (payload_nob == 0); msgsize = 0; break; case PTLLND_MSG_TYPE_PUT: case PTLLND_MSG_TYPE_GET: LASSERT (payload_nob == 0); msgsize = offsetof(kptl_msg_t, ptlm_u) + sizeof(kptl_rdma_msg_t); break; case PTLLND_MSG_TYPE_IMMEDIATE: msgsize = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]); break; case PTLLND_MSG_TYPE_NOOP: LASSERT (payload_nob == 0); msgsize = offsetof(kptl_msg_t, ptlm_u); break; case PTLLND_MSG_TYPE_HELLO: LASSERT (payload_nob == 0); msgsize = offsetof(kptl_msg_t, ptlm_u) + sizeof(kptl_hello_msg_t); break; } msgsize = (msgsize + 7) & ~7; LASSERT (msgsize <= peer->plp_max_msg_size); LIBCFS_ALLOC(tx, offsetof(ptllnd_tx_t, tx_msg) + msgsize); if (tx == NULL) { CERROR("Can't allocate msg type %d for %s\n", type, libcfs_id2str(peer->plp_id)); return NULL; } CFS_INIT_LIST_HEAD(&tx->tx_list); tx->tx_peer = peer; tx->tx_type = type; tx->tx_lnetmsg = tx->tx_lnetreplymsg = NULL; tx->tx_niov = 0; tx->tx_iov = NULL; tx->tx_reqmdh = PTL_INVALID_HANDLE; tx->tx_bulkmdh = PTL_INVALID_HANDLE; tx->tx_msgsize = msgsize; tx->tx_completing = 0; tx->tx_status = 0; memset(&tx->tx_bulk_posted, 0, sizeof(tx->tx_bulk_posted)); memset(&tx->tx_bulk_done, 0, sizeof(tx->tx_bulk_done)); memset(&tx->tx_req_posted, 0, sizeof(tx->tx_req_posted)); memset(&tx->tx_req_done, 0, sizeof(tx->tx_req_done)); if (msgsize != 0) { tx->tx_msg.ptlm_magic = PTLLND_MSG_MAGIC; tx->tx_msg.ptlm_version = PTLLND_MSG_VERSION; tx->tx_msg.ptlm_type = type; tx->tx_msg.ptlm_credits = 0; tx->tx_msg.ptlm_nob = msgsize; tx->tx_msg.ptlm_cksum = 0; tx->tx_msg.ptlm_srcnid = ni->ni_nid; tx->tx_msg.ptlm_srcstamp = plni->plni_stamp; tx->tx_msg.ptlm_dstnid = peer->plp_id.nid; tx->tx_msg.ptlm_dststamp = peer->plp_stamp; tx->tx_msg.ptlm_srcpid = the_lnet.ln_pid; tx->tx_msg.ptlm_dstpid = peer->plp_id.pid; } ptllnd_peer_addref(peer); plni->plni_ntxs++; CDEBUG(D_NET, "tx=%p\n",tx); return tx;}voidptllnd_abort_tx(ptllnd_tx_t *tx, ptl_handle_md_t *mdh){ ptllnd_peer_t *peer = tx->tx_peer; lnet_ni_t *ni = peer->plp_ni; int rc; time_t start = cfs_time_current_sec(); ptllnd_ni_t *plni = ni->ni_data; int w = plni->plni_long_wait; while (!PtlHandleIsEqual(*mdh, PTL_INVALID_HANDLE)) { rc = PtlMDUnlink(*mdh);#ifndef LUSTRE_PORTALS_UNLINK_SEMANTICS if (rc == PTL_OK) /* unlink successful => no unlinked event */ return; LASSERT (rc == PTL_MD_IN_USE);#endif if (w > 0 && cfs_time_current_sec() > start + w/1000) { CWARN("Waited %ds to abort tx to %s\n", (int)(cfs_time_current_sec() - start), libcfs_id2str(peer->plp_id)); w *= 2; } /* Wait for ptllnd_tx_event() to invalidate */ ptllnd_wait(ni, w); }}voidptllnd_cull_tx_history(ptllnd_ni_t *plni){ int max = plni->plni_max_tx_history; while (plni->plni_ntx_history > max) { ptllnd_tx_t *tx = list_entry(plni->plni_tx_history.next, ptllnd_tx_t, tx_list); list_del(&tx->tx_list); ptllnd_peer_decref(tx->tx_peer); LIBCFS_FREE(tx, offsetof(ptllnd_tx_t, tx_msg) + tx->tx_msgsize); LASSERT (plni->plni_ntxs > 0); plni->plni_ntxs--; plni->plni_ntx_history--; }}voidptllnd_tx_done(ptllnd_tx_t *tx){ ptllnd_peer_t *peer = tx->tx_peer; lnet_ni_t *ni = peer->plp_ni; ptllnd_ni_t *plni = ni->ni_data; /* CAVEAT EMPTOR: If this tx is being aborted, I'll continue to get * events for this tx until it's unlinked. So I set tx_completing to * flag the tx is getting handled */ if (tx->tx_completing) return; tx->tx_completing = 1; if (!list_empty(&tx->tx_list)) list_del_init(&tx->tx_list); if (tx->tx_status != 0) { if (plni->plni_debug) { CERROR("Completing tx for %s with error %d\n", libcfs_id2str(peer->plp_id), tx->tx_status); ptllnd_debug_tx(tx); } ptllnd_close_peer(peer, tx->tx_status); } ptllnd_abort_tx(tx, &tx->tx_reqmdh); ptllnd_abort_tx(tx, &tx->tx_bulkmdh); if (tx->tx_niov > 0) { LIBCFS_FREE(tx->tx_iov, tx->tx_niov * sizeof(*tx->tx_iov)); tx->tx_niov = 0; } if (tx->tx_lnetreplymsg != NULL) { LASSERT (tx->tx_type == PTLLND_MSG_TYPE_GET); LASSERT (tx->tx_lnetmsg != NULL); /* Simulate GET success always */ lnet_finalize(ni, tx->tx_lnetmsg, 0); CDEBUG(D_NET, "lnet_finalize(tx_lnetreplymsg=%p)\n",tx->tx_lnetreplymsg); lnet_finalize(ni, tx->tx_lnetreplymsg, tx->tx_status); } else if (tx->tx_lnetmsg != NULL) { lnet_finalize(ni, tx->tx_lnetmsg, tx->tx_status); } plni->plni_ntx_history++; list_add_tail(&tx->tx_list, &plni->plni_tx_history); ptllnd_cull_tx_history(plni);}intptllnd_set_txiov(ptllnd_tx_t *tx, unsigned int niov, struct iovec *iov, unsigned int offset, unsigned int len){ ptl_md_iovec_t *piov; int npiov; if (len == 0) { tx->tx_niov = 0; return 0; } /* * Remove iovec's at the beginning that * are skipped because of the offset. * Adjust the offset accordingly */ for (;;) { LASSERT (niov > 0); if (offset < iov->iov_len) break; offset -= iov->iov_len; niov--; iov++; } for (;;) { int temp_offset = offset; int resid = len; LIBCFS_ALLOC(piov, niov * sizeof(*piov)); if (piov == NULL) return -ENOMEM; for (npiov = 0;; npiov++) { LASSERT (npiov < niov); LASSERT (iov->iov_len >= temp_offset); piov[npiov].iov_base = iov[npiov].iov_base + temp_offset; piov[npiov].iov_len = iov[npiov].iov_len - temp_offset; if (piov[npiov].iov_len >= resid) { piov[npiov].iov_len = resid; npiov++; break; } resid -= piov[npiov].iov_len; temp_offset = 0; } if (npiov == niov) { tx->tx_niov = niov; tx->tx_iov = piov; return 0; } /* Dang! The piov I allocated was too big and it's a drag to * have to maintain separate 'allocated' and 'used' sizes, so * I'll just do it again; NB this doesn't happen normally... */ LIBCFS_FREE(piov, niov * sizeof(*piov)); niov = npiov; }}voidptllnd_set_md_buffer(ptl_md_t *md, ptllnd_tx_t *tx){ unsigned int niov = tx->tx_niov; ptl_md_iovec_t *iov = tx->tx_iov; LASSERT ((md->options & PTL_MD_IOVEC) == 0); if (niov == 0) { md->start = NULL; md->length = 0; } else if (niov == 1) { md->start = iov[0].iov_base; md->length = iov[0].iov_len; } else { md->start = iov; md->length = niov; md->options |= PTL_MD_IOVEC; }}intptllnd_post_buffer(ptllnd_buffer_t *buf){ lnet_ni_t *ni = buf->plb_ni; ptllnd_ni_t *plni = ni->ni_data; ptl_process_id_t anyid = { .nid = PTL_NID_ANY, .pid = PTL_PID_ANY}; ptl_md_t md = { .start = buf->plb_buffer, .length = plni->plni_buffer_size, .threshold = PTL_MD_THRESH_INF, .max_size = plni->plni_max_msg_size, .options = (PTLLND_MD_OPTIONS | PTL_MD_OP_PUT | PTL_MD_MAX_SIZE | PTL_MD_LOCAL_ALIGN8), .user_ptr = ptllnd_obj2eventarg(buf, PTLLND_EVENTARG_TYPE_BUF), .eq_handle = plni->plni_eqh}; ptl_handle_me_t meh; int rc; LASSERT (!buf->plb_posted); rc = PtlMEAttach(plni->plni_nih, plni->plni_portal, anyid, LNET_MSG_MATCHBITS, 0, PTL_UNLINK, PTL_INS_AFTER, &meh); if (rc != PTL_OK) { CERROR("PtlMEAttach failed: %s(%d)\n", ptllnd_errtype2str(rc), rc); return -ENOMEM; } buf->plb_posted = 1; plni->plni_nposted_buffers++; rc = PtlMDAttach(meh, md, LNET_UNLINK, &buf->plb_md); if (rc == PTL_OK) return 0; CERROR("PtlMDAttach failed: %s(%d)\n", ptllnd_errtype2str(rc), rc); buf->plb_posted = 0; plni->plni_nposted_buffers--; rc = PtlMEUnlink(meh); LASSERT (rc == PTL_OK); return -ENOMEM;}void
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -