📄 rpc_rdma.c
字号:
/* * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved. * * This software is available to you under a choice of one of two * licenses. You may choose to be licensed under the terms of the GNU * General Public License (GPL) Version 2, available from the file * COPYING in the main directory of this source tree, or the BSD-type * license below: * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * * Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * Redistributions in binary form must reproduce the above * copyright notice, this list of conditions and the following * disclaimer in the documentation and/or other materials provided * with the distribution. * * Neither the name of the Network Appliance, Inc. nor the names of * its contributors may be used to endorse or promote products * derived from this software without specific prior written * permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. *//* * rpc_rdma.c * * This file contains the guts of the RPC RDMA protocol, and * does marshaling/unmarshaling, etc. It is also where interfacing * to the Linux RPC framework lives. */#include "xprt_rdma.h"#include <linux/highmem.h>#ifdef RPC_DEBUG# define RPCDBG_FACILITY RPCDBG_TRANS#endifenum rpcrdma_chunktype { rpcrdma_noch = 0, rpcrdma_readch, rpcrdma_areadch, rpcrdma_writech, rpcrdma_replych};#ifdef RPC_DEBUGstatic const char transfertypes[][12] = { "pure inline", /* no chunks */ " read chunk", /* some argument via rdma read */ "*read chunk", /* entire request via rdma read */ "write chunk", /* some result via rdma write */ "reply chunk" /* entire reply via rdma write */};#endif/* * Chunk assembly from upper layer xdr_buf. * * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk * elements. Segments are then coalesced when registered, if possible * within the selected memreg mode. * * Note, this routine is never called if the connection's memory * registration strategy is 0 (bounce buffers). */static intrpcrdma_convert_iovs(struct xdr_buf *xdrbuf, int pos, enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs){ int len, n = 0, p; if (pos == 0 && xdrbuf->head[0].iov_len) { seg[n].mr_page = NULL; seg[n].mr_offset = xdrbuf->head[0].iov_base; seg[n].mr_len = xdrbuf->head[0].iov_len; ++n; } if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) { if (n == nsegs) return 0; seg[n].mr_page = xdrbuf->pages[0]; seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base; seg[n].mr_len = min_t(u32, PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len); len = xdrbuf->page_len - seg[n].mr_len; ++n; p = 1; while (len > 0) { if (n == nsegs) return 0; seg[n].mr_page = xdrbuf->pages[p]; seg[n].mr_offset = NULL; seg[n].mr_len = min_t(u32, PAGE_SIZE, len); len -= seg[n].mr_len; ++n; ++p; } } if (xdrbuf->tail[0].iov_len) { if (n == nsegs) return 0; seg[n].mr_page = NULL; seg[n].mr_offset = xdrbuf->tail[0].iov_base; seg[n].mr_len = xdrbuf->tail[0].iov_len; ++n; } return n;}/* * Create read/write chunk lists, and reply chunks, for RDMA * * Assume check against THRESHOLD has been done, and chunks are required. * Assume only encoding one list entry for read|write chunks. The NFSv3 * protocol is simple enough to allow this as it only has a single "bulk * result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The * RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.) * * When used for a single reply chunk (which is a special write * chunk used for the entire reply, rather than just the data), it * is used primarily for READDIR and READLINK which would otherwise * be severely size-limited by a small rdma inline read max. The server * response will come back as an RDMA Write, followed by a message * of type RDMA_NOMSG carrying the xid and length. As a result, reply * chunks do not provide data alignment, however they do not require * "fixup" (moving the response to the upper layer buffer) either. * * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64): * * Read chunklist (a linked list): * N elements, position P (same P for all chunks of same arg!): * 1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0 * * Write chunklist (a list of (one) counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO - 0 * * Reply chunk (a counted array): * N elements: * 1 - N - HLOO - HLOO - ... - HLOO */static unsigned intrpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target, struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type){ struct rpcrdma_req *req = rpcr_to_rdmar(rqst); struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt); int nsegs, nchunks = 0; int pos; struct rpcrdma_mr_seg *seg = req->rl_segments; struct rpcrdma_read_chunk *cur_rchunk = NULL; struct rpcrdma_write_array *warray = NULL; struct rpcrdma_write_chunk *cur_wchunk = NULL; __be32 *iptr = headerp->rm_body.rm_chunks; if (type == rpcrdma_readch || type == rpcrdma_areadch) { /* a read chunk - server will RDMA Read our memory */ cur_rchunk = (struct rpcrdma_read_chunk *) iptr; } else { /* a write or reply chunk - server will RDMA Write our memory */ *iptr++ = xdr_zero; /* encode a NULL read chunk list */ if (type == rpcrdma_replych) *iptr++ = xdr_zero; /* a NULL write chunk list */ warray = (struct rpcrdma_write_array *) iptr; cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1); } if (type == rpcrdma_replych || type == rpcrdma_areadch) pos = 0; else pos = target->head[0].iov_len; nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS); if (nsegs == 0) return 0; do { /* bind/register the memory, then build chunk from result. */ int n = rpcrdma_register_external(seg, nsegs, cur_wchunk != NULL, r_xprt); if (n <= 0) goto out; if (cur_rchunk) { /* read */ cur_rchunk->rc_discrim = xdr_one; /* all read chunks have the same "position" */ cur_rchunk->rc_position = htonl(pos); cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey); cur_rchunk->rc_target.rs_length = htonl(seg->mr_len); xdr_encode_hyper( (__be32 *)&cur_rchunk->rc_target.rs_offset, seg->mr_base); dprintk("RPC: %s: read chunk " "elem %d@0x%llx:0x%x pos %d (%s)\n", __func__, seg->mr_len, (unsigned long long)seg->mr_base, seg->mr_rkey, pos, n < nsegs ? "more" : "last"); cur_rchunk++; r_xprt->rx_stats.read_chunk_count++; } else { /* write/reply */ cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey); cur_wchunk->wc_target.rs_length = htonl(seg->mr_len); xdr_encode_hyper( (__be32 *)&cur_wchunk->wc_target.rs_offset, seg->mr_base); dprintk("RPC: %s: %s chunk " "elem %d@0x%llx:0x%x (%s)\n", __func__, (type == rpcrdma_replych) ? "reply" : "write", seg->mr_len, (unsigned long long)seg->mr_base, seg->mr_rkey, n < nsegs ? "more" : "last"); cur_wchunk++; if (type == rpcrdma_replych) r_xprt->rx_stats.reply_chunk_count++; else r_xprt->rx_stats.write_chunk_count++; r_xprt->rx_stats.total_rdma_request += seg->mr_len; } nchunks++; seg += n; nsegs -= n; } while (nsegs); /* success. all failures return above */ req->rl_nchunks = nchunks; BUG_ON(nchunks == 0); /* * finish off header. If write, marshal discrim and nchunks. */ if (cur_rchunk) { iptr = (__be32 *) cur_rchunk; *iptr++ = xdr_zero; /* finish the read chunk list */ *iptr++ = xdr_zero; /* encode a NULL write chunk list */ *iptr++ = xdr_zero; /* encode a NULL reply chunk */ } else { warray->wc_discrim = xdr_one; warray->wc_nchunks = htonl(nchunks); iptr = (__be32 *) cur_wchunk; if (type == rpcrdma_writech) { *iptr++ = xdr_zero; /* finish the write chunk list */ *iptr++ = xdr_zero; /* encode a NULL reply chunk */ } } /* * Return header size. */ return (unsigned char *)iptr - (unsigned char *)headerp;out: for (pos = 0; nchunks--;) pos += rpcrdma_deregister_external( &req->rl_segments[pos], r_xprt, NULL); return 0;}/* * Copy write data inline. * This function is used for "small" requests. Data which is passed * to RPC via iovecs (or page list) is copied directly into the * pre-registered memory buffer for this request. For small amounts * of data, this is efficient. The cutoff value is tunable. */static intrpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad){ int i, npages, curlen; int copy_len; unsigned char *srcp, *destp; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt); destp = rqst->rq_svec[0].iov_base; curlen = rqst->rq_svec[0].iov_len; destp += curlen; /* * Do optional padding where it makes sense. Alignment of write * payload can help the server, if our setting is accurate. */ pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/); if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH) pad = 0; /* don't pad this request */ dprintk("RPC: %s: pad %d destp 0x%p len %d hdrlen %d\n", __func__, pad, destp, rqst->rq_slen, curlen); copy_len = rqst->rq_snd_buf.page_len; r_xprt->rx_stats.pullup_copy_count += copy_len; npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT; for (i = 0; copy_len && i < npages; i++) { if (i == 0) curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base; else curlen = PAGE_SIZE; if (curlen > copy_len) curlen = copy_len; dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n", __func__, i, destp, copy_len, curlen); srcp = kmap_atomic(rqst->rq_snd_buf.pages[i], KM_SKB_SUNRPC_DATA); if (i == 0) memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen); else memcpy(destp, srcp, curlen); kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA); rqst->rq_svec[0].iov_len += curlen; destp += curlen; copy_len -= curlen; } if (rqst->rq_snd_buf.tail[0].iov_len) { curlen = rqst->rq_snd_buf.tail[0].iov_len; if (destp != rqst->rq_snd_buf.tail[0].iov_base) { memcpy(destp, rqst->rq_snd_buf.tail[0].iov_base, curlen); r_xprt->rx_stats.pullup_copy_count += curlen; } dprintk("RPC: %s: tail destp 0x%p len %d curlen %d\n", __func__, destp, copy_len, curlen); rqst->rq_svec[0].iov_len += curlen; } /* header now contains entire send message */ return pad;}/* * Marshal a request: the primary job of this routine is to choose * the transfer modes. See comments below. * * Uses multiple RDMA IOVs for a request: * [0] -- RPC RDMA header, which uses memory from the *start* of the * preregistered buffer that already holds the RPC data in * its middle. * [1] -- the RPC header/data, marshaled by RPC and the NFS protocol. * [2] -- optional padding. * [3] -- if padded, header only in [1] and data here. */intrpcrdma_marshal_req(struct rpc_rqst *rqst){ struct rpc_xprt *xprt = rqst->rq_task->tk_xprt; struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt); struct rpcrdma_req *req = rpcr_to_rdmar(rqst); char *base; size_t hdrlen, rpclen, padlen; enum rpcrdma_chunktype rtype, wtype; struct rpcrdma_msg *headerp; /* * rpclen gets amount of data in first buffer, which is the * pre-registered buffer. */ base = rqst->rq_svec[0].iov_base; rpclen = rqst->rq_svec[0].iov_len; /* build RDMA header in private area at front */ headerp = (struct rpcrdma_msg *) req->rl_base; /* don't htonl XID, it's already done in request */ headerp->rm_xid = rqst->rq_xid; headerp->rm_vers = xdr_one; headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests); headerp->rm_type = __constant_htonl(RDMA_MSG); /* * Chunks needed for results? * * o If the expected result is under the inline threshold, all ops * return as inline (but see later). * o Large non-read ops return as a single reply chunk. * o Large read ops return data as write chunk(s), header as inline. * * Note: the NFS code sending down multiple result segments implies * the op is one of read, readdir[plus], readlink or NFSv4 getacl. */ /* * This code can handle read chunks, write chunks OR reply * chunks -- only one type. If the request is too big to fit * inline, then we will choose read chunks. If the request is * a READ, then use write chunks to separate the file data * into pages; otherwise use reply chunks. */ if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst)) wtype = rpcrdma_noch; else if (rqst->rq_rcv_buf.page_len == 0) wtype = rpcrdma_replych; else if (rqst->rq_rcv_buf.flags & XDRBUF_READ) wtype = rpcrdma_writech; else wtype = rpcrdma_replych; /* * Chunks needed for arguments? * * o If the total request is under the inline threshold, all ops * are sent as inline. * o Large non-write ops are sent with the entire message as a * single read chunk (protocol 0-position special case). * o Large write ops transmit data as read chunk(s), header as * inline. * * Note: the NFS code sending down multiple argument segments * implies the op is a write. * TBD check NFSv4 setacl */ if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst)) rtype = rpcrdma_noch; else if (rqst->rq_snd_buf.page_len == 0) rtype = rpcrdma_areadch; else
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -