📄 clnt.c
字号:
struct rpc_message *msg, int flags, const struct rpc_call_ops *ops, void *data){ struct rpc_task *task, *ret; sigset_t oldset; task = rpc_new_task(clnt, flags, ops, data); if (task == NULL) { rpc_release_calldata(ops, data); return ERR_PTR(-ENOMEM); } /* Mask signals on synchronous RPC calls and RPCSEC_GSS upcalls */ rpc_task_sigmask(task, &oldset); if (msg != NULL) { rpc_call_setup(task, msg, 0); if (task->tk_status != 0) { ret = ERR_PTR(task->tk_status); rpc_put_task(task); goto out; } } atomic_inc(&task->tk_count); rpc_execute(task); ret = task;out: rpc_restore_sigmask(&oldset); return ret;}/** * rpc_call_sync - Perform a synchronous RPC call * @clnt: pointer to RPC client * @msg: RPC call parameters * @flags: RPC call flags */int rpc_call_sync(struct rpc_clnt *clnt, struct rpc_message *msg, int flags){ struct rpc_task *task; int status; BUG_ON(flags & RPC_TASK_ASYNC); task = rpc_do_run_task(clnt, msg, flags, &rpc_default_ops, NULL); if (IS_ERR(task)) return PTR_ERR(task); status = task->tk_status; rpc_put_task(task); return status;}/** * rpc_call_async - Perform an asynchronous RPC call * @clnt: pointer to RPC client * @msg: RPC call parameters * @flags: RPC call flags * @ops: RPC call ops * @data: user call data */intrpc_call_async(struct rpc_clnt *clnt, struct rpc_message *msg, int flags, const struct rpc_call_ops *tk_ops, void *data){ struct rpc_task *task; task = rpc_do_run_task(clnt, msg, flags|RPC_TASK_ASYNC, tk_ops, data); if (IS_ERR(task)) return PTR_ERR(task); rpc_put_task(task); return 0;}/** * rpc_run_task - Allocate a new RPC task, then run rpc_execute against it * @clnt: pointer to RPC client * @flags: RPC flags * @ops: RPC call ops * @data: user call data */struct rpc_task *rpc_run_task(struct rpc_clnt *clnt, int flags, const struct rpc_call_ops *tk_ops, void *data){ return rpc_do_run_task(clnt, NULL, flags, tk_ops, data);}EXPORT_SYMBOL(rpc_run_task);voidrpc_call_setup(struct rpc_task *task, struct rpc_message *msg, int flags){ task->tk_msg = *msg; task->tk_flags |= flags; /* Bind the user cred */ if (task->tk_msg.rpc_cred != NULL) rpcauth_holdcred(task); else rpcauth_bindcred(task); if (task->tk_status == 0) task->tk_action = call_start; else task->tk_action = rpc_exit_task;}/** * rpc_peeraddr - extract remote peer address from clnt's xprt * @clnt: RPC client structure * @buf: target buffer * @size: length of target buffer * * Returns the number of bytes that are actually in the stored address. */size_t rpc_peeraddr(struct rpc_clnt *clnt, struct sockaddr *buf, size_t bufsize){ size_t bytes; struct rpc_xprt *xprt = clnt->cl_xprt; bytes = sizeof(xprt->addr); if (bytes > bufsize) bytes = bufsize; memcpy(buf, &clnt->cl_xprt->addr, bytes); return xprt->addrlen;}EXPORT_SYMBOL_GPL(rpc_peeraddr);/** * rpc_peeraddr2str - return remote peer address in printable format * @clnt: RPC client structure * @format: address format * */char *rpc_peeraddr2str(struct rpc_clnt *clnt, enum rpc_display_format_t format){ struct rpc_xprt *xprt = clnt->cl_xprt; if (xprt->address_strings[format] != NULL) return xprt->address_strings[format]; else return "unprintable";}EXPORT_SYMBOL_GPL(rpc_peeraddr2str);voidrpc_setbufsize(struct rpc_clnt *clnt, unsigned int sndsize, unsigned int rcvsize){ struct rpc_xprt *xprt = clnt->cl_xprt; if (xprt->ops->set_buffer_size) xprt->ops->set_buffer_size(xprt, sndsize, rcvsize);}/* * Return size of largest payload RPC client can support, in bytes * * For stream transports, this is one RPC record fragment (see RFC * 1831), as we don't support multi-record requests yet. For datagram * transports, this is the size of an IP packet minus the IP, UDP, and * RPC header sizes. */size_t rpc_max_payload(struct rpc_clnt *clnt){ return clnt->cl_xprt->max_payload;}EXPORT_SYMBOL_GPL(rpc_max_payload);/** * rpc_force_rebind - force transport to check that remote port is unchanged * @clnt: client to rebind * */void rpc_force_rebind(struct rpc_clnt *clnt){ if (clnt->cl_autobind) xprt_clear_bound(clnt->cl_xprt);}EXPORT_SYMBOL_GPL(rpc_force_rebind);/* * Restart an (async) RPC call. Usually called from within the * exit handler. */voidrpc_restart_call(struct rpc_task *task){ if (RPC_ASSASSINATED(task)) return; task->tk_action = call_start;}/* * 0. Initial state * * Other FSM states can be visited zero or more times, but * this state is visited exactly once for each RPC. */static voidcall_start(struct rpc_task *task){ struct rpc_clnt *clnt = task->tk_client; dprintk("RPC: %5u call_start %s%d proc %d (%s)\n", task->tk_pid, clnt->cl_protname, clnt->cl_vers, task->tk_msg.rpc_proc->p_proc, (RPC_IS_ASYNC(task) ? "async" : "sync")); /* Increment call count */ task->tk_msg.rpc_proc->p_count++; clnt->cl_stats->rpccnt++; task->tk_action = call_reserve;}/* * 1. Reserve an RPC call slot */static voidcall_reserve(struct rpc_task *task){ dprint_status(task); if (!rpcauth_uptodatecred(task)) { task->tk_action = call_refresh; return; } task->tk_status = 0; task->tk_action = call_reserveresult; xprt_reserve(task);}/* * 1b. Grok the result of xprt_reserve() */static voidcall_reserveresult(struct rpc_task *task){ int status = task->tk_status; dprint_status(task); /* * After a call to xprt_reserve(), we must have either * a request slot or else an error status. */ task->tk_status = 0; if (status >= 0) { if (task->tk_rqstp) { task->tk_action = call_allocate; return; } printk(KERN_ERR "%s: status=%d, but no request slot, exiting\n", __FUNCTION__, status); rpc_exit(task, -EIO); return; } /* * Even though there was an error, we may have acquired * a request slot somehow. Make sure not to leak it. */ if (task->tk_rqstp) { printk(KERN_ERR "%s: status=%d, request allocated anyway\n", __FUNCTION__, status); xprt_release(task); } switch (status) { case -EAGAIN: /* woken up; retry */ task->tk_action = call_reserve; return; case -EIO: /* probably a shutdown */ break; default: printk(KERN_ERR "%s: unrecognized error %d, exiting\n", __FUNCTION__, status); break; } rpc_exit(task, status);}/* * 2. Allocate the buffer. For details, see sched.c:rpc_malloc. * (Note: buffer memory is freed in xprt_release). */static voidcall_allocate(struct rpc_task *task){ unsigned int slack = task->tk_msg.rpc_cred->cr_auth->au_cslack; struct rpc_rqst *req = task->tk_rqstp; struct rpc_xprt *xprt = task->tk_xprt; struct rpc_procinfo *proc = task->tk_msg.rpc_proc; dprint_status(task); task->tk_status = 0; task->tk_action = call_bind; if (req->rq_buffer) return; if (proc->p_proc != 0) { BUG_ON(proc->p_arglen == 0); if (proc->p_decode != NULL) BUG_ON(proc->p_replen == 0); } /* * Calculate the size (in quads) of the RPC call * and reply headers, and convert both values * to byte sizes. */ req->rq_callsize = RPC_CALLHDRSIZE + (slack << 1) + proc->p_arglen; req->rq_callsize <<= 2; req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen; req->rq_rcvsize <<= 2; req->rq_buffer = xprt->ops->buf_alloc(task, req->rq_callsize + req->rq_rcvsize); if (req->rq_buffer != NULL) return; dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid); if (RPC_IS_ASYNC(task) || !signalled()) { task->tk_action = call_allocate; rpc_delay(task, HZ>>4); return; } rpc_exit(task, -ERESTARTSYS);}static inline intrpc_task_need_encode(struct rpc_task *task){ return task->tk_rqstp->rq_snd_buf.len == 0;}static inline voidrpc_task_force_reencode(struct rpc_task *task){ task->tk_rqstp->rq_snd_buf.len = 0;}static inline voidrpc_xdr_buf_init(struct xdr_buf *buf, void *start, size_t len){ buf->head[0].iov_base = start; buf->head[0].iov_len = len; buf->tail[0].iov_len = 0; buf->page_len = 0; buf->flags = 0; buf->len = 0; buf->buflen = len;}/* * 3. Encode arguments of an RPC call */static voidcall_encode(struct rpc_task *task){ struct rpc_rqst *req = task->tk_rqstp; kxdrproc_t encode; __be32 *p; dprint_status(task); rpc_xdr_buf_init(&req->rq_snd_buf, req->rq_buffer, req->rq_callsize); rpc_xdr_buf_init(&req->rq_rcv_buf, (char *)req->rq_buffer + req->rq_callsize, req->rq_rcvsize); /* Encode header and provided arguments */ encode = task->tk_msg.rpc_proc->p_encode; if (!(p = call_header(task))) { printk(KERN_INFO "RPC: call_header failed, exit EIO\n"); rpc_exit(task, -EIO); return; } if (encode == NULL) return; task->tk_status = rpcauth_wrap_req(task, encode, req, p, task->tk_msg.rpc_argp); if (task->tk_status == -ENOMEM) { /* XXX: Is this sane? */ rpc_delay(task, 3*HZ); task->tk_status = -EAGAIN; }}/* * 4. Get the server port number if not yet set */static voidcall_bind(struct rpc_task *task){ struct rpc_xprt *xprt = task->tk_xprt; dprint_status(task); task->tk_action = call_connect; if (!xprt_bound(xprt)) { task->tk_action = call_bind_status; task->tk_timeout = xprt->bind_timeout; xprt->ops->rpcbind(task); }}/* * 4a. Sort out bind result */static voidcall_bind_status(struct rpc_task *task){ int status = -EIO; if (task->tk_status >= 0) { dprint_status(task); task->tk_status = 0; task->tk_action = call_connect; return; } switch (task->tk_status) { case -EAGAIN: dprintk("RPC: %5u rpcbind waiting for another request " "to finish\n", task->tk_pid); /* avoid busy-waiting here -- could be a network outage. */ rpc_delay(task, 5*HZ); goto retry_timeout; case -EACCES: dprintk("RPC: %5u remote rpcbind: RPC program/version " "unavailable\n", task->tk_pid); /* fail immediately if this is an RPC ping */ if (task->tk_msg.rpc_proc->p_proc == 0) { status = -EOPNOTSUPP; break; } rpc_delay(task, 3*HZ); goto retry_timeout; case -ETIMEDOUT: dprintk("RPC: %5u rpcbind request timed out\n", task->tk_pid); goto retry_timeout; case -EPFNOSUPPORT: /* server doesn't support any rpcbind version we know of */ dprintk("RPC: %5u remote rpcbind service unavailable\n", task->tk_pid); break; case -EPROTONOSUPPORT: dprintk("RPC: %5u remote rpcbind version unavailable, retrying\n", task->tk_pid); task->tk_status = 0; task->tk_action = call_bind; return; default: dprintk("RPC: %5u unrecognized rpcbind error (%d)\n", task->tk_pid, -task->tk_status); } rpc_exit(task, status); return;retry_timeout: task->tk_action = call_timeout;}/* * 4b. Connect to the RPC server */static voidcall_connect(struct rpc_task *task){ struct rpc_xprt *xprt = task->tk_xprt; dprintk("RPC: %5u call_connect xprt %p %s connected\n", task->tk_pid, xprt, (xprt_connected(xprt) ? "is" : "is not")); task->tk_action = call_transmit; if (!xprt_connected(xprt)) { task->tk_action = call_connect_status; if (task->tk_status < 0) return; xprt_connect(task); }}/* * 4c. Sort out connect result */static voidcall_connect_status(struct rpc_task *task){ struct rpc_clnt *clnt = task->tk_client; int status = task->tk_status; dprint_status(task); task->tk_status = 0; if (status >= 0) { clnt->cl_stats->netreconn++; task->tk_action = call_transmit; return; } /* Something failed: remote service port may have changed */ rpc_force_rebind(clnt); switch (status) { case -ENOTCONN: case -EAGAIN: task->tk_action = call_bind; if (!RPC_IS_SOFT(task)) return; /* if soft mounted, test if we've timed out */
⌨️ 快捷键说明
复制代码
Ctrl + C
搜索代码
Ctrl + F
全屏模式
F11
切换主题
Ctrl + Shift + D
显示快捷键
?
增大字号
Ctrl + =
减小字号
Ctrl + -