
📄 rpc_rdma.c

📁 Linux kernel source code
💻 C
📖 Page 1 of 2
/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * rpc_rdma.c
 *
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */

#include "xprt_rdma.h"

#include <linux/highmem.h>

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

enum rpcrdma_chunktype {
	rpcrdma_noch = 0,
	rpcrdma_readch,
	rpcrdma_areadch,
	rpcrdma_writech,
	rpcrdma_replych
};

#ifdef RPC_DEBUG
static const char transfertypes[][12] = {
	"pure inline",	/* no chunks */
	" read chunk",	/* some argument via rdma read */
	"*read chunk",	/* entire request via rdma read */
	"write chunk",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};
#endif

/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Note, this routine is never called if the connection's memory
 * registration strategy is 0 (bounce buffers).
 */
static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, int pos,
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg, int nsegs)
{
	int len, n = 0, p;

	if (pos == 0 && xdrbuf->head[0].iov_len) {
		seg[n].mr_page = NULL;
		seg[n].mr_offset = xdrbuf->head[0].iov_base;
		seg[n].mr_len = xdrbuf->head[0].iov_len;
		++n;
	}

	if (xdrbuf->page_len && (xdrbuf->pages[0] != NULL)) {
		if (n == nsegs)
			return 0;
		seg[n].mr_page = xdrbuf->pages[0];
		seg[n].mr_offset = (void *)(unsigned long) xdrbuf->page_base;
		seg[n].mr_len = min_t(u32,
			PAGE_SIZE - xdrbuf->page_base, xdrbuf->page_len);
		len = xdrbuf->page_len - seg[n].mr_len;
		++n;
		p = 1;
		while (len > 0) {
			if (n == nsegs)
				return 0;
			seg[n].mr_page = xdrbuf->pages[p];
			seg[n].mr_offset = NULL;
			seg[n].mr_len = min_t(u32, PAGE_SIZE, len);
			len -= seg[n].mr_len;
			++n;
			++p;
		}
	}

	if (xdrbuf->tail[0].iov_len) {
		if (n == nsegs)
			return 0;
		seg[n].mr_page = NULL;
		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
		seg[n].mr_len = xdrbuf->tail[0].iov_len;
		++n;
	}

	return n;
}
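/*
 * [Editor's illustration: not part of rpc_rdma.c] The standalone sketch below
 * mimics the page walk performed by rpcrdma_convert_iovs() above in plain
 * userspace C, so the head/pages/tail splitting is easier to follow. The
 * struct name, page size, and sample lengths are assumptions made for this
 * sketch only; they are not the kernel definitions.
 */
#include <stdio.h>
#include <stddef.h>

#define SKETCH_PAGE_SIZE 4096
#define SKETCH_MAX_SEGS  8

struct sketch_seg {
	size_t offset;	/* offset of the fragment within its page */
	size_t len;	/* length of the fragment */
};

/* Split a payload that starts page_base bytes into its first page. */
static int sketch_convert(size_t page_base, size_t page_len,
			  struct sketch_seg *seg, int nsegs)
{
	int n = 0;
	size_t len, first;

	if (page_len == 0)
		return 0;

	/* first fragment runs from page_base to the end of the first page */
	first = SKETCH_PAGE_SIZE - page_base;
	if (first > page_len)
		first = page_len;
	seg[n].offset = page_base;
	seg[n].len = first;
	++n;

	/* remaining fragments each cover at most one full page */
	for (len = page_len - first; len > 0; len -= seg[n - 1].len) {
		if (n == nsegs)
			return 0;	/* out of segments, as in the kernel code */
		seg[n].offset = 0;
		seg[n].len = len < SKETCH_PAGE_SIZE ? len : SKETCH_PAGE_SIZE;
		++n;
	}
	return n;
}

int main(void)
{
	struct sketch_seg seg[SKETCH_MAX_SEGS];
	int i, n = sketch_convert(1000, 10000, seg, SKETCH_MAX_SEGS);

	for (i = 0; i < n; i++)
		printf("seg %d: offset %zu len %zu\n", i, seg[i].offset, seg[i].len);
	return 0;
}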
/*
 * Create read/write chunk lists, and reply chunks, for RDMA
 *
 *   Assume check against THRESHOLD has been done, and chunks are required.
 *   Assume only encoding one list entry for read|write chunks. The NFSv3
 *     protocol is simple enough to allow this as it only has a single "bulk
 *     result" in each procedure - complicated NFSv4 COMPOUNDs are not. (The
 *     RDMA/Sessions NFSv4 proposal addresses this for future v4 revs.)
 *
 * When used for a single reply chunk (which is a special write
 * chunk used for the entire reply, rather than just the data), it
 * is used primarily for READDIR and READLINK which would otherwise
 * be severely size-limited by a small rdma inline read max. The server
 * response will come back as an RDMA Write, followed by a message
 * of type RDMA_NOMSG carrying the xid and length. As a result, reply
 * chunks do not provide data alignment, however they do not require
 * "fixup" (moving the response to the upper layer buffer) either.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 */

static unsigned int
rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
{
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_task->tk_xprt);
	int nsegs, nchunks = 0;
	int pos;
	struct rpcrdma_mr_seg *seg = req->rl_segments;
	struct rpcrdma_read_chunk *cur_rchunk = NULL;
	struct rpcrdma_write_array *warray = NULL;
	struct rpcrdma_write_chunk *cur_wchunk = NULL;
	__be32 *iptr = headerp->rm_body.rm_chunks;

	if (type == rpcrdma_readch || type == rpcrdma_areadch) {
		/* a read chunk - server will RDMA Read our memory */
		cur_rchunk = (struct rpcrdma_read_chunk *) iptr;
	} else {
		/* a write or reply chunk - server will RDMA Write our memory */
		*iptr++ = xdr_zero;	/* encode a NULL read chunk list */
		if (type == rpcrdma_replych)
			*iptr++ = xdr_zero;	/* a NULL write chunk list */
		warray = (struct rpcrdma_write_array *) iptr;
		cur_wchunk = (struct rpcrdma_write_chunk *) (warray + 1);
	}

	if (type == rpcrdma_replych || type == rpcrdma_areadch)
		pos = 0;
	else
		pos = target->head[0].iov_len;

	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
	if (nsegs == 0)
		return 0;

	do {
		/* bind/register the memory, then build chunk from result. */
		int n = rpcrdma_register_external(seg, nsegs,
						cur_wchunk != NULL, r_xprt);
		if (n <= 0)
			goto out;
		if (cur_rchunk) {	/* read */
			cur_rchunk->rc_discrim = xdr_one;
			/* all read chunks have the same "position" */
			cur_rchunk->rc_position = htonl(pos);
			cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
			cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
			xdr_encode_hyper(
					(__be32 *)&cur_rchunk->rc_target.rs_offset,
					seg->mr_base);
			dprintk("RPC:       %s: read chunk "
				"elem %d@0x%llx:0x%x pos %d (%s)\n", __func__,
				seg->mr_len, (unsigned long long)seg->mr_base,
				seg->mr_rkey, pos, n < nsegs ? "more" : "last");
			cur_rchunk++;
			r_xprt->rx_stats.read_chunk_count++;
		} else {		/* write/reply */
			cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
			cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
			xdr_encode_hyper(
					(__be32 *)&cur_wchunk->wc_target.rs_offset,
					seg->mr_base);
			dprintk("RPC:       %s: %s chunk "
				"elem %d@0x%llx:0x%x (%s)\n", __func__,
				(type == rpcrdma_replych) ? "reply" : "write",
				seg->mr_len, (unsigned long long)seg->mr_base,
				seg->mr_rkey, n < nsegs ? "more" : "last");
			cur_wchunk++;
			if (type == rpcrdma_replych)
				r_xprt->rx_stats.reply_chunk_count++;
			else
				r_xprt->rx_stats.write_chunk_count++;
			r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		}
		nchunks++;
		seg   += n;
		nsegs -= n;
	} while (nsegs);

	/* success. all failures return above */
	req->rl_nchunks = nchunks;
	BUG_ON(nchunks == 0);

	/*
	 * finish off header. If write, marshal discrim and nchunks.
	 */
	if (cur_rchunk) {
		iptr = (__be32 *) cur_rchunk;
		*iptr++ = xdr_zero;	/* finish the read chunk list */
		*iptr++ = xdr_zero;	/* encode a NULL write chunk list */
		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
	} else {
		warray->wc_discrim = xdr_one;
		warray->wc_nchunks = htonl(nchunks);
		iptr = (__be32 *) cur_wchunk;
		if (type == rpcrdma_writech) {
			*iptr++ = xdr_zero; /* finish the write chunk list */
			*iptr++ = xdr_zero; /* encode a NULL reply chunk */
		}
	}

	/*
	 * Return header size.
	 */
	return (unsigned char *)iptr - (unsigned char *)headerp;

out:
	for (pos = 0; nchunks--;)
		pos += rpcrdma_deregister_external(
				&req->rl_segments[pos], r_xprt, NULL);
	return 0;
}
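/*
 * [Editor's illustration: not part of rpc_rdma.c] The standalone sketch below
 * emits the XDR words for a hypothetical two-element read chunk list,
 * following the "1 - PHLOO - 1 - PHLOO - 0" layout described in the encoding
 * key above, plus the NULL write-list and reply-chunk terminators that
 * rpcrdma_create_chunks() appends. The handle, length, offset, and position
 * values are made up for the example; htonl()/ntohl() come from <arpa/inet.h>
 * in userspace.
 */
#include <stdio.h>
#include <stdint.h>
#include <arpa/inet.h>	/* htonl(), ntohl() */

/* Append one read-list entry: discriminator, position, handle, length, offset. */
static uint32_t *encode_read_seg(uint32_t *p, uint32_t pos, uint32_t handle,
				 uint32_t length, uint64_t offset)
{
	*p++ = htonl(1);			/* 1 - another entry follows */
	*p++ = htonl(pos);			/* P - XDR position in the message */
	*p++ = htonl(handle);			/* H - rkey of the registered region */
	*p++ = htonl(length);			/* L - length of the region */
	*p++ = htonl((uint32_t)(offset >> 32));	/* O - offset, high 32 bits */
	*p++ = htonl((uint32_t)offset);		/* O - offset, low 32 bits */
	return p;
}

int main(void)
{
	uint32_t words[16], *p = words;
	size_t i, n;

	/* two segments of the same argument, so both carry the same position */
	p = encode_read_seg(p, 148, 0x1234, 4096, 0x10000000ULL);
	p = encode_read_seg(p, 148, 0x5678, 2048, 0x10001000ULL);
	*p++ = htonl(0);	/* 0 - terminate the read chunk list */
	*p++ = htonl(0);	/* NULL write chunk list */
	*p++ = htonl(0);	/* NULL reply chunk */

	n = (size_t)(p - words);
	for (i = 0; i < n; i++)
		printf("word %2zu: 0x%08x\n", i, (unsigned int)ntohl(words[i]));
	return 0;
}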
/*
 * Copy write data inline.
 * This function is used for "small" requests. Data which is passed
 * to RPC via iovecs (or page list) is copied directly into the
 * pre-registered memory buffer for this request. For small amounts
 * of data, this is efficient. The cutoff value is tunable.
 */
static int
rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
{
	int i, npages, curlen;
	int copy_len;
	unsigned char *srcp, *destp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);

	destp = rqst->rq_svec[0].iov_base;
	curlen = rqst->rq_svec[0].iov_len;
	destp += curlen;
	/*
	 * Do optional padding where it makes sense. Alignment of write
	 * payload can help the server, if our setting is accurate.
	 */
	pad -= (curlen + 36/*sizeof(struct rpcrdma_msg_padded)*/);
	if (pad < 0 || rqst->rq_slen - curlen < RPCRDMA_INLINE_PAD_THRESH)
		pad = 0;	/* don't pad this request */

	dprintk("RPC:       %s: pad %d destp 0x%p len %d hdrlen %d\n",
		__func__, pad, destp, rqst->rq_slen, curlen);

	copy_len = rqst->rq_snd_buf.page_len;
	r_xprt->rx_stats.pullup_copy_count += copy_len;
	npages = PAGE_ALIGN(rqst->rq_snd_buf.page_base+copy_len) >> PAGE_SHIFT;
	for (i = 0; copy_len && i < npages; i++) {
		if (i == 0)
			curlen = PAGE_SIZE - rqst->rq_snd_buf.page_base;
		else
			curlen = PAGE_SIZE;
		if (curlen > copy_len)
			curlen = copy_len;
		dprintk("RPC:       %s: page %d destp 0x%p len %d curlen %d\n",
			__func__, i, destp, copy_len, curlen);
		srcp = kmap_atomic(rqst->rq_snd_buf.pages[i],
					KM_SKB_SUNRPC_DATA);
		if (i == 0)
			memcpy(destp, srcp+rqst->rq_snd_buf.page_base, curlen);
		else
			memcpy(destp, srcp, curlen);
		kunmap_atomic(srcp, KM_SKB_SUNRPC_DATA);
		rqst->rq_svec[0].iov_len += curlen;
		destp += curlen;
		copy_len -= curlen;
	}
	if (rqst->rq_snd_buf.tail[0].iov_len) {
		curlen = rqst->rq_snd_buf.tail[0].iov_len;
		if (destp != rqst->rq_snd_buf.tail[0].iov_base) {
			memcpy(destp,
				rqst->rq_snd_buf.tail[0].iov_base, curlen);
			r_xprt->rx_stats.pullup_copy_count += curlen;
		}
		dprintk("RPC:       %s: tail destp 0x%p len %d curlen %d\n",
			__func__, destp, copy_len, curlen);
		rqst->rq_svec[0].iov_len += curlen;
	}
	/* header now contains entire send message */
	return pad;
}
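/*
 * [Editor's illustration: not part of rpc_rdma.c] A minimal userspace
 * analogue of the "pullup" above: scattered fragments are copied directly
 * behind an already-built header so the whole request ends up in one
 * contiguous buffer. The buffer name and fragment contents are invented for
 * this sketch.
 */
#include <stdio.h>
#include <string.h>

int main(void)
{
	char sendbuf[64] = "HDR:";		/* stands in for the first iovec */
	size_t used = strlen(sendbuf);
	const char *frags[] = { "page0-data|", "page1-data|", "tail" };
	size_t i;

	/* copy each fragment directly behind the header, growing the buffer */
	for (i = 0; i < sizeof(frags) / sizeof(frags[0]); i++) {
		size_t len = strlen(frags[i]);
		memcpy(sendbuf + used, frags[i], len);
		used += len;
	}
	sendbuf[used] = '\0';

	printf("inline send buffer (%zu bytes): %s\n", used, sendbuf);
	return 0;
}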
/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Uses multiple RDMA IOVs for a request:
 *  [0] -- RPC RDMA header, which uses memory from the *start* of the
 *         preregistered buffer that already holds the RPC data in
 *         its middle.
 *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
 *  [2] -- optional padding.
 *  [3] -- if padded, header only in [1] and data here.
 */

int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_task->tk_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	char *base;
	size_t hdrlen, rpclen, padlen;
	enum rpcrdma_chunktype rtype, wtype;
	struct rpcrdma_msg *headerp;

	/*
	 * rpclen gets amount of data in first buffer, which is the
	 * pre-registered buffer.
	 */
	base = rqst->rq_svec[0].iov_base;
	rpclen = rqst->rq_svec[0].iov_len;

	/* build RDMA header in private area at front */
	headerp = (struct rpcrdma_msg *) req->rl_base;
	/* don't htonl XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = xdr_one;
	headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = __constant_htonl(RDMA_MSG);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline (but see later).
	 * o Large non-read ops return as a single reply chunk.
	 * o Large read ops return data as write chunk(s), header as inline.
	 *
	 * Note: the NFS code sending down multiple result segments implies
	 * the op is one of read, readdir[plus], readlink or NFSv4 getacl.
	 */

	/*
	 * This code can handle read chunks, write chunks OR reply
	 * chunks -- only one type. If the request is too big to fit
	 * inline, then we will choose read chunks. If the request is
	 * a READ, then use write chunks to separate the file data
	 * into pages; otherwise use reply chunks.
	 */
	if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
		wtype = rpcrdma_noch;
	else if (rqst->rq_rcv_buf.page_len == 0)
		wtype = rpcrdma_replych;
	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 *
	 * Note: the NFS code sending down multiple argument segments
	 * implies the op is a write.
	 * TBD check NFSv4 setacl
	 */
	if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
		rtype = rpcrdma_noch;
	else if (rqst->rq_snd_buf.page_len == 0)
		rtype = rpcrdma_areadch;
	else
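/*
 * [Editor's illustration: not part of rpc_rdma.c] The standalone sketch below
 * restates only the result-side (wtype) selection shown above as a pure
 * helper function. The threshold constant and the READ flag are stand-ins
 * for RPCRDMA_INLINE_READ_THRESHOLD() and XDRBUF_READ, not the kernel macros.
 */
#include <stdio.h>
#include <stddef.h>

enum sketch_chunktype { SK_NOCH, SK_READCH, SK_AREADCH, SK_WRITECH, SK_REPLYCH };

#define SK_INLINE_READ_THRESHOLD 1024	/* assumed inline limit for the sketch */
#define SK_XDRBUF_READ 0x01		/* stand-in for the READ flag */

/* Mirror of the wtype selection: inline, reply chunk, or write chunk(s). */
static enum sketch_chunktype choose_wtype(size_t rcv_buflen, size_t rcv_page_len,
					  unsigned int rcv_flags)
{
	if (rcv_buflen <= SK_INLINE_READ_THRESHOLD)
		return SK_NOCH;		/* small reply fits inline */
	if (rcv_page_len == 0)
		return SK_REPLYCH;	/* large, but no page data: reply chunk */
	if (rcv_flags & SK_XDRBUF_READ)
		return SK_WRITECH;	/* bulk read data: write chunk(s) */
	return SK_REPLYCH;		/* everything else: reply chunk */
}

int main(void)
{
	printf("small reply      -> %d\n", choose_wtype(512, 0, 0));
	printf("large READ reply -> %d\n", choose_wtype(65536, 65536, SK_XDRBUF_READ));
	printf("large READDIR    -> %d\n", choose_wtype(65536, 65536, 0));
	return 0;
}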
