⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xprt.c

📁 Linux Kernel 2.6.9 for OMAP1710
💻 C
📖 第 1 页 / 共 3 页
字号:
	/* NOTE(review): this is the tail of the connect routine whose head is
	 * outside this chunk (presumably xprt_connect) — do not read the first
	 * statements below out of context. */
	task->tk_timeout = RPC_CONNECT_TIMEOUT;
	rpc_sleep_on(&xprt->pending, task, xprt_connect_status, NULL);
	if (!test_and_set_bit(XPRT_CONNECTING, &xprt->sockstate)) {
		/* Note: if we are here due to a dropped connection
		 * 	 we delay reconnecting by RPC_REESTABLISH_TIMEOUT/HZ
		 * 	 seconds
		 */
		if (xprt->sock != NULL)
			schedule_delayed_work(&xprt->sock_connect,
					RPC_REESTABLISH_TIMEOUT);
		else
			schedule_work(&xprt->sock_connect);
	}
	return;
 out_write:
	xprt_release_write(xprt, task);
}

/*
 * We arrive here when awoken from waiting on connection establishment.
 * On success just return; on a "connection-level" error (refused/reset/
 * not connected) return and let the caller retry; anything else releases
 * the transport write lock so other tasks can proceed.
 */
static void
xprt_connect_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	if (task->tk_status >= 0) {
		dprintk("RPC: %4d xprt_connect_status: connection established\n",
				task->tk_pid);
		return;
	}

	/* if soft mounted, just cause this RPC to fail */
	if (RPC_IS_SOFT(task))
		task->tk_status = -EIO;

	switch (task->tk_status) {
	case -ECONNREFUSED:
	case -ECONNRESET:
	case -ENOTCONN:
		/* Keep the write lock: caller will re-drive the connect. */
		return;
	case -ETIMEDOUT:
		dprintk("RPC: %4d xprt_connect_status: timed out\n",
				task->tk_pid);
		break;
	default:
		printk(KERN_ERR "RPC: error %d connecting to server %s\n",
				-task->tk_status, task->tk_client->cl_server);
	}
	xprt_release_write(xprt, task);
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
 * Linear scan of xprt->recv by XID; returns NULL when no pending request
 * matches.  Caller must already hold xprt->sock_lock (both call sites
 * below take it before calling us).
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct list_head *pos;
	struct rpc_rqst	*req = NULL;

	list_for_each(pos, &xprt->recv) {
		struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
		if (entry->rq_xid == xid) {
			req = entry;
			break;
		}
	}
	return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 * Updates congestion/RTT bookkeeping, records the number of reply bytes
 * in rq_received, unlinks the request from the receive list and wakes
 * the waiting task.
 */
static void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task	*task = req->rq_task;
	struct rpc_clnt *clnt = task->tk_client;

	/* Adjust congestion window */
	if (!xprt->nocong) {
		unsigned timer = task->tk_msg.rpc_proc->p_timer;
		xprt_adjust_cwnd(xprt, copied);
		__xprt_put_cong(xprt, req);
		if (timer) {
			/* Only sample RTT on the first transmission —
			 * retransmitted replies are ambiguous. */
			if (req->rq_ntrans == 1)
				rpc_update_rtt(clnt->cl_rtt, timer,
						(long)jiffies - req->rq_xtime);
			rpc_set_timeo(clnt->cl_rtt, timer, req->rq_ntrans - 1);
		}
	}

#ifdef RPC_PROFILE
	/* Profile only reads for now */
	if (copied > 1024) {
		static unsigned long	nextstat;
		static unsigned long	pkt_rtt, pkt_len, pkt_cnt;

		pkt_cnt++;
		pkt_len += req->rq_slen + copied;
		pkt_rtt += jiffies - req->rq_xtime;
		if (time_before(nextstat, jiffies)) {
			printk("RPC: %lu %ld cwnd\n", jiffies, xprt->cwnd);
			printk("RPC: %ld %ld %ld %ld stat\n",
					jiffies, pkt_cnt, pkt_len, pkt_rtt);
			pkt_rtt = pkt_len = pkt_cnt = 0;
			nextstat = jiffies + 5 * HZ;
		}
	}
#endif

	dprintk("RPC: %4d has input (%d bytes)\n", task->tk_pid, copied);
	list_del_init(&req->rq_list);
	req->rq_received = req->rq_private_buf.len = copied;

	/* ... and wake up the process. */
	rpc_wake_up_task(task);
	return;
}

/*
 * skb_reader callback: plain copy of up to @len bytes from the current
 * skb position into @to, advancing the reader.  Returns bytes copied,
 * or 0 if skb_copy_bits() failed.
 */
static size_t
skb_read_bits(skb_reader_t *desc, void *to, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	if (skb_copy_bits(desc->skb, desc->offset, to, len))
		return 0;
	desc->count -= len;
	desc->offset += len;
	return len;
}

/*
 * skb_reader callback: same as skb_read_bits() but folds the copied
 * bytes into the running checksum in desc->csum as it goes.
 */
static size_t
skb_read_and_csum_bits(skb_reader_t *desc, void *to, size_t len)
{
	unsigned int csum2, pos;

	if (len > desc->count)
		len = desc->count;
	pos = desc->offset;
	csum2 = skb_copy_and_csum_bits(desc->skb, pos, to, len, 0);
	desc->csum = csum_block_add(desc->csum, csum2, pos);
	desc->count -= len;
	desc->offset += len;
	return len;
}

/*
 * We have set things up such that we perform the checksum of the UDP
 * packet in parallel with the copies into the RPC client iovec.
 *  -DaveM
 *
 * Returns 0 on success, -1 if the datagram was truncated (bytes left
 * uncopied) or the verified checksum is bad.
 */
int
csum_partial_copy_to_xdr(struct xdr_buf *xdr, struct sk_buff *skb)
{
	skb_reader_t desc;

	desc.skb = skb;
	desc.offset = sizeof(struct udphdr);	/* skip the UDP header */
	desc.count = skb->len - desc.offset;

	if (skb->ip_summed == CHECKSUM_UNNECESSARY)
		goto no_checksum;

	/* Seed with the checksum of the UDP header itself. */
	desc.csum = csum_partial(skb->data, desc.offset, skb->csum);
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_and_csum_bits);
	if (desc.offset != skb->len) {
		/* xdr buffer was smaller than the datagram: checksum the
		 * tail we did not copy so verification still covers it. */
		unsigned int csum2;
		csum2 = skb_checksum(skb, desc.offset, skb->len - desc.offset, 0);
		desc.csum = csum_block_add(desc.csum, csum2, desc.offset);
	}
	if (desc.count)
		return -1;
	if ((unsigned short)csum_fold(desc.csum))
		return -1;
	return 0;
no_checksum:
	xdr_partial_copy_from_skb(xdr, 0, &desc, skb_read_bits);
	if (desc.count)
		return -1;
	return 0;
}

/*
 * Input handler for RPC replies. Called from a bottom half and hence
 * atomic.
 */
static void
udp_data_ready(struct sock *sk, int len)
{
	struct rpc_task	*task;
	struct rpc_xprt	*xprt;
	struct rpc_rqst *rovr;
	struct sk_buff	*skb;
	int err, repsize, copied;
	u32 _xid, *xp;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC:      udp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC:      udp_data_ready request not found!\n");
		goto out;
	}

	dprintk("RPC:      udp_data_ready client %p\n", xprt);

	if ((skb = skb_recv_datagram(sk, 0, 1, &err)) == NULL)
		goto out;

	if (xprt->shutdown)
		goto dropit;

	/* Reply must at least hold an XID (4 bytes) past the UDP header. */
	repsize = skb->len - sizeof(struct udphdr);
	if (repsize < 4) {
		printk("RPC: impossible RPC reply size %d!\n", repsize);
		goto dropit;
	}

	/* Copy the XID from the skb...
	 */
	xp = skb_header_pointer(skb, sizeof(struct udphdr),
				sizeof(_xid), &_xid);
	if (xp == NULL)
		goto dropit;

	/* Look up and lock the request corresponding to the given XID */
	spin_lock(&xprt->sock_lock);
	rovr = xprt_lookup_rqst(xprt, *xp);
	if (!rovr)
		goto out_unlock;
	task = rovr->rq_task;

	dprintk("RPC: %4d received reply\n", task->tk_pid);

	/* Never copy more than the receive buffer can hold. */
	if ((copied = rovr->rq_private_buf.buflen) > repsize)
		copied = repsize;

	/* Suck it into the iovec, verify checksum if not done by hw. */
	if (csum_partial_copy_to_xdr(&rovr->rq_private_buf, skb))
		goto out_unlock;

	/* Something worked... */
	dst_confirm(skb->dst);

	xprt_complete_rqst(xprt, rovr, copied);

 out_unlock:
	spin_unlock(&xprt->sock_lock);
 dropit:
	skb_free_datagram(sk, skb);
 out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * Copy from an skb into memory and shrink the skb.
 */
static inline size_t
tcp_copy_data(skb_reader_t *desc, void *p, size_t len)
{
	if (len > desc->count)
		len = desc->count;
	if (skb_copy_bits(desc->skb, desc->offset, p, len))
		return 0;
	desc->offset += len;
	desc->count -= len;
	return len;
}

/*
 * TCP read fragment marker
 * Accumulates the 4-byte RPC record marker (possibly across calls, via
 * tcp_offset), then decodes it: high bit = last fragment, low 31 bits =
 * fragment length.
 */
static inline void
tcp_read_fraghdr(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	p = ((char *) &xprt->tcp_recm) + xprt->tcp_offset;
	len = sizeof(xprt->tcp_recm) - xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)	/* marker still incomplete; wait for more data */
		return;
	xprt->tcp_reclen = ntohl(xprt->tcp_recm);
	if (xprt->tcp_reclen & 0x80000000)
		xprt->tcp_flags |= XPRT_LAST_FRAG;
	else
		xprt->tcp_flags &= ~XPRT_LAST_FRAG;
	xprt->tcp_reclen &= 0x7fffffff;
	xprt->tcp_flags &= ~XPRT_COPY_RECM;
	xprt->tcp_offset = 0;
	/* Sanity check of the record length */
	if (xprt->tcp_reclen < 4) {
		printk(KERN_ERR "RPC: Invalid TCP record fragment length\n");
		xprt_disconnect(xprt);
		/* NOTE(review): no return here — we fall through and keep
		 * processing after disconnecting; later upstream kernels
		 * return at this point.  Verify before relying on this path. */
	}
	dprintk("RPC:      reading TCP record fragment of length %d\n",
			xprt->tcp_reclen);
}

/*
 * If the current record fragment has been fully consumed, rearm the
 * state machine: expect a new record marker, and if this was the last
 * fragment of the reply, expect a fresh XID next.
 */
static void
tcp_check_recm(struct rpc_xprt *xprt)
{
	if (xprt->tcp_offset == xprt->tcp_reclen) {
		xprt->tcp_flags |= XPRT_COPY_RECM;
		xprt->tcp_offset = 0;
		if (xprt->tcp_flags & XPRT_LAST_FRAG) {
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
			xprt->tcp_flags |= XPRT_COPY_XID;
			xprt->tcp_copied = 0;
		}
	}
}

/*
 * TCP read xid
 * Accumulates the 4-byte XID (possibly across calls), then switches the
 * state machine to copying reply data.
 */
static inline void
tcp_read_xid(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len, used;
	char *p;

	len = sizeof(xprt->tcp_xid) - xprt->tcp_offset;
	dprintk("RPC:      reading XID (%Zu bytes)\n", len);
	p = ((char *) &xprt->tcp_xid) + xprt->tcp_offset;
	used = tcp_copy_data(desc, p, len);
	xprt->tcp_offset += used;
	if (used != len)	/* XID still incomplete; wait for more data */
		return;
	xprt->tcp_flags &= ~XPRT_COPY_XID;
	xprt->tcp_flags |= XPRT_COPY_DATA;
	xprt->tcp_copied = 4;	/* the XID counts as the first 4 reply bytes */
	dprintk("RPC:      reading reply for XID %08x\n", xprt->tcp_xid);
	tcp_check_recm(xprt);
}

/*
 * TCP read and complete request
 * Copies reply payload into the matching request's receive buffer and
 * completes the request once the whole reply has arrived.
 */
static inline void
tcp_read_request(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	struct rpc_rqst *req;
	struct xdr_buf *rcvbuf;
	size_t len;

	/* Find and lock the request corresponding to this xid */
	spin_lock(&xprt->sock_lock);
	req = xprt_lookup_rqst(xprt, xprt->tcp_xid);
	if (!req) {
		/* No such request (e.g. already timed out): drop the data. */
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
		dprintk("RPC:      XID %08x request not found!\n",
				xprt->tcp_xid);
		spin_unlock(&xprt->sock_lock);
		return;
	}

	rcvbuf = &req->rq_private_buf;
	len = desc->count;
	if (len > xprt->tcp_reclen - xprt->tcp_offset) {
		/* More data available than belongs to this fragment: copy
		 * with a clamped shadow reader so we stop at the fragment
		 * boundary, then advance the real reader by hand. */
		skb_reader_t my_desc;

		len = xprt->tcp_reclen - xprt->tcp_offset;
		memcpy(&my_desc, desc, sizeof(my_desc));
		my_desc.count = len;
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  &my_desc, tcp_copy_data);
		desc->count -= len;
		desc->offset += len;
	} else
		xdr_partial_copy_from_skb(rcvbuf, xprt->tcp_copied,
					  desc, tcp_copy_data);
	xprt->tcp_copied += len;
	xprt->tcp_offset += len;

	/* Reply done when the buffer is full, or this fragment is finished
	 * and it was the last one. */
	if (xprt->tcp_copied == req->rq_private_buf.buflen)
		xprt->tcp_flags &= ~XPRT_COPY_DATA;
	else if (xprt->tcp_offset == xprt->tcp_reclen) {
		if (xprt->tcp_flags & XPRT_LAST_FRAG)
			xprt->tcp_flags &= ~XPRT_COPY_DATA;
	}

	if (!(xprt->tcp_flags & XPRT_COPY_DATA)) {
		dprintk("RPC: %4d received reply complete\n",
				req->rq_task->tk_pid);
		xprt_complete_rqst(xprt, req, xprt->tcp_copied);
	}
	spin_unlock(&xprt->sock_lock);
	tcp_check_recm(xprt);
}

/*
 * TCP discard extra bytes from a short read
 * Skips data belonging to the current fragment that has no request to
 * receive it.
 */
static inline void
tcp_read_discard(struct rpc_xprt *xprt, skb_reader_t *desc)
{
	size_t len;

	len = xprt->tcp_reclen - xprt->tcp_offset;
	if (len > desc->count)
		len = desc->count;
	desc->count -= len;
	desc->offset += len;
	xprt->tcp_offset += len;
	tcp_check_recm(xprt);
}

/*
 * TCP record receive routine
 * We first have to grab the record marker, then the XID, then the data.
 * Drives the tcp_flags state machine until the skb data is exhausted;
 * returns the number of bytes consumed.
 */
static int
tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb,
		unsigned int offset, size_t len)
{
	struct rpc_xprt *xprt = rd_desc->arg.data;
	skb_reader_t desc = {
		.skb	= skb,
		.offset	= offset,
		.count	= len,
		.csum	= 0
	};

	dprintk("RPC:      tcp_data_recv\n");
	do {
		/* Read in a new fragment marker if necessary */
		/* Can we ever really expect to get completely empty fragments? */
		if (xprt->tcp_flags & XPRT_COPY_RECM) {
			tcp_read_fraghdr(xprt, &desc);
			continue;
		}
		/* Read in the xid if necessary */
		if (xprt->tcp_flags & XPRT_COPY_XID) {
			tcp_read_xid(xprt, &desc);
			continue;
		}
		/* Read in the request data */
		if (xprt->tcp_flags & XPRT_COPY_DATA) {
			tcp_read_request(xprt, &desc);
			continue;
		}
		/* Skip over any trailing bytes on short reads */
		tcp_read_discard(xprt, &desc);
	} while (desc.count);
	dprintk("RPC:      tcp_data_recv done\n");
	return len - desc.count;
}

/*
 * data_ready callback for the TCP socket: hand all queued receive data
 * to tcp_data_recv() via tcp_read_sock().
 */
static void tcp_data_ready(struct sock *sk, int bytes)
{
	struct rpc_xprt *xprt;
	read_descriptor_t rd_desc;

	read_lock(&sk->sk_callback_lock);
	dprintk("RPC:      tcp_data_ready...\n");
	if (!(xprt = xprt_from_sock(sk))) {
		printk("RPC:      tcp_data_ready socket info not found!\n");
		goto out;
	}
	if (xprt->shutdown)
		goto out;

	/* We use rd_desc to pass struct xprt to tcp_data_recv */
	rd_desc.arg.data = xprt;
	rd_desc.count = 65536;
	tcp_read_sock(sk, &rd_desc, tcp_data_recv);
out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * state_change callback for the TCP socket: track connection state and
 * wake tasks waiting on &xprt->pending accordingly.
 */
static void
tcp_state_change(struct sock *sk)
{
	struct rpc_xprt	*xprt;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)))
		goto out;
	dprintk("RPC:      tcp_state_change client %p...\n", xprt);
	dprintk("RPC:      state %x conn %d dead %d zapped %d\n",
				sk->sk_state, xprt_connected(xprt),
				sock_flag(sk, SOCK_DEAD), sk->sk_zapped);

	switch (sk->sk_state) {
	case TCP_ESTABLISHED:
		spin_lock_bh(&xprt->sock_lock);
		if (!xprt_test_and_set_connected(xprt)) {
			/* Reset TCP record info */
			xprt->tcp_offset = 0;
			xprt->tcp_reclen = 0;
			xprt->tcp_copied = 0;
			xprt->tcp_flags = XPRT_COPY_RECM | XPRT_COPY_XID;
			rpc_wake_up(&xprt->pending);
		}
		spin_unlock_bh(&xprt->sock_lock);
		break;
	case TCP_SYN_SENT:
	case TCP_SYN_RECV:
		/* Connection attempt still in progress: nothing to do. */
		break;
	default:
		/* Any other state means the connection is going away. */
		if (xprt_test_and_clear_connected(xprt))
			rpc_wake_up_status(&xprt->pending, -ENOTCONN);
		break;
	}
 out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * Called when more output buffer space is available for this socket.
 * We try not to wake our writers until they can make "significant"
 * progress, otherwise we'll waste resources thrashing sock_sendmsg
 * with a bunch of small requests.
 */
static void
xprt_write_space(struct sock *sk)
{
	struct rpc_xprt	*xprt;
	struct socket	*sock;

	read_lock(&sk->sk_callback_lock);
	if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
		goto out;
	if (xprt->shutdown)
		goto out;

	/* Wait until we have enough socket memory */
	if (xprt->stream) {
		/* from net/core/stream.c:sk_stream_write_space */
		if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
			goto out;
	} else {
		/* from net/core/sock.c:sock_def_write_space */
		if (!sock_writeable(sk))
			goto out;
	}

	if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
		goto out;

	/* Only wake the sender if it is actually waiting for buffer space. */
	spin_lock_bh(&xprt->sock_lock);
	if (xprt->snd_task && xprt->snd_task->tk_rpcwait == &xprt->pending)
		rpc_wake_up_task(xprt->snd_task);
	spin_unlock_bh(&xprt->sock_lock);
out:
	read_unlock(&sk->sk_callback_lock);
}

/*
 * RPC receive timeout handler.
 */
static void
xprt_timer(struct rpc_task *task)
{

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -