⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xprt.c

📁 Linux内核源代码 为压缩文件 是<<Linux内核>>一书中的源代码
💻 C
📖 第 1 页 / 共 3 页
字号:
/* * Serialize access to sockets, in order to prevent different * requests from interfering with each other. */static intxprt_down_transmit(struct rpc_task *task){	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;	struct rpc_rqst	*req = task->tk_rqstp;	spin_lock(&xprt_lock);	if (xprt->snd_task && xprt->snd_task != task) {		dprintk("RPC: %4d TCP write queue full (task %d)\n",			task->tk_pid, xprt->snd_task->tk_pid);		task->tk_timeout = 0;		task->tk_status = -EAGAIN;		rpc_sleep_on(&xprt->sending, task, NULL, NULL);	} else if (!xprt->snd_task) {		xprt->snd_task = task;#ifdef RPC_PROFILE		req->rq_xtime = jiffies;#endif		req->rq_bytes_sent = 0;	}	spin_unlock(&xprt_lock);	return xprt->snd_task == task;}/* * Releases the socket for use by other requests. */static inline voidxprt_up_transmit(struct rpc_task *task){	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;	if (xprt->snd_task && xprt->snd_task == task) {		spin_lock(&xprt_lock);		xprt->snd_task = NULL;		rpc_wake_up_next(&xprt->sending);		spin_unlock(&xprt_lock);	}}/* * Place the actual RPC call. * We have to copy the iovec because sendmsg fiddles with its contents. */voidxprt_transmit(struct rpc_task *task){	struct rpc_rqst	*req = task->tk_rqstp;	struct rpc_xprt	*xprt = req->rq_xprt;	dprintk("RPC: %4d xprt_transmit(%x)\n", task->tk_pid, 				*(u32 *)(req->rq_svec[0].iov_base));	if (xprt->shutdown)		task->tk_status = -EIO;	if (!xprt_connected(xprt))		task->tk_status = -ENOTCONN;	if (task->tk_status < 0)		return;	if (task->tk_rpcwait)		rpc_remove_wait_queue(task);	/* set up everything as needed. */	/* Write the record marker */	if (xprt->stream) {		u32	*marker = req->rq_svec[0].iov_base;		*marker = htonl(0x80000000|(req->rq_slen-sizeof(*marker)));	}	if (!xprt_down_transmit(task))		return;	do_xprt_transmit(task);}static voiddo_xprt_transmit(struct rpc_task *task){	struct rpc_rqst	*req = task->tk_rqstp;	struct rpc_xprt	*xprt = req->rq_xprt;	int status, retry = 0;	/* For fast networks/servers we have to put the request on	 * the pending list now:	 * Note that we don't want the task timing out during the	 * call to xprt_sendmsg(), so we initially disable the timeout,	 * and then reset it later...	 */	xprt_receive(task);	/* Continue transmitting the packet/record. We must be careful	 * to cope with writespace callbacks arriving _after_ we have	 * called xprt_sendmsg().	 */	while (1) {		xprt_clear_wspace(xprt);		status = xprt_sendmsg(xprt, req);		if (status < 0)			break;		if (xprt->stream) {			req->rq_bytes_sent += status;			if (req->rq_bytes_sent >= req->rq_slen)				goto out_receive;		} else {			if (status >= req->rq_slen)				goto out_receive;			status = -ENOMEM;			break;		}		dprintk("RPC: %4d xmit incomplete (%d left of %d)\n",				task->tk_pid, req->rq_slen - req->rq_bytes_sent,				req->rq_slen);		status = -EAGAIN;		if (retry++ > 50)			break;	}	rpc_unlock_task(task);	/* Note: at this point, task->tk_sleeping has not yet been set,	 *	 hence there is no danger of the waking up task being put on	 *	 schedq, and being picked up by a parallel run of rpciod().	 */	rpc_wake_up_task(task);	if (!RPC_IS_RUNNING(task))		goto out_release;	if (req->rq_received)		goto out_release;	task->tk_status = status;	switch (status) {	case -ENOMEM:		/* Protect against (udp|tcp)_write_space */		spin_lock_bh(&xprt_sock_lock);		if (!xprt_wspace(xprt)) {			task->tk_timeout = req->rq_timeout.to_current;			rpc_sleep_on(&xprt->sending, task, NULL, NULL);		}		spin_unlock_bh(&xprt_sock_lock);		return;	case -EAGAIN:		/* Keep holding the socket if it is blocked */		rpc_delay(task, HZ>>4);		return;	case -ECONNREFUSED:	case -ENOTCONN:		if (!xprt->stream)			return;	default:		goto out_release;	} out_receive:	dprintk("RPC: %4d xmit complete\n", task->tk_pid);	/* Set the task's receive timeout value */	task->tk_timeout = req->rq_timeout.to_current;	rpc_add_timer(task, xprt_timer);	rpc_unlock_task(task); out_release:	xprt_up_transmit(task);}/* * Queue the task for a reply to our call. * When the callback is invoked, the congestion window should have * been updated already. */voidxprt_receive(struct rpc_task *task){	struct rpc_rqst	*req = task->tk_rqstp;	struct rpc_xprt	*xprt = req->rq_xprt;	dprintk("RPC: %4d xprt_receive\n", task->tk_pid);	req->rq_received = 0;	task->tk_timeout = 0;	rpc_sleep_locked(&xprt->pending, task, NULL, NULL);}/* * Reserve an RPC call slot. */intxprt_reserve(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	/* We already have an initialized request. */	if (task->tk_rqstp)		return 0;	dprintk("RPC: %4d xprt_reserve cong = %ld cwnd = %ld\n",				task->tk_pid, xprt->cong, xprt->cwnd);	spin_lock_bh(&xprt_sock_lock);	xprt_reserve_status(task);	if (task->tk_rqstp) {		task->tk_timeout = 0;	} else if (!task->tk_timeout) {		task->tk_status = -ENOBUFS;	} else {		dprintk("RPC:      xprt_reserve waiting on backlog\n");		task->tk_status = -EAGAIN;		rpc_sleep_on(&xprt->backlog, task, NULL, NULL);	}	spin_unlock_bh(&xprt_sock_lock);	dprintk("RPC: %4d xprt_reserve returns %d\n",				task->tk_pid, task->tk_status);	return task->tk_status;}/* * Reservation callback */static voidxprt_reserve_status(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	struct rpc_rqst	*req;	if (xprt->shutdown) {		task->tk_status = -EIO;	} else if (task->tk_status < 0) {		/* NOP */	} else if (task->tk_rqstp) {		/* We've already been given a request slot: NOP */	} else {		if (RPCXPRT_CONGESTED(xprt) || !(req = xprt->free))			goto out_nofree;		/* OK: There's room for us. Grab a free slot and bump		 * congestion value */		xprt->free     = req->rq_next;		req->rq_next   = NULL;		xprt->cong    += RPC_CWNDSCALE;		task->tk_rqstp = req;		xprt_request_init(task, xprt);		if (xprt->free)			xprt_clear_backlog(xprt);	}	return;out_nofree:	task->tk_status = -EAGAIN;}/* * Initialize RPC request */static voidxprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt){	struct rpc_rqst	*req = task->tk_rqstp;	static u32	xid = 0;	if (!xid)		xid = CURRENT_TIME << 12;	dprintk("RPC: %4d reserved req %p xid %08x\n", task->tk_pid, req, xid);	task->tk_status = 0;	req->rq_timeout = xprt->timeout;	req->rq_task	= task;	req->rq_xprt    = xprt;	req->rq_xid     = xid++;	if (!xid)		xid++;}/* * Release an RPC call slot */voidxprt_release(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	struct rpc_rqst	*req;	xprt_up_transmit(task);	if (!(req = task->tk_rqstp))		return;	task->tk_rqstp = NULL;	memset(req, 0, sizeof(*req));	/* mark unused */	dprintk("RPC: %4d release request %p\n", task->tk_pid, req);	/* remove slot from queue of pending */	if (task->tk_rpcwait) {		printk("RPC: task of released request still queued!\n");		rpc_remove_wait_queue(task);	}	spin_lock_bh(&xprt_sock_lock);	req->rq_next = xprt->free;	xprt->free   = req;	/* Decrease congestion value. */	xprt->cong -= RPC_CWNDSCALE;	xprt_clear_backlog(xprt);	spin_unlock_bh(&xprt_sock_lock);}/* * Set default timeout parameters */voidxprt_default_timeout(struct rpc_timeout *to, int proto){	if (proto == IPPROTO_UDP)		xprt_set_timeout(to, 5,  5 * HZ);	else		xprt_set_timeout(to, 5, 60 * HZ);}/* * Set constant timeout */voidxprt_set_timeout(struct rpc_timeout *to, unsigned int retr, unsigned long incr){	to->to_current   = 	to->to_initval   = 	to->to_increment = incr;	to->to_maxval    = incr * retr;	to->to_resrvval  = incr * retr;	to->to_retries   = retr;	to->to_exponential = 0;}/* * Initialize an RPC client */static struct rpc_xprt *xprt_setup(struct socket *sock, int proto,			struct sockaddr_in *ap, struct rpc_timeout *to){	struct rpc_xprt	*xprt;	struct rpc_rqst	*req;	int		i;	dprintk("RPC:      setting up %s transport...\n",				proto == IPPROTO_UDP? "UDP" : "TCP");	if ((xprt = kmalloc(sizeof(struct rpc_xprt), GFP_KERNEL)) == NULL)		return NULL;	memset(xprt, 0, sizeof(*xprt)); /* Nnnngh! */	xprt->addr = *ap;	xprt->prot = proto;	xprt->stream = (proto == IPPROTO_TCP)? 1 : 0;	if (xprt->stream) {		xprt->cwnd = RPC_MAXCWND;		xprt->nocong = 1;	} else		xprt->cwnd = RPC_INITCWND;	xprt->congtime = jiffies;	init_waitqueue_head(&xprt->cong_wait);	/* Set timeout parameters */	if (to) {		xprt->timeout = *to;		xprt->timeout.to_current = to->to_initval;		xprt->timeout.to_resrvval = to->to_maxval << 1;	} else		xprt_default_timeout(&xprt->timeout, xprt->prot);	xprt->pending = RPC_INIT_WAITQ("xprt_pending");	xprt->sending = RPC_INIT_WAITQ("xprt_sending");	xprt->backlog = RPC_INIT_WAITQ("xprt_backlog");	xprt->reconn  = RPC_INIT_WAITQ("xprt_reconn");	/* initialize free list */	for (i = 0, req = xprt->slot; i < RPC_MAXREQS-1; i++, req++)		req->rq_next = req + 1;	req->rq_next = NULL;	xprt->free = xprt->slot;	INIT_LIST_HEAD(&xprt->rx_pending);	dprintk("RPC:      created transport %p\n", xprt);		xprt_bind_socket(xprt, sock);	return xprt;}/* * Bind to a reserved port */static inline intxprt_bindresvport(struct socket *sock){	struct sockaddr_in myaddr;	int		err, port;	memset(&myaddr, 0, sizeof(myaddr));	myaddr.sin_family = AF_INET;	port = 800;	do {		myaddr.sin_port = htons(port);		err = sock->ops->bind(sock, (struct sockaddr *) &myaddr,						sizeof(myaddr));	} while (err == -EADDRINUSE && --port > 0);	if (err < 0)		printk("RPC: Can't bind to reserved port (%d).\n", -err);	return err;}static int xprt_bind_socket(struct rpc_xprt *xprt, struct socket *sock){	struct sock	*sk = sock->sk;	if (xprt->inet)		return -EBUSY;	sk->user_data = xprt;	xprt->old_data_ready = sk->data_ready;	xprt->old_state_change = sk->state_change;	xprt->old_write_space = sk->write_space;	if (xprt->prot == IPPROTO_UDP) {		sk->data_ready = udp_data_ready;		sk->write_space = udp_write_space;		sk->no_check = UDP_CSUM_NORCV;		xprt_set_connected(xprt);	} else {		sk->data_ready = tcp_data_ready;		sk->state_change = tcp_state_change;		sk->write_space = tcp_write_space;		xprt_clear_connected(xprt);	}	/* Reset to new socket */	xprt->sock = sock;	xprt->inet = sk;	/*	 *	TCP requires the rpc I/O daemon is present	 */	if(xprt->stream)		rpciod_up();	return 0;}/* * Create a client socket given the protocol and peer address. */static struct socket *xprt_create_socket(int proto, struct rpc_timeout *to){	struct socket	*sock;	int		type, err;	dprintk("RPC:      xprt_create_socket(%s %d)\n",			   (proto == IPPROTO_UDP)? "udp" : "tcp", proto);	type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;	if ((err = sock_create(PF_INET, type, proto, &sock)) < 0) {		printk("RPC: can't create socket (%d).\n", -err);		goto failed;	}	/* If the caller has the capability, bind to a reserved port */	if (capable(CAP_NET_BIND_SERVICE) && xprt_bindresvport(sock) < 0)		goto failed;	return sock;failed:	sock_release(sock);	return NULL;}/* * Create an RPC client transport given the protocol and peer address. */struct rpc_xprt *xprt_create_proto(int proto, struct sockaddr_in *sap, struct rpc_timeout *to){	struct socket	*sock;	struct rpc_xprt	*xprt;	dprintk("RPC:      xprt_create_proto called\n");	if (!(sock = xprt_create_socket(proto, to)))		return NULL;	if (!(xprt = xprt_setup(sock, proto, sap, to)))		sock_release(sock);	return xprt;}/* * Prepare for transport shutdown. */voidxprt_shutdown(struct rpc_xprt *xprt){	xprt->shutdown = 1;	rpc_wake_up(&xprt->sending);	rpc_wake_up(&xprt->pending);	rpc_wake_up(&xprt->backlog);	rpc_wake_up(&xprt->reconn);	if (waitqueue_active(&xprt->cong_wait))		wake_up(&xprt->cong_wait);}/* * Clear the xprt backlog queue */intxprt_clear_backlog(struct rpc_xprt *xprt) {	if (RPCXPRT_CONGESTED(xprt))		return 0;	rpc_wake_up_next(&xprt->backlog);	if (waitqueue_active(&xprt->cong_wait))		wake_up(&xprt->cong_wait);	return 1;}/* * Destroy an RPC transport, killing off all requests. */intxprt_destroy(struct rpc_xprt *xprt){	dprintk("RPC:      destroying transport %p\n", xprt);	xprt_shutdown(xprt);	xprt_close(xprt);	kfree(xprt);	return 0;}

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -