📄 ptllnd_cb.c
字号:
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved. * Author: PJ Kirner <pjkirner@clusterfs.com> * * This file is part of the Lustre file system, http://www.lustre.org * Lustre is a trademark of Cluster File Systems, Inc. * * This file is confidential source code owned by Cluster File Systems. * No viewing, modification, compilation, redistribution, or any other * form of use is permitted except through a signed license agreement. * * If you have not signed such an agreement, then you have no rights to * this file. Please destroy it immediately and contact CFS. * */#include "ptllnd.h"#ifndef _USING_LUSTRE_PORTALS_intkptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst, int src_niov, struct iovec *src, unsigned int offset, unsigned int len){ /* Initialise 'dst' to the subset of 'src' starting at 'offset', * for exactly 'len' bytes, and return the number of entries. * NB not destructive to 'src' */ unsigned int frag_len; unsigned int niov; if (len == 0) /* no data => */ return (0); /* no frags */ LASSERT (src_niov > 0); while (offset >= src->iov_len) { /* skip initial frags */ offset -= src->iov_len; src_niov--; src++; LASSERT (src_niov > 0); } niov = 1; for (;;) { LASSERT (src_niov > 0); LASSERT (niov <= dst_niov); frag_len = src->iov_len - offset; dst->iov_base = ((char *)src->iov_base) + offset; if (len <= frag_len) { dst->iov_len = len; return (niov); } dst->iov_len = frag_len; len -= frag_len; dst++; src++; niov++; src_niov--; offset = 0; }}intkptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst, int src_niov, lnet_kiov_t *src, unsigned int offset, unsigned int len){ /* Initialise 'dst' to the physical addresses of the subset of 'src' * starting at 'offset', for exactly 'len' bytes, and return the number * of entries. NB not destructive to 'src' */ unsigned int frag_len; unsigned int niov; __u64 phys_page; __u64 phys; if (len == 0) /* no data => */ return (0); /* no frags */ LASSERT (src_niov > 0); while (offset >= src->kiov_len) { /* skip initial frags */ offset -= src->kiov_len; src_niov--; src++; LASSERT (src_niov > 0); } niov = 1; for (;;) { LASSERT (src_niov > 0); LASSERT (niov <= dst_niov); frag_len = min(src->kiov_len - offset, len); phys_page = lnet_page2phys(src->kiov_page); phys = phys_page + src->kiov_offset + offset; LASSERT (sizeof(void *) > 4 || (phys <= 0xffffffffULL && phys + (frag_len - 1) <= 0xffffffffULL)); dst->iov_base = (void *)((unsigned long)phys); dst->iov_len = frag_len; if (frag_len == len) return niov; len -= frag_len; dst++; src++; niov++; src_niov--; offset = 0; }}#endifvoidkptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, unsigned int nob){ LASSERT (iov == NULL || kiov == NULL); memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md)); tx->tx_rdma_md.start = tx->tx_frags; tx->tx_rdma_md.user_ptr = &tx->tx_rdma_eventarg; tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh; tx->tx_rdma_md.options = PTL_MD_LUSTRE_COMPLETION_SEMANTICS | PTL_MD_EVENT_START_DISABLE; switch (tx->tx_type) { default: LBUG(); case TX_TYPE_PUT_REQUEST: /* passive: peer gets */ tx->tx_rdma_md.threshold = 1; /* GET event */ tx->tx_rdma_md.options |= PTL_MD_OP_GET; break; case TX_TYPE_GET_REQUEST: /* passive: peer puts */ tx->tx_rdma_md.threshold = 1; /* PUT event */ tx->tx_rdma_md.options |= PTL_MD_OP_PUT; break; case TX_TYPE_PUT_RESPONSE: /* active: I get */ tx->tx_rdma_md.threshold = 2; /* SEND + REPLY */ break; case TX_TYPE_GET_RESPONSE: /* active: I put */ tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1; /* SEND + ACK? */ break; } if (nob == 0) { tx->tx_rdma_md.length = 0; return; }#ifdef _USING_LUSTRE_PORTALS_ if (iov != NULL) { tx->tx_rdma_md.options |= PTL_MD_IOVEC; tx->tx_rdma_md.length = lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov, niov, iov, offset, nob); return; } /* Cheating OK since ptl_kiov_t == lnet_kiov_t */ CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t)); CLASSERT(offsetof(ptl_kiov_t, kiov_offset) == offsetof(lnet_kiov_t, kiov_offset)); CLASSERT(offsetof(ptl_kiov_t, kiov_page) == offsetof(lnet_kiov_t, kiov_page)); CLASSERT(offsetof(ptl_kiov_t, kiov_len) == offsetof(lnet_kiov_t, kiov_len)); tx->tx_rdma_md.options |= PTL_MD_KIOV; tx->tx_rdma_md.length = lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov, niov, kiov, offset, nob);#else if (iov != NULL) { tx->tx_rdma_md.options |= PTL_MD_IOVEC; tx->tx_rdma_md.length = kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov, niov, iov, offset, nob); return; } tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS; tx->tx_rdma_md.length = kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov, niov, kiov, offset, nob);#endif}intkptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type, unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov, unsigned int offset, int nob){ kptl_tx_t *tx; ptl_err_t ptlrc; kptl_msg_t *rxmsg = rx->rx_msg; kptl_peer_t *peer = rx->rx_peer; unsigned long flags; ptl_handle_md_t mdh; LASSERT (type == TX_TYPE_PUT_RESPONSE || type == TX_TYPE_GET_RESPONSE); tx = kptllnd_get_idle_tx(type); if (tx == NULL) { CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n", type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT", libcfs_id2str(peer->peer_id)); return -ENOMEM; } kptllnd_set_tx_peer(tx, peer); kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob); ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md, PTL_UNLINK, &mdh); if (ptlrc != PTL_OK) { CERROR("PtlMDBind(%s) failed: %s(%d)\n", libcfs_id2str(peer->peer_id), kptllnd_errtype2str(ptlrc), ptlrc); tx->tx_status = -EIO; kptllnd_tx_decref(tx); return -EIO; } spin_lock_irqsave(&peer->peer_lock, flags); tx->tx_lnet_msg = lntmsg; /* lnet_finalize() will be called when tx is torn down, so I must * return success from here on... */ tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ); tx->tx_rdma_mdh = mdh; tx->tx_active = 1; list_add_tail(&tx->tx_list, &peer->peer_activeq); /* peer has now got my ref on 'tx' */ spin_unlock_irqrestore(&peer->peer_lock, flags); tx->tx_tposted = jiffies; if (type == TX_TYPE_GET_RESPONSE) ptlrc = PtlPut(mdh, tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ, rx->rx_initiator, *kptllnd_tunables.kptl_portal, 0, /* acl cookie */ rxmsg->ptlm_u.rdma.kptlrm_matchbits, 0, /* offset */ (lntmsg != NULL) ? /* header data */ PTLLND_RDMA_OK : PTLLND_RDMA_FAIL); else ptlrc = PtlGet(mdh, rx->rx_initiator, *kptllnd_tunables.kptl_portal, 0, /* acl cookie */ rxmsg->ptlm_u.rdma.kptlrm_matchbits, 0); /* offset */ if (ptlrc != PTL_OK) { CERROR("Ptl%s failed: %s(%d)\n", (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get", kptllnd_errtype2str(ptlrc), ptlrc); kptllnd_peer_close(peer, -EIO); /* Everything (including this RDMA) queued on the peer will * be completed with failure */ } return 0;}intkptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg){ lnet_hdr_t *hdr = &lntmsg->msg_hdr; int type = lntmsg->msg_type; lnet_process_id_t target = lntmsg->msg_target; int target_is_router = lntmsg->msg_target_is_router; int routing = lntmsg->msg_routing; unsigned int payload_niov = lntmsg->msg_niov; struct iovec *payload_iov = lntmsg->msg_iov; lnet_kiov_t *payload_kiov = lntmsg->msg_kiov; unsigned int payload_offset = lntmsg->msg_offset; unsigned int payload_nob = lntmsg->msg_len; kptl_peer_t *peer; kptl_tx_t *tx; int nob; int nfrag; int rc; LASSERT (payload_nob == 0 || payload_niov > 0); LASSERT (payload_niov <= LNET_MAX_IOV); LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); LASSERT (!in_interrupt()); rc = kptllnd_find_target(&peer, target); if (rc != 0) return rc; switch (type) { default: LBUG(); return -EINVAL; case LNET_MSG_REPLY: case LNET_MSG_PUT: /* Should the payload avoid RDMA? */ nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]); if (payload_kiov == NULL && nob <= peer->peer_max_msg_size) break; tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST); if (tx == NULL) { CERROR("Can't send %s to %s: can't allocate descriptor\n", lnet_msgtyp2str(type), libcfs_id2str(target)); rc = -ENOMEM; goto out; } kptllnd_init_rdma_md(tx, payload_niov, payload_iov, payload_kiov, payload_offset, payload_nob); tx->tx_lnet_msg = lntmsg; tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr; kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT, sizeof(kptl_rdma_msg_t)); CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n", libcfs_id2str(target), le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx); kptllnd_tx_launch(peer, tx, 0); goto out; case LNET_MSG_GET: /* routed gets don't RDMA */ if (target_is_router || routing) break; /* Is the payload small enough not to need RDMA? */ nob = lntmsg->msg_md->md_length; nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[nob]); if (nob <= peer->peer_max_msg_size) break; tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST); if (tx == NULL) { CERROR("Can't send GET to %s: can't allocate descriptor\n", libcfs_id2str(target)); rc = -ENOMEM; goto out; } tx->tx_lnet_replymsg = lnet_create_reply_msg(kptllnd_data.kptl_ni, lntmsg); if (tx->tx_lnet_replymsg == NULL) { CERROR("Failed to allocate LNET reply for %s\n", libcfs_id2str(target)); kptllnd_tx_decref(tx); rc = -ENOMEM; goto out; } if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0) kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov, lntmsg->msg_md->md_iov.iov, NULL, 0, lntmsg->msg_md->md_length); else kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov, NULL, lntmsg->msg_md->md_iov.kiov, 0, lntmsg->msg_md->md_length); tx->tx_lnet_msg = lntmsg; tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr; kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET, sizeof(kptl_rdma_msg_t)); CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n", libcfs_id2str(target), le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx); kptllnd_tx_launch(peer, tx, 0); goto out; case LNET_MSG_ACK: CDEBUG(D_NET, "LNET_MSG_ACK\n"); LASSERT (payload_nob == 0);
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -