⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xprt.c

📁 Linux内核源代码(压缩文件),《Linux内核》一书中的源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
/* *  linux/net/sunrpc/xprt.c * *  This is a generic RPC call interface supporting congestion avoidance, *  and asynchronous calls. * *  The interface works like this: * *  -	When a process places a call, it allocates a request slot if *	one is available. Otherwise, it sleeps on the backlog queue *	(xprt_reserve). *  -	Next, the caller puts together the RPC message, stuffs it into *	the request struct, and calls xprt_call(). *  -	xprt_call transmits the message and installs the caller on the *	socket's wait list. At the same time, it installs a timer that *	is run after the packet's timeout has expired. *  -	When a packet arrives, the data_ready handler walks the list of *	pending requests for that socket. If a matching XID is found, the *	caller is woken up, and the timer removed. *  -	When no reply arrives within the timeout interval, the timer is *	fired by the kernel and runs xprt_timer(). It either adjusts the *	timeout values (minor timeout) or wakes up the caller with a status *	of -ETIMEDOUT. *  -	When the caller receives a notification from RPC that a reply arrived, *	it should release the RPC slot, and process the reply. *	If the call timed out, it may choose to retry the operation by *	adjusting the initial timeout value, and simply calling rpc_call *	again. * *  Support for async RPC is done through a set of RPC-specific scheduling *  primitives that `transparently' work for processes as well as async *  tasks that rely on callbacks. * *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> * *  TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> *  TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> *  TCP NFS related read + write fixes *   (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> * *  Rewrite of larges part of the code in order to stabilize TCP stuff. *  Fix behaviour when socket buffer is full. 
*   (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> */#define __KERNEL_SYSCALLS__#include <linux/version.h>#include <linux/types.h>#include <linux/malloc.h>#include <linux/capability.h>#include <linux/sched.h>#include <linux/errno.h>#include <linux/socket.h>#include <linux/in.h>#include <linux/net.h>#include <linux/mm.h>#include <linux/udp.h>#include <linux/unistd.h>#include <linux/sunrpc/clnt.h>#include <linux/file.h>#include <net/sock.h>#include <net/checksum.h>#include <net/udp.h>#include <asm/uaccess.h>/* Following value should be > 32k + RPC overhead */#define XPRT_MIN_WRITE_SPACE (35000 + SOCK_MIN_WRITE_SPACE)extern spinlock_t rpc_queue_lock;/* * Local variables *//* Spinlock for critical sections in the code. */spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;#ifdef RPC_DEBUG# undef  RPC_DEBUG_DATA# define RPCDBG_FACILITY	RPCDBG_XPRT#endif#ifndef MAX# define MAX(a, b)	((a) > (b)? (a) : (b))# define MIN(a, b)	((a) < (b)? (a) : (b))#endif/* * Local functions */static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);static void	do_xprt_transmit(struct rpc_task *);static void	xprt_reserve_status(struct rpc_task *task);static void	xprt_disconnect(struct rpc_xprt *);static void	xprt_reconn_status(struct rpc_task *task);static struct socket *xprt_create_socket(int, struct rpc_timeout *);static int	xprt_bind_socket(struct rpc_xprt *, struct socket *);static void	xprt_remove_pending(struct rpc_xprt *);#ifdef RPC_DEBUG_DATA/* * Print the buffer contents (first 128 bytes only--just enough for * diropres return). 
*/static voidxprt_pktdump(char *msg, u32 *packet, unsigned int count){	u8	*buf = (u8 *) packet;	int	j;	dprintk("RPC:      %s\n", msg);	for (j = 0; j < count && j < 128; j += 4) {		if (!(j & 31)) {			if (j)				dprintk("\n");			dprintk("0x%04x ", j);		}		dprintk("%02x%02x%02x%02x ",			buf[j], buf[j+1], buf[j+2], buf[j+3]);	}	dprintk("\n");}#elsestatic inline voidxprt_pktdump(char *msg, u32 *packet, unsigned int count){	/* NOP */}#endif/* * Look up RPC transport given an INET socket */static inline struct rpc_xprt *xprt_from_sock(struct sock *sk){	return (struct rpc_xprt *) sk->user_data;}/* *	Adjust the iovec to move on 'n' bytes */ extern inline voidxprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount){	struct iovec *iv=msg->msg_iov;	int i;		/*	 *	Eat any sent iovecs	 */	while (iv->iov_len <= amount) {		amount -= iv->iov_len;		iv++;		msg->msg_iovlen--;	}	/*	 *	And chew down the partial one	 */	niv[0].iov_len = iv->iov_len-amount;	niv[0].iov_base =((unsigned char *)iv->iov_base)+amount;	iv++;	/*	 *	And copy any others	 */	for(i = 1; i < msg->msg_iovlen; i++)		niv[i]=*iv++;	msg->msg_iov=niv;}/* * Write data to socket. 
*/static inline intxprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req){	struct socket	*sock = xprt->sock;	struct msghdr	msg;	mm_segment_t	oldfs;	int		result;	int		slen = req->rq_slen - req->rq_bytes_sent;	struct iovec	niv[MAX_IOVEC];	if (slen <= 0)		return 0;	if (!sock)		return -ENOTCONN;	xprt_pktdump("packet data:",				req->rq_svec->iov_base,				req->rq_svec->iov_len);	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;	msg.msg_iov	= req->rq_svec;	msg.msg_iovlen	= req->rq_snr;	msg.msg_name	= (struct sockaddr *) &xprt->addr;	msg.msg_namelen = sizeof(xprt->addr);	msg.msg_control = NULL;	msg.msg_controllen = 0;	/* Dont repeat bytes */	if (req->rq_bytes_sent)		xprt_move_iov(&msg, niv, req->rq_bytes_sent);	oldfs = get_fs(); set_fs(get_ds());	result = sock_sendmsg(sock, &msg, slen);	set_fs(oldfs);	dprintk("RPC:      xprt_sendmsg(%d) = %d\n", slen, result);	if (result >= 0)		return result;	switch (result) {	case -ECONNREFUSED:		/* When the server has died, an ICMP port unreachable message		 * prompts ECONNREFUSED.		 
*/		break;	case -EAGAIN:		if (test_bit(SOCK_NOSPACE, &sock->flags))			result = -ENOMEM;		break;	case -ENOTCONN:	case -EPIPE:		/* connection broken */		if (xprt->stream)			result = -ENOTCONN;		break;	default:		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);	}	return result;}/* * Read data from socket */static intxprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift){	struct socket	*sock = xprt->sock;	struct msghdr	msg;	mm_segment_t	oldfs;	struct iovec	niv[MAX_IOVEC];	int		result;	if (!sock)		return -ENOTCONN;	msg.msg_flags   = MSG_DONTWAIT|MSG_NOSIGNAL;	msg.msg_iov	= iov;	msg.msg_iovlen	= nr;	msg.msg_name	= NULL;	msg.msg_namelen = 0;	msg.msg_control = NULL;	msg.msg_controllen = 0;	/* Adjust the iovec if we've already filled it */	if (shift)		xprt_move_iov(&msg, niv, shift);	oldfs = get_fs(); set_fs(get_ds());	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);	set_fs(oldfs);	dprintk("RPC:      xprt_recvmsg(iov %p, len %d) = %d\n",						iov, len, result);	return result;}/* * Adjust RPC congestion window * We use a time-smoothed congestion estimator to avoid heavy oscillation. */static voidxprt_adjust_cwnd(struct rpc_xprt *xprt, int result){	unsigned long	cwnd;	if (xprt->nocong)		return;	spin_lock_bh(&xprt_sock_lock);	cwnd = xprt->cwnd;	if (result >= 0) {		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))			goto out;		/* The (cwnd >> 1) term makes sure		 * the result gets rounded properly. 
*/		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;		if (cwnd > RPC_MAXCWND)			cwnd = RPC_MAXCWND;		else			pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;		dprintk("RPC:      cong %08lx, cwnd was %08lx, now %08lx, "			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,			(xprt->congtime-jiffies)*1000/HZ);	} else if (result == -ETIMEDOUT) {		if ((cwnd >>= 1) < RPC_CWNDSCALE)			cwnd = RPC_CWNDSCALE;		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;		dprintk("RPC:      cong %ld, cwnd was %ld, now %ld, "			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,			(xprt->congtime-jiffies)*1000/HZ);		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);	}	xprt->cwnd = cwnd; out:	spin_unlock_bh(&xprt_sock_lock);}/* * Adjust timeout values etc for next retransmit */intxprt_adjust_timeout(struct rpc_timeout *to){	if (to->to_retries > 0) {		if (to->to_exponential)			to->to_current <<= 1;		else			to->to_current += to->to_increment;		if (to->to_maxval && to->to_current >= to->to_maxval)			to->to_current = to->to_maxval;	} else {		if (to->to_exponential)			to->to_initval <<= 1;		else			to->to_initval += to->to_increment;		if (to->to_maxval && to->to_initval >= to->to_maxval)			to->to_initval = to->to_maxval;		to->to_current = to->to_initval;	}	if (!to->to_current) {		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");		to->to_current = 5 * HZ;	}	pprintk("RPC: %lu %s\n", jiffies,			to->to_retries? 
"retrans" : "timeout");	return to->to_retries-- > 0;}/* * Close down a transport socket */static voidxprt_close(struct rpc_xprt *xprt){	struct socket	*sock = xprt->sock;	struct sock	*sk = xprt->inet;	if (!sk)		return;	xprt->inet = NULL;	xprt->sock = NULL;	sk->user_data    = NULL;	sk->data_ready   = xprt->old_data_ready;	sk->state_change = xprt->old_state_change;	sk->write_space  = xprt->old_write_space;	xprt_disconnect(xprt);	sk->no_check	 = 0;	sock_release(sock);	/*	 *	TCP doesnt require the rpciod now - other things may	 *	but rpciod handles that not us.	 */	if(xprt->stream)		rpciod_down();}/* * Mark a transport as disconnected */static voidxprt_disconnect(struct rpc_xprt *xprt){	dprintk("RPC:      disconnected transport %p\n", xprt);	xprt_clear_connected(xprt);	xprt_remove_pending(xprt);	rpc_wake_up_status(&xprt->pending, -ENOTCONN);}/* * Reconnect a broken TCP connection. */voidxprt_reconnect(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	struct socket	*sock = xprt->sock;	struct sock	*inet = xprt->inet;	int		status;	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",				task->tk_pid, xprt, xprt_connected(xprt));	if (xprt->shutdown)		return;	if (!xprt->stream)		return;	if (!xprt->addr.sin_port) {		task->tk_status = -EIO;		return;	}	spin_lock(&xprt_lock);	if (xprt->connecting) {		task->tk_timeout = 0;		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);		spin_unlock(&xprt_lock);		return;	}	xprt->connecting = 1;	spin_unlock(&xprt_lock);	status = -ENOTCONN;	if (!inet) {		/* Create an unconnected socket */		if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))			goto defer;		xprt_bind_socket(xprt, sock);		inet = sock->sk;	}	xprt_disconnect(xprt);	/* Reset TCP record info */	xprt->tcp_offset = 0;	xprt->tcp_copied = 0;	xprt->tcp_more = 0;	/* Now connect it asynchronously. 
*/	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,				sizeof(xprt->addr), O_NONBLOCK);	if (status < 0) {		switch (status) {		case -EALREADY:		case -EINPROGRESS:			status = 0;			break;		case -EISCONN:		case -EPIPE:			status = 0;			xprt_close(xprt);			goto defer;		default:			printk("RPC: TCP connect error %d!\n", -status);			xprt_close(xprt);			goto defer;		}		dprintk("RPC: %4d connect status %d connected %d\n",				task->tk_pid, status, xprt_connected(xprt));		spin_lock_bh(&xprt_sock_lock);		if (!xprt_connected(xprt)) {			task->tk_timeout = xprt->timeout.to_maxval;			rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);			spin_unlock_bh(&xprt_sock_lock);			return;		}		spin_unlock_bh(&xprt_sock_lock);	}defer:	spin_lock(&xprt_lock);	xprt->connecting = 0;	if (status < 0) {		rpc_delay(task, 5*HZ);		task->tk_status = -ENOTCONN;	}	rpc_wake_up(&xprt->reconn);	spin_unlock(&xprt_lock);}/* * Reconnect timeout. We just mark the transport as not being in the * process of reconnecting, and leave the rest to the upper layers. */static voidxprt_reconn_status(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	dprintk("RPC: %4d xprt_reconn_timeout %d\n",				task->tk_pid, task->tk_status);	spin_lock(&xprt_lock);	xprt->connecting = 0;	rpc_wake_up(&xprt->reconn);	spin_unlock(&xprt_lock);}/* * Look up the RPC request corresponding to a reply, and then lock it. 
*/static inline struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid){	struct rpc_task	*head, *task;	struct rpc_rqst	*req;	int		safe = 0;	spin_lock_bh(&rpc_queue_lock);	if ((head = xprt->pending.task) != NULL) {		task = head;		do {			if ((req = task->tk_rqstp) && req->rq_xid == xid)				goto out;			task = task->tk_next;			if (++safe > 100) {				printk("xprt_lookup_rqst: loop in Q!\n");				goto out_bad;			}		} while (task != head);	}	dprintk("RPC:      unknown XID %08x in reply.\n", xid); out_bad:	req = NULL; out:	if (req && !__rpc_lock_task(req->rq_task))		req = NULL;	spin_unlock_bh(&rpc_queue_lock);	return req;}/* * Complete reply received. * The TCP code relies on us to remove the request from xprt->pending. */static inline voidxprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied){	struct rpc_task	*task = req->rq_task;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -