📄 xprtsock.c
字号:
any.sa_family = AF_UNSPEC; result = kernel_connect(transport->sock, &any, sizeof(any), 0); if (result) dprintk("RPC: AF_UNSPEC connect return code %d\n", result);}static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock){ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); if (!transport->inet) { struct sock *sk = sock->sk; write_lock_bh(&sk->sk_callback_lock); sk->sk_user_data = xprt; transport->old_data_ready = sk->sk_data_ready; transport->old_state_change = sk->sk_state_change; transport->old_write_space = sk->sk_write_space; sk->sk_data_ready = xs_tcp_data_ready; sk->sk_state_change = xs_tcp_state_change; sk->sk_write_space = xs_tcp_write_space; sk->sk_allocation = GFP_ATOMIC; /* socket options */ sk->sk_userlocks |= SOCK_BINDPORT_LOCK; sock_reset_flag(sk, SOCK_LINGER); tcp_sk(sk)->linger2 = 0; tcp_sk(sk)->nonagle |= TCP_NAGLE_OFF; xprt_clear_connected(xprt); /* Reset to new socket */ transport->sock = sock; transport->inet = sk; write_unlock_bh(&sk->sk_callback_lock); } /* Tell the socket layer to start connecting... */ xprt->stat.connect_count++; xprt->stat.connect_start = jiffies; return kernel_connect(sock, xs_addr(xprt), xprt->addrlen, O_NONBLOCK);}/** * xs_tcp_connect_worker4 - connect a TCP socket to a remote endpoint * @work: RPC transport to connect * * Invoked by a work queue tasklet. */static void xs_tcp_connect_worker4(struct work_struct *work){ struct sock_xprt *transport = container_of(work, struct sock_xprt, connect_worker.work); struct rpc_xprt *xprt = &transport->xprt; struct socket *sock = transport->sock; int err, status = -EIO; if (xprt->shutdown || !xprt_bound(xprt)) goto out; if (!sock) { /* start from scratch */ if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) { dprintk("RPC: can't create TCP transport socket (%d).\n", -err); goto out; } xs_reclassify_socket4(sock); if (xs_bind4(transport, sock) < 0) { sock_release(sock); goto out; } } else /* "close" the socket, preserving the local port */ xs_tcp_reuse_connection(xprt); dprintk("RPC: worker connecting xprt %p to address: %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ALL]); status = xs_tcp_finish_connecting(xprt, sock); dprintk("RPC: %p connect status %d connected %d sock state %d\n", xprt, -status, xprt_connected(xprt), sock->sk->sk_state); if (status < 0) { switch (status) { case -EINPROGRESS: case -EALREADY: goto out_clear; case -ECONNREFUSED: case -ECONNRESET: /* retry with existing socket, after a delay */ break; default: /* get rid of existing socket, and retry */ xs_close(xprt); break; } }out: xprt_wake_pending_tasks(xprt, status);out_clear: xprt_clear_connecting(xprt);}/** * xs_tcp_connect_worker6 - connect a TCP socket to a remote endpoint * @work: RPC transport to connect * * Invoked by a work queue tasklet. */static void xs_tcp_connect_worker6(struct work_struct *work){ struct sock_xprt *transport = container_of(work, struct sock_xprt, connect_worker.work); struct rpc_xprt *xprt = &transport->xprt; struct socket *sock = transport->sock; int err, status = -EIO; if (xprt->shutdown || !xprt_bound(xprt)) goto out; if (!sock) { /* start from scratch */ if ((err = sock_create_kern(PF_INET6, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) { dprintk("RPC: can't create TCP transport socket (%d).\n", -err); goto out; } xs_reclassify_socket6(sock); if (xs_bind6(transport, sock) < 0) { sock_release(sock); goto out; } } else /* "close" the socket, preserving the local port */ xs_tcp_reuse_connection(xprt); dprintk("RPC: worker connecting xprt %p to address: %s\n", xprt, xprt->address_strings[RPC_DISPLAY_ALL]); status = xs_tcp_finish_connecting(xprt, sock); dprintk("RPC: %p connect status %d connected %d sock state %d\n", xprt, -status, xprt_connected(xprt), sock->sk->sk_state); if (status < 0) { switch (status) { case -EINPROGRESS: case -EALREADY: goto out_clear; case -ECONNREFUSED: case -ECONNRESET: /* retry with existing socket, after a delay */ break; default: /* get rid of existing socket, and retry */ xs_close(xprt); break; } }out: xprt_wake_pending_tasks(xprt, status);out_clear: xprt_clear_connecting(xprt);}/** * xs_connect - connect a socket to a remote endpoint * @task: address of RPC task that manages state of connect request * * TCP: If the remote end dropped the connection, delay reconnecting. * * UDP socket connects are synchronous, but we use a work queue anyway * to guarantee that even unprivileged user processes can set up a * socket on a privileged port. * * If a UDP socket connect fails, the delay behavior here prevents * retry floods (hard mounts). */static void xs_connect(struct rpc_task *task){ struct rpc_xprt *xprt = task->tk_xprt; struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); if (xprt_test_and_set_connecting(xprt)) return; if (transport->sock != NULL) { dprintk("RPC: xs_connect delayed xprt %p for %lu " "seconds\n", xprt, xprt->reestablish_timeout / HZ); queue_delayed_work(rpciod_workqueue, &transport->connect_worker, xprt->reestablish_timeout); xprt->reestablish_timeout <<= 1; if (xprt->reestablish_timeout > XS_TCP_MAX_REEST_TO) xprt->reestablish_timeout = XS_TCP_MAX_REEST_TO; } else { dprintk("RPC: xs_connect scheduled xprt %p\n", xprt); queue_delayed_work(rpciod_workqueue, &transport->connect_worker, 0); }}/** * xs_udp_print_stats - display UDP socket-specifc stats * @xprt: rpc_xprt struct containing statistics * @seq: output file * */static void xs_udp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq){ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); seq_printf(seq, "\txprt:\tudp %u %lu %lu %lu %lu %Lu %Lu\n", transport->port, xprt->stat.bind_count, xprt->stat.sends, xprt->stat.recvs, xprt->stat.bad_xids, xprt->stat.req_u, xprt->stat.bklog_u);}/** * xs_tcp_print_stats - display TCP socket-specifc stats * @xprt: rpc_xprt struct containing statistics * @seq: output file * */static void xs_tcp_print_stats(struct rpc_xprt *xprt, struct seq_file *seq){ struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt); long idle_time = 0; if (xprt_connected(xprt)) idle_time = (long)(jiffies - xprt->last_used) / HZ; seq_printf(seq, "\txprt:\ttcp %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu\n", transport->port, xprt->stat.bind_count, xprt->stat.connect_count, xprt->stat.connect_time, idle_time, xprt->stat.sends, xprt->stat.recvs, xprt->stat.bad_xids, xprt->stat.req_u, xprt->stat.bklog_u);}static struct rpc_xprt_ops xs_udp_ops = { .set_buffer_size = xs_udp_set_buffer_size, .reserve_xprt = xprt_reserve_xprt_cong, .release_xprt = xprt_release_xprt_cong, .rpcbind = rpcb_getport_async, .set_port = xs_set_port, .connect = xs_connect, .buf_alloc = rpc_malloc, .buf_free = rpc_free, .send_request = xs_udp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_rtt, .timer = xs_udp_timer, .release_request = xprt_release_rqst_cong, .close = xs_close, .destroy = xs_destroy, .print_stats = xs_udp_print_stats,};static struct rpc_xprt_ops xs_tcp_ops = { .reserve_xprt = xprt_reserve_xprt, .release_xprt = xs_tcp_release_xprt, .rpcbind = rpcb_getport_async, .set_port = xs_set_port, .connect = xs_connect, .buf_alloc = rpc_malloc, .buf_free = rpc_free, .send_request = xs_tcp_send_request, .set_retrans_timeout = xprt_set_retrans_timeout_def, .close = xs_close, .destroy = xs_destroy, .print_stats = xs_tcp_print_stats,};static struct rpc_xprt *xs_setup_xprt(struct xprt_create *args, unsigned int slot_table_size){ struct rpc_xprt *xprt; struct sock_xprt *new; if (args->addrlen > sizeof(xprt->addr)) { dprintk("RPC: xs_setup_xprt: address too large\n"); return ERR_PTR(-EBADF); } new = kzalloc(sizeof(*new), GFP_KERNEL); if (new == NULL) { dprintk("RPC: xs_setup_xprt: couldn't allocate " "rpc_xprt\n"); return ERR_PTR(-ENOMEM); } xprt = &new->xprt; xprt->max_reqs = slot_table_size; xprt->slot = kcalloc(xprt->max_reqs, sizeof(struct rpc_rqst), GFP_KERNEL); if (xprt->slot == NULL) { kfree(xprt); dprintk("RPC: xs_setup_xprt: couldn't allocate slot " "table\n"); return ERR_PTR(-ENOMEM); } memcpy(&xprt->addr, args->dstaddr, args->addrlen); xprt->addrlen = args->addrlen; if (args->srcaddr) memcpy(&new->addr, args->srcaddr, args->addrlen); new->port = xs_get_random_port(); return xprt;}/** * xs_setup_udp - Set up transport to use a UDP socket * @args: rpc transport creation arguments * */static struct rpc_xprt *xs_setup_udp(struct xprt_create *args){ struct sockaddr *addr = args->dstaddr; struct rpc_xprt *xprt; struct sock_xprt *transport; xprt = xs_setup_xprt(args, xprt_udp_slot_table_entries); if (IS_ERR(xprt)) return xprt; transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_UDP; xprt->tsh_size = 0; /* XXX: header size can vary due to auth type, IPv6, etc. */ xprt->max_payload = (1U << 16) - (MAX_HEADER << 3); xprt->bind_timeout = XS_BIND_TO; xprt->connect_timeout = XS_UDP_CONN_TO; xprt->reestablish_timeout = XS_UDP_REEST_TO; xprt->idle_timeout = XS_IDLE_DISC_TO; xprt->ops = &xs_udp_ops; if (args->timeout) xprt->timeout = *args->timeout; else xprt_set_timeout(&xprt->timeout, 5, 5 * HZ); switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_connect_worker4); xs_format_ipv4_peer_addresses(xprt); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_connect_worker6); xs_format_ipv6_peer_addresses(xprt); break; default: kfree(xprt); return ERR_PTR(-EAFNOSUPPORT); } dprintk("RPC: set up transport to address %s\n", xprt->address_strings[RPC_DISPLAY_ALL]); if (try_module_get(THIS_MODULE)) return xprt; kfree(xprt->slot); kfree(xprt); return ERR_PTR(-EINVAL);}/** * xs_setup_tcp - Set up transport to use a TCP socket * @args: rpc transport creation arguments * */static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args){ struct sockaddr *addr = args->dstaddr; struct rpc_xprt *xprt; struct sock_xprt *transport; xprt = xs_setup_xprt(args, xprt_tcp_slot_table_entries); if (IS_ERR(xprt)) return xprt; transport = container_of(xprt, struct sock_xprt, xprt); xprt->prot = IPPROTO_TCP; xprt->tsh_size = sizeof(rpc_fraghdr) / sizeof(u32); xprt->max_payload = RPC_MAX_FRAGMENT_SIZE; xprt->bind_timeout = XS_BIND_TO; xprt->connect_timeout = XS_TCP_CONN_TO; xprt->reestablish_timeout = XS_TCP_INIT_REEST_TO; xprt->idle_timeout = XS_IDLE_DISC_TO; xprt->ops = &xs_tcp_ops; if (args->timeout) xprt->timeout = *args->timeout; else xprt_set_timeout(&xprt->timeout, 2, 60 * HZ); switch (addr->sa_family) { case AF_INET: if (((struct sockaddr_in *)addr)->sin_port != htons(0)) xprt_set_bound(xprt); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker4); xs_format_ipv4_peer_addresses(xprt); break; case AF_INET6: if (((struct sockaddr_in6 *)addr)->sin6_port != htons(0)) xprt_set_bound(xprt); INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_connect_worker6); xs_format_ipv6_peer_addresses(xprt); break; default: kfree(xprt); return ERR_PTR(-EAFNOSUPPORT); } dprintk("RPC: set up transport to address %s\n", xprt->address_strings[RPC_DISPLAY_ALL]); if (try_module_get(THIS_MODULE)) return xprt; kfree(xprt->slot); kfree(xprt); return ERR_PTR(-EINVAL);}static struct xprt_class xs_udp_transport = { .list = LIST_HEAD_INIT(xs_udp_transport.list), .name = "udp", .owner = THIS_MODULE, .ident = IPPROTO_UDP, .setup = xs_setup_udp,};static struct xprt_class xs_tcp_transport = { .list = LIST_HEAD_INIT(xs_tcp_transport.list), .name = "tcp", .owner = THIS_MODULE, .ident = IPPROTO_TCP, .setup = xs_setup_tcp,};/** * init_socket_xprt - set up xprtsock's sysctls, register with RPC client * */int init_socket_xprt(void){#ifdef RPC_DEBUG if (!sunrpc_table_header) sunrpc_table_header = register_sysctl_table(sunrpc_table);#endif xprt_register_transport(&xs_udp_transport); xprt_register_transport(&xs_tcp_transport); return 0;}/** * cleanup_socket_xprt - remove xprtsock's sysctls, unregister * */void cleanup_socket_xprt(void){#ifdef RPC_DEBUG if (sunrpc_table_header) { unregister_sysctl_table(sunrpc_table_header); sunrpc_table_header = NULL; }#endif xprt_unregister_transport(&xs_udp_transport); xprt_unregister_transport(&xs_tcp_transport);}
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -