📄 xprt.c
字号:
/* * linux/net/sunrpc/xprt.c * * This is a generic RPC call interface supporting congestion avoidance, * and asynchronous calls. * * The interface works like this: * * - When a process places a call, it allocates a request slot if * one is available. Otherwise, it sleeps on the backlog queue * (xprt_reserve). * - Next, the caller puts together the RPC message, stuffs it into * the request struct, and calls xprt_transmit(). * - xprt_transmit sends the message and installs the caller on the * transport's wait list. At the same time, it installs a timer that * is run after the packet's timeout has expired. * - When a packet arrives, the data_ready handler walks the list of * pending requests for that transport. If a matching XID is found, the * caller is woken up, and the timer removed. * - When no reply arrives within the timeout interval, the timer is * fired by the kernel and runs xprt_timer(). It either adjusts the * timeout values (minor timeout) or wakes up the caller with a status * of -ETIMEDOUT. * - When the caller receives a notification from RPC that a reply arrived, * it should release the RPC slot, and process the reply. * If the call timed out, it may choose to retry the operation by * adjusting the initial timeout value, and simply calling rpc_call * again. * * Support for async RPC is done through a set of RPC-specific scheduling * primitives that `transparently' work for processes as well as async * tasks that rely on callbacks. 
*
 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 * Transport switch API copyright (C) 2005, Chuck Lever <cel@netapp.com>
 */

#include <linux/module.h>

#include <linux/types.h>
#include <linux/interrupt.h>
#include <linux/workqueue.h>
#include <linux/net.h>

#include <linux/sunrpc/clnt.h>
#include <linux/sunrpc/metrics.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_connect_status(struct rpc_task *task);
static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

/*
 * List of registered transport classes.  xprt_list_lock is held around
 * every walk and modification of xprt_list in the register/unregister
 * functions below.
 */
static DEFINE_SPINLOCK(xprt_list_lock);
static LIST_HEAD(xprt_list);

/*
 * The transport code maintains an estimate on the maximum number of out-
 * standing RPC requests, using a smoothed version of the congestion
 * avoidance implemented in 44BSD. This is basically the Van Jacobson
 * congestion algorithm: If a retransmit occurs, the congestion window is
 * halved; otherwise, it is incremented by 1/cwnd when
 *
 *	-	a reply is received and
 *	-	a full number of requests are outstanding and
 *	-	the congestion window hasn't been updated recently.
 *
 * The window is kept in fixed point: each request that holds a congestion
 * slot contributes RPC_CWNDSCALE (1 << RPC_CWNDSHIFT) to xprt->cong.
 *
 * RPC_INITCWND		- window worth exactly one request
 * RPC_MAXCWND(xprt)	- upper bound of the window: max_reqs requests
 * RPCXPRT_CONGESTED()	- true once cong has reached cwnd
 */
#define RPC_CWNDSHIFT		(8U)
#define RPC_CWNDSCALE		(1U << RPC_CWNDSHIFT)
#define RPC_INITCWND		RPC_CWNDSCALE
#define RPC_MAXCWND(xprt)	((xprt)->max_reqs << RPC_CWNDSHIFT)

#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)

/**
 * xprt_register_transport - register a transport implementation
 * @transport: transport to register
 *
 * If a transport implementation is loaded as a kernel module, it can
 * call this interface to make itself known to the RPC client.
* * Returns: * 0: transport successfully registered * -EEXIST: transport already registered * -EINVAL: transport module being unloaded */int xprt_register_transport(struct xprt_class *transport){ struct xprt_class *t; int result; result = -EEXIST; spin_lock(&xprt_list_lock); list_for_each_entry(t, &xprt_list, list) { /* don't register the same transport class twice */ if (t->ident == transport->ident) goto out; } result = -EINVAL; if (try_module_get(THIS_MODULE)) { list_add_tail(&transport->list, &xprt_list); printk(KERN_INFO "RPC: Registered %s transport module.\n", transport->name); result = 0; }out: spin_unlock(&xprt_list_lock); return result;}EXPORT_SYMBOL_GPL(xprt_register_transport);/** * xprt_unregister_transport - unregister a transport implementation * transport: transport to unregister * * Returns: * 0: transport successfully unregistered * -ENOENT: transport never registered */int xprt_unregister_transport(struct xprt_class *transport){ struct xprt_class *t; int result; result = 0; spin_lock(&xprt_list_lock); list_for_each_entry(t, &xprt_list, list) { if (t == transport) { printk(KERN_INFO "RPC: Unregistered %s transport module.\n", transport->name); list_del_init(&transport->list); module_put(THIS_MODULE); goto out; } } result = -ENOENT;out: spin_unlock(&xprt_list_lock); return result;}EXPORT_SYMBOL_GPL(xprt_unregister_transport);/** * xprt_reserve_xprt - serialize write access to transports * @task: task that is requesting access to the transport * * This prevents mixing the payload of separate requests, and prevents * transport connects from colliding with writes. No congestion control * is provided. 
*/int xprt_reserve_xprt(struct rpc_task *task){ struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) return 1; if (task == NULL) return 0; goto out_sleep; } xprt->snd_task = task; if (req) { req->rq_bytes_sent = 0; req->rq_ntrans++; } return 1;out_sleep: dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); task->tk_timeout = 0; task->tk_status = -EAGAIN; if (req && req->rq_ntrans) rpc_sleep_on(&xprt->resend, task, NULL, NULL); else rpc_sleep_on(&xprt->sending, task, NULL, NULL); return 0;}EXPORT_SYMBOL_GPL(xprt_reserve_xprt);static void xprt_clear_locked(struct rpc_xprt *xprt){ xprt->snd_task = NULL; if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) { smp_mb__before_clear_bit(); clear_bit(XPRT_LOCKED, &xprt->state); smp_mb__after_clear_bit(); } else queue_work(rpciod_workqueue, &xprt->task_cleanup);}/* * xprt_reserve_xprt_cong - serialize write access to transports * @task: task that is requesting access to the transport * * Same as xprt_reserve_xprt, but Van Jacobson congestion control is * integrated into the decision of whether a request is allowed to be * woken up and given access to the transport. 
*/int xprt_reserve_xprt_cong(struct rpc_task *task){ struct rpc_xprt *xprt = task->tk_xprt; struct rpc_rqst *req = task->tk_rqstp; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) { if (task == xprt->snd_task) return 1; goto out_sleep; } if (__xprt_get_cong(xprt, task)) { xprt->snd_task = task; if (req) { req->rq_bytes_sent = 0; req->rq_ntrans++; } return 1; } xprt_clear_locked(xprt);out_sleep: dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt); task->tk_timeout = 0; task->tk_status = -EAGAIN; if (req && req->rq_ntrans) rpc_sleep_on(&xprt->resend, task, NULL, NULL); else rpc_sleep_on(&xprt->sending, task, NULL, NULL); return 0;}EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task){ int retval; spin_lock_bh(&xprt->transport_lock); retval = xprt->ops->reserve_xprt(task); spin_unlock_bh(&xprt->transport_lock); return retval;}static void __xprt_lock_write_next(struct rpc_xprt *xprt){ struct rpc_task *task; struct rpc_rqst *req; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; task = rpc_wake_up_next(&xprt->resend); if (!task) { task = rpc_wake_up_next(&xprt->sending); if (!task) goto out_unlock; } req = task->tk_rqstp; xprt->snd_task = task; if (req) { req->rq_bytes_sent = 0; req->rq_ntrans++; } return;out_unlock: xprt_clear_locked(xprt);}static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt){ struct rpc_task *task; if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) return; if (RPCXPRT_CONGESTED(xprt)) goto out_unlock; task = rpc_wake_up_next(&xprt->resend); if (!task) { task = rpc_wake_up_next(&xprt->sending); if (!task) goto out_unlock; } if (__xprt_get_cong(xprt, task)) { struct rpc_rqst *req = task->tk_rqstp; xprt->snd_task = task; if (req) { req->rq_bytes_sent = 0; req->rq_ntrans++; } return; }out_unlock: xprt_clear_locked(xprt);}/** * xprt_release_xprt - allow other requests to use a transport * @xprt: transport with other tasks potentially waiting * @task: task 
that is releasing access to the transport * * Note that "task" can be NULL. No congestion control is provided. */void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task){ if (xprt->snd_task == task) { xprt_clear_locked(xprt); __xprt_lock_write_next(xprt); }}EXPORT_SYMBOL_GPL(xprt_release_xprt);/** * xprt_release_xprt_cong - allow other requests to use a transport * @xprt: transport with other tasks potentially waiting * @task: task that is releasing access to the transport * * Note that "task" can be NULL. Another task is awoken to use the * transport if the transport's congestion window allows it. */void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task){ if (xprt->snd_task == task) { xprt_clear_locked(xprt); __xprt_lock_write_next_cong(xprt); }}EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task){ spin_lock_bh(&xprt->transport_lock); xprt->ops->release_xprt(xprt, task); spin_unlock_bh(&xprt->transport_lock);}/* * Van Jacobson congestion avoidance. Check if the congestion window * overflowed. Put the task to sleep if this is the case. */static int__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task){ struct rpc_rqst *req = task->tk_rqstp; if (req->rq_cong) return 1; dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n", task->tk_pid, xprt->cong, xprt->cwnd); if (RPCXPRT_CONGESTED(xprt)) return 0; req->rq_cong = 1; xprt->cong += RPC_CWNDSCALE; return 1;}/* * Adjust the congestion window, and wake up the next task * that has been sleeping due to congestion */static void__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req){ if (!req->rq_cong) return; req->rq_cong = 0; xprt->cong -= RPC_CWNDSCALE; __xprt_lock_write_next_cong(xprt);}/** * xprt_release_rqst_cong - housekeeping when request is complete * @task: RPC request that recently completed * * Useful for transports that require congestion control. 
*/void xprt_release_rqst_cong(struct rpc_task *task){ __xprt_put_cong(task->tk_xprt, task->tk_rqstp);}EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);/** * xprt_adjust_cwnd - adjust transport congestion window * @task: recently completed RPC request used to adjust window * @result: result code of completed RPC request * * We use a time-smoothed congestion estimator to avoid heavy oscillation. */void xprt_adjust_cwnd(struct rpc_task *task, int result){ struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = task->tk_xprt; unsigned long cwnd = xprt->cwnd; if (result >= 0 && cwnd <= xprt->cong) { /* The (cwnd >> 1) term makes sure * the result gets rounded properly. */ cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd; if (cwnd > RPC_MAXCWND(xprt)) cwnd = RPC_MAXCWND(xprt); __xprt_lock_write_next_cong(xprt); } else if (result == -ETIMEDOUT) { cwnd >>= 1; if (cwnd < RPC_CWNDSCALE) cwnd = RPC_CWNDSCALE; } dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n", xprt->cong, xprt->cwnd, cwnd); xprt->cwnd = cwnd; __xprt_put_cong(xprt, req);}EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);/** * xprt_wake_pending_tasks - wake all tasks on a transport's pending queue * @xprt: transport with waiting tasks * @status: result code to plant in each task before waking it * */void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status){ if (status < 0) rpc_wake_up_status(&xprt->pending, status); else rpc_wake_up(&xprt->pending);}EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);/** * xprt_wait_for_buffer_space - wait for transport output buffer to clear * @task: task to be put to sleep * */void xprt_wait_for_buffer_space(struct rpc_task *task){ struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = req->rq_xprt; task->tk_timeout = req->rq_timeout; rpc_sleep_on(&xprt->pending, task, NULL, NULL);}EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);/** * xprt_write_space - wake the task waiting for transport output buffer space * @xprt: transport with waiting tasks * * Can be called in a soft 
IRQ context, so xprt_write_space never sleeps. */void xprt_write_space(struct rpc_xprt *xprt){ if (unlikely(xprt->shutdown)) return; spin_lock_bh(&xprt->transport_lock); if (xprt->snd_task) { dprintk("RPC: write space: waking waiting task on " "xprt %p\n", xprt); rpc_wake_up_task(xprt->snd_task); } spin_unlock_bh(&xprt->transport_lock);}EXPORT_SYMBOL_GPL(xprt_write_space);/** * xprt_set_retrans_timeout_def - set a request's retransmit timeout * @task: task whose timeout is to be set * * Set a request's retransmit timeout based on the transport's * default timeout parameters. Used by transports that don't adjust * the retransmit timeout based on round-trip time estimation. */void xprt_set_retrans_timeout_def(struct rpc_task *task){ task->tk_timeout = task->tk_rqstp->rq_timeout;}EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);/* * xprt_set_retrans_timeout_rtt - set a request's retransmit timeout * @task: task whose timeout is to be set * * Set a request's retransmit timeout using the RTT estimator. 
*
 * The result is stored in task->tk_timeout and never exceeds the
 * transport's timeout.to_maxval.
 */
void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
{
	/* Timer class of the procedure being called. */
	int timer = task->tk_msg.rpc_proc->p_timer;
	struct rpc_rtt *rtt = task->tk_client->cl_rtt;
	struct rpc_rqst *req = task->tk_rqstp;
	unsigned long max_timeout = req->rq_xprt->timeout.to_maxval;

	/* Base timeout from the estimator for this timer class. */
	task->tk_timeout = rpc_calc_rto(rtt, timer);
	/* Back off: double once per recorded timeout and per retry. */
	task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
	/*
	 * Clamp to the transport maximum.  The == 0 test presumably catches
	 * a value that was shifted out entirely - TODO confirm.
	 */
	if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
		task->tk_timeout = max_timeout;
}
EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);

/*
 * Recompute the request's major timeout deadline (rq_majortimeo).
 *
 * Starting from rq_timeout, the interval is either shifted left by
 * to_retries (exponential policy) or increased by to_increment for each
 * retry (linear policy), capped at to_maxval, and finally turned into an
 * absolute time by adding the current jiffies.
 */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	struct rpc_timeout *to = &req->rq_xprt->timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	/* A zero interval is also replaced by the maximum. */
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}

/**
 * xprt_adjust_timeout - adjust timeout values for next retransmit
 * @req: RPC request containing parameters to use for the adjustment
 *
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_timeout *to = &xprt->timeout;
	int status = 0;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -