📄 clnt.c
字号:
/* * linux/net/sunrpc/rpcclnt.c * * This file contains the high-level RPC interface. * It is modeled as a finite state machine to support both synchronous * and asynchronous requests. * * - RPC header generation and argument serialization. * - Credential refresh. * - TCP connect handling. * - Retry of operation when it is suspected the operation failed because * of uid squashing on the server, or when the credentials were stale * and need to be refreshed, or when a packet was damaged in transit. * This may be have to be moved to the VFS layer. * * NB: BSD uses a more intelligent approach to guessing when a request * or reply has been lost by keeping the RTO estimate for each procedure. * We currently make do with a constant timeout value. * * Copyright (C) 1992,1993 Rick Sladkey <jrs@world.std.com> * Copyright (C) 1995,1996 Olaf Kirch <okir@monad.swb.de> */#include <asm/system.h>#include <linux/types.h>#include <linux/mm.h>#include <linux/slab.h>#include <linux/in.h>#include <linux/utsname.h>#include <linux/sunrpc/clnt.h>#include <linux/workqueue.h>#include <linux/sunrpc/rpc_pipe_fs.h>#include <linux/nfs.h>#define RPC_SLACK_SPACE (1024) /* total overkill */#ifdef RPC_DEBUG# define RPCDBG_FACILITY RPCDBG_CALL#endifstatic DECLARE_WAIT_QUEUE_HEAD(destroy_wait);static void call_start(struct rpc_task *task);static void call_reserve(struct rpc_task *task);static void call_reserveresult(struct rpc_task *task);static void call_allocate(struct rpc_task *task);static void call_encode(struct rpc_task *task);static void call_decode(struct rpc_task *task);static void call_bind(struct rpc_task *task);static void call_transmit(struct rpc_task *task);static void call_status(struct rpc_task *task);static void call_refresh(struct rpc_task *task);static void call_refreshresult(struct rpc_task *task);static void call_timeout(struct rpc_task *task);static void call_connect(struct rpc_task *task);static void call_connect_status(struct rpc_task *task);static u32 * call_header(struct rpc_task *task);static u32 * call_verify(struct rpc_task *task);static intrpc_setup_pipedir(struct rpc_clnt *clnt, char *dir_name){ static uint32_t clntid; int error; if (dir_name == NULL) return 0; for (;;) { snprintf(clnt->cl_pathname, sizeof(clnt->cl_pathname), "%s/clnt%x", dir_name, (unsigned int)clntid++); clnt->cl_pathname[sizeof(clnt->cl_pathname) - 1] = '\0'; clnt->cl_dentry = rpc_mkdir(clnt->cl_pathname, clnt); if (!IS_ERR(clnt->cl_dentry)) return 0; error = PTR_ERR(clnt->cl_dentry); if (error != -EEXIST) { printk(KERN_INFO "RPC: Couldn't create pipefs entry %s, error %d\n", clnt->cl_pathname, error); return error; } }}/* * Create an RPC client * FIXME: This should also take a flags argument (as in task->tk_flags). * It's called (among others) from pmap_create_client, which may in * turn be called by an async task. In this case, rpciod should not be * made to sleep too long. */struct rpc_clnt *rpc_create_client(struct rpc_xprt *xprt, char *servname, struct rpc_program *program, u32 vers, rpc_authflavor_t flavor){ struct rpc_version *version; struct rpc_clnt *clnt = NULL; int err; int len; dprintk("RPC: creating %s client for %s (xprt %p)\n", program->name, servname, xprt); err = -EINVAL; if (!xprt) goto out_err; if (vers >= program->nrvers || !(version = program->version[vers])) goto out_err; err = -ENOMEM; clnt = (struct rpc_clnt *) kmalloc(sizeof(*clnt), GFP_KERNEL); if (!clnt) goto out_err; memset(clnt, 0, sizeof(*clnt)); atomic_set(&clnt->cl_users, 0); atomic_set(&clnt->cl_count, 1); clnt->cl_parent = clnt; clnt->cl_server = clnt->cl_inline_name; len = strlen(servname) + 1; if (len > sizeof(clnt->cl_inline_name)) { char *buf = kmalloc(len, GFP_KERNEL); if (buf != 0) clnt->cl_server = buf; else len = sizeof(clnt->cl_inline_name); } strlcpy(clnt->cl_server, servname, len); clnt->cl_xprt = xprt; clnt->cl_procinfo = version->procs; clnt->cl_maxproc = version->nrprocs; clnt->cl_protname = program->name; clnt->cl_pmap = &clnt->cl_pmap_default; clnt->cl_port = xprt->addr.sin_port; clnt->cl_prog = program->number; clnt->cl_vers = version->number; clnt->cl_prot = xprt->prot; clnt->cl_stats = program->stats; rpc_init_wait_queue(&clnt->cl_pmap_default.pm_bindwait, "bindwait"); if (!clnt->cl_port) clnt->cl_autobind = 1; clnt->cl_rtt = &clnt->cl_rtt_default; rpc_init_rtt(&clnt->cl_rtt_default, xprt->timeout.to_initval); err = rpc_setup_pipedir(clnt, program->pipe_dir_name); if (err < 0) goto out_no_path; err = -ENOMEM; if (!rpcauth_create(flavor, clnt)) { printk(KERN_INFO "RPC: Couldn't create auth handle (flavor %u)\n", flavor); goto out_no_auth; } /* save the nodename */ clnt->cl_nodelen = strlen(system_utsname.nodename); if (clnt->cl_nodelen > UNX_MAXNODENAME) clnt->cl_nodelen = UNX_MAXNODENAME; memcpy(clnt->cl_nodename, system_utsname.nodename, clnt->cl_nodelen); return clnt;out_no_auth: rpc_rmdir(clnt->cl_pathname);out_no_path: if (clnt->cl_server != clnt->cl_inline_name) kfree(clnt->cl_server); kfree(clnt);out_err: return ERR_PTR(err);}/* * This function clones the RPC client structure. It allows us to share the * same transport while varying parameters such as the authentication * flavour. */struct rpc_clnt *rpc_clone_client(struct rpc_clnt *clnt){ struct rpc_clnt *new; new = (struct rpc_clnt *)kmalloc(sizeof(*new), GFP_KERNEL); if (!new) goto out_no_clnt; memcpy(new, clnt, sizeof(*new)); atomic_set(&new->cl_count, 1); atomic_set(&new->cl_users, 0); new->cl_parent = clnt; atomic_inc(&clnt->cl_count); /* Duplicate portmapper */ rpc_init_wait_queue(&new->cl_pmap_default.pm_bindwait, "bindwait"); /* Turn off autobind on clones */ new->cl_autobind = 0; new->cl_oneshot = 0; new->cl_dead = 0; rpc_init_rtt(&new->cl_rtt_default, clnt->cl_xprt->timeout.to_initval); if (new->cl_auth) atomic_inc(&new->cl_auth->au_count); return new;out_no_clnt: printk(KERN_INFO "RPC: out of memory in %s\n", __FUNCTION__); return ERR_PTR(-ENOMEM);}/* * Properly shut down an RPC client, terminating all outstanding * requests. Note that we must be certain that cl_oneshot and * cl_dead are cleared, or else the client would be destroyed * when the last task releases it. */intrpc_shutdown_client(struct rpc_clnt *clnt){ dprintk("RPC: shutting down %s client for %s, tasks=%d\n", clnt->cl_protname, clnt->cl_server, atomic_read(&clnt->cl_users)); while (atomic_read(&clnt->cl_users) > 0) { /* Don't let rpc_release_client destroy us */ clnt->cl_oneshot = 0; clnt->cl_dead = 0; rpc_killall_tasks(clnt); sleep_on_timeout(&destroy_wait, 1*HZ); } if (atomic_read(&clnt->cl_users) < 0) { printk(KERN_ERR "RPC: rpc_shutdown_client clnt %p tasks=%d\n", clnt, atomic_read(&clnt->cl_users));#ifdef RPC_DEBUG rpc_show_tasks();#endif BUG(); } return rpc_destroy_client(clnt);}/* * Delete an RPC client */intrpc_destroy_client(struct rpc_clnt *clnt){ if (!atomic_dec_and_test(&clnt->cl_count)) return 1; BUG_ON(atomic_read(&clnt->cl_users) != 0); dprintk("RPC: destroying %s client for %s\n", clnt->cl_protname, clnt->cl_server); if (clnt->cl_auth) { rpcauth_destroy(clnt->cl_auth); clnt->cl_auth = NULL; } if (clnt->cl_parent != clnt) { rpc_destroy_client(clnt->cl_parent); goto out_free; } if (clnt->cl_pathname[0]) rpc_rmdir(clnt->cl_pathname); if (clnt->cl_xprt) { xprt_destroy(clnt->cl_xprt); clnt->cl_xprt = NULL; } if (clnt->cl_server != clnt->cl_inline_name) kfree(clnt->cl_server);out_free: kfree(clnt); return 0;}/* * Release an RPC client */voidrpc_release_client(struct rpc_clnt *clnt){ dprintk("RPC: rpc_release_client(%p, %d)\n", clnt, atomic_read(&clnt->cl_users)); if (!atomic_dec_and_test(&clnt->cl_users)) return; wake_up(&destroy_wait); if (clnt->cl_oneshot || clnt->cl_dead) rpc_destroy_client(clnt);}/* * Default callback for async RPC calls */static voidrpc_default_callback(struct rpc_task *task){}/* * Export the signal mask handling for aysnchronous code that * sleeps on RPC calls */ void rpc_clnt_sigmask(struct rpc_clnt *clnt, sigset_t *oldset){ unsigned long sigallow = sigmask(SIGKILL); unsigned long irqflags; /* Turn off various signals */ if (clnt->cl_intr) { struct k_sigaction *action = current->sighand->action; if (action[SIGINT-1].sa.sa_handler == SIG_DFL) sigallow |= sigmask(SIGINT); if (action[SIGQUIT-1].sa.sa_handler == SIG_DFL) sigallow |= sigmask(SIGQUIT); } spin_lock_irqsave(¤t->sighand->siglock, irqflags); *oldset = current->blocked; siginitsetinv(¤t->blocked, sigallow & ~oldset->sig[0]); recalc_sigpending(); spin_unlock_irqrestore(¤t->sighand->siglock, irqflags);}void rpc_clnt_sigunmask(struct rpc_clnt *clnt, sigset_t *oldset){ unsigned long irqflags; spin_lock_irqsave(¤t->sighand->siglock, irqflags); current->blocked = *oldset; recalc_sigpending(); spin_unlock_irqrestore(¤t->sighand->siglock, irqflags);}/* * New rpc_call implementation */int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags){ struct rpc_task *task; sigset_t oldset; int status; /* If this client is slain all further I/O fails */ if (clnt->cl_dead) return -EIO; BUG_ON(flags & RPC_TASK_ASYNC); rpc_clnt_sigmask(clnt, &oldset); status = -ENOMEM; task = rpc_new_task(clnt, NULL, flags); if (task == NULL) goto out; rpc_call_setup(task, msg, 0); /* Set up the call info struct and execute the task */ if (task->tk_status == 0) status = rpc_execute(task); else { status = task->tk_status; rpc_release_task(task); }out: rpc_clnt_sigunmask(clnt, &oldset); return status;}/* * New rpc_call implementation */intrpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags, rpc_action callback, void *data){ struct rpc_task *task; sigset_t oldset; int status; /* If this client is slain all further I/O fails */ if (clnt->cl_dead) return -EIO; flags |= RPC_TASK_ASYNC; rpc_clnt_sigmask(clnt, &oldset); /* Create/initialize a new RPC task */ if (!callback) callback = rpc_default_callback; status = -ENOMEM; if (!(task = rpc_new_task(clnt, callback, flags))) goto out; task->tk_calldata = data; rpc_call_setup(task, msg, 0); /* Set up the call info struct and execute the task */ if (task->tk_status == 0) status = rpc_execute(task); else { status = task->tk_status; rpc_release_task(task); }out: rpc_clnt_sigunmask(clnt, &oldset); return status;}voidrpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags){ task->tk_msg = *msg; task->tk_flags |= flags; /* Bind the user cred */ if (task->tk_msg.rpc_cred != NULL) { rpcauth_holdcred(task); } else rpcauth_bindcred(task); if (task->tk_status == 0) task->tk_action = call_start; else task->tk_action = NULL;}voidrpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize){ struct rpc_xprt *xprt = clnt->cl_xprt; xprt->sndsize = 0; if (sndsize) xprt->sndsize = sndsize + RPC_SLACK_SPACE; xprt->rcvsize = 0; if (rcvsize) xprt->rcvsize = rcvsize + RPC_SLACK_SPACE; if (xprt_connected(xprt)) xprt_sock_setbufsize(xprt);}/* * Restart an (async) RPC call. Usually called from within the * exit handler. */voidrpc_restart_call(struct rpc_task *task){ if (RPC_ASSASSINATED(task)) return; task->tk_action = call_start;}/* * 0. Initial state * * Other FSM states can be visited zero or more times, but * this state is visited exactly once for each RPC. */static voidcall_start(struct rpc_task *task){ struct rpc_clnt *clnt = task->tk_client; dprintk("RPC: %4d call_start %s%d proc %d (%s)\n", task->tk_pid, clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc, (RPC_IS_ASYNC(task) ? "async" : "sync")); /* Increment call count */ task->tk_msg.rpc_proc->p_count++; clnt->cl_stats->rpccnt++; task->tk_action = call_reserve;}/* * 1. Reserve an RPC call slot */static voidcall_reserve(struct rpc_task *task){ dprintk("RPC: %4d call_reserve\n", task->tk_pid); if (!rpcauth_uptodatecred(task)) { task->tk_action = call_refresh; return; } task->tk_status = 0; task->tk_action = call_reserveresult; xprt_reserve(task);}/* * 1b. Grok the result of xprt_reserve() */static voidcall_reserveresult(struct rpc_task *task){ int status = task->tk_status; dprintk("RPC: %4d call_reserveresult (status %d)\n", task->tk_pid, task->tk_status); /* * After a call to xprt_reserve(), we must have either * a request slot or else an error status. */ task->tk_status = 0; if (status >= 0) { if (task->tk_rqstp) { task->tk_action = call_allocate; return; } printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n", __FUNCTION__, status); rpc_exit(task, -EIO); return; } /* * Even though there was an error, we may have acquired * a request slot somehow. Make sure not to leak it. */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -