⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xprt.c

📁 Linux Kernel 2.6.9 for OMAP1710
💻 C
📖 第 1 页 / 共 3 页
字号:
	struct rpc_rqst	*req = task->tk_rqstp;	struct rpc_xprt *xprt = req->rq_xprt;	spin_lock(&xprt->sock_lock);	if (req->rq_received)		goto out;	xprt_adjust_cwnd(req->rq_xprt, -ETIMEDOUT);	__xprt_put_cong(xprt, req);	dprintk("RPC: %4d xprt_timer (%s request)\n",		task->tk_pid, req ? "pending" : "backlogged");	task->tk_status  = -ETIMEDOUT;out:	task->tk_timeout = 0;	rpc_wake_up_task(task);	spin_unlock(&xprt->sock_lock);}/* * Place the actual RPC call. * We have to copy the iovec because sendmsg fiddles with its contents. */intxprt_prepare_transmit(struct rpc_task *task){	struct rpc_rqst	*req = task->tk_rqstp;	struct rpc_xprt	*xprt = req->rq_xprt;	int err = 0;	dprintk("RPC: %4d xprt_prepare_transmit\n", task->tk_pid);	if (xprt->shutdown)		return -EIO;	spin_lock_bh(&xprt->sock_lock);	if (req->rq_received && !req->rq_bytes_sent) {		err = req->rq_received;		goto out_unlock;	}	if (!__xprt_lock_write(xprt, task)) {		err = -EAGAIN;		goto out_unlock;	}	if (!xprt_connected(xprt)) {		err = -ENOTCONN;		goto out_unlock;	}out_unlock:	spin_unlock_bh(&xprt->sock_lock);	return err;}voidxprt_transmit(struct rpc_task *task){	struct rpc_clnt *clnt = task->tk_client;	struct rpc_rqst	*req = task->tk_rqstp;	struct rpc_xprt	*xprt = req->rq_xprt;	int status, retry = 0;	dprintk("RPC: %4d xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);	/* set up everything as needed. */	/* Write the record marker */	if (xprt->stream) {		u32	*marker = req->rq_svec[0].iov_base;		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));	}	smp_rmb();	if (!req->rq_received) {		if (list_empty(&req->rq_list)) {			spin_lock_bh(&xprt->sock_lock);			/* Update the softirq receive buffer */			memcpy(&req->rq_private_buf, &req->rq_rcv_buf,					sizeof(req->rq_private_buf));			/* Add request to the receive list */			list_add_tail(&req->rq_list, &xprt->recv);			spin_unlock_bh(&xprt->sock_lock);			xprt_reset_majortimeo(req);		}	} else if (!req->rq_bytes_sent)		return;	/* Continue transmitting the packet/record. We must be careful	 * to cope with writespace callbacks arriving _after_ we have	 * called xprt_sendmsg().	 */	while (1) {		req->rq_xtime = jiffies;		status = xprt_sendmsg(xprt, req);		if (status < 0)			break;		if (xprt->stream) {			req->rq_bytes_sent += status;			/* If we've sent the entire packet, immediately			 * reset the count of bytes sent. */			if (req->rq_bytes_sent >= req->rq_slen) {				req->rq_bytes_sent = 0;				goto out_receive;			}		} else {			if (status >= req->rq_slen)				goto out_receive;			status = -EAGAIN;			break;		}		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",				task->tk_pid, req->rq_slen - req->rq_bytes_sent,				req->rq_slen);		status = -EAGAIN;		if (retry++ > 50)			break;	}	/* Note: at this point, task->tk_sleeping has not yet been set,	 *	 hence there is no danger of the waking up task being put on	 *	 schedq, and being picked up by a parallel run of rpciod().	 */	task->tk_status = status;	switch (status) {	case -EAGAIN:		if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {			/* Protect against races with xprt_write_space */			spin_lock_bh(&xprt->sock_lock);			/* Don't race with disconnect */			if (!xprt_connected(xprt))				task->tk_status = -ENOTCONN;			else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {				task->tk_timeout = req->rq_timeout;				rpc_sleep_on(&xprt->pending, task, NULL, NULL);			}			spin_unlock_bh(&xprt->sock_lock);			return;		}		/* Keep holding the socket if it is blocked */		rpc_delay(task, HZ>>4);		return;	case -ECONNREFUSED:		task->tk_timeout = RPC_REESTABLISH_TIMEOUT;		rpc_sleep_on(&xprt->sending, task, NULL, NULL);	case -ENOTCONN:		return;	default:		if (xprt->stream)			xprt_disconnect(xprt);	}	xprt_release_write(xprt, task);	return; out_receive:	dprintk("RPC: %4d xmit complete\n", task->tk_pid);	/* Set the task's receive timeout value */	spin_lock_bh(&xprt->sock_lock);	if (!xprt->nocong) {		int timer = task->tk_msg.rpc_proc->p_timer;		task->tk_timeout = rpc_calc_rto(clnt->cl_rtt, timer);		task->tk_timeout <<= rpc_ntimeo(clnt->cl_rtt, timer) + req->rq_retries;		if (task->tk_timeout > xprt->timeout.to_maxval || task->tk_timeout == 0)			task->tk_timeout = xprt->timeout.to_maxval;	} else		task->tk_timeout = req->rq_timeout;	/* Don't race with disconnect */	if (!xprt_connected(xprt))		task->tk_status = -ENOTCONN;	else if (!req->rq_received)		rpc_sleep_on(&xprt->pending, task, NULL, xprt_timer);	__xprt_release_write(xprt, task);	spin_unlock_bh(&xprt->sock_lock);}/* * Reserve an RPC call slot. */static inline voiddo_xprt_reserve(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	task->tk_status = 0;	if (task->tk_rqstp)		return;	if (!list_empty(&xprt->free)) {		struct rpc_rqst	*req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);		list_del_init(&req->rq_list);		task->tk_rqstp = req;		xprt_request_init(task, xprt);		return;	}	dprintk("RPC:      waiting for request slot\n");	task->tk_status = -EAGAIN;	task->tk_timeout = 0;	rpc_sleep_on(&xprt->backlog, task, NULL, NULL);}voidxprt_reserve(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	task->tk_status = -EIO;	if (!xprt->shutdown) {		spin_lock(&xprt->xprt_lock);		do_xprt_reserve(task);		spin_unlock(&xprt->xprt_lock);		if (task->tk_rqstp)			del_timer_sync(&xprt->timer);	}}/* * Allocate a 'unique' XID */static inline u32 xprt_alloc_xid(struct rpc_xprt *xprt){	return xprt->xid++;}static inline void xprt_init_xid(struct rpc_xprt *xprt){	get_random_bytes(&xprt->xid, sizeof(xprt->xid));}/* * Initialize RPC request */static voidxprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt){	struct rpc_rqst	*req = task->tk_rqstp;	req->rq_timeout = xprt->timeout.to_initval;	req->rq_task	= task;	req->rq_xprt    = xprt;	req->rq_xid     = xprt_alloc_xid(xprt);	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid,			req, req->rq_xid);}/* * Release an RPC call slot */voidxprt_release(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	struct rpc_rqst	*req;	if (!(req = task->tk_rqstp))		return;	spin_lock_bh(&xprt->sock_lock);	__xprt_release_write(xprt, task);	__xprt_put_cong(xprt, req);	if (!list_empty(&req->rq_list))		list_del(&req->rq_list);	xprt->last_used = jiffies;	if (list_empty(&xprt->recv) && !xprt->shutdown)		mod_timer(&xprt->timer, xprt->last_used + XPRT_IDLE_TIMEOUT);	spin_unlock_bh(&xprt->sock_lock);	task->tk_rqstp = NULL;	memset(req, 0, sizeof(*req));	/* mark unused */	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);	spin_lock(&xprt->xprt_lock);	list_add(&req->rq_list, &xprt->free);	xprt_clear_backlog(xprt);	spin_unlock(&xprt->xprt_lock);}/* * Set default timeout parameters */voidxprt_default_timeout(struct rpc_timeout *to, int proto){	if (proto == IPPROTO_UDP)		xprt_set_timeout(to, 5,  5 * HZ);	else		xprt_set_timeout(to, 5, 60 * HZ);}/* * Set constant timeout */voidxprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr){	to->to_initval   = 	to->to_increment = incr;	to->to_maxval    = incr * retr;	to->to_retries   = retr;	to->to_exponential = 0;}unsigned int xprt_udp_slot_table_entries = RPC_DEF_SLOT_TABLE;unsigned int xprt_tcp_slot_table_entries = RPC_DEF_SLOT_TABLE;/* * Initialize an RPC client */static struct rpc_xprt *xprt_setup(int proto, struct sockaddr_in *ap, struct rpc_timeout *to){	struct rpc_xprt	*xprt;	unsigned int entries;	size_t slot_table_size;	struct rpc_rqst	*req;	dprintk("RPC:      setting up %s transport...\n",				proto == IPPROTO_UDP? "UDP" : "TCP");	entries = (proto == IPPROTO_TCP)?		xprt_tcp_slot_table_entries : xprt_udp_slot_table_entries;	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)		return ERR_PTR(-ENOMEM);	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */	xprt->max_reqs = entries;	slot_table_size = entries * sizeof(xprt->slot[0]);	xprt->slot = kmalloc(slot_table_size, GFP_KERNEL);	if (xprt->slot == NULL) {		kfree(xprt);		return ERR_PTR(-ENOMEM);	}	memset(xprt->slot, 0, slot_table_size);	xprt->addr = *ap;	xprt->prot = proto;	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;	if (xprt->stream) {		xprt->cwnd = RPC_MAXCWND(xprt);		xprt->nocong = 1;	} else		xprt->cwnd = RPC_INITCWND;	spin_lock_init(&xprt->sock_lock);	spin_lock_init(&xprt->xprt_lock);	init_waitqueue_head(&xprt->cong_wait);	INIT_LIST_HEAD(&xprt->free);	INIT_LIST_HEAD(&xprt->recv);	INIT_WORK(&xprt->sock_connect, xprt_socket_connect, xprt);	INIT_WORK(&xprt->task_cleanup, xprt_socket_autoclose, xprt);	init_timer(&xprt->timer);	xprt->timer.function = xprt_init_autodisconnect;	xprt->timer.data = (unsigned long) xprt;	xprt->last_used = jiffies;	xprt->port = XPRT_MAX_RESVPORT;	/* Set timeout parameters */	if (to) {		xprt->timeout = *to;	} else		xprt_default_timeout(&xprt->timeout, xprt->prot);	rpc_init_wait_queue(&xprt->pending, "xprt_pending");	rpc_init_wait_queue(&xprt->sending, "xprt_sending");	rpc_init_wait_queue(&xprt->resend, "xprt_resend");	rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");	/* initialize free list */	for (req = &xprt->slot[entries-1]; req >= &xprt->slot[0]; req--)		list_add(&req->rq_list, &xprt->free);	xprt_init_xid(xprt);	/* Check whether we want to use a reserved port */	xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;	dprintk("RPC:      created transport %p with %u slots\n", xprt,			xprt->max_reqs);		return xprt;}/* * Bind to a reserved port */static inline int xprt_bindresvport(struct rpc_xprt *xprt, struct socket *sock){	struct sockaddr_in myaddr = {		.sin_family = AF_INET,	};	int		err, port;	/* Were we already bound to a given port? Try to reuse it */	port = xprt->port;	do {		myaddr.sin_port = htons(port);		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,						sizeof(myaddr));		if (err == 0) {			xprt->port = port;			return 0;		}		if (--port == 0)			port = XPRT_MAX_RESVPORT;	} while (err == -EADDRINUSE && port != xprt->port);	printk("RPC: Can't bind to reserved port (%d).\n", -err);	return err;}static voidxprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock){	struct sock	*sk = sock->sk;	if (xprt->inet)		return;	write_lock_bh(&sk->sk_callback_lock);	sk->sk_user_data = xprt;	xprt->old_data_ready = sk->sk_data_ready;	xprt->old_state_change = sk->sk_state_change;	xprt->old_write_space = sk->sk_write_space;	if (xprt->prot == IPPROTO_UDP) {		sk->sk_data_ready = udp_data_ready;		sk->sk_no_check = UDP_CSUM_NORCV;		xprt_set_connected(xprt);	} else {		struct tcp_opt *tp = tcp_sk(sk);		tp->nonagle = 1;	/* disable Nagle's algorithm */		sk->sk_data_ready = tcp_data_ready;		sk->sk_state_change = tcp_state_change;		xprt_clear_connected(xprt);	}	sk->sk_write_space = xprt_write_space;	/* Reset to new socket */	xprt->sock = sock;	xprt->inet = sk;	write_unlock_bh(&sk->sk_callback_lock);	return;}/* * Set socket buffer length */voidxprt_sock_setbufsize(struct rpc_xprt *xprt){	struct sock *sk = xprt->inet;	if (xprt->stream)		return;	if (xprt->rcvsize) {		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;		sk->sk_rcvbuf = xprt->rcvsize * xprt->max_reqs *  2;	}	if (xprt->sndsize) {		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;		sk->sk_sndbuf = xprt->sndsize * xprt->max_reqs * 2;		sk->sk_write_space(sk);	}}/* * Datastream sockets are created here, but xprt_connect will create * and connect stream sockets. */static struct socket * xprt_create_socket(struct rpc_xprt *xprt, int proto, int resvport){	struct socket	*sock;	int		type, err;	dprintk("RPC:      xprt_create_socket(%s %d)\n",			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;	if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {		printk("RPC: can't create socket (%d).\n", -err);		return NULL;	}	/* If the caller has the capability, bind to a reserved port */	if (resvport && xprt_bindresvport(xprt, sock) < 0) {		printk("RPC: can't bind to reserved port.\n");		goto failed;	}	return sock;failed:	sock_release(sock);	return NULL;}/* * Create an RPC client transport given the protocol and peer address. */struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to){	struct rpc_xprt	*xprt;	xprt = xprt_setup(proto, sap, to);	if (IS_ERR(xprt))		dprintk("RPC:      xprt_create_proto failed\n");	else		dprintk("RPC:      xprt_create_proto created xprt %p\n", xprt);	return xprt;}/* * Prepare for transport shutdown. */voidxprt_shutdown(struct rpc_xprt *xprt){	xprt->shutdown = 1;	rpc_wake_up(&xprt->sending);	rpc_wake_up(&xprt->resend);	rpc_wake_up(&xprt->pending);	rpc_wake_up(&xprt->backlog);	wake_up(&xprt->cong_wait);	del_timer_sync(&xprt->timer);}/* * Clear the xprt backlog queue */intxprt_clear_backlog(struct rpc_xprt *xprt) {	rpc_wake_up_next(&xprt->backlog);	wake_up(&xprt->cong_wait);	return 1;}/* * Destroy an RPC transport, killing off all requests. */intxprt_destroy(struct rpc_xprt *xprt){	dprintk("RPC:      destroying transport %p\n", xprt);	xprt_shutdown(xprt);	xprt_disconnect(xprt);	xprt_close(xprt);	kfree(xprt->slot);	kfree(xprt);	return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -