📄 xprt.c
/*
 * linux/net/sunrpc/xprt.c
 *
 * This is a generic RPC call interface supporting congestion avoidance,
 * and asynchronous calls.
 *
 * The interface works like this:
 *
 * - When a process places a call, it allocates a request slot if
 *   one is available. Otherwise, it sleeps on the backlog queue
 *   (xprt_reserve).
 * - Next, the caller puts together the RPC message, stuffs it into
 *   the request struct, and calls xprt_call().
 * - xprt_call transmits the message and installs the caller on the
 *   socket's wait list. At the same time, it installs a timer that
 *   is run after the packet's timeout has expired.
 * - When a packet arrives, the data_ready handler walks the list of
 *   pending requests for that socket. If a matching XID is found, the
 *   caller is woken up, and the timer removed.
 * - When no reply arrives within the timeout interval, the timer is
 *   fired by the kernel and runs xprt_timer(). It either adjusts the
 *   timeout values (minor timeout) or wakes up the caller with a status
 *   of -ETIMEDOUT.
 * - When the caller receives a notification from RPC that a reply arrived,
 *   it should release the RPC slot, and process the reply.
 *   If the call timed out, it may choose to retry the operation by
 *   adjusting the initial timeout value, and simply calling rpc_call
 *   again.
 *
 * Support for async RPC is done through a set of RPC-specific scheduling
 * primitives that `transparently' work for processes as well as async
 * tasks that rely on callbacks.
 *
 * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de>
 *
 * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com>
 * TCP NFS related read + write fixes
 *  (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie>
 *
 * Rewrite of larges part of the code in order to stabilize TCP stuff.
 * Fix behaviour when socket buffer is full.
 *  (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no>
 */

#include <linux/types.h>
#include <linux/slab.h>
#include <linux/capability.h>
#include <linux/sched.h>
#include <linux/errno.h>
#include <linux/socket.h>
#include <linux/in.h>
#include <linux/net.h>
#include <linux/mm.h>
#include <linux/udp.h>
#include <linux/tcp.h>
#include <linux/sunrpc/clnt.h>
#include <linux/file.h>
#include <linux/workqueue.h>
#include <linux/random.h>

#include <net/sock.h>
#include <net/checksum.h>
#include <net/udp.h>
#include <net/tcp.h>

/*
 * Local variables
 */

#ifdef RPC_DEBUG
# undef  RPC_DEBUG_DATA
# define RPCDBG_FACILITY	RPCDBG_XPRT
#endif

#define XPRT_MAX_BACKOFF	(8)
#define XPRT_IDLE_TIMEOUT	(5*60*HZ)
#define XPRT_MAX_RESVPORT	(800)

/*
 * Local functions
 */
static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
static inline void	do_xprt_reserve(struct rpc_task *);
static void	xprt_disconnect(struct rpc_xprt *);
static void	xprt_connect_status(struct rpc_task *task);
static struct rpc_xprt * xprt_setup(int proto, struct sockaddr_in *ap,
						struct rpc_timeout *to);
static struct socket *xprt_create_socket(struct rpc_xprt *, int, int);
static void	xprt_bind_socket(struct rpc_xprt *, struct socket *);
static int	__xprt_get_cong(struct rpc_xprt *, struct rpc_task *);

#ifdef RPC_DEBUG_DATA
/*
 * Print the buffer contents (first 128 bytes only--just enough for
 * diropres return).
 */
static void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	u8 *buf = (u8 *) packet;
	int j;

	dprintk("RPC: %s\n", msg);
	for (j = 0; j < count && j < 128; j += 4) {
		if (!(j & 31)) {
			if (j)
				dprintk("\n");
			dprintk("0x%04x ", j);
		}
		dprintk("%02x%02x%02x%02x ",
			buf[j], buf[j+1], buf[j+2], buf[j+3]);
	}
	dprintk("\n");
}
#else
static inline void
xprt_pktdump(char *msg, u32 *packet, unsigned int count)
{
	/* NOP */
}
#endif

/*
 * Look up RPC transport given an INET socket
 */
static inline struct rpc_xprt *
xprt_from_sock(struct sock *sk)
{
	return (struct rpc_xprt *) sk->sk_user_data;
}

/*
 * Serialize write access to sockets, in order to prevent different
 * requests from interfering with each other.
 * Also prevents TCP socket connects from colliding with writes.
 */
static int
__xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate)) {
		if (task == xprt->snd_task)
			return 1;
		if (task == NULL)
			return 0;
		goto out_sleep;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return 1;
	}
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->sockstate);
	smp_mb__after_clear_bit();
out_sleep:
	dprintk("RPC: %4d failed to lock socket %p\n", task->tk_pid, xprt);
	task->tk_timeout = 0;
	task->tk_status = -EAGAIN;
	if (req && req->rq_ntrans)
		rpc_sleep_on(&xprt->resend, task, NULL, NULL);
	else
		rpc_sleep_on(&xprt->sending, task, NULL, NULL);
	return 0;
}

static inline int
xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	int retval;

	spin_lock_bh(&xprt->sock_lock);
	retval = __xprt_lock_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
	return retval;
}

static void
__xprt_lock_write_next(struct rpc_xprt *xprt)
{
	struct rpc_task *task;

	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
		return;
	if (!xprt->nocong && RPCXPRT_CONGESTED(xprt))
		goto out_unlock;
	task = rpc_wake_up_next(&xprt->resend);
	if (!task) {
		task = rpc_wake_up_next(&xprt->sending);
		if (!task)
			goto out_unlock;
	}
	if (xprt->nocong || __xprt_get_cong(xprt, task)) {
		struct rpc_rqst *req = task->tk_rqstp;
		xprt->snd_task = task;
		if (req) {
			req->rq_bytes_sent = 0;
			req->rq_ntrans++;
		}
		return;
	}
out_unlock:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_LOCKED, &xprt->sockstate);
	smp_mb__after_clear_bit();
}

/*
 * Releases the socket for use by other requests.
 */
static void
__xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	if (xprt->snd_task == task) {
		xprt->snd_task = NULL;
		smp_mb__before_clear_bit();
		clear_bit(XPRT_LOCKED, &xprt->sockstate);
		smp_mb__after_clear_bit();
		__xprt_lock_write_next(xprt);
	}
}

static inline void
xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
{
	spin_lock_bh(&xprt->sock_lock);
	__xprt_release_write(xprt, task);
	spin_unlock_bh(&xprt->sock_lock);
}
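/*
 * A minimal, illustrative sketch of how these helpers are meant to be
 * paired by a code path that writes to the socket (xprt_connect() below
 * takes the lock the same way; the transmit code shown here is only a
 * sketch, not part of this file):
 *
 *	if (!xprt_lock_write(xprt, task))
 *		return;                    -- task now sleeps on resend/sending
 *	... write req->rq_snd_buf via xprt_sendmsg() ...
 *	xprt_release_write(xprt, task);    -- hands the socket to the next waiter
 */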
/*
 * Write data to socket.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket *sock = xprt->sock;
	struct xdr_buf *xdr = &req->rq_snd_buf;
	struct sockaddr *addr = NULL;
	int addrlen = 0;
	unsigned int skip;
	int result;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
				req->rq_svec->iov_base,
				req->rq_svec->iov_len);

	/* For UDP, we need to provide an address */
	if (!xprt->stream) {
		addr = (struct sockaddr *) &xprt->addr;
		addrlen = sizeof(xprt->addr);
	}
	/* Dont repeat bytes */
	skip = req->rq_bytes_sent;

	clear_bit(SOCK_ASYNC_NOSPACE, &sock->flags);
	result = xdr_sendpages(sock, addr, addrlen, xdr, skip, MSG_DONTWAIT);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n", xdr->len - skip, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED.
		 */
	case -EAGAIN:
		break;
	case -ECONNRESET:
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		if (xprt->stream)
			result = -ENOTCONN;
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}

/*
 * Van Jacobson congestion avoidance. Check if the congestion window
 * overflowed. Put the task to sleep if this is the case.
 */
static int
__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
{
	struct rpc_rqst *req = task->tk_rqstp;

	if (req->rq_cong)
		return 1;
	dprintk("RPC: %4d xprt_cwnd_limited cong = %ld cwnd = %ld\n",
			task->tk_pid, xprt->cong, xprt->cwnd);
	if (RPCXPRT_CONGESTED(xprt))
		return 0;
	req->rq_cong = 1;
	xprt->cong += RPC_CWNDSCALE;
	return 1;
}

/*
 * Adjust the congestion window, and wake up the next task
 * that has been sleeping due to congestion
 */
static void
__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	if (!req->rq_cong)
		return;
	req->rq_cong = 0;
	xprt->cong -= RPC_CWNDSCALE;
	__xprt_lock_write_next(xprt);
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long cwnd;

	cwnd = xprt->cwnd;
	if (result >= 0 && cwnd <= xprt->cong) {
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND(xprt))
			cwnd = RPC_MAXCWND(xprt);
		__xprt_lock_write_next(xprt);
	} else if (result == -ETIMEDOUT) {
		cwnd >>= 1;
		if (cwnd < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
	}
	dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
			xprt->cong, xprt->cwnd, cwnd);
	xprt->cwnd = cwnd;
}
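/*
 * Worked example of the window update above (assuming RPC_CWNDSCALE is
 * 256, i.e. one request slot -- check the transport headers for the
 * actual value):
 *
 *   - reply received, cwnd = 512:  cwnd += (256*256 + 256) / 512 = 128,
 *     so the window grows by half a slot (additive increase);
 *   - request timed out:           cwnd >>= 1, but never below one slot
 *     (multiplicative decrease).
 *
 * This is the Van Jacobson style AIMD scheme mentioned above, applied to
 * the number of in-flight RPC requests rather than to TCP segments.
 */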
/*
 * Reset the major timeout value
 */
static void xprt_reset_majortimeo(struct rpc_rqst *req)
{
	struct rpc_timeout *to = &req->rq_xprt->timeout;

	req->rq_majortimeo = req->rq_timeout;
	if (to->to_exponential)
		req->rq_majortimeo <<= to->to_retries;
	else
		req->rq_majortimeo += to->to_increment * to->to_retries;
	if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
		req->rq_majortimeo = to->to_maxval;
	req->rq_majortimeo += jiffies;
}

/*
 * Adjust timeout values etc for next retransmit
 */
int xprt_adjust_timeout(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct rpc_timeout *to = &xprt->timeout;
	int status = 0;

	if (time_before(jiffies, req->rq_majortimeo)) {
		if (to->to_exponential)
			req->rq_timeout <<= 1;
		else
			req->rq_timeout += to->to_increment;
		if (to->to_maxval && req->rq_timeout >= to->to_maxval)
			req->rq_timeout = to->to_maxval;
		req->rq_retries++;
		pprintk("RPC: %lu retrans\n", jiffies);
	} else {
		req->rq_timeout = to->to_initval;
		req->rq_retries = 0;
		xprt_reset_majortimeo(req);
		/* Reset the RTT counters == "slow start" */
		spin_lock_bh(&xprt->sock_lock);
		rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
		spin_unlock_bh(&xprt->sock_lock);
		pprintk("RPC: %lu timeout\n", jiffies);
		status = -ETIMEDOUT;
	}

	if (req->rq_timeout == 0) {
		printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
		req->rq_timeout = 5 * HZ;
	}
	return status;
}
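/*
 * Worked example of the back-off above (figures are illustrative only;
 * the actual defaults come from the rpc_timeout / mount option setup):
 *
 *   to_initval = 1s, to_retries = 3, to_exponential set:
 *     minor timeouts grow 1s -> 2s -> 4s -> 8s, and rq_majortimeo spans
 *     initval << retries = 8s, capped at to_maxval;
 *   to_exponential clear, to_increment = 2s:
 *     minor timeouts grow 1s -> 3s -> 5s -> 7s, and rq_majortimeo spans
 *     initval + retries * increment = 7s.
 *
 * Only when the major timeout has expired does the function return
 * -ETIMEDOUT, letting the caller handle the timeout (the header comment
 * suggests retrying with adjusted timeouts) instead of just retransmitting.
 */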
/*
 * Close down a transport socket
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket *sock = xprt->sock;
	struct sock *sk = xprt->inet;

	if (!sk)
		return;

	write_lock_bh(&sk->sk_callback_lock);
	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->sk_user_data = NULL;
	sk->sk_data_ready = xprt->old_data_ready;
	sk->sk_state_change = xprt->old_state_change;
	sk->sk_write_space = xprt->old_write_space;
	write_unlock_bh(&sk->sk_callback_lock);

	sk->sk_no_check = 0;

	sock_release(sock);
}

static void
xprt_socket_autoclose(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;

	xprt_disconnect(xprt);
	xprt_close(xprt);
	xprt_release_write(xprt, NULL);
}

/*
 * Mark a transport as disconnected
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	spin_lock_bh(&xprt->sock_lock);
	xprt_clear_connected(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
	spin_unlock_bh(&xprt->sock_lock);
}

/*
 * Used to allow disconnection when we've been idle
 */
static void
xprt_init_autodisconnect(unsigned long data)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)data;

	spin_lock(&xprt->sock_lock);
	if (!list_empty(&xprt->recv) || xprt->shutdown)
		goto out_abort;
	if (test_and_set_bit(XPRT_LOCKED, &xprt->sockstate))
		goto out_abort;
	spin_unlock(&xprt->sock_lock);
	/* Let keventd close the socket */
	if (test_bit(XPRT_CONNECTING, &xprt->sockstate) != 0)
		xprt_release_write(xprt, NULL);
	else
		schedule_work(&xprt->task_cleanup);
	return;
out_abort:
	spin_unlock(&xprt->sock_lock);
}

static void xprt_socket_connect(void *args)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)args;
	struct socket *sock = xprt->sock;
	int status = -EIO;

	if (xprt->shutdown || xprt->addr.sin_port == 0)
		goto out;

	/*
	 * Start by resetting any existing state
	 */
	xprt_close(xprt);
	sock = xprt_create_socket(xprt, xprt->prot, xprt->resvport);
	if (sock == NULL) {
		/* couldn't create socket or bind to reserved port;
		 * this is likely a permanent error, so cause an abort */
		goto out;
	}
	xprt_bind_socket(xprt, sock);
	xprt_sock_setbufsize(xprt);

	status = 0;
	if (!xprt->stream)
		goto out;

	/*
	 * Tell the socket layer to start connecting...
	 */
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
			sizeof(xprt->addr), O_NONBLOCK);
	dprintk("RPC: %p connect status %d connected %d sock state %d\n",
			xprt, -status, xprt_connected(xprt), sock->sk->sk_state);
	if (status < 0) {
		switch (status) {
			case -EINPROGRESS:
			case -EALREADY:
				goto out_clear;
		}
	}
out:
	if (status < 0)
		rpc_wake_up_status(&xprt->pending, status);
	else
		rpc_wake_up(&xprt->pending);
out_clear:
	smp_mb__before_clear_bit();
	clear_bit(XPRT_CONNECTING, &xprt->sockstate);
	smp_mb__after_clear_bit();
}

/*
 * Attempt to connect a TCP socket.
 *
 */
void xprt_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_connect xprt %p %s connected\n", task->tk_pid,
			xprt, (xprt_connected(xprt) ? "is" : "is not"));

	if (xprt->shutdown) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}
	if (!xprt_lock_write(xprt, task))
		return;
	if (xprt_connected(xprt))
		goto out_write;

	if (task->tk_rqstp)
		task->tk_rqstp->rq_bytes_sent = 0;