⭐ 欢迎来到虫虫下载站! | 📦 资源下载 📁 资源专辑 ℹ️ 关于我们
⭐ 虫虫下载站

📄 xprt.c

📁 linux 内核源代码
💻 C
📖 第 1 页 / 共 2 页
字号:
/* *  linux/net/sunrpc/xprt.c * *  This is a generic RPC call interface supporting congestion avoidance, *  and asynchronous calls. * *  The interface works like this: * *  -	When a process places a call, it allocates a request slot if *	one is available. Otherwise, it sleeps on the backlog queue *	(xprt_reserve). *  -	Next, the caller puts together the RPC message, stuffs it into *	the request struct, and calls xprt_transmit(). *  -	xprt_transmit sends the message and installs the caller on the *	transport's wait list. At the same time, it installs a timer that *	is run after the packet's timeout has expired. *  -	When a packet arrives, the data_ready handler walks the list of *	pending requests for that transport. If a matching XID is found, the *	caller is woken up, and the timer removed. *  -	When no reply arrives within the timeout interval, the timer is *	fired by the kernel and runs xprt_timer(). It either adjusts the *	timeout values (minor timeout) or wakes up the caller with a status *	of -ETIMEDOUT. *  -	When the caller receives a notification from RPC that a reply arrived, *	it should release the RPC slot, and process the reply. *	If the call timed out, it may choose to retry the operation by *	adjusting the initial timeout value, and simply calling rpc_call *	again. * *  Support for async RPC is done through a set of RPC-specific scheduling *  primitives that `transparently' work for processes as well as async *  tasks that rely on callbacks. * *  Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> * *  Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com> */#include <linux/module.h>#include <linux/types.h>#include <linux/interrupt.h>#include <linux/workqueue.h>#include <linux/net.h>#include <linux/sunrpc/clnt.h>#include <linux/sunrpc/metrics.h>/* * Local variables */#ifdef RPC_DEBUG# define RPCDBG_FACILITY	RPCDBG_XPRT#endif/* * Local functions */static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);static inline void	do_xprt_reserve(struct rpc_task *);static void	xprt_connect_status(struct rpc_task *task);static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);static DEFINE_SPINLOCK(xprt_list_lock);static LIST_HEAD(xprt_list);/* * The transport code maintains an estimate on the maximum number of out- * standing RPC requests, using a smoothed version of the congestion * avoidance implemented in 44BSD. This is basically the Van Jacobson * congestion algorithm: If a retransmit occurs, the congestion window is * halved; otherwise, it is incremented by 1/cwnd when * *	-	a reply is received and *	-	a full number of requests are outstanding and *	-	the congestion window hasn't been updated recently. */#define RPC_CWNDSHIFT		(8U)#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT)#define RPC_INITCWND		RPC_CWNDSCALE#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT)#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)/** * xprt_register_transport - register a transport implementation * @transport: transport to register * * If a transport implementation is loaded as a kernel module, it can * call this interface to make itself known to the RPC client. * * Returns: * 0:		transport successfully registered * -EEXIST:	transport already registered * -EINVAL:	transport module being unloaded */int xprt_register_transport(struct xprt_class *transport){	struct xprt_class *t;	int result;	result = -EEXIST;	spin_lock(&xprt_list_lock);	list_for_each_entry(t, &xprt_list, list) {		/* don't register the same transport class twice */		if (t->ident == transport->ident)			goto out;	}	result = -EINVAL;	if (try_module_get(THIS_MODULE)) {		list_add_tail(&transport->list, &xprt_list);		printk(KERN_INFO "RPC: Registered %s transport module.\n",			transport->name);		result = 0;	}out:	spin_unlock(&xprt_list_lock);	return result;}EXPORT_SYMBOL_GPL(xprt_register_transport);/** * xprt_unregister_transport - unregister a transport implementation * transport: transport to unregister * * Returns: * 0:		transport successfully unregistered * -ENOENT:	transport never registered */int xprt_unregister_transport(struct xprt_class *transport){	struct xprt_class *t;	int result;	result = 0;	spin_lock(&xprt_list_lock);	list_for_each_entry(t, &xprt_list, list) {		if (t == transport) {			printk(KERN_INFO				"RPC: Unregistered %s transport module.\n",				transport->name);			list_del_init(&transport->list);			module_put(THIS_MODULE);			goto out;		}	}	result = -ENOENT;out:	spin_unlock(&xprt_list_lock);	return result;}EXPORT_SYMBOL_GPL(xprt_unregister_transport);/** * xprt_reserve_xprt - serialize write access to transports * @task: task that is requesting access to the transport * * This prevents mixing the payload of separate requests, and prevents * transport connects from colliding with writes.  No congestion control * is provided. */int xprt_reserve_xprt(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	struct rpc_rqst *req = task->tk_rqstp;	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {		if (task == xprt->snd_task)			return 1;		if (task == NULL)			return 0;		goto out_sleep;	}	xprt->snd_task = task;	if (req) {		req->rq_bytes_sent = 0;		req->rq_ntrans++;	}	return 1;out_sleep:	dprintk("RPC: %5u failed to lock transport %p\n",			task->tk_pid, xprt);	task->tk_timeout = 0;	task->tk_status = -EAGAIN;	if (req && req->rq_ntrans)		rpc_sleep_on(&xprt->resend, task, NULL, NULL);	else		rpc_sleep_on(&xprt->sending, task, NULL, NULL);	return 0;}EXPORT_SYMBOL_GPL(xprt_reserve_xprt);static void xprt_clear_locked(struct rpc_xprt *xprt){	xprt->snd_task = NULL;	if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {		smp_mb__before_clear_bit();		clear_bit(XPRT_LOCKED, &xprt->state);		smp_mb__after_clear_bit();	} else		queue_work(rpciod_workqueue, &xprt->task_cleanup);}/* * xprt_reserve_xprt_cong - serialize write access to transports * @task: task that is requesting access to the transport * * Same as xprt_reserve_xprt, but Van Jacobson congestion control is * integrated into the decision of whether a request is allowed to be * woken up and given access to the transport. */int xprt_reserve_xprt_cong(struct rpc_task *task){	struct rpc_xprt	*xprt = task->tk_xprt;	struct rpc_rqst *req = task->tk_rqstp;	if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {		if (task == xprt->snd_task)			return 1;		goto out_sleep;	}	if (__xprt_get_cong(xprt, task)) {		xprt->snd_task = task;		if (req) {			req->rq_bytes_sent = 0;			req->rq_ntrans++;		}		return 1;	}	xprt_clear_locked(xprt);out_sleep:	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);	task->tk_timeout = 0;	task->tk_status = -EAGAIN;	if (req && req->rq_ntrans)		rpc_sleep_on(&xprt->resend, task, NULL, NULL);	else		rpc_sleep_on(&xprt->sending, task, NULL, NULL);	return 0;}EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task){	int retval;	spin_lock_bh(&xprt->transport_lock);	retval = xprt->ops->reserve_xprt(task);	spin_unlock_bh(&xprt->transport_lock);	return retval;}static void __xprt_lock_write_next(struct rpc_xprt *xprt){	struct rpc_task *task;	struct rpc_rqst *req;	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))		return;	task = rpc_wake_up_next(&xprt->resend);	if (!task) {		task = rpc_wake_up_next(&xprt->sending);		if (!task)			goto out_unlock;	}	req = task->tk_rqstp;	xprt->snd_task = task;	if (req) {		req->rq_bytes_sent = 0;		req->rq_ntrans++;	}	return;out_unlock:	xprt_clear_locked(xprt);}static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt){	struct rpc_task *task;	if (test_and_set_bit(XPRT_LOCKED, &xprt->state))		return;	if (RPCXPRT_CONGESTED(xprt))		goto out_unlock;	task = rpc_wake_up_next(&xprt->resend);	if (!task) {		task = rpc_wake_up_next(&xprt->sending);		if (!task)			goto out_unlock;	}	if (__xprt_get_cong(xprt, task)) {		struct rpc_rqst *req = task->tk_rqstp;		xprt->snd_task = task;		if (req) {			req->rq_bytes_sent = 0;			req->rq_ntrans++;		}		return;	}out_unlock:	xprt_clear_locked(xprt);}/** * xprt_release_xprt - allow other requests to use a transport * @xprt: transport with other tasks potentially waiting * @task: task that is releasing access to the transport * * Note that "task" can be NULL.  No congestion control is provided. */void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task){	if (xprt->snd_task == task) {		xprt_clear_locked(xprt);		__xprt_lock_write_next(xprt);	}}EXPORT_SYMBOL_GPL(xprt_release_xprt);/** * xprt_release_xprt_cong - allow other requests to use a transport * @xprt: transport with other tasks potentially waiting * @task: task that is releasing access to the transport * * Note that "task" can be NULL.  Another task is awoken to use the * transport if the transport's congestion window allows it. */void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task){	if (xprt->snd_task == task) {		xprt_clear_locked(xprt);		__xprt_lock_write_next_cong(xprt);	}}EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task){	spin_lock_bh(&xprt->transport_lock);	xprt->ops->release_xprt(xprt, task);	spin_unlock_bh(&xprt->transport_lock);}/* * Van Jacobson congestion avoidance. Check if the congestion window * overflowed. Put the task to sleep if this is the case. */static int__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task){	struct rpc_rqst *req = task->tk_rqstp;	if (req->rq_cong)		return 1;	dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",			task->tk_pid, xprt->cong, xprt->cwnd);	if (RPCXPRT_CONGESTED(xprt))		return 0;	req->rq_cong = 1;	xprt->cong += RPC_CWNDSCALE;	return 1;}/* * Adjust the congestion window, and wake up the next task * that has been sleeping due to congestion */static void__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req){	if (!req->rq_cong)		return;	req->rq_cong = 0;	xprt->cong -= RPC_CWNDSCALE;	__xprt_lock_write_next_cong(xprt);}/** * xprt_release_rqst_cong - housekeeping when request is complete * @task: RPC request that recently completed * * Useful for transports that require congestion control. */void xprt_release_rqst_cong(struct rpc_task *task){	__xprt_put_cong(task->tk_xprt, task->tk_rqstp);}EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);/** * xprt_adjust_cwnd - adjust transport congestion window * @task: recently completed RPC request used to adjust window * @result: result code of completed RPC request * * We use a time-smoothed congestion estimator to avoid heavy oscillation. */void xprt_adjust_cwnd(struct rpc_task *task, int result){	struct rpc_rqst *req = task->tk_rqstp;	struct rpc_xprt *xprt = task->tk_xprt;	unsigned long cwnd = xprt->cwnd;	if (result >= 0 && cwnd <= xprt->cong) {		/* The (cwnd >> 1) term makes sure		 * the result gets rounded properly. */		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;		if (cwnd > RPC_MAXCWND(xprt))			cwnd = RPC_MAXCWND(xprt);		__xprt_lock_write_next_cong(xprt);	} else if (result == -ETIMEDOUT) {		cwnd >>= 1;		if (cwnd < RPC_CWNDSCALE)			cwnd = RPC_CWNDSCALE;	}	dprintk("RPC:       cong %ld, cwnd was %ld, now %ld\n",			xprt->cong, xprt->cwnd, cwnd);	xprt->cwnd = cwnd;	__xprt_put_cong(xprt, req);}EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);/** * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue * @xprt: transport with waiting tasks * @status: result code to plant in each task before waking it * */void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status){	if (status < 0)		rpc_wake_up_status(&xprt->pending, status);	else		rpc_wake_up(&xprt->pending);}EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);/** * xprt_wait_for_buffer_space - wait for transport output buffer to clear * @task: task to be put to sleep * */void xprt_wait_for_buffer_space(struct rpc_task *task){	struct rpc_rqst *req = task->tk_rqstp;	struct rpc_xprt *xprt = req->rq_xprt;	task->tk_timeout = req->rq_timeout;	rpc_sleep_on(&xprt->pending, task, NULL, NULL);}EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);/** * xprt_write_space - wake the task waiting for transport output buffer space * @xprt: transport with waiting tasks * * Can be called in a soft IRQ context, so xprt_write_space never sleeps. */void xprt_write_space(struct rpc_xprt *xprt){	if (unlikely(xprt->shutdown))		return;	spin_lock_bh(&xprt->transport_lock);	if (xprt->snd_task) {		dprintk("RPC:       write space: waking waiting task on "				"xprt %p\n", xprt);		rpc_wake_up_task(xprt->snd_task);	}	spin_unlock_bh(&xprt->transport_lock);}EXPORT_SYMBOL_GPL(xprt_write_space);/** * xprt_set_retrans_timeout_def - set a request's retransmit timeout * @task: task whose timeout is to be set * * Set a request's retransmit timeout based on the transport's * default timeout parameters.  Used by transports that don't adjust * the retransmit timeout based on round-trip time estimation. */void xprt_set_retrans_timeout_def(struct rpc_task *task){	task->tk_timeout = task->tk_rqstp->rq_timeout;}EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);/* * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout * @task: task whose timeout is to be set * * Set a request's retransmit timeout using the RTT estimator. */void xprt_set_retrans_timeout_rtt(struct rpc_task *task){	int timer = task->tk_msg.rpc_proc->p_timer;	struct rpc_rtt *rtt = task->tk_client->cl_rtt;	struct rpc_rqst *req = task->tk_rqstp;	unsigned long max_timeout = req->rq_xprt->timeout.to_maxval;	task->tk_timeout = rpc_calc_rto(rtt, timer);	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)		task->tk_timeout = max_timeout;}EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);static void xprt_reset_majortimeo(struct rpc_rqst *req){	struct rpc_timeout *to = &req->rq_xprt->timeout;	req->rq_majortimeo = req->rq_timeout;	if (to->to_exponential)		req->rq_majortimeo <<= to->to_retries;	else		req->rq_majortimeo += to->to_increment * to->to_retries;	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)		req->rq_majortimeo = to->to_maxval;	req->rq_majortimeo += jiffies;}/** * xprt_adjust_timeout - adjust timeout values for next retransmit * @req: RPC request containing parameters to use for the adjustment * */int xprt_adjust_timeout(struct rpc_rqst *req){	struct rpc_xprt *xprt = req->rq_xprt;	struct rpc_timeout *to = &xprt->timeout;	int status = 0;

⌨️ 快捷键说明

复制代码 Ctrl + C
搜索代码 Ctrl + F
全屏模式 F11
切换主题 Ctrl + Shift + D
显示快捷键 ?
增大字号 Ctrl + =
减小字号 Ctrl + -