📄 xprt.c
字号:
/* * linux/net/sunrpc/xprt.c * * This is a generic RPC call interface supporting congestion avoidance, * and asynchronous calls. * * The interface works like this: * * - When a process places a call, it allocates a request slot if * one is available. Otherwise, it sleeps on the backlog queue * (xprt_reserve). * - Next, the caller puts together the RPC message, stuffs it into * the request struct, and calls xprt_call(). * - xprt_call transmits the message and installs the caller on the * socket's wait list. At the same time, it installs a timer that * is run after the packet's timeout has expired. * - When a packet arrives, the data_ready handler walks the list of * pending requests for that socket. If a matching XID is found, the * caller is woken up, and the timer removed. * - When no reply arrives within the timeout interval, the timer is * fired by the kernel and runs xprt_timer(). It either adjusts the * timeout values (minor timeout) or wakes up the caller with a status * of -ETIMEDOUT. * - When the caller receives a notification from RPC that a reply arrived, * it should release the RPC slot, and process the reply. * If the call timed out, it may choose to retry the operation by * adjusting the initial timeout value, and simply calling rpc_call * again. * * Support for async RPC is done through a set of RPC-specific scheduling * primitives that `transparently' work for processes as well as async * tasks that rely on callbacks. * * Copyright (C) 1995-1997, Olaf Kirch <okir@monad.swb.de> * * TCP callback races fixes (C) 1998 Red Hat Software <alan@redhat.com> * TCP send fixes (C) 1998 Red Hat Software <alan@redhat.com> * TCP NFS related read + write fixes * (C) 1999 Dave Airlie, University of Limerick, Ireland <airlied@linux.ie> * * Rewrite of larges part of the code in order to stabilize TCP stuff. * Fix behaviour when socket buffer is full. 
* (C) 1999 Trond Myklebust <trond.myklebust@fys.uio.no> */#define __KERNEL_SYSCALLS__#include <linux/version.h>#include <linux/types.h>#include <linux/malloc.h>#include <linux/capability.h>#include <linux/sched.h>#include <linux/errno.h>#include <linux/socket.h>#include <linux/in.h>#include <linux/net.h>#include <linux/mm.h>#include <linux/udp.h>#include <linux/unistd.h>#include <linux/sunrpc/clnt.h>#include <linux/file.h>#include <net/sock.h>#include <net/checksum.h>#include <net/udp.h>#include <asm/uaccess.h>/* Following value should be > 32k + RPC overhead */#define XPRT_MIN_WRITE_SPACE (35000 + SOCK_MIN_WRITE_SPACE)extern spinlock_t rpc_queue_lock;/* * Local variables *//* Spinlock for critical sections in the code. */spinlock_t xprt_sock_lock = SPIN_LOCK_UNLOCKED;spinlock_t xprt_lock = SPIN_LOCK_UNLOCKED;#ifdef RPC_DEBUG# undef RPC_DEBUG_DATA# define RPCDBG_FACILITY RPCDBG_XPRT#endif#ifndef MAX# define MAX(a, b) ((a) > (b)? (a) : (b))# define MIN(a, b) ((a) < (b)? (a) : (b))#endif/* * Local functions */static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);static void do_xprt_transmit(struct rpc_task *);static void xprt_reserve_status(struct rpc_task *task);static void xprt_disconnect(struct rpc_xprt *);static void xprt_reconn_status(struct rpc_task *task);static struct socket *xprt_create_socket(int, struct rpc_timeout *);static int xprt_bind_socket(struct rpc_xprt *, struct socket *);static void xprt_remove_pending(struct rpc_xprt *);#ifdef RPC_DEBUG_DATA/* * Print the buffer contents (first 128 bytes only--just enough for * diropres return). 
*/static voidxprt_pktdump(char *msg, u32 *packet, unsigned int count){ u8 *buf = (u8 *) packet; int j; dprintk("RPC: %s\n", msg); for (j = 0; j < count && j < 128; j += 4) { if (!(j & 31)) { if (j) dprintk("\n"); dprintk("0x%04x ", j); } dprintk("%02x%02x%02x%02x ", buf[j], buf[j+1], buf[j+2], buf[j+3]); } dprintk("\n");}#elsestatic inline voidxprt_pktdump(char *msg, u32 *packet, unsigned int count){ /* NOP */}#endif/* * Look up RPC transport given an INET socket */static inline struct rpc_xprt *xprt_from_sock(struct sock *sk){ return (struct rpc_xprt *) sk->user_data;}/* * Adjust the iovec to move on 'n' bytes */ extern inline voidxprt_move_iov(struct msghdr *msg, struct iovec *niv, unsigned amount){ struct iovec *iv=msg->msg_iov; int i; /* * Eat any sent iovecs */ while (iv->iov_len <= amount) { amount -= iv->iov_len; iv++; msg->msg_iovlen--; } /* * And chew down the partial one */ niv[0].iov_len = iv->iov_len-amount; niv[0].iov_base =((unsigned char *)iv->iov_base)+amount; iv++; /* * And copy any others */ for(i = 1; i < msg->msg_iovlen; i++) niv[i]=*iv++; msg->msg_iov=niv;}/* * Write data to socket. 
*/
/*
 * Attempt one non-blocking sendmsg of the not-yet-transmitted part of
 * the request (rq_slen - rq_bytes_sent).  Returns the number of bytes
 * accepted by the socket, or a negative errno.
 */
static inline int
xprt_sendmsg(struct rpc_xprt *xprt, struct rpc_rqst *req)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	int		result;
	int		slen = req->rq_slen - req->rq_bytes_sent;
	struct iovec	niv[MAX_IOVEC];

	/* Nothing left to send. */
	if (slen <= 0)
		return 0;

	if (!sock)
		return -ENOTCONN;

	xprt_pktdump("packet data:",
			req->rq_svec->iov_base,
			req->rq_svec->iov_len);

	msg.msg_flags      = MSG_DONTWAIT|MSG_NOSIGNAL;
	msg.msg_iov        = req->rq_svec;
	msg.msg_iovlen     = req->rq_snr;
	msg.msg_name       = (struct sockaddr *) &xprt->addr;
	msg.msg_namelen    = sizeof(xprt->addr);
	msg.msg_control    = NULL;
	msg.msg_controllen = 0;

	/* Don't repeat bytes that an earlier partial send already got out:
	 * advance the iovec past rq_bytes_sent. */
	if (req->rq_bytes_sent)
		xprt_move_iov(&msg, niv, req->rq_bytes_sent);

	/* Widen the address limit so sock_sendmsg accepts our
	 * kernel-space iovec buffers, then restore it. */
	oldfs = get_fs(); set_fs(get_ds());
	result = sock_sendmsg(sock, &msg, slen);
	set_fs(oldfs);

	dprintk("RPC: xprt_sendmsg(%d) = %d\n", slen, result);

	if (result >= 0)
		return result;

	switch (result) {
	case -ECONNREFUSED:
		/* When the server has died, an ICMP port unreachable message
		 * prompts ECONNREFUSED. */
		break;
	case -EAGAIN:
		/* Socket send buffer is full: report -ENOMEM instead.
		 * NOTE(review): presumably so the caller waits for write
		 * space rather than retrying immediately — confirm against
		 * callers. */
		if (test_bit(SOCK_NOSPACE, &sock->flags))
			result = -ENOMEM;
		break;
	case -ENOTCONN:
	case -EPIPE:
		/* connection broken */
		if (xprt->stream)
			result = -ENOTCONN;
		break;
	default:
		printk(KERN_NOTICE "RPC: sendmsg returned error %d\n", -result);
	}
	return result;
}

/*
 * Read data from socket.
 * 'shift' bytes of the iovec have already been filled on a previous
 * call; the iovec is advanced past them before receiving.
 */
static int
xprt_recvmsg(struct rpc_xprt *xprt, struct iovec *iov, int nr, unsigned len, unsigned shift)
{
	struct socket	*sock = xprt->sock;
	struct msghdr	msg;
	mm_segment_t	oldfs;
	struct iovec	niv[MAX_IOVEC];
	int		result;

	if (!sock)
		return -ENOTCONN;

	msg.msg_flags      = MSG_DONTWAIT|MSG_NOSIGNAL;
	msg.msg_iov        = iov;
	msg.msg_iovlen     = nr;
	msg.msg_name       = NULL;
	msg.msg_namelen    = 0;
	msg.msg_control    = NULL;
	msg.msg_controllen = 0;

	/* Adjust the iovec if we've already filled it */
	if (shift)
		xprt_move_iov(&msg, niv, shift);

	/* Allow kernel-space buffers for the duration of the call. */
	oldfs = get_fs(); set_fs(get_ds());
	result = sock_recvmsg(sock, &msg, len, MSG_DONTWAIT);
	set_fs(oldfs);

	dprintk("RPC: xprt_recvmsg(iov %p, len %d) = %d\n",
			iov, len, result);
	return result;
}

/*
 * Adjust RPC congestion window
 * We use a time-smoothed congestion estimator to avoid heavy oscillation.
 * Grows the window additively on success (once per congtime interval),
 * halves it on timeout.  No-op for transports with congestion control
 * disabled (xprt->nocong).
 */
static void
xprt_adjust_cwnd(struct rpc_xprt *xprt, int result)
{
	unsigned long	cwnd;

	if (xprt->nocong)
		return;
	spin_lock_bh(&xprt_sock_lock);
	cwnd = xprt->cwnd;
	if (result >= 0) {
		/* Only grow when the window is actually full and the
		 * previous adjustment interval has expired. */
		if (xprt->cong < cwnd || time_before(jiffies, xprt->congtime))
			goto out;
		/* The (cwnd >> 1) term makes sure
		 * the result gets rounded properly. */
		cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
		if (cwnd > RPC_MAXCWND)
			cwnd = RPC_MAXCWND;
		else
			pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
		xprt->congtime = jiffies + ((cwnd * HZ) << 2) / RPC_CWNDSCALE;
		dprintk("RPC: cong %08lx, cwnd was %08lx, now %08lx, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
	} else if (result == -ETIMEDOUT) {
		/* Timeout: halve the window (floor RPC_CWNDSCALE) and
		 * back off the next adjustment further. */
		if ((cwnd >>= 1) < RPC_CWNDSCALE)
			cwnd = RPC_CWNDSCALE;
		xprt->congtime = jiffies + ((cwnd * HZ) << 3) / RPC_CWNDSCALE;
		dprintk("RPC: cong %ld, cwnd was %ld, now %ld, "
			"time %ld ms\n", xprt->cong, xprt->cwnd, cwnd,
			(xprt->congtime-jiffies)*1000/HZ);
		pprintk("RPC: %lu %ld cwnd\n", jiffies, cwnd);
	}
	xprt->cwnd = cwnd;
 out:
	spin_unlock_bh(&xprt_sock_lock);
}

/*
 * Adjust timeout values etc for next retransmit.
 * While retries remain, bumps to_current (doubling or adding
 * to_increment, capped at to_maxval); once retries are exhausted the
 * same escalation is applied to to_initval and the counter restarts
 * from it.  Returns nonzero if another retransmit should be attempted
 * (to_retries is post-decremented).
 */
int
xprt_adjust_timeout(struct rpc_timeout *to)
{
	if (to->to_retries > 0) {
		if (to->to_exponential)
			to->to_current <<= 1;
		else
			to->to_current += to->to_increment;
		if (to->to_maxval && to->to_current >= to->to_maxval)
			to->to_current = to->to_maxval;
	} else {
		if (to->to_exponential)
			to->to_initval <<= 1;
		else
			to->to_initval += to->to_increment;
		if (to->to_maxval && to->to_initval >= to->to_maxval)
			to->to_initval = to->to_maxval;
		to->to_current = to->to_initval;
	}

	/* Guard against a zero timeout, which would never fire. */
	if (!to->to_current) {
		printk(KERN_WARNING "xprt_adjust_timeout: to_current = 0!\n");
		to->to_current = 5 * HZ;
	}
	pprintk("RPC: %lu %s\n", jiffies,
			to->to_retries? "retrans" : "timeout");
	return to->to_retries-- > 0;
}

/*
 * Close down a transport socket: detach it from the rpc_xprt, restore
 * the saved sk callbacks, mark the transport disconnected and release
 * the socket.
 */
static void
xprt_close(struct rpc_xprt *xprt)
{
	struct socket	*sock = xprt->sock;
	struct sock	*sk = xprt->inet;

	if (!sk)
		return;

	/* Detach before touching callbacks so no new events find us. */
	xprt->inet = NULL;
	xprt->sock = NULL;

	sk->user_data    = NULL;
	sk->data_ready   = xprt->old_data_ready;
	sk->state_change = xprt->old_state_change;
	sk->write_space  = xprt->old_write_space;

	xprt_disconnect(xprt);
	sk->no_check	 = 0;

	sock_release(sock);
	/*
	 *	TCP doesnt require the rpciod now - other things may
	 *	but rpciod handles that not us.
	 */
	if(xprt->stream)
		rpciod_down();
}

/*
 * Mark a transport as disconnected: clear the connected flag, drop any
 * pending receive state and wake all waiting tasks with -ENOTCONN.
 */
static void
xprt_disconnect(struct rpc_xprt *xprt)
{
	dprintk("RPC: disconnected transport %p\n", xprt);
	xprt_clear_connected(xprt);
	xprt_remove_pending(xprt);
	rpc_wake_up_status(&xprt->pending, -ENOTCONN);
}

/*
 * Reconnect a broken TCP connection.
 * Only one task performs the connect (xprt->connecting, guarded by
 * xprt_lock); others sleep on xprt->reconn until it finishes.  The
 * connect itself is issued non-blocking; an -EINPROGRESS result parks
 * the task on xprt->reconn with xprt_reconn_status as timeout handler.
 */
void
xprt_reconnect(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;
	struct socket	*sock = xprt->sock;
	struct sock	*inet = xprt->inet;
	int		status;

	dprintk("RPC: %4d xprt_reconnect %p connected %d\n",
			task->tk_pid, xprt, xprt_connected(xprt));
	if (xprt->shutdown)
		return;

	/* Only stream (TCP) transports reconnect. */
	if (!xprt->stream)
		return;

	if (!xprt->addr.sin_port) {
		task->tk_status = -EIO;
		return;
	}

	spin_lock(&xprt_lock);
	if (xprt->connecting) {
		/* Someone else is already connecting: wait for them. */
		task->tk_timeout = 0;
		rpc_sleep_on(&xprt->reconn, task, NULL, NULL);
		spin_unlock(&xprt_lock);
		return;
	}
	xprt->connecting = 1;
	spin_unlock(&xprt_lock);

	status = -ENOTCONN;
	if (!inet) {
		/* Create an unconnected socket */
		if (!(sock = xprt_create_socket(xprt->prot, &xprt->timeout)))
			goto defer;
		xprt_bind_socket(xprt, sock);
		inet = sock->sk;
	}

	xprt_disconnect(xprt);

	/* Reset TCP record info */
	xprt->tcp_offset = 0;
	xprt->tcp_copied = 0;
	xprt->tcp_more = 0;

	/* Now connect it asynchronously.
	 */
	dprintk("RPC: %4d connecting new socket\n", task->tk_pid);
	status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
			sizeof(xprt->addr), O_NONBLOCK);

	if (status < 0) {
		switch (status) {
		case -EALREADY:
		case -EINPROGRESS:
			/* Connect is under way: treat as success and
			 * wait for the state_change callback below. */
			status = 0;
			break;
		case -EISCONN:
		case -EPIPE:
			status = 0;
			xprt_close(xprt);
			goto defer;
		default:
			printk("RPC: TCP connect error %d!\n", -status);
			xprt_close(xprt);
			goto defer;
		}

		dprintk("RPC: %4d connect status %d connected %d\n",
				task->tk_pid, status, xprt_connected(xprt));

		/* Re-check under the lock: the connect may already have
		 * completed before we get here. */
		spin_lock_bh(&xprt_sock_lock);
		if (!xprt_connected(xprt)) {
			task->tk_timeout = xprt->timeout.to_maxval;
			rpc_sleep_on(&xprt->reconn, task, xprt_reconn_status, NULL);
			spin_unlock_bh(&xprt_sock_lock);
			return;
		}
		spin_unlock_bh(&xprt_sock_lock);
	}
defer:
	/* Drop the connecting claim and wake any tasks queued behind us. */
	spin_lock(&xprt_lock);
	xprt->connecting = 0;
	if (status < 0) {
		rpc_delay(task, 5*HZ);
		task->tk_status = -ENOTCONN;
	}
	rpc_wake_up(&xprt->reconn);
	spin_unlock(&xprt_lock);
}

/*
 * Reconnect timeout. We just mark the transport as not being in the
 * process of reconnecting, and leave the rest to the upper layers.
 */
static void
xprt_reconn_status(struct rpc_task *task)
{
	struct rpc_xprt	*xprt = task->tk_xprt;

	dprintk("RPC: %4d xprt_reconn_timeout %d\n",
			task->tk_pid, task->tk_status);
	spin_lock(&xprt_lock);
	xprt->connecting = 0;
	rpc_wake_up(&xprt->reconn);
	spin_unlock(&xprt_lock);
}

/*
 * Look up the RPC request corresponding to a reply, and then lock it.
*/
/*
 * Walk the circular list of tasks pending on this transport looking
 * for the request whose XID matches the reply.  Returns the request
 * with its task locked via __rpc_lock_task, or NULL if no match (or
 * if the task could not be locked).  The walk is bounded at 100
 * entries as a safety net against a corrupted queue.
 */
static inline struct rpc_rqst *
xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid)
{
	struct rpc_task	*head, *task;
	struct rpc_rqst	*req;
	int		safe = 0;

	spin_lock_bh(&rpc_queue_lock);
	if ((head = xprt->pending.task) != NULL) {
		task = head;
		do {
			if ((req = task->tk_rqstp) && req->rq_xid == xid)
				goto out;
			task = task->tk_next;
			if (++safe > 100) {
				printk("xprt_lookup_rqst: loop in Q!\n");
				goto out_bad;
			}
		} while (task != head);
	}
	dprintk("RPC: unknown XID %08x in reply.\n", xid);
	/* Fall through: no matching request. */
 out_bad:
	req = NULL;
 out:
	/* Lock the owning task; give up if someone else holds it. */
	if (req && !__rpc_lock_task(req->rq_task))
		req = NULL;
	spin_unlock_bh(&rpc_queue_lock);
	return req;
}

/*
 * Complete reply received.
 * The TCP code relies on us to remove the request from xprt->pending.
 */
/* NOTE(review): function truncated here in this copy of the file. */
static inline void
xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied)
{
	struct rpc_task	*task = req->rq_task;
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -