
xprt.c

Linux kernel source code, distributed as a compressed archive; this is the source code accompanying the book 《Linux内核》 (The Linux Kernel).

Language: C. Page 1 of 3.
	/* Adjust congestion window */
	xprt_adjust_cwnd(xprt, copied);

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long	nextstat = 0;
		static unsigned long	pkt_rtt = 0, pkt_len = 0, pkt_cnt = 0;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	task->tk_status = copied;
	req->rq_received = 1;

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
	return;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec.  -DaveM
 */
static int csum_partial_copy_to_page_cache(struct iovec *iov,
					   struct sk_buff *skb,
					   int copied)
{
	__u8 *pkt_data = skb->h.raw + sizeof(struct udphdr);
	__u8 *cur_ptr = iov->iov_base;
	__kernel_size_t cur_len = iov->iov_len;
	unsigned int csum = skb->csum;
	int need_csum = (skb->ip_summed != CHECKSUM_UNNECESSARY);
	int slack = skb->len - copied - sizeof(struct udphdr);

	if (need_csum)
		csum = csum_partial(skb->h.raw, sizeof(struct udphdr), csum);
	while (copied > 0) {
		if (cur_len) {
			int to_move = cur_len;
			if (to_move > copied)
				to_move = copied;
			if (need_csum)
				csum = csum_partial_copy_nocheck(pkt_data, cur_ptr,
								 to_move, csum);
			else
				memcpy(cur_ptr, pkt_data, to_move);
			pkt_data += to_move;
			copied -= to_move;
			cur_ptr += to_move;
			cur_len -= to_move;
		}
		if (cur_len <= 0) {
			iov++;
			cur_len = iov->iov_len;
			cur_ptr = iov->iov_base;
		}
	}
	if (need_csum) {
		if (slack > 0)
			csum = csum_partial(pkt_data, slack, csum);
		if ((unsigned short)csum_fold(csum))
			return -1;
	}
	return 0;
}

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task	*task;
	struct rpc_xprt	*xprt;
	struct rpc_rqst	*rovr;
	struct sk_buff	*skb;
	int		err, repsize, copied;

	dprintk("RPC:      udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC:      udp_data_ready request not found!\n");
		goto out;
	}

	dprintk("RPC:      udp_data_ready client %p\n", xprt);
	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Look up and lock the request corresponding to the given XID */
	rovr = xprt_lookup_rqst(xprt, *(u32 *) (skb->h.raw + sizeof(struct udphdr)));
	if (!rovr)
		goto dropit;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);
	xprt_pktdump("packet data:",
		     (u32 *) (skb->h.raw + sizeof(struct udphdr)), repsize);

	if ((copied = rovr->rq_rlen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_page_cache(rovr->rq_rvec, skb, copied))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
	rpc_unlock_task(task);

 dropit:
	skb_free_datagram(sk, skb);
 out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
}

/*
 * TCP read fragment marker
 */
static inline int
tcp_read_fraghdr(struct rpc_xprt *xprt)
{
	struct iovec	riov;
	int		want, result;

	if (xprt->tcp_offset >= xprt->tcp_reclen + sizeof(xprt->tcp_recm)) {
		xprt->tcp_offset = 0;
		xprt->tcp_reclen = 0;
	}
	if (xprt->tcp_offset >= sizeof(xprt->tcp_recm))
		goto done;

	want = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	dprintk("RPC:      reading header (%d bytes)\n", want);
	do {
		riov.iov_base = ((u8 *) &xprt->tcp_recm) + xprt->tcp_offset;
		riov.iov_len  = want;
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_offset += result;
		want -= result;
	} while (want);

	/* Is this another fragment of the last message? */
	if (!xprt->tcp_more)
		xprt->tcp_copied = 0; /* No, so we're reading a new message */

	/* Get the record length and mask out the last fragment bit */
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	xprt->tcp_more = (xprt->tcp_reclen & 0x80000000) ? 0 : 1;
	xprt->tcp_reclen &= 0x7fffffff;

	dprintk("RPC:      New record reclen %d morefrags %d\n",
				   xprt->tcp_reclen, xprt->tcp_more);
 done:
	return xprt->tcp_reclen + sizeof(xprt->tcp_recm) - xprt->tcp_offset;
}

/*
 * TCP read xid
 */
static inline int
tcp_read_xid(struct rpc_xprt *xprt, int avail)
{
	struct iovec	riov;
	int		want, result;

	if (xprt->tcp_copied >= sizeof(xprt->tcp_xid) || !avail)
		goto done;
	want = MIN(sizeof(xprt->tcp_xid) - xprt->tcp_copied, avail);
	do {
		dprintk("RPC:      reading xid (%d bytes)\n", want);
		riov.iov_base = ((u8 *) &xprt->tcp_xid) + xprt->tcp_copied;
		riov.iov_len  = want;
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_copied += result;
		xprt->tcp_offset += result;
		want  -= result;
		avail -= result;
	} while (want);
 done:
	return avail;
}

/*
 * TCP read and complete request
 */
static inline int
tcp_read_request(struct rpc_xprt *xprt, struct rpc_rqst *req, int avail)
{
	int	want, result;

	if (req->rq_rlen <= xprt->tcp_copied || !avail)
		goto done;
	want = MIN(req->rq_rlen - xprt->tcp_copied, avail);
	do {
		dprintk("RPC: %4d TCP receiving %d bytes\n",
			req->rq_task->tk_pid, want);

		result = xprt_recvmsg(xprt, req->rq_rvec, req->rq_rnr, want, xprt->tcp_copied);
		if (result < 0)
			return result;
		xprt->tcp_copied += result;
		xprt->tcp_offset += result;
		avail  -= result;
		want   -= result;
	} while (want);
 done:
	if (req->rq_rlen > xprt->tcp_copied && xprt->tcp_more)
		return avail;
	dprintk("RPC: %4d received reply complete\n", req->rq_task->tk_pid);
	xprt_complete_rqst(xprt, req, xprt->tcp_copied);

	return avail;
}

/*
 * TCP discard extra bytes from a short read
 */
static inline int
tcp_read_discard(struct rpc_xprt *xprt, int avail)
{
	struct iovec	riov;
	static u8	dummy[64];
	int		want, result = 0;

	while (avail) {
		want = MIN(avail, sizeof(dummy));
		riov.iov_base = dummy;
		riov.iov_len  = want;
		dprintk("RPC:      TCP skipping %d bytes\n", want);
		result = xprt_recvmsg(xprt, &riov, 1, want, 0);
		if (result < 0)
			return result;
		xprt->tcp_offset += result;
		avail  -= result;
	}
	return avail;
}

/*
 * TCP record receive routine
 * This is not the most efficient code since we call recvfrom thrice--
 * first receiving the record marker, then the XID, then the data.
 *
 * The optimal solution would be RPC support in the TCP layer, which
 * would gather all data up to the next record marker and then pass us
 * the list of all TCP segments ready to be copied.
 */
static int
tcp_input_record(struct rpc_xprt *xprt)
{
	struct rpc_rqst	*req = NULL;
	struct rpc_task	*task = NULL;
	int		avail, result;

	dprintk("RPC:      tcp_input_record\n");

	if (xprt->shutdown)
		return -EIO;
	if (!xprt_connected(xprt))
		return -ENOTCONN;

	/* Read in a new fragment marker if necessary */
	/* Can we ever really expect to get completely empty fragments? */
	if ((result = tcp_read_fraghdr(xprt)) <= 0)
		return result;
	avail = result;

	/* Read in the xid if necessary */
	if ((result = tcp_read_xid(xprt, avail)) <= 0)
		return result;
	avail = result;

	/* Find and lock the request corresponding to this xid */
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (req) {
		task = req->rq_task;
		/* Read in the request data */
		result = tcp_read_request(xprt, req, avail);
		rpc_unlock_task(task);
		if (result < 0)
			return result;
		avail = result;
	}

	/* Skip over any trailing bytes on short reads */
	if ((result = tcp_read_discard(xprt, avail)) < 0)
		return result;

	dprintk("RPC:      tcp_input_record done (off %d reclen %d copied %d)\n",
			xprt->tcp_offset, xprt->tcp_reclen, xprt->tcp_copied);
	result = xprt->tcp_reclen;
	return result;
}

/*
 *	TCP task queue stuff
 */
LIST_HEAD(rpc_xprt_pending);	/* List of xprts having pending tcp requests */

static inline
void tcp_rpciod_queue(void)
{
	rpciod_wake_up();
}

static inline
void xprt_append_pending(struct rpc_xprt *xprt)
{
	if (!list_empty(&xprt->rx_pending))
		return;
	spin_lock_bh(&rpc_queue_lock);
	if (list_empty(&xprt->rx_pending)) {
		list_add(&xprt->rx_pending, rpc_xprt_pending.prev);
		dprintk("RPC:     xprt queue %p\n", xprt);
		tcp_rpciod_queue();
	}
	spin_unlock_bh(&rpc_queue_lock);
}

static
void xprt_remove_pending(struct rpc_xprt *xprt)
{
	spin_lock_bh(&rpc_queue_lock);
	if (!list_empty(&xprt->rx_pending)) {
		list_del(&xprt->rx_pending);
		INIT_LIST_HEAD(&xprt->rx_pending);
	}
	spin_unlock_bh(&rpc_queue_lock);
}

static inline
struct rpc_xprt *xprt_remove_pending_next(void)
{
	struct rpc_xprt	*xprt = NULL;

	spin_lock_bh(&rpc_queue_lock);
	if (!list_empty(&rpc_xprt_pending)) {
		xprt = list_entry(rpc_xprt_pending.next, struct rpc_xprt, rx_pending);
		list_del(&xprt->rx_pending);
		INIT_LIST_HEAD(&xprt->rx_pending);
	}
	spin_unlock_bh(&rpc_queue_lock);
	return xprt;
}

/*
 *	This is protected from tcp_data_ready and the stack as it's run
 *	inside the RPC I/O daemon
 */
void
__rpciod_tcp_dispatcher(void)
{
	struct rpc_xprt *xprt;
	int safe_retry = 0, result;

	dprintk("rpciod_tcp_dispatcher: Queue Running\n");

	/*
	 *	Empty each pending socket
	 */
	while ((xprt = xprt_remove_pending_next()) != NULL) {
		dprintk("rpciod_tcp_dispatcher: Processing %p\n", xprt);

		do {
			result = tcp_input_record(xprt);
		} while (result >= 0);

		if (safe_retry++ > 200) {
			schedule();
			safe_retry = 0;
		}
	}
}

/*
 *	data_ready callback for TCP. We can't just jump into the
 *	tcp recvmsg functions inside of the network receive bh or
 *	bad things occur. We queue it to pick up after networking
 *	is done.
 */

static void tcp_data_ready(struct sock *sk, int len)
{
	struct rpc_xprt	*xprt;

	dprintk("RPC:      tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("Not a socket with xprt %p\n", sk);
		goto out;
	}

	if (xprt->shutdown)
		goto out;

	xprt_append_pending(xprt);

	dprintk("RPC:      tcp_data_ready client %p\n", xprt);
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->state, xprt_connected(xprt),
				sk->dead, sk->zapped);
 out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
}

static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:      tcp_state_change client %p...\n", xprt);
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->state, xprt_connected(xprt),
				sk->dead, sk->zapped);

	switch (sk->state) {
	case TCP_ESTABLISHED:
		if (xprt_test_and_set_connected(xprt))
			break;
		spin_lock_bh(&xprt_sock_lock);
		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
			rpc_wake_up_task(xprt->snd_task);
		rpc_wake_up(&xprt->reconn);
		spin_unlock_bh(&xprt_sock_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		break;
	default:
		xprt_disconnect(xprt);
		break;
	}
 out:
	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible_all(sk->sleep);
}

/*
 * The following 2 routines allow a task to sleep while socket memory is
 * low.
 */
static void
tcp_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;
	struct socket	*sock;

	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->socket))
		return;
	if (xprt->shutdown)
		return;

	/* Wait until we have enough socket memory */
	if (!sock_writeable(sk))
		return;

	if (!xprt_test_and_set_wspace(xprt)) {
		spin_lock_bh(&xprt_sock_lock);
		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
			rpc_wake_up_task(xprt->snd_task);
		spin_unlock_bh(&xprt_sock_lock);
	}

	if (test_bit(SOCK_NOSPACE, &sock->flags)) {
		if (sk->sleep && waitqueue_active(sk->sleep)) {
			clear_bit(SOCK_NOSPACE, &sock->flags);
			wake_up_interruptible(sk->sleep);
		}
	}
}

static void
udp_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	if (!(xprt = xprt_from_sock(sk)))
		return;
	if (xprt->shutdown)
		return;

	/* Wait until we have enough socket memory */
	if (sock_wspace(sk) < min(sk->sndbuf, XPRT_MIN_WRITE_SPACE))
		return;

	if (!xprt_test_and_set_wspace(xprt)) {
		spin_lock_bh(&xprt_sock_lock);
		if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->sending)
			rpc_wake_up_task(xprt->snd_task);
		spin_unlock_bh(&xprt_sock_lock);
	}

	if (sk->sleep && waitqueue_active(sk->sleep))
		wake_up_interruptible(sk->sleep);
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{
	struct rpc_rqst	*req = task->tk_rqstp;

	if (req)
		xprt_adjust_cwnd(task->tk_xprt, -ETIMEDOUT);

	dprintk("RPC: %4d xprt_timer (%s request)\n",
		task->tk_pid, req ? "pending" : "backlogged");

	task->tk_status  = -ETIMEDOUT;
	task->tk_timeout = 0;
	rpc_wake_up_task(task);
}
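A note on csum_partial_copy_to_page_cache above: as DaveM's comment says, the UDP checksum is folded into the same pass that scatters the packet into the client iovec, so the payload is walked only once. A minimal user-space sketch of that copy-while-summing pattern, with a plain 32-bit byte sum standing in for the kernel's csum_partial_copy_nocheck() and a hypothetical copy_and_sum() name (the caller must supply an iovec array large enough to hold len bytes):

#include <stdint.h>
#include <string.h>
#include <sys/uio.h>	/* struct iovec */

/* Illustrative sketch only: scatter `len` bytes from `src` into
 * `iov`, accumulating a checksum as we copy, so the data is
 * traversed a single time while it is hot in cache. */
static uint32_t copy_and_sum(const uint8_t *src, size_t len,
			     struct iovec *iov, uint32_t sum)
{
	while (len > 0) {
		size_t chunk = iov->iov_len;

		if (chunk > len)
			chunk = len;
		if (chunk) {
			memcpy(iov->iov_base, src, chunk);
			for (size_t i = 0; i < chunk; i++)
				sum += src[i];	/* sum in the same pass */
			src += chunk;
			len -= chunk;
		}
		iov++;	/* move on to the next iovec segment */
	}
	return sum;
}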
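The marker handling in tcp_read_fraghdr follows the RPC-over-TCP record marking scheme: each fragment is preceded by a 4-byte big-endian word whose top bit marks the last fragment of a record and whose low 31 bits give the fragment length, which is exactly the 0x80000000 / 0x7fffffff masking above. A small self-contained decoder sketch (decode_record_marker is a hypothetical name, not from xprt.c):

#include <stdint.h>
#include <stdio.h>
#include <arpa/inet.h>	/* ntohl()/htonl() */

/* Decode a record marker the way tcp_read_fraghdr does: high bit
 * set means "last fragment", low 31 bits carry the length. */
static void decode_record_marker(uint32_t recm_be,
				 uint32_t *reclen, int *more)
{
	uint32_t recm = ntohl(recm_be);		/* marker is big-endian */

	*more   = (recm & 0x80000000) ? 0 : 1;	/* more fragments follow? */
	*reclen = recm & 0x7fffffff;		/* fragment length */
}

int main(void)
{
	uint32_t reclen;
	int more;

	/* A final 124-byte fragment: last-fragment bit | length */
	decode_record_marker(htonl(0x80000000 | 124), &reclen, &more);
	printf("reclen=%u more=%d\n", reclen, more);	/* reclen=124 more=0 */
	return 0;
}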
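Finally, tcp_data_ready and __rpciod_tcp_dispatcher show the deferral pattern the comments describe: the bottom-half callback may not call recvmsg, so it only links the transport onto rpc_xprt_pending under rpc_queue_lock and wakes rpciod, which later drains the list. A rough user-space analogue of that append/remove pair, with pthread mutexes standing in for spin_lock_bh() and all names hypothetical:

#include <pthread.h>
#include <stddef.h>

/* Sketch of the xprt_append_pending / xprt_remove_pending_next
 * pattern: the "data ready" side does only cheap list work; a
 * daemon thread pops entries and does the actual receive. */
struct pending_node {
	struct pending_node *next;
	int queued;	/* mirrors the !list_empty(&rx_pending) check */
};

static struct pending_node *pending_head;
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;

/* Callback side: enqueue at most once, then (in the kernel code)
 * kick the daemon. */
static void append_pending(struct pending_node *n)
{
	pthread_mutex_lock(&queue_lock);
	if (!n->queued) {	/* checked under the lock, as in xprt.c */
		n->next = pending_head;
		pending_head = n;
		n->queued = 1;
	}
	pthread_mutex_unlock(&queue_lock);
}

/* Daemon side: pop the next pending transport, or NULL if none. */
static struct pending_node *remove_pending_next(void)
{
	struct pending_node *n;

	pthread_mutex_lock(&queue_lock);
	n = pending_head;
	if (n) {
		pending_head = n->next;
		n->queued = 0;
	}
	pthread_mutex_unlock(&queue_lock);
	return n;
}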
